Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support writing to Pubsub with ordering key; Add PubsubMessage SchemaCoder #31608

Open
wants to merge 29 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
7306870
support writing pubsub messages with ordering key
ahmedabu98 Jun 15, 2024
c64ae31
Merge branch 'master' of https://github.com/ahmedabu98/beam into pubs…
ahmedabu98 Sep 10, 2024
4bcd3b4
Add ordering key size validation to validatePubsubMessageSize
sjvanrossum Jun 18, 2024
9627cbe
Refactor writeMessagesWithOrderingKey into withOrderingKey
sjvanrossum Jun 17, 2024
ddd916f
Route to bad records if key is defined, but would be dropped silently
sjvanrossum Jun 19, 2024
4791fca
Add publishBatchWithOrderingKey to PubsubUnboundedSink
sjvanrossum Jun 19, 2024
73b07c1
Abort override if PubsubUnboundedSink set publishBatchWithOrderingKey
sjvanrossum Jun 19, 2024
986c2a5
Add support for ordering keys in PubsubBoundedWriter
sjvanrossum Jun 19, 2024
f5f8b57
Add support for ordering keys in PubsubUnboundedSink
sjvanrossum Jun 19, 2024
21e8e8e
Remove nullable ordering keys, null and empty are equivalent
sjvanrossum Jul 10, 2024
42bbb77
Construct OutgoingMessage with Beam PubsubMessage to reduce repetition
sjvanrossum Jul 10, 2024
394d135
Improve readability of PubsubUnboundedSink batch assignment
sjvanrossum Jul 10, 2024
1043961
Add size validation TODOs
sjvanrossum Jul 10, 2024
cd727c2
Replace auto-sharding sink comment with FR link, move to relevant place
sjvanrossum Jul 10, 2024
20e7bb9
Add links to Pub/Sub documentation
sjvanrossum Jul 10, 2024
5911f63
Refine comment about lack of ordering key support in Dataflow's sink
sjvanrossum Jul 10, 2024
ad397aa
Add TODO to remove ordering key check once all sinks support this
sjvanrossum Jul 10, 2024
53134d6
Add missing return statement
sjvanrossum Sep 10, 2024
402ec94
Remove duplicated statements
sjvanrossum Sep 10, 2024
a97f64c
Apply Spotless
sjvanrossum Sep 10, 2024
4513db4
Add notable changes
sjvanrossum Sep 10, 2024
dd8af6d
Merge pull request #427 from sjvanrossum/pr31608
ahmedabu98 Sep 11, 2024
a60d689
address comments
ahmedabu98 Sep 13, 2024
5bac762
allow messages with ordering keys even when the sink isn't configured…
ahmedabu98 Sep 15, 2024
53d47a7
spotless
ahmedabu98 Sep 16, 2024
cb9e7fb
spotless
ahmedabu98 Sep 16, 2024
bbe25ca
add warning log when ordering key is not configured
ahmedabu98 Sep 16, 2024
fa80a24
address comments
ahmedabu98 Sep 27, 2024
1bf4ae2
Merge branch 'master' of https://github.com/ahmedabu98/beam into pubs…
ahmedabu98 Sep 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,20 @@ public static Write<PubsubMessage> writeMessages() {
.build();
}

/**
* Returns A {@link PTransform} that writes {@link PubsubMessage}s, along with the {@link
* PubsubMessage#getMessageId() messageId} and {@link PubsubMessage#getOrderingKey()}, to a Google
* Cloud Pub/Sub stream.
*/
public static Write<PubsubMessage> writeMessagesWithOrderingKey() {
return Write.newBuilder()
.setTopicProvider(null)
.setTopicFunction(null)
.setDynamicDestinations(false)
.setNeedsOrderingKey(true)
.build();
}

ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved
/**
* Enables dynamic destination topics. The {@link PubsubMessage} elements are each expected to
* contain a destination topic, which can be set using {@link PubsubMessage#withTopic}. If {@link
Expand Down Expand Up @@ -1288,6 +1302,8 @@ public abstract static class Write<T> extends PTransform<PCollection<T>, PDone>

abstract @Nullable String getPubsubRootUrl();

abstract boolean getNeedsOrderingKey();

abstract BadRecordRouter getBadRecordRouter();

abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
Expand All @@ -1301,6 +1317,7 @@ static <T> Builder<T> newBuilder(
builder.setFormatFn(formatFn);
builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER);
builder.setBadRecordErrorHandler(new DefaultErrorHandler<>());
builder.setNeedsOrderingKey(false);
return builder;
}

Expand Down Expand Up @@ -1332,6 +1349,8 @@ abstract Builder<T> setFormatFn(

abstract Builder<T> setPubsubRootUrl(String pubsubRootUrl);

abstract Builder<T> setNeedsOrderingKey(boolean needsOrderingKey);
ahmedabu98 marked this conversation as resolved.
Show resolved Hide resolved

abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);

abstract Builder<T> setBadRecordErrorHandler(
Expand Down Expand Up @@ -1487,8 +1506,12 @@ public PDone expand(PCollection<T> input) {
pubsubMessageTuple
.get(BAD_RECORD_TAG)
.setCoder(BadRecord.getCoder(input.getPipeline())));
PCollection<PubsubMessage> pubsubMessages =
pubsubMessageTuple.get(pubsubMessageTupleTag).setCoder(new PubsubMessageWithTopicCoder());
PCollection<PubsubMessage> pubsubMessages = pubsubMessageTuple.get(pubsubMessageTupleTag);
if (getNeedsOrderingKey()) {
pubsubMessages.setCoder(PubsubMessageSchemaCoder.getSchemaCoder());
} else {
pubsubMessages.setCoder(new PubsubMessageWithTopicCoder());
}
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This fork is required to not break update compatibility

switch (input.isBounded()) {
case BOUNDED:
pubsubMessages.apply(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import java.util.HashMap;
import java.util.Map;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.SchemaCoder;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;

/**
* Provides a {@link SchemaCoder} for {@link PubsubMessage}, including the topic and all fields of a
* PubSub message from server.
*
* <p>{@link SchemaCoder} is used so that fields can be added in the future without breaking update
* compatibility.
*/
public class PubsubMessageSchemaCoder {
private static final Schema PUBSUB_MESSAGE_SCHEMA =
Schema.builder()
.addByteArrayField("payload")
.addNullableStringField("topic")
.addNullableMapField("attributes", Schema.FieldType.STRING, Schema.FieldType.STRING)
.addNullableStringField("message_id")
.addNullableStringField("ordering_key")
.build();

private static final SerializableFunction<PubsubMessage, Row> TO_ROW =
(PubsubMessage message) -> {
Map<String, Object> fieldValues = new HashMap<>();
fieldValues.put("payload", message.getPayload());

String topic = message.getTopic();
if (topic != null) {
fieldValues.put("topic", topic);
}
Map<String, String> attributeMap = message.getAttributeMap();
if (attributeMap != null) {
fieldValues.put("attributes", attributeMap);
}
String messageId = message.getMessageId();
if (messageId != null) {
fieldValues.put("message_id", messageId);
}
String orderingKey = message.getOrderingKey();
if (orderingKey != null) {
fieldValues.put("ordering_key", orderingKey);
}
return Row.withSchema(PUBSUB_MESSAGE_SCHEMA).withFieldValues(fieldValues).build();
};

private static final SerializableFunction<Row, PubsubMessage> FROM_ROW =
(Row row) -> {
PubsubMessage message =
new PubsubMessage(
Preconditions.checkNotNull(row.getBytes("payload")),
row.getMap("attributes"),
row.getString("message_id"),
row.getString("ordering_key"));

String topic = row.getString("topic");
if (topic != null) {
message = message.withTopic(topic);
}
return message;
};

public static SchemaCoder<PubsubMessage> getSchemaCoder() {
return SchemaCoder.of(
PUBSUB_MESSAGE_SCHEMA, TypeDescriptor.of(PubsubMessage.class), TO_ROW, FROM_ROW);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.testing.CoderProperties;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.values.TypeDescriptor;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.junit.Test;

public class PubsubMessageSchemaCoderTest {
private static final String DATA = "testData";
private static final String TOPIC = "testTopic";
private static final String MESSAGE_ID = "testMessageId";
private static final Map<String, String> ATTRIBUTES =
new ImmutableMap.Builder<String, String>().put("1", "hello").build();
private static final String ORDERING_KEY = "key123";
private static final Coder<PubsubMessage> TEST_CODER = PubsubMessageSchemaCoder.getSchemaCoder();
private static final PubsubMessage TEST_VALUE =
new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_ID, ORDERING_KEY)
.withTopic(TOPIC);
private static final PubsubMessage TEST_MINIMAL_VALUE =
new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), null);

@Test
public void testValueEncodable() {
SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error");
SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_MINIMAL_VALUE, "error");
}

@Test
public void testCoderDecodeEncodeEqual() throws Exception {
CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE);
CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_MINIMAL_VALUE);
}

@Test
public void testEncodedTypeDescriptor() {
TypeDescriptor<PubsubMessage> typeDescriptor = new TypeDescriptor<PubsubMessage>() {};
assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,17 @@
*/
package org.apache.beam.sdk.io.gcp.pubsub;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
Expand All @@ -46,11 +52,12 @@ public class PubsubWriteIT {
private PubsubClient pubsubClient;

private TopicPath testTopic;
private String project;

@Before
public void setup() throws IOException {
PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class);
String project = options.getProject();
project = options.getProject();
pubsubClient = PubsubGrpcClient.FACTORY.newClient(null, null, options);
testTopic =
PubsubClient.topicPathFromName(project, "pubsub-write-" + Instant.now().getMillis());
Expand Down Expand Up @@ -102,4 +109,55 @@ public void testBoundedWriteMessageWithAttributes() {
.apply(PubsubIO.writeMessages().to(testTopic.getPath()));
pipeline.run();
}

@Test
public void testBoundedWriteMessageWithAttributesAndMessageIdAndOrderingKey() throws IOException {
TopicPath testTopicPath =
PubsubClient.topicPathFromName(
project, "pubsub-write-ordering-key-" + Instant.now().getMillis());
pubsubClient.createTopic(testTopicPath);
SubscriptionPath testSubscriptionPath =
pubsubClient.createRandomSubscription(
PubsubClient.projectPathFromId(project), testTopicPath, 10);

byte[] payload = RandomStringUtils.randomAscii(1_000_000).getBytes(StandardCharsets.UTF_8);
Map<String, String> attributes =
ImmutableMap.<String, String>builder()
.put("id", "1")
.put("description", RandomStringUtils.randomAscii(100))
.build();

PubsubMessage outgoingMessage =
new PubsubMessage(payload, attributes, "test_message", "111222");

pipeline
.apply(Create.of(outgoingMessage).withCoder(PubsubMessageSchemaCoder.getSchemaCoder()))
.apply(PubsubIO.writeMessagesWithOrderingKey().to(testTopicPath.getPath()));
pipeline.run().waitUntilFinish();

List<PubsubClient.IncomingMessage> incomingMessages =
pubsubClient.pull(Instant.now().getMillis(), testSubscriptionPath, 1, true);

// sometimes the first pull comes up short. try 4 pulls to avoid flaky false-negatives
int numPulls = 1;
while (incomingMessages.isEmpty()) {
if (numPulls >= 4) {
throw new RuntimeException(
String.format("Pulled %s times from PubSub but retrieved no elements.", numPulls));
}
incomingMessages =
pubsubClient.pull(Instant.now().getMillis(), testSubscriptionPath, 1, true);
numPulls++;
}
assertEquals(1, incomingMessages.size());

com.google.pubsub.v1.PubsubMessage incomingMessage = incomingMessages.get(0).message();
assertTrue(
Arrays.equals(outgoingMessage.getPayload(), incomingMessage.getData().toByteArray()));
assertEquals(outgoingMessage.getAttributeMap(), incomingMessage.getAttributesMap());
assertEquals(outgoingMessage.getOrderingKey(), incomingMessage.getOrderingKey());

pubsubClient.deleteSubscription(testSubscriptionPath);
pubsubClient.deleteTopic(testTopicPath);
}
}
Loading