Skip to content

Commit

Permalink
spotless
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedabu98 committed Sep 16, 2024
1 parent 53d47a7 commit cb9e7fb
Show file tree
Hide file tree
Showing 2 changed files with 3 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@ public class PreparePubsubWriteDoFn<InputT> extends DoFn<InputT, PubsubMessage>
private static final int PUBSUB_MESSAGE_MAX_ATTRIBUTES = 100;
private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_KEY_BYTES = 256;
private static final int PUBSUB_MESSAGE_ATTRIBUTE_MAX_VALUE_BYTES = 1024;
private static final int ORDERING_KEY_MAX_VALUE_BYTES = 1024;
private static final int ORDERING_KEY_MAX_BYTE_SIZE = 1024;
// The amount of bytes that each attribute entry adds up to the request
private static final int PUBSUB_MESSAGE_ATTRIBUTE_ENCODE_ADDITIONAL_BYTES = 6;
private boolean allowOrderingKey;
private int maxPublishBatchSize;

private SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction;
Expand Down Expand Up @@ -71,12 +70,12 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
@Nullable String orderingKey = message.getOrderingKey();
if (orderingKey != null) {
int orderingKeySize = orderingKey.getBytes(StandardCharsets.UTF_8).length;
if (orderingKeySize > ORDERING_KEY_MAX_VALUE_BYTES) {
if (orderingKeySize > ORDERING_KEY_MAX_BYTE_SIZE) {
throw new SizeLimitExceededException(
"Pubsub message ordering key of length "
+ orderingKeySize
+ " exceeds maximum of "
+ ORDERING_KEY_MAX_VALUE_BYTES
+ ORDERING_KEY_MAX_BYTE_SIZE
+ " bytes. See https://cloud.google.com/pubsub/quotas#resource_limits");
}
totalSize += orderingKeySize;
Expand Down Expand Up @@ -141,14 +140,12 @@ static int validatePubsubMessageSize(PubsubMessage message, int maxPublishBatchS
SerializableFunction<ValueInSingleWindow<InputT>, PubsubMessage> formatFunction,
@Nullable
SerializableFunction<ValueInSingleWindow<InputT>, PubsubIO.PubsubTopic> topicFunction,
boolean allowOrderingKey,
int maxPublishBatchSize,
BadRecordRouter badRecordRouter,
Coder<InputT> inputCoder,
TupleTag<PubsubMessage> outputTag) {
this.formatFunction = formatFunction;
this.topicFunction = topicFunction;
this.allowOrderingKey = allowOrderingKey;
this.maxPublishBatchSize = maxPublishBatchSize;
this.badRecordRouter = badRecordRouter;
this.inputCoder = inputCoder;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1543,7 +1543,6 @@ public PDone expand(PCollection<T> input) {
new PreparePubsubWriteDoFn<>(
getFormatFn(),
topicFunction,
getNeedsOrderingKey(),
maxMessageSize,
getBadRecordRouter(),
input.getCoder(),
Expand Down

0 comments on commit cb9e7fb

Please sign in to comment.