Skip to content

Commit

Permalink
Merge pull request #193 from Mohammad-nassar10/pipe-gen
Browse files Browse the repository at this point in the history
Auto generate kfp pipelines.
  • Loading branch information
roytman authored Jun 6, 2024
2 parents 676625a + be7a480 commit 5afcd40
Show file tree
Hide file tree
Showing 6 changed files with 334 additions and 0 deletions.
4 changes: 4 additions & 0 deletions kfp/pipeline_generator/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
## Steps to generate a new pipeline
- create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](./example/pipeline_definitions.yaml).
- execute `./run.sh <pipeline_definitions_file_path> <destination directory>`. When `pipeline_definitions_file_path` is the path of the `pipeline_definitions.yaml` file that defines the pipeline and `destination directory` is a directory where new pipeline file
will be generated.
24 changes: 24 additions & 0 deletions kfp/pipeline_generator/example/pipeline_definitions.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
pipeline_parameters:
name: "noop"
description: "Pipeline for noop task"
script_name: "noop_transform.py"
prefix: ""
multi_s3: False
compute_func_name: ""
compute_func_import: ""
component_spec_path: ""

pipeline_common_input_parameters_values:
kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.1.1"
transform_image: "quay.io/dataprep1/data-prep-kit/noop:0.8.0"
s3_access_secret: "s3-secret"
image_pull_secret: "prod-all-icr-io"
input_folder: "test/noop/input/"
output_folder: "test/noop/output/"

pipeline_transform_input_parameters:
pipeline_arguments:
- name: "noop_sleep_sec"
type: "int"
value: 10
description: "# noop sleep time"
125 changes: 125 additions & 0 deletions kfp/pipeline_generator/pipeline.ptmpl
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
# NOTE: This file is auto generated by Pipeline Generator.

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from kfp_support.workflow_support.utils import (
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)
__compute_import__

task_image = "__transform_image__"

# the name of the job script
EXEC_SCRIPT_NAME: str = "__script_name__"
PREFIX: str = "__prefix_name__"

# components
base_kfp_image = "__kfp_base_image__"

# path to kfp component specifications files
component_spec_path = "__component_spec_path__"

# compute execution parameters. Here different tranforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
compute_exec_params_op = comp.func_to_container_op(
func=__compute_func_name__, base_image=base_kfp_image
)
# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
# execute job
execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "__execute_comp__")
# clean up Ray
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")

# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
TASK_NAME: str = "__pipeline_name__"


# Pipeline to invoke execution on remote resource
@dsl.pipeline(
name=TASK_NAME + "-ray-pipeline",
description="__pipeline_description__",
)
def __pipeline_name__(
ray_name: str = "__pipeline_name__-kfp-ray", # name of Ray cluster
ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": "__image_pull_secret__", '
'"image": "' + task_image + '", "image_pull_policy": "Always" }',
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, '
'"image_pull_secret": "__image_pull_secret__", "image": "' + task_image + '", "image_pull_policy": "Always" }',
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': '__input_folder__', 'output_folder': '__output_folder__'}",
data_s3_access_secret: str = "__s3_access_secret__",
data_max_files: int = -1,
data_num_samples: int = -1,
data_checkpointing: bool = False,
data_data_sets: str = "",
data_files_to_use: str = "['.parquet']",
# orchestrator
runtime_actor_options: str = "{'num_cpus': 0.8}",
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",

additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
__pipeline_arguments__
):
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
compute_exec_params = compute_exec_params_op(
worker_options=ray_worker_options,
actor_options=runtime_actor_options,
)
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
ray_cluster = create_ray_op(
ray_name=ray_name,
run_id=dsl.RUN_ID_PLACEHOLDER,
ray_head_options=ray_head_options,
ray_worker_options=ray_worker_options,
server_url=server_url,
additional_params=additional_params,
)
ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
ray_cluster.after(compute_exec_params)

# Execute job
execute_job = execute_ray_jobs_op(
ray_name=ray_name,
run_id=dsl.RUN_ID_PLACEHOLDER,
additional_params=additional_params,
exec_params={
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"data_checkpointing": data_checkpointing,
"data_data_sets": data_data_sets,
"data_files_to_use": data_files_to_use,
"runtime_num_workers": compute_exec_params.output,
"runtime_worker_options": runtime_actor_options,
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": dsl.RUN_ID_PLACEHOLDER,
"runtime_code_location": runtime_code_location,
__execute_job_params__
},
exec_script_name=EXEC_SCRIPT_NAME,
server_url=server_url,
__prefix_execute__
)
ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
__prefix_set_secret__
execute_job.after(ray_cluster)

# Configure the pipeline level to one week (in seconds)
dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC)

if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(__pipeline_name__, __file__.replace(".py", ".yaml"))
139 changes: 139 additions & 0 deletions kfp/pipeline_generator/pipeline_generator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,139 @@
import json

import yaml


PRE_COMMIT = "./pre-commit-config.yaml"
PIPELINE_TEMPLATE_FILE = "pipeline.ptmpl"

INPUT_PARAMETERS = "input_parameters"
PIPELINE_PARAMETERS = "pipeline_parameters"
PIPELINE_COMMON_INPUT_PARAMETERS_VALUES = "pipeline_common_input_parameters_values"
PIPELINE_TRANSFORM_INPUT_PARAMETERS = "pipeline_transform_input_parameters"

NAME = "name"
TYPE = "type"
VALUE = "value"
DESCRIPTION = "description"


def get_pipeline_input_parameters(arguments) -> str:
ret_str = ""
ret_str += get_generic_params(arguments.get("pipeline_arguments", None))
return ret_str


def get_generic_params(params) -> str:
ret_str = ""
if params is None:
return ret_str
for param in params:
ret_str += f"\n {param[NAME]}: {param[TYPE]} = "
if param[TYPE] == "str":
ret_str += f'"{param[VALUE]}"'
else:
ret_str += f"{param[VALUE]}"
ret_str += f", {param.get(DESCRIPTION, '')}"
return ret_str


def get_execute_job_params_guf(args) -> (str):
ret_execute_job_params = ""
if args is not None:
pargs = args.get("pipeline_arguments", None)
if pargs is not None:
for a in pargs:
ret_execute_job_params += f'"{a[NAME]}": {a[NAME]},\n'
return ret_execute_job_params


if __name__ == "__main__":
import argparse
import os
from pathlib import Path

from pre_commit.main import main

parser = argparse.ArgumentParser(description="Kubeflow pipeline generator for Foundation Models")
parser.add_argument("-c", "--config_file", type=str, default="")
parser.add_argument("-od", "--output_dir_file", type=str, default="")

args = parser.parse_args()
# open configuration file
with open(args.config_file, "r") as file:
pipeline_definitions = yaml.safe_load(file)

pipeline_parameters = pipeline_definitions[PIPELINE_PARAMETERS]
common_input_params_values = pipeline_definitions[PIPELINE_COMMON_INPUT_PARAMETERS_VALUES]

# Pipeline template file
fin = open(PIPELINE_TEMPLATE_FILE, "rt")

# Output file to write the pipeline
fout = open(f"{pipeline_parameters[NAME]}_wf.py", "wt")

# define the generated pipeline input parameters
transform_input_parameters = get_pipeline_input_parameters(pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS])

# define arguments to the Ray execution job
execute_job_params = get_execute_job_params_guf(pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS])

component_spec_path = pipeline_parameters.get("component_spec_path", "")
if component_spec_path == "":
component_spec_path = "../../../../../kfp/kfp_ray_components/"

compute_func_name = pipeline_parameters.get("compute_func_name", "")
if compute_func_name == "":
compute_func_name = "ComponentUtils.default_compute_execution_params"

compute_func_import = pipeline_parameters.get("compute_func_import", "")

execute_comp_file = "executeRayJobComponent.yaml"
prefix_name = ""
prefix_execute = ""
prefix_set_secret = ""
if pipeline_parameters.get("multi_s3", False) == True:
execute_comp_file = "executeRayJobComponent_multi_s3.yaml"
prefix_name = pipeline_parameters.get("prefix", "")
prefix_execute = "prefix=PREFIX"
prefix_set_secret = f"ComponentUtils.set_s3_env_vars_to_component(execute_job, {prefix_name}_s3_access_secret, prefix=PREFIX)"

# For each line in the template file
for line in fin:
# read replace the string and write to output pipeline file
fout.write(
line.replace("__pipeline_name__", pipeline_parameters[NAME])
.replace("__pipeline_description__", pipeline_parameters["description"])
.replace("__pipeline_arguments__", transform_input_parameters)
.replace("__execute_job_params__", execute_job_params)
.replace("__compute_func_name__", compute_func_name)
.replace("__component_spec_path__", component_spec_path)
.replace("__compute_import__", compute_func_import)
.replace("__script_name__", pipeline_parameters["script_name"])
.replace("__image_pull_secret__", common_input_params_values["image_pull_secret"])
.replace("__s3_access_secret__", common_input_params_values["s3_access_secret"])
.replace("__input_folder__", common_input_params_values.get("input_folder", ""))
.replace("__output_folder__", common_input_params_values.get("output_folder", ""))
.replace("__transform_image__", common_input_params_values["transform_image"])
.replace("__kfp_base_image__", common_input_params_values["kfp_base_image"])
.replace("__execute_comp__", execute_comp_file)
.replace("__prefix_name__", prefix_name)
.replace("__prefix_execute__", prefix_execute)
.replace("__prefix_set_secret__", prefix_set_secret)
)
# Move the generated file to the output directory
curr_dir = os.getcwd()
src_file = Path(f"{curr_dir}/{pipeline_parameters[NAME]}_wf.py")
dst_file = Path(f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py")
src_file.rename(dst_file)

fout.close()

import sys

from pre_commit.main import main

print(f"Pipeline ${dst_file} auto generation completed")
# format the pipeline python file
args = ["run", "--file", f"{dst_file}", "-c", PRE_COMMIT]
sys.exit(main(args))
31 changes: 31 additions & 0 deletions kfp/pipeline_generator/pre-commit-config.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
######################################################### {COPYRIGHT-TOP} ###
# IBM Confidential
# IBM Watson Machine Learning Core - Internal Tooling
# Copyright IBM Corp. 2022
######################################################### {COPYRIGHT-END} ###
repos:
- repo: https://github.com/pre-commit/mirrors-prettier
rev: v3.0.0-alpha.9-for-vscode
hooks:
- id: prettier
- repo: https://github.com/psf/black
rev: 22.3.0
hooks:
- id: black
args: [--config=.black.toml]
- repo: https://github.com/PyCQA/isort
rev: 5.12.0
hooks:
- id: isort
### Exclude submodules as some are part of other organizations with their own policies
exclude: |
(?x)^(
autopilot/.*|
codeflare-cli/.*|
codeflare-sdk/.*|
docker_build_scripts/.*|
mcad/.*|
datalake/.*|
torchx/.*|
tsfm/.*
)$
11 changes: 11 additions & 0 deletions kfp/pipeline_generator/run.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
#!/bin/bash

DEF_FILE=$1
DIST_DIR=$2
ROOT_DIR=${PWD}

mkdir -p ${ROOT_DIR}/${DIST_DIR}/
python3 -m venv venv
source venv/bin/activate
pip install pre-commit
python3 pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/

0 comments on commit 5afcd40

Please sign in to comment.