From e12d2b959695a60fcfab4b6811b96de632d5b0cc Mon Sep 17 00:00:00 2001
From: Pablo Rodriguez Defino
Date: Fri, 18 Oct 2024 16:18:54 -0700
Subject: [PATCH] implemented transforms to handle cdc mutation info functions
 for static and dyn destinations

---
 sdks/python/apache_beam/io/gcp/bigquery.py | 183 +++++++++++++++++----
 1 file changed, 154 insertions(+), 29 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index c01341f2d4d0..dc744453d49c 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2544,6 +2544,26 @@ class StorageWriteToBigQuery(PTransform):
   CDC_SQN = "change_sequence_number"
   # magic string to tell Java that these rows are going to dynamic destinations
   DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"
+  # CDC record input for type hints
+  CDC_INFO_TYPE_HINT = (
+      CDC_INFO,
+      RowTypeConstraint.from_fields([(CDC_MUTATION_TYPE, str), (CDC_SQN, str)]))
+  # BQ table schema for CDC related information
+  CDC_INFO_SCHEMA = {
+      "name": "row_mutation_info",
+      "type": "STRUCT",
+      "fields": [
+          # both fields are required
+          {
+              "name": "mutation_type", "type": "STRING", "mode": "REQUIRED"
+          },
+          {
+              "name": "change_sequence_number",
+              "type": "STRING",
+              "mode": "REQUIRED"
+          }
+      ]
+  }
 
   def __init__(
       self,
@@ -2598,7 +2618,9 @@ def expand(self, input):
     # if writing to one destination, just convert to Beam rows and send over
     if not callable(table):
       if is_rows:
-        input_beam_rows = input
+        input_beam_rows = (
+            input | "Prepare Beam Row" >> self.PrepareBeamRows(
+                input.element_type, False).with_output_types())
       else:
         input_beam_rows = (
             input
@@ -2620,16 +2642,8 @@
       if is_rows:
         input_beam_rows = (
             input_rows
-            | "Wrap in Beam Row" >> beam.Map(
-                lambda row: beam.Row(
-                    **{
-                        StorageWriteToBigQuery.DESTINATION: row[0],
-                        StorageWriteToBigQuery.RECORD: row[1]
-                    })).with_output_types(
-                        RowTypeConstraint.from_fields([
-                            (StorageWriteToBigQuery.DESTINATION, str),
-                            (StorageWriteToBigQuery.RECORD, input.element_type)
-                        ])))
+            | "Prepare Beam Row" >> self.PrepareBeamRows(
+                input.element_type, True).with_output_types())
       # otherwise, convert to Beam Rows
       else:
         input_beam_rows = (
             input
@@ -2639,6 +2653,9 @@
       # communicate to Java that this write should use dynamic destinations
       table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS
 
+    cdc_writes = self._use_cdc_writes if not callable(
+        self._use_cdc_writes) else True
+
     output = (
         input_beam_rows
         | SchemaAwareExternalTransform(
@@ -2652,7 +2669,7 @@
             auto_sharding=self._with_auto_sharding,
             num_streams=self._num_storage_api_streams,
             use_at_least_once_semantics=self._use_at_least_once,
-            use_cdc_writes=self._use_cdc_writes,
+            use_cdc_writes=cdc_writes,
             primary_key=self._primary_key,
             error_handling={
                 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
@@ -2676,44 +2693,152 @@
         failed_rows=failed_rows,
         failed_rows_with_errors=failed_rows_with_errors)
 
+  class PrepareBeamRows(PTransform):
+    def __init__(
+        self,
+        element_type,
+        dynamic_destinations,
+        mutation_info_fn: CdcWritesWithRows = None):
+      self.element_type = element_type
+      self.dynamic_destinations = dynamic_destinations
+      self.mutation_info_fn = mutation_info_fn
+
+    def expand(self, input_rows):
+      if self.dynamic_destinations:
+        return (
+            input_rows
+            | "Wrap and maybe include CDC info" >> beam.Map(
+                lambda data_and_dest: self.
+                maybe_add_mutation_info_dynamic_destination(data_and_dest)))
+      else:
+        if callable(self.mutation_info_fn):
+          return (
+              input_rows
+              | "Wrap and maybe include CDC info" >> beam.Map(
+                  lambda row: beam.Row(
+                      **{
+                          StorageWriteToBigQuery.RECORD: row,
+                          StorageWriteToBigQuery.CDC_INFO: self.
+                          mutation_info_fn(row)
+                      })))
+        else:
+          return input_rows
+
+    def with_output_types(self):
+      if self.dynamic_destinations:
+        fields = [(StorageWriteToBigQuery.DESTINATION, str),
+                  (StorageWriteToBigQuery.RECORD, self.element_type)]
+        if callable(self.mutation_info_fn):
+          fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT)
+        type_hint = RowTypeConstraint.from_fields(fields)
+      else:
+        if callable(self.mutation_info_fn):
+          type_hint = RowTypeConstraint.from_fields([
+              (StorageWriteToBigQuery.RECORD, self.element_type),
+              StorageWriteToBigQuery.CDC_INFO_TYPE_HINT
+          ])
+        else:
+          type_hint = self.element_type
+
+      return super().with_output_types(type_hint)
+
+    def maybe_add_mutation_info_dynamic_destination(self, row_and_dest):
+      if callable(self.mutation_info_fn):
+        return beam.Row(
+            **{
+                StorageWriteToBigQuery.DESTINATION: row_and_dest[0],
+                StorageWriteToBigQuery.RECORD: row_and_dest[1],
+                StorageWriteToBigQuery.CDC_INFO: self.mutation_info_fn(
+                    row_and_dest[1])
+            })
+      else:
+        return beam.Row(
+            **{
+                StorageWriteToBigQuery.DESTINATION: row_and_dest[0],
+                StorageWriteToBigQuery.RECORD: row_and_dest[1]
+            })
+
   class ConvertToBeamRows(PTransform):
-    def __init__(self, schema, dynamic_destinations):
+    def __init__(
+        self,
+        schema,
+        dynamic_destinations,
+        mutation_info_fn: CdcWritesWithDicts = None):
       self.schema = schema
       self.dynamic_destinations = dynamic_destinations
+      self.mutation_info_fn = mutation_info_fn
 
     def expand(self, input_dicts):
       if self.dynamic_destinations:
         return (
             input_dicts
             | "Convert dict to Beam Row" >> beam.Map(
-                lambda row: beam.Row(
-                    **{
-                        StorageWriteToBigQuery.DESTINATION: row[0],
-                        StorageWriteToBigQuery.RECORD: bigquery_tools.
-                        beam_row_from_dict(row[1], self.schema)
-                    })))
+                lambda data_with_dest: self.
+                maybe_add_mutation_info_dynamic_destination(data_with_dest)))
       else:
         return (
             input_dicts
-            | "Convert dict to Beam Row" >> beam.Map(
-                lambda row: bigquery_tools.beam_row_from_dict(row, self.schema))
-            )
+            | "Convert dict to Beam Row" >>
+            beam.Map(lambda dict_data: self.maybe_add_mutation_info(dict_data)))
 
     def with_output_types(self):
       row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema(
           self.schema)
       if self.dynamic_destinations:
-        type_hint = RowTypeConstraint.from_fields([
-            (StorageWriteToBigQuery.DESTINATION, str),
-            (
-                StorageWriteToBigQuery.RECORD,
-                RowTypeConstraint.from_fields(row_type_hints))
-        ])
+        fields = [(StorageWriteToBigQuery.DESTINATION, str),
+                  (
+                      StorageWriteToBigQuery.RECORD,
+                      RowTypeConstraint.from_fields(row_type_hints))]
+        if callable(self.mutation_info_fn):
+          fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT)
+        type_hint = RowTypeConstraint.from_fields(fields)
       else:
-        type_hint = RowTypeConstraint.from_fields(row_type_hints)
+        if callable(self.mutation_info_fn):
+          type_hint = RowTypeConstraint.from_fields([
+              (
+                  StorageWriteToBigQuery.RECORD,
+                  RowTypeConstraint.from_fields(row_type_hints)),
+              StorageWriteToBigQuery.CDC_INFO_TYPE_HINT
+          ])
+        else:
+          type_hint = RowTypeConstraint.from_fields(row_type_hints)
 
       return super().with_output_types(type_hint)
 
+    def maybe_add_mutation_info_dynamic_destination(self, data_and_dest):
+      if callable(self.mutation_info_fn):
+        return beam.Row(
+            **{
+                StorageWriteToBigQuery.DESTINATION: data_and_dest[0],
+                StorageWriteToBigQuery.RECORD: bigquery_tools.
+                beam_row_from_dict(data_and_dest[1], self.schema),
+                StorageWriteToBigQuery.CDC_INFO: bigquery_tools.
+                beam_row_from_dict(
+                    self.mutation_info_fn(data_and_dest[1]),
+                    StorageWriteToBigQuery.CDC_INFO_SCHEMA)
+            })
+      else:
+        return beam.Row(
+            **{
+                StorageWriteToBigQuery.DESTINATION: data_and_dest[0],
+                StorageWriteToBigQuery.RECORD: bigquery_tools.
+                beam_row_from_dict(data_and_dest[1], self.schema)
+            })
+
+    def maybe_add_mutation_info(self, dict_data):
+      if callable(self.mutation_info_fn):
+        return beam.Row(
+            **{
+                StorageWriteToBigQuery.RECORD: bigquery_tools.
+                beam_row_from_dict(dict_data, self.schema),
+                StorageWriteToBigQuery.CDC_INFO: bigquery_tools.
+                beam_row_from_dict(
+                    self.mutation_info_fn(dict_data),
+                    StorageWriteToBigQuery.CDC_INFO_SCHEMA)
+            })
+      else:
+        return bigquery_tools.beam_row_from_dict(dict_data, self.schema)
+
 
 class ReadFromBigQuery(PTransform):
   # pylint: disable=line-too-long,W1401
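
Illustrative sketch (not part of the patch): the mutation-info callables threaded
through PrepareBeamRows and ConvertToBeamRows above are expected to map an input
element to the two CDC fields declared in CDC_INFO_SCHEMA. A minimal dict-based
callable could look like the following; the "seq" field name and how the callable
is wired into the public write API are assumptions here, only the returned keys
come from the patch.

# Hypothetical example, not part of the diff above: a CdcWritesWithDicts-style
# callable as consumed by ConvertToBeamRows.maybe_add_mutation_info. It must
# return both REQUIRED string fields declared in CDC_INFO_SCHEMA.
def row_mutation_info(record: dict) -> dict:
  return {
      "mutation_type": "UPSERT",  # or "DELETE" for CDC deletes
      # ordering token per key; "seq" is an assumed field on the record
      "change_sequence_number": str(record["seq"]),
  }

For Beam Row inputs, the analogous CdcWritesWithRows callable would return a
beam.Row carrying the same two string fields so that it matches CDC_INFO_TYPE_HINT.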