Skip to content

Commit

Permalink
Merge pull request #612 from revit13/timeout
Browse files Browse the repository at this point in the history
Add delay before deleting ray cluster in kfp component.
  • Loading branch information
roytman authored Sep 23, 2024
2 parents c79e88a + 214c032 commit 2b0c7a7
Show file tree
Hide file tree
Showing 36 changed files with 102 additions and 90 deletions.
6 changes: 3 additions & 3 deletions kfp/doc/simple_transform_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ The input parameters section defines all the parameters required for the pipelin
# noop parameters
noop_sleep_sec: int = 10,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
```

The parameters used here are as follows:
Expand Down Expand Up @@ -146,8 +146,8 @@ component execution and parameters submitted to every component.

```python
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=dsl.RUN_ID_PLACEHOLDER, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
3 changes: 3 additions & 0 deletions kfp/kfp_ray_components/deleteRayClusterComponent.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ inputs:
- { name: ray_name, type: String, description: "Ray cluster user name" }
- { name: run_id, type: String, description: "The KFP Run ID" }
- { name: server_url, type: String, default: "", description: "url of api server" }
- { name: additional_params, type: String, default: "{}", description: "additional parameters" }

implementation:
container:
Expand All @@ -24,4 +25,6 @@ implementation:
{ inputValue: run_id },
--server_url,
{ inputValue: server_url },
--additional_params,
{ inputValue: additional_params },
]
7 changes: 7 additions & 0 deletions kfp/kfp_ray_components/src/delete_ray_cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
# limitations under the License.
################################################################################
import sys
import time

from runtime_utils import KFPUtils, RayRemoteJobs

Expand All @@ -18,13 +19,17 @@
def cleanup_ray_cluster(
name: str, # name of Ray cluster
server_url: str, # url of api server
additional_params: str, # additional parameters for
):
# get current namespace
ns = KFPUtils.get_namespace()
if ns == "":
print(f"Failed to get namespace")
sys.exit(1)

dict_params = KFPUtils.load_from_json(additional_params.replace("'", '"'))
delete_cluster_delay_minutes = dict_params.get("delete_cluster_delay_minutes", 0)
time.sleep(delete_cluster_delay_minutes * 60)
# cleanup
remote_jobs = RayRemoteJobs(server_url=server_url)
status, error = remote_jobs.delete_ray_cluster(name=name, namespace=ns)
Expand All @@ -38,6 +43,7 @@ def cleanup_ray_cluster(
parser.add_argument("-rn", "--ray_name", type=str, default="")
parser.add_argument("-id", "--run_id", type=str, default="")
parser.add_argument("-su", "--server_url", default="", type=str)
parser.add_argument("-ap", "--additional_params", default="{}", type=str)
args = parser.parse_args()

cluster_name = KFPUtils.runtime_name(
Expand All @@ -48,4 +54,5 @@ def cleanup_ray_cluster(
cleanup_ray_cluster(
name=cluster_name,
server_url=args.server_url,
additional_params=args.additional_params,
)
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def {{ pipeline_name }}(
{{ pipeline_argument.name }}: {{ pipeline_argument.type }}{% if pipeline_argument.value is defined %}{% if pipeline_argument.type == "int" %} = {{ pipeline_argument.value }}{% else %} = "{{ pipeline_argument.value }}"{% endif %}{% endif %},
{%- endfor %}
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute {{ pipeline_name }} transform
Expand Down Expand Up @@ -164,8 +164,8 @@ def {{ pipeline_name }}(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def super_pipeline(
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_input_parent_path: str = "test/doc_id/input/",
p2_pipeline_output_parent_path: str = "test/super/output/",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ def super_pipeline(
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
__add_p2_parameters__

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def super_pipeline(
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
{%- for p2_parameter in add_p2_parameters %}
p2_pipeline_{{ p2_parameter.name }}: {{ p2_parameter.type }}{% if p2_parameter.value is defined %}{% if p2_parameter.type == "str" %} = "{{ p2_parameter.value }}"{% else %} = {{ p2_parameter.value }}{% endif %}{% endif %},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def sample_code_ray_orchestrator(
p2_pipeline_input_parent_path: str = "test/code2parquet/output/",
p2_pipeline_output_parent_path: str = "test/super/output/",
p2_pipeline_parent_path_suffix: str = "",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_data_s3_access_secret: str = "s3-secret",
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ def sample_ray_orchestrator(
p2_pipeline_input_parent_path: str = "test/doc_id/input/",
p2_pipeline_output_parent_path: str = "test/super/output/",
p2_pipeline_parent_path_suffix: str = "",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_data_s3_access_secret: str = "s3-secret",
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ def super_pipeline(
p1_pipeline_input_path: str = "test/doc_id/input/",
p1_pipeline_output_path: str = "test/super/output/",
p1_pipeline_intermediate_path: str = "test/super/output/tmp",
p1_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
p1_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p1_pipeline_data_s3_access_secret: str = "s3-secret",
p1_pipeline_runtime_code_location: dict = {"github": "github", "commit_hash": "12345", "path": "path"},
p1_pipeline_runtime_actor_options: dict = {'num_cpus': 0.8},
Expand Down
6 changes: 3 additions & 3 deletions transforms/code/code2parquet/kfp_ray/code2parquet_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def code2parquet(
code2parquet_snapshot: str = "github",
code2parquet_s3_access_secret: str = "s3-secret",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
) -> None:
"""
Pipeline to execute NOOP transform
Expand Down Expand Up @@ -171,8 +171,8 @@ def code2parquet(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
6 changes: 3 additions & 3 deletions transforms/code/code_quality/kfp_ray/code_quality_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def code_quality(
cq_tokenizer: str = "codeparrot/codeparrot",
cq_hf_token: str = "None",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute Code Quality transform
Expand Down Expand Up @@ -164,8 +164,8 @@ def code_quality(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
6 changes: 3 additions & 3 deletions transforms/code/header_cleanser/kfp_ray/header_cleanser_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def header_cleanser(
header_cleanser_license: bool = True,
header_cleanser_copyright: bool = True,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 800, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 800, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute Header Cleanser transform
Expand Down Expand Up @@ -160,8 +160,8 @@ def header_cleanser(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
6 changes: 3 additions & 3 deletions transforms/code/malware/kfp_ray/malware_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ def malware(
malware_input_column: str = "contents",
malware_output_column: str = "virus_detection",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute malware transform
Expand Down Expand Up @@ -153,8 +153,8 @@ def malware(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
6 changes: 3 additions & 3 deletions transforms/code/proglang_select/kfp_ray/proglang_select_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ def lang_select(
proglang_select_language_column: str = "language",
proglang_select_s3_access_secret: str = "s3-secret",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
) -> None:
"""
Pipeline to execute NOOP transform
Expand Down Expand Up @@ -158,8 +158,8 @@ def lang_select(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def repo_level_order(
repo_lvl_output_by_langs: bool = False,
repo_lvl_combine_rows: bool = False,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute repo_level_order transform
Expand Down Expand Up @@ -191,8 +191,8 @@ def repo_level_order(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def doc_chunk(
doc_chunk_output_chunk_column_name: str = "contents",
doc_chunk_dl_min_chunk_len: int = 64,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute chunk documents transform
Expand Down Expand Up @@ -161,8 +161,8 @@ def doc_chunk(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
6 changes: 3 additions & 3 deletions transforms/language/doc_chunk/kfp_ray/doc_chunk_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def doc_chunk(
doc_chunk_output_chunk_column_name: str = "contents",
doc_chunk_dl_min_chunk_len: int = 64,
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
):
"""
Pipeline to execute chunk documents transform
Expand Down Expand Up @@ -162,8 +162,8 @@ def doc_chunk(
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url, additional_params=additional_params)
ComponentUtils.add_settings_to_component(clean_up_task, ONE_HOUR_SEC * 2)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
Expand Down
Loading

0 comments on commit 2b0c7a7

Please sign in to comment.