Skip to content

Commit

Permalink
Register URN for SplittableParDo centrally, to avoid conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
kennknowles committed Feb 8, 2024
1 parent 382c6dc commit ea68a45
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -298,6 +298,7 @@ private static String toProto(TupleTag<?> tag) {
/** Returns the URN for the transform if it is known, otherwise throws. */
public static String urnForTransform(PTransform<?, ?> transform) {
String urn = urnForTransformOrNull(transform);

if (urn == null) {
throw new IllegalStateException(
String.format("No translator known for %s", transform.getClass().getName()));
Expand Down Expand Up @@ -485,6 +486,7 @@ public RunnerApi.PTransform translate(

TransformPayloadTranslator payloadTranslator =
getKnownPayloadTranslators().get(appliedPTransform.getTransform().getClass());

FunctionSpec spec = payloadTranslator.translate(appliedPTransform, components);
if (spec != null) {
transformBuilder.setSpec(spec);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,19 @@
*/
package org.apache.beam.runners.core;

import static org.apache.beam.runners.core.construction.SplittableParDo.SPLITTABLE_PROCESS_URN;

import com.google.auto.service.AutoService;
import java.util.List;
import java.util.Map;
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.runners.core.construction.PTransformReplacements;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation.RawPTransform;
import org.apache.beam.runners.core.construction.ReplacementOutputs;
import org.apache.beam.runners.core.construction.SplittableParDo;
import org.apache.beam.runners.core.construction.SplittableParDo.ProcessKeyedElements;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.Coder;
import org.apache.beam.sdk.coders.KvCoder;
Expand All @@ -34,6 +39,7 @@
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.state.WatermarkHoldState;
import org.apache.beam.sdk.transforms.Combine;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.GroupByKey;
import org.apache.beam.sdk.transforms.PTransform;
Expand All @@ -55,6 +61,7 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.apache.beam.sdk.values.WindowingStrategy;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.joda.time.Instant;
Expand Down Expand Up @@ -681,4 +688,26 @@ public String getErrorContext() {
};
}
}

/**
* Registers {@link PTransformTranslation.TransformPayloadTranslator TransformPayloadTranslators}
* for {@link Combine Combines}.
*/
@AutoService(TransformPayloadTranslatorRegistrar.class)
public static class Registrar implements TransformPayloadTranslatorRegistrar {
@SuppressWarnings("rawtypes")
@Override
public Map<
? extends Class<? extends PTransform>,
? extends PTransformTranslation.TransformPayloadTranslator>
getTransformPayloadTranslators() {
return ImmutableMap
.<Class<? extends PTransform>, PTransformTranslation.TransformPayloadTranslator>builder()
.put(
SplittableParDoViaKeyedWorkItems.ProcessElements.class,
PTransformTranslation.TransformPayloadTranslator.NotSerializable.forUrn(
SPLITTABLE_PROCESS_URN))
.build();
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@
import java.util.Collection;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.beam.runners.core.SplittableParDoViaKeyedWorkItems;
import org.apache.beam.runners.core.construction.PTransformTranslation;
import org.apache.beam.runners.core.construction.PTransformTranslation.TransformPayloadTranslator;
import org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar;
Expand Down Expand Up @@ -132,9 +131,6 @@ public static class DirectTransformsRegistrar implements TransformPayloadTransla
.put(
DirectTestStream.class,
TransformPayloadTranslator.NotSerializable.forUrn(DIRECT_TEST_STREAM_URN))
.put(
SplittableParDoViaKeyedWorkItems.ProcessElements.class,
TransformPayloadTranslator.NotSerializable.forUrn(SPLITTABLE_PROCESS_URN))
.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1449,10 +1449,6 @@ public static class FlinkTransformsRegistrar implements TransformPayloadTranslat
.put(
CreateStreamingFlinkView.CreateFlinkPCollectionView.class,
new CreateStreamingFlinkViewPayloadTranslator())
.put(
SplittableParDoViaKeyedWorkItems.ProcessElements.class,
PTransformTranslation.TransformPayloadTranslator.NotSerializable.forUrn(
SPLITTABLE_PROCESS_URN))
.build();
}
}
Expand Down

0 comments on commit ea68a45

Please sign in to comment.