diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 435bbba9ae8e..d9dde11a3081 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -25,7 +25,6 @@ import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT; import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS; -import static org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument; import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull; @@ -61,6 +60,7 @@ import java.util.HashMap; import java.util.List; import java.util.Objects; +import java.util.Optional; import java.util.OptionalInt; import java.util.Set; import java.util.concurrent.TimeUnit; @@ -77,6 +77,7 @@ import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; +import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn; @@ -1772,9 +1773,13 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta + fullPartitionMetadataDatabaseId + " has dialect " + metadataDatabaseDialect); - final String partitionMetadataTableName = - MoreObjects.firstNonNull( - getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId)); + PartitionMetadataTableNames partitionMetadataTableNames = + Optional.ofNullable(getMetadataTable()) + .map( + table -> + PartitionMetadataTableNames.fromExistingTable( + partitionMetadataDatabaseId, table)) + .orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId)); final String changeStreamName = getChangeStreamName(); final Timestamp startTimestamp = getInclusiveStartAt(); // Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the @@ -1791,7 +1796,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta changeStreamSpannerConfig, changeStreamName, partitionMetadataSpannerConfig, - partitionMetadataTableName, + partitionMetadataTableNames, rpcPriority, input.getPipeline().getOptions().getJobName(), changeStreamDatabaseDialect, @@ -1807,7 +1812,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta final PostProcessingMetricsDoFn postProcessingMetricsDoFn = new PostProcessingMetricsDoFn(metrics); - LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName); + LOG.info( + "Partition metadata table that will be used is " + + partitionMetadataTableNames.getTableName()); final PCollection impulseOut = input.apply(Impulse.create()); final PCollection partitionsOut = diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java deleted file mode 100644 index 322e85cb07a2..000000000000 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGenerator.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner.changestreams; - -import java.util.UUID; - -/** - * This class generates a unique name for the partition metadata table, which is created when the - * Connector is initialized. - */ -public class NameGenerator { - - private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s"; - private static final int MAX_TABLE_NAME_LENGTH = 63; - - /** - * Generates an unique name for the partition metadata table in the form of {@code - * "Metadata__"}. - * - * @param databaseId The database id where the table will be created - * @return the unique generated name of the partition metadata table - */ - public static String generatePartitionMetadataTableName(String databaseId) { - // There are 11 characters in the name format. - // Maximum Spanner database ID length is 30 characters. - // UUID always generates a String with 36 characters. - // Since the Postgres table name length is 63, we may need to truncate the table name depending - // on the database length. - String fullString = - String.format(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, UUID.randomUUID()) - .replaceAll("-", "_"); - if (fullString.length() < MAX_TABLE_NAME_LENGTH) { - return fullString; - } - return fullString.substring(0, MAX_TABLE_NAME_LENGTH); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java index b9718fdb675e..787abad02e02 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/DaoFactory.java @@ -44,7 +44,7 @@ public class DaoFactory implements Serializable { private final SpannerConfig metadataSpannerConfig; private final String changeStreamName; - private final String partitionMetadataTableName; + private final PartitionMetadataTableNames partitionMetadataTableNames; private final RpcPriority rpcPriority; private final String jobName; private final Dialect spannerChangeStreamDatabaseDialect; @@ -56,7 +56,7 @@ public class DaoFactory implements Serializable { * @param changeStreamSpannerConfig the configuration for the change streams DAO * @param changeStreamName the name of the change stream for the change streams DAO * @param metadataSpannerConfig the metadata tables configuration - * @param partitionMetadataTableName the name of the created partition metadata table + * @param partitionMetadataTableNames the names of the partition metadata ddl objects * @param rpcPriority the priority of the requests made by the DAO queries * @param jobName the name of the running job */ @@ -64,7 +64,7 @@ public DaoFactory( SpannerConfig changeStreamSpannerConfig, String changeStreamName, SpannerConfig metadataSpannerConfig, - String partitionMetadataTableName, + PartitionMetadataTableNames partitionMetadataTableNames, RpcPriority rpcPriority, String jobName, Dialect spannerChangeStreamDatabaseDialect, @@ -78,7 +78,7 @@ public DaoFactory( this.changeStreamSpannerConfig = changeStreamSpannerConfig; this.changeStreamName = changeStreamName; this.metadataSpannerConfig = metadataSpannerConfig; - this.partitionMetadataTableName = partitionMetadataTableName; + this.partitionMetadataTableNames = partitionMetadataTableNames; this.rpcPriority = rpcPriority; this.jobName = jobName; this.spannerChangeStreamDatabaseDialect = spannerChangeStreamDatabaseDialect; @@ -102,7 +102,7 @@ public synchronized PartitionMetadataAdminDao getPartitionMetadataAdminDao() { databaseAdminClient, metadataSpannerConfig.getInstanceId().get(), metadataSpannerConfig.getDatabaseId().get(), - partitionMetadataTableName, + partitionMetadataTableNames, this.metadataDatabaseDialect); } return partitionMetadataAdminDao; @@ -120,7 +120,7 @@ public synchronized PartitionMetadataDao getPartitionMetadataDao() { if (partitionMetadataDaoInstance == null) { partitionMetadataDaoInstance = new PartitionMetadataDao( - this.partitionMetadataTableName, + this.partitionMetadataTableNames.getTableName(), spannerAccessor.getDatabaseClient(), this.metadataDatabaseDialect); } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java index 368cab7022b3..3e6045d8858b 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDao.java @@ -79,19 +79,13 @@ public class PartitionMetadataAdminDao { */ public static final String COLUMN_FINISHED_AT = "FinishedAt"; - /** Metadata table index for queries over the watermark column. */ - public static final String WATERMARK_INDEX = "WatermarkIndex"; - - /** Metadata table index for queries over the created at / start timestamp columns. */ - public static final String CREATED_AT_START_TIMESTAMP_INDEX = "CreatedAtStartTimestampIndex"; - private static final int TIMEOUT_MINUTES = 10; private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1; private final DatabaseAdminClient databaseAdminClient; private final String instanceId; private final String databaseId; - private final String tableName; + private final PartitionMetadataTableNames names; private final Dialect dialect; /** @@ -101,18 +95,18 @@ public class PartitionMetadataAdminDao { * table * @param instanceId the instance where the metadata table will reside * @param databaseId the database where the metadata table will reside - * @param tableName the name of the metadata table + * @param names the names of the metadata table ddl objects */ PartitionMetadataAdminDao( DatabaseAdminClient databaseAdminClient, String instanceId, String databaseId, - String tableName, + PartitionMetadataTableNames names, Dialect dialect) { this.databaseAdminClient = databaseAdminClient; this.instanceId = instanceId; this.databaseId = databaseId; - this.tableName = tableName; + this.names = names; this.dialect = dialect; } @@ -128,8 +122,8 @@ public void createPartitionMetadataTable() { if (this.isPostgres()) { // Literals need be added around literals to preserve casing. ddl.add( - "CREATE TABLE \"" - + tableName + "CREATE TABLE IF NOT EXISTS \"" + + names.getTableName() + "\"(\"" + COLUMN_PARTITION_TOKEN + "\" text NOT NULL,\"" @@ -163,20 +157,20 @@ public void createPartitionMetadataTable() { + COLUMN_FINISHED_AT + "\""); ddl.add( - "CREATE INDEX \"" - + WATERMARK_INDEX + "CREATE INDEX IF NOT EXISTS \"" + + names.getWatermarkIndexName() + "\" on \"" - + tableName + + names.getTableName() + "\" (\"" + COLUMN_WATERMARK + "\") INCLUDE (\"" + COLUMN_STATE + "\")"); ddl.add( - "CREATE INDEX \"" - + CREATED_AT_START_TIMESTAMP_INDEX + "CREATE INDEX IF NOT EXISTS \"" + + names.getCreatedAtIndexName() + "\" ON \"" - + tableName + + names.getTableName() + "\" (\"" + COLUMN_CREATED_AT + "\",\"" @@ -184,8 +178,8 @@ public void createPartitionMetadataTable() { + "\")"); } else { ddl.add( - "CREATE TABLE " - + tableName + "CREATE TABLE IF NOT EXISTS " + + names.getTableName() + " (" + COLUMN_PARTITION_TOKEN + " STRING(MAX) NOT NULL," @@ -218,20 +212,20 @@ public void createPartitionMetadataTable() { + TTL_AFTER_PARTITION_FINISHED_DAYS + " DAY))"); ddl.add( - "CREATE INDEX " - + WATERMARK_INDEX + "CREATE INDEX IF NOT EXISTS " + + names.getWatermarkIndexName() + " on " - + tableName + + names.getTableName() + " (" + COLUMN_WATERMARK + ") STORING (" + COLUMN_STATE + ")"); ddl.add( - "CREATE INDEX " - + CREATED_AT_START_TIMESTAMP_INDEX + "CREATE INDEX IF NOT EXISTS " + + names.getCreatedAtIndexName() + " ON " - + tableName + + names.getTableName() + " (" + COLUMN_CREATED_AT + "," @@ -261,16 +255,14 @@ public void createPartitionMetadataTable() { * Drops the metadata table. This operation should complete in {@link * PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes. */ - public void deletePartitionMetadataTable() { + public void deletePartitionMetadataTable(List indexes) { List ddl = new ArrayList<>(); if (this.isPostgres()) { - ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\""); - ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\""); - ddl.add("DROP TABLE \"" + tableName + "\""); + indexes.forEach(index -> ddl.add("DROP INDEX \"" + index + "\"")); + ddl.add("DROP TABLE \"" + names.getTableName() + "\""); } else { - ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX); - ddl.add("DROP INDEX " + WATERMARK_INDEX); - ddl.add("DROP TABLE " + tableName); + indexes.forEach(index -> ddl.add("DROP INDEX " + index)); + ddl.add("DROP TABLE " + names.getTableName()); } OperationFuture op = databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java index 7867932cd1ad..654fd946663c 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataDao.java @@ -96,6 +96,41 @@ public boolean tableExists() { } } + /** + * Finds all indexes for the metadata table. + * + * @return a list of index names for the metadata table. + */ + public List findAllTableIndexes() { + String indexesStmt; + if (this.isPostgres()) { + indexesStmt = + "SELECT index_name FROM information_schema.indexes" + + " WHERE table_schema = 'public'" + + " AND table_name = '" + + metadataTableName + + "' AND index_type != 'PRIMARY_KEY'"; + } else { + indexesStmt = + "SELECT index_name FROM information_schema.indexes" + + " WHERE table_schema = ''" + + " AND table_name = '" + + metadataTableName + + "' AND index_type != 'PRIMARY_KEY'"; + } + + List result = new ArrayList<>(); + try (ResultSet queryResultSet = + databaseClient + .singleUseReadOnlyTransaction() + .executeQuery(Statement.of(indexesStmt), Options.tag("query=findAllTableIndexes"))) { + while (queryResultSet.next()) { + result.add(queryResultSet.getString("index_name")); + } + } + return result; + } + /** * Fetches the partition metadata row data for the given partition token. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java new file mode 100644 index 000000000000..07d7b80676de --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNames.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao; + +import java.io.Serializable; +import java.util.Objects; +import java.util.UUID; +import javax.annotation.Nullable; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; + +/** + * Configuration for a partition metadata table. It encapsulates the name of the metadata table and + * indexes. + */ +public class PartitionMetadataTableNames implements Serializable { + + private static final long serialVersionUID = 8848098877671834584L; + + /** PostgreSQL max table and index length is 63 bytes. */ + @VisibleForTesting static final int MAX_NAME_LENGTH = 63; + + private static final String PARTITION_METADATA_TABLE_NAME_FORMAT = "Metadata_%s_%s"; + private static final String WATERMARK_INDEX_NAME_FORMAT = "WatermarkIdx_%s_%s"; + private static final String CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT = "CreatedAtIdx_%s_%s"; + + /** + * Generates a unique name for the partition metadata table and its indexes. The table name will + * be in the form of {@code "Metadata__"}. The watermark index will be in the + * form of {@code "WatermarkIdx__}. The createdAt / start timestamp index will + * be in the form of {@code "CreatedAtIdx__}. + * + * @param databaseId The database id where the table will be created + * @return the unique generated names of the partition metadata ddl + */ + public static PartitionMetadataTableNames generateRandom(String databaseId) { + UUID uuid = UUID.randomUUID(); + + String table = generateName(PARTITION_METADATA_TABLE_NAME_FORMAT, databaseId, uuid); + String watermarkIndex = generateName(WATERMARK_INDEX_NAME_FORMAT, databaseId, uuid); + String createdAtIndex = + generateName(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, uuid); + + return new PartitionMetadataTableNames(table, watermarkIndex, createdAtIndex); + } + + /** + * Encapsulates a selected table name. Index names are generated, but will only be used if the + * given table does not exist. The watermark index will be in the form of {@code + * "WatermarkIdx__}. The createdAt / start timestamp index will be in the form + * of {@code "CreatedAtIdx__}. + * + * @param databaseId The database id for the table + * @param table The table name to be used + * @return an instance with the table name and generated index names + */ + public static PartitionMetadataTableNames fromExistingTable(String databaseId, String table) { + UUID uuid = UUID.randomUUID(); + + String watermarkIndex = generateName(WATERMARK_INDEX_NAME_FORMAT, databaseId, uuid); + String createdAtIndex = + generateName(CREATED_AT_START_TIMESTAMP_INDEX_NAME_FORMAT, databaseId, uuid); + return new PartitionMetadataTableNames(table, watermarkIndex, createdAtIndex); + } + + private static String generateName(String template, String databaseId, UUID uuid) { + String name = String.format(template, databaseId, uuid).replaceAll("-", "_"); + if (name.length() > MAX_NAME_LENGTH) { + return name.substring(0, MAX_NAME_LENGTH); + } + return name; + } + + private final String tableName; + private final String watermarkIndexName; + private final String createdAtIndexName; + + public PartitionMetadataTableNames( + String tableName, String watermarkIndexName, String createdAtIndexName) { + this.tableName = tableName; + this.watermarkIndexName = watermarkIndexName; + this.createdAtIndexName = createdAtIndexName; + } + + public String getTableName() { + return tableName; + } + + public String getWatermarkIndexName() { + return watermarkIndexName; + } + + public String getCreatedAtIndexName() { + return createdAtIndexName; + } + + @Override + public boolean equals(@Nullable Object o) { + if (this == o) { + return true; + } + if (!(o instanceof PartitionMetadataTableNames)) { + return false; + } + PartitionMetadataTableNames that = (PartitionMetadataTableNames) o; + return Objects.equals(tableName, that.tableName) + && Objects.equals(watermarkIndexName, that.watermarkIndexName) + && Objects.equals(createdAtIndexName, that.createdAtIndexName); + } + + @Override + public int hashCode() { + return Objects.hash(tableName, watermarkIndexName, createdAtIndexName); + } + + @Override + public String toString() { + return "PartitionMetadataTableNames{" + + "tableName='" + + tableName + + '\'' + + ", watermarkIndexName='" + + watermarkIndexName + + '\'' + + ", createdAtIndexName='" + + createdAtIndexName + + '\'' + + '}'; + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java index a048c885a001..f8aa497292bf 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/CleanUpReadChangeStreamDoFn.java @@ -18,6 +18,7 @@ package org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn; import java.io.Serializable; +import java.util.List; import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory; import org.apache.beam.sdk.transforms.DoFn; @@ -33,6 +34,7 @@ public CleanUpReadChangeStreamDoFn(DaoFactory daoFactory) { @ProcessElement public void processElement(OutputReceiver receiver) { - daoFactory.getPartitionMetadataAdminDao().deletePartitionMetadataTable(); + List indexes = daoFactory.getPartitionMetadataDao().findAllTableIndexes(); + daoFactory.getPartitionMetadataAdminDao().deletePartitionMetadataTable(indexes); } } diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java index 387ffd603b14..ca93f34bf1ba 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dofn/InitializeDoFn.java @@ -64,6 +64,7 @@ public InitializeDoFn( public void processElement(OutputReceiver receiver) { PartitionMetadataDao partitionMetadataDao = daoFactory.getPartitionMetadataDao(); if (!partitionMetadataDao.tableExists()) { + // Creates partition metadata table and associated indexes daoFactory.getPartitionMetadataAdminDao().createPartitionMetadataTable(); createFakeParentPartition(); } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java deleted file mode 100644 index f15fc5307374..000000000000 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/NameGeneratorTest.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.beam.sdk.io.gcp.spanner.changestreams; - -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; - -import org.junit.Test; - -public class NameGeneratorTest { - private static final int MAXIMUM_POSTGRES_TABLE_NAME_LENGTH = 63; - - @Test - public void testGenerateMetadataTableNameRemovesHyphens() { - final String tableName = - NameGenerator.generatePartitionMetadataTableName("my-database-id-12345"); - assertFalse(tableName.contains("-")); - } - - @Test - public void testGenerateMetadataTableNameIsShorterThan64Characters() { - final String tableName = - NameGenerator.generatePartitionMetadataTableName("my-database-id1-maximum-length"); - assertTrue(tableName.length() <= MAXIMUM_POSTGRES_TABLE_NAME_LENGTH); - } -} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java index 3752c2fb3afc..02b9d111583b 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataAdminDaoTest.java @@ -33,7 +33,9 @@ import com.google.cloud.spanner.ErrorCode; import com.google.cloud.spanner.SpannerException; import com.google.spanner.admin.database.v1.UpdateDatabaseDdlMetadata; +import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Iterator; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -58,6 +60,8 @@ public class PartitionMetadataAdminDaoTest { private static final String DATABASE_ID = "SPANNER_DATABASE"; private static final String TABLE_NAME = "SPANNER_TABLE"; + private static final String WATERMARK_INDEX_NAME = "WATERMARK_INDEX"; + private static final String CREATED_AT_INDEX_NAME = "CREATED_AT_INDEX"; private static final int TIMEOUT_MINUTES = 10; @@ -68,12 +72,14 @@ public class PartitionMetadataAdminDaoTest { @Before public void setUp() { databaseAdminClient = mock(DatabaseAdminClient.class); + PartitionMetadataTableNames names = + new PartitionMetadataTableNames(TABLE_NAME, WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME); partitionMetadataAdminDao = new PartitionMetadataAdminDao( - databaseAdminClient, INSTANCE_ID, DATABASE_ID, TABLE_NAME, Dialect.GOOGLE_STANDARD_SQL); + databaseAdminClient, INSTANCE_ID, DATABASE_ID, names, Dialect.GOOGLE_STANDARD_SQL); partitionMetadataAdminDaoPostgres = new PartitionMetadataAdminDao( - databaseAdminClient, INSTANCE_ID, DATABASE_ID, TABLE_NAME, Dialect.POSTGRESQL); + databaseAdminClient, INSTANCE_ID, DATABASE_ID, names, Dialect.POSTGRESQL); op = (OperationFuture) mock(OperationFuture.class); statements = ArgumentCaptor.forClass(Iterable.class); when(databaseAdminClient.updateDatabaseDdl( @@ -89,9 +95,9 @@ public void testCreatePartitionMetadataTable() throws Exception { .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull()); assertEquals(3, ((Collection) statements.getValue()).size()); Iterator it = statements.getValue().iterator(); - assertTrue(it.next().contains("CREATE TABLE")); - assertTrue(it.next().contains("CREATE INDEX")); - assertTrue(it.next().contains("CREATE INDEX")); + assertTrue(it.next().contains("CREATE TABLE IF NOT EXISTS")); + assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS")); + assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS")); } @Test @@ -102,9 +108,9 @@ public void testCreatePartitionMetadataTablePostgres() throws Exception { .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull()); assertEquals(3, ((Collection) statements.getValue()).size()); Iterator it = statements.getValue().iterator(); - assertTrue(it.next().contains("CREATE TABLE \"")); - assertTrue(it.next().contains("CREATE INDEX \"")); - assertTrue(it.next().contains("CREATE INDEX \"")); + assertTrue(it.next().contains("CREATE TABLE IF NOT EXISTS \"")); + assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS \"")); + assertTrue(it.next().contains("CREATE INDEX IF NOT EXISTS \"")); } @Test @@ -133,7 +139,8 @@ public void testCreatePartitionMetadataTableWithInterruptedException() throws Ex @Test public void testDeletePartitionMetadataTable() throws Exception { when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null); - partitionMetadataAdminDao.deletePartitionMetadataTable(); + partitionMetadataAdminDao.deletePartitionMetadataTable( + Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME)); verify(databaseAdminClient, times(1)) .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull()); assertEquals(3, ((Collection) statements.getValue()).size()); @@ -143,10 +150,22 @@ public void testDeletePartitionMetadataTable() throws Exception { assertTrue(it.next().contains("DROP TABLE")); } + @Test + public void testDeletePartitionMetadataTableWithNoIndexes() throws Exception { + when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null); + partitionMetadataAdminDao.deletePartitionMetadataTable(Collections.emptyList()); + verify(databaseAdminClient, times(1)) + .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull()); + assertEquals(1, ((Collection) statements.getValue()).size()); + Iterator it = statements.getValue().iterator(); + assertTrue(it.next().contains("DROP TABLE")); + } + @Test public void testDeletePartitionMetadataTablePostgres() throws Exception { when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null); - partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable(); + partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable( + Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME)); verify(databaseAdminClient, times(1)) .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull()); assertEquals(3, ((Collection) statements.getValue()).size()); @@ -156,11 +175,23 @@ public void testDeletePartitionMetadataTablePostgres() throws Exception { assertTrue(it.next().contains("DROP TABLE \"")); } + @Test + public void testDeletePartitionMetadataTablePostgresWithNoIndexes() throws Exception { + when(op.get(TIMEOUT_MINUTES, TimeUnit.MINUTES)).thenReturn(null); + partitionMetadataAdminDaoPostgres.deletePartitionMetadataTable(Collections.emptyList()); + verify(databaseAdminClient, times(1)) + .updateDatabaseDdl(eq(INSTANCE_ID), eq(DATABASE_ID), statements.capture(), isNull()); + assertEquals(1, ((Collection) statements.getValue()).size()); + Iterator it = statements.getValue().iterator(); + assertTrue(it.next().contains("DROP TABLE \"")); + } + @Test public void testDeletePartitionMetadataTableWithTimeoutException() throws Exception { when(op.get(10, TimeUnit.MINUTES)).thenThrow(new TimeoutException(TIMED_OUT)); try { - partitionMetadataAdminDao.deletePartitionMetadataTable(); + partitionMetadataAdminDao.deletePartitionMetadataTable( + Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME)); fail(); } catch (SpannerException e) { assertTrue(e.getMessage().contains(TIMED_OUT)); @@ -171,7 +202,8 @@ public void testDeletePartitionMetadataTableWithTimeoutException() throws Except public void testDeletePartitionMetadataTableWithInterruptedException() throws Exception { when(op.get(10, TimeUnit.MINUTES)).thenThrow(new InterruptedException(INTERRUPTED)); try { - partitionMetadataAdminDao.deletePartitionMetadataTable(); + partitionMetadataAdminDao.deletePartitionMetadataTable( + Arrays.asList(WATERMARK_INDEX_NAME, CREATED_AT_INDEX_NAME)); fail(); } catch (SpannerException e) { assertEquals(ErrorCode.CANCELLED, e.getErrorCode()); diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java new file mode 100644 index 000000000000..2aae5b26a2cb --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/changestreams/dao/PartitionMetadataTableNamesTest.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.spanner.changestreams.dao; + +import static org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames.MAX_NAME_LENGTH; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotEquals; +import static org.junit.Assert.assertTrue; + +import org.junit.Test; + +public class PartitionMetadataTableNamesTest { + @Test + public void testGeneratePartitionMetadataNamesRemovesHyphens() { + String databaseId = "my-database-id-12345"; + + PartitionMetadataTableNames names1 = PartitionMetadataTableNames.generateRandom(databaseId); + assertFalse(names1.getTableName().contains("-")); + assertFalse(names1.getWatermarkIndexName().contains("-")); + assertFalse(names1.getCreatedAtIndexName().contains("-")); + + PartitionMetadataTableNames names2 = PartitionMetadataTableNames.generateRandom(databaseId); + assertNotEquals(names1.getTableName(), names2.getTableName()); + assertNotEquals(names1.getWatermarkIndexName(), names2.getWatermarkIndexName()); + assertNotEquals(names1.getCreatedAtIndexName(), names2.getCreatedAtIndexName()); + } + + @Test + public void testGeneratePartitionMetadataNamesIsShorterThan64Characters() { + PartitionMetadataTableNames names = + PartitionMetadataTableNames.generateRandom( + "my-database-id-larger-than-maximum-length-1234567890-1234567890-1234567890"); + assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH); + assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH); + assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH); + + names = PartitionMetadataTableNames.generateRandom("d"); + assertTrue(names.getTableName().length() <= MAX_NAME_LENGTH); + assertTrue(names.getWatermarkIndexName().length() <= MAX_NAME_LENGTH); + assertTrue(names.getCreatedAtIndexName().length() <= MAX_NAME_LENGTH); + } + + @Test + public void testPartitionMetadataNamesFromExistingTable() { + PartitionMetadataTableNames names1 = + PartitionMetadataTableNames.fromExistingTable("databaseid", "mytable"); + assertEquals("mytable", names1.getTableName()); + assertFalse(names1.getWatermarkIndexName().contains("-")); + assertFalse(names1.getCreatedAtIndexName().contains("-")); + + PartitionMetadataTableNames names2 = + PartitionMetadataTableNames.fromExistingTable("databaseid", "mytable"); + assertEquals("mytable", names2.getTableName()); + assertNotEquals(names1.getWatermarkIndexName(), names2.getWatermarkIndexName()); + assertNotEquals(names1.getCreatedAtIndexName(), names2.getCreatedAtIndexName()); + } +}