concurrent-upload,local: initial implementation of the concurrent upload API for local storage
giacomo-alzetta-aiven committed Jul 19, 2023
1 parent b8192f2 commit 1dfcbde
Showing 4 changed files with 451 additions and 11 deletions.
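For orientation before the diff, a minimal sketch of how the new concurrent upload API is meant to be driven (the paths, key, metadata and chunk numbers are illustrative, not part of the commit; both directories are assumed to exist):

from io import BytesIO

from rohmu.object_storage.local import LocalTransfer

# concurrent_upload_directory is the new optional setting introduced by this commit
transfer = LocalTransfer(directory="/var/lib/backups", concurrent_upload_directory="/var/lib/backups/mpu")

upload = transfer.create_concurrent_upload("example/base.tar", metadata={"owner": "demo"})
upload.upload_chunk(1, BytesIO(b"first part "))
upload.upload_chunk(2, BytesIO(b"second part"))
upload.complete()  # concatenates the chunks and stores the result under the key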
196 changes: 188 additions & 8 deletions rohmu/object_storage/local.py
@@ -5,26 +5,33 @@
Copyright (c) 2022 Aiven, Helsinki, Finland. https://aiven.io/
See LICENSE for details
"""
-from ..common.models import StorageModel

from __future__ import annotations

+from ..common.models import StorageModel, StorageOperation
from ..common.statsd import StatsdConfig
-from ..errors import FileNotFoundFromStorageError
+from ..errors import FileNotFoundFromStorageError, StorageError, UninitializedError
from ..notifier.interface import Notifier
from ..typing import Metadata
from ..util import BinaryStreamsConcatenation
from .base import (
BaseTransfer,
ConcurrentUpload,
IncrementalProgressCallbackType,
IterKeyItem,
KEY_TYPE_OBJECT,
KEY_TYPE_PREFIX,
ProgressProportionCallbackType,
)
from pathlib import Path
-from typing import Any, BinaryIO, Iterator, Optional, TextIO, Tuple, Union
+from typing import Any, BinaryIO, Iterable, Iterator, Optional, TextIO, Tuple, Union

import base64
import contextlib
import datetime
import hashlib
import json
import logging
import os
import shutil
import tempfile
@@ -37,22 +44,31 @@
class Config(StorageModel):
directory: str
prefix: Optional[str] = None
concurrent_upload_directory: Optional[str] = None


class LocalTransfer(BaseTransfer[Config]): # pylint: disable=abstract-method
config_model = Config

is_thread_safe = True
supports_concurrent_upload = True

def __init__(
self,
directory: Union[str, Path],
prefix: Optional[str] = None,
notifier: Optional[Notifier] = None,
statsd_info: Optional[StatsdConfig] = None,
concurrent_upload_directory: Optional[str] = None,
) -> None:
prefix = os.path.join(directory, (prefix or "").strip("/"))
super().__init__(prefix=prefix, notifier=notifier, statsd_info=statsd_info)
# NOTE: To avoid breaking the interface for existing clients, fall back to a random temporary directory when
# the configuration does not provide one. Such clients presumably do not use the concurrent upload
# functionality, so nothing should be left behind. Clients that adopt the concurrent upload functionality
# should configure this setting explicitly.
self._concurrent_upload_directory = concurrent_upload_directory or tempfile.mkdtemp(prefix="rohmu_mpu_")
self._mpu_cache: dict[str, LocalConcurrentUpload] = {}
self.log.debug("LocalTransfer initialized")

def copy_file(
@@ -209,7 +225,7 @@ def get_file_size(self, key: str) -> int:

def _save_metadata(self, target_path: str, metadata: Optional[Metadata]) -> None:
metadata_path = target_path + ".metadata"
-with atomic_create_file(metadata_path) as fp:
+with atomic_create_file(metadata_path, suffix=".metadata_tmp") as fp:
json.dump(self.sanitize_metadata(metadata), fp)

def store_file_object(
@@ -244,19 +260,183 @@ def store_file_object(
key=key, size=os.path.getsize(target_path), metadata=self.sanitize_metadata(self._filter_metadata(metadata))
)

def create_concurrent_upload(self, key: str, metadata: Optional[Metadata] = None) -> ConcurrentUpload:
upload = LocalConcurrentUpload(
transfer=self,
concurrent_upload_directory=self._concurrent_upload_directory,
key=key,
metadata=self.sanitize_metadata(metadata) if metadata is not None else None,
)
upload.start()
self._mpu_cache[upload.upload_id] = upload
return upload

def get_concurrent_upload(self, upload_id: str) -> ConcurrentUpload:
try:
return self._mpu_cache[upload_id]
except KeyError:
pass

info = json.loads(base64.b64decode(upload_id.encode("ascii")))
if info.pop("cloud") != "local":
raise StorageError("Upload {} is not for local".format(upload_id))
local_upload_id = info.pop("upload_id")
upload = LocalConcurrentUpload(transfer=self, **info)
upload.resume(local_upload_id)
self._mpu_cache[upload_id] = upload
return upload


@contextlib.contextmanager
-def atomic_create_file(file_path: str) -> Iterator[TextIO]:
-"""Open a temporary file for writing, rename to final name when done"""
+def atomic_create_file(file_path: str, suffix: Optional[str] = None) -> Iterator[TextIO]:
+"""Open a temporary file for writing in text mode, rename to final name when done"""
fd, tmp_file_path = tempfile.mkstemp(prefix=os.path.basename(file_path), dir=os.path.dirname(file_path), suffix=suffix)
try:
with os.fdopen(fd, "w") as out_file:
yield out_file

os.rename(tmp_file_path, file_path)
except Exception:  # pylint: disable=broad-except
with contextlib.suppress(Exception):
os.unlink(tmp_file_path)
raise


@contextlib.contextmanager
def atomic_create_file_binary(file_path: str, suffix: Optional[str] = None) -> Iterator[BinaryIO]:
"""Open a temporary file for writing in text mode, rename to final name when done"""
fd, tmp_file_path = tempfile.mkstemp(
prefix=os.path.basename(file_path), dir=os.path.dirname(file_path), suffix=".metadata_tmp"
prefix=os.path.basename(file_path), dir=os.path.dirname(file_path), suffix=suffix, text=False
)
try:
with os.fdopen(fd, "w") as out_file:
with os.fdopen(fd, "wb") as out_file:
yield out_file

os.rename(tmp_file_path, file_path)
except Exception:  # pylint: disable=broad-except
with contextlib.suppress(Exception):
os.unlink(tmp_file_path)
raise


class LocalConcurrentUpload:
def __init__(
self,
*,
transfer: LocalTransfer,
key: str,
concurrent_upload_directory: Optional[str] = None,
metadata: Optional[dict[str, str]] = None,
) -> None:
self.log = logging.getLogger(LocalConcurrentUpload.__name__)
self.transfer = transfer
self.concurrent_upload_directory = concurrent_upload_directory
self.key = key
self.metadata = metadata
self._upload_tmp_dir = "<uninitialized>"
self._started = False
self._completed = False
self._aborted = False

def _check_started(self) -> None:
if not self._started:
raise UninitializedError("Upload is not initialized")

def _check_not_started(self) -> None:
if self._started:
raise StorageError("Upload {} for {} was already started".format(self._upload_tmp_dir, self.key))

def _check_not_finished(self) -> None:
if self._completed or self._aborted:
raise StorageError("Upload {} for {} was already completed or aborted".format(self._upload_tmp_dir, self.key))

@property
def upload_id(self) -> str:
self._check_started()
info = {
"cloud": "local",
"upload_id": self._upload_tmp_dir,
"key": self.key,
}
return base64.b64encode(json.dumps(info).encode("ascii")).decode("ascii")

def resume(self, local_upload_id: str) -> None:
self._check_not_started()
self.log.debug("Resuming to upload multipart file: %r", self.key)
self._upload_tmp_dir = local_upload_id
self._started = True

def start(self) -> None:
self._check_not_started()
if self.concurrent_upload_directory is None:
raise UninitializedError("You must provide the concurrent_upload_directory to start a new concurrent upload")
self.log.debug("Starting to upload multipart file: %r", self.key)
self.transfer.stats.operation(StorageOperation.create_multipart_upload)
upload_directory = tempfile.mkdtemp(prefix="mpu-", dir=self.concurrent_upload_directory)
os.mkdir(os.path.join(upload_directory, "chunks"))
with atomic_create_file(os.path.join(upload_directory, "metadata"), suffix="metadata_tmp") as fp:
json.dump(self.metadata or {}, fp)

self._upload_tmp_dir = upload_directory
self._started = True

def list_uploaded_chunks(self) -> Iterable[int]:
self._check_started()
mpu_files = os.listdir(os.path.join(self._upload_tmp_dir, "chunks"))

for chunk_filename in sorted(int(chunk_file) for chunk_file in mpu_files):
yield int(chunk_filename)

def upload_chunk(self, chunk_number: int, fd: BinaryIO) -> None:
self._check_started()
self._check_not_finished()
try:
with atomic_create_file_binary(os.path.join(self._upload_tmp_dir, "chunks", str(chunk_number))) as chunk_fp:
for data in iter(lambda: fd.read(CHUNK_SIZE), b""):
chunk_fp.write(data)
except OSError as ex:
raise StorageError("Failed to upload chunk {} of multipart upload for {}".format(chunk_number, self.key)) from ex

def complete(self) -> None:
self._check_started()
if self._completed:
return
elif self._aborted:
raise StorageError("Upload {} for {} was already aborted".format(self._upload_tmp_dir, self.key))
try:
try:
with open(os.path.join(self._upload_tmp_dir, "metadata")) as metadata_fp:
metadata = json.load(metadata_fp)
except FileNotFoundError:
metadata = None
chunks_dir = os.path.join(self._upload_tmp_dir, "chunks")
chunk_filenames = sorted(
(chunk_file for chunk_file in os.listdir(chunks_dir)),
key=int,
)
chunk_files = (open(os.path.join(chunks_dir, chunk_file), "rb") for chunk_file in chunk_filenames)
stream = BinaryStreamsConcatenation(chunk_files)
except OSError as ex:
raise StorageError("Failed to complete multipart upload for {}".format(self.key)) from ex
self.transfer.store_file_object(
self.key,
stream, # type: ignore[arg-type]
metadata=metadata,
)
self._completed = True
try:
shutil.rmtree(self._upload_tmp_dir)
except OSError as ex:
self.log.exception("Could not clean up temporary directory %r", self._upload_tmp_dir)

def abort(self) -> None:
self._check_started()
if self._aborted:
return
elif self._completed:
raise StorageError("Upload {} for {} was already completed".format(self._upload_tmp_dir, self.key))
try:
shutil.rmtree(self._upload_tmp_dir)
self._aborted = True
except OSError as ex:
raise StorageError("Failed to abort multipart upload for {}".format(self.key)) from ex
41 changes: 40 additions & 1 deletion rohmu/util.py
@@ -4,9 +4,10 @@
Copyright (c) 2022 Ohmu Ltd
See LICENSE for details
"""
from io import BytesIO
from itertools import islice
from rohmu.typing import HasFileno
-from typing import Generator, Iterable, Optional, Tuple, TypeVar, Union
+from typing import BinaryIO, Generator, Iterable, Optional, Tuple, TypeVar, Union

import fcntl
import logging
@@ -71,3 +72,41 @@ def get_total_size_from_content_range(content_range: str) -> Optional[int]:
length = content_range.rsplit("/", 1)[1]
# RFC 9110 section 14.4 specifies that the * can be returned when the total length is unknown
return int(length) if length != "*" else None


class BinaryStreamsConcatenation:
"""Concatenate a sequence of binary streams.
Only the read() method is supported.
"""

def __init__(self, files: Iterable[BinaryIO]) -> None:
self._iter_files = iter(files)
self._current_file: Optional[BinaryIO] = None

def _read_chunk(self, size: int = -1) -> bytes:
if self._current_file is None:
self._current_file = next(self._iter_files, None)
if self._current_file is None:
return b""
data = self._current_file.read(size) if size > 0 else self._current_file.read()
if not data:
self._current_file.close()
self._current_file = next(self._iter_files, None)
return data

def read(self, size: int = -1) -> bytes:
result = BytesIO()
size_left = size
while True:
chunk = self._read_chunk(size_left)
if not chunk and self._current_file is None:
# we finished reading all files
break
result.write(chunk)
size_left -= len(chunk)
if size > 0 and size_left == 0:
# we finished reading the amount requested
break

return result.getvalue()
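A quick illustration of the helper's read() semantics (inputs are made up): a sized read may span stream boundaries, and a short result only occurs once all underlying streams are exhausted.

from io import BytesIO

from rohmu.util import BinaryStreamsConcatenation

stream = BinaryStreamsConcatenation([BytesIO(b"hello "), BytesIO(b"world")])
assert stream.read(8) == b"hello wo"  # crosses from the first stream into the second
assert stream.read() == b"rld"  # everything that is left
assert stream.read() == b""  # all streams exhausted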