diff --git a/kfp/pipeline_generator/README.md b/kfp/pipeline_generator/README.md
new file mode 100644
index 000000000..6408fd5e3
--- /dev/null
+++ b/kfp/pipeline_generator/README.md
@@ -0,0 +1,4 @@
+## Steps to generate a new pipeline
+- create a `pipeline_definitions.yaml` file for the required task (similar to the example [pipeline_definitions.yaml for the noop task](./example/pipeline_definitions.yaml)).
+- execute `./run.sh <pipeline_definitions_file_path> <destination_directory>`, where `pipeline_definitions_file_path` is the path of the `pipeline_definitions.yaml` file that defines the pipeline and `destination_directory` is the directory where the new pipeline file
+will be generated.
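+- for example, running `./run.sh example/pipeline_definitions.yaml dist` from this directory should generate the example noop pipeline as `dist/noop_wf.py` (the `dist` destination directory here is only illustrative).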
diff --git a/kfp/pipeline_generator/example/pipeline_definitions.yaml b/kfp/pipeline_generator/example/pipeline_definitions.yaml
new file mode 100644
index 000000000..be0877de6
--- /dev/null
+++ b/kfp/pipeline_generator/example/pipeline_definitions.yaml
@@ -0,0 +1,24 @@
+pipeline_parameters:
+  name: "noop"
+  description: "Pipeline for noop task"
+  script_name: "noop_transform.py"
+  prefix: ""
+  multi_s3: False
+  compute_func_name: ""
+  compute_func_import: ""
+  component_spec_path: ""
+
+pipeline_common_input_parameters_values:
+  kfp_base_image: "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.1.1"
+  transform_image: "quay.io/dataprep1/data-prep-kit/noop:0.8.0"
+  s3_access_secret: "s3-secret"
+  image_pull_secret: "prod-all-icr-io"
+  input_folder: "test/noop/input/"
+  output_folder: "test/noop/output/"
+
+pipeline_transform_input_parameters:
+  pipeline_arguments:
+    - name: "noop_sleep_sec"
+      type: "int"
+      value: 10
+      description: "# noop sleep time"
diff --git a/kfp/pipeline_generator/pipeline.ptmpl b/kfp/pipeline_generator/pipeline.ptmpl
new file mode 100644
index 000000000..b21d4f7a8
--- /dev/null
+++ b/kfp/pipeline_generator/pipeline.ptmpl
@@ -0,0 +1,125 @@
+# NOTE: This file is auto generated by Pipeline Generator.
+
+import kfp.compiler as compiler
+import kfp.components as comp
+import kfp.dsl as dsl
+from kfp_support.workflow_support.utils import (
+    ONE_HOUR_SEC,
+    ONE_WEEK_SEC,
+    ComponentUtils,
+)
+__compute_import__
+
+task_image = "__transform_image__"
+
+# the name of the job script
+EXEC_SCRIPT_NAME: str = "__script_name__"
+PREFIX: str = "__prefix_name__"
+
+# components
+base_kfp_image = "__kfp_base_image__"
+
+# path to kfp component specifications files
+component_spec_path = "__component_spec_path__"
+
+# compute execution parameters. Here different transforms might need different implementations. As
+# a result, instead of creating a component we are creating it in place here.
+compute_exec_params_op = comp.func_to_container_op(
+    func=__compute_func_name__, base_image=base_kfp_image
+)
+# create Ray cluster
+create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
+# execute job
+execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "__execute_comp__")
+# clean up Ray
+cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")
+
+# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
+TASK_NAME: str = "__pipeline_name__"
+
+
+# Pipeline to invoke execution on remote resource
+@dsl.pipeline(
+    name=TASK_NAME + "-ray-pipeline",
+    description="__pipeline_description__",
+)
+def __pipeline_name__(
+    ray_name: str = "__pipeline_name__-kfp-ray",  # name of Ray cluster
+    ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": "__image_pull_secret__", '
+    '"image": "' + task_image + '", "image_pull_policy": "Always" }',
+    ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, '
+    '"image_pull_secret": "__image_pull_secret__", "image": "' + task_image + '", "image_pull_policy": "Always" }',
+    server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
+    # data access
+    data_s3_config: str = "{'input_folder': '__input_folder__', 'output_folder': '__output_folder__'}",
+    data_s3_access_secret: str = "__s3_access_secret__",
+    data_max_files: int = -1,
+    data_num_samples: int = -1,
+    data_checkpointing: bool = False,
+    data_data_sets: str = "",
+    data_files_to_use: str = "['.parquet']",
+    # orchestrator
+    runtime_actor_options: str = "{'num_cpus': 0.8}",
+    runtime_pipeline_id: str = "pipeline_id",
+    runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
+
+    additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
+    __pipeline_arguments__
+):
+    # create clean_up task
+    clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url)
+    ComponentUtils.add_settings_to_component(clean_up_task, 60)
+    # pipeline definition
+    with dsl.ExitHandler(clean_up_task):
+        # compute execution params
+        compute_exec_params = compute_exec_params_op(
+            worker_options=ray_worker_options,
+            actor_options=runtime_actor_options,
+        )
+        ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
+        # start Ray cluster
+        ray_cluster = create_ray_op(
+            ray_name=ray_name,
+            run_id=dsl.RUN_ID_PLACEHOLDER,
+            ray_head_options=ray_head_options,
+            ray_worker_options=ray_worker_options,
+            server_url=server_url,
+            additional_params=additional_params,
+        )
+        ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
+        ray_cluster.after(compute_exec_params)
+
+        # Execute job
+        execute_job = execute_ray_jobs_op(
+            ray_name=ray_name,
+            run_id=dsl.RUN_ID_PLACEHOLDER,
+            additional_params=additional_params,
+            exec_params={
+                "data_s3_config": data_s3_config,
+                "data_max_files": data_max_files,
+                "data_num_samples": data_num_samples,
+                "data_checkpointing": data_checkpointing,
+                "data_data_sets": data_data_sets,
+                "data_files_to_use": data_files_to_use,
+                "runtime_num_workers": compute_exec_params.output,
+                "runtime_worker_options": runtime_actor_options,
+                "runtime_pipeline_id": runtime_pipeline_id,
+                "runtime_job_id": dsl.RUN_ID_PLACEHOLDER,
+                "runtime_code_location": runtime_code_location,
+                __execute_job_params__
+            },
+            exec_script_name=EXEC_SCRIPT_NAME,
+            server_url=server_url,
+            __prefix_execute__
+        )
+        ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
+        ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
+        __prefix_set_secret__
+        execute_job.after(ray_cluster)
+
+    # Configure the pipeline-level timeout to one week (in seconds)
+    dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC)
+
+if __name__ == "__main__":
+    # Compiling the pipeline
+    compiler.Compiler().compile(__pipeline_name__, __file__.replace(".py", ".yaml"))
diff --git a/kfp/pipeline_generator/pipeline_generator.py b/kfp/pipeline_generator/pipeline_generator.py
new file mode 100644
index 000000000..9b6151c2d
--- /dev/null
+++ b/kfp/pipeline_generator/pipeline_generator.py
@@ -0,0 +1,139 @@
+import json
+
+import yaml
+
+
+PRE_COMMIT = "./pre-commit-config.yaml"
+PIPELINE_TEMPLATE_FILE = "pipeline.ptmpl"
+
+INPUT_PARAMETERS = "input_parameters"
+PIPELINE_PARAMETERS = "pipeline_parameters"
+PIPELINE_COMMON_INPUT_PARAMETERS_VALUES = "pipeline_common_input_parameters_values"
+PIPELINE_TRANSFORM_INPUT_PARAMETERS = "pipeline_transform_input_parameters"
+
+NAME = "name"
+TYPE = "type"
+VALUE = "value"
+DESCRIPTION = "description"
+
+
+def get_pipeline_input_parameters(arguments) -> str:
+    ret_str = ""
+    ret_str += get_generic_params(arguments.get("pipeline_arguments", None))
+    return ret_str
+
+
+def get_generic_params(params) -> str:
+    ret_str = ""
+    if params is None:
+        return ret_str
+    for param in params:
+        # each parameter is emitted on its own line, indented to match the generated function signature
+        ret_str += f"\n    {param[NAME]}: {param[TYPE]} = "
+        if param[TYPE] == "str":
+            ret_str += f'"{param[VALUE]}"'
+        else:
+            ret_str += f"{param[VALUE]}"
+        ret_str += f", {param.get(DESCRIPTION, '')}"
+    return ret_str
+
+
+def get_execute_job_params_guf(args) -> str:
+    ret_execute_job_params = ""
+    if args is not None:
+        pargs = args.get("pipeline_arguments", None)
+        if pargs is not None:
+            for a in pargs:
+                ret_execute_job_params += f'"{a[NAME]}": {a[NAME]},\n'
+    return ret_execute_job_params
+
+
+if __name__ == "__main__":
+    import argparse
+    import os
+    from pathlib import Path
+
+    from pre_commit.main import main
+
+    parser = argparse.ArgumentParser(description="Kubeflow pipeline generator for Foundation Models")
+    parser.add_argument("-c", "--config_file", type=str, default="")
+    parser.add_argument("-od", "--output_dir_file", type=str, default="")
+
+    args = parser.parse_args()
+    # open configuration file
+    with open(args.config_file, "r") as file:
+        pipeline_definitions = yaml.safe_load(file)
+
+    pipeline_parameters = pipeline_definitions[PIPELINE_PARAMETERS]
+    common_input_params_values = pipeline_definitions[PIPELINE_COMMON_INPUT_PARAMETERS_VALUES]
+
+    # Pipeline template file
+    fin = open(PIPELINE_TEMPLATE_FILE, "rt")
+
+    # Output file to write the pipeline
+    fout = open(f"{pipeline_parameters[NAME]}_wf.py", "wt")
+
+    # define the generated pipeline input parameters
+    transform_input_parameters = get_pipeline_input_parameters(pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS])
+
+    # define arguments to the Ray execution job
+    execute_job_params = get_execute_job_params_guf(pipeline_definitions[PIPELINE_TRANSFORM_INPUT_PARAMETERS])
+
+    component_spec_path = pipeline_parameters.get("component_spec_path", "")
+    if component_spec_path == "":
+        component_spec_path = "../../../../../kfp/kfp_ray_components/"
+
+    compute_func_name = pipeline_parameters.get("compute_func_name", "")
+    if compute_func_name == "":
+        compute_func_name = "ComponentUtils.default_compute_execution_params"
+
+    compute_func_import = pipeline_parameters.get("compute_func_import", "")
+
+    execute_comp_file = "executeRayJobComponent.yaml"
+    prefix_name = ""
+    prefix_execute = ""
+    prefix_set_secret = ""
+    if pipeline_parameters.get("multi_s3", False) == True:
+        execute_comp_file = "executeRayJobComponent_multi_s3.yaml"
+        prefix_name = pipeline_parameters.get("prefix", "")
+        prefix_execute = "prefix=PREFIX"
+        prefix_set_secret = f"ComponentUtils.set_s3_env_vars_to_component(execute_job, {prefix_name}_s3_access_secret, prefix=PREFIX)"
+
+    # For each line in the template file
+    for line in fin:
+        # replace the template placeholders and write the line to the output pipeline file
+        fout.write(
+            line.replace("__pipeline_name__", pipeline_parameters[NAME])
+            .replace("__pipeline_description__", pipeline_parameters["description"])
+            .replace("__pipeline_arguments__", transform_input_parameters)
+            .replace("__execute_job_params__", execute_job_params)
+            .replace("__compute_func_name__", compute_func_name)
+            .replace("__component_spec_path__", component_spec_path)
+            .replace("__compute_import__", compute_func_import)
+            .replace("__script_name__", pipeline_parameters["script_name"])
+            .replace("__image_pull_secret__", common_input_params_values["image_pull_secret"])
+            .replace("__s3_access_secret__", common_input_params_values["s3_access_secret"])
+            .replace("__input_folder__", common_input_params_values.get("input_folder", ""))
+            .replace("__output_folder__", common_input_params_values.get("output_folder", ""))
+            .replace("__transform_image__", common_input_params_values["transform_image"])
+            .replace("__kfp_base_image__", common_input_params_values["kfp_base_image"])
+            .replace("__execute_comp__", execute_comp_file)
+            .replace("__prefix_name__", prefix_name)
+            .replace("__prefix_execute__", prefix_execute)
+            .replace("__prefix_set_secret__", prefix_set_secret)
+        )
+    fin.close()
+    fout.close()
+
+    # Move the generated file to the output directory
+    curr_dir = os.getcwd()
+    src_file = Path(f"{curr_dir}/{pipeline_parameters[NAME]}_wf.py")
+    dst_file = Path(f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py")
+    src_file.rename(dst_file)
+
+    import sys
+
+    print(f"Pipeline {dst_file} auto generation completed")
+    # format the generated pipeline python file
+    args = ["run", "--file", f"{dst_file}", "-c", PRE_COMMIT]
+    sys.exit(main(args))
{prefix_name}_s3_access_secret, prefix=PREFIX)" + + # For each line in the template file + for line in fin: + # read replace the string and write to output pipeline file + fout.write( + line.replace("__pipeline_name__", pipeline_parameters[NAME]) + .replace("__pipeline_description__", pipeline_parameters["description"]) + .replace("__pipeline_arguments__", transform_input_parameters) + .replace("__execute_job_params__", execute_job_params) + .replace("__compute_func_name__", compute_func_name) + .replace("__component_spec_path__", component_spec_path) + .replace("__compute_import__", compute_func_import) + .replace("__script_name__", pipeline_parameters["script_name"]) + .replace("__image_pull_secret__", common_input_params_values["image_pull_secret"]) + .replace("__s3_access_secret__", common_input_params_values["s3_access_secret"]) + .replace("__input_folder__", common_input_params_values.get("input_folder", "")) + .replace("__output_folder__", common_input_params_values.get("output_folder", "")) + .replace("__transform_image__", common_input_params_values["transform_image"]) + .replace("__kfp_base_image__", common_input_params_values["kfp_base_image"]) + .replace("__execute_comp__", execute_comp_file) + .replace("__prefix_name__", prefix_name) + .replace("__prefix_execute__", prefix_execute) + .replace("__prefix_set_secret__", prefix_set_secret) + ) + # Move the generated file to the output directory + curr_dir = os.getcwd() + src_file = Path(f"{curr_dir}/{pipeline_parameters[NAME]}_wf.py") + dst_file = Path(f"{args.output_dir_file}/{pipeline_parameters[NAME]}_wf.py") + src_file.rename(dst_file) + + fout.close() + + import sys + + from pre_commit.main import main + + print(f"Pipeline ${dst_file} auto generation completed") + # format the pipeline python file + args = ["run", "--file", f"{dst_file}", "-c", PRE_COMMIT] + sys.exit(main(args)) diff --git a/kfp/pipeline_generator/pre-commit-config.yaml b/kfp/pipeline_generator/pre-commit-config.yaml new file mode 100644 index 000000000..e34bd7815 --- /dev/null +++ b/kfp/pipeline_generator/pre-commit-config.yaml @@ -0,0 +1,31 @@ +######################################################### {COPYRIGHT-TOP} ### +# IBM Confidential +# IBM Watson Machine Learning Core - Internal Tooling +# Copyright IBM Corp. 2022 +######################################################### {COPYRIGHT-END} ### +repos: + - repo: https://github.com/pre-commit/mirrors-prettier + rev: v3.0.0-alpha.9-for-vscode + hooks: + - id: prettier + - repo: https://github.com/psf/black + rev: 22.3.0 + hooks: + - id: black + args: [--config=.black.toml] + - repo: https://github.com/PyCQA/isort + rev: 5.12.0 + hooks: + - id: isort +### Exclude submodules as some are part of other organizations with their own policies +exclude: | + (?x)^( + autopilot/.*| + codeflare-cli/.*| + codeflare-sdk/.*| + docker_build_scripts/.*| + mcad/.*| + datalake/.*| + torchx/.*| + tsfm/.* + )$ diff --git a/kfp/pipeline_generator/run.sh b/kfp/pipeline_generator/run.sh new file mode 100755 index 000000000..cba8ab41c --- /dev/null +++ b/kfp/pipeline_generator/run.sh @@ -0,0 +1,11 @@ +#!/bin/bash + +DEF_FILE=$1 +DIST_DIR=$2 +ROOT_DIR=${PWD} + +mkdir -p ${ROOT_DIR}/${DIST_DIR}/ +python3 -m venv venv +source venv/bin/activate +pip install pre-commit +python3 pipeline_generator.py -c ${DEF_FILE} -od ${ROOT_DIR}/${DIST_DIR}/