Skip to content

Commit

Permalink
fix: add topic existing validation (#32465)
Browse files Browse the repository at this point in the history
* feat: add topic existing validation

* feat: add validation to write

* docs: add changes

* docs: change docs

* refactor: change validate to primitive type

* test: change test more clearly

* style: follow lint

* docs: fix CHANGES

* docs: follow changes
  • Loading branch information
proost authored Sep 25, 2024
1 parent 01c7caf commit 380ed7b
Show file tree
Hide file tree
Showing 9 changed files with 370 additions and 1 deletion.
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@

* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))

## I/Os

* PubsubIO will validate that the Pub/Sub topic exists before running the Read/Write pipeline (Java) ([#32465](https://github.com/apache/beam/pull/32465))

## New Features / Improvements

* Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,9 @@ public abstract void modifyAckDeadline(
/** Return a list of topics for {@code project}. */
public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;

/** Return {@literal true} if {@code topic} exists. */
public abstract boolean isTopicExists(TopicPath topic) throws IOException;

/** Create {@code subscription} to {@code topic}. */
public abstract void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.ClientAuthInterceptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
Expand Down Expand Up @@ -372,6 +373,21 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topic.getPath()).build();
try {
publisherStub().getTopic(request);
return true;
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) {
return false;
}

throw e;
}
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
Expand Down Expand Up @@ -860,6 +861,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract boolean getValidate();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn) {
Expand All @@ -871,6 +874,7 @@ static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn)
builder.setNeedsOrderingKey(false);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setValidate(true);
return builder;
}

Expand Down Expand Up @@ -918,6 +922,8 @@ abstract static class Builder<T> {
abstract Builder<T> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract Builder<T> setValidate(boolean validation);

abstract Read<T> build();
}

Expand Down Expand Up @@ -1097,6 +1103,11 @@ public Read<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler
.build();
}

/** Disable validation of the existence of the topic. */
public Read<T> withoutValidation() {
return toBuilder().setValidate(false).build();
}

@VisibleForTesting
/**
* Set's the internal Clock.
Expand Down Expand Up @@ -1262,6 +1273,35 @@ public T apply(PubsubMessage input) {
return read.setCoder(getCoder());
}

@Override
public void validate(PipelineOptions options) {
if (!getValidate()) {
return;
}

PubsubOptions psOptions = options.as(PubsubOptions.class);

// Validate the existence of the topic.
if (getTopicProvider() != null) {
PubsubTopic topic = getTopicProvider().get();
boolean topicExists = true;
try (PubsubClient pubsubClient =
getPubsubClientFactory()
.newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
topicExists =
pubsubClient.isTopicExists(
PubsubClient.topicPathFromName(topic.project, topic.topic));
} catch (Exception e) {
throw new RuntimeException(e);
}

if (!topicExists) {
throw new IllegalArgumentException(
String.format("Pubsub topic '%s' does not exist.", topic));
}
}
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
Expand Down Expand Up @@ -1341,6 +1381,8 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract boolean getValidate();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(
Expand All @@ -1350,6 +1392,7 @@ static <T> Builder<T> newBuilder(
builder.setFormatFn(formatFn);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setValidate(true);
return builder;
}

Expand Down Expand Up @@ -1386,6 +1429,8 @@ abstract Builder<T> setFormatFn(
abstract Builder<T> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract Builder<T> setValidate(boolean validation);

abstract Write<T> build();
}

Expand All @@ -1396,11 +1441,14 @@ abstract Builder<T> setBadRecordErrorHandler(
* {@code topic} string.
*/
public Write<T> to(String topic) {
return to(StaticValueProvider.of(topic));
ValueProvider<String> topicProvider = StaticValueProvider.of(topic);
validateTopic(topicProvider);
return to(topicProvider);
}

/** Like {@code topic()} but with a {@link ValueProvider}. */
public Write<T> to(ValueProvider<String> topic) {
validateTopic(topic);
return toBuilder()
.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath))
.setTopicFunction(null)
Expand All @@ -1421,6 +1469,13 @@ public Write<T> to(SerializableFunction<ValueInSingleWindow<T>, String> topicFun
.build();
}

/** Handles validation of {@code topic}. */
private static void validateTopic(ValueProvider<String> topic) {
if (topic.isAccessible()) {
PubsubTopic.fromPath(topic.get());
}
}

/**
* The default client to write to Pub/Sub is the {@link PubsubJsonClient}, created by the {@link
* PubsubJsonClient.PubsubJsonClientFactory}. This function allows to change the Pub/Sub client
Expand Down Expand Up @@ -1497,6 +1552,14 @@ public Write<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandle
.build();
}

/**
* Disable validation of the existence of the topic. Validation of the topic works only if the
* topic is set statically and not dynamically.
*/
public Write<T> withoutValidation() {
return toBuilder().setValidate(false).build();
}

@Override
public PDone expand(PCollection<T> input) {
if (getTopicProvider() == null && !getDynamicDestinations()) {
Expand Down Expand Up @@ -1573,6 +1636,35 @@ public void populateDisplayData(DisplayData.Builder builder) {
builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
}

@Override
public void validate(PipelineOptions options) {
if (!getValidate()) {
return;
}

PubsubOptions psOptions = options.as(PubsubOptions.class);

// Validate the existence of the topic.
if (getTopicProvider() != null) {
PubsubTopic topic = getTopicProvider().get();
boolean topicExists = true;
try (PubsubClient pubsubClient =
getPubsubClientFactory()
.newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
topicExists =
pubsubClient.isTopicExists(
PubsubClient.topicPathFromName(topic.project, topic.topic));
} catch (Exception e) {
throw new RuntimeException(e);
}

if (!topicExists) {
throw new IllegalArgumentException(
String.format("Pubsub topic '%s' does not exist.", topic));
}
}
}

/**
* Writer to Pubsub which batches messages from bounded collections.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions;
Expand Down Expand Up @@ -310,6 +311,19 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
try {
pubsub.projects().topics().get(topic.getPath()).execute();
return true;
} catch (GoogleJsonResponseException e) {
if (e.getStatusCode() == 404) {
return false;
}
throw e;
}
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
// Always return true for testing purposes.
return true;
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.pubsub.v1.Topic;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
Expand Down Expand Up @@ -432,4 +433,43 @@ public void getSchema(GetSchemaRequest request, StreamObserver<Schema> responseO
server.shutdownNow();
}
}

@Test
public void isTopicExists() throws IOException {
initializeClient(null, null);
TopicPath topicDoesNotExist =
PubsubClient.topicPathFromPath("projects/testProject/topics/dontexist");
TopicPath topicExists = PubsubClient.topicPathFromPath("projects/testProject/topics/exist");

PublisherImplBase publisherImplBase =
new PublisherImplBase() {
@Override
public void getTopic(GetTopicRequest request, StreamObserver<Topic> responseObserver) {
String topicPath = request.getTopic();
if (topicPath.equals(topicDoesNotExist.getPath())) {
responseObserver.onError(
new StatusRuntimeException(Status.fromCode(Status.Code.NOT_FOUND)));
}
if (topicPath.equals(topicExists.getPath())) {
responseObserver.onNext(
Topic.newBuilder()
.setName(topicPath)
.setSchemaSettings(
SchemaSettings.newBuilder().setSchema(SCHEMA.getPath()).build())
.build());
responseObserver.onCompleted();
}
}
};
Server server =
InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start();
try {
assertEquals(false, client.isTopicExists(topicDoesNotExist));

assertEquals(true, client.isTopicExists(topicExists));

} finally {
server.shutdownNow();
}
}
}
Loading

0 comments on commit 380ed7b

Please sign in to comment.