diff --git a/.make.versions b/.make.versions index d095bf977..0dcbc7520 100644 --- a/.make.versions +++ b/.make.versions @@ -14,6 +14,7 @@ RELEASE_VERSION_SUFFIX=.dev6 DPK_LIB_VERSION=0.2.0$(RELEASE_VERSION_SUFFIX) DPK_LIB_KFP_VERSION=0.2.0$(RELEASE_VERSION_SUFFIX) DPK_LIB_KFP_VERSION_v2=0.2.0$(RELEASE_VERSION_SUFFIX) +DPK_LIB_KFP_SHARED=0.2.0$(RELEASE_VERSION_SUFFIX) # Begin transform versions/tags BLOCKLIST_VERSION=0.4.2$(RELEASE_VERSION_SUFFIX) diff --git a/kfp/kfp_ray_components/Dockerfile b/kfp/kfp_ray_components/Dockerfile index a012640ec..fce8a8b31 100644 --- a/kfp/kfp_ray_components/Dockerfile +++ b/kfp/kfp_ray_components/Dockerfile @@ -11,8 +11,8 @@ RUN cd data-processing-lib-python && pip install --no-cache-dir -e . COPY --chown=ray:users data-processing-lib-ray/ data-processing-lib-ray/ RUN cd data-processing-lib-ray && pip install --no-cache-dir -e . -COPY --chown=ray:users python_apiserver_client python_apiserver_client/ -RUN cd python_apiserver_client && pip install --no-cache-dir -e . +COPY --chown=ray:users shared_workflow_support_lib shared_workflow_support_lib/ +RUN cd shared_workflow_support_lib && pip install --no-cache-dir -e . COPY --chown=ray:users workflow_support_lib workflow_support_lib/ RUN cd workflow_support_lib && pip install --no-cache-dir -e . diff --git a/kfp/kfp_ray_components/Makefile b/kfp/kfp_ray_components/Makefile index d51ed8268..d1b2d0141 100644 --- a/kfp/kfp_ray_components/Makefile +++ b/kfp/kfp_ray_components/Makefile @@ -28,12 +28,12 @@ DOCKER_IMG=$(DOCKER_LOCAL_IMAGE) .lib-src-image:: $(MAKE) .defaults.copy-lib LIB_PATH=$(DPK_RAY_LIB_DIR) LIB_NAME=data-processing-lib-ray $(MAKE) .defaults.copy-lib LIB_PATH=$(DPK_PYTHON_LIB_DIR) LIB_NAME=data-processing-lib-python - $(MAKE) .defaults.copy-lib LIB_PATH=$(REPOROOT)/kfp/kfp_support_lib/python_apiserver_client LIB_NAME=python_apiserver_client + $(MAKE) .defaults.copy-lib LIB_PATH=$(REPOROOT)/kfp/kfp_support_lib/shared_workflow_support LIB_NAME=shared_workflow_support_lib $(MAKE) .defaults.copy-lib LIB_PATH=$(REPOROOT)/kfp/kfp_support_lib/$(WORKFLOW_SUPPORT_LIB) LIB_NAME=workflow_support_lib $(MAKE) .defaults.image rm -rf data-processing-lib-ray rm -rf data-processing-lib-python - rm -rf python_apiserver_client + rm -rf shared_workflow_support_lib rm -rf workflow_support_lib .PHONY: image diff --git a/kfp/kfp_ray_components/src/create_ray_cluster.py b/kfp/kfp_ray_components/src/create_ray_cluster.py index a2b16d577..9510023c5 100644 --- a/kfp/kfp_ray_components/src/create_ray_cluster.py +++ b/kfp/kfp_ray_components/src/create_ray_cluster.py @@ -10,7 +10,7 @@ # limitations under the License. ################################################################################ import sys -from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs +from runtime_utils import KFPUtils, RayRemoteJobs def start_ray_cluster( diff --git a/kfp/kfp_ray_components/src/delete_ray_cluster.py b/kfp/kfp_ray_components/src/delete_ray_cluster.py index 55cf2f34b..ed5092e04 100644 --- a/kfp/kfp_ray_components/src/delete_ray_cluster.py +++ b/kfp/kfp_ray_components/src/delete_ray_cluster.py @@ -10,7 +10,7 @@ # limitations under the License. 
################################################################################ import sys -from workflow_support.runtime_utils import KFPUtils, RayRemoteJobs +from runtime_utils import KFPUtils, RayRemoteJobs # Cleans and shutdowns the Ray cluster def cleanup_ray_cluster( diff --git a/kfp/kfp_ray_components/src/execute_ray_job.py b/kfp/kfp_ray_components/src/execute_ray_job.py index 173ccb06a..da4e1b8cd 100644 --- a/kfp/kfp_ray_components/src/execute_ray_job.py +++ b/kfp/kfp_ray_components/src/execute_ray_job.py @@ -9,7 +9,7 @@ # See the License for the specific language governing permissions and # limitations under the License. ################################################################################ -from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs +from runtime_utils import KFPUtils, execute_ray_jobs if __name__ == "__main__": import argparse diff --git a/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py b/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py index b7b5d9863..2e45328f8 100644 --- a/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py +++ b/kfp/kfp_ray_components/src/execute_ray_job_multi_s3.py @@ -10,7 +10,7 @@ # limitations under the License. ################################################################################ -from workflow_support.runtime_utils import KFPUtils, execute_ray_jobs +from runtime_utils import KFPUtils, execute_ray_jobs if __name__ == "__main__": import argparse diff --git a/kfp/kfp_ray_components/src/subworkflow.py b/kfp/kfp_ray_components/src/subworkflow.py index f15877d86..f08a9b5d8 100644 --- a/kfp/kfp_ray_components/src/subworkflow.py +++ b/kfp/kfp_ray_components/src/subworkflow.py @@ -1,6 +1,6 @@ import sys -from workflow_support.runtime_utils import KFPUtils +from runtime_utils import KFPUtils from workflow_support.pipeline_utils import PipelinesUtils diff --git a/kfp/kfp_support_lib/README.md b/kfp/kfp_support_lib/README.md index 440fc16c3..20758c957 100644 --- a/kfp/kfp_support_lib/README.md +++ b/kfp/kfp_support_lib/README.md @@ -3,7 +3,7 @@ This provides support for implementing KFP pipelines automating transform's execution. It comprises 3 main modules -* [api server client](python_apiserver_client/README.md) +* [shared_workflow_support](shared_workflow_support/README.md) * [kfp_v1_workflow_support](kfp_v1_workflow_support//README.md) * [kfp_v2_workflow_support](kfp_v2_workflow_support//README.md) diff --git a/kfp/kfp_support_lib/kfp_v1_workflow_support/Makefile b/kfp/kfp_support_lib/kfp_v1_workflow_support/Makefile index 6c7a8930d..b33204209 100644 --- a/kfp/kfp_support_lib/kfp_v1_workflow_support/Makefile +++ b/kfp/kfp_support_lib/kfp_v1_workflow_support/Makefile @@ -27,7 +27,7 @@ clean:: set-versions: .check-env @# Help: Copy the Makefile distribution version into the pyproject.toml sed -i.back 's/^version[ ]*=.*/version = "'${DPK_LIB_KFP_VERSION}'"/' pyproject.toml - sed -i.back 's/data_prep_toolkit_ray==[0-9].*/data_prep_toolkit_ray==${DPK_LIB_VERSION}",/' pyproject.toml + sed -i.back 's/data_prep_toolkit==[0-9].*/data_prep_toolkit==${DPK_LIB_VERSION}",/' pyproject.toml sed -i.back 's/kfp==[0-9].*/kfp==${KFP_v1}",/' pyproject.toml sed -i.back 's/ray=[0-9].*/ray==${RAY}",/' pyproject.toml @@ -60,12 +60,11 @@ else $(PYTHON) -m venv venv . 
${VENV_ACTIVATE}; \ cd ../../../data-processing-lib/python && make set-versions && cd -; \ + pip install --upgrade pip; \ pip install -e ../../../data-processing-lib/python; \ - cd ../../../data-prepossesing-lib/ray && make set-versions && cd -; \ - pip install -e ../../../data-processing-lib/ray; \ - cd ../python_apiserver_client && make set-versions && cd -; \ - pip install -e ../python_apiserver_client; \ - pip install -e .; \ + cd ../shared_workflow_support && make set-versions && cd -; \ + pip install -e ../shared_workflow_support; \ + pip install -e .; \ pip install pytest pytest-cov endif @@ -76,7 +75,6 @@ ifeq ($(KFPv2), 1) else @# Help: Use the already-built virtual environment to run pytest on the test directory. ifeq ($(DEPLOY_KUBEFLOW),1) - . ${VENV_ACTIVATE}; export PYTHONPATH=../src; cd test; $(PYTEST) ray_remote_jobs_test.py; . ${VENV_ACTIVATE}; export PYTHONPATH=../src; cd test; $(PYTEST) pipeline_utils_test.py; endif endif diff --git a/kfp/kfp_support_lib/kfp_v1_workflow_support/pyproject.toml b/kfp/kfp_support_lib/kfp_v1_workflow_support/pyproject.toml index 679f7ed08..3e2690923 100644 --- a/kfp/kfp_support_lib/kfp_v1_workflow_support/pyproject.toml +++ b/kfp/kfp_support_lib/kfp_v1_workflow_support/pyproject.toml @@ -15,8 +15,7 @@ dependencies = [ "kfp==1.8.22", "ray==2.9.3", "requests", - "data_prep_toolkit_ray==0.2.0.dev6", - "python_apiserver_client==0.1.0", + "data_prep_toolkit==0.2.0.dev6", ] [build-system] @@ -40,7 +39,6 @@ package_dir = ["src"] [options.packages.find] where = ["src/workflow_support"] - [tool.pytest.ini_options] addopts = "--cov --cov-report term-missing --cov-fail-under 10" markers = ["unit: unit tests", "integration: integration tests"] diff --git a/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/runtime_utils/__init__.py b/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/runtime_utils/__init__.py deleted file mode 100644 index 8d2cdd648..000000000 --- a/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/runtime_utils/__init__.py +++ /dev/null @@ -1,2 +0,0 @@ -from workflow_support.runtime_utils.kfp_utils import KFPUtils -from workflow_support.runtime_utils.remote_jobs_utils import RayRemoteJobs, execute_ray_jobs diff --git a/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py b/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py deleted file mode 100644 index feb081dd2..000000000 --- a/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py +++ /dev/null @@ -1,160 +0,0 @@ -# (C) Copyright IBM Corp. 2024. -# Licensed under the Apache License, Version 2.0 (the “License”); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an “AS IS” BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-################################################################################ - -import json -import os -import re -import sys -from typing import Any - -from data_processing.utils import get_logger - - -logger = get_logger(__name__) - - -class KFPUtils: - """ - Helper utilities for KFP implementations - """ - - @staticmethod - def credentials( - access_key: str = "S3_KEY", secret_key: str = "S3_SECRET", endpoint: str = "ENDPOINT" - ) -> tuple[str, str, str]: - """ - Get credentials from the environment - :param access_key: environment variable for access key - :param secret_key: environment variable for secret key - :param endpoint: environment variable for S3 endpoint - :return: - """ - s3_key = os.getenv(access_key, None) - s3_secret = os.getenv(secret_key, None) - s3_endpoint = os.getenv(endpoint, None) - if s3_key is None or s3_secret is None or s3_endpoint is None: - logger.warning("Failed to load s3 credentials") - return s3_key, s3_secret, s3_endpoint - - @staticmethod - def get_namespace() -> str: - """ - Get k8 namespace that we are running it - :return: - """ - ns = "" - try: - file = open("/var/run/secrets/kubernetes.io/serviceaccount/namespace", "r") - except Exception as e: - logger.warning( - f"Failed to open /var/run/secrets/kubernetes.io/serviceaccount/namespace file, " f"exception {e}" - ) - else: - with file: - ns = file.read() - return ns - - @staticmethod - def runtime_name(ray_name: str = "", run_id: str = "") -> str: - """ - Get unique runtime name - :param ray_name: - :param run_id: - :return: runtime name - """ - # K8s objects cannot contain special characters, except '_', All characters should be in lower case. - if ray_name != "": - ray_name = ray_name.replace("_", "-").lower() - pattern = r"[^a-zA-Z0-9-]" # the ray_name cannot contain upper case here, but leave it just in case. - ray_name = re.sub(pattern, "", ray_name) - else: - ray_name = "a" - # the return value plus namespace name will be the name of the Ray Route, - # which length is restricted to 64 characters, - # therefore we restrict the return name by 15 character. 
- if run_id != "": - return f"{ray_name[:9]}-{run_id[:5]}" - return ray_name[:15] - - @staticmethod - def dict_to_req(d: dict[str, Any], executor: str = "transformer_launcher.py") -> str: - res = f"python {executor} " - for key, value in d.items(): - if str(value) != "": - if isinstance(value, str): - if '"' in value: - logger.warning(f"can't parse inputs with double quotation marks, please use single quotation marks instead") - res += f'--{key}="{value}" ' - elif isinstance(value, bool): - if value: - res += f"--{key} " - else: - res += f"--{key}={value} " - - logger.info(f"request to execute: {res}") - return res - - # Load a string that represents a json to python dictionary - @staticmethod - def load_from_json(js: str) -> dict[str, Any]: - try: - return json.loads(js) - except Exception as e: - logger.warning(f"Failed to load parameters {js} with error {e}") - sys.exit(1) - - @staticmethod - def default_compute_execution_params( - worker_options: str, # ray worker configuration - actor_options: str, # cpus per actor - ) -> str: - """ - This is the most simplistic transform execution parameters computation - :param worker_options: configuration of ray workers - :param actor_options: actor request requirements - :return: number of actors - """ - import sys - - from data_processing.utils import GB, get_logger - from workflow_support.runtime_utils import KFPUtils - - logger = get_logger(__name__) - - # convert input - w_options = KFPUtils.load_from_json(worker_options.replace("'", '"')) - a_options = KFPUtils.load_from_json(actor_options.replace("'", '"')) - # Compute available cluster resources - cluster_cpu = w_options["replicas"] * w_options["cpu"] - cluster_mem = w_options["replicas"] * w_options["memory"] - cluster_gpu = w_options["replicas"] * w_options.get("gpu", 0.0) - logger.info(f"Cluster available CPUs {cluster_cpu}, Memory {cluster_mem}, GPUs {cluster_gpu}") - # compute number of actors - n_actors_cpu = int(cluster_cpu * 0.85 / a_options.get("num_cpus", 0.5)) - n_actors_memory = int(cluster_mem * 0.85 / (a_options.get("memory", GB) / GB)) - n_actors = min(n_actors_cpu, n_actors_memory) - # Check if we need gpu calculations as well - actor_gpu = a_options.get("num_gpus", 0) - if actor_gpu > 0: - n_actors_gpu = int(cluster_gpu / actor_gpu) - n_actors = min(n_actors, n_actors_gpu) - logger.info(f"Number of actors - {n_actors}") - if n_actors < 1: - logger.warning( - f"Not enough cpu/gpu/memory to run transform, " - f"required cpu {a_options.get('num_cpus', .5)}, available {cluster_cpu}, " - f"required memory {a_options.get('memory', 1)}, available {cluster_mem}, " - f"required cpu {actor_gpu}, available {cluster_gpu}" - ) - sys.exit(1) - - return str(n_actors) \ No newline at end of file diff --git a/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py b/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py deleted file mode 100644 index 0b20b28c4..000000000 --- a/kfp/kfp_support_lib/kfp_v1_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py +++ /dev/null @@ -1,527 +0,0 @@ -# (C) Copyright IBM Corp. 2024. -# Licensed under the Apache License, Version 2.0 (the “License”); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an “AS IS” BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -import re -import sys -import time -from typing import Any - -from data_processing.data_access import DataAccess, DataAccessFactory -from data_processing.utils import ParamsUtils, get_logger -from python_apiserver_client import KubeRayAPIs -from python_apiserver_client.params import ( - DEFAULT_HEAD_START_PARAMS, - DEFAULT_WORKER_START_PARAMS, - Cluster, - ClusterSpec, - HeadNodeSpec, - RayJobRequest, - Template, - WorkerNodeSpec, - environment_variables_decoder, - volume_decoder, -) -from workflow_support.runtime_utils import KFPUtils -from ray.job_submission import JobStatus - - -logger = get_logger(__name__) - - -class RayRemoteJobs: - """ - class supporting Ray remote jobs - """ - - ansi_escape = re.compile(r"\x1B\[[0-?]*[ -/]*[@-~]") - - def __init__( - self, - server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888", - default_image: str = "rayproject/ray:2.9.3-py310", - http_retries: int = 5, - wait_interval: int = 2, - ): - """ - Initialization - :param server_url: API server URL. Default value is assuming running inside the cluster - :param default_image - default Ray image - :param wait_interval: wait interval - :param http_retries: http retries - """ - self.api_server_client = KubeRayAPIs( - server_url=server_url, http_retries=http_retries, wait_interval=wait_interval - ) - self.default_image = default_image - - def create_ray_cluster( - self, - name: str, - namespace: str, - head_node: dict[str, Any], - worker_nodes: list[dict[str, Any]], - wait_cluster_ready: int = -1, - ) -> tuple[int, str]: - """ - Create Ray cluster - :param name: name, _ are not allowed in the name - :param namespace: namespace - :param head_node: head node specification dictionary including the following: - mandatory fields: - cpu - number of cpus - memory memory size (GB) - image - image to use - optional fields: - gpu - number of gpus - gpu_accelerator - gpu accelerator to use - image_pull_secret - image pull secret - ray_start_params - dictionary of ray start parameters - volumes - list of volumes for head node - service_account - service account to use (has to be created) - environment - dictionary of head node environment - annotations: dictionary of head node annotation - labels: dictionary of head node labels - image_pull_policy: image pull policy, default IfNotPresent - - :param worker_nodes: an array of worker node specification dictionary including the following: - mandatory fields: - cpu - number of cpus - memory memory size (GB) - image - image to use - max_replicas - max replicas for this worker group - optional fields: - gpu - number of gpus - gpu_accelerator - gpu accelerator to use - replicas - number of replicas to create for this group (default 1) - min_replicas - min number of replicas for this group (default 0) - image_pull_secret - image pull secret - ray_start_params - dictionary of ray start parameters - volumes - list of volumes for this group - service_account - service account to use (has to be created) - environment - dictionary of node of this group environment - 
annotations: dictionary of node of this group annotation - labels: dictionary of node of this group labels - image_pull_policy: image pull policy, default IfNotPresent - - :param wait_cluster_ready - time to wait for cluster ready sec (-1 forever) - :return:tuple containing - http return code - message - only returned if http return code is not equal to 200 - """ - # start with templates - # head_node - cpus = head_node.get("cpu", 1) - memory = head_node.get("memory", 1) - gpus = head_node.get("gpu", 0) - accelerator = head_node.get("gpu_accelerator", None) - head_node_template_name = f"{name}-head-template" - _, _ = self.api_server_client.delete_compute_template(ns=namespace, name=head_node_template_name) - head_template = Template( - name=head_node_template_name, - namespace=namespace, - cpu=cpus, - memory=memory, - gpu=gpus, - gpu_accelerator=accelerator, - ) - status, error = self.api_server_client.create_compute_template(head_template) - if status != 200: - return status, error - worker_template_names = [""] * len(worker_nodes) - index = 0 - # For every worker group - for worker_node in worker_nodes: - cpus = worker_node.get("cpu", 1) - memory = worker_node.get("memory", 1) - gpus = worker_node.get("gpu", 0) - accelerator = worker_node.get("gpu_accelerator", None) - worker_node_template_name = f"{name}-worker-template-{index}" - _, _ = self.api_server_client.delete_compute_template(ns=namespace, name=worker_node_template_name) - worker_template = Template( - name=worker_node_template_name, - namespace=namespace, - cpu=cpus, - memory=memory, - gpu=gpus, - gpu_accelerator=accelerator, - ) - status, error = self.api_server_client.create_compute_template(worker_template) - if status != 200: - return status, error - worker_template_names[index] = worker_node_template_name - index += 1 - # Build head node spec - image = head_node.get("image", self.default_image) - image_pull_secret = head_node.get("image_pull_secret", None) - image_pull_policy = head_node.get("image_pull_policy", None) - ray_start_params = head_node.get("ray_start_params", DEFAULT_HEAD_START_PARAMS) - volumes_dict = head_node.get("volumes", None) - service_account = head_node.get("service_account", None) - environment_dict = head_node.get("environment", None) - annotations = head_node.get("annotations", None) - labels = head_node.get("labels", None) - if volumes_dict is None: - volumes = None - else: - volumes = [volume_decoder(v) for v in volumes_dict] - if environment_dict is None: - environment = None - else: - environment = environment_variables_decoder(environment_dict) - head_node_spec = HeadNodeSpec( - compute_template=head_node_template_name, - image=image, - ray_start_params=ray_start_params, - volumes=volumes, - service_account=service_account, - image_pull_secret=image_pull_secret, - environment=environment, - annotations=annotations, - labels=labels, - image_pull_policy=image_pull_policy, - ) - # build worker nodes - worker_groups = [] - index = 0 - for worker_node in worker_nodes: - max_replicas = worker_node.get("max_replicas", 1) - replicas = worker_node.get("replicas", 1) - min_replicas = worker_node.get("min_replicas", 0) - image = worker_node.get("image", self.default_image) - image_pull_secret = worker_node.get("image_pull_secret", None) - image_pull_policy = head_node.get("image_pull_policy", None) - ray_start_params = worker_node.get("ray_start_params", DEFAULT_WORKER_START_PARAMS) - volumes_dict = worker_node.get("volumes", None) - service_account = worker_node.get("service_account", None) - 
environment_dict = worker_node.get("environment", None) - annotations = worker_node.get("annotations", None) - labels = worker_node.get("labels", None) - if volumes_dict is None: - volumes = None - else: - volumes = [volume_decoder(v) for v in volumes_dict] - if environment_dict is None: - environment = None - else: - environment = environment_variables_decoder(environment_dict) - worker_groups.append( - WorkerNodeSpec( - group_name=f"worker-group-{index}", - compute_template=worker_template_names[index], - image=image, - max_replicas=max_replicas, - replicas=replicas, - min_replicas=min_replicas, - ray_start_params=ray_start_params, - volumes=volumes, - service_account=service_account, - image_pull_secret=image_pull_secret, - environment=environment, - annotations=annotations, - labels=labels, - image_pull_policy=image_pull_policy, - ) - ) - index += 1 - # Build cluster spec - cluster_spec = ClusterSpec(head_node=head_node_spec, worker_groups=worker_groups) - # Build cluster - cluster = Cluster(name=name, namespace=namespace, user="dataprep", version="2.9.3", cluster_spec=cluster_spec) - status, error = self.api_server_client.create_cluster(cluster) - if status != 200: - return status, error - # Wait for cluster ready - return self.api_server_client.wait_cluster_ready(name=name, ns=namespace, wait=wait_cluster_ready) - - def delete_ray_cluster(self, name: str, namespace: str) -> tuple[int, str]: - """ - Clean up Ray cluster and supporting template - :param name: cluster name - :param namespace: cluster namespace - :return:tuple containing - http return code - message - only returned if http return code is not equal to 200 - """ - # delete cluster - status, error = self.api_server_client.delete_cluster(ns=namespace, name=name) - if status != 200: - return status, error - # clean up templates - status, error, template_array = self.api_server_client.list_compute_templates_namespace(ns=namespace) - if status != 200: - return status, error - for template in template_array: - if template.name.startswith(name): - status, error = self.api_server_client.delete_compute_template(ns=namespace, name=template.name) - if status != 200: - return status, error - return status, error - - def submit_job( - self, - name: str, - namespace: str, - request: dict[str, Any], - runtime_env: str = None, - executor: str = "transformer_launcher.py", - ) -> tuple[int, str, str]: - """ - Submit job for execution - :param name: cluster name - :param namespace: cluster namespace - :param request: dictionary of the remote job request - :param runtime_env: runtime environment string - :param executor: python file to execute - :return:tuple containing - http return code - message - only returned if http return code is not equal to 200 - submission id - submission id - """ - # Although the cluster is ready, the service web server might not be ready yet at this point. - # To ensure that it is ready, trying to get jobs info from the cluster. 
Even if it fails - # couple of times, its harmless - _, _, _ = self.api_server_client.list_job_info(ns=namespace, name=name) - time.sleep(5) - # Build job request - job_request = RayJobRequest(entrypoint=KFPUtils.dict_to_req(d=request, executor=executor)) - if runtime_env is not None: - job_request.runtime_env = runtime_env - return self.api_server_client.submit_job(ns=namespace, name=name, job_request=job_request) - - def _get_job_status(self, name: str, namespace: str, submission_id: str) -> tuple[int, str, str]: - """ - Get job status - :param name: cluster name - :param namespace: cluster namespace - :param submission_id: job submission ID - :return:tuple containing - http return code - message - only returned if http return code is not equal to 200 - status - job status - """ - # get job info - status, error, info = self.api_server_client.get_job_info(ns=namespace, name=name, sid=submission_id) - if status // 100 != 2: - return status, error, "" - return status, error, info.status - - @staticmethod - def _print_log(log: str, previous_log_len: int) -> None: - """ - Prints the delta between current and previous logs - :param log: current log - :param previous_log_len: previous log length - :return: None - """ - l_to_print = log[previous_log_len:] - if len(l_to_print) > 0: - l_to_print = RayRemoteJobs.ansi_escape.sub("", l_to_print) - print(l_to_print) - - def follow_execution( - self, - name: str, - namespace: str, - submission_id: str, - data_access: DataAccess = None, - job_ready_timeout: int = 600, - print_timeout: int = 120, - ) -> None: - """ - Follow remote job execution - :param name: cluster name - :param namespace: cluster namespace - :param submission_id: job submission ID - :param data_access - data access class - :param job_ready_timeout: timeout to wait for fob to become ready - :param print_timeout: print interval - :return: None - """ - # Wait for job to start running - job_status = JobStatus.PENDING - while job_status != JobStatus.RUNNING and job_ready_timeout > 0: - status, error, job_status = self._get_job_status( - name=name, namespace=namespace, submission_id=submission_id - ) - if status // 100 != 2: - sys.exit(1) - if job_status in {JobStatus.STOPPED, JobStatus.SUCCEEDED, JobStatus.FAILED, JobStatus.RUNNING}: - break - time.sleep(self.api_server_client.wait_interval) - job_ready_timeout -= self.api_server_client.wait_interval - logger.info(f"job status is {job_status}") - if job_ready_timeout <= 0: - logger.warning("timed out waiting for job become ready, exiting") - sys.exit(1) - # While job is running print log - previous_log_len = 0 - # At this point job could succeeded, failed, stop or running. 
So print log regardless - status, error, log = self.api_server_client.get_job_log(ns=namespace, name=name, sid=submission_id) - if status // 100 != 2: - sys.exit(1) - self._print_log(log=log, previous_log_len=previous_log_len) - previous_log_len = len(log) - # continue printing log, while job is running - while job_status == JobStatus.RUNNING: - time.sleep(print_timeout) - status, error, log = self.api_server_client.get_job_log(ns=namespace, name=name, sid=submission_id) - if status // 100 != 2: - sys.exit(1) - self._print_log(log=log, previous_log_len=previous_log_len) - previous_log_len = len(log) - status, error, job_status = self._get_job_status( - name=name, namespace=namespace, submission_id=submission_id - ) - if status // 100 != 2: - sys.exit(1) - # Print the final log and execution status - # Sleep here to avoid racing conditions - time.sleep(2) - status, error, log = self.api_server_client.get_job_log(ns=namespace, name=name, sid=submission_id) - if status // 100 != 2: - sys.exit(1) - self._print_log(log=log, previous_log_len=previous_log_len) - logger.info(f"Job completed with execution status {job_status}") - if job_status != JobStatus.SUCCEEDED: - sys.exit(1) - if data_access is None: - return - # Here data access is either S3 or lakehouse both of which contain self.output_folder - try: - output_folder = data_access.get_output_folder() - except Exception as e: - logger.warning(f"failed to get output folder {e}") - return - output_folder = output_folder if output_folder.endswith("/") else output_folder + "/" - execution_log_path = f"{output_folder}execution.log" - logger.info(f"saving execution log to {execution_log_path}") - data_access.save_file(path=execution_log_path, data=bytes(log, "UTF-8")) - - -def _execute_remote_job( - name: str, - ns: str, - script: str, - params: dict[str, Any], - data_access_params: dict[str, Any], - additional_params: dict[str, Any], - remote_jobs: RayRemoteJobs, -) -> None: - """ - Execute remote job on Ray cluster - :param name: cluster name - :param ns: execution/cluster namespace - :param additional_params: additional parameters for the job - :param data_access_params: data access parameters - :param params: job execution parameters (specific for a specific transform, - generated by the transform workflow) - :param script: script to run (has to be present in the image) - :param remote_jobs: remote jobs execution support class - :return: - """ - - status, error, submission = remote_jobs.submit_job(name=name, namespace=ns, request=params, executor=script) - if status != 200: - logger.error(f"Failed to submit job - status: {status}, error: {error}") - exit(1) - - logger.info(f"submitted job successfully, submission id {submission}") - # create data access - data_factory = DataAccessFactory() - data_factory.apply_input_params(args=data_access_params) - data_access = data_factory.create_data_access() - # print execution log - remote_jobs.follow_execution( - name=name, - namespace=ns, - submission_id=submission, - data_access=data_access, - print_timeout=additional_params.get("wait_print_tmout", 120), - job_ready_timeout=additional_params.get("wait_job_ready_tmout", 600), - ) - - -def execute_ray_jobs( - name: str, # name of Ray cluster - additional_params: dict[str, Any], - e_params: dict[str, Any], - exec_script_name: str, - server_url: str, -) -> None: - """ - Execute Ray jobs on a cluster periodically printing execution log. Completes when all Ray job complete. - All of the jobs will be executed, although some of the jobs may fail. 
- :param name: cluster name - :param additional_params: additional parameters for the job - :param e_params: job execution parameters (specific for a specific transform, - generated by the transform workflow) - :param exec_script_name: script to run (has to be present in the image) - :param server_url: API server url - :return: None - """ - # prepare for execution - ns = KFPUtils.get_namespace() - if ns == "": - logger.warning(f"Failed to get namespace") - sys.exit(1) - # create remote jobs class - remote_jobs = RayRemoteJobs( - server_url=server_url, - http_retries=additional_params.get("http_retries", 5), - wait_interval=additional_params.get("wait_interval", 2), - ) - # find config parameter - config = ParamsUtils.get_config_parameter(e_params) - if config is None: - exit(1) - # get config value - config_value = KFPUtils.load_from_json(e_params[config].replace("'", '"')) - s3_creds = KFPUtils.load_from_json(e_params["data_s3_cred"].replace("'", '"')) - if type(config_value) is not list: - # single request - return _execute_remote_job( - name=name, - ns=ns, - script=exec_script_name, - data_access_params={config: config_value, "data_s3_cred": s3_creds}, - params=e_params, - additional_params=additional_params, - remote_jobs=remote_jobs, - ) - # remove config key from the dictionary - launch_params = dict(e_params) - del launch_params[config] - # Loop through all configuration - n_launches = 0 - for conf in config_value: - # populate individual config and launch - launch_params[config] = ParamsUtils.convert_to_ast(d=conf) - try: - _execute_remote_job( - name=name, - ns=ns, - script=exec_script_name, - data_access_params={config: conf, "data_s3_cred": s3_creds}, - params=launch_params, - additional_params=additional_params, - remote_jobs=remote_jobs, - ) - n_launches += 1 - except SystemExit: - logger.warning(f"Failed to execute job for configuration {conf}") - continue - - if n_launches == 0: - logger.warning("All executions failed") - sys.exit(1) - else: - logger.info(f"{n_launches} ot of {len(config_value)} succeeded") diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/Makefile b/kfp/kfp_support_lib/kfp_v2_workflow_support/Makefile index 59552f9e1..371cef589 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/Makefile +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/Makefile @@ -27,7 +27,7 @@ clean:: set-versions: .check-env @# Help: Copy the Makefile distribution version into the pyproject.toml sed -i.back 's/^version[ ]*=.*/version = "'${DPK_LIB_KFP_VERSION_v2}'"/' pyproject.toml - sed -i.back 's/data_prep_toolkit_ray==[0-9].*/data_prep_toolkit_ray==${DPK_LIB_VERSION}",/' pyproject.toml + sed -i.back 's/data_prep_toolkit==[0-9].*/data_prep_toolkit==${DPK_LIB_VERSION}",/' pyproject.toml sed -i.back 's/kfp==[0-9].*/kfp==${KFP_v2}",/' pyproject.toml sed -i.back 's/ray=[0-9].*/ray==${RAY}",/' pyproject.toml @@ -60,12 +60,11 @@ else $(PYTHON) -m venv venv . 
${VENV_ACTIVATE}; \ cd ../../../data-processing-lib/python && make set-versions && cd -; \ + pip install --upgrade pip; \ pip install -e ../../../data-processing-lib/python; \ - cd ../../../data-prepossesing-lib/ray && make set-versions && cd -; \ - pip install -e ../../../data-processing-lib/ray; \ - cd ../python_apiserver_client && make set-versions && cd -; \ - pip install -e ../python_apiserver_client; \ - pip install -e .; \ + cd ../shared_workflow_support && make set-versions && cd -; \ + pip install -e ../shared_workflow_support; \ + pip install -e .; \ pip install pytest pytest-cov endif @@ -76,8 +75,6 @@ ifneq ($(KFPv2), 1) else @# Help: Use the already-built virtual environment to run pytest on the test directory. ifeq ($(DEPLOY_KUBEFLOW),1) - . ${VENV_ACTIVATE}; export PYTHONPATH=../src; cd test; $(PYTEST) kuberay_api_test.py; - . ${VENV_ACTIVATE}; export PYTHONPATH=../src; cd test; $(PYTEST) ray_remote_jobs_test.py; . ${VENV_ACTIVATE}; export PYTHONPATH=../src; cd test; $(PYTEST) pipeline_utils_test.py; endif endif diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/__init__.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml b/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml index 3e1607ee6..86ae43e5d 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/pyproject.toml @@ -17,7 +17,6 @@ dependencies = [ "ray==2.9.3", "requests", "data_prep_toolkit_ray==0.2.0.dev6", - "python_apiserver_client", ] [build-system] diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_compile_utils/__init__.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_compile_utils/__init__.py new file mode 100644 index 000000000..aaa107060 --- /dev/null +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_compile_utils/__init__.py @@ -0,0 +1,6 @@ +from workflow_utils import ( + ONE_HOUR_SEC, + ONE_DAY_SEC, + ONE_WEEK_SEC, + ComponentUtils +) \ No newline at end of file diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_compile_utils/component.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_compile_utils/component.py new file mode 100644 index 000000000..4fa47290f --- /dev/null +++ b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_compile_utils/component.py @@ -0,0 +1,58 @@ +import kfp.dsl as dsl +from kfp import kubernetes +from typing import Dict + +RUN_NAME = "KFP_RUN_NAME" + +ONE_HOUR_SEC = 60 * 60 +ONE_DAY_SEC = ONE_HOUR_SEC * 24 +ONE_WEEK_SEC = ONE_DAY_SEC * 7 + +class ComponentUtils: + """ + Class containing methods supporting building pipelines + """ + + @staticmethod + def add_settings_to_component( + task: dsl.PipelineTask, + timeout: int, + image_pull_policy: str = "IfNotPresent", + cache_strategy: bool = False, + ) -> None: + """ + Add settings to kfp task + :param task: kfp task + :param timeout: timeout to set to the component in seconds + :param image_pull_policy: pull policy to set to the component + :param cache_strategy: cache strategy + """ + + kubernetes.use_field_path_as_env(task, env_name=RUN_NAME, + field_path="metadata.annotations['pipelines.kubeflow.org/run_name']") + # Set cashing + task.set_caching_options(enable_caching=cache_strategy) + # image pull policy + kubernetes.set_image_pull_policy(task, image_pull_policy) + # Set the timeout for the task to one day (in seconds) + 
kubernetes.set_timeout(task, seconds=timeout)
+
+    @staticmethod
+    def set_s3_env_vars_to_component(
+        task: dsl.PipelineTask,
+        secret: str = '',
+        env2key: Dict[str, str] = {'s3-key': 'S3_KEY', 's3-secret': 'S3_SECRET', 's3-endpoint': 'ENDPOINT'},
+        prefix: str = None,
+    ) -> None:
+        """
+        Set S3 env variables to KFP component
+        :param task: kfp task
+        :param secret: name of the secret holding the S3 credentials
+        :param env2key: dict mapping each secret key to the environment variable it populates
+        :param prefix: prefix to add to env name
+        """
+
+        if prefix is not None:
+            env2key = {prefix + "_" + env_name: key for env_name, key in env2key.items()}
+        kubernetes.use_secret_as_env(task=task, secret_name=secret, secret_key_to_env=env2key)
diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/__init__.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/__init__.py
deleted file mode 100644
index 8d2cdd648..000000000
--- a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/__init__.py
+++ /dev/null
@@ -1,2 +0,0 @@
-from workflow_support.runtime_utils.kfp_utils import KFPUtils
-from workflow_support.runtime_utils.remote_jobs_utils import RayRemoteJobs, execute_ray_jobs
diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/test/ray_remote_jobs_test.py b/kfp/kfp_support_lib/kfp_v2_workflow_support/test/ray_remote_jobs_test.py
deleted file mode 100644
index f409550e9..000000000
--- a/kfp/kfp_support_lib/kfp_v2_workflow_support/test/ray_remote_jobs_test.py
+++ /dev/null
@@ -1,91 +0,0 @@
-# (C) Copyright IBM Corp. 2024.
-# Licensed under the Apache License, Version 2.0 (the “License”);
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-# http://www.apache.org/licenses/LICENSE-2.0
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an “AS IS” BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################ - -from configmaps import ConfigmapsManager -from python_apiserver_client.params import ConfigMapVolume -from workflow_support.runtime_utils import RayRemoteJobs - -server_url = "http:localhost:8080/ray/" - -def test_ray_remote_jobs(): - """ - Test the full cycle of job submission - :return: - """ - # This shows how to create volumes dictionary - volumes = [ - ConfigMapVolume( - name="code-sample", - mount_path="/home/ray/samples", - source="ray-job-code-sample", - items={"sample_code.py": "sample_code.py"}, - ) - ] - dct_volumes = {"volumes": [v.to_dict() for v in volumes]} - - head_node = { - "cpu": 2, - "memory": 4, - "image": "rayproject/ray:2.9.3-py310", - # Ray start params, just to show - "ray_start_params": {"metrics-export-port": "8080", "num-cpus": "0", "dashboard-host": "0.0.0.0"}, - "image_pull_policy": "Always", - } | dct_volumes - - worker_node = { - "cpu": 2, - "memory": 4, - "image": "rayproject/ray:2.9.3-py310", - "replicas": 1, - "min_replicas": 1, - "max_replicas": 1, - "image_pull_policy": "Always", - } | dct_volumes - - # Create configmap for testing - cm_manager = ConfigmapsManager() - cm_manager.delete_code_map() - cm_manager.create_code_map() - - # create cluster - remote_jobs = RayRemoteJobs(server_url=server_url) - status, error = remote_jobs.create_ray_cluster( - name="job-test", namespace="default", head_node=head_node, worker_nodes=[worker_node] - ) - print(f"Created cluster - status: {status}, error: {error}") - assert status == 200 - assert error is None - # submitting ray job - runtime_env = """ - pip: - - requests==2.26.0 - - pendulum==2.1.2 - env_vars: - counter_name: test_counter - """ - status, error, submission = remote_jobs.submit_job( - name="job-test", - namespace="default", - request={}, - runtime_env=runtime_env, - executor="/home/ray/samples/sample_code.py", - ) - print(f"submit job - status: {status}, error: {error}, submission id {submission}") - assert status == 200 - assert error is None - # print execution log - remote_jobs.follow_execution(name="job-test", namespace="default", submission_id=submission, print_timeout=20) - # cleanup - status, error = remote_jobs.delete_ray_cluster(name="job-test", namespace="default") - print(f"Deleted cluster - status: {status}, error: {error}") - assert status == 200 - assert error is None diff --git a/kfp/kfp_support_lib/python_apiserver_client/.gitignore b/kfp/kfp_support_lib/python_apiserver_client/.gitignore deleted file mode 100644 index 3ff12a7a8..000000000 --- a/kfp/kfp_support_lib/python_apiserver_client/.gitignore +++ /dev/null @@ -1,32 +0,0 @@ -# Byte-compiled / optimized / DLL files -__pycache__/ -*.py[cod] -*$py.class - - -# Distribution / packaging -bin/ -build/ -develop-eggs/ -dist/ -eggs/ -lib/ -lib64/ -parts/ -sdist/ -var/ -*.egg-info/ -.installed.cfg -*.egg - -# Installer logs -pip-log.txt -pip-delete-this-directory.txt - -# Unit test / coverage reports -.tox/ -htmlcov -.coverage -.cache -nosetests.xml -coverage.xml \ No newline at end of file diff --git a/kfp/kfp_support_lib/python_apiserver_client/pyproject.toml b/kfp/kfp_support_lib/python_apiserver_client/pyproject.toml deleted file mode 100644 index a933f1bbc..000000000 --- a/kfp/kfp_support_lib/python_apiserver_client/pyproject.toml +++ /dev/null @@ -1,28 +0,0 @@ -[build-system] -requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"] -build-backend = "setuptools.build_meta" -[options] -package_dir = ["src"] -[project] -name = 
"python_apiserver_client" -version = "0.1.0" -dependencies = [ - "requests", - "kubernetes", - "data-prep-toolkit==0.2.0.dev6", -] -authors = [ - { name="KubeRay project"}, -] -description = "A Kuberay python client library to manage clusters based on the KubeRay API server" -readme = {file = "README.md", content-type = "text/markdown"} -license = {text = "Apache-2.0"} -requires-python = ">=3.10" -classifiers = [ - "Programming Language :: Python :: 3", - "License :: Apache License 2.0", - "Operating System :: OS Independent", -] - -[project.urls] -"Homepage" = "https://github.com/ray-project/kuberay" \ No newline at end of file diff --git a/kfp/kfp_support_lib/python_apiserver_client/test/configmaps.py b/kfp/kfp_support_lib/python_apiserver_client/test/configmaps.py deleted file mode 100644 index 65e53e828..000000000 --- a/kfp/kfp_support_lib/python_apiserver_client/test/configmaps.py +++ /dev/null @@ -1,72 +0,0 @@ -# (C) Copyright IBM Corp. 2024. -# Licensed under the Apache License, Version 2.0 (the “License”); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# http://www.apache.org/licenses/LICENSE-2.0 -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an “AS IS” BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ - -from kubernetes import client, config - - -CMAP_VALUE = """ -import ray -import os -import requests - -ray.init() - -@ray.remote -class Counter: - def __init__(self): - # Used to verify runtimeEnv - self.name = os.getenv("counter_name") - assert self.name == "test_counter" - self.counter = 0 - - def inc(self): - self.counter += 1 - - def get_counter(self): - return "{} got {}".format(self.name, self.counter) - -counter = Counter.remote() - -for _ in range(5): - ray.get(counter.inc.remote()) - print(ray.get(counter.get_counter.remote())) - -# Verify that the correct runtime env was used for the job. -assert requests.__version__ == "2.26.0" -""" -CMAP_NAME = "ray-job-code-sample" - - -class ConfigmapsManager: - """ - Simple support class to manage config maps. 
Assumes local access to Kubectl - """ - - def __init__(self): - config.load_kube_config() - self.api_instance = client.CoreV1Api() - - def list_configmaps(self) -> list[str]: - cm_list = self.api_instance.list_namespaced_config_map(namespace="default").items - return [cm.metadata.name for cm in cm_list] - - def create_code_map(self) -> None: - cmap = client.V1ConfigMap() - cmap.metadata = client.V1ObjectMeta(name=CMAP_NAME) - cmap.data = {"sample_code.py": CMAP_VALUE} - self.api_instance.create_namespaced_config_map(namespace="default", body=cmap) - - def delete_code_map(self) -> None: - try: - self.api_instance.delete_namespaced_config_map(name="ray-job-code-sample", namespace="default") - except Exception as e: - print("config map ray-job-code-sample does not exist") diff --git a/kfp/kfp_support_lib/python_apiserver_client/Makefile b/kfp/kfp_support_lib/shared_workflow_support/Makefile similarity index 80% rename from kfp/kfp_support_lib/python_apiserver_client/Makefile rename to kfp/kfp_support_lib/shared_workflow_support/Makefile index b62645ade..451430243 100644 --- a/kfp/kfp_support_lib/python_apiserver_client/Makefile +++ b/kfp/kfp_support_lib/shared_workflow_support/Makefile @@ -6,7 +6,7 @@ include ${REPOROOT}/kfp/requirements.env # Include the common rules. # Use "make help" to see them. -include ../../../.make.defaults +include ${REPOROOT}/.make.defaults # Command to run pytest PYTHON_VERSION=$(shell $(PYTHON) --version) @@ -26,7 +26,9 @@ clean:: set-versions: .check-env @# Help: Copy the Makefile distribution version into the pyproject.toml - sed -i.back 's/data-prep-toolkit==[0-9].*/data-prep-toolkit==${DPK_LIB_VERSION}",/' pyproject.toml + sed -i.back 's/^version[ ]*=.*/version = "'${DPK_LIB_KFP_SHARED}'"/' pyproject.toml + sed -i.back 's/data_prep_toolkit_ray==[0-9].*/data_prep_toolkit_ray==${DPK_LIB_VERSION}",/' pyproject.toml + sed -i.back 's/ray=[0-9].*/ray==${RAY}",/' pyproject.toml build:: set-versions venv @# Help: Build the distribution for publishing to a pypi @@ -42,7 +44,7 @@ publish:: .check-env ${PYTHON} -m twine check dist/* ${PYTHON} -m twine upload --verbose --non-interactive dist/* -venv::pyproject.toml .check-env +venv:: pyproject.toml .check-env @# Help: Create the virtual environment using pyproject.toml rm -rf venv $(PYTHON) -m venv venv @@ -53,10 +55,10 @@ venv::pyproject.toml .check-env pip install -e .; \ pip install pytest pytest-cov -test:: venv +test:: venv @# Help: Use the already-built virtual environment to run pytest on the test directory. . ${VENV_ACTIVATE}; export PYTHONPATH=../src; cd test; $(PYTEST) api_params_test.py; ifeq ($(DEPLOY_KUBEFLOW),1) . ${VENV_ACTIVATE}; export PYTHONPATH=../src; cd test; $(PYTEST) kuberay_api_test.py; -endif - + . ${VENV_ACTIVATE}; export PYTHONPATH=../src; cd test; $(PYTEST) ray_remote_jobs_test.py; +endif \ No newline at end of file diff --git a/kfp/kfp_support_lib/shared_workflow_support/README.md b/kfp/kfp_support_lib/shared_workflow_support/README.md new file mode 100644 index 000000000..9dec904d8 --- /dev/null +++ b/kfp/kfp_support_lib/shared_workflow_support/README.md @@ -0,0 +1,68 @@ +# Shared Workflow Support + +This provides support for implementing KFP pipelines automating transform's execution. +It comprises 2 main modules + +* [python apiserver client](src/python_apiserver_client/README.md) +* [workflow support](src/runtime_utils/README.md) + +## Development + +### Requirements +1. python 3.10 or later +2. git command line tools +3. [pre-commit](https://pre-commit.com/) +4. 
twine (pip install twine)
+    * but on Mac you may have to include a dir in your PATH, such as `export PATH=$PATH:/Library/Frameworks/Python.framework/Versions/3.10/bin`
+
+### Git
+Simply clone the repo and set up the pre-commit hooks.
+```shell
+git clone git@github.com:IBM/data-prep-kit.git
+cd kfp/kfp_support_lib/shared_workflow_support
+pre-commit install
+```
+If you don't have pre-commit, you can install it from [here](https://pre-commit.com/).
+
+## Library Artifact Build and Publish
+
+The process of creating a release for the `fm_data_processing_kfp` package involves the following steps:
+
+cd to the package directory.
+
+update the version in the [requirements.env](../../requirements.env) file.
+
+run `make build` and `make publish`.
+
+## Testing
+
+To run the package tests, perform the following:
+
+To begin with, establish a Kind cluster and deploy all required components by executing the makefile command in the main directory of this repository. As an alternative, you can manually execute the instructions provided in the [README.md](../../kind/README.md) file.
+
+```bash
+make setup
+```
+
+The next step is to deploy the `data-prep-kit-kfp` package locally within a Python virtual environment.
+
+```bash
+make build
+```
+
+Lastly, execute the tests:
+
+```bash
+make test
+```
+
+### Cleanup
+
+It is advisable to execute the following command prior to running `make test` once more. This will ensure that any
+resources from previous test runs are removed before starting new tests.
+
+```bash
+kubectl delete workflows -n kubeflow --all
+```
+
+This is a copy of the [Kuberay API server-client python APIs](https://github.com/ray-project/kuberay/tree/master/clients/python-apiserver-client).
+Because these APIs are not available on PyPI, we added them to the project.
\ No newline at end of file
diff --git a/kfp/kfp_support_lib/shared_workflow_support/pyproject.toml b/kfp/kfp_support_lib/shared_workflow_support/pyproject.toml
new file mode 100644
index 000000000..2da001c72
--- /dev/null
+++ b/kfp/kfp_support_lib/shared_workflow_support/pyproject.toml
@@ -0,0 +1,48 @@
+[project]
+name = "data_prep_toolkit_kfp_shared"
+version = "0.2.0.dev6"
+requires-python = ">=3.10,<3.12"
+description = "Data Preparation Kit Library. 
KFP support" +license = {text = "Apache-2.0"} +readme = {file = "README.md", content-type = "text/markdown"} +authors = [ + { name = "Boris Lublinsky", email = "blublinsky@ibm.com" }, + { name = "Alexey Roytman", email = "roytman@il.ibm.com" }, + { name = "Mohammad Nassar", email = "Mohammad.Nassar@ibm.com" }, + { name = "Revital Eres", email = "eres@il.ibm.com" }, +] +dependencies = [ + "ray==2.9.3", + "requests", + "kubernetes", + "data_prep_toolkit==0.2.0.dev6", +] + +[build-system] +requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"] +build-backend = "setuptools.build_meta" + +[project.optional-dependencies] +dev = [ + "twine", + "pytest>=7.3.2", + "pytest-dotenv>=0.5.2", + "pytest-env>=1.0.0", + "pre-commit>=3.3.2", + "pytest-cov>=4.1.0", + "pytest-mock>=3.10.0", +] + +[options] +package_dir = ["src"] + +[options.packages.find] +where = ["src/runtime_utils", "src/python_apiserver_client"] + + +[tool.pytest.ini_options] +addopts = "--cov --cov-report term-missing --cov-fail-under 10" +markers = ["unit: unit tests", "integration: integration tests"] + +[tool.coverage.run] +include = ["src/*"] diff --git a/kfp/kfp_support_lib/python_apiserver_client/README.md b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/README.md similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/README.md rename to kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/README.md diff --git a/kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/__init__.py b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/__init__.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/__init__.py rename to kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/__init__.py diff --git a/kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/kuberay_apis.py b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/kuberay_apis.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/kuberay_apis.py rename to kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/kuberay_apis.py diff --git a/kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/__init__.py b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/__init__.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/__init__.py rename to kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/__init__.py diff --git a/kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/cluster.py b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/cluster.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/cluster.py rename to kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/cluster.py diff --git a/kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/environmentvariables.py b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/environmentvariables.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/environmentvariables.py rename to 
kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/environmentvariables.py diff --git a/kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/headnode.py b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/headnode.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/headnode.py rename to kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/headnode.py diff --git a/kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/jobsubmission.py b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/jobsubmission.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/jobsubmission.py rename to kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/jobsubmission.py diff --git a/kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/templates.py b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/templates.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/templates.py rename to kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/templates.py diff --git a/kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/volumes.py b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/volumes.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/volumes.py rename to kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/volumes.py diff --git a/kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/workernode.py b/kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/workernode.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/src/python_apiserver_client/params/workernode.py rename to kfp/kfp_support_lib/shared_workflow_support/src/python_apiserver_client/params/workernode.py diff --git a/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/__init__.py b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/__init__.py new file mode 100644 index 000000000..df34d7ac0 --- /dev/null +++ b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/__init__.py @@ -0,0 +1,2 @@ +from runtime_utils.kfp_utils import KFPUtils +from runtime_utils.remote_jobs_utils import RayRemoteJobs, execute_ray_jobs diff --git a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py similarity index 99% rename from kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py rename to kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py index 0e9951282..2b5e6509b 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/kfp_utils.py +++ b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/kfp_utils.py @@ -126,7 +126,7 @@ def default_compute_execution_params( import sys from data_processing.utils import GB, get_logger - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils logger = get_logger(__name__) diff --git 
a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/remote_jobs_utils.py similarity index 99% rename from kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py rename to kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/remote_jobs_utils.py index 0b20b28c4..4ae7b05b5 100644 --- a/kfp/kfp_support_lib/kfp_v2_workflow_support/src/workflow_support/runtime_utils/remote_jobs_utils.py +++ b/kfp/kfp_support_lib/shared_workflow_support/src/runtime_utils/remote_jobs_utils.py @@ -30,7 +30,7 @@ environment_variables_decoder, volume_decoder, ) -from workflow_support.runtime_utils import KFPUtils +from runtime_utils import KFPUtils from ray.job_submission import JobStatus diff --git a/kfp/kfp_support_lib/python_apiserver_client/test/api_params_test.py b/kfp/kfp_support_lib/shared_workflow_support/test/api_params_test.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/test/api_params_test.py rename to kfp/kfp_support_lib/shared_workflow_support/test/api_params_test.py diff --git a/kfp/kfp_support_lib/kfp_v1_workflow_support/test/configmaps.py b/kfp/kfp_support_lib/shared_workflow_support/test/configmaps.py similarity index 100% rename from kfp/kfp_support_lib/kfp_v1_workflow_support/test/configmaps.py rename to kfp/kfp_support_lib/shared_workflow_support/test/configmaps.py diff --git a/kfp/kfp_support_lib/python_apiserver_client/test/kuberay_api_test.py b/kfp/kfp_support_lib/shared_workflow_support/test/kuberay_api_test.py similarity index 100% rename from kfp/kfp_support_lib/python_apiserver_client/test/kuberay_api_test.py rename to kfp/kfp_support_lib/shared_workflow_support/test/kuberay_api_test.py diff --git a/kfp/kfp_support_lib/kfp_v1_workflow_support/test/ray_remote_jobs_test.py b/kfp/kfp_support_lib/shared_workflow_support/test/ray_remote_jobs_test.py similarity index 98% rename from kfp/kfp_support_lib/kfp_v1_workflow_support/test/ray_remote_jobs_test.py rename to kfp/kfp_support_lib/shared_workflow_support/test/ray_remote_jobs_test.py index ab25573b0..69e1886fb 100644 --- a/kfp/kfp_support_lib/kfp_v1_workflow_support/test/ray_remote_jobs_test.py +++ b/kfp/kfp_support_lib/shared_workflow_support/test/ray_remote_jobs_test.py @@ -12,7 +12,7 @@ from configmaps import ConfigmapsManager from python_apiserver_client.params import ConfigMapVolume -from workflow_support.runtime_utils import RayRemoteJobs +from runtime_utils import RayRemoteJobs server_url = "http://localhost:8080/ray/" diff --git a/transforms/.make.workflows b/transforms/.make.workflows index 4a9d0d0a8..d1a5fe82f 100644 --- a/transforms/.make.workflows +++ b/transforms/.make.workflows @@ -43,7 +43,7 @@ ${WORKFLOW_VENV_ACTIVATE}: ${REPOROOT}/.make.versions ${REPOROOT}/kfp/requiremen rm -rf ${REPOROOT}/transforms/venv $(MAKE) -C ${REPOROOT}/transforms .defaults.python-lib-src-venv . 
${WORKFLOW_VENV_ACTIVATE}; \ - pip install -e $(REPOROOT)/kfp/kfp_support_lib/python_apiserver_client; \ + pip install -e $(REPOROOT)/kfp/kfp_support_lib/shared_workflow_support; \ pip install -e $(REPOROOT)/kfp/kfp_support_lib/$(WORKFLOW_SUPPORT_LIB); @# Help: Create the virtual environment common to all workflows diff --git a/transforms/code/code_quality/kfp_ray/code_quality_wf.py b/transforms/code/code_quality/kfp_ray/code_quality_wf.py index b89f74083..1428ce4c0 100644 --- a/transforms/code/code_quality/kfp_ray/code_quality_wf.py +++ b/transforms/code/code_quality/kfp_ray/code_quality_wf.py @@ -45,7 +45,7 @@ def compute_exec_params_func( cq_tokenizer: str, cq_hf_token: str, ) -> dict: - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils return { "data_s3_config": data_s3_config, @@ -192,7 +192,7 @@ def code_quality( # start Ray cluster ray_cluster = create_ray_op( ray_name=ray_name, - run_id=dsl.RUN_ID_PLACEHOLDER, + run_id=run_id, ray_head_options=ray_head_options, ray_worker_options=ray_worker_options, server_url=server_url, @@ -204,7 +204,7 @@ def code_quality( # Execute job execute_job = execute_ray_jobs_op( ray_name=ray_name, - run_id=dsl.RUN_ID_PLACEHOLDER, + run_id=run_id, additional_params=additional_params, # note that the parameters below are specific for NOOP transform exec_params=compute_exec_params.output, diff --git a/transforms/code/malware/kfp_ray/malware_wf.py b/transforms/code/malware/kfp_ray/malware_wf.py index d0e22643b..507917219 100644 --- a/transforms/code/malware/kfp_ray/malware_wf.py +++ b/transforms/code/malware/kfp_ray/malware_wf.py @@ -44,7 +44,7 @@ def compute_exec_params_func( malware_input_column: str, malware_output_column: str, ) -> dict: - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils return { "data_s3_config": data_s3_config, @@ -174,7 +174,7 @@ def malware( # start Ray cluster ray_cluster = create_ray_op( ray_name=ray_name, - run_id=dsl.RUN_ID_PLACEHOLDER, + run_id=run_id, ray_head_options=ray_head_options, ray_worker_options=ray_worker_options, server_url=server_url, @@ -185,7 +185,7 @@ def malware( # Execute job execute_job = execute_ray_jobs_op( ray_name=ray_name, - run_id=dsl.RUN_ID_PLACEHOLDER, + run_id=run_id, additional_params=additional_params, # note that the parameters below are specific for malware transform exec_params=compute_exec_params.output, diff --git a/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py b/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py index ad256903f..bc9eeff84 100644 --- a/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py +++ b/transforms/code/proglang_select/kfp_ray/proglang_select_wf.py @@ -45,7 +45,7 @@ def compute_exec_params_func( proglang_select_allowed_langs_file: str, proglang_select_language_column: str, ) -> dict: - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils return { "data_s3_config": data_s3_config, @@ -82,6 +82,7 @@ def compute_exec_params_func( else: compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image) run_id = dsl.RUN_ID_PLACEHOLDER + # create Ray cluster create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml") # execute job @@ -178,7 +179,7 @@ def lang_select( # start Ray cluster ray_cluster = create_ray_op( ray_name=ray_name, - run_id=dsl.RUN_ID_PLACEHOLDER, + run_id=run_id, ray_head_options=ray_head_options,
ray_worker_options=ray_worker_options, server_url=server_url, @@ -189,7 +190,7 @@ def lang_select( # Execute job execute_job = execute_ray_jobs_op( ray_name=ray_name, - run_id=dsl.RUN_ID_PLACEHOLDER, + run_id=run_id, additional_params=additional_params, # note that the parameters below are specific for this transform exec_params=compute_exec_params.output, @@ -203,7 +204,7 @@ def lang_select( execute_job.after(ray_cluster) # Configure the pipeline level to one week (in seconds) - dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC) + # dsl.get_pipeline_conf().set_timeout(ONE_WEEK_SEC) if __name__ == "__main__": diff --git a/transforms/universal/doc_id/kfp_ray/doc_id_wf.py b/transforms/universal/doc_id/kfp_ray/doc_id_wf.py index 5cbb3e974..a8ddc6b6e 100644 --- a/transforms/universal/doc_id/kfp_ray/doc_id_wf.py +++ b/transforms/universal/doc_id/kfp_ray/doc_id_wf.py @@ -46,7 +46,7 @@ def compute_exec_params_func( doc_id_hash_column: str, doc_id_int_column: str, ) -> dict: - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils return { "data_s3_config": data_s3_config, diff --git a/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py b/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py index 52749a3b9..f325a34a4 100644 --- a/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py +++ b/transforms/universal/ededup/kfp_ray/src/ededup_compute_execution_params.py @@ -48,7 +48,7 @@ def ededup_compute_execution_params( from data_processing.data_access import DataAccessS3 from data_processing.utils import GB, KB - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils EXECUTION_OF_KB_DOC = 0.00025 diff --git a/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py b/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py index f50bf0bdf..7e5c66b45 100644 --- a/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py +++ b/transforms/universal/fdedup/kfp_ray/src/fdedup_compute_execution_params.py @@ -74,7 +74,7 @@ def fdedup_compute_execution_params( from data_processing.data_access import DataAccessS3 from data_processing.utils import GB, KB from scipy.integrate import quad as integrate - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils EXECUTION_OF_KB_DOC = 0.003 diff --git a/transforms/universal/filter/kfp_ray/filter_wf.py b/transforms/universal/filter/kfp_ray/filter_wf.py index 90d2b197b..b398652f6 100644 --- a/transforms/universal/filter/kfp_ray/filter_wf.py +++ b/transforms/universal/filter/kfp_ray/filter_wf.py @@ -9,13 +9,13 @@ # See the License for the specific language governing permissions and # limitations under the License.
################################################################################ - import os +from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils + import kfp.compiler as compiler import kfp.components as comp import kfp.dsl as dsl -from workflow_support.compile_utils import ONE_HOUR_SEC, ONE_WEEK_SEC, ComponentUtils # the name of the job script @@ -45,7 +45,7 @@ def compute_exec_params_func( filter_logical_operator: str, filter_columns_to_drop: str, ) -> dict: - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils return { "data_s3_config": data_s3_config, @@ -181,7 +181,7 @@ def filtering( # start Ray cluster ray_cluster = create_ray_op( ray_name=ray_name, - run_id=dsl.RUN_ID_PLACEHOLDER, + run_id=run_id, ray_head_options=ray_head_options, ray_worker_options=ray_worker_options, server_url=server_url, @@ -193,7 +193,7 @@ def filtering( # Execute job execute_job = execute_ray_jobs_op( ray_name=ray_name, - run_id=dsl.RUN_ID_PLACEHOLDER, + run_id=run_id, additional_params=additional_params, exec_params=compute_exec_params.output, exec_script_name=EXEC_SCRIPT_NAME, diff --git a/transforms/universal/noop/kfp_ray/noop_multiple_wf.py b/transforms/universal/noop/kfp_ray/noop_multiple_wf.py index 67b4aead0..5e39c3249 100644 --- a/transforms/universal/noop/kfp_ray/noop_multiple_wf.py +++ b/transforms/universal/noop/kfp_ray/noop_multiple_wf.py @@ -42,9 +42,7 @@ def compute_exec_params_func( runtime_code_location: str, noop_sleep_sec: int, ) -> dict: - import uuid - - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils return { "data_s3_config": data_s3_config, diff --git a/transforms/universal/noop/kfp_ray/noop_wf.py b/transforms/universal/noop/kfp_ray/noop_wf.py index 8748a60ca..87a73059e 100644 --- a/transforms/universal/noop/kfp_ray/noop_wf.py +++ b/transforms/universal/noop/kfp_ray/noop_wf.py @@ -42,7 +42,7 @@ def compute_exec_params_func( runtime_code_location: str, noop_sleep_sec: int, ) -> dict: - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils return { "data_s3_config": data_s3_config, diff --git a/transforms/universal/tokenization/kfp_ray/tokenization_wf.py b/transforms/universal/tokenization/kfp_ray/tokenization_wf.py index 65b32f4bf..21af91374 100644 --- a/transforms/universal/tokenization/kfp_ray/tokenization_wf.py +++ b/transforms/universal/tokenization/kfp_ray/tokenization_wf.py @@ -48,7 +48,7 @@ def compute_exec_params_func( tkn_text_lang: str, tkn_chunk_size: int, ) -> dict: - from workflow_support.runtime_utils import KFPUtils + from runtime_utils import KFPUtils return { "data_s3_config": data_s3_config,