Skip to content

Commit

Permalink
flink portable client configurations (#31188)
Browse files: browse the repository at this point in the history
* add support for FlinkJobServer configuration

* remove hardcoded timeout for FlinkPortableClient
  • Loading branch information
AyWa authored May 30, 2024
1 parent 9f3f1c9 commit 3cadc83
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 20 deletions.
Original file line number | Diff line number | Diff line change
Expand Up @@ -73,14 +73,10 @@
public class FlinkPortableClientEntryPoint {
private static final Logger LOG = LoggerFactory.getLogger(FlinkPortableClientEntryPoint.class);
private static final String JOB_ENDPOINT_FLAG = "--job_endpoint";
private static final Duration JOB_INVOCATION_TIMEOUT = Duration.ofSeconds(30);
private static final Duration JOB_SERVICE_STARTUP_TIMEOUT = Duration.ofSeconds(30);

private final String driverCmd;
private FlinkJobServerDriver jobServer;
private Thread jobServerThread;
private DetachedJobInvokerFactory jobInvokerFactory;
private int jobPort = 0; // pick any free port

public FlinkPortableClientEntryPoint(String driverCmd) {
Preconditions.checkState(
Expand All @@ -96,8 +92,8 @@ public static void main(String[] args) throws Exception {
FlinkPortableClientEntryPoint runner =
new FlinkPortableClientEntryPoint(configuration.driverCmd);
try {
runner.startJobService();
runner.runDriverProgram();
runner.startJobService(configuration);
runner.runDriverProgram(Duration.ofSeconds(configuration.jobInvocationTimeoutSeconds));
} catch (Exception e) {
throw new RuntimeException(String.format("Job %s failed.", configuration.driverCmd), e);
} finally {
Expand All @@ -107,14 +103,25 @@ public static void main(String[] args) throws Exception {
LOG.info("Job submitted successfully.");
}

private static class EntryPointConfiguration {
private static class EntryPointConfiguration
extends FlinkJobServerDriver.FlinkServerConfiguration {
@Option(
name = "--driver-cmd",
required = true,
usage =
"Command that launches the Python driver program. "
+ "(The job service endpoint will be appended as --job_endpoint=localhost:<port>.)")
private String driverCmd;

@Option(
name = "--job-service-startup-timeout-seconds",
usage = "Timeout for the job service start in seconds")
private long jobServiceStartupTimeoutSeconds = 30;

@Option(
name = "--job-invocation-timeout-seconds",
usage = "Timeout for the job submission in seconds")
private long jobInvocationTimeoutSeconds = 30;
}

private static EntryPointConfiguration parseArgs(String[] args) {
Expand All @@ -127,20 +134,20 @@ private static EntryPointConfiguration parseArgs(String[] args) {
parser.printUsage(System.err);
throw new IllegalArgumentException("Unable to parse command line arguments.", e);
}
configuration.setPort(0);
configuration.setArtifactPort(0);
configuration.setExpansionPort(0);
return configuration;
}

private void startJobService() throws Exception {
private void startJobService(EntryPointConfiguration configuration) throws Exception {
jobInvokerFactory = new DetachedJobInvokerFactory();
jobServer =
FlinkJobServerDriver.fromConfig(
FlinkJobServerDriver.parseArgs(
new String[] {"--job-port=" + jobPort, "--artifact-port=0", "--expansion-port=0"}),
jobInvokerFactory);
jobServer = FlinkJobServerDriver.fromConfig(configuration, jobInvokerFactory);
jobServerThread = new Thread(jobServer);
jobServerThread.start();

Deadline deadline = Deadline.fromNow(JOB_SERVICE_STARTUP_TIMEOUT);
Deadline deadline =
Deadline.fromNow(Duration.ofSeconds(configuration.jobServiceStartupTimeoutSeconds));
while (jobServer.getJobServerUrl() == null && deadline.hasTimeLeft()) {
try {
Thread.sleep(500);
Expand All @@ -160,7 +167,7 @@ private void startJobService() throws Exception {
}
}

private void runDriverProgram() throws Exception {
private void runDriverProgram(Duration startTimeout) throws Exception {
ProcessManager processManager = ProcessManager.create();
String executable = "bash";
List<String> args =
Expand All @@ -177,7 +184,7 @@ private void runDriverProgram() throws Exception {
LOG.info("Started driver program");

// await effect of the driver program submitting the job
jobInvokerFactory.executeDetachedJob();
jobInvokerFactory.executeDetachedJob(startTimeout);
} catch (Exception e) {
try {
processManager.stopProcess(processId);
Expand Down Expand Up @@ -247,13 +254,13 @@ protected JobInvocation createJobInvocation(
};
}

private void executeDetachedJob() throws Exception {
long timeoutSeconds = JOB_INVOCATION_TIMEOUT.getSeconds();
if (latch.await(timeoutSeconds, TimeUnit.SECONDS)) {
private void executeDetachedJob(Duration startTimeout) throws Exception {
if (latch.await(startTimeout.getSeconds(), TimeUnit.SECONDS)) {
actualPipelineRunner.run(pipeline, jobInfo);
} else {
throw new TimeoutException(
String.format("Timeout of %s seconds waiting for job submission.", timeoutSeconds));
String.format(
"Timeout of %s seconds waiting for job submission.", startTimeout.getSeconds()));
}
}
}
Expand Down
Original file line number | Diff line number | Diff line change
Expand Up @@ -119,14 +119,26 @@ public int getPort() {
return port;
}

/**
 * Overrides the port value held by this configuration.
 *
 * <p>NOTE(review): the entry point in this change passes {@code 0} after parsing arguments,
 * presumably so that any free port is picked — confirm against the server implementation.
 *
 * @param port the port value to store
 */
public void setPort(int port) {
this.port = port;
}

/**
 * Returns the artifact port value currently held by this configuration.
 *
 * @return the stored artifact port
 */
public int getArtifactPort() {
return artifactPort;
}

/**
 * Overrides the artifact port value held by this configuration.
 *
 * <p>NOTE(review): the entry point in this change passes {@code 0}, presumably meaning
 * "use any free port" — confirm against the server implementation.
 *
 * @param artifactPort the artifact port value to store
 */
public void setArtifactPort(int artifactPort) {
this.artifactPort = artifactPort;
}

/**
 * Returns the expansion port value currently held by this configuration.
 *
 * @return the stored expansion port
 */
public int getExpansionPort() {
return expansionPort;
}

/**
 * Overrides the expansion port value held by this configuration.
 *
 * <p>NOTE(review): the entry point in this change passes {@code 0}, presumably meaning
 * "use any free port" — confirm against the server implementation.
 *
 * @param expansionPort the expansion port value to store
 */
public void setExpansionPort(int expansionPort) {
this.expansionPort = expansionPort;
}

/**
 * Returns the artifact staging path currently held by this configuration.
 *
 * @return the stored artifact staging path
 */
public String getArtifactStagingPath() {
return artifactStagingPath;
}
Expand Down

0 comments on commit 3cadc83

Please sign in to comment.