Skip to content

Commit

Permalink
Upgrade transforms without upgrading the pipelines (#28210)
Browse files Browse the repository at this point in the history
* Upgrade Java transforms without upgrading the pipelines

* Addresses reviewer comments

* Reduce visibility of the test-only constructor

* Fix compile errors

* Fix spotless

* Addressing reviewer comments

* Do not bundle Transform Service Launcher in the harness

* Fix harness build and a fix for when a runner invokes toProto() multiple times
  • Loading branch information
chamikaramj authored Sep 27, 2023
1 parent ced3de3 commit 1c50fd2
Show file tree
Hide file tree
Showing 29 changed files with 994 additions and 41 deletions.
1 change: 1 addition & 0 deletions runners/core-construction-java/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies {
implementation project(path: ":sdks:java:core", configuration: "shadow")
implementation project(path: ":sdks:java:extensions:avro")
implementation project(path: ":sdks:java:fn-execution")
implementation project(path: ":sdks:java:transform-service:launcher")
implementation library.java.vendored_grpc_1_54_0
implementation library.java.vendored_guava_32_1_2_jre
implementation library.java.classgraph
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public static class CombinePerKeyPayloadTranslator
private CombinePerKeyPayloadTranslator() {}

@Override
public String getUrn(Combine.PerKey<?, ?, ?> transform) {
public String getUrn() {
return COMBINE_PER_KEY_TRANSFORM_URN;
}

Expand Down Expand Up @@ -108,7 +108,7 @@ public static class CombineGloballyPayloadTranslator
private CombineGloballyPayloadTranslator() {}

@Override
public String getUrn(Combine.Globally<?, ?> transform) {
public String getUrn() {
return COMBINE_GLOBALLY_TRANSFORM_URN;
}

Expand Down Expand Up @@ -165,7 +165,7 @@ public static class CombineGroupedValuesPayloadTranslator
private CombineGroupedValuesPayloadTranslator() {}

@Override
public String getUrn(Combine.GroupedValues<?, ?, ?> transform) {
public String getUrn() {
return COMBINE_GROUPED_VALUES_TRANSFORM_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static <ElemT, ViewT> PCollectionView<ViewT> getView(
static class CreatePCollectionViewTranslator
implements TransformPayloadTranslator<View.CreatePCollectionView<?, ?>> {
@Override
public String getUrn(View.CreatePCollectionView<?, ?> transform) {
public String getUrn() {
return PTransformTranslation.CREATE_VIEW_TRANSFORM_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,7 +295,7 @@ public OutputT expand(InputT input) {
response
.getComponents()
.toBuilder()
.putAllEnvironments(resolveArtifacts(newEnvironmentsWithDependencies))
.putAllEnvironments(resolveArtifacts(newEnvironmentsWithDependencies, endpoint))
.build();
expandedTransform = response.getTransform();
expandedRequirements = response.getRequirementsList();
Expand Down Expand Up @@ -338,8 +338,8 @@ public OutputT expand(InputT input) {
return toOutputCollection(outputMapBuilder.build());
}

private Map<String, RunnerApi.Environment> resolveArtifacts(
Map<String, RunnerApi.Environment> environments) {
static Map<String, RunnerApi.Environment> resolveArtifacts(
Map<String, RunnerApi.Environment> environments, Endpoints.ApiServiceDescriptor endpoint) {
if (environments.size() == 0) {
return environments;
}
Expand Down Expand Up @@ -367,7 +367,7 @@ private Map<String, RunnerApi.Environment> resolveArtifacts(
}
}

private RunnerApi.Environment resolveArtifacts(
private static RunnerApi.Environment resolveArtifacts(
ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalStub,
RunnerApi.Environment environment)
throws IOException {
Expand All @@ -378,7 +378,7 @@ private RunnerApi.Environment resolveArtifacts(
.build();
}

private List<RunnerApi.ArtifactInformation> resolveArtifacts(
private static List<RunnerApi.ArtifactInformation> resolveArtifacts(
ArtifactRetrievalServiceGrpc.ArtifactRetrievalServiceBlockingStub retrievalStub,
List<RunnerApi.ArtifactInformation> artifacts)
throws IOException {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;

import java.util.List;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.transforms.resourcehints.ResourceHintsOptions.EmptyListDefault;

/**
 * Pipeline options that control which transforms are overridden through a transform service and
 * how that transform service is located or started.
 */
public interface ExternalTranslationOptions extends PipelineOptions {

  /**
   * Returns the URNs of the transforms to be overridden using the transform service.
   *
   * <p>When unset, the value is supplied by {@link EmptyListDefault} (presumably an empty list;
   * confirm against that factory).
   */
  @Description("Set of URNs of transforms to be overridden using the transform service.")
  @Default.InstanceFactory(EmptyListDefault.class)
  List<String> getTransformsToOverride();

  /** Sets the URNs of the transforms to be overridden using the transform service. */
  void setTransformsToOverride(List<String> transformsToOverride);

  /** Returns the address of an already available transform service. */
  @Description("Address of an already available transform service.")
  String getTransformServiceAddress();

  /** Sets the address of an already available transform service. */
  void setTransformServiceAddress(String transformServiceAddress);

  /** Returns the Beam version that will be used to start a transform service. */
  @Description("An available Beam version which will be used to start a transform service.")
  String getTransformServiceBeamVersion();

  /** Sets the Beam version that will be used to start a transform service. */
  void setTransformServiceBeamVersion(String transformServiceBeamVersion);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.beam.runners.core.construction;

import com.google.auto.service.AutoService;
import org.apache.beam.sdk.annotations.Internal;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsRegistrar;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;

/**
 * A {@link PipelineOptionsRegistrar} that contributes {@link ExternalTranslationOptions} to the set
 * of registered pipeline options.
 */
@AutoService(PipelineOptionsRegistrar.class)
@Internal
public class ExternalTranslationOptionsRegistrar implements PipelineOptionsRegistrar {
  /** Returns an immutable list holding only {@link ExternalTranslationOptions}. */
  @Override
  public Iterable<Class<? extends PipelineOptions>> getPipelineOptions() {
    // A single options interface is registered, so a one-element immutable list is sufficient.
    return ImmutableList.<Class<? extends PipelineOptions>>of(ExternalTranslationOptions.class);
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public static TransformPayloadTranslator create() {
private FlattenTranslator() {}

@Override
public String getUrn(Flatten.PCollections<?> transform) {
public String getUrn() {
return PTransformTranslation.FLATTEN_TRANSFORM_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ public class GroupByKeyTranslation {

static class GroupByKeyTranslator implements TransformPayloadTranslator<GroupByKey<?, ?>> {
@Override
public String getUrn(GroupByKey<?, ?> transform) {
public String getUrn() {
return PTransformTranslation.GROUP_BY_KEY_TRANSFORM_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public class GroupIntoBatchesTranslation {
static class GroupIntoBatchesTranslator
implements TransformPayloadTranslator<GroupIntoBatches<?, ?>> {
@Override
public String getUrn(GroupIntoBatches<?, ?> transform) {
public String getUrn() {
return PTransformTranslation.GROUP_INTO_BATCHES_URN;
}

Expand All @@ -61,7 +61,7 @@ public RunnerApi.FunctionSpec translate(
static class ShardedGroupIntoBatchesTranslator
implements TransformPayloadTranslator<GroupIntoBatches<?, ?>.WithShardedKey> {
@Override
public String getUrn(GroupIntoBatches<?, ?>.WithShardedKey transform) {
public String getUrn() {
return PTransformTranslation.GROUP_INTO_BATCHES_WITH_SHARDED_KEY_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
public class ImpulseTranslation {
private static class ImpulseTranslator implements TransformPayloadTranslator<Impulse> {
@Override
public String getUrn(Impulse transform) {
public String getUrn() {
return PTransformTranslation.IMPULSE_TRANSFORM_URN;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,20 @@
import org.apache.beam.runners.core.construction.ExternalTranslation.ExternalTranslator;
import org.apache.beam.runners.core.construction.ParDoTranslation.ParDoTranslator;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.RowCoder;
import org.apache.beam.sdk.io.Read;
import org.apache.beam.sdk.runners.AppliedPTransform;
import org.apache.beam.sdk.schemas.SchemaTranslation;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.transforms.display.DisplayData;
import org.apache.beam.sdk.util.CoderUtils;
import org.apache.beam.sdk.util.common.ReflectHelpers.ObjectsClassComparator;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PInput;
import org.apache.beam.sdk.values.POutput;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;
import org.apache.beam.vendor.grpc.v1p54p0.com.google.protobuf.ByteString;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Joiner;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableSet;
Expand All @@ -54,6 +59,8 @@
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
import org.checkerframework.checker.nullness.qual.NonNull;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Utilities for converting {@link PTransform PTransforms} to {@link RunnerApi Runner API protocol
Expand All @@ -65,10 +72,14 @@
"keyfor"
}) // TODO(https://github.com/apache/beam/issues/20497)
public class PTransformTranslation {

private static final Logger LOG = LoggerFactory.getLogger(PTransformTranslation.class);

// We specifically copy the values here so that they can be used in switch case statements
// and we validate that the value matches the actual URN in the static block below.

// Primitives
public static final String CREATE_TRANSFORM_URN = "beam:transform:create:v1";
public static final String PAR_DO_TRANSFORM_URN = "beam:transform:pardo:v1";
public static final String FLATTEN_TRANSFORM_URN = "beam:transform:flatten:v1";
public static final String GROUP_BY_KEY_TRANSFORM_URN = "beam:transform:group_by_key:v1";
Expand All @@ -83,6 +94,10 @@ public class PTransformTranslation {
public static final ImmutableSet<String> RUNNER_IMPLEMENTED_TRANSFORMS =
ImmutableSet.of(GROUP_BY_KEY_TRANSFORM_URN, IMPULSE_TRANSFORM_URN);

public static final String CONFIG_ROW_KEY = "config_row";

public static final String CONFIG_ROW_SCHEMA_KEY = "config_row_schema";

// DeprecatedPrimitives
/**
* @deprecated SDKs should move away from creating `Read` transforms and migrate to using Impulse
Expand Down Expand Up @@ -435,10 +450,9 @@ public RunnerApi.PTransform translate(
RunnerApi.PTransform.Builder transformBuilder =
translateAppliedPTransform(appliedPTransform, subtransforms, components);

FunctionSpec spec =
KNOWN_PAYLOAD_TRANSLATORS
.get(appliedPTransform.getTransform().getClass())
.translate(appliedPTransform, components);
TransformPayloadTranslator payloadTranslator =
KNOWN_PAYLOAD_TRANSLATORS.get(appliedPTransform.getTransform().getClass());
FunctionSpec spec = payloadTranslator.translate(appliedPTransform, components);
if (spec != null) {
transformBuilder.setSpec(spec);

Expand All @@ -461,6 +475,33 @@ public RunnerApi.PTransform translate(
}
}
}

Row configRow = null;
try {
configRow = payloadTranslator.toConfigRow(appliedPTransform.getTransform());
} catch (UnsupportedOperationException e) {
// Optional toConfigRow() has not been implemented. We can just ignore.
} catch (Exception e) {
LOG.warn(
"Could not attach the config row for transform "
+ appliedPTransform.getTransform().getName()
+ ": "
+ e);
// Ignoring the error and continuing with the translation since attaching config rows is
// optional.
}
if (configRow != null) {
transformBuilder.putAnnotations(
CONFIG_ROW_KEY,
ByteString.copyFrom(
CoderUtils.encodeToByteArray(RowCoder.of(configRow.getSchema()), configRow)));

transformBuilder.putAnnotations(
CONFIG_ROW_SCHEMA_KEY,
ByteString.copyFrom(
SchemaTranslation.schemaToProto(configRow.getSchema(), true).toByteArray()));
}

return transformBuilder.build();
}
}
Expand Down Expand Up @@ -508,14 +549,63 @@ static RunnerApi.PTransform.Builder translateAppliedPTransform(
*
* <p>When going to a protocol buffer message, the translator produces a payload corresponding to
* the Java representation while registering components that payload references.
*
* <p>Also, provides methods for generating a Row-based constructor config for the transform that
* can be later used to re-construct the transform.
*/
public interface TransformPayloadTranslator<T extends PTransform<?, ?>> {
String getUrn(T transform);

/**
* Provides a unique URN for transforms represented by this {@code TransformPayloadTranslator}.
*/
String getUrn();

/**
* Same as {@link #getUrn()} but the returned URN may depend on the transform provided.
*
* <p>Only override this if the same {@code TransformPayloadTranslator} is used for multiple
* transforms. Otherwise, use {@link #getUrn()}.
*/
default String getUrn(T transform) {
return getUrn();
}

/** */
/**
* Translates the given transform represented by the provided {@code AppliedPTransform} to a
* {@code FunctionSpec} with a URN and a payload.
*
* @param application an {@code AppliedPTransform} that includes the transform to be expanded.
* @param components components of the pipeline that includes the transform.
* @return a generated spec for the transform to be included in the pipeline proto. If return
* value is null, transform should include an empty spec.
* @throws IOException
*/
@Nullable
FunctionSpec translate(AppliedPTransform<?, ?, T> application, SdkComponents components)
throws IOException;

/**
* Generates a Row-based construction configuration for the provided transform.
*
* @param transform a transform represented by the current {@code TransformPayloadTranslator}.
* @return a Row-based construction configuration for the provided transform.
*/
default Row toConfigRow(T transform) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* Constructs a transform from a provided Row-based construction configuration.
*
* @param configRow a construction configuration similar to what would be generated by the
* {@link #toConfigRow(PTransform)} method.
* @return a transform represented by the current {@code TransformPayloadTranslator}.
*/
default T fromConfigRow(Row configRow) {
throw new UnsupportedOperationException("Not implemented");
}

/**
* A {@link TransformPayloadTranslator} for transforms that contain no references to components,
* so they do not need a specialized rehydration.
Expand All @@ -526,7 +616,7 @@ abstract class NotSerializable<T extends PTransform<?, ?>>
public static NotSerializable<?> forUrn(final String urn) {
return new NotSerializable<PTransform<?, ?>>() {
@Override
public String getUrn(PTransform<?, ?> transform) {
public String getUrn() {
return urn;
}
};
Expand Down
Loading

0 comments on commit 1c50fd2

Please sign in to comment.