From 930250868d5a407070b2509745052e0cf3f5185f Mon Sep 17 00:00:00 2001 From: sjuls Date: Fri, 1 Dec 2023 15:27:47 +0100 Subject: [PATCH] Implement a no-list strategy for wal-restore to improve recovery times --- barman/clients/cloud_walrestore.py | 88 +++++++------------ barman/cloud.py | 4 + barman/cloud_providers/aws_s3.py | 26 ++++-- barman/cloud_providers/azure_blob_storage.py | 18 ++-- .../cloud_providers/google_cloud_storage.py | 18 ++-- tests/test_barman_cloud_wal_restore.py | 17 ++-- 6 files changed, 81 insertions(+), 90 deletions(-) diff --git a/barman/clients/cloud_walrestore.py b/barman/clients/cloud_walrestore.py index a473a0844..c797b6f13 100644 --- a/barman/clients/cloud_walrestore.py +++ b/barman/clients/cloud_walrestore.py @@ -28,9 +28,8 @@ NetworkErrorExit, OperationErrorExit, ) -from barman.cloud import configure_logging, ALLOWED_COMPRESSIONS +from barman.cloud import configure_logging, ALLOWED_COMPRESSIONS, CloudObjectNotFoundError from barman.cloud_providers import get_cloud_interface -from barman.exceptions import BarmanException from barman.utils import force_str from barman.xlog import hash_dir, is_any_xlog_file, is_backup_file @@ -136,64 +135,43 @@ def download_wal(self, wal_name, wal_dest): wal_path = os.path.join(source_dir, wal_name) - remote_name = None - # Automatically detect compression based on the file extension - compression = None - for item in self.cloud_interface.list_bucket(source_dir): - # perfect match (uncompressed file) - if item == wal_path: - remote_name = item - # look for compressed files or .partial files - elif item.startswith(wal_path): - # Detect compression - basename = item - for e, c in ALLOWED_COMPRESSIONS.items(): - if item[-len(e) :] == e: - # Strip extension - basename = basename[: -len(e)] - compression = c - break - - # Check basename is a known xlog file (.partial?) - if not is_any_xlog_file(basename): - logging.warning("Unknown WAL file: %s", item) - continue - # Exclude backup informative files (not needed in recovery) - elif is_backup_file(basename): - logging.info("Skipping backup file: %s", item) + if sys.version_info >= (3, 0, 0): + # Try to optimistically download the wal file in one of the supported compression formats + for suffix, compression in ALLOWED_COMPRESSIONS.items(): + wal_compressed_file = wal_path + suffix + try: + self.cloud_interface.download_file(wal_compressed_file, wal_dest, compression) + logging.debug( + "Downloaded %s to %s (%s)", + wal_name, + wal_dest, + "decompressed " + compression, + ) + return + except CloudObjectNotFoundError: + logging.debug( + "WAL file %s for server %s does not exist", wal_name + suffix, self.server_name + ) continue + else: + log.warning( + "Compressed WALs cannot be restored with Python 2.x - please upgrade to a supported version of Python 3" + ) - # Found candidate - remote_name = item - logging.info( - "Found WAL %s for server %s as %s", - wal_name, - self.server_name, - remote_name, - ) - break - - if not remote_name: + # Try to download the wal file in non-compressed format + try: + self.cloud_interface.download_file(wal_path, wal_dest, None) + logging.debug( + "Downloaded %s to %s (%s)", + wal_name, + wal_dest, + "uncompressed", + ) + except CloudObjectNotFoundError: logging.info( - "WAL file %s for server %s does not exists", wal_name, self.server_name + "WAL file %s for server %s does not exist", wal_name, self.server_name ) raise OperationErrorExit() - if compression and sys.version_info < (3, 0, 0): - raise BarmanException( - "Compressed WALs cannot be restored with Python 2.x - " - "please upgrade to a supported version of Python 3" - ) - - # Download the file - logging.debug( - "Downloading %s to %s (%s)", - remote_name, - wal_dest, - "decompressing " + compression if compression else "no compression", - ) - self.cloud_interface.download_file(remote_name, wal_dest, compression) - - if __name__ == "__main__": main() diff --git a/barman/cloud.py b/barman/cloud.py index 2c4ac4506..ea57d9a80 100644 --- a/barman/cloud.py +++ b/barman/cloud.py @@ -129,6 +129,10 @@ class CloudUploadingError(BarmanException): This exception is raised when there are upload errors """ +class CloudObjectNotFoundError(BarmanException): + """ + This exception is raised when objects cannot be found in the cloud + """ class TarFileIgnoringTruncate(tarfile.TarFile): """ diff --git a/barman/cloud_providers/aws_s3.py b/barman/cloud_providers/aws_s3.py index 2e0960879..f56f1b36d 100644 --- a/barman/cloud_providers/aws_s3.py +++ b/barman/cloud_providers/aws_s3.py @@ -24,6 +24,7 @@ from barman.cloud import ( CloudInterface, CloudProviderError, + CloudObjectNotFoundError, CloudSnapshotInterface, DecompressingStreamingIO, DEFAULT_DELIMITER, @@ -278,17 +279,24 @@ def download_file(self, key, dest_path, decompress): :param str|None decompress: Compression scheme to use for decompression """ # Open the remote file - obj = self.s3.Object(self.bucket_name, key) - remote_file = obj.get()["Body"] + try: + obj = self.s3.Object(self.bucket_name, key) + remote_file = obj.get()["Body"] + # Write the dest file in binary mode + with open(dest_path, "wb") as dest_file: + # If the file is not compressed, just copy its content + if decompress is None: + shutil.copyfileobj(remote_file, dest_file) + return - # Write the dest file in binary mode - with open(dest_path, "wb") as dest_file: - # If the file is not compressed, just copy its content - if decompress is None: - shutil.copyfileobj(remote_file, dest_file) - return + decompress_to_file(remote_file, dest_file, decompress) + except ClientError as exc: + error_code = exc.response["Error"]["Code"] + if error_code == "NoSuchKey": + raise CloudObjectNotFoundError("Object %s not found" % key) + else: + raise exc - decompress_to_file(remote_file, dest_file, decompress) def remote_open(self, key, decompressor=None): """ diff --git a/barman/cloud_providers/azure_blob_storage.py b/barman/cloud_providers/azure_blob_storage.py index 018bc356e..fe8ee21bf 100644 --- a/barman/cloud_providers/azure_blob_storage.py +++ b/barman/cloud_providers/azure_blob_storage.py @@ -26,6 +26,7 @@ from barman.cloud import ( CloudInterface, CloudProviderError, + CloudObjectNotFoundError, CloudSnapshotInterface, DecompressingStreamingIO, DEFAULT_DELIMITER, @@ -313,13 +314,16 @@ def download_file(self, key, dest_path, decompress=None): :param str dest_path: Where to put the destination file :param str|None decompress: Compression scheme to use for decompression """ - obj = self.container_client.download_blob(key) - with open(dest_path, "wb") as dest_file: - if decompress is None: - obj.download_to_stream(dest_file) - return - blob = StreamingBlobIO(obj) - decompress_to_file(blob, dest_file, decompress) + try: + obj = self.container_client.download_blob(key) + with open(dest_path, "wb") as dest_file: + if decompress is None: + obj.download_to_stream(dest_file) + return + blob = StreamingBlobIO(obj) + decompress_to_file(blob, dest_file, decompress) + except ResourceNotFoundError: + raise CloudObjectNotFoundError("Object %s not found" % key) def remote_open(self, key, decompressor=None): """ diff --git a/barman/cloud_providers/google_cloud_storage.py b/barman/cloud_providers/google_cloud_storage.py index 767cfe248..e5db91c68 100644 --- a/barman/cloud_providers/google_cloud_storage.py +++ b/barman/cloud_providers/google_cloud_storage.py @@ -24,6 +24,7 @@ from barman.cloud import ( CloudInterface, CloudProviderError, + CloudObjectNotFoundError, CloudSnapshotInterface, DecompressingStreamingIO, DEFAULT_DELIMITER, @@ -201,13 +202,16 @@ def download_file(self, key, dest_path, decompress): :param str|None decompress: Compression scheme to use for decompression """ logging.debug("GCS.download_file") - blob = storage.Blob(key, self.container_client) - with open(dest_path, "wb") as dest_file: - if decompress is None: - self.client.download_blob_to_file(blob, dest_file) - return - with blob.open(mode="rb") as blob_reader: - decompress_to_file(blob_reader, dest_file, decompress) + try: + blob = storage.Blob(key, self.container_client) + with open(dest_path, "wb") as dest_file: + if decompress is None: + self.client.download_blob_to_file(blob, dest_file) + return + with blob.open(mode="rb") as blob_reader: + decompress_to_file(blob_reader, dest_file, decompress) + except NotFound: + raise CloudObjectNotFoundError("Object %s not found" % key) def remote_open(self, key, decompressor=None): """ diff --git a/tests/test_barman_cloud_wal_restore.py b/tests/test_barman_cloud_wal_restore.py index c94b8c440..1d119a030 100644 --- a/tests/test_barman_cloud_wal_restore.py +++ b/tests/test_barman_cloud_wal_restore.py @@ -22,7 +22,7 @@ import pytest from barman.clients import cloud_walrestore - +from barman.cloud import CloudObjectNotFoundError class TestMain(object): """ @@ -51,9 +51,7 @@ def test_succeeds_if_wal_is_found(self, get_cloud_interface_mock, caplog): """If the WAL is found we exit with status 0.""" cloud_interface_mock = get_cloud_interface_mock.return_value cloud_interface_mock.path = "testfolder/" - cloud_interface_mock.list_bucket.return_value = [ - "testfolder/test-server/wals/000000080000ABFF/000000080000ABFF000000C1" - ] + cloud_interface_mock.download_file.side_effect = [CloudObjectNotFoundError("Object not found"), None] cloud_walrestore.main( [ "s3://test-bucket/testfolder/", @@ -63,16 +61,14 @@ def test_succeeds_if_wal_is_found(self, get_cloud_interface_mock, caplog): ] ) assert caplog.text == "" - cloud_interface_mock.download_file.assert_called_once() + cloud_interface_mock.download_file.assert_called() @mock.patch("barman.clients.cloud_walrestore.get_cloud_interface") def test_fails_if_wal_not_found(self, get_cloud_interface_mock, caplog): """If the WAL cannot be found we exit with status 1.""" cloud_interface_mock = get_cloud_interface_mock.return_value cloud_interface_mock.path = "testfolder/" - cloud_interface_mock.list_bucket.return_value = [ - "testfolder/test-server/wals/000000080000ABFF/000000080000ABFF000000C1" - ] + cloud_interface_mock.download_file.side_effect = CloudObjectNotFoundError("Object not found") caplog.set_level(logging.INFO) with pytest.raises(SystemExit) as exc: cloud_walrestore.main( @@ -85,7 +81,7 @@ def test_fails_if_wal_not_found(self, get_cloud_interface_mock, caplog): ) assert exc.value.code == 1 assert ( - "WAL file 000000080000ABFF000000C0 for server test-server does not exists\n" + "WAL file 000000080000ABFF000000C0 for server test-server does not exist\n" in caplog.text ) @@ -127,9 +123,6 @@ def test_fails_on_download_exception(self, get_cloud_interface_mock, caplog): """Test that any cloud_interface.download exceptions cause exit status 4.""" cloud_interface_mock = get_cloud_interface_mock.return_value cloud_interface_mock.path = "testfolder/" - cloud_interface_mock.list_bucket.return_value = [ - "testfolder/test-server/wals/000000080000ABFF/000000080000ABFF000000C1" - ] cloud_interface_mock.download_file.side_effect = Exception( "something went wrong" )