Skip to content

Commit

Permalink
Merge branch 'main' into bq-to-bt-QueryTempDataset
Browse files Browse the repository at this point in the history
  • Loading branch information
liferoad committed Oct 28, 2024
2 parents a15c28a + 3233cbf commit b005ccc
Show file tree
Hide file tree
Showing 46 changed files with 1,527 additions and 119 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,6 @@
import java.time.Instant;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.function.Supplier;
import javax.annotation.Nullable;
import org.apache.beam.it.common.ResourceManager;
Expand Down Expand Up @@ -310,11 +309,16 @@ public synchronized void executeDdlStatements(List<String> statements)

LOG.info("Executing DDL statements '{}' on database {}.", statements, databaseId);
try {
databaseAdminClient
.updateDatabaseDdl(instanceId, databaseId, statements, /* operationId= */ null)
.get();
// executeDdlStatements can fail for Spanner staging because of failfast.
Failsafe.with(retryOnQuotaException())
.run(
() ->
databaseAdminClient
.updateDatabaseDdl(
instanceId, databaseId, statements, /* operationId= */ null)
.get());
LOG.info("Successfully executed DDL statements '{}' on database {}.", statements, databaseId);
} catch (ExecutionException | InterruptedException | SpannerException e) {
} catch (Exception e) {
throw new SpannerResourceManagerException("Failed to execute statement.", e);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,50 @@ public void testExecuteDdlStatementShouldWorkWhenSpannerDoesntThrowAnyError()
assertThat(actualStatement).containsExactlyElementsIn(ImmutableList.of(statement));
}

@Test
public void testExecuteDdlStatementShouldRetryOnResourceExhaustedError()
throws ExecutionException, InterruptedException {
// Verifies that executeDdlStatement retries a DDL update when the first attempt fails with a
// RESOURCE_EXHAUSTED error, and ultimately succeeds on the retried attempt.
// arrange
prepareCreateInstanceMock();
prepareCreateDatabaseMock();
String statement =
"CREATE TABLE Singers (\n"
+ "  SingerId   INT64 NOT NULL,\n"
+ "  FirstName  STRING(1024),\n"
+ "  LastName   STRING(1024),\n"
+ ") PRIMARY KEY (SingerId)";

// Simulate a transient quota failure: the first updateDatabaseDdl().get() throws a
// RESOURCE_EXHAUSTED error, the second returns normally, so the retry policy should recover.
RuntimeException resourceExhaustedException =
new RuntimeException(
"com.google.cloud.spanner.SpannerException: RESOURCE_EXHAUSTED: io.grpc.StatusRuntimeException: RESOURCE_EXHAUSTED: CPU overload detected");
when(spanner.getDatabaseAdminClient().updateDatabaseDdl(any(), any(), any(), any()).get())
.thenThrow(resourceExhaustedException)
.thenReturn(null);

// act
testManager.executeDdlStatement(statement);

// assert
// verify createInstance, createDatabase, and updateDatabaseDdl were called.
// NOTE(review): the expected invocation counts appear to include the calls made while
// stubbing the deep mock chains above (e.g. updateDatabaseDdl is invoked once inside
// when(...) plus once per real attempt) — confirm against the mock setup helpers.
verify(spanner.getInstanceAdminClient(), times(2)).createInstance(any());
verify(spanner.getDatabaseAdminClient(), times(2)).createDatabase(any(), any());
verify(spanner.getDatabaseAdminClient(), times(3))
.updateDatabaseDdl(
instanceIdCaptor.capture(),
databaseIdCaptor.capture(),
statementCaptor.capture(),
any());

String actualInstanceId = instanceIdCaptor.getValue();
String actualDatabaseId = databaseIdCaptor.getValue();
Iterable<String> actualStatement = statementCaptor.getValue();

// Instance/database ids are generated from TEST_ID plus a date-time-random suffix.
assertThat(actualInstanceId).matches(TEST_ID + "-\\d{8}-\\d{6}-[a-zA-Z0-9]{6}");

assertThat(actualDatabaseId).matches(TEST_ID + "_\\d{8}_\\d{6}_[a-zA-Z0-9]{6}");
assertThat(actualStatement).containsExactlyElementsIn(ImmutableList.of(statement));
}

@Test
public void testWriteSingleRecordShouldWorkWhenSpannerWriteSucceeds()
throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,6 +176,12 @@ public void migrationTestWithAllDatatypeConversionMapping() {
// Assert Conditions
assertThatResult(result).meetsConditions();

// Sleep for cutover time to wait till all CDCs propagate.
// A real world customer also has a small cut over time to reach consistency.
try {
Thread.sleep(CUTOVER_MILLIS);
} catch (InterruptedException e) {
}
assertAllDatatypeColumnsTableCdcContents();
}

Expand Down Expand Up @@ -228,7 +234,12 @@ public void migrationTestWithAllDatatypeDefaultMapping() {

// Assert Conditions
assertThatResult(result).meetsConditions();

// Sleep for cutover time to wait till all CDCs propagate.
// A real world customer also has a small cut over time to reach consistency.
try {
Thread.sleep(CUTOVER_MILLIS);
} catch (InterruptedException e) {
}
assertAllDatatypeColumns2TableCdcContents();
}

Expand Down Expand Up @@ -281,7 +292,12 @@ public void migrationTestWithAllDatatypeTransformation() {

// Assert Conditions
assertThatResult(result).meetsConditions();

// Sleep for cutover time to wait till all CDCs propagate.
// A real world customer also has a small cut over time to reach consistency.
try {
Thread.sleep(CUTOVER_MILLIS);
} catch (InterruptedException e) {
}
assertAllDatatypeTransformationTableCdcContents();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ public void migrationTestWithUpdatesAndDeletes() {
// Assert Conditions
assertThatResult(result).meetsConditions();

// Sleep for cutover time to wait till all CDCs propagate.
// A real world customer also has a small cut over time to reach consistency.
try {
Thread.sleep(CUTOVER_MILLIS);
} catch (InterruptedException e) {
}
// Assert specific rows
assertUsersTableContents();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ public abstract class DataStreamToSpannerITBase extends TemplateTestBase {
// Format of avro file path in GCS - {table}/2023/12/20/06/57/{fileName}
public static final String DATA_STREAM_EVENT_FILES_PATH_FORMAT_IN_GCS = "%s/2023/12/20/06/57/%s";
private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpannerITBase.class);
public static final int CUTOVER_MILLIS = 30 * 1000;

public PubsubResourceManager setUpPubSubResourceManager() throws IOException {
return PubsubResourceManager.builder(testName, PROJECT, credentialsProvider).build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ public void migrationTestWithRenameAndDropColumn() {
// Assert Conditions
assertThatResult(result).meetsConditions();

// Sleep for cutover time to wait till all CDCs propagate.
// A real world customer also has a small cut over time to reach consistency.
try {
Thread.sleep(CUTOVER_MILLIS);
} catch (InterruptedException e) {
}
assertCategoryTableCdcContents();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ public class DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT

private static final String SPANNER_DDL_RESOURCE =
"DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT/spanner-schema.sql";
public static final int CUTOVER_MILLIS = 30 * 1000;

private static HashSet<DataStreamToSpannerShardedMigrationWithMigrationShardIdColumnIT>
testInstances = new HashSet<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,12 @@ public void multiShardMigration() {
.waitForCondition(createConfig(jobInfo1, Duration.ofMinutes(10)), rowsConditionCheck);
assertThatResult(result).meetsConditions();

// Sleep for cutover time to wait till all CDCs propagate.
// A real world customer also has a small cut over time to reach consistency.
try {
Thread.sleep(CUTOVER_MILLIS);
} catch (InterruptedException e) {
}
// Assert specific rows
assertUsersTableContents();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,12 @@ public void multiShardMigration() {
.waitForCondition(createConfig(jobInfo, Duration.ofMinutes(10)), rowsConditionCheck);
assertThatResult(result).meetsConditions();

// Sleep for cutover time to wait till all CDCs propagate.
// A real world customer also has a small cut over time to reach consistency.
try {
Thread.sleep(CUTOVER_MILLIS);
} catch (InterruptedException e) {
}
// Assert specific rows
assertUsersTableContents();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@
import org.slf4j.LoggerFactory;

public final class OptionsToConfigBuilder {

private static final Logger LOG = LoggerFactory.getLogger(OptionsToConfigBuilder.class);
public static final String DEFAULT_POSTGRESQL_NAMESPACE = "public";

public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
SourceDbToSpannerOptions options,
Expand All @@ -49,6 +51,7 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
String dbName = extractDbFromURL(sourceDbURL);
String username = options.getUsername();
String password = options.getPassword();
String namespace = options.getNamespace();

String jdbcDriverClassName = options.getJdbcDriverClassName();
String jdbcDriverJars = options.getJdbcDriverJars();
Expand All @@ -66,6 +69,7 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfigWithDefaults(
username,
password,
dbName,
namespace,
shardId,
jdbcDriverClassName,
jdbcDriverJars,
Expand All @@ -84,16 +88,19 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
String username,
String password,
String dbName,
String namespace,
String shardId,
String jdbcDriverClassName,
String jdbcDriverJars,
long maxConnections,
Integer numPartitions,
Wait.OnSignal<?> waitOn) {
JdbcIOWrapperConfig.Builder builder = builderWithDefaultsFor(sqlDialect);
SourceSchemaReference sourceSchemaReference =
sourceSchemaReferenceFrom(sqlDialect, dbName, namespace);
builder =
builder
.setSourceSchemaReference(SourceSchemaReference.builder().setDbName(dbName).build())
.setSourceSchemaReference(sourceSchemaReference)
.setDbAuth(
LocalCredentialsProvider.builder()
.setUserName(
Expand Down Expand Up @@ -123,8 +130,9 @@ public static JdbcIOWrapperConfig getJdbcIOWrapperConfig(
if (sourceDbURL == null) {
sourceDbURL = "jdbc:postgresql://" + host + ":" + port + "/" + dbName;
}
sourceDbURL = sourceDbURL + "?currentSchema=" + sourceSchemaReference.namespace();
if (StringUtils.isNotBlank(connectionProperties)) {
sourceDbURL = sourceDbURL + "?" + connectionProperties;
sourceDbURL = sourceDbURL + "&" + connectionProperties;
}
break;
}
Expand Down Expand Up @@ -228,5 +236,19 @@ private static JdbcIOWrapperConfig.Builder builderWithDefaultsFor(SQLDialect dia
return builderWithMySqlDefaults();
}

/**
 * Builds a {@link SourceSchemaReference} for the given database, attaching a namespace only for
 * PostgreSQL (MySQL has no namespace concept). A blank namespace falls back to
 * {@link #DEFAULT_POSTGRESQL_NAMESPACE}.
 */
private static SourceSchemaReference sourceSchemaReferenceFrom(
    SQLDialect dialect, String dbName, String namespace) {
  SourceSchemaReference.Builder refBuilder = SourceSchemaReference.builder().setDbName(dbName);
  if (dialect == SQLDialect.POSTGRESQL) {
    // Blank input means the caller did not choose a schema; use the PostgreSQL default.
    refBuilder.setNamespace(
        StringUtils.isBlank(namespace) ? DEFAULT_POSTGRESQL_NAMESPACE : namespace);
  }
  return refBuilder.build();
}

/** Static utility class; private constructor prevents instantiation. */
private OptionsToConfigBuilder() {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -223,4 +223,15 @@ public interface SourceDbToSpannerOptions extends CommonTemplateOptions {
String getTransformationCustomParameters();

void setTransformationCustomParameters(String value);

/**
 * Namespace (schema) to export from. Optional; blank by default. Per the help text, for
 * PostgreSQL a blank value falls back to the 'public' schema.
 */
@TemplateParameter.Text(
    order = 19,
    optional = true,
    description = "Namespace",
    helpText =
        "Namespace to be exported. For PostgreSQL, if no namespace is provided, 'public' will be used")
@Default.String("")
String getNamespace();

void setNamespace(String value);
}
Loading

0 comments on commit b005ccc

Please sign in to comment.