Skip to content

Commit

Permalink
Fix parameter types for the PII transform pipeline.
Browse files Browse the repository at this point in the history
Signed-off-by: Mohammad Nassar <[email protected]>
  • Loading branch information
Mohammad-nassar10 committed Aug 20, 2024
1 parent 63964d6 commit 3fc27a1
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 22 deletions.
21 changes: 10 additions & 11 deletions kfp/pipeline_generator/single-pipeline/templates/simple_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,14 @@
# Compute execution parameters. Different transforms might need different implementations, so
# instead of creating a separate component, we define the function in place here.
def compute_exec_params_func(
worker_options: str,
actor_options: str,
worker_options: dict,
actor_options: dict,
data_s3_config: str,
data_max_files: int,
data_num_samples: int,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
runtime_code_location: dict,
{%- for pipeline_argument in pipeline_arguments %}
{{ pipeline_argument.name }}: {{ pipeline_argument.type }},
{%- endfor %}
Expand All @@ -50,11 +50,11 @@ def compute_exec_params_func(
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options),
"runtime_worker_options": actor_options,
"runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)),
"runtime_worker_options": str(actor_options),
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": runtime_code_location,
"runtime_code_location": str(runtime_code_location),
{%- for pipeline_argument in pipeline_arguments %}
"{{ pipeline_argument.name }}": {{ pipeline_argument.name }},
{%- endfor %}
Expand Down Expand Up @@ -102,9 +102,8 @@ def compute_exec_params_func(
def {{ pipeline_name }}(
# Ray cluster
ray_name: str = "{{ pipeline_name }}-kfp-ray",
ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": "", "image": "' + task_image + '" }',
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, '
'"image_pull_secret": "{{ image_pull_secret }}", "image": "' + task_image + '"}',
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
{% if multi_s3 == False %}
Expand All @@ -116,9 +115,9 @@ def {{ pipeline_name }}(
data_max_files: int = -1,
data_num_samples: int = -1,
# orchestrator
runtime_actor_options: str = "{'num_cpus': 0.8}",
runtime_actor_options: dict = {'num_cpus': 0.8},
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
# {{ pipeline_name }} parameters
{%- for pipeline_argument in pipeline_arguments %}
{{ pipeline_argument.name }}: {{ pipeline_argument.type }}{% if pipeline_argument.value is defined %}{% if pipeline_argument.type == "int" %} = {{ pipeline_argument.value }}{% else %} = "{{ pipeline_argument.value }}"{% endif %}{% endif %},
Expand Down
21 changes: 10 additions & 11 deletions transforms/language/pii_redactor/kfp_ray/pii_redactor_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
# Compute execution parameters. Different transforms might need different implementations, so
# instead of creating a separate component, we define the function in place here.
def compute_exec_params_func(
worker_options: str,
actor_options: str,
worker_options: dict,
actor_options: dict,
data_s3_config: str,
data_max_files: int,
data_num_samples: int,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
runtime_code_location: dict,
pii_redactor_contents: str,
) -> dict:
from runtime_utils import KFPUtils
Expand All @@ -47,11 +47,11 @@ def compute_exec_params_func(
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options),
"runtime_worker_options": actor_options,
"runtime_num_workers": KFPUtils.default_compute_execution_params(str(worker_options), str(actor_options)),
"runtime_worker_options": str(actor_options),
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": runtime_code_location,
"runtime_code_location": str(runtime_code_location),
"pii_redactor_contents": pii_redactor_contents,
}

Expand Down Expand Up @@ -98,19 +98,18 @@ def pii_redactor(
# Ray cluster
ray_name: str = "pii-redactor-kfp-ray", # name of Ray cluster
# Add image_pull_secret and image_pull_policy to ray workers if needed
ray_head_options: str = '{"cpu": 1, "memory": 4, "image": "' + task_image + '" }',
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, '
'"image": "' + task_image + '"}',
ray_head_options: dict = {"cpu": 1, "memory": 4, "image": task_image},
ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image": task_image},
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "{'input_folder': 'test/pii_redactor/input/', 'output_folder': 'test/pii_redactor/output/'}",
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
# orchestrator
runtime_actor_options: str = "{'num_cpus': 0.8}",
runtime_actor_options: dict = {'num_cpus': 0.8},
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
# pii_redactor parameters
pii_redactor_contents: str = "title",
# additional parameters
Expand Down

0 comments on commit 3fc27a1

Please sign in to comment.