diff --git a/pom.xml b/pom.xml index c718a5d333..496918c625 100644 --- a/pom.xml +++ b/pom.xml @@ -1217,16 +1217,6 @@ <version>1.2.8</version> <scope>runtime</scope> - <dependency> - <groupId>com.google.apis</groupId> - <artifactId>google-api-services-storage</artifactId> - <version>v1-rev20220604-1.32.1</version> - </dependency> - <dependency> - <groupId>com.google.cloud</groupId> - <artifactId>google-cloud-storage</artifactId> - <version>2.8.0</version> - </dependency> diff --git a/src/e2e-test/features/bigquery/sink/BigQueryToBigQueryAdditional.feature b/src/e2e-test/features/bigquery/sink/BigQueryToBigQueryAdditional.feature new file mode 100644 index 0000000000..1fdade3e9f --- /dev/null +++ b/src/e2e-test/features/bigquery/sink/BigQueryToBigQueryAdditional.feature @@ -0,0 +1,454 @@ +# Copyright © 2023 Cask Data, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not +# use this file except in compliance with the License. You may obtain a copy of +# the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations under +# the License. + +@BigQuery_Sink +Feature: BigQuery sink - Verification of BigQuery to BigQuery successful data transfer + + @BQ_UPSERT_SOURCE_TEST @BQ_UPSERT_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario: Validate successful records transfer from BigQuery source to BigQuery sink with the Upsert operation, updating the destination table schema, where the destination table already exists with records in it + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + And Replace input plugin property: "dataset" with value: "dataset" + And Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + And Select radio button plugin property: "operation" with value: "upsert" + Then Click on the Add Button of the property: "relationTableKey" with value: + | TableKeyUpsert | + Then Click plugin property: "updateTableSchema" + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the 
Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Close the pipeline logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqUpsertExpectedFile" + + @BQ_NULL_MODE_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario: Validate Successful record transfer from BigQuery source plugin to BigQuery sink plugin having all null values in one column and few null values in another column in Source table + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + And Replace input plugin property: "dataset" with value: "dataset" + And Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + Then Validate "BigQuery" plugin properties + Then Close the BigQuery properties + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Close the pipeline logs + Then Verify the pipeline status is "Succeeded" + Then Validate the values of records transferred to BQ sink is equal to the values from source BigQuery table + + @BQ_UPDATE_SOURCE_DEDUPE_TEST @BQ_UPDATE_SINK_DEDUPE_TEST @EXISTING_BQ_CONNECTION + Scenario: Verify successful record transfer from BigQuery source to BigQuery sink using advance operation update with Dedupe By Property. 
+ Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + And Replace input plugin property: "dataset" with value: "dataset" + And Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + And Select radio button plugin property: "operation" with value: "update" + Then Enter Value for plugin property table key : "relationTableKey" with values: "relationTableKeyValue" + Then Select dropdown plugin property: "dedupeBy" with option value: "dedupeByOrder" + Then Enter key for plugin property: "dedupeBy" with values: "dedupeByValue" + Then Validate "BigQuery" plugin properties + Then Close the BigQuery properties + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Close the pipeline logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqUpdateDedupeExpectedFile" + + @BQ_INSERT_INT_SOURCE_TEST @BQ_EXISTING_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario: Verify successful record transfer for the Insert operation with partition type Integer and destination table is existing already. 
+ Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + And Replace input plugin property: "dataset" with value: "dataset" + And Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + Then Select BigQuery sink property partitioning type as "INTEGER" + Then Enter input plugin property: "rangeStart" with value: "rangeStartValue" + Then Enter input plugin property: "rangeEnd" with value: "rangeEndValue" + Then Enter input plugin property: "rangeInterval" with value: "rangeIntervalValue" + Then Enter input plugin property: "partitionByField" with value: "partitionByFieldValue" + Then Validate "BigQuery" plugin properties + Then Close the BigQuery properties + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Close the pipeline logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqInsertExpectedFile" + + @BQ_TIME_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario: Verify successful record transfer for the Insert operation from BigQuery source plugin to BigQuery sink with partition type Time and partition field is date. 
+ Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + And Replace input plugin property: "dataset" with value: "dataset" + And Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + Then Enter input plugin property: "partitionByField" with value: "bqPartitionFieldDate" + Then Click plugin property: "updateTableSchema" + Then Validate "BigQuery" plugin properties + Then Close the BigQuery properties + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Close the pipeline logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqDateExpectedFile" + + @BQ_TIME_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario: Verify successful record transfer for the Insert operation from BigQuery source plugin to BigQuery sink with partition type Time and partition field is datetime. 
+ Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + And Replace input plugin property: "dataset" with value: "dataset" + And Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + Then Enter input plugin property: "partitionByField" with value: "bqPartitionFieldDateTime" + Then Click plugin property: "updateTableSchema" + Then Validate "BigQuery" plugin properties + Then Close the BigQuery properties + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Close the pipeline logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqDateTimeExpectedFile" + + @BQ_TIME_SOURCE_TEST @BQ_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario: Verify successful record transfer for the Insert operation from BigQuery source plugin to BigQuery sink with partition type Time and partition field is timestamp. 
+ Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + And Replace input plugin property: "dataset" with value: "dataset" + And Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + Then Enter input plugin property: "partitionByField" with value: "bqPartitionFieldTimeStamp" + Then Click plugin property: "updateTableSchema" + Then Validate "BigQuery" plugin properties + Then Close the BigQuery properties + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Close the pipeline logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqTimeStampExpectedFile" + + @BQ_UPSERT_DEDUPE_SOURCE_TEST @BQ_UPSERT_DEDUPE_SINK_TEST @EXISTING_BQ_CONNECTION + Scenario:Validate successful records transfer from BigQuery source to BigQuery sink with Upsert operation with dedupe source data and existing destination table where update table schema is set to false + Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + Then Click plugin property: "switch-useConnection" + Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + And Replace input plugin property: "dataset" with value: "dataset" + And Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Click plugin property: "useConnection" + 
Then Click on the Browse Connections button + Then Select connection: "bqConnectionName" + Then Enter input plugin property: "referenceName" with value: "BQSinkReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + And Select radio button plugin property: "operation" with value: "upsert" + Then Click on the Add Button of the property: "relationTableKey" with value: + | TableKeyDedupe | + Then Select dropdown plugin property: "dedupeBy" with option value: "dedupeBy" + Then Enter key for plugin property: "dedupeBy" with values: "dedupeByValueUpsert" + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Close the pipeline logs + Then Verify the pipeline status is "Succeeded" + Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqUpsertDedupeFile" + + @BQ_RECORD_SOURCE_TEST @BQ_SECOND_RECORD_SOURCE_TEST @BQ_SINK_TEST + Scenario: Validate successful record transfer from two BigQuery source plugins with different schema record names, taking one extra column in BigQuery source plugin 1, and + using the Wrangler transformation plugin to remove the extra column and transfer the data to a BigQuery sink plugin containing all the columns from both source plugins. 
+ Given Open Datafusion Project to configure pipeline + Then Click on the Plus Green Button to import the pipelines + Then Select the file for importing the pipeline for the plugin "Directive_Drop" + Then Navigate to the properties page of plugin: "BigQuery" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Click on the Validate button + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Replace input plugin property: "table" with value: "bqSourceTable2" + Then Click on the Get Schema button + Then Click on the Validate button + Then Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery3" + Then Replace input plugin property: "project" with value: "projectId" + Then Replace input plugin property: "table" with value: "bqTargetTable" + Then Replace input plugin property: "dataset" with value: "dataset" + Then Click on the Validate button + Then Close the Plugin Properties page + Then Rename the pipeline + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Close the pipeline logs + Then Validate the data transferred from BigQuery to BigQuery with actual And expected file for: "bqDifferentRecordFile" + + @BQ_INSERT_INT_SOURCE_TEST @BQ_INSERT_SINK_TEST @CDAP-20830 + Scenario: Validate successful records transfer from BigQuery to BigQuery with the Insert advanced operation, where the table exists in both source and sink plugins and Update Table Schema is set to true 
+ Given Open Datafusion Project to configure pipeline + When Expand Plugin group in the LHS plugins list: "Source" + When Select plugin: "BigQuery" from the plugins list as: "Source" + When Expand Plugin group in the LHS plugins list: "Sink" + When Select plugin: "BigQuery" from the plugins list as: "Sink" + Then Connect plugins: "BigQuery" and "BigQuery2" to establish connection + Then Navigate to the properties page of plugin: "BigQuery" + And Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + And Replace input plugin property: "datasetProject" with value: "datasetprojectId" + And Replace input plugin property: "referenceName" with value: "reference" + And Replace input plugin property: "dataset" with value: "dataset" + And Replace input plugin property: "table" with value: "bqSourceTable" + Then Click on the Get Schema button + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Navigate to the properties page of plugin: "BigQuery2" + Then Replace input plugin property: "project" with value: "projectId" + Then Override Service account details if set in environment variables + Then Enter input plugin property: "datasetProject" with value: "projectId" + Then Enter input plugin property: "referenceName" with value: "BQReferenceName" + Then Enter input plugin property: "dataset" with value: "dataset" + Then Enter input plugin property: "table" with value: "bqTargetTable" + And Select radio button plugin property: "operation" with value: "insert" + Then Click plugin property: "updateTableSchema" + Then Validate "BigQuery" plugin properties + And Close the Plugin Properties page + Then Save the pipeline + Then Preview and run the pipeline + Then Wait till pipeline preview is in running state + Then Open and capture pipeline preview logs + Then Verify the preview run status of pipeline in the logs is "succeeded" + Then Close the pipeline logs + Then Close the preview + Then Deploy the pipeline + Then Run the Pipeline in Runtime + Then Wait till pipeline is in running state + Then Open and capture logs + Then Verify the pipeline status is "Succeeded" + Then Close the pipeline logs diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java b/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java index 2c07f9c50c..b6085ccb1e 100644 --- a/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java +++ b/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunner.java @@ -27,7 +27,8 @@ features = {"src/e2e-test/features"}, glue = {"io.cdap.plugin.bigquery.stepsdesign", "io.cdap.plugin.gcs.stepsdesign", "stepsdesign", "io.cdap.plugin.common.stepsdesign"}, - tags = {"@BigQuery_Sink"}, + tags = {"@BigQuery_Sink and not @CDAP-20830"}, + //TODO: Enable test once issue is fixed https://cdap.atlassian.net/browse/CDAP-20830 monochrome = true, plugin = {"pretty", "html:target/cucumber-html-report/bigquery-sink", "json:target/cucumber-reports/cucumber-bigquery-sink.json", diff --git a/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunnerRequired.java b/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunnerRequired.java index dea80dd01e..1347921079 100644 --- a/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunnerRequired.java +++ b/src/e2e-test/java/io/cdap/plugin/bigquery/runners/sinkrunner/TestRunnerRequired.java @@ -29,6 +29,7 @@ "stepsdesign", 
"io.cdap.plugin.common.stepsdesign"}, tags = {"@BigQuery_Sink_Required"}, monochrome = true, + //TODO: Enable test once issue is fixed https://cdap.atlassian.net/browse/CDAP-20830 plugin = {"pretty", "html:target/cucumber-html-report/bigquery-sink-required", "json:target/cucumber-reports/cucumber-bigquery-sink-required.json", "junit:target/cucumber-reports/cucumber-bigquery-sink-required.xml"} diff --git a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java index 30d5d9000e..a0a97b8eef 100644 --- a/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java +++ b/src/e2e-test/java/io/cdap/plugin/common/stepsdesign/TestSetupHooks.java @@ -62,6 +62,7 @@ public class TestSetupHooks { public static String gcsTargetBucketName = StringUtils.EMPTY; public static String bqTargetTable = StringUtils.EMPTY; public static String bqSourceTable = StringUtils.EMPTY; + public static String bqSourceTable2 = StringUtils.EMPTY; public static String bqSourceView = StringUtils.EMPTY; public static String pubSubTargetTopic = StringUtils.EMPTY; public static String spannerInstance = StringUtils.EMPTY; @@ -1034,6 +1035,261 @@ public static void createSinkBQExistingDatatypeTable() throws IOException, Inter PluginPropertyUtils.addPluginProp(" bqTargetTable", bqTargetTable); BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " updated successfully"); } + + @Before(order = 1, value = "@BQ_UPSERT_SOURCE_TEST") + public static void createSourceBQUpsertTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(5, 'Raja', 500.0, true)," + + "(6, 'Tom', 100.0, false)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_UPSERT_SINK_TEST") + public static void createSinkBQUpsertTable() throws IOException, InterruptedException { + bqTargetTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(5, 'Rakesh', 500.0, true)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. 
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " created successfully"); + } + + @Before(value = "@BQ_NULL_MODE_SOURCE_TEST") + public static void createNullSourceBQTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(Address STRING, id INT64, Firstname STRING," + + "LastName STRING)"); + try { + BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(Address, id, Firstname, LastName)" + + "VALUES" + "('Agra', 1, 'Harry','')," + + "('Noida', 2, '','')"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp(" bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " updated successfully"); + } + + @Before(value = "@BQ_UPDATE_SOURCE_DEDUPE_TEST") + public static void createSourceBQDedupeTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, Price FLOAT64, " + + "Customer_Exists BOOL)"); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(Name, ID, Price,Customer_Exists)" + + "VALUES" + "('string_1', 1, 0.1,true)," + + "('string_1', 2, 0.2,false)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp(" bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " updated successfully"); + } + + @Before(value = "@BQ_UPDATE_SINK_DEDUPE_TEST") + public static void createSinkBQDedupeTable() throws IOException, InterruptedException { + bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64, Name STRING, Price FLOAT64, " + + "Customer_Exists BOOL)"); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." 
+ bqTargetTable + "` " + + "(Name, ID, Price,Customer_Exists)" + + "VALUES" + "('string_0', 0, 0,true)," + + "('string_1', 10, 1.1,false)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp(" bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " updated successfully"); + } + + @Before(value = "@BQ_INSERT_INT_SOURCE_TEST") + public static void createSourceBQTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); + BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, Price FLOAT64, Customer_Exists BOOL)"); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, Name, Price,Customer_Exists)" + + "VALUES" + "(3, 'Rajan Kumar', 100.0, true)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp(" bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " updated successfully"); + } + + @Before(order = 1, value = "@BQ_TIME_SOURCE_TEST") + public static void createTimeStampBQTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ source table name - " + bqSourceTable); + BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID STRING, transaction_date DATE, Firstname STRING," + + " transaction_dt DATETIME, updated_on TIMESTAMP )"); + try { + BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, transaction_date, Firstname, transaction_dt, updated_on )" + + "VALUES" + "('Agra', '2021-02-20', 'Neera','2019-07-07 11:24:00', " + + "'2019-03-10 04:50:01 UTC')," + + "('Noida', '2021-02-21','', '2019-07-07 11:24:00', " + + "'2019-03-10 04:50:01 UTC')," + + "('Gurgaon', '2021-02-22', 'singh', '2019-07-07 11:24:00', " + + "'2019-03-10 04:50:01 UTC' )"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. 
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp(" bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " updated successfully"); + } + + @Before(order = 1, value = "@BQ_UPSERT_DEDUPE_SOURCE_TEST") + public static void createSourceBQDedupeUpsertTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(1, 'string_1', 0.1, true)," + + "(2, 'string_1', 0.2, false)," + + "(3, 'string_3', 0.3, false)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_UPSERT_DEDUPE_SINK_TEST") + public static void createSinkBQDeupeUpsertTable() throws IOException, InterruptedException { + bqTargetTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " + + "(ID, Name, Price, Customer_Exists)" + + "VALUES" + "(0, 'string_0', 0, true)," + + "(10, 'string_1', 1.1, false)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_RECORD_SOURCE_TEST") + public static void createSourceBQRecordTable() throws IOException, InterruptedException { + bqSourceTable = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64," + + "TableName STRING ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable + "` " + + "(ID, Name, Price, TableName)" + + "VALUES" + "(1, 'string_1', 0.1, 'Test')"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. 
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable", bqSourceTable); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable + " created successfully"); + } + + @Before(order = 1, value = "@BQ_SECOND_RECORD_SOURCE_TEST") + public static void createSourceBQSecondRecordTable() throws IOException, InterruptedException { + bqSourceTable2 = "E2E_SOURCE_" + UUID.randomUUID().toString().replaceAll("-", "_"); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqSourceTable2 + "` " + + "(ID INT64, Name STRING, " + "Price FLOAT64 ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqSourceTable2 + "` " + + "(ID, Name, Price)" + + "VALUES" + "(1, 'string_1', 0.1)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. + // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp("bqSourceTable2", bqSourceTable2); + BeforeActions.scenario.write("BQ Source Table " + bqSourceTable2 + " created successfully"); + } + + @Before(order = 1, value = "@BQ_INSERT_SINK_TEST") + public static void createSinkBQInsertTable() throws IOException, InterruptedException { + + bqTargetTable = "E2E_TARGET_" + UUID.randomUUID().toString().replaceAll("-", "_"); + PluginPropertyUtils.addPluginProp("bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target table name - " + bqTargetTable); + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("create table `" + datasetName + "." + bqTargetTable + "` " + + "(ID INT64,Name STRING," + + "id_Value INT64, Customer_Exists BOOL ) "); + try { + io.cdap.e2e.utils.BigQueryClient.getSoleQueryResult("INSERT INTO `" + datasetName + "." + bqTargetTable + "` " + + "(ID, Name, id_Value, Customer_Exists)" + + "VALUES" + "(3, 'Rajan Kumar', 100, true)"); + } catch (NoSuchElementException e) { + // Insert query does not return any record. 
+ // Iterator on TableResult values in getSoleQueryResult method throws NoSuchElementException + BeforeActions.scenario.write("Error inserting the record in the table" + e.getStackTrace()); + } + PluginPropertyUtils.addPluginProp(" bqTargetTable", bqTargetTable); + BeforeActions.scenario.write("BQ Target Table " + bqTargetTable + " updated successfully"); + } private static String createGCSBucketLifeCycle() throws IOException, URISyntaxException { String bucketName = StorageClient.createBucketwithLifeCycle("00000000-e2e-" + UUID.randomUUID(), 30).getName(); PluginPropertyUtils.addPluginProp("gcsTargetBucketName", bucketName); diff --git a/src/e2e-test/resources/pluginParameters.properties b/src/e2e-test/resources/pluginParameters.properties index 689ac5be70..81b167fae6 100644 --- a/src/e2e-test/resources/pluginParameters.properties +++ b/src/e2e-test/resources/pluginParameters.properties @@ -223,6 +223,31 @@ bqmtInvalidSinkReferenceName=(*^*&* bqmtInvalidTemporaryBucket=$#%$ bqExpectedFile=testdata/BigQuery/BQExistingTableFile bgInsertDatatypeFile=testdata/BigQuery/BQInsertDatatypeFile +TableKeyUpsert=ID +TableKeyInsert=ID +bqUpsertExpectedFile=testdata/BigQuery/BQUpsertTableFile +bqUpdateDedupeExpectedFile=testdata/BigQuery/BQUpdateDedupeFile +bqInsertExpectedFile=testdata/BigQuery/BQInsertIntFile +relationTableKeyValue=Name +dedupeByOrder=ASC +dedupeByValue=ID +dedupeByValueUpsert=Price +rangeStartValue=2 +rangeEndValue=3 +rangeIntervalValue=1 +partitionByFieldValue=ID +bqPartitionFieldDateTime=transaction_dt +bqPartitionFieldTimeStamp=updated_on +bqSourceTable2=dummy +dedupeBy=DESC +TableKeyDedupe=Name +Directive_Drop=testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json +bqUpsertDedupeFile=testdata/BigQuery/BQUpsertDedupeFile +bqDifferentRecordFile=testdata/BigQuery/BQDifferentRecordNameFile +bqDateExpectedFile=testdata/BigQuery/BQDateFile +bqDateTimeExpectedFile=testdata/BigQuery/BQDateTimeFile +bqTimeStampExpectedFile=testdata/BigQuery/BQTimeStampFile +bqPartitionFieldDate=transaction_date ## BIGQUERY-PLUGIN-PROPERTIES-END ## PUBSUBSINK-PLUGIN-PROPERTIES-START diff --git a/src/e2e-test/resources/testdata/BigQuery/BQDateFile b/src/e2e-test/resources/testdata/BigQuery/BQDateFile new file mode 100644 index 0000000000..9f24705f5c --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQDateFile @@ -0,0 +1,3 @@ +{"Firstname":"singh","ID":"Gurgaon","transaction_date":"2021-02-22","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"Neera","ID":"Agra","transaction_date":"2021-02-20","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"","ID":"Noida","transaction_date":"2021-02-21","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQDateTimeFile b/src/e2e-test/resources/testdata/BigQuery/BQDateTimeFile new file mode 100644 index 0000000000..9f24705f5c --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQDateTimeFile @@ -0,0 +1,3 @@ +{"Firstname":"singh","ID":"Gurgaon","transaction_date":"2021-02-22","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"Neera","ID":"Agra","transaction_date":"2021-02-20","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"","ID":"Noida","transaction_date":"2021-02-21","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} \ No newline at end of file diff 
--git a/src/e2e-test/resources/testdata/BigQuery/BQDifferentRecordNameFile b/src/e2e-test/resources/testdata/BigQuery/BQDifferentRecordNameFile new file mode 100644 index 0000000000..18336cbbd0 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQDifferentRecordNameFile @@ -0,0 +1,2 @@ +{"ID":1,"Name":"string_1","Price":0.1} +{"ID":1,"Name":"string_1","Price":0.1} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQInsertIntFile b/src/e2e-test/resources/testdata/BigQuery/BQInsertIntFile new file mode 100644 index 0000000000..aeed733779 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQInsertIntFile @@ -0,0 +1,2 @@ +{"Customer_Exists":true,"ID":3,"Name":"Rajan Kumar","Price":100.0} +{"Customer_Exists":true,"ID":3,"Name":"Rajan Kumar","Price":100.0} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQTimeStampFile b/src/e2e-test/resources/testdata/BigQuery/BQTimeStampFile new file mode 100644 index 0000000000..9f24705f5c --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQTimeStampFile @@ -0,0 +1,3 @@ +{"Firstname":"singh","ID":"Gurgaon","transaction_date":"2021-02-22","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"Neera","ID":"Agra","transaction_date":"2021-02-20","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} +{"Firstname":"","ID":"Noida","transaction_date":"2021-02-21","transaction_dt":"2019-07-07T11:24:00","updated_on":"2019-03-10T04:50:01Z"} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQUpdateDedupeFile b/src/e2e-test/resources/testdata/BigQuery/BQUpdateDedupeFile new file mode 100644 index 0000000000..6e2a839642 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQUpdateDedupeFile @@ -0,0 +1,2 @@ +{"Customer_Exists":true,"ID":0,"Name":"string_0","Price":0.0} +{"Customer_Exists":true,"ID":1,"Name":"string_1","Price":0.1} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQUpsertDedupeFile b/src/e2e-test/resources/testdata/BigQuery/BQUpsertDedupeFile new file mode 100644 index 0000000000..550a80c916 --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQUpsertDedupeFile @@ -0,0 +1,3 @@ +{"Customer_Exists":false,"ID":3,"Name":"string_3","Price":0.3} +{"Customer_Exists":false,"ID":2,"Name":"string_1","Price":0.2} +{"Customer_Exists":true,"ID":0,"Name":"string_0","Price":0.0} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/BQUpsertTableFile b/src/e2e-test/resources/testdata/BigQuery/BQUpsertTableFile new file mode 100644 index 0000000000..0b48d57e4d --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/BQUpsertTableFile @@ -0,0 +1,2 @@ +{"Customer_Exists":true,"ID":5,"Name":"Raja","Price":500.0} +{"Customer_Exists":false,"ID":6,"Name":"Tom","Price":100.0} \ No newline at end of file diff --git a/src/e2e-test/resources/testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json b/src/e2e-test/resources/testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json new file mode 100644 index 0000000000..e1b1966d3f --- /dev/null +++ b/src/e2e-test/resources/testdata/BigQuery/test_diffschema_record-cdap-data-pipeline.json @@ -0,0 +1,183 @@ +{ + "name": "test_diffschema_record", + "description": "Data Pipeline Application", + "artifact": { + "name": "cdap-data-pipeline", + "version": "6.10.0-SNAPSHOT", + "scope": "SYSTEM" + }, + "config": { + "resources": { + "memoryMB": 2048, + "virtualCores": 
1 + }, + "driverResources": { + "memoryMB": 2048, + "virtualCores": 1 + }, + "connections": [ + { + "from": "BigQuery", + "to": "Wrangler" + }, + { + "from": "Wrangler", + "to": "BigQuery3" + }, + { + "from": "BigQuery2", + "to": "BigQuery3" + } + ], + "comments": [], + "postActions": [], + "properties": {}, + "processTimingEnabled": true, + "stageLoggingEnabled": false, + "stages": [ + { + "name": "BigQuery", + "plugin": { + "name": "BigQueryTable", + "type": "batchsource", + "label": "BigQuery", + "artifact": { + "name": "google-cloud", + "version": "0.22.0-SNAPSHOT", + "scope": "SYSTEM" + }, + "properties": { + "useConnection": "false", + "project": "cdf-athena", + "serviceAccountType": "filePath", + "serviceFilePath": "auto-detect", + "referenceName": "bq_ref", + "dataset": "bq_automation", + "table": "bqSourceTableMore", + "enableQueryingViews": "false", + "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]},{\"name\":\"TableName\",\"type\":[\"string\",\"null\"]}]}" + } + }, + "outputSchema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]},{\"name\":\"TableName\",\"type\":[\"string\",\"null\"]}]}", + "id": "BigQuery", + "type": "batchsource", + "label": "BigQuery", + "icon": "fa-plug" + }, + { + "name": "Wrangler", + "plugin": { + "name": "Wrangler", + "type": "transform", + "label": "Wrangler", + "artifact": { + "name": "wrangler-transform", + "version": "4.10.0-SNAPSHOT", + "scope": "SYSTEM" + }, + "properties": { + "field": "*", + "precondition": "false", + "directives": "drop :TableName", + "on-error": "fail-pipeline", + "schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}", + "workspaceId": "7038fc39-732e-4d75-8d3f-db6cfe5a11d8" + } + }, + "outputSchema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}", + "inputSchema": [ + { + "name": "BigQuery", + "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]},{\"name\":\"TableName\",\"type\":[\"string\",\"null\"]}]}" + } + ], + "id": "Wrangler", + "type": "transform", + "label": "Wrangler", + "icon": "icon-DataPreparation" + }, + { + "name": "BigQuery3", + "plugin": { + "name": "BigQueryTable", + "type": "batchsink", + "label": "BigQuery3", + "artifact": { + "name": "google-cloud", + "version": "0.22.0-SNAPSHOT", + "scope": "SYSTEM" + }, + "properties": { + "useConnection": "false", + "project": "auto-detect", + "serviceAccountType": "filePath", + "serviceFilePath": "auto-detect", + "dataset": "bq_automation", + "table": "New_target_table_combine", + "operation": "insert", + "truncateTable": "false", + "allowSchemaRelaxation": "false", + "location": "US", + "createPartitionedTable": "false", + "partitioningType": "TIME", + "partitionFilterRequired": "false", + "schema": 
"{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}" + } + }, + "outputSchema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}", + "inputSchema": [ + { + "name": "Wrangler", + "schema": "{\"type\":\"record\",\"name\":\"record\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}" + }, + { + "name": "BigQuery2", + "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}" + } + ], + "id": "BigQuery3", + "type": "batchsink", + "label": "BigQuery3", + "icon": "fa-plug" + }, + { + "name": "BigQuery2", + "plugin": { + "name": "BigQueryTable", + "type": "batchsource", + "label": "BigQuery2", + "artifact": { + "name": "google-cloud", + "version": "0.22.0-SNAPSHOT", + "scope": "SYSTEM" + }, + "properties": { + "useConnection": "false", + "project": "cdf-athena", + "serviceAccountType": "filePath", + "serviceFilePath": "auto-detect", + "referenceName": "bq_test", + "dataset": "bq_automation", + "table": "bqSourceTableLess", + "enableQueryingViews": "false", + "schema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}" + } + }, + "outputSchema": "{\"type\":\"record\",\"name\":\"output\",\"fields\":[{\"name\":\"ID\",\"type\":[\"long\",\"null\"]},{\"name\":\"Name\",\"type\":[\"string\",\"null\"]},{\"name\":\"Price\",\"type\":[\"double\",\"null\"]}]}", + "id": "BigQuery2", + "type": "batchsource", + "label": "BigQuery2", + "icon": "fa-plug" + } + ], + "schedule": "0 1 */1 * *", + "engine": "spark", + "numOfRecordsPreview": 100, + "rangeRecordsPreview": { + "min": 1, + "max": "5000" + }, + "maxConcurrentRuns": 1 + }, + "version": "fe4ee1e3-6380-11ee-8217-0000003390c8" +} \ No newline at end of file