Skip to content

Commit

Permalink
Merge pull request #32286 from reeba212/enrichment-yaml
Browse files Browse the repository at this point in the history
[yaml] Add enrichment transform to Beam YAML
  • Loading branch information
liferoad authored Oct 17, 2024
2 parents 2cc49e0 + c288f1a commit 33fdb23
Show file tree
Hide file tree
Showing 5 changed files with 291 additions and 1 deletion.
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/yaml/integration_tests.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ def temp_bigquery_table(project, prefix='yaml_bq_it_'):
dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex)
bigquery_client.get_or_create_dataset(project, dataset_id)
logging.info("Created dataset %s in project %s", dataset_id, project)
yield f'{project}:{dataset_id}.tmp_table'
yield f'{project}.{dataset_id}.tmp_table'
request = bigquery.BigqueryDatasetsDeleteRequest(
projectId=project, datasetId=dataset_id, deleteContents=True)
logging.info("Deleting dataset %s in project %s", dataset_id, project)
Expand Down
5 changes: 5 additions & 0 deletions sdks/python/apache_beam/yaml/standard_providers.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -101,3 +101,8 @@
Explode: "beam:schematransform:org.apache.beam:yaml:explode:v1"
config:
gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar'

- type: 'python'
config: {}
transforms:
Enrichment: 'apache_beam.yaml.yaml_enrichment.enrichment_transform'
84 changes: 84 additions & 0 deletions sdks/python/apache_beam/yaml/tests/enrichment.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

fixtures:
- name: BQ_TABLE
type: "apache_beam.yaml.integration_tests.temp_bigquery_table"
config:
project: "apache-beam-testing"
- name: TEMP_DIR
type: "apache_beam.yaml.integration_tests.gcs_temp_dir"
config:
bucket: "gs://temp-storage-for-end-to-end-tests/temp-it"

pipelines:
- pipeline:
type: chain
transforms:
- type: Create
name: Rows
config:
elements:
- {label: '11a', rank: 0}
- {label: '37a', rank: 1}
- {label: '389a', rank: 2}

- type: WriteToBigQuery
config:
table: "{BQ_TABLE}"

- pipeline:
type: chain
transforms:
- type: Create
name: Data
config:
elements:
- {label: '11a', name: 'S1'}
- {label: '37a', name: 'S2'}
- {label: '389a', name: 'S3'}
- type: Enrichment
name: Enriched
config:
enrichment_handler: 'BigQuery'
handler_config:
project: apache-beam-testing
table_name: "{BQ_TABLE}"
fields: ['label']
row_restriction_template: "label = '37a'"
timeout: 30

- type: MapToFields
config:
language: python
fields:
label:
callable: 'lambda x: x.label'
output_type: string
rank:
callable: 'lambda x: x.rank'
output_type: integer
name:
callable: 'lambda x: x.name'
output_type: string

- type: AssertEqual
config:
elements:
- {label: '37a', rank: 1, name: 'S2'}
options:
yaml_experimental_features: [ 'Enrichment' ]
126 changes: 126 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_enrichment.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

from typing import Any
from typing import Dict
from typing import Optional

import apache_beam as beam
from apache_beam.yaml import options

try:
from apache_beam.transforms.enrichment import Enrichment
from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler
from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler
from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler
except ImportError:
Enrichment = None # type: ignore
BigQueryEnrichmentHandler = None # type: ignore
BigTableEnrichmentHandler = None # type: ignore
VertexAIFeatureStoreEnrichmentHandler = None # type: ignore

try:
from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler
except ImportError:
FeastFeatureStoreEnrichmentHandler = None # type: ignore


@beam.ptransform.ptransform_fn
def enrichment_transform(
pcoll,
enrichment_handler: str,
handler_config: Dict[str, Any],
timeout: Optional[float] = 30):
"""
The Enrichment transform allows you to dynamically
enhance elements in a pipeline by performing key-value
lookups against external services like APIs or databases.
Args:
enrichment_handler: Specifies the source from
where data needs to be extracted
into the pipeline for enriching data.
It can be a string value in ["BigQuery",
"BigTable", "FeastFeatureStore",
"VertexAIFeatureStore"].
handler_config: Specifies the parameters for
the respective enrichment_handler in a dictionary format.
BigQuery = (
"BigQuery: "
"project, table_name, row_restriction_template, "
"fields, column_names, "condition_value_fn, "
"query_fn, min_batch_size, max_batch_size"
)
BigTable = (
"BigTable: "
"project_id, instance_id, table_id, "
"row_key, row_filter, app_profile_id, "
"encoding, ow_key_fn, exception_level, include_timestamp"
)
FeastFeatureStore = (
"FeastFeatureStore: "
"feature_store_yaml_path, feature_names, "
"feature_service_name, full_feature_names, "
"entity_row_fn, exception_level"
)
VertexAIFeatureStore = (
"VertexAIFeatureStore: "
"project, location, api_endpoint, feature_store_name, "
"feature_view_name, row_key, exception_level"
)
Example Usage:
- type: Enrichment
config:
enrichment_handler: 'BigTable'
handler_config:
project_id: 'apache-beam-testing'
instance_id: 'beam-test'
table_id: 'bigtable-enrichment-test'
row_key: 'product_id'
timeout: 30
"""
options.YamlOptions.check_enabled(pcoll.pipeline, 'Enrichment')

if not Enrichment:
raise ValueError(
f"gcp dependencies not installed. Cannot use {enrichment_handler} "
f"handler. Please install using 'pip install apache-beam[gcp]'.")

if (enrichment_handler == 'FeastFeatureStore' and
not FeastFeatureStoreEnrichmentHandler):
raise ValueError(
"FeastFeatureStore handler requires 'feast' package to be installed. " +
"Please install using 'pip install feast[gcp]' and try again.")

handler_map = {
'BigQuery': BigQueryEnrichmentHandler,
'BigTable': BigTableEnrichmentHandler,
'FeastFeatureStore': FeastFeatureStoreEnrichmentHandler,
'VertexAIFeatureStore': VertexAIFeatureStoreEnrichmentHandler
}

if enrichment_handler not in handler_map:
raise ValueError(f"Unknown enrichment source: {enrichment_handler}")

handler = handler_map[enrichment_handler](**handler_config)
return pcoll | Enrichment(source_handler=handler, timeout=timeout)
75 changes: 75 additions & 0 deletions sdks/python/apache_beam/yaml/yaml_enrichment_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
#
# Licensed to the Apache Software Foundation (ASF) under one or more
# contributor license agreements. See the NOTICE file distributed with
# this work for additional information regarding copyright ownership.
# The ASF licenses this file to You under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#

import logging
import unittest

import mock

import apache_beam as beam
from apache_beam import Row
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.yaml.yaml_transform import YamlTransform


class FakeEnrichmentTransform:
def __init__(self, enrichment_handler, handler_config, timeout=30):
self._enrichment_handler = enrichment_handler
self._handler_config = handler_config
self._timeout = timeout

def __call__(self, enrichment_handler, *, handler_config, timeout=30):
assert enrichment_handler == self._enrichment_handler
assert handler_config == self._handler_config
assert timeout == self._timeout
return beam.Map(lambda x: beam.Row(**x._asdict()))


class EnrichmentTransformTest(unittest.TestCase):
def test_enrichment_with_bigquery(self):
input_data = [
Row(label="item1", rank=0),
Row(label="item2", rank=1),
]

handler = 'BigQuery'
config = {
"project": "apache-beam-testing",
"table_name": "project.database.table",
"row_restriction_template": "label='item1' or label='item2'",
"fields": ["label"]
}

with beam.Pipeline() as p:
with mock.patch('apache_beam.yaml.yaml_enrichment.enrichment_transform',
FakeEnrichmentTransform(enrichment_handler=handler,
handler_config=config)):
input_pcoll = p | 'CreateInput' >> beam.Create(input_data)
result = input_pcoll | YamlTransform(
f'''
type: Enrichment
config:
enrichment_handler: {handler}
handler_config: {config}
''')
assert_that(result, equal_to(input_data))


if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()

0 comments on commit 33fdb23

Please sign in to comment.