Skip to content

Commit

Permalink
Merge pull request #256 from dtsuzuku-ibm/add-language_id
Browse files Browse the repository at this point in the history
add language identification transform module
  • Loading branch information
daw3rd authored Jun 11, 2024
2 parents 66c0fc0 + 50349d5 commit e571be2
Show file tree
Hide file tree
Showing 46 changed files with 1,953 additions and 2 deletions.
3 changes: 2 additions & 1 deletion .make.versions
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ NOOP_PYTHON_VERSION=0.9.0$(RELEASE_VERSION_SUFFIX)
NOOP_RAY_VERSION=0.9.0$(RELEASE_VERSION_SUFFIX)
NOOP_SPARK_VERSION=0.2.0$(RELEASE_VERSION_SUFFIX)
RESIZE_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)
LANG_ID_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)
LANG_ID_PYTHON_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)
LANG_ID_RAY_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)
TOKENIZATION_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)
MALWARE_VERSION=0.5.0$(RELEASE_VERSION_SUFFIX)
PROGLANG_SELECT_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX)
Expand Down
2 changes: 1 addition & 1 deletion data-processing-lib/doc/transform-s3-testing.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ mc cp --recursive code/proglang_select/test-data/languages/ local/test/proglang_
mc cp --recursive code/malware/test-data/input/ local/test/malware/input

mc cp --recursive language/doc_quality/test-data/input/ local/test/doc_quality/input
mc cp --recursive language/language_id/test-data/input/ local/test/language_id/input
mc cp --recursive language/lang_id/ray/test-data/input/ local/test/lang_id/input

mc cp --recursive universal/blocklist/test-data/input/ local/test/blocklist/input
mc cp --recursive universal/blocklist/test-data/domains/ local/test/blocklist/domains
Expand Down
2 changes: 2 additions & 0 deletions kind/hack/populate_minio.sh
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ mc cp --recursive ${ROOT_DIR}/../transforms/code/ingest2parquet/ray/test-data/la
mc cp --recursive ${ROOT_DIR}/../transforms/code/proglang_select/ray/test-data/input/ kfp/test/proglang_select/input
mc cp --recursive ${ROOT_DIR}/../transforms/code/proglang_select/ray/test-data/languages/ kfp/test/proglang_select/languages
mc cp --recursive ${ROOT_DIR}/../transforms/code/malware/ray/test-data/input/ kfp/test/malware/input
# language
mc cp --recursive ${ROOT_DIR}/../transforms/language/lang_id/ray/test-data/input/ kfp/test/lang_id/input
# universal
mc cp --recursive ${ROOT_DIR}/../transforms/universal/doc_id/ray/test-data/input/ kfp/test/doc_id/input
mc cp --recursive ${ROOT_DIR}/../transforms/universal/ededup/ray/test-data/input/ kfp/test/ededup/input
Expand Down
66 changes: 66 additions & 0 deletions transforms/language/lang_id/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
REPOROOT=../../..
# Use make help, to see the available rules
include $(REPOROOT)/.make.defaults

setup::
@# Help: Recursively make $@ all subdirs
$(MAKE) RULE=$@ .recurse

clean::
@# Help: Recursively make $@ all subdirs
$(MAKE) RULE=$@ .recurse

build::
@# Help: Recursively make $@ in subdirs
$(MAKE) RULE=$@ .recurse
venv::
@# Help: Recursively make $@ in subdirs
$(MAKE) RULE=$@ .recurse

image::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

set-versions::
@# Help: Recursively $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

publish::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test-image::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test::
@# Help: Recursively make $@ in all subdirs
@$(MAKE) RULE=$@ .recurse

test-src::
@# Help: Recursively make $@ in all subdirs
$(MAKE) RULE=$@ .recurse

load-image::
@# Help: Recursively make $@ in all subdirs
$(MAKE) RULE=$@ .recurse

.PHONY: workflow-venv
workflow-venv:
$(MAKE) -C kfp_ray/v1 workflow-venv

.PHONY: workflow-build
workflow-build:
$(MAKE) -C kfp_ray/v1 workflow-build

.PHONY: workflow-test
workflow-test:
$(MAKE) -C kfp_ray/v1 workflow-test

.PHONY: workflow-upload
workflow-upload:
$(MAKE) -C kfp_ray/v1 workflow-upload

.PHONY: workflow-reconcile-requirements
workflow-reconcile-requirements:
$(MAKE) -C kfp_ray/v1 workflow-reconcile-requirements
12 changes: 12 additions & 0 deletions transforms/language/lang_id/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Language Identification Transform
The Language Identification transforms serves as a simple exemplar to demonstrate the development
of a simple 1:1 transform. Per the set of
[transform project conventions](../../README.md#transform-project-conventions)
the following runtimes are available:

* [python](python/README.md) - provides the base python-based transformation
implementation.
* [ray](ray/README.md) - enables the running of the base python transformation
in a Ray runtime
* [kfp](kfp_ray/README.md) - enables running the ray docker image for
noop in a kubernetes cluster using a generated `yaml` file.
54 changes: 54 additions & 0 deletions transforms/language/lang_id/kfp_ray/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
REPOROOT=${CURDIR}/../../../../
WORKFLOW_VENV_ACTIVATE=${REPOROOT}/transforms/venv/bin/activate
include $(REPOROOT)/transforms/.make.workflows

SRC_DIR=${CURDIR}/../ray/

PYTHON_WF := $(shell find ./ -name '*_wf.py')
YAML_WF := $(patsubst %.py, %.yaml, ${PYTHON_WF})

workflow-venv: .check_python_version ${WORKFLOW_VENV_ACTIVATE}

clean:: .defaults.clean

setup::

venv::

build::

test::

test-src::

test-image::

publish::

image::

load-image::

set-versions: workflow-reconcile-requirements

.PHONY: workflow-build
workflow-build: workflow-venv
@for file in $(YAML_WF); do \
$(MAKE) $$file; \
done

.PHONY: workflow-test
workflow-test: workflow-build
$(MAKE) .transforms_workflows.test-pipeline TRANSFORM_SRC=${SRC_DIR} PIPELINE_FILE=lang_id_wf.yaml

.PHONY: workflow-upload
workflow-upload: workflow-build
@for file in $(YAML_WF); do \
$(MAKE) .transforms_workflows.upload-pipeline PIPELINE_FILE=$$file; \
done

.PHONY: workflow-reconcile-requirements
workflow-reconcile-requirements:
@for file in $(PYTHON_WF); do \
$(MAKE) .transforms_workflows.reconcile-requirements PIPELINE_FILE=$$file; \
done
220 changes: 220 additions & 0 deletions transforms/language/lang_id/kfp_ray/lang_id_multiple_wf.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,220 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os

from workflow_support.compile_utils import (
ONE_HOUR_SEC,
ONE_WEEK_SEC,
ComponentUtils,
)

import kfp.compiler as compiler
import kfp.components as comp
import kfp.dsl as dsl


task_image = "quay.io/dataprep1/data-prep-kit/lang_id-ray:0.4.0.dev6"

# the name of the job script
EXEC_SCRIPT_NAME: str = "lang_id_transform_ray.py"

# components
base_kfp_image = "quay.io/dataprep1/data-prep-kit/kfp-data-processing:0.2.0.dev6"

# path to kfp component specifications files
component_spec_path = "../../../../kfp/kfp_ray_components/"

# compute execution parameters. Here different tranforms might need different implementations. As
# a result, instead of creating a component we are creating it in place here.
def compute_exec_params_func(
worker_options: str,
actor_options: str,
data_s3_config: str,
data_max_files: int,
data_num_samples: int,
runtime_pipeline_id: str,
runtime_job_id: str,
runtime_code_location: str,
lang_id_model_credential: str,
lang_id_model_kind: str,
lang_id_model_url: str,
lang_id_content_column_name: str,
) -> dict:
import uuid

from workflow_support.runtime_utils import KFPUtils

return {
"data_s3_config": data_s3_config,
"data_max_files": data_max_files,
"data_num_samples": data_num_samples,
"runtime_num_workers": KFPUtils.default_compute_execution_params(worker_options, actor_options),
"runtime_worker_options": actor_options,
"runtime_pipeline_id": runtime_pipeline_id,
"runtime_job_id": runtime_job_id,
"runtime_code_location": runtime_code_location,
"lang_id_model_credential": lang_id_model_credential,
"lang_id_model_kind": lang_id_model_kind,
"lang_id_model_url": lang_id_model_url,
"lang_id_content_column_name": lang_id_content_column_name,
}


# KFPv1 and KFP2 uses different methods to create a component from a function. KFPv1 uses the
# `create_component_from_func` function, but it is deprecated by KFPv2 and so has a different import path.
# KFPv2 recommends using the `@dsl.component` decorator, which doesn't exist in KFPv1. Therefore, here we use
# this if/else statement and explicitly call the decorator.
if os.getenv("KFPv2", "0") == "1":
# In KFPv2 dsl.RUN_ID_PLACEHOLDER is deprecated and cannot be used since SDK 2.5.0. On another hand we cannot create
# a unique string in a component (at runtime) and pass it to the `clean_up_task` of `ExitHandler`, due to
# https://github.com/kubeflow/pipelines/issues/10187. Therefore, meantime we use a unique string created at
# compilation time.
import uuid

compute_exec_params_op = dsl.component_decorator.component(
func=compute_exec_params_func, base_image=base_kfp_image
)
print(
"WARNING: the ray cluster name can be non-unique at runtime, please do not execute simultaneous Runs of the " +
"same version of the same pipeline !!!")
run_id = uuid.uuid4().hex
else:
compute_exec_params_op = comp.create_component_from_func(func=compute_exec_params_func, base_image=base_kfp_image)
run_id = dsl.RUN_ID_PLACEHOLDER

# create Ray cluster
create_ray_op = comp.load_component_from_file(component_spec_path + "createRayClusterComponent.yaml")
# execute job
execute_ray_jobs_op = comp.load_component_from_file(component_spec_path + "executeRayJobComponent.yaml")
# clean up Ray
cleanup_ray_op = comp.load_component_from_file(component_spec_path + "deleteRayClusterComponent.yaml")
# Task name is part of the pipeline name, the ray cluster name and the job name in DMF.
TASK_NAME: str = "lang_id"


@dsl.pipeline(
name=TASK_NAME + "-ray-pipeline",
description="Pipeline for multiple lang_id",
)
def lang_id(
# Ray cluster
ray_name: str = "lang_id-kfp-ray", # name of Ray cluster
ray_head_options: str = '{"cpu": 1, "memory": 4, "image_pull_secret": "", '
'"image": "' + task_image + '", "image_pull_policy": "Always" }',
ray_worker_options: str = '{"replicas": 2, "max_replicas": 2, "min_replicas": 2, "cpu": 2, "memory": 4, '
'"image_pull_secret": "", "image": "' + task_image + '", "image_pull_policy": "Always" }',
server_url: str = "http://kuberay-apiserver-service.kuberay.svc.cluster.local:8888",
# data access
data_s3_config: str = "[{'input_folder': 'test/lang_id/input/', 'output_folder': 'test/lang_id/output/'}]",
data_s3_access_secret: str = "s3-secret",
data_max_files: int = -1,
data_num_samples: int = -1,
# orchestrator
runtime_actor_options: str = "{'num_cpus': 0.8}",
runtime_pipeline_id: str = "pipeline_id",
runtime_code_location: str = "{'github': 'github', 'commit_hash': '12345', 'path': 'path'}",
# lang_id parameters
lang_id_model_credential: str = "PUT YOUR OWN HUGGINGFACE CREDENTIAL",
lang_id_model_kind: str = "fasttext",
lang_id_model_url: str = "facebook/fasttext-language-identification",
lang_id_content_column_name: str = "contents",
# additional parameters
additional_params: str = '{"wait_interval": 2, "wait_cluster_ready_tmout": 400, "wait_cluster_up_tmout": 300, "wait_job_ready_tmout": 400, "wait_print_tmout": 30, "http_retries": 5}',
):
"""
Pipeline to execute Language Identification transform
:param ray_name: name of the Ray cluster
:param ray_head_options: head node options, containing the following:
cpu - number of cpus
memory - memory
image - image to use
image_pull_secret - image pull secret
:param ray_worker_options: worker node options (we here are using only 1 worker pool), containing the following:
replicas - number of replicas to create
max_replicas - max number of replicas
min_replicas - min number of replicas
cpu - number of cpus
memory - memory
image - image to use
image_pull_secret - image pull secret
:param server_url - server url
:param additional_params: additional (support) parameters, containing the following:
wait_interval - wait interval for API server, sec
wait_cluster_ready_tmout - time to wait for cluster ready, sec
wait_cluster_up_tmout - time to wait for cluster up, sec
wait_job_ready_tmout - time to wait for job ready, sec
wait_print_tmout - time between prints, sec
http_retries - http retries for API server calls
:param data_s3_access_secret - s3 access secret
:param data_s3_config - s3 configuration. Note that config here should be an array
:param data_max_files - max files to process
:param data_num_samples - num samples to process
:param runtime_actor_options - actor options
:param runtime_pipeline_id - pipeline id
:param runtime_code_location - code location
:param lang_id_model_credential - credential you use to get model
:param lang_id_model_kind - what kind of model you want to use for language identification
:param lang_id_model_url - url that model locates
:param lang_id_content_column_name - name of the column containing documents
:return: None
"""
# create clean_up task
clean_up_task = cleanup_ray_op(ray_name=ray_name, run_id=run_id, server_url=server_url)
ComponentUtils.add_settings_to_component(clean_up_task, 60)
# pipeline definition
with dsl.ExitHandler(clean_up_task):
# compute execution params
compute_exec_params = compute_exec_params_op(
worker_options=ray_worker_options,
actor_options=runtime_actor_options,
data_s3_config=data_s3_config,
data_max_files=data_max_files,
data_num_samples=data_num_samples,
runtime_pipeline_id=runtime_pipeline_id,
runtime_job_id=run_id,
runtime_code_location=runtime_code_location,
lang_id_model_credential=lang_id_model_credential,
lang_id_model_kind=lang_id_model_kind,
lang_id_model_url=lang_id_model_url,
lang_id_content_column_name=lang_id_content_column_name,
)
ComponentUtils.add_settings_to_component(compute_exec_params, ONE_HOUR_SEC * 2)
# start Ray cluster
ray_cluster = create_ray_op(
ray_name=ray_name,
run_id=run_id,
ray_head_options=ray_head_options,
ray_worker_options=ray_worker_options,
server_url=server_url,
additional_params=additional_params,
)
ComponentUtils.add_settings_to_component(ray_cluster, ONE_HOUR_SEC * 2)
ray_cluster.after(compute_exec_params)
# Execute job
execute_job = execute_ray_jobs_op(
ray_name=ray_name,
run_id=run_id,
additional_params=additional_params,
# note that the parameters below are specific for Language Identification transform
exec_params=compute_exec_params.output,
exec_script_name=EXEC_SCRIPT_NAME,
server_url=server_url,
)
ComponentUtils.add_settings_to_component(execute_job, ONE_WEEK_SEC)
ComponentUtils.set_s3_env_vars_to_component(execute_job, data_s3_access_secret)
execute_job.after(ray_cluster)



if __name__ == "__main__":
# Compiling the pipeline
compiler.Compiler().compile(lang_id, __file__.replace(".py", ".yaml"))
Loading

0 comments on commit e571be2

Please sign in to comment.