From 897860fd7d9cec5ee7ef546fcc4a3a64714dd430 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Tue, 19 Feb 2019 17:48:37 +0700 Subject: [PATCH 01/13] Pass jobVertex's ResourceProfile to SlotPool and ResourceManager --- .../runtime/clusterframework/types/ResourceProfile.java | 2 +- .../apache/flink/runtime/executiongraph/Execution.java | 2 +- .../apache/flink/runtime/instance/SimpleSlotContext.java | 7 +++++++ .../org/apache/flink/runtime/jobmaster/SlotContext.java | 3 +++ .../runtime/jobmaster/slotpool/SlotSharingManager.java | 9 +++++++-- .../runtime/jobmanager/scheduler/SchedulerTestBase.java | 2 +- .../jobmaster/slotpool/SingleLogicalSlotTest.java | 7 +++++++ 7 files changed, 27 insertions(+), 5 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java index 5b133e7b624fb..4b5d6d9461518 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceProfile.java @@ -300,7 +300,7 @@ public String toString() { '}'; } - static ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) { + static public ResourceProfile fromResourceSpec(ResourceSpec resourceSpec, int networkMemory) { Map copiedExtendedResources = new HashMap<>(resourceSpec.getExtendedResources()); return new ResourceProfile( diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index e3b501e52e837..4137ec0cc0a4a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -515,7 +515,7 @@ public CompletableFuture allocateAndAssignSlotForExecution( toSchedule, queued, new SlotProfile( - ResourceProfile.UNKNOWN, + ResourceProfile.fromResourceSpec(vertex.getJobVertex().getJobVertex().getPreferredResources(), 0), preferredLocations, previousAllocationIDs, allPreviousExecutionGraphAllocationIds), diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java index 282fd2ccf4e3b..5d7f54d52d508 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SimpleSlotContext.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.instance; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; import org.apache.flink.runtime.jobmaster.SlotContext; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -67,4 +68,10 @@ public int getPhysicalSlotNumber() { public TaskManagerGateway getTaskManagerGateway() { return taskManagerGateway; } + + @Override + public ResourceProfile getResourceProfile() + { + return ResourceProfile.ANY; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java index 8777edd2fe7b3..2f36be2fbf545 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.jobmaster; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; /** @@ -35,4 +36,6 @@ public interface SlotContext extends SlotInfo { * @return The gateway that can be used to send messages to the TaskManager. */ TaskManagerGateway getTaskManagerGateway(); + + ResourceProfile getResourceProfile(); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java index af5582752d928..83613bec73267 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/slotpool/SlotSharingManager.java @@ -188,8 +188,13 @@ MultiTaskSlotLocality getResolvedRootSlot(AbstractID groupId, SchedulingStrategy slotProfile, () -> resolvedRootSlotsValues.stream().flatMap(Collection::stream), (MultiTaskSlot multiTaskSlot) -> multiTaskSlot.getSlotContextFuture().join(), - (MultiTaskSlot multiTaskSlot) -> !multiTaskSlot.contains(groupId), - MultiTaskSlotLocality::of); + (MultiTaskSlot multiTaskSlot) -> + !multiTaskSlot.contains(groupId) && multiTaskSlot.getSlotContextFuture() + .join() + .getResourceProfile() + .isMatching(slotProfile.getResourceProfile()), + MultiTaskSlotLocality::of + ); } /** diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java index 683b0cde52f7a..0dd5eedd1980e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/scheduler/SchedulerTestBase.java @@ -162,7 +162,7 @@ public TaskManagerLocation addTaskManager(int numberSlots) { final SlotOffer slotOffer = new SlotOffer( new AllocationID(), i, - ResourceProfile.UNKNOWN); + ResourceProfile.ANY); slotOffers.add(slotOffer); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java index c5beda44d17ec..72d92b5ef771e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/slotpool/SingleLogicalSlotTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.jobmaster.slotpool; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.jobmanager.scheduler.Locality; @@ -319,5 +320,11 @@ public int getPhysicalSlotNumber() { public TaskManagerGateway getTaskManagerGateway() { return taskManagerGateway; } + + @Override + public ResourceProfile getResourceProfile() + { + return ResourceProfile.ANY; + } } } From 44e14db054b0d0b6543b3724fd0de213c7b6a842 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Tue, 19 Feb 2019 20:27:42 +0700 Subject: [PATCH 02/13] Added a comment for new method getResourceProfile() --- .../java/org/apache/flink/runtime/jobmaster/SlotContext.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java index 2f36be2fbf545..1e35ac8d64c4e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/SlotContext.java @@ -37,5 +37,10 @@ public interface SlotContext extends SlotInfo { */ TaskManagerGateway getTaskManagerGateway(); + /** + * Gets the resource profile of the underlying allocated slot + * + * @return The resource profile that can be used to define if the slot has the requested resources + */ ResourceProfile getResourceProfile(); } From a8cc09bfdd578d3adbf4aa35e114c2182b06ba26 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Tue, 19 Feb 2019 20:36:57 +0700 Subject: [PATCH 03/13] Use min resources instead of preferred to increase the probability of allocating (or sharing) resources --- .../java/org/apache/flink/runtime/executiongraph/Execution.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 4137ec0cc0a4a..55de1c1f38ba5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -515,7 +515,7 @@ public CompletableFuture allocateAndAssignSlotForExecution( toSchedule, queued, new SlotProfile( - ResourceProfile.fromResourceSpec(vertex.getJobVertex().getJobVertex().getPreferredResources(), 0), + ResourceProfile.fromResourceSpec(vertex.getJobVertex().getJobVertex().getMinResources(), 0), preferredLocations, previousAllocationIDs, allPreviousExecutionGraphAllocationIds), From 8b988baee5054d2afbe4be8b696555948b66b058 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Wed, 13 Mar 2019 19:19:42 +0700 Subject: [PATCH 04/13] BACKEND-1137: K8s integration - initial commit --- flink-kubernetes/pom.xml | 215 ++++++++++++++++++ .../kubernetes/FlinkKubernetesOptions.java | 37 +++ .../kubernetes/KubernetesResourceManager.java | 120 ++++++++++ .../KubernetesTaskExecutorRunner.java | 104 +++++++++ .../flink/kubernetes/client/Endpoint.java | 42 ++++ .../kubernetes/client/KubernetesClient.java | 60 +++++ .../client/KubernetesClientImpl.java | 71 ++++++ .../cluster/KubernetesClusterDescriptor.java | 152 +++++++++++++ .../KubernetesResourceManagerFactory.java | 64 ++++++ .../KubernetesSessionClusterEntrypoint.java | 73 ++++++ pom.xml | 1 + 11 files changed, 939 insertions(+) create mode 100644 flink-kubernetes/pom.xml create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/FlinkKubernetesOptions.java create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskExecutorRunner.java create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java diff --git a/flink-kubernetes/pom.xml b/flink-kubernetes/pom.xml new file mode 100644 index 0000000000000..1ce6b3e8a5bc7 --- /dev/null +++ b/flink-kubernetes/pom.xml @@ -0,0 +1,215 @@ + + + 4.0.0 + + + org.apache.flink + flink-parent + 1.7-SNAPSHOT + .. + + + flink-kubernetes_${scala.binary.version} + flink-kubernetes + jar + + + 1.0.1 + + + + + + + + + org.apache.flink + flink-clients_${scala.binary.version} + ${project.version} + provided + + + + org.apache.flink + flink-runtime_${scala.binary.version} + ${project.version} + provided + + + + org.scala-lang + scala-library + + + + io.kubernetes + client-java + 3.0.0 + compile + + + + + + org.apache.flink + flink-test-utils-junit + + + + + + + + org.apache.maven.plugins + maven-enforcer-plugin + + + dependency-convergence + + enforce + + + true + + + + + + + + net.alchim31.maven + scala-maven-plugin + + + + scala-compile-first + process-resources + + compile + + + + + + scala-test-compile + process-test-resources + + testCompile + + + + + + -Xms128m + -Xmx512m + + + + + + + org.apache.maven.plugins + maven-eclipse-plugin + 2.8 + + true + + org.scala-ide.sdt.core.scalanature + org.eclipse.jdt.core.javanature + + + org.scala-ide.sdt.core.scalabuilder + + + org.scala-ide.sdt.launching.SCALA_CONTAINER + org.eclipse.jdt.launching.JRE_CONTAINER + + + org.scala-lang:scala-library + org.scala-lang:scala-compiler + + + **/*.scala + **/*.java + + + + + + + org.codehaus.mojo + build-helper-maven-plugin + 1.7 + + + + add-source + generate-sources + + add-source + + + + src/main/scala + + + + + + add-test-source + generate-test-sources + + add-test-source + + + + src/test/scala + + + + + + + + + org.scalastyle + scalastyle-maven-plugin + + ${project.basedir}/../tools/maven/scalastyle-config.xml + + + + + org.apache.maven.plugins + maven-jar-plugin + + + + test-jar + + + + + + + + diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/FlinkKubernetesOptions.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/FlinkKubernetesOptions.java new file mode 100644 index 0000000000000..e0a6b81a4bce9 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/FlinkKubernetesOptions.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes; + +import org.apache.flink.configuration.Configuration; + +/** + * Parameters that will be used in Flink on k8s cluster. + * */ +public class FlinkKubernetesOptions { + + private Configuration configuration; + + public Configuration getConfiguration() { + return configuration; + } + + public FlinkKubernetesOptions(Configuration configuration) { + this.configuration = configuration; + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java new file mode 100644 index 0000000000000..8e3b41893cbdd --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java @@ -0,0 +1,120 @@ +package org.apache.flink.kubernetes; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.client.KubernetesClient; +import org.apache.flink.kubernetes.client.KubernetesClientImpl; +import org.apache.flink.runtime.clusterframework.ApplicationStatus; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; +import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; +import java.io.IOException; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +public class KubernetesResourceManager extends ResourceManager +{ + + protected static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + + private final Configuration configuration; + private final Map environment; + + /** Client to communicate with the Node manager and launch TaskExecutor processes. */ + private KubernetesClient nodeManagerClient; + + public KubernetesResourceManager( + RpcService rpcService, + String resourceManagerEndpointId, + ResourceID resourceId, + Configuration flinkConfig, + Map env, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + SlotManager slotManager, + MetricRegistry metricRegistry, + JobLeaderIdService jobLeaderIdService, + ClusterInformation clusterInformation, + FatalErrorHandler fatalErrorHandler, + @Nullable String webInterfaceUrl, + JobManagerMetricGroup jobManagerMetricGroup + ) + { + super( + rpcService, + resourceManagerEndpointId, + resourceId, + highAvailabilityServices, + heartbeatServices, + slotManager, + metricRegistry, + jobLeaderIdService, + clusterInformation, + fatalErrorHandler, + jobManagerMetricGroup + ); + this.configuration = flinkConfig; + this.environment = env; + } + + @Override + protected void initialize() throws ResourceManagerException + { + try{ + nodeManagerClient = new KubernetesClientImpl(configuration, environment); + } catch (IOException e) { + throw new ResourceManagerException("Error while initializing K8s client", e); + } + } + + @Override + protected void internalDeregisterApplication( + ApplicationStatus finalStatus, @Nullable String optionalDiagnostics + ) throws ResourceManagerException + { + LOG.info("Shutting down and cleaning the cluster up."); + nodeManagerClient.stopAndCleanupCluster(null); + } + + @Override + public Collection startNewWorker(ResourceProfile resourceProfile) + { + LOG.info("Starting a new worker."); + nodeManagerClient.createClusterPod(resourceProfile); + return Collections.singletonList(resourceProfile); + } + + @Override + public boolean stopWorker(ResourceID worker) + { + LOG.info("Stopping worker {}.", worker.getResourceID()); + try { + nodeManagerClient.terminateClusterPod(worker); + return true; + } catch (Exception e) { + LOG.error("Could not terminate a worker", e); + return false; + } + } + + @Override + protected ResourceID workerStarted(ResourceID resourceID) + { + // TODO Hooray it started. Remove from pending + return resourceID; + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskExecutorRunner.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskExecutorRunner.java new file mode 100644 index 0000000000000..647f700cf1ffd --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskExecutorRunner.java @@ -0,0 +1,104 @@ +package org.apache.flink.kubernetes; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.security.SecurityConfiguration; +import org.apache.flink.runtime.security.SecurityUtils; +import org.apache.flink.runtime.taskexecutor.TaskManagerRunner; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; +import org.apache.flink.util.ExceptionUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import scala.NotImplementedError; + +import java.lang.reflect.UndeclaredThrowableException; +import java.util.Map; +import java.util.concurrent.Callable; + + +/** + * This class is the executable entry point for running a TaskExecutor in a YARN container. + */ +public class KubernetesTaskExecutorRunner +{ + protected static final Logger LOG = LoggerFactory.getLogger(KubernetesTaskExecutorRunner.class); + + /** The process environment variables. */ + private static final Map ENV = System.getenv(); + + /** The exit code returned if the initialization of the yarn task executor runner failed. */ + private static final int INIT_ERROR_EXIT_CODE = 31; + + // ------------------------------------------------------------------------ + // Program entry point + // ------------------------------------------------------------------------ + + /** + * The entry point for the YARN task executor runner. + * + * @param args The command line arguments. + */ + public static void main(String[] args) { + EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + run(args); + } + + /** + * The instance entry point for the YARN task executor. Obtains user group information and calls + * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a + * privileged action. + * + * @param args The command line arguments. + */ + private static void run(String[] args) { + try { + LOG.debug("All environment variables: {}", ENV); + + Configuration configuration = getConfiguration(args); + SecurityConfiguration sc = new SecurityConfiguration(configuration); + + final String containerId = getContainerId(configuration); + + // use the hostname passed by job manager + final String taskExecutorHostname = getHostName(); + if (taskExecutorHostname != null) { + configuration.setString(TaskManagerOptions.HOST, taskExecutorHostname); + } + + SecurityUtils.install(sc); + + SecurityUtils.getInstalledContext().runSecured((Callable) () -> { + TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId)); + return null; + }); + } + catch (Throwable t) { + final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class); + // make sure that everything whatever ends up in the log + LOG.error("Kubernetes TaskManager initialization failed.", strippedThrowable); + System.exit(INIT_ERROR_EXIT_CODE); + } + } + + private static String getHostName() + { + throw new NotImplementedError(); + } + + private static String getContainerId(Configuration configuration) + { + throw new NotImplementedError(); + } + + private static Configuration getConfiguration(String[] args) + { + throw new NotImplementedError(); + } + +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java new file mode 100644 index 0000000000000..ca33bb7dcdc93 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.client; + +/** + * represent a endpoint. + * */ +public class Endpoint { + + private String address; + + private int port; + + public Endpoint(String address, int port) { + this.address = address; + this.port = port; + } + + public String getAddress() { + return address; + } + + public int getPort() { + return port; + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java new file mode 100644 index 0000000000000..63fec76ec1f24 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.client; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; + +/** + * The client to talk with kubernetes. + * */ +public interface KubernetesClient extends AutoCloseable { + + /** + * Create kubernetes services and expose endpoints for access outside cluster. + */ + Endpoint createClusterService(); + + /** + * Create cluster pod. + */ + void createClusterPod(ResourceProfile resourceProfile); + + + /** + * Terminate a cluster pod + * @param resourceID cluster pod id + */ + void terminateClusterPod(ResourceID resourceID); + + /** + * stop cluster and clean up all resources, include services, auxiliary services and all running pods. + * */ + void stopAndCleanupCluster(String clusterId); + + /** + * Log exception. + * */ + void logException(Exception e); + + /** + * retrieval rest endpoint of the giving flink clusterId. + */ + Endpoint getResetEndpoint(String flinkClusterId); +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java new file mode 100644 index 0000000000000..6bb638c00e043 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java @@ -0,0 +1,71 @@ +package org.apache.flink.kubernetes.client; + +import io.kubernetes.client.ApiClient; +import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.util.Config; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import scala.NotImplementedError; + +import java.io.IOException; +import java.util.Map; + +public class KubernetesClientImpl implements KubernetesClient +{ + private final Configuration configuration; + private final Map environment; + private final CoreV1Api coreV1Api; + + + public KubernetesClientImpl(Configuration configuration, Map environment) throws IOException + { + this.configuration = configuration; + this.environment = environment; + final ApiClient apiClient = Config.defaultClient(); + io.kubernetes.client.Configuration.setDefaultApiClient(apiClient); + this.coreV1Api = new CoreV1Api(apiClient); + } + + @Override + public Endpoint createClusterService() + { + throw new NotImplementedError(); + } + + @Override + public void createClusterPod(ResourceProfile resourceProfile) + { + throw new NotImplementedError(); + } + + @Override + public void terminateClusterPod(ResourceID resourceID) + { + throw new NotImplementedError(); + } + + @Override + public void stopAndCleanupCluster(String clusterId) + { + throw new NotImplementedError(); + } + + @Override + public void logException(Exception e) + { + throw new NotImplementedError(); + } + + @Override + public Endpoint getResetEndpoint(String flinkClusterId) + { + throw new NotImplementedError(); + } + + @Override + public void close() throws Exception + { + throw new NotImplementedError(); + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java new file mode 100644 index 0000000000000..d035212c02440 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java @@ -0,0 +1,152 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.cluster; + +import org.apache.flink.client.deployment.ClusterDeploymentException; +import org.apache.flink.client.deployment.ClusterDescriptor; +import org.apache.flink.client.deployment.ClusterRetrieveException; +import org.apache.flink.client.deployment.ClusterSpecification; +import org.apache.flink.client.program.ClusterClient; +import org.apache.flink.client.program.rest.RestClusterClient; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.JobManagerOptions; +import org.apache.flink.kubernetes.FlinkKubernetesOptions; +import org.apache.flink.kubernetes.client.Endpoint; +import org.apache.flink.kubernetes.client.KubernetesClient; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.util.FlinkException; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import sun.reflect.generics.reflectiveObjects.NotImplementedException; + +import javax.annotation.Nonnull; + +import java.util.Arrays; +import java.util.List; +import java.util.UUID; + +/** + * Kubernetes specific {@link ClusterDescriptor} implementation. + */ +public class KubernetesClusterDescriptor implements ClusterDescriptor { + + private static final Logger LOG = LoggerFactory.getLogger(KubernetesClusterDescriptor.class); + + private static final String CLUSTER_ID_PREFIX = "flink-session-cluster-"; + + private static final String CLUSTER_DESCRIPTION = "Kubernetes cluster"; + + private FlinkKubernetesOptions options; + + private KubernetesClient client; + + public KubernetesClusterDescriptor(@Nonnull FlinkKubernetesOptions options, @Nonnull KubernetesClient client) { + this.options = options; + this.client = client; + } + + private String generateClusterId() { + return CLUSTER_ID_PREFIX + UUID.randomUUID(); + } + + @Override + public String getClusterDescription() { + return CLUSTER_DESCRIPTION; + } + + private ClusterClient createClusterEndpoint(Endpoint clusterEndpoint, String clusterId) throws Exception { + + Configuration configuration = new Configuration(this.options.getConfiguration()); + configuration.setString(JobManagerOptions.ADDRESS, clusterEndpoint.getAddress()); + configuration.setInteger(JobManagerOptions.PORT, clusterEndpoint.getPort()); + return new RestClusterClient<>(configuration, clusterId); + } + + @Override + public ClusterClient retrieve(String clusterId) throws ClusterRetrieveException { + try { + Endpoint clusterEndpoint = this.client.getResetEndpoint(clusterId); + return this.createClusterEndpoint(clusterEndpoint, clusterId); + } catch (Exception e) { + this.client.logException(e); + throw new ClusterRetrieveException("Could not create the RestClusterClient.", e); + } + } + + @Override + public ClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) + throws ClusterDeploymentException { + + String clusterId = this.generateClusterId(); + + //TODO: add arguments + final List args = Arrays.asList(); + + return this.deployClusterInternal(clusterId, args); + } + + @Override + public ClusterClient deployJobCluster(ClusterSpecification clusterSpecification, JobGraph jobGraph, boolean detached) { + throw new NotImplementedException(); + } + + @Nonnull + private ClusterClient deployClusterInternal(String clusterId, List args) throws ClusterDeploymentException { + try { + Endpoint clusterEndpoint = this.client.createClusterService(); + return this.createClusterEndpoint(clusterEndpoint, clusterId); + } catch (Exception e) { + this.client.logException(e); + this.tryKillCluster(clusterId); + throw new ClusterDeploymentException("Could not create Kubernetes cluster " + clusterId, e); + } + } + + /** + * Try to kill cluster without throw exception. + */ + private void tryKillCluster(String clusterId) { + try { + this.killCluster(clusterId); + } catch (Exception e) { + this.client.logException(e); + } + } + + @Override + public void killCluster(String clusterId) throws FlinkException { + try { + this.client.stopAndCleanupCluster(clusterId); + } catch (Exception e) { + this.client.logException(e); + throw new FlinkException("Could not create Kubernetes cluster " + clusterId); + } + } + + @Override + public void close() { + try { + this.client.close(); + } catch (Exception e) { + this.client.logException(e); + LOG.error("failed to close client, exception {}", e.toString()); + } + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java new file mode 100644 index 0000000000000..fca5e92754b23 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java @@ -0,0 +1,64 @@ +package org.apache.flink.kubernetes.entrypoint; + + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.KubernetesResourceManager; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.entrypoint.ClusterInformation; +import org.apache.flink.runtime.heartbeat.HeartbeatServices; +import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.metrics.MetricRegistry; +import org.apache.flink.runtime.metrics.groups.JobManagerMetricGroup; +import org.apache.flink.runtime.resourcemanager.ResourceManager; +import org.apache.flink.runtime.resourcemanager.ResourceManagerFactory; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServices; +import org.apache.flink.runtime.resourcemanager.ResourceManagerRuntimeServicesConfiguration; +import org.apache.flink.runtime.rpc.FatalErrorHandler; +import org.apache.flink.runtime.rpc.RpcService; + +import javax.annotation.Nullable; + +public enum KubernetesResourceManagerFactory implements ResourceManagerFactory +{ + INSTANCE; + + @Override + public ResourceManager createResourceManager( + Configuration configuration, + ResourceID resourceId, + RpcService rpcService, + HighAvailabilityServices highAvailabilityServices, + HeartbeatServices heartbeatServices, + MetricRegistry metricRegistry, + FatalErrorHandler fatalErrorHandler, + ClusterInformation clusterInformation, + @Nullable String webInterfaceUrl, + JobManagerMetricGroup jobManagerMetricGroup + ) throws Exception + { + final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration + .fromConfiguration(configuration); + final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( + rmServicesConfiguration, + highAvailabilityServices, + rpcService.getScheduledExecutor() + ); + + return new KubernetesResourceManager( + rpcService, + ResourceManager.RESOURCE_MANAGER_NAME, + resourceId, + configuration, + System.getenv(), + highAvailabilityServices, + heartbeatServices, + rmRuntimeServices.getSlotManager(), + metricRegistry, + rmRuntimeServices.getJobLeaderIdService(), + clusterInformation, + fatalErrorHandler, + webInterfaceUrl, + jobManagerMetricGroup + ); + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java new file mode 100644 index 0000000000000..9cfd5053d3806 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.kubernetes.entrypoint; + +import org.apache.flink.configuration.Configuration; +import org.apache.flink.runtime.entrypoint.ClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.EntrypointClusterConfiguration; +import org.apache.flink.runtime.entrypoint.EntrypointClusterConfigurationParserFactory; +import org.apache.flink.runtime.entrypoint.FlinkParseException; +import org.apache.flink.runtime.entrypoint.SessionClusterEntrypoint; +import org.apache.flink.runtime.entrypoint.component.DispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.component.SessionDispatcherResourceManagerComponentFactory; +import org.apache.flink.runtime.entrypoint.parser.CommandLineParser; +import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.runtime.util.JvmShutdownSafeguard; +import org.apache.flink.runtime.util.SignalHandler; + +/** + * Entrypoint for a Kubernetes session cluster. + */ +public class KubernetesSessionClusterEntrypoint extends SessionClusterEntrypoint { + + public KubernetesSessionClusterEntrypoint(Configuration configuration) { + super(configuration); + } + + @Override + protected DispatcherResourceManagerComponentFactory createDispatcherResourceManagerComponentFactory(Configuration configuration) { + return new SessionDispatcherResourceManagerComponentFactory(KubernetesResourceManagerFactory.INSTANCE); + } + + public static void main(String[] args) { + // startup checks and logging + EnvironmentInformation.logEnvironmentInfo(LOG, KubernetesSessionClusterEntrypoint.class.getSimpleName(), args); + SignalHandler.register(LOG); + JvmShutdownSafeguard.installAsShutdownHook(LOG); + + EntrypointClusterConfiguration entrypointClusterConfiguration = null; + final CommandLineParser commandLineParser = new CommandLineParser<>(new EntrypointClusterConfigurationParserFactory()); + + try { + entrypointClusterConfiguration = commandLineParser.parse(args); + } catch (FlinkParseException e) { + LOG.error("Could not parse command line arguments {}.", args, e); + commandLineParser.printHelp(KubernetesSessionClusterEntrypoint.class.getSimpleName()); + System.exit(1); + } + + + LOG.info("Started {}.", KubernetesSessionClusterEntrypoint.class.getSimpleName()); + + Configuration configuration = loadConfiguration(entrypointClusterConfiguration); + final KubernetesSessionClusterEntrypoint entrypoint = + new KubernetesSessionClusterEntrypoint(configuration); + ClusterEntrypoint.runClusterEntrypoint(entrypoint); + } +} diff --git a/pom.xml b/pom.xml index 5fd305a7514d5..faf2115db56a1 100644 --- a/pom.xml +++ b/pom.xml @@ -82,6 +82,7 @@ under the License. flink-metrics flink-yarn flink-yarn-tests + flink-kubernetes flink-fs-tests flink-docs From e4da06d426f552c9b75cd3e8b67db2cbf42f0efa Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Fri, 22 Mar 2019 12:35:34 +0700 Subject: [PATCH 05/13] BACKEND-1137: K8s integration --- flink-dist/pom.xml | 6 + .../src/main/flink-bin/bin/flink-console.sh | 2 + .../kubernetes/jobmanager-deployment.yaml | 36 +++++ .../kubernetes/jobmanager-service.yaml | 23 +++ flink-kubernetes/pom.xml | 2 +- .../kubernetes/KubernetesResourceManager.java | 16 +- ....java => KubernetesTaskManagerRunner.java} | 78 +++++----- .../kubernetes/client/KubernetesClient.java | 5 +- .../client/KubernetesClientImpl.java | 140 +++++++++++++++++- .../exception/KubernetesClientException.java | 12 ++ .../cluster/KubernetesClusterDescriptor.java | 2 + .../taskexecutor/TaskManagerRunner.java | 2 +- 12 files changed, 267 insertions(+), 57 deletions(-) create mode 100644 flink-kubernetes/kubernetes/jobmanager-deployment.yaml create mode 100644 flink-kubernetes/kubernetes/jobmanager-service.yaml rename flink-kubernetes/src/main/java/org/apache/flink/kubernetes/{KubernetesTaskExecutorRunner.java => KubernetesTaskManagerRunner.java} (53%) create mode 100644 flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java diff --git a/flink-dist/pom.xml b/flink-dist/pom.xml index deedb3027a900..5c117569262dd 100644 --- a/flink-dist/pom.xml +++ b/flink-dist/pom.xml @@ -140,6 +140,12 @@ under the License. + + org.apache.flink + flink-kubernetes_${scala.binary.version} + ${project.version} + + diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh index a5168fe9562c8..8c19cdce9a8dd 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-console.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh @@ -56,6 +56,8 @@ case $SERVICE in ;; esac +CLASS_TO_RUN = ${FLINK_CLASS_TO_RUN:-$CLASS_TO_RUN} + FLINK_TM_CLASSPATH=`constructFlinkClassPath` log_setting=("-Dlog4j.configuration=file:${FLINK_CONF_DIR}/log4j-console.properties" "-Dlogback.configurationFile=file:${FLINK_CONF_DIR}/logback-console.xml") diff --git a/flink-kubernetes/kubernetes/jobmanager-deployment.yaml b/flink-kubernetes/kubernetes/jobmanager-deployment.yaml new file mode 100644 index 0000000000000..3802f31c7d869 --- /dev/null +++ b/flink-kubernetes/kubernetes/jobmanager-deployment.yaml @@ -0,0 +1,36 @@ +apiVersion: extensions/v1beta1 +kind: Deployment +metadata: + name: flink-jobmanager +spec: + replicas: 1 + template: + metadata: + labels: + app: flink + component: jobmanager + spec: + containers: + - name: jobmanager + image: flink-mmx:latest + imagePullPolicy: IfNotPresent + args: + - jobmanager + ports: + - containerPort: 6123 + name: rpc + - containerPort: 6124 + name: blob + - containerPort: 6125 + name: query + - containerPort: 8081 + name: ui + env: + - name: JOB_MANAGER_RPC_ADDRESS + value: flink-jobmanager + - name: FLINK_CLASS_TO_RUN + value: org.apache.flink.kubernetes.entrypoint.KubernetesSessionClusterEntrypoint + - name: FLINK_NAMESPACE + value: default + - name: FLINK_TM_IMAGE + value: flink-mmx:latest diff --git a/flink-kubernetes/kubernetes/jobmanager-service.yaml b/flink-kubernetes/kubernetes/jobmanager-service.yaml new file mode 100644 index 0000000000000..57182037fdf4c --- /dev/null +++ b/flink-kubernetes/kubernetes/jobmanager-service.yaml @@ -0,0 +1,23 @@ +apiVersion: v1 +kind: Service +metadata: + name: flink-jobmanager +spec: + ports: + - name: rpc + port: 6123 + # nodePort: 6123 + - name: blob + port: 6124 + # nodePort: 6124 + - name: query + port: 6125 + # nodePort: 6125 + - name: ui + port: 8081 + nodePort: 30000 + type: NodePort + selector: + app: flink + component: jobmanager + diff --git a/flink-kubernetes/pom.xml b/flink-kubernetes/pom.xml index 1ce6b3e8a5bc7..fec076c9a6ba2 100644 --- a/flink-kubernetes/pom.xml +++ b/flink-kubernetes/pom.xml @@ -61,7 +61,7 @@ io.kubernetes client-java - 3.0.0 + 4.0.0 compile diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java index 8e3b41893cbdd..11dcd2412fae7 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java @@ -3,6 +3,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.client.KubernetesClient; import org.apache.flink.kubernetes.client.KubernetesClientImpl; +import org.apache.flink.kubernetes.client.exception.KubernetesClientException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; @@ -28,6 +29,10 @@ public class KubernetesResourceManager extends ResourceManager { + public final static String FLINK_TM_IMAGE = "FLINK_TM_IMAGE"; + public final static String FLINK_NAMESPACE = "FLINK_NAMESPACE"; + public final static String FLINK_TM_RESOURCE_ID = "FLINK_TM_RESOURCE_ID"; + public final static String FLINK_CLASS_TO_RUN = "FLINK_CLASS_TO_RUN"; protected static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); @@ -94,14 +99,19 @@ protected void internalDeregisterApplication( public Collection startNewWorker(ResourceProfile resourceProfile) { LOG.info("Starting a new worker."); - nodeManagerClient.createClusterPod(resourceProfile); + try { + nodeManagerClient.createClusterPod(resourceProfile); + } + catch (KubernetesClientException e) { + throw new RuntimeException("Could not start a new worker", e); + } return Collections.singletonList(resourceProfile); } @Override public boolean stopWorker(ResourceID worker) { - LOG.info("Stopping worker {}.", worker.getResourceID()); + LOG.info("Stopping worker [{}].", worker.getResourceID()); try { nodeManagerClient.terminateClusterPod(worker); return true; @@ -114,7 +124,7 @@ public boolean stopWorker(ResourceID worker) @Override protected ResourceID workerStarted(ResourceID resourceID) { - // TODO Hooray it started. Remove from pending + // Hooray it started! return resourceID; } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskExecutorRunner.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java similarity index 53% rename from flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskExecutorRunner.java rename to flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java index 647f700cf1ffd..f70c48b768f2f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskExecutorRunner.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java @@ -1,7 +1,7 @@ package org.apache.flink.kubernetes; import org.apache.flink.configuration.Configuration; -import org.apache.flink.configuration.TaskManagerOptions; +import org.apache.flink.core.fs.FileSystem; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.security.SecurityConfiguration; import org.apache.flink.runtime.security.SecurityUtils; @@ -12,24 +12,24 @@ import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.NotImplementedError; +import java.io.IOException; import java.lang.reflect.UndeclaredThrowableException; import java.util.Map; import java.util.concurrent.Callable; /** - * This class is the executable entry point for running a TaskExecutor in a YARN container. + * This class is the executable entry point for running a TaskExecutor in a Kubernetes container. */ -public class KubernetesTaskExecutorRunner +public class KubernetesTaskManagerRunner { - protected static final Logger LOG = LoggerFactory.getLogger(KubernetesTaskExecutorRunner.class); + protected static final Logger LOG = LoggerFactory.getLogger(KubernetesTaskManagerRunner.class); /** The process environment variables. */ private static final Map ENV = System.getenv(); - /** The exit code returned if the initialization of the yarn task executor runner failed. */ + /** The exit code returned if the initialization of the Kubernetes task executor runner failed. */ private static final int INIT_ERROR_EXIT_CODE = 31; // ------------------------------------------------------------------------ @@ -37,44 +37,41 @@ public class KubernetesTaskExecutorRunner // ------------------------------------------------------------------------ /** - * The entry point for the YARN task executor runner. + * The entry point for the Kubernetes task executor runner. * * @param args The command line arguments. */ - public static void main(String[] args) { - EnvironmentInformation.logEnvironmentInfo(LOG, "YARN TaskExecutor runner", args); + public static void main(String[] args) throws Exception + { + EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args); SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); - run(args); - } + long maxOpenFileHandles = EnvironmentInformation.getOpenFileHandlesLimit(); - /** - * The instance entry point for the YARN task executor. Obtains user group information and calls - * the main work method {@link TaskManagerRunner#runTaskManager(Configuration, ResourceID)} as a - * privileged action. - * - * @param args The command line arguments. - */ - private static void run(String[] args) { - try { - LOG.debug("All environment variables: {}", ENV); + if (maxOpenFileHandles != -1L) { + LOG.info("Maximum number of open file descriptors is {}.", maxOpenFileHandles); + } else { + LOG.info("Cannot determine the maximum number of open file descriptors"); + } + + final Configuration configuration = TaskManagerRunner.loadConfiguration(args); - Configuration configuration = getConfiguration(args); - SecurityConfiguration sc = new SecurityConfiguration(configuration); + try { + FileSystem.initialize(configuration); + } catch (IOException e) { + throw new IOException("Error while setting the default " + + "filesystem scheme from configuration.", e); + } - final String containerId = getContainerId(configuration); + SecurityUtils.install(new SecurityConfiguration(configuration)); - // use the hostname passed by job manager - final String taskExecutorHostname = getHostName(); - if (taskExecutorHostname != null) { - configuration.setString(TaskManagerOptions.HOST, taskExecutorHostname); - } + LOG.info("All environment variables: {}", ENV); - SecurityUtils.install(sc); + try { SecurityUtils.getInstalledContext().runSecured((Callable) () -> { - TaskManagerRunner.runTaskManager(configuration, new ResourceID(containerId)); + TaskManagerRunner.runTaskManager(configuration, new ResourceID(getContainerId())); return null; }); } @@ -86,19 +83,16 @@ private static void run(String[] args) { } } - private static String getHostName() - { - throw new NotImplementedError(); - } - private static String getContainerId(Configuration configuration) + private static String getContainerId() { - throw new NotImplementedError(); - } - - private static Configuration getConfiguration(String[] args) - { - throw new NotImplementedError(); + if (ENV.containsKey(KubernetesResourceManager.FLINK_TM_RESOURCE_ID)) { + return ENV.get(KubernetesResourceManager.FLINK_TM_RESOURCE_ID); + } else { + LOG.warn("ResourceID env variable {} is not found. Generating resource id", + KubernetesResourceManager.FLINK_TM_RESOURCE_ID); + return ResourceID.generate().getResourceIdString(); + } } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java index 63fec76ec1f24..638ffb85b2d3e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java @@ -20,6 +20,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.kubernetes.client.exception.KubernetesClientException; /** * The client to talk with kubernetes. @@ -34,14 +35,14 @@ public interface KubernetesClient extends AutoCloseable { /** * Create cluster pod. */ - void createClusterPod(ResourceProfile resourceProfile); + void createClusterPod(ResourceProfile resourceProfile) throws KubernetesClientException; /** * Terminate a cluster pod * @param resourceID cluster pod id */ - void terminateClusterPod(ResourceID resourceID); + void terminateClusterPod(ResourceID resourceID) throws KubernetesClientException; /** * stop cluster and clean up all resources, include services, auxiliary services and all running pods. diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java index 6bb638c00e043..4a0b3f046ee9a 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java @@ -1,30 +1,55 @@ package org.apache.flink.kubernetes.client; +import com.google.gson.JsonSyntaxException; import io.kubernetes.client.ApiClient; +import io.kubernetes.client.ApiException; import io.kubernetes.client.apis.CoreV1Api; +import io.kubernetes.client.models.V1Container; +import io.kubernetes.client.models.V1ContainerPort; +import io.kubernetes.client.models.V1EnvVar; +import io.kubernetes.client.models.V1ObjectMeta; +import io.kubernetes.client.models.V1Pod; +import io.kubernetes.client.models.V1PodSpec; import io.kubernetes.client.util.Config; import org.apache.flink.configuration.Configuration; +import org.apache.flink.kubernetes.KubernetesResourceManager; +import org.apache.flink.kubernetes.client.exception.KubernetesClientException; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import scala.NotImplementedError; import java.io.IOException; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; import java.util.Map; +import static org.apache.flink.util.Preconditions.checkNotNull; + public class KubernetesClientImpl implements KubernetesClient { + private static final Logger LOG = LoggerFactory.getLogger(KubernetesClient.class); + private final Configuration configuration; private final Map environment; private final CoreV1Api coreV1Api; - + private final String flinkTMImageName; + private final String flinkNamespace; + private final Map podResourceProfiles = new HashMap<>(); public KubernetesClientImpl(Configuration configuration, Map environment) throws IOException { this.configuration = configuration; this.environment = environment; + // Default user is used for managing deployments and their pods + // Make sure default user has enough permissions for doing that final ApiClient apiClient = Config.defaultClient(); io.kubernetes.client.Configuration.setDefaultApiClient(apiClient); this.coreV1Api = new CoreV1Api(apiClient); + this.flinkNamespace = checkNotNull(this.environment.get(KubernetesResourceManager.FLINK_NAMESPACE)); + this.flinkTMImageName = checkNotNull(this.environment.get(KubernetesResourceManager.FLINK_TM_IMAGE)); } @Override @@ -34,27 +59,121 @@ public Endpoint createClusterService() } @Override - public void createClusterPod(ResourceProfile resourceProfile) + public void createClusterPod(ResourceProfile resourceProfile) throws KubernetesClientException { - throw new NotImplementedError(); + final ResourceID resourceID = ResourceID.generate(); + final String podName = getPodName(resourceID); + + LOG.info("Creating a cluster pod [{}] for a resource profile [{}]", podName, resourceProfile); + V1Pod body = new V1Pod() + .apiVersion("v1") + .kind("Pod") + .metadata( + new V1ObjectMeta() + .name(podName) + .labels(new HashMap() + {{ + put("app", "flink"); + put("component", "taskmanager"); + put("role", "taskmanager"); + put("ResourceId", resourceID.getResourceIdString()); + put("CpuCores", String.valueOf(resourceProfile.getCpuCores())); + put("MemoryInMB", String.valueOf(resourceProfile.getMemoryInMB())); + put("HeapMemoryInMB", String.valueOf(resourceProfile.getHeapMemoryInMB())); + put("DirectMemoryInMB", String.valueOf(resourceProfile.getDirectMemoryInMB())); + put("NativeMemoryInMB", String.valueOf(resourceProfile.getNativeMemoryInMB())); + put("NetworkMemoryInMB", String.valueOf(resourceProfile.getNetworkMemoryInMB())); + put("OperatorsMemoryInMB", String.valueOf(resourceProfile.getOperatorsMemoryInMB())); + resourceProfile.getExtendedResources().forEach( + (key, resource) -> { + put(key, resource.getName()); + put(resource.getName(), String.valueOf(resource.getValue())); + } + ); + }} + )) + .spec( + new V1PodSpec() + .containers(Collections.singletonList( + new V1Container() + .name("taskmanager") + .image(flinkTMImageName) + .imagePullPolicy("IfNotPresent") + .args(Collections.singletonList("taskmanager")) + .ports(Arrays.asList( + new V1ContainerPort().containerPort(6121).name("data"), + new V1ContainerPort().containerPort(6122).name("rpc"), + new V1ContainerPort().containerPort(6125).name("query") + )) + .env(Arrays.asList( + new V1EnvVar() + .name("JOB_MANAGER_RPC_ADDRESS") + .value("flink-jobmanager"), + new V1EnvVar() + .name(KubernetesResourceManager.FLINK_TM_RESOURCE_ID) + .value(resourceID.getResourceIdString()), + new V1EnvVar() + .name(KubernetesResourceManager.FLINK_CLASS_TO_RUN) + .value("org.apache.flink.kubernetes.KubernetesTaskManagerRunner") + )) + ))); + try { + coreV1Api.createNamespacedPod(flinkNamespace, body, false, null, null); + podResourceProfiles.put(podName, resourceProfile); + } + catch (ApiException e) { + final String message = String.format("Cannot create a pod for resource profile [%s]", resourceProfile); + throw new KubernetesClientException(message, e); + } } @Override - public void terminateClusterPod(ResourceID resourceID) + public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientException { - throw new NotImplementedError(); + final String podName = getPodName(resourceID); + + LOG.info("Terminating a cluster pod [{}] for a resource profile [{}]", podName, resourceID); + try { + coreV1Api.deleteNamespacedPod(podName, flinkNamespace, null, null, null, 0, null, null); + podResourceProfiles.remove(podName); + } + catch (ApiException e) { + if (e.getMessage().equals("Not Found")) { + LOG.warn("Could not delete a pod [{}] as it was not found", podName); + } else { + final String message = + String.format("Could not delete a pod [%s] for resource profile [%s]", podName, flinkNamespace); + throw new KubernetesClientException(message, e); + } + } + catch (JsonSyntaxException e) { + // It's a known issue until the Swagger spec is updated to OpenAPI 3.0 + // https://github.com/kubernetes-client/java/issues/86 + // Simply ignoring the exception + } } @Override public void stopAndCleanupCluster(String clusterId) { - throw new NotImplementedError(); + LOG.info("Stopping the cluster and deleting all its task manager pods"); + podResourceProfiles.forEach( + (podName, resourceProfile) -> { + try { + coreV1Api.deleteNamespacedPod(podName, flinkNamespace, null, null, null, 0, null, null); + } + catch (ApiException e) { + LOG.error("Could not delete a pod [{}]", podName, e); + } + } + ); + podResourceProfiles.clear(); } @Override public void logException(Exception e) { - throw new NotImplementedError(); + LOG.error("Exception occurred", e); } @Override @@ -64,8 +183,13 @@ public Endpoint getResetEndpoint(String flinkClusterId) } @Override - public void close() throws Exception + public void close() { throw new NotImplementedError(); } + + private String getPodName(ResourceID resourceId) + { + return "flink-taskmanager-" + resourceId.getResourceIdString(); + } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java new file mode 100644 index 0000000000000..e48f629883089 --- /dev/null +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java @@ -0,0 +1,12 @@ +package org.apache.flink.kubernetes.client.exception; + +public class KubernetesClientException extends Exception +{ + public KubernetesClientException(String message) { + super(message); + } + + public KubernetesClientException(String message, Throwable cause) { + super(message, cause); + } +} diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java index d035212c02440..d43c8c72300df 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java @@ -44,6 +44,7 @@ /** * Kubernetes specific {@link ClusterDescriptor} implementation. + * This class is responsible for cluster creation from scratch */ public class KubernetesClusterDescriptor implements ClusterDescriptor { @@ -111,6 +112,7 @@ public ClusterClient deployJobCluster(ClusterSpecification clusterSpecif private ClusterClient deployClusterInternal(String clusterId, List args) throws ClusterDeploymentException { try { Endpoint clusterEndpoint = this.client.createClusterService(); + this.client.createClusterPod(null); return this.createClusterEndpoint(clusterEndpoint, clusterId); } catch (Exception e) { this.client.logException(e); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java index e06be5329cf5a..30f68d92ecde0 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerRunner.java @@ -309,7 +309,7 @@ public Void call() throws Exception { } } - private static Configuration loadConfiguration(String[] args) throws FlinkParseException { + public static Configuration loadConfiguration(String[] args) throws FlinkParseException { final CommandLineParser commandLineParser = new CommandLineParser<>(new ClusterConfigurationParserFactory()); final ClusterConfiguration clusterConfiguration; From f8fb17254393032a13b5804647781efbcc4fc81f Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Fri, 22 Mar 2019 15:07:08 +0700 Subject: [PATCH 06/13] BACKEND-1137: K8s integration - fixed a script --- flink-dist/src/main/flink-bin/bin/flink-console.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-dist/src/main/flink-bin/bin/flink-console.sh b/flink-dist/src/main/flink-bin/bin/flink-console.sh index 8c19cdce9a8dd..dd56450b24be0 100644 --- a/flink-dist/src/main/flink-bin/bin/flink-console.sh +++ b/flink-dist/src/main/flink-bin/bin/flink-console.sh @@ -56,7 +56,7 @@ case $SERVICE in ;; esac -CLASS_TO_RUN = ${FLINK_CLASS_TO_RUN:-$CLASS_TO_RUN} +CLASS_TO_RUN=${FLINK_CLASS_TO_RUN:-$CLASS_TO_RUN} FLINK_TM_CLASSPATH=`constructFlinkClassPath` From 60094aab8657f13a034ba867e637e37783952fe5 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Fri, 22 Mar 2019 15:52:20 +0700 Subject: [PATCH 07/13] BACKEND-1137: K8s integration - refactoring --- .../kubernetes/KubernetesResourceManager.java | 4 ++-- ...ntImpl.java => DefaultKubernetesClient.java} | 17 +++++++---------- 2 files changed, 9 insertions(+), 12 deletions(-) rename flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/{KubernetesClientImpl.java => DefaultKubernetesClient.java} (90%) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java index 11dcd2412fae7..71d9e4b0c24c4 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java @@ -2,7 +2,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.client.KubernetesClient; -import org.apache.flink.kubernetes.client.KubernetesClientImpl; +import org.apache.flink.kubernetes.client.DefaultKubernetesClient; import org.apache.flink.kubernetes.client.exception.KubernetesClientException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -80,7 +80,7 @@ public KubernetesResourceManager( protected void initialize() throws ResourceManagerException { try{ - nodeManagerClient = new KubernetesClientImpl(configuration, environment); + nodeManagerClient = new DefaultKubernetesClient(environment); } catch (IOException e) { throw new ResourceManagerException("Error while initializing K8s client", e); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java similarity index 90% rename from flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java rename to flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java index 4a0b3f046ee9a..54f63bcafcbda 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClientImpl.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java @@ -6,12 +6,12 @@ import io.kubernetes.client.apis.CoreV1Api; import io.kubernetes.client.models.V1Container; import io.kubernetes.client.models.V1ContainerPort; +import io.kubernetes.client.models.V1DeleteOptions; import io.kubernetes.client.models.V1EnvVar; import io.kubernetes.client.models.V1ObjectMeta; import io.kubernetes.client.models.V1Pod; import io.kubernetes.client.models.V1PodSpec; import io.kubernetes.client.util.Config; -import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.KubernetesResourceManager; import org.apache.flink.kubernetes.client.exception.KubernetesClientException; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -28,28 +28,24 @@ import static org.apache.flink.util.Preconditions.checkNotNull; -public class KubernetesClientImpl implements KubernetesClient +public class DefaultKubernetesClient implements KubernetesClient { private static final Logger LOG = LoggerFactory.getLogger(KubernetesClient.class); - private final Configuration configuration; - private final Map environment; private final CoreV1Api coreV1Api; private final String flinkTMImageName; private final String flinkNamespace; private final Map podResourceProfiles = new HashMap<>(); - public KubernetesClientImpl(Configuration configuration, Map environment) throws IOException + public DefaultKubernetesClient(Map environment) throws IOException { - this.configuration = configuration; - this.environment = environment; // Default user is used for managing deployments and their pods // Make sure default user has enough permissions for doing that final ApiClient apiClient = Config.defaultClient(); io.kubernetes.client.Configuration.setDefaultApiClient(apiClient); this.coreV1Api = new CoreV1Api(apiClient); - this.flinkNamespace = checkNotNull(this.environment.get(KubernetesResourceManager.FLINK_NAMESPACE)); - this.flinkTMImageName = checkNotNull(this.environment.get(KubernetesResourceManager.FLINK_TM_IMAGE)); + this.flinkNamespace = checkNotNull(environment.get(KubernetesResourceManager.FLINK_NAMESPACE)); + this.flinkTMImageName = checkNotNull(environment.get(KubernetesResourceManager.FLINK_TM_IMAGE)); } @Override @@ -134,7 +130,8 @@ public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientEx LOG.info("Terminating a cluster pod [{}] for a resource profile [{}]", podName, resourceID); try { - coreV1Api.deleteNamespacedPod(podName, flinkNamespace, null, null, null, 0, null, null); + V1DeleteOptions body = new V1DeleteOptions().gracePeriodSeconds(0L).orphanDependents(false); + coreV1Api.deleteNamespacedPod(podName, flinkNamespace, body, null, null, null, null, null); podResourceProfiles.remove(podName); } catch (ApiException e) { From ccd1318757bfa5710e34602b9fa57c72dfb4a041 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Fri, 22 Mar 2019 17:15:05 +0700 Subject: [PATCH 08/13] BACKEND-1137: K8s integration - README --- flink-kubernetes/README.md | 65 +++++++++++++++++++ .../jobmanager-deployment.yaml | 0 .../jobmanager-service.yaml | 0 3 files changed, 65 insertions(+) create mode 100644 flink-kubernetes/README.md rename flink-kubernetes/{kubernetes => templates}/jobmanager-deployment.yaml (100%) rename flink-kubernetes/{kubernetes => templates}/jobmanager-service.yaml (100%) diff --git a/flink-kubernetes/README.md b/flink-kubernetes/README.md new file mode 100644 index 0000000000000..5f7b6f5737888 --- /dev/null +++ b/flink-kubernetes/README.md @@ -0,0 +1,65 @@ +# Flink On Kubernetes + +## Job manager +Job manager is run as a pod of corresponding deployment which is supposed to be prepared beforehand. +To create the deployment and a service for it the following templates (under `flink-kubernets/templates` path) +should be used: +``` +jobmanager-deployment.yaml +jobmanager-service.yaml +``` +Example: +``` +kubectl create -f jobmanager-deployment.yaml +kubectl create -f jobmanager-service.yaml +``` +That creates the deployment with one job manager and a service around it that exposes the job manager. + +## Task Manager +Task manager is a temporary essence and is created (and deleted) by a job manager for a particular slot. +No deployments/jobs/services are created for a task manager only pods. +A template for a task manager is hardcoded into the implementation +(`org.apache.flink.kubernetes.client.DefaultKubernetesClient`). + +For every slot request the job manager passes a resource profile to a resource manager +(`org.apache.flink.kubernetes.KubernetesResourceManager`). The resource profile contains specific hardware requirements +(CPU, Memory and other). All these requirements are included into the pod template as labels thus they could be used for +binding specific pods onto specific VMs. + +## Resource Profile +A resource profile might be set to a `StreamTransformation` by calling a corresponding method. A resource profile has +cpu cores, heap memory, direct memory, native memory, network memory and extended resources (GPU and user defined). +Only `StreamTransformation.minResources` is used for a pod template. + +### Resource Profile Configuration Example +TBD + +## Kubernetes Resource Management +Resource management uses a default service account every pod contains. It should has admin privileges to be able +to create and delete pods: +``` +kubectl create clusterrolebinding serviceaccounts-cluster-admin \ + --clusterrole=cluster-admin \ + --group=system:serviceaccounts +``` + +## Build and run +The implementation is based on existing mechanism of packaging (maven) and containerization (docker). + +Prepare a package first: +```mvn clean package -DskipTests``` +Then in needs to be containerized: +```cd flink-contrib/docker-flink/ +sh build.sh --from-local-dist --image-name flink-mmx +``` +If minikube is used then a container image should be uploaded into minikube node. +So before building a container image a docker env is supposed to be exported: +``` +eval $(minikube docker-env) +``` +Job manager deployment and service: +``` +cd ../../flink-kubernetes/templates/ +kubectl create -f jobmanager-deployment.yaml +kubectl create -f jobmanager-service.yaml +``` diff --git a/flink-kubernetes/kubernetes/jobmanager-deployment.yaml b/flink-kubernetes/templates/jobmanager-deployment.yaml similarity index 100% rename from flink-kubernetes/kubernetes/jobmanager-deployment.yaml rename to flink-kubernetes/templates/jobmanager-deployment.yaml diff --git a/flink-kubernetes/kubernetes/jobmanager-service.yaml b/flink-kubernetes/templates/jobmanager-service.yaml similarity index 100% rename from flink-kubernetes/kubernetes/jobmanager-service.yaml rename to flink-kubernetes/templates/jobmanager-service.yaml From 2028d721243a949c1030d359fd95dddb05388403 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Mon, 25 Mar 2019 16:45:36 +0700 Subject: [PATCH 09/13] BACKEND-1137: Fixed style --- .../kubernetes/KubernetesResourceManager.java | 50 +++++++-------- .../KubernetesTaskManagerRunner.java | 28 ++++----- .../client/DefaultKubernetesClient.java | 61 ++++++++----------- .../kubernetes/client/KubernetesClient.java | 4 +- .../exception/KubernetesClientException.java | 6 +- .../KubernetesResourceManagerFactory.java | 11 ++-- .../KubernetesSessionClusterEntrypoint.java | 3 +- 7 files changed, 75 insertions(+), 88 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java index 71d9e4b0c24c4..6e92b087e91bd 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesResourceManager.java @@ -1,8 +1,8 @@ package org.apache.flink.kubernetes; import org.apache.flink.configuration.Configuration; -import org.apache.flink.kubernetes.client.KubernetesClient; import org.apache.flink.kubernetes.client.DefaultKubernetesClient; +import org.apache.flink.kubernetes.client.KubernetesClient; import org.apache.flink.kubernetes.client.exception.KubernetesClientException; import org.apache.flink.runtime.clusterframework.ApplicationStatus; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -18,28 +18,33 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import javax.annotation.Nullable; + import java.io.IOException; import java.util.Collection; import java.util.Collections; import java.util.Map; -public class KubernetesResourceManager extends ResourceManager -{ - public final static String FLINK_TM_IMAGE = "FLINK_TM_IMAGE"; - public final static String FLINK_NAMESPACE = "FLINK_NAMESPACE"; - public final static String FLINK_TM_RESOURCE_ID = "FLINK_TM_RESOURCE_ID"; - public final static String FLINK_CLASS_TO_RUN = "FLINK_CLASS_TO_RUN"; +/** + * Kubernetes Resource Manager. + */ +public class KubernetesResourceManager extends ResourceManager { + public static final String FLINK_TM_IMAGE = "FLINK_TM_IMAGE"; + public static final String FLINK_NAMESPACE = "FLINK_NAMESPACE"; + public static final String FLINK_TM_RESOURCE_ID = "FLINK_TM_RESOURCE_ID"; + public static final String FLINK_CLASS_TO_RUN = "FLINK_CLASS_TO_RUN"; - protected static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); + private static final Logger LOG = LoggerFactory.getLogger(KubernetesResourceManager.class); - private final Configuration configuration; private final Map environment; - /** Client to communicate with the Node manager and launch TaskExecutor processes. */ + /** + * Client to communicate with the Node manager and launch TaskExecutor processes. + */ private KubernetesClient nodeManagerClient; public KubernetesResourceManager( @@ -55,10 +60,8 @@ public KubernetesResourceManager( JobLeaderIdService jobLeaderIdService, ClusterInformation clusterInformation, FatalErrorHandler fatalErrorHandler, - @Nullable String webInterfaceUrl, JobManagerMetricGroup jobManagerMetricGroup - ) - { + ) { super( rpcService, resourceManagerEndpointId, @@ -72,14 +75,12 @@ public KubernetesResourceManager( fatalErrorHandler, jobManagerMetricGroup ); - this.configuration = flinkConfig; this.environment = env; } @Override - protected void initialize() throws ResourceManagerException - { - try{ + protected void initialize() throws ResourceManagerException { + try { nodeManagerClient = new DefaultKubernetesClient(environment); } catch (IOException e) { throw new ResourceManagerException("Error while initializing K8s client", e); @@ -89,28 +90,24 @@ protected void initialize() throws ResourceManagerException @Override protected void internalDeregisterApplication( ApplicationStatus finalStatus, @Nullable String optionalDiagnostics - ) throws ResourceManagerException - { + ) { LOG.info("Shutting down and cleaning the cluster up."); nodeManagerClient.stopAndCleanupCluster(null); } @Override - public Collection startNewWorker(ResourceProfile resourceProfile) - { + public Collection startNewWorker(ResourceProfile resourceProfile) { LOG.info("Starting a new worker."); try { nodeManagerClient.createClusterPod(resourceProfile); - } - catch (KubernetesClientException e) { + } catch (KubernetesClientException e) { throw new RuntimeException("Could not start a new worker", e); } return Collections.singletonList(resourceProfile); } @Override - public boolean stopWorker(ResourceID worker) - { + public boolean stopWorker(ResourceID worker) { LOG.info("Stopping worker [{}].", worker.getResourceID()); try { nodeManagerClient.terminateClusterPod(worker); @@ -122,8 +119,7 @@ public boolean stopWorker(ResourceID worker) } @Override - protected ResourceID workerStarted(ResourceID resourceID) - { + protected ResourceID workerStarted(ResourceID resourceID) { // Hooray it started! return resourceID; } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java index f70c48b768f2f..9e56f88f8407e 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java @@ -10,6 +10,7 @@ import org.apache.flink.runtime.util.JvmShutdownSafeguard; import org.apache.flink.runtime.util.SignalHandler; import org.apache.flink.util.ExceptionUtils; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,14 +23,17 @@ /** * This class is the executable entry point for running a TaskExecutor in a Kubernetes container. */ -public class KubernetesTaskManagerRunner -{ - protected static final Logger LOG = LoggerFactory.getLogger(KubernetesTaskManagerRunner.class); +public class KubernetesTaskManagerRunner { + private static final Logger LOG = LoggerFactory.getLogger(KubernetesTaskManagerRunner.class); - /** The process environment variables. */ + /** + * The process environment variables. + */ private static final Map ENV = System.getenv(); - /** The exit code returned if the initialization of the Kubernetes task executor runner failed. */ + /** + * The exit code returned if the initialization of the Kubernetes task executor runner failed. + */ private static final int INIT_ERROR_EXIT_CODE = 31; // ------------------------------------------------------------------------ @@ -41,8 +45,7 @@ public class KubernetesTaskManagerRunner * * @param args The command line arguments. */ - public static void main(String[] args) throws Exception - { + public static void main(String[] args) throws Exception { EnvironmentInformation.logEnvironmentInfo(LOG, "TaskManager", args); SignalHandler.register(LOG); JvmShutdownSafeguard.installAsShutdownHook(LOG); @@ -61,7 +64,7 @@ public static void main(String[] args) throws Exception FileSystem.initialize(configuration); } catch (IOException e) { throw new IOException("Error while setting the default " + - "filesystem scheme from configuration.", e); + "filesystem scheme from configuration.", e); } SecurityUtils.install(new SecurityConfiguration(configuration)); @@ -74,8 +77,7 @@ public static void main(String[] args) throws Exception TaskManagerRunner.runTaskManager(configuration, new ResourceID(getContainerId())); return null; }); - } - catch (Throwable t) { + } catch (Throwable t) { final Throwable strippedThrowable = ExceptionUtils.stripException(t, UndeclaredThrowableException.class); // make sure that everything whatever ends up in the log LOG.error("Kubernetes TaskManager initialization failed.", strippedThrowable); @@ -83,14 +85,12 @@ public static void main(String[] args) throws Exception } } - - private static String getContainerId() - { + private static String getContainerId() { if (ENV.containsKey(KubernetesResourceManager.FLINK_TM_RESOURCE_ID)) { return ENV.get(KubernetesResourceManager.FLINK_TM_RESOURCE_ID); } else { LOG.warn("ResourceID env variable {} is not found. Generating resource id", - KubernetesResourceManager.FLINK_TM_RESOURCE_ID); + KubernetesResourceManager.FLINK_TM_RESOURCE_ID); return ResourceID.generate().getResourceIdString(); } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java index 54f63bcafcbda..47c15b1cd8877 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java @@ -1,5 +1,10 @@ package org.apache.flink.kubernetes.client; +import org.apache.flink.kubernetes.KubernetesResourceManager; +import org.apache.flink.kubernetes.client.exception.KubernetesClientException; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; + import com.google.gson.JsonSyntaxException; import io.kubernetes.client.ApiClient; import io.kubernetes.client.ApiException; @@ -12,13 +17,8 @@ import io.kubernetes.client.models.V1Pod; import io.kubernetes.client.models.V1PodSpec; import io.kubernetes.client.util.Config; -import org.apache.flink.kubernetes.KubernetesResourceManager; -import org.apache.flink.kubernetes.client.exception.KubernetesClientException; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import scala.NotImplementedError; import java.io.IOException; import java.util.Arrays; @@ -26,10 +26,15 @@ import java.util.HashMap; import java.util.Map; +import scala.NotImplementedError; + import static org.apache.flink.util.Preconditions.checkNotNull; -public class DefaultKubernetesClient implements KubernetesClient -{ +/** + * Kubernetes Client. + * It uses default service client to operate with kubernetes abstractions. + */ +public class DefaultKubernetesClient implements KubernetesClient { private static final Logger LOG = LoggerFactory.getLogger(KubernetesClient.class); private final CoreV1Api coreV1Api; @@ -37,8 +42,7 @@ public class DefaultKubernetesClient implements KubernetesClient private final String flinkNamespace; private final Map podResourceProfiles = new HashMap<>(); - public DefaultKubernetesClient(Map environment) throws IOException - { + public DefaultKubernetesClient(Map environment) throws IOException { // Default user is used for managing deployments and their pods // Make sure default user has enough permissions for doing that final ApiClient apiClient = Config.defaultClient(); @@ -49,14 +53,12 @@ public DefaultKubernetesClient(Map environment) throws IOExcepti } @Override - public Endpoint createClusterService() - { + public Endpoint createClusterService() { throw new NotImplementedError(); } @Override - public void createClusterPod(ResourceProfile resourceProfile) throws KubernetesClientException - { + public void createClusterPod(ResourceProfile resourceProfile) throws KubernetesClientException { final ResourceID resourceID = ResourceID.generate(); final String podName = getPodName(resourceID); @@ -67,8 +69,7 @@ public void createClusterPod(ResourceProfile resourceProfile) throws KubernetesC .metadata( new V1ObjectMeta() .name(podName) - .labels(new HashMap() - {{ + .labels(new HashMap() {{ put("app", "flink"); put("component", "taskmanager"); put("role", "taskmanager"); @@ -116,16 +117,14 @@ public void createClusterPod(ResourceProfile resourceProfile) throws KubernetesC try { coreV1Api.createNamespacedPod(flinkNamespace, body, false, null, null); podResourceProfiles.put(podName, resourceProfile); - } - catch (ApiException e) { + } catch (ApiException e) { final String message = String.format("Cannot create a pod for resource profile [%s]", resourceProfile); throw new KubernetesClientException(message, e); } } @Override - public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientException - { + public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientException { final String podName = getPodName(resourceID); LOG.info("Terminating a cluster pod [{}] for a resource profile [{}]", podName, resourceID); @@ -133,8 +132,7 @@ public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientEx V1DeleteOptions body = new V1DeleteOptions().gracePeriodSeconds(0L).orphanDependents(false); coreV1Api.deleteNamespacedPod(podName, flinkNamespace, body, null, null, null, null, null); podResourceProfiles.remove(podName); - } - catch (ApiException e) { + } catch (ApiException e) { if (e.getMessage().equals("Not Found")) { LOG.warn("Could not delete a pod [{}] as it was not found", podName); } else { @@ -142,8 +140,7 @@ public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientEx String.format("Could not delete a pod [%s] for resource profile [%s]", podName, flinkNamespace); throw new KubernetesClientException(message, e); } - } - catch (JsonSyntaxException e) { + } catch (JsonSyntaxException e) { // It's a known issue until the Swagger spec is updated to OpenAPI 3.0 // https://github.com/kubernetes-client/java/issues/86 // Simply ignoring the exception @@ -151,15 +148,13 @@ public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientEx } @Override - public void stopAndCleanupCluster(String clusterId) - { + public void stopAndCleanupCluster(String clusterId) { LOG.info("Stopping the cluster and deleting all its task manager pods"); podResourceProfiles.forEach( (podName, resourceProfile) -> { try { coreV1Api.deleteNamespacedPod(podName, flinkNamespace, null, null, null, 0, null, null); - } - catch (ApiException e) { + } catch (ApiException e) { LOG.error("Could not delete a pod [{}]", podName, e); } } @@ -168,25 +163,21 @@ public void stopAndCleanupCluster(String clusterId) } @Override - public void logException(Exception e) - { + public void logException(Exception e) { LOG.error("Exception occurred", e); } @Override - public Endpoint getResetEndpoint(String flinkClusterId) - { + public Endpoint getResetEndpoint(String flinkClusterId) { throw new NotImplementedError(); } @Override - public void close() - { + public void close() { throw new NotImplementedError(); } - private String getPodName(ResourceID resourceId) - { + private String getPodName(ResourceID resourceId) { return "flink-taskmanager-" + resourceId.getResourceIdString(); } } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java index 638ffb85b2d3e..2cdbf82c8200f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java @@ -18,9 +18,9 @@ package org.apache.flink.kubernetes.client; +import org.apache.flink.kubernetes.client.exception.KubernetesClientException; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; -import org.apache.flink.kubernetes.client.exception.KubernetesClientException; /** * The client to talk with kubernetes. @@ -39,7 +39,7 @@ public interface KubernetesClient extends AutoCloseable { /** - * Terminate a cluster pod + * Terminate a cluster pod. * @param resourceID cluster pod id */ void terminateClusterPod(ResourceID resourceID) throws KubernetesClientException; diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java index e48f629883089..2283e2733df88 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/exception/KubernetesClientException.java @@ -1,7 +1,9 @@ package org.apache.flink.kubernetes.client.exception; -public class KubernetesClientException extends Exception -{ +/** + * Kubernetes Client Exception. + */ +public class KubernetesClientException extends Exception { public KubernetesClientException(String message) { super(message); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java index fca5e92754b23..4aec9d670f925 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesResourceManagerFactory.java @@ -1,6 +1,5 @@ package org.apache.flink.kubernetes.entrypoint; - import org.apache.flink.configuration.Configuration; import org.apache.flink.kubernetes.KubernetesResourceManager; import org.apache.flink.runtime.clusterframework.types.ResourceID; @@ -18,8 +17,10 @@ import javax.annotation.Nullable; -public enum KubernetesResourceManagerFactory implements ResourceManagerFactory -{ +/** + * Kubernetes Resource Manager Factory. + */ +public enum KubernetesResourceManagerFactory implements ResourceManagerFactory { INSTANCE; @Override @@ -34,8 +35,7 @@ public ResourceManager createResourceManager( ClusterInformation clusterInformation, @Nullable String webInterfaceUrl, JobManagerMetricGroup jobManagerMetricGroup - ) throws Exception - { + ) throws Exception { final ResourceManagerRuntimeServicesConfiguration rmServicesConfiguration = ResourceManagerRuntimeServicesConfiguration .fromConfiguration(configuration); final ResourceManagerRuntimeServices rmRuntimeServices = ResourceManagerRuntimeServices.fromConfiguration( @@ -57,7 +57,6 @@ public ResourceManager createResourceManager( rmRuntimeServices.getJobLeaderIdService(), clusterInformation, fatalErrorHandler, - webInterfaceUrl, jobManagerMetricGroup ); } diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java index 9cfd5053d3806..e19c4da1251dd 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/entrypoint/KubernetesSessionClusterEntrypoint.java @@ -36,7 +36,7 @@ */ public class KubernetesSessionClusterEntrypoint extends SessionClusterEntrypoint { - public KubernetesSessionClusterEntrypoint(Configuration configuration) { + private KubernetesSessionClusterEntrypoint(Configuration configuration) { super(configuration); } @@ -62,7 +62,6 @@ public static void main(String[] args) { System.exit(1); } - LOG.info("Started {}.", KubernetesSessionClusterEntrypoint.class.getSimpleName()); Configuration configuration = loadConfiguration(entrypointClusterConfiguration); From 41e2f6630f1b5cbd7b4ea30fbcb48d804d4eef61 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Mon, 25 Mar 2019 18:40:11 +0700 Subject: [PATCH 10/13] BACKEND-1137: Refactored and added some comments --- .../KubernetesTaskManagerRunner.java | 1 + .../client/DefaultKubernetesClient.java | 49 ++++++++++--------- 2 files changed, 28 insertions(+), 22 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java index 9e56f88f8407e..8d20d3547e9fb 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/KubernetesTaskManagerRunner.java @@ -22,6 +22,7 @@ /** * This class is the executable entry point for running a TaskExecutor in a Kubernetes container. + * It duplicates an entry point of {@link org.apache.flink.runtime.taskexecutor.TaskManagerRunner} */ public class KubernetesTaskManagerRunner { private static final Logger LOG = LoggerFactory.getLogger(KubernetesTaskManagerRunner.class); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java index 47c15b1cd8877..bbe91eda58907 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java @@ -43,6 +43,7 @@ public class DefaultKubernetesClient implements KubernetesClient { private final Map podResourceProfiles = new HashMap<>(); public DefaultKubernetesClient(Map environment) throws IOException { + //TODO: add a property to specify a user // Default user is used for managing deployments and their pods // Make sure default user has enough permissions for doing that final ApiClient apiClient = Config.defaultClient(); @@ -61,8 +62,8 @@ public Endpoint createClusterService() { public void createClusterPod(ResourceProfile resourceProfile) throws KubernetesClientException { final ResourceID resourceID = ResourceID.generate(); final String podName = getPodName(resourceID); - LOG.info("Creating a cluster pod [{}] for a resource profile [{}]", podName, resourceProfile); + //TODO: Place template into a resource file or somewhere into config file V1Pod body = new V1Pod() .apiVersion("v1") .kind("Pod") @@ -90,6 +91,7 @@ public void createClusterPod(ResourceProfile resourceProfile) throws KubernetesC }} )) .spec( + //TODO: Add resource spec (CPU, Memory) and an option to turn the feature on/off new V1PodSpec() .containers(Collections.singletonList( new V1Container() @@ -126,25 +128,9 @@ public void createClusterPod(ResourceProfile resourceProfile) throws KubernetesC @Override public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientException { final String podName = getPodName(resourceID); - LOG.info("Terminating a cluster pod [{}] for a resource profile [{}]", podName, resourceID); - try { - V1DeleteOptions body = new V1DeleteOptions().gracePeriodSeconds(0L).orphanDependents(false); - coreV1Api.deleteNamespacedPod(podName, flinkNamespace, body, null, null, null, null, null); - podResourceProfiles.remove(podName); - } catch (ApiException e) { - if (e.getMessage().equals("Not Found")) { - LOG.warn("Could not delete a pod [{}] as it was not found", podName); - } else { - final String message = - String.format("Could not delete a pod [%s] for resource profile [%s]", podName, flinkNamespace); - throw new KubernetesClientException(message, e); - } - } catch (JsonSyntaxException e) { - // It's a known issue until the Swagger spec is updated to OpenAPI 3.0 - // https://github.com/kubernetes-client/java/issues/86 - // Simply ignoring the exception - } + deleteNamespacedPod(podName); + podResourceProfiles.remove(podName); } @Override @@ -153,9 +139,9 @@ public void stopAndCleanupCluster(String clusterId) { podResourceProfiles.forEach( (podName, resourceProfile) -> { try { - coreV1Api.deleteNamespacedPod(podName, flinkNamespace, null, null, null, 0, null, null); - } catch (ApiException e) { - LOG.error("Could not delete a pod [{}]", podName, e); + deleteNamespacedPod(podName); + } catch (KubernetesClientException e) { + LOG.warn("Could not delete a pod [{}]", podName); } } ); @@ -180,4 +166,23 @@ public void close() { private String getPodName(ResourceID resourceId) { return "flink-taskmanager-" + resourceId.getResourceIdString(); } + + private void deleteNamespacedPod(String podName) throws KubernetesClientException { + try { + V1DeleteOptions body = new V1DeleteOptions().gracePeriodSeconds(0L).orphanDependents(false); + coreV1Api.deleteNamespacedPod(podName, flinkNamespace, body, null, null, null, null, null); + } catch (ApiException e) { + if (e.getMessage().equals("Not Found")) { + LOG.warn("Could not delete a pod [{}] as it was not found", podName); + } else { + final String message = + String.format("Could not delete a pod [%s] for resource profile [%s]", podName, flinkNamespace); + throw new KubernetesClientException(message, e); + } + } catch (JsonSyntaxException e) { + // It's a known issue until the Swagger spec is updated to OpenAPI 3.0 + // https://github.com/kubernetes-client/java/issues/86 + // Simply ignoring the exception + } + } } From ec6263494a985ca90a772f81affafe02694ca021 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Fri, 29 Mar 2019 19:22:13 +0700 Subject: [PATCH 11/13] BACKEND-1137: Refactored and added some comments --- flink-kubernetes/README.md | 12 +++++++++--- .../flink/kubernetes/client/KubernetesClient.java | 4 ++-- 2 files changed, 11 insertions(+), 5 deletions(-) diff --git a/flink-kubernetes/README.md b/flink-kubernetes/README.md index 5f7b6f5737888..53e2d8abfe811 100644 --- a/flink-kubernetes/README.md +++ b/flink-kubernetes/README.md @@ -13,11 +13,12 @@ Example: kubectl create -f jobmanager-deployment.yaml kubectl create -f jobmanager-service.yaml ``` -That creates the deployment with one job manager and a service around it that exposes the job manager. +That creates the deployment with one job manager and a service around it that exposes +(ClusterIP, NodePort, LoadBalancer, ExternalName) the job manager. ## Task Manager Task manager is a temporary essence and is created (and deleted) by a job manager for a particular slot. -No deployments/jobs/services are created for a task manager only pods. +No deployments/jobs/services are created for a task manager, only pods. A template for a task manager is hardcoded into the implementation (`org.apache.flink.kubernetes.client.DefaultKubernetesClient`). @@ -35,7 +36,7 @@ Only `StreamTransformation.minResources` is used for a pod template. TBD ## Kubernetes Resource Management -Resource management uses a default service account every pod contains. It should has admin privileges to be able +Resource management uses a default service account every pod contains. It should have admin privileges to be able to create and delete pods: ``` kubectl create clusterrolebinding serviceaccounts-cluster-admin \ @@ -52,6 +53,11 @@ Then in needs to be containerized: ```cd flink-contrib/docker-flink/ sh build.sh --from-local-dist --image-name flink-mmx ``` +And uploaded to gcp (for example): +``` +docker tag flink-mmx gcr.io/metamarkets-prod-xpn-host/flink-mmx +gcloud docker -- push gcr.io/metamarkets-prod-xpn-host/flink-mmx +``` If minikube is used then a container image should be uploaded into minikube node. So before building a container image a docker env is supposed to be exported: ``` diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java index 2cdbf82c8200f..993dcec2f35de 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java @@ -45,7 +45,7 @@ public interface KubernetesClient extends AutoCloseable { void terminateClusterPod(ResourceID resourceID) throws KubernetesClientException; /** - * stop cluster and clean up all resources, include services, auxiliary services and all running pods. + * Stop cluster and clean up all resources, include services, auxiliary services and all running pods. * */ void stopAndCleanupCluster(String clusterId); @@ -55,7 +55,7 @@ public interface KubernetesClient extends AutoCloseable { void logException(Exception e); /** - * retrieval rest endpoint of the giving flink clusterId. + * Retrieval rest endpoint of the giving flink clusterId. */ Endpoint getResetEndpoint(String flinkClusterId); } From 9efe807e240e9f609803e057780f1062f4477ec2 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Tue, 2 Apr 2019 16:58:48 +0700 Subject: [PATCH 12/13] BACKEND-1137: Refactored KubernetesClusterDescriptor --- .../client/DefaultKubernetesClient.java | 9 ++ .../cluster/KubernetesClusterDescriptor.java | 86 +++++++++---------- 2 files changed, 48 insertions(+), 47 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java index bbe91eda58907..c4d20ccae1c5f 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java @@ -125,6 +125,11 @@ public void createClusterPod(ResourceProfile resourceProfile) throws KubernetesC } } + /** + * Terminates a cluster pod. + * @param resourceID cluster pod id in terms of flink + * @throws KubernetesClientException + */ @Override public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientException { final String podName = getPodName(resourceID); @@ -133,6 +138,10 @@ public void terminateClusterPod(ResourceID resourceID) throws KubernetesClientEx podResourceProfiles.remove(podName); } + /** + * Stops and cleans up a cluster. + * @param clusterId cluster id + */ @Override public void stopAndCleanupCluster(String clusterId) { LOG.info("Stopping the cluster and deleting all its task manager pods"); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java index d43c8c72300df..82b212da9f27c 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java @@ -45,6 +45,7 @@ /** * Kubernetes specific {@link ClusterDescriptor} implementation. * This class is responsible for cluster creation from scratch + * and communication with its api */ public class KubernetesClusterDescriptor implements ClusterDescriptor { @@ -63,44 +64,26 @@ public KubernetesClusterDescriptor(@Nonnull FlinkKubernetesOptions options, @Non this.client = client; } - private String generateClusterId() { - return CLUSTER_ID_PREFIX + UUID.randomUUID(); - } - @Override public String getClusterDescription() { return CLUSTER_DESCRIPTION; } - private ClusterClient createClusterEndpoint(Endpoint clusterEndpoint, String clusterId) throws Exception { - - Configuration configuration = new Configuration(this.options.getConfiguration()); - configuration.setString(JobManagerOptions.ADDRESS, clusterEndpoint.getAddress()); - configuration.setInteger(JobManagerOptions.PORT, clusterEndpoint.getPort()); - return new RestClusterClient<>(configuration, clusterId); - } - @Override public ClusterClient retrieve(String clusterId) throws ClusterRetrieveException { try { - Endpoint clusterEndpoint = this.client.getResetEndpoint(clusterId); - return this.createClusterEndpoint(clusterEndpoint, clusterId); + Endpoint clusterEndpoint = client.getResetEndpoint(clusterId); + return createClusterEndpoint(clusterEndpoint, clusterId); } catch (Exception e) { - this.client.logException(e); - throw new ClusterRetrieveException("Could not create the RestClusterClient.", e); + throw new ClusterRetrieveException("Could not create the RestClusterClient", e); } } @Override public ClusterClient deploySessionCluster(ClusterSpecification clusterSpecification) throws ClusterDeploymentException { - - String clusterId = this.generateClusterId(); - - //TODO: add arguments - final List args = Arrays.asList(); - - return this.deployClusterInternal(clusterId, args); + String clusterId = generateClusterId(); + return deployClusterInternal(clusterId, null); } @Override @@ -108,47 +91,56 @@ public ClusterClient deployJobCluster(ClusterSpecification clusterSpecif throw new NotImplementedException(); } - @Nonnull - private ClusterClient deployClusterInternal(String clusterId, List args) throws ClusterDeploymentException { + @Override + public void killCluster(String clusterId) throws FlinkException { try { - Endpoint clusterEndpoint = this.client.createClusterService(); - this.client.createClusterPod(null); - return this.createClusterEndpoint(clusterEndpoint, clusterId); + client.stopAndCleanupCluster(clusterId); } catch (Exception e) { - this.client.logException(e); - this.tryKillCluster(clusterId); - throw new ClusterDeploymentException("Could not create Kubernetes cluster " + clusterId, e); + client.logException(e); + throw new FlinkException(String.format("Could not create Kubernetes cluster [%s]", clusterId)); } } - /** - * Try to kill cluster without throw exception. - */ - private void tryKillCluster(String clusterId) { + @Override + public void close() { try { - this.killCluster(clusterId); + client.close(); } catch (Exception e) { - this.client.logException(e); + LOG.error("Failed to close Kubernetes client: {}", e.toString()); } } - @Override - public void killCluster(String clusterId) throws FlinkException { + private String generateClusterId() { + return CLUSTER_ID_PREFIX + UUID.randomUUID(); + } + + private ClusterClient createClusterEndpoint(Endpoint clusterEndpoint, String clusterId) throws Exception { + Configuration configuration = new Configuration(options.getConfiguration()); + configuration.setString(JobManagerOptions.ADDRESS, clusterEndpoint.getAddress()); + configuration.setInteger(JobManagerOptions.PORT, clusterEndpoint.getPort()); + return new RestClusterClient<>(configuration, clusterId); + } + + @Nonnull + private ClusterClient deployClusterInternal(String clusterId, List args) throws ClusterDeploymentException { try { - this.client.stopAndCleanupCluster(clusterId); + Endpoint clusterEndpoint = client.createClusterService(); + client.createClusterPod(null); + return createClusterEndpoint(clusterEndpoint, clusterId); } catch (Exception e) { - this.client.logException(e); - throw new FlinkException("Could not create Kubernetes cluster " + clusterId); + tryKillCluster(clusterId); + throw new ClusterDeploymentException(String.format("Could not create Kubernetes cluster [%s]", clusterId), e); } } - @Override - public void close() { + /** + * Try to kill cluster without throw exception. + */ + private void tryKillCluster(String clusterId) { try { - this.client.close(); + killCluster(clusterId); } catch (Exception e) { - this.client.logException(e); - LOG.error("failed to close client, exception {}", e.toString()); + LOG.error("Could not kill a cluster [{}]: {}", clusterId, e.toString()); } } } From 54f7c7d2f9a8a590c0c739e4370a1a16ac470008 Mon Sep 17 00:00:00 2001 From: Eugene Sevastianov Date: Tue, 2 Apr 2019 19:48:28 +0700 Subject: [PATCH 13/13] BACKEND-1137: Refactored KubernetesClusterDescriptor and dependent --- .../flink/kubernetes/client/DefaultKubernetesClient.java | 5 ----- .../java/org/apache/flink/kubernetes/client/Endpoint.java | 2 +- .../apache/flink/kubernetes/client/KubernetesClient.java | 5 ----- .../kubernetes/cluster/KubernetesClusterDescriptor.java | 7 +++---- 4 files changed, 4 insertions(+), 15 deletions(-) diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java index c4d20ccae1c5f..a9d4ec50c5cf2 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/DefaultKubernetesClient.java @@ -157,11 +157,6 @@ public void stopAndCleanupCluster(String clusterId) { podResourceProfiles.clear(); } - @Override - public void logException(Exception e) { - LOG.error("Exception occurred", e); - } - @Override public Endpoint getResetEndpoint(String flinkClusterId) { throw new NotImplementedError(); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java index ca33bb7dcdc93..21cae78619cf9 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/Endpoint.java @@ -19,7 +19,7 @@ package org.apache.flink.kubernetes.client; /** - * represent a endpoint. + * Represents an endpoint. * */ public class Endpoint { diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java index 993dcec2f35de..9e925ac04d89b 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/client/KubernetesClient.java @@ -49,11 +49,6 @@ public interface KubernetesClient extends AutoCloseable { * */ void stopAndCleanupCluster(String clusterId); - /** - * Log exception. - * */ - void logException(Exception e); - /** * Retrieval rest endpoint of the giving flink clusterId. */ diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java index 82b212da9f27c..8d80958334989 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/cluster/KubernetesClusterDescriptor.java @@ -29,6 +29,7 @@ import org.apache.flink.kubernetes.FlinkKubernetesOptions; import org.apache.flink.kubernetes.client.Endpoint; import org.apache.flink.kubernetes.client.KubernetesClient; +import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.util.FlinkException; @@ -38,7 +39,6 @@ import javax.annotation.Nonnull; -import java.util.Arrays; import java.util.List; import java.util.UUID; @@ -96,8 +96,7 @@ public void killCluster(String clusterId) throws FlinkException { try { client.stopAndCleanupCluster(clusterId); } catch (Exception e) { - client.logException(e); - throw new FlinkException(String.format("Could not create Kubernetes cluster [%s]", clusterId)); + throw new FlinkException(String.format("Could not create Kubernetes cluster [%s]", clusterId), e); } } @@ -125,7 +124,7 @@ private ClusterClient createClusterEndpoint(Endpoint clusterEndpoint, St private ClusterClient deployClusterInternal(String clusterId, List args) throws ClusterDeploymentException { try { Endpoint clusterEndpoint = client.createClusterService(); - client.createClusterPod(null); + client.createClusterPod(ResourceProfile.UNKNOWN); return createClusterEndpoint(clusterEndpoint, clusterId); } catch (Exception e) { tryKillCluster(clusterId);