feat: Modularization: Schema overrides via string and file (#1840)
* Schema overrides via String and file

* Apply spotless

* Fix UTs

* Add UTs and remove redundant toString()

* Spotless

* Add regex for string overrides

* Apply spotless

* Fix regex

* Add NoopSchemaOverridesParser

* Address comments

* Address comments

* Switch PCRE regex from alnum to graph
manitgupta authored Oct 1, 2024
1 parent e122b9e commit 3b15a14
Showing 16 changed files with 960 additions and 15 deletions.
@@ -30,7 +30,11 @@
import com.google.cloud.teleport.v2.datastream.sources.DataStreamIO;
import com.google.cloud.teleport.v2.datastream.utils.DataStreamClient;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NoopSchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
@@ -48,6 +52,8 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
@@ -485,6 +491,51 @@ public interface Options
String getShardingContextFilePath();

void setShardingContextFilePath(String value);

@TemplateParameter.Text(
order = 30,
optional = true,
description = "Table name overrides from source to spanner",
regexes =
"^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
example = "[{Singers, Vocalists}, {Albums, Records}]",
helpText =
"These are the table name overrides from source to spanner. They are written in the"
+ " following format: [{SourceTableName1, SpannerTableName1}, {SourceTableName2, SpannerTableName2}]."
+ " This example shows mapping the Singers table to Vocalists and the Albums table to Records.")
@Default.String("")
String getTableOverrides();

void setTableOverrides(String value);

@TemplateParameter.Text(
order = 31,
optional = true,
regexes =
"^\\[([[:space:]]*\\{[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*,[[:space:]]*[[:graph:]]+\\.[[:graph:]]+[[:space:]]*\\}[[:space:]]*(,[[:space:]]*)*)*\\]$",
description = "Column name overrides from source to spanner",
example =
"[{Singers.SingerName, Singers.TalentName}, {Albums.AlbumName, Albums.RecordName}]",
helpText =
"These are the column name overrides from source to spanner. They are written in the"
+ " following format: [{SourceTableName1.SourceColumnName1, SourceTableName1.SpannerColumnName1}, {SourceTableName2.SourceColumnName1, SourceTableName2.SpannerColumnName1}]."
+ " Note that the SourceTableName should remain the same in both the source and spanner pair. To override table names, use tableOverrides."
+ " The example shows mapping SingerName to TalentName and AlbumName to RecordName in the Singers and Albums tables respectively.")
@Default.String("")
String getColumnOverrides();

void setColumnOverrides(String value);

@TemplateParameter.Text(
order = 32,
optional = true,
description = "File based overrides from source to spanner",
helpText =
"A file which specifies the table and the column name overrides from source to spanner.")
@Default.String("")
String getSchemaOverridesFilePath();

void setSchemaOverridesFilePath(String value);
}

private static void validateSourceType(Options options) {
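To make the three new parameters above concrete, the following is a minimal, hypothetical sketch (not part of this commit) of how the string-based overrides could be passed as pipeline options; it assumes only the getters shown in this hunk plus Beam's PipelineOptionsFactory, and it omits the template's other required parameters.

// Sketch only: parsing the new string-based override parameters.
// Assumes this snippet sits alongside the template class, so DataStreamToSpanner.Options
// resolves without an explicit import; other required template options are omitted.
import org.apache.beam.sdk.options.PipelineOptionsFactory;

public class OverridesOptionsSketch {
  public static void main(String[] args) {
    DataStreamToSpanner.Options options =
        PipelineOptionsFactory.fromArgs(
                "--tableOverrides=[{Singers, Vocalists}, {Albums, Records}]",
                "--columnOverrides=[{Singers.SingerName, Singers.TalentName}]")
            .as(DataStreamToSpanner.Options.class);

    // The values surface through the generated getters added in this hunk.
    System.out.println(options.getTableOverrides());  // [{Singers, Vocalists}, {Albums, Records}]
    System.out.println(options.getColumnOverrides()); // [{Singers.SingerName, Singers.TalentName}]
  }
}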
@@ -682,9 +733,13 @@ public static PipelineResult run(Options options) {
.setCustomParameters(options.getTransformationCustomParameters())
.build();

// Create the overrides mapping.
ISchemaOverridesParser schemaOverridesParser = configureSchemaOverrides(options);

ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema,
schemaOverridesParser,
transformationContext,
shardingContext,
options.getDatastreamSourceType(),
@@ -810,4 +865,30 @@ private static DeadLetterQueueManager buildDlqManager(Options options) {
return DeadLetterQueueManager.create(dlqDirectory, retryDlqUri, 0);
}
}

private static ISchemaOverridesParser configureSchemaOverrides(Options options) {
// incorrect configuration
if (!options.getSchemaOverridesFilePath().isEmpty()
&& (!options.getTableOverrides().isEmpty() || !options.getColumnOverrides().isEmpty())) {
throw new IllegalArgumentException(
"Only one of file based or string based overrides must be configured! Please correct the configuration and re-run the job");
}
// string based overrides
if (!options.getTableOverrides().isEmpty() || !options.getColumnOverrides().isEmpty()) {
Map<String, String> userOptionsOverrides = new HashMap<>();
if (!options.getTableOverrides().isEmpty()) {
userOptionsOverrides.put("tableOverrides", options.getTableOverrides());
}
if (!options.getColumnOverrides().isEmpty()) {
userOptionsOverrides.put("columnOverrides", options.getColumnOverrides());
}
return new SchemaStringOverridesParser(userOptionsOverrides);
}
// file based overrides
if (!options.getSchemaOverridesFilePath().isEmpty()) {
return new SchemaFileOverridesParser(options.getSchemaOverridesFilePath());
}
// no overrides
return new NoopSchemaOverridesParser();
}
}
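For reference, here is a short sketch (not part of the diff) of which parser configureSchemaOverrides ends up returning for each configuration; the constructors and the "tableOverrides"/"columnOverrides" keys come from the code above, while the GCS path and file name are made-up placeholders.

import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NoopSchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaFileOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SchemaStringOverridesParser;
import java.util.HashMap;
import java.util.Map;

public class SchemaOverridesSelectionSketch {
  public static void main(String[] args) {
    // String-based overrides: values are keyed exactly as in configureSchemaOverrides.
    Map<String, String> userOptionsOverrides = new HashMap<>();
    userOptionsOverrides.put("tableOverrides", "[{Singers, Vocalists}]");
    userOptionsOverrides.put("columnOverrides", "[{Singers.SingerName, Singers.TalentName}]");
    ISchemaOverridesParser stringParser = new SchemaStringOverridesParser(userOptionsOverrides);

    // File-based overrides: mutually exclusive with the string parameters.
    // The path is a placeholder; the expected file format is defined by
    // SchemaFileOverridesParser elsewhere in this PR.
    ISchemaOverridesParser fileParser =
        new SchemaFileOverridesParser("gs://my-bucket/schema-overrides.json");

    // Neither configured: the pipeline falls back to the no-op parser.
    ISchemaOverridesParser noopParser = new NoopSchemaOverridesParser();
  }
}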
@@ -28,6 +28,7 @@
import com.google.cloud.teleport.v2.spanner.migrations.convertors.ChangeEventToMapConvertor;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.DroppedTableException;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.CustomTransformation;
@@ -75,6 +76,9 @@ public abstract class ChangeEventTransformerDoFn
@Nullable
public abstract Schema schema();

@Nullable
public abstract ISchemaOverridesParser schemaOverridesParser();

@Nullable
public abstract TransformationContext transformationContext();

@@ -116,6 +120,7 @@ public abstract class ChangeEventTransformerDoFn

public static ChangeEventTransformerDoFn create(
Schema schema,
ISchemaOverridesParser schemaOverridesParser,
TransformationContext transformationContext,
ShardingContext shardingContext,
String sourceType,
@@ -125,6 +130,7 @@ public static ChangeEventTransformerDoFn create(
SpannerConfig spannerConfig) {
return new AutoValue_ChangeEventTransformerDoFn(
schema,
schemaOverridesParser,
transformationContext,
shardingContext,
sourceType,
@@ -144,6 +150,7 @@ public void setup() {
changeEventSessionConvertor =
new ChangeEventSessionConvertor(
schema(),
schemaOverridesParser(),
transformationContext(),
shardingContext(),
sourceType(),
@@ -162,11 +169,17 @@ public void processElement(ProcessContext c) {
Map<String, Object> sourceRecord =
ChangeEventToMapConvertor.convertChangeEventToMap(changeEvent);

// TODO: Transformation via session file should be marked deprecated and removed.
if (!schema().isEmpty()) {
schema().verifyTableInSession(changeEvent.get(EVENT_TABLE_NAME_KEY).asText());
changeEvent = changeEventSessionConvertor.transformChangeEventViaSessionFile(changeEvent);
}

// Perform mapping as per overrides
if (schemaOverridesParser() != null) {
changeEvent = changeEventSessionConvertor.transformChangeEventViaOverrides(changeEvent);
}

changeEvent =
changeEventSessionConvertor.transformChangeEventData(
changeEvent, spannerAccessor.getDatabaseClient(), ddl);
@@ -98,7 +98,7 @@ public void testProcessElementWithNonEmptySchema() throws Exception {

ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
schema, null, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
changeEventTransformerDoFn.setMapper(mapper);
changeEventTransformerDoFn.setChangeEventSessionConvertor(changeEventSessionConvertor);
changeEventTransformerDoFn.setSpannerAccessor(spannerAccessor);
@@ -161,7 +161,7 @@ public void testProcessElementWithCustomTransformation() throws Exception {

ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
schema, null, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
changeEventTransformerDoFn.setMapper(mapper);
changeEventTransformerDoFn.setDatastreamToSpannerTransformer(spannerMigrationTransformer);
changeEventTransformerDoFn.setSpannerAccessor(spannerAccessor);
@@ -232,6 +232,7 @@ public void testProcessElementWithCustomTransformationAndTransformationContext()
ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema,
null,
transformationContext,
null,
"mysql",
@@ -312,6 +313,7 @@ public void testProcessElementWithCustomTransformationAndShardingContext() throw
ChangeEventTransformerDoFn.create(
schema,
null,
null,
shardingContext,
"mysql",
customTransformation,
@@ -381,7 +383,7 @@ public void testProcessElementWithCustomTransformationFilterEvent() throws Excep

ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
schema, null, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
changeEventTransformerDoFn.setMapper(mapper);
changeEventTransformerDoFn.setDatastreamToSpannerTransformer(spannerMigrationTransformer);
changeEventTransformerDoFn.setSpannerAccessor(spannerAccessor);
@@ -440,7 +442,7 @@ public void testProcessElementWithInvalidTransformationException() throws Except

ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
schema, null, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
changeEventTransformerDoFn.setMapper(mapper);
changeEventTransformerDoFn.setDatastreamToSpannerTransformer(spannerMigrationTransformer);
changeEventTransformerDoFn.setSpannerAccessor(spannerAccessor);
@@ -483,7 +485,7 @@ public void testProcessElementWithDroppedTableException() throws DroppedTableExc

ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
schema, null, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
changeEventTransformerDoFn.setMapper(mapper);
changeEventTransformerDoFn.processElement(processContextMock);
verify(processContextMock, times(0)).output(any());
@@ -526,7 +528,7 @@ public void testProcessElementWithIllegalArgumentException() throws Exception {

ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
schema, null, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
changeEventTransformerDoFn.setMapper(mapper);
changeEventTransformerDoFn.setSpannerAccessor(spannerAccessor);
changeEventTransformerDoFn.setChangeEventSessionConvertor(changeEventSessionConvertor);
@@ -558,7 +560,7 @@ public void testProcessElementWithException() {

ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
schema, null, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
changeEventTransformerDoFn.setMapper(mapper);
changeEventTransformerDoFn.processElement(processContextMock);

@@ -600,7 +602,7 @@ public void testProcessElementWithInvalidChangeEventException() throws Exception

ChangeEventTransformerDoFn changeEventTransformerDoFn =
ChangeEventTransformerDoFn.create(
schema, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
schema, null, null, null, "mysql", customTransformation, false, ddl, spannerConfig);
changeEventTransformerDoFn.setDatastreamToSpannerTransformer(spannerMigrationTransformer);
changeEventTransformerDoFn.setSpannerAccessor(spannerAccessor);
changeEventTransformerDoFn.setChangeEventSessionConvertor(changeEventSessionConvertor);
@@ -28,6 +28,8 @@
import com.google.cloud.spanner.ResultSet;
import com.google.cloud.spanner.Statement;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.migrations.exceptions.InvalidChangeEventException;
import com.google.cloud.teleport.v2.spanner.migrations.schema.ISchemaOverridesParser;
import com.google.cloud.teleport.v2.spanner.migrations.schema.NameAndCols;
import com.google.cloud.teleport.v2.spanner.migrations.schema.Schema;
import com.google.cloud.teleport.v2.spanner.migrations.schema.SourceColumnDefinition;
@@ -37,6 +39,7 @@
import com.google.cloud.teleport.v2.spanner.migrations.schema.SyntheticPKey;
import com.google.cloud.teleport.v2.spanner.migrations.shard.ShardingContext;
import com.google.cloud.teleport.v2.spanner.migrations.transformation.TransformationContext;
import com.google.cloud.teleport.v2.spanner.migrations.utils.ChangeEventUtils;
import com.google.cloud.teleport.v2.spanner.type.Type;
import java.util.Iterator;
import java.util.List;
@@ -54,6 +57,8 @@ public class ChangeEventSessionConvertor {
// The mapping information read from the session file generated by HarbourBridge.
private final Schema schema;

private final ISchemaOverridesParser schemaOverridesParser;

/* The context used to populate transformation information */
private final TransformationContext transformationContext;

@@ -68,11 +73,13 @@ public class ChangeEventSessionConvertor {

public ChangeEventSessionConvertor(
Schema schema,
ISchemaOverridesParser schemaOverridesParser,
TransformationContext transformationContext,
ShardingContext shardingContext,
String sourceType,
boolean roundJsonDecimals) {
this.schema = schema;
this.schemaOverridesParser = schemaOverridesParser;
this.transformationContext = transformationContext;
this.shardingContext = shardingContext;
this.sourceType = sourceType;
@@ -209,6 +216,33 @@ JsonNode removeDroppedColumns(JsonNode changeEvent, String tableId) {
return changeEvent;
}

public JsonNode transformChangeEventViaOverrides(JsonNode changeEvent)
throws InvalidChangeEventException {
String sourceTableName = changeEvent.get(EVENT_TABLE_NAME_KEY).asText();
String spTableName = schemaOverridesParser.getTableOverride(sourceTableName);
// Replace the source table name with the overridden spanner table name if the override
// is specified at the table level.
if (!sourceTableName.equals(spTableName)) {
((ObjectNode) changeEvent).put(EVENT_TABLE_NAME_KEY, spTableName);
}
// Get the list of sourceColumnNames from the event
List<String> sourceFieldNames = ChangeEventUtils.getEventColumnKeys(changeEvent);
sourceFieldNames.forEach(
sourceFieldName -> {
String spannerTableColumn =
schemaOverridesParser.getColumnOverride(sourceTableName, sourceFieldName);
// A valid column override for the table in this changeEvent exists when:
// 1. the source table name matches the one specified in the override, and
// 2. the overridden column name differs from the current source field name.
if (!sourceFieldName.equals(spannerTableColumn)) {
((ObjectNode) changeEvent).set(spannerTableColumn, changeEvent.get(sourceFieldName));
((ObjectNode) changeEvent).remove(sourceFieldName);
}
});
return changeEvent;
}

/**
* This function modifies the data of the change event. Currently, it only supports a
* single transformation set by roundJsonDecimals.
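To illustrate the rename performed by transformChangeEventViaOverrides above, here is a minimal Jackson sketch of a change event before and after the mapping; the "_metadata_table" field name is an assumption standing in for EVENT_TABLE_NAME_KEY, whose actual value is defined elsewhere in the codebase.

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;

public class OverrideRenameSketch {
  public static void main(String[] args) {
    ObjectMapper mapper = new ObjectMapper();

    // Incoming change event ("_metadata_table" stands in for EVENT_TABLE_NAME_KEY).
    ObjectNode changeEvent = mapper.createObjectNode();
    changeEvent.put("_metadata_table", "Singers");
    changeEvent.put("SingerName", "Jane");

    // With overrides [{Singers, Vocalists}] and [{Singers.SingerName, Singers.TalentName}],
    // the event is rewritten in place, equivalent to:
    changeEvent.put("_metadata_table", "Vocalists");
    changeEvent.set("TalentName", changeEvent.get("SingerName"));
    changeEvent.remove("SingerName");

    System.out.println(changeEvent); // {"_metadata_table":"Vocalists","TalentName":"Jane"}
  }
}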
@@ -0,0 +1,38 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.schema;

public interface ISchemaOverridesParser {

/**
* Gets the spanner table name for the given source table name, or the source table name if no
* override is configured.
*
* @param sourceTableName The source table name
* @return The overridden spanner table name
*/
String getTableOverride(String sourceTableName);

/**
* Gets the spanner column name for the given source table and column names, or the source
* column name if no override is configured.
*
* @param sourceTableName the source table name for which column name is overridden
* @param sourceColumnName the source column name being overridden
* @return The overridden spanner column name
*/
String getColumnOverride(String sourceTableName, String sourceColumnName);
}
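As a reading aid, a minimal, hypothetical map-backed implementation of this contract could look like the sketch below (not part of this commit); the identity fallback mirrors the Javadoc above, and the "table.column" key format is only an illustrative choice.

import java.util.Map;

public class MapSchemaOverridesParser implements ISchemaOverridesParser {

  private final Map<String, String> tableOverrides;   // source table -> spanner table
  private final Map<String, String> columnOverrides;  // "sourceTable.sourceColumn" -> spanner column

  public MapSchemaOverridesParser(
      Map<String, String> tableOverrides, Map<String, String> columnOverrides) {
    this.tableOverrides = tableOverrides;
    this.columnOverrides = columnOverrides;
  }

  @Override
  public String getTableOverride(String sourceTableName) {
    // Fall back to the source name when no override is configured.
    return tableOverrides.getOrDefault(sourceTableName, sourceTableName);
  }

  @Override
  public String getColumnOverride(String sourceTableName, String sourceColumnName) {
    return columnOverrides.getOrDefault(
        sourceTableName + "." + sourceColumnName, sourceColumnName);
  }
}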