Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Java ExpansionService to use arbitrary PipelineOptions set through an ExpansionRequest #33026

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ public class PipelineOptionsTranslation {
new ObjectMapper()
.registerModules(ObjectMapper.findModules(ReflectHelpers.findClassLoader()));

public static final String PIPELINE_OPTIONS_URN_PREFIX = "beam:option:";
public static final String PIPELINE_OPTIONS_URN_SUFFIX = ":v1";

/** Converts the provided {@link PipelineOptions} to a {@link Struct}. */
public static Struct toProto(PipelineOptions options) {
Struct.Builder builder = Struct.newBuilder();
Expand All @@ -65,9 +68,9 @@ public static Struct toProto(PipelineOptions options) {
while (optionsEntries.hasNext()) {
Map.Entry<String, JsonNode> entry = optionsEntries.next();
optionsUsingUrns.put(
"beam:option:"
PIPELINE_OPTIONS_URN_PREFIX
+ CaseFormat.LOWER_CAMEL.to(CaseFormat.LOWER_UNDERSCORE, entry.getKey())
+ ":v1",
+ PIPELINE_OPTIONS_URN_SUFFIX,
entry.getValue());
}

Expand All @@ -92,7 +95,9 @@ public static PipelineOptions fromProto(Struct protoOptions) {
mapWithoutUrns.put(
CaseFormat.LOWER_UNDERSCORE.to(
CaseFormat.LOWER_CAMEL,
optionKey.substring("beam:option:".length(), optionKey.length() - ":v1".length())),
optionKey.substring(
PIPELINE_OPTIONS_URN_PREFIX.length(),
optionKey.length() - PIPELINE_OPTIONS_URN_SUFFIX.length())),
optionValue);
}
return MAPPER.readValue(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,6 @@
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.sdk.options.StreamingOptions;
import org.apache.beam.sdk.schemas.NoSuchSchemaException;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
Expand Down Expand Up @@ -535,7 +534,7 @@ private static <ConfigT> void invokeSetter(ConfigT config, @Nullable Object valu
}

private @MonotonicNonNull Map<String, TransformProvider> registeredTransforms;
private final PipelineOptions pipelineOptions;
private final PipelineOptions commandLineOptions;
private final @Nullable String loopbackAddress;

public ExpansionService() {
Expand All @@ -551,7 +550,7 @@ public ExpansionService(PipelineOptions opts) {
}

public ExpansionService(PipelineOptions opts, @Nullable String loopbackAddress) {
this.pipelineOptions = opts;
this.commandLineOptions = opts;
this.loopbackAddress = loopbackAddress;
}

Expand Down Expand Up @@ -587,12 +586,15 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
request.getTransform().getSpec().getUrn());
LOG.debug("Full transform: {}", request.getTransform());
Set<String> existingTransformIds = request.getComponents().getTransformsMap().keySet();
Pipeline pipeline =
createPipeline(PipelineOptionsTranslation.fromProto(request.getPipelineOptions()));

PipelineOptions pipelineOptionsFromRequest =
PipelineOptionsTranslation.fromProto(request.getPipelineOptions());
Pipeline pipeline = createPipeline(pipelineOptionsFromRequest);

boolean isUseDeprecatedRead =
ExperimentalOptions.hasExperiment(pipelineOptions, "use_deprecated_read")
ExperimentalOptions.hasExperiment(commandLineOptions, "use_deprecated_read")
|| ExperimentalOptions.hasExperiment(
pipelineOptions, "beam_fn_api_use_deprecated_read");
commandLineOptions, "beam_fn_api_use_deprecated_read");
if (!isUseDeprecatedRead) {
ExperimentalOptions.addExperiment(
pipeline.getOptions().as(ExperimentalOptions.class), "beam_fn_api");
Expand Down Expand Up @@ -629,7 +631,7 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
if (transformProvider == null) {
if (getUrn(ExpansionMethods.Enum.JAVA_CLASS_LOOKUP).equals(urn)) {
AllowList allowList =
pipelineOptions.as(ExpansionServiceOptions.class).getJavaClassLookupAllowlist();
commandLineOptions.as(ExpansionServiceOptions.class).getJavaClassLookupAllowlist();
assert allowList != null;
transformProvider = new JavaClassLookupTransformProvider(allowList);
} else if (getUrn(SCHEMA_TRANSFORM).equals(urn)) {
Expand Down Expand Up @@ -671,7 +673,7 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
RunnerApi.Environment defaultEnvironment =
Environments.createOrGetDefaultEnvironment(
pipeline.getOptions().as(PortablePipelineOptions.class));
if (pipelineOptions.as(ExpansionServiceOptions.class).getAlsoStartLoopbackWorker()) {
if (commandLineOptions.as(ExpansionServiceOptions.class).getAlsoStartLoopbackWorker()) {
PortablePipelineOptions externalOptions =
PipelineOptionsFactory.create().as(PortablePipelineOptions.class);
externalOptions.setDefaultEnvironmentType(Environments.ENVIRONMENT_EXTERNAL);
Expand Down Expand Up @@ -723,35 +725,34 @@ private Map<String, TransformProvider> loadRegisteredTransforms() {
}

protected Pipeline createPipeline(PipelineOptions requestOptions) {
// TODO: [https://github.com/apache/beam/issues/21064]: implement proper validation
PipelineOptions effectiveOpts = PipelineOptionsFactory.create();
PortablePipelineOptions portableOptions = effectiveOpts.as(PortablePipelineOptions.class);
PortablePipelineOptions specifiedOptions = pipelineOptions.as(PortablePipelineOptions.class);
Optional.ofNullable(specifiedOptions.getDefaultEnvironmentType())
.ifPresent(portableOptions::setDefaultEnvironmentType);
Optional.ofNullable(specifiedOptions.getDefaultEnvironmentConfig())
.ifPresent(portableOptions::setDefaultEnvironmentConfig);
List<String> filesToStage = specifiedOptions.getFilesToStage();
// We expect the ExpansionRequest to contain a valid set of options to be used for this
// expansion.
// Additionally, we override selected options using options values set via command line or
// ExpansionService wide overrides.

PortablePipelineOptions requestPortablePipelineOptions =
requestOptions.as(PortablePipelineOptions.class);
PortablePipelineOptions commandLinePortablePipelineOptions =
commandLineOptions.as(PortablePipelineOptions.class);
Optional.ofNullable(commandLinePortablePipelineOptions.getDefaultEnvironmentType())
.ifPresent(requestPortablePipelineOptions::setDefaultEnvironmentType);
Optional.ofNullable(commandLinePortablePipelineOptions.getDefaultEnvironmentConfig())
.ifPresent(requestPortablePipelineOptions::setDefaultEnvironmentConfig);
List<String> filesToStage = commandLinePortablePipelineOptions.getFilesToStage();
if (filesToStage != null) {
effectiveOpts.as(PortablePipelineOptions.class).setFilesToStage(filesToStage);
requestPortablePipelineOptions
.as(PortablePipelineOptions.class)
.setFilesToStage(filesToStage);
}
effectiveOpts
requestPortablePipelineOptions
.as(ExperimentalOptions.class)
.setExperiments(pipelineOptions.as(ExperimentalOptions.class).getExperiments());
effectiveOpts.setRunner(NotRunnableRunner.class);
effectiveOpts
.setExperiments(commandLineOptions.as(ExperimentalOptions.class).getExperiments());
requestPortablePipelineOptions.setRunner(NotRunnableRunner.class);
requestPortablePipelineOptions
.as(ExpansionServiceOptions.class)
.setExpansionServiceConfig(
pipelineOptions.as(ExpansionServiceOptions.class).getExpansionServiceConfig());
// TODO(https://github.com/apache/beam/issues/20090): Figure out the correct subset of options
// to propagate.
if (requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion() != null) {
effectiveOpts
.as(StreamingOptions.class)
.setUpdateCompatibilityVersion(
requestOptions.as(StreamingOptions.class).getUpdateCompatibilityVersion());
}
return Pipeline.create(effectiveOpts);
commandLineOptions.as(ExpansionServiceOptions.class).getExpansionServiceConfig());
return Pipeline.create(requestOptions);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
*/
package org.apache.beam.sdk.expansion.service;

import static org.apache.beam.sdk.util.construction.PipelineOptionsTranslation.PIPELINE_OPTIONS_URN_PREFIX;
import static org.apache.beam.sdk.util.construction.PipelineOptionsTranslation.PIPELINE_OPTIONS_URN_SUFFIX;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.allOf;
import static org.hamcrest.Matchers.contains;
Expand Down Expand Up @@ -49,6 +51,8 @@
import org.apache.beam.model.pipeline.v1.RunnerApi;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.GenerateSequence;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.schemas.Schema.Field;
Expand All @@ -58,15 +62,20 @@
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.transforms.Count;
import org.apache.beam.sdk.transforms.Impulse;
import org.apache.beam.sdk.transforms.PTransform;
import org.apache.beam.sdk.util.ByteStringOutputStream;
import org.apache.beam.sdk.util.construction.PipelineTranslation;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Struct;
import org.apache.beam.vendor.grpc.v1p60p1.com.google.protobuf.Value;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableList;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Iterables;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.io.Resources;
import org.checkerframework.checker.nullness.qual.Nullable;
import org.hamcrest.Matchers;
import org.junit.Assert;
import org.junit.Test;

/** Tests for {@link ExpansionService}. */
Expand All @@ -76,6 +85,7 @@
public class ExpansionServiceTest {

private static final String TEST_URN = "test:beam:transforms:count";
private static final String TEST_OPTIONS_URN = "test:beam:transforms:test_options";

private static final String TEST_NAME = "TestName";

Expand All @@ -98,9 +108,59 @@ public class ExpansionServiceTest {
@AutoService(ExpansionService.ExpansionServiceRegistrar.class)
public static class TestTransformRegistrar implements ExpansionService.ExpansionServiceRegistrar {

static final String EXPECTED_STRING_VALUE = "abcde";
static final Boolean EXPECTED_BOOLEAN_VALUE = true;
static final Integer EXPECTED_INTEGER_VALUE = 12345;

@Override
public Map<String, TransformProvider> knownTransforms() {
return ImmutableMap.of(TEST_URN, (spec, options) -> Count.perElement());
return ImmutableMap.of(
TEST_URN, (spec, options) -> Count.perElement(),
TEST_OPTIONS_URN,
(spec, options) ->
new TestOptionsTransform(
EXPECTED_STRING_VALUE, EXPECTED_BOOLEAN_VALUE, EXPECTED_INTEGER_VALUE));
}
}

public interface TestOptions extends PipelineOptions {
String getStringOption();

void setStringOption(String value);

Boolean getBooleanOption();

void setBooleanOption(Boolean value);

Integer getIntegerOption();

void setIntegerOption(Integer value);
}

public static class TestOptionsTransform
extends PTransform<PCollection<String>, PCollection<String>> {
String expectedStringValue;

Boolean expectedBooleanValue;

Integer expectedIntegerValue;

public TestOptionsTransform(
String expectedStringValue, Boolean expectedBooleanValue, Integer expectedIntegerValue) {
this.expectedStringValue = expectedStringValue;
this.expectedBooleanValue = expectedBooleanValue;
this.expectedIntegerValue = expectedIntegerValue;
}

@Override
public PCollection<String> expand(PCollection<String> input) {
TestOptions testOption = input.getPipeline().getOptions().as(TestOptions.class);

Assert.assertEquals(expectedStringValue, testOption.getStringOption());
Assert.assertEquals(expectedBooleanValue, testOption.getBooleanOption());
Assert.assertEquals(expectedIntegerValue, testOption.getIntegerOption());

return input;
}
}

Expand Down Expand Up @@ -146,6 +206,58 @@ public void testConstruct() {
}
}

@Test
public void testConstructWithPipelineOptions() {
PipelineOptionsFactory.register(TestOptions.class);
Pipeline p = Pipeline.create();
p.apply(Impulse.create());
RunnerApi.Pipeline pipelineProto = PipelineTranslation.toProto(p);
String inputPcollId =
Iterables.getOnlyElement(
Iterables.getOnlyElement(pipelineProto.getComponents().getTransformsMap().values())
.getOutputsMap()
.values());

Struct optionsStruct =
Struct.newBuilder()
.putFields(
PIPELINE_OPTIONS_URN_PREFIX + "string_option" + PIPELINE_OPTIONS_URN_SUFFIX,
Value.newBuilder()
.setStringValue(TestTransformRegistrar.EXPECTED_STRING_VALUE)
.build())
.putFields(
PIPELINE_OPTIONS_URN_PREFIX + "boolean_option" + PIPELINE_OPTIONS_URN_SUFFIX,
Value.newBuilder()
.setBoolValue(TestTransformRegistrar.EXPECTED_BOOLEAN_VALUE)
.build())
.putFields(
PIPELINE_OPTIONS_URN_PREFIX + "integer_option" + PIPELINE_OPTIONS_URN_SUFFIX,
Value.newBuilder()
.setNumberValue(TestTransformRegistrar.EXPECTED_INTEGER_VALUE)
.build())
.build();
ExpansionApi.ExpansionRequest request =
ExpansionApi.ExpansionRequest.newBuilder()
.setComponents(pipelineProto.getComponents())
.setPipelineOptions(optionsStruct)
.setTransform(
RunnerApi.PTransform.newBuilder()
.setUniqueName(TEST_NAME)
.setSpec(RunnerApi.FunctionSpec.newBuilder().setUrn(TEST_OPTIONS_URN))
.putInputs("input", inputPcollId))
.setNamespace(TEST_NAMESPACE)
.build();
ExpansionApi.ExpansionResponse response = expansionService.expand(request);
RunnerApi.PTransform expandedTransform = response.getTransform();
assertEquals(TEST_NAMESPACE + TEST_NAME, expandedTransform.getUniqueName());

// Verify it has the right input.
assertThat(expandedTransform.getInputsMap().values(), contains(inputPcollId));

// Verify it has the right output.
assertThat(expandedTransform.getOutputsMap().keySet(), contains("output"));
}

@Test
public void testConstructGenerateSequenceWithRegistration() {
ExternalTransforms.ExternalConfigurationPayload payload =
Expand Down
Loading