diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java index 43975683eba1..786fa91f5582 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIO.java @@ -2005,10 +2005,14 @@ static class WriteToSpannerFn extends DoFn, Void> { /* Number of times an aborted write to spanner could be retried */ private static final int ABORTED_RETRY_ATTEMPTS = 5; /* Error string in Aborted exception during schema change */ - private final String errString = + private final String schemaChangeErrString = "Transaction aborted. " + "Database schema probably changed during transaction, retry may succeed."; + /* Error string in Aborted exception for concurrent transaction in Spanner Emulator */ + private final String emulatorErrorString = + "The emulator only supports one transaction at a time."; + @VisibleForTesting static Sleeper sleeper = Sleeper.DEFAULT; private final Counter mutationGroupBatchesReceived = @@ -2139,7 +2143,9 @@ private void spannerWriteWithRetryIfSchemaChange(List batch) throws Sp if (retry >= ABORTED_RETRY_ATTEMPTS) { throw e; } - if (e.isRetryable() || e.getMessage().contains(errString)) { + if (e.isRetryable() + || e.getMessage().contains(schemaChangeErrString) + || e.getMessage().contains(emulatorErrorString)) { continue; } throw e; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java index 2f5d4d7aed87..9cd6008a9466 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/spanner/SpannerIOWriteTest.java @@ -758,12 +758,22 @@ public void deadlineExceededFailsAfterRetries() throws InterruptedException { @Test public void retryOnSchemaChangeException() throws InterruptedException { - List mutationList = Arrays.asList(buildUpsertMutation((long) 1)); - String errString = "Transaction aborted. " + "Database schema probably changed during transaction, retry may succeed."; + retryOnAbortedExceptionWithMessage(errString); + } + @Test + public void retryOnEmulatorRejectedConcurrentTransaction() throws InterruptedException { + String errString = + "Transaction 199 aborted due to active transaction 167. " + + "The emulator only supports one transaction at a time."; + retryOnAbortedExceptionWithMessage(errString); + } + + public void retryOnAbortedExceptionWithMessage(String errString) throws InterruptedException { + List mutationList = Arrays.asList(buildUpsertMutation((long) 1)); // mock sleeper so that it does not actually sleep. WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class); @@ -806,11 +816,22 @@ public void retryOnSchemaChangeException() throws InterruptedException { @Test public void retryMaxOnSchemaChangeException() throws InterruptedException { - List mutationList = Arrays.asList(buildUpsertMutation((long) 1)); - String errString = "Transaction aborted. " + "Database schema probably changed during transaction, retry may succeed."; + retryMaxOnAbortedExceptionWithMessage(errString); + } + + @Test + public void retryMaxOnEmulatorRejectedConcurrentTransaction() throws InterruptedException { + String errString = + "Transaction 199 aborted due to active transaction 167. " + + "The emulator only supports one transaction at a time."; + retryOnAbortedExceptionWithMessage(errString); + } + + public void retryMaxOnAbortedExceptionWithMessage(String errString) throws InterruptedException { + List mutationList = Arrays.asList(buildUpsertMutation((long) 1)); // mock sleeper so that it does not actually sleep. WriteToSpannerFn.sleeper = Mockito.mock(Sleeper.class);