From c8b3224959a8fac4c5cd2f082956931ef1c2cd5e Mon Sep 17 00:00:00 2001 From: marc hurabielle Date: Mon, 6 May 2024 22:18:47 +0900 Subject: [PATCH 1/2] add support for FlinkJobServer configuration --- .../flink/FlinkPortableClientEntryPoint.java | 18 +++++++++--------- .../runners/jobsubmission/JobServerDriver.java | 12 ++++++++++++ 2 files changed, 21 insertions(+), 9 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java index 47d3959ad18c..a07ab61d6ee9 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java @@ -80,7 +80,6 @@ public class FlinkPortableClientEntryPoint { private FlinkJobServerDriver jobServer; private Thread jobServerThread; private DetachedJobInvokerFactory jobInvokerFactory; - private int jobPort = 0; // pick any free port public FlinkPortableClientEntryPoint(String driverCmd) { Preconditions.checkState( @@ -96,7 +95,7 @@ public static void main(String[] args) throws Exception { FlinkPortableClientEntryPoint runner = new FlinkPortableClientEntryPoint(configuration.driverCmd); try { - runner.startJobService(); + runner.startJobService(configuration); runner.runDriverProgram(); } catch (Exception e) { throw new RuntimeException(String.format("Job %s failed.", configuration.driverCmd), e); @@ -107,7 +106,8 @@ public static void main(String[] args) throws Exception { LOG.info("Job submitted successfully."); } - private static class EntryPointConfiguration { + private static class EntryPointConfiguration + extends FlinkJobServerDriver.FlinkServerConfiguration { @Option( name = "--driver-cmd", required = true, @@ -127,16 +127,16 @@ private static EntryPointConfiguration parseArgs(String[] args) { parser.printUsage(System.err); throw new IllegalArgumentException("Unable to parse command line arguments.", e); } + configuration.setPort(0); + configuration.setArtifactPort(0); + configuration.setExpansionPort(0); return configuration; } - private void startJobService() throws Exception { + private void startJobService(FlinkJobServerDriver.FlinkServerConfiguration configuration) + throws Exception { jobInvokerFactory = new DetachedJobInvokerFactory(); - jobServer = - FlinkJobServerDriver.fromConfig( - FlinkJobServerDriver.parseArgs( - new String[] {"--job-port=" + jobPort, "--artifact-port=0", "--expansion-port=0"}), - jobInvokerFactory); + jobServer = FlinkJobServerDriver.fromConfig(configuration, jobInvokerFactory); jobServerThread = new Thread(jobServer); jobServerThread.start(); diff --git a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java index a342593b21f7..885e54bfb665 100644 --- a/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java +++ b/runners/java-job-service/src/main/java/org/apache/beam/runners/jobsubmission/JobServerDriver.java @@ -119,14 +119,26 @@ public int getPort() { return port; } + public void setPort(int port) { + this.port = port; + } + public int getArtifactPort() { return artifactPort; } + public void setArtifactPort(int artifactPort) { + this.artifactPort = artifactPort; + } + public int getExpansionPort() { return expansionPort; } + public void setExpansionPort(int expansionPort) { + this.expansionPort = expansionPort; + } + public String getArtifactStagingPath() { return artifactStagingPath; } From d5957f8a1b4f8d5a8b6f07976e5720c89b503b9d Mon Sep 17 00:00:00 2001 From: marc hurabielle Date: Mon, 6 May 2024 22:40:21 +0900 Subject: [PATCH 2/2] remove hardcoded timeout for FlinkPortableClient --- .../flink/FlinkPortableClientEntryPoint.java | 33 +++++++++++-------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java index a07ab61d6ee9..8f0ecf3efbda 100644 --- a/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java +++ b/runners/flink/src/main/java/org/apache/beam/runners/flink/FlinkPortableClientEntryPoint.java @@ -73,9 +73,6 @@ public class FlinkPortableClientEntryPoint { private static final Logger LOG = LoggerFactory.getLogger(FlinkPortableClientEntryPoint.class); private static final String JOB_ENDPOINT_FLAG = "--job_endpoint"; - private static final Duration JOB_INVOCATION_TIMEOUT = Duration.ofSeconds(30); - private static final Duration JOB_SERVICE_STARTUP_TIMEOUT = Duration.ofSeconds(30); - private final String driverCmd; private FlinkJobServerDriver jobServer; private Thread jobServerThread; @@ -96,7 +93,7 @@ public static void main(String[] args) throws Exception { new FlinkPortableClientEntryPoint(configuration.driverCmd); try { runner.startJobService(configuration); - runner.runDriverProgram(); + runner.runDriverProgram(Duration.ofSeconds(configuration.jobInvocationTimeoutSeconds)); } catch (Exception e) { throw new RuntimeException(String.format("Job %s failed.", configuration.driverCmd), e); } finally { @@ -115,6 +112,16 @@ private static class EntryPointConfiguration "Command that launches the Python driver program. " + "(The job service endpoint will be appended as --job_endpoint=localhost:.)") private String driverCmd; + + @Option( + name = "--job-service-startup-timeout-seconds", + usage = "Timeout for the job service start in seconds") + private long jobServiceStartupTimeoutSeconds = 30; + + @Option( + name = "--job-invocation-timeout-seconds", + usage = "Timeout for the job submission in seconds") + private long jobInvocationTimeoutSeconds = 30; } private static EntryPointConfiguration parseArgs(String[] args) { @@ -133,14 +140,14 @@ private static EntryPointConfiguration parseArgs(String[] args) { return configuration; } - private void startJobService(FlinkJobServerDriver.FlinkServerConfiguration configuration) - throws Exception { + private void startJobService(EntryPointConfiguration configuration) throws Exception { jobInvokerFactory = new DetachedJobInvokerFactory(); jobServer = FlinkJobServerDriver.fromConfig(configuration, jobInvokerFactory); jobServerThread = new Thread(jobServer); jobServerThread.start(); - Deadline deadline = Deadline.fromNow(JOB_SERVICE_STARTUP_TIMEOUT); + Deadline deadline = + Deadline.fromNow(Duration.ofSeconds(configuration.jobServiceStartupTimeoutSeconds)); while (jobServer.getJobServerUrl() == null && deadline.hasTimeLeft()) { try { Thread.sleep(500); @@ -160,7 +167,7 @@ private void startJobService(FlinkJobServerDriver.FlinkServerConfiguration confi } } - private void runDriverProgram() throws Exception { + private void runDriverProgram(Duration startTimeout) throws Exception { ProcessManager processManager = ProcessManager.create(); String executable = "bash"; List args = @@ -177,7 +184,7 @@ private void runDriverProgram() throws Exception { LOG.info("Started driver program"); // await effect of the driver program submitting the job - jobInvokerFactory.executeDetachedJob(); + jobInvokerFactory.executeDetachedJob(startTimeout); } catch (Exception e) { try { processManager.stopProcess(processId); @@ -247,13 +254,13 @@ protected JobInvocation createJobInvocation( }; } - private void executeDetachedJob() throws Exception { - long timeoutSeconds = JOB_INVOCATION_TIMEOUT.getSeconds(); - if (latch.await(timeoutSeconds, TimeUnit.SECONDS)) { + private void executeDetachedJob(Duration startTimeout) throws Exception { + if (latch.await(startTimeout.getSeconds(), TimeUnit.SECONDS)) { actualPipelineRunner.run(pipeline, jobInfo); } else { throw new TimeoutException( - String.format("Timeout of %s seconds waiting for job submission.", timeoutSeconds)); + String.format( + "Timeout of %s seconds waiting for job submission.", startTimeout.getSeconds())); } } }