Skip to content

Commit

Permalink
added folder_transform
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Oct 10, 2024
1 parent efc1162 commit 47f4526
Show file tree
Hide file tree
Showing 11 changed files with 168 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

from data_processing.data_access import DataAccessFactoryBase
from data_processing.runtime import AbstractTransformFileProcessor
from data_processing.transform import AbstractBinaryTransform, TransformStatistics
from data_processing.transform import AbstractTransform, TransformStatistics
from data_processing.utils import UnrecoverableException


Expand All @@ -28,19 +28,22 @@ def __init__(
data_access_factory: DataAccessFactoryBase,
statistics: TransformStatistics,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
transform_class: type[AbstractTransform],
is_folder: bool,
):
"""
Init method
:param data_access_factory - data access factory
:param statistics - reference to statistics class
:param transform_params - transform parameters
:param transform_class: transform class
:param is_folder: folder transform flag
"""
# invoke superclass
super().__init__(
data_access_factory=data_access_factory,
transform_parameters=dict(transform_params),
is_folder=is_folder,
)
self.transform_params["statistics"] = statistics
# Create local processor
Expand All @@ -52,7 +55,8 @@ def __init__(
# Create statistics
self.stats = statistics

def _publish_stats(self, stats: dict[str, Any]) -> None:

def _publish_stats(self, stats: dict[str, Any]) -> None:
self.stats.add_stats(stats)


Expand All @@ -65,17 +69,20 @@ def __init__(
self,
data_access_factory: DataAccessFactoryBase,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
transform_class: type[AbstractTransform],
is_folder: bool
):
"""
Init method
:param data_access_factory - data access factory
:param transform_params - transform parameters
:param transform_class: transform class
:param is_folder: folder transform flag
"""
super().__init__(
data_access_factory=data_access_factory,
transform_parameters=dict(transform_params),
is_folder=is_folder,
)
# Add data access and statistics to the processor parameters
self.transform_params["data_access"] = self.data_access
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
PythonTransformFileProcessor,
PythonTransformRuntimeConfiguration,
)
from data_processing.transform import AbstractBinaryTransform, TransformStatistics
from data_processing.transform import AbstractBinaryTransform, TransformStatistics, AbstractFolderTransform
from data_processing.utils import GB, get_logger


Expand All @@ -48,8 +48,6 @@ def _execution_resources() -> dict[str, Any]:
"object_store": 0,
}



def orchestrate(
data_access_factory: DataAccessFactoryBase,
runtime_config: PythonTransformRuntimeConfiguration,
Expand All @@ -74,15 +72,21 @@ def orchestrate(
return 1
# create additional execution parameters
runtime = runtime_config.create_transform_runtime()
is_folder = issubclass(runtime_config.get_transform_class(), AbstractFolderTransform)
try:
# Get files to process
files, profile, retries = data_access.get_files_to_process()
if len(files) == 0:
logger.error("No input files to process - exiting")
return 0
if retries > 0:
statistics.add_stats({"data access retries": retries})
logger.info(f"Number of files is {len(files)}, source profile {profile}")
if is_folder:
# folder transform
files = AbstractFolderTransform.get_folders(data_access=data_access)
logger.info(f"Number of folders is {len(files)}")
else:
# Get files to process
files, profile, retries = data_access.get_files_to_process()
if len(files) == 0:
logger.error("No input files to process - exiting")
return 0
if retries > 0:
statistics.add_stats({"data access retries": retries})
logger.info(f"Number of files is {len(files)}, source profile {profile}")
# Print interval
print_interval = int(len(files) / 100)
if print_interval == 0:
Expand All @@ -99,6 +103,7 @@ def orchestrate(
data_access_factory=data_access_factory, statistics=statistics, files=files
),
transform_class=runtime_config.get_transform_class(),
is_folder=is_folder,
)
else:
# using sequential execution
Expand All @@ -111,6 +116,7 @@ def orchestrate(
data_access_factory=data_access_factory, statistics=statistics, files=files
),
transform_class=runtime_config.get_transform_class(),
is_folder=is_folder,
)
status = "success"
return_code = 0
Expand Down Expand Up @@ -157,7 +163,8 @@ def _process_transforms(
data_access_factory: DataAccessFactoryBase,
statistics: TransformStatistics,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
transform_class: type[AbstractTransform],
is_folder: bool,
) -> None:
"""
Process transforms sequentially
Expand All @@ -167,16 +174,16 @@ def _process_transforms(
:param data_access_factory: data access factory
:param transform_params - transform parameters
:param transform_class: transform class
:param is_folder: folder transform flag
:return: None
"""
# create executor
executor = PythonTransformFileProcessor(
data_access_factory=data_access_factory,
statistics=statistics,
transform_params=transform_params,
transform_class=transform_class,
is_folder=is_folder,
)
# process data
t_start = time.time()
Expand All @@ -203,6 +210,7 @@ def _process_transforms_multiprocessor(
data_access_factory: DataAccessFactoryBase,
transform_params: dict[str, Any],
transform_class: type[AbstractBinaryTransform],
is_folder: bool
) -> TransformStatistics:
"""
Process transforms using multiprocessing pool
Expand All @@ -212,13 +220,17 @@ def _process_transforms_multiprocessor(
:param data_access_factory: data access factory
:param transform_params - transform parameters
:param transform_class: transform class
:param is_folder: folder transform flag
:return: metadata for the execution
"""
# result statistics
statistics = TransformStatistics()
# create processor
processor = PythonPoolTransformFileProcessor(
data_access_factory=data_access_factory, transform_params=transform_params, transform_class=transform_class
data_access_factory=data_access_factory,
transform_params=transform_params,
transform_class=transform_class,
is_folder=is_folder,
)
completed = 0
t_start = time.time()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,13 @@ def __init__(
self,
data_access_factory: DataAccessFactoryBase,
transform_parameters: dict[str, Any],
is_folder: bool = False,
):
"""
Init method
:param data_access_factory: Data Access Factory
:param transform_parameters: Transform parameters
:param is_folder: folder transform flag
"""
self.logger = get_logger(__name__)
# validate parameters
Expand All @@ -46,6 +48,7 @@ def __init__(
# Add data access and statistics to the processor parameters
self.transform_params = transform_parameters
self.transform_params["data_access"] = self.data_access
self.is_folder = is_folder

def process_file(self, f_name: str) -> None:
"""
Expand All @@ -58,25 +61,29 @@ def process_file(self, f_name: str) -> None:
self.logger.warning("No data_access found. Returning.")
return
t_start = time.time()
# Read source file
filedata, retries = self.data_access.get_file(path=f_name)
if retries > 0:
self._publish_stats({"data access retries": retries})
if filedata is None:
self.logger.warning(f"File read resulted in None for {f_name}. Returning.")
self._publish_stats({"failed_reads": 1})
return
self._publish_stats({"source_files": 1, "source_size": len(filedata)})
if not self.is_folder:
# Read source file only if we are processing file
filedata, retries = self.data_access.get_file(path=f_name)
if retries > 0:
self._publish_stats({"data access retries": retries})
if filedata is None:
self.logger.warning(f"File read resulted in None for {f_name}. Returning.")
self._publish_stats({"failed_reads": 1})
return
self._publish_stats({"source_files": 1, "source_size": len(filedata)})
# Process input file
try:
# execute local processing
name_extension = TransformUtils.get_file_extension(f_name)
self.logger.debug(f"Begin transforming file {f_name}")
out_files, stats = self.transform.transform_binary(file_name=f_name, byte_array=filedata)
if not self.is_folder:
# execute local processing
out_files, stats = self.transform.transform_binary(file_name=f_name, byte_array=filedata)
name_extension = TransformUtils.get_file_extension(f_name)
self.last_file_name = name_extension[0]
self.last_file_name_next_index = None
self.last_extension = name_extension[1]
else:
out_files, stats = self.transform.transform(folder_name=f_name)
self.logger.debug(f"Done transforming file {f_name}, got {len(out_files)} files")
self.last_file_name = name_extension[0]
self.last_file_name_next_index = None
self.last_extension = name_extension[1]
# save results
self._submit_file(t_start=t_start, out_files=out_files, stats=stats)
# Process unrecoverable exceptions
Expand All @@ -95,10 +102,10 @@ def flush(self) -> None:
the hook for them to return back locally stored data and their statistics.
:return: None
"""
if self.last_file_name is None:
if self.last_file_name is None or self.is_folder:
# for some reason a given worker never processed anything. Happens in testing
# when the amount of workers is greater than the amount of files
self.logger.debug("skipping flush, no name for file is defined")
self.logger.debug("skipping flush, no name for file is defined or this is a folder transform")
return
try:
t_start = time.time()
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
from data_processing.transform.abstract_transform import AbstractTransform
from data_processing.transform.folder_transform import AbstractFolderTransform
from data_processing.transform.binary_transform import AbstractBinaryTransform
from data_processing.transform.table_transform import AbstractTableTransform
from data_processing.transform.transform_statistics import TransformStatistics
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

class AbstractTransform:
    """
    Common ancestor of every transform flavor (binary, table, folder).

    Intentionally empty: it exists only so runtimes and processors can refer
    to any transform implementation uniformly via ``type[AbstractTransform]``.
    """
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,11 @@
# limitations under the License.
################################################################################

from typing import Any, TypeVar
from typing import Any
from data_processing.transform import AbstractTransform


class AbstractBinaryTransform:
class AbstractBinaryTransform(AbstractTransform):
"""
Converts input binary file to output file(s) (binary)
Sub-classes must provide the transform() method to provide the conversion of one binary files to 0 or
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from typing import Any
from data_processing.data_access import data_access
from data_processing.transform import AbstractTransform


class AbstractFolderTransform(AbstractTransform):
    """
    Converts an input folder to output file(s) (binary).
    Sub-classes must provide the transform() method to provide the conversion of a folder to 0 or
    more new binary files and metadata.
    """

    def __init__(self, config: dict[str, Any]):
        """
        Initialize based on the dictionary of configuration information.
        This simply stores the given config in this instance for later use.
        :param config: dictionary of configuration information
        """
        self.config = config

    def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
        """
        Converts input folder into 0 or more output files.
        If there is an error, an exception must be raised - exit()ing is not generally allowed.
        :param folder_name: the name of the folder containing an arbitrary amount of files.
        :return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated
                 to metadata. Each element of the return list is a tuple of the transformed bytes and a string
                 holding the extension to be used when writing out the new bytes.
        """
        # NotImplementedError, not NotImplemented: the latter is a non-callable
        # singleton, so ``raise NotImplemented()`` would raise a TypeError instead.
        raise NotImplementedError()

    @staticmethod
    def get_folders(data_access: data_access) -> list[str]:
        """
        Compute the list of folders to use.
        :param data_access: data access class
        :return: list of folder names to process
        """
        # Annotation fixed from ``list(str)`` (which is *evaluated* at class
        # creation and crashes with TypeError) to the PEP 585 generic ``list[str]``.
        raise NotImplementedError()
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ def __init__(self, params: dict[str, Any]):
super().__init__(
data_access_factory=params.get("data_access_factory", None),
transform_parameters=dict(params.get("transform_params", {})),
is_folder=params.get("is_folder", False)
)
# Create statistics
self.stats = params.get("statistics", None)
Expand Down
Loading

0 comments on commit 47f4526

Please sign in to comment.