implemented transforms to handle cdc mutation info functions for static and dyn destinations
prodriguezdefino committed Oct 18, 2024
1 parent c638241 commit e12d2b9
Showing 1 changed file with 154 additions and 29 deletions.
183 changes: 154 additions & 29 deletions sdks/python/apache_beam/io/gcp/bigquery.py
@@ -2544,6 +2544,26 @@ class StorageWriteToBigQuery(PTransform):
   CDC_SQN = "change_sequence_number"
   # magic string to tell Java that these rows are going to dynamic destinations
   DYNAMIC_DESTINATIONS = "DYNAMIC_DESTINATIONS"
+  # CDC record input for type hints
+  CDC_INFO_TYPE_HINT = (
+      CDC_INFO,
+      RowTypeConstraint.from_fields([(CDC_MUTATION_TYPE, str), (CDC_SQN, str)]))
+  # BQ table schema for CDC related information
+  CDC_INFO_SCHEMA = {
+      "name": "row_mutation_info",
+      "type": "STRUCT",
+      "fields": [
+          # setting both fields is required
+          {
+              "name": "mutation_type", "type": "STRING", "mode": "REQUIRED"
+          },
+          {
+              "name": "change_sequence_number",
+              "type": "STRING",
+              "mode": "REQUIRED"
+          }
+      ]
+  }
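# Illustration (not part of this commit): a mutation-info callable supplied
# through use_cdc_writes would return a dict shaped like CDC_INFO_SCHEMA.
# Only the two REQUIRED string fields are grounded in this diff; the
# "UPSERT" value is an assumption based on BigQuery CDC conventions.
#
#   def example_mutation_info(record):
#     return {
#         "mutation_type": "UPSERT",
#         "change_sequence_number": "AAA/1",
#     }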

   def __init__(
       self,
@@ -2598,7 +2618,9 @@ def expand(self, input):
     # if writing to one destination, just convert to Beam rows and send over
     if not callable(table):
       if is_rows:
-        input_beam_rows = input
+        input_beam_rows = (
+            input | "Prepare Beam Row" >> self.PrepareBeamRows(
+                input.element_type, False).with_output_types())
       else:
         input_beam_rows = (
             input
@@ -2620,16 +2642,8 @@ def expand(self, input):
       if is_rows:
         input_beam_rows = (
             input_rows
-            | "Wrap in Beam Row" >> beam.Map(
-                lambda row: beam.Row(
-                    **{
-                        StorageWriteToBigQuery.DESTINATION: row[0],
-                        StorageWriteToBigQuery.RECORD: row[1]
-                    })).with_output_types(
-                        RowTypeConstraint.from_fields([
-                            (StorageWriteToBigQuery.DESTINATION, str),
-                            (StorageWriteToBigQuery.RECORD, input.element_type)
-                        ])))
+            | "Prepare Beam Row" >> self.PrepareBeamRows(
+                input.element_type, True).with_output_types())
       # otherwise, convert to Beam Rows
       else:
         input_beam_rows = (
@@ -2639,6 +2653,9 @@ def expand(self, input):
       # communicate to Java that this write should use dynamic destinations
       table = StorageWriteToBigQuery.DYNAMIC_DESTINATIONS
 
+    cdc_writes = self._use_cdc_writes if not callable(
+        self._use_cdc_writes) else True
+
     output = (
         input_beam_rows
         | SchemaAwareExternalTransform(
@@ -2652,7 +2669,7 @@ def expand(self, input):
             auto_sharding=self._with_auto_sharding,
             num_streams=self._num_storage_api_streams,
             use_at_least_once_semantics=self._use_at_least_once,
-            use_cdc_writes=self._use_cdc_writes,
+            use_cdc_writes=cdc_writes,
             primary_key=self._primary_key,
             error_handling={
                 'output': StorageWriteToBigQuery.FAILED_ROWS_WITH_ERRORS
@@ -2676,44 +2693,152 @@ def expand(self, input):
         failed_rows=failed_rows,
         failed_rows_with_errors=failed_rows_with_errors)

+  class PrepareBeamRows(PTransform):
+    def __init__(
+        self,
+        element_type,
+        dynamic_destinations,
+        mutation_info_fn: CdcWritesWithRows = None):
+      self.element_type = element_type
+      self.dynamic_destinations = dynamic_destinations
+      self.mutation_info_fn = mutation_info_fn
+
+    def expand(self, input_rows):
+      if self.dynamic_destinations:
+        return (
+            input_rows
+            | "Wrap and maybe include CDC info" >> beam.Map(
+                lambda data_and_dest: self.
+                maybe_add_mutation_info_dynamic_destination(data_and_dest)))
+      else:
+        if callable(self.mutation_info_fn):
+          return (
+              input_rows
+              | "Wrap and maybe include CDC info" >> beam.Map(
+                  lambda row: beam.Row(
+                      **{
+                          StorageWriteToBigQuery.RECORD: row,
+                          StorageWriteToBigQuery.CDC_INFO: self.
+                          mutation_info_fn(row)
+                      })))
+        else:
+          return input_rows
+
+    def with_output_types(self):
+      if self.dynamic_destinations:
+        fields = [(StorageWriteToBigQuery.DESTINATION, str),
+                  (StorageWriteToBigQuery.RECORD, self.element_type)]
+        if callable(self.mutation_info_fn):
+          fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT)
+        type_hint = RowTypeConstraint.from_fields(fields)
+      else:
+        if callable(self.mutation_info_fn):
+          type_hint = RowTypeConstraint.from_fields([
+              (StorageWriteToBigQuery.RECORD, self.element_type),
+              StorageWriteToBigQuery.CDC_INFO_TYPE_HINT
+          ])
+        else:
+          type_hint = self.element_type
+
+      return super().with_output_types(type_hint)
+
+    def maybe_add_mutation_info_dynamic_destination(self, row_and_dest):
+      if callable(self.mutation_info_fn):
+        return beam.Row(
+            **{
+                StorageWriteToBigQuery.DESTINATION: row_and_dest[0],
+                StorageWriteToBigQuery.RECORD: row_and_dest[1],
+                StorageWriteToBigQuery.CDC_INFO: self.mutation_info_fn(
+                    row_and_dest[1])
+            })
+      else:
+        return beam.Row(
+            **{
+                StorageWriteToBigQuery.DESTINATION: row_and_dest[0],
+                StorageWriteToBigQuery.RECORD: row_and_dest[1]
+            })

   class ConvertToBeamRows(PTransform):
-    def __init__(self, schema, dynamic_destinations):
+    def __init__(
+        self,
+        schema,
+        dynamic_destinations,
+        mutation_info_fn: CdcWritesWithDicts = None):
       self.schema = schema
       self.dynamic_destinations = dynamic_destinations
+      self.mutation_info_fn = mutation_info_fn
 
     def expand(self, input_dicts):
       if self.dynamic_destinations:
         return (
             input_dicts
             | "Convert dict to Beam Row" >> beam.Map(
-                lambda row: beam.Row(
-                    **{
-                        StorageWriteToBigQuery.DESTINATION: row[0],
-                        StorageWriteToBigQuery.RECORD: bigquery_tools.
-                        beam_row_from_dict(row[1], self.schema)
-                    })))
+                lambda data_with_dest: self.
+                maybe_add_mutation_info_dynamic_destination(data_with_dest)))
       else:
         return (
             input_dicts
-            | "Convert dict to Beam Row" >> beam.Map(
-                lambda row: bigquery_tools.beam_row_from_dict(row, self.schema))
-        )
+            | "Convert dict to Beam Row" >>
+            beam.Map(lambda dict_data: self.maybe_add_mutation_info(dict_data)))
 
     def with_output_types(self):
       row_type_hints = bigquery_tools.get_beam_typehints_from_tableschema(
           self.schema)
       if self.dynamic_destinations:
-        type_hint = RowTypeConstraint.from_fields([
-            (StorageWriteToBigQuery.DESTINATION, str),
-            (
-                StorageWriteToBigQuery.RECORD,
-                RowTypeConstraint.from_fields(row_type_hints))
-        ])
+        fields = [(StorageWriteToBigQuery.DESTINATION, str),
+                  (
+                      StorageWriteToBigQuery.RECORD,
+                      RowTypeConstraint.from_fields(row_type_hints))]
+        if callable(self.mutation_info_fn):
+          fields.append(StorageWriteToBigQuery.CDC_INFO_TYPE_HINT)
+        type_hint = RowTypeConstraint.from_fields(fields)
       else:
-        type_hint = RowTypeConstraint.from_fields(row_type_hints)
+        if callable(self.mutation_info_fn):
+          type_hint = RowTypeConstraint.from_fields([
+              (
+                  StorageWriteToBigQuery.RECORD,
+                  RowTypeConstraint.from_fields(row_type_hints)),
+              StorageWriteToBigQuery.CDC_INFO_TYPE_HINT
+          ])
+        else:
+          type_hint = RowTypeConstraint.from_fields(row_type_hints)
 
       return super().with_output_types(type_hint)
 
+    def maybe_add_mutation_info_dynamic_destination(self, data_and_dest):
+      if callable(self.mutation_info_fn):
+        return beam.Row(
+            **{
+                StorageWriteToBigQuery.DESTINATION: data_and_dest[0],
+                StorageWriteToBigQuery.RECORD: bigquery_tools.
+                beam_row_from_dict(data_and_dest[1], self.schema),
+                StorageWriteToBigQuery.CDC_INFO: bigquery_tools.
+                beam_row_from_dict(
+                    self.mutation_info_fn(data_and_dest[1]),
+                    StorageWriteToBigQuery.CDC_INFO_SCHEMA)
+            })
+      else:
+        return beam.Row(
+            **{
+                StorageWriteToBigQuery.DESTINATION: data_and_dest[0],
+                StorageWriteToBigQuery.RECORD: bigquery_tools.
+                beam_row_from_dict(data_and_dest[1], self.schema)
+            })
+
+    def maybe_add_mutation_info(self, dict_data):
+      if callable(self.mutation_info_fn):
+        return beam.Row(
+            **{
+                StorageWriteToBigQuery.RECORD: bigquery_tools.
+                beam_row_from_dict(dict_data, self.schema),
+                StorageWriteToBigQuery.CDC_INFO: bigquery_tools.
+                beam_row_from_dict(
+                    self.mutation_info_fn(dict_data),
+                    StorageWriteToBigQuery.CDC_INFO_SCHEMA)
+            })
+      else:
+        return bigquery_tools.beam_row_from_dict(dict_data, self.schema)


 class ReadFromBigQuery(PTransform):
   # pylint: disable=line-too-long,W1401
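For context, a minimal usage sketch of what these transforms enable, not part of the commit: passing a callable as use_cdc_writes attaches a row_mutation_info record to every element, while a plain boolean keeps the previous behavior (the value is forwarded to the Java transform unchanged). Constructor arguments below other than use_cdc_writes and primary_key are assumptions about the surrounding file, not shown in this diff.

import apache_beam as beam
from apache_beam.io.gcp.bigquery import StorageWriteToBigQuery

def mutation_info(record):
  # Shape must match CDC_INFO_SCHEMA above; the values are illustrative.
  return {
      "mutation_type": "UPSERT",
      "change_sequence_number": str(record["seq"]),
  }

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{"id": 1, "name": "a", "seq": 1}])
      | StorageWriteToBigQuery(
          table="my-project:my_dataset.my_table",  # assumed parameter name
          schema={"fields": [{"name": "id", "type": "INTEGER"},
                             {"name": "name", "type": "STRING"},
                             {"name": "seq", "type": "INTEGER"}]},  # assumed
          use_cdc_writes=mutation_info,  # callable -> per-record CDC info
          primary_key=["id"]))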
