Copy example gen prod #226
Merged
rcrowe-google
merged 18 commits into
tensorflow:main
from
alxndrnh:copy_example_gen_prod
May 19, 2023
2a33f73
added CopyExampleGen custom python component
alxndrnh c50a98d
added CopyExampleGen custom python component and made changes to read…
alxndrnh d74a7fe
changes to format
alxndrnh 88ec64d
changes to format
alxndrnh 5151e37
changes to format
alxndrnh 82fb9c7
changes to format
alxndrnh 3452389
changes to format
alxndrnh 38ccf78
changes to format
alxndrnh 81bf3e2
Final commit for CopyExampleGen: PASSING
alxndrnh 77ed568
Delete .items after calling dictionary
alxndrnh 0594a08
Reviewed Michael's comments and implemented all changes
alxndrnh 4099118
Reviewed Michael's comments and implemented all changes
alxndrnh 815636b
Reviewed Michael's comments and implemented all changes. Tested in Ve…
alxndrnh 339324b
Reviewed Michael's comments and implemented all changes. Tested in Ve…
alxndrnh 3499a94
Reviewed Michael's comments and implemented all changes. Tested in Ve…
alxndrnh 9302962
component.py split_names parse change and readme.md file
alxndrnh 4991484
component.py split_names parse change and readme.md file
alxndrnh ff15f35
component.py split_names parse change and readme.md file
alxndrnh
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -10,31 +10,71 @@ | |
**Project name:** CopyExampleGen component | ||
|
||
## Project Description | ||
CopyExampleGen will allow the user to copy a pre-existing TFRecord dataset or raw data and ingest it into the pipeline, skipping the shuffling and the Beam job. This process requires a dict input of split names and their respective URIs. It outputs an Examples Artifact (the same as the Artifact output by the ExampleGen component) that downstream components can use. | ||
CopyExampleGen will allow the user to copy pre-existing TFRecords and ingest them into the pipeline as examples, skipping the shuffling and the Beam job that the standard ExampleGen component runs. This process requires a dict input from the user, with split names as keys and their respective URIs as values. The component then sets the artifact’s properties, generates the output dict, and registers contexts and execution. Lastly, it outputs an Examples Artifact that downstream components can use. | ||
|
||
Example of pipeline component definition: | ||
|
||
```python | ||
copy_example_gen = component.CopyExampleGen( | |
    input_json_str=json.dumps(tfrecords_dict) | |
) | |
``` | ||
|
||
Currently, `tfx.dsl.components.Parameter` only supports primitive types. Therefore, to use CopyExampleGen, the 'input_dict' of type Dict[str, str] must be converted into a JSON str by passing 'tfrecords_dict' to `json.dumps()`. | ||
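A minimal sketch of that conversion, using only the standard library. The URIs are placeholders, and the `CopyExampleGen` call appears only as a comment because it needs a TFX environment to run.

```python
import json

# Placeholder split names and URIs; substitute real TFRecord folders.
tfrecords_dict = {
    "train": "gs://path/to/examples/Split-train/",
    "eval": "gs://path/to/examples/Split-eval/",
}

# Serialize the dict so it can be passed as a primitive `str` parameter.
input_json_str = json.dumps(tfrecords_dict)

# Inside the component, json.loads() recovers the original mapping.
round_tripped = json.loads(input_json_str)

# In a pipeline definition the string would be passed as:
#   component.CopyExampleGen(input_json_str=input_json_str)
```

Because JSON objects keep their key order when loaded back into a Python dict, the split order the user wrote is preserved.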
|
||
|
||
## Project Category | ||
Component | ||
Addon Component | ||
|
||
## Project Use-Case(s) | ||
CopyExampleGen will allow the user to provide a dict input with split names as keys and the URIs of their pre-existing TFRecords as values, then format the directory structure so that it matches that of an Examples Artifact. | ||
CopyExampleGen will replace ExampleGen when the user already has TFRecords and split names. Hence, no Beam job will be run and the TFRecords will not be shuffled and/or randomized, which saves data ingestion time in the pipeline. | ||
|
||
Currently, ingesting data with the ExampleGen component does not provide a way to split without random data shuffling and always runs a Beam job. This component will save significant time (hours for large amounts of data) per pipeline run when a pipeline run does not require data to be shuffled. Additionally, this component will save hundreds of dollars in Dataflow consumption every time the pipeline is run or rerun. Some challenges users have had: | ||
|
||
|
||
1. “Reshuffle doesn't work well with DirectRunner and causes OOMing. Users have been patching out shuffling in every release and doing it in the DB query. They have given up on Beam based ExampleGen and have created an entire custom ExampleGen that reads from the database and doesn’t use Beam”. | ||
|
||
2. “When the use case is a time series problem using sliding windows, shuffling before splitting in train and eval set is counterproductive as the user would need a coherent training set”. | ||
|
||
Currently, ingesting data with ExampleGen requires a Beam job to be run and requires the data to be shuffled. This component will save users the hours or days needed to create a fully custom ExampleGen component as a workaround. Some challenges our users have had: | ||
1. Reshuffle doesn't work well with DirectRunner and causes OOMing. Users have been patching out shuffling in every release and doing it in the DB query. They have given up on Beam based ExampleGen and have created an entire custom ExampleGen that reads from the database and doesn’t use Beam. Link. | ||
2. When the use case is a time series problem using sliding windows, shuffling before splitting in train and eval set is counterproductive as the user would need a coherent training set. Link. | ||
3. Almost impossible to use ExampleGen based components for large datasets. Without it, Beam knows how to write to disk after transforming from input format to output format, allowing it to transform (slowly) large datasets that would otherwise not fit into memory. Link. | ||
|
||
## Project Implementation | ||
Use case #1 - Tfrecords as input URIs: | ||
This component will: | ||
1. Accept a dict i.e. {'split_name1': './path/to/split_name1/tfrecord1', 'split_name2': './path/to/split_name2/tfrecord2'} | ||
2. Retrieve the tfrecords | ||
3. Create an Examples Artifact, following Examples directory structure and properties required for an Examples Artifact | ||
4. Register the Examples Artifact into MLMD | ||
5. Output as 'examples' to be ingested from downstream components | ||
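The directory structure targeted by these steps can be sketched as a pure path computation. This is only an illustration: `plan_copies` is a hypothetical helper that is not part of the PR, and the `Split-<name>` folder naming is taken from the component code in this change.

```python
import posixpath
from typing import Dict, List, Tuple


def plan_copies(artifact_uri: str,
                split_files: Dict[str, List[str]]) -> List[Tuple[str, str]]:
    """Maps every source file to `<artifact_uri>/Split-<name>/<basename>`."""
    plan = []
    for split_name, sources in split_files.items():
        split_dir = posixpath.join(artifact_uri, f"Split-{split_name}")
        for source in sources:
            destination = posixpath.join(split_dir, posixpath.basename(source))
            plan.append((source, destination))
    return plan


# Placeholder URIs, for illustration only.
plan = plan_copies(
    "gs://bucket/examples",
    {"train": ["gs://source/train/data-00000.gz"],
     "eval": ["gs://source/eval/data-00000.gz"]},
)
```

Separating the planning from the copying keeps the layout logic testable without touching any storage backend.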
### Component | ||
|
||
Custom Python function component: CopyExampleGen | ||
|
||
- `input_json_str`: the input parameter for CopyExampleGen, of type `tfx.dsl.components.Parameter[str]`, to which the user assigns their Dict[str, str] input, tfrecords_dict. However, because Python custom component parameters only support primitive types, `input_json_str` must be set to `json.dumps(tfrecords_dict)`, with tfrecords_dict passed as the argument. | ||
|
||
- `output_example`: The output artifact can be referenced as an object of its specified ArtifactType in the component function being declared. For example, if the ArtifactType is Examples, one can reference the properties of an Examples artifact (span, version, split_names, etc.) through the OutputArtifact object. This is the variable we reference to build and register our Examples Artifact after parsing the tfrecords_dict input. | ||
|
||
|
||
### Python Custom Component | ||
|
||
|
||
#### Part 1 | ||
|
||
Using the keys and values from `tfrecords_dict`: | ||
1. function `_split_names_string_builder(tfrecords_dict)`: determine the source (and possibly destination; see question #2) for the files in each split, building exact URIs as necessary. Additionally, parse the input into the list of split names that will become the `split_names` property of the output Examples artifact. Example: `["train","eval"]` | ||
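One way to produce the split-names string from the dict keys is the standard `json` module rather than manual string concatenation. This is a sketch, not the PR's implementation; `build_split_names_string` is a hypothetical name, and the compact separators are chosen only to match the example format shown above.

```python
import json
from typing import Dict


def build_split_names_string(tfrecords_dict: Dict[str, str]) -> str:
    """Encodes the dict keys as a compact JSON list of split names."""
    split_names = list(tfrecords_dict)
    # separators=(",", ":") removes the space json.dumps adds after commas.
    return json.dumps(split_names, separators=(",", ":"))


# Placeholder paths, for illustration only.
example = build_split_names_string({
    "train": "./path/to/train/",
    "eval": "./path/to/eval/",
})
# example == '["train","eval"]'
```

Unlike a hand-rolled builder, this also returns a valid `[]` for an empty dict and escapes any quotes inside a split name.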
|
||
|
||
#### Part 2 | ||
|
||
Transform the result of `parse_tfrecords_dict` we created above into an Examples Artifact. Importer Node has the functionality and process we are trying to recreate in this CopyExampleGen because it registers an external resource into MLMD and outputs the user defined Artifact type. | ||
|
||
Using `fileio.mkdir` and `fileio.copy`, the component will then create a directory for each name in `split_names`. After each `Split-name` folder is created, the files at the URI path are copied into that `Split-name` folder. | ||
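The mkdir-then-copy flow can be tried on a local filesystem with the standard library standing in for `fileio`. This is an assumption-laden sketch: `copy_splits_locally` is a hypothetical helper, `os.makedirs`, `glob.glob`, and `shutil.copy` replace the corresponding `fileio` calls, and the `*.gz` pattern mirrors the glob used in the component code.

```python
import glob
import os
import shutil
import tempfile
from typing import Dict, List


def copy_splits_locally(input_dict: Dict[str, str], output_uri: str,
                        pattern: str = "*.gz") -> Dict[str, List[str]]:
    """Copies matching files of each split into `<output_uri>/Split-<name>/`."""
    copied: Dict[str, List[str]] = {}
    for split_name, source_dir in input_dict.items():
        split_dir = os.path.join(output_uri, f"Split-{split_name}")
        os.makedirs(split_dir, exist_ok=True)
        copied[split_name] = []
        for source in sorted(glob.glob(os.path.join(source_dir, pattern))):
            destination = os.path.join(split_dir, os.path.basename(source))
            shutil.copy(source, destination)
            copied[split_name].append(destination)
    return copied


# Small demonstration with a throwaway directory and a dummy file.
with tempfile.TemporaryDirectory() as tmp:
    source_dir = os.path.join(tmp, "source_train")
    os.makedirs(source_dir)
    with open(os.path.join(source_dir, "data-00000.gz"), "wb") as handle:
        handle.write(b"placeholder bytes, not a real TFRecord")
    result = copy_splits_locally({"train": source_dir},
                                 os.path.join(tmp, "examples"))
    copied_names = [os.path.basename(path) for path in result["train"]]
    copied_exists = all(os.path.exists(path) for path in result["train"])
```

The local stand-in only exercises the directory layout; remote URIs such as `gs://` paths would still need `fileio` or another storage-aware library.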
|
||
|
||
Thoughts from original implementation in phase 1: | ||
This step can possibly use the [importer.generate_output_dict](https://github.com/tensorflow/tfx/blob/f8ce19339568ae58519d4eecfdd73078f80f84a2/tfx/dsl/components/common/importer.py#L153) function: | ||
Create standard ‘output_dict’ variable. The value will be created by calling the worker function. If file copying is done before this step, this method can probably be used as is to register the artifact. | ||
|
||
## Open Implementation Questions | ||
|
||
1. There are a few open questions about how the file copying should actually be done. Where does the copying that importer does actually happen? And what's the best way to change that? Are there other ways in TFX to do copying in a robust way? Maybe something in tfx.io? If there's an existing method, what has to happen in `parse_tfrecords_dict`? Depending on the copying capabilities available, will there be a need to detect the execution environment? Does TFX rely on other tools to execute a copy that handle this? Is detection of the execution environment and the copying itself separate? What could be reused? | ||
|
||
- If it's not easy to detect the execution environment without also performing a copy, will the user have to specify the execution environment and therefore how to do the copy (e.g., local copy, GCS, S3). And then what's the best way to handle that? | ||
|
||
2. Should the dictionary of file inputs take a path to a folder? Globs? Lists of individual files? | ||
3. Assuming file copying is done entirely separately, can [importer.generate_output_dict](https://github.com/tensorflow/tfx/blob/f8ce19339568ae58519d4eecfdd73078f80f84a2/tfx/dsl/components/common/importer.py#L153) be used as is to register the artifacts, or does some separate code using [MLMD](https://www.tensorflow.org/tfx/guide/mlmd) in a different way need to be written? | ||
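For question 1, one possible way to tell local, GCS, and S3 inputs apart without asking the user is to inspect the URI scheme. This is only a sketch of that idea, not a decision recorded in this PR: `storage_backend` and its return labels are hypothetical. The component code in this change calls `fileio` directly, which may make such detection unnecessary.

```python
from urllib.parse import urlparse


def storage_backend(uri: str) -> str:
    """Guesses the storage backend from the URI scheme (illustrative only)."""
    scheme = urlparse(uri).scheme
    if scheme == "gs":
        return "gcs"
    if scheme == "s3":
        return "s3"
    if scheme in ("", "file"):
        # Relative and absolute POSIX paths carry no scheme.
        return "local"
    return "unknown"


backends = [storage_backend(uri) for uri in (
    "gs://path/to/examples/Split-train/",
    "s3://bucket/examples/Split-train/",
    "./path/to/split_name1/",
)]
```

Windows drive paths such as `C:\data` would be parsed with a one-letter scheme, so a real implementation would need to handle that case separately.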
|
||
## Project Dependencies | ||
|
||
Using: Python 3.8.2, Tensorflow 2.11.0, TFX 1.12.0 | ||
Possibly libraries that directly access blob storage platforms, e.g. google-cloud-storage. | ||
|
||
|
||
## Project Team | ||
Alex Ho, [email protected], @alxndrnh | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,14 @@ | ||
# Copyright 2023 Google LLC. All Rights Reserved. | ||
|
||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
# ============================================================================== |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,109 @@ | ||
# Copyright 2023 Google LLC. All Rights Reserved. | ||
|
||
# | ||
# Licensed under the Apache License, Version 2.0 (the "License"); | ||
# you may not use this file except in compliance with the License. | ||
# You may obtain a copy of the License at | ||
# | ||
# http://www.apache.org/licenses/LICENSE-2.0 | ||
# | ||
# Unless required by applicable law or agreed to in writing, software | ||
# distributed under the License is distributed on an "AS IS" BASIS, | ||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
# See the License for the specific language governing permissions and | ||
# limitations under the License. | ||
"""CopyExampleGen custom component. | ||
|
||
This component will accept tfrecord files and register them as an | ||
Examples Artifact for downstream components to use. CopyExampleGen accepts | ||
a dictionary where keys are the split-names and their respective value is a | ||
uri to the folder that contains the tfrecords file(s). | ||
|
||
User will need to create a dictionary of type Dict[str, str], in this case | ||
we will title this dictionary 'tfrecords_dict' and assign it to a dictionary: | ||
|
||
tfrecords_dict: Dict[str, str]={ | ||
"train":"gs://path/to/examples/Split-train/", | ||
"eval":"gs://path/to/examples/Split-eval/" | ||
} | ||
|
||
Currently tfx.dsl.components.Parameter only supports primitive types therefore, | ||
in order to properly use CopyExampleGen, the 'input_dict' of type Dict[str, str] | ||
needs to be converted into a JSON str. We can do this by simply using | ||
'json.dumps()' by adding 'tfrecords_dict' in as a parameter like so: | ||
|
||
copy_example=component.CopyExampleGen( | ||
input_json_str=json.dumps(tfrecords_dict) | ||
) | ||
|
||
""" | ||
import json | ||
import os | ||
from typing import List | ||
|
||
from tfx import v1 as tfx | ||
from tfx.dsl.component.experimental.decorators import component | ||
from tfx.dsl.io import fileio | ||
from tfx.v1.types.standard_artifacts import Examples | ||
|
||
|
||
def _split_names_string_builder(split_names_list: List[str]) -> str: | |
  """Creates the string of split names for the output_example.split_names property. | |
|
||
  Example: ["train", "eval"] becomes '["train","eval"]'. | |
  """ | |
  # json.dumps also handles the empty list and escapes quotes in split names. | |
  return json.dumps(split_names_list, separators=(",", ":")) | |
|
||
|
||
@component | |
def CopyExampleGen(  # pylint: disable=C0103 | |
    input_json_str: tfx.dsl.components.Parameter[str], | |
    output_example: tfx.dsl.components.OutputArtifact[Examples] | |
) -> tfx.dsl.components.OutputDict(): | |
  """ | |
  CopyExampleGen first converts the string input to a Dict, extracts the keys | |
  from the dictionary, input_dict, and creates a string containing the split | |
  names. This string is assigned to the output_example.split_names property | |
  to register the split names. | |
|
||
  This component then creates a directory for each name in split_names. | |
  After each `Split-name` folder is created, the files at the uri path are | |
  copied into that `Split-name` folder. | |
|
||
  """ | |
|
||
  input_dict = json.loads(input_json_str) | |
|
||
  # Parse input_dict: collect the split names and register them on the artifact. | |
  split_names = list(input_dict) | |
  split_names_string = _split_names_string_builder(split_names) | |
  output_example.split_names = str(split_names_string) | |
|
||
  # Make directories | |
  output_example_uri = output_example.uri | |
|
||
  for split in input_dict: | |
    split_value = f"/Split-{split}/" | |
    fileio.mkdir(f"{output_example_uri}{split_value}") | |
    # Only gzip-compressed files (*.gz) under the split's uri are picked up. | |
    tfrecords_list = fileio.glob(f"{input_dict[split]}*.gz") | |
|
||
    # Copy files into directories | |
    for tfrecord in tfrecords_list: | |
      file_name = os.path.basename(os.path.normpath(tfrecord)) | |
      fileio.copy(tfrecord, output_example_uri + split_value + file_name, | |
                  True) | |
Add copyright to the readme. Use The Tensorflow Authors (not Google -- see other files in addons).
I do not see any copyright lines in other readme files. May you link me to an example? @michaelwsherman