Alternate spark runtime implementation (#406)
* initial implementation

* Added dockerfile

* refactoring of code

* removed unnecessary abstract transform class

* removed unnecessary abstract transform class

* removed unnecessary abstract transform class

* fixed spark image help

* documentation update

* Added filter

* preparing for docid

* implementing docid

* implementing docid

* Add comments to implementation

* Add comments to implementation

* Add support for explicit parallelization

* Add support for explicit parallelization

* Add support for explicit parallelization

* Add support for explicit parallelization

* Addressed comments

* Addressed comments

* run pre commit

* small fixes

* addressed comments

* addressed comments - launcher refactoring

* added support for runtime

* small cleanup

* re factored doc id

* Use multi-stage build

Signed-off-by: Constantin M Adam <[email protected]>

* changed Spark version

* changed Spark version

* changed Spark version

* changed Spark version

---------

Signed-off-by: Constantin M Adam <[email protected]>
Co-authored-by: Constantin M Adam <[email protected]>
blublinsky and cmadam authored Sep 12, 2024
1 parent 7c42b8f commit 03cba30
Showing 84 changed files with 1,140 additions and 1,512 deletions.
2 changes: 1 addition & 1 deletion .make.defaults
@@ -49,7 +49,7 @@ DOCKER_REGISTRY_KEY?=$(DPK_DOCKER_REGISTRY_KEY)
DOCKER_REGISTRY_ENDPOINT?=$(DOCKER_HOSTNAME)/$(DOCKER_NAMESPACE)
DOCKER_LOCAL_IMAGE=$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION)
DOCKER_REMOTE_IMAGE=$(DOCKER_REGISTRY_ENDPOINT)/$(DOCKER_IMAGE_NAME):$(DOCKER_IMAGE_VERSION)
DOCKER_SPARK_BASE_IMAGE_NAME=data-prep-kit-spark-3.5.1
DOCKER_SPARK_BASE_IMAGE_NAME=data-prep-kit-spark-3.5.2
DOCKER_SPARK_BASE_IMAGE=$(DOCKER_SPARK_BASE_IMAGE_NAME):$(DOCKER_IMAGE_VERSION)
RAY_BASE_IMAGE?=docker.io/rayproject/ray:${RAY}-py310
# Deprecated in favor of DOCKER_REMOTE_IMAGE
11 changes: 6 additions & 5 deletions data-processing-lib/doc/overview.md
@@ -12,12 +12,13 @@ This might include operations such as de-duplication, merging, and splitting.
The framework uses a plug-in model for the primary functions. The core
transformation-specific classes/interfaces are as follows:

* [AbstractTransform](../python/src/data_processing/transform/abstract_transform.py) -
* [AbstractBinaryTransform](../python/src/data_processing/transform/binary_transform.py) -
a simple, easily-implemented interface allowing the definition of transforms
over arbitrary data types. Support is provided for both
[files](../python/src/data_processing/transform/binary_transform.py) of arbitrary data as a byte array and
[parquet/arrow](../python/src/data_processing/transform/table_transform.py) tables.
* [TransformConfiguration](../python/src/data_processing/transform/transform_configuration.py) - defines
of arbitrary data as a byte array. Additionally, a
[table](../python/src/data_processing/transform/table_transform.py) transform interface
is provided, allowing the definition of transforms operating on
[pyarrow](https://arrow.apache.org/docs/python/index.html) tables.
* [TransformConfiguration](../python/src/data_processing/runtime/transform_configuration.py) - defines
the transform short name, its implementation class, and command line configuration
parameters (a minimal sketch of these pieces follows below).

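To make the relationship between these pieces concrete, here is a minimal sketch of a table-level transform and its configuration. The class names are invented for illustration; the exact `transform()` signature should be checked against `table_transform.py`, while the `TransformConfiguration` constructor arguments follow the definition linked above.

```python
from typing import Any

import pyarrow as pa

# Import paths are assumptions; see the links above for the actual module locations.
from data_processing.transform import AbstractTableTransform, TransformConfiguration


class MyNoopTransform(AbstractTableTransform):
    """Illustrative transform: passes each pyarrow Table through unchanged."""

    # Assumed signature: one input Table in, a list of output Tables plus a metadata dict out.
    def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict[str, Any]]:
        return [table], {"nrows": table.num_rows}


class MyNoopTransformConfiguration(TransformConfiguration):
    """Binds the transform's short name and implementation class for a launcher."""

    def __init__(self):
        super().__init__(name="my_noop", transform_class=MyNoopTransform)
```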
17 changes: 7 additions & 10 deletions data-processing-lib/doc/spark-launcher-options.md
@@ -1,17 +1,17 @@
# Spark Launcher Command Line Options


A number of command line options are available when launching a transform using Spark.

The following is a current --help output (a work in progress) for
the `NOOPTransform` (note the --noop_sleep_sec and --noop_pwd options); an example invocation follows the help text:

```
usage: noop_transform.py [-h] [--noop_sleep_sec NOOP_SLEEP_SEC] [--noop_pwd NOOP_PWD] [--data_s3_cred DATA_S3_CRED] [--data_s3_config DATA_S3_CONFIG]
[--data_local_config DATA_LOCAL_CONFIG] [--data_max_files DATA_MAX_FILES] [--data_checkpointing DATA_CHECKPOINTING]
[--data_data_sets DATA_DATA_SETS] [--data_files_to_use DATA_FILES_TO_USE] [--data_num_samples DATA_NUM_SAMPLES]
[--runtime_pipeline_id RUNTIME_PIPELINE_ID] [--runtime_job_id RUNTIME_JOB_ID] [--runtime_code_location RUNTIME_CODE_LOCATION]
[--spark_local_config_filepath SPARK_LOCAL_CONFIG_FILEPATH] [--spark_kube_config_filepath SPARK_KUBE_CONFIG_FILEPATH]
usage: noop_python_runtime.py [-h] [--noop_sleep_sec NOOP_SLEEP_SEC] [--noop_pwd NOOP_PWD] [--data_s3_cred DATA_S3_CRED] [--data_s3_config DATA_S3_CONFIG] [--data_local_config DATA_LOCAL_CONFIG] [--data_max_files DATA_MAX_FILES]
[--data_checkpointing DATA_CHECKPOINTING] [--data_data_sets DATA_DATA_SETS] [--data_files_to_use DATA_FILES_TO_USE] [--data_num_samples DATA_NUM_SAMPLES] [--runtime_pipeline_id RUNTIME_PIPELINE_ID]
[--runtime_job_id RUNTIME_JOB_ID] [--runtime_code_location RUNTIME_CODE_LOCATION]
Driver for noop processing on Spark
Driver for noop processing
options:
-h, --help show this help message and exit
@@ -59,8 +59,5 @@ options:
path: Path within the repository
Example: { 'github': 'https://github.com/somerepo', 'commit_hash': '1324',
'path': 'transforms/universal/code' }
--spark_local_config_filepath SPARK_LOCAL_CONFIG_FILEPATH
Path to spark configuration for run
--spark_kube_config_filepath SPARK_KUBE_CONFIG_FILEPATH
Path to Kubernetes-based configuration.
```
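As an illustration only (not output from this commit), a local run of the NOOP transform combining a transform-specific option with data-access and runtime options might look like the following; the script name, folder paths, and the keys inside --data_local_config are placeholders:

```
python noop_transform.py \
    --noop_sleep_sec 1 \
    --data_local_config "{'input_folder': 'test-data/input', 'output_folder': 'output'}" \
    --runtime_pipeline_id my_pipeline \
    --runtime_job_id job_1
```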

63 changes: 52 additions & 11 deletions data-processing-lib/doc/spark-runtime.md
@@ -1,18 +1,59 @@
# Spark Framework
The Spark runtime extends the base framework with the following set of components:
The Spark runtime implementation is roughly based on the ideas from
[here](https://wrightturn.wordpress.com/2015/07/22/getting-spark-data-from-aws-s3-using-boto-and-pyspark/),
[here](https://medium.com/how-to-become-a-data-architect/get-best-performance-for-pyspark-jobs-using-parallelize-48c8fa03a21e)
and [here](https://medium.com/@shuklaprashant9264/alternate-of-for-loop-in-pyspark-25a00888ec35).
Spark itself is used primarily for execution parallelization, while all data access goes through the
framework's [data access](data-access-factory.md) layer, thus preserving all of its implemented features. At
the start of the execution, the list of files to process is obtained (using the data access framework)
and then split among the Spark workers, which read the actual data, transform it, and write it back.
The implementation is based on Spark RDDs (for a comparison of the three Apache Spark APIs,
RDDs, DataFrames, and Datasets, see this
[Databricks blog post](https://www.databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html)).
As defined by Databricks:
```text
RDD was the primary user-facing API in Spark since its inception. At the core, an RDD is an
immutable distributed collection of elements of your data, partitioned across nodes in your
cluster that can be operated in parallel with a low-level API that offers transformations
and actions.
```
This API fits well with what we are implementing. It allows us to fully leverage our
existing DataAccess APIs, thus preserving all of the investment in flexible, reliable data
access. Additionally, the RDD's flexible low-level control allows us to work at the partition level,
thus limiting the amount of initialization and setup.
Note that in our approach a transform's processing is based on either binary or parquet data,
not on Spark DataFrames or Datasets. We do not currently support these Spark APIs,
as they do not map well onto what we are implementing.

In our implementation we use
[pyspark.SparkContext.parallelize](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.SparkContext.parallelize.html)
to run multiple transforms in parallel. We allow two options for specifying the number of partitions, which determines
how many partitions the RDD is divided into (a small sketch follows the list below). See
[here](https://sparktpoint.com/how-to-create-rdd-using-parallelize/) for an explanation
of this parameter:
* If you specify a positive value of the parameter, Spark will attempt to evenly
distribute the data from seq into that many partitions. For example, if you have
a collection of 100 elements and you specify numSlices as 4, Spark will try
to create 4 partitions with approximately 25 elements in each partition.
* If you don’t specify this parameter, Spark will use a default value, which is
typically determined based on the cluster configuration or the available resources
(number of workers).

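The following is a minimal sketch of this pattern, not the framework's actual orchestrator: a file list (here hard-coded, but obtained via the data access layer in the framework) is parallelized into an RDD with an explicit number of partitions, and each partition processes its share of files with one-time setup per partition.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("parallelize-sketch").getOrCreate()
sc = spark.sparkContext

# Placeholder file list; the framework obtains this through its data access layer.
files = [f"data/file_{i}.parquet" for i in range(8)]


def process_partition(paths):
    # Per-partition setup (e.g. creating a transform instance) would happen once here.
    count = 0
    for path in paths:
        # Placeholder for: read the file via data access, transform it, write the result back.
        count += 1
    yield count


# numSlices is the explicit parallelization discussed above: 8 files split across 4 partitions.
processed = sc.parallelize(files, numSlices=4).mapPartitions(process_partition).sum()
print(f"processed {processed} files")
spark.stop()
```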
## Transforms

* [AbstractSparkTransform](../spark/src/data_processing_spark/runtime/spark/spark_transform.py) - this
is the base class for all spark-based transforms over spark DataFrames.
* [SparkTransformConfiguration](../spark/src/data_processing_spark/runtime/spark/spark_transform_config.py) - this
is simple extension of the base TransformConfiguration class to hold the transformation class
(an extension of AbstractSparkTransform).
* [SparkTransformRuntimeConfiguration](../spark/src/data_processing_spark/transform/runtime_configuration.py) allows
a transform to be configured to use PySpark


## Runtime

* [SparkTransformLauncher](../spark/src/data_processing_spark/runtime/spark/spark_launcher.py) - this is a
class generally used to implement `main()` that makes use of a `SparkTransformConfiguration` to
start the Spark runtime and execute the transform over the specified set of input files.
* [SparkTransformRuntimeConfiguration](../spark/src/data_processing_spark/runtime/spark/runtime_config.py) - this
class is a simple extension of the transform's base TransformConfiguration class.
The Spark runtime extends the base framework with the following set of components:
* [SparkTransformExecutionConfiguration](../spark/src/data_processing_spark/runtime/spark/execution_configuration.py)
allows the Spark execution to be configured
* [SparkTransformFileProcessor](../spark/src/data_processing_spark/runtime/spark/transform_file_processor.py) extends
[AbstractTransformFileProcessor](../python/src/data_processing/runtime/transform_file_processor.py) to work on
PySpark
* [SparkTransformLauncher](../spark/src/data_processing_spark/runtime/spark/transform_launcher.py) allows
the PySpark runtime to be launched and a transform to be executed (see the sketch below)
* the [orchestrate](../spark/src/data_processing_spark/runtime/spark/transform_orchestrator.py) function orchestrates
Spark-based execution
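As a usage illustration, a Spark transform's main module typically constructs the launcher with its runtime configuration and calls launch(). A minimal sketch follows; the import paths, the NOOP configuration class, and the --data_local_config keys are assumptions modeled on the existing NOOP example rather than taken verbatim from this commit.

```python
import sys

# Import paths and the NOOP configuration class are assumptions; adjust to the actual layout.
from data_processing_spark.runtime.spark import SparkTransformLauncher
from noop_transform_spark import NOOPSparkTransformConfiguration

if __name__ == "__main__":
    # Arguments the launcher parses; folder names are placeholders.
    sys.argv = [
        "noop_spark.py",
        "--data_local_config",
        "{'input_folder': 'test-data/input', 'output_folder': 'output'}",
    ]
    launcher = SparkTransformLauncher(NOOPSparkTransformConfiguration())
    sys.exit(launcher.launch())
```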
@@ -1,4 +1,4 @@
from data_processing.runtime.execution_configuration import TransformExecutionConfiguration
from data_processing.runtime.execution_configuration import TransformExecutionConfiguration, runtime_cli_prefix
from data_processing.runtime.runtime_configuration import TransformRuntimeConfiguration
from data_processing.runtime.transform_launcher import AbstractTransformLauncher, multi_launcher
from data_processing.runtime.transform_file_processor import AbstractTransformFileProcessor
@@ -19,7 +19,7 @@
logger = get_logger(__name__)


cli_prefix = "runtime_"
runtime_cli_prefix = "runtime_"


class TransformExecutionConfiguration(CLIArgumentProvider):
@@ -45,16 +45,16 @@ def add_input_params(self, parser: argparse.ArgumentParser) -> None:
:param parser: parser
:return:
"""
parser.add_argument(f"--{cli_prefix}pipeline_id", type=str, default="pipeline_id", help="pipeline id")
parser.add_argument(f"--{cli_prefix}job_id", type=str, default="job_id", help="job id")
parser.add_argument(f"--{runtime_cli_prefix}pipeline_id", type=str, default="pipeline_id", help="pipeline id")
parser.add_argument(f"--{runtime_cli_prefix}job_id", type=str, default="job_id", help="job id")

help_example_dict = {
"github": ["https://github.com/somerepo", "Github repository URL."],
"commit_hash": ["1324", "github commit hash"],
"path": ["transforms/universal/code", "Path within the repository"],
}
parser.add_argument(
f"--{cli_prefix}code_location",
f"--{runtime_cli_prefix}code_location",
type=ast.literal_eval,
default=None,
help="AST string containing code location\n" + ParamsUtils.get_ast_help_text(help_example_dict),
@@ -66,7 +66,7 @@ def apply_input_params(self, args: argparse.Namespace) -> bool:
:param args: user defined arguments
:return: True, if validate pass or False otherwise
"""
captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False)
captured = CLIArgumentProvider.capture_parameters(args, runtime_cli_prefix, False)
# store parameters locally
self.pipeline_id = captured["pipeline_id"]
self.job_details = {
@@ -10,7 +10,6 @@
# limitations under the License.
################################################################################

import argparse
import time

from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase
@@ -44,29 +43,6 @@ def __init__(
super().__init__(runtime_config, data_access_factory)
self.execution_config = PythonTransformExecutionConfiguration(name=runtime_config.get_name())

def __get_parameters(self) -> bool:
"""
This method creates arg parser, fills it with the parameters
and does parameters validation
:return: True if validation passes or False, if not
"""
parser = argparse.ArgumentParser(
description=f"Driver for {self.name} processing",
# RawText is used to allow better formatting of ast-based arguments
# See uses of ParamsUtils.dict_to_str()
formatter_class=argparse.RawTextHelpFormatter,
)
# add additional arguments
self.runtime_config.add_input_params(parser=parser)
self.data_access_factory.add_input_params(parser=parser)
self.execution_config.add_input_params(parser=parser)
args = parser.parse_args()
return (
self.runtime_config.apply_input_params(args=args)
and self.execution_config.apply_input_params(args=args)
and self.data_access_factory.apply_input_params(args=args)
)

def _submit_for_execution(self) -> int:
"""
Submit for execution
@@ -87,12 +63,3 @@ def _submit_for_execution(self) -> int:
finally:
logger.info(f"Completed execution in {round((time.time() - start)/60., 3)} min, execution result {res}")
return res

def launch(self) -> int:
"""
Execute method orchestrates driver invocation
:return:
"""
if self.__get_parameters():
return self._submit_for_execution()
return 1
@@ -50,4 +50,4 @@ def compute_execution_stats(self, stats: TransformStatistics) -> None:
:param stats: output of statistics as aggregated across all calls to all transforms.
:return: job execution statistics. These are generally reported as metadata by the Ray Orchestrator.
"""
return stats
pass
@@ -12,6 +12,7 @@

import sys
from typing import Any
import argparse

from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase
from data_processing.runtime import TransformRuntimeConfiguration
@@ -36,9 +37,59 @@ def __init__(
self.name = self.runtime_config.get_name()
self.data_access_factory = data_access_factory

def launch(self):
def _get_parser(self) -> argparse.ArgumentParser:
"""
This method creates a parser
:return: parser
"""
return argparse.ArgumentParser(
description=f"Driver for {self.name} processing",
# RawText is used to allow better formatting of ast-based arguments
# See uses of ParamsUtils.dict_to_str()
formatter_class=argparse.RawTextHelpFormatter,
)

def _get_arguments(self, parser: argparse.ArgumentParser) -> argparse.Namespace:
"""
Parse input parameters
:param parser: parser
:return: list of arguments
"""
# add additional arguments
self.runtime_config.add_input_params(parser=parser)
self.data_access_factory.add_input_params(parser=parser)
self.execution_config.add_input_params(parser=parser)
return parser.parse_args()

def _get_parameters(self, args: argparse.Namespace) -> bool:
"""
This method creates arg parser, fills it with the parameters
and does parameters validation
:return: True if validation passes or False, if not
"""
return (
self.runtime_config.apply_input_params(args=args)
and self.execution_config.apply_input_params(args=args)
and self.data_access_factory.apply_input_params(args=args)
)

def _submit_for_execution(self) -> int:
"""
Submit for execution
:return:
"""
raise ValueError("must be implemented by subclass")

def launch(self):
"""
Execute method orchestrates driver invocation
:return:
"""
args = self._get_arguments(self._get_parser())
if self._get_parameters(args):
return self._submit_for_execution()
return 1

def get_transform_name(self) -> str:
return self.name

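For orientation, with this refactoring a concrete launcher mainly supplies _submit_for_execution (plus its runtime-specific execution configuration); argument parsing and validation now live in the base launch(). Below is a sketch with an invented ToyTransformLauncher; the PythonTransformExecutionConfiguration import path is an assumption.

```python
from data_processing.data_access import DataAccessFactory, DataAccessFactoryBase
from data_processing.runtime import AbstractTransformLauncher, TransformRuntimeConfiguration
# Module path is an assumption; use the actual location of the execution configuration.
from data_processing.runtime.pure_python import PythonTransformExecutionConfiguration


class ToyTransformLauncher(AbstractTransformLauncher):
    """Invented example: parsing and validation are inherited from the base launch()."""

    def __init__(
        self,
        runtime_config: TransformRuntimeConfiguration,
        data_access_factory: DataAccessFactoryBase = DataAccessFactory(),
    ):
        super().__init__(runtime_config, data_access_factory)
        # launch() consults self.execution_config while adding and applying parameters.
        self.execution_config = PythonTransformExecutionConfiguration(name=runtime_config.get_name())

    def _submit_for_execution(self) -> int:
        # A real launcher starts the runtime and orchestrates processing here.
        print(f"would execute transform {self.get_transform_name()}")
        return 0
```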
@@ -1,4 +1,3 @@
from data_processing.transform.abstract_transform import AbstractTransform
from data_processing.transform.binary_transform import AbstractBinaryTransform
from data_processing.transform.table_transform import AbstractTableTransform
from data_processing.transform.transform_statistics import TransformStatistics

This file was deleted.

@@ -12,13 +12,8 @@

from typing import Any, TypeVar

from data_processing.transform.abstract_transform import AbstractTransform


DATA = TypeVar("DATA")


class AbstractBinaryTransform(AbstractTransform[DATA]):
class AbstractBinaryTransform:
"""
Converts input binary file to output file(s) (binary)
Sub-classes must implement the transform() method to convert one binary file to 0 or
@@ -17,7 +17,7 @@
from data_processing.utils import TransformUtils


class AbstractTableTransform(AbstractBinaryTransform[pa.Table]):
class AbstractTableTransform(AbstractBinaryTransform):
"""
Extends AbstractBinaryTransform to expect the byte arrays to contain a pyarrow Table.
Sub-classes are expected to implement transform() on the parsed Table instances.
@@ -13,7 +13,7 @@
from argparse import ArgumentParser
from typing import Any

from data_processing.transform import AbstractTransform
from data_processing.transform import AbstractBinaryTransform
from data_processing.utils import CLIArgumentProvider


@@ -22,7 +22,9 @@ class TransformConfiguration(CLIArgumentProvider):
This is a base transform configuration class defining transform's input/output parameter
"""

def __init__(self, name: str, transform_class: type[AbstractTransform], remove_from_metadata: list[str] = []):
def __init__(
self, name: str, transform_class: type[AbstractBinaryTransform], remove_from_metadata: list[str] = []
):
"""
Initialization
:param name: transformer name
@@ -34,12 +36,12 @@ def __init__(self, name: str, transform_class: type[AbstractTransform], remove_f
self.remove_from_metadata = remove_from_metadata
self.params = {}

def get_transform_class(self) -> type[AbstractTransform]:
def get_transform_class(self) -> type[AbstractBinaryTransform]:
"""
Get the class extending AbstractTransform which implements a specific transformation.
Get the class extending AbstractBinaryTransform which implements a specific transformation.
The class will generally be instantiated with a dictionary of configuration produced by
the associated TransformRuntime get_transform_config() method.
:return: class extending AbstractTransform
:return: class extending AbstractBinaryTransform
"""
return self.transform_class

