Skip to content

Commit

Permalink
concurrent-upload: use a custom error instead of generic StorageError
Browse files Browse the repository at this point in the history
  • Loading branch information
giacomo-alzetta-aiven committed Aug 1, 2023
1 parent e34ac4c commit 70d7775
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 10 deletions.
4 changes: 4 additions & 0 deletions rohmu/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,3 +44,7 @@ class InvalidByteRangeError(Error):

class InvalidTransferError(Error):
    """Raised on access to a transfer object that was already returned to the pool.

    Once a transfer is handed back, it must not be used again; doing so is a
    caller bug and surfaces as this error.
    """


class ConcurrentUploadError(StorageError):
    """A generic error related to concurrent (multipart) uploads.

    Raised by storage backends when a concurrent-upload operation fails:
    initiating the upload, uploading a chunk, completing it, or aborting it.
    Subclasses StorageError, so existing callers catching StorageError keep
    working.
    """
13 changes: 8 additions & 5 deletions rohmu/object_storage/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from ..common.models import StorageModel, StorageOperation
from ..common.statsd import StatsdConfig
from ..errors import FileNotFoundFromStorageError, StorageError
from ..errors import ConcurrentUploadError, FileNotFoundFromStorageError
from ..notifier.interface import Notifier
from ..typing import Metadata
from ..util import BinaryStreamsConcatenation, ProgressStream
Expand Down Expand Up @@ -260,7 +260,10 @@ def create_concurrent_upload(
upload_id = uuid.uuid4().hex
upload = ConcurrentUpload("local", upload_id, key, metadata, {})
chunks_dir = self._get_chunks_dir(upload)
os.makedirs(chunks_dir, exist_ok=True)
try:
os.makedirs(chunks_dir, exist_ok=True)
except OSError as ex:
raise ConcurrentUploadError("Failed to initiate multipart upload for {}".format(key)) from ex
self.stats.operation(StorageOperation.create_multipart_upload)
return upload

Expand All @@ -283,7 +286,7 @@ def upload_concurrent_chunk(
self.stats.operation(StorageOperation.store_file, size=bytes_read)
upload.chunks_to_etags[chunk_number] = "no-etag"
except OSError as ex:
raise StorageError(
raise ConcurrentUploadError(
"Failed to upload chunk {} of multipart upload for {}".format(chunk_number, upload.key)
) from ex

Expand All @@ -297,7 +300,7 @@ def complete_concurrent_upload(self, upload: ConcurrentUpload) -> None:
chunk_files = (open(os.path.join(chunks_dir, chunk_file), "rb") for chunk_file in chunk_filenames)
stream = BinaryStreamsConcatenation(chunk_files)
except OSError as ex:
raise StorageError("Failed to complete multipart upload for {}".format(upload.key)) from ex
raise ConcurrentUploadError("Failed to complete multipart upload for {}".format(upload.key)) from ex
self.store_file_object(
upload.key,
stream, # type: ignore[arg-type]
Expand All @@ -313,7 +316,7 @@ def abort_concurrent_upload(self, upload: ConcurrentUpload) -> None:
try:
shutil.rmtree(chunks_dir)
except OSError as ex:
raise StorageError("Failed to abort multipart upload for {}".format(upload.key)) from ex
raise ConcurrentUploadError("Failed to abort multipart upload for {}".format(upload.key)) from ex

def _get_chunks_dir(self, upload: ConcurrentUpload) -> str:
    """Return the backend-formatted key of the directory holding this upload's chunk files."""
    chunks_key = f".concurrent_upload_{upload.backend_id}"
    return self.format_key_for_backend(chunks_key)
Expand Down
10 changes: 5 additions & 5 deletions rohmu/object_storage/s3.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@

from ..common.models import ProxyInfo, StorageModel, StorageOperation
from ..common.statsd import StatsdConfig
from ..errors import FileNotFoundFromStorageError, InvalidConfigurationError, StorageError
from ..errors import ConcurrentUploadError, FileNotFoundFromStorageError, InvalidConfigurationError, StorageError
from ..notifier.interface import Notifier
from ..typing import Metadata
from .base import (
Expand Down Expand Up @@ -597,7 +597,7 @@ def create_concurrent_upload(
try:
cmu_response = self.s3_client.create_multipart_upload(**args)
except botocore.exceptions.ClientError as ex:
raise StorageError("Failed to initiate multipart upload for {}".format(path)) from ex
raise ConcurrentUploadError("Failed to initiate multipart upload for {}".format(path)) from ex

return ConcurrentUpload("aws", cmu_response["UploadId"], key, metadata, {})

Expand All @@ -617,7 +617,7 @@ def complete_concurrent_upload(self, upload: ConcurrentUpload) -> None:
RequestPayer="requester",
)
except botocore.exceptions.ClientError as ex:
raise StorageError("Failed to complete multipart upload for {}".format(upload.key)) from ex
raise ConcurrentUploadError("Failed to complete multipart upload for {}".format(upload.key)) from ex

def abort_concurrent_upload(self, upload: ConcurrentUpload) -> None:
backend_key = self.format_key_for_backend(upload.key, remove_slash_prefix=True)
Expand All @@ -630,7 +630,7 @@ def abort_concurrent_upload(self, upload: ConcurrentUpload) -> None:
RequestPayer="requester",
)
except botocore.exceptions.ClientError as ex:
raise StorageError("Failed to abort multipart upload for {}".format(upload.key)) from ex
raise ConcurrentUploadError("Failed to abort multipart upload for {}".format(upload.key)) from ex

def upload_concurrent_chunk(
self,
Expand Down Expand Up @@ -661,7 +661,7 @@ def upload_concurrent_chunk(
self.stats.operation(StorageOperation.store_file, size=body.bytes_read)
upload.chunks_to_etags[chunk_number] = response["ETag"]
except botocore.exceptions.ClientError as ex:
raise StorageError(
raise ConcurrentUploadError(
"Failed to upload chunk {} of multipart upload for {}".format(chunk_number, upload.key)
) from ex

Expand Down

0 comments on commit 70d7775

Please sign in to comment.