Skip to content

Commit

Permalink
add license select transform
Browse files Browse the repository at this point in the history
Signed-off-by: Shivdeep Singh <[email protected]>
  • Loading branch information
shivdeep-singh-ibm committed Sep 3, 2024
1 parent b811b95 commit 0f2c5b6
Show file tree
Hide file tree
Showing 40 changed files with 2,306 additions and 0 deletions.
2 changes: 2 additions & 0 deletions .make.versions
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,8 @@ PII_REDACTOR_PYTHON_VERSION=$(DPK_VERSION)

HTML2PARQUET_PYTHON_VERSION=$(DPK_VERSION)

LICENSE_SELECT_PYTHON_VERSION=$(DPK_VERSION)
LICENSE_SELECT_RAY_VERSION=$(DPK_VERSION)
################## ################## ################## ################## ################## ##################
# Begin versions that the repo depends on.

Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ Matrix below shows the the combination of modules and supported runtimes. All th
| [Malware annotation](transforms/code/malware/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Header cleanser](transforms/code/header_cleanser/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |
| [Semantic file ordering](transforms/code/repo_level_ordering/ray/README.md) | |:white_check_mark:| | |
| [License Select Annotation](transforms/code/license_select/python/README.md) | :white_check_mark: |:white_check_mark:| |:white_check_mark: |


Contributors are welcome to add new modules as well as add runtime support for existing modules!
Expand Down
1 change: 1 addition & 0 deletions kfp/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
| code/code2parquet | [code2parquet_wf.py](../transforms/code/code2parquet/kfp_ray/code2parquet_wf.py) |
| code/code_quality | [code_quality_wf.py](../transforms/code/code_quality/kfp_ray/code_quality_wf.py) |
| code/proglang_select | [proglang_select_wf.py](../transforms/code/proglang_select/kfp_ray/proglang_select_wf.py) |
| code/license_select | [license_select_wf.py](../transforms/code/license_select/kfp_ray/license_select_wf.py)
| universal/doc_id | [doc_id_wf.py](../transforms/universal/doc_id/kfp_ray/doc_id_wf.py) |
| universal/ededup | [ededup_wf.py](../transforms/universal/ededup/kfp_ray/ededup_wf.py) |
| universal/fdedup | [fdedup_wf.py](../transforms/universal/fdedup/kfp_ray/fdedup_wf.py) |
Expand Down
62 changes: 62 additions & 0 deletions transforms/code/license_select/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
REPOROOT=../../..
# Use make help, to see the available rules
include $(REPOROOT)/.make.defaults

setup::
@# Help: Recursively make $@ all subdirs
$(MAKE) RULE=$@ .recurse

clean::
@# Help: Recursively make $@ all subdirs
$(MAKE) RULE=$@ .recurse

build::
@# Help: Recursively make $@ in subdirs
$(MAKE) RULE=$@ .recurse
venv::
@# Help: Recursively make $@ in subdirs
$(MAKE) RULE=$@ .recurse

image::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

set-versions:
@# Help: Recursively $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

publish::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test-image::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test-src::
@# Help: Recursively make $@ in all subdirs
$(MAKE) RULE=$@ .recurse

load-image::
@# Help: Recursively make $@ in all subdirs
$(MAKE) RULE=$@ .recurse

.PHONY: workflow-venv
workflow-venv:
$(MAKE) -C kfp_ray workflow-venv

.PHONY: workflow-test
workflow-test:
$(MAKE) -C kfp_ray workflow-test

.PHONY: workflow-upload
workflow-upload:
$(MAKE) -C kfp_ray workflow-upload

.PHONY: workflow-build
workflow-build:
$(MAKE) -C kfp_ray workflow-build
11 changes: 11 additions & 0 deletions transforms/code/license_select/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
# License Select

The License Select transform checks if the `license` of input data is in approved/denied list. It is implemented as per the set of [transform project conventions](../../README.md#transform-project-conventions) the following runtimes are available:

* [python](python/README.md) - provides the base python-based transformation
implementation.
* [ray](ray/README.md) - enables the running of the base python transformation
in a Ray runtime
* [kfp](kfp_ray/README.md) - enables running the ray docker image
in a kubernetes cluster using a generated `yaml` file.

51 changes: 51 additions & 0 deletions transforms/code/license_select/kfp_ray/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
REPOROOT=${CURDIR}/../../../../
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.workflows

SRC_DIR=${CURDIR}/../ray/

PYTHON_WF := $(shell find ./ -name '*_wf.py')
YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF})

workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE}

.PHONY: clean
clean:
@# Help: Clean up the virtual environment.
rm -rf ${REPOROOT}/transforms/venv

venv::

build::

setup::

test::

test-src::

test-image::

publish::

image::

load-image::

docker-load-image::

docker-save-image::

.PHONY: workflow-build
workflow-build: workflow-venv
$(MAKE) $(YAML_WF)

.PHONY: workflow-test
workflow-test: workflow-build
$(MAKE) .workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=license_select_wf.yaml

.PHONY: workflow-upload
workflow-upload: workflow-build
@for file in $(YAML_WF); do \
$(MAKE) .workflows.upload-pipeline PIPELINE_FILE=$$file; \
done
210 changes: 210 additions & 0 deletions transforms/code/license_select/kfp_ray/license_select_wf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,210 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils


# the name of the job script
EXEC_SCRIPT_NAME: str = "license_select_transform_ray.py"

task_image = "quay.io/dataprep1/data-prep-kit/license_select-ray:latest"


# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:latest"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"

# compute execution parameters. Here different transforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
def compute_exec_params_func(
worker_options: dict,
actor_options: dict,
data_s3_config: str,
data_max_files: int,
data_num_samples: int,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
lc_license_column_name: str,
lc_licenses_file: str,
) -> dict:
from runtime_utils import KFPUtils

return {
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)),
"runtime_worker_options": str(actor_options),
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": str(runtime_code_location),
"lc_license_column_name": lc_license_column_name,
"lc_licenses_file": lc_licenses_file,
}


# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the
# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path.
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the "
+ "same version of the same pipeline !!!"
)
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER

# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
# execute job
execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml")
# clean up Ray
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")
# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
TASK_NAME: str = "license_select"
PREFIX: str = "license_select"


@dsl.pipeline(
name=TASK_NAME + "-ray-pipeline",
description="Pipeline for license select task",
)
def license_select(
ray_name: str = "license_select-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {
"replicas": 2,
"max_replicas": 2,
"min_replicas": 2,
"cpu": 2,
"memory": 4,
"image": task_image,
},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/license_select/input/', 'output_folder': 'test/license_select/output/'}",
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
# orchestrator
runtime_actor_options: dict = {"num_cpus": 0.8},
runtime_pipeline_id: str = "runtime_pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
# license select parameters
lc_license_column_name: str = "license",
lc_licenses_file: str = "test/license_select/sample_approved_licenses.json",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
):
"""
Pipeline to execute License Select transform
:param ray_name: name of the Ray cluster
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
image - image to use
image_pull_secret - image pull secret
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
min_replicas - min number of replicas
cpu - number of cpus
memory - memory
image - image to use
image_pull_secret - image pull secret
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
wait_cluster_ready_tmout - time to wait for cluster ready, sec
wait_cluster_up_tmout - time to wait for cluster up, sec
wait_job_ready_tmout - time to wait for job ready, sec
wait_print_tmout - time between prints, sec
http_retries - httpt retries for API server calls
:param data_s3_config - s3 configuration
:param data_s3_access_secret - s3 access secret
:param data_max_files - max files to process
:param data_num_samples - num samples to process
:param runtime_actor_options - actor options
:param runtime_pipeline_id - pipeline id
:param lc_license_column_name - Name of the column holds the license to process
:param lc_licenses_file - path to license list json file
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
compute_exec_params = compute_exec_params_op(
worker_options=ray_worker_options,
actor_options=runtime_actor_options,
data_s3_config=data_s3_config,
data_max_files=data_max_files,
data_num_samples=data_num_samples,
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
lc_license_column_name=lc_license_column_name,
lc_licenses_file=lc_licenses_file,
)
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
ray_cluster = create_ray_op(
ray_name=ray_name,
run_id=run_id,
ray_head_options=ray_head_options,
ray_worker_options=ray_worker_options,
server_url=server_url,
additional_params=additional_params,
)
ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
ray_cluster.after(compute_exec_params)
# Execute job
execute_job = execute_ray_jobs_op(
ray_name=ray_name,
run_id=run_id,
additional_params=additional_params,
# note that the parameters below are specific for this transform
exec_params=compute_exec_params.output,
exec_script_name=EXEC_SCRIPT_NAME,
server_url=server_url,
)
ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
execute_job.after(ray_cluster)


if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(license_select, __file__.replace(".py", ".yaml"))
1 change: 1 addition & 0 deletions transforms/code/license_select/python/.dockerignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
venv/
Loading

0 comments on commit 0f2c5b6

Please sign in to comment.