Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement a no-list strategy for wal-restore to improve recovery times #877

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 33 additions & 55 deletions barman/clients/cloud_walrestore.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,8 @@
NetworkErrorExit,
OperationErrorExit,
)
from barman.cloud import configure_logging, ALLOWED_COMPRESSIONS
from barman.cloud import configure_logging, ALLOWED_COMPRESSIONS, CloudObjectNotFoundError
from barman.cloud_providers import get_cloud_interface
from barman.exceptions import BarmanException
from barman.utils import force_str
from barman.xlog import hash_dir, is_any_xlog_file, is_backup_file

Expand Down Expand Up @@ -136,64 +135,43 @@ def download_wal(self, wal_name, wal_dest):

wal_path = os.path.join(source_dir, wal_name)

remote_name = None
# Automatically detect compression based on the file extension
compression = None
for item in self.cloud_interface.list_bucket(source_dir):
# perfect match (uncompressed file)
if item == wal_path:
remote_name = item
# look for compressed files or .partial files
elif item.startswith(wal_path):
# Detect compression
basename = item
for e, c in ALLOWED_COMPRESSIONS.items():
if item[-len(e) :] == e:
# Strip extension
basename = basename[: -len(e)]
compression = c
break

# Check basename is a known xlog file (.partial?)
if not is_any_xlog_file(basename):
logging.warning("Unknown WAL file: %s", item)
continue
# Exclude backup informative files (not needed in recovery)
elif is_backup_file(basename):
logging.info("Skipping backup file: %s", item)
if sys.version_info >= (3, 0, 0):
# Try to optimistically download the wal file in one of the supported compression formats
for suffix, compression in ALLOWED_COMPRESSIONS.items():
wal_compressed_file = wal_path + suffix
try:
self.cloud_interface.download_file(wal_compressed_file, wal_dest, compression)
logging.debug(
"Downloaded %s to %s (%s)",
wal_name,
wal_dest,
"decompressed " + compression,
)
return
except CloudObjectNotFoundError:
logging.debug(
"WAL file %s for server %s does not exist", wal_name + suffix, self.server_name
)
continue
else:
log.warning(
"Compressed WALs cannot be restored with Python 2.x - please upgrade to a supported version of Python 3"
)

# Found candidate
remote_name = item
logging.info(
"Found WAL %s for server %s as %s",
wal_name,
self.server_name,
remote_name,
)
break

if not remote_name:
# Try to download the wal file in non-compressed format
try:
self.cloud_interface.download_file(wal_path, wal_dest, None)
logging.debug(
"Downloaded %s to %s (%s)",
wal_name,
wal_dest,
"uncompressed",
)
except CloudObjectNotFoundError:
logging.info(
"WAL file %s for server %s does not exists", wal_name, self.server_name
"WAL file %s for server %s does not exist", wal_name, self.server_name
)
raise OperationErrorExit()

if compression and sys.version_info < (3, 0, 0):
raise BarmanException(
"Compressed WALs cannot be restored with Python 2.x - "
"please upgrade to a supported version of Python 3"
)

# Download the file
logging.debug(
"Downloading %s to %s (%s)",
remote_name,
wal_dest,
"decompressing " + compression if compression else "no compression",
)
self.cloud_interface.download_file(remote_name, wal_dest, compression)


if __name__ == "__main__":
main()
4 changes: 4 additions & 0 deletions barman/cloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ class CloudUploadingError(BarmanException):
This exception is raised when there are upload errors
"""

class CloudObjectNotFoundError(BarmanException):
"""
This exception is raised when objects cannot be found in the cloud
"""

class TarFileIgnoringTruncate(tarfile.TarFile):
"""
Expand Down
26 changes: 17 additions & 9 deletions barman/cloud_providers/aws_s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from barman.cloud import (
CloudInterface,
CloudProviderError,
CloudObjectNotFoundError,
CloudSnapshotInterface,
DecompressingStreamingIO,
DEFAULT_DELIMITER,
Expand Down Expand Up @@ -278,17 +279,24 @@ def download_file(self, key, dest_path, decompress):
:param str|None decompress: Compression scheme to use for decompression
"""
# Open the remote file
obj = self.s3.Object(self.bucket_name, key)
remote_file = obj.get()["Body"]
try:
obj = self.s3.Object(self.bucket_name, key)
remote_file = obj.get()["Body"]
# Write the dest file in binary mode
with open(dest_path, "wb") as dest_file:
# If the file is not compressed, just copy its content
if decompress is None:
shutil.copyfileobj(remote_file, dest_file)
return

# Write the dest file in binary mode
with open(dest_path, "wb") as dest_file:
# If the file is not compressed, just copy its content
if decompress is None:
shutil.copyfileobj(remote_file, dest_file)
return
decompress_to_file(remote_file, dest_file, decompress)
except ClientError as exc:
error_code = exc.response["Error"]["Code"]
if error_code == "NoSuchKey":
raise CloudObjectNotFoundError("Object %s not found" % key)
else:
raise exc

decompress_to_file(remote_file, dest_file, decompress)

def remote_open(self, key, decompressor=None):
"""
Expand Down
18 changes: 11 additions & 7 deletions barman/cloud_providers/azure_blob_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
from barman.cloud import (
CloudInterface,
CloudProviderError,
CloudObjectNotFoundError,
CloudSnapshotInterface,
DecompressingStreamingIO,
DEFAULT_DELIMITER,
Expand Down Expand Up @@ -313,13 +314,16 @@ def download_file(self, key, dest_path, decompress=None):
:param str dest_path: Where to put the destination file
:param str|None decompress: Compression scheme to use for decompression
"""
obj = self.container_client.download_blob(key)
with open(dest_path, "wb") as dest_file:
if decompress is None:
obj.download_to_stream(dest_file)
return
blob = StreamingBlobIO(obj)
decompress_to_file(blob, dest_file, decompress)
try:
obj = self.container_client.download_blob(key)
with open(dest_path, "wb") as dest_file:
if decompress is None:
obj.download_to_stream(dest_file)
return
blob = StreamingBlobIO(obj)
decompress_to_file(blob, dest_file, decompress)
except ResourceNotFoundError:
raise CloudObjectNotFoundError("Object %s not found" % key)

def remote_open(self, key, decompressor=None):
"""
Expand Down
18 changes: 11 additions & 7 deletions barman/cloud_providers/google_cloud_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
from barman.cloud import (
CloudInterface,
CloudProviderError,
CloudObjectNotFoundError,
CloudSnapshotInterface,
DecompressingStreamingIO,
DEFAULT_DELIMITER,
Expand Down Expand Up @@ -201,13 +202,16 @@ def download_file(self, key, dest_path, decompress):
:param str|None decompress: Compression scheme to use for decompression
"""
logging.debug("GCS.download_file")
blob = storage.Blob(key, self.container_client)
with open(dest_path, "wb") as dest_file:
if decompress is None:
self.client.download_blob_to_file(blob, dest_file)
return
with blob.open(mode="rb") as blob_reader:
decompress_to_file(blob_reader, dest_file, decompress)
try:
blob = storage.Blob(key, self.container_client)
with open(dest_path, "wb") as dest_file:
if decompress is None:
self.client.download_blob_to_file(blob, dest_file)
return
with blob.open(mode="rb") as blob_reader:
decompress_to_file(blob_reader, dest_file, decompress)
except NotFound:
raise CloudObjectNotFoundError("Object %s not found" % key)

def remote_open(self, key, decompressor=None):
"""
Expand Down
17 changes: 5 additions & 12 deletions tests/test_barman_cloud_wal_restore.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import pytest

from barman.clients import cloud_walrestore

from barman.cloud import CloudObjectNotFoundError

class TestMain(object):
"""
Expand Down Expand Up @@ -51,9 +51,7 @@ def test_succeeds_if_wal_is_found(self, get_cloud_interface_mock, caplog):
"""If the WAL is found we exit with status 0."""
cloud_interface_mock = get_cloud_interface_mock.return_value
cloud_interface_mock.path = "testfolder/"
cloud_interface_mock.list_bucket.return_value = [
"testfolder/test-server/wals/000000080000ABFF/000000080000ABFF000000C1"
]
cloud_interface_mock.download_file.side_effect = [CloudObjectNotFoundError("Object not found"), None]
cloud_walrestore.main(
[
"s3://test-bucket/testfolder/",
Expand All @@ -63,16 +61,14 @@ def test_succeeds_if_wal_is_found(self, get_cloud_interface_mock, caplog):
]
)
assert caplog.text == ""
cloud_interface_mock.download_file.assert_called_once()
cloud_interface_mock.download_file.assert_called()

@mock.patch("barman.clients.cloud_walrestore.get_cloud_interface")
def test_fails_if_wal_not_found(self, get_cloud_interface_mock, caplog):
"""If the WAL cannot be found we exit with status 1."""
cloud_interface_mock = get_cloud_interface_mock.return_value
cloud_interface_mock.path = "testfolder/"
cloud_interface_mock.list_bucket.return_value = [
"testfolder/test-server/wals/000000080000ABFF/000000080000ABFF000000C1"
]
cloud_interface_mock.download_file.side_effect = CloudObjectNotFoundError("Object not found")
caplog.set_level(logging.INFO)
with pytest.raises(SystemExit) as exc:
cloud_walrestore.main(
Expand All @@ -85,7 +81,7 @@ def test_fails_if_wal_not_found(self, get_cloud_interface_mock, caplog):
)
assert exc.value.code == 1
assert (
"WAL file 000000080000ABFF000000C0 for server test-server does not exists\n"
"WAL file 000000080000ABFF000000C0 for server test-server does not exist\n"
in caplog.text
)

Expand Down Expand Up @@ -127,9 +123,6 @@ def test_fails_on_download_exception(self, get_cloud_interface_mock, caplog):
"""Test that any cloud_interface.download exceptions cause exit status 4."""
cloud_interface_mock = get_cloud_interface_mock.return_value
cloud_interface_mock.path = "testfolder/"
cloud_interface_mock.list_bucket.return_value = [
"testfolder/test-server/wals/000000080000ABFF/000000080000ABFF000000C1"
]
cloud_interface_mock.download_file.side_effect = Exception(
"something went wrong"
)
Expand Down