From 380ed7b7f746a379a6b9bd902327fcb0ba1b2139 Mon Sep 17 00:00:00 2001 From: Hyeonho Kim Date: Thu, 26 Sep 2024 02:29:20 +0900 Subject: [PATCH] fix: add topic existing validation (#32465) * feat: add topic existing validation * feat: add validation to write * docs: add changes * docs: change docs * refactor: change validate to primitive type * test: change test more clearly * style: follow lint * docs: fix CHANGES * docs: follow changes --- CHANGES.md | 4 + .../beam/sdk/io/gcp/pubsub/PubsubClient.java | 3 + .../sdk/io/gcp/pubsub/PubsubGrpcClient.java | 16 ++ .../beam/sdk/io/gcp/pubsub/PubsubIO.java | 94 +++++++++- .../sdk/io/gcp/pubsub/PubsubJsonClient.java | 14 ++ .../sdk/io/gcp/pubsub/PubsubTestClient.java | 6 + .../io/gcp/pubsub/PubsubGrpcClientTest.java | 40 +++++ .../beam/sdk/io/gcp/pubsub/PubsubIOTest.java | 170 ++++++++++++++++++ .../io/gcp/pubsub/PubsubJsonClientTest.java | 24 +++ 9 files changed, 370 insertions(+), 1 deletion(-) diff --git a/CHANGES.md b/CHANGES.md index c123a8e1a4dc..a0133bd531ca 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -59,6 +59,10 @@ * Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528)) +## I/Os + +* PubsubIO will validate that the Pub/Sub topic exists before running the Read/Write pipeline (Java) ([#32465](https://github.com/apache/beam/pull/32465)) + ## New Features / Improvements * Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)). diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java index 2964a29dbb6b..bd01604643e1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubClient.java @@ -507,6 +507,9 @@ public abstract void modifyAckDeadline( /** Return a list of topics for {@code project}. */ public abstract List listTopics(ProjectPath project) throws IOException; + /** Return {@literal true} if {@code topic} exists. */ + public abstract boolean isTopicExists(TopicPath topic) throws IOException; + /** Create {@code subscription} to {@code topic}. */ public abstract void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException; diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java index 93fdd5524007..0cfb06688108 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClient.java @@ -54,6 +54,7 @@ import io.grpc.Channel; import io.grpc.ClientInterceptors; import io.grpc.ManagedChannel; +import io.grpc.StatusRuntimeException; import io.grpc.auth.ClientAuthInterceptor; import io.grpc.netty.GrpcSslContexts; import io.grpc.netty.NegotiationType; @@ -372,6 +373,21 @@ public List listTopics(ProjectPath project) throws IOException { return topics; } + @Override + public boolean isTopicExists(TopicPath topic) throws IOException { + GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topic.getPath()).build(); + try { + publisherStub().getTopic(request); + return true; + } catch (StatusRuntimeException e) { + if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) { + return false; + } + + throw e; + } + } + @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index f59a68c40551..4d0586aa85af 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -50,6 +50,7 @@ import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.metrics.Lineage; +import org.apache.beam.sdk.options.PipelineOptions; import org.apache.beam.sdk.options.ValueProvider; import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider; import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider; @@ -860,6 +861,8 @@ public abstract static class Read extends PTransform> abstract ErrorHandler getBadRecordErrorHandler(); + abstract boolean getValidate(); + abstract Builder toBuilder(); static Builder newBuilder(SerializableFunction parseFn) { @@ -871,6 +874,7 @@ static Builder newBuilder(SerializableFunction parseFn) builder.setNeedsOrderingKey(false); builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); builder.setBadRecordErrorHandler(new DefaultErrorHandler<>()); + builder.setValidate(true); return builder; } @@ -918,6 +922,8 @@ abstract static class Builder { abstract Builder setBadRecordErrorHandler( ErrorHandler badRecordErrorHandler); + abstract Builder setValidate(boolean validation); + abstract Read build(); } @@ -1097,6 +1103,11 @@ public Read withErrorHandler(ErrorHandler badRecordErrorHandler .build(); } + /** Disable validation of the existence of the topic. */ + public Read withoutValidation() { + return toBuilder().setValidate(false).build(); + } + @VisibleForTesting /** * Set's the internal Clock. @@ -1262,6 +1273,35 @@ public T apply(PubsubMessage input) { return read.setCoder(getCoder()); } + @Override + public void validate(PipelineOptions options) { + if (!getValidate()) { + return; + } + + PubsubOptions psOptions = options.as(PubsubOptions.class); + + // Validate the existence of the topic. + if (getTopicProvider() != null) { + PubsubTopic topic = getTopicProvider().get(); + boolean topicExists = true; + try (PubsubClient pubsubClient = + getPubsubClientFactory() + .newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) { + topicExists = + pubsubClient.isTopicExists( + PubsubClient.topicPathFromName(topic.project, topic.topic)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (!topicExists) { + throw new IllegalArgumentException( + String.format("Pubsub topic '%s' does not exist.", topic)); + } + } + } + @Override public void populateDisplayData(DisplayData.Builder builder) { super.populateDisplayData(builder); @@ -1341,6 +1381,8 @@ public abstract static class Write extends PTransform, PDone> abstract ErrorHandler getBadRecordErrorHandler(); + abstract boolean getValidate(); + abstract Builder toBuilder(); static Builder newBuilder( @@ -1350,6 +1392,7 @@ static Builder newBuilder( builder.setFormatFn(formatFn); builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); builder.setBadRecordErrorHandler(new DefaultErrorHandler<>()); + builder.setValidate(true); return builder; } @@ -1386,6 +1429,8 @@ abstract Builder setFormatFn( abstract Builder setBadRecordErrorHandler( ErrorHandler badRecordErrorHandler); + abstract Builder setValidate(boolean validation); + abstract Write build(); } @@ -1396,11 +1441,14 @@ abstract Builder setBadRecordErrorHandler( * {@code topic} string. */ public Write to(String topic) { - return to(StaticValueProvider.of(topic)); + ValueProvider topicProvider = StaticValueProvider.of(topic); + validateTopic(topicProvider); + return to(topicProvider); } /** Like {@code topic()} but with a {@link ValueProvider}. */ public Write to(ValueProvider topic) { + validateTopic(topic); return toBuilder() .setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath)) .setTopicFunction(null) @@ -1421,6 +1469,13 @@ public Write to(SerializableFunction, String> topicFun .build(); } + /** Handles validation of {@code topic}. */ + private static void validateTopic(ValueProvider topic) { + if (topic.isAccessible()) { + PubsubTopic.fromPath(topic.get()); + } + } + /** * The default client to write to Pub/Sub is the {@link PubsubJsonClient}, created by the {@link * PubsubJsonClient.PubsubJsonClientFactory}. This function allows to change the Pub/Sub client @@ -1497,6 +1552,14 @@ public Write withErrorHandler(ErrorHandler badRecordErrorHandle .build(); } + /** + * Disable validation of the existence of the topic. Validation of the topic works only if the + * topic is set statically and not dynamically. + */ + public Write withoutValidation() { + return toBuilder().setValidate(false).build(); + } + @Override public PDone expand(PCollection input) { if (getTopicProvider() == null && !getDynamicDestinations()) { @@ -1573,6 +1636,35 @@ public void populateDisplayData(DisplayData.Builder builder) { builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider()); } + @Override + public void validate(PipelineOptions options) { + if (!getValidate()) { + return; + } + + PubsubOptions psOptions = options.as(PubsubOptions.class); + + // Validate the existence of the topic. + if (getTopicProvider() != null) { + PubsubTopic topic = getTopicProvider().get(); + boolean topicExists = true; + try (PubsubClient pubsubClient = + getPubsubClientFactory() + .newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) { + topicExists = + pubsubClient.isTopicExists( + PubsubClient.topicPathFromName(topic.project, topic.topic)); + } catch (Exception e) { + throw new RuntimeException(e); + } + + if (!topicExists) { + throw new IllegalArgumentException( + String.format("Pubsub topic '%s' does not exist.", topic)); + } + } + } + /** * Writer to Pubsub which batches messages from bounded collections. * diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index 386febcf005b..0a838da66f69 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -19,6 +19,7 @@ import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; import com.google.api.client.http.HttpRequestInitializer; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions; @@ -310,6 +311,19 @@ public List listTopics(ProjectPath project) throws IOException { return topics; } + @Override + public boolean isTopicExists(TopicPath topic) throws IOException { + try { + pubsub.projects().topics().get(topic.getPath()).execute(); + return true; + } catch (GoogleJsonResponseException e) { + if (e.getStatusCode() == 404) { + return false; + } + throw e; + } + } + @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java index a8109d05ec38..3d5a879fce15 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubTestClient.java @@ -605,6 +605,12 @@ public List listTopics(ProjectPath project) throws IOException { throw new UnsupportedOperationException(); } + @Override + public boolean isTopicExists(TopicPath topic) throws IOException { + // Always return true for testing purposes. + return true; + } + @Override public void createSubscription( TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java index 3724e169c612..6c4625f2e077 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubGrpcClientTest.java @@ -40,6 +40,7 @@ import com.google.pubsub.v1.Topic; import io.grpc.ManagedChannel; import io.grpc.Server; +import io.grpc.Status; import io.grpc.StatusRuntimeException; import io.grpc.inprocess.InProcessChannelBuilder; import io.grpc.inprocess.InProcessServerBuilder; @@ -432,4 +433,43 @@ public void getSchema(GetSchemaRequest request, StreamObserver responseO server.shutdownNow(); } } + + @Test + public void isTopicExists() throws IOException { + initializeClient(null, null); + TopicPath topicDoesNotExist = + PubsubClient.topicPathFromPath("projects/testProject/topics/dontexist"); + TopicPath topicExists = PubsubClient.topicPathFromPath("projects/testProject/topics/exist"); + + PublisherImplBase publisherImplBase = + new PublisherImplBase() { + @Override + public void getTopic(GetTopicRequest request, StreamObserver responseObserver) { + String topicPath = request.getTopic(); + if (topicPath.equals(topicDoesNotExist.getPath())) { + responseObserver.onError( + new StatusRuntimeException(Status.fromCode(Status.Code.NOT_FOUND))); + } + if (topicPath.equals(topicExists.getPath())) { + responseObserver.onNext( + Topic.newBuilder() + .setName(topicPath) + .setSchemaSettings( + SchemaSettings.newBuilder().setSchema(SCHEMA.getPath()).build()) + .build()); + responseObserver.onCompleted(); + } + } + }; + Server server = + InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start(); + try { + assertEquals(false, client.isTopicExists(topicDoesNotExist)); + + assertEquals(true, client.isTopicExists(topicExists)); + + } finally { + server.shutdownNow(); + } + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java index d4effbae40a4..0f4c929619a5 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIOTest.java @@ -83,6 +83,7 @@ import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists; +import org.apache.commons.lang3.RandomStringUtils; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; @@ -97,6 +98,7 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.junit.runners.model.Statement; +import org.mockito.Mockito; /** Tests for PubsubIO Read and Write transforms. */ @RunWith(JUnit4.class) @@ -928,4 +930,172 @@ public void testBigMessageBounded() throws IOException { pipeline.run(); } } + + @Test + public void testReadValidate() throws IOException { + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath existingTopic = PubsubClient.topicPathFromName("test-project", "testTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(existingTopic)).thenReturn(true); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + Read read = + Read.newBuilder() + .setTopicProvider( + StaticValueProvider.of( + PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/testTopic"))) + .setTimestampAttribute("myTimestamp") + .setIdAttribute("myId") + .setPubsubClientFactory(mockFactory) + .setCoder(PubsubMessagePayloadOnlyCoder.of()) + .build(); + + read.validate(options); + } + + @Test + public void testReadValidateTopicIsNotExists() throws Exception { + thrown.expect(IllegalArgumentException.class); + + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + Read read = + Read.newBuilder() + .setTopicProvider( + StaticValueProvider.of( + PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/nonExistingTopic"))) + .setTimestampAttribute("myTimestamp") + .setIdAttribute("myId") + .setPubsubClientFactory(mockFactory) + .setCoder(PubsubMessagePayloadOnlyCoder.of()) + .build(); + + read.validate(options); + } + + @Test + public void testReadWithoutValidation() throws IOException { + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + Read read = + PubsubIO.readMessages() + .fromTopic("projects/test-project/topics/nonExistingTopic") + .withoutValidation(); + + read.validate(options); + } + + @Test + public void testWriteTopicValidationSuccess() throws Exception { + PubsubIO.writeStrings().to("projects/my-project/topics/abc"); + PubsubIO.writeStrings().to("projects/my-project/topics/ABC"); + PubsubIO.writeStrings().to("projects/my-project/topics/AbC-DeF"); + PubsubIO.writeStrings().to("projects/my-project/topics/AbC-1234"); + PubsubIO.writeStrings().to("projects/my-project/topics/AbC-1234-_.~%+-_.~%+-_.~%+-abc"); + PubsubIO.writeStrings() + .to( + new StringBuilder() + .append("projects/my-project/topics/A-really-long-one-") + .append(RandomStringUtils.randomAlphanumeric(100)) + .toString()); + } + + @Test + public void testWriteTopicValidationBadCharacter() throws Exception { + thrown.expect(IllegalArgumentException.class); + PubsubIO.writeStrings().to("projects/my-project/topics/abc-*-abc"); + } + + @Test + public void testWriteValidationTooLong() throws Exception { + thrown.expect(IllegalArgumentException.class); + PubsubIO.writeStrings() + .to( + new StringBuilder() + .append("projects/my-project/topics/A-really-long-one-") + .append(RandomStringUtils.randomAlphanumeric(1000)) + .toString()); + } + + @Test + public void testWriteValidate() throws IOException { + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath existingTopic = PubsubClient.topicPathFromName("test-project", "testTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(existingTopic)).thenReturn(true); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + PubsubIO.Write write = + PubsubIO.Write.newBuilder() + .setTopicProvider( + StaticValueProvider.of( + PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/testTopic"))) + .setTimestampAttribute("myTimestamp") + .setIdAttribute("myId") + .setDynamicDestinations(false) + .setPubsubClientFactory(mockFactory) + .build(); + + write.validate(options); + } + + @Test + public void testWriteValidateTopicIsNotExists() throws Exception { + thrown.expect(IllegalArgumentException.class); + + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + PubsubIO.Write write = + PubsubIO.Write.newBuilder() + .setTopicProvider( + StaticValueProvider.of( + PubsubIO.PubsubTopic.fromPath("projects/test-project/topics/nonExistingTopic"))) + .setTimestampAttribute("myTimestamp") + .setIdAttribute("myId") + .setDynamicDestinations(false) + .setPubsubClientFactory(mockFactory) + .build(); + + write.validate(options); + } + + @Test + public void testWithoutValidation() throws IOException { + PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); + TopicPath nonExistingTopic = PubsubClient.topicPathFromName("test-project", "nonExistingTopic"); + PubsubClient mockClient = Mockito.mock(PubsubClient.class); + Mockito.when(mockClient.isTopicExists(nonExistingTopic)).thenReturn(false); + PubsubClient.PubsubClientFactory mockFactory = + Mockito.mock(PubsubClient.PubsubClientFactory.class); + Mockito.when(mockFactory.newClient("myTimestamp", "myId", options)).thenReturn(mockClient); + + PubsubIO.Write write = + PubsubIO.writeMessages() + .to("projects/test-project/topics/nonExistingTopic") + .withoutValidation(); + + write.validate(options); + } } diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java index 634ad42c937a..5ee32825db1f 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClientTest.java @@ -23,6 +23,10 @@ import static org.junit.Assert.assertThrows; import static org.mockito.Mockito.when; +import com.google.api.client.googleapis.json.GoogleJsonError; +import com.google.api.client.googleapis.json.GoogleJsonResponseException; +import com.google.api.client.http.HttpHeaders; +import com.google.api.client.http.HttpResponseException; import com.google.api.services.pubsub.Pubsub; import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions; import com.google.api.services.pubsub.Pubsub.Projects.Topics; @@ -308,6 +312,26 @@ private static Topic buildTopic(int i) { return topic; } + @Test + public void isTopicExists() throws Exception { + TopicPath topicExists = + PubsubClient.topicPathFromPath("projects/testProject/topics/topicExists"); + TopicPath topicDoesNotExist = + PubsubClient.topicPathFromPath("projects/testProject/topics/topicDoesNotExist"); + HttpResponseException.Builder builder = + new HttpResponseException.Builder(404, "topic is not found", new HttpHeaders()); + GoogleJsonError error = new GoogleJsonError(); + when(mockPubsub.projects().topics().get(topicExists.getPath()).execute()) + .thenReturn(new Topic().setName(topicExists.getName())); + when(mockPubsub.projects().topics().get(topicDoesNotExist.getPath()).execute()) + .thenThrow(new GoogleJsonResponseException(builder, error)); + + client = new PubsubJsonClient(null, null, mockPubsub); + + assertEquals(true, client.isTopicExists(topicExists)); + assertEquals(false, client.isTopicExists(topicDoesNotExist)); + } + @Test public void listSubscriptions() throws Exception { ListSubscriptionsResponse expectedResponse1 = new ListSubscriptionsResponse();