Skip to content

Commit

Permalink
Refactor PeriodicTaskExecutor and add PeriodicTaskExecutorFactory
Browse files Browse the repository at this point in the history
  • Loading branch information
tdcmeehan committed Nov 6, 2024
1 parent 2555927 commit 15e474c
Show file tree
Hide file tree
Showing 17 changed files with 359 additions and 90 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.facebook.presto.spi.resourceGroups.SelectionContext;
import com.facebook.presto.spi.resourceGroups.SelectionCriteria;
import com.facebook.presto.util.PeriodicTaskExecutor;
import com.facebook.presto.util.PeriodicTaskExecutorFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -90,7 +91,7 @@ public final class InternalResourceGroupManager<C>
private static final int REFRESH_EXECUTOR_POOL_SIZE = 2;

private final ScheduledExecutorService refreshExecutor = newScheduledThreadPool(REFRESH_EXECUTOR_POOL_SIZE, daemonThreadsNamed("resource-group-manager-refresher-%d-" + REFRESH_EXECUTOR_POOL_SIZE));
private final PeriodicTaskExecutor resourceGroupRuntimeExecutor;
private final Optional<PeriodicTaskExecutor> resourceGroupRuntimeExecutor;
private final List<RootInternalResourceGroup> rootGroups = new CopyOnWriteArrayList<>();
private final ConcurrentMap<ResourceGroupId, InternalResourceGroup> groups = new ConcurrentHashMap<>();
private final AtomicReference<ResourceGroupConfigurationManager<C>> configurationManager;
Expand Down Expand Up @@ -122,7 +123,8 @@ public InternalResourceGroupManager(
MBeanExporter exporter,
ResourceGroupService resourceGroupService,
ServerConfig serverConfig,
InternalNodeManager nodeManager)
InternalNodeManager nodeManager,
PeriodicTaskExecutorFactory periodicTaskExecutorFactory)
{
this.queryManagerConfig = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
this.exporter = requireNonNull(exporter, "exporter is null");
Expand All @@ -135,9 +137,15 @@ public InternalResourceGroupManager(
this.concurrencyThreshold = queryManagerConfig.getConcurrencyThresholdToEnableResourceGroupRefresh();
this.resourceGroupRuntimeInfoRefreshInterval = queryManagerConfig.getResourceGroupRunTimeInfoRefreshInterval();
this.isResourceManagerEnabled = requireNonNull(serverConfig, "serverConfig is null").isResourceManagerEnabled();
this.resourceGroupRuntimeExecutor = new PeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), refreshExecutor, this::refreshResourceGroupRuntimeInfo);
configurationManagerFactories.putIfAbsent(LegacyResourceGroupConfigurationManager.NAME, new LegacyResourceGroupConfigurationManager.Factory());
this.isConfigurationManagerLoaded = new AtomicBoolean(false);
if (isResourceManagerEnabled) {
this.resourceGroupRuntimeExecutor = Optional.of(requireNonNull(periodicTaskExecutorFactory, "periodicTaskExecutorFactory is null")
.createPeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), this::refreshResourceGroupRuntimeInfo));
}
else {
this.resourceGroupRuntimeExecutor = Optional.empty();
}
}

@Override
Expand Down Expand Up @@ -256,7 +264,7 @@ public ResourceGroupConfigurationManager<C> getConfigurationManager()
public void destroy()
{
refreshExecutor.shutdownNow();
resourceGroupRuntimeExecutor.stop();
resourceGroupRuntimeExecutor.ifPresent(PeriodicTaskExecutor::stop);
}

@PostConstruct
Expand All @@ -273,9 +281,7 @@ public void start()
throw t;
}
}, 1, 1, MILLISECONDS);
if (isResourceManagerEnabled) {
resourceGroupRuntimeExecutor.start();
}
resourceGroupRuntimeExecutor.ifPresent(PeriodicTaskExecutor::start);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.resourcemanager;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForPeriodicTaskExecutor
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.util.PeriodicTaskExecutor;
import com.facebook.presto.util.PeriodicTaskExecutorFactory;
import io.airlift.units.Duration;

import javax.annotation.PostConstruct;
Expand All @@ -34,7 +35,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

Expand All @@ -48,7 +48,7 @@ public class ResourceManagerClusterStatusSender
private final InternalNodeManager internalNodeManager;
private final ResourceGroupManager<?> resourceGroupManager;
private final Supplier<NodeStatus> statusSupplier;
private final ScheduledExecutorService executor;
private final PeriodicTaskExecutorFactory periodicTaskExecutorFactory;
private final Duration queryHeartbeatInterval;

private final Map<QueryId, PeriodicTaskExecutor> queries = new ConcurrentHashMap<>();
Expand All @@ -61,7 +61,7 @@ public ResourceManagerClusterStatusSender(
@ForResourceManager DriftClient<ResourceManagerClient> resourceManagerClient,
InternalNodeManager internalNodeManager,
StatusResource statusResource,
@ForResourceManager ScheduledExecutorService executor,
PeriodicTaskExecutorFactory periodicTaskExecutorFactory,
ResourceManagerConfig resourceManagerConfig,
ServerConfig serverConfig,
ResourceGroupManager<?> resourceGroupManager)
Expand All @@ -70,7 +70,7 @@ public ResourceManagerClusterStatusSender(
resourceManagerClient,
internalNodeManager,
requireNonNull(statusResource, "statusResource is null")::getStatus,
executor,
periodicTaskExecutorFactory,
resourceManagerConfig,
serverConfig,
resourceGroupManager);
Expand All @@ -80,19 +80,22 @@ public ResourceManagerClusterStatusSender(
DriftClient<ResourceManagerClient> resourceManagerClient,
InternalNodeManager internalNodeManager,
Supplier<NodeStatus> statusResource,
ScheduledExecutorService executor,
PeriodicTaskExecutorFactory periodicTaskExecutorFactory,
ResourceManagerConfig resourceManagerConfig,
ServerConfig serverConfig,
ResourceGroupManager<?> resourceGroupManager)
{
this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerService is null");
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
this.statusSupplier = requireNonNull(statusResource, "statusResource is null");
this.executor = requireNonNull(executor, "executor is null");
this.periodicTaskExecutorFactory = requireNonNull(periodicTaskExecutorFactory, "periodicTaskExecutorFactory is null");
this.queryHeartbeatInterval = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getQueryHeartbeatInterval();
this.nodeHeartbeatSender = new PeriodicTaskExecutor(resourceManagerConfig.getNodeHeartbeatInterval().toMillis(), executor, this::sendNodeHeartbeat);
this.nodeHeartbeatSender = periodicTaskExecutorFactory.createPeriodicTaskExecutor(resourceManagerConfig.getNodeHeartbeatInterval().toMillis(), this::sendNodeHeartbeat);
this.resourceRuntimeHeartbeatSender = serverConfig.isCoordinator() ? Optional.of(
new PeriodicTaskExecutor(resourceManagerConfig.getResourceGroupRuntimeHeartbeatInterval().toMillis(), executor, this::sendResourceGroupRuntimeHeartbeat)) : Optional.empty();
periodicTaskExecutorFactory.createPeriodicTaskExecutor(
resourceManagerConfig.getResourceGroupRuntimeHeartbeatInterval().toMillis(),
this::sendResourceGroupRuntimeHeartbeat))
: Optional.empty();
this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null");
}

Expand Down Expand Up @@ -123,9 +126,8 @@ public void registerQuery(ManagedQueryExecution queryExecution)
QueryId queryId = queryExecution.getBasicQueryInfo().getQueryId();
queries.computeIfAbsent(queryId, unused -> {
AtomicLong sequenceId = new AtomicLong();
PeriodicTaskExecutor taskExecutor = new PeriodicTaskExecutor(
PeriodicTaskExecutor taskExecutor = periodicTaskExecutorFactory.createPeriodicTaskExecutor(
queryHeartbeatInterval.toMillis(),
executor,
() -> sendQueryHeartbeat(queryExecution, sequenceId.incrementAndGet()));
taskExecutor.start();
return taskExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import com.facebook.presto.resourcemanager.ClusterMemoryManagerService;
import com.facebook.presto.resourcemanager.ClusterQueryTrackerService;
import com.facebook.presto.resourcemanager.ClusterStatusSender;
import com.facebook.presto.resourcemanager.ForPeriodicTaskExecutor;
import com.facebook.presto.resourcemanager.ForResourceManager;
import com.facebook.presto.resourcemanager.NoopResourceGroupService;
import com.facebook.presto.resourcemanager.RaftConfig;
Expand Down Expand Up @@ -222,6 +223,9 @@
import com.facebook.presto.type.TypeDeserializer;
import com.facebook.presto.util.FinalizerService;
import com.facebook.presto.util.GcStatusMonitor;
import com.facebook.presto.util.PeriodicTaskExecutorFactory;
import com.facebook.presto.util.SimplePeriodicTaskExecutorFactory;
import com.facebook.presto.util.ThrowingPeriodicTaskExecutorFactory;
import com.facebook.presto.version.EmbedVersion;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -457,6 +461,7 @@ public void configure(Binder moduleBinder)
jaxrsBinder(moduleBinder).bind(ResourceManagerHeartbeatResource.class);
}
moduleBinder.bind(ClusterStatusSender.class).to(ResourceManagerClusterStatusSender.class).in(Scopes.SINGLETON);
binder.bind(PeriodicTaskExecutorFactory.class).to(SimplePeriodicTaskExecutorFactory.class).in(Scopes.SINGLETON);
if (serverConfig.isCoordinator()) {
moduleBinder.bind(ClusterMemoryManagerService.class).in(Scopes.SINGLETON);
moduleBinder.bind(ClusterQueryTrackerService.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -486,10 +491,28 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
daemonThreadsNamed("resource-manager-executor-%s"));
return listeningDecorator(executor);
}

@Provides
@Singleton
@ForPeriodicTaskExecutor
public Optional<ScheduledExecutorService> scheduledExecutorService(ResourceManagerConfig config)
{
return Optional.of(createConcurrentScheduledExecutor("resource-manager-heartbeats", config.getHeartbeatConcurrency(), config.getHeartbeatThreads()));
}

@Provides
@Singleton
@ForPeriodicTaskExecutor
public ExecutorService executorService(ResourceManagerConfig config, @ForPeriodicTaskExecutor Optional<ScheduledExecutorService> scheduledExecutorService)
{
checkArgument(scheduledExecutorService.isPresent());
return scheduledExecutorService.get();
}
},
moduleBinder -> {
moduleBinder.bind(ClusterStatusSender.class).toInstance(execution -> {});
moduleBinder.bind(ResourceGroupService.class).to(NoopResourceGroupService.class).in(Scopes.SINGLETON);
moduleBinder.bind(PeriodicTaskExecutorFactory.class).to(ThrowingPeriodicTaskExecutorFactory.class);
}));

FeaturesConfig featuresConfig = buildConfigObject(FeaturesConfig.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.testing;

import com.facebook.presto.util.PeriodicTaskExecutor;
import com.facebook.presto.util.PeriodicTaskExecutorFactory;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class TestingPeriodicTaskExecutorFactory
implements PeriodicTaskExecutorFactory
{
private final List<PeriodicTaskExecutor> periodicTaskExecutors = new ArrayList<>();

@Override
public PeriodicTaskExecutor createPeriodicTaskExecutor(long delayTargetMillis, long initDelayMillis, Runnable runnable)
{
PeriodicTaskExecutor periodicTaskExecutor = new PeriodicTaskExecutor(0, 0, MoreExecutors.newDirectExecutorService(), Optional.empty(), runnable, i -> i);
periodicTaskExecutors.add(periodicTaskExecutor);
return periodicTaskExecutor;
}

public void tick()
{
periodicTaskExecutors.forEach(PeriodicTaskExecutor::tick);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class ConfidenceBasedNodeTtlFetcherManager
private final CounterStat refreshFailures = new CounterStat();
private final Consumer<AllNodes> nodeChangeListener = this::refreshTtlInfo;
private PeriodicTaskExecutor periodicTtlRefresher;
private ScheduledExecutorService scheduledExecutor = newSingleThreadScheduledExecutor(threadsNamed("refresh-node-ttl-executor-%s"));

@Inject
public ConfidenceBasedNodeTtlFetcherManager(InternalNodeManager nodeManager, NodeSchedulerConfig schedulerConfig, NodeTtlFetcherManagerConfig nodeTtlFetcherManagerConfig)
Expand All @@ -98,7 +100,8 @@ public void scheduleRefresh()
periodicTtlRefresher = new PeriodicTaskExecutor(
ttlFetcher.get().getRefreshInterval().toMillis(),
nodeTtlFetcherManagerConfig.getInitialDelayBeforeRefresh().toMillis(),
newSingleThreadScheduledExecutor(threadsNamed("refresh-node-ttl-executor-%s")),
scheduledExecutor,
Optional.of(scheduledExecutor),
this::refreshTtlInfo,
ConfidenceBasedNodeTtlFetcherManager::jitterForPeriodicRefresh);
periodicTtlRefresher.start();
Expand All @@ -107,6 +110,7 @@ public void scheduleRefresh()
@PreDestroy
public void stop()
{
scheduledExecutor.shutdownNow();
nodeManager.removeNodeChangeListener(nodeChangeListener);
if (periodicTtlRefresher != null) {
periodicTtlRefresher.stop();
Expand Down
Loading

0 comments on commit 15e474c

Please sign in to comment.