Skip to content

Commit

Permalink
fix: generate random index name for change streams (#32689)
Browse files Browse the repository at this point in the history
Generates index names for change stream partition metadata table using a
random UUID. This prevents issues if the job is being redeployed in an
existing database.
  • Loading branch information
thiagotnunes authored Oct 21, 2024
1 parent 1ba33b8 commit ac87d7b
Show file tree
Hide file tree
Showing 11 changed files with 344 additions and 151 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.DEFAULT_RPC_PRIORITY;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.MAX_INCLUSIVE_END_AT;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.ChangeStreamsConstants.THROUGHPUT_WINDOW_SECONDS;
import static org.apache.beam.sdk.io.gcp.spanner.changestreams.NameGenerator.generatePartitionMetadataTableName;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkArgument;
import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkNotNull;

Expand Down Expand Up @@ -61,6 +60,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import java.util.OptionalInt;
import java.util.Set;
import java.util.concurrent.TimeUnit;
Expand All @@ -77,6 +77,7 @@
import org.apache.beam.sdk.io.gcp.spanner.changestreams.MetadataSpannerConfigFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.action.ActionFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.DaoFactory;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dao.PartitionMetadataTableNames;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.CleanUpReadChangeStreamDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.DetectNewPartitionsDoFn;
import org.apache.beam.sdk.io.gcp.spanner.changestreams.dofn.InitializeDoFn;
Expand Down Expand Up @@ -1772,9 +1773,13 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
+ fullPartitionMetadataDatabaseId
+ " has dialect "
+ metadataDatabaseDialect);
final String partitionMetadataTableName =
MoreObjects.firstNonNull(
getMetadataTable(), generatePartitionMetadataTableName(partitionMetadataDatabaseId));
PartitionMetadataTableNames partitionMetadataTableNames =
Optional.ofNullable(getMetadataTable())
.map(
table ->
PartitionMetadataTableNames.fromExistingTable(
partitionMetadataDatabaseId, table))
.orElse(PartitionMetadataTableNames.generateRandom(partitionMetadataDatabaseId));
final String changeStreamName = getChangeStreamName();
final Timestamp startTimestamp = getInclusiveStartAt();
// Uses (Timestamp.MAX - 1ns) at max for end timestamp, because we add 1ns to transform the
Expand All @@ -1791,7 +1796,7 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
changeStreamSpannerConfig,
changeStreamName,
partitionMetadataSpannerConfig,
partitionMetadataTableName,
partitionMetadataTableNames,
rpcPriority,
input.getPipeline().getOptions().getJobName(),
changeStreamDatabaseDialect,
Expand All @@ -1807,7 +1812,9 @@ && getInclusiveStartAt().toSqlTimestamp().after(getInclusiveEndAt().toSqlTimesta
final PostProcessingMetricsDoFn postProcessingMetricsDoFn =
new PostProcessingMetricsDoFn(metrics);

LOG.info("Partition metadata table that will be used is " + partitionMetadataTableName);
LOG.info(
"Partition metadata table that will be used is "
+ partitionMetadataTableNames.getTableName());

final PCollection<byte[]> impulseOut = input.apply(Impulse.create());
final PCollection<PartitionMetadata> partitionsOut =
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public class DaoFactory implements Serializable {
private final SpannerConfig metadataSpannerConfig;

private final String changeStreamName;
private final String partitionMetadataTableName;
private final PartitionMetadataTableNames partitionMetadataTableNames;
private final RpcPriority rpcPriority;
private final String jobName;
private final Dialect spannerChangeStreamDatabaseDialect;
Expand All @@ -56,15 +56,15 @@ public class DaoFactory implements Serializable {
* @param changeStreamSpannerConfig the configuration for the change streams DAO
* @param changeStreamName the name of the change stream for the change streams DAO
* @param metadataSpannerConfig the metadata tables configuration
* @param partitionMetadataTableName the name of the created partition metadata table
 * @param partitionMetadataTableNames the names of the partition metadata DDL objects (table and indexes)
* @param rpcPriority the priority of the requests made by the DAO queries
* @param jobName the name of the running job
*/
public DaoFactory(
SpannerConfig changeStreamSpannerConfig,
String changeStreamName,
SpannerConfig metadataSpannerConfig,
String partitionMetadataTableName,
PartitionMetadataTableNames partitionMetadataTableNames,
RpcPriority rpcPriority,
String jobName,
Dialect spannerChangeStreamDatabaseDialect,
Expand All @@ -78,7 +78,7 @@ public DaoFactory(
this.changeStreamSpannerConfig = changeStreamSpannerConfig;
this.changeStreamName = changeStreamName;
this.metadataSpannerConfig = metadataSpannerConfig;
this.partitionMetadataTableName = partitionMetadataTableName;
this.partitionMetadataTableNames = partitionMetadataTableNames;
this.rpcPriority = rpcPriority;
this.jobName = jobName;
this.spannerChangeStreamDatabaseDialect = spannerChangeStreamDatabaseDialect;
Expand All @@ -102,7 +102,7 @@ public synchronized PartitionMetadataAdminDao getPartitionMetadataAdminDao() {
databaseAdminClient,
metadataSpannerConfig.getInstanceId().get(),
metadataSpannerConfig.getDatabaseId().get(),
partitionMetadataTableName,
partitionMetadataTableNames,
this.metadataDatabaseDialect);
}
return partitionMetadataAdminDao;
Expand All @@ -120,7 +120,7 @@ public synchronized PartitionMetadataDao getPartitionMetadataDao() {
if (partitionMetadataDaoInstance == null) {
partitionMetadataDaoInstance =
new PartitionMetadataDao(
this.partitionMetadataTableName,
this.partitionMetadataTableNames.getTableName(),
spannerAccessor.getDatabaseClient(),
this.metadataDatabaseDialect);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,13 @@ public class PartitionMetadataAdminDao {
*/
public static final String COLUMN_FINISHED_AT = "FinishedAt";

/** Metadata table index for queries over the watermark column. */
public static final String WATERMARK_INDEX = "WatermarkIndex";

/** Metadata table index for queries over the created at / start timestamp columns. */
public static final String CREATED_AT_START_TIMESTAMP_INDEX = "CreatedAtStartTimestampIndex";

private static final int TIMEOUT_MINUTES = 10;
private static final int TTL_AFTER_PARTITION_FINISHED_DAYS = 1;

private final DatabaseAdminClient databaseAdminClient;
private final String instanceId;
private final String databaseId;
private final String tableName;
private final PartitionMetadataTableNames names;
private final Dialect dialect;

/**
Expand All @@ -101,18 +95,18 @@ public class PartitionMetadataAdminDao {
* table
* @param instanceId the instance where the metadata table will reside
* @param databaseId the database where the metadata table will reside
* @param tableName the name of the metadata table
 * @param names the names of the metadata table DDL objects (table and indexes)
*/
PartitionMetadataAdminDao(
DatabaseAdminClient databaseAdminClient,
String instanceId,
String databaseId,
String tableName,
PartitionMetadataTableNames names,
Dialect dialect) {
this.databaseAdminClient = databaseAdminClient;
this.instanceId = instanceId;
this.databaseId = databaseId;
this.tableName = tableName;
this.names = names;
this.dialect = dialect;
}

Expand All @@ -128,8 +122,8 @@ public void createPartitionMetadataTable() {
if (this.isPostgres()) {
// Quotes need to be added around identifiers to preserve their casing.
ddl.add(
"CREATE TABLE \""
+ tableName
"CREATE TABLE IF NOT EXISTS \""
+ names.getTableName()
+ "\"(\""
+ COLUMN_PARTITION_TOKEN
+ "\" text NOT NULL,\""
Expand Down Expand Up @@ -163,29 +157,29 @@ public void createPartitionMetadataTable() {
+ COLUMN_FINISHED_AT
+ "\"");
ddl.add(
"CREATE INDEX \""
+ WATERMARK_INDEX
"CREATE INDEX IF NOT EXISTS \""
+ names.getWatermarkIndexName()
+ "\" on \""
+ tableName
+ names.getTableName()
+ "\" (\""
+ COLUMN_WATERMARK
+ "\") INCLUDE (\""
+ COLUMN_STATE
+ "\")");
ddl.add(
"CREATE INDEX \""
+ CREATED_AT_START_TIMESTAMP_INDEX
"CREATE INDEX IF NOT EXISTS \""
+ names.getCreatedAtIndexName()
+ "\" ON \""
+ tableName
+ names.getTableName()
+ "\" (\""
+ COLUMN_CREATED_AT
+ "\",\""
+ COLUMN_START_TIMESTAMP
+ "\")");
} else {
ddl.add(
"CREATE TABLE "
+ tableName
"CREATE TABLE IF NOT EXISTS "
+ names.getTableName()
+ " ("
+ COLUMN_PARTITION_TOKEN
+ " STRING(MAX) NOT NULL,"
Expand Down Expand Up @@ -218,20 +212,20 @@ public void createPartitionMetadataTable() {
+ TTL_AFTER_PARTITION_FINISHED_DAYS
+ " DAY))");
ddl.add(
"CREATE INDEX "
+ WATERMARK_INDEX
"CREATE INDEX IF NOT EXISTS "
+ names.getWatermarkIndexName()
+ " on "
+ tableName
+ names.getTableName()
+ " ("
+ COLUMN_WATERMARK
+ ") STORING ("
+ COLUMN_STATE
+ ")");
ddl.add(
"CREATE INDEX "
+ CREATED_AT_START_TIMESTAMP_INDEX
"CREATE INDEX IF NOT EXISTS "
+ names.getCreatedAtIndexName()
+ " ON "
+ tableName
+ names.getTableName()
+ " ("
+ COLUMN_CREATED_AT
+ ","
Expand Down Expand Up @@ -261,16 +255,14 @@ public void createPartitionMetadataTable() {
* Drops the metadata table. This operation should complete in {@link
* PartitionMetadataAdminDao#TIMEOUT_MINUTES} minutes.
*/
public void deletePartitionMetadataTable() {
public void deletePartitionMetadataTable(List<String> indexes) {
List<String> ddl = new ArrayList<>();
if (this.isPostgres()) {
ddl.add("DROP INDEX \"" + CREATED_AT_START_TIMESTAMP_INDEX + "\"");
ddl.add("DROP INDEX \"" + WATERMARK_INDEX + "\"");
ddl.add("DROP TABLE \"" + tableName + "\"");
indexes.forEach(index -> ddl.add("DROP INDEX \"" + index + "\""));
ddl.add("DROP TABLE \"" + names.getTableName() + "\"");
} else {
ddl.add("DROP INDEX " + CREATED_AT_START_TIMESTAMP_INDEX);
ddl.add("DROP INDEX " + WATERMARK_INDEX);
ddl.add("DROP TABLE " + tableName);
indexes.forEach(index -> ddl.add("DROP INDEX " + index));
ddl.add("DROP TABLE " + names.getTableName());
}
OperationFuture<Void, UpdateDatabaseDdlMetadata> op =
databaseAdminClient.updateDatabaseDdl(instanceId, databaseId, ddl, null);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,41 @@ public boolean tableExists() {
}
}

/**
 * Finds all indexes for the metadata table.
 *
 * <p>Queries {@code INFORMATION_SCHEMA.INDEXES} for the configured metadata table, excluding
 * the primary-key pseudo index. The schema filter depends on the database dialect:
 * PostgreSQL-dialect tables live in the {@code public} schema, while GoogleSQL-dialect tables
 * use the empty-string schema.
 *
 * <p>NOTE(review): {@code metadataTableName} is concatenated directly into the SQL text; this
 * assumes the table name comes from trusted configuration — confirm against callers.
 *
 * @return a list of index names for the metadata table.
 */
public List<String> findAllTableIndexes() {
  String indexesStmt;
  if (this.isPostgres()) {
    // PostgreSQL dialect: user tables reside in the "public" schema.
    indexesStmt =
        "SELECT index_name FROM information_schema.indexes"
            + " WHERE table_schema = 'public'"
            + " AND table_name = '"
            + metadataTableName
            + "' AND index_type != 'PRIMARY_KEY'";
  } else {
    // GoogleSQL dialect: the default schema name is the empty string.
    indexesStmt =
        "SELECT index_name FROM information_schema.indexes"
            + " WHERE table_schema = ''"
            + " AND table_name = '"
            + metadataTableName
            + "' AND index_type != 'PRIMARY_KEY'";
  }

  List<String> result = new ArrayList<>();
  // Single-use read-only transaction; the request tag makes this query identifiable in
  // Spanner query statistics. try-with-resources guarantees the ResultSet is closed.
  try (ResultSet queryResultSet =
      databaseClient
          .singleUseReadOnlyTransaction()
          .executeQuery(Statement.of(indexesStmt), Options.tag("query=findAllTableIndexes"))) {
    while (queryResultSet.next()) {
      result.add(queryResultSet.getString("index_name"));
    }
  }
  return result;
}

/**
* Fetches the partition metadata row data for the given partition token.
*
Expand Down
Loading

0 comments on commit ac87d7b

Please sign in to comment.