diff --git a/CHANGES.md b/CHANGES.md index 7c377648c117..aeecddbf047f 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -64,6 +64,7 @@ * Dataflow worker can install packages from Google Artifact Registry Python repositories (Python) ([#32123](https://github.com/apache/beam/issues/32123)). * Added support for Zstd codec in SerializableAvroCodecFactory (Java) ([#32349](https://github.com/apache/beam/issues/32349)) * Added support for using vLLM in the RunInference transform (Python) ([#32528](https://github.com/apache/beam/issues/32528)) +* Added support for writing to Pubsub with ordering keys (Java) ([#21162](https://github.com/apache/beam/issues/21162)) * Significantly improved performance of Kafka IO reads that enable [commitOffsetsInFinalize](https://beam.apache.org/releases/javadoc/current/org/apache/beam/sdk/io/kafka/KafkaIO.Read.html#commitOffsetsInFinalize--) by removing the data reshuffle from SDF implementation. ([#31682](https://github.com/apache/beam/pull/31682)). ## Breaking Changes diff --git a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java index abe7d0d364d3..953cc9484e9f 100644 --- a/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java +++ b/runners/google-cloud-dataflow-java/src/main/java/org/apache/beam/runners/dataflow/DataflowRunner.java @@ -20,6 +20,7 @@ import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SINK; import static org.apache.beam.sdk.io.gcp.pubsub.PubsubIO.ENABLE_CUSTOM_PUBSUB_SOURCE; +import static org.apache.beam.sdk.options.ExperimentalOptions.hasExperiment; import static org.apache.beam.sdk.util.CoderUtils.encodeToByteArray; import static org.apache.beam.sdk.util.SerializableUtils.serializeToByteArray; import static org.apache.beam.sdk.util.StringUtils.byteArrayToJsonString; @@ -2033,6 +2034,15 @@ private static void translate( PubsubUnboundedSink overriddenTransform, StepTranslationContext stepContext, PCollection input) { + if (overriddenTransform.getPublishBatchWithOrderingKey()) { + throw new UnsupportedOperationException( + String.format( + "The DataflowRunner does not currently support publishing to Pubsub with ordering keys. " + + "%s is required to support publishing with ordering keys. " + + "Set the pipeline option --experiments=%s to use this PTransform. 
" + + "See https://issuetracker.google.com/issues/200955424 for current status.", + PubsubUnboundedSink.class.getSimpleName(), ENABLE_CUSTOM_PUBSUB_SINK)); + } stepContext.addInput(PropertyNames.FORMAT, "pubsub"); if (overriddenTransform.getTopicProvider() != null) { if (overriddenTransform.getTopicProvider().isAccessible()) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java index 521e65b934b9..9cdf6b788e4e 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFn.java @@ -29,19 +29,25 @@ import org.apache.beam.sdk.transforms.windowing.PaneInfo; import org.apache.beam.sdk.values.TupleTag; import org.apache.beam.sdk.values.ValueInSingleWindow; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PreparePubsubWriteDoFn extends DoFn { + private static final Logger LOG = LoggerFactory.getLogger(PreparePubsubWriteDoFn.class); // See https://cloud.google.com/pubsub/quotas#resource_limits. private static final int PUBSUB_MESSAGE_DATA_MAX_BYTES = 10 << 20; private static final int PUBSUB_MESSAGE_MAX_ATTRIBUTES = 100; private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES = 256; private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024; + private static final int ORDERING_KEY_MAX_BYTE_SIZE = 1024; // The amount of bytes that each attribute entry adds up to the request private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6; + private final boolean usesOrderingKey; private int maxPublishBatchSize; - + private boolean logOrderingKeyUnconfigured = false; private SerializableFunction, PubsubMessage> formatFunction; @Nullable SerializableFunction, PubsubIO.PubsubTopic> topicFunction; /** Last TopicPath that reported Lineage. */ @@ -66,6 +72,20 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS } int totalSize = payloadSize; + @Nullable String orderingKey = message.getOrderingKey(); + if (orderingKey != null) { + int orderingKeySize = orderingKey.getBytes(StandardCharsets.UTF_8).length; + if (orderingKeySize > ORDERING_KEY_MAX_BYTE_SIZE) { + throw new SizeLimitExceededException( + "Pubsub message ordering key of length " + + orderingKeySize + + " exceeds maximum of " + + ORDERING_KEY_MAX_BYTE_SIZE + + " bytes. 
See https://cloud.google.com/pubsub/quotas#resource_limits"); + } + totalSize += orderingKeySize; + } + @Nullable Map attributes = message.getAttributeMap(); if (attributes != null) { if (attributes.size() > PUBSUB_MESSAGE_MAX_ATTRIBUTES) { @@ -125,12 +145,14 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS SerializableFunction, PubsubMessage> formatFunction, @Nullable SerializableFunction, PubsubIO.PubsubTopic> topicFunction, + boolean usesOrderingKey, int maxPublishBatchSize, BadRecordRouter badRecordRouter, Coder inputCoder, TupleTag outputTag) { this.formatFunction = formatFunction; this.topicFunction = topicFunction; + this.usesOrderingKey = usesOrderingKey; this.maxPublishBatchSize = maxPublishBatchSize; this.badRecordRouter = badRecordRouter; this.inputCoder = inputCoder; @@ -175,6 +197,14 @@ public void process( .add("pubsub", "topic", PubsubClient.topicPathFromPath(topic).getDataCatalogSegments()); reportedLineage = topic; } + if (!usesOrderingKey + && !Strings.isNullOrEmpty(message.getOrderingKey()) + && !logOrderingKeyUnconfigured) { + LOG.warn( + "Encountered Pubsub message with ordering key but this sink was not configured to " + + "retain ordering keys, so they will be dropped. Please set #withOrderingKeys()."); + logOrderingKeyUnconfigured = true; + } try { validatePubsubMessageSize(message, maxPublishBatchSize); } catch (SizeLimitExceededException e) { diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java index f59a68c40551..5ae47000b979 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java @@ -22,7 +22,6 @@ import com.google.api.client.util.Clock; import com.google.auto.value.AutoValue; -import com.google.protobuf.ByteString; import com.google.protobuf.Descriptors.Descriptor; import com.google.protobuf.DynamicMessage; import com.google.protobuf.InvalidProtocolBufferException; @@ -1337,6 +1336,8 @@ public abstract static class Write extends PTransform, PDone> abstract @Nullable String getPubsubRootUrl(); + abstract boolean getPublishWithOrderingKey(); + abstract BadRecordRouter getBadRecordRouter(); abstract ErrorHandler getBadRecordErrorHandler(); @@ -1350,6 +1351,7 @@ static Builder newBuilder( builder.setFormatFn(formatFn); builder.setBadRecordRouter(BadRecordRouter.THROWING_ROUTER); builder.setBadRecordErrorHandler(new DefaultErrorHandler<>()); + builder.setPublishWithOrderingKey(false); return builder; } @@ -1381,6 +1383,8 @@ abstract Builder setFormatFn( abstract Builder setPubsubRootUrl(String pubsubRootUrl); + abstract Builder setPublishWithOrderingKey(boolean publishWithOrderingKey); + abstract Builder setBadRecordRouter(BadRecordRouter badRecordRouter); abstract Builder setBadRecordErrorHandler( @@ -1454,6 +1458,19 @@ public Write withMaxBatchBytesSize(int maxBatchBytesSize) { return toBuilder().setMaxBatchBytesSize(maxBatchBytesSize).build(); } + /** + * Writes to Pub/Sub with each record's ordering key. A subscription with message ordering + * enabled will receive messages published in the same region with the same ordering key in the + * order in which they were received by the service. Note that the order in which Beam publishes + * records to the service remains unspecified. 
+ * + * @see Pub/Sub documentation on message + * ordering + */ + public Write withOrderingKey() { + return toBuilder().setPublishWithOrderingKey(true).build(); + } + /** * Writes to Pub/Sub and adds each record's timestamp to the published messages in an attribute * with the specified name. The value of the attribute will be a number representing the number @@ -1525,6 +1542,7 @@ public PDone expand(PCollection input) { new PreparePubsubWriteDoFn<>( getFormatFn(), topicFunction, + getPublishWithOrderingKey(), maxMessageSize, getBadRecordRouter(), input.getCoder(), @@ -1536,8 +1554,12 @@ public PDone expand(PCollection input) { pubsubMessageTuple .get(BAD_RECORD_TAG) .setCoder(BadRecord.getCoder(input.getPipeline()))); - PCollection pubsubMessages = - pubsubMessageTuple.get(pubsubMessageTupleTag).setCoder(PubsubMessageWithTopicCoder.of()); + PCollection pubsubMessages = pubsubMessageTuple.get(pubsubMessageTupleTag); + if (getPublishWithOrderingKey()) { + pubsubMessages.setCoder(PubsubMessageSchemaCoder.getSchemaCoder()); + } else { + pubsubMessages.setCoder(PubsubMessageWithTopicCoder.of()); + } switch (input.isBounded()) { case BOUNDED: pubsubMessages.apply( @@ -1557,6 +1579,7 @@ public PDone expand(PCollection input) { getTimestampAttribute(), getIdAttribute(), 100 /* numShards */, + getPublishWithOrderingKey(), MoreObjects.firstNonNull( getMaxBatchSize(), PubsubUnboundedSink.DEFAULT_PUBLISH_BATCH_SIZE), MoreObjects.firstNonNull( @@ -1589,7 +1612,9 @@ private class OutgoingData { } } - private transient Map output; + // NOTE: A single publish request may only write to one ordering key. + // See https://cloud.google.com/pubsub/docs/publisher#using-ordering-keys for details. + private transient Map, OutgoingData> output; private transient PubsubClient pubsubClient; @@ -1620,51 +1645,44 @@ public void startBundle(StartBundleContext c) throws IOException { public void processElement(@Element PubsubMessage message, @Timestamp Instant timestamp) throws IOException, SizeLimitExceededException { // Validate again here just as a sanity check. + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 PreparePubsubWriteDoFn.validatePubsubMessageSize(message, maxPublishBatchByteSize); - byte[] payload = message.getPayload(); - int messageSize = payload.length; + // NOTE: The record id is always null. 
+ final OutgoingMessage msg = + OutgoingMessage.of(message, timestamp.getMillis(), null, message.getTopic()); + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 + final int messageSize = msg.getMessage().getData().size(); - PubsubTopic pubsubTopic; + final PubsubTopic pubsubTopic; if (getTopicProvider() != null) { pubsubTopic = getTopicProvider().get(); } else { - pubsubTopic = - PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(message.getTopic())); - } - // Checking before adding the message stops us from violating max batch size or bytes - OutgoingData currentTopicOutput = - output.computeIfAbsent(pubsubTopic, t -> new OutgoingData()); - if (currentTopicOutput.messages.size() >= maxPublishBatchSize - || (!currentTopicOutput.messages.isEmpty() - && (currentTopicOutput.bytes + messageSize) >= maxPublishBatchByteSize)) { - publish(pubsubTopic, currentTopicOutput.messages); - currentTopicOutput.messages.clear(); - currentTopicOutput.bytes = 0; + pubsubTopic = PubsubTopic.fromPath(Preconditions.checkArgumentNotNull(msg.topic())); } - Map attributes = message.getAttributeMap(); - String orderingKey = message.getOrderingKey(); - - com.google.pubsub.v1.PubsubMessage.Builder msgBuilder = - com.google.pubsub.v1.PubsubMessage.newBuilder() - .setData(ByteString.copyFrom(payload)) - .putAllAttributes(attributes); - - if (orderingKey != null) { - msgBuilder.setOrderingKey(orderingKey); + // Checking before adding the message stops us from violating max batch size or bytes + String orderingKey = getPublishWithOrderingKey() ? msg.getMessage().getOrderingKey() : ""; + final OutgoingData currentTopicAndOrderingKeyOutput = + output.computeIfAbsent(KV.of(pubsubTopic, orderingKey), t -> new OutgoingData()); + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 + if (currentTopicAndOrderingKeyOutput.messages.size() >= maxPublishBatchSize + || (!currentTopicAndOrderingKeyOutput.messages.isEmpty() + && (currentTopicAndOrderingKeyOutput.bytes + messageSize) + >= maxPublishBatchByteSize)) { + publish(pubsubTopic, currentTopicAndOrderingKeyOutput.messages); + currentTopicAndOrderingKeyOutput.messages.clear(); + currentTopicAndOrderingKeyOutput.bytes = 0; } - // NOTE: The record id is always null. 
- currentTopicOutput.messages.add( - OutgoingMessage.of( - msgBuilder.build(), timestamp.getMillis(), null, message.getTopic())); - currentTopicOutput.bytes += messageSize; + currentTopicAndOrderingKeyOutput.messages.add(msg); + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 + currentTopicAndOrderingKeyOutput.bytes += messageSize; } @FinishBundle public void finishBundle() throws IOException { - for (Map.Entry entry : output.entrySet()) { - publish(entry.getKey(), entry.getValue().messages); + for (Map.Entry, OutgoingData> entry : output.entrySet()) { + publish(entry.getKey().getKey(), entry.getValue().messages); } output = null; pubsubClient.close(); diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java index 386febcf005b..15a1649a1d99 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubJsonClient.java @@ -230,11 +230,11 @@ public List pull( com.google.pubsub.v1.PubsubMessage.newBuilder(); protoMessage.setData(ByteString.copyFrom(elementBytes)); protoMessage.putAllAttributes(attributes); - // PubsubMessage uses `null` to represent no ordering key where we want a default of "". + // {@link PubsubMessage} uses `null` or empty string to represent no ordering key. + // {@link com.google.pubsub.v1.PubsubMessage} does not track string field presence and uses + // empty string as a default. if (pubsubMessage.getOrderingKey() != null) { protoMessage.setOrderingKey(pubsubMessage.getOrderingKey()); - } else { - protoMessage.setOrderingKey(""); } incomingMessages.add( IncomingMessage.of( diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageSchemaCoder.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageSchemaCoder.java new file mode 100644 index 000000000000..72eb32d87668 --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageSchemaCoder.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import java.util.HashMap; +import java.util.Map; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.schemas.SchemaCoder; +import org.apache.beam.sdk.transforms.SerializableFunction; +import org.apache.beam.sdk.values.Row; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; + +/** + * Provides a {@link SchemaCoder} for {@link PubsubMessage}, including the topic and all fields of a + * PubSub message from server. + * + *

{@link SchemaCoder} is used so that fields can be added in the future without breaking update + * compatibility. + */ +public class PubsubMessageSchemaCoder { + private static final Schema PUBSUB_MESSAGE_SCHEMA = + Schema.builder() + .addByteArrayField("payload") + .addNullableStringField("topic") + .addNullableMapField("attributes", Schema.FieldType.STRING, Schema.FieldType.STRING) + .addNullableStringField("message_id") + .addNullableStringField("ordering_key") + .build(); + + private static final SerializableFunction TO_ROW = + (PubsubMessage message) -> { + Map fieldValues = new HashMap<>(); + fieldValues.put("payload", message.getPayload()); + + String topic = message.getTopic(); + if (topic != null) { + fieldValues.put("topic", topic); + } + Map attributeMap = message.getAttributeMap(); + if (attributeMap != null) { + fieldValues.put("attributes", attributeMap); + } + String messageId = message.getMessageId(); + if (messageId != null) { + fieldValues.put("message_id", messageId); + } + String orderingKey = message.getOrderingKey(); + if (orderingKey != null) { + fieldValues.put("ordering_key", orderingKey); + } + return Row.withSchema(PUBSUB_MESSAGE_SCHEMA).withFieldValues(fieldValues).build(); + }; + + private static final SerializableFunction FROM_ROW = + (Row row) -> { + PubsubMessage message = + new PubsubMessage( + Preconditions.checkNotNull(row.getBytes("payload")), + row.getMap("attributes"), + row.getString("message_id"), + row.getString("ordering_key")); + + String topic = row.getString("topic"); + if (topic != null) { + message = message.withTopic(topic); + } + return message; + }; + + public static SchemaCoder getSchemaCoder() { + return SchemaCoder.of( + PUBSUB_MESSAGE_SCHEMA, TypeDescriptor.of(PubsubMessage.class), TO_ROW, FROM_ROW); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java index aa8e3a411486..bec1751831a1 100644 --- a/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java +++ b/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSink.java @@ -23,8 +23,11 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; import org.apache.beam.sdk.coders.AtomicCoder; @@ -69,7 +72,9 @@ import org.apache.beam.sdk.values.TypeDescriptors; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings; import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.hash.Hashing; +import org.checkerframework.checker.nullness.qual.MonotonicNonNull; import org.checkerframework.checker.nullness.qual.Nullable; import org.joda.time.Duration; import org.joda.time.Instant; @@ -201,8 +206,15 @@ public void processElement( break; } + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31828 + // NOTE: Null and empty ordering keys are treated as equivalent. 
@Nullable String topic = dynamicTopicFn.apply(element); - K key = keyFunction.apply(ThreadLocalRandom.current().nextInt(numShards), topic); + @Nullable String orderingKey = message.getOrderingKey(); + int shard = + Strings.isNullOrEmpty(orderingKey) + ? ThreadLocalRandom.current().nextInt(numShards) + : Hashing.murmur3_32_fixed().hashString(orderingKey, StandardCharsets.UTF_8).asInt(); + K key = keyFunction.apply(shard, topic); o.output(KV.of(key, OutgoingMessage.of(message, timestampMsSinceEpoch, recordId, topic))); } @@ -219,6 +231,16 @@ public void populateDisplayData(DisplayData.Builder builder) { /** Publish messages to Pubsub in batches. */ private static class WriterFn extends DoFn, Void> { + private class OutgoingData { + int bytes; + List messages; + + OutgoingData() { + this.bytes = 0; + this.messages = new ArrayList<>(publishBatchSize); + } + } + private final PubsubClientFactory pubsubFactory; private final @Nullable ValueProvider topic; private final String timestampAttribute; @@ -305,27 +327,45 @@ public void startBundle(StartBundleContext c) throws Exception { } @ProcessElement + @SuppressWarnings("ReferenceEquality") public void processElement(ProcessContext c) throws Exception { - List pubsubMessages = new ArrayList<>(publishBatchSize); - int bytes = 0; + // TODO(sjvanrossum): Refactor the write transform so this map can be indexed with topic + + // ordering key and have bundle scoped lifetime. + // NOTE: A single publish request may only write to one ordering key. + // See https://cloud.google.com/pubsub/docs/publisher#using-ordering-keys for details. + Map orderingKeyBatches = new HashMap<>(); + @MonotonicNonNull String currentOrderingKey = null; + @Nullable OutgoingData currentBatch = null; for (OutgoingMessage message : c.element()) { - if (!pubsubMessages.isEmpty() - && bytes + message.getMessage().getData().size() > publishBatchBytes) { + String messageOrderingKey = message.getMessage().getOrderingKey(); + if (currentOrderingKey == null || !currentOrderingKey.equals(messageOrderingKey)) { + currentOrderingKey = messageOrderingKey; + currentBatch = orderingKeyBatches.get(currentOrderingKey); + } + if (currentBatch == null) { + currentBatch = new OutgoingData(); + orderingKeyBatches.put(currentOrderingKey, currentBatch); + } else if (currentBatch.bytes + message.getMessage().getData().size() > publishBatchBytes) { + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 + // Break large (in bytes) batches into smaller. // (We've already broken by batch size using the trigger below, though that may // run slightly over the actual PUBLISH_BATCH_SIZE. We'll consider that ok since // the hard limit from Pubsub is by bytes rather than number of messages.) // BLOCKS until published. - publishBatch(pubsubMessages, bytes); - pubsubMessages.clear(); - bytes = 0; + publishBatch(currentBatch.messages, currentBatch.bytes); + currentBatch.messages.clear(); + currentBatch.bytes = 0; } - pubsubMessages.add(message); - bytes += message.getMessage().getData().size(); + currentBatch.messages.add(message); + // TODO(sjvanrossum): https://github.com/apache/beam/issues/31800 + currentBatch.bytes += message.getMessage().getData().size(); } - if (!pubsubMessages.isEmpty()) { - // BLOCKS until published. - publishBatch(pubsubMessages, bytes); + for (OutgoingData batch : orderingKeyBatches.values()) { + if (!batch.messages.isEmpty()) { + // BLOCKS until published. 
+ publishBatch(batch.messages, batch.bytes); + } } } @@ -378,6 +418,12 @@ public void populateDisplayData(DisplayData.Builder builder) { */ private final int numShards; + /** + * Publish messages with an ordering key. Currently unsupported with DataflowRunner's Pubsub sink + * override. + */ + private final boolean publishBatchWithOrderingKey; + /** Maximum number of messages per publish. */ private final int publishBatchSize; @@ -402,6 +448,7 @@ public void populateDisplayData(DisplayData.Builder builder) { String timestampAttribute, String idAttribute, int numShards, + boolean publishBatchWithOrderingKey, int publishBatchSize, int publishBatchBytes, Duration maxLatency, @@ -412,6 +459,7 @@ public void populateDisplayData(DisplayData.Builder builder) { this.timestampAttribute = timestampAttribute; this.idAttribute = idAttribute; this.numShards = numShards; + this.publishBatchWithOrderingKey = publishBatchWithOrderingKey; this.publishBatchSize = publishBatchSize; this.publishBatchBytes = publishBatchBytes; this.maxLatency = maxLatency; @@ -424,13 +472,15 @@ public PubsubUnboundedSink( ValueProvider topic, String timestampAttribute, String idAttribute, - int numShards) { + int numShards, + boolean publishBatchWithOrderingKey) { this( pubsubFactory, topic, timestampAttribute, idAttribute, numShards, + publishBatchWithOrderingKey, DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY, @@ -444,6 +494,7 @@ public PubsubUnboundedSink( String timestampAttribute, String idAttribute, int numShards, + boolean publishBatchWithOrderingKey, String pubsubRootUrl) { this( pubsubFactory, @@ -451,6 +502,7 @@ public PubsubUnboundedSink( timestampAttribute, idAttribute, numShards, + publishBatchWithOrderingKey, DEFAULT_PUBLISH_BATCH_SIZE, DEFAULT_PUBLISH_BATCH_BYTES, DEFAULT_MAX_LATENCY, @@ -464,6 +516,7 @@ public PubsubUnboundedSink( String timestampAttribute, String idAttribute, int numShards, + boolean publishBatchWithOrderingKey, int publishBatchSize, int publishBatchBytes) { this( @@ -472,6 +525,7 @@ public PubsubUnboundedSink( timestampAttribute, idAttribute, numShards, + publishBatchWithOrderingKey, publishBatchSize, publishBatchBytes, DEFAULT_MAX_LATENCY, @@ -485,6 +539,7 @@ public PubsubUnboundedSink( String timestampAttribute, String idAttribute, int numShards, + boolean publishBatchWithOrderingKey, int publishBatchSize, int publishBatchBytes, String pubsubRootUrl) { @@ -494,6 +549,7 @@ public PubsubUnboundedSink( timestampAttribute, idAttribute, numShards, + publishBatchWithOrderingKey, publishBatchSize, publishBatchBytes, DEFAULT_MAX_LATENCY, @@ -520,6 +576,10 @@ public PubsubUnboundedSink( return idAttribute; } + public boolean getPublishBatchWithOrderingKey() { + return publishBatchWithOrderingKey; + } + @Override public PDone expand(PCollection input) { if (topic != null) { diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java index 494189d43f36..a125a7b67e69 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PreparePubsubWriteDoFnTest.java @@ -44,6 +44,19 @@ public void testValidatePubsubMessageSizeOnlyPayload() throws SizeLimitExceededE assertEquals(data.length, messageSize); } + @Test + public void 
testValidatePubsubMessageSizePayloadAndOrderingKey() + throws SizeLimitExceededException { + byte[] data = new byte[1024]; + String orderingKey = "key"; + PubsubMessage message = new PubsubMessage(data, null, null, orderingKey); + + int messageSize = + PreparePubsubWriteDoFn.validatePubsubMessageSize(message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE); + + assertEquals(data.length + orderingKey.getBytes(StandardCharsets.UTF_8).length, messageSize); + } + @Test public void testValidatePubsubMessageSizePayloadAndAttributes() throws SizeLimitExceededException { @@ -76,6 +89,19 @@ public void testValidatePubsubMessageSizePayloadTooLarge() { message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); } + @Test + public void testValidatePubsubMessageSizePayloadPlusOrderingKeyTooLarge() { + byte[] data = new byte[(10 << 20)]; + String orderingKey = "key"; + PubsubMessage message = new PubsubMessage(data, null, null, orderingKey); + + assertThrows( + SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } + @Test public void testValidatePubsubMessageSizePayloadPlusAttributesTooLarge() { byte[] data = new byte[(10 << 20)]; @@ -121,6 +147,19 @@ public void testValidatePubsubMessageSizeAttributeValueTooLarge() { message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); } + @Test + public void testValidatePubsubMessageSizeOrderingKeyTooLarge() { + byte[] data = new byte[1024]; + String orderingKey = RandomStringUtils.randomAscii(1025); + PubsubMessage message = new PubsubMessage(data, null, null, orderingKey); + + assertThrows( + SizeLimitExceededException.class, + () -> + PreparePubsubWriteDoFn.validatePubsubMessageSize( + message, PUBSUB_MESSAGE_MAX_TOTAL_SIZE)); + } + @Test public void testValidatePubsubMessagePayloadTooLarge() { byte[] data = new byte[(10 << 20) + 1]; diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java index 45fbab0576fb..75a6173591a1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubSubWritePayloadTranslationTest.java @@ -67,6 +67,7 @@ public void testTranslateSinkWithTopic() throws Exception { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, + false, 0, 0, Duration.ZERO, @@ -104,6 +105,7 @@ public void testTranslateDynamicSink() throws Exception { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, + false, 0, 0, Duration.ZERO, @@ -143,6 +145,7 @@ public void testTranslateSinkWithTopicOverridden() throws Exception { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, 0, + false, 0, 0, Duration.ZERO, diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageSchemaCoderTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageSchemaCoderTest.java new file mode 100644 index 000000000000..0776b1cb100f --- /dev/null +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubMessageSchemaCoderTest.java @@ -0,0 +1,63 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.gcp.pubsub; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; + +import java.nio.charset.StandardCharsets; +import java.util.Map; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.testing.CoderProperties; +import org.apache.beam.sdk.util.SerializableUtils; +import org.apache.beam.sdk.values.TypeDescriptor; +import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap; +import org.junit.Test; + +public class PubsubMessageSchemaCoderTest { + private static final String DATA = "testData"; + private static final String TOPIC = "testTopic"; + private static final String MESSAGE_ID = "testMessageId"; + private static final Map ATTRIBUTES = + new ImmutableMap.Builder().put("1", "hello").build(); + private static final String ORDERING_KEY = "key123"; + private static final Coder TEST_CODER = PubsubMessageSchemaCoder.getSchemaCoder(); + private static final PubsubMessage TEST_VALUE = + new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), ATTRIBUTES, MESSAGE_ID, ORDERING_KEY) + .withTopic(TOPIC); + private static final PubsubMessage TEST_MINIMAL_VALUE = + new PubsubMessage(DATA.getBytes(StandardCharsets.UTF_8), null); + + @Test + public void testValueEncodable() { + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_VALUE, "error"); + SerializableUtils.ensureSerializableByCoder(TEST_CODER, TEST_MINIMAL_VALUE, "error"); + } + + @Test + public void testCoderDecodeEncodeEqual() throws Exception { + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_VALUE); + CoderProperties.structuralValueDecodeEncodeEqual(TEST_CODER, TEST_MINIMAL_VALUE); + } + + @Test + public void testEncodedTypeDescriptor() { + TypeDescriptor typeDescriptor = new TypeDescriptor() {}; + assertThat(TEST_CODER.getEncodedTypeDescriptor(), equalTo(typeDescriptor)); + } +} diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java index be68083bb28c..5296f4850ef1 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubUnboundedSinkTest.java @@ -120,6 +120,7 @@ public void sendOneMessage() throws IOException { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, + false, batchSize, batchBytes, Duration.standardSeconds(2), @@ -152,6 +153,7 @@ public void sendOneMessageWithoutAttributes() throws IOException { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, + false, 1 /* batchSize */, 1 /* batchBytes */, Duration.standardSeconds(2), @@ -207,6 +209,7 @@ public void testDynamicTopics() throws IOException { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, + false, 1 /* batchSize 
*/, 1 /* batchBytes */, Duration.standardSeconds(2), @@ -258,6 +261,7 @@ public void sendMoreThanOneBatchByNumMessages() throws IOException { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, + false, batchSize, batchBytes, Duration.standardSeconds(2), @@ -303,6 +307,7 @@ public void sendMoreThanOneBatchByByteSize() throws IOException { TIMESTAMP_ATTRIBUTE, ID_ATTRIBUTE, NUM_SHARDS, + false, batchSize, batchBytes, Duration.standardSeconds(2), diff --git a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteIT.java b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteIT.java index 1898c4ae4af0..fe36e31e69bb 100644 --- a/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteIT.java +++ b/sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubWriteIT.java @@ -17,11 +17,17 @@ */ package org.apache.beam.sdk.io.gcp.pubsub; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import java.io.IOException; import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import java.util.List; import java.util.Map; import java.util.Objects; import org.apache.beam.sdk.io.GenerateSequence; +import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.SubscriptionPath; import org.apache.beam.sdk.io.gcp.pubsub.PubsubClient.TopicPath; import org.apache.beam.sdk.testing.TestPipeline; import org.apache.beam.sdk.transforms.Create; @@ -46,11 +52,12 @@ public class PubsubWriteIT { private PubsubClient pubsubClient; private TopicPath testTopic; + private String project; @Before public void setup() throws IOException { PubsubOptions options = TestPipeline.testingPipelineOptions().as(PubsubOptions.class); - String project = options.getProject(); + project = options.getProject(); pubsubClient = PubsubGrpcClient.FACTORY.newClient(null, null, options); testTopic = PubsubClient.topicPathFromName(project, "pubsub-write-" + Instant.now().getMillis()); @@ -102,4 +109,55 @@ public void testBoundedWriteMessageWithAttributes() { .apply(PubsubIO.writeMessages().to(testTopic.getPath())); pipeline.run(); } + + @Test + public void testBoundedWriteMessageWithAttributesAndMessageIdAndOrderingKey() throws IOException { + TopicPath testTopicPath = + PubsubClient.topicPathFromName( + project, "pubsub-write-ordering-key-" + Instant.now().getMillis()); + pubsubClient.createTopic(testTopicPath); + SubscriptionPath testSubscriptionPath = + pubsubClient.createRandomSubscription( + PubsubClient.projectPathFromId(project), testTopicPath, 10); + + byte[] payload = RandomStringUtils.randomAscii(1_000_000).getBytes(StandardCharsets.UTF_8); + Map attributes = + ImmutableMap.builder() + .put("id", "1") + .put("description", RandomStringUtils.randomAscii(100)) + .build(); + + PubsubMessage outgoingMessage = + new PubsubMessage(payload, attributes, "test_message", "111222"); + + pipeline + .apply(Create.of(outgoingMessage).withCoder(PubsubMessageSchemaCoder.getSchemaCoder())) + .apply(PubsubIO.writeMessages().withOrderingKey().to(testTopicPath.getPath())); + pipeline.run().waitUntilFinish(); + + List incomingMessages = + pubsubClient.pull(Instant.now().getMillis(), testSubscriptionPath, 1, true); + + // sometimes the first pull comes up short. 
try 4 pulls to avoid flaky false-negatives + int numPulls = 1; + while (incomingMessages.isEmpty()) { + if (numPulls >= 4) { + throw new RuntimeException( + String.format("Pulled %s times from PubSub but retrieved no elements.", numPulls)); + } + incomingMessages = + pubsubClient.pull(Instant.now().getMillis(), testSubscriptionPath, 1, true); + numPulls++; + } + assertEquals(1, incomingMessages.size()); + + com.google.pubsub.v1.PubsubMessage incomingMessage = incomingMessages.get(0).message(); + assertTrue( + Arrays.equals(outgoingMessage.getPayload(), incomingMessage.getData().toByteArray())); + assertEquals(outgoingMessage.getAttributeMap(), incomingMessage.getAttributesMap()); + assertEquals(outgoingMessage.getOrderingKey(), incomingMessage.getOrderingKey()); + + pubsubClient.deleteSubscription(testSubscriptionPath); + pubsubClient.deleteTopic(testTopicPath); + } }
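For reference, a minimal sketch of how the new sink option is meant to be wired up, modeled on testBoundedWriteMessageWithAttributesAndMessageIdAndOrderingKey above. The class name, topic path, ordering key, and payload are placeholders; the explicit schema coder on Create mirrors what the integration test sets so the ordering key survives until the write.

import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessageSchemaCoder;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;

public class OrderedPublishExample {
  public static void main(String[] args) {
    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // The ordering key is the fourth constructor argument; the message id is left null
    // because the service assigns it on publish.
    PubsubMessage message =
        new PubsubMessage(
            "payload".getBytes(StandardCharsets.UTF_8),
            null /* attributes */,
            null /* messageId */,
            "customer-42" /* orderingKey */);

    pipeline
        // The schema coder added in this patch retains the ordering key field.
        .apply(Create.of(message).withCoder(PubsubMessageSchemaCoder.getSchemaCoder()))
        .apply(
            PubsubIO.writeMessages()
                .withOrderingKey()
                .to("projects/my-project/topics/my-topic"));
    pipeline.run();
  }
}

Note that ordered delivery still requires a subscription with message ordering enabled, and that on Dataflow the translation override at the top of this diff rejects ordering keys, so the pipeline must opt into the custom sink via the experiment named by PubsubIO.ENABLE_CUSTOM_PUBSUB_SINK.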