Update how we specify artifact modes (apache#27939)
* Update how we specify artifact modes

* Wording/docs

* Formatting/style feedback

* fix breakages

* Lint

* Don’t overwrite vars

* Typo/lint
damccorm authored Aug 10, 2023
1 parent 6f60a6c commit 71e060c
Showing 5 changed files with 82 additions and 58 deletions.
28 changes: 12 additions & 16 deletions sdks/python/apache_beam/examples/ml_transform/ml_transform_basic.py
@@ -61,27 +61,26 @@ def parse_args():
return parser.parse_known_args()


def preprocess_data_for_ml_training(train_data, artifact_mode, args):
def preprocess_data_for_ml_training(train_data, args):
"""
Preprocess the data for ML training. This method runs a pipeline to
preprocess the data needed for ML training. It produces artifacts that
can be used for ML inference later.
preprocess the data needed for ML training. It produces artifacts that can
be used for ML inference later.
"""

with beam.Pipeline() as p:
train_data_pcoll = (p | "CreateData" >> beam.Create(train_data))

# When 'artifact_mode' is set to 'produce', the ComputeAndApplyVocabulary
# When using write_artifact_location, the ComputeAndApplyVocabulary
# function generates a vocabulary file. This file, stored in
# 'artifact_location', contains the vocabulary of the entire dataset.
# 'write_artifact_location', contains the vocabulary of the entire dataset.
# This is considered as an artifact of ComputeAndApplyVocabulary transform.
# The indices of the vocabulary in this file are returned as
# the output of MLTransform.
transformed_data_pcoll = (
train_data_pcoll
| 'MLTransform' >> MLTransform(
artifact_location=args.artifact_location,
artifact_mode=artifact_mode,
write_artifact_location=args.artifact_location,
).with_transform(ComputeAndApplyVocabulary(
columns=['x'])).with_transform(TFIDF(columns=['x'])))

@@ -93,7 +92,7 @@ def preprocess_data_for_ml_training(train_data, artifact_mode, args):
# 0.5008155 ], dtype=float32), x_vocab_index=array([ 0, 2, 3, 5, 21]))


def preprocess_data_for_ml_inference(test_data, artifact_mode, args):
def preprocess_data_for_ml_inference(test_data, args):
"""
Preprocess the data for ML inference. This method runs a pipeline to
preprocess the data needed for ML inference. It consumes the artifacts
@@ -108,8 +107,7 @@ def preprocess_data_for_ml_inference(test_data, artifact_mode, args):
transformed_data_pcoll = (
test_data_pcoll
| "MLTransformOnTestData" >> MLTransform(
artifact_location=args.artifact_location,
artifact_mode=artifact_mode,
read_artifact_location=args.artifact_location,
# we don't need to specify transforms as they are already saved
# in the artifacts.
))
@@ -149,18 +147,16 @@ def run(args):

# Preprocess the data for ML training.
# For the data going into the ML model training, we want to produce the
# artifacts. So, we set artifact_mode to ArtifactMode.PRODUCE.
preprocess_data_for_ml_training(
train_data, artifact_mode=ArtifactMode.PRODUCE, args=args)
# artifacts.
preprocess_data_for_ml_training(train_data, args=args)

# Do some ML model training here.

# Preprocess the data for ML inference.
# For the data going into the ML model inference, we want to consume the
# artifacts produced during the stage where we preprocessed the data for ML
# training. So, we set artifact_mode to ArtifactMode.CONSUME.
preprocess_data_for_ml_inference(
test_data, artifact_mode=ArtifactMode.CONSUME, args=args)
# training.
preprocess_data_for_ml_inference(test_data, args=args)

# To fetch the artifacts produced in MLTransform, you can use
# ArtifactsFetcher for fetching vocab related artifacts. For
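
The truncated comment above points to ArtifactsFetcher for retrieving vocabulary artifacts. A minimal sketch of that usage, assuming the utility lives in apache_beam.ml.transforms.utils and exposes get_vocab_list and get_vocab_size (the module path and method names are assumptions, not shown in this diff):

# Sketch only: module path and method names below are assumptions.
from apache_beam.ml.transforms.utils import ArtifactsFetcher

fetcher = ArtifactsFetcher(artifact_location='/path/to/artifacts')
vocab_list = fetcher.get_vocab_list()  # vocabulary computed by
                                       # ComputeAndApplyVocabulary
vocab_size = fetcher.get_vocab_size()  # number of entries in that vocabulary
print(vocab_list[:5], vocab_size)
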
47 changes: 35 additions & 12 deletions sdks/python/apache_beam/ml/transforms/base.py
@@ -115,8 +115,8 @@ class MLTransform(beam.PTransform[beam.PCollection[ExampleT],
def __init__(
self,
*,
artifact_location: str,
artifact_mode: str = ArtifactMode.PRODUCE,
write_artifact_location: Optional[str] = None,
read_artifact_location: Optional[str] = None,
transforms: Optional[Sequence[BaseOperation]] = None):
"""
MLTransform is a Beam PTransform that can be used to apply
@@ -134,7 +134,7 @@ def __init__(
themselves.
Args:
artifact_location: A storage location for artifacts resulting from
write_artifact_location: A storage location for artifacts resulting from
MLTransform. These artifacts include transformations applied to
the dataset and generated values like min, max from ScaleTo01,
and mean, var from ScaleToZScore. Artifacts are produced and stored
@@ -143,23 +143,46 @@ def __init__(
retrieved from this location. Note that when consuming artifacts,
it is not necessary to pass the transforms since they are inherently
stored within the artifacts themselves. The value assigned to
`artifact_location` should be a valid storage path where the artifacts
can be written to or read from.
`write_artifact_location` should be a valid storage directory where the
artifacts from this transform can be written to. If no directory exists
at this location, one will be created. This will overwrite any
artifacts already in this location, so distinct locations should be
used for each instance of MLTransform. Only one of
write_artifact_location and read_artifact_location should be specified.
read_artifact_location: A storage location to read artifacts resulting
from a previous MLTransform. These artifacts include transformations
applied to the dataset and generated values like min, max from
ScaleTo01, and mean, var from ScaleToZScore. Note that when consuming
artifacts, it is not necessary to pass the transforms since they are
inherently stored within the artifacts themselves. The value assigned
to `read_artifact_location` should be a valid storage path where the
artifacts can be read from. Only one of write_artifact_location and
read_artifact_location should be specified.
transforms: A list of transforms to apply to the data. All the transforms
are applied in the order they are specified. The input of the
i-th transform is the output of the (i-1)-th transform. Multi-input
transforms are not supported yet.
artifact_mode: Whether to produce or consume artifacts. If set to
'consume', MLTransform will assume that the artifacts are already
computed and stored in the artifact_location. Pass the same artifact
location that was passed during produce phase to ensure that the
right artifacts are read. If set to 'produce', MLTransform
will compute the artifacts and store them in the artifact_location.
The artifacts will be read from this location during the consume phase.
"""
if transforms:
_ = [self._validate_transform(transform) for transform in transforms]

if read_artifact_location and write_artifact_location:
raise ValueError(
'Only one of read_artifact_location or write_artifact_location can '
'be specified to initialize MLTransform')

if not read_artifact_location and not write_artifact_location:
raise ValueError(
'Either a read_artifact_location or write_artifact_location must be '
'specified to initialize MLTransform')

if read_artifact_location:
artifact_location = read_artifact_location
artifact_mode = ArtifactMode.CONSUME
else:
artifact_location = write_artifact_location # type: ignore[assignment]
artifact_mode = ArtifactMode.PRODUCE

# avoid circular import
# pylint: disable=wrong-import-order, wrong-import-position
from apache_beam.ml.transforms.handlers import TFTProcessHandler
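
Taken together, the constructor now enforces an exclusive-or contract on the two location keywords. A minimal caller-side sketch of that contract (the paths and the ScaleTo01 transform are placeholders):

from apache_beam.ml.transforms.base import MLTransform
from apache_beam.ml.transforms.tft import ScaleTo01

# Produce mode: write_artifact_location only; transforms attached explicitly.
produce = MLTransform(
    write_artifact_location='/tmp/mlt_artifacts').with_transform(
        ScaleTo01(columns=['x']))

# Consume mode: read_artifact_location only; the transforms are recovered
# from the stored artifacts, so none are passed.
consume = MLTransform(read_artifact_location='/tmp/mlt_artifacts')

# Passing both locations, or neither, raises ValueError at construction.
try:
  MLTransform(
      write_artifact_location='/tmp/a', read_artifact_location='/tmp/b')
except ValueError as err:
  print(err)
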
19 changes: 11 additions & 8 deletions sdks/python/apache_beam/ml/transforms/base_test.py
@@ -62,7 +62,7 @@ def test_ml_transform_appends_transforms_to_process_handler_correctly(self):
fake_fn_1 = _FakeOperation(name='fake_fn_1', columns=['x'])
transforms = [fake_fn_1]
ml_transform = base.MLTransform(
transforms=transforms, artifact_location=self.artifact_location)
transforms=transforms, write_artifact_location=self.artifact_location)
ml_transform = ml_transform.with_transform(
transform=_FakeOperation(name='fake_fn_2', columns=['x']))

@@ -80,7 +80,8 @@ def test_ml_transform_on_dict(self):
p
| beam.Create(data)
| base.MLTransform(
artifact_location=self.artifact_location, transforms=transforms))
write_artifact_location=self.artifact_location,
transforms=transforms))
expected_output = [
np.array([0.0], dtype=np.float32),
np.array([1.0], dtype=np.float32),
@@ -97,7 +98,8 @@ def test_ml_transform_on_list_dict(self):
p
| beam.Create(data)
| base.MLTransform(
transforms=transforms, artifact_location=self.artifact_location))
transforms=transforms,
write_artifact_location=self.artifact_location))
expected_output = [
np.array([0, 0.2, 0.4], dtype=np.float32),
np.array([0.6, 0.8, 1], dtype=np.float32),
@@ -170,7 +172,7 @@ def test_ml_transform_dict_output_pcoll_schema(
beam.row_type.RowTypeConstraint.from_fields(
list(input_types.items()))))
transformed_data = schema_data | base.MLTransform(
artifact_location=self.artifact_location, transforms=transforms)
write_artifact_location=self.artifact_location, transforms=transforms)
for name, typ in transformed_data.element_type._fields:
if name in expected_dtype:
self.assertEqual(expected_dtype[name], typ)
@@ -187,8 +189,7 @@ def test_ml_transform_fail_for_non_global_windows_in_produce_mode(self):
| beam.WindowInto(beam.window.FixedWindows(1))
| base.MLTransform(
transforms=transforms,
artifact_location=self.artifact_location,
artifact_mode=base.ArtifactMode.PRODUCE,
write_artifact_location=self.artifact_location,
))

def test_ml_transform_on_multiple_columns_single_transform(self):
@@ -199,7 +200,8 @@ def test_ml_transform_on_multiple_columns_single_transform(self):
p
| beam.Create(data)
| base.MLTransform(
transforms=transforms, artifact_location=self.artifact_location))
transforms=transforms,
write_artifact_location=self.artifact_location))
expected_output_x = [
np.array([0, 0.5, 1], dtype=np.float32),
]
@@ -225,7 +227,8 @@ def test_ml_transforms_on_multiple_columns_multiple_transforms(self):
p
| beam.Create(data)
| base.MLTransform(
transforms=transforms, artifact_location=self.artifact_location))
transforms=transforms,
write_artifact_location=self.artifact_location))
expected_output_x = [
np.array([0, 0.5, 1], dtype=np.float32),
]
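
The updated tests above all exercise the write path. A round-trip sketch in the same style, writing artifacts in one pipeline and consuming them in a second (the class name, test name, and data values are placeholders):

import shutil
import tempfile
import unittest

import apache_beam as beam
from apache_beam.ml.transforms import base
from apache_beam.ml.transforms import tft


class MLTransformRoundTripTest(unittest.TestCase):
  def setUp(self):
    self.artifact_location = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.artifact_location)

  def test_write_then_read(self):
    # First pipeline computes artifacts and writes them.
    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create([{'x': 1}, {'x': 5}])
          | base.MLTransform(
              write_artifact_location=self.artifact_location,
              transforms=[tft.ScaleTo01(columns=['x'])]))

    # Second pipeline reads the artifacts; the saved transforms are
    # reapplied, so no transforms are passed here.
    with beam.Pipeline() as p:
      _ = (
          p
          | beam.Create([{'x': 3}])
          | base.MLTransform(read_artifact_location=self.artifact_location))
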
9 changes: 5 additions & 4 deletions sdks/python/apache_beam/ml/transforms/handlers.py
@@ -437,10 +437,11 @@ def process_data(
if not os.path.exists(os.path.join(
self.artifact_location, RAW_DATA_METADATA_DIR, SCHEMA_FILE)):
raise FileNotFoundError(
"Artifacts not found at location: %s when artifact_mode=consume."
"Make sure you've run the pipeline in `produce` mode using "
"this artifact location before setting artifact_mode to `consume`."
% os.path.join(self.artifact_location, RAW_DATA_METADATA_DIR))
"Artifacts not found at location: %s when using "
"read_artifact_location. Make sure you've run the pipeline with "
"write_artifact_location using this artifact location before "
"running with read_artifact_location set." %
os.path.join(self.artifact_location, RAW_DATA_METADATA_DIR))
raw_data_metadata = metadata_io.read_metadata(
os.path.join(self.artifact_location, RAW_DATA_METADATA_DIR))

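
The new error message reflects an existence check on the stored schema. A standalone sketch of that guard; the constant values below are assumptions, since the diff only shows the names RAW_DATA_METADATA_DIR and SCHEMA_FILE:

import os

# Assumed values for illustration; handlers.py defines the real constants.
RAW_DATA_METADATA_DIR = 'raw_data_metadata'
SCHEMA_FILE = 'schema'


def ensure_artifacts_readable(artifact_location):
  """Raises FileNotFoundError unless a write_artifact_location run
  already produced artifacts at this location."""
  if not os.path.exists(os.path.join(
      artifact_location, RAW_DATA_METADATA_DIR, SCHEMA_FILE)):
    raise FileNotFoundError(
        "Artifacts not found at location: %s when using "
        "read_artifact_location. Make sure you've run the pipeline with "
        "write_artifact_location using this artifact location before "
        "running with read_artifact_location set." %
        os.path.join(artifact_location, RAW_DATA_METADATA_DIR))
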
37 changes: 19 additions & 18 deletions sdks/python/apache_beam/ml/transforms/tft_test.py
@@ -98,7 +98,7 @@ def test_z_score(self):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleToZScore(columns=['x'])))
_ = (result | beam.Map(assert_z_score_artifacts))

Expand All @@ -109,7 +109,7 @@ def test_z_score_list_data(self):
p
| "listCreate" >> beam.Create(list_data)
| "listMLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleToZScore(columns=['x'])))
_ = (list_result | beam.Map(assert_z_score_artifacts))

@@ -128,7 +128,7 @@ def test_ScaleTo01_list(self):
p
| "listCreate" >> beam.Create(list_data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleTo01(columns=['x'])))
_ = (list_result | beam.Map(assert_ScaleTo01_artifacts))

@@ -147,7 +147,7 @@ def test_ScaleTo01(self):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleTo01(columns=['x'])))

_ = (result | beam.Map(assert_ScaleTo01_artifacts))
@@ -177,7 +177,7 @@ def test_bucketize(self):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.Bucketize(columns=['x'], num_buckets=3)))
_ = (result | beam.Map(assert_bucketize_artifacts))

@@ -200,7 +200,7 @@ def test_bucketize_list(self):
p
| "Create" >> beam.Create(list_data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.Bucketize(columns=['x'], num_buckets=3)))
_ = (list_result | beam.Map(assert_bucketize_artifacts))

@@ -232,7 +232,7 @@ def test_bucketize_boundaries(self, test_input, expected_boundaries):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.Bucketize(columns=['x'], num_buckets=num_buckets)))
actual_boundaries = (
result
@@ -263,7 +263,7 @@ def test_apply_buckets(self, test_inputs, bucket_boundaries):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ApplyBuckets(
columns=['x'], bucket_boundaries=bucket_boundaries)))
expected_output = []
@@ -302,7 +302,7 @@ def test_compute_and_apply_vocabulary_inputs(self):
p
| "Create" >> beam.Create(input_data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ComputeAndApplyVocabulary(columns=['x'])))
actual_data |= beam.Map(lambda x: x.as_dict())

@@ -334,7 +334,7 @@ def test_compute_and_apply_vocabulary(self):
p
| "Create" >> beam.Create(input_data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ComputeAndApplyVocabulary(columns=['x'])))
actual_output = (result | beam.Map(lambda x: x.x))
assert_that(
@@ -351,7 +351,7 @@ def test_with_basic_example_list(self):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ComputeAndApplyVocabulary(columns=['x'])))
result = result | beam.Map(lambda x: x.x)
expected_result = [np.array([3, 2, 1]), np.array([0, 0, 1])]
@@ -368,7 +368,7 @@ def test_string_split_with_single_delimiter(self):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ComputeAndApplyVocabulary(
columns=['x'], split_string_by_delimiter=' ')))
result = result | beam.Map(lambda x: x.x)
@@ -389,7 +389,7 @@ def test_string_split_with_multiple_delimiters(self):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ComputeAndApplyVocabulary(
columns=['x'], split_string_by_delimiter=' ;')))
result = result | beam.Map(lambda x: x.x)
@@ -420,7 +420,8 @@ def test_tfidf_compute_vocab_size_during_runtime(self):
p
| "Create" >> beam.Create(raw_data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location, transforms=transforms))
write_artifact_location=self.artifact_location,
transforms=transforms))
actual_output |= beam.Map(lambda x: x.as_dict())

def equals_fn(a, b):
@@ -467,7 +468,7 @@ def test_scale_to_min_max(self):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location).with_transform(
write_artifact_location=self.artifact_location).with_transform(
tft.ScaleByMinMax(
columns=['x'],
min_value=-1,
@@ -509,7 +510,7 @@ def test_ngrams_on_list_separated_words(self):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location,
write_artifact_location=self.artifact_location,
transforms=[
tft.NGrams(
columns=['x'], ngram_range=(1, 3), ngrams_separator=' ')
@@ -536,7 +537,7 @@ def test_with_string_split_delimiter(self):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location,
write_artifact_location=self.artifact_location,
transforms=[
tft.NGrams(
columns=['x'],
Expand Down Expand Up @@ -567,7 +568,7 @@ def test_with_multiple_string_delimiters(self):
p
| "Create" >> beam.Create(data)
| "MLTransform" >> base.MLTransform(
artifact_location=self.artifact_location,
write_artifact_location=self.artifact_location,
transforms=[
tft.NGrams(
columns=['x'],
