From 9eec796e23e85473c2935a15627cabd7f4570e41 Mon Sep 17 00:00:00 2001 From: Manit Gupta Date: Fri, 25 Oct 2024 11:29:08 +0530 Subject: [PATCH 1/4] Terraform template for Reverse replication for sharded MySQL (#1946) * Reverse replication Terraform template * Add support for metadata db and changestream creation * End to end working draft * Make target_tags rule optional * Update variables for gCloud null resource and test with VPC * Address comments --- .../terraform/samples/README.md | 43 ++ .../spanner-to-sharded-mysql/README.md | 533 ++++++++++++++++++ .../samples/spanner-to-sharded-mysql/main.tf | 226 ++++++++ .../spanner-to-sharded-mysql/outputs.tf | 10 + .../spanner-to-sharded-mysql/session.json | 7 + .../spanner-to-sharded-mysql/terraform.tf | 52 ++ .../spanner-to-sharded-mysql/terraform.tfvars | 138 +++++ .../terraform_simple.tfvars | 42 ++ .../spanner-to-sharded-mysql/variables.tf | 79 +++ 9 files changed, 1130 insertions(+) create mode 100644 v2/spanner-to-sourcedb/terraform/samples/README.md create mode 100644 v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/README.md create mode 100644 v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/main.tf create mode 100644 v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/outputs.tf create mode 100644 v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/session.json create mode 100644 v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/terraform.tf create mode 100644 v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/terraform.tfvars create mode 100644 v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/terraform_simple.tfvars create mode 100644 v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/variables.tf diff --git a/v2/spanner-to-sourcedb/terraform/samples/README.md b/v2/spanner-to-sourcedb/terraform/samples/README.md new file mode 100644 index 0000000000..008fff7003 --- /dev/null +++ 
b/v2/spanner-to-sourcedb/terraform/samples/README.md @@ -0,0 +1,43 @@ +## Terraform samples for reverse replication + +This repository provides samples for common scenarios users might have while trying to run a replication from Spanner to a source database. + +Pick a sample that is closest to your use-case, use it as a starting point, and tailor it to your own specific needs. + +## Other Sample Repositories + +The following sample repositories provide additional examples - + +1. [Sample environment setups](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/spanner-common/terraform/samples) +2. [Bulk migration](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/sourcedb-to-spanner/terraform/samples) +3. [Live migration](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/datastream-to-spanner/terraform/samples/) + +## List of examples + +1. [Spanner to sharded MySQL](spanner-to-sharded-mysql/README.md) + +## Sample structure + +Each sample contains the following (and potentially more) files - + +1. `main.tf` - This contains the Terraform resources which will be created. +2. `outputs.tf` - This declares the outputs that will be output as part of + running the terraform example. +3. `variables.tf` - This declares the input variables that are required to + configure the resources. +4. `terraform.tf` - This contains the required providers and APIs/project + configurations for the sample. +5. `terraform.tfvars` - This contains the dummy inputs that need to be populated + to run the example. +6. `terraform_simple.tfvars` - This contains the minimal list of dummy inputs + that need to be populated to run the example. + +## How to add a new sample + +It is strongly recommended to copy an existing sample and modify it according to the scenario you are trying to cover. +This ensures uniformity in the style in which terraform samples are written. 
+ +```shell +mkdir my-new-sample +cp -r spanner-to-sharded-mysql/* my-new-sample/ +``` \ No newline at end of file diff --git a/v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/README.md b/v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/README.md new file mode 100644 index 0000000000..d7a20ba662 --- /dev/null +++ b/v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/README.md @@ -0,0 +1,533 @@ +## Sample Scenario: Spanner to Sharded MySQL reverse replication + +> **_SCENARIO:_** This Terraform example illustrates launching reverse replication +> jobs to replicate Spanner writes for a sharded MySQL source, setting up all the required cloud infrastructure. +> **Details of MySQL shards are needed as input.** + +## Terraform permissions + +In order to create the resources in this sample, +the `Service account`/`User account` being used to run Terraform +should have the +required [permissions](https://cloud.google.com/iam/docs/manage-access-service-accounts#multiple-roles-console). +There are two ways to add permissions - + +1. Adding pre-defined roles to the service account running Terraform. +2. Creating a custom role with the granular permissions and attaching it to the + service account running Terraform. 
+ +### Using custom role and granular permissions (recommended) + +Following permissions are required - + +```shell +- compute.firewalls.create +- compute.firewalls.delete +- compute.firewalls.update +- dataflow.jobs.cancel +- dataflow.jobs.create +- dataflow.jobs.updateContents +- iam.roles.get +- iam.serviceAccounts.actAs +- pubsub.subscriptions.create +- pubsub.subscriptions.delete +- pubsub.topics.attachSubscription +- pubsub.topics.create +- pubsub.topics.delete +- pubsub.topics.getIamPolicy +- pubsub.topics.setIamPolicy +- resourcemanager.projects.setIamPolicy +- storage.buckets.create +- storage.buckets.delete +- storage.buckets.update +- storage.objects.delete +- storage.objects.create +- serviceusage.services.use +- serviceusage.services.enable +``` + +**Note**: Add the `roles/viewer` role as well to the service account. + +> **_Note on IAM:_** +> +> For ease of use, this sample automatically adds the +> required +> roles to the service account used for running the migration. In order to +> do this, we need the `resourcemanager.projects.setIamPolicy` permission. If granting +> this role is unacceptable, please set +> the `var.common_params.add_policies_to_service_account` +> to **false**. This will skip adding the roles. +> They will have to be added manually. Note that if they are not added, **the +> migration will fail.** +> Two service accounts will need to be modified manually - +> 1. Dataflow service account - The list of roles can be found in the `main.tf` + file, in the `reverse_replication_roles` resource. +> 2. GCS service account - The list of roles can be found in the `main.tf` file, + in the `gcs_publisher_role` resource. + +[This](#adding-access-to-terraform-service-account) section in the FAQ +provides instructions to add these permissions to an existing service account. 
+ +### Using pre-defined roles + +Following roles are required - + +```shell +roles/dataflow.developer +roles/iam.securityAdmin +roles/iam.serviceAccountUser +roles/pubsub.admin +roles/storage.admin +roles/viewer +roles/compute.networkAdmin +roles/spanner.databaseUser +``` + +> **_Note on IAM:_** +> +> 1. For ease of use, this sample automatically adds the +> required +> roles to the service account used for running the migration. In order to +> do this, we need the `roles/iam.securityAdmin` role. If granting +> this role is unacceptable, please set +> the `var.common_params.add_policies_to_service_account` +> to **false**. This will skip adding the roles. +> They will have to be added manually. Note that if they are not added, **the +> migration will fail.** +> Two service accounts will need to be modified manually - +> 1. Dataflow service account - The list of roles can be found in the `main.tf` + file, in the `reverse_replication_roles` resource. + +> 2. GCS service account - The list of roles can be found in the `main.tf` file, + in the `gcs_publisher_role` resource. + +[This](#adding-access-to-terraform-service-account) section in the FAQ +provides instructions to add these roles to an existing service account. + +## Assumptions + +It takes the following assumptions - + +1. Ensure that the MySQL instance is correctly setup. + 1. Check that the MySQL credentials are correctly specified in the `tfvars` file. + 2. Check that the MySQL server is up. + 3. The MySQL user configured in the `tfvars` file should have [INSERT](https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_insert), [UPDATE](https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_update) and [DELETE](https://dev.mysql.com/doc/refman/8.0/en/privileges-provided.html#priv_delete) privileges on the database. +2. Ensure that the MySQL instance and Dataflow workers can establish connectivity with each other. 
The template automatically adds networking firewall rules to enable this access. This can differ depending on the source configuration. Please validate the template rules and ensure that network connectivity can be established. +3. The MySQL instance with database containing reverse-replication compatible + schema is created. +4. A session file has been generated to perform the spanner to MySQL schema mapping. + +> **_NOTE:_** +[SMT](https://googlecloudplatform.github.io/spanner-migration-tool/quickstart.html) +> can be used to generate the session file. + +## Description + +This sample contains the following files - + +1. `main.tf` - This contains the Terraform resources which will be created. +2. `outputs.tf` - This declares the outputs that will be output as part of + running this terraform example. +3. `variables.tf` - This declares the input variables that are required to + configure the resources. +4. `terraform.tf` - This contains the required providers and APIs/project + configurations for this sample. +5. `terraform.tfvars` - This contains the dummy inputs that need to be populated + to run this example. +6. `terraform_simple.tfvars` - This contains the minimal list of dummy inputs + that need to be populated to run this example. + +## Resources Created + +Given these assumptions, it uses a supplied source database connection +configuration and creates the following resources - + +1. **Firewall rules** - These rules allow Dataflow VMs to connect to each other + and allow Dataflow VMs to connect to the source MySQL shards. +2. **GCS buckets** - A GCS bucket to hold reverse replication metadata, such as + session and source shards files. +3. **GCS objects** - Generates source shards configuration file to upload to + GCS. +4. **Pubsub topic and subscription** - This contains GCS object notifications as + files are written to GCS for DLQ retrials. +5. **Spanner metadata database** - A Spanner metadata database to keep track of + Spanner change streams. +6. 
**Spanner change stream** - A Spanner change stream to capture change capture + data from the Spanner database under replication +7. **Dataflow job** - The Dataflow job which reads from Spanner change streams + and writes to MySQL +8. **Permissions** - It adds the required roles to the specified (or the + default) service accounts for the reverse replication to work. + +> **_NOTE:_** A label is attached to all the resources created via Terraform. +> The key is `migration_id` and the value is auto-generated. The auto-generated +> value is used as a global identifier for a migration job across resources. The +> auto-generated value is always pre-fixed with a `smt-rev`. + +## How to run + +1. Clone this repository or the sample locally. +2. Edit the `terraform.tfvars` or `terraform_simple.tfvars` file and replace the + dummy variables with real values. Extend the configuration to meet your + needs. It is recommended to get started with `terraform_simple.tfvars`. +3. Run the following commands - + +### Initialise Terraform + +```shell +# Initialise terraform - You only need to do this once for a directory. +terraform init +``` + +### Run `plan` and `apply` + +Validate the terraform files with - + +```shell +terraform plan --var-file=terraform_simple.tfvars +``` + +Run the terraform script with - + +```shell +terraform apply --var-file=terraform_simple.tfvars +``` + +This will launch the configured jobs and produce an output like below - + +```shell +Outputs: + +dataflow_job_ids = [ + "", +] +dataflow_job_urls = [ + "https://console.cloud.google.com/dataflow/jobs/us-central1/", +] +``` + +**Note:** Each of the jobs will have a random prefix added to it to prevent name +collisions. + +### Cleanup + +Once the jobs have finished running, you can clean up by running - + +```shell +terraform destroy --var-file=terraform_simple.tfvars +``` + +#### Note on GCS buckets + +The GCS bucket that is created will be cleaned up during `terraform destroy`. 
+ +If you want to exclude the GCS bucket from deletion due to any reason, you +can exclude it from the state file using `terraform state rm` command. + +## FAQ + +### Configuring to run using a VPC + +#### Specifying a shared VPC + +You can specify the shared VPC using the `host_project` configuration. +This will result in Dataflow jobs being launched inside the shared VPC. + +> **_NOTE:_** Usage of shared VPC requires cross-project permissions. They +> are available as a Terraform +>template [here](../../../../spanner-common/terraform/samples/configure-shared-vpc/README.md). + +> Dataflow service account permissions are +> documented [here](https://cloud.google.com/dataflow/docs/guides/specifying-networks#shared). + +#### Dataflow + +1. Set the `network` and the `subnetwork` parameters to run the Dataflow job + inside a VPC. + Specify [network](https://cloud.google.com/dataflow/docs/guides/specifying-networks#network_parameter) + and [subnetwork](https://cloud.google.com/dataflow/docs/guides/specifying-networks#subnetwork_parameter) + according to the + linked guidelines. +2. Set the `ip_configuration` to `WORKER_IP_PRIVATE` to disable public IP + addresses for the worker VMs. + +> **_NOTE:_** The VPC should already exist. This template does not create a VPC. + +If you are facing issues with VPC connectivity, check the following Dataflow +[guide](https://cloud.google.com/dataflow/docs/guides/troubleshoot-networking) +to debug common networking issues. + +### Updating template parameters for an existing job + +Template parameters can be updated in place. Terraform and Dataflow will take +care of `UPDATING` a Dataflow job. This works internally by terminating the +existing job with an `UPDATED` state and creating a new job in its place. All +of this is done seamlessly by Dataflow and there is no risk to the fidelity of +an already executing job. 
+ +Look for the following log during `terraform apply` - + +```shell + # google_dataflow_flex_template_job.reverse_replication_job will be updated in-place +``` + +### Updating workers of a Dataflow job + +Currently, the Terraform `google_dataflow_flex_template_job` resource does not +support updating the workers of a Dataflow job. +If the worker counts are changed in `tfvars` and a Terraform apply is run, +Terraform will attempt to cancel/drain the existing Dataflow job and replace it +with a new one. +**This is not recommended**. Instead, use the `gcloud` CLI to update the worker +counts of a launched Dataflow job. + +```shell +gcloud dataflow jobs update-options \ + --region=us-central1 \ + --min-num-workers=5 \ + --max-num-workers=20 \ + 2024-06-17_01_21_44-12198433486526363702 +``` + +### Long time to delete SMT bucket + +A GCS bucket can only be deleted if all its comprising objects are deleted +first. This template attempts to delete all objects in the created GCS bucket +and then deletes the bucket as well. +**This may take a long time, depending on the size of the data in the bucket.** +If you want Terraform to only create the GCS bucket but skip its deletion +during `terraform destroy`, you will have to use the `terraform state rm` API to +remove the GCS resource from being tracked by Terraform after the `apply` command. + +### Source shard configuration file + +Source shard configuration file that is supplied to the dataflow is automatically +created by Terraform. 
A sample file that is uploaded to GCS looks like +below - + +#### Using Secrets Manager to specify password + +```json +[ + { + "logicalShardId": "shard1", + "host": "10.11.12.13", + "user": "root", + "secretManagerUri":"projects/123/secrets/rev-cmek-cred-shard1/versions/latest", + "port": "3306", + "dbName": "db1" + }, + { + "logicalShardId": "shard2", + "host": "10.11.12.14", + "user": "root", + "secretManagerUri":"projects/123/secrets/rev-cmek-cred-shard2/versions/latest", + "port": "3306", + "dbName": "db2" + } +] +``` + +#### Specifying plaintext password + +```json +[ + { + "logicalShardId": "shard1", + "host": "10.11.12.13", + "user": "root", + "password":"", + "port": "3306", + "dbName": "db1" + }, + { + "logicalShardId": "shard2", + "host": "10.11.12.14", + "user": "root", + "password":"", + "port": "3306", + "dbName": "db2" + } +] +``` + +This file is generated from the configuration provided in the `var.shard_list` +parameter. + +### Adding access to Terraform service account + +#### Using custom role and granular permissions (recommended) + +You can run the following gcloud command to create a custom role in your GCP +project. + +```shell +gcloud iam roles create live_migrations_role --project= --file=perms.yaml --quiet +``` + +The `YAML` file required for the above will be like so - + +```shell +title: "Live Migrations Custom Role" +description: "Custom role for Spanner live migrations." +stage: "GA" +includedPermissions: +- iam.roles.get +- iam.serviceAccounts.actAs +....add all permissions from the list defined above. +``` + +Then attach the role to the service account - + +```shell +gcloud iam service-accounts add-iam-policy-binding @.iam.gserviceaccount.com \ + --member=@.iam.gserviceaccount.com --role=projects//roles/live_migrations_role \ + --condition=CONDITION +``` + +#### Using pre-defined roles + +You can run the following shell script to add roles to the service account +being used to run Terraform. 
This will have to be done by a user who has the +authority to grant the specified roles to a service account - + +```shell +#!/bin/bash + +# Service account to be granted roles +SERVICE_ACCOUNT="@.iam.gserviceaccount.com" + +# Project ID where roles will be granted +PROJECT_ID="" + +# Array of roles to grant +ROLES=( + "roles/" + "roles/" +) + +# Loop through each role and grant it to the service account +for ROLE in "${ROLES[@]}" +do + gcloud projects add-iam-policy-binding "$PROJECT_ID" \ + --member="serviceAccount:${SERVICE_ACCOUNT}" \ + --role="$ROLE" +done +``` + +### Verifying access in the Terraform service account + +#### Using custom role and granular permissions (recommended) + +Verify that the custom role is attached to the service account - + +```shell +gcloud projects get-iam-policy \ +--flatten="bindings[].members" \ +--format='table(bindings.role)' \ +--filter="bindings.members:@.iam.gserviceaccount.com" +``` + +Verify that the role has the correct set of permissions + +```shell +gcloud iam roles describe live_migrations_role --project= +``` + +#### Using pre-defined roles + +Once the roles are added, run the following command to verify them - + +```shell +gcloud projects get-iam-policy \ +--flatten="bindings[].members" \ +--format='table(bindings.role)' \ +--filter="bindings.members:@.iam.gserviceaccount.com" +``` + +Sample output - + +```shell +ROLE +roles/dataflow.admin +roles/datastream.admin +roles/iam.securityAdmin +roles/iam.serviceAccountUser +roles/pubsub.admin +roles/storage.admin +roles/viewer +``` + +### Impersonating the Terraform service account + +#### Using GCE VM instance (recommended) + +A GCE VM created using the service account setup above will automatically +use the service account for all API requests triggered by Terraform. Running +terraform from such a GCE VM does not require downloading service keys and is +the recommended approach. + +#### Using key file + +1. 
Activate the service account - + ```shell + gcloud auth activate-service-account @.iam.gserviceaccount.com --key-file=path/to/key_file --project=project_id + ``` +2. Impersonate service account while fetching the ADC credentials - + ```shell + gcloud auth application-default login --impersonate-service-account @.iam.gserviceaccount.com + ``` + +## Observe, tune and troubleshoot + +Follow the instructions provided [here](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/v2/spanner-to-sourcedb/README.md#observe-tune-and-troubleshoot). + +## Advanced Topics + +### Specifying parallelism for Terraform + +Terraform limits the number of concurrent operations while walking the graph. +[The default](https://developer.hashicorp.com/terraform/cli/commands/apply#parallelism-n) +is 10. + +Increasing this value can +potentially speed up orchestration execution +when orchestrating a large sharded migration. We strongly recommend against +setting this value > 20. In most cases, the default value should suffice. + +### Correlating the `count.index` with the `shard_id` + +In the Terraform output, you should see resources being referred to by their +index. This is how Terraform works internally when it has to create multiple +resources of the same type. + +In order to correlate the `count.index` with the `shard_id` that is either +user specified or auto-generated, `terraform.tf.state` can be used. 
+ +For example, a snippet of XX looks like - + +```json +{ + "mode": "managed", + "type": "google_datastream_connection_profile", + "name": "source_mysql", + "provider": "provider[\"registry.terraform.io/hashicorp/google\"]", + "instances": [ + { + "index_key": 0, + "schema_version": 0, + "attributes": { + "bigquery_profile": [], + "connection_profile_id": "smt-elegant-ape-source-mysql", + "create_without_validation": false, + "display_name": "smt-elegant-ape-source-mysql", + "effective_labels": { + "migration_id": "smt-elegant-ape" + } + } + } + ] +} +``` + +As you can see in this example, the `index_key` = `0`, correlates with the +auto-generated `shard_id` = `smt-elegant-ape`. \ No newline at end of file diff --git a/v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/main.tf b/v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/main.tf new file mode 100644 index 0000000000..2fa85f99dd --- /dev/null +++ b/v2/spanner-to-sourcedb/terraform/samples/spanner-to-sharded-mysql/main.tf @@ -0,0 +1,226 @@ +resource "random_pet" "migration_id" { + prefix = "smt-rev" +} + +locals { + migration_id = var.common_params.migration_id != null ? var.common_params.migration_id : random_pet.migration_id.id + change_stream = replace(local.migration_id, "-", "_") +} + +# Setup network firewalls rules to enable Dataflow access to source. +resource "google_compute_firewall" "allow_dataflow_to_source" { + count = var.common_params.target_tags != null ? 1 : 0 + depends_on = [google_project_service.enabled_apis] + project = var.common_params.host_project != null ? var.common_params.host_project : var.common_params.project + name = "allow-dataflow-to-source" + network = var.dataflow_params.runner_params.network != null ? var.common_params.host_project != null ? 
"projects/${var.common_params.host_project}/global/networks/${var.dataflow_params.runner_params.network}" : "projects/${var.common_params.project}/global/networks/${var.dataflow_params.runner_params.network}" : "default" + description = "Allow traffic from Dataflow to source databases" + + allow { + protocol = "tcp" + ports = ["3306"] + } + source_tags = ["dataflow"] + target_tags = var.common_params.target_tags +} + +# Setup network firewalls rules to enable Dataflow VMs to talk to each other +resource "google_compute_firewall" "allow_dataflow_vms_communication" { + depends_on = [google_project_service.enabled_apis] + project = var.common_params.host_project != null ? var.common_params.host_project : var.common_params.project + name = "allow-dataflow-vms-communication" + network = var.dataflow_params.runner_params.network != null ? var.common_params.host_project != null ? "projects/${var.common_params.host_project}/global/networks/${var.dataflow_params.runner_params.network}" : "projects/${var.common_params.project}/global/networks/${var.dataflow_params.runner_params.network}" : "default" + description = "Allow traffic between Dataflow VMs" + + allow { + protocol = "tcp" + ports = ["12345", "12346"] + } + source_tags = ["dataflow"] + target_tags = ["dataflow"] +} + +# GCS bucket for holding configuration objects +resource "google_storage_bucket" "reverse_replication_bucket" { + depends_on = [google_project_service.enabled_apis] + name = "${local.migration_id}-${var.common_params.replication_bucket}" + location = var.common_params.region + uniform_bucket_level_access = true + force_destroy = true + labels = { + "migration_id" = local.migration_id + } +} + +# upload local session file to the created GCS bucket +resource "google_storage_bucket_object" "session_file_object" { + depends_on = [google_project_service.enabled_apis] + name = "session.json" + source = var.dataflow_params.template_params.local_session_file_path + content_type = "application/json" + bucket = 
google_storage_bucket.reverse_replication_bucket.id +} + +# Auto-generate the source shards file from the Terraform configuration and +# upload it to GCS. +resource "google_storage_bucket_object" "source_shards_file_object" { + depends_on = [google_project_service.enabled_apis] + name = "source_shards.json" + content_type = "application/json" + bucket = google_storage_bucket.reverse_replication_bucket.id + content = jsonencode(var.shard_list) +} + +# Pub/Sub topic for reverse replication DLQ +resource "google_pubsub_topic" "dlq_pubsub_topic" { + depends_on = [google_project_service.enabled_apis] + name = "${local.migration_id}-dlq-topic" + project = var.common_params.project + labels = { + "migration_id" = local.migration_id + } +} + +# Configure permissions to publish Pub/Sub notifications +resource "google_pubsub_topic_iam_member" "gcs_publisher_role" { + depends_on = [google_project_service.enabled_apis] + topic = google_pubsub_topic.dlq_pubsub_topic.name + role = "roles/pubsub.publisher" + member = "serviceAccount:${data.google_storage_project_service_account.gcs_account.email_address}" +} + +# Pub/Sub Notification on GCS Bucket +resource "google_storage_notification" "dlq_bucket_notification" { + depends_on = [ + google_project_service.enabled_apis, + google_pubsub_topic_iam_member.gcs_publisher_role + ] # Create a bucket notification using the created pubsub topic. + bucket = google_storage_bucket.reverse_replication_bucket.name + object_name_prefix = "dlq" + payload_format = "JSON_API_V1" + topic = google_pubsub_topic.dlq_pubsub_topic.id + event_types = ["OBJECT_FINALIZE"] +} + +# Pub/Sub subscription for the created notification +resource "google_pubsub_subscription" "dlq_pubsub_subscription" { + depends_on = [ + google_project_service.enabled_apis, + google_storage_notification.dlq_bucket_notification + ] # Create the subscription once the notification is created. 
+ name = "${google_pubsub_topic.dlq_pubsub_topic.name}-sub" + topic = google_pubsub_topic.dlq_pubsub_topic.id + labels = { + "migration_id" = local.migration_id + } +} + +resource "google_spanner_database" "reverse_replication_metadata_database" { + instance = var.dataflow_params.template_params.instance_id + name = var.dataflow_params.template_params.metadata_database_id != null ? var.dataflow_params.template_params.metadata_database_id : local.change_stream + deletion_protection = false +} + +resource "null_resource" "create_spanner_change_stream" { + count = var.dataflow_params.template_params.change_stream_name == null ? 1 : 0 + triggers = { + database_id = var.dataflow_params.template_params.database_id + instance_id = var.dataflow_params.template_params.instance_id + change_stream = local.change_stream + project = var.common_params.project + } + provisioner "local-exec" { + command = < Date: Mon, 28 Oct 2024 16:56:00 +1100 Subject: [PATCH 2/4] feat: add namespace support for PostgreSQL databases (#1925) * feat: support namespace for PostgreSQL databases * tests: add it tests * feat: use namespace in discovery * trigger build * trigger build * fix: fixes pg sql * test: fix resource path * test: fix quotes in sql * test: fix test data * trigger build --- .../v2/options/OptionsToConfigBuilder.java | 26 ++++- .../v2/options/SourceDbToSpannerOptions.java | 11 ++ .../postgresql/PostgreSQLDialectAdapter.java | 96 +++++++--------- .../v2/templates/PipelineController.java | 2 + .../options/OptionsToConfigBuilderTest.java | 36 +++++- .../PostgreSQLDialectAdapterTest.java | 3 +- .../v2/templates/postgresql/NamespaceIT.java | 108 ++++++++++++++++++ .../NamespacesIT/postgresql-schema.sql | 28 +++++ .../resources/NamespacesIT/spanner-schema.sql | 13 +++ 9 files changed, 263 insertions(+), 60 deletions(-) create mode 100644 v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/NamespaceIT.java create mode 100644 
v2/sourcedb-to-spanner/src/test/resources/NamespacesIT/postgresql-schema.sql create mode 100644 v2/sourcedb-to-spanner/src/test/resources/NamespacesIT/spanner-schema.sql diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java index 0760b74092..6b22a6ed6b 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilder.java @@ -37,7 +37,9 @@ import org.slf4j.LoggerFactory; public final class OptionsToConfigBuilder { + private static final Logger LOG = LoggerFactory.getLogger(OptionsToConfigBuilder.class); + public static final String DEFAULT_POSTGRESQL_NAMESPACE = "public"; public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults( SourceDbToSpannerOptions options, @@ -49,6 +51,7 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults( String dbName = extractDbFromURL(sourceDbURL); String username = options.getUsername(); String password = options.getPassword(); + String namespace = options.getNamespace(); String jdbcDriverClassName = options.getJdbcDriverClassName(); String jdbcDriverJars = options.getJdbcDriverJars(); @@ -66,6 +69,7 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults( username, password, dbName, + namespace, shardId, jdbcDriverClassName, jdbcDriverJars, @@ -84,6 +88,7 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig( String username, String password, String dbName, + String namespace, String shardId, String jdbcDriverClassName, String jdbcDriverJars, @@ -91,9 +96,11 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig( Integer numPartitions, Wait.OnSignal waitOn) { JdbcIOWrapperConfig.Builder builder = builderWithDefaultsFor(sqlDialect); + SourceSchemaReference 
sourceSchemaReference = + sourceSchemaReferenceFrom(sqlDialect, dbName, namespace); builder = builder - .setSourceSchemaReference(SourceSchemaReference.builder().setDbName(dbName).build()) + .setSourceSchemaReference(sourceSchemaReference) .setDbAuth( LocalCredentialsProvider.builder() .setUserName( @@ -123,8 +130,9 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig( if (sourceDbURL == null) { sourceDbURL = "jdbc:postgresql://" + host + ":" + port + "/" + dbName; } + sourceDbURL = sourceDbURL + "?currentSchema=" + sourceSchemaReference.namespace(); if (StringUtils.isNotBlank(connectionProperties)) { - sourceDbURL = sourceDbURL + "?" + connectionProperties; + sourceDbURL = sourceDbURL + "&" + connectionProperties; } break; } @@ -228,5 +236,19 @@ private static JdbcIOWrapperConfig.Builder builderWithDefaultsFor(SQLDialect dia return builderWithMySqlDefaults(); } + private static SourceSchemaReference sourceSchemaReferenceFrom( + SQLDialect dialect, String dbName, String namespace) { + SourceSchemaReference.Builder builder = SourceSchemaReference.builder(); + // Namespaces are not supported for MySQL + if (dialect == SQLDialect.POSTGRESQL) { + if (StringUtils.isBlank(namespace)) { + builder.setNamespace(DEFAULT_POSTGRESQL_NAMESPACE); + } else { + builder.setNamespace(namespace); + } + } + return builder.setDbName(dbName).build(); + } + private OptionsToConfigBuilder() {} } diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java index 6c85f7c098..d9321df03e 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/options/SourceDbToSpannerOptions.java @@ -223,4 +223,15 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions { String 
getTransformationCustomParameters(); void setTransformationCustomParameters(String value); + + @TemplateParameter.Text( + order = 19, + optional = true, + description = "Namespace", + helpText = + "Namespace to exported. For PostgreSQL, if no namespace is provided, 'public' will be used") + @Default.String("") + String getNamespace(); + + void setNamespace(String value); } diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java index 68ed6854df..eb31f63648 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java @@ -69,12 +69,6 @@ public enum PostgreSQLVersion { private static final Set TIMEOUT_SQL_STATES = Sets.newHashSet(SQL_STATE_ER_QUERY_CANCELLED, SQL_STATE_ER_LOCK_TIMEOUT); - // Information schema / System tables constants - private static final ImmutableList EXCLUDED_SCHEMAS = - ImmutableList.of("information_schema"); - private static final String EXCLUDED_SCHEMAS_STR = - EXCLUDED_SCHEMAS.stream().map(schema -> "'" + schema + "'").collect(Collectors.joining(",")); - // Errors private final Counter schemaDiscoveryErrors = Metrics.counter(JdbcSourceRowMapper.class, MetricCounters.READER_SCHEMA_DISCOVERY_ERRORS); @@ -111,18 +105,16 @@ public ImmutableList discoverTables( logger.info("Discovering tables for DataSource: {}", dataSource); final String query = - String.format( - "SELECT table_name" - + " FROM information_schema.tables" - + " WHERE table_type = 'BASE TABLE'" - + " AND table_catalog = ?" 
- + " AND table_schema NOT LIKE 'pg_%%'" - + " AND table_schema NOT IN (%s)", - EXCLUDED_SCHEMAS_STR); + "SELECT table_name" + + " FROM information_schema.tables" + + " WHERE table_type = 'BASE TABLE'" + + " AND table_catalog = ?" + + " AND table_schema = ?"; ImmutableList.Builder tablesBuilder = ImmutableList.builder(); try (PreparedStatement stmt = dataSource.getConnection().prepareStatement(query)) { stmt.setString(1, sourceSchemaReference.dbName()); + stmt.setString(2, sourceSchemaReference.namespace()); try (ResultSet rs = stmt.executeQuery()) { while (rs.next()) { tablesBuilder.add(rs.getString("table_name")); @@ -176,25 +168,23 @@ public ImmutableMap> discoverTabl tables); final String query = - String.format( - "SELECT column_name," - + " data_type," - + " character_maximum_length," - + " numeric_precision," - + " numeric_scale" - + " FROM information_schema.columns" - + " WHERE table_catalog = ?" - + " AND table_name = ?" - + " AND table_schema NOT LIKE 'pg_%%'" - + " AND table_schema NOT IN (%s)", - EXCLUDED_SCHEMAS_STR); + "SELECT column_name," + + " data_type," + + " character_maximum_length," + + " numeric_precision," + + " numeric_scale" + + " FROM information_schema.columns" + + " WHERE table_catalog = ?" + + " AND table_schema = ?" 
+ + " AND table_name = ?"; ImmutableMap.Builder> tableSchemaBuilder = ImmutableMap.builder(); try (PreparedStatement statement = dataSource.getConnection().prepareStatement(query)) { for (String table : tables) { statement.setString(1, sourceSchemaReference.dbName()); - statement.setString(2, table); + statement.setString(2, sourceSchemaReference.namespace()); + statement.setString(3, table); logger.info("Executing query " + query + ": " + statement); try (ResultSet resultSet = statement.executeQuery()) { ImmutableMap.Builder schema = ImmutableMap.builder(); @@ -294,37 +284,35 @@ public ImmutableMap> discoverTableI tables); final String query = - String.format( - "SELECT a.attname AS column_name," - + " ixs.indexname AS index_name," - + " ix.indisunique AS is_unique," - + " ix.indisprimary AS is_primary," - + " c.reltuples AS cardinality," - + " a.attnum AS ordinal_position," - + " t.typname AS type_name," - + " information_schema._pg_char_max_length(a.atttypid, a.atttypmod) AS type_length," - + " t.typcategory AS type_category," - + " ico.collation_name AS collation," - + " ico.pad_attribute AS pad," - + " pg_encoding_to_char(d.encoding) AS charset" - + " FROM pg_catalog.pg_indexes ixs" - + " JOIN pg_catalog.pg_class c ON c.relname = ixs.indexname" - + " JOIN pg_catalog.pg_index ix ON c.oid = ix.indexrelid" - + " JOIN pg_catalog.pg_attribute a ON c.oid = a.attrelid" - + " JOIN pg_catalog.pg_type t ON t.oid = a.atttypid" - + " LEFT OUTER JOIN pg_catalog.pg_collation co ON co.oid = ix.indcollation[a.attnum - 1]" - + " LEFT OUTER JOIN information_schema.collations ico ON ico.collation_name = co.collname" - + " LEFT OUTER JOIN pg_catalog.pg_database d ON d.datname = current_database()" - + " WHERE ixs.tablename = ?" 
- + " AND ixs.schemaname NOT LIKE 'pg_%%'" - + " AND ixs.schemaname NOT IN (%s)" - + " ORDER BY ix.indexrelid, ordinal_position ASC;", - EXCLUDED_SCHEMAS_STR); + "SELECT a.attname AS column_name," + + " ixs.indexname AS index_name," + + " ix.indisunique AS is_unique," + + " ix.indisprimary AS is_primary," + + " c.reltuples AS cardinality," + + " a.attnum AS ordinal_position," + + " t.typname AS type_name," + + " information_schema._pg_char_max_length(a.atttypid, a.atttypmod) AS type_length," + + " t.typcategory AS type_category," + + " ico.collation_name AS collation," + + " ico.pad_attribute AS pad," + + " pg_encoding_to_char(d.encoding) AS charset" + + " FROM pg_catalog.pg_indexes ixs" + + " JOIN pg_catalog.pg_class c ON c.relname = ixs.indexname" + + " JOIN pg_catalog.pg_index ix ON c.oid = ix.indexrelid" + + " JOIN pg_catalog.pg_attribute a ON c.oid = a.attrelid" + + " JOIN pg_catalog.pg_type t ON t.oid = a.atttypid" + + " LEFT OUTER JOIN pg_catalog.pg_collation co ON co.oid = ix.indcollation[a.attnum - 1]" + + " LEFT OUTER JOIN information_schema.collations ico ON ico.collation_name = co.collname" + + " LEFT OUTER JOIN pg_catalog.pg_database d ON d.datname = current_database()" + + " WHERE ixs.schemaname = ?" + + " AND ixs.tablename = ?" 
+ + " ORDER BY ix.indexrelid, ordinal_position ASC;"; ImmutableMap.Builder> tableIndexesBuilder = ImmutableMap.builder(); try (PreparedStatement statement = dataSource.getConnection().prepareStatement(query)) { for (String table : tables) { - statement.setString(1, table); + statement.setString(1, sourceSchemaReference.namespace()); + statement.setString(2, table); ImmutableList.Builder indexInfosBuilder = ImmutableList.builder(); try (ResultSet resultSet = statement.executeQuery()) { while (resultSet.next()) { diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java index 7d883cecb5..cc03f8e80e 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/templates/PipelineController.java @@ -256,6 +256,8 @@ static PipelineResult executeShardedMigration( shard.getUserName(), shard.getPassword(), entry.getKey(), + // TODO(thiagotnunes): Specify the namespace from a shard + null, shardId, options.getJdbcDriverClassName(), options.getJdbcDriverJars(), diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java index 5ee1dc0ed5..7210706a0b 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/options/OptionsToConfigBuilderTest.java @@ -84,6 +84,7 @@ public void testConfigWithMySqlUrlFromOptions() { "mypassword", "mydb", null, + null, "com.mysql.jdbc.Driver", "mysql-jar", 10, @@ -102,6 +103,7 @@ public void testConfigWithMySqlUrlFromOptions() { "mypassword", "mydb", null, + null, 
"com.mysql.jdbc.Driver", "mysql-jar", 10, @@ -141,7 +143,7 @@ public void testConfigWithPostgreSQLDefaultsFromOptions() { null, Wait.on(dummyPCollection)); assertThat(config.jdbcDriverClassName()).isEqualTo(testDriverClassName); - assertThat(config.sourceDbURL()).isEqualTo(testUrl); + assertThat(config.sourceDbURL()).isEqualTo(testUrl + "?currentSchema=public"); assertThat(config.tables()) .containsExactlyElementsIn(new String[] {"table1", "table2", "table3"}); assertThat(config.dbAuth().getUserName().get()).isEqualTo(testUser); @@ -165,6 +167,7 @@ public void testConfigWithPostgreSqlUrlFromOptions() { "mypassword", "mydb", null, + null, "com.mysql.jdbc.Driver", "mysql-jar", 10, @@ -182,15 +185,42 @@ public void testConfigWithPostgreSqlUrlFromOptions() { "mypassword", "mydb", null, + null, "com.mysql.jdbc.Driver", "mysql-jar", 10, 0, Wait.on(dummyPCollection)); assertThat(configWithoutConnectionParameters.sourceDbURL()) - .isEqualTo("jdbc:postgresql://myhost:5432/mydb"); + .isEqualTo("jdbc:postgresql://myhost:5432/mydb?currentSchema=public"); assertThat(configWithConnectionParameters.sourceDbURL()) - .isEqualTo("jdbc:postgresql://myhost:5432/mydb?testParam=testValue"); + .isEqualTo("jdbc:postgresql://myhost:5432/mydb?currentSchema=public&testParam=testValue"); + } + + @Test + public void testConfigWithPostgreSqlUrlWithNamespace() { + PCollection dummyPCollection = pipeline.apply(Create.of(1)); + pipeline.run(); + JdbcIOWrapperConfig configWithNamespace = + OptionsToConfigBuilder.getJdbcIOWrapperConfig( + SQLDialect.POSTGRESQL, + List.of("table1", "table2"), + null, + "myhost", + "", + 5432, + "myuser", + "mypassword", + "mydb", + "mynamespace", + null, + "com.mysql.jdbc.Driver", + "mysql-jar", + 10, + 0, + Wait.on(dummyPCollection)); + assertThat(configWithNamespace.sourceDbURL()) + .isEqualTo("jdbc:postgresql://myhost:5432/mydb?currentSchema=mynamespace"); } @Test diff --git 
a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java index fe236d24e3..55c842dbe9 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java @@ -62,7 +62,8 @@ public class PostgreSQLDialectAdapterTest { @Before public void setUp() throws Exception { - sourceSchemaReference = SourceSchemaReference.builder().setDbName("testDB").build(); + sourceSchemaReference = + SourceSchemaReference.builder().setDbName("testDB").setNamespace("public").build(); adapter = new PostgreSQLDialectAdapter(PostgreSQLVersion.DEFAULT); } diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/NamespaceIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/NamespaceIT.java new file mode 100644 index 0000000000..dbc41b4b47 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/NamespaceIT.java @@ -0,0 +1,108 @@ +/* + * Copyright (C) 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.postgresql; + +import com.google.cloud.spanner.Struct; +import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; +import com.google.cloud.teleport.metadata.TemplateIntegrationTest; +import com.google.cloud.teleport.v2.templates.SourceDbToSpanner; +import com.google.cloud.teleport.v2.templates.SourceDbToSpannerITBase; +import com.google.common.collect.ImmutableList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.utils.ResourceManagerUtils; +import org.apache.beam.it.gcp.spanner.SpannerResourceManager; +import org.apache.beam.it.gcp.spanner.matchers.SpannerAsserts; +import org.apache.beam.it.jdbc.PostgresResourceManager; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * An integration test for {@link SourceDbToSpanner} Flex template which tests a migration from a + * PostgreSQL custom schema. + */ +@Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) +@TemplateIntegrationTest(SourceDbToSpanner.class) +@RunWith(JUnit4.class) +public class NamespaceIT extends SourceDbToSpannerITBase { + + private static PipelineLauncher.LaunchInfo jobInfo; + + public static PostgresResourceManager postgresSQLResourceManager; + public static SpannerResourceManager spannerResourceManager; + + private static final String POSTGRESQL_DDL_RESOURCE = "NamespacesIT/postgresql-schema.sql"; + private static final String SPANNER_DDL_RESOURCE = "NamespacesIT/spanner-schema.sql"; + + /** + * Setup resource managers and Launch dataflow job once during the execution of this test class. 
\ + */ + @Before + public void setUp() { + postgresSQLResourceManager = setUpPostgreSQLResourceManager(); + spannerResourceManager = setUpSpannerResourceManager(); + } + + /** Cleanup dataflow job and all the resources and resource managers. */ + @After + public void cleanUp() { + ResourceManagerUtils.cleanResources(spannerResourceManager, postgresSQLResourceManager); + } + + @Test + public void simpleTest() throws Exception { + Map jobParameters = new HashMap<>(); + jobParameters.put("namespace", "my-namespace"); + loadSQLFileResource(postgresSQLResourceManager, POSTGRESQL_DDL_RESOURCE); + createSpannerDDL(spannerResourceManager, SPANNER_DDL_RESOURCE); + jobInfo = + launchDataflowJob( + getClass().getSimpleName(), + null, + null, + postgresSQLResourceManager, + spannerResourceManager, + jobParameters, + null); + PipelineOperator.Result result = pipelineOperator().waitUntilDone(createConfig(jobInfo)); + + List> singersPostgreSQL = + postgresSQLResourceManager.runSQLQuery( + "SELECT singer_id, first_name FROM \"my-namespace\".singers"); + ImmutableList singersSpanner = + spannerResourceManager.readTableRecords("singers", "singer_id", "first_name"); + + SpannerAsserts.assertThatStructs(singersSpanner) + .hasRecordsUnorderedCaseInsensitiveColumns(singersPostgreSQL); + + List> albumsPostgreSQL = + postgresSQLResourceManager.runSQLQuery( + "SELECT singer_id, album_id, album_serial_number FROM \"my-namespace\".albums"); + ImmutableList albumsSpanner = + spannerResourceManager.readTableRecords( + "albums", "singer_id", "album_id", "album_serial_number"); + + SpannerAsserts.assertThatStructs(albumsSpanner) + .hasRecordsUnorderedCaseInsensitiveColumns(albumsPostgreSQL); + } +} diff --git a/v2/sourcedb-to-spanner/src/test/resources/NamespacesIT/postgresql-schema.sql b/v2/sourcedb-to-spanner/src/test/resources/NamespacesIT/postgresql-schema.sql new file mode 100644 index 0000000000..6922f05e65 --- /dev/null +++ 
b/v2/sourcedb-to-spanner/src/test/resources/NamespacesIT/postgresql-schema.sql @@ -0,0 +1,28 @@ +CREATE SCHEMA "my-namespace"; + +CREATE TABLE "my-namespace"."singers" ( + "singer_id" int8 PRIMARY KEY, + "first_name" varchar(1024) +); + +CREATE TABLE "my-namespace"."albums" ( + "singer_id" int8 NOT NULL, + "album_id" int8 NOT NULL, + "album_serial_number" int8, + PRIMARY KEY ("singer_id", "album_id"), + CONSTRAINT "album_id_fk" FOREIGN KEY ("album_id") REFERENCES "my-namespace"."singers" ("singer_id") +); + +CREATE INDEX "album_serial_number_idx" +ON "my-namespace"."albums" ("album_serial_number"); + +INSERT INTO "my-namespace"."singers" ("singer_id", "first_name") +VALUES + (1, 'Singer 1'), + (2, 'Singer 2'); + +INSERT INTO "my-namespace"."albums" ("singer_id", "album_id", "album_serial_number") +VALUES + (1, 1, 10), + (1, 2, 11), + (2, 2, 20); diff --git a/v2/sourcedb-to-spanner/src/test/resources/NamespacesIT/spanner-schema.sql b/v2/sourcedb-to-spanner/src/test/resources/NamespacesIT/spanner-schema.sql new file mode 100644 index 0000000000..adc0d99d38 --- /dev/null +++ b/v2/sourcedb-to-spanner/src/test/resources/NamespacesIT/spanner-schema.sql @@ -0,0 +1,13 @@ +CREATE TABLE singers ( + singer_id INT64 NOT NULL, + first_name STRING(1024) +) PRIMARY KEY (singer_id); + +CREATE TABLE albums ( + singer_id INT64 NOT NULL, + album_id INT64 NOT NULL, + album_serial_number INT64, + CONSTRAINT album_id_fk FOREIGN KEY (album_id) REFERENCES singers (singer_id) +) PRIMARY KEY (singer_id, album_id); + +CREATE INDEX album_serial_number_idx ON albums (album_serial_number); From 60974649121c7fcc006db06fb1cc233cc91ed91e Mon Sep 17 00:00:00 2001 From: Thiago Nunes Date: Mon, 28 Oct 2024 16:57:46 +1100 Subject: [PATCH 3/4] refactor: rename tests (#1960) * refactor: rename tests Move tests and resources out of postgresql/mysql packages and rename them to have PostgreSQL/MySQL prefix instead. This makes it clearer which class broke when there are test failures. 
* trigger build --- ...a => MySQLCustomTransformationsNonShardedIT.java} | 6 +++--- .../{DataTypesIT.java => MySQLDataTypesIT.java} | 8 ++++---- ...dencyIT.java => MySQLForeignKeyDependencyIT.java} | 4 ++-- ...apperIT.java => MySQLIdentitySchemaMapperIT.java} | 6 +++--- ...SQLIdentitySchemaMapperWithTransformationIT.java} | 6 +++--- ...MapperIT.java => MySQLSessionSchemaMapperIT.java} | 6 +++--- ...> MySQLSessionSchemaMapperWithTableFilterIT.java} | 6 +++--- ...ySQLSessionSchemaMapperWithTransformationIT.java} | 6 +++--- .../{SingleShardIT.java => MySQLSingleShardIT.java} | 2 +- ...leIT.java => MySQLSourceDbToSpannerSimpleIT.java} | 6 +++--- .../DataTypesIT.java => PostgreSQLDataTypesIT.java} | 12 +++++------- ...IT.java => PostgreSQLIdentitySchemaMapperIT.java} | 6 ++---- ...nerLT.java => PostgreSQLSourceDbToSpannerLT.java} | 7 +++---- .../{mysql/data-types.sql => mysql-data-types.sql} | 0 .../spanner-schema.sql => mysql-spanner-schema.sql} | 0 .../data-types.sql => postgresql-data-types.sql} | 0 ...nner-schema.sql => postgresql-spanner-schema.sql} | 0 .../postgresql-schema.sql | 8 -------- ...nner-schema.sql => postgresql-spanner-schema.sql} | 0 19 files changed, 38 insertions(+), 51 deletions(-) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{CustomTransformationsNonShardedIT.java => MySQLCustomTransformationsNonShardedIT.java} (95%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{DataTypesIT.java => MySQLDataTypesIT.java} (97%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{ForeignKeyDependencyIT.java => MySQLForeignKeyDependencyIT.java} (95%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{IdentitySchemaMapperIT.java => MySQLIdentitySchemaMapperIT.java} (94%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{IdentitySchemaMapperWithTransformationIT.java => 
MySQLIdentitySchemaMapperWithTransformationIT.java} (94%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{SessionSchemaMapperIT.java => MySQLSessionSchemaMapperIT.java} (94%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{SessionSchemaMapperWithTableFilterIT.java => MySQLSessionSchemaMapperWithTableFilterIT.java} (94%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{SessionSchemaMapperWithTransformationIT.java => MySQLSessionSchemaMapperWithTransformationIT.java} (94%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{SingleShardIT.java => MySQLSingleShardIT.java} (98%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{SourceDbToSpannerSimpleIT.java => MySQLSourceDbToSpannerSimpleIT.java} (94%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{postgresql/DataTypesIT.java => PostgreSQLDataTypesIT.java} (95%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/{postgresql/IdentitySchemaMapperIT.java => PostgreSQLIdentitySchemaMapperIT.java} (94%) rename v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/{postgresql/SourceDbToSpannerLT.java => PostgreSQLSourceDbToSpannerLT.java} (90%) rename v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/{mysql/data-types.sql => mysql-data-types.sql} (100%) rename v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/{mysql/spanner-schema.sql => mysql-spanner-schema.sql} (100%) rename v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/{postgresql/data-types.sql => postgresql-data-types.sql} (100%) rename v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/{postgresql/spanner-schema.sql => postgresql-spanner-schema.sql} (100%) delete mode 100644 v2/sourcedb-to-spanner/src/test/resources/SingleShardWithTransformation/postgresql-schema.sql rename 
v2/sourcedb-to-spanner/src/test/resources/SourceDbToSpannerLT/{postgresql/spanner-schema.sql => postgresql-spanner-schema.sql} (100%) diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/CustomTransformationsNonShardedIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLCustomTransformationsNonShardedIT.java similarity index 95% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/CustomTransformationsNonShardedIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLCustomTransformationsNonShardedIT.java index 352aabd48f..3d9ee9e209 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/CustomTransformationsNonShardedIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLCustomTransformationsNonShardedIT.java @@ -45,10 +45,10 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class CustomTransformationsNonShardedIT extends SourceDbToSpannerITBase { +public class MySQLCustomTransformationsNonShardedIT extends SourceDbToSpannerITBase { private static final Logger LOG = - LoggerFactory.getLogger(CustomTransformationsNonShardedIT.class); - private static final HashSet testInstances = new HashSet<>(); + LoggerFactory.getLogger(MySQLCustomTransformationsNonShardedIT.class); + private static final HashSet testInstances = new HashSet<>(); private static PipelineLauncher.LaunchInfo jobInfo; public static MySQLResourceManager mySQLResourceManager; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataTypesIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLDataTypesIT.java similarity index 97% rename from 
v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataTypesIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLDataTypesIT.java index 448f76639d..864f211096 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataTypesIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLDataTypesIT.java @@ -49,16 +49,16 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class DataTypesIT extends SourceDbToSpannerITBase { - private static final Logger LOG = LoggerFactory.getLogger(DataTypesIT.class); +public class MySQLDataTypesIT extends SourceDbToSpannerITBase { + private static final Logger LOG = LoggerFactory.getLogger(MySQLDataTypesIT.class); private static PipelineLauncher.LaunchInfo jobInfo; public static MySQLResourceManager mySQLResourceManager; public static SpannerResourceManager spannerResourceManager; - private static final String MYSQL_DUMP_FILE_RESOURCE = "DataTypesIT/mysql/data-types.sql"; + private static final String MYSQL_DUMP_FILE_RESOURCE = "DataTypesIT/mysql-data-types.sql"; - private static final String SPANNER_DDL_RESOURCE = "DataTypesIT/mysql/spanner-schema.sql"; + private static final String SPANNER_DDL_RESOURCE = "DataTypesIT/mysql-spanner-schema.sql"; /** * Setup resource managers and Launch dataflow job once during the execution of this test class. 
\ diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/ForeignKeyDependencyIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLForeignKeyDependencyIT.java similarity index 95% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/ForeignKeyDependencyIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLForeignKeyDependencyIT.java index fa514ee972..4cbe8f17f4 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/ForeignKeyDependencyIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLForeignKeyDependencyIT.java @@ -42,8 +42,8 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class ForeignKeyDependencyIT extends SourceDbToSpannerITBase { - private static final Logger LOG = LoggerFactory.getLogger(ForeignKeyDependencyIT.class); +public class MySQLForeignKeyDependencyIT extends SourceDbToSpannerITBase { + private static final Logger LOG = LoggerFactory.getLogger(MySQLForeignKeyDependencyIT.class); private static PipelineLauncher.LaunchInfo jobInfo; public static MySQLResourceManager mySQLResourceManager; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/IdentitySchemaMapperIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLIdentitySchemaMapperIT.java similarity index 94% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/IdentitySchemaMapperIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLIdentitySchemaMapperIT.java index 671c610108..0b13033bae 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/IdentitySchemaMapperIT.java +++ 
b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLIdentitySchemaMapperIT.java @@ -44,9 +44,9 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class IdentitySchemaMapperIT extends SourceDbToSpannerITBase { - private static final Logger LOG = LoggerFactory.getLogger(IdentitySchemaMapperIT.class); - private static final HashSet testInstances = new HashSet<>(); +public class MySQLIdentitySchemaMapperIT extends SourceDbToSpannerITBase { + private static final Logger LOG = LoggerFactory.getLogger(MySQLIdentitySchemaMapperIT.class); + private static final HashSet testInstances = new HashSet<>(); private static PipelineLauncher.LaunchInfo jobInfo; public static MySQLResourceManager mySQLResourceManager; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/IdentitySchemaMapperWithTransformationIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLIdentitySchemaMapperWithTransformationIT.java similarity index 94% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/IdentitySchemaMapperWithTransformationIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLIdentitySchemaMapperWithTransformationIT.java index 3b07f8a4ad..db73061ca4 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/IdentitySchemaMapperWithTransformationIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLIdentitySchemaMapperWithTransformationIT.java @@ -45,10 +45,10 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class IdentitySchemaMapperWithTransformationIT extends SourceDbToSpannerITBase { +public class 
MySQLIdentitySchemaMapperWithTransformationIT extends SourceDbToSpannerITBase { private static final Logger LOG = - LoggerFactory.getLogger(IdentitySchemaMapperWithTransformationIT.class); - private static final HashSet testInstances = + LoggerFactory.getLogger(MySQLIdentitySchemaMapperWithTransformationIT.class); + private static final HashSet testInstances = new HashSet<>(); private static PipelineLauncher.LaunchInfo jobInfo; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SessionSchemaMapperIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSessionSchemaMapperIT.java similarity index 94% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SessionSchemaMapperIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSessionSchemaMapperIT.java index 0b690961c8..b9fcd87bc5 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SessionSchemaMapperIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSessionSchemaMapperIT.java @@ -44,9 +44,9 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class SessionSchemaMapperIT extends SourceDbToSpannerITBase { - private static final Logger LOG = LoggerFactory.getLogger(SessionSchemaMapperIT.class); - private static final HashSet testInstances = new HashSet<>(); +public class MySQLSessionSchemaMapperIT extends SourceDbToSpannerITBase { + private static final Logger LOG = LoggerFactory.getLogger(MySQLSessionSchemaMapperIT.class); + private static final HashSet testInstances = new HashSet<>(); private static PipelineLauncher.LaunchInfo jobInfo; public static MySQLResourceManager mySQLResourceManager; diff --git 
a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SessionSchemaMapperWithTableFilterIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSessionSchemaMapperWithTableFilterIT.java similarity index 94% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SessionSchemaMapperWithTableFilterIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSessionSchemaMapperWithTableFilterIT.java index a729b61aa7..4567fdd18c 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SessionSchemaMapperWithTableFilterIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSessionSchemaMapperWithTableFilterIT.java @@ -45,10 +45,10 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class SessionSchemaMapperWithTableFilterIT extends SourceDbToSpannerITBase { +public class MySQLSessionSchemaMapperWithTableFilterIT extends SourceDbToSpannerITBase { private static final Logger LOG = - LoggerFactory.getLogger(SessionSchemaMapperWithTableFilterIT.class); - private static final HashSet testInstances = + LoggerFactory.getLogger(MySQLSessionSchemaMapperWithTableFilterIT.class); + private static final HashSet testInstances = new HashSet<>(); private static PipelineLauncher.LaunchInfo jobInfo; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SessionSchemaMapperWithTransformationIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSessionSchemaMapperWithTransformationIT.java similarity index 94% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SessionSchemaMapperWithTransformationIT.java rename to 
v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSessionSchemaMapperWithTransformationIT.java index c04813e101..9d5f288c18 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SessionSchemaMapperWithTransformationIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSessionSchemaMapperWithTransformationIT.java @@ -44,10 +44,10 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class SessionSchemaMapperWithTransformationIT extends SourceDbToSpannerITBase { +public class MySQLSessionSchemaMapperWithTransformationIT extends SourceDbToSpannerITBase { private static final Logger LOG = - LoggerFactory.getLogger(SessionSchemaMapperWithTransformationIT.class); - private static final HashSet testInstances = + LoggerFactory.getLogger(MySQLSessionSchemaMapperWithTransformationIT.class); + private static final HashSet testInstances = new HashSet<>(); private static PipelineLauncher.LaunchInfo jobInfo; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SingleShardIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSingleShardIT.java similarity index 98% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SingleShardIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSingleShardIT.java index a234b62375..0258c1bed4 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SingleShardIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSingleShardIT.java @@ -41,7 +41,7 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class SingleShardIT 
extends SourceDbToSpannerITBase { +public class MySQLSingleShardIT extends SourceDbToSpannerITBase { private static PipelineLauncher.LaunchInfo jobInfo; public static MySQLResourceManager mySQLResourceManager; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerSimpleIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSourceDbToSpannerSimpleIT.java similarity index 94% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerSimpleIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSourceDbToSpannerSimpleIT.java index d4f6aba794..bba4e2e46f 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/SourceDbToSpannerSimpleIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/MySQLSourceDbToSpannerSimpleIT.java @@ -49,9 +49,9 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class SourceDbToSpannerSimpleIT extends SourceDbToSpannerITBase { - private static final Logger LOG = LoggerFactory.getLogger(SourceDbToSpannerSimpleIT.class); - private static HashSet testInstances = new HashSet<>(); +public class MySQLSourceDbToSpannerSimpleIT extends SourceDbToSpannerITBase { + private static final Logger LOG = LoggerFactory.getLogger(MySQLSourceDbToSpannerSimpleIT.class); + private static HashSet testInstances = new HashSet<>(); private static PipelineLauncher.LaunchInfo jobInfo; public static MySQLResourceManager mySQLResourceManager; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/DataTypesIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLDataTypesIT.java similarity index 95% rename from 
v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/DataTypesIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLDataTypesIT.java index a739fe210d..a30b1f65da 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/DataTypesIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLDataTypesIT.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations under * the License. */ -package com.google.cloud.teleport.v2.templates.postgresql; +package com.google.cloud.teleport.v2.templates; import static com.google.common.truth.Truth.assertThat; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; @@ -21,8 +21,6 @@ import com.google.cloud.spanner.Struct; import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; import com.google.cloud.teleport.metadata.TemplateIntegrationTest; -import com.google.cloud.teleport.v2.templates.SourceDbToSpanner; -import com.google.cloud.teleport.v2.templates.SourceDbToSpannerITBase; import java.time.Duration; import java.util.ArrayList; import java.util.HashMap; @@ -50,14 +48,14 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class DataTypesIT extends SourceDbToSpannerITBase { - private static final Logger LOG = LoggerFactory.getLogger(DataTypesIT.class); +public class PostgreSQLDataTypesIT extends SourceDbToSpannerITBase { + private static final Logger LOG = LoggerFactory.getLogger(PostgreSQLDataTypesIT.class); public static PostgresResourceManager postgreSQLResourceManager; public static SpannerResourceManager spannerResourceManager; - private static final String POSTGRESQL_DDL_RESOURCE = "DataTypesIT/postgresql/data-types.sql"; - private static final String SPANNER_DDL_RESOURCE = 
"DataTypesIT/postgresql/spanner-schema.sql"; + private static final String POSTGRESQL_DDL_RESOURCE = "DataTypesIT/postgresql-data-types.sql"; + private static final String SPANNER_DDL_RESOURCE = "DataTypesIT/postgresql-spanner-schema.sql"; /** Setup resource managers. */ @Before diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/IdentitySchemaMapperIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLIdentitySchemaMapperIT.java similarity index 94% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/IdentitySchemaMapperIT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLIdentitySchemaMapperIT.java index 276f9ce852..2c646aa7e6 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/postgresql/IdentitySchemaMapperIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLIdentitySchemaMapperIT.java @@ -13,13 +13,11 @@ * License for the specific language governing permissions and limitations under * the License. 
*/ -package com.google.cloud.teleport.v2.templates.postgresql; +package com.google.cloud.teleport.v2.templates; import com.google.cloud.spanner.Struct; import com.google.cloud.teleport.metadata.SkipDirectRunnerTest; import com.google.cloud.teleport.metadata.TemplateIntegrationTest; -import com.google.cloud.teleport.v2.templates.SourceDbToSpanner; -import com.google.cloud.teleport.v2.templates.SourceDbToSpannerITBase; import com.google.common.collect.ImmutableList; import java.util.List; import java.util.Map; @@ -43,7 +41,7 @@ @Category({TemplateIntegrationTest.class, SkipDirectRunnerTest.class}) @TemplateIntegrationTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class IdentitySchemaMapperIT extends SourceDbToSpannerITBase { +public class PostgreSQLIdentitySchemaMapperIT extends SourceDbToSpannerITBase { private static PipelineLauncher.LaunchInfo jobInfo; public static PostgresResourceManager postgresSQLResourceManager; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/postgresql/SourceDbToSpannerLT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLSourceDbToSpannerLT.java similarity index 90% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/postgresql/SourceDbToSpannerLT.java rename to v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLSourceDbToSpannerLT.java index 6c26367323..31834e1b3e 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/postgresql/SourceDbToSpannerLT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLSourceDbToSpannerLT.java @@ -13,12 +13,11 @@ * License for the specific language governing permissions and limitations under * the License. 
*/ -package com.google.cloud.teleport.v2.templates.loadtesting.postgresql; +package com.google.cloud.teleport.v2.templates.loadtesting; import com.google.cloud.teleport.metadata.TemplateLoadTest; import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect; import com.google.cloud.teleport.v2.templates.SourceDbToSpanner; -import com.google.cloud.teleport.v2.templates.loadtesting.SourceDbToSpannerLTBase; import java.io.IOException; import java.text.ParseException; import java.util.HashMap; @@ -31,7 +30,7 @@ @Category(TemplateLoadTest.class) @TemplateLoadTest(SourceDbToSpanner.class) @RunWith(JUnit4.class) -public class SourceDbToSpannerLT extends SourceDbToSpannerLTBase { +public class PostgreSQLSourceDbToSpannerLT extends SourceDbToSpannerLTBase { @Test public void backfill100Gb() throws IOException, ParseException, InterruptedException { @@ -53,7 +52,7 @@ public void backfill100Gb() throws IOException, ParseException, InterruptedExcep "projects/269744978479/secrets/nokill-sourcedb-postgresql-to-spanner-cloudsql-port/versions/1")); setUp(SQLDialect.POSTGRESQL, host, port, username, password, database); - createSpannerDDL("SourceDbToSpannerLT/postgresql/spanner-schema.sql"); + createSpannerDDL("SourceDbToSpannerLT/postgresql-spanner-schema.sql"); Map expectedCountPerTable = new HashMap<>() { diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql/data-types.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-data-types.sql similarity index 100% rename from v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql/data-types.sql rename to v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-data-types.sql diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql/spanner-schema.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-spanner-schema.sql similarity index 100% rename from v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql/spanner-schema.sql rename to 
v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/mysql-spanner-schema.sql diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql/data-types.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-data-types.sql similarity index 100% rename from v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql/data-types.sql rename to v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-data-types.sql diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql/spanner-schema.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-spanner-schema.sql similarity index 100% rename from v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql/spanner-schema.sql rename to v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-spanner-schema.sql diff --git a/v2/sourcedb-to-spanner/src/test/resources/SingleShardWithTransformation/postgresql-schema.sql b/v2/sourcedb-to-spanner/src/test/resources/SingleShardWithTransformation/postgresql-schema.sql deleted file mode 100644 index 4c8b1baa49..0000000000 --- a/v2/sourcedb-to-spanner/src/test/resources/SingleShardWithTransformation/postgresql-schema.sql +++ /dev/null @@ -1,8 +0,0 @@ -CREATE TABLE IF NOT EXISTS SingleShardWithTransformationTable ( - pkid SERIAL PRIMARY KEY, - name VARCHAR(20), - status VARCHAR(20) -); - -INSERT INTO SingleShardWithTransformationTable (pkid, name, status) -VALUES (1, 'Alice', 'active'), (2, 'Bob', 'inactive'), (3, 'Carol', 'pending'), (4, 'David', 'complete'), (5, 'Emily', 'error'); diff --git a/v2/sourcedb-to-spanner/src/test/resources/SourceDbToSpannerLT/postgresql/spanner-schema.sql b/v2/sourcedb-to-spanner/src/test/resources/SourceDbToSpannerLT/postgresql-spanner-schema.sql similarity index 100% rename from v2/sourcedb-to-spanner/src/test/resources/SourceDbToSpannerLT/postgresql/spanner-schema.sql rename to 
v2/sourcedb-to-spanner/src/test/resources/SourceDbToSpannerLT/postgresql-spanner-schema.sql From 3233cbf1384ede51f5e2758e28209d888196f890 Mon Sep 17 00:00:00 2001 From: darshan-sj Date: Mon, 28 Oct 2024 17:15:16 +0530 Subject: [PATCH 4/4] Fixing Flaky tests (#1963) * Fixing flaky tests * test * Making tests in DataStreamToSpannerIT sequential * Introducing retry instead of sequential test execution * Fixing UT * spotless apply * Adding UT --- .../gcp/spanner/SpannerResourceManager.java | 14 +++--- .../spanner/SpannerResourceManagerTest.java | 44 +++++++++++++++++++ .../templates/DataStreamToSpannerDDLIT.java | 20 ++++++++- .../DataStreamToSpannerEventsIT.java | 6 +++ .../templates/DataStreamToSpannerITBase.java | 1 + .../DataStreamToSpannerSessionIT.java | 6 +++ ...MigrationWithMigrationShardIdColumnIT.java | 1 - ...rationWithoutMigrationShardIdColumnIT.java | 6 +++ ...amToSpannerSingleDFShardedMigrationIT.java | 6 +++ 9 files changed, 96 insertions(+), 8 deletions(-) diff --git a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java index c5825fadc8..54ff99c5e6 100644 --- a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/spanner/SpannerResourceManager.java @@ -54,7 +54,6 @@ import java.time.Instant; import java.util.List; import java.util.Map; -import java.util.concurrent.ExecutionException; import java.util.function.Supplier; import javax.annotation.Nullable; import org.apache.beam.it.common.ResourceManager; @@ -310,11 +309,16 @@ public synchronized void executeDdlStatements(List statements) LOG.info("Executing DDL statements '{}' on database {}.", statements, databaseId); try { - databaseAdminClient - .updateDatabaseDdl(instanceId, databaseId, statements, /* operationId= */ null) - .get(); + // 
executeDdlStatments can fail for spanner staging because of failfast. + Failsafe.with(retryOnQuotaException()) + .run( + () -> + databaseAdminClient + .updateDatabaseDdl( + instanceId, databaseId, statements, /* operationId= */ null) + .get()); LOG.info("Successfully executed DDL statements '{}' on database {}.", statements, databaseId); - } catch (ExecutionException | InterruptedException | SpannerException e) { + } catch (Exception e) { throw new SpannerResourceManagerException("Failed to execute statement.", e); } } diff --git a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java index d1bbfb4372..1377f7d62b 100644 --- a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/spanner/SpannerResourceManagerTest.java @@ -197,6 +197,50 @@ public void testExecuteDdlStatementShouldWorkWhenSpannerDoesntThrowAnyError() assertThat(actualStatement).containsExactlyElementsIn(ImmutableList.of(statement)); } + @Test + public void testExecuteDdlStatementShouldRetryOnResourceExhaustedError() + throws ExecutionException, InterruptedException { + // arrange + prepareCreateInstanceMock(); + prepareCreateDatabaseMock(); + String statement = + "CREATE TABLE Singers (\n" + + " SingerId INT64 NOT NULL,\n" + + " FirstName STRING(1024),\n" + + " LastName STRING(1024),\n" + + ") PRIMARY KEY (SingerId)"; + + RuntimeException resourceExhaustedException = + new RuntimeException( + "com.google.cloud.spanner.SpannerException: RESOURCE_EXHAUSTED: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: CPU overload detected"); + when(spanner.getDatabaseAdminClient().updateDatabaseDdl(any(), any(), any(), any()).get()) + .thenThrow(resourceExhaustedException) + .thenReturn(null); + + // act + 
testManager.executeDdlStatement(statement); + + // assert + // verify createInstance, createDatabase, and updateDatabaseDdl were called. + verify(spanner.getInstanceAdminClient(), times(2)).createInstance(any()); + verify(spanner.getDatabaseAdminClient(), times(2)).createDatabase(any(), any()); + verify(spanner.getDatabaseAdminClient(), times(3)) + .updateDatabaseDdl( + instanceIdCaptor.capture(), + databaseIdCaptor.capture(), + statementCaptor.capture(), + any()); + + String actualInstanceId = instanceIdCaptor.getValue(); + String actualDatabaseId = databaseIdCaptor.getValue(); + Iterable actualStatement = statementCaptor.getValue(); + + assertThat(actualInstanceId).matches(TEST_ID + "-\\d{8}-\\d{6}-[a-zA-Z0-9]{6}"); + + assertThat(actualDatabaseId).matches(TEST_ID + "_\\d{8}_\\d{6}_[a-zA-Z0-9]{6}"); + assertThat(actualStatement).containsExactlyElementsIn(ImmutableList.of(statement)); + } + @Test public void testWriteSingleRecordShouldWorkWhenSpannerWriteSucceeds() throws ExecutionException, InterruptedException { diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerDDLIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerDDLIT.java index b1e6bf4ea8..b6d2aa62ac 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerDDLIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerDDLIT.java @@ -176,6 +176,12 @@ public void migrationTestWithAllDatatypeConversionMapping() { // Assert Conditions assertThatResult(result).meetsConditions(); + // Sleep for cutover time to wait till all CDCs propagate. + // A real world customer also has a small cut over time to reach consistency. 
+ try { + Thread.sleep(CUTOVER_MILLIS); + } catch (InterruptedException e) { + } assertAllDatatypeColumnsTableCdcContents(); } @@ -228,7 +234,12 @@ public void migrationTestWithAllDatatypeDefaultMapping() { // Assert Conditions assertThatResult(result).meetsConditions(); - + // Sleep for cutover time to wait till all CDCs propagate. + // A real world customer also has a small cut over time to reach consistency. + try { + Thread.sleep(CUTOVER_MILLIS); + } catch (InterruptedException e) { + } assertAllDatatypeColumns2TableCdcContents(); } @@ -281,7 +292,12 @@ public void migrationTestWithAllDatatypeTransformation() { // Assert Conditions assertThatResult(result).meetsConditions(); - + // Sleep for cutover time to wait till all CDCs propagate. + // A real world customer also has a small cut over time to reach consistency. + try { + Thread.sleep(CUTOVER_MILLIS); + } catch (InterruptedException e) { + } assertAllDatatypeTransformationTableCdcContents(); } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerEventsIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerEventsIT.java index af0eb11096..f82bd8e58b 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerEventsIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerEventsIT.java @@ -156,6 +156,12 @@ public void migrationTestWithUpdatesAndDeletes() { // Assert Conditions assertThatResult(result).meetsConditions(); + // Sleep for cutover time to wait till all CDCs propagate. + // A real world customer also has a small cut over time to reach consistency. 
+ try { + Thread.sleep(CUTOVER_MILLIS); + } catch (InterruptedException e) { + } // Assert specific rows assertUsersTableContents(); } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java index 358510fa0d..46897db0d9 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerITBase.java @@ -47,6 +47,7 @@ public abstract class DataStreamToSpannerITBase extends TemplateTestBase { // Format of avro file path in GCS - {table}/2023/12/20/06/57/{fileName} public static final String DATA_STREAM_EVENT_FILES_PATH_FORMAT_IN_GCS = "%s/2023/12/20/06/57/%s"; private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpannerITBase.class); + public static final int CUTOVER_MILLIS = 30 * 1000; public PubsubResourceManager setUpPubSubResourceManager() throws IOException { return PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build(); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerSessionIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerSessionIT.java index e97c5f535a..d65c83a2a0 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerSessionIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerSessionIT.java @@ -161,6 +161,12 @@ public void migrationTestWithRenameAndDropColumn() { // Assert Conditions assertThatResult(result).meetsConditions(); + // Sleep for cutover time to wait till all CDCs propagate. + // A real world customer also has a small cut over time to reach consistency. 
+ try { + Thread.sleep(CUTOVER_MILLIS); + } catch (InterruptedException e) { + } assertCategoryTableCdcContents(); } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.java index 7c7214719d..383d1469d8 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT.java @@ -74,7 +74,6 @@ public class DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT private static final String SPANNER_DDL_RESOURCE = "DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/spanner-schema.sql"; - public static final int CUTOVER_MILLIS = 30 * 1000; private static HashSet testInstances = new HashSet<>(); diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT.java index 9e44893cf4..b875c0e513 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamToSpannerShardedMigrationWithoutMigrationShardIdColumnIT.java @@ -217,6 +217,12 @@ public void multiShardMigration() { .waitForCondition(createConfig(jobInfo1, Duration.ofMinutes(10)), rowsConditionCheck); assertThatResult(result).meetsConditions(); + // Sleep for cutover time to wait 
till all CDCs propagate. + // A real world customer also has a small cut over time to reach consistency. + try { + Thread.sleep(CUTOVER_MILLIS); + } catch (InterruptedException e) { + } // Assert specific rows assertUsersTableContents(); } diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DatastreamToSpannerSingleDFShardedMigrationIT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DatastreamToSpannerSingleDFShardedMigrationIT.java index fbd8ce90a3..28483263d7 100644 --- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DatastreamToSpannerSingleDFShardedMigrationIT.java +++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/DatastreamToSpannerSingleDFShardedMigrationIT.java @@ -185,6 +185,12 @@ public void multiShardMigration() { .waitForCondition(createConfig(jobInfo, Duration.ofMinutes(10)), rowsConditionCheck); assertThatResult(result).meetsConditions(); + // Sleep for cutover time to wait till all CDCs propagate. + // A real world customer also has a small cut over time to reach consistency. + try { + Thread.sleep(CUTOVER_MILLIS); + } catch (InterruptedException e) { + } // Assert specific rows assertUsersTableContents(); }