
Fuzzy dedup #699

Open

wants to merge 86 commits into base: dev
Changes from 79 commits (86 commits total)
47f4526
added folder_transform
blublinsky Oct 10, 2024
5fd20a1
added folder_transform
blublinsky Oct 10, 2024
38b4725
added folder_transform
blublinsky Oct 10, 2024
a3abf21
added folder_transform
blublinsky Oct 11, 2024
d93a06c
Merge branch 'folder_transform' into fuzzy-dedup
cmadam Oct 11, 2024
af8475d
Fuzzy dedup pure python implementation
cmadam Oct 11, 2024
7f9b503
Fuzzy dedup spark implementation
cmadam Oct 11, 2024
3349521
added folder_transform
blublinsky Oct 10, 2024
0553edf
added folder_transform
blublinsky Oct 10, 2024
a53412e
added folder_transform
blublinsky Oct 10, 2024
9c3ace7
added folder_transform
blublinsky Oct 11, 2024
7091a2e
added noop testing
blublinsky Oct 11, 2024
680c78a
Fuzzy dedup ray implementation
Kibnelson Oct 11, 2024
0c31dc0
Fixed bug in ray to distribute docs to remove file to all workers
Kibnelson Oct 11, 2024
47d8fdf
Merge with updated folder_transform branch
cmadam Oct 11, 2024
6ee6695
added folder_transform
blublinsky Oct 10, 2024
e7260ba
added folder_transform
blublinsky Oct 10, 2024
5856f3f
added folder_transform
blublinsky Oct 10, 2024
6519686
added folder_transform
blublinsky Oct 11, 2024
c728224
added noop testing
blublinsky Oct 11, 2024
6e2863a
added noop Ray testing
blublinsky Oct 13, 2024
3c9be57
added noop Spark testing
blublinsky Oct 13, 2024
371a712
more data access simplifications
blublinsky Oct 13, 2024
680f313
Renamed/refactored fuzzy dedup python orchestrator
cmadam Oct 14, 2024
c29d3bf
Rewrote cluster_analysis_transform as a folder_transform
cmadam Oct 14, 2024
aada59e
Wrote get_duplicate_list_transform as a folder_transform
cmadam Oct 14, 2024
2019d56
Added text preprocessing
cmadam Oct 14, 2024
9362803
Added python test data
cmadam Oct 14, 2024
ddbd602
Added project admin tools
cmadam Oct 14, 2024
4dac838
Bug fix
cmadam Oct 14, 2024
fbc2b58
Add op modes for data cleaning: filter (non)dupl and annotate
cmadam Oct 14, 2024
828ec41
Python and spark transforms for cluster analysis
cmadam Oct 14, 2024
a20fe76
Merge folder_transform
cmadam Oct 14, 2024
bc6b81c
Sync spark Makefile with dpk
cmadam Oct 14, 2024
4d486d3
Spark orchestration for fuzzy dedup
cmadam Oct 14, 2024
19e0844
Bug fix
cmadam Oct 14, 2024
2ce3d8c
Added spark test data
cmadam Oct 14, 2024
5e4022c
Setting input test data for ray
cmadam Oct 14, 2024
c14bdaa
Bug fix
cmadam Oct 14, 2024
1215ac5
Ray orchestration for fuzzy dedup
cmadam Oct 14, 2024
5966972
Merge with the latest dev branch
cmadam Oct 17, 2024
caf79a3
Added python test with expected data files
Kibnelson Oct 18, 2024
8fd9676
Added python tests and expected outputs for the tests
Kibnelson Oct 18, 2024
d07a23a
Update versions in pyproject.toml
cmadam Oct 18, 2024
ec2168c
Updated ray test data
cmadam Oct 18, 2024
fd0f52c
Updated ray tests
cmadam Oct 18, 2024
954dffd
Spark test data and tests
cmadam Oct 18, 2024
77d85fd
Adjust to file naming changes
cmadam Oct 18, 2024
310d813
Create python Dockerfile
cmadam Oct 18, 2024
7d97cef
Ray bug fixes
cmadam Oct 19, 2024
87902ac
Fix spark image to support testing
cmadam Oct 19, 2024
c847924
Removed file copy utils
cmadam Oct 25, 2024
ba9b07c
Add fdedup to kfp black list until we get kfp integration
cmadam Oct 25, 2024
f187948
Freeze polars version to 1.9.0 for now
cmadam Oct 25, 2024
84b9104
Fixed duplicate_list_location bug
cmadam Oct 25, 2024
08ff006
Allow input of s3 credentials on command line
cmadam Oct 25, 2024
d0c6f8a
Added license
cmadam Oct 25, 2024
63e11eb
Use str2bool for use_s3 argument
cmadam Oct 25, 2024
bf550fd
Add overwrite output path argument
cmadam Oct 29, 2024
272be36
Add separate data access objects for reading and writing files
cmadam Oct 30, 2024
ee411e1
Define 2 data access objects for data and duplicate list
cmadam Oct 31, 2024
3a30501
get fdedeup/python test-image to pass, and clean up req in ray version
daw3rd Nov 1, 2024
80ae8df
Added an option to run either word or char shingle
Kibnelson Nov 8, 2024
c531809
Use captured_arg_keys to list the arguments of each transform
cmadam Nov 10, 2024
fe43110
Ray implementation for get_duplicate_list_transform
cmadam Nov 10, 2024
82a1860
Bug fix: jaccard threshold type must be float
cmadam Nov 10, 2024
61ed40f
Get fuzzy dedup ray image ready for kfp
cmadam Nov 10, 2024
a8ede00
kfp implementation for fuzzy dedup
cmadam Nov 10, 2024
524236d
Merge word/char shingles
cmadam Nov 10, 2024
96edea4
Added params to captured_arg_keys
cmadam Nov 11, 2024
24163af
Add shingle type option (word or char) to kfp
cmadam Nov 11, 2024
3a43c3d
Utility to calculate number of bands and length of a band
cmadam Nov 13, 2024
83c05f9
Merge branch 'dev' into fuzzy-dedup
cmadam Nov 13, 2024
2f61be7
Set correct version for pyproject
cmadam Nov 13, 2024
cd5eb05
Change the name of the utils Makefile
cmadam Nov 13, 2024
6cc18cd
Copy whl file to the context folder
cmadam Nov 14, 2024
9f33620
Use keyword args in compute_common_params
cmadam Nov 14, 2024
528457c
Use dynamic dependencies
cmadam Nov 14, 2024
fffb630
Add FIXME for https://github.com/kubeflow/pipelines/issues/10914
cmadam Nov 14, 2024
5547d7f
Add FIXME for https://github.com/kubeflow/pipelines/issues/10914
cmadam Nov 14, 2024
09e56e0
Remove pyproject.toml dependencies
cmadam Nov 14, 2024
d3eac50
Fix bug in number of actors calculation
cmadam Nov 15, 2024
fa5959b
Cleanup main entry point and local implementation of python transforms
cmadam Nov 15, 2024
c4f889b
Cleanup main entry point and local implementation of ray transforms
cmadam Nov 15, 2024
f3c5be0
Cleanup main entry point and local implementation of spark transforms
cmadam Nov 15, 2024
4941d5b
Cleanup main entry point and local implementation of spark transforms
cmadam Nov 15, 2024
2 changes: 1 addition & 1 deletion scripts/check-workflows.sh
@@ -17,7 +17,7 @@ if [ ! -d transforms ]; then
echo Please run this script from the top of the repository
exit 1
fi
KFP_BLACK_LIST="doc_chunk pdf2parquet pii_redactor text_encoder license_select repo_level_ordering header_cleanser"
KFP_BLACK_LIST="doc_chunk pdf2parquet pii_redactor text_encoder license_select repo_level_ordering header_cleanser fdedup"
while [ $# -ne 0 ]; do
case $1 in
-show-kfp-black-list) echo $KFP_BLACK_LIST; exit 0;
328 changes: 246 additions & 82 deletions transforms/universal/fdedup/kfp_ray/fdedup_wf.py

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions transforms/universal/fdedup/python/.dockerignore
@@ -0,0 +1 @@
venv/
44 changes: 44 additions & 0 deletions transforms/universal/fdedup/python/Dockerfile
@@ -0,0 +1,44 @@
FROM docker.io/python:3.10.14-slim-bullseye

RUN pip install --upgrade --no-cache-dir pip

# install pytest
RUN pip install --no-cache-dir pytest
ARG DPK_WHEEL_FILE_NAME

# Create a user and use it to run the transform
RUN useradd -ms /bin/bash dpk
USER dpk
WORKDIR /home/dpk

# Copy and install data processing libraries
# These are expected to be placed in the docker context before this is run (see the make image).
COPY --chown=dpk:root data-processing-dist data-processing-dist
RUN pip install data-processing-dist/${DPK_WHEEL_FILE_NAME}

COPY --chown=dpk:root src/ src/
COPY --chown=dpk:root pyproject.toml pyproject.toml
COPY --chown=dpk:root README.md README.md
COPY --chown=dpk:root requirements.txt requirements.txt

RUN pip install --no-cache-dir -e .

# copy source data
COPY src/ src/

# copy source data
COPY ./src/signature_calc_transform_python.py fdedup_transform_python.py
Collaborator: Questionable practice!!! Can we find an alternative ?

Member: on renaming, not the move

Member: And why is signature_calc_transform the main entry point? Shouldn't it be fuzzy_dedup_python.py?

Collaborator: Fixed in commit fa5959b

COPY ./src/signature_calc_local_python.py local/

# copy test
COPY test/ test/
COPY test-data/ test-data/

# Set environment
ENV PYTHONPATH /home/dpk

# Put these at the end since they seem to upset the docker cache.
ARG BUILD_DATE
ARG GIT_COMMIT
LABEL build-date=$BUILD_DATE
LABEL git-commit=$GIT_COMMIT
64 changes: 64 additions & 0 deletions transforms/universal/fdedup/python/Makefile
@@ -0,0 +1,64 @@
# Define the root of the local git clone for the common rules to be able
# to know where they are running from.
REPOROOT=../../../..

# Set this, before including .make.defaults, to
# 1 if requirements reference the latest code in the data processing library
# in this repo (that is not yet published to pypi). This is the default setting.
# 0 if the transforms DPK dependencies are on wheels published to
# pypi (e.g. data-prep-toolkit=0.2.1)
#USE_REPO_LIB_SRC=1

# Include a library of common .transform.* targets which most
# transforms should be able to reuse. However, feel free
# to override/redefine the rules below.
include $(REPOROOT)/transforms/.make.transforms

# Include the common configuration for this transform
include ../transform.config

venv:: .transforms.python-venv

test:: .transforms.python-test

clean:: .transforms.clean

image:: .transforms.python-image

test-src:: .transforms.test-src

setup:: .transforms.setup

build:: build-dist image

publish: publish-image

publish-image:: .transforms.publish-image-python

setup:: .transforms.setup

# The distribution version is the same as the image version.
set-versions:
$(MAKE) TRANSFORM_PYTHON_VERSION=$(FDEDUP_PYTHON_VERSION) TOML_VERSION=$(FDEDUP_PYTHON_VERSION) .transforms.set-versions

build-dist:: .defaults.build-dist

publish-dist:: .defaults.publish-dist

test-image:: .transforms.python-test-image

run-cli-sample: .transforms.run-cli-python-sample

run-local-sample: .transforms.run-local-sample

run-local-python-sample: .transforms.run-local-python-sample

#run-s3-ray-sample: .transforms.run-s3-ray-sample

minio-start: .minio-start

kind-load-image:: .transforms.kind-load-image

docker-load-image: .defaults.docker-load-image

docker-save-image: .defaults.docker-save-image
11 changes: 11 additions & 0 deletions transforms/universal/fdedup/python/README.md
Member: Some more words here to provide a gentle introduction would be nice. In addition, you need to describe all of the configuration keys. See doc_chunk for a template.

Collaborator (@cmadam, Nov 14, 2024): I am still working on the documentation.

@@ -0,0 +1,11 @@
# Fuzzy Dedup

Please see the set of
[transform project conventions](../../../README.md)
for details on general project conventions, transform configuration,
testing and IDE set up.

## Summary

The basic implementation of fuzzy dedup is based on [MinHash](https://en.wikipedia.org/wiki/MinHash). Also see
Member: Forgive me if this is a duplicate comment as I thought I had submitted once already, but...

  1. A more gentle introduction to what the transform does instead of only providing the links.
  2. The set of configuration keys should be documented. See doc_chunk for a nice example.
  3. This file needs to be linked from a ../README.md, which now only points to ray and python.

[here](http://infolab.stanford.edu/~ullman/mmds/ch3n.pdf) for more details.
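As a quick illustration of the MinHash/LSH banding idea referenced above (an editorial sketch, not code from this PR): each MinHash signature of length num_perm is split into b bands of r rows, and two documents with true Jaccard similarity s land in the same bucket for at least one band with probability 1 - (1 - s^r)^b. The band/row numbers below are illustrative only.

```python
# Illustrative only: the LSH banding S-curve that MinHash-based fuzzy dedup relies on.
def band_collision_probability(s: float, b: int, r: int) -> float:
    """Probability that two docs with Jaccard similarity s share at least one band
    when a MinHash signature is split into b bands of r rows each."""
    return 1.0 - (1.0 - s**r) ** b

# With, e.g., 14 bands of 8 rows (112 minhashes), near-duplicates almost always collide,
# while dissimilar pairs almost never do:
print(band_collision_probability(0.8, b=14, r=8))  # ~0.92
print(band_collision_probability(0.4, b=14, r=8))  # ~0.009
```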
@@ -1,31 +1,21 @@
[project]
name = "data_prep_toolkit_spark"
name = "dpk_fdedup_transform_python"
version = "0.2.2.dev2"
keywords = ["data", "data preprocessing", "data preparation", "llm", "generative", "ai", "fine-tuning", "llmapps" ]
requires-python = ">=3.10,<3.13"
description = "Data Preparation Toolkit Library for Spark"
description = "Fuzzy Dedup Transform for Python"
license = {text = "Apache-2.0"}
readme = {file = "README.md", content-type = "text/markdown"}
authors = [
{ name = "David Wood", email = "[email protected].com" },
{ name = "Boris Lublinsky", email = "blublinsk@ibm.com" },
{ name = "Nelson Bore", email = "k.nelsonbore@gmail.com" },
{ name = "Constantin Adam", email = "cmadam@us.ibm.com" },
]
dependencies = [
"data-prep-toolkit==0.2.2.dev2",
"pyspark>=3.5.2",
"psutil>=6.0.0",
"PyYAML>=6.0.2"
]

[project_urls]
Repository = "https://github.com/IBM/data-prep-kit"
Issues = "https://github.com/IBM/data-prep-kit/issues"
Documentation = "https://ibm.github.io/data-prep-kit/"
"Transform project" = "https://github.com/IBM/data-prep-kit/tree/dev/transforms/universal/noop"
dynamic = ["dependencies"]

[build-system]
requires = ["setuptools>=68.0.0", "wheel", "setuptools_scm[toml]>=7.1.0"]
build-backend = "setuptools.build_meta"
[tool.setuptools.dynamic]
dependencies = {file = ["requirements.txt"]}

[project.optional-dependencies]
dev = [
@@ -44,7 +34,7 @@ dev = [
package_dir = ["src","test"]

[options.packages.find]
where = ["src/data_processing_spark"]
where = ["src/"]

[tool.pytest.ini_options]
# Currently we use low coverage since we have to run tests separately (see makefile)
10 changes: 10 additions & 0 deletions transforms/universal/fdedup/python/requirements.txt
@@ -0,0 +1,10 @@
data-prep-toolkit==0.2.2.dev2
pyyaml>=6.0.2
boto3>=1.34.69
kubernetes>=30.1.0
polars==1.9.0
disjoint-set>=0.8.0
scipy>=1.14.1, <2.0.0
numpy<1.29.0
sentencepiece>=0.2.0
mmh3>=4.1.0
112 changes: 112 additions & 0 deletions transforms/universal/fdedup/python/src/Murmur_MH.py
@@ -0,0 +1,112 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################


import logging
import os
from typing import List, Set

import mmh3
import numpy as np


class Murmur_MH:
    def __init__(self, num_perm=64, seed=42, hashfunc=None):
        self.seed = seed
        self.num_perm = num_perm  # the number of buckets, i.e. the vector length after self.minhash() call
        self.permutations = self._init_permutations(seed, num_perm)

    def _init_permutations(self, seed, num_perm):
        # see https://en.wikipedia.org/wiki/Universal_hashing#Avoiding_modular_arithmetic
        max_int = np.uint64((1 << 64) - 1)
        # initialize pseudo random number generator with given seed value
        gen = np.random.RandomState(seed)
        # get self.num_perm pseudo random numbers between 2 and max_int (excl)
        permutations = np.array(
            [gen.randint(0, max_int, dtype=np.uint64) for _ in range(num_perm)],
            dtype=np.uint64,
        ).T
        # make all even pseudo random numbers odd by adding 1
        permutations[permutations % 2 == 0] += 1
        return permutations

    def minhash(self, shingles: List[str]):
        """return np.array of minhash"""
        # see https://en.wikipedia.org/wiki/Universal_hashing#Avoiding_modular_arithmetic
        hash_values = np.array([mmh3.hash(shingle, signed=False) for shingle in shingles], dtype=np.uint64)
        return (
            np.right_shift(
                (hash_values * np.tile(self.permutations, (len(hash_values), 1)).T).T,
                32,
            )
            .astype(np.uint32)
            .min(axis=0)
        )

    def minhash2(self, shingles: List[str], doc_len: int):
        """
        for each shingle (i.e. a group of k-words) it generates a digest value based on
        mmh3-hash function (32-bit)

        return tuple (A, B)
        A = an array of values = np.array of minhash
        B = document_length = number of characters"""
        # see https://en.wikipedia.org/wiki/Universal_hashing#Avoiding_modular_arithmetic
        hash_values = np.array([mmh3.hash(shingle, signed=False) for shingle in shingles], dtype=np.uint64)
        return (
            np.right_shift(
                (hash_values * np.tile(self.permutations, (len(hash_values), 1)).T).T,
                32,
            )
            .astype(np.uint32)
            .min(axis=0),
            doc_len,
        )

    def minhash2_nosalt(self, shingles: List[str], doc_len: int, doc_id: int):
        """
        for each shingle (i.e. a group of k-words) it generates a digest value based on
        mmh3-hash function (32-bit)

        return tuple (A, B)
        A = an array of values = np.array of minhash
        B = document_length = number of characters"""
        # see https://en.wikipedia.org/wiki/Universal_hashing#Avoiding_modular_arithmetic
        hash_values = np.array([mmh3.hash(shingle, signed=False) for shingle in shingles], dtype=np.uint64)
        return (
            np.right_shift(
                (hash_values * np.tile(self.permutations, (len(hash_values), 1)).T).T,
                32,
            )
            .astype(np.uint32)
            .min(axis=0)
            .tolist(),
            doc_len,
            doc_id,
        )

    @staticmethod
    def jaccard(mh1: np.array, mh2: np.array) -> float:
        """
        The Jaccard similarity measures the similarity between two sets of data
        to see which members are shared and distinct.

        The Jaccard similarity is calculated by dividing the number of observations
        in both sets by the number of observations in either set.

        Developed by Paul Jaccard, the index ranges from 0 to 1.
        The closer to 1, the more similar the two sets of data.

        As a document is represented by a set, we use the Jaccard similarity to measure how similar two documents are.
        """
        assert len(mh1) == len(mh2)
        return np.count_nonzero(mh1 == mh2) / len(mh1)
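
For readers skimming the diff, here is a minimal usage sketch of the class above (not part of the PR; it assumes src/ is on PYTHONPATH and uses a hypothetical word_shingles helper in place of the transform's real preprocessing):

```python
from Murmur_MH import Murmur_MH


def word_shingles(text: str, k: int = 3) -> list:
    # Overlapping k-word shingles; purely illustrative.
    words = text.split()
    return [" ".join(words[i : i + k]) for i in range(len(words) - k + 1)]


mh = Murmur_MH(num_perm=64, seed=42)
sig_a = mh.minhash(word_shingles("the quick brown fox jumps over the lazy dog"))
sig_b = mh.minhash(word_shingles("the quick brown fox leaps over the lazy dog"))
# The fraction of matching minhash positions estimates the true Jaccard similarity.
print(Murmur_MH.jaccard(sig_a, sig_b))
```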
@@ -0,0 +1,49 @@
# (C) Copyright IBM Corp. 2024.
# Licensed under the Apache License, Version 2.0 (the “License”);
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an “AS IS” BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import os
import sys

from cluster_analysis_transform_python import (
    ClusterAnalysisPythonTransformConfiguration,
)
from data_processing.runtime.pure_python import PythonTransformLauncher
from data_processing.utils import ParamsUtils


# create parameters
input_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output", "bands"))
output_folder = os.path.abspath(os.path.join(os.path.dirname(__file__), "..", "output", "docs_to_remove"))
local_conf = {
    "input_folder": input_folder,
    "output_folder": output_folder,
}
code_location = {"github": "github", "commit_hash": "12345", "path": "path"}
params = {
    # Data access. Only required parameters are specified
    "data_local_config": ParamsUtils.convert_to_ast(local_conf),
    # execution info
    "runtime_pipeline_id": "pipeline_id",
    "runtime_job_id": "job_id",
    "runtime_code_location": ParamsUtils.convert_to_ast(code_location),
    "cluster_num_bands": 14,
    "cluster_num_segments": 2,
    "cluster_jaccard_similarity_threshold": 0.7,
}
if __name__ == "__main__":
    # Set the simulated command line args
    sys.argv = ParamsUtils.dict_to_req(d=params)
    print(sys.argv)
    # create launcher
    launcher = PythonTransformLauncher(runtime_config=ClusterAnalysisPythonTransformConfiguration())
    # Launch python to process the input
    launcher.launch()
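
To relate the cluster_num_bands and cluster_jaccard_similarity_threshold values above, a hypothetical helper like the one below (a sketch, not the band/row utility added in commit 3a43c3d) picks a (bands, rows) split whose LSH threshold (1/b)^(1/r) sits closest to the requested Jaccard threshold:

```python
def choose_bands_and_rows(num_perm: int, threshold: float) -> tuple[int, int]:
    # Pick b bands of r = num_perm // b rows whose LSH threshold (1/b)**(1/r)
    # is closest to the requested Jaccard similarity threshold.
    best, best_gap = (1, num_perm), float("inf")
    for b in range(1, num_perm + 1):
        r = num_perm // b
        if r == 0:
            break
        gap = abs((1.0 / b) ** (1.0 / r) - threshold)
        if gap < best_gap:
            best, best_gap = (b, r), gap
    return best


print(choose_bands_and_rows(112, 0.7))  # -> (14, 8) for these illustrative inputs
```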