refactoring of data access code (#592)
* refactoring of data access code

* small bug fix in transform file processor

* small bug fixes
blublinsky authored Sep 18, 2024
1 parent a688bda commit 5d56398
Showing 8 changed files with 363 additions and 502 deletions.
6 changes: 5 additions & 1 deletion data-processing-lib/doc/data-access-factory.md
@@ -30,11 +30,15 @@ With this in mind, the following function is provided:
* input folder
* sub-directory selection (aka data sets)
* file extension
* files extensions to checkpoint
* maximum count
* random sampling
* Output file identification (for a given input)
* Checkpointing - determines the set of input files that still need processing
  (i.e. which do not have corresponding output files). When both inputs and outputs are
  parquet files, this comparison is fairly simple. For binary files it is a little more
  involved, as input and output files may have different extensions; in this case you need
  to specify both `files extensions` and `files extensions to checkpoint`
  (a minimal sketch follows this list).
* Reading and writing of files.
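
For illustration only, here is a minimal sketch of the extension-aware comparison described
above; the function and parameter names (`files_to_process`, `input_ext`, `output_ext`) are
hypothetical and not part of the library API:

```python
import os


def files_to_process(
    input_dir: str, output_dir: str, input_ext: str = ".pdf", output_ext: str = ".parquet"
) -> list[str]:
    """Return the input files that do not yet have a corresponding output file."""
    # base names (without extension) of outputs that already exist
    done = {
        os.path.splitext(name)[0]
        for name in os.listdir(output_dir)
        if name.endswith(output_ext)
    }
    # an input still needs processing if its base name has no matching output
    return [
        os.path.join(input_dir, name)
        for name in os.listdir(input_dir)
        if name.endswith(input_ext) and os.path.splitext(name)[0] not in done
    ]
```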

Each transform runtime uses a DataAccessFactory to create a DataAccess instance, which then
handles the actual reading and writing of the data.
@@ -95,29 +95,27 @@ def list_folders(self, key: str) -> tuple[list[str], int]:
    :param key: complete folder
    :return: list of folders within a given folder and number of retries
    """

    def _get_sub_folders(bck: str, p: str) -> tuple[list[str], int]:
        # use paginator
        paginator = self.s3_client.get_paginator("list_objects_v2")
        # use Delimiter to get just the folders
        page_iterator = paginator.paginate(Bucket=bck, Prefix=p, Delimiter="/")
        sub_folders = []
        internal_retries = 0
        for page in page_iterator:
            # for every page
            internal_retries += page.get("ResponseMetadata", {}).get("RetryAttempts", 0)
            for p in page.get("CommonPrefixes", []):
                sf = p["Prefix"]
                sub_folders.append(sf)
                # apply recursively
                sf, r = _get_sub_folders(bck=bck, p=sf)
                internal_retries += r
                sub_folders.extend(sf)
        return sub_folders, internal_retries

    bucket, prefix = self._get_bucket_key(key)
    subs, retries = _get_sub_folders(bck=bucket, p=prefix)
    return [f"{bucket}/{f}" for f in subs], retries

def read_file(self, key: str) -> tuple[bytes, int]:
    """
