[Bug]: Unable to Restart Google Spanner Change Streams Consumer due to tableExists(table_name) bug #32509

Open
2 of 17 tasks
NimzyMaina opened this issue Sep 19, 2024 · 9 comments · May be fixed by NimzyMaina/beam#1 or #32510

Comments

@NimzyMaina

What happened?

The method that checks whether the metadata table exists in Spanner can, in some scenarios, always return false, because it hard-codes empty values for table_catalog and table_schema and offers no way to override them. In my case, these fields are always populated in the information_schema.tables view.

The first time I run the application, it runs fine because it creates the metadata table. The problem arises when I restart the application and specify the same metadata table: it tries to recreate the table, which already exists, and this results in a SpannerException.

Caused by: com.google.cloud.spanner.SpannerException: FAILED_PRECONDITION: Operation with name "projects/<project>/instances/<instance>/databases/<db>/operations/..." failed with status = GrpcStatusCode{transportCode=FAILED_PRECONDITION} and message = Duplicate name in schema: java_metadata_2.

Link to tableExists code

  /**
   * Checks whether the metadata table already exists in the database.
   *
   * @return true if the table exists, false if the table does not exist.
   */
  public boolean tableExists() {
    final String checkTableExistsStmt =
        "SELECT t.table_name FROM information_schema.tables AS t "
            + "WHERE t.table_catalog = '' AND "
            + "t.table_schema = '' AND "
            + "t.table_name = '"
            + metadataTableName
            + "'";
    try (ResultSet queryResultSet =
        databaseClient
            .singleUseReadOnlyTransaction()
            .executeQuery(Statement.of(checkTableExistsStmt))) {
      return queryResultSet.next();
    }
  }

Link to where it is used

  @ProcessElement
  public void processElement(OutputReceiver<PartitionMetadata> receiver) {
    PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao();
    if (!partitionMetadataDao.tableExists()) { // <-- Fails at this point on restart
      daoFactory.getPartitionMetadataAdminDao().createPartitionMetadataTable();
      createFakeParentPartition();
    }
    final PartitionMetadata initialPartition =
        Optional.ofNullable(partitionMetadataDao.getPartition(InitialPartition.PARTITION_TOKEN))
            .map(mapperFactory.partitionMetadataMapper()::from)
            .orElseThrow(
                () -> new IllegalStateException("Initial partition not found in metadata table."));
    receiver.output(initialPartition);
  }

This results in a scenario where a change stream consumer cannot recover from a restart.
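
To make the failure mode concrete, here is a minimal in-memory sketch. It is not the Beam code: `TableExistsDemo`, `Row`, and both helper methods are hypothetical stand-ins for the information_schema.tables view and the two `WHERE` clauses, and the catalog and schema values `my_db` and `public` are illustrative only.

```java
import java.util.Arrays;
import java.util.List;

final class TableExistsDemo {
  /** Hypothetical stand-in for one row of information_schema.tables. */
  static final class Row {
    final String tableCatalog;
    final String tableSchema;
    final String tableName;

    Row(String tableCatalog, String tableSchema, String tableName) {
      this.tableCatalog = tableCatalog;
      this.tableSchema = tableSchema;
      this.tableName = tableName;
    }
  }

  /** Mirrors the current filter: catalog and schema must both be empty strings. */
  static boolean existsWithEmptyFilters(List<Row> rows, String name) {
    for (Row r : rows) {
      if (r.tableCatalog.isEmpty() && r.tableSchema.isEmpty() && r.tableName.equals(name)) {
        return true;
      }
    }
    return false;
  }

  /** Mirrors the suggested filter: match on the table name only. */
  static boolean existsByNameOnly(List<Row> rows, String name) {
    for (Row r : rows) {
      if (r.tableName.equals(name)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    // Illustrative row with populated catalog and schema values.
    List<Row> rows = Arrays.asList(new Row("my_db", "public", "java_metadata_2"));
    System.out.println(existsWithEmptyFilters(rows, "java_metadata_2")); // prints false
    System.out.println(existsByNameOnly(rows, "java_metadata_2")); // prints true
  }
}
```

With a populated catalog and schema, the first check never finds the table, so every restart takes the "create table" branch again.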

Suggested solution

  /**
   * Checks whether the metadata table already exists in the database.
   *
   * @return true if the table exists, false if the table does not exist.
   */
  public boolean tableExists() {
    final String checkTableExistsStmt =
        "SELECT t.table_name FROM information_schema.tables AS t "
            + "WHERE t.table_name = '" + metadataTableName + "'";
    try (ResultSet queryResultSet =
        databaseClient
            .singleUseReadOnlyTransaction()
            .executeQuery(Statement.of(checkTableExistsStmt))) {
      return queryResultSet.next();
    }
  }

Issue Priority

Priority: 1 (data loss / total loss of function)

Issue Components

  • Component: Python SDK
  • Component: Java SDK
  • Component: Go SDK
  • Component: Typescript SDK
  • Component: IO connector
  • Component: Beam YAML
  • Component: Beam examples
  • Component: Beam playground
  • Component: Beam katas
  • Component: Website
  • Component: Infrastructure
  • Component: Spark Runner
  • Component: Flink Runner
  • Component: Samza Runner
  • Component: Twister2 Runner
  • Component: Hazelcast Jet Runner
  • Component: Google Cloud Dataflow Runner
NimzyMaina added a commit to NimzyMaina/beam that referenced this issue Sep 19, 2024
@liferoad
Collaborator

cc @nielm

@nielm
Contributor

nielm commented Sep 20, 2024

cc: @thiagotnunes

@nielm
Contributor

nielm commented Sep 20, 2024

The fix may have to be more complicated than the suggested solution, as it is possible to have multiple schemas in a Spanner database.

I have handed this over to the team responsible for maintaining SpannerIO Change streams.

@NimzyMaina
Author

Hi @nielm,

Thanks for your quick response. I understand that may be the case. If the team can come up with a quick solution that takes multiple schemas into account, that would be great.

However, if that will take some time, could we please have this added as an interim solution? As the SDK is right now, it is not usable. This change would let people use the SDK while we wait for a permanent solution.

If this is still not acceptable, could we please have something like this instead, which will not throw an exception on restart?

CREATE TABLE IF NOT EXISTS metadata_table ...
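
A rough sketch of what I mean, under stated assumptions. `MetadataDdl` and `createIfNotExists` are hypothetical names, and the `id` column in the example is a placeholder rather than the connector's real schema. The sketch assumes the target Spanner dialect accepts `IF NOT EXISTS` on `CREATE TABLE`, and that identifiers are quoted with backticks in GoogleSQL and with double quotes in the PostgreSQL dialect.

```java
final class MetadataDdl {
  /**
   * Hypothetical helper: prefixes a table definition with CREATE TABLE IF NOT EXISTS.
   * The definition (columns and primary key) is passed through unchanged.
   */
  static String createIfNotExists(boolean postgresDialect, String tableName, String definition) {
    char quote = postgresDialect ? '"' : '`';
    // Reject names that would break out of the quoted identifier.
    if (tableName.isEmpty() || tableName.indexOf(quote) >= 0) {
      throw new IllegalArgumentException("Unsupported table name: " + tableName);
    }
    return "CREATE TABLE IF NOT EXISTS " + quote + tableName + quote + " " + definition;
  }

  public static void main(String[] args) {
    // Placeholder column definition, not the real metadata table schema.
    System.out.println(
        createIfNotExists(true, "java_metadata_2", "(id text NOT NULL, PRIMARY KEY (id))"));
  }
}
```

Note that processElement also calls createFakeParentPartition() right after creating the table, so that step would need to tolerate an existing row as well; the sketch does not cover it.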

@nielm
Contributor

nielm commented Sep 22, 2024

@ShuranZhang

@dedocibula
Contributor

Are you using the PostgreSQL dialect by any chance? If so, are you seeing table_schema set to public?

@NimzyMaina
Author

@dedocibula

Yes, I'm using the PostgreSQL dialect. table_schema is set to "public" and table_catalog is set to the name of the database.
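
Based on those values, a minimal sketch of how the existence check could branch on dialect. `TableExistsSql`, `SqlDialect`, and `tableExistsSql` are hypothetical names; the enum is a local stand-in for the client library's dialect type. The `$1` and `@tableName` placeholders assume the table name is bound as a query parameter instead of being concatenated, and the binding itself is not shown.

```java
final class TableExistsSql {
  /** Local stand-in for the database dialect; hypothetical, for illustration only. */
  enum SqlDialect {
    GOOGLE_STANDARD_SQL,
    POSTGRESQL
  }

  /** Builds the existence query with the schema filter that matches each dialect. */
  static String tableExistsSql(SqlDialect dialect) {
    String select = "SELECT t.table_name FROM information_schema.tables AS t WHERE ";
    switch (dialect) {
      case POSTGRESQL:
        // Tables in the default schema are reported under 'public'.
        return select + "t.table_schema = 'public' AND t.table_name = $1";
      case GOOGLE_STANDARD_SQL:
        // Same filters as the current query: empty catalog and schema.
        return select
            + "t.table_catalog = '' AND t.table_schema = '' AND t.table_name = @tableName";
      default:
        throw new IllegalArgumentException("Unknown dialect: " + dialect);
    }
  }

  public static void main(String[] args) {
    System.out.println(tableExistsSql(SqlDialect.POSTGRESQL));
    System.out.println(tableExistsSql(SqlDialect.GOOGLE_STANDARD_SQL));
  }
}
```

Keeping a schema filter in both branches avoids matching a same-named table in another schema, which is the concern raised about databases with multiple schemas.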

@dedocibula
Contributor

Right, added a comment to your PR

@NimzyMaina
Author

@dedocibula

Apologies, which comment are you referring to? I don't see a comment from your account on the PR.


4 participants