Skip to content

Commit

Permalink
SpannerIO: retry if concurrent transaction is aborted by emulator (#2…
Browse files Browse the repository at this point in the history
  • Loading branch information
kberezin-nshl authored Aug 30, 2023
1 parent c360399 commit 7bc5d1a
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2005,10 +2005,14 @@ static class WriteToSpannerFn extends DoFn<Iterable<MutationGroup>, Void> {
/* Number of times an aborted write to spanner could be retried */
private static final int ABORTED_RETRY_ATTEMPTS = 5;
/* Error string in Aborted exception during schema change */
private final String errString =
private final String schemaChangeErrString =
"Transaction aborted. "
+ "Database schema probably changed during transaction, retry may succeed.";

/* Error string in Aborted exception for concurrent transaction in Spanner Emulator */
private final String emulatorErrorString =
"The emulator only supports one transaction at a time.";

@VisibleForTesting static Sleeper sleeper = Sleeper.DEFAULT;

private final Counter mutationGroupBatchesReceived =
Expand Down Expand Up @@ -2139,7 +2143,9 @@ private void spannerWriteWithRetryIfSchemaChange(List<Mutation> batch) throws Sp
if (retry >= ABORTED_RETRY_ATTEMPTS) {
throw e;
}
if (e.isRetryable() || e.getMessage().contains(errString)) {
if (e.isRetryable()
|| e.getMessage().contains(schemaChangeErrString)
|| e.getMessage().contains(emulatorErrorString)) {
continue;
}
throw e;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -758,12 +758,22 @@ public void deadlineExceededFailsAfterRetries() throws InterruptedException {

@Test
public void retryOnSchemaChangeException() throws InterruptedException {
List<Mutation> mutationList = Arrays.asList(buildUpsertMutation((long) 1));

String errString =
"Transaction aborted. "
+ "Database schema probably changed during transaction, retry may succeed.";
retryOnAbortedExceptionWithMessage(errString);
}

@Test
public void retryOnEmulatorRejectedConcurrentTransaction() throws InterruptedException {
String errString =
"Transaction 199 aborted due to active transaction 167. "
+ "The emulator only supports one transaction at a time.";
retryOnAbortedExceptionWithMessage(errString);
}

public void retryOnAbortedExceptionWithMessage(String errString) throws InterruptedException {
List<Mutation> mutationList = Arrays.asList(buildUpsertMutation((long) 1));
// mock sleeper so that it does not actually sleep.
WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);

Expand Down Expand Up @@ -806,11 +816,22 @@ public void retryOnSchemaChangeException() throws InterruptedException {

@Test
public void retryMaxOnSchemaChangeException() throws InterruptedException {
List<Mutation> mutationList = Arrays.asList(buildUpsertMutation((long) 1));

String errString =
"Transaction aborted. "
+ "Database schema probably changed during transaction, retry may succeed.";
retryMaxOnAbortedExceptionWithMessage(errString);
}

@Test
public void retryMaxOnEmulatorRejectedConcurrentTransaction() throws InterruptedException {
String errString =
"Transaction 199 aborted due to active transaction 167. "
+ "The emulator only supports one transaction at a time.";
retryOnAbortedExceptionWithMessage(errString);
}

public void retryMaxOnAbortedExceptionWithMessage(String errString) throws InterruptedException {
List<Mutation> mutationList = Arrays.asList(buildUpsertMutation((long) 1));

// mock sleeper so that it does not actually sleep.
WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);
Expand Down

0 comments on commit 7bc5d1a

Please sign in to comment.