Add zstd compression support to Barman Cloud #898

Status: Closed · wants to merge 1 commit
7 changes: 7 additions & 0 deletions barman/clients/cloud_backup.py
@@ -279,6 +279,13 @@ def parse_arguments(args=None):
const="snappy",
dest="compression",
)
compression.add_argument(
"--zstd",
help="zstd-compress the backup while uploading to the cloud",
action="store_const",
const="zst",
dest="compression",
)
parser.add_argument(
"-h",
"--host",
71 changes: 65 additions & 6 deletions barman/clients/cloud_compression.py
@@ -33,6 +33,14 @@ def _try_import_snappy():
    return snappy


def _try_import_zstd():
    try:
        import zstandard
    except ImportError:
        raise SystemExit("Missing required python module: zstandard")
    return zstandard


class ChunkedCompressor(with_metaclass(ABCMeta, object)):
"""
Base class for all ChunkedCompressors
Expand Down Expand Up @@ -92,20 +100,56 @@ def decompress(self, data):
        return self.decompressor.decompress(data)


class ZstdCompressor(ChunkedCompressor):
    """
    A ChunkedCompressor implementation based on zstandard
    """

    def __init__(self):
        zstd = _try_import_zstd()
        self.compressor = zstd.ZstdCompressor()
        self.decompressor = zstd.ZstdDecompressor().decompressobj(
            read_across_frames=True
        )

Inline review discussion on this line:

Contributor:
Writing this here, but it is valid in other places of the code as well...
decompressobj doesn't support read_across_frames for versions of the zstandard lib older than 0.22, as you can see here: https://github.com/indygreg/python-zstandard/blob/0.21.0/zstandard/backend_cffi.py#L3903

Unfortunately, this means that this implementation is not compatible with Python 3.6 and 3.7, as you can also see in the unit tests run for this PR.

@Jellyfrog we cannot accept and merge this implementation as it is now, but it would be nice if you could make this patch compatible with the older Python versions that Barman still needs to support.

Comment:
Hi @gcalacoci, I'm interested in finishing this. The zstd library doesn't seem to have a breaking change with 3.6 and 3.7; it just no longer builds binaries for them. To solve this, we may need a fork of zstd with some new releases. Does this work for Barman? Is EDB interested in holding the fork as well?

Author (@Jellyfrog), Oct 30, 2024:
Feel free to take over. I'm not going to work on this (not using Barman anymore, and especially not for EOL Python versions).

Contributor:
I don't think that's something we want to do or encourage people to do. There are other solutions to this. Giulio is trying to encourage @Jellyfrog to address those issues.

In short, Barman has requirements, and we have to be careful about those requirements because there are users running Barman on systems that still have Python 3.6. This PR would require the deprecation of Python 3.6 and 3.7, and that's not something we want to do without some deep thinking.
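As an illustrative sketch only (none of this is in the PR): one minimal way to act on the reviewer's point is to probe for the read_across_frames keyword at runtime and fail with an explicit message on zstandard releases that predate it. The helper name is hypothetical, the 0.22 threshold comes from the review comment above, and it is assumed that older releases reject the unknown keyword with a TypeError.

import zstandard


def _zstd_chunked_decompressobj():
    # Hypothetical helper: return a decompressor that reads across frame
    # boundaries, which the chunked upload format needs.
    dctx = zstandard.ZstdDecompressor()
    try:
        return dctx.decompressobj(read_across_frames=True)
    except TypeError:
        # Assumed behaviour on zstandard < 0.22: the keyword does not exist,
        # so chunked multi-frame streams cannot be decompressed this way.
        raise SystemExit(
            "zstd support requires python-zstandard >= 0.22 "
            "(decompressobj has no read_across_frames in this version)"
        )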

    def add_chunk(self, data):
        """
        Compresses the supplied data and returns all the compressed bytes.

        :param bytes data: The chunk of data to be compressed
        :return: The compressed data
        :rtype: bytes
        """
        return self.compressor.compress(data)

    def decompress(self, data):
        """
        Decompresses the supplied chunk of data and returns at least part of the
        uncompressed data.

        :param bytes data: The chunk of data to be decompressed
        :return: The decompressed data
        :rtype: bytes
        """
        return self.decompressor.decompress(data)


def get_compressor(compression):
    """
    Helper function which returns a ChunkedCompressor for the specified compression
-    algorithm. Currently only snappy is supported. The other compression algorithms
+    algorithm. Currently snappy and zstd are supported. The other compression algorithms
    supported by barman cloud use the decompression built into TarFile.

-    :param str compression: The compression algorithm to use. Can be set to snappy
+    :param str compression: The compression algorithm to use. Can be set to snappy, zstd
        or any compression supported by the TarFile mode string.
    :return: A ChunkedCompressor capable of compressing and decompressing using the
        specified compression.
    :rtype: ChunkedCompressor
    """
    if compression == "snappy":
        return SnappyCompressor()
    elif compression == "zstd" or compression == "zst":
        return ZstdCompressor()
    return None
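For reference (not part of the diff), a minimal sketch of how this helper is expected to behave once the branch is installed, assuming the optional zstandard package is available: both the short name set by the backup argument parser ("zst") and the long name ("zstd") resolve to a ChunkedCompressor, while tar-native compressions fall through to None.

from barman.clients.cloud_compression import get_compressor

# snappy/zstd/zst need manual chunked (de)compression, so a compressor object is returned
assert get_compressor("zstd") is not None
assert get_compressor("zst") is not None
# gzip/bzip2 are handled by TarFile via the mode string, so no compressor is needed
assert get_compressor("gz") is None
assert get_compressor("bz2") is None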


@@ -115,7 +159,7 @@ def compress(wal_file, compression):
    compressed data.
    :param IOBase wal_file: A file-like object containing the WAL file data.
    :param str compression: The compression algorithm to apply. Can be one of:
-        bzip2, gzip, snappy.
+        bzip2, gzip, snappy, zstd.
    :return: The compressed data
    :rtype: BytesIO
    """
@@ -125,6 +169,12 @@
        snappy.stream_compress(wal_file, in_mem_snappy)
        in_mem_snappy.seek(0)
        return in_mem_snappy
    elif compression == "zstd":
        in_mem_zstd = BytesIO()
        zstd = _try_import_zstd()
        zstd.ZstdCompressor().copy_stream(wal_file, in_mem_zstd)
        in_mem_zstd.seek(0)
        return in_mem_zstd
    elif compression == "gzip":
        # Create a BytesIO for in memory compression
        in_mem_gzip = BytesIO()
@@ -150,12 +200,17 @@ def get_streaming_tar_mode(mode, compression):
    ignored so that barman-cloud can apply them itself.

    :param str mode: The file mode to use, either r or w.
-    :param str compression: The compression algorithm to use. Can be set to snappy
+    :param str compression: The compression algorithm to use. Can be set to snappy, zstd
        or any compression supported by the TarFile mode string.
    :return: The full filemode for a streaming tar file
    :rtype: str
    """
-    if compression == "snappy" or compression is None:
+    if (
+        compression == "snappy"
+        or compression == "zstd"
+        or compression == "zst"
+        or compression is None
+    ):
        return "%s|" % mode
    else:
        return "%s|%s" % (mode, compression)
@@ -170,13 +225,17 @@ def decompress_to_file(blob, dest_file, compression):
    :param IOBase dest_file: A file-like object into which the uncompressed data
        should be written.
    :param str compression: The compression algorithm to apply. Can be one of:
-        bzip2, gzip, snappy.
+        bzip2, gzip, snappy, zstd.
    :rtype: None
    """
    if compression == "snappy":
        snappy = _try_import_snappy()
        snappy.stream_decompress(blob, dest_file)
        return
    elif compression == "zstd":
        zstd = _try_import_zstd()
        zstd.ZstdDecompressor().copy_stream(blob, dest_file)
        return
    elif compression == "gzip":
        source_file = gzip.GzipFile(fileobj=blob, mode="rb")
    elif compression == "bzip2":
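To round off the cloud_compression changes, a small end-to-end sketch (not part of the PR; it assumes this branch is installed together with the optional zstandard package) showing that a WAL-sized payload survives the new zstd compress/decompress path:

from io import BytesIO

from barman.clients.cloud_compression import compress, decompress_to_file

wal_file = BytesIO(b"fake WAL payload")

# barman-cloud-wal-archive path: compress the WAL in memory with zstd
compressed = compress(wal_file, "zstd")

# barman-cloud-wal-restore path: decompress the blob back into a file object
restored = BytesIO()
decompress_to_file(compressed, restored, "zstd")

assert restored.getvalue() == b"fake WAL payload"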
12 changes: 12 additions & 0 deletions barman/clients/cloud_walarchive.py
@@ -155,6 +155,14 @@ def parse_arguments(args=None):
const="snappy",
dest="compression",
)
compression.add_argument(
"--zstd",
help="zstd-compress the WAL while uploading to the cloud "
"(requires optional zstandard library)",
action="store_const",
const="zstd",
dest="compression",
)
add_tag_argument(
parser,
name="tags",
@@ -319,6 +327,10 @@ def retrieve_wal_name(self, wal_path):
        elif self.compression == "snappy":
            # add snappy extension
            return "%s.snappy" % wal_name

        elif self.compression == "zstd":
            # add zstd extension
            return "%s.zst" % wal_name
        else:
            raise ValueError("Unknown compression type: %s" % self.compression)

13 changes: 11 additions & 2 deletions barman/cloud.py
@@ -72,7 +72,12 @@
LOGGING_FORMAT = "%(asctime)s [%(process)s] %(levelname)s: %(message)s"

# Allowed compression algorithms
ALLOWED_COMPRESSIONS = {".gz": "gzip", ".bz2": "bzip2", ".snappy": "snappy"}
ALLOWED_COMPRESSIONS = {
".gz": "gzip",
".bz2": "bzip2",
".snappy": "snappy",
".zst": "zstd",
}

DEFAULT_DELIMITER = "/"

@@ -193,7 +198,7 @@ def __init__(
        self.buffer = None
        self.counter = 0
        self.compressor = None
-        # Some supported compressions (e.g. snappy) require CloudTarUploader to apply
+        # Some supported compressions (e.g. snappy, zstd) require CloudTarUploader to apply
        # compression manually rather than relying on the tar file.
        self.compressor = cloud_compression.get_compressor(compression)
        # If the compression is supported by tar then it will be added to the filemode
@@ -366,6 +371,8 @@ def _build_dest_name(self, name, count=0):
            components.append(".bz2")
        elif self.compression == "snappy":
            components.append(".snappy")
        elif self.compression == "zst":
            components.append(".zst")
        return "".join(components)

    def _get_tar(self, name):
@@ -2287,6 +2294,8 @@ def get_backup_files(self, backup_info, allow_missing=False):
info.compression = "bzip2"
elif ext == "tar.snappy":
info.compression = "snappy"
elif ext == "tar.zst":
info.compression = "zstd"
else:
logging.warning("Skipping unknown extension: %s", ext)
continue
1 change: 1 addition & 0 deletions setup.py
@@ -104,6 +104,7 @@
"azure": ["azure-identity", "azure-storage-blob"],
"azure-snapshots": ["azure-identity", "azure-mgmt-compute"],
"cloud": ["boto3"],
"zstd": ["zstandard"],
"google": [
"google-cloud-storage",
],
1 change: 1 addition & 0 deletions tests/requirements_dev.txt
@@ -2,6 +2,7 @@
.[cloud]
.[azure]
.[snappy]
.[zstd]
.[google]
pytest
mock
30 changes: 30 additions & 0 deletions tests/test_barman_cloud_wal_archive.py
@@ -21,6 +21,7 @@
import logging
import os
import snappy
import zstandard as zstd

import mock
import pytest
@@ -537,6 +538,23 @@ def test_retrieve_snappy_file_obj(self, tmpdir):
            open_file.read()
        ) == "something".encode("utf-8")

    def test_retrieve_zstd_file_obj(self, tmpdir):
        """
        Test the retrieve_file_obj method with a zstd file
        """
        # Setup the WAL
        source = tmpdir.join("wal_dir/000000080000ABFF000000C1")
        source.write("something".encode("utf-8"), ensure=True)
        # Create a simple CloudWalUploader obj
        uploader = CloudWalUploader(mock.MagicMock(), "test-server", compression="zstd")
        open_file = uploader.retrieve_file_obj(source.strpath)
        # Check the in memory file received
        assert open_file
        # Decompress on the fly to check content
        assert zstd.ZstdDecompressor().decompressobj(
            read_across_frames=True
        ).decompress(open_file.read()) == "something".encode("utf-8")

    def test_retrieve_normal_file_name(self):
        """
        Test the retrieve_wal_name method with an uncompressed file
@@ -589,6 +607,18 @@ def test_retrieve_snappy_file_name(self):
        assert wal_final_name
        assert wal_final_name == "000000080000ABFF000000C1.snappy"

    def test_retrieve_zstd_file_name(self):
        """
        Test the retrieve_wal_name method with zstd compression
        """
        # Create a fake source name
        source = "wal_dir/000000080000ABFF000000C1"
        uploader = CloudWalUploader(mock.MagicMock(), "test-server", compression="zstd")
        wal_final_name = uploader.retrieve_wal_name(source)
        # Check the file name received
        assert wal_final_name
        assert wal_final_name == "000000080000ABFF000000C1.zst"

@mock.patch("barman.cloud.CloudInterface")
@mock.patch("barman.clients.cloud_walarchive.CloudWalUploader.retrieve_file_obj")
def test_upload_wal(self, rfo_mock, cloud_interface_mock):
37 changes: 31 additions & 6 deletions tests/test_cloud.py
@@ -36,6 +36,7 @@
from mock.mock import MagicMock
import pytest
import snappy
import zstandard as zstd

from barman.exceptions import BackupPreconditionException
from barman.infofile import BackupInfo
@@ -98,6 +99,9 @@ def _compression_helper(src, compression):
    if compression == "snappy":
        dest = BytesIO()
        snappy.stream_compress(src, dest)
    elif compression == "zstd":
        dest = BytesIO()
        zstd.ZstdCompressor().copy_stream(src, dest)
    elif compression == "gzip":
        dest = BytesIO()
        with gzip.GzipFile(fileobj=dest, mode="wb") as gz:
@@ -975,7 +979,7 @@ def test_delete_objects_partial_failure(self, boto_mock, caplog):
        ) in caplog.text

    @pytest.mark.skipif(sys.version_info < (3, 0), reason="Requires Python 3 or higher")
-    @pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy"))
+    @pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy", "zstd"))
    @mock.patch("barman.cloud_providers.aws_s3.boto3")
    def test_download_file(self, boto_mock, compression, tmpdir):
        """Verifies that cloud_interface.download_file decompresses correctly."""
@@ -1007,7 +1011,13 @@ def test_download_file(self, boto_mock, compression, tmpdir):

    @pytest.mark.parametrize(
        ("compression", "file_ext"),
-        ((None, ""), ("bzip2", ".bz2"), ("gzip", ".gz"), ("snappy", ".snappy")),
+        (
+            (None, ""),
+            ("bzip2", ".bz2"),
+            ("gzip", ".gz"),
+            ("snappy", ".snappy"),
+            ("zstd", ".zst"),
+        ),
    )
    @mock.patch("barman.cloud_providers.aws_s3.boto3")
    def test_extract_tar(self, boto_mock, compression, file_ext, tmpdir):
@@ -2028,7 +2038,7 @@ def test_delete_objects_404_not_failure(self, container_client_mock, caplog):
        ) in caplog.text

    @pytest.mark.skipif(sys.version_info < (3, 0), reason="Requires Python 3 or higher")
-    @pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy"))
+    @pytest.mark.parametrize("compression", (None, "bzip2", "gzip", "snappy", "zstd"))
    @mock.patch.dict(
        os.environ, {"AZURE_STORAGE_CONNECTION_STRING": "connection_string"}
    )
@@ -2077,7 +2087,13 @@ def test_download_file(self, container_client_mock, compression, tmpdir):

    @pytest.mark.parametrize(
        ("compression", "file_ext"),
-        ((None, ""), ("bzip2", ".bz2"), ("gzip", ".gz"), ("snappy", ".snappy")),
+        (
+            (None, ""),
+            ("bzip2", ".bz2"),
+            ("gzip", ".gz"),
+            ("snappy", ".snappy"),
+            ("zstd", ".zst"),
+        ),
    )
    @mock.patch.dict(
        os.environ, {"AZURE_STORAGE_CONNECTION_STRING": "connection_string"}
@@ -2564,6 +2580,9 @@ def test_download_file(self, gcs_storage_mock, open_mock):
"snappy_compression": {
"compression": "snappy",
},
"zstd_compression": {
"compression": "zstd",
},
}
        for test_name, test_case in test_cases.items():
            with self.subTest(msg=test_name, compression=test_case["compression"]):
@@ -3008,7 +3027,7 @@ def _verify_wal_is_in_catalog(self, wal_name, wal_path):
                        suffix,
                    ),
                ]
-                for suffix in ("", ".gz", ".bz2", ".snappy")
+                for suffix in ("", ".gz", ".bz2", ".snappy", ".zst")
            ]
            for spec in spec_group
        ],
Expand Down Expand Up @@ -3347,7 +3366,7 @@ class TestCloudTarUploader(object):
"compression",
# The CloudTarUploader expects the short form compression args set by the
# cloud_backup argument parser
(None, "bz2", "gz", "snappy"),
(None, "bz2", "gz", "snappy", "zstd"),
)
@mock.patch("barman.cloud.CloudInterface")
def test_add(self, mock_cloud_interface, compression, tmpdir):
Expand Down Expand Up @@ -3393,6 +3412,12 @@ def test_add(self, mock_cloud_interface, compression, tmpdir):
            tar_fileobj = BytesIO()
            snappy.stream_decompress(uploaded_data, tar_fileobj)
            tar_fileobj.seek(0)
        elif compression == "zstd":
            tar_mode = "r|"
            # We must manually decompress the zstd bytes before extracting
            tar_fileobj = BytesIO()
            zstd.ZstdDecompressor().copy_stream(uploaded_data, tar_fileobj)
            tar_fileobj.seek(0)
        else:
            tar_mode = "r|%s" % compression
        with open_tar(fileobj=tar_fileobj, mode=tar_mode) as tf: