Skip to content

Commit

Permalink
Dedup SerializableSupplier (#31829)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abacn authored Jul 11, 2024
1 parent 9ee961f commit b34c014
Show file tree
Hide file tree
Showing 15 changed files with 15 additions and 71 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.beam.sdk.coders.CoderException;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.util.UserCodeException;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -734,25 +735,14 @@ public static <T> SerializableMatcher<T> fromSupplier(SerializableSupplier<Match
return new SerializableMatcherFromSupplier<>(supplier);
}

/**
* Supplies values of type {@code T}, and is serializable. Thus, even if {@code T} is not
* serializable, the supplier can be serialized and provide a {@code T} wherever it is
* deserialized.
*
* @param <T> the type of value supplied.
*/
public interface SerializableSupplier<T> extends Serializable {
T get();
}

/**
* Since the delegate {@link Matcher} is not generally serializable, instead this takes a nullary
* SerializableFunction to return such a matcher.
*/
private static class SerializableMatcherFromSupplier<T> extends BaseMatcher<T>
implements SerializableMatcher<T> {

private SerializableSupplier<Matcher<T>> supplier;
private final SerializableSupplier<Matcher<T>> supplier;

public SerializableMatcherFromSupplier(SerializableSupplier<Matcher<T>> supplier) {
this.supplier = supplier;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators.Manual;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Preconditions;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import com.google.cloud.pubsublite.PartitionLookupUtils;
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import org.apache.beam.sdk.testing.SerializableMatchers.SerializableSupplier;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -34,6 +33,7 @@
import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
import org.apache.beam.sdk.transforms.splittabledofn.WatermarkEstimators;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.values.PBegin;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.apache.beam.sdk.io.gcp.bigtable.changestreams.restriction.StreamProgress;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.splittabledofn.ManualWatermarkEstimator;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.values.KV;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
import java.io.Serializable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.beam.sdk.testing.SerializableMatchers.SerializableSupplier;
import org.apache.beam.sdk.util.SerializableSupplier;

/**
* A FakeSerializable hides a non-serializable object in a static map and returns a handle into the
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
import com.google.cloud.pubsublite.SubscriptionPath;
import com.google.cloud.pubsublite.TopicPath;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.SerializableMatchers.SerializableSupplier;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;
import org.junit.Before;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;
import javax.jms.BytesMessage;
import javax.jms.ConnectionFactory;
import javax.jms.Message;
Expand All @@ -34,8 +35,8 @@
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.transport.amqp.AmqpTransportFactory;
import org.apache.beam.sdk.transforms.SerializableFunction;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.util.ThrowingSupplier;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Supplier;

/**
* A common test fixture to create a broker and connection factories for {@link JmsIOIT} & {@link
Expand All @@ -47,8 +48,6 @@ public class CommonJms implements Serializable {
// convenient typedefs and a helper conversion functions
interface ThrowingSerializableSupplier<T> extends ThrowingSupplier<T>, Serializable {}

private interface SerializableSupplier<T> extends Serializable, Supplier<T> {}

private static <T> SerializableSupplier<T> toSerializableSupplier(
ThrowingSerializableSupplier<T> throwingSerializableSupplier) {
return () -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.util.SerializableUtils;

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.beam.sdk.transforms.Values;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.FluentBackoff;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.util.SerializableUtils;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.KV;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;

import org.apache.beam.sdk.util.SerializableSupplier;
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
import org.joda.time.Duration;
import org.joda.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.util.BackOff;
import org.apache.beam.sdk.util.SerializableSupplier;
import org.apache.beam.sdk.util.Sleeper;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.apache.beam.sdk.testing.SerializableMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import org.apache.beam.sdk.util.SerializableSupplier;
import org.joda.time.Duration;
import org.junit.Test;

Expand Down

0 comments on commit b34c014

Please sign in to comment.