Skip to content

Commit

Permalink
added noop testing
Browse files Browse the repository at this point in the history
  • Loading branch information
blublinsky committed Oct 11, 2024
1 parent 9c3ace7 commit 7091a2e
Show file tree
Hide file tree
Showing 9 changed files with 187 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ def process_file(self, f_name: str) -> None:
self.last_extension = name_extension[1]
else:
out_files, stats = self.transform.transform(folder_name=f_name)
self.last_file_name = f_name
self.logger.debug(f"Done transforming file {f_name}, got {len(out_files)} files")
# save results
self._submit_file(t_start=t_start, out_files=out_files, stats=stats)
Expand Down Expand Up @@ -148,15 +149,21 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats
)
case 1:
# we have exactly 1 output file
file_ext = out_files[0]
lfn = self.last_file_name
if self.last_file_name_next_index is not None:
lfn = f"{lfn}_{self.last_file_name_next_index}"
output_name = self.data_access.get_output_location(path=f"{lfn}{file_ext[1]}")
if self.is_folder:
# its folder
output_name = out_files[0][1]
dt = out_files[0][0]
else:
file_ext = out_files[0]
lfn = self.last_file_name
if self.last_file_name_next_index is not None:
lfn = f"{lfn}_{self.last_file_name_next_index}"
output_name = self.data_access.get_output_location(path=f"{lfn}{file_ext[1]}")
dt = file_ext[0]
self.logger.debug(
f"Writing transformed file {self.last_file_name}{self.last_extension} to {output_name}"
)
save_res, retries = self.data_access.save_file(path=output_name, data=file_ext[0])
save_res, retries = self.data_access.save_file(path=output_name, data=dt)
if retries > 0:
self._publish_stats({"data access retries": retries})
if save_res is None:
Expand All @@ -166,7 +173,7 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats
self._publish_stats(
{
"result_files": 1,
"result_size": len(file_ext[0]),
"result_size": len(dt),
"processing_time": time.time() - t_start,
}
)
Expand All @@ -183,14 +190,21 @@ def _submit_file(self, t_start: float, out_files: list[tuple[bytes, str]], stats
start_index = 0
count = len(out_files)
for index in range(count):
file_ext = out_files[index]
output_name_indexed = f"{output_file_name}_{start_index + index}{file_ext[1]}"
file_sizes += len(file_ext[0])
self.logger.debug(
f"Writing transformed file {self.last_file_name}{self.last_extension}, {index + 1} "
f"of {count} to {output_name_indexed}"
)
save_res, retries = self.data_access.save_file(path=output_name_indexed, data=file_ext[0])
if self.is_folder:
# its a folder
output_name_indexed = out_files[index][1]
dt = out_files[index][0]
else:
# files
file_ext = out_files[index]
output_name_indexed = f"{output_file_name}_{start_index + index}{file_ext[1]}"
self.logger.debug(
f"Writing transformed file {self.last_file_name}{self.last_extension}, {index + 1} "
f"of {count} to {output_name_indexed}"
)
dt = file_ext[0]
file_sizes += len(dt)
save_res, retries = self.data_access.save_file(path=output_name_indexed, data=dt)
if retries > 0:
self._publish_stats({"data access retries": retries})
if save_res is None:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
from .table_transform_test import AbstractTableTransformTest
from .binary_transform_test import AbstractBinaryTransformTest
from .noop_transform import (
from data_processing.test_support.transform.table_transform_test import AbstractTableTransformTest
from data_processing.test_support.transform.binary_transform_test import AbstractBinaryTransformTest
from data_processing.test_support.transform.noop_transform import (
NOOPTransform,
NOOPPythonTransformConfiguration,
NOOPTransformConfiguration,
NOOPPythonTransformConfiguration
)
from data_processing.test_support.transform.noop_folder_transform import (
NOOPFolderTransform,
NOOPFolderPythonTransformConfiguration
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import time
from typing import Any

from data_processing.data_access import DataAccess
from data_processing.runtime.pure_python import (
PythonTransformLauncher,
PythonTransformRuntimeConfiguration,
DefaultPythonTransformRuntime)
from data_processing.transform import AbstractFolderTransform
from data_processing.utils import get_logger
from data_processing.test_support.transform import NOOPTransformConfiguration


logger = get_logger(__name__)


class NOOPFolderTransform(AbstractFolderTransform):
"""
Implements a simple copy of a pyarrow Table.
"""

def __init__(self, config: dict[str, Any]):
"""
Initialize based on the dictionary of configuration information.
This is generally called with configuration parsed from the CLI arguments defined
by the companion runtime, NOOPTransformRuntime. If running inside the RayMutatingDriver,
these will be provided by that class with help from the RayMutatingDriver.
"""
# Make sure that the param name corresponds to the name used in apply_input_params method
# of NOOPTransformConfiguration class
super().__init__(config)
self.sleep = config.get("sleep_sec", 1)
self.data_access = config.get("data_access")

def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
"""
Converts input folder into o or more output files.
If there is an error, an exception must be raised - exit()ing is not generally allowed.
:param folder_name: the name of the folder containing arbitrary amount of files.
:return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated
to metadata. Each element of the return list, is a tuple of the transformed bytes and a string
holding the file name to use.
"""
logger.debug(f"Transforming one folder {folder_name}")
metadata = {}
# get folder files
files, retries = self.data_access.get_folder_files(path=folder_name)
if retries > 0:
metadata |= {"data access retries": retries}
result = [()] * len(files)
index = 0
for name, file in files.items():
result[index] = (file, self.data_access.get_output_location(name))
if self.sleep is not None:
logger.info(f"Sleep for {self.sleep} seconds")
time.sleep(self.sleep)
logger.info("Sleep completed - continue")
index += 1
# Add some sample metadata.
metadata |= {"nfiles": len(files)}
return result, metadata


class NOOPFolderPythonRuntime(DefaultPythonTransformRuntime):
def get_folders(self, data_access: DataAccess) -> list[str]:
"""
Get folders to process
:param data_access: data access
:return: list of folders to process
"""
return [data_access.get_input_folder()]


class NOOPFolderPythonTransformConfiguration(PythonTransformRuntimeConfiguration):
"""
Implements the PythonTransformConfiguration for NOOP as required by the PythonTransformLauncher.
NOOP does not use a RayRuntime class so the superclass only needs the base
python-only configuration.
"""

def __init__(self):
"""
Initialization
"""
super().__init__(transform_config=NOOPTransformConfiguration(clazz=NOOPFolderTransform),
runtime_class=NOOPFolderPythonRuntime)


if __name__ == "__main__":
# launcher = NOOPRayLauncher()
launcher = PythonTransformLauncher(NOOPFolderPythonTransformConfiguration())
logger.info("Launching noop transform")
launcher.launch()
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from data_processing.runtime.pure_python.runtime_configuration import (
PythonTransformRuntimeConfiguration,
)
from data_processing.transform import AbstractTableTransform, TransformConfiguration
from data_processing.transform import AbstractTableTransform, TransformConfiguration, AbstractTransform
from data_processing.utils import CLIArgumentProvider, get_logger


Expand Down Expand Up @@ -75,10 +75,10 @@ class NOOPTransformConfiguration(TransformConfiguration):
configuration with CLI args.
"""

def __init__(self):
def __init__(self, clazz: type[AbstractTransform] = NOOPTransform):
super().__init__(
name=short_name,
transform_class=NOOPTransform,
transform_class=clazz,
remove_from_metadata=[pwd_key],
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,6 @@ def transform(self, folder_name: str) -> tuple[list[tuple[bytes, str]], dict[str
:param folder_name: the name of the folder containing arbitrary amount of files.
:return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated
to metadata. Each element of the return list, is a tuple of the transformed bytes and a string
holding the extension to be used when writing out the new bytes.
holding the file name to use.
"""
raise NotImplemented()
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from argparse import ArgumentParser
from typing import Any

from data_processing.transform import AbstractBinaryTransform
from data_processing.transform import AbstractTransform
from data_processing.utils import CLIArgumentProvider


Expand All @@ -23,7 +23,7 @@ class TransformConfiguration(CLIArgumentProvider):
"""

def __init__(
self, name: str, transform_class: type[AbstractBinaryTransform], remove_from_metadata: list[str] = []
self, name: str, transform_class: type[AbstractTransform], remove_from_metadata: list[str] = []
):
"""
Initialization
Expand All @@ -36,7 +36,7 @@ def __init__(
self.remove_from_metadata = remove_from_metadata
self.params = {}

def get_transform_class(self) -> type[AbstractBinaryTransform]:
def get_transform_class(self) -> type[AbstractTransform]:
"""
Get the class extending AbstractBinaryTransform which implements a specific transformation.
The class will generally be instantiated with a dictionary of configuration produced by
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os

from data_processing.test_support.launch.transform_test import (
AbstractTransformLauncherTest,
)
from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.test_support.transform import NOOPFolderPythonTransformConfiguration


class TestRayNOOPTransform(AbstractTransformLauncherTest):
"""
Extends the super-class to define the test data for the tests defined there.
The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
"""

def get_test_transform_fixtures(self) -> list[tuple]:
basedir = "../../../test-data/data_processing/python/noop/"
basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), basedir))
launcher = PythonTransformLauncher(NOOPFolderPythonTransformConfiguration())
fixtures = [(launcher, {"noop_sleep_sec": 0}, basedir + "/input", basedir + "/expected")]
return fixtures
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,13 @@

import os

import pyarrow as pa
from data_processing.test_support.launch.transform_test import (
AbstractTransformLauncherTest,
)
from data_processing_ray.runtime.ray import RayTransformLauncher
from data_processing_ray.test_support.transform import NOOPRayTransformConfiguration


table = pa.Table.from_pydict({"name": pa.array(["Tom"]), "age": pa.array([23])})
expected_table = table # We're a noop after all.
expected_metadata_list = [{"nfiles": 1, "nrows": 1}, {}] # transform() result # flush() result


class TestRayNOOPTransform(AbstractTransformLauncherTest):
"""
Extends the super-class to define the test data for the tests defined there.
Expand Down
9 changes: 4 additions & 5 deletions transforms/universal/ededup/ray/src/ededup_transform_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,13 +149,12 @@ def _load_snapshots(self, data_access_factory: DataAccessFactoryBase, statistics
statistics.add_stats.remote({"data access retries": retries})
self.logger.info(f"Found the following snapshot files {files.keys()}")
# process snapshot files
for file in files.keys():
# load the file
for file in files.values():
# convert the file
try:
b_hashes, _ = data_access.get_file(file)
snaps = pickle.loads(b_hashes)
snaps = pickle.loads(file)
except Exception as e:
self.logger.warning(f"Failed to load hashes from file {file} with exception {e}")
self.logger.warning(f"Failed to load hashes with exception {e}")
raise UnrecoverableException("failed to load hashes")
request = [[] for _ in range(len(self.filters))]
for h in snaps:
Expand Down

0 comments on commit 7091a2e

Please sign in to comment.