diff --git a/.make.defaults b/.make.defaults index 276fa35c9..38e9606e2 100644 --- a/.make.defaults +++ b/.make.defaults @@ -275,6 +275,7 @@ __check_defined = \ .defaults.ray-lib-src-venv:: .defaults.create-venv .defaults.install-ray-lib-src-venv .defaults.install-local-requirements-venv # Install all source from the repo for a ray runtime transform into an existing venv +# And if there is an adjacent python dir (as for transforms), then also install that source .PHONY: .defaults.install-ray-lib-src-venv .defaults.install-ray-lib-src-venv:: @# Help: Install Ray and Python data processing library source into existing venv @@ -285,6 +286,9 @@ __check_defined = \ pip uninstall -y data-prep-toolkit-ray; \ $(MAKE) PYTHON_PROJECT_DIR=$(DPK_PYTHON_LIB_DIR) .defaults.install-src-venv; \ $(MAKE) PYTHON_PROJECT_DIR=$(DPK_RAY_LIB_DIR) .defaults.install-src-venv; \ + if [ -d ../python ]; then \ + $(MAKE) PYTHON_PROJECT_DIR=../python .defaults.install-src-venv; \ + fi echo Installed source from Ray data processing library for `which $(PYTHON)` # Install local requirements last as it generally includes our lib source diff --git a/.make.versions b/.make.versions index 08ce283f0..85dfe5428 100644 --- a/.make.versions +++ b/.make.versions @@ -20,6 +20,8 @@ DOC_ID_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX) DOC_ID_SPARK_VERSION=0.2.0$(RELEASE_VERSION_SUFFIX) EDEDUP_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX) FDEDUP_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX) +FILTER_PYTHON_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX) +FILTER_RAY_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX) FILTER_VERSION=0.4.0$(RELEASE_VERSION_SUFFIX) FILTER_SPARK_VERSION=0.2.0$(RELEASE_VERSION_SUFFIX) NOOP_PYTHON_VERSION=0.9.0$(RELEASE_VERSION_SUFFIX) diff --git a/transforms/.make.transforms b/transforms/.make.transforms index 4c223fdf5..2b9db348d 100644 --- a/transforms/.make.transforms +++ b/transforms/.make.transforms @@ -100,9 +100,6 @@ extra-help: .PHONY: .transforms.ray-venv .transforms.ray-venv:: .defaults.ray-lib-src-venv - if [ -d 
../python ]; then \ - $(MAKE) PYTHON_PROJECT_DIR=../python .defaults.install-src-venv; \ - fi # For now we do NOT install ../python source as we do for ray, since for now # spark implementations do not use the pure python transform. If/when @@ -153,13 +150,13 @@ extra-help: @# Help: Run the transform's tests and any '*local' .py files .PHONY: .transforms.python-test-image -.transforms.python-test-image:: .transforms.python-image .defaults.test-image-pytest +.transforms.python-test-image:: .transforms.python-image .transforms.test-image-help .defaults.test-image-pytest .PHONY: .transforms.ray-test-image -.transforms.ray-test-image:: .transforms.ray-image .defaults.test-image-pytest +.transforms.ray-test-image:: .transforms.ray-image .transforms.test-image-help .defaults.test-image-pytest .PHONY: .transforms.spark-test-image -.transforms.spark-test-image:: .transforms.spark-image .defaults.test-image-pytest +.transforms.spark-test-image:: .transforms.spark-image .transforms.test-image-help .defaults.test-image-pytest .PHONY: .transforms.test-image-pytest .transforms.test-image-pytest:: .defaults.test-image-pytest diff --git a/transforms/fix.sh b/transforms/fix.sh deleted file mode 100644 index 4f62e86ad..000000000 --- a/transforms/fix.sh +++ /dev/null @@ -1,11 +0,0 @@ -mf=$(find . -name Makefile) -dotmf=$(find . 
-name '*.make*) -for i in $mf $dotmf; do - cat $i | sed \ - -e "s/transforms.python-publish/transforms.publish-image-python/g" \ - -e "s/transforms.ray-publish/transforms.publish-image-ray/g" \ - -e "s/transforms.spark-publish/transforms.publish-image-spark/g" \ - > tt - mv tt $i -done - diff --git a/transforms/universal/filter/python/.dockerignore b/transforms/universal/filter/python/.dockerignore new file mode 100644 index 000000000..f7275bbbd --- /dev/null +++ b/transforms/universal/filter/python/.dockerignore @@ -0,0 +1 @@ +venv/ diff --git a/transforms/universal/filter/python/.gitignore b/transforms/universal/filter/python/.gitignore new file mode 100644 index 000000000..3ea7fd4ab --- /dev/null +++ b/transforms/universal/filter/python/.gitignore @@ -0,0 +1,38 @@ +test-data/output +output/* +/output/ +data-processing-lib/ + + +# Byte-compiled / optimized / DLL files +__pycache__/ +*.py[cod] +*$py.class + + +# Distribution / packaging +bin/ +build/ +develop-eggs/ +dist/ +eggs/ +lib/ +lib64/ +parts/ +sdist/ +var/ +*.egg-info/ +.installed.cfg +*.egg + +# Installer logs +pip-log.txt +pip-delete-this-directory.txt + +# Unit test / coverage reports +.tox/ +htmlcov +.coverage +.cache +nosetests.xml +coverage.xml \ No newline at end of file diff --git a/transforms/universal/filter/python/Dockerfile b/transforms/universal/filter/python/Dockerfile new file mode 100644 index 000000000..5d8a00f7e --- /dev/null +++ b/transforms/universal/filter/python/Dockerfile @@ -0,0 +1,43 @@ +FROM docker.io/python:3.10.14-slim-bullseye + +RUN pip install --upgrade pip + +# install pytest +RUN pip install --no-cache-dir pytest + +# Create a user and use it to run the transform +RUN useradd -ms /bin/bash dpk +USER dpk +WORKDIR /home/dpk + +# Copy and install data processing libraries +# These are expected to be placed in the docker context before this is run (see the make image). 
+COPY --chown=dpk:root data-processing-lib-python/ data-processing-lib-python/ +RUN cd data-processing-lib-python && pip install --no-cache-dir -e . + +# END OF STEPS destined for a data-prep-kit base image + +COPY --chown=dpk:root src/ src/ +COPY --chown=dpk:root pyproject.toml pyproject.toml +RUN pip install --no-cache-dir -e . + +#COPY requirements.txt requirements.txt +#RUN pip install --no-cache-dir -r requirements.txt + +# copy source data +COPY ./src/filter_transform.py . +COPY ./src/filter_transform_python.py . +COPY ./src/filter_local.py local/ + +# copy test +COPY test/ test/ +COPY test-data/ test-data/ + +# Set environment +ENV PYTHONPATH /home/dpk + +# Put these at the end since they seem to upset the docker cache. +ARG BUILD_DATE +ARG GIT_COMMIT +LABEL build-date=$BUILD_DATE +LABEL git-commit=$GIT_COMMIT diff --git a/transforms/universal/filter/python/Makefile b/transforms/universal/filter/python/Makefile new file mode 100644 index 000000000..717a82f39 --- /dev/null +++ b/transforms/universal/filter/python/Makefile @@ -0,0 +1,55 @@ +# Define the root of the local git clone for the common rules to be able +# know where they are running from. +REPOROOT=../../../.. +# Include a library of common .transform.* targets which most +# transforms should be able to reuse. However, feel free +# to override/redefine the rules below. + +# $(REPOROOT)/.make.versions file contains the versions + +TRANSFORM_NAME=filter +DOCKER_IMAGE_VERSION=${FILTER_PYTHON_VERSION} + +include $(REPOROOT)/transforms/.make.transforms + +venv:: .transforms.python-venv + +test:: .transforms.python-test + +clean:: .transforms.clean + +image:: .transforms.python-image + +test-src:: .transforms.test-src + +setup:: .transforms.setup + +build:: build-dist image + +publish:: publish-dist publish-image + +publish-image:: .transforms.publish-image-python + +setup:: .transforms.setup + +# distribution versions is the same as image version. 
+set-versions: + $(MAKE) TOML_VERSION=$(DOCKER_IMAGE_VERSION) .defaults.update-toml + +build-dist:: set-versions .defaults.build-dist + +publish-dist:: .defaults.publish-dist + +test-image:: .transforms.python-test-image + +run-cli-sample: .transforms.run-cli-python-sample + +run-local-sample: .transforms.run-local-sample + +run-local-python-sample: .transforms.run-local-python-sample + +#run-s3-ray-sample: .transforms.run-s3-ray-sample + +minio-start: .minio-start + +load-image:: .transforms.load-image diff --git a/transforms/universal/filter/python/README.md b/transforms/universal/filter/python/README.md new file mode 100644 index 000000000..8c693c573 --- /dev/null +++ b/transforms/universal/filter/python/README.md @@ -0,0 +1,278 @@ +# Filtering +Please see the set of +[transform project conventions](../../../README.md) +for details on general project conventions, transform configuration, +testing and IDE set up. + +## Summary + +Filtering cleans up data by: + * Removing the rows that do not meet a specific set of criteria. + * Dropping the columns that are no longer needed (e.g. annotation columns, used for filtering rows). + +## Configuration and command line Options + +The set of dictionary keys holding [FilterTransform](src/filter_transform_python.py) +configuration for values are as follows: + +* _filter_criteria_list_ - specifies the list of row filter criteria (in SQL WHERE clause format). Each filter criterion is a string. The default value of this parameter is `[]` (an empty list, meaning that all the rows in the input table will be kept). +* _filter_logical_operator_ - specifies the logical operator that joins filter criteria (`AND` or `OR`). The default value of this parameter is `AND`. +* _filter_columns_to_drop_ - the list with the names of the columns to drop after row filtering is complete. 
The default value of this parameter is `[]` (an empty list, meaning that all the columns in the input table will be kept) + +## Example +Consider a table with eight text documents, where each row has additional info about that document (date acquired, source URL, etc.), and a set of quality signals for that document. + +``` +|----------|----------|----------|----------|----------|----------|---------|----------|----------|----------|---------| +│ document | title | contents | date_acq | extra | cluster | ft_lang | ft_score | docq_tot | docq_mea | docq_pe │ +│ --- | --- | --- | uired | --- | --- | --- | --- | al_words | n_word_l | rplex_s │ +│ str | str | str | --- | struct[5 | i64 | str | f64 | --- | en | core │ +│ | | | datetime | ] | | | | i64 | --- | --- │ +│ | | | [ns] | | | | | | f64 | f64 │ +|----------|----------|----------|----------|----------|----------|---------|----------|----------|----------|---------| +│ CC-MAIN- | https:// | BACKGROU | 2023-07- | {"applic | -1 | en | 1.0 | 77 | 5.662338 | 226.5 │ +│ 20190221 | www.sema | ND | 05 | ation/ht | | | | | | │ +│ 132217-2 | nticscho | The | 05:00:00 | tp; msgt | | | | | | │ +│ 01902211 | lar.org/ | Rhinitis | | ype=resp | | | | | | │ +│ … | … | Control | | … | | | | | | │ +│ | | … | | | | | | | | │ +│ CC-MAIN- | https:// | Travel + | 2023-07- | {"applic | -1 | en | 1.0 | 321 | 5.05919 | 245.0 │ +│ 20200528 | www.torn | Leisure: | 27 | ation/ht | | | | | | │ +│ 232803-2 | osnews.g | The 5 | 05:00:00 | tp; msgt | | | | | | │ +│ 02005290 | r/en/tou | best | | ype=resp | | | | | | │ +│ … | … | res… | | … | | | | | | │ +│ CC-MAIN- | https:// | Stourbri | 2023-07- | {"applic | -1 | en | 1.0 | 646 | 5.27709 | 230.3 │ +│ 20190617 | www.stou | dge | 04 | ation/ht | | | | | | │ +│ 103006-2 | rbridgen | College | 05:00:00 | tp; msgt | | | | | | │ +│ 01906171 | ews.co.u | to close | | ype=resp | | | | | | │ +│ … | … | BMe… | | … | | | | | | │ +│ CC-MAIN- | https:// | Our | 2023-07- | {"applic | -1 | en | 1.0 | 242 | 
5.557851 | 407.2 │ +│ 20180318 | www.univ | Guidance | 19 | ation/ht | | | | | | │ +│ 184945-2 | ariety.c | Philosop | 05:00:00 | tp; msgt | | | | | | │ +│ 01803182 | om/app/s | hy | | ype=resp | | | | | | │ +│ … | … | High | | … | | | | | | │ +│ | | sch… | | | | | | | | │ +│ CC-MAIN- | http://h | Hukun | 2023-07- | {"applic | -1 | en | 1.0 | 169 | 4.840237 | 240.5 │ +│ 20180120 | urur.com | Hurur | 18 | ation/ht | | | | | | │ +│ 083038-2 | /hukun-h | running | 05:00:00 | tp; msgt | | | | | | │ +│ 01801201 | urur-run | for Ward | | ype=resp | | | | | | │ +│ … | … | 1 c… | | … | | | | | | │ +│ CC-MAIN- | https:// | Life's | 2023-07- | {"applic | -1 | en | 1.0 | 61 | 4.786885 | 244.0 │ +│ 20180522 | www.chap | Reverie | 18 | ation/ht | | | | | | │ +│ 131652-2 | ters.ind | Kobo | 05:00:00 | tp; msgt | | | | | | │ +│ 01805221 | igo.ca/e | ebook | | | ype=resp | | | | | | │ +│ … | … | Sept… | | … | | | | | | │ +│ CC-MAIN- | http://w | Kamis, | 2023-07- | {"applic | 18008253 | en | 1.0 | 509 | 4.738703 | 224.6 │ +│ 20181120 | ww.onedo | 10 Maret | 05 | ation/ht | 113 | | | | | │ +│ 130743-2 | llarfoll | 2016 | 05:00:00 | tp; msgt | | | | | | │ +│ 01811201 | owers.co | Buy | | ype=resp | | | | | | │ +│ … | … | Twitter… | | … | | | | | | │ +│ CC-MAIN- | http://w | Rory | 2023-07- | {"applic | -1 | en | 1.0 | 223 | 4.829596 | 167.5 │ +│ 20171213 | ww.iron- | Fallon | 09 | ation/ht | | | | | | │ +│ 104259-2 | bru.co.u | joins | 05:00:00 | tp; msgt | | | | | | │ +│ 01712131 | k/rory-f | Bristol | | ype=resp | | | | | | │ +│ … | … | Rovers… | | … | | | | | | │ +|----------|----------|----------|----------|----------|----------|---------|----------|----------|----------|---------| +``` +#### Example 1 - two numerical filtering criteria joined by AND +To filter this table and only keep the documents that have between 100 and 500 words **and** a perplexity score less than 230, and furthermore, drop the `extra` and `cluster` columns, invoke filtering with the following parameters: 
+``` +filter_criteria_list = ["docq_total_words > 100 AND docq_total_words < 500", "docq_perplex_score < 230"] +filter_logical_operator = "AND" +filter_columns_to_drop = ["extra", "cluster"] +``` +This filter operation applied on the table above will return the following result: +``` +|-------------|-------------|-------------|-------------|---------|----------|-------------|-------------|-------------| +│ document | title | contents | date_acquir | ft_lang | ft_score | docq_total_ | docq_mean_w | docq_perple │ +│ --- | --- | --- | ed | --- | --- | words | ord_len | x_score │ +│ str | str | str | --- | str | f64 | --- | --- | --- │ +│ | | | datetime[ns | | | i64 | f64 | f64 │ +│ | | | ] | | | | | │ +|-------------|-------------|-------------|-------------|---------|----------|-------------|-------------|-------------| +│ CC-MAIN-201 | http://www. | Rory Fallon | 2023-07-09 | en | 1.0 | 223 | 4.829596 | 167.5 │ +│ 71213104259 | iron-bru.co | joins | 05:00:00 | | | | | │ +│ -201712131… | .uk/rory-f… | Bristol | | | | | | │ +│ | | Rovers… | | | | | | │ +|-------------|-------------|-------------|-------------|---------|----------|-------------|-------------|-------------| + +``` + +#### Example 2 - two numerical filtering criteria joined by OR +To filter this table and only keep the documents that have between 100 and 500 words **or** a perplexity score less than 230, and furthermore, drop the `extra` and `cluster` columns, invoke filtering with the following parameters: +``` +filter_criteria_list = ["docq_total_words > 100 AND docq_total_words < 500", "docq_perplex_score < 230"] +filter_logical_operator = "OR" +filter_columns_to_drop = ["extra", "cluster"] +``` +This filter operation applied on the table above will return the following result: +``` +|-------------|-------------|-------------|-------------|---------|----------|-------------|-------------|-------------| +│ document | title | contents | date_acquir | ft_lang | ft_score | docq_total_ | docq_mean_w | 
docq_perple | +│ --- | --- | --- | ed | --- | --- | words | ord_len | x_score │ +│ str | str | str | --- | str | f64 | --- | --- | --- │ +│ | | | datetime[ns | | | i64 | f64 | f64 │ +│ | | | ] | | | | | │ +|-------------|-------------|-------------|-------------|---------|----------|-------------|-------------|-------------| +│ CC-MAIN-201 | https://www | BACKGROUND | 2023-07-05 | en | 1.0 | 77 | 5.662338 | 226.5 │ +│ 90221132217 | .semanticsc | The | 05:00:00 | | | | | │ +│ -201902211… | holar.org/… | Rhinitis | | | | | | │ +│ | | Control … | | | | | | │ +│ CC-MAIN-201 | http://www. | Kamis, 10 | 2023-07-05 | en | 1.0 | 509 | 4.738703 | 224.6 │ +│ 81120130743 | onedollarfo | Maret 2016 | 05:00:00 | | | | | │ +│ -201811201… | llowers.co… | Buy | | | | | | │ +│ | | Twitter… | | | | | | │ +│ CC-MAIN-201 | http://www. | Rory Fallon | 2023-07-09 | en | 1.0 | 223 | 4.829596 | 167.5 │ +│ 71213104259 | iron-bru.co | joins | 05:00:00 | | | | | │ +│ -201712131… | .uk/rory-f… | Bristol | | | | | | │ +│ | | Rovers… | | | | | | │ +│ CC-MAIN-202 | https://www | Travel + | 2023-07-27 | en | 1.0 | 321 | 5.05919 | 245.0 │ +│ 00528232803 | .tornosnews | Leisure: | 05:00:00 | | | | | │ +│ -202005290… | .gr/en/tou… | The 5 best | | | | | | │ +│ | | res… | | | | | | │ +│ CC-MAIN-201 | https://www | Our | 2023-07-19 | en | 1.0 | 242 | 5.557851 | 407.2 │ +│ 80318184945 | .univariety | Guidance | 05:00:00 | | | | | │ +│ -201803182… | .com/app/s… | Philosophy | | | | | | │ +│ | | High sch… | | | | | | │ +│ CC-MAIN-201 | http://huru | Hukun Hurur | 2023-07-18 | en | 1.0 | 169 | 4.840237 | 240.5 │ +│ 80120083038 | r.com/hukun | running for | 05:00:00 | | | | | │ +│ -201801201… | -hurur-run… | Ward 1 c… | | | | | | │ +|-------------|-------------|-------------|-------------|---------|----------|-------------|-------------|-------------| +``` + +#### Example 3 - two filtering criteria based on non-numerical (datetime and string) types + +To filter this table and only keep the documents that 
were acquired between 2023-07-04 and 2023-07-08 and were downloaded using the `HTTPS` protocol, without dropping any columns, invoke filtering with the following parameters: +``` +filter_criteria_list = ["date_acquired BETWEEN '2023-07-04' AND '2023-07-08'", "title LIKE 'https://%'"] +filter_logical_operator = "AND" +filter_columns_to_drop = [] +``` + +This filter operation applied on the table above will return the following result: +``` +|----------|----------|----------|----------|----------|----------|---------|----------|----------|----------|---------| +│ document | title | contents | date_acq | extra | cluster | ft_lang | ft_score | docq_tot | docq_mea | docq_pe │ +│ --- | --- | --- | uired | --- | --- | --- | --- | al_words | n_word_l | rplex_s │ +│ str | str | str | --- | struct[5 | i64 | str | f64 | --- | en | core │ +│ | | | datetime | ] | | | | i64 | --- | --- │ +│ | | | [ns] | | | | | | f64 | f64 │ +|----------|----------|----------|----------|----------|----------|---------|----------|----------|----------|---------| +│ CC-MAIN- | https:// | BACKGROU | 2023-07- | {"applic | -1 | en | 1.0 | 77 | 5.662338 | 226.5 │ +│ 20190221 | www.sema | ND | 05 | ation/ht | | | | | | │ +│ 132217-2 | nticscho | The | 05:00:00 | tp; msgt | | | | | | │ +│ 01902211 | lar.org/ | Rhinitis | | ype=resp | | | | | | │ +│ … | … | Control | | … | | | | | | │ +│ | | … | | | | | | | | │ +│ CC-MAIN- | https:// | Stourbri | 2023-07- | {"applic | -1 | en | 1.0 | 646 | 5.27709 | 230.3 │ +│ 20190617 | www.stou | dge | 04 | ation/ht | | | | | | │ +│ 103006-2 | rbridgen | College | 05:00:00 | tp; msgt | | | | | | │ +│ 01906171 | ews.co.u | to close | | ype=resp | | | | | | │ +│ … | … | BMe… | | … | | | | | | │ +|----------|----------|----------|----------|----------|---------|---------|----------|----------|----------|----------| +``` + + +## Running +You can run the [filter_local.py](src/filter_local.py) (python-only implementation) or [filter_local_ray.py](src/filter_local_ray.py) 
(ray-based implementation) to transform the `test1.parquet` file in [test input data](test-data/input) to an `output` directory. The directory will contain both the new annotated `test1.parquet` file and the `metadata.json` file. + +#### Running as ray-based application +``` +(venv) cma:src$ python filter_local_ray.py +12:48:01 INFO - Running locally +12:48:01 INFO - Using local configuration with: input_folder - /home/cma/de/data-prep-kit/transforms/universal/filtering/test-data/input output_folder - /home/cma/de/data-prep-kit/transforms/universal/filtering/output +12:48:01 INFO - Not using data sets, checkpointing False, max files -1 +12:48:01 INFO - number of workers 5 worker options {'num_cpus': 0.8} +12:48:01 INFO - pipeline id pipeline_id; number workers 5 +12:48:01 INFO - job details {'job category': 'preprocessing', 'job name': 'filter', 'job type': 'ray', 'job id': 'job_id'} +12:48:01 INFO - code location {'github': 'github', 'commit_hash': '12345', 'path': 'path'} +12:48:01 INFO - actor creation delay 0 +2024-03-31 12:48:03,390 INFO worker.py:1715 -- Started a local Ray instance. View the dashboard at 127.0.0.1:8265 +(orchestrate pid=308034) 12:48:04 INFO - orchestrator started at 2024-03-31 12:48:04 +(orchestrate pid=308034) 12:48:04 INFO - Number of files is 1, source profile {'max_file_size': 0.15915775299072266, 'min_file_size': 0.15915775299072266, 'total_file_size': 0.15915775299072266} +(orchestrate pid=308034) 12:48:04 INFO - Cluster resources: {'cpus': 20, 'gpus': 0, 'memory': 31.60095291212201, 'object_store': 15.800476455129683} +(orchestrate pid=308034) 12:48:04 INFO - Number of workers - 5 with {'num_cpus': 0.8} each +(orchestrate pid=308034) 12:48:04 INFO - Completed 0 files in 5.312760670979818e-06 min. 
Waiting for completion +(orchestrate pid=308034) 12:48:06 INFO - Completed processing in 0.022701112429300944 min +12:48:16 INFO - Completed execution in 0.24697633584340414 min, execution result 0 +``` +#### Running as pure python application +
+% make venv
+% source venv/bin/activate
+(venv) % cd src
+(venv) % python filter_local.py
+input table has 100 rows
+
+output table has 11 rows
+output metadata : {'total_docs_count': 100, 'total_bytes_count': 478602, 'total_columns_count': 25, "docs_filtered_by 'docq_total_words > 100 AND docq_total_words < 200'": 78, "bytes_filtered_by 'docq_total_words > 100 AND docq_total_words < 200'": 429191, "docs_filtered_by 'docq_perplex_score < 230'": 53, "bytes_filtered_by 'docq_perplex_score < 230'": 275911, 'docs_after_filter': 11, 'bytes_after_filter': 24061, 'columns_after_filter': 23}
+(venv) % deactivate
+% ls ../output
+metadata.json	test1.parquet
+%
+
+ +#### Passing parameters through command-line-interface + +When running filtering on a local terminal, double quotes need to be escaped accordingly. For example, to find documents that are written in Java or Python programming languages, a SQL query using the `IN` keyword is needed in the `filter_criteria_list` argument. The example below shows how to properly pass this argument to the filter app: +``` +python filter_transform_ray.py --filter_criteria_list "[\"language IN ('Java', 'Python')\"]" ... +``` +When filter runs from the command line, it needs to include the entire `filter_criteria_list` parameter within double quotes (`"`), so that the command line parser can determine where the parameter begins and ends. This, however, will conflict with the internal double quotes that are used to specify the conditions inside the list (`language IN ('Java', 'Python')`). To resolve this problem, the internal double quotes need to be escaped, as in the \"language IN ('Java', 'Python')\" notation. + +### Filter Statistics +As shown in the output of the local run of filtering, the metadata contains several statistics: +* Global statistics: + * `total_docs_count`, `total_bytes_count`, `total_columns_count`: total number of documents (rows), bytes, and columns that were present in the input table, before filtering took place + * `docs_after_filter`, `bytes_after_filter`, `columns_after_filter`: total number of documents (rows), bytes, and columns that were present in the output table, after filtering took place +* Per-criteria statistics: these statistics show the impact of each filtering criteria - number of documents and bytes that it filters out, when applied by itself. 
We ran the local filter with two filtering criteria, and these are the stats for each of them: + * `docs_filtered_by 'docq_total_words > 100 AND docq_total_words < 200'`, `bytes_filtered_by 'docq_total_words > 100 AND docq_total_words < 200'` - the number of documents and bytes filtered out by the `docq_total_words > 100 AND docq_total_words < 200` filtering condition + * `docs_filtered_by 'docq_perplex_score < 230'`, `bytes_filtered_by 'docq_perplex_score < 230'` - the number of documents and bytes filtered out by the `docq_perplex_score < 230` filtering condition + + +## Running + +### Launched Command Line Options +When running the transform with the Ray launcher (i.e. TransformLauncher), +the following command line arguments are available in addition to +the options provided by the [ray launcher](../../../../data-processing-lib/doc/ray-launcher-options.md) +and the [python launcher](../../../../data-processing-lib/doc/python-launcher-options.md). + +``` + --filter_criteria_list FILTER_CRITERIA_LIST + list of filter criteria (in SQL WHERE clause format), for example: [ + "docq_total_words > 100 AND docq_total_words < 200", + "docq_perplex_score < 230", + "date_acquired BETWEEN '2023-07-04' AND '2023-07-08'", + "title LIKE 'https://%'", + "document_id IN ('doc-id-1', 'doc-id-2', 'doc-id-3')" + ] + --filter_columns_to_drop FILTER_COLUMNS_TO_DROP + list of columns to drop after filtering, for example: ["column1", "column2"] + --filter_logical_operator {AND,OR} + logical operator (AND or OR) that joins filter criteria + +``` + +### Running the samples +To run the samples, use the following `make` targets + +* `run-cli-sample` - runs src/filter_transform_ray.py using command line args +* `run-local-python-only-sample` - runs src/filter_local.py +* `run-local-sample` - runs src/filter_local_ray.py +* `run-s3-sample` - runs src/filter_s3_ray.py + * Requires prior invocation of `make minio-start` to load data into local minio for S3 access. 
+ +These targets will activate the virtual environment and set up any configuration needed. +Use the `-n` option of `make` to see the detail of what is done to run the sample. + +For example, +```shell +make run-cli-sample +... +``` +Then +```shell +ls output +``` +To see results of the transform. diff --git a/transforms/universal/filter/python/pyproject.toml b/transforms/universal/filter/python/pyproject.toml new file mode 100644 index 000000000..82aaacc8a --- /dev/null +++ b/transforms/universal/filter/python/pyproject.toml @@ -0,0 +1,46 @@ +[project] +name = "dpk_filter_transform_python" +version = "0.9.0.dev6" +requires-python = ">=3.10" +description = "Filter Python Transform" +license = {text = "Apache-2.0"} +readme = {file = "README.md", content-type = "text/markdown"} +authors = [ + { name = "David Wood", email = "dawood@us.ibm.com" }, + { name = "Boris Lublinsky", email = "blublinsky@ibm.com" }, +] +dependencies = [ + "data-prep-toolkit==0.2.0.dev6", + "duckdb==0.10.1", +] + +[build-system] +requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"] +build-backend = "setuptools.build_meta" + +[project.optional-dependencies] +dev = [ + "twine", + "pytest>=7.3.2", + "pytest-dotenv>=0.5.2", + "pytest-env>=1.0.0", + "pre-commit>=3.3.2", + "pytest-cov>=4.1.0", + "pytest-mock>=3.10.0", + "moto==5.0.5", + "markupsafe==2.0.1", +] + +[options] +package_dir = ["src","test"] + +[options.packages.find] +where = ["src/"] + +[tool.pytest.ini_options] +# Currently we use low coverage since we have to run tests separately (see makefile) +#addopts = "--cov --cov-report term-missing --cov-fail-under 25" +markers = ["unit: unit tests", "integration: integration tests"] + +[tool.coverage.run] +include = ["src/*"] diff --git a/transforms/universal/filter/ray/src/filter_local.py b/transforms/universal/filter/python/src/filter_local.py similarity index 98% rename from transforms/universal/filter/ray/src/filter_local.py rename to 
transforms/universal/filter/python/src/filter_local.py index 25ae3c2b2..074cc3031 100644 --- a/transforms/universal/filter/ray/src/filter_local.py +++ b/transforms/universal/filter/python/src/filter_local.py @@ -13,7 +13,7 @@ import os from data_processing.data_access import DataAccessLocal -from filter_transform_ray import ( +from filter_transform import ( FilterTransform, filter_columns_to_drop_key, filter_criteria_key, diff --git a/transforms/universal/filter/ray/src/filter_local_pure.py b/transforms/universal/filter/python/src/filter_local_python.py similarity index 98% rename from transforms/universal/filter/ray/src/filter_local_pure.py rename to transforms/universal/filter/python/src/filter_local_python.py index 26ab0ac47..86de7a3fa 100644 --- a/transforms/universal/filter/ray/src/filter_local_pure.py +++ b/transforms/universal/filter/python/src/filter_local_python.py @@ -15,7 +15,7 @@ from data_processing.runtime.pure_python import PythonTransformLauncher from data_processing.utils import ParamsUtils -from filter_transform_ray import ( +from filter_transform import ( FilterTransformConfiguration, filter_columns_to_drop_cli_param, filter_criteria_cli_param, diff --git a/transforms/universal/filter/python/src/filter_test_support.py b/transforms/universal/filter/python/src/filter_test_support.py new file mode 100644 index 000000000..19ba34a5e --- /dev/null +++ b/transforms/universal/filter/python/src/filter_test_support.py @@ -0,0 +1,135 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import os + +from data_processing.runtime import AbstractTransformLauncher +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.test_support.launch.transform_test import ( + AbstractTransformLauncherTest, +) +from filter_transform import ( + FilterPythonTransformConfiguration, + filter_columns_to_drop_cli_param, + filter_criteria_cli_param, + filter_logical_operator_cli_param, + filter_logical_operator_default, +) + + +class AbstractPythonFilterTransformTest(AbstractTransformLauncherTest): + """ + Extends the super-class to define the test data for the tests defined there. + The name of this class MUST begin with the word Test so that pytest recognizes it as a test class. + """ + + def _get_launcher(self) -> (AbstractTransformLauncher, dict): + """ + Allow other runtimes to override with a different Launcher but share the test fixtures. + Returns: the launcher and any additional command line/configuration included in the + list of args given as the 2nd element of the fixtures. 
+ """ + return (PythonTransformLauncher(FilterPythonTransformConfiguration()), {}) + + def _get_test_file_directory(self) -> str: + raise NotImplemented + + def get_test_transform_fixtures(self) -> list[tuple]: + fixtures = [] + basedir = self._get_test_file_directory() + basedir = os.path.abspath(os.path.join(basedir, "../test-data")) + + launcher, args = self._get_launcher() + fixtures.append( + ( + launcher, + args + | { + filter_criteria_cli_param: [ + "docq_total_words > 100 AND docq_total_words < 200", + "ibmkenlm_docq_perplex_score < 230", + ], + filter_logical_operator_cli_param: filter_logical_operator_default, + filter_columns_to_drop_cli_param: ["extra", "cluster"], + }, + os.path.join(basedir, "input"), + os.path.join(basedir, "expected", "test-and"), + ) + ) + + launcher, args = self._get_launcher() + fixtures.append( + ( + launcher, + args + | { + filter_criteria_cli_param: [ + "docq_total_words > 100 AND docq_total_words < 200", + "ibmkenlm_docq_perplex_score < 230", + ], + filter_logical_operator_cli_param: "OR", + filter_columns_to_drop_cli_param: ["extra", "cluster"], + }, + os.path.join(basedir, "input"), + os.path.join(basedir, "expected", "test-or"), + ) + ) + + # These test are also done in the python-only tests, so no real need to duplicate here. They slow down ci/cd builds. 
+ # fixtures.append( + # ( + # RayTransformLauncher(FilterRayTransformConfiguration()), + # { + # "run_locally": True, + # filter_criteria_cli_param: [], + # filter_logical_operator_cli_param: filter_logical_operator_default, + # filter_columns_to_drop_cli_param: [], + # }, + # os.path.join(basedir, "input"), + # os.path.join(basedir, "expected", "test-default"), + # ) + # ) + # + # fixtures.append( + # ( + # RayTransformLauncher(FilterRayTransformConfiguration()), + # { + # "run_locally": True, + # filter_criteria_cli_param: [ + # "date_acquired BETWEEN '2023-07-04' AND '2023-07-08'", + # "title LIKE 'https://%'", + # ], + # filter_logical_operator_cli_param: filter_logical_operator_default, + # filter_columns_to_drop_cli_param: [], + # }, + # os.path.join(basedir, "input"), + # os.path.join(basedir, "expected", "test-datetime-like"), + # ) + # ) + # + # fixtures.append( + # ( + # RayTransformLauncher(FilterRayTransformConfiguration()), + # { + # "run_locally": True, + # filter_criteria_cli_param: [ + # "document IN ('CC-MAIN-20190221132217-20190221154217-00305.warc.gz', 'CC-MAIN-20200528232803-20200529022803-00154.warc.gz', 'CC-MAIN-20190617103006-20190617125006-00025.warc.gz')", + # ], + # filter_logical_operator_cli_param: filter_logical_operator_default, + # filter_columns_to_drop_cli_param: [], + # }, + # os.path.join(basedir, "input"), + # os.path.join(basedir, "expected", "test-in"), + # ) + # ) + + return fixtures diff --git a/transforms/universal/filter/python/src/filter_transform.py b/transforms/universal/filter/python/src/filter_transform.py new file mode 100644 index 000000000..89b4b237a --- /dev/null +++ b/transforms/universal/filter/python/src/filter_transform.py @@ -0,0 +1,203 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ + +import argparse +import ast +import json + +import duckdb +import pyarrow as pa +from data_processing.runtime.pure_python.runtime_configuration import ( + PythonTransformRuntimeConfiguration, +) +from data_processing.transform import AbstractTableTransform, TransformConfiguration +from data_processing.utils import CLIArgumentProvider, get_logger + + +logger = get_logger(__name__) + + +short_name = "filter" +cli_prefix = short_name + "_" + +filter_criteria_key = "criteria_list" +""" AST Key holds the list of filter criteria (in SQL WHERE clause format)""" +filter_logical_operator_key = "logical_operator" +""" Key holds the logical operator that joins filter criteria (AND or OR)""" +filter_columns_to_drop_key = "columns_to_drop" +""" AST Key holds the list of columns to drop after filtering""" + +filter_criteria_cli_param = f"{cli_prefix}{filter_criteria_key}" +""" AST Key holds the list of filter criteria (in SQL WHERE clause format)""" +filter_logical_operator_cli_param = f"{cli_prefix}{filter_logical_operator_key}" +""" Key holds the logical operator that joins filter criteria (AND or OR)""" +filter_columns_to_drop_cli_param = f"{cli_prefix}{filter_columns_to_drop_key}" +""" AST Key holds the list of columns to drop after filtering""" + +captured_arg_keys = [filter_criteria_key, filter_columns_to_drop_key] +""" The set of keys captured from the command line """ + +# defaults +filter_criteria_default = ast.literal_eval("[]") +""" The default list of filter criteria (in SQL WHERE 
clause format)""" +filter_logical_operator_default = "AND" +filter_columns_to_drop_default = ast.literal_eval("[]") +""" The default list of columns to drop""" + + +class FilterTransform(AbstractTableTransform): + """ + Implements filtering - select from a pyarrow.Table a set of rows that + satisfy a set of filtering criteria + """ + + def __init__(self, config: dict): + """ + Initialize based on the dictionary of configuration information. + This is generally called with configuration parsed from the CLI arguments defined + by the companion runtime, FilterTransformRuntime. If running from the Ray orchestrator, + these will be provided by that class with help from the RayMutatingDriver. + """ + + super().__init__(config) + self.filter_criteria = config.get(filter_criteria_key, filter_criteria_default) + self.logical_operator = config.get(filter_logical_operator_key, filter_logical_operator_default) + self.columns_to_drop = config.get(filter_columns_to_drop_key, filter_columns_to_drop_default) + + def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict]: + """ + This implementation filters the input table using a SQL statement and + returns the filtered table and execution stats + :param table: input table + :return: list of output tables and custom statistics + """ + + # move table under a different name, to avoid SQL query parsing error + input_table = table + total_docs = input_table.num_rows + total_columns = input_table.num_columns + total_bytes = input_table.nbytes + + # initialize the metadata dictionary + metadata = { + "total_docs_count": total_docs, + "total_bytes_count": total_bytes, + "total_columns_count": total_columns, + } + + # initialize the SQL statement used for filtering + sql_statement = "SELECT * FROM input_table" + if len(self.filter_criteria) > 0: + # populate metadata with filtering stats for each filter criterion + for filter_criterion in self.filter_criteria: + criterion_sql = f"{sql_statement} WHERE {filter_criterion}" + 
filter_table = duckdb.execute(criterion_sql).arrow() + docs_filtered = total_docs - filter_table.num_rows + bytes_filtered = total_bytes - filter_table.nbytes + metadata[f"docs_filtered_out_by '{filter_criterion}'"] = docs_filtered + metadata[f"bytes_filtered_out_by '{filter_criterion}'"] = bytes_filtered + + # use filtering criteria to build the SQL query for filtering + filter_clauses = [f"({x})" for x in self.filter_criteria] + where_clause = f" {self.logical_operator} ".join(filter_clauses) + sql_statement = f"{sql_statement} WHERE {where_clause}" + + # filter using SQL statement + try: + filtered_table = duckdb.execute(sql_statement).arrow() + except Exception as ex: + logger.error(f"FilterTransform::transform failed: {ex}") + raise ex + else: + filtered_table = table + + # drop any columns requested from the final result + if len(self.columns_to_drop) > 0: + filtered_table_cols_dropped = filtered_table.drop_columns(self.columns_to_drop) + else: + filtered_table_cols_dropped = filtered_table + + # add global filter stats to metadata + metadata["docs_after_filter"] = filtered_table.num_rows + metadata["columns_after_filter"] = filtered_table_cols_dropped.num_columns + metadata["bytes_after_filter"] = filtered_table.nbytes + + return [filtered_table_cols_dropped], metadata + + +class FilterTransformConfiguration(TransformConfiguration): + """ + Provides support for configuring and using the associated Transform class include + configuration with CLI args and combining of metadata. + """ + + def __init__(self): + super().__init__( + name=short_name, + transform_class=FilterTransform, + ) + + def add_input_params(self, parser: argparse.ArgumentParser) -> None: + """ + Add Transform-specific arguments to the given parser. + This will be included in a dictionary used to initialize the FilterTransform. + By convention a common prefix should be used for all mutator-specific CLI args + (e.g, noop_, pii_, etc.) 
+ """ + + sample_sql = [ + "docq_total_words > 100 AND docq_total_words < 200", + "docq_perplex_score < 230", + "date_acquired BETWEEN '2023-07-04' AND '2023-07-08'", + "title LIKE 'https://%%'", + "document_id IN ('doc-id-1', 'doc-id-2', 'doc-id-3')", + ] + columns_to_drop_example = ["column1", "column2"] + + parser.add_argument( + f"--{filter_criteria_cli_param}", + type=ast.literal_eval, + required=True, + default=ast.literal_eval("[]"), + help=f"list of filter criteria (in SQL WHERE clause format), for example: {json.dumps(sample_sql, indent=2, default=str)}", + ) + parser.add_argument( + f"--{filter_columns_to_drop_cli_param}", + type=ast.literal_eval, + required=False, + default=ast.literal_eval("[]"), + help=f"list of columns to drop after filtering, for example: {json.dumps(columns_to_drop_example)}", + ) + parser.add_argument( + f"--{filter_logical_operator_cli_param}", + type=str, + required=False, + default="AND", + choices=["AND", "OR"], + help="logical operator (AND or OR) that joins filter criteria", + ) + + def apply_input_params(self, args: argparse.Namespace) -> bool: + """ + Validate and apply the arguments that have been parsed + :param args: user defined arguments. + :return: True, if validate pass or False otherwise + """ + # Capture the args that are specific to this transform + captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False) + self.params = self.params | captured + return True + + +class FilterPythonTransformConfiguration(PythonTransformRuntimeConfiguration): + def __init__(self): + super().__init__(transform_config=FilterTransformConfiguration()) diff --git a/transforms/universal/filter/python/src/filter_transform_python.py b/transforms/universal/filter/python/src/filter_transform_python.py new file mode 100644 index 000000000..ff8a246bd --- /dev/null +++ b/transforms/universal/filter/python/src/filter_transform_python.py @@ -0,0 +1,31 @@ +# (C) Copyright IBM Corp. 2024. 
+# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +from data_processing.runtime.pure_python import PythonTransformLauncher +from data_processing.runtime.pure_python.runtime_configuration import ( + PythonTransformRuntimeConfiguration, +) +from data_processing.utils import get_logger +from filter_transform import FilterTransformConfiguration + + +logger = get_logger(__name__) + + +class FilterPythonTransformConfiguration(PythonTransformRuntimeConfiguration): + def __init__(self): + super().__init__(transform_config=FilterTransformConfiguration()) + + +if __name__ == "__main__": + launcher = PythonTransformLauncher(FilterPythonTransformConfiguration()) + logger.info("Launching filtering") + launcher.launch() diff --git a/transforms/universal/filter/python/test-data/expected/test-and-local/metadata.json b/transforms/universal/filter/python/test-data/expected/test-and-local/metadata.json new file mode 100644 index 000000000..1a5e1f0bf --- /dev/null +++ b/transforms/universal/filter/python/test-data/expected/test-and-local/metadata.json @@ -0,0 +1,12 @@ +{ + "total_docs_count": 100, + "total_bytes_count": 478602, + "total_columns_count": 25, + "docs_filtered_out_by 'docq_total_words > 100 AND docq_total_words < 200'": 78, + "bytes_filtered_out_by 'docq_total_words > 100 AND docq_total_words < 200'": 429191, + "docs_filtered_out_by 'ibmkenlm_docq_perplex_score < 230'": 53, + "bytes_filtered_out_by 
'ibmkenlm_docq_perplex_score < 230'": 275911, + "docs_after_filter": 11, + "bytes_after_filter": 24061, + "columns_after_filter": 23 +} diff --git a/transforms/universal/filter/python/test-data/expected/test-and-local/test1.parquet b/transforms/universal/filter/python/test-data/expected/test-and-local/test1.parquet new file mode 100644 index 000000000..21e0ac434 Binary files /dev/null and b/transforms/universal/filter/python/test-data/expected/test-and-local/test1.parquet differ diff --git a/transforms/universal/filter/python/test-data/expected/test-and/metadata.json b/transforms/universal/filter/python/test-data/expected/test-and/metadata.json new file mode 100644 index 000000000..2f62f38e0 --- /dev/null +++ b/transforms/universal/filter/python/test-data/expected/test-and/metadata.json @@ -0,0 +1,60 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "filter", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-03-29 18:58:19", + "end_time": "2024-03-29 18:58:20", + "status": "success" + }, + "code": { + "github": "github", + "commit_hash": "12345", + "path": "path" + }, + "job_input_params": { + "criteria_list": ["docq_total_words > 100 AND docq_total_words < 200", "ibmkenlm_docq_perplex_score < 230"], + "columns_to_drop": ["extra", "cluster"], + "logical_operator": "AND", + "checkpointing": false, + "max_files": -1, + "number of workers": 5, + "worker options": { + "num_cpus": 0.8 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 20, + "gpus": 0, + "memory": 32.21153183095157, + "object_store": 16.105765914544463 + }, + "job_output_stats": { + "source_files": 1, + "source_size": 478602, + "result_files": 1, + "result_size": 15459, + "table_processing": 0.0258023738861084, + "total_docs_count": 100, + "total_bytes_count": 478602, + "total_columns_count": 25, + "docs_filtered_out_by 'docq_total_words > 100 AND docq_total_words < 200'": 78, + "bytes_filtered_out_by 
'docq_total_words > 100 AND docq_total_words < 200'": 429191, + "docs_filtered_out_by 'ibmkenlm_docq_perplex_score < 230'": 53, + "bytes_filtered_out_by 'ibmkenlm_docq_perplex_score < 230'": 275911, + "docs_after_filter": 11, + "bytes_after_filter": 24061, + "columns_after_filter": 23 + }, + "source": { + "name": "/home/cma/de/fm-data-engineering/transforms/universal/filtering/test-data/input", + "type": "path" + }, + "target": { + "name": "/home/cma/de/fm-data-engineering/transforms/universal/filtering/output", + "type": "path" + } +} diff --git a/transforms/universal/filter/python/test-data/expected/test-and/test1.parquet b/transforms/universal/filter/python/test-data/expected/test-and/test1.parquet new file mode 100644 index 000000000..21e0ac434 Binary files /dev/null and b/transforms/universal/filter/python/test-data/expected/test-and/test1.parquet differ diff --git a/transforms/universal/filter/python/test-data/expected/test-datetime-like-local/metadata.json b/transforms/universal/filter/python/test-data/expected/test-datetime-like-local/metadata.json new file mode 100644 index 000000000..2c21ff96f --- /dev/null +++ b/transforms/universal/filter/python/test-data/expected/test-datetime-like-local/metadata.json @@ -0,0 +1,12 @@ +{ + "total_docs_count": 100, + "total_bytes_count": 478602, + "total_columns_count": 25, + "docs_filtered_out_by 'date_acquired BETWEEN '2023-07-04' AND '2023-07-08''": 76, + "bytes_filtered_out_by 'date_acquired BETWEEN '2023-07-04' AND '2023-07-08''": 386230, + "docs_filtered_out_by 'title LIKE 'https://%''": 32, + "bytes_filtered_out_by 'title LIKE 'https://%''": 136638, + "docs_after_filter": 16, + "columns_after_filter": 25, + "bytes_after_filter": 60034 +} diff --git a/transforms/universal/filter/python/test-data/expected/test-datetime-like-local/test1.parquet b/transforms/universal/filter/python/test-data/expected/test-datetime-like-local/test1.parquet new file mode 100644 index 000000000..c168fff93 Binary files /dev/null and 
b/transforms/universal/filter/python/test-data/expected/test-datetime-like-local/test1.parquet differ diff --git a/transforms/universal/filter/python/test-data/expected/test-datetime-like/metadata.json b/transforms/universal/filter/python/test-data/expected/test-datetime-like/metadata.json new file mode 100644 index 000000000..5bead8651 --- /dev/null +++ b/transforms/universal/filter/python/test-data/expected/test-datetime-like/metadata.json @@ -0,0 +1,60 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "filter", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-04-02 10:59:46", + "end_time": "2024-04-02 10:59:48", + "status": "success" + }, + "code": { + "github": "github", + "commit_hash": "12345", + "path": "path" + }, + "job_input_params": { + "criteria_list": ["date_acquired BETWEEN '2023-07-04' AND '2023-07-08'", "title LIKE 'https://%'"], + "columns_to_drop": [], + "logical_operator": "AND", + "checkpointing": false, + "max_files": -1, + "number of workers": 5, + "worker options": { + "num_cpus": 0.8 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 20, + "gpus": 0, + "memory": 31.168567657470703, + "object_store": 15.584283828735352 + }, + "job_output_stats": { + "source_files": 1, + "source_size": 478602, + "result_files": 1, + "result_size": 60034, + "table_processing": 0.020560264587402344, + "total_docs_count": 100, + "total_bytes_count": 478602, + "total_columns_count": 25, + "docs_filtered_out_by 'date_acquired BETWEEN '2023-07-04' AND '2023-07-08''": 76, + "bytes_filtered_out_by 'date_acquired BETWEEN '2023-07-04' AND '2023-07-08''": 386230, + "docs_filtered_out_by 'title LIKE 'https://%''": 32, + "bytes_filtered_out_by 'title LIKE 'https://%''": 136638, + "docs_after_filter": 16, + "columns_after_filter": 25, + "bytes_after_filter": 60034 + }, + "source": { + "name": "/home/cma/de/fm-data-engineering/transforms/universal/filtering/test-data/input", + "type": 
"path" + }, + "target": { + "name": "/home/cma/de/fm-data-engineering/transforms/universal/filtering/output", + "type": "path" + } +} diff --git a/transforms/universal/filter/python/test-data/expected/test-datetime-like/test1.parquet b/transforms/universal/filter/python/test-data/expected/test-datetime-like/test1.parquet new file mode 100644 index 000000000..c168fff93 Binary files /dev/null and b/transforms/universal/filter/python/test-data/expected/test-datetime-like/test1.parquet differ diff --git a/transforms/universal/filter/python/test-data/expected/test-default-local/metadata.json b/transforms/universal/filter/python/test-data/expected/test-default-local/metadata.json new file mode 100644 index 000000000..21049a486 --- /dev/null +++ b/transforms/universal/filter/python/test-data/expected/test-default-local/metadata.json @@ -0,0 +1,8 @@ +{ + "total_docs_count": 100, + "total_bytes_count": 478602, + "total_columns_count": 25, + "docs_after_filter": 100, + "columns_after_filter": 25, + "bytes_after_filter": 478602 +} diff --git a/transforms/universal/filter/python/test-data/expected/test-default-local/test1.parquet b/transforms/universal/filter/python/test-data/expected/test-default-local/test1.parquet new file mode 100644 index 000000000..1cc0cadb5 Binary files /dev/null and b/transforms/universal/filter/python/test-data/expected/test-default-local/test1.parquet differ diff --git a/transforms/universal/filter/python/test-data/expected/test-default/metadata.json b/transforms/universal/filter/python/test-data/expected/test-default/metadata.json new file mode 100644 index 000000000..c03d06c06 --- /dev/null +++ b/transforms/universal/filter/python/test-data/expected/test-default/metadata.json @@ -0,0 +1,56 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "filter", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-04-01 14:16:37", + "end_time": "2024-04-01 14:16:38", + "status": "success" + }, + 
"code": { + "github": "github", + "commit_hash": "12345", + "path": "path" + }, + "job_input_params": { + "criteria_list": [], + "columns_to_drop": [], + "logical_operator": "AND", + "checkpointing": false, + "max_files": -1, + "number of workers": 5, + "worker options": { + "num_cpus": 0.8 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 20, + "gpus": 0, + "memory": 31.155734254047275, + "object_store": 15.577867126092315 + }, + "job_output_stats": { + "source_files": 1, + "source_size": 478602, + "result_files": 1, + "result_size": 478602, + "table_processing": 0.016501903533935547, + "total_docs_count": 100, + "total_bytes_count": 478602, + "total_columns_count": 25, + "docs_after_filter": 100, + "columns_after_filter": 25, + "bytes_after_filter": 478602 + }, + "source": { + "name": "/home/cma/de/fm-data-engineering/transforms/universal/filtering/test-data/input", + "type": "path" + }, + "target": { + "name": "/home/cma/de/fm-data-engineering/transforms/universal/filtering/output", + "type": "path" + } +} diff --git a/transforms/universal/filter/python/test-data/expected/test-default/test1.parquet b/transforms/universal/filter/python/test-data/expected/test-default/test1.parquet new file mode 100644 index 000000000..1cc0cadb5 Binary files /dev/null and b/transforms/universal/filter/python/test-data/expected/test-default/test1.parquet differ diff --git a/transforms/universal/filter/python/test-data/expected/test-in-local/metadata.json b/transforms/universal/filter/python/test-data/expected/test-in-local/metadata.json new file mode 100644 index 000000000..3d97b9461 --- /dev/null +++ b/transforms/universal/filter/python/test-data/expected/test-in-local/metadata.json @@ -0,0 +1,10 @@ +{ + "total_docs_count": 100, + "total_bytes_count": 478602, + "total_columns_count": 25, + "docs_filtered_out_by 'document IN ('CC-MAIN-20190221132217-20190221154217-00305.warc.gz', 'CC-MAIN-20200528232803-20200529022803-00154.warc.gz', 
'CC-MAIN-20190617103006-20190617125006-00025.warc.gz')'": 97, + "bytes_filtered_out_by 'document IN ('CC-MAIN-20190221132217-20190221154217-00305.warc.gz', 'CC-MAIN-20200528232803-20200529022803-00154.warc.gz', 'CC-MAIN-20190617103006-20190617125006-00025.warc.gz')'": 468010, + "docs_after_filter": 3, + "columns_after_filter": 25, + "bytes_after_filter": 10592 +} diff --git a/transforms/universal/filter/python/test-data/expected/test-in-local/test1.parquet b/transforms/universal/filter/python/test-data/expected/test-in-local/test1.parquet new file mode 100644 index 000000000..cf1e26c5d Binary files /dev/null and b/transforms/universal/filter/python/test-data/expected/test-in-local/test1.parquet differ diff --git a/transforms/universal/filter/python/test-data/expected/test-in/metadata.json b/transforms/universal/filter/python/test-data/expected/test-in/metadata.json new file mode 100644 index 000000000..2611176b2 --- /dev/null +++ b/transforms/universal/filter/python/test-data/expected/test-in/metadata.json @@ -0,0 +1,60 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "filter", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-04-02 12:01:55", + "end_time": "2024-04-02 12:01:59", + "status": "success" + }, + "code": { + "github": "github", + "commit_hash": "12345", + "path": "path" + }, + "job_input_params": { + "criteria_list": [ + "document IN ('CC-MAIN-20190221132217-20190221154217-00305.warc.gz', 'CC-MAIN-20200528232803-20200529022803-00154.warc.gz', 'CC-MAIN-20190617103006-20190617125006-00025.warc.gz')" + ], + "columns_to_drop": [], + "logical_operator": "AND", + "checkpointing": false, + "max_files": -1, + "number of workers": 5, + "worker options": { + "num_cpus": 0.8 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 20, + "gpus": 0, + "memory": 31.03808670118451, + "object_store": 15.519043349660933 + }, + "job_output_stats": { + "source_files": 1, + "source_size": 
478602, + "result_files": 1, + "result_size": 10592, + "table_processing": 0.03538918495178223, + "total_docs_count": 100, + "total_bytes_count": 478602, + "total_columns_count": 25, + "docs_filtered_out_by 'document IN ('CC-MAIN-20190221132217-20190221154217-00305.warc.gz', 'CC-MAIN-20200528232803-20200529022803-00154.warc.gz', 'CC-MAIN-20190617103006-20190617125006-00025.warc.gz')'": 97, + "bytes_filtered_out_by 'document IN ('CC-MAIN-20190221132217-20190221154217-00305.warc.gz', 'CC-MAIN-20200528232803-20200529022803-00154.warc.gz', 'CC-MAIN-20190617103006-20190617125006-00025.warc.gz')'": 468010, + "docs_after_filter": 3, + "columns_after_filter": 25, + "bytes_after_filter": 10592 + }, + "source": { + "name": "/home/cma/de/fm-data-engineering/transforms/universal/filtering/test-data/input", + "type": "path" + }, + "target": { + "name": "/home/cma/de/fm-data-engineering/transforms/universal/filtering/output", + "type": "path" + } +} diff --git a/transforms/universal/filter/python/test-data/expected/test-in/test1.parquet b/transforms/universal/filter/python/test-data/expected/test-in/test1.parquet new file mode 100644 index 000000000..cf1e26c5d Binary files /dev/null and b/transforms/universal/filter/python/test-data/expected/test-in/test1.parquet differ diff --git a/transforms/universal/filter/python/test-data/expected/test-or-local/metadata.json b/transforms/universal/filter/python/test-data/expected/test-or-local/metadata.json new file mode 100644 index 000000000..6712f8541 --- /dev/null +++ b/transforms/universal/filter/python/test-data/expected/test-or-local/metadata.json @@ -0,0 +1,12 @@ +{ + "total_docs_count": 100, + "total_bytes_count": 478602, + "total_columns_count": 25, + "docs_filtered_out_by 'docq_total_words > 100 AND docq_total_words < 200'": 78, + "bytes_filtered_out_by 'docq_total_words > 100 AND docq_total_words < 200'": 429191, + "docs_filtered_out_by 'ibmkenlm_docq_perplex_score < 230'": 53, + "bytes_filtered_out_by 
'ibmkenlm_docq_perplex_score < 230'": 275911, + "docs_after_filter": 58, + "columns_after_filter": 23, + "bytes_after_filter": 228072 +} diff --git a/transforms/universal/filter/python/test-data/expected/test-or-local/test1.parquet b/transforms/universal/filter/python/test-data/expected/test-or-local/test1.parquet new file mode 100644 index 000000000..7095122f9 Binary files /dev/null and b/transforms/universal/filter/python/test-data/expected/test-or-local/test1.parquet differ diff --git a/transforms/universal/filter/python/test-data/expected/test-or/metadata.json b/transforms/universal/filter/python/test-data/expected/test-or/metadata.json new file mode 100644 index 000000000..d9bb9e40d --- /dev/null +++ b/transforms/universal/filter/python/test-data/expected/test-or/metadata.json @@ -0,0 +1,60 @@ +{ + "pipeline": "pipeline_id", + "job details": { + "job category": "preprocessing", + "job name": "filter", + "job type": "ray", + "job id": "job_id", + "start_time": "2024-04-02 10:40:39", + "end_time": "2024-04-02 10:40:41", + "status": "success" + }, + "code": { + "github": "github", + "commit_hash": "12345", + "path": "path" + }, + "job_input_params": { + "criteria_list": ["docq_total_words > 100 AND docq_total_words < 200", "ibmkenlm_docq_perplex_score < 230"], + "columns_to_drop": ["extra", "cluster"], + "logical_operator": "OR", + "checkpointing": false, + "max_files": -1, + "number of workers": 5, + "worker options": { + "num_cpus": 0.8 + }, + "actor creation delay": 0 + }, + "execution_stats": { + "cpus": 20, + "gpus": 0, + "memory": 31.129891205579042, + "object_store": 15.564945601858199 + }, + "job_output_stats": { + "source_files": 1, + "source_size": 478602, + "result_files": 1, + "result_size": 183767, + "table_processing": 0.02460765838623047, + "total_docs_count": 100, + "total_bytes_count": 478602, + "total_columns_count": 25, + "docs_filtered_out_by 'docq_total_words > 100 AND docq_total_words < 200'": 78, + "bytes_filtered_out_by 'docq_total_words > 
100 AND docq_total_words < 200'": 429191, + "docs_filtered_out_by 'ibmkenlm_docq_perplex_score < 230'": 53, + "bytes_filtered_out_by 'ibmkenlm_docq_perplex_score < 230'": 275911, + "docs_after_filter": 58, + "columns_after_filter": 23, + "bytes_after_filter": 228072 + }, + "source": { + "name": "/home/cma/de/fm-data-engineering/transforms/universal/filtering/test-data/input", + "type": "path" + }, + "target": { + "name": "/home/cma/de/fm-data-engineering/transforms/universal/filtering/output", + "type": "path" + } +} diff --git a/transforms/universal/filter/python/test-data/expected/test-or/test1.parquet b/transforms/universal/filter/python/test-data/expected/test-or/test1.parquet new file mode 100644 index 000000000..7095122f9 Binary files /dev/null and b/transforms/universal/filter/python/test-data/expected/test-or/test1.parquet differ diff --git a/transforms/universal/filter/python/test-data/input/test1.parquet b/transforms/universal/filter/python/test-data/input/test1.parquet new file mode 100644 index 000000000..54cdd7f13 Binary files /dev/null and b/transforms/universal/filter/python/test-data/input/test1.parquet differ diff --git a/transforms/universal/filter/ray/test/test_filter.py b/transforms/universal/filter/python/test/test_filter.py similarity index 99% rename from transforms/universal/filter/ray/test/test_filter.py rename to transforms/universal/filter/python/test/test_filter.py index 46fa914cb..ed66fd298 100644 --- a/transforms/universal/filter/ray/test/test_filter.py +++ b/transforms/universal/filter/python/test/test_filter.py @@ -17,7 +17,7 @@ import pyarrow.parquet as pq from data_processing.test_support.transform import AbstractTransformTest from data_processing.transform import get_transform_config -from filter_transform_ray import ( +from filter_transform import ( FilterTransform, FilterTransformConfiguration, filter_columns_to_drop_cli_param, diff --git a/transforms/universal/filter/python/test/test_filter_python.py 
b/transforms/universal/filter/python/test/test_filter_python.py new file mode 100644 index 000000000..c87234cba --- /dev/null +++ b/transforms/universal/filter/python/test/test_filter_python.py @@ -0,0 +1,20 @@ +# (C) Copyright IBM Corp. 2024. +# Licensed under the Apache License, Version 2.0 (the “License”); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# http://www.apache.org/licenses/LICENSE-2.0 +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an “AS IS” BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +################################################################################ +import os + +from filter_test_support import AbstractPythonFilterTransformTest + + +class TestPythonFilterTransform(AbstractPythonFilterTransformTest): + def _get_test_file_directory(self) -> str: + dir = os.path.abspath(os.path.dirname(__file__)) + return dir diff --git a/transforms/universal/filter/ray/Dockerfile b/transforms/universal/filter/ray/Dockerfile index 606777485..587dc56aa 100644 --- a/transforms/universal/filter/ray/Dockerfile +++ b/transforms/universal/filter/ray/Dockerfile @@ -9,9 +9,15 @@ COPY --chown=ray:users data-processing-lib-python/ data-processing-lib-python/ RUN cd data-processing-lib-python && pip install --no-cache-dir -e . COPY --chown=ray:users data-processing-lib-ray/ data-processing-lib-ray/ RUN cd data-processing-lib-ray && pip install --no-cache-dir -e . +COPY --chown=ray:users python-transform/ python-transform +RUN cd python-transform && pip install --no-cache-dir -e . 
-COPY requirements.txt requirements.txt -RUN pip install --no-cache-dir -r requirements.txt +#COPY requirements.txt requirements.txt +#RUN pip install --no-cache-dir -r requirements.txt + +COPY --chown=ray:users src/ src/ +COPY --chown=ray:users pyproject.toml pyproject.toml +RUN pip install --no-cache-dir -e . # copy source data COPY ./src/filter_transform_ray.py . diff --git a/transforms/universal/filter/ray/pyproject.toml b/transforms/universal/filter/ray/pyproject.toml new file mode 100644 index 000000000..7e26492a6 --- /dev/null +++ b/transforms/universal/filter/ray/pyproject.toml @@ -0,0 +1,46 @@ +[project] +name = "dpk_filter_transform_ray" +version = "0.4.0.dev6" +requires-python = ">=3.10" +description = "Filter Ray Transform" +license = {text = "Apache-2.0"} +readme = {file = "README.md", content-type = "text/markdown"} +authors = [ + { name = "David Wood", email = "dawood@us.ibm.com" }, + { name = "Boris Lublinsky", email = "blublinsky@ibm.com" }, +] +dependencies = [ + "dpk-filter-transform-python==0.4.0.dev6", + "data-prep-toolkit-ray==0.2.0.dev6", +] + +[build-system] +requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"] +build-backend = "setuptools.build_meta" + +[project.optional-dependencies] +dev = [ + "twine", + "pytest>=7.3.2", + "pytest-dotenv>=0.5.2", + "pytest-env>=1.0.0", + "pre-commit>=3.3.2", + "pytest-cov>=4.1.0", + "pytest-mock>=3.10.0", + "moto==5.0.5", + "markupsafe==2.0.1", +] + +[options] +package_dir = ["src","test"] + +[options.packages.find] +where = ["src/"] + +[tool.pytest.ini_options] +# Currently we use low coverage since we have to run tests separately (see makefile) +#addopts = "--cov --cov-report term-missing --cov-fail-under 25" +markers = ["unit: unit tests", "integration: integration tests"] + +[tool.coverage.run] +include = ["src/*"] diff --git a/transforms/universal/filter/ray/requirements.txt b/transforms/universal/filter/ray/requirements.txt deleted file mode 100644 index e0566390e..000000000 --- 
a/transforms/universal/filter/ray/requirements.txt +++ /dev/null @@ -1,4 +0,0 @@ -# transform runtime -#data-prep-kit==0.1.2 -# filter -duckdb==0.10.1 diff --git a/transforms/universal/filter/ray/src/filter_local_ray.py b/transforms/universal/filter/ray/src/filter_local_ray.py index b013b366f..7d07bec93 100644 --- a/transforms/universal/filter/ray/src/filter_local_ray.py +++ b/transforms/universal/filter/ray/src/filter_local_ray.py @@ -15,12 +15,12 @@ from data_processing.utils import ParamsUtils from data_processing_ray.runtime.ray import RayTransformLauncher -from filter_transform_ray import ( - FilterRayTransformConfiguration, +from filter_transform import ( filter_columns_to_drop_cli_param, filter_criteria_cli_param, filter_logical_operator_cli_param, ) +from filter_transform_ray import FilterRayTransformConfiguration # create parameters diff --git a/transforms/universal/filter/ray/src/filter_s3_ray.py b/transforms/universal/filter/ray/src/filter_s3_ray.py index bc1e42281..d14582b21 100644 --- a/transforms/universal/filter/ray/src/filter_s3_ray.py +++ b/transforms/universal/filter/ray/src/filter_s3_ray.py @@ -14,12 +14,12 @@ from data_processing.utils import ParamsUtils from data_processing_ray.runtime.ray import RayTransformLauncher -from filter_transform_ray import ( - FilterRayTransformConfiguration, +from filter_transform import ( filter_columns_to_drop_cli_param, filter_criteria_cli_param, filter_logical_operator_cli_param, ) +from filter_transform_ray import FilterRayTransformConfiguration # create parameters diff --git a/transforms/universal/filter/ray/src/filter_transform_ray.py b/transforms/universal/filter/ray/src/filter_transform_ray.py index 7468bcdb4..6a84fad1c 100644 --- a/transforms/universal/filter/ray/src/filter_transform_ray.py +++ b/transforms/universal/filter/ray/src/filter_transform_ray.py @@ -10,203 +10,17 @@ # limitations under the License. 
################################################################################ -import argparse -import ast -import json - -import duckdb -import pyarrow as pa -from data_processing.runtime.pure_python.runtime_configuration import ( - PythonTransformRuntimeConfiguration, -) -from data_processing.transform import AbstractTableTransform, TransformConfiguration -from data_processing.utils import CLIArgumentProvider, get_logger +from data_processing.utils import get_logger from data_processing_ray.runtime.ray import RayTransformLauncher from data_processing_ray.runtime.ray.runtime_configuration import ( RayTransformRuntimeConfiguration, ) +from filter_transform import FilterTransformConfiguration logger = get_logger(__name__) -short_name = "filter" -cli_prefix = short_name + "_" - -filter_criteria_key = "criteria_list" -""" AST Key holds the list of filter criteria (in SQL WHERE clause format)""" -filter_logical_operator_key = "logical_operator" -""" Key holds the logical operator that joins filter criteria (AND or OR)""" -filter_columns_to_drop_key = "columns_to_drop" -""" AST Key holds the list of columns to drop after filtering""" - -filter_criteria_cli_param = f"{cli_prefix}{filter_criteria_key}" -""" AST Key holds the list of filter criteria (in SQL WHERE clause format)""" -filter_logical_operator_cli_param = f"{cli_prefix}{filter_logical_operator_key}" -""" Key holds the logical operator that joins filter criteria (AND or OR)""" -filter_columns_to_drop_cli_param = f"{cli_prefix}{filter_columns_to_drop_key}" -""" AST Key holds the list of columns to drop after filtering""" - -captured_arg_keys = [filter_criteria_key, filter_columns_to_drop_key] -""" The set of keys captured from the command line """ - -# defaults -filter_criteria_default = ast.literal_eval("[]") -""" The default list of filter criteria (in SQL WHERE clause format)""" -filter_logical_operator_default = "AND" -filter_columns_to_drop_default = ast.literal_eval("[]") -""" The default list of columns 
to drop""" - - -class FilterTransform(AbstractTableTransform): - """ - Implements filtering - select from a pyarrow.Table a set of rows that - satisfy a set of filtering criteria - """ - - def __init__(self, config: dict): - """ - Initialize based on the dictionary of configuration information. - This is generally called with configuration parsed from the CLI arguments defined - by the companion runtime, FilterTransformRuntime. If running from the Ray orchestrator, - these will be provided by that class with help from the RayMutatingDriver. - """ - - super().__init__(config) - self.filter_criteria = config.get(filter_criteria_key, filter_criteria_default) - self.logical_operator = config.get(filter_logical_operator_key, filter_logical_operator_default) - self.columns_to_drop = config.get(filter_columns_to_drop_key, filter_columns_to_drop_default) - - def transform(self, table: pa.Table) -> tuple[list[pa.Table], dict]: - """ - This implementation filters the input table using a SQL statement and - returns the filtered table and execution stats - :param table: input table - :return: list of output tables and custom statistics - """ - - # move table under a different name, to avoid SQL query parsing error - input_table = table - total_docs = input_table.num_rows - total_columns = input_table.num_columns - total_bytes = input_table.nbytes - - # initialize the metadata dictionary - metadata = { - "total_docs_count": total_docs, - "total_bytes_count": total_bytes, - "total_columns_count": total_columns, - } - - # initialize the SQL statement used for filtering - sql_statement = "SELECT * FROM input_table" - if len(self.filter_criteria) > 0: - # populate metadata with filtering stats for each filter criterion - for filter_criterion in self.filter_criteria: - criterion_sql = f"{sql_statement} WHERE {filter_criterion}" - filter_table = duckdb.execute(criterion_sql).arrow() - docs_filtered = total_docs - filter_table.num_rows - bytes_filtered = total_bytes - 
filter_table.nbytes - metadata[f"docs_filtered_out_by '{filter_criterion}'"] = docs_filtered - metadata[f"bytes_filtered_out_by '{filter_criterion}'"] = bytes_filtered - - # use filtering criteria to build the SQL query for filtering - filter_clauses = [f"({x})" for x in self.filter_criteria] - where_clause = f" {self.logical_operator} ".join(filter_clauses) - sql_statement = f"{sql_statement} WHERE {where_clause}" - - # filter using SQL statement - try: - filtered_table = duckdb.execute(sql_statement).arrow() - except Exception as ex: - logger.error(f"FilterTransform::transform failed: {ex}") - raise ex - else: - filtered_table = table - - # drop any columns requested from the final result - if len(self.columns_to_drop) > 0: - filtered_table_cols_dropped = filtered_table.drop_columns(self.columns_to_drop) - else: - filtered_table_cols_dropped = filtered_table - - # add global filter stats to metadata - metadata["docs_after_filter"] = filtered_table.num_rows - metadata["columns_after_filter"] = filtered_table_cols_dropped.num_columns - metadata["bytes_after_filter"] = filtered_table.nbytes - - return [filtered_table_cols_dropped], metadata - - -class FilterTransformConfiguration(TransformConfiguration): - """ - Provides support for configuring and using the associated Transform class include - configuration with CLI args and combining of metadata. - """ - - def __init__(self): - super().__init__( - name=short_name, - transform_class=FilterTransform, - ) - - def add_input_params(self, parser: argparse.ArgumentParser) -> None: - """ - Add Transform-specific arguments to the given parser. - This will be included in a dictionary used to initialize the FilterTransform. - By convention a common prefix should be used for all mutator-specific CLI args - (e.g, noop_, pii_, etc.) 
- """ - - sample_sql = [ - "docq_total_words > 100 AND docq_total_words < 200", - "docq_perplex_score < 230", - "date_acquired BETWEEN '2023-07-04' AND '2023-07-08'", - "title LIKE 'https://%%'", - "document_id IN ('doc-id-1', 'doc-id-2', 'doc-id-3')", - ] - columns_to_drop_example = ["column1", "column2"] - - parser.add_argument( - f"--{filter_criteria_cli_param}", - type=ast.literal_eval, - required=True, - default=ast.literal_eval("[]"), - help=f"list of filter criteria (in SQL WHERE clause format), for example: {json.dumps(sample_sql, indent=2, default=str)}", - ) - parser.add_argument( - f"--{filter_columns_to_drop_cli_param}", - type=ast.literal_eval, - required=False, - default=ast.literal_eval("[]"), - help=f"list of columns to drop after filtering, for example: {json.dumps(columns_to_drop_example)}", - ) - parser.add_argument( - f"--{filter_logical_operator_cli_param}", - type=str, - required=False, - default="AND", - choices=["AND", "OR"], - help="logical operator (AND or OR) that joins filter criteria", - ) - - def apply_input_params(self, args: argparse.Namespace) -> bool: - """ - Validate and apply the arguments that have been parsed - :param args: user defined arguments. 
- :return: True, if validate pass or False otherwise - """ - # Capture the args that are specific to this transform - captured = CLIArgumentProvider.capture_parameters(args, cli_prefix, False) - self.params = self.params | captured - return True - - -class FilterPythonTransformConfiguration(PythonTransformRuntimeConfiguration): - def __init__(self): - super().__init__(transform_config=FilterTransformConfiguration()) - - class FilterRayTransformConfiguration(RayTransformRuntimeConfiguration): def __init__(self): super().__init__(transform_config=FilterTransformConfiguration()) diff --git a/transforms/universal/filter/ray/test/test_filter_ray.py b/transforms/universal/filter/ray/test/test_filter_ray.py index 3c8062fa4..1bba524b5 100644 --- a/transforms/universal/filter/ray/test/test_filter_ray.py +++ b/transforms/universal/filter/ray/test/test_filter_ray.py @@ -12,109 +12,21 @@ import os -from data_processing.test_support.launch.transform_test import ( - AbstractTransformLauncherTest, -) +from data_processing.runtime import AbstractTransformLauncher from data_processing_ray.runtime.ray import RayTransformLauncher -from filter_transform_ray import ( - FilterRayTransformConfiguration, - filter_columns_to_drop_cli_param, - filter_criteria_cli_param, - filter_logical_operator_cli_param, - filter_logical_operator_default, -) +from filter_test_support import AbstractPythonFilterTransformTest +from filter_transform_ray import FilterRayTransformConfiguration -class TestRayFilterTransform(AbstractTransformLauncherTest): +class TestRayFilterTransform(AbstractPythonFilterTransformTest): """ - Extends the super-class to define the test data for the tests defined there. + Extends the Python super-class to redefine the launcher as a RayTransformLauncher. The name of this class MUST begin with the word Test so that pytest recognizes it as a test class.
""" - def get_test_transform_fixtures(self) -> list[tuple]: - fixtures = [] - basedir = os.path.abspath(os.path.join(os.path.dirname(__file__), "../test-data")) + def _get_test_file_directory(self) -> str: + dir = os.path.abspath(os.path.dirname(__file__)) + return dir - fixtures.append( - ( - RayTransformLauncher(FilterRayTransformConfiguration()), - { - "run_locally": True, - filter_criteria_cli_param: [ - "docq_total_words > 100 AND docq_total_words < 200", - "ibmkenlm_docq_perplex_score < 230", - ], - filter_logical_operator_cli_param: filter_logical_operator_default, - filter_columns_to_drop_cli_param: ["extra", "cluster"], - }, - os.path.join(basedir, "input"), - os.path.join(basedir, "expected", "test-and"), - ) - ) - - fixtures.append( - ( - RayTransformLauncher(FilterRayTransformConfiguration()), - { - "run_locally": True, - filter_criteria_cli_param: [ - "docq_total_words > 100 AND docq_total_words < 200", - "ibmkenlm_docq_perplex_score < 230", - ], - filter_logical_operator_cli_param: "OR", - filter_columns_to_drop_cli_param: ["extra", "cluster"], - }, - os.path.join(basedir, "input"), - os.path.join(basedir, "expected", "test-or"), - ) - ) - - # These test are also done in the python-only tests, so no real need to duplicate here. They slow down ci/cd builds. 
- # fixtures.append( - # ( - # RayTransformLauncher(FilterRayTransformConfiguration()), - # { - # "run_locally": True, - # filter_criteria_cli_param: [], - # filter_logical_operator_cli_param: filter_logical_operator_default, - # filter_columns_to_drop_cli_param: [], - # }, - # os.path.join(basedir, "input"), - # os.path.join(basedir, "expected", "test-default"), - # ) - # ) - # - # fixtures.append( - # ( - # RayTransformLauncher(FilterRayTransformConfiguration()), - # { - # "run_locally": True, - # filter_criteria_cli_param: [ - # "date_acquired BETWEEN '2023-07-04' AND '2023-07-08'", - # "title LIKE 'https://%'", - # ], - # filter_logical_operator_cli_param: filter_logical_operator_default, - # filter_columns_to_drop_cli_param: [], - # }, - # os.path.join(basedir, "input"), - # os.path.join(basedir, "expected", "test-datetime-like"), - # ) - # ) - # - # fixtures.append( - # ( - # RayTransformLauncher(FilterRayTransformConfiguration()), - # { - # "run_locally": True, - # filter_criteria_cli_param: [ - # "document IN ('CC-MAIN-20190221132217-20190221154217-00305.warc.gz', 'CC-MAIN-20200528232803-20200529022803-00154.warc.gz', 'CC-MAIN-20190617103006-20190617125006-00025.warc.gz')", - # ], - # filter_logical_operator_cli_param: filter_logical_operator_default, - # filter_columns_to_drop_cli_param: [], - # }, - # os.path.join(basedir, "input"), - # os.path.join(basedir, "expected", "test-in"), - # ) - # ) - - return fixtures + def _get_launcher(self) -> (AbstractTransformLauncher, dict): + return (RayTransformLauncher(FilterRayTransformConfiguration()), {"run_locally": True}) diff --git a/transforms/universal/noop/python/Dockerfile b/transforms/universal/noop/python/Dockerfile index 684ff141a..13a0e42f5 100644 --- a/transforms/universal/noop/python/Dockerfile +++ b/transforms/universal/noop/python/Dockerfile @@ -26,6 +26,7 @@ RUN pip install --no-cache-dir -e . # copy source data COPY ./src/noop_transform.py . +COPY ./src/noop_transform_python.py . 
COPY ./src/noop_local.py local/ # copy test