Merge pull request #1662 from Polber:jkinard/pubsub-cdc
PiperOrigin-RevId: 644073615
cloud-teleport committed Jun 17, 2024
2 parents 83966e1 + 0fb1013 commit 66f4322
Showing 3 changed files with 253 additions and 2 deletions.
8 changes: 8 additions & 0 deletions v2/pubsub-cdc-to-bigquery/pom.xml
@@ -38,6 +38,14 @@
<artifactId>common</artifactId>
<version>${project.version}</version>
</dependency>

<!-- Test -->
<dependency>
<groupId>com.google.cloud.teleport</groupId>
<artifactId>it-google-cloud-platform</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
</dependencies>

</project>
11 changes: 9 additions & 2 deletions v2/pubsub-cdc-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQuery.java
@@ -40,6 +40,7 @@
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
@@ -168,6 +169,7 @@ public interface Options
@TemplateParameter.Text(
order = 4,
groupName = "Target",
optional = true,
description = "BigQuery Dataset Name or Template: dataset_name or {column_name}",
helpText = "The name for the dataset to contain the replica table.")
@Default.String("{_metadata_dataset}")
@@ -219,7 +221,6 @@ public interface Options
description = "Dead Letter Queue Directory",
helpText =
"The name of the directory on Cloud Storage you want to write dead letters messages to")
@Default.String("")
String getDeadLetterQueueDirectory();

void setDeadLetterQueueDirectory(String value);
@@ -264,6 +265,12 @@ public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);

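// The file-based DLQ (deadLetterQueueDirectory) and the BigQuery DLQ
// (outputDeadletterTable) are mutually exclusive sinks, so reject a launch that sets both.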
if (!Strings.isNullOrEmpty(options.getDeadLetterQueueDirectory())
&& !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
throw new IllegalArgumentException(
"Cannot specify both deadLetterQueueDirectory and outputDeadletterTable");
}

run(options);
}

@@ -282,7 +289,7 @@ public static PipelineResult run(Options options) {
DeadLetterQueueManager dlqManager = buildDlqManager(options);
String gcsOutputDateTimeDirectory = null;

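// With the empty-string default removed from deadLetterQueueDirectory, an unset
// directory may be null or empty, so isNullOrEmpty replaces the old null check.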
if (options.getDeadLetterQueueDirectory() != null) {
if (!Strings.isNullOrEmpty(options.getDeadLetterQueueDirectory())) {
gcsOutputDateTimeDirectory = dlqManager.getRetryDlqDirectory() + "YYYY/MM/DD/HH/mm/";
}

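As a quick illustration of the new guard, here is a minimal, hypothetical launch sketch (argument values invented, other required options omitted for brevity) showing the failure mode introduced above:

// Minimal sketch: supplying both DLQ sinks now fails fast in main().
// All values below are hypothetical.
String[] args = {
  "--inputSubscription=projects/my-project/subscriptions/my-sub",
  "--deadLetterQueueDirectory=gs://my-bucket/dlq/",
  "--outputDeadletterTable=my-project:my_dataset.my_table_error_records"
};
// Throws IllegalArgumentException("Cannot specify both deadLetterQueueDirectory
// and outputDeadletterTable") before the pipeline is constructed.
PubSubCdcToBigQuery.main(args);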
236 changes: 236 additions & 0 deletions v2/pubsub-cdc-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQueryIT.java
@@ -0,0 +1,236 @@
/*
* Copyright (C) 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates;

import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.gcp.bigquery.matchers.BigQueryAsserts.assertThatBigQueryRecords;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableResult;
import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
import com.google.common.collect.ImmutableMap;
import com.google.protobuf.ByteString;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.TemplateTestBase;
import org.apache.beam.it.gcp.bigquery.BigQueryResourceManager;
import org.apache.beam.it.gcp.bigquery.conditions.BigQueryRowsCheck;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.commons.lang3.RandomStringUtils;
import org.json.JSONObject;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;

@Category(TemplateIntegrationTest.class)
@TemplateIntegrationTest(PubSubCdcToBigQuery.class)
@RunWith(JUnit4.class)
public class PubSubCdcToBigQueryIT extends TemplateTestBase {

private static final int MESSAGES_COUNT = 10;
private static final int BAD_MESSAGES_COUNT = 3;

private static final Schema BIG_QUERY_DLQ_SCHEMA = getDlqSchema();

private PubsubResourceManager pubsubResourceManager;
private BigQueryResourceManager bigQueryResourceManager;

@Before
public void setUp() throws IOException {
pubsubResourceManager =
PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build();
bigQueryResourceManager =
BigQueryResourceManager.builder(testName, PROJECT, credentials).build();

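// Stage a JavaScript UDF that uppercases each message's "name" field.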
gcsClient.createArtifact(
"udf.js",
"function uppercaseName(value) {\n"
+ " const data = JSON.parse(value);\n"
+ " data.name = data.name.toUpperCase();\n"
+ " return JSON.stringify(data);\n"
+ "}");
gcsClient.createArtifact(
"cdc_schema.json",
"["
+ " {"
+ " \"type\": \"INTEGER\","
+ " \"name\": \"id\","
+ " \"mode\": \"NULLABLE\""
+ " }, "
+ " {"
+ " \"type\": \"STRING\","
+ " \"name\":\"job\","
+ " \"mode\":\"NULLABLE\""
+ " }, "
+ " {"
+ " \"type\": \"String\","
+ " \"name\": \"name\","
+ " \"mode\": \"NULLABLE\""
+ " }"
+ "]");
}

@After
public void cleanUp() {
ResourceManagerUtils.cleanResources(pubsubResourceManager, bigQueryResourceManager);
}

@Test
public void testSubscriptionToBigQuery() throws IOException {
subscriptionToBigQueryBase(false);
}

@Test
public void testSubscriptionToBigQueryWithUdf() throws IOException {
subscriptionToBigQueryBase(true);
}

public void subscriptionToBigQueryBase(boolean useUdf) throws IOException {
// Arrange
// Omit name column to ensure table column mapping works properly
List<Field> bqSchemaFields =
Arrays.asList(
Field.of("id", StandardSQLTypeName.INT64), Field.of("job", StandardSQLTypeName.STRING));
Schema bqSchema = Schema.of(bqSchemaFields);

TopicName topic = pubsubResourceManager.createTopic("input");
SubscriptionName subscription = pubsubResourceManager.createSubscription(topic, "input-sub-1");
bigQueryResourceManager.createDataset(REGION);
TableId table = bigQueryResourceManager.createTable(testName, bqSchema);

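// Pre-create the DLQ table using the template's default dead-letter table suffix.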
TableId dlqTable =
bigQueryResourceManager.createTable(
table.getTable() + PubSubCdcToBigQuery.DEFAULT_DEADLETTER_TABLE_SUFFIX,
BIG_QUERY_DLQ_SCHEMA);

// Act
LaunchConfig.Builder launchConfigBuilder =
LaunchConfig.builder(testName, specPath)
.addParameter("inputSubscription", subscription.toString())
.addParameter("schemaFilePath", getGcsPath("cdc_schema.json"))
.addParameter("outputDatasetTemplate", table.getDataset())
.addParameter("outputTableNameTemplate", table.getTable())
.addParameter("outputDeadletterTable", toTableSpecLegacy(dlqTable));
if (useUdf) {
launchConfigBuilder
.addParameter("javascriptTextTransformGcsPath", getGcsPath("udf.js"))
.addParameter("javascriptTextTransformFunctionName", "uppercaseName");
}
PipelineLauncher.LaunchInfo info = launchTemplate(launchConfigBuilder);
assertThatPipeline(info).isRunning();

List<Map<String, Object>> expectedMessages = new ArrayList<>();
for (int i = 1; i <= MESSAGES_COUNT; i++) {
Map<String, Object> message =
new HashMap<>(
Map.of(
"id",
i,
"job",
testName,
"name",
RandomStringUtils.randomAlphabetic(1, 20).toLowerCase()));
ByteString messageData = ByteString.copyFromUtf8(new JSONObject(message).toString());
pubsubResourceManager.publish(topic, ImmutableMap.of(), messageData);
if (useUdf) {
message.put("name", message.get("name").toString().toUpperCase());
}
expectedMessages.add(message);
}

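// Publish payloads that are not valid JSON; these should land in the DLQ table.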
for (int i = 1; i <= BAD_MESSAGES_COUNT; i++) {
ByteString messageData = ByteString.copyFromUtf8("bad id " + i);
pubsubResourceManager.publish(topic, ImmutableMap.of(), messageData);
}

PipelineOperator.Result result =
pipelineOperator()
.waitForConditionsAndFinish(
createConfig(info),
BigQueryRowsCheck.builder(bigQueryResourceManager, table)
.setMinRows(MESSAGES_COUNT)
.build(),
BigQueryRowsCheck.builder(bigQueryResourceManager, dlqTable)
.setMinRows(BAD_MESSAGES_COUNT)
.build());

// Assert
assertThatResult(result).meetsConditions();
TableResult records = bigQueryResourceManager.readTable(table);

// Make sure records can be read and that the UDF, when enabled, uppercased the name
assertThatBigQueryRecords(records).hasRecordsUnordered(expectedMessages);

TableResult dlqRecords = bigQueryResourceManager.readTable(dlqTable);
if (useUdf) {
assertThat(dlqRecords.getValues().iterator().next().toString())
.contains("Expected json literal but found");
} else {
assertThat(dlqRecords.getValues().iterator().next().toString())
.contains("Failed to serialize json to table row");
}
}

private static Schema getDlqSchema() {
return Schema.of(
Arrays.asList(
Field.newBuilder("timestamp", StandardSQLTypeName.TIMESTAMP)
.setMode(Field.Mode.REQUIRED)
.build(),
Field.newBuilder("payloadString", StandardSQLTypeName.STRING)
.setMode(Field.Mode.REQUIRED)
.build(),
Field.newBuilder("payloadBytes", StandardSQLTypeName.BYTES)
.setMode(Field.Mode.REQUIRED)
.build(),
Field.newBuilder(
"attributes",
LegacySQLTypeName.RECORD,
Field.newBuilder("key", StandardSQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build(),
Field.newBuilder("value", StandardSQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build())
.setMode(Field.Mode.REPEATED)
.build(),
Field.newBuilder("errorMessage", StandardSQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build(),
Field.newBuilder("stacktrace", StandardSQLTypeName.STRING)
.setMode(Field.Mode.NULLABLE)
.build()));
}
}
