diff --git a/sdks/python/apache_beam/yaml/integration_tests.py b/sdks/python/apache_beam/yaml/integration_tests.py index af1be7b1e8e5..72b3918195da 100644 --- a/sdks/python/apache_beam/yaml/integration_tests.py +++ b/sdks/python/apache_beam/yaml/integration_tests.py @@ -69,7 +69,7 @@ def temp_bigquery_table(project, prefix='yaml_bq_it_'): dataset_id = '%s_%s' % (prefix, uuid.uuid4().hex) bigquery_client.get_or_create_dataset(project, dataset_id) logging.info("Created dataset %s in project %s", dataset_id, project) - yield f'{project}:{dataset_id}.tmp_table' + yield f'{project}.{dataset_id}.tmp_table' request = bigquery.BigqueryDatasetsDeleteRequest( projectId=project, datasetId=dataset_id, deleteContents=True) logging.info("Deleting dataset %s in project %s", dataset_id, project) diff --git a/sdks/python/apache_beam/yaml/standard_providers.yaml b/sdks/python/apache_beam/yaml/standard_providers.yaml index 574179805959..242faaa9a77b 100644 --- a/sdks/python/apache_beam/yaml/standard_providers.yaml +++ b/sdks/python/apache_beam/yaml/standard_providers.yaml @@ -101,3 +101,8 @@ Explode: "beam:schematransform:org.apache.beam:yaml:explode:v1" config: gradle_target: 'sdks:java:extensions:sql:expansion-service:shadowJar' + +- type: 'python' + config: {} + transforms: + Enrichment: 'apache_beam.yaml.yaml_enrichment.enrichment_transform' diff --git a/sdks/python/apache_beam/yaml/tests/enrichment.yaml b/sdks/python/apache_beam/yaml/tests/enrichment.yaml new file mode 100644 index 000000000000..6469c094b8b4 --- /dev/null +++ b/sdks/python/apache_beam/yaml/tests/enrichment.yaml @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +fixtures: + - name: BQ_TABLE + type: "apache_beam.yaml.integration_tests.temp_bigquery_table" + config: + project: "apache-beam-testing" + - name: TEMP_DIR + type: "apache_beam.yaml.integration_tests.gcs_temp_dir" + config: + bucket: "gs://temp-storage-for-end-to-end-tests/temp-it" + +pipelines: + - pipeline: + type: chain + transforms: + - type: Create + name: Rows + config: + elements: + - {label: '11a', rank: 0} + - {label: '37a', rank: 1} + - {label: '389a', rank: 2} + + - type: WriteToBigQuery + config: + table: "{BQ_TABLE}" + + - pipeline: + type: chain + transforms: + - type: Create + name: Data + config: + elements: + - {label: '11a', name: 'S1'} + - {label: '37a', name: 'S2'} + - {label: '389a', name: 'S3'} + - type: Enrichment + name: Enriched + config: + enrichment_handler: 'BigQuery' + handler_config: + project: apache-beam-testing + table_name: "{BQ_TABLE}" + fields: ['label'] + row_restriction_template: "label = '37a'" + timeout: 30 + + - type: MapToFields + config: + language: python + fields: + label: + callable: 'lambda x: x.label' + output_type: string + rank: + callable: 'lambda x: x.rank' + output_type: integer + name: + callable: 'lambda x: x.name' + output_type: string + + - type: AssertEqual + config: + elements: + - {label: '37a', rank: 1, name: 'S2'} + options: + yaml_experimental_features: [ 'Enrichment' ] \ No newline at end of file diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment.py b/sdks/python/apache_beam/yaml/yaml_enrichment.py new file mode 100644 index 000000000000..00f2a5c1b1d1 --- /dev/null +++ b/sdks/python/apache_beam/yaml/yaml_enrichment.py @@ -0,0 +1,126 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +from typing import Any +from typing import Dict +from typing import Optional + +import apache_beam as beam +from apache_beam.yaml import options + +try: + from apache_beam.transforms.enrichment import Enrichment + from apache_beam.transforms.enrichment_handlers.bigquery import BigQueryEnrichmentHandler + from apache_beam.transforms.enrichment_handlers.bigtable import BigTableEnrichmentHandler + from apache_beam.transforms.enrichment_handlers.vertex_ai_feature_store import VertexAIFeatureStoreEnrichmentHandler +except ImportError: + Enrichment = None # type: ignore + BigQueryEnrichmentHandler = None # type: ignore + BigTableEnrichmentHandler = None # type: ignore + VertexAIFeatureStoreEnrichmentHandler = None # type: ignore + +try: + from apache_beam.transforms.enrichment_handlers.feast_feature_store import FeastFeatureStoreEnrichmentHandler +except ImportError: + FeastFeatureStoreEnrichmentHandler = None # type: ignore + + +@beam.ptransform.ptransform_fn +def enrichment_transform( + pcoll, + enrichment_handler: str, + handler_config: Dict[str, Any], + timeout: Optional[float] = 30): + """ + The Enrichment transform allows you to dynamically + enhance elements in a pipeline by performing key-value + lookups against external services like APIs or databases. + + Args: + enrichment_handler: Specifies the source from + where data needs to be extracted + into the pipeline for enriching data. + It can be a string value in ["BigQuery", + "BigTable", "FeastFeatureStore", + "VertexAIFeatureStore"]. + handler_config: Specifies the parameters for + the respective enrichment_handler in a dictionary format. + BigQuery = ( + "BigQuery: " + "project, table_name, row_restriction_template, " + "fields, column_names, "condition_value_fn, " + "query_fn, min_batch_size, max_batch_size" + ) + + BigTable = ( + "BigTable: " + "project_id, instance_id, table_id, " + "row_key, row_filter, app_profile_id, " + "encoding, ow_key_fn, exception_level, include_timestamp" + ) + + FeastFeatureStore = ( + "FeastFeatureStore: " + "feature_store_yaml_path, feature_names, " + "feature_service_name, full_feature_names, " + "entity_row_fn, exception_level" + ) + + VertexAIFeatureStore = ( + "VertexAIFeatureStore: " + "project, location, api_endpoint, feature_store_name, " + "feature_view_name, row_key, exception_level" + ) + + Example Usage: + + - type: Enrichment + config: + enrichment_handler: 'BigTable' + handler_config: + project_id: 'apache-beam-testing' + instance_id: 'beam-test' + table_id: 'bigtable-enrichment-test' + row_key: 'product_id' + timeout: 30 + + """ + options.YamlOptions.check_enabled(pcoll.pipeline, 'Enrichment') + + if not Enrichment: + raise ValueError( + f"gcp dependencies not installed. Cannot use {enrichment_handler} " + f"handler. Please install using 'pip install apache-beam[gcp]'.") + + if (enrichment_handler == 'FeastFeatureStore' and + not FeastFeatureStoreEnrichmentHandler): + raise ValueError( + "FeastFeatureStore handler requires 'feast' package to be installed. " + + "Please install using 'pip install feast[gcp]' and try again.") + + handler_map = { + 'BigQuery': BigQueryEnrichmentHandler, + 'BigTable': BigTableEnrichmentHandler, + 'FeastFeatureStore': FeastFeatureStoreEnrichmentHandler, + 'VertexAIFeatureStore': VertexAIFeatureStoreEnrichmentHandler + } + + if enrichment_handler not in handler_map: + raise ValueError(f"Unknown enrichment source: {enrichment_handler}") + + handler = handler_map[enrichment_handler](**handler_config) + return pcoll | Enrichment(source_handler=handler, timeout=timeout) diff --git a/sdks/python/apache_beam/yaml/yaml_enrichment_test.py b/sdks/python/apache_beam/yaml/yaml_enrichment_test.py new file mode 100644 index 000000000000..e26d6140af23 --- /dev/null +++ b/sdks/python/apache_beam/yaml/yaml_enrichment_test.py @@ -0,0 +1,75 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +import logging +import unittest + +import mock + +import apache_beam as beam +from apache_beam import Row +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to +from apache_beam.yaml.yaml_transform import YamlTransform + + +class FakeEnrichmentTransform: + def __init__(self, enrichment_handler, handler_config, timeout=30): + self._enrichment_handler = enrichment_handler + self._handler_config = handler_config + self._timeout = timeout + + def __call__(self, enrichment_handler, *, handler_config, timeout=30): + assert enrichment_handler == self._enrichment_handler + assert handler_config == self._handler_config + assert timeout == self._timeout + return beam.Map(lambda x: beam.Row(**x._asdict())) + + +class EnrichmentTransformTest(unittest.TestCase): + def test_enrichment_with_bigquery(self): + input_data = [ + Row(label="item1", rank=0), + Row(label="item2", rank=1), + ] + + handler = 'BigQuery' + config = { + "project": "apache-beam-testing", + "table_name": "project.database.table", + "row_restriction_template": "label='item1' or label='item2'", + "fields": ["label"] + } + + with beam.Pipeline() as p: + with mock.patch('apache_beam.yaml.yaml_enrichment.enrichment_transform', + FakeEnrichmentTransform(enrichment_handler=handler, + handler_config=config)): + input_pcoll = p | 'CreateInput' >> beam.Create(input_data) + result = input_pcoll | YamlTransform( + f''' + type: Enrichment + config: + enrichment_handler: {handler} + handler_config: {config} + ''') + assert_that(result, equal_to(input_data)) + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main()