Merge pull request #261 from IBM/pdf_readiness
small changes to get ready for pdf
daw3rd authored Jun 11, 2024
2 parents e571be2 + 6b91a9a commit d468da0
Showing 22 changed files with 85 additions and 51 deletions.
@@ -113,6 +113,15 @@ def __add_data_navigation_params(self, parser):
default=False,
help="checkpointing flag",
)
# In the case of binary files, the resulting extension can be different from the source extension
# The checkpointing extension is defined here. If multiple files (extensions) are produced from the
# source files, only the leading one is required here
parser.add_argument(
f"--{self.cli_arg_prefix}files_to_checkpoint",
type=ast.literal_eval,
default=ast.literal_eval("['.parquet']"),
help="list of file extensions to choose for checkpointing.",
)
parser.add_argument(
f"--{self.cli_arg_prefix}data_sets",
type=ast.literal_eval,
@@ -150,6 +159,7 @@ def apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool:
data_sets = arg_dict.get(f"{self.cli_arg_prefix}data_sets", None)
n_samples = arg_dict.get(f"{self.cli_arg_prefix}num_samples", -1)
files_to_use = arg_dict.get(f"{self.cli_arg_prefix}files_to_use", [".parquet"])
files_to_checkpoint = arg_dict.get(f"{self.cli_arg_prefix}files_to_checkpoint", [".parquet"])
# check which configuration (S3, LakeHouse, or Local) is specified
s3_config_specified = 1 if s3_config is not None else 0
local_config_specified = 1 if local_config is not None else 0
@@ -209,18 +219,19 @@ def apply_input_params(self, args: Union[dict, argparse.Namespace]) -> bool:
self.max_files = max_files
self.n_samples = n_samples
self.files_to_use = files_to_use
self.files_to_checkpoint = files_to_checkpoint
self.dsets = data_sets
if data_sets is None or len(data_sets) < 1:
self.logger.info(
f"data factory {self.cli_arg_prefix} "
f"Not using data sets, checkpointing {self.checkpointing}, max files {self.max_files}, "
f"random samples {self.n_samples}, files to use {self.files_to_use}"
f"Not using data sets, checkpointing {checkpointing}, max files {max_files}, "
f"random samples {n_samples}, files to use {files_to_use}, files to checkpoint {files_to_checkpoint}"
)
else:
self.logger.info(
f"data factory {self.cli_arg_prefix} "
f"Using data sets {self.dsets}, checkpointing {self.checkpointing}, max files {self.max_files}, "
f"random samples {self.n_samples}, files to use {self.files_to_use}"
f"Using data sets {self.dsets}, checkpointing {checkpointing}, max files {max_files}, "
f"random samples {n_samples}, files to use {files_to_use}, files to checkpoint {files_to_checkpoint}"
)
return True

@@ -239,14 +250,16 @@ def create_data_access(self) -> DataAccess:
m_files=self.max_files,
n_samples=self.n_samples,
files_to_use=self.files_to_use,
files_to_checkpoint=self.files_to_checkpoint,
)
else:
# anything else is local data
return DataAccessLocal(
path_config=self.local_config,
local_config=self.local_config,
d_sets=self.dsets,
checkpoint=self.checkpointing,
m_files=self.max_files,
n_samples=self.n_samples,
files_to_use=self.files_to_use,
files_to_checkpoint=self.files_to_checkpoint,
)
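For context on the new option: files_to_checkpoint parallels files_to_use but lists the output extensions that checkpointing should look for when a transform writes a different file type than it reads. A minimal usage sketch, assuming the factory is exposed as DataAccessFactory with a public add_input_params() method (neither of which appears in this diff), the default data_ prefix, and the usual flag names for the other options:

# Hypothetical usage sketch; the class name, import path, add_input_params(),
# and the flag names other than files_to_use/files_to_checkpoint are assumptions.
import argparse
from data_processing.data_access import DataAccessFactory

parser = argparse.ArgumentParser()
factory = DataAccessFactory()      # default cli_arg_prefix is "data_"
factory.add_input_params(parser)   # registers --data_files_to_checkpoint among the other flags

# A transform that reads .parquet inputs but emits .json outputs checkpoints
# against the .json files already present in the output folder.
args = parser.parse_args([
    "--data_local_config", "{'input_folder': 'in', 'output_folder': 'out'}",
    "--data_checkpointing", "True",
    "--data_files_to_use", "['.parquet']",
    "--data_files_to_checkpoint", "['.json']",
])
factory.apply_input_params(args)
data_access = factory.create_data_access()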
@@ -38,6 +38,7 @@ def __init__(self, cli_arg_prefix: str = "data_"):
self.max_files = -1
self.n_samples = -1
self.files_to_use = []
self.files_to_checkpoint = []
self.cli_arg_prefix = cli_arg_prefix
self.params = {}
self.logger = get_logger(__name__ + str(uuid.uuid4()))
@@ -19,7 +19,7 @@
import pyarrow as pa
import pyarrow.parquet as pq
from data_processing.data_access import DataAccess
from data_processing.utils import GB, MB, get_logger
from data_processing.utils import TransformUtils, GB, MB, get_logger


logger = get_logger(__name__)
@@ -32,28 +32,36 @@ class DataAccessLocal(DataAccess):

def __init__(
self,
path_config: dict[str, str] = None,
local_config: dict[str, str] = None,
d_sets: list[str] = None,
checkpoint: bool = False,
m_files: int = -1,
n_samples: int = -1,
files_to_use: list[str] = [".parquet"],
files_to_checkpoint: list[str] = [".parquet"],
):
"""
Create data access class for folder based configuration
:param path_config: dictionary of path info
:param local_config: dictionary of path info
:param d_sets: list of the data sets to use
:param checkpoint: flag to return only files that do not exist in the output directory
:param m_files: max amount of files to return
:param n_samples: amount of files to randomly sample
:param files_to_use: files extensions of files to include
:param files_to_checkpoint: files extensions of files to use for checkpointing
"""
if path_config is None:
if local_config is None:
self.input_folder = None
self.output_folder = None
else:
self.input_folder = path_config["input_folder"]
self.output_folder = path_config["output_folder"]
self.input_folder = local_config["input_folder"]
self.output_folder = local_config["output_folder"]
self.d_sets = d_sets
self.checkpoint = checkpoint
self.m_files = m_files
self.n_samples = n_samples
self.files_to_use = files_to_use
self.files_to_checkpoint = files_to_checkpoint

logger.debug(f"Local input folder: {self.input_folder}")
logger.debug(f"Local output folder: {self.output_folder}")
@@ -62,6 +70,7 @@ def __init__(
logger.debug(f"Local m_files: {self.m_files}")
logger.debug(f"Local n_samples: {self.n_samples}")
logger.debug(f"Local files_to_use: {self.files_to_use}")
logger.debug(f"Local files_to_checkpoint: {self.files_to_checkpoint}")

def get_num_samples(self) -> int:
"""
@@ -153,14 +162,17 @@ def _get_input_files(
)

input_files = self._get_all_files_ext(path=input_path, extensions=self.files_to_use)
output_files = self._get_all_files_ext(path=output_path, extensions=self.files_to_use)
output_files_ext = self._get_all_files_ext(path=output_path, extensions=self.files_to_checkpoint)
# In the case of binary transforms, an extension can be different, so just use the file names.
# Also remove duplicates
output_files = list(set([TransformUtils.get_file_extension(file)[0] for file in output_files_ext]))

total_input_file_size = 0
i = 0
result_files = []
for filename in sorted(input_files):
out_f_name = self.get_output_location(filename)
if out_f_name in output_files:
if TransformUtils.get_file_extension(out_f_name)[0] in output_files:
continue
if i >= cm_files > 0:
break
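The checkpoint comparison above relies on TransformUtils.get_file_extension returning a (name without extension, extension) pair, so outputs whose extension differs from their input still match. A small illustration of that idea, under the same assumption about the return value:

# Illustrative sketch of the extension-agnostic checkpoint check used above.
from data_processing.utils import TransformUtils

# Outputs already produced by an earlier run of a binary transform.
existing_outputs = ["output/doc1.json", "output/doc2.json"]
done = {TransformUtils.get_file_extension(f)[0] for f in existing_outputs}

# The expected output location for the next input is compared without its extension,
# so "output/doc1.parquet" is skipped even though the file actually produced was .json.
candidate = "output/doc1.parquet"
skip = TransformUtils.get_file_extension(candidate)[0] in done   # True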
@@ -12,7 +12,6 @@

import gzip
import json
import os
from typing import Any

import pyarrow
@@ -37,6 +36,7 @@ def __init__(
m_files: int = -1,
n_samples: int = -1,
files_to_use: list[str] = [".parquet"],
files_to_checkpoint: list[str] = [".parquet"],
):
"""
Create data access class for folder based configuration
@@ -47,6 +47,7 @@
:param m_files: max amount of files to return
:param n_samples: amount of files to randomly sample
:param files_to_use: files extensions of files to include
:param files_to_checkpoint: files extensions of files to use for checkpointing
"""
if (s3_credentials is None or s3_credentials.get("access_key", None) is None
or s3_credentials.get("secret_key", None) is None):
@@ -63,6 +64,7 @@
self.m_files = m_files
self.n_samples = n_samples
self.files_to_use = files_to_use
self.files_to_checkpoint = files_to_checkpoint
self.arrS3 = ArrowS3(
access_key=s3_credentials.get("access_key"),
secret_key=s3_credentials.get("secret_key"),
@@ -97,11 +99,12 @@ def get_output_folder(self) -> str:
return self.output_folder

def _get_files_folder(
self, path: str, cm_files: int, max_file_size: int = 0, min_file_size: int = MB * GB
self, path: str, files_to_use: list[str], cm_files: int, max_file_size: int = 0, min_file_size: int = MB * GB
) -> tuple[list[str], dict[str, float], int]:
"""
Support method to get list input files and their profile
:param path: input path
:param files_to_use: file extensions to use
:param max_file_size: max file size
:param min_file_size: min file size
:param cm_files: overwrite for the m_files in the class
@@ -117,8 +120,8 @@ def _get_files_folder(
break
# Only use specified files
f_name = str(file["name"])
_, extension = os.path.splitext(f_name)
if extension in self.files_to_use:
name_extension = TransformUtils.get_file_extension(f_name)
if name_extension[1] in files_to_use:
p_list.append(f_name)
size = file["size"]
total_input_file_size += size
@@ -150,10 +153,15 @@ def _get_input_files(
"""
if not self.checkpoint:
return self._get_files_folder(
path=input_path, cm_files=cm_files, min_file_size=min_file_size, max_file_size=max_file_size
path=input_path, files_to_use=self.files_to_use, cm_files=cm_files,
min_file_size=min_file_size, max_file_size=max_file_size
)
pout_list, _, retries1 = self._get_files_folder(path=output_path, cm_files=-1)
output_base_names = [file.replace(self.output_folder, self.input_folder) for file in pout_list]
pout_list, _, retries1 = self._get_files_folder(path=output_path,
files_to_use=self.files_to_checkpoint, cm_files=-1)
output_base_names_ext = [file.replace(self.output_folder, self.input_folder) for file in pout_list]
# In the case of binary transforms, an extension can be different, so just use the file names.
# Also remove duplicates
output_base_names = list(set([TransformUtils.get_file_extension(file)[0] for file in output_base_names_ext]))
p_list = []
total_input_file_size = 0
i = 0
@@ -164,8 +172,8 @@
break
# Only use .parquet files
f_name = str(file["name"])
_, extension = os.path.splitext(f_name)
if extension in self.files_to_use and f_name not in output_base_names:
name_extension = TransformUtils.get_file_extension(f_name)
if name_extension[1] in self.files_to_use and name_extension[0] not in output_base_names:
p_list.append(f_name)
size = file["size"]
total_input_file_size += size
@@ -59,8 +59,7 @@ def process_file(self, f_name: str) -> None:
# execute local processing
name_extension = TransformUtils.get_file_extension(f_name)
self.logger.debug(f"Begin transforming file {f_name}")
out_files, stats = self.transform.transform_binary(
base_name=TransformUtils.get_file_basename(f_name), byte_array=filedata)
out_files, stats = self.transform.transform_binary(file_name=f_name, byte_array=filedata)
self.logger.debug(f"Done transforming file {f_name}, got {len(out_files)} files")
self.last_file_name = name_extension[0]
self.last_file_name_next_index = None
@@ -63,7 +63,7 @@ def test_transform(
all_files_list = []
all_metadata_list = []
for in_file in in_binary_list:
files_list, metadata = transform.transform_binary(base_name=in_file[0], byte_array=in_file[1])
files_list, metadata = transform.transform_binary(file_name=in_file[0], byte_array=in_file[1])
all_files_list.extend(files_list)
all_metadata_list.append(metadata)

@@ -50,7 +50,7 @@ def __init__(self, config: dict[str, Any]):
super().__init__(config)
self.sleep = config.get("sleep_sec", 1)

def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict[str, Any]]:
def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
"""
Put Transform-specific to convert one Table to 0 or more tables. It also returns
a dictionary of execution statistics - arbitrary dictionary
@@ -5,11 +5,12 @@


class AbstractTransform(Generic[DATA]):
def transform(self, data: DATA) -> tuple[list[DATA], dict[str, Any]]:
def transform(self, data: DATA, file_name: str = None) -> tuple[list[DATA], dict[str, Any]]:
"""
Converts input table into an output table.
If there is an error, an exception must be raised - exit()ing is not generally allowed when running in Ray.
:param table: input table
:param data: input table
:param file_name: optional - name of the input file
:return: a tuple of a list of 0 or more converted tables and a dictionary of statistics that will be
propagated to metadata
"""
@@ -32,12 +32,12 @@ def __init__(self, config: dict[str, Any]):
"""
self.config = config

def transform_binary(self, base_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
"""
Converts input file into 0 or more output files.
If there is an error, an exception must be raised - exit()ing is not generally allowed.
:param byte_array: contents of the input file to be transformed.
:param base_name: the base name of the file containing the given byte_array.
:param file_name: the name of the file containing the given byte_array.
:return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated
to metadata. Each element of the return list, is a tuple of the transformed bytes and a string
holding the extension to be used when writing out the new bytes.
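With the rename from base_name to file_name, subclasses now receive the full input file name. A hedged sketch of a custom binary transform written against the new signature; the base class name and import path are assumptions, while the method signature follows the diff:

# Illustrative only; AbstractBinaryTransform and its import path are assumed names.
from typing import Any
from data_processing.transform import AbstractBinaryTransform
from data_processing.utils import TransformUtils


class UpperCaseTextTransform(AbstractBinaryTransform):
    """Upper-cases the contents of .txt files and skips everything else."""

    def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
        if TransformUtils.get_file_extension(file_name)[1] != ".txt":
            return [], {"wrong file type": 1}
        out_bytes = byte_array.decode("utf-8").upper().encode("utf-8")
        # Each output is a (bytes, extension-to-write) tuple; the stats dict propagates to metadata.
        return [(out_bytes, ".txt")], {"files processed": 1}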
@@ -32,19 +32,19 @@ def __init__(self, config: dict[str, Any]):
"""
super().__init__(config)

def transform_binary(self, base_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
"""
Converts input file into 0 or more output files.
If there is an error, an exception must be raised - exit()ing is not generally allowed.
:param byte_array: contents of the input file to be transformed.
:param base_name: the base name of the file containing the given byte_array.
:param file_name: the file name of the file containing the given byte_array.
:return: a tuple of a list of 0 or more tuples and a dictionary of statistics that will be propagated
to metadata. Each element of the return list, is a tuple of the transformed bytes and a string
holding the extension to be used when writing out the new bytes.
"""
# validate extension
if TransformUtils.get_file_extension(base_name)[1] != ".parquet":
logger.warning(f"Get wrong file type {base_name}")
if TransformUtils.get_file_extension(file_name)[1] != ".parquet":
logger.warning(f"Get wrong file type {file_name}")
return [], {"wrong file type": 1}
# convert to table
table = TransformUtils.convert_binary_to_arrow(data=byte_array)
@@ -56,7 +56,7 @@ def transform_binary(self, base_name: str, byte_array: bytes) -> tuple[list[tupl
logger.warning(f"table is empty, skipping processing")
return [], {"skipped empty tables": 1}
# transform table
out_tables, stats = self.transform(table=table)
out_tables, stats = self.transform(table=table, file_name=file_name)
# Add number of rows to stats
stats = stats | {"source_doc_count": table.num_rows}
# convert tables to files
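The adapter above now forwards file_name into the table-level transform, so table transforms can make use of it when helpful. A brief sketch of a table transform that takes advantage of the new optional argument; for brevity it does not subclass the project's table transform base, which a real transform would:

# Illustrative table transform using the new optional file_name argument.
from typing import Any
import pyarrow as pa


class AddSourceColumnTransform:
    """Tags every row with the file it came from."""

    def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict[str, Any]]:
        source = file_name if file_name is not None else "unknown"
        tagged = table.append_column("source_file", pa.array([source] * table.num_rows))
        return [tagged], {"rows tagged": table.num_rows}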
@@ -206,7 +206,7 @@ def __init__(self, config: dict):
self.code_quality["tokenizer"], use_auth_token=self.code_quality["hf_token"]
)

def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict]:
def transform(self, table: pa.Table, file_name: str = None) -> tuple[list[pa.Table], dict]:
"""
Chain all preprocessing steps into one function to not fill cache.
"""
@@ -49,6 +49,6 @@
byte_array, _ = data_access.get_file(file_to_process)
# Transform the table
files_list, metadata = transform.transform_binary(
base_name=TransformUtils.get_file_basename(file_to_process), byte_array=byte_array)
file_name=file_to_process, byte_array=byte_array)
print(f"Got {len(files_list)} output files")
print(f"output metadata : {metadata}")
@@ -95,13 +95,13 @@ def _get_lang_from_ext(self, ext):
lang = self.languages_supported.get(ext, lang)
return lang

def transform_binary(self, base_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
def transform_binary(self, file_name: str, byte_array: bytes) -> tuple[list[tuple[bytes, str]], dict[str, Any]]:
"""
Converts raw data file (ZIP) to Parquet format
"""
# We currently only process .zip files
if TransformUtils.get_file_extension(base_name)[1] != ".zip":
logger.warning(f"Got unsupported file type {base_name}, skipping")
if TransformUtils.get_file_extension(file_name)[1] != ".zip":
logger.warning(f"Got unsupported file type {file_name}, skipping")
return [], {}
data = []
number_of_rows = 0
@@ -119,7 +119,7 @@ def transform_binary(self, base_name: str, byte_array: bytes) -> tuple[list[tupl
ext = TransformUtils.get_file_extension(member.filename)[1]
row_data = {
"title": member.filename,
"document": base_name,
"document": TransformUtils.get_file_basename(file_name),
"contents": content_string,
"document_id": TransformUtils.str_to_hash(content_string),
"ext": ext,
@@ -37,7 +37,7 @@ def get_test_transform_fixtures(self) -> list[tuple]:
lang_supported_file = os.path.abspath(os.path.join(basedir, "languages/lang_extensions.json"))
input_dir = os.path.join(basedir, "input")
input_files = get_files_in_folder(input_dir, ".zip")
input_files = [(TransformUtils.get_file_basename(name), binary) for name, binary in input_files.items()]
input_files = [(name, binary) for name, binary in input_files.items()]
expected_metadata_list = [{'number of rows': 2}, {'number of rows': 52}, {}]
config = {
ingest_supported_langs_file_key: lang_supported_file,
