Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support populating Spanner source database id, instance id and a custom metadata field in Spanner CDC to Pub Sub template #1769

Merged
merged 4 commits into from
Sep 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -214,4 +214,29 @@ public interface SpannerChangeStreamsToPubSubOptions extends DataflowPipelineOpt
RpcPriority getRpcPriority();

void setRpcPriority(RpcPriority rpcPriority);

/**
 * Template parameter: when {@code true}, the Spanner database id and instance id that the change
 * stream is read from are included in the output Pub/Sub message data.
 * Optional; defaults to {@code false}.
 */
@TemplateParameter.Boolean(
order = 17,
optional = true,
description = "Include spanner database id and instance id in output message",
helpText =
"Whether or not to include the spanner database id and instance id to read the change stream from in the output message data. Defaults to: false")
@Default.Boolean(false)
Boolean getIncludeSpannerSource();

void setIncludeSpannerSource(Boolean value);

/**
 * Template parameter: custom string copied into the {@code outputMessageMetadata} field of each
 * output Pub/Sub message. Optional; defaults to {@code ""}, and the field is only populated when
 * the value is non-empty. The pipeline rejects values containing non-ASCII characters.
 */
@TemplateParameter.Text(
order = 18,
optional = true,
description = "Custom Value for the optional field outputMessageMetadata",
helpText =
"The string value for the custom field outputMessageMetadata in output pub/sub message. "
+ "Defaults to empty and the field outputMessageMetadata is only populated if this "
+ "value is non-empty. Please escape any special characters when entering the value "
+ "here(ie: double quotes).")
@Default.String("")
String getOutputMessageMetadata();

void setOutputMessageMetadata(String value);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger;
import com.google.cloud.teleport.v2.options.SpannerChangeStreamsToPubSubOptions;
import com.google.cloud.teleport.v2.transforms.FileFormatFactorySpannerChangeStreamsToPubSub;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.Pipeline;
Expand Down Expand Up @@ -95,6 +96,14 @@ private static String getPubsubProjectId(SpannerChangeStreamsToPubSubOptions opt
: options.getPubsubProjectId();
}

/**
 * Checks that {@code outputMessageMetadata} contains only US-ASCII characters.
 *
 * @param outputMessageMetadata candidate metadata string; {@code null} is treated as valid
 * @return {@code true} when the value is {@code null} or encodable as US-ASCII
 */
public static boolean isValidAsciiString(String outputMessageMetadata) {
  // Null short-circuits: only a non-null string with non-ASCII characters is rejected.
  return outputMessageMetadata == null
      || StandardCharsets.US_ASCII.newEncoder().canEncode(outputMessageMetadata);
}

public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) {
LOG.info("Requested Message Format is " + options.getOutputDataFormat());
options.setStreaming(true);
Expand All @@ -112,6 +121,13 @@ public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) {
String pubsubProjectId = getPubsubProjectId(options);
String pubsubTopicName = options.getPubsubTopic();
String pubsubAPI = options.getPubsubAPI();
Boolean includeSpannerSource = options.getIncludeSpannerSource();
String outputMessageMetadata = options.getOutputMessageMetadata();

// Ensure outputMessageMetadata only contains valid ascii characters
if (!isValidAsciiString(outputMessageMetadata)) {
throw new RuntimeException("outputMessageMetadata contains non ascii characters.");
}

// Retrieve and parse the start / end timestamps.
Timestamp startTimestamp =
Expand Down Expand Up @@ -170,6 +186,10 @@ public static PipelineResult run(SpannerChangeStreamsToPubSubOptions options) {
.setProjectId(pubsubProjectId)
.setPubsubAPI(pubsubAPI)
.setPubsubTopicName(pubsubTopicName)
.setIncludeSpannerSource(includeSpannerSource)
.setSpannerDatabaseId(databaseId)
.setSpannerInstanceId(instanceId)
.setOutputMessageMetadata(outputMessageMetadata)
.build());
return pipeline.run();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import static com.google.cloud.teleport.v2.transforms.WriteDataChangeRecordsToJson.DataChangeRecordToJsonTextFn;

import com.google.auto.value.AutoValue;
import javax.annotation.Nullable;
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.extensions.avro.coders.AvroCoder;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
Expand Down Expand Up @@ -61,6 +62,18 @@ public static WriteToPubSubBuilder newBuilder() {

protected abstract String pubsubTopicName();

// When true, the Spanner database id / instance id are attached to output messages.
// NOTE(review): this nullable Boolean is auto-unboxed where it is checked in expand();
// callers that bypass the template pipeline (which defaults it to false) must set it —
// confirm all builder call sites do.
@Nullable
protected abstract Boolean includeSpannerSource();

Abacn marked this conversation as resolved.
Show resolved Hide resolved
// Spanner database id stamped on output messages; null unless includeSpannerSource is used.
@Nullable
protected abstract String spannerDatabaseId();

// Spanner instance id stamped on output messages; null unless includeSpannerSource is used.
@Nullable
protected abstract String spannerInstanceId();

// Custom value for the outputMessageMetadata field of output messages; may be null or empty.
@Nullable
protected abstract String outputMessageMetadata();

@Override
public PCollection<byte[]> expand(PCollection<DataChangeRecord> records) {
PCollection<byte[]> encodedRecords = null;
Expand All @@ -72,11 +85,17 @@ public PCollection<byte[]> expand(PCollection<DataChangeRecord> records) {
case "AVRO":
AvroCoder<com.google.cloud.teleport.v2.DataChangeRecord> coder =
AvroCoder.of(com.google.cloud.teleport.v2.DataChangeRecord.class);
DataChangeRecordToAvroFn.Builder avroBuilder =
new DataChangeRecordToAvroFn.Builder()
.setOutputMessageMetadata(outputMessageMetadata());
if (includeSpannerSource()) {
avroBuilder
.setSpannerDatabaseId(spannerDatabaseId())
.setSpannerInstanceId(spannerInstanceId());
}
encodedRecords =
records
.apply(
"Write DataChangeRecord into AVRO",
MapElements.via(new DataChangeRecordToAvroFn()))
.apply("Write DataChangeRecord into AVRO", MapElements.via(avroBuilder.build()))
.apply(
"Convert encoded DataChangeRecord in AVRO to bytes to be saved into"
+ " PubsubMessage.",
Expand All @@ -101,11 +120,17 @@ public void processElement(ProcessContext context) {

break;
case "JSON":
DataChangeRecordToJsonTextFn.Builder jsonBuilder =
new DataChangeRecordToJsonTextFn.Builder()
.setOutputMessageMetadata(outputMessageMetadata());
if (includeSpannerSource()) {
jsonBuilder
.setSpannerDatabaseId(spannerDatabaseId())
.setSpannerInstanceId(spannerInstanceId());
}
encodedRecords =
records
.apply(
"Write DataChangeRecord into JSON",
MapElements.via(new DataChangeRecordToJsonTextFn()))
.apply("Write DataChangeRecord into JSON", MapElements.via(jsonBuilder.build()))
.apply(
"Convert encoded DataChangeRecord in JSON to bytes to be saved into"
+ " PubsubMessage.",
Expand Down Expand Up @@ -183,6 +208,14 @@ public abstract static class WriteToPubSubBuilder {

public abstract WriteToPubSubBuilder setPubsubTopicName(String value);

public abstract WriteToPubSubBuilder setIncludeSpannerSource(Boolean value);

public abstract WriteToPubSubBuilder setSpannerDatabaseId(String value);

public abstract WriteToPubSubBuilder setSpannerInstanceId(String value);

public abstract WriteToPubSubBuilder setOutputMessageMetadata(String value);

abstract FileFormatFactorySpannerChangeStreamsToPubSub autoBuild();

public FileFormatFactorySpannerChangeStreamsToPubSub build() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,91 +46,149 @@ public class WriteDataChangeRecordsToAvro {
*/
public static class DataChangeRecordToAvroFn
extends SimpleFunction<DataChangeRecord, com.google.cloud.teleport.v2.DataChangeRecord> {

private String spannerDatabaseId;

private String spannerInstanceId;

private String outputMessageMetadata;

public String spannerDatabaseId() {
return spannerDatabaseId;
}

public String spannerInstanceId() {
return spannerInstanceId;
}

public String outputMessageMetadata() {
return outputMessageMetadata;
}

@Override
public com.google.cloud.teleport.v2.DataChangeRecord apply(DataChangeRecord record) {
return dataChangeRecordToAvro(record);
}

public DataChangeRecordToAvroFn() {}
ShuranZhang marked this conversation as resolved.
Show resolved Hide resolved

private DataChangeRecordToAvroFn(Builder builder) {
this.spannerDatabaseId = builder.spannerDatabaseId;
this.spannerInstanceId = builder.spannerInstanceId;
this.outputMessageMetadata = builder.outputMessageMetadata;
}

public com.google.cloud.teleport.v2.DataChangeRecord dataChangeRecordToAvro(
DataChangeRecord record) {
String partitionToken = record.getPartitionToken();
long commitTimestampMicros = timestampToMicros(record.getCommitTimestamp());
String serverTransactionId = record.getServerTransactionId();
boolean isLastRecordInTransaction = record.isLastRecordInTransactionInPartition();
String recordSequence = record.getRecordSequence();
String tableName = record.getTableName();
List<com.google.cloud.teleport.v2.ColumnType> columnTypes =
record.getRowType().stream()
.map(
columnType ->
new com.google.cloud.teleport.v2.ColumnType(
columnType.getName(),
mapTypeCodeToAvro(columnType.getType()),
columnType.isPrimaryKey(),
columnType.getOrdinalPosition()))
.collect(Collectors.toList());

List<com.google.cloud.teleport.v2.Mod> mods =
record.getMods().stream()
.map(
mod ->
new com.google.cloud.teleport.v2.Mod(
mod.getKeysJson(),
mod.getOldValuesJson() != null ? mod.getOldValuesJson() : "",
mod.getNewValuesJson() != null ? mod.getNewValuesJson() : ""))
.collect(Collectors.toList());

com.google.cloud.teleport.v2.ModType modType = mapModTypeToModTypeAvro(record.getModType());
com.google.cloud.teleport.v2.ValueCaptureType captureType =
mapValueCaptureTypeToAvro(record.getValueCaptureType());
long numberOfRecordsInTransaction = record.getNumberOfRecordsInTransaction();
long numberOfPartitionsInTransaction = record.getNumberOfPartitionsInTransaction();

com.google.cloud.teleport.v2.ChangeStreamRecordMetadata metadata =
record.getMetadata() == null
? null
: new com.google.cloud.teleport.v2.ChangeStreamRecordMetadata(
record.getMetadata().getPartitionToken(),
timestampToMicros(record.getMetadata().getRecordTimestamp()),
timestampToMicros(record.getMetadata().getPartitionStartTimestamp()),
timestampToMicros(record.getMetadata().getPartitionEndTimestamp()),
timestampToMicros(record.getMetadata().getPartitionCreatedAt()),
record.getMetadata().getPartitionScheduledAt() == null
? 0
: timestampToMicros(record.getMetadata().getPartitionScheduledAt()),
record.getMetadata().getPartitionRunningAt() == null
? 0
: timestampToMicros(record.getMetadata().getPartitionRunningAt()),
timestampToMicros(record.getMetadata().getQueryStartedAt()),
timestampToMicros(record.getMetadata().getRecordStreamStartedAt()),
timestampToMicros(record.getMetadata().getRecordStreamEndedAt()),
timestampToMicros(record.getMetadata().getRecordReadAt()),
record.getMetadata().getTotalStreamTimeMillis(),
record.getMetadata().getNumberOfRecordsRead());

// Add ChangeStreamMetadata and spanner source info
return new com.google.cloud.teleport.v2.DataChangeRecord(
partitionToken,
commitTimestampMicros,
serverTransactionId,
isLastRecordInTransaction,
recordSequence,
tableName,
columnTypes,
mods,
modType,
captureType,
numberOfRecordsInTransaction,
numberOfPartitionsInTransaction,
metadata,
// If includeSpannerSource is false, spannerDatabaseId() and spannerInstanceId() are null.
spannerDatabaseId(),
spannerInstanceId(),
outputMessageMetadata());
}

static class Builder {
private String spannerDatabaseId;

private String spannerInstanceId;

private String outputMessageMetadata;

public Builder setSpannerDatabaseId(String value) {
this.spannerDatabaseId = value;
return this;
}

public Builder setSpannerInstanceId(String value) {
this.spannerInstanceId = value;
return this;
}

public Builder setOutputMessageMetadata(String value) {
this.outputMessageMetadata = value;
return this;
}

public DataChangeRecordToAvroFn build() {
return new DataChangeRecordToAvroFn(this);
}
}
}

/**
 * Converts a timestamp to microseconds since epoch: whole seconds and the sub-second nanos are
 * converted separately and summed (nanos truncate toward zero).
 */
private static long timestampToMicros(Timestamp ts) {
  long secondsAsMicros = TimeUnit.SECONDS.toMicros(ts.getSeconds());
  long nanosAsMicros = TimeUnit.NANOSECONDS.toMicros(ts.getNanos());
  return secondsAsMicros + nanosAsMicros;
}

public static com.google.cloud.teleport.v2.DataChangeRecord dataChangeRecordToAvro(
ShuranZhang marked this conversation as resolved.
Show resolved Hide resolved
DataChangeRecord record) {
String partitionToken = record.getPartitionToken();
long commitTimestampMicros = timestampToMicros(record.getCommitTimestamp());
String serverTransactionId = record.getServerTransactionId();
boolean isLastRecordInTransaction = record.isLastRecordInTransactionInPartition();
String recordSequence = record.getRecordSequence();
String tableName = record.getTableName();
List<com.google.cloud.teleport.v2.ColumnType> columnTypes =
record.getRowType().stream()
.map(
columnType ->
new com.google.cloud.teleport.v2.ColumnType(
columnType.getName(),
mapTypeCodeToAvro(columnType.getType()),
columnType.isPrimaryKey(),
columnType.getOrdinalPosition()))
.collect(Collectors.toList());

List<com.google.cloud.teleport.v2.Mod> mods =
record.getMods().stream()
.map(
mod ->
new com.google.cloud.teleport.v2.Mod(
mod.getKeysJson(),
mod.getOldValuesJson() != null ? mod.getOldValuesJson() : "",
mod.getNewValuesJson() != null ? mod.getNewValuesJson() : ""))
.collect(Collectors.toList());

com.google.cloud.teleport.v2.ModType modType = mapModTypeToModTypeAvro(record.getModType());
com.google.cloud.teleport.v2.ValueCaptureType captureType =
mapValueCaptureTypeToAvro(record.getValueCaptureType());
long numberOfRecordsInTransaction = record.getNumberOfRecordsInTransaction();
long numberOfPartitionsInTransaction = record.getNumberOfPartitionsInTransaction();

com.google.cloud.teleport.v2.ChangeStreamRecordMetadata metadata =
record.getMetadata() == null
? null
: new com.google.cloud.teleport.v2.ChangeStreamRecordMetadata(
record.getMetadata().getPartitionToken(),
timestampToMicros(record.getMetadata().getRecordTimestamp()),
timestampToMicros(record.getMetadata().getPartitionStartTimestamp()),
timestampToMicros(record.getMetadata().getPartitionEndTimestamp()),
timestampToMicros(record.getMetadata().getPartitionCreatedAt()),
record.getMetadata().getPartitionScheduledAt() == null
? 0
: timestampToMicros(record.getMetadata().getPartitionScheduledAt()),
record.getMetadata().getPartitionRunningAt() == null
? 0
: timestampToMicros(record.getMetadata().getPartitionRunningAt()),
timestampToMicros(record.getMetadata().getQueryStartedAt()),
timestampToMicros(record.getMetadata().getRecordStreamStartedAt()),
timestampToMicros(record.getMetadata().getRecordStreamEndedAt()),
timestampToMicros(record.getMetadata().getRecordReadAt()),
record.getMetadata().getTotalStreamTimeMillis(),
record.getMetadata().getNumberOfRecordsRead());

// Add ChangeStreamMetadata
return new com.google.cloud.teleport.v2.DataChangeRecord(
partitionToken,
commitTimestampMicros,
serverTransactionId,
isLastRecordInTransaction,
recordSequence,
tableName,
columnTypes,
mods,
modType,
captureType,
numberOfRecordsInTransaction,
numberOfPartitionsInTransaction,
metadata);
}

private static com.google.cloud.teleport.v2.ModType mapModTypeToModTypeAvro(ModType modType) {
switch (modType) {
case INSERT:
Expand Down
Loading
Loading