Skip to content

Commit

Permalink
Merge branch 'dev' into crawler-transform
Browse files Browse the repository at this point in the history
  • Loading branch information
touma-I committed Nov 13, 2024
2 parents 80e4ebe + f982ebe commit cf20268
Show file tree
Hide file tree
Showing 39 changed files with 111 additions and 115 deletions.
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
REPOROOT=../../
REPOROOT=../../../
# Use make help, to see the available rules
include ${REPOROOT}/.make.defaults

Expand All @@ -16,11 +16,11 @@ else
endif

.PHONY: workflow-build
setup:
workflow-build:
ifeq ($(KFPv2), 1)
echo "Skipping build as KFPv2 is defined"
else
$(MAKE) -C ray/kfp_v1 setup
$(MAKE) -C ray/kfp_v1 workflow-build
endif

.PHONY: workflow-test
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
REPOROOT=${CURDIR}/../../../..
REPOROOT=${CURDIR}/../../../../../
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.workflows

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@
# empty comment to triigger pre-commit
# Components
# For every sub workflow we need a separate components, that knows about this subworkflow.
component_spec_path = "../../../kfp_ray_components/"
component_spec_path = "../../../../../kfp/kfp_ray_components/"
run_code_to_parquet_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_code_quality_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_malware_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_license_check_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_license_select_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_header_cleanser_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_proglang_select_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand All @@ -35,7 +35,7 @@
proglang_select_image = "quay.io/dataprep1/data-prep-kit/proglang_select-ray:latest"
code_quality_image = "quay.io/dataprep1/data-prep-kit/code_quality-ray:latest"
malware_image = "quay.io/dataprep1/data-prep-kit/malware-ray:latest"
license_check_image = "quay.io/dataprep1/data-prep-kit/license_check-ray:latest"
license_select_image = "quay.io/dataprep1/data-prep-kit/license_select-ray:latest"
header_cleanser_image = "quay.io/dataprep1/data-prep-kit/header-cleanser-ray:latest"
doc_id_image = "quay.io/dataprep1/data-prep-kit/doc_id-ray:latest"
ededup_image = "quay.io/dataprep1/data-prep-kit/ededup-ray:latest"
Expand All @@ -50,27 +50,27 @@
)
def sample_code_ray_orchestrator(
# the super pipeline parameters
p1_orch_code_to_parquet_name: str = "code_2_parquet_wf",
p1_orch_code_to_parquet_name: str = "code2parquet_wf",
p1_orch_code_quality_name: str = "code_quality_wf",
p1_orch_malware_name: str = "malware_wf",
p1_orch_license_check_name: str = "license_check_wf",
p1_orch_license_select_name: str = "license_select_wf",
p1_orch_header_cleanser_name: str = "header_cleanser_wf",
p1_orch_proglang_select_name: str = "proglang_select_wf",
p1_orch_doc_id_name: str = "doc_id_wf",
p1_orch_exact_dedup_name: str = "ededup_wf",
p1_orch_fuzzy_dedup_name: str = "fdedup_wf",
p1_orch_tokenization_wf_name: str = "tokenization_wf",
p2_pipeline_runtime_pipeline_id: str = "pipeline_id",
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": ""},
p2_pipeline_ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""},
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_input_parent_path: str = "test/code2parquet/output/",
p2_pipeline_input_parent_path: str = "test/code2parquet/input/",
p2_pipeline_output_parent_path: str = "test/super/output/",
p2_pipeline_parent_path_suffix: str = "",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_data_s3_access_secret: str = "s3-secret",
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
p2_pipeline_runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
p2_pipeline_runtime_actor_options: dict = {'num_cpus': 0.7},
p2_pipeline_data_max_files: int = -1,
p2_pipeline_data_num_samples: int = -1,
# code to parquet step parameters
Expand Down Expand Up @@ -186,16 +186,16 @@ def sample_code_ray_orchestrator(
+ malware_image
+ '"}}',
# license check step parameters
p10_name: str = "license_check",
p10_name: str = "license_select",
p10_skip: bool = False,
p10_lc_license_column_name: str = "license",
p10_lc_licenses_file: str = "test/license_check/sample_approved_licenses.json",
p10_lc_licenses_file: str = "test/license_select/sample_approved_licenses.json",
# orchestrator
# overriding parameters
p10_overriding_params: str = '{"ray_worker_options": {"image": "'
+ license_check_image
+ license_select_image
+ '"}, "ray_head_options": {"image": "'
+ license_check_image
+ license_select_image
+ '"}}',
# header cleanser step parameters
p11_name: str = "header_cleanser",
Expand Down Expand Up @@ -246,7 +246,7 @@ def _set_component(op: dsl.BaseOp, displaied_name: str, prev_op: dsl.BaseOp = No
op.after(prev_op)

# code to parquet deduplication
code_to_parquet = run_exact_dedup_op(
code_to_parquet = run_code_to_parquet_op(
name=p1_orch_code_to_parquet_name,
prefix="p3_",
params=args,
Expand Down Expand Up @@ -298,25 +298,25 @@ def _set_component(op: dsl.BaseOp, displaied_name: str, prev_op: dsl.BaseOp = No
_set_component(malware, "malware", code_quality)

# license check
license_check = run_license_check_op(
name=p1_orch_license_check_name, prefix="p10_", params=args, host=orch_host, input_folder=malware.output
license_select = run_license_select_op(
name=p1_orch_license_select_name, prefix="p10_", params=args, host=orch_host, input_folder=malware.output
)
_set_component(license_check, "license_check", malware)
_set_component(license_select, "license_select", malware)

# header cleanser
header_cleanser = run_header_cleanser_op(
name=p1_orch_header_cleanser_name,
prefix="p11_",
params=args,
host=orch_host,
input_folder=license_check.output,
input_folder=license_select.output,
)
_set_component(header_cleanser, "header_cleanser", license_check)
_set_component(header_cleanser, "header_cleanser", license_select)

# tokenization
tokenization = run_tokenization_op(
name=p1_orch_tokenization_wf_name,
prefix="p10_",
prefix="p12_",
params=args,
host=orch_host,
input_folder=header_cleanser.output,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,11 @@
import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl
from kfp_support.workflow_support.runtime_utils import ONE_WEEK_SEC

from workflow_support.compile_utils import ONE_WEEK_SEC

# Components
# path to kfp component specifications files
component_spec_path = "../../../kfp_ray_components/"
component_spec_path = "../../../../../kfp/kfp_ray_components/"
# For every sub workflow we need a separate components, that knows about this subworkflow.
run_doc_id_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
run_exact_dedup_op = comp.load_component_from_file(component_spec_path + "executeSubWorkflowComponent.yaml")
Expand All @@ -39,16 +38,16 @@ def sample_ray_orchestrator(
p1_orch_exact_dedup_name: str = "ededup_wf",
p1_orch_fuzzy_dedup_name: str = "fdedup_wf",
p2_pipeline_runtime_pipeline_id: str = "pipeline_id",
p2_pipeline_ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""}',
p2_pipeline_ray_head_options: dict = {"cpu": 1, "memory": 4, "image_pull_secret": ""},
p2_pipeline_ray_worker_options: dict = {"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, "image_pull_secret": ""},
p2_pipeline_server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
p2_pipeline_input_parent_path: str = "test/doc_id/input/",
p2_pipeline_output_parent_path: str = "test/super/output/",
p2_pipeline_parent_path_suffix: str = "",
p2_pipeline_additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5, "delete_cluster_delay_minutes": 0}',
p2_pipeline_data_s3_access_secret: str = "s3-secret",
p2_pipeline_runtime_code_location: str = '{"github": "github", "commit_hash": "12345", "path": "path"}',
p2_pipeline_runtime_actor_options: str = '{"num_cpus": 0.8}',
p2_pipeline_runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
p2_pipeline_runtime_actor_options: dict = {'num_cpus': 0.7},
# data access.
p2_pipeline_data_max_files: int = -1,
p2_pipeline_data_num_samples: int = -1,
Expand Down
File renamed without changes.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ Another useful feature of the KFP v2 is the `Json` editor for the `dict` type in

### How to compile the superpipeline
```
cd kfp/superworkflows/ray/kfp_v2/
cd examples/kfp/superworkflows/ray/kfp_v2/
make clean
export KFPv2=1
export PYTHONPATH=../../../../transforms
Expand Down
4 changes: 2 additions & 2 deletions kfp/doc/multi_transform_pipeline.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ The sections that follow display two super pipelines as examples:

### Dedups super pipeline <a name = "dedups"></a>

This pipeline combines several transforms, `doc_id`, `ededup`, and `fdedup`, can be found in [superworkflow_dedups_sample_wf.py](../superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py).
This pipeline combines several transforms, `doc_id`, `ededup`, and `fdedup`, can be found in [superworkflow_dedups_sample_wf.py](../../examples/kfp/superworkflows/ray/kfp_v1/superworkflow_dedups_sample_wf.py).

![super pipeline](super_pipeline.png)

The input parameters of the super pipelines are described in this [section](#super-pipeline-Input-Parameters).

### Programming languages super pipeline <a name = "code"></a>

This pipeline combines transforms for programming languages data preprocessing: `ededup`, `doc_id`, `fdedup`, `proglang_select`, `code_quality`, `malware` and `tokenization`. It can be found in [superworkflow_code_wf.py](../superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py).
This pipeline combines transforms for programming languages data preprocessing: `ededup`, `doc_id`, `fdedup`, `proglang_select`, `code_quality`, `malware` and `tokenization`. It can be found in [superworkflow_code_wf.py](../../examples/kfp/superworkflows/ray/kfp_v1/superworkflow_code_sample_wf.py).

![super pipeline](super-code-pipeline.png)

Expand Down
6 changes: 1 addition & 5 deletions kfp/kfp_ray_components/src/subworkflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ def _skip_task(prms) -> bool:
input_folder = input_folder.replace('"', "'")
output_folder = output_folder.replace('"', "'")
data_s3_config = {"input_folder": input_folder, "output_folder": output_folder}
prm["data_s3_config"] = data_s3_config
prm["data_s3_config"] = ParamsUtils.convert_to_ast(data_s3_config)
# Check if to skip preprocessing
if _skip_task(prm):
print("skipped preprocess step")
Expand All @@ -150,10 +150,6 @@ def _skip_task(prms) -> bool:

_remove_unused_params(prm)

for key, value in prm.items():
if isinstance(value, dict):
prm[key] = ParamsUtils.convert_to_ast(value)

print(f"start pipeline {name} with parameters {prm}")

utils = PipelinesUtils(host="http://ml-pipeline:8888")
Expand Down
1 change: 1 addition & 0 deletions resources.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
1. **"Building Successful LLM Apps: The Power of high quality data"** - [Video](https://www.youtube.com/watch?v=u_2uiZBBVIE) | [Slides](https://www.slideshare.net/slideshow/data_prep_techniques_challenges_methods-pdf-a190/271527890)
2. **"Hands on session for fine tuning LLMs"** - [Video](https://www.youtube.com/watch?v=VEHIA3E64DM)
3. **"Build your own data preparation module using data-prep-kit"** - [Video](https://www.youtube.com/watch?v=0WUMG6HIgMg)
4. **"Data Prep Kit: A Comprehensive Cloud-Native Toolkit for Scalable Data Preparation in GenAI App"** - [Video](https://www.youtube.com/watch?v=WJ147TGULwo) | [Slides](https://ossaidevjapan24.sched.com/event/1jKBm)

## Example Code

Expand Down
2 changes: 1 addition & 1 deletion scripts/check-workflows.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ if [ ! -d transforms ]; then
echo Please run this script from the top of the repository
exit 1
fi
KFP_BLACK_LIST="doc_chunk pdf2parquet pii_redactor text_encoder license_select repo_level_ordering"
KFP_BLACK_LIST="doc_chunk pdf2parquet pii_redactor text_encoder license_select repo_level_ordering header_cleanser"
while [ $# -ne 0 ]; do
case $1 in
-show-kfp-black-list) echo $KFP_BLACK_LIST; exit 0;
Expand Down
2 changes: 2 additions & 0 deletions scripts/k8s-setup/populate_minio.sh
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ mc cp --recursive ${REPOROOT}/transforms/code/proglang_select/ray/test-data/lang
mc cp --recursive ${REPOROOT}/transforms/code/malware/ray/test-data/input/ kfp/test/malware/input
mc cp --recursive ${REPOROOT}/transforms/code/header_cleanser/ray/test-data/input/ kfp/test/header_cleanser/input
mc cp --recursive ${REPOROOT}/transforms/code/repo_level_ordering/ray/test-data/input/ kfp/test/repo_level_ordering/input
mc cp --recursive ${REPOROOT}/transforms/code/license_select/ray/test-data/input/ kfp/test/license_select/input
mc cp --recursive ${REPOROOT}/transforms/code/license_select/ray/test-data/sample_approved_licenses.json kfp/test/license_select/
# language
mc cp --recursive ${REPOROOT}/transforms/language/lang_id/ray/test-data/input/ kfp/test/lang_id/input
mc cp --recursive ${REPOROOT}/transforms/language/doc_quality/ray/test-data/input/ kfp/test/doc_quality/input
Expand Down
2 changes: 1 addition & 1 deletion scripts/k8s-setup/ray_api_server_values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ replicaCount: 1
name: "kuberay-apiserver"
image:
repository: quay.io/kuberay/apiserver
tag: nightly
tag: v1.2.2
pullPolicy: IfNotPresent

## Install Default RBAC roles and bindings
Expand Down
4 changes: 2 additions & 2 deletions transforms/code/code_profiler/python/requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ asttokens==2.4.1
attrs==23.2.0
blinker==1.8.2
cachetools==5.3.3
certifi==2024.6.2
certifi==2024.7.4
charset-normalizer==3.3.2
click==8.1.7
comm==0.2.2
Expand Down Expand Up @@ -83,7 +83,7 @@ streamlit==1.37.0
tenacity==8.4.2
toml==0.10.2
toolz==0.12.1
tornado==6.4
tornado==6.4.1
traitlets==5.14.3
tree-sitter==0.21.3
tree-sitter-cpp==0.22.1
Expand Down
10 changes: 8 additions & 2 deletions transforms/code/code_profiler/python/src/UAST_parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,9 @@
import json
from tree_sitter import Tree
import os
import sys
sys.setrecursionlimit(10000)

"""
Initialize the parser with a path for rules and grammar.
"""
Expand Down Expand Up @@ -251,7 +254,10 @@ def _dfs(self, AST_node, parent) :
parent = node

for child in AST_node.children:
self._dfs(AST_node= child, parent = parent)
try:
self._dfs(AST_node= child, parent = parent)
except RecursionError as e:
print(f"RecursionError caught: {str(e)}")

def _extract(self, ast_snippet, node_type, exec_string):
code_snippet = ast_snippet
Expand All @@ -262,4 +268,4 @@ def _extract(self, ast_snippet, node_type, exec_string):
try:
return self.grammar[node_type]["keyword"] + " " + self.extracted
except Exception as e:
print(e)
print(e)
6 changes: 3 additions & 3 deletions transforms/code/license_select/kfp_ray/license_select_wf.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def compute_exec_params_func(
data_num_samples: int,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
runtime_code_location: dict,
lc_license_column_name: str,
lc_licenses_file: str,
) -> dict:
Expand Down Expand Up @@ -117,9 +117,9 @@ def license_select(
data_max_files: int = -1,
data_num_samples: int = -1,
# orchestrator
runtime_actor_options: dict = {"num_cpus": 0.8},
runtime_actor_options: dict = {"num_cpus": 0.7},
runtime_pipeline_id: str = "runtime_pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
runtime_code_location: dict = {'github': 'github', 'commit_hash': '12345', 'path': 'path'},
# license select parameters
lc_license_column_name: str = "license",
lc_licenses_file: str = "test/license_select/sample_approved_licenses.json",
Expand Down
2 changes: 0 additions & 2 deletions transforms/code/license_select/python/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@ include $(REPOROOT)/transforms/.make.transforms
# Include the common configuration for this transform
include ../transform.config

DOCKER_IMAGE_VERSION=${LICENSE_SELECT_PYTHON_VERSION}

# Use default rule inherited from makefile.common
clean:: .transforms.clean

Expand Down
2 changes: 0 additions & 2 deletions transforms/code/license_select/ray/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ include ../transform.config

BASE_IMAGE=$(RAY_BASE_IMAGE)

DOCKER_IMAGE_VERSION=${LICENSE_SELECT_RAY_VERSION}

# Use default rule inherited from makefile.common
clean:: .transforms.clean

Expand Down
Loading

0 comments on commit cf20268

Please sign in to comment.