diff --git a/v2/pubsub-cdc-to-bigquery/pom.xml b/v2/pubsub-cdc-to-bigquery/pom.xml
index 44564e5134..9d919375fb 100644
--- a/v2/pubsub-cdc-to-bigquery/pom.xml
+++ b/v2/pubsub-cdc-to-bigquery/pom.xml
@@ -38,6 +38,14 @@
common
${project.version}
+
+
+
+ com.google.cloud.teleport
+ it-google-cloud-platform
+ ${project.version}
+ test
+
diff --git a/v2/pubsub-cdc-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQuery.java b/v2/pubsub-cdc-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQuery.java
index 88844e90f3..2b78ccb5b1 100644
--- a/v2/pubsub-cdc-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQuery.java
+++ b/v2/pubsub-cdc-to-bigquery/src/main/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQuery.java
@@ -40,6 +40,7 @@
import com.google.cloud.teleport.v2.utils.DurationUtils;
import com.google.cloud.teleport.v2.utils.ResourceUtils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
+import com.google.common.base.Strings;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
import org.apache.beam.sdk.coders.CoderRegistry;
@@ -168,6 +169,7 @@ public interface Options
@TemplateParameter.Text(
order = 4,
groupName = "Target",
+ optional = true,
description = "BigQuery Dataset Name or Template: dataset_name or {column_name}",
helpText = "The name for the dataset to contain the replica table.")
@Default.String("{_metadata_dataset}")
@@ -219,7 +221,6 @@ public interface Options
description = "Dead Letter Queue Directory",
helpText =
"The name of the directory on Cloud Storage you want to write dead letters messages to")
- @Default.String("")
String getDeadLetterQueueDirectory();
void setDeadLetterQueueDirectory(String value);
@@ -264,6 +265,12 @@ public static void main(String[] args) {
Options options = PipelineOptionsFactory.fromArgs(args).withValidation().as(Options.class);
BigQueryIOUtils.validateBQStorageApiOptionsStreaming(options);
+ if (!Strings.isNullOrEmpty(options.getDeadLetterQueueDirectory())
+ && !Strings.isNullOrEmpty(options.getOutputDeadletterTable())) {
+ throw new IllegalArgumentException(
+ "Cannot specify both deadLetterQueueDirectory and outputDeadletterTable");
+ }
+
run(options);
}
@@ -282,7 +289,7 @@ public static PipelineResult run(Options options) {
DeadLetterQueueManager dlqManager = buildDlqManager(options);
String gcsOutputDateTimeDirectory = null;
- if (options.getDeadLetterQueueDirectory() != null) {
+ if (!Strings.isNullOrEmpty(options.getDeadLetterQueueDirectory())) {
gcsOutputDateTimeDirectory = dlqManager.getRetryDlqDirectory() + "YYYY/MM/DD/HH/mm/";
}
diff --git a/v2/pubsub-cdc-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQueryIT.java b/v2/pubsub-cdc-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQueryIT.java
new file mode 100644
index 0000000000..a5d8cdbd67
--- /dev/null
+++ b/v2/pubsub-cdc-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/PubSubCdcToBigQueryIT.java
@@ -0,0 +1,236 @@
+/*
+ * Copyright (C) 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.apache.beam.it.gcp.bigquery.matchers.BigQueryAsserts.assertThatBigQueryRecords;
+import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
+import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
+
+import com.google.cloud.bigquery.Field;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.Schema;
+import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableResult;
+import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
+import com.google.common.collect.ImmutableMap;
+import com.google.protobuf.ByteString;
+import com.google.pubsub.v1.SubscriptionName;
+import com.google.pubsub.v1.TopicName;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.it.common.PipelineLauncher;
+import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
+import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.utils.ResourceManagerUtils;
+import org.apache.beam.it.gcp.TemplateTestBase;
+import org.apache.beam.it.gcp.bigquery.BigQueryResourceManager;
+import org.apache.beam.it.gcp.bigquery.conditions.BigQueryRowsCheck;
+import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
+import org.apache.commons.lang3.RandomStringUtils;
+import org.json.JSONObject;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@Category(TemplateIntegrationTest.class)
+@TemplateIntegrationTest(PubSubCdcToBigQuery.class)
+@RunWith(JUnit4.class)
+public class PubSubCdcToBigQueryIT extends TemplateTestBase {
+
+ private static final int MESSAGES_COUNT = 10;
+ private static final int BAD_MESSAGES_COUNT = 3;
+
+ private static final Schema BIG_QUERY_DLQ_SCHEMA = getDlqSchema();
+
+ private PubsubResourceManager pubsubResourceManager;
+ private BigQueryResourceManager bigQueryResourceManager;
+
+ @Before
+ public void setUp() throws IOException {
+ pubsubResourceManager =
+ PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build();
+ bigQueryResourceManager =
+ BigQueryResourceManager.builder(testName, PROJECT, credentials).build();
+
+ gcsClient.createArtifact(
+ "udf.js",
+ "function uppercaseName(value) {\n"
+ + " const data = JSON.parse(value);\n"
+ + " data.name = data.name.toUpperCase();\n"
+ + " return JSON.stringify(data);\n"
+ + "}");
+ gcsClient.createArtifact(
+ "cdc_schema.json",
+ "["
+ + " {"
+ + " \"type\": \"INTEGER\","
+ + " \"name\": \"id\","
+ + " \"mode\": \"NULLABLE\""
+ + " }, "
+ + " {"
+ + " \"type\": \"STRING\","
+ + " \"name\":\"job\","
+ + " \"mode\":\"NULLABLE\""
+ + " }, "
+ + " {"
+ + "        \"type\": \"STRING\","
+ + " \"name\": \"name\","
+ + " \"mode\": \"NULLABLE\""
+ + " }"
+ + "]");
+ }
+
+ @After
+ public void cleanUp() {
+ ResourceManagerUtils.cleanResources(pubsubResourceManager, bigQueryResourceManager);
+ }
+
+ @Test
+ public void testSubscriptionToBigQuery() throws IOException {
+ subscriptionToBigQueryBase(false);
+ }
+
+ @Test
+ public void testSubscriptionToBigQueryWithUdf() throws IOException {
+ subscriptionToBigQueryBase(true);
+ }
+
+ public void subscriptionToBigQueryBase(boolean useUdf) throws IOException {
+ // Arrange
+ // Omit name column to ensure table column mapping works properly
+ List bqSchemaFields =
+ Arrays.asList(
+ Field.of("id", StandardSQLTypeName.INT64), Field.of("job", StandardSQLTypeName.STRING));
+ Schema bqSchema = Schema.of(bqSchemaFields);
+
+ TopicName topic = pubsubResourceManager.createTopic("input");
+ SubscriptionName subscription = pubsubResourceManager.createSubscription(topic, "input-sub-1");
+ bigQueryResourceManager.createDataset(REGION);
+ TableId table = bigQueryResourceManager.createTable(testName, bqSchema);
+
+ TableId dlqTable =
+ bigQueryResourceManager.createTable(
+ table.getTable() + PubSubCdcToBigQuery.DEFAULT_DEADLETTER_TABLE_SUFFIX,
+ BIG_QUERY_DLQ_SCHEMA);
+
+ // Act
+ LaunchConfig.Builder launchConfigBuilder =
+ LaunchConfig.builder(testName, specPath)
+ .addParameter("inputSubscription", subscription.toString())
+ .addParameter("schemaFilePath", getGcsPath("cdc_schema.json"))
+ .addParameter("outputDatasetTemplate", table.getDataset())
+ .addParameter("outputTableNameTemplate", table.getTable())
+ .addParameter("outputDeadletterTable", toTableSpecLegacy(dlqTable));
+ if (useUdf) {
+ launchConfigBuilder
+ .addParameter("javascriptTextTransformGcsPath", getGcsPath("udf.js"))
+ .addParameter("javascriptTextTransformFunctionName", "uppercaseName");
+ }
+ PipelineLauncher.LaunchInfo info = launchTemplate(launchConfigBuilder);
+ assertThatPipeline(info).isRunning();
+
+ List