Skip to content

Commit

Permalink
MLTransform transform catalog snippets (#27709)
Browse files Browse the repository at this point in the history
* add snippets

* Draft

* add test

* fix imports in test

* Add doc ref to other places

* Add doc

* Apply suggestions from code review

Co-authored-by: Rebecca Szper <[email protected]>

* fix website snippets

* Fix test name

* fix artifact location

* address comments

* Apply suggestions from code review

Co-authored-by: Rebecca Szper <[email protected]>

* Update website/www/site/content/en/documentation/transforms/python/elementwise/mltransform.md

Co-authored-by: Rebecca Szper <[email protected]>

---------

Co-authored-by: Rebecca Szper <[email protected]>
  • Loading branch information
AnandInguva and rszper authored Aug 21, 2023
1 parent 41dd0e6 commit 448184e
Show file tree
Hide file tree
Showing 5 changed files with 333 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file
# pylint: disable=reimported
# pylint:disable=line-too-long


def mltransform_scale_to_0_1(test=None):
  """Runs the ``ScaleTo01`` documentation snippet.

  Creates a small in-memory dataset with a list-valued column ``x``, applies
  ``ScaleTo01`` to that column through ``MLTransform`` and prints every
  transformed element.

  Args:
    test: Optional callable. When provided, it is called with the PCollection
      produced by the final ``beam.Map(print)`` step before the pipeline
      context exits.
  """
  # [START mltransform_scale_to_0_1]
  import tempfile

  import apache_beam as beam
  from apache_beam.ml.transforms.base import MLTransform
  from apache_beam.ml.transforms.tft import ScaleTo01

  # Directory handed to MLTransform as its artifact location.
  artifact_dir = tempfile.mkdtemp()
  rows = [{'x': [1, 5, 3]}, {'x': [4, 2, 8]}]

  ml_transform = MLTransform(
      write_artifact_location=artifact_dir).with_transform(
          ScaleTo01(columns=['x']))

  with beam.Pipeline() as pipeline:
    scaled = pipeline | beam.Create(rows) | ml_transform
    transformed_data = scaled | beam.Map(print)
    # [END mltransform_scale_to_0_1]
    if test:
      test(transformed_data)


def mltransform_compute_and_apply_vocabulary(test=None):
  """Runs the ``ComputeAndApplyVocabulary`` documentation snippet.

  Each input element carries a list of tokens in column ``x``. The column is
  passed through ``ComputeAndApplyVocabulary`` via ``MLTransform`` and every
  transformed element is printed.

  Args:
    test: Optional callable. When provided, it is called with the PCollection
      produced by the final ``beam.Map(print)`` step before the pipeline
      context exits.
  """
  # [START mltransform_compute_and_apply_vocabulary]
  import tempfile

  import apache_beam as beam
  from apache_beam.ml.transforms.base import MLTransform
  from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

  sentences = [
      {'x': ['I', 'love', 'Beam']},
      {'x': ['Beam', 'is', 'awesome']},
  ]
  vocabulary_fn = ComputeAndApplyVocabulary(columns=['x'])
  # Directory handed to MLTransform as its artifact location.
  artifact_dir = tempfile.mkdtemp()

  with beam.Pipeline() as pipeline:
    indexed = (
        pipeline
        | beam.Create(sentences)
        | MLTransform(
            write_artifact_location=artifact_dir).with_transform(vocabulary_fn))
    transformed_data = indexed | beam.Map(print)
    # [END mltransform_compute_and_apply_vocabulary]
    if test:
      test(transformed_data)


def mltransform_compute_and_apply_vocabulary_with_scalar(test=None):
  """Runs the scalar-input ``ComputeAndApplyVocabulary`` snippet.

  Unlike the list-valued variant, every input element here holds a single
  string in column ``x``. The column is passed through
  ``ComputeAndApplyVocabulary`` via ``MLTransform`` and every transformed
  element is printed.

  Args:
    test: Optional callable. When provided, it is called with the PCollection
      produced by the final ``beam.Map(print)`` step before the pipeline
      context exits.
  """
  # [START mltransform_compute_and_apply_vocabulary_with_scalar]
  import tempfile

  import apache_beam as beam
  from apache_beam.ml.transforms.base import MLTransform
  from apache_beam.ml.transforms.tft import ComputeAndApplyVocabulary

  # One dictionary per token; 'Beam' intentionally appears twice.
  tokens = ['I', 'love', 'Beam', 'Beam', 'is', 'awesome']
  rows = [{'x': token} for token in tokens]

  # Directory handed to MLTransform as its artifact location.
  artifact_dir = tempfile.mkdtemp()
  ml_transform = MLTransform(
      write_artifact_location=artifact_dir).with_transform(
          ComputeAndApplyVocabulary(columns=['x']))

  with beam.Pipeline() as pipeline:
    transformed_data = (
        pipeline
        | beam.Create(rows)
        | ml_transform
        | beam.Map(print))
    # [END mltransform_compute_and_apply_vocabulary_with_scalar]
    if test:
      test(transformed_data)
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
# coding=utf-8
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

# pytype: skip-file
# pylint: disable=ungrouped-imports

import unittest
from io import StringIO

import mock

from apache_beam.testing.test_pipeline import TestPipeline

try:
  # Import tensorflow_transform first so the module is skipped when TFT is
  # not installed.
  import tensorflow_transform as tft  # pylint: disable=unused-import
  # NOTE: every name imported below must exist in the snippet module. A wrong
  # name also raises ImportError, which this guard would report as a skip
  # instead of a failure.
  from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_scale_to_0_1
  from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary
  from apache_beam.examples.snippets.transforms.elementwise.mltransform import mltransform_compute_and_apply_vocabulary_with_scalar
except ImportError:
  raise unittest.SkipTest('tensorflow_transform is not installed.')


def check_mltransform_compute_and_apply_vocab():
  """Returns the expected stdout lines of the vocabulary snippet.

  The ``[START ...]`` / ``[END ...]`` lines delimit the sample shown on the
  website; they are dropped from the returned list by the ``[1:-1]`` slice.
  """
  expected = '''[START mltransform_compute_and_apply_vocab]
Row(x=array([4, 1, 0]))
Row(x=array([0, 2, 3]))
[END mltransform_compute_and_apply_vocab] '''.splitlines()[1:-1]
  return expected


def check_mltransform_scale_to_0_1():
  """Returns the expected stdout lines of the ``ScaleTo01`` snippet.

  The ``[START ...]`` / ``[END ...]`` lines delimit the sample shown on the
  website; they are dropped from the returned list by the ``[1:-1]`` slice.
  """
  expected = '''[START mltransform_scale_to_0_1]
Row(x=array([0. , 0.5714286, 0.2857143], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32))
Row(x=array([0.42857143, 0.14285715, 1. ], dtype=float32), x_max=array([8.], dtype=float32), x_min=array([1.], dtype=float32))
[END mltransform_scale_to_0_1] '''.splitlines()[1:-1]
  return expected


def check_mltransform_compute_and_apply_vocabulary_with_scalar():
  """Returns the expected stdout lines of the scalar vocabulary snippet.

  The snippet feeds six elements ('Beam' occurs twice), so six rows are
  expected; both 'Beam' elements map to index 0. The ``[START ...]`` /
  ``[END ...]`` lines delimit the sample shown on the website and are dropped
  from the returned list by the ``[1:-1]`` slice.
  """
  expected = '''[START mltransform_compute_and_apply_vocabulary_with_scalar]
Row(x=array([4]))
Row(x=array([1]))
Row(x=array([0]))
Row(x=array([0]))
Row(x=array([2]))
Row(x=array([3]))
[END mltransform_compute_and_apply_vocabulary_with_scalar] '''.splitlines(
  )[1:-1]
  return expected


@mock.patch('apache_beam.Pipeline', TestPipeline)
@mock.patch('sys.stdout', new_callable=StringIO)
class MLTransformStdOutTest(unittest.TestCase):
  """Runs each MLTransform snippet and checks what it prints.

  ``apache_beam.Pipeline`` is replaced by ``TestPipeline`` and ``sys.stdout``
  by a ``StringIO`` so the lines printed by each snippet can be compared with
  the documented output. The patched stdout is passed to every test method as
  ``mock_stdout``.
  """
  def test_mltransform_compute_and_apply_vocab(self, mock_stdout):
    mltransform_compute_and_apply_vocabulary()
    predicted = mock_stdout.getvalue().splitlines()
    expected = check_mltransform_compute_and_apply_vocab()
    self.assertEqual(predicted, expected)

  def test_mltransform_scale_to_0_1(self, mock_stdout):
    mltransform_scale_to_0_1()
    predicted = mock_stdout.getvalue().splitlines()
    expected = check_mltransform_scale_to_0_1()
    self.assertEqual(predicted, expected)

  def test_mltransform_compute_and_apply_vocab_scalar(self, mock_stdout):
    # Calls the snippet under the name the snippet module actually defines.
    mltransform_compute_and_apply_vocabulary_with_scalar()
    predicted = mock_stdout.getvalue().splitlines()
    expected = check_mltransform_compute_and_apply_vocabulary_with_scalar()
    self.assertEqual(predicted, expected)


# Run the test suite when this file is executed directly as a script.
if __name__ == '__main__':
  unittest.main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
---
title: "MLTransform"
---
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->

# MLTransform for data processing

{{< localstorage language language-py >}}


<table>
<tr>
<td>
<a>
{{< button-pydoc path="apache_beam.ml.transforms" class="MLTransform" >}}
</a>
</td>
</tr>
</table>


Use `MLTransform` to apply common machine learning (ML) processing tasks on keyed data. Apache Beam provides ML data processing transformations that you can use with `MLTransform`. For the full list of available data
processing transformations, see the [tft.py file](https://github.com/apache/beam/blob/ab93fb1988051baac6c3b9dd1031f4d68bd9a149/sdks/python/apache_beam/ml/transforms/tft.py#L52) in GitHub.


To define a data processing transformation by using `MLTransform`, create instances of data processing transforms with `columns` as input parameters. The data in the specified `columns` is transformed and outputted to the `beam.Row` object.

The following example demonstrates how to use `MLTransform` to normalize your data between 0 and 1 by using the minimum and maximum values from your entire dataset. `MLTransform` uses the `ScaleTo01` transformation.


```
scale_to_0_1_transform = ScaleTo01(columns=['x', 'y'])
with beam.Pipeline() as p:
  (data | MLTransform(write_artifact_location=artifact_location).with_transform(scale_to_0_1_transform))
```

In this example, `MLTransform` receives a value for `write_artifact_location`. `MLTransform` then uses this location value to write artifacts generated by the transform. To pass the data processing transform, you can use either the `with_transform` method of `MLTransform` or a list.

```
MLTransform(transforms=transforms, write_artifact_location=write_artifact_location)
```

The transforms passed to `MLTransform` are applied sequentially on the dataset. `MLTransform` expects a dictionary and returns a transformed row object with NumPy arrays.

## Examples

The following examples demonstrate how to create pipelines that use `MLTransform` to preprocess data.

`MLTransform` can do a full pass on the dataset, which is useful when you need to transform a single element only after analyzing the entire dataset.
The first two examples require a full pass over the dataset to complete the data transformation.

* For the `ComputeAndApplyVocabulary` transform, the transform needs access to all of the unique words in the dataset.
* For the `ScaleTo01` transform, the transform needs to know the minimum and maximum values in the dataset.

### Example 1

This example creates a pipeline that uses `MLTransform` to scale data between 0 and 1.
The example takes a list of integers and converts them into the range of 0 to 1 using the transform `ScaleTo01`.

{{< highlight language="py" file="sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py"
class="notebook-skip" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py" mltransform_scale_to_0_1 >}}
{{</ highlight >}}

{{< paragraph class="notebook-skip" >}}
Output:
{{< /paragraph >}}
{{< highlight class="notebook-skip" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py" mltransform_scale_to_0_1 >}}
{{< /highlight >}}


### Example 2

This example creates a pipeline that uses `MLTransform` to compute vocabulary on the entire dataset and assign indices to each unique vocabulary item.
It takes a list of strings, computes vocabulary over the entire dataset, and then applies a unique index to each vocabulary item.


{{< highlight language="py" file="sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py"
class="notebook-skip" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py" mltransform_compute_and_apply_vocabulary >}}
{{</ highlight >}}

{{< paragraph class="notebook-skip" >}}
Output:
{{< /paragraph >}}
{{< highlight class="notebook-skip" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py" mltransform_compute_and_apply_vocab >}}
{{< /highlight >}}


### Example 3

This example creates a pipeline that uses `MLTransform` to compute vocabulary on the entire dataset and assign indices to each unique vocabulary item. This pipeline takes a single element as input instead of a list of elements.


{{< highlight language="py" file="sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py"
class="notebook-skip" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform.py" mltransform_compute_and_apply_vocabulary_with_scalar >}}
{{</ highlight >}}

{{< paragraph class="notebook-skip" >}}
Output:
{{< /paragraph >}}
{{< highlight class="notebook-skip" >}}
{{< code_sample "sdks/python/apache_beam/examples/snippets/transforms/elementwise/mltransform_test.py" mltransform_compute_and_apply_vocabulary_with_scalar >}}
{{< /highlight >}}

Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ limitations under the License.
<tr><td><a href="/documentation/transforms/python/elementwise/keys">Keys</a></td><td>Extracts the key from each element in a collection of key-value pairs.</td></tr>
<tr><td><a href="/documentation/transforms/python/elementwise/kvswap">KvSwap</a></td><td>Swaps the key and value of each element in a collection of key-value pairs.</td></tr>
<tr><td><a href="/documentation/transforms/python/elementwise/map">Map</a></td><td>Applies a function to every element in the input and outputs the result.</td></tr>
<tr><td><a href="/documentation/transforms/python/elementwise/mltransform">MLTransform</a></td><td>Applies data processing transforms to the dataset.</td></tr>
<tr><td><a href="/documentation/transforms/python/elementwise/pardo">ParDo</a></td><td>The most-general mechanism for applying a user-defined <code>DoFn</code> to every element
in the input collection.</td></tr>
<tr><td><a href="/documentation/transforms/python/elementwise/partition">Partition</a></td><td>Routes each input element to a specific output collection based on some partition
Expand All @@ -39,6 +40,7 @@ limitations under the License.
and updates the implicit timestamp associated with each input. Note that it is only
safe to adjust timestamps forwards.</td></tr>
<tr><td><a href="/documentation/transforms/python/elementwise/values">Values</a></td><td>Extracts the value from each element in a collection of key-value pairs.</td></tr>

</table>

## Aggregation
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,6 +291,7 @@
<li><a href="/documentation/transforms/python/elementwise/keys/">Keys</a></li>
<li><a href="/documentation/transforms/python/elementwise/kvswap/">KvSwap</a></li>
<li><a href="/documentation/transforms/python/elementwise/map/">Map</a></li>
<li><a href="/documentation/transforms/python/elementwise/mltransform/">MLTransform</a></li>
<li><a href="/documentation/transforms/python/elementwise/pardo/">ParDo</a></li>
<li><a href="/documentation/transforms/python/elementwise/partition/">Partition</a></li>
<li><a href="/documentation/transforms/python/elementwise/regex/">Regex</a></li>
Expand Down

0 comments on commit 448184e

Please sign in to comment.