diff --git a/rohmu/errors.py b/rohmu/errors.py index 3e2e1443..72175043 100644 --- a/rohmu/errors.py +++ b/rohmu/errors.py @@ -44,3 +44,7 @@ class InvalidByteRangeError(Error): class InvalidTransferError(Error): """You tried to access a transfer object that you already returned to the pool""" + + +class ConcurrentUploadError(StorageError): + """A generic error related to concurrent uploads""" diff --git a/rohmu/object_storage/local.py b/rohmu/object_storage/local.py index f2ef619c..77c1a652 100644 --- a/rohmu/object_storage/local.py +++ b/rohmu/object_storage/local.py @@ -9,7 +9,7 @@ from ..common.models import StorageModel, StorageOperation from ..common.statsd import StatsdConfig -from ..errors import FileNotFoundFromStorageError, StorageError +from ..errors import ConcurrentUploadError, FileNotFoundFromStorageError from ..notifier.interface import Notifier from ..typing import Metadata from ..util import BinaryStreamsConcatenation, ProgressStream @@ -260,7 +260,10 @@ def create_concurrent_upload( upload_id = uuid.uuid4().hex upload = ConcurrentUpload("local", upload_id, key, metadata, {}) chunks_dir = self._get_chunks_dir(upload) - os.makedirs(chunks_dir, exist_ok=True) + try: + os.makedirs(chunks_dir, exist_ok=True) + except OSError as ex: + raise ConcurrentUploadError("Failed to initiate multipart upload for {}".format(key)) from ex self.stats.operation(StorageOperation.create_multipart_upload) return upload @@ -283,7 +286,7 @@ def upload_concurrent_chunk( self.stats.operation(StorageOperation.store_file, size=bytes_read) upload.chunks_to_etags[chunk_number] = "no-etag" except OSError as ex: - raise StorageError( + raise ConcurrentUploadError( "Failed to upload chunk {} of multipart upload for {}".format(chunk_number, upload.key) ) from ex @@ -297,7 +300,7 @@ def complete_concurrent_upload(self, upload: ConcurrentUpload) -> None: chunk_files = (open(os.path.join(chunks_dir, chunk_file), "rb") for chunk_file in chunk_filenames) stream = BinaryStreamsConcatenation(chunk_files) except OSError as ex: - raise StorageError("Failed to complete multipart upload for {}".format(upload.key)) from ex + raise ConcurrentUploadError("Failed to complete multipart upload for {}".format(upload.key)) from ex self.store_file_object( upload.key, stream, # type: ignore[arg-type] @@ -313,7 +316,7 @@ def abort_concurrent_upload(self, upload: ConcurrentUpload) -> None: try: shutil.rmtree(chunks_dir) except OSError as ex: - raise StorageError("Failed to abort multipart upload for {}".format(upload.key)) from ex + raise ConcurrentUploadError("Failed to abort multipart upload for {}".format(upload.key)) from ex def _get_chunks_dir(self, upload: ConcurrentUpload) -> str: return self.format_key_for_backend(".concurrent_upload_" + upload.backend_id) diff --git a/rohmu/object_storage/s3.py b/rohmu/object_storage/s3.py index 99313c26..1010c3e6 100644 --- a/rohmu/object_storage/s3.py +++ b/rohmu/object_storage/s3.py @@ -10,7 +10,7 @@ from ..common.models import ProxyInfo, StorageModel, StorageOperation from ..common.statsd import StatsdConfig -from ..errors import FileNotFoundFromStorageError, InvalidConfigurationError, StorageError +from ..errors import ConcurrentUploadError, FileNotFoundFromStorageError, InvalidConfigurationError, StorageError from ..notifier.interface import Notifier from ..typing import Metadata from .base import ( @@ -597,7 +597,7 @@ def create_concurrent_upload( try: cmu_response = self.s3_client.create_multipart_upload(**args) except botocore.exceptions.ClientError as ex: - raise StorageError("Failed to initiate multipart upload for {}".format(path)) from ex + raise ConcurrentUploadError("Failed to initiate multipart upload for {}".format(path)) from ex return ConcurrentUpload("aws", cmu_response["UploadId"], key, metadata, {}) @@ -617,7 +617,7 @@ def complete_concurrent_upload(self, upload: ConcurrentUpload) -> None: RequestPayer="requester", ) except botocore.exceptions.ClientError as ex: - raise StorageError("Failed to complete multipart upload for {}".format(upload.key)) from ex + raise ConcurrentUploadError("Failed to complete multipart upload for {}".format(upload.key)) from ex def abort_concurrent_upload(self, upload: ConcurrentUpload) -> None: backend_key = self.format_key_for_backend(upload.key, remove_slash_prefix=True) @@ -630,7 +630,7 @@ def abort_concurrent_upload(self, upload: ConcurrentUpload) -> None: RequestPayer="requester", ) except botocore.exceptions.ClientError as ex: - raise StorageError("Failed to abort multipart upload for {}".format(upload.key)) from ex + raise ConcurrentUploadError("Failed to abort multipart upload for {}".format(upload.key)) from ex def upload_concurrent_chunk( self, @@ -661,7 +661,7 @@ def upload_concurrent_chunk( self.stats.operation(StorageOperation.store_file, size=body.bytes_read) upload.chunks_to_etags[chunk_number] = response["ETag"] except botocore.exceptions.ClientError as ex: - raise StorageError( + raise ConcurrentUploadError( "Failed to upload chunk {} of multipart upload for {}".format(chunk_number, upload.key) ) from ex