diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java index 40223e093e8..3d51f15c19d 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/GobblinTemporalConfigurationKeys.java @@ -64,4 +64,9 @@ public interface GobblinTemporalConfigurationKeys { String TEMPORAL_NUM_WORKERS_PER_CONTAINER = PREFIX + "num.workers.per.container"; int DEFAULT_TEMPORAL_NUM_WORKERS_PER_CONTAINERS = 1; String TEMPORAL_CONNECTION_STRING = PREFIX + "connection.string"; + + /** + * Prefix for Gobblin-on-Temporal Dynamic Scaling + */ + String DYNAMIC_SCALING_PREFIX = PREFIX + "dynamic.scaling."; } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java index bf1f1d2e099..d67825cb226 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/dynamic/WorkerProfile.java @@ -18,12 +18,23 @@ package org.apache.gobblin.temporal.dynamic; import com.typesafe.config.Config; +import lombok.AllArgsConstructor; import lombok.Data; /** A named worker {@link Config} */ @Data +@AllArgsConstructor public class WorkerProfile { private final String name; private final Config config; + + /** + * Constructs a `WorkerProfile` with the baseline name and the specified configuration. + * + * @param config the configuration for the worker profile + */ + public WorkerProfile(Config config) { + this(WorkforceProfiles.BASELINE_NAME, config); + } } diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java new file mode 100644 index 00000000000..3022d5c9ecc --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/AbstractDynamicScalingYarnServiceManager.java @@ -0,0 +1,120 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.List; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; + +import org.apache.commons.collections.CollectionUtils; + +import com.typesafe.config.Config; +import com.google.common.base.Optional; +import com.google.common.util.concurrent.AbstractIdleService; + +import lombok.AllArgsConstructor; +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; + +/** + * This class manages the dynamic scaling of the {@link YarnService} by periodically polling for scaling directives and passing + * the latest scaling directives to the {@link DynamicScalingYarnService} for processing. + * + * This is an abstract class that provides the basic functionality for managing dynamic scaling. Subclasses should implement + * {@link #createScalingDirectiveSource()} to provide a {@link ScalingDirectiveSource} that will be used to get scaling directives. + * + * The actual implemented class needs to be passed as value of config {@link org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES} + */ +@Slf4j +public abstract class AbstractDynamicScalingYarnServiceManager extends AbstractIdleService { + + protected final static String DYNAMIC_SCALING_POLLING_INTERVAL = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "polling.interval"; + private final int DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS = 60; + protected final Config config; + private final DynamicScalingYarnService dynamicScalingYarnService; + private final ScheduledExecutorService dynamicScalingExecutor; + + public AbstractDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + this.config = appMaster.getConfig(); + if (appMaster.get_yarnService() instanceof DynamicScalingYarnService) { + this.dynamicScalingYarnService = (DynamicScalingYarnService) appMaster.get_yarnService(); + } else { + String errorMsg = "Failure while getting YarnService Instance from GobblinTemporalApplicationMaster::get_yarnService()" + + " YarnService {" + appMaster.get_yarnService().getClass().getName() + "} is not an instance of DynamicScalingYarnService"; + log.error(errorMsg); + throw new RuntimeException(errorMsg); + } + this.dynamicScalingExecutor = Executors.newSingleThreadScheduledExecutor( + ExecutorsUtils.newThreadFactory(Optional.of(log), + Optional.of("DynamicScalingExecutor"))); + } + + @Override + protected void startUp() { + int scheduleInterval = ConfigUtils.getInt(this.config, DYNAMIC_SCALING_POLLING_INTERVAL, + DEFAULT_DYNAMIC_SCALING_POLLING_INTERVAL_SECS); + log.info("Starting the {} with re-scaling interval of {} seconds", this.getClass().getSimpleName(), scheduleInterval); + + this.dynamicScalingExecutor.scheduleAtFixedRate( + new GetScalingDirectivesRunnable(this.dynamicScalingYarnService, createScalingDirectiveSource()), + scheduleInterval, scheduleInterval, TimeUnit.SECONDS + ); + } + + @Override + protected void shutDown() { + log.info("Stopping the " + this.getClass().getSimpleName()); + ExecutorsUtils.shutdownExecutorService(this.dynamicScalingExecutor, Optional.of(log)); + } + + /** + * Create a {@link ScalingDirectiveSource} to use for getting scaling directives. + */ + protected abstract ScalingDirectiveSource createScalingDirectiveSource(); + + /** + * A {@link Runnable} that gets the scaling directives from the {@link ScalingDirectiveSource} and passes them to the + * {@link DynamicScalingYarnService} for processing. + */ + @AllArgsConstructor + static class GetScalingDirectivesRunnable implements Runnable { + private final DynamicScalingYarnService dynamicScalingYarnService; + private final ScalingDirectiveSource scalingDirectiveSource; + + @Override + public void run() { + try { + List scalingDirectives = scalingDirectiveSource.getScalingDirectives(); + if (CollectionUtils.isNotEmpty(scalingDirectives)) { + dynamicScalingYarnService.reviseWorkforcePlanAndRequestNewContainers(scalingDirectives); + } + } catch (IOException e) { + log.error("Failed to get scaling directives", e); + } catch (Throwable t) { + log.error("Unexpected error with dynamic scaling via directives", t); + } + } + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java new file mode 100644 index 00000000000..0720017b852 --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnService.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.List; + +import org.apache.commons.collections.CollectionUtils; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.StaffingDeltas; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; +import org.apache.gobblin.temporal.dynamic.WorkforcePlan; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; +import org.apache.gobblin.temporal.dynamic.WorkforceStaffing; +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +/** + * Service for dynamically scaling Gobblin containers running on YARN. + * This service manages workforce staffing and plans, and requests new containers as needed. + */ +@Slf4j +public class DynamicScalingYarnService extends YarnService { + + /** this holds the current count of containers already requested for each worker profile */ + private final WorkforceStaffing actualWorkforceStaffing; + /** this holds the current total workforce plan as per latest received scaling directives */ + private final WorkforcePlan workforcePlan; + + public DynamicScalingYarnService(Config config, String applicationName, String applicationId, + YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { + super(config, applicationName, applicationId, yarnConfiguration, fs, eventBus); + + this.actualWorkforceStaffing = WorkforceStaffing.initialize(0); + this.workforcePlan = new WorkforcePlan(this.config, this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY)); + } + + @Override + protected synchronized void requestInitialContainers() { + StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); + requestNewContainersForStaffingDeltas(deltas); + } + + /** + * Revises the workforce plan and requests new containers based on the given scaling directives. + * + * @param scalingDirectives the list of scaling directives + */ + public synchronized void reviseWorkforcePlanAndRequestNewContainers(List scalingDirectives) { + if (CollectionUtils.isEmpty(scalingDirectives)) { + return; + } + this.workforcePlan.reviseWhenNewer(scalingDirectives); + StaffingDeltas deltas = this.workforcePlan.calcStaffingDeltas(this.actualWorkforceStaffing); + requestNewContainersForStaffingDeltas(deltas); + } + + private synchronized void requestNewContainersForStaffingDeltas(StaffingDeltas deltas) { + deltas.getPerProfileDeltas().forEach(profileDelta -> { + if (profileDelta.getDelta() > 0) { // scale up! + WorkerProfile workerProfile = profileDelta.getProfile(); + String profileName = workerProfile.getName(); + int currNumContainers = this.actualWorkforceStaffing.getStaffing(profileName).orElse(0); + int delta = profileDelta.getDelta(); + log.info("Requesting {} new containers for profile {} having currently {} containers", delta, + WorkforceProfiles.renderName(profileName), currNumContainers); + requestContainersForWorkerProfile(workerProfile, delta); + // update our staffing after requesting new containers + this.actualWorkforceStaffing.reviseStaffing(profileName, currNumContainers + delta, System.currentTimeMillis()); + } else if (profileDelta.getDelta() < 0) { // scale down! + // TODO: Decide how to handle negative deltas + log.warn("Handling of Negative delta is not supported yet : Profile {} delta {} ", + profileDelta.getProfile().getName(), profileDelta.getDelta()); + } // else, already at staffing plan (or at least have requested, so in-progress) + }); + } + +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java new file mode 100644 index 00000000000..f6b65bbd2db --- /dev/null +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/FsSourceDynamicScalingYarnServiceManager.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.util.Optional; + +import org.apache.hadoop.fs.FileSystem; + +import org.apache.gobblin.temporal.GobblinTemporalConfigurationKeys; +import org.apache.gobblin.temporal.dynamic.FsScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; + +/** + * {@link FsScalingDirectiveSource} based implementation of {@link AbstractDynamicScalingYarnServiceManager}. + */ +public class FsSourceDynamicScalingYarnServiceManager extends AbstractDynamicScalingYarnServiceManager { + // TODO: replace fetching of these configs using a new method similar to JobStateUtils::getWorkDirRoot + public final static String DYNAMIC_SCALING_DIRECTIVES_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "directives.dir"; + public final static String DYNAMIC_SCALING_ERRORS_DIR = GobblinTemporalConfigurationKeys.DYNAMIC_SCALING_PREFIX + "errors.dir"; + private final FileSystem fs; + + public FsSourceDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + super(appMaster); + this.fs = appMaster.getFs(); + } + + @Override + protected ScalingDirectiveSource createScalingDirectiveSource() { + return new FsScalingDirectiveSource( + this.fs, + this.config.getString(DYNAMIC_SCALING_DIRECTIVES_DIR), + Optional.ofNullable(this.config.getString(DYNAMIC_SCALING_ERRORS_DIR)) + ); + } +} diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java index b7957bd9a26..3efadb11b38 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/GobblinTemporalApplicationMaster.java @@ -114,7 +114,7 @@ public GobblinTemporalApplicationMaster(String applicationName, String applicati protected YarnService buildTemporalYarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs) throws Exception { - return new YarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus); + return new DynamicScalingYarnService(config, applicationName, applicationId, yarnConfiguration, fs, this.eventBus); } /** diff --git a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java index c8fbd047c5d..ec4da215a63 100644 --- a/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java +++ b/gobblin-temporal/src/main/java/org/apache/gobblin/temporal/yarn/YarnService.java @@ -23,7 +23,6 @@ import java.time.Duration; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.List; @@ -34,6 +33,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.stream.IntStream; import org.apache.commons.lang.StringUtils; @@ -109,6 +109,7 @@ import org.apache.gobblin.yarn.event.ContainerReleaseRequest; import org.apache.gobblin.yarn.event.ContainerShutdownRequest; import org.apache.gobblin.yarn.event.NewContainerRequest; +import org.apache.gobblin.temporal.dynamic.WorkerProfile; /** * This class is responsible for all Yarn-related stuffs including ApplicationMaster registration, @@ -130,8 +131,7 @@ class YarnService extends AbstractIdleService { private final String appViewAcl; //Default helix instance tag derived from cluster level config private final String helixInstanceTags; - - private final Config config; + protected final Config config; private final EventBus eventBus; @@ -146,17 +146,11 @@ class YarnService extends AbstractIdleService { private final AMRMClientAsync amrmClientAsync; private final NMClientAsync nmClientAsync; private final ExecutorService containerLaunchExecutor; - - private final int initialContainers; private final int requestedContainerMemoryMbs; private final int requestedContainerCores; - private final int jvmMemoryOverheadMbs; - private final double jvmMemoryXmxRatio; private final boolean containerHostAffinityEnabled; private final int helixInstanceMaxRetries; - - private final Optional containerJvmArgs; private final String containerTimezone; private final String proxyJvmArgs; @@ -200,6 +194,9 @@ class YarnService extends AbstractIdleService { private volatile boolean shutdownInProgress = false; private final boolean jarCacheEnabled; + private static final long DEFAULT_ALLOCATION_REQUEST_ID = 0L; + private final AtomicLong allocationRequestIdGenerator = new AtomicLong(DEFAULT_ALLOCATION_REQUEST_ID); + private final ConcurrentMap workerProfileByAllocationRequestId = new ConcurrentHashMap<>(); public YarnService(Config config, String applicationName, String applicationId, YarnConfiguration yarnConfiguration, FileSystem fs, EventBus eventBus) throws Exception { @@ -229,7 +226,6 @@ public YarnService(Config config, String applicationName, String applicationId, this.nmClientAsync = closer.register(NMClientAsync.createNMClientAsync(getNMClientCallbackHandler())); this.nmClientAsync.init(this.yarnConfiguration); - this.initialContainers = config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY); this.requestedContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); this.requestedContainerCores = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY); this.containerHostAffinityEnabled = config.getBoolean(GobblinYarnConfigurationKeys.CONTAINER_HOST_AFFINITY_ENABLED); @@ -238,10 +234,6 @@ public YarnService(Config config, String applicationName, String applicationId, this.helixInstanceTags = ConfigUtils.getString(config, GobblinClusterConfigurationKeys.HELIX_INSTANCE_TAGS_KEY, GobblinClusterConfigurationKeys.HELIX_DEFAULT_TAG); - this.containerJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ? - Optional.of(config.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) : - Optional.absent(); - this.proxyJvmArgs = config.hasPath(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) ? config.getString(GobblinYarnConfigurationKeys.YARN_APPLICATION_PROXY_JVM_ARGS) : StringUtils.EMPTY; @@ -257,27 +249,12 @@ public YarnService(Config config, String applicationName, String applicationId, GobblinYarnConfigurationKeys.RELEASED_CONTAINERS_CACHE_EXPIRY_SECS, GobblinYarnConfigurationKeys.DEFAULT_RELEASED_CONTAINERS_CACHE_EXPIRY_SECS), TimeUnit.SECONDS).build(); - this.jvmMemoryXmxRatio = ConfigUtils.getDouble(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO); - - Preconditions.checkArgument(this.jvmMemoryXmxRatio >= 0 && this.jvmMemoryXmxRatio <= 1, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + " must be between 0 and 1 inclusive"); - - this.jvmMemoryOverheadMbs = ConfigUtils.getInt(this.config, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, - GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS); - - Preconditions.checkArgument(this.jvmMemoryOverheadMbs < this.requestedContainerMemoryMbs * this.jvmMemoryXmxRatio, - GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + " cannot be more than " - + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " - + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY); - this.appViewAcl = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.APP_VIEW_ACL, GobblinYarnConfigurationKeys.DEFAULT_APP_VIEW_ACL); this.containerTimezone = ConfigUtils.getString(this.config, GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_TIMEZONE, GobblinYarnConfigurationKeys.DEFAULT_GOBBLIN_YARN_CONTAINER_TIMEZONE); this.jarCacheEnabled = ConfigUtils.getBoolean(this.config, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED, GobblinYarnConfigurationKeys.JAR_CACHE_ENABLED_DEFAULT); + } @SuppressWarnings("unused") @@ -344,7 +321,7 @@ protected synchronized void startUp() throws Exception { this.maxResourceCapacity = Optional.of(response.getMaximumResourceCapability()); LOGGER.info("Requesting initial containers"); - requestInitialContainers(this.initialContainers); + requestInitialContainers(); } @Override @@ -419,41 +396,25 @@ private EventSubmitter buildEventSubmitter() { .build(); } - /** - * Request an allocation of containers. If numTargetContainers is larger than the max of current and expected number - * of containers then additional containers are requested. - *

- * If numTargetContainers is less than the current number of allocated containers then release free containers. - * Shrinking is relative to the number of currently allocated containers since it takes time for containers - * to be allocated and assigned work and we want to avoid releasing a container prematurely before it is assigned - * work. This means that a container may not be released even though numTargetContainers is less than the requested - * number of containers. The intended usage is for the caller of this method to make periodic calls to attempt to - * adjust the cluster towards the desired number of containers. - * - * @param inUseInstances a set of in use instances - * @return whether successfully requested the target number of containers - */ - public synchronized boolean requestTargetNumberOfContainers(int numContainers, Set inUseInstances) { - int defaultContainerMemoryMbs = config.getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); - int defaultContainerCores = config.getInt(GobblinYarnConfigurationKeys. CONTAINER_CORES_KEY); - - LOGGER.info("Trying to set numTargetContainers={}, in-use helix instances count is {}, container map size is {}", - numContainers, inUseInstances.size(), this.containerMap.size()); - - requestContainers(numContainers, Resource.newInstance(defaultContainerMemoryMbs, defaultContainerCores)); - LOGGER.info("Current tag-container desired count:{}, tag-container allocated: {}", numContainers, this.allocatedContainerCountMap); - return true; + /** unless overridden to actually scale, "initial" containers will be the app's *only* containers! */ + protected synchronized void requestInitialContainers() { + WorkerProfile baselineWorkerProfile = new WorkerProfile(this.config); + int numContainers = this.config.getInt(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY); + LOGGER.info("Requesting {} initial (static) containers with baseline (only) profile, never to be re-scaled", numContainers); + requestContainersForWorkerProfile(baselineWorkerProfile, numContainers); } - // Request initial containers with default resource and helix tag - private void requestInitialContainers(int containersRequested) { - requestTargetNumberOfContainers(containersRequested, Collections.EMPTY_SET); + protected synchronized void requestContainersForWorkerProfile(WorkerProfile workerProfile, int numContainers) { + int containerMemoryMbs = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY); + int containerCores = workerProfile.getConfig().getInt(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY); + long allocationRequestId = storeByUniqueAllocationRequestId(workerProfile); + requestContainers(numContainers, Resource.newInstance(containerMemoryMbs, containerCores), Optional.of(allocationRequestId)); } private void requestContainer(Optional preferredNode, Optional resourceOptional) { Resource desiredResource = resourceOptional.or(Resource.newInstance( this.requestedContainerMemoryMbs, this.requestedContainerCores)); - requestContainer(preferredNode, desiredResource); + requestContainer(preferredNode, desiredResource, Optional.absent()); } /** @@ -462,14 +423,14 @@ private void requestContainer(Optional preferredNode, Optional * @param numContainers * @param resource */ - private void requestContainers(int numContainers, Resource resource) { - LOGGER.info("Requesting {} containers with resource={}", numContainers, resource); + protected void requestContainers(int numContainers, Resource resource, Optional optAllocationRequestId) { + LOGGER.info("Requesting {} containers with resource={} and allocation request id = {}", numContainers, resource, optAllocationRequestId); IntStream.range(0, numContainers) - .forEach(i -> requestContainer(Optional.absent(), resource)); + .forEach(i -> requestContainer(Optional.absent(), resource, optAllocationRequestId)); } // Request containers with specific resource requirement - private void requestContainer(Optional preferredNode, Resource resource) { + private void requestContainer(Optional preferredNode, Resource resource, Optional optAllocationRequestId) { // Fail if Yarn cannot meet container resource requirements Preconditions.checkArgument(resource.getMemory() <= this.maxResourceCapacity.get().getMemory() && resource.getVirtualCores() <= this.maxResourceCapacity.get().getVirtualCores(), @@ -485,8 +446,11 @@ private void requestContainer(Optional preferredNode, Resource resource) priority.setPriority(priorityNum); String[] preferredNodes = preferredNode.isPresent() ? new String[] {preferredNode.get()} : null; + + long allocationRequestId = optAllocationRequestId.or(DEFAULT_ALLOCATION_REQUEST_ID); + this.amrmClientAsync.addContainerRequest( - new AMRMClient.ContainerRequest(resource, preferredNodes, null, priority)); + new AMRMClient.ContainerRequest(resource, preferredNodes, null, priority, allocationRequestId)); } protected ContainerLaunchContext newContainerLaunchContext(ContainerInfo containerInfo) @@ -590,15 +554,49 @@ protected ByteBuffer getSecurityTokens() throws IOException { @VisibleForTesting protected String buildContainerCommand(Container container, String helixParticipantId, String helixInstanceTag) { + long allocationRequestId = container.getAllocationRequestId(); + WorkerProfile workerProfile = Optional.fromNullable(this.workerProfileByAllocationRequestId.get(allocationRequestId)) + .or(() -> { + LOGGER.warn("No Worker Profile found for {}, so falling back to default", allocationRequestId); + return this.workerProfileByAllocationRequestId.computeIfAbsent(DEFAULT_ALLOCATION_REQUEST_ID, k -> { + LOGGER.warn("WARNING: (LIKELY) UNEXPECTED CONCURRENCY: No Worker Profile even yet mapped to the default allocation request ID {} - creating one now", DEFAULT_ALLOCATION_REQUEST_ID); + return new WorkerProfile(this.config); + }); + }); + Config workerProfileConfig = workerProfile.getConfig(); + + double workerJvmMemoryXmxRatio = ConfigUtils.getDouble(workerProfileConfig, + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, + GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_XMX_RATIO); + + int workerJvmMemoryOverheadMbs = ConfigUtils.getInt(workerProfileConfig, + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, + GobblinYarnConfigurationKeys.DEFAULT_CONTAINER_JVM_MEMORY_OVERHEAD_MBS); + + Preconditions.checkArgument(workerJvmMemoryXmxRatio >= 0 && workerJvmMemoryXmxRatio <= 1, + workerProfile.getName() + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY + + " must be between 0 and 1 inclusive"); + + long containerMemoryMbs = container.getResource().getMemorySize(); + + Preconditions.checkArgument(workerJvmMemoryOverheadMbs < containerMemoryMbs * workerJvmMemoryXmxRatio, + workerProfile.getName() + " : " + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY + + " cannot be more than " + GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY + " * " + + GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY); + + Optional workerJvmArgs = workerProfileConfig.hasPath(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY) ? + Optional.of(workerProfileConfig.getString(GobblinYarnConfigurationKeys.CONTAINER_JVM_ARGS_KEY)) : + Optional.absent(); + String containerProcessName = GobblinTemporalYarnTaskRunner.class.getSimpleName(); StringBuilder containerCommand = new StringBuilder() .append(ApplicationConstants.Environment.JAVA_HOME.$()).append("/bin/java") - .append(" -Xmx").append((int) (container.getResource().getMemory() * this.jvmMemoryXmxRatio) - - this.jvmMemoryOverheadMbs).append("M") + .append(" -Xmx").append((int) (container.getResource().getMemory() * workerJvmMemoryXmxRatio) - + workerJvmMemoryOverheadMbs).append("M") .append(" -D").append(GobblinYarnConfigurationKeys.JVM_USER_TIMEZONE_CONFIG).append("=").append(this.containerTimezone) .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_DIR_NAME).append("=").append(ApplicationConstants.LOG_DIR_EXPANSION_VAR) .append(" -D").append(GobblinYarnConfigurationKeys.GOBBLIN_YARN_CONTAINER_LOG_FILE_NAME).append("=").append(containerProcessName).append(".").append(ApplicationConstants.STDOUT) - .append(" ").append(JvmUtils.formatJvmArguments(this.containerJvmArgs)) + .append(" ").append(JvmUtils.formatJvmArguments(workerJvmArgs)) .append(" ").append(this.proxyJvmArgs) .append(" ").append(GobblinTemporalYarnTaskRunner.class.getName()) .append(" --").append(GobblinClusterConfigurationKeys.APPLICATION_NAME_OPTION_NAME) @@ -755,6 +753,18 @@ private ImmutableMap.Builder buildContainerStatusEventMetadata(C return eventMetadataBuilder; } + /** + * Generates a unique allocation request ID for the given worker profile and store the id to profile mapping. + * + * @param workerProfile the worker profile for which the allocation request ID is generated + * @return the generated allocation request ID + */ + protected long storeByUniqueAllocationRequestId(WorkerProfile workerProfile) { + long allocationRequestId = allocationRequestIdGenerator.getAndIncrement(); + this.workerProfileByAllocationRequestId.put(allocationRequestId, workerProfile); + return allocationRequestId; + } + /** * A custom implementation of {@link AMRMClientAsync.CallbackHandler}. */ @@ -803,24 +813,36 @@ public void onContainersAllocated(List containers) { // Find matching requests and remove the request (YARN-660). We the scheduler are responsible // for cleaning up requests after allocation based on the design in the described ticket. // YARN does not have a delta request API and the requests are not cleaned up automatically. - // Try finding a match first with the host as the resource name then fall back to any resource match. + // Try finding a match first with requestAllocationId (which should always be the case) then fall back to + // finding a match with the host as the resource name which then will fall back to any resource match. // Also see YARN-1902. Container count will explode without this logic for removing container requests. - List> matchingRequests = amrmClientAsync - .getMatchingRequests(container.getPriority(), container.getNodeHttpAddress(), container.getResource()); + Collection matchingRequestsByAllocationRequestId = amrmClientAsync.getMatchingRequests(container.getAllocationRequestId()); + if (!matchingRequestsByAllocationRequestId.isEmpty()) { + AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequestsByAllocationRequestId.iterator().next(); + LOGGER.info("Found matching requests {}, removing first matching request {}", + matchingRequestsByAllocationRequestId, firstMatchingContainerRequest); - if (matchingRequests.isEmpty()) { - LOGGER.debug("Matching request by host {} not found", container.getNodeHttpAddress()); + amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest); + } else { + LOGGER.info("Matching request by allocation request id {} not found", container.getAllocationRequestId()); - matchingRequests = amrmClientAsync - .getMatchingRequests(container.getPriority(), ResourceRequest.ANY, container.getResource()); - } + List> matchingRequestsByHost = amrmClientAsync + .getMatchingRequests(container.getPriority(), container.getNodeHttpAddress(), container.getResource()); - if (!matchingRequests.isEmpty()) { - AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequests.get(0).iterator().next(); - LOGGER.debug("Found matching requests {}, removing first matching request {}", - matchingRequests, firstMatchingContainerRequest); + if (matchingRequestsByHost.isEmpty()) { + LOGGER.info("Matching request by host {} not found", container.getNodeHttpAddress()); - amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest); + matchingRequestsByHost = amrmClientAsync + .getMatchingRequests(container.getPriority(), ResourceRequest.ANY, container.getResource()); + } + + if (!matchingRequestsByHost.isEmpty()) { + AMRMClient.ContainerRequest firstMatchingContainerRequest = matchingRequestsByHost.get(0).iterator().next(); + LOGGER.info("Found matching requests {}, removing first matching request {}", + matchingRequestsByAllocationRequestId, firstMatchingContainerRequest); + + amrmClientAsync.removeContainerRequest(firstMatchingContainerRequest); + } } containerLaunchExecutor.submit(new Runnable() { diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java new file mode 100644 index 00000000000..6bdfe46276f --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/dynamic/DummyScalingDirectiveSource.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.dynamic; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.atomic.AtomicInteger; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + + +/** + * A dummy implementation of {@link ScalingDirectiveSource} that returns a fixed set of {@link ScalingDirective}s. + */ +public class DummyScalingDirectiveSource implements ScalingDirectiveSource { + private final AtomicInteger numInvocations = new AtomicInteger(0); + private final Optional derivedFromBaseline; + public DummyScalingDirectiveSource() { + this.derivedFromBaseline = Optional.of(new ProfileDerivation(WorkforceProfiles.BASELINE_NAME, + new ProfileOverlay.Adding( + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_MEMORY_MBS_KEY, "2048"), + new ProfileOverlay.KVPair(GobblinYarnConfigurationKeys.CONTAINER_CORES_KEY, "2") + ) + )); + } + + /** + * @return - A fixed set of {@link ScalingDirective}s corresponding to the invocation number. + */ + @Override + public List getScalingDirectives() { + // Note - profile should exist already or is derived from other profile + int currNumInvocations = this.numInvocations.getAndIncrement(); + long currentTime = System.currentTimeMillis(); + if (currNumInvocations == 0) { + // here we are returning two new profile with initial container counts and these should be launched + // both profiles should have different timestampEpochMillis so that both are processed otherwise + // org.apache.gobblin.temporal.dynamic.WorkforcePlan$IllegalRevisionException$OutOfOrderDirective can occur + return Arrays.asList( + new ScalingDirective("firstProfile", 3, currentTime, this.derivedFromBaseline), + new ScalingDirective("secondProfile", 2, currentTime + 1, this.derivedFromBaseline) + ); + } else if (currNumInvocations == 1) { + // here we are increasing containers to 5 for firstProfile and 3 for secondProfile so that 2 new extra containers + // should be launched for firstProfile and 1 new extra container for secondProfile + return Arrays.asList( + new ScalingDirective("firstProfile", 5, currentTime), + new ScalingDirective("secondProfile", 3, currentTime + 1) + ); + } else if (currNumInvocations == 2) { + // the count is same as previous invocation so no new containers should be launched + return Arrays.asList( + new ScalingDirective("firstProfile", 5, currentTime), + new ScalingDirective("secondProfile", 3, currentTime + 1) + ); + } + return new ArrayList<>(); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DummyDynamicScalingYarnServiceManager.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DummyDynamicScalingYarnServiceManager.java new file mode 100644 index 00000000000..b79f8089381 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DummyDynamicScalingYarnServiceManager.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import org.apache.gobblin.temporal.dynamic.DummyScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; + +/** + * {@link DummyScalingDirectiveSource} based implementation of {@link AbstractDynamicScalingYarnServiceManager}. + * This class is meant to be used for integration testing purposes only. + * This is initialized using config {@link org.apache.gobblin.yarn.GobblinYarnConfigurationKeys#APP_MASTER_SERVICE_CLASSES} while testing + */ +public class DummyDynamicScalingYarnServiceManager extends AbstractDynamicScalingYarnServiceManager { + + public DummyDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster) { + super(appMaster); + } + + @Override + protected ScalingDirectiveSource createScalingDirectiveSource() { + return new DummyScalingDirectiveSource(); + } +} \ No newline at end of file diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java new file mode 100644 index 00000000000..dd4243d3fa5 --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceManagerTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.MockitoAnnotations; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.ScalingDirectiveSource; +import org.apache.gobblin.temporal.dynamic.DummyScalingDirectiveSource; + +import static org.apache.gobblin.temporal.yarn.AbstractDynamicScalingYarnServiceManager.DYNAMIC_SCALING_POLLING_INTERVAL; + +/** Tests for {@link AbstractDynamicScalingYarnServiceManager}*/ +public class DynamicScalingYarnServiceManagerTest { + + @Mock private DynamicScalingYarnService mockDynamicScalingYarnService; + @Mock private ScalingDirectiveSource mockScalingDirectiveSource; + @Mock private GobblinTemporalApplicationMaster mockGobblinTemporalApplicationMaster; + + @BeforeMethod + public void setup() { + MockitoAnnotations.openMocks(this); + // Using 1 second as polling interval so that the test runs faster and + // GetScalingDirectivesRunnable.run() will be called equal to amount of sleep introduced between startUp + // and shutDown in seconds + Config config = ConfigFactory.empty().withValue(DYNAMIC_SCALING_POLLING_INTERVAL, ConfigValueFactory.fromAnyRef(1)); + Mockito.when(mockGobblinTemporalApplicationMaster.getConfig()).thenReturn(config); + Mockito.when(mockGobblinTemporalApplicationMaster.get_yarnService()).thenReturn(mockDynamicScalingYarnService); + } + + @Test + public void testWhenScalingDirectivesIsNulOrEmpty() throws IOException, InterruptedException { + Mockito.when(mockScalingDirectiveSource.getScalingDirectives()).thenReturn(null).thenReturn(new ArrayList<>()); + TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(3000); + testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.never()).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + } + + /** Note : this test uses {@link DummyScalingDirectiveSource}*/ + @Test + public void testWithDummyScalingDirectiveSource() throws InterruptedException { + // DummyScalingDirectiveSource returns 2 scaling directives in first 3 invocations and after that it returns empty list + // so the total number of invocations after three invocations should always be 3 + TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, new DummyScalingDirectiveSource()); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(5000); // 5 seconds sleep so that GetScalingDirectivesRunnable.run() is called for 5 times + testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(3)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + } + + @Test + public void testWithRandomScalingDirectives() throws IOException, InterruptedException { + ScalingDirective mockScalingDirective = Mockito.mock(ScalingDirective.class); + List mockedScalingDirectives = Arrays.asList(mockScalingDirective, mockScalingDirective); + Mockito.when(mockScalingDirectiveSource.getScalingDirectives()) + .thenReturn(new ArrayList<>()) + .thenReturn(mockedScalingDirectives) + .thenReturn(mockedScalingDirectives) + .thenReturn(null); + + TestDynamicScalingYarnServiceManager testDynamicScalingYarnServiceManager = new TestDynamicScalingYarnServiceManager( + mockGobblinTemporalApplicationMaster, mockScalingDirectiveSource); + testDynamicScalingYarnServiceManager.startUp(); + Thread.sleep(5000); + testDynamicScalingYarnServiceManager.shutDown(); + Mockito.verify(mockDynamicScalingYarnService, Mockito.times(2)).reviseWorkforcePlanAndRequestNewContainers(Mockito.anyList()); + } + + /** Test implementation of {@link AbstractDynamicScalingYarnServiceManager} which returns passed + * {@link ScalingDirectiveSource} when {@link #createScalingDirectiveSource()} is called while initialising + * {@link AbstractDynamicScalingYarnServiceManager.GetScalingDirectivesRunnable} + * */ + protected static class TestDynamicScalingYarnServiceManager extends AbstractDynamicScalingYarnServiceManager { + private final ScalingDirectiveSource _scalingDirectiveSource; + public TestDynamicScalingYarnServiceManager(GobblinTemporalApplicationMaster appMaster, ScalingDirectiveSource scalingDirectiveSource) { + super(appMaster); + this._scalingDirectiveSource = scalingDirectiveSource; + } + + @Override + protected ScalingDirectiveSource createScalingDirectiveSource() { + return this._scalingDirectiveSource; + } + } + +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java new file mode 100644 index 00000000000..6c0946aabbe --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/DynamicScalingYarnServiceTest.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.net.URL; +import java.util.Collections; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.base.Optional; +import com.google.common.eventbus.EventBus; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + +import org.apache.gobblin.temporal.dynamic.ScalingDirective; +import org.apache.gobblin.temporal.dynamic.WorkforceProfiles; + +/** Tests for {@link DynamicScalingYarnService} */ +public class DynamicScalingYarnServiceTest { + private Config defaultConfigs; + private final YarnConfiguration yarnConfiguration = new YarnConfiguration(); + private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class); + private final EventBus eventBus = new EventBus("TemporalDynamicScalingYarnServiceTest"); + + @BeforeClass + public void setup() { + URL url = DynamicScalingYarnServiceTest.class.getClassLoader() + .getResource(YarnServiceTest.class.getSimpleName() + ".conf"); // using same initial config as of YarnServiceTest + Assert.assertNotNull(url, "Could not find resource " + url); + this.defaultConfigs = ConfigFactory.parseURL(url).resolve(); + } + + @Test + public void testReviseWorkforcePlanAndRequestNewContainers() throws Exception { + int numNewContainers = 5; + DynamicScalingYarnService dynamicScalingYarnService = new DynamicScalingYarnService(this.defaultConfigs, "testApp", "testAppId", yarnConfiguration, mockFileSystem, eventBus); + DynamicScalingYarnService dynamicScalingYarnServiceSpy = Mockito.spy(dynamicScalingYarnService); + Mockito.doNothing().when(dynamicScalingYarnServiceSpy).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class)); + ScalingDirective baseScalingDirective = new ScalingDirective(WorkforceProfiles.BASELINE_NAME, numNewContainers, System.currentTimeMillis()); + dynamicScalingYarnServiceSpy.reviseWorkforcePlanAndRequestNewContainers(Collections.singletonList(baseScalingDirective)); + Mockito.verify(dynamicScalingYarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(numNewContainers), Mockito.any(Resource.class), Mockito.any(Optional.class)); + } +} diff --git a/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java new file mode 100644 index 00000000000..3c81316b85c --- /dev/null +++ b/gobblin-temporal/src/test/java/org/apache/gobblin/temporal/yarn/YarnServiceTest.java @@ -0,0 +1,126 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.temporal.yarn; + +import java.io.IOException; +import java.net.URL; + +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +import com.google.common.base.Optional; + +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.mockito.MockedStatic; +import org.mockito.Mockito; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValueFactory; +import com.google.common.eventbus.EventBus; + +import org.apache.gobblin.yarn.GobblinYarnConfigurationKeys; + +import static org.mockito.Mockito.*; + + +/** + * Tests for {@link YarnService} + * + * NOTE : This test is a partial clone of {@link org.apache.gobblin.yarn.YarnServiceTest} + * */ +public class YarnServiceTest { + private Config defaultConfigs; + private final YarnConfiguration yarnConfiguration = new YarnConfiguration(); + private final FileSystem mockFileSystem = Mockito.mock(FileSystem.class); + private final EventBus eventBus = new EventBus("TemporalYarnServiceTest"); + private AMRMClientAsync mockAMRMClient; + private RegisterApplicationMasterResponse mockRegisterApplicationMasterResponse; + + @BeforeClass + public void setup() throws IOException, YarnException { + mockAMRMClient = mock(AMRMClientAsync.class); + mockRegisterApplicationMasterResponse = mock(RegisterApplicationMasterResponse.class); + + URL url = YarnServiceTest.class.getClassLoader() + .getResource(YarnServiceTest.class.getSimpleName() + ".conf"); + Assert.assertNotNull(url, "Could not find resource " + url); + this.defaultConfigs = ConfigFactory.parseURL(url).resolve(); + + MockedStatic amrmClientAsyncMockStatic = mockStatic(AMRMClientAsync.class); + + amrmClientAsyncMockStatic.when(() -> AMRMClientAsync.createAMRMClientAsync(anyInt(), any(AMRMClientAsync.CallbackHandler.class))) + .thenReturn(mockAMRMClient); + doNothing().when(mockAMRMClient).init(any(YarnConfiguration.class)); + + when(mockAMRMClient.registerApplicationMaster(anyString(), anyInt(), anyString())) + .thenReturn(mockRegisterApplicationMasterResponse); + when(mockRegisterApplicationMasterResponse.getMaximumResourceCapability()) + .thenReturn(Mockito.mock(Resource.class)); + } + + @Test + public void testYarnServiceStartupWithInitialContainers() throws Exception { + int expectedNumContainers = 3; + Config config = this.defaultConfigs.withValue(GobblinYarnConfigurationKeys.INITIAL_CONTAINERS_KEY, ConfigValueFactory.fromAnyRef(expectedNumContainers)); + YarnService yarnService = new YarnService(config, "testApplicationName", "testApplicationId", yarnConfiguration, mockFileSystem, eventBus); + YarnService yarnServiceSpy = Mockito.spy(yarnService); + Mockito.doNothing().when(yarnServiceSpy).requestContainers(Mockito.anyInt(), Mockito.any(Resource.class), Mockito.any(Optional.class)); + yarnServiceSpy.startUp(); + Mockito.verify(yarnServiceSpy, Mockito.times(1)).requestContainers(Mockito.eq(expectedNumContainers), Mockito.any(Resource.class), Mockito.any(Optional.class)); + } + + @Test + public void testBuildContainerCommand() throws Exception { + final double jvmMemoryXmxRatio = 0.7; + final int jvmMemoryOverheadMbs = 50; + final int resourceMemoryMB = 3072; + final int expectedJvmMemory = (int) (resourceMemoryMB * jvmMemoryXmxRatio) - jvmMemoryOverheadMbs; + + Config config = this.defaultConfigs + .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_XMX_RATIO_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryXmxRatio)) + .withValue(GobblinYarnConfigurationKeys.CONTAINER_JVM_MEMORY_OVERHEAD_MBS_KEY, ConfigValueFactory.fromAnyRef(jvmMemoryOverheadMbs)); + + Resource resource = Resource.newInstance(resourceMemoryMB, 2); + + Container mockContainer = Mockito.mock(Container.class); + Mockito.when(mockContainer.getResource()).thenReturn(resource); + Mockito.when(mockContainer.getAllocationRequestId()).thenReturn(0L); + + YarnService yarnService = new YarnService( + config, + "testApplicationName", + "testApplicationId", + yarnConfiguration, + mockFileSystem, + eventBus + ); + + yarnService.startUp(); + + String command = yarnService.buildContainerCommand(mockContainer, "testHelixParticipantId", "testHelixInstanceTag"); + Assert.assertTrue(command.contains("-Xmx" + expectedJvmMemory + "M")); + } +} diff --git a/gobblin-temporal/src/test/resources/YarnServiceTest.conf b/gobblin-temporal/src/test/resources/YarnServiceTest.conf new file mode 100644 index 00000000000..0903ced00b1 --- /dev/null +++ b/gobblin-temporal/src/test/resources/YarnServiceTest.conf @@ -0,0 +1,6 @@ +# Adding some default configs used while initializing YarnService for tests +gobblin.yarn.initial.containers=0 +gobblin.yarn.container.memory.mbs=1024 +gobblin.yarn.container.cores=1 +gobblin.yarn.container.affinity.enabled=false +gobblin.yarn.helix.instance.max.retries=1 \ No newline at end of file