From 84b9104a7791661d368345d3c5b8e8cd02a67a19 Mon Sep 17 00:00:00 2001 From: Constantin M Adam Date: Fri, 25 Oct 2024 10:08:47 -0400 Subject: [PATCH] Fixed duplicate_list_location bug Signed-off-by: Constantin M Adam --- .../python/src/data_cleaning_transform_python.py | 15 +++++++++++---- .../fdedup/ray/src/data_cleaning_transform_ray.py | 8 +++++--- .../spark/src/data_cleaning_transform_spark.py | 15 +++++++++++---- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/transforms/universal/fdedup/python/src/data_cleaning_transform_python.py b/transforms/universal/fdedup/python/src/data_cleaning_transform_python.py index e5c1e5025..9c60ecbba 100644 --- a/transforms/universal/fdedup/python/src/data_cleaning_transform_python.py +++ b/transforms/universal/fdedup/python/src/data_cleaning_transform_python.py @@ -13,7 +13,11 @@ import os from typing import Any -from data_cleaning_transform import DataCleaningTransformConfiguration +from data_cleaning_transform import ( + DataCleaningTransformConfiguration, + duplicate_list_location_default, + duplicate_list_location_key, +) from data_processing.data_access import DataAccessFactoryBase from data_processing.runtime.pure_python import PythonTransformLauncher from data_processing.runtime.pure_python.runtime_configuration import ( @@ -53,9 +57,12 @@ def get_transform_config( :return: dictionary of transform init params """ data_access = data_access_factory.create_data_access() - duplicate_list_location = os.path.abspath( - os.path.join(data_access.output_folder, "..", self.params["duplicate_list_location"]) - ) + duplicate_list_location = self.params.get(duplicate_list_location_key, duplicate_list_location_default) + if not duplicate_list_location.startswith("/"): + out_paths = data_access.output_folder.rstrip("/").split("/") + dupl_list_paths = duplicate_list_location.split("/") + paths = out_paths[:-1] + dupl_list_paths + duplicate_list_location = "/".join([p.strip("/") for p in paths]) if duplicate_list_location.startswith("s3://"): _, duplicate_list_location = duplicate_list_location.split("://") self.duplicate_list, retries = data_access.get_file(duplicate_list_location) diff --git a/transforms/universal/fdedup/ray/src/data_cleaning_transform_ray.py b/transforms/universal/fdedup/ray/src/data_cleaning_transform_ray.py index e83960c24..5ed2cecbe 100644 --- a/transforms/universal/fdedup/ray/src/data_cleaning_transform_ray.py +++ b/transforms/universal/fdedup/ray/src/data_cleaning_transform_ray.py @@ -91,9 +91,11 @@ def get_transform_config( """ data_access = data_access_factory.create_data_access() duplicate_list_location = self.params.get(duplicate_list_location_key, duplicate_list_location_default) - duplicate_list_location = os.path.abspath( - os.path.join(data_access.output_folder, "..", duplicate_list_location) - ) + if not duplicate_list_location.startswith("/"): + out_paths = data_access.output_folder.rstrip("/").split("/") + dupl_list_paths = duplicate_list_location.split("/") + paths = out_paths[:-1] + dupl_list_paths + duplicate_list_location = "/".join([p.strip("/") for p in paths]) if duplicate_list_location.startswith("s3://"): _, duplicate_list_location = duplicate_list_location.split("://") duplicate_list, retries = data_access.get_file(duplicate_list_location) diff --git a/transforms/universal/fdedup/spark/src/data_cleaning_transform_spark.py b/transforms/universal/fdedup/spark/src/data_cleaning_transform_spark.py index 29890d05f..56c10d801 100644 --- a/transforms/universal/fdedup/spark/src/data_cleaning_transform_spark.py +++ b/transforms/universal/fdedup/spark/src/data_cleaning_transform_spark.py @@ -13,7 +13,11 @@ import os from typing import Any -from data_cleaning_transform import DataCleaningTransformConfiguration +from data_cleaning_transform import ( + DataCleaningTransformConfiguration, + duplicate_list_location_default, + duplicate_list_location_key, +) from data_processing.data_access import DataAccessFactoryBase from data_processing.transform import TransformStatistics from data_processing.utils import get_logger @@ -53,9 +57,12 @@ def get_transform_config( :return: dictionary of transform init params """ data_access = data_access_factory.create_data_access() - duplicate_list_location = os.path.abspath( - os.path.join(data_access.output_folder, "..", self.params["duplicate_list_location"]) - ) + duplicate_list_location = self.params.get(duplicate_list_location_key, duplicate_list_location_default) + if not duplicate_list_location.startswith("/"): + out_paths = data_access.output_folder.rstrip("/").split("/") + dupl_list_paths = duplicate_list_location.split("/") + paths = out_paths[:-1] + dupl_list_paths + duplicate_list_location = "/".join([p.strip("/") for p in paths]) if duplicate_list_location.startswith("s3://"): _, duplicate_list_location = duplicate_list_location.split("://") self.duplicate_list, retries = data_access.get_file(duplicate_list_location)