Skip to content

Commit

Permalink
Merge pull request #272 from IBM/log-number-completed
Browse files Browse the repository at this point in the history
extended logging to print % and number processed files
  • Loading branch information
daw3rd authored Jun 17, 2024
2 parents 601efe4 + 10b43b2 commit 8bebde5
Show file tree
Hide file tree
Showing 80 changed files with 605 additions and 541 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ def list_files(self, key: str) -> tuple[list[dict[str, Any]], int]:
retries = 0
for page in pages:
# For every page
retries += page.get('ResponseMetadata', {}).get('RetryAttempts', 0)
retries += page.get("ResponseMetadata", {}).get("RetryAttempts", 0)
for obj in page.get("Contents", []):
# Get both file name and size
files.append({"name": f"{bucket}/{obj['Key']}", "size": obj["Size"]})
Expand All @@ -106,7 +106,7 @@ def _get_sub_folders(bck: str, p: str) -> tuple[list[str], int]:
internal_retries = 0
for page in page_iterator:
# for every page
internal_retries += page.get('ResponseMetadata', {}).get('RetryAttempts', 0)
internal_retries += page.get("ResponseMetadata", {}).get("RetryAttempts", 0)
for p in page.get("CommonPrefixes", []):
sub_folders.append(p["Prefix"])
# apply recursively
Expand All @@ -130,7 +130,7 @@ def read_file(self, key: str) -> tuple[bytes, int]:
for n in range(self.retries):
try:
obj = self.s3_client.get_object(Bucket=bucket, Key=prefix)
retries += obj.get('ResponseMetadata', {}).get('RetryAttempts', 0)
retries += obj.get("ResponseMetadata", {}).get("RetryAttempts", 0)
return obj["Body"].read(), retries
except Exception as e:
logger.error(f"failed to read file {key}, exception {e}, attempt {n}")
Expand All @@ -152,7 +152,7 @@ def save_file(self, key: str, data: bytes) -> tuple[dict[str, Any], int]:
for n in range(self.retries):
try:
res = self.s3_client.put_object(Bucket=bucket, Key=prefix, Body=data)
retries += res.get('ResponseMetadata', {}).get('RetryAttempts', 0)
retries += res.get("ResponseMetadata", {}).get("RetryAttempts", 0)
return res, retries
except Exception as e:
logger.error(f"Failed to upload file to to key {key}, exception {e}")
Expand Down Expand Up @@ -201,7 +201,7 @@ def delete_file(self, key: str) -> int:
for n in range(self.retries):
try:
res = self.s3_client.delete_object(Bucket=bucket, Key=prefix)
retries += res.get('ResponseMetadata', {}).get('RetryAttempts', 0)
retries += res.get("ResponseMetadata", {}).get("RetryAttempts", 0)
return retries
except Exception as e:
logger.error(f"failed to delete file {key}, exception {e}")
Expand All @@ -223,7 +223,7 @@ def move_file(self, source: str, dest: str) -> int:
for n in range(self.retries):
try:
res = self.s3_client.copy_object(CopySource=copy_source, Bucket=d_bucket, Key=d_prefix)
retries += res.get('ResponseMetadata', {}).get('RetryAttempts', 0)
retries += res.get("ResponseMetadata", {}).get("RetryAttempts", 0)
retries += self.delete_file(source)
return retries
except Exception as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,8 +115,9 @@ def get_file(self, path: str) -> tuple[bytes, int]:
"""
pass

def get_folder_files(self, path: str, extensions: list[str] = None, return_data: bool = True) \
-> tuple[dict[str, bytes], int]:
def get_folder_files(
self, path: str, extensions: list[str] = None, return_data: bool = True
) -> tuple[dict[str, bytes], int]:
"""
Get a list of byte content of files. The path here is an absolute path and can be anywhere.
The current limitation for S3 and Lakehouse is that it has to be in the same bucket
Expand Down Expand Up @@ -235,8 +236,12 @@ def sample_input_data(self, n_samples: int = 10) -> tuple[dict[str, Any], int]:
# compute number of docs
number_of_docs = av_number_docs * len(path_list)
logger.info(f"Estimated number of docs {number_of_docs}")
return path_profile | {
"average table size MB": av_table_size,
"average doc size KB": av_doc_size,
"estimated number of docs": number_of_docs,
}, retries
return (
path_profile
| {
"average table size MB": av_table_size,
"average doc size KB": av_doc_size,
"estimated number of docs": number_of_docs,
},
retries,
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
import pyarrow as pa
import pyarrow.parquet as pq
from data_processing.data_access import DataAccess
from data_processing.utils import TransformUtils, GB, MB, get_logger
from data_processing.utils import GB, MB, TransformUtils, get_logger


logger = get_logger(__name__)
Expand Down Expand Up @@ -366,8 +366,9 @@ def get_file(self, path: str) -> tuple[bytes, int]:
logger.error(f"Error reading file {path}: {e}")
raise e

def get_folder_files(self, path: str, extensions: list[str] = None, return_data: bool = True) \
-> tuple[dict[str, bytes], int]:
def get_folder_files(
self, path: str, extensions: list[str] = None, return_data: bool = True
) -> tuple[dict[str, bytes], int]:
"""
Get a list of byte content of files. The path here is an absolute path and can be anywhere.
The current limitation for S3 and Lakehouse is that it has to be in the same bucket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,11 @@ def __init__(
:param files_to_use: files extensions of files to include
:param files_to_checkpoint: files extensions of files to use for checkpointing
"""
if (s3_credentials is None or s3_credentials.get("access_key", None) is None
or s3_credentials.get("secret_key", None) is None):
if (
s3_credentials is None
or s3_credentials.get("access_key", None) is None
or s3_credentials.get("secret_key", None) is None
):
raise "S3 credentials is not defined"
self.s3_credentials = s3_credentials
if s3_config is None:
Expand Down Expand Up @@ -130,11 +133,15 @@ def _get_files_folder(
if max_file_size < size:
max_file_size = size
i += 1
return p_list, {
"max_file_size": max_file_size / MB,
"min_file_size": min_file_size / MB,
"total_file_size": total_input_file_size / MB,
}, retries
return (
p_list,
{
"max_file_size": max_file_size / MB,
"min_file_size": min_file_size / MB,
"total_file_size": total_input_file_size / MB,
},
retries,
)

def _get_input_files(
self,
Expand All @@ -153,11 +160,15 @@ def _get_input_files(
"""
if not self.checkpoint:
return self._get_files_folder(
path=input_path, files_to_use=self.files_to_use, cm_files=cm_files,
min_file_size=min_file_size, max_file_size=max_file_size
path=input_path,
files_to_use=self.files_to_use,
cm_files=cm_files,
min_file_size=min_file_size,
max_file_size=max_file_size,
)
pout_list, _, retries1 = self._get_files_folder(path=output_path,
files_to_use=self.files_to_checkpoint, cm_files=-1)
pout_list, _, retries1 = self._get_files_folder(
path=output_path, files_to_use=self.files_to_checkpoint, cm_files=-1
)
output_base_names_ext = [file.replace(self.output_folder, self.input_folder) for file in pout_list]
# In the case of binary transforms, an extension can be different, so just use the file names.
# Also remove duplicates
Expand All @@ -182,11 +193,15 @@ def _get_input_files(
if max_file_size < size:
max_file_size = size
i += 1
return p_list, {
"max_file_size": max_file_size / MB,
"min_file_size": min_file_size / MB,
"total_file_size": total_input_file_size / MB,
}, retries
return (
p_list,
{
"max_file_size": max_file_size / MB,
"min_file_size": min_file_size / MB,
"total_file_size": total_input_file_size / MB,
},
retries,
)

def get_files_to_process_internal(self) -> tuple[list[str], dict[str, float], int]:
"""
Expand Down Expand Up @@ -316,8 +331,9 @@ def get_file(self, path: str) -> tuple[bytes, int]:
filedata = gzip.decompress(filedata)
return filedata, retries

def get_folder_files(self, path: str, extensions: list[str] = None, return_data: bool = True) \
-> tuple[dict[str, bytes], int]:
def get_folder_files(
self, path: str, extensions: list[str] = None, return_data: bool = True
) -> tuple[dict[str, bytes], int]:
"""
Get a list of byte content of files. The path here is an absolute path and can be anywhere.
The current limitation for S3 and Lakehouse is that it has to be in the same bucket
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,11 @@ def orchestrate(
executor.process_file(path)
completed += 1
if completed % print_interval == 0:
logger.info(f"Completed {completed} files in {(time.time() - t_start)/60} min")
logger.debug("Done processing files, waiting for flush() completion.")
logger.info(
f"Completed {completed} files ({100 * completed / len(files)}%) "
f"in {(time.time() - t_start)/60} min"
)
logger.debug(f"Done processing {completed} files, waiting for flush() completion.")
# invoke flush to ensure that all results are returned
start = time.time()
executor.flush()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,12 @@ def validate_expected_files(files_list: list[tuple[bytes, str]], expected_files_
for i in range(l1):
f1 = files_list[i]
f2 = expected_files_list[i]
assert f1[1] == f2[1], f"produced file extension {f1[1]} is different from expected file extension {f2[1]}"
assert (len(f1[0]) - len(f2[0])) < 50, \
f"produced file length {len(f1[0])} is different from expected file extension {len(f2[0])}"
assert (
f1[1] == f2[1]
), f"produced file extension {f1[1]} is different from expected file extension {f2[1]}"
assert (
len(f1[0]) - len(f2[0])
) < 50, f"produced file length {len(f1[0])} is different from expected file extension {len(f2[0])}"

@staticmethod
def validate_expected_metadata_lists(metadata: list[dict[str, float]], expected_metadata: list[dict[str, float]]):
Expand Down Expand Up @@ -184,7 +187,9 @@ def _validate_table_files(parquet1: str, parquet2: str, drop_columns: list[str]
AbstractTest.validate_expected_tables([t1], [t2])

@staticmethod
def __confirm_diffs(src_dir: str, expected_dir: str, diff_files: list, dest_dir: str, drop_columns: list[str] = []):
def __confirm_diffs(
src_dir: str, expected_dir: str, diff_files: list, dest_dir: str, drop_columns: list[str] = []
):
"""
Copy all files from the source dir to the dest dir.
:param src_dir:
Expand Down Expand Up @@ -213,5 +218,8 @@ def __confirm_diffs(src_dir: str, expected_dir: str, diff_files: list, dest_dir:
da = DataAccessLocal()
f1_bytes = da.get_file(src)
f2_bytes = da.get_file(dest)
assert (len(f1_bytes) - len(f2_bytes)) < 50, \
assert (
len(f1_bytes) - len(f2_bytes)
) < 50, (
f"produced file length {len(f1_bytes)} is different from expected file extension {len(f2_bytes)}"
)
Original file line number Diff line number Diff line change
Expand Up @@ -29,22 +29,20 @@ class AbstractTransformLauncherTest(AbstractTest):
"""

@staticmethod
def _get_argv(
cli_params: dict[str, Any], in_table_path: str, out_table_path: str
):
def _get_argv(cli_params: dict[str, Any], in_table_path: str, out_table_path: str):
args = {} | cli_params
local_ast = {"input_folder": in_table_path, "output_folder": out_table_path}
args["data_local_config"] = local_ast
argv = ParamsUtils.dict_to_req(args)
return argv

def test_transform(
self,
launcher: AbstractTransformLauncher,
cli_params: dict[str, Any],
in_table_path: str,
expected_out_table_path: str,
ignore_columns: list[str],
self,
launcher: AbstractTransformLauncher,
cli_params: dict[str, Any],
in_table_path: str,
expected_out_table_path: str,
ignore_columns: list[str],
):
"""
Test the given transform and its runtime using the given CLI arguments, input directory of data files and expected output directory.
Expand Down Expand Up @@ -72,11 +70,11 @@ def _validate_directory_contents_match(self, dir: str, expected: str, ignore_col
def _install_test_fixtures(self, metafunc):
# Apply the fixtures for the method with these input names (i.e. test_transform()).
if (
"launcher" in metafunc.fixturenames
and "cli_params" in metafunc.fixturenames
and "in_table_path" in metafunc.fixturenames
and "expected_out_table_path" in metafunc.fixturenames
and "ignore_columns" in metafunc.fixturenames
"launcher" in metafunc.fixturenames
and "cli_params" in metafunc.fixturenames
and "in_table_path" in metafunc.fixturenames
and "expected_out_table_path" in metafunc.fixturenames
and "ignore_columns" in metafunc.fixturenames
):
# Let the sub-class define the specific tests and test data for the transform under test.
fixtures = self.get_test_transform_fixtures()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@
from .noop_transform import (
NOOPTransform,
NOOPPythonTransformConfiguration,
)
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

from typing import Tuple


from data_processing.test_support.abstract_test import AbstractTest
from data_processing.transform import AbstractBinaryTransform

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,22 +28,22 @@ class AbstractTableTransformTest(AbstractTest):
def _install_test_fixtures(self, metafunc):
# Apply the fixtures for the method with these input names (i.e. test_transform()).
if (
"transform" in metafunc.fixturenames
and "in_table_list" in metafunc.fixturenames
and "expected_table_list" in metafunc.fixturenames
and "expected_metadata_list" in metafunc.fixturenames
"transform" in metafunc.fixturenames
and "in_table_list" in metafunc.fixturenames
and "expected_table_list" in metafunc.fixturenames
and "expected_metadata_list" in metafunc.fixturenames
):
# Let the sub-class define the specific tests and test data for the transform under test.
f = self.get_test_transform_fixtures()
# Install the fixture, matching the parameter names used by test_transform() method.
metafunc.parametrize("transform,in_table_list,expected_table_list,expected_metadata_list", f)

def test_transform(
self,
transform: AbstractTableTransform,
in_table_list: list[pa.Table],
expected_table_list: list[pa.Table],
expected_metadata_list: list[dict[str, float]],
self,
transform: AbstractTableTransform,
in_table_list: list[pa.Table],
expected_table_list: list[pa.Table],
expected_metadata_list: list[dict[str, float]],
):
"""
Use the given transform to transform() the given table(s) and compare the results (list of tables and metadata)
Expand Down
Loading

0 comments on commit 8bebde5

Please sign in to comment.