Skip to content

Commit

Permalink
Consolidate SamzaPipelineOptions and SamzaPortablePipelineOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
mynameborat committed May 19, 2022
1 parent 1b0df38 commit 79738cc
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import org.apache.beam.runners.jobsubmission.JobInvoker;
import org.apache.beam.runners.jobsubmission.PortablePipelineJarCreator;
import org.apache.beam.runners.jobsubmission.PortablePipelineRunner;
import org.apache.beam.sdk.options.PortablePipelineOptions;
import org.apache.beam.vendor.grpc.v1p43p2.com.google.protobuf.Struct;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.base.Strings;
import org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.ListeningExecutorService;
Expand Down Expand Up @@ -59,11 +60,12 @@ protected JobInvocation invokeWithExecutor(
@Nullable String retrievalToken,
ListeningExecutorService executorService) {
LOG.trace("Parsing pipeline options");
final SamzaPortablePipelineOptions samzaOptions =
PipelineOptionsTranslation.fromProto(options).as(SamzaPortablePipelineOptions.class);
final SamzaPipelineOptions samzaOptions =
PipelineOptionsTranslation.fromProto(options).as(SamzaPipelineOptions.class);
final PortablePipelineOptions portableOptions = samzaOptions.as(PortablePipelineOptions.class);

final PortablePipelineRunner pipelineRunner;
if (Strings.isNullOrEmpty(samzaOptions.getOutputExecutablePath())) {
if (Strings.isNullOrEmpty(portableOptions.getOutputExecutablePath())) {
pipelineRunner = new SamzaPipelineRunner(samzaOptions);
} else {
/*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,13 @@ public interface SamzaPipelineOptions extends PipelineOptions {

void setJobInstance(String instance);

@Description(
"The file path for the local file system token. If not set (by default), then the runner would"
+ " not use secure server factory.")
String getFsTokenPath();

void setFsTokenPath(String path);

@Description(
"Samza application execution environment."
+ "See {@link org.apache.beam.runners.samza.SamzaExecutionEnvironment} for detailed environment descriptions.")
Expand Down

This file was deleted.

0 comments on commit 79738cc

Please sign in to comment.