Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: add topic existing validation #32465

Merged
merged 10 commits into from
Sep 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@

* Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528))

## I/Os

* PubsubIO will validate that the Pub/Sub topic exists before running the Read/Write pipeline (Java) ([#32465](https://github.com/apache/beam/pull/32465))

## New Features / Improvements

* Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -507,6 +507,9 @@ public abstract void modifyAckDeadline(
/** Return a list of topics for {@code project}. */
public abstract List<TopicPath> listTopics(ProjectPath project) throws IOException;

/** Return {@literal true} if {@code topic} exists. */
public abstract boolean isTopicExists(TopicPath topic) throws IOException;

/** Create {@code subscription} to {@code topic}. */
public abstract void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import io.grpc.Channel;
import io.grpc.ClientInterceptors;
import io.grpc.ManagedChannel;
import io.grpc.StatusRuntimeException;
import io.grpc.auth.ClientAuthInterceptor;
import io.grpc.netty.GrpcSslContexts;
import io.grpc.netty.NegotiationType;
Expand Down Expand Up @@ -372,6 +373,21 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
GetTopicRequest request = GetTopicRequest.newBuilder().setTopic(topic.getPath()).build();
try {
publisherStub().getTopic(request);
return true;
} catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == io.grpc.Status.Code.NOT_FOUND) {
return false;
}

throw e;
Copy link
Contributor

@Abacn Abacn Sep 26, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a breaking change. Pipeline construction environment may not have access to the PubSub. Here it should either be fail safe or not enabled by default.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@Abacn
Oh, I missed that point. How about disabled as default? Checking topic existence is needed.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How about disabled as default?

sounds good

}
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.metrics.Lineage;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.options.ValueProvider.NestedValueProvider;
import org.apache.beam.sdk.options.ValueProvider.StaticValueProvider;
Expand Down Expand Up @@ -860,6 +861,8 @@ public abstract static class Read<T> extends PTransform<PBegin, PCollection<T>>

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract boolean getValidate();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn) {
Expand All @@ -871,6 +874,7 @@ static <T> Builder<T> newBuilder(SerializableFunction<PubsubMessage, T> parseFn)
builder.setNeedsOrderingKey(false);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setValidate(true);
proost marked this conversation as resolved.
Show resolved Hide resolved
return builder;
}

Expand Down Expand Up @@ -918,6 +922,8 @@ abstract static class Builder<T> {
abstract Builder<T> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract Builder<T> setValidate(boolean validation);

abstract Read<T> build();
}

Expand Down Expand Up @@ -1097,6 +1103,11 @@ public Read<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandler
.build();
}

/** Disable validation of the existence of the topic. */
public Read<T> withoutValidation() {
return toBuilder().setValidate(false).build();
}

@VisibleForTesting
/**
* Set's the internal Clock.
Expand Down Expand Up @@ -1262,6 +1273,35 @@ public T apply(PubsubMessage input) {
return read.setCoder(getCoder());
}

@Override
public void validate(PipelineOptions options) {
if (!getValidate()) {
return;
}

PubsubOptions psOptions = options.as(PubsubOptions.class);

// Validate the existence of the topic.
if (getTopicProvider() != null) {
PubsubTopic topic = getTopicProvider().get();
boolean topicExists = true;
try (PubsubClient pubsubClient =
getPubsubClientFactory()
.newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
topicExists =
pubsubClient.isTopicExists(
PubsubClient.topicPathFromName(topic.project, topic.topic));
} catch (Exception e) {
throw new RuntimeException(e);
}

if (!topicExists) {
throw new IllegalArgumentException(
String.format("Pubsub topic '%s' does not exist.", topic));
}
}
}

@Override
public void populateDisplayData(DisplayData.Builder builder) {
super.populateDisplayData(builder);
Expand Down Expand Up @@ -1341,6 +1381,8 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();

abstract boolean getValidate();

abstract Builder<T> toBuilder();

static <T> Builder<T> newBuilder(
Expand All @@ -1350,6 +1392,7 @@ static <T> Builder<T> newBuilder(
builder.setFormatFn(formatFn);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setValidate(true);
proost marked this conversation as resolved.
Show resolved Hide resolved
return builder;
}

Expand Down Expand Up @@ -1386,6 +1429,8 @@ abstract Builder<T> setFormatFn(
abstract Builder<T> setBadRecordErrorHandler(
ErrorHandler<BadRecord, ?> badRecordErrorHandler);

abstract Builder<T> setValidate(boolean validation);

abstract Write<T> build();
}

Expand All @@ -1396,11 +1441,14 @@ abstract Builder<T> setBadRecordErrorHandler(
* {@code topic} string.
*/
public Write<T> to(String topic) {
return to(StaticValueProvider.of(topic));
ValueProvider<String> topicProvider = StaticValueProvider.of(topic);
validateTopic(topicProvider);
return to(topicProvider);
}

/** Like {@code topic()} but with a {@link ValueProvider}. */
public Write<T> to(ValueProvider<String> topic) {
validateTopic(topic);
return toBuilder()
.setTopicProvider(NestedValueProvider.of(topic, PubsubTopic::fromPath))
.setTopicFunction(null)
Expand All @@ -1421,6 +1469,13 @@ public Write<T> to(SerializableFunction<ValueInSingleWindow<T>, String> topicFun
.build();
}

/** Handles validation of {@code topic}. */
private static void validateTopic(ValueProvider<String> topic) {
if (topic.isAccessible()) {
PubsubTopic.fromPath(topic.get());
}
}

/**
* The default client to write to Pub/Sub is the {@link PubsubJsonClient}, created by the {@link
* PubsubJsonClient.PubsubJsonClientFactory}. This function allows to change the Pub/Sub client
Expand Down Expand Up @@ -1497,6 +1552,14 @@ public Write<T> withErrorHandler(ErrorHandler<BadRecord, ?> badRecordErrorHandle
.build();
}

/**
* Disable validation of the existence of the topic. Validation of the topic works only if the
* topic is set statically and not dynamically.
*/
public Write<T> withoutValidation() {
return toBuilder().setValidate(false).build();
}

@Override
public PDone expand(PCollection<T> input) {
if (getTopicProvider() == null && !getDynamicDestinations()) {
Expand Down Expand Up @@ -1573,6 +1636,35 @@ public void populateDisplayData(DisplayData.Builder builder) {
builder, getTimestampAttribute(), getIdAttribute(), getTopicProvider());
}

@Override
public void validate(PipelineOptions options) {
if (!getValidate()) {
return;
}

PubsubOptions psOptions = options.as(PubsubOptions.class);

// Validate the existence of the topic.
if (getTopicProvider() != null) {
PubsubTopic topic = getTopicProvider().get();
boolean topicExists = true;
try (PubsubClient pubsubClient =
getPubsubClientFactory()
.newClient(getTimestampAttribute(), getIdAttribute(), psOptions)) {
topicExists =
pubsubClient.isTopicExists(
PubsubClient.topicPathFromName(topic.project, topic.topic));
} catch (Exception e) {
throw new RuntimeException(e);
}

if (!topicExists) {
throw new IllegalArgumentException(
String.format("Pubsub topic '%s' does not exist.", topic));
}
}
}

/**
* Writer to Pubsub which batches messages from bounded collections.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions.checkState;

import com.google.api.client.googleapis.json.GoogleJsonResponseException;
import com.google.api.client.http.HttpRequestInitializer;
import com.google.api.services.pubsub.Pubsub;
import com.google.api.services.pubsub.Pubsub.Projects.Subscriptions;
Expand Down Expand Up @@ -310,6 +311,19 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
return topics;
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
try {
pubsub.projects().topics().get(topic.getPath()).execute();
return true;
} catch (GoogleJsonResponseException e) {
if (e.getStatusCode() == 404) {
return false;
}
throw e;
}
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -605,6 +605,12 @@ public List<TopicPath> listTopics(ProjectPath project) throws IOException {
throw new UnsupportedOperationException();
}

@Override
public boolean isTopicExists(TopicPath topic) throws IOException {
// Always return true for testing purposes.
return true;
}

@Override
public void createSubscription(
TopicPath topic, SubscriptionPath subscription, int ackDeadlineSeconds) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import com.google.pubsub.v1.Topic;
import io.grpc.ManagedChannel;
import io.grpc.Server;
import io.grpc.Status;
import io.grpc.StatusRuntimeException;
import io.grpc.inprocess.InProcessChannelBuilder;
import io.grpc.inprocess.InProcessServerBuilder;
Expand Down Expand Up @@ -432,4 +433,43 @@ public void getSchema(GetSchemaRequest request, StreamObserver<Schema> responseO
server.shutdownNow();
}
}

@Test
public void isTopicExists() throws IOException {
initializeClient(null, null);
TopicPath topicDoesNotExist =
PubsubClient.topicPathFromPath("projects/testProject/topics/dontexist");
TopicPath topicExists = PubsubClient.topicPathFromPath("projects/testProject/topics/exist");

PublisherImplBase publisherImplBase =
new PublisherImplBase() {
@Override
public void getTopic(GetTopicRequest request, StreamObserver<Topic> responseObserver) {
String topicPath = request.getTopic();
if (topicPath.equals(topicDoesNotExist.getPath())) {
responseObserver.onError(
new StatusRuntimeException(Status.fromCode(Status.Code.NOT_FOUND)));
}
if (topicPath.equals(topicExists.getPath())) {
responseObserver.onNext(
Topic.newBuilder()
.setName(topicPath)
.setSchemaSettings(
SchemaSettings.newBuilder().setSchema(SCHEMA.getPath()).build())
.build());
responseObserver.onCompleted();
}
}
};
Server server =
InProcessServerBuilder.forName(channelName).addService(publisherImplBase).build().start();
try {
assertEquals(false, client.isTopicExists(topicDoesNotExist));

assertEquals(true, client.isTopicExists(topicExists));

} finally {
server.shutdownNow();
}
}
}
Loading
Loading