diff --git a/.github/actions/publish-site-report/action.yml b/.github/actions/publish-site-report/action.yml new file mode 100644 index 00000000000..9489fd3b386 --- /dev/null +++ b/.github/actions/publish-site-report/action.yml @@ -0,0 +1,39 @@ +# Copyright 2024 Google LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Generates an aggregated Maven Surefire site report and uploads it as a +# workflow artifact, so that test results remain easy to inspect after a +# run, especially when diagnosing failed or flaky tests. + +name: 'Publish Site Report' +description: 'Publish site report' + +inputs: + output-zip-file: + description: 'Output file name' + type: string + required: true + +runs: + using: 'composite' + steps: + - name: Generate Site Report + shell: bash + run: mvn -B surefire-report:report-only -f pom.xml -Daggregate=true + - name: Publish Site Report + uses: actions/upload-artifact@65462800fd760344b1a7b4382951275a0abb4808 # v3.1.2 + with: + name: ${{ inputs.output-zip-file }} + path: 'target/site/' + retention-days: 7 \ No newline at end of file diff --git a/.github/workflows/java-pr.yml b/.github/workflows/java-pr.yml index 9dd38de7fd3..8e36daae5d4 100644 --- a/.github/workflows/java-pr.yml +++ b/.github/workflows/java-pr.yml @@ -113,10 +113,9 @@ jobs: token: ${{ secrets.CODECOV_TOKEN }} slug: GoogleCloudPlatform/DataflowTemplates files: 'target/site/jacoco-aggregate/jacoco.xml' - # Temp fix for https://github.com/codecov/codecov-action/issues/1487 - version: v0.6.0 - name: Cleanup Java Environment uses: ./.github/actions/cleanup-java-env + if: always() java_integration_smoke_tests_templates: name: Dataflow Templates Integration Smoke Tests needs: [spotless_check, checkstyle_check, java_build, java_unit_tests] @@ -145,6 +144,7 @@ jobs: retention-days: 1 - name: Cleanup Java Environment uses: ./.github/actions/cleanup-java-env + if: always() java_integration_tests_templates: name: Dataflow Templates Integration Tests needs: [java_integration_smoke_tests_templates] @@ -173,6 +173,7 @@ jobs: retention-days: 1 - name: Cleanup Java Environment uses: ./.github/actions/cleanup-java-env + if: always() java_load_tests_templates: if: contains(github.event.pull_request.labels.*.name, 'run-load-tests') name: Dataflow Templates Load Tests @@ -195,3 +196,4 @@ jobs: --it-private-connectivity="datastream-private-connect-us-central1" - name: Cleanup Java Environment uses: ./.github/actions/cleanup-java-env + if: always() diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 025724543e5..515dabee7cb 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -21,99 +21,195 @@ permissions: contents: write jobs: - release: + create_release_branch: name: Create Release - runs-on: [self-hosted, release] + runs-on: ubuntu-latest + outputs: + release_branch_name: ${{ steps.variables.outputs.release_branch_name }} + tag_name: ${{ steps.variables.outputs.tag_name }} steps: - - name: Get releaser identity - run: | - git config --global user.name '${{github.actor}}' - git
config --global user.email '${{github.actor}}@users.noreply.github.com' - - name: Declare release branch name and tag name - id: variables - run: | - echo "releaseBranchName=release_${CANDIDATE_NAME,,}" >> $GITHUB_OUTPUT - echo "tagName=${CANDIDATE_NAME^^}" >> $GITHUB_OUTPUT - env: - CANDIDATE_NAME: ${{ inputs.candidateName }} - - name: Checkout code - uses: actions/checkout@c85c95e3d7251135ab7dc9ce3241c5835cc595a9 # v3.5.3 - with: - fetch-depth: 0 - token: ${{ secrets.RELEASE_TOKEN }} - - name: Create release branch - run: git checkout -b $RELEASE_BRANCH_NAME $BRANCH_COMMIT - env: - RELEASE_BRANCH_NAME: ${{ steps.variables.outputs.releaseBranchName }} - BRANCH_COMMIT: ${{ inputs.branchCommit }} - - name: Cherry pick commits - run: | - commits=$(echo $CHERRYPICK_COMMITS | tr "," "\n") - for commit in $commits - do - echo "Cherry picking $commit." - git cherry-pick $commit - done - env: - CHERRYPICK_COMMITS: ${{ inputs.cherrypickCommits }} - - name: Add tag to most recent commit - run: | - DATE=$(date -d"next-monday - 1week" +'%Y-%m-%d') - T_COMMIT=$(git log -n 1 $RELEASE_BRANCH_NAME --pretty=format:'%H') - git tag -a $TAG_NAME -m "Release week of $DATE" $T_COMMIT - env: - RELEASE_BRANCH_NAME: ${{ steps.variables.outputs.releaseBranchName }} - TAG_NAME: ${{ steps.variables.outputs.tagName }} - - name: Setup Environment - id: setup-env - uses: ./.github/actions/setup-env - - name: Run Build - run: ./cicd/run-build - - name: Run Unit Tests - run: ./cicd/run-unit-tests - - name: Run Integration Smoke Tests - run: | - ./cicd/run-it-smoke-tests \ - --it-region="us-central1" \ - --it-project="cloud-teleport-testing" \ - --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \ - --it-private-connectivity="datastream-private-connect-us-central1" \ - --it-spanner-host="https://staging-wrenchworks.sandbox.googleapis.com/" \ - --it-release=true \ - --it-retry-failures=2 - - name: Run Integration Tests - run: | - ./cicd/run-it-tests \ - --it-region="us-central1" \ - --it-project="cloud-teleport-testing" \ - --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \ - --it-private-connectivity="datastream-private-connect-us-central1" \ - --it-spanner-host="https://staging-wrenchworks.sandbox.googleapis.com/" \ - --it-release=true \ - --it-retry-failures=2 - - name: Upload Tests Report - uses: actions/upload-artifact@0b7f8abb1508181956e8e162db84b466c27e18ce # v3.1.2 - if: always() # always run even if previous step fails - with: - name: surefire-test-results - path: '**/surefire-reports/TEST-*.xml' - retention-days: 1 - - name: Create artifacts and push - run: | - mvn verify -PtemplatesRelease \ - -DprojectId="dataflow-templates" \ - -DbucketName="dataflow-templates-staging" \ - -DlibrariesBucketName="dataflow-templates-libraries" \ - -DstagePrefix="${CANDIDATE_NAME}" \ - -Dmaven.test.skip -T8 -e - env: - CANDIDATE_NAME: ${{ inputs.candidateName }} - - name: Push tags - run: | - git push -u origin --tags - - name: Release - run: | - gh release create $TAG_NAME --title "Dataflow Templates $TAG_NAME" --notes "" - env: - GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} - TAG_NAME: ${{ steps.variables.outputs.tagName }} + - name: Get releaser identity + run: | + git config --global user.name '${{github.actor}}' + git config --global user.email '${{github.actor}}@users.noreply.github.com' + - name: Declare release branch name and tag name + id: variables + run: | + echo "release_branch_name=release_${CANDIDATE_NAME,,}" >> $GITHUB_OUTPUT + echo "tag_name=${CANDIDATE_NAME^^}" >> $GITHUB_OUTPUT + env: + 
CANDIDATE_NAME: ${{ inputs.candidateName }} + - name: Checkout code + uses: actions/checkout@1e31de5234b9f8995739874a8ce0492dc87873e2 # v4.0.0 + with: + fetch-depth: 0 + token: ${{ secrets.RELEASE_TOKEN }} + - name: Create release branch + run: git checkout -b $RELEASE_BRANCH_NAME $BRANCH_COMMIT + env: + RELEASE_BRANCH_NAME: ${{ steps.variables.outputs.release_branch_name }} + BRANCH_COMMIT: ${{ inputs.branchCommit }} + - name: Cherry pick commits + run: | + commits=$(echo $CHERRYPICK_COMMITS | tr "," "\n") + for commit in $commits + do + echo "Cherry picking $commit." + git cherry-pick $commit + done + env: + CHERRYPICK_COMMITS: ${{ inputs.cherrypickCommits }} + - name: Push release branch + run: | + git push origin --delete $RELEASE_BRANCH_NAME || true + git push -u origin $RELEASE_BRANCH_NAME + env: + RELEASE_BRANCH_NAME: ${{ steps.variables.outputs.release_branch_name }} + java_unit_tests: + name: Unit Tests + needs: create_release_branch + timeout-minutes: 60 + runs-on: [ self-hosted, it ] + steps: + - name: Checkout code + uses: actions/checkout@1e31de5234b9f8995739874a8ce0492dc87873e2 # v4.0.0 + with: + ref: ${{ needs.create_release_branch.outputs.release_branch_name }} + fetch-depth: 0 + token: ${{ secrets.RELEASE_TOKEN }} + - name: Setup Environment + id: setup-env + uses: ./.github/actions/setup-env + - name: Run Unit Tests + run: ./cicd/run-unit-tests + - name: Upload Unit Tests Site Report + uses: ./.github/actions/publish-site-report + with: + output-zip-file: unit-test-report + if: always() + - name: Cleanup Java Environment + uses: ./.github/actions/cleanup-java-env + if: always() + java_integration_smoke_tests_templates: + name: Dataflow Templates Integration Smoke Tests + needs: create_release_branch + timeout-minutes: 60 + # Run on any runner that matches all the specified runs-on values. + runs-on: [ self-hosted, it ] + steps: + - name: Checkout code + uses: actions/checkout@1e31de5234b9f8995739874a8ce0492dc87873e2 # v4.0.0 + with: + ref: ${{ needs.create_release_branch.outputs.release_branch_name }} + fetch-depth: 0 + token: ${{ secrets.RELEASE_TOKEN }} + - name: Setup Environment + id: setup-env + uses: ./.github/actions/setup-env + - name: Run Integration Smoke Tests + run: | + ./cicd/run-it-smoke-tests \ + --changed-files="pom.xml" \ + --it-region="us-central1" \ + --it-project="cloud-teleport-testing" \ + --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \ + --it-private-connectivity="datastream-private-connect-us-central1" \ + --it-release=true \ + --it-retry-failures=2 + - name: Publish Smoke Test Site Report + uses: ./.github/actions/publish-site-report + with: + output-zip-file: smoke-test-report + if: always() + - name: Cleanup Java Environment + uses: ./.github/actions/cleanup-java-env + if: always() + java_integration_tests_templates: + name: Dataflow Templates Integration Tests + needs: create_release_branch + timeout-minutes: 240 + # Run on any runner that matches all the specified runs-on values. 
+ runs-on: [ self-hosted, it ] + steps: + - name: Checkout code + uses: actions/checkout@1e31de5234b9f8995739874a8ce0492dc87873e2 # v4.0.0 + with: + ref: ${{ needs.create_release_branch.outputs.release_branch_name }} + fetch-depth: 0 + token: ${{ secrets.RELEASE_TOKEN }} + - name: Setup Environment + id: setup-env + uses: ./.github/actions/setup-env + - name: Run Integration Tests + shell: bash + run: | + ./cicd/run-it-tests \ + --changed-files="pom.xml" \ + --it-region="us-central1" \ + --it-project="cloud-teleport-testing" \ + --it-artifact-bucket="cloud-teleport-testing-it-gitactions" \ + --it-private-connectivity="datastream-private-connect-us-central1" \ + --it-release=true \ + --it-retry-failures=2 + - name: Publish Integration Test Site Report + uses: ./.github/actions/publish-site-report + with: + output-zip-file: integration-test-report + if: always() + - name: Cleanup Java Environment + uses: ./.github/actions/cleanup-java-env + if: always() + upload_artifacts: + name: Upload Template Artifacts + needs: [ create_release_branch, java_unit_tests, java_integration_smoke_tests_templates, java_integration_tests_templates ] + runs-on: [ self-hosted, it ] + steps: + - name: Checkout code + uses: actions/checkout@1e31de5234b9f8995739874a8ce0492dc87873e2 # v4.0.0 + with: + ref: ${{ needs.create_release_branch.outputs.release_branch_name }} + fetch-depth: 0 + token: ${{ secrets.RELEASE_TOKEN }} + - name: Create artifacts and push + run: | + ./cicd/run-release \ + --it-project="dataflow-templates" \ + --release-bucket-name="dataflow-templates-staging" \ + --release-libraries-bucket-name="dataflow-templates-libraries" \ + --release-stage-prefix="${CANDIDATE_NAME}" + env: + CANDIDATE_NAME: ${{ inputs.candidateName }} + - name: Cleanup Java Environment + uses: ./.github/actions/cleanup-java-env + if: always() + release_github: + name: Release on GitHub + needs: [ create_release_branch, upload_artifacts ] + runs-on: ubuntu-latest + steps: + - name: Checkout code + uses: actions/checkout@1e31de5234b9f8995739874a8ce0492dc87873e2 # v4.0.0 + with: + ref: ${{ needs.create_release_branch.outputs.release_branch_name }} + fetch-depth: 0 + token: ${{ secrets.RELEASE_TOKEN }} + - name: Add tag to most recent commit + run: | + DATE=$(date -d"next-monday - 1week" +'%Y-%m-%d') + T_COMMIT=$(git log -n 1 $RELEASE_BRANCH_NAME --pretty=format:'%H') + git tag -a $TAG_NAME -m "Release week of $DATE" $T_COMMIT + env: + RELEASE_BRANCH_NAME: ${{ needs.create_release_branch.outputs.release_branch_name }} + TAG_NAME: ${{ needs.create_release_branch.outputs.tag_name }} + - name: Push tags + run: | + git push --delete origin $TAG_NAME || true + git push -u origin --tags + env: + TAG_NAME: ${{ needs.create_release_branch.outputs.tag_name }} + - name: Release + run: | + gh release create $TAG_NAME --title "Dataflow Templates $TAG_NAME" --notes "" + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + TAG_NAME: ${{ needs.create_release_branch.outputs.tag_name }} diff --git a/.github/workflows/spanner-pr.yml b/.github/workflows/spanner-pr.yml index feb5182d0f8..a6722035ec0 100644 --- a/.github/workflows/spanner-pr.yml +++ b/.github/workflows/spanner-pr.yml @@ -116,8 +116,6 @@ jobs: token: ${{ secrets.CODECOV_TOKEN }} slug: GoogleCloudPlatform/DataflowTemplates files: 'target/site/jacoco-aggregate/jacoco.xml' - # Temp fix for https://github.com/codecov/codecov-action/issues/1487 - version: v0.6.0 - name: Cleanup Java Environment uses: ./.github/actions/cleanup-java-env java_integration_smoke_tests_templates: diff --git 
a/cicd/cmd/run-release/main.go b/cicd/cmd/run-release/main.go new file mode 100644 index 00000000000..c64b368ffc6 --- /dev/null +++ b/cicd/cmd/run-release/main.go @@ -0,0 +1,64 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package main + +import ( + "flag" + "log" + + "github.com/GoogleCloudPlatform/DataflowTemplates/cicd/internal/flags" + "github.com/GoogleCloudPlatform/DataflowTemplates/cicd/internal/workflows" +) + +func main() { + flags.RegisterCommonFlags() + flags.RegisterItFlags() + flags.RegisterReleaseFlags() + flag.Parse() + + // Run mvn install before running the release + mvnFlags := workflows.NewMavenFlags() + err := workflows.MvnCleanInstall().Run( + mvnFlags.IncludeDependencies(), + mvnFlags.IncludeDependents(), + mvnFlags.SkipDependencyAnalysis(), + mvnFlags.SkipCheckstyle(), + mvnFlags.SkipJib(), + mvnFlags.SkipTests(), + mvnFlags.SkipJacoco(), + mvnFlags.SkipShade(), + mvnFlags.ThreadCount(8)) + if err != nil { + log.Fatalf("%v\n", err) + } + + // Run the release + mvnFlags = workflows.NewMavenFlags() + err = workflows.MvnVerify().Run( + mvnFlags.RunRelease(), + mvnFlags.SkipTests(), + mvnFlags.ThreadCount(8), + mvnFlags.ProduceErrors(), + flags.Project(), + flags.BucketName(), + flags.LibrariesBucketName(), + flags.StagePrefix()) + if err != nil { + log.Fatalf("%v\n", err) + } + log.Println("Release Successful!") +} diff --git a/cicd/internal/flags/it-flags.go b/cicd/internal/flags/it-flags.go index f8a54e867e9..2d12fede15a 100644 --- a/cicd/internal/flags/it-flags.go +++ b/cicd/internal/flags/it-flags.go @@ -39,7 +39,7 @@ var ( dOracleInstance string ) -// Registers all common flags. Must be called before flag.Parse(). +// Registers all integration test (IT) flags. Must be called before flag.Parse(). func RegisterItFlags() { flag.StringVar(&dRegion, "it-region", "", "The GCP region to use for storing test artifacts") flag.StringVar(&dProject, "it-project", "", "The GCP project to run the integration tests in") diff --git a/cicd/internal/flags/release-flags.go b/cicd/internal/flags/release-flags.go new file mode 100644 index 00000000000..3045c7cd39a --- /dev/null +++ b/cicd/internal/flags/release-flags.go @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package flags + +import ( + "flag" +) + +// Avoid making these vars public. +var ( + dBucketName string + dLibrariesBucketName string + dStagePrefix string +) + +// Registers all release flags.
Must be called before flag.Parse(). +func RegisterReleaseFlags() { + flag.StringVar(&dBucketName, "release-bucket-name", "", "The GCP bucket to stage the released templates.") + flag.StringVar(&dLibrariesBucketName, "release-libraries-bucket-name", "", "The GCP bucket to stage the released template libraries.") + flag.StringVar(&dStagePrefix, "release-stage-prefix", "", "Prefix to use as parent folder in GCS for released templates.") +} + +func BucketName() string { + return "-DbucketName=" + dBucketName +} + +func LibrariesBucketName() string { + return "-DlibrariesBucketName=" + dLibrariesBucketName +} + +func StagePrefix() string { + return "-DstagePrefix=" + dStagePrefix +} diff --git a/cicd/internal/workflows/maven-workflows.go b/cicd/internal/workflows/maven-workflows.go index 2eabf31f8cd..db60dc9346a 100644 --- a/cicd/internal/workflows/maven-workflows.go +++ b/cicd/internal/workflows/maven-workflows.go @@ -49,9 +49,11 @@ type MavenFlags interface { SkipSpotlessCheck() string SkipIntegrationTests() string FailAtTheEnd() string + ProduceErrors() string RunIntegrationTests() string RunIntegrationSmokeTests() string RunLoadTests() string + RunRelease() string ThreadCount(int) string IntegrationTestParallelism(int) string StaticBigtableInstance(string) string @@ -105,6 +107,10 @@ func (*mvnFlags) FailAtTheEnd() string { return "-fae" } +func (*mvnFlags) ProduceErrors() string { + return "-e" +} + func (*mvnFlags) RunIntegrationTests() string { return "-PtemplatesIntegrationTests,splunkDeps,missing-artifact-repos" } @@ -117,6 +123,10 @@ func (*mvnFlags) RunLoadTests() string { return "-PtemplatesLoadTests,splunkDeps,missing-artifact-repos" } +func (*mvnFlags) RunRelease() string { + return "-PtemplatesRelease,splunkDeps,missing-artifact-repos" +} + // The number of modules Maven is going to build in parallel in a multi-module project. func (*mvnFlags) ThreadCount(count int) string { return "-T" + strconv.Itoa(count) diff --git a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java index eee25345574..4ae80fd6e82 100644 --- a/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java +++ b/v1/src/test/java/com/google/cloud/teleport/spanner/ddl/InformationSchemaScannerIT.java @@ -72,7 +72,6 @@ private Ddl getDatabaseDdl() { BatchClient batchClient = spannerServer.getBatchClient(dbId); BatchReadOnlyTransaction batchTx = batchClient.batchReadOnlyTransaction(TimestampBound.strong()); - InformationSchemaScanner scanner = new InformationSchemaScanner(batchTx); return scanner.scan(); } diff --git a/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery.md b/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery.md index c2ebb00e280..cf6624e4caf 100644 --- a/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery.md +++ b/v2/mongodb-to-googlecloud/README_MongoDB_to_BigQuery.md @@ -31,7 +31,6 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat * **useStorageWriteApiAtLeastOnce** : When using the Storage Write API, specifies the write semantics. To use at-least-once semantics (https://beam.apache.org/documentation/io/built-in/google-bigquery/#at-least-once-semantics), set this parameter to `true`. To use exactly-once semantics, set the parameter to `false`. This parameter applies only when `useStorageWriteApi` is `true`. The default value is `false`. 
* **javascriptDocumentTransformGcsPath** : The Cloud Storage URI of the `.js` file that defines the JavaScript user-defined function (UDF) to use. (Example: gs://your-bucket/your-transforms/*.js). * **javascriptDocumentTransformFunctionName** : The name of the JavaScript user-defined function (UDF) to use. For example, if your JavaScript function code is `myTransform(inJson) { /*...do stuff...*/ }`, then the function name is myTransform. For sample JavaScript UDFs, see UDF Examples (https://github.com/GoogleCloudPlatform/DataflowTemplates#udf-examples). (Example: transform). -* **bigQuerySchemaPath** : The Cloud Storage path for the BigQuery JSON schema. (Example: gs://your-bucket/your-schema.json). diff --git a/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/options/MongoDbToBigQueryOptions.java b/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/options/MongoDbToBigQueryOptions.java index 2a77984d2a3..d228b30ec1c 100644 --- a/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/options/MongoDbToBigQueryOptions.java +++ b/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/options/MongoDbToBigQueryOptions.java @@ -81,17 +81,6 @@ public interface MongoDbOptions extends PipelineOptions, DataflowPipelineOptions String getKMSEncryptionKey(); void setKMSEncryptionKey(String keyName); - - @TemplateParameter.Text( - order = 6, - groupName = "Source", - description = "Bson filter", - optional = true, - helpText = "Bson filter in json format.", - example = "{ \"val\": { $gt: 0, $lt: 9 }}") - String getFilter(); - - void setFilter(String jsonFilter); } /** Options for reading from PubSub. */ @@ -119,16 +108,6 @@ public interface BigQueryWriteOptions extends PipelineOptions, DataflowPipelineO String getOutputTableSpec(); void setOutputTableSpec(String outputTableSpec); - - @TemplateParameter.GcsReadFile( - order = 2, - optional = true, - description = "Cloud Storage path to BigQuery JSON schema", - helpText = "The Cloud Storage path for the BigQuery JSON schema.", - example = "gs://your-bucket/your-schema.json") - String getBigQuerySchemaPath(); - - void setBigQuerySchemaPath(String path); } /** UDF options. 
*/ diff --git a/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQuery.java b/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQuery.java index 16426765434..54a80107d80 100644 --- a/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQuery.java +++ b/v2/mongodb-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQuery.java @@ -15,10 +15,8 @@ */ package com.google.cloud.teleport.v2.mongodb.templates; -import static com.google.cloud.teleport.v2.utils.GCSUtils.getGcsFileAsString; import static com.google.cloud.teleport.v2.utils.KMSUtils.maybeDecrypt; -import com.google.api.client.json.gson.GsonFactory; import com.google.api.services.bigquery.model.TableRow; import com.google.api.services.bigquery.model.TableSchema; import com.google.cloud.teleport.metadata.Template; @@ -31,19 +29,15 @@ import com.google.cloud.teleport.v2.options.BigQueryStorageApiBatchOptions; import com.google.cloud.teleport.v2.transforms.JavascriptDocumentTransformer.TransformDocumentViaJavascript; import com.google.cloud.teleport.v2.utils.BigQueryIOUtils; -import com.google.common.base.Strings; import java.io.IOException; import javax.script.ScriptException; import org.apache.beam.sdk.Pipeline; -import org.apache.beam.sdk.io.FileSystems; import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO; -import org.apache.beam.sdk.io.mongodb.FindQuery; import org.apache.beam.sdk.io.mongodb.MongoDbIO; import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.PipelineOptionsFactory; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.ParDo; -import org.bson.BsonDocument; import org.bson.Document; /** @@ -112,13 +106,7 @@ public static boolean run(Options options) // Get MongoDbUri plain text or base64 encrypted with a specific KMS encryption key String mongoDbUri = maybeDecrypt(options.getMongoDbUri(), options.getKMSEncryptionKey()).get(); - if (options.getBigQuerySchemaPath() != null) { - // initialize FileSystem to read from GCS - FileSystems.setDefaultPipelineOptions(options); - String jsonSchema = getGcsFileAsString(options.getBigQuerySchemaPath()); - GsonFactory gf = new GsonFactory(); - bigquerySchema = gf.fromString(jsonSchema, TableSchema.class); - } else if (options.getJavascriptDocumentTransformFunctionName() != null + if (options.getJavascriptDocumentTransformFunctionName() != null && options.getJavascriptDocumentTransformGcsPath() != null) { bigquerySchema = MongoDbUtils.getTableFieldSchemaForUDF( @@ -134,21 +122,13 @@ public static boolean run(Options options) mongoDbUri, options.getDatabase(), options.getCollection(), options.getUserOption()); } - MongoDbIO.Read readDocuments = - MongoDbIO.read() - .withUri(mongoDbUri) - .withDatabase(options.getDatabase()) - .withCollection(options.getCollection()); - - String filterJson = options.getFilter(); - BsonDocument filter; - if (!Strings.isNullOrEmpty(filterJson) - && !(filter = BsonDocument.parse(filterJson)).isEmpty()) { - readDocuments = readDocuments.withQueryFn(FindQuery.create().withFilters(filter)); - } - pipeline - .apply("Read Documents", readDocuments) + .apply( + "Read Documents", + MongoDbIO.read() + .withUri(mongoDbUri) + .withDatabase(options.getDatabase()) + .withCollection(options.getCollection())) .apply( "UDF", TransformDocumentViaJavascript.newBuilder() diff --git 
a/v2/mongodb-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQueryIT.java b/v2/mongodb-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQueryIT.java index 6cb05f1ce53..734fddbd7fe 100644 --- a/v2/mongodb-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQueryIT.java +++ b/v2/mongodb-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/mongodb/templates/MongoDbToBigQueryIT.java @@ -182,79 +182,10 @@ public void testMongoDbToBigQuery() throws IOException { val -> { JSONObject bigQueryJson = new JSONObject(val.getStringValue()); assertTrue(bigQueryJson.has("timestamp")); - bigQueryJson.remove("timestamp"); - String bigQueryId = bigQueryJson.getString(MONGO_DB_ID); - assertTrue(mongoMap.get(bigQueryId).similar(bigQueryJson)); - })); - } - - @Test - public void testMongoDbToBigQueryWithFilters() throws IOException { - // Arrange - String collectionName = testName; - List mongoDocuments = generateDocuments(); - mongoDbClient.insertDocuments(collectionName, mongoDocuments); - - String bqTable = testName; - - List bqSchemaFields = new ArrayList<>(); - bqSchemaFields.add(Field.of("timestamp", StandardSQLTypeName.TIMESTAMP)); - mongoDocuments - .get(0) - .forEach((key, val) -> bqSchemaFields.add(Field.of(key, StandardSQLTypeName.STRING))); - Schema bqSchema = Schema.of(bqSchemaFields); - - bigQueryClient.createDataset(REGION); - TableId table = bigQueryClient.createTable(bqTable, bqSchema); - - LaunchConfig.Builder options = - LaunchConfig.builder(testName, specPath) - .addParameter(MONGO_URI, mongoDbClient.getUri()) - .addParameter(MONGO_DB, mongoDbClient.getDatabaseName()) - .addParameter(MONGO_COLLECTION, collectionName) - .addParameter(BIGQUERY_TABLE, toTableSpecLegacy(table)) - .addParameter(USER_OPTION, "FLATTEN") - .addParameter("filter", "{ \"filter_test\": { $eq: \"0\" }}"); - // Act - LaunchInfo info = launchTemplate(options); - assertThatPipeline(info).isRunning(); - - Result result = - pipelineOperator() - .waitForCondition( - createConfig(info), - BigQueryRowsCheck.builder(bigQueryClient, table).setMinRows(1).build()); - - // Assert - assertThatResult(result).meetsConditions(); - - Map mongoMap = new HashMap<>(); - mongoDocuments.forEach( - mongoDocument -> { - JSONObject mongoDbJson = new JSONObject(mongoDocument.toJson()); - String mongoId = mongoDbJson.getJSONObject(MONGO_DB_ID).getString("$oid"); - mongoDbJson.put(MONGO_DB_ID, mongoId); - mongoMap.put(mongoId, mongoDbJson); - }); - - TableResult tableRows = bigQueryClient.readTable(bqTable); - tableRows - .getValues() - .forEach( - row -> - row.forEach( - val -> { - JSONObject bigQueryJson = new JSONObject(val.getStringValue()); - assertTrue(bigQueryJson.has("timestamp")); - assertTrue(bigQueryJson.getString("filter_test").equals("0")); bigQueryJson.remove("timestamp"); String bigQueryId = bigQueryJson.getString(MONGO_DB_ID); - String msg = - val.getStringValue() - + " is different from " - + mongoMap.get(bigQueryId).toString(); - assertTrue(msg, mongoMap.get(bigQueryId).similar(bigQueryJson)); + assertTrue(mongoMap.get(bigQueryId).similar(bigQueryJson)); })); } @@ -294,7 +225,6 @@ private static List generateDocuments() { } randomDocument.append("udf", "in"); randomDocument.append("nullonly", null); - randomDocument.append("filter_test", String.valueOf(i)); mongoDocuments.add(randomDocument); } diff --git 
a/v2/pubsub-binary-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/PubsubAvroToBigQueryIT.java b/v2/pubsub-binary-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/PubsubAvroToBigQueryIT.java index b855e9387d4..38cc098eabb 100644 --- a/v2/pubsub-binary-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/PubsubAvroToBigQueryIT.java +++ b/v2/pubsub-binary-to-bigquery/src/test/java/com/google/cloud/teleport/v2/templates/PubsubAvroToBigQueryIT.java @@ -18,7 +18,6 @@ import static org.apache.beam.it.gcp.bigquery.matchers.BigQueryAsserts.assertThatBigQueryRecords; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; -import static org.apache.commons.lang3.RandomStringUtils.randomAlphabetic; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.StandardSQLTypeName; @@ -37,7 +36,6 @@ import java.math.RoundingMode; import java.net.URL; import java.nio.ByteBuffer; -import java.util.ArrayList; import java.util.List; import java.util.Map; import org.apache.avro.Schema; @@ -54,7 +52,6 @@ import org.apache.beam.it.gcp.bigquery.BigQueryResourceManager; import org.apache.beam.it.gcp.bigquery.conditions.BigQueryRowsCheck; import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; -import org.apache.beam.it.gcp.pubsub.conditions.PubsubMessagesCheck; import org.junit.After; import org.junit.Before; import org.junit.Test; @@ -75,9 +72,6 @@ public final class PubsubAvroToBigQueryIT extends TemplateTestBase { private PubsubResourceManager pubsubResourceManager; private BigQueryResourceManager bigQueryResourceManager; - private static final int GOOD_MESSAGES_COUNT = 10; - private static final int BAD_PUB_SUB_MESSAGES_COUNT = 3; - @Before public void setup() throws IOException { pubsubResourceManager = @@ -101,30 +95,26 @@ public void tearDown() { } @Test - public void testPubsubAvroToBigQueryUdf() throws IOException { + public void testPubsubAvroToBigQuerySimple() throws IOException { // Arrange TopicName topic = pubsubResourceManager.createTopic("input"); TopicName dlqTopic = pubsubResourceManager.createTopic("dlq"); - SubscriptionName subscription = pubsubResourceManager.createSubscription(topic, "sub-1"); - SubscriptionName dlqSubscription = pubsubResourceManager.createSubscription(dlqTopic, "sub-2"); - - // Generate good avro messages - List> expectedMessages = generateRecordData(); - for (Map record : expectedMessages) { + SubscriptionName subscription = pubsubResourceManager.createSubscription(topic, "input-1"); + + List> recordMaps = + List.of( + Map.of("name", "John", "age", 5, "decimal", 3.3), + Map.of("name", "Jane", "age", 4, "decimal", 4.4), + Map.of("name", "Jim", "age", 3, "decimal", 5.5)); + for (Map record : recordMaps) { ByteString sendRecord = createRecord( - record.get("name").toString(), + (String) record.get("name"), (Integer) record.get("age"), (Double) record.get("decimal")); pubsubResourceManager.publish(topic, ImmutableMap.of(), sendRecord); } - // Generate proto messages that cannot be parsed - for (int i = 0; i < BAD_PUB_SUB_MESSAGES_COUNT; i++) { - pubsubResourceManager.publish( - topic, ImmutableMap.of(), ByteString.copyFromUtf8("bad id " + i)); - } - TableId people = bigQueryResourceManager.createTable("people", bigQuerySchema); // Act @@ -139,19 +129,13 @@ public void testPubsubAvroToBigQueryUdf() throws IOException { Result result = pipelineOperator() - .waitForConditionsAndFinish( + 
.waitForConditionAndFinish( createConfig(info), - BigQueryRowsCheck.builder(bigQueryResourceManager, people) - .setMinRows(GOOD_MESSAGES_COUNT) - .build(), - PubsubMessagesCheck.builder(pubsubResourceManager, dlqSubscription) - .setMinMessages(BAD_PUB_SUB_MESSAGES_COUNT) - .build()); + BigQueryRowsCheck.builder(bigQueryResourceManager, people).setMinRows(1).build()); // Assert assertThatResult(result).meetsConditions(); - assertThatBigQueryRecords(bigQueryResourceManager.readTable(people)) - .hasRecords(expectedMessages); + assertThatBigQueryRecords(bigQueryResourceManager.readTable(people)).hasRecords(recordMaps); } private ByteString createRecord(String name, int age, double decimal) throws IOException { @@ -176,12 +160,4 @@ private ByteString createRecord(String name, int age, double decimal) throws IOE return ByteString.copyFrom(output.toByteArray()); } - - private List> generateRecordData() { - List> recordMaps = new ArrayList<>(); - for (int i = 1; i <= PubsubAvroToBigQueryIT.GOOD_MESSAGES_COUNT; i++) { - recordMaps.add(Map.of("name", randomAlphabetic(8, 20), "age", i, "decimal", i + 0.1)); - } - return recordMaps; - } }
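Reviewer note on the renamed step outputs in create_release_branch: ${CANDIDATE_NAME,,} and ${CANDIDATE_NAME^^} are bash case-conversion expansions (lowercase and uppercase, respectively), so a candidate name like 2024-06-03_RC00 (illustrative only) yields the branch release_2024-06-03_rc00 and the tag 2024-06-03_RC00. A minimal Go equivalent of that derivation, for anyone scripting against these outputs:

package main

import (
	"fmt"
	"strings"
)

// releaseNames derives the branch and tag names the workflow writes to
// $GITHUB_OUTPUT: the branch lowercases the candidate name and the tag
// uppercases it, matching ${CANDIDATE_NAME,,} and ${CANDIDATE_NAME^^}.
func releaseNames(candidate string) (branch, tag string) {
	return "release_" + strings.ToLower(candidate), strings.ToUpper(candidate)
}

func main() {
	// "2024-06-03_RC00" is an illustrative candidate name, not a real one.
	branch, tag := releaseNames("2024-06-03_RC00")
	fmt.Println(branch) // release_2024-06-03_rc00
	fmt.Println(tag)    // 2024-06-03_RC00
}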
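The new cicd/internal/flags/release-flags.go follows the same pattern as the existing it-flags.go: raw flag values stay package-private and are exposed only as pre-formatted Maven -D properties. Below is a self-contained sketch of that pattern; the demo main and the accessor names here are illustrative stand-ins, not the actual cicd/internal/flags API surface.

package main

import (
	"flag"
	"fmt"
)

// Keep the raw flag values private, mirroring release-flags.go.
var (
	bucketName  string
	stagePrefix string
)

// registerReleaseFlags wires the release flags into the standard flag
// package; it must be called before flag.Parse().
func registerReleaseFlags() {
	flag.StringVar(&bucketName, "release-bucket-name", "", "The GCP bucket to stage the released templates.")
	flag.StringVar(&stagePrefix, "release-stage-prefix", "", "Prefix to use as parent folder in GCS for released templates.")
}

// Each accessor returns a ready-to-use Maven argument, so callers can pass
// the results straight into the mvn invocation without extra formatting.
func bucketNameArg() string  { return "-DbucketName=" + bucketName }
func stagePrefixArg() string { return "-DstagePrefix=" + stagePrefix }

func main() {
	registerReleaseFlags()
	flag.Parse()
	// Example: go run . -release-bucket-name=my-bucket -release-stage-prefix=2024-06-03
	fmt.Println("mvn verify", bucketNameArg(), stagePrefixArg())
}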
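Finally, run-release/main.go reduces to assembling two Maven command lines: a fast mvn clean install, then mvn verify under the release profile. The standalone sketch below mirrors that flow under stated assumptions: the -PtemplatesRelease,splunkDeps,missing-artifact-repos profile, -e, -T8, and the -DprojectId/-DbucketName/-DlibrariesBucketName/-DstagePrefix property names all appear in this diff or the previous release.yml, while the individual skip properties (-Dcheckstyle.skip and friends) are assumed placeholders for the unexported mvnFlags string values.

package main

import (
	"log"
	"os"
	"os/exec"
)

// runMvn shells out to Maven with the given arguments, streaming its output
// so CI logs stay readable.
func runMvn(args ...string) error {
	cmd := exec.Command("mvn", args...)
	cmd.Stdout = os.Stdout
	cmd.Stderr = os.Stderr
	return cmd.Run()
}

func main() {
	// Phase 1: build and install every module locally, skipping work the
	// release itself does not need (tests, checkstyle, jib, jacoco, shade).
	install := []string{
		"clean", "install",
		"-Dmaven.test.skip",
		"-Dcheckstyle.skip", // assumed property names; the real values live
		"-Djib.skip",        // behind mvnFlags.SkipCheckstyle() and friends
		"-Djacoco.skip",
		"-T8",
	}
	if err := runMvn(install...); err != nil {
		log.Fatalf("%v\n", err)
	}

	// Phase 2: verify under the release profile, with -e so Maven prints
	// full stack traces if staging fails.
	release := []string{
		"verify",
		"-PtemplatesRelease,splunkDeps,missing-artifact-repos",
		"-Dmaven.test.skip",
		"-T8", "-e",
		"-DprojectId=dataflow-templates",
		"-DbucketName=dataflow-templates-staging",
		"-DlibrariesBucketName=dataflow-templates-libraries",
		"-DstagePrefix=2024-06-03_rc00", // placeholder for ${CANDIDATE_NAME}
	}
	if err := runMvn(release...); err != nil {
		log.Fatalf("%v\n", err)
	}
	log.Println("Release Successful!")
}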