
Commit

Add Spark support and tests
blublinsky committed Sep 23, 2024
1 parent 12e5ef0 commit 5dc6f93
Showing 5 changed files with 172 additions and 0 deletions.
@@ -0,0 +1 @@
from data_processing_spark.transform.spark.pipeline_transform import SparkPipelineTransform
@@ -0,0 +1,50 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from typing import Any
from data_processing.transform import AbstractPipelineTransform
from data_processing.transform import BaseTransformRuntime


class SparkPipelineTransform(AbstractPipelineTransform):
    """
    Transform that executes a set of base transforms sequentially. Data is passed
    between participating transforms in memory.
    """

    def __init__(self, config: dict[str, Any]):
        """
        Initializes pipeline execution for the list of transforms.
        :param config: configuration parameters, including the list of transforms in
            the pipeline. Note that transforms are executed in the order they are defined.
        """
        self.partition = config.get("partition_index", 0)
        super().__init__(config)

    def _get_transform_params(self, runtime: BaseTransformRuntime) -> dict[str, Any]:
        """
        Get the parameters passed to each participating transform.
        :param runtime: transform runtime
        :return: transform parameters
        """
        return runtime.get_transform_config(
            partition=self.partition,
            data_access_factory=self.data_access_factory,
            statistics=self.statistics,
        )

    def _compute_execution_statistics(self, stats: dict[str, Any]) -> None:
        """
        Compute execution statistics.
        :param stats: current statistics from flush
        :return: None
        """
        self.statistics.add_stats(stats)
        for _, runtime in self.participants:
            runtime.compute_execution_stats(stats=self.statistics)
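
For orientation, a minimal sketch of the config shape this class consumes, inferred from the NOOP pipeline configuration later in this commit; constructing the transform directly is illustrative only, since the Spark runtime normally instantiates it:

# Minimal sketch (not part of this commit). The config is assumed to carry the
# participating transform configurations under "transforms" (executed in order)
# and, on Spark, the partition index under "partition_index" (defaults to 0).
from typing import Any

from data_processing_spark.transform.spark import SparkPipelineTransform

config: dict[str, Any] = {
    "transforms": [],      # transform configurations, in execution order
    "partition_index": 0,  # Spark partition handled by this instance
}
# pipeline = SparkPipelineTransform(config)  # normally constructed by the Spark runtime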
45 changes: 45 additions & 0 deletions transforms/universal/noop/spark/src/noop_pipeline_local_spark.py
@@ -0,0 +1,45 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os
import sys

from data_processing_spark.runtime.spark import SparkTransformLauncher
from data_processing.utils import ParamsUtils
from noop_pipeline_transform_spark import NOOPPipelineSparkTransformConfiguration


# create parameters
input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "test-data", "input"))
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output"))
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # execution info
    "runtime_pipeline_id": "pipeline_id",
    "runtime_job_id": "job_id",
    "runtime_code_location": ParamsUtils.convert_to_ast(code_location),
    # noop params
    "noop_sleep_sec": 1,
}

if __name__ == "__main__":
    # Set the simulated command line args
    sys.argv = ParamsUtils.dict_to_req(d=params)
    # create launcher
    launcher = SparkTransformLauncher(runtime_config=NOOPPipelineSparkTransformConfiguration())
    # Launch Spark to process the input
    launcher.launch()
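
A note on the simulated command line above: ParamsUtils.dict_to_req is assumed here to flatten the params dict into argparse-style tokens that the launcher then parses; a rough hypothetical equivalent:

# Hypothetical equivalent of ParamsUtils.dict_to_req (illustrative only; the
# real helper lives in data_processing.utils): each key/value pair becomes an
# argparse-style "--key=value" token, prefixed by a program-name placeholder.
from typing import Any


def dict_to_req_sketch(d: dict[str, Any]) -> list[str]:
    return ["script"] + [f"--{key}={value}" for key, value in d.items()]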
@@ -0,0 +1,42 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

from data_processing_spark.runtime.spark import SparkTransformLauncher, SparkTransformRuntimeConfiguration
from data_processing.transform import PipelineTransformConfiguration
from data_processing_spark.transform.spark import SparkPipelineTransform
from data_processing.utils import get_logger
from noop_transform_spark import NOOPSparkTransformConfiguration

logger = get_logger(__name__)


class NOOPPipelineSparkTransformConfiguration(SparkTransformRuntimeConfiguration):
    """
    Implements the SparkTransformRuntimeConfiguration for the NOOP pipeline,
    as required by the SparkTransformLauncher.
    """

    def __init__(self):
        """
        Initialization
        """
        super().__init__(
            transform_config=PipelineTransformConfiguration(
                config={"transforms": [NOOPSparkTransformConfiguration()]},
                transform_class=SparkPipelineTransform,
            )
        )


if __name__ == "__main__":
    launcher = SparkTransformLauncher(NOOPPipelineSparkTransformConfiguration())
    logger.info("Launching NOOP pipeline transform")
    launcher.launch()
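
Because the pipeline executes its transforms in definition order, chaining stages would presumably just extend the "transforms" list. A sketch of the shape — two NOOP stages are used only because they are what this file imports; whether duplicate stages of the same transform are actually supported is not verified here:

# Illustrative sketch: a two-stage pipeline, executed in definition order, with
# data passed between stages in memory. Any other Spark transform configuration
# could stand in for the second NOOP stage.
PipelineTransformConfiguration(
    config={
        "transforms": [
            NOOPSparkTransformConfiguration(),
            NOOPSparkTransformConfiguration(),  # placeholder second stage
        ]
    },
    transform_class=SparkPipelineTransform,
)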
34 changes: 34 additions & 0 deletions transforms/universal/noop/spark/test/test_noop_pipeline_spark.py
@@ -0,0 +1,34 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os

from data_processing.test_support.launch.transform_test import (
AbstractTransformLauncherTest,
)
from data_processing_spark.runtime.spark import SparkTransformLauncher
from noop_pipeline_transform_spark import NOOPPipelineSparkTransformConfiguration


class TestSparkNOOPPipelineTransform(AbstractTransformLauncherTest):
    """
    Extends the superclass to define the test data for the tests defined there.
    The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
    """

    def get_test_transform_fixtures(self) -> list[tuple]:
        basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data"))
        fixtures = []
        launcher = SparkTransformLauncher(NOOPPipelineSparkTransformConfiguration())
        fixtures.append((launcher, {"noop_sleep_sec": 1}, basedir + "/input", basedir + "/expected"))
        return fixtures
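
Each fixture follows the (launcher, transform_params, input_dir, expected_dir) shape consumed by AbstractTransformLauncherTest. Additional cases would presumably reuse that shape; the variant below is hypothetical and assumes NOOP output does not depend on the sleep setting:

# Hypothetical extra fixture (not in this commit): a zero-sleep run. Since NOOP
# only copies data, the expected output is assumed to be unchanged by the sleep.
fixtures.append((launcher, {"noop_sleep_sec": 0}, basedir + "/input", basedir + "/expected"))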
