Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TestingPeriodicTaskExecutorFactory to force synchronization in unit tests #19675

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import com.facebook.presto.spi.resourceGroups.SelectionContext;
import com.facebook.presto.spi.resourceGroups.SelectionCriteria;
import com.facebook.presto.util.PeriodicTaskExecutor;
import com.facebook.presto.util.PeriodicTaskExecutorFactory;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
Expand Down Expand Up @@ -90,7 +91,7 @@ public final class InternalResourceGroupManager<C>
private static final int REFRESH_EXECUTOR_POOL_SIZE = 2;

private final ScheduledExecutorService refreshExecutor = newScheduledThreadPool(REFRESH_EXECUTOR_POOL_SIZE, daemonThreadsNamed("resource-group-manager-refresher-%d-" + REFRESH_EXECUTOR_POOL_SIZE));
private final PeriodicTaskExecutor resourceGroupRuntimeExecutor;
private final Optional<PeriodicTaskExecutor> resourceGroupRuntimeExecutor;
private final List<RootInternalResourceGroup> rootGroups = new CopyOnWriteArrayList<>();
private final ConcurrentMap<ResourceGroupId, InternalResourceGroup> groups = new ConcurrentHashMap<>();
private final AtomicReference<ResourceGroupConfigurationManager<C>> configurationManager;
Expand Down Expand Up @@ -122,7 +123,8 @@ public InternalResourceGroupManager(
MBeanExporter exporter,
ResourceGroupService resourceGroupService,
ServerConfig serverConfig,
InternalNodeManager nodeManager)
InternalNodeManager nodeManager,
PeriodicTaskExecutorFactory periodicTaskExecutorFactory)
{
this.queryManagerConfig = requireNonNull(queryManagerConfig, "queryManagerConfig is null");
this.exporter = requireNonNull(exporter, "exporter is null");
Expand All @@ -135,9 +137,15 @@ public InternalResourceGroupManager(
this.concurrencyThreshold = queryManagerConfig.getConcurrencyThresholdToEnableResourceGroupRefresh();
this.resourceGroupRuntimeInfoRefreshInterval = queryManagerConfig.getResourceGroupRunTimeInfoRefreshInterval();
this.isResourceManagerEnabled = requireNonNull(serverConfig, "serverConfig is null").isResourceManagerEnabled();
this.resourceGroupRuntimeExecutor = new PeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), refreshExecutor, this::refreshResourceGroupRuntimeInfo);
configurationManagerFactories.putIfAbsent(LegacyResourceGroupConfigurationManager.NAME, new LegacyResourceGroupConfigurationManager.Factory());
this.isConfigurationManagerLoaded = new AtomicBoolean(false);
if (isResourceManagerEnabled) {
this.resourceGroupRuntimeExecutor = Optional.of(requireNonNull(periodicTaskExecutorFactory, "periodicTaskExecutorFactory is null")
.createPeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), this::refreshResourceGroupRuntimeInfo));
}
else {
this.resourceGroupRuntimeExecutor = Optional.empty();
}
}

@Override
Expand Down Expand Up @@ -256,7 +264,7 @@ public ResourceGroupConfigurationManager<C> getConfigurationManager()
public void destroy()
{
refreshExecutor.shutdownNow();
resourceGroupRuntimeExecutor.stop();
resourceGroupRuntimeExecutor.ifPresent(PeriodicTaskExecutor::stop);
}

@PostConstruct
Expand All @@ -273,9 +281,7 @@ public void start()
throw t;
}
}, 1, 1, MILLISECONDS);
if (isResourceManagerEnabled) {
resourceGroupRuntimeExecutor.start();
}
resourceGroupRuntimeExecutor.ifPresent(PeriodicTaskExecutor::start);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.resourcemanager;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForPeriodicTaskExecutor
{
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.facebook.presto.spi.HostAddress;
import com.facebook.presto.spi.QueryId;
import com.facebook.presto.util.PeriodicTaskExecutor;
import com.facebook.presto.util.PeriodicTaskExecutorFactory;
import io.airlift.units.Duration;

import javax.annotation.PostConstruct;
Expand All @@ -34,7 +35,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

Expand All @@ -48,7 +48,7 @@ public class ResourceManagerClusterStatusSender
private final InternalNodeManager internalNodeManager;
private final ResourceGroupManager<?> resourceGroupManager;
private final Supplier<NodeStatus> statusSupplier;
private final ScheduledExecutorService executor;
private final PeriodicTaskExecutorFactory periodicTaskExecutorFactory;
private final Duration queryHeartbeatInterval;

private final Map<QueryId, PeriodicTaskExecutor> queries = new ConcurrentHashMap<>();
Expand All @@ -61,7 +61,7 @@ public ResourceManagerClusterStatusSender(
@ForResourceManager DriftClient<ResourceManagerClient> resourceManagerClient,
InternalNodeManager internalNodeManager,
StatusResource statusResource,
@ForResourceManager ScheduledExecutorService executor,
PeriodicTaskExecutorFactory periodicTaskExecutorFactory,
ResourceManagerConfig resourceManagerConfig,
ServerConfig serverConfig,
ResourceGroupManager<?> resourceGroupManager)
Expand All @@ -70,7 +70,7 @@ public ResourceManagerClusterStatusSender(
resourceManagerClient,
internalNodeManager,
requireNonNull(statusResource, "statusResource is null")::getStatus,
executor,
periodicTaskExecutorFactory,
resourceManagerConfig,
serverConfig,
resourceGroupManager);
Expand All @@ -80,19 +80,22 @@ public ResourceManagerClusterStatusSender(
DriftClient<ResourceManagerClient> resourceManagerClient,
InternalNodeManager internalNodeManager,
Supplier<NodeStatus> statusResource,
ScheduledExecutorService executor,
PeriodicTaskExecutorFactory periodicTaskExecutorFactory,
ResourceManagerConfig resourceManagerConfig,
ServerConfig serverConfig,
ResourceGroupManager<?> resourceGroupManager)
{
this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerService is null");
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
this.statusSupplier = requireNonNull(statusResource, "statusResource is null");
this.executor = requireNonNull(executor, "executor is null");
this.periodicTaskExecutorFactory = requireNonNull(periodicTaskExecutorFactory, "periodicTaskExecutorFactory is null");
this.queryHeartbeatInterval = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getQueryHeartbeatInterval();
this.nodeHeartbeatSender = new PeriodicTaskExecutor(resourceManagerConfig.getNodeHeartbeatInterval().toMillis(), executor, this::sendNodeHeartbeat);
this.nodeHeartbeatSender = periodicTaskExecutorFactory.createPeriodicTaskExecutor(resourceManagerConfig.getNodeHeartbeatInterval().toMillis(), this::sendNodeHeartbeat);
this.resourceRuntimeHeartbeatSender = serverConfig.isCoordinator() ? Optional.of(
new PeriodicTaskExecutor(resourceManagerConfig.getResourceGroupRuntimeHeartbeatInterval().toMillis(), executor, this::sendResourceGroupRuntimeHeartbeat)) : Optional.empty();
periodicTaskExecutorFactory.createPeriodicTaskExecutor(
resourceManagerConfig.getResourceGroupRuntimeHeartbeatInterval().toMillis(),
this::sendResourceGroupRuntimeHeartbeat))
: Optional.empty();
this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null");
}

Expand Down Expand Up @@ -123,9 +126,8 @@ public void registerQuery(ManagedQueryExecution queryExecution)
QueryId queryId = queryExecution.getBasicQueryInfo().getQueryId();
queries.computeIfAbsent(queryId, unused -> {
AtomicLong sequenceId = new AtomicLong();
PeriodicTaskExecutor taskExecutor = new PeriodicTaskExecutor(
PeriodicTaskExecutor taskExecutor = periodicTaskExecutorFactory.createPeriodicTaskExecutor(
queryHeartbeatInterval.toMillis(),
executor,
() -> sendQueryHeartbeat(queryExecution, sequenceId.incrementAndGet()));
taskExecutor.start();
return taskExecutor;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@
import com.facebook.presto.resourcemanager.ClusterMemoryManagerService;
import com.facebook.presto.resourcemanager.ClusterQueryTrackerService;
import com.facebook.presto.resourcemanager.ClusterStatusSender;
import com.facebook.presto.resourcemanager.ForPeriodicTaskExecutor;
import com.facebook.presto.resourcemanager.ForResourceManager;
import com.facebook.presto.resourcemanager.NoopResourceGroupService;
import com.facebook.presto.resourcemanager.RaftConfig;
Expand Down Expand Up @@ -222,6 +223,9 @@
import com.facebook.presto.type.TypeDeserializer;
import com.facebook.presto.util.FinalizerService;
import com.facebook.presto.util.GcStatusMonitor;
import com.facebook.presto.util.PeriodicTaskExecutorFactory;
import com.facebook.presto.util.SimplePeriodicTaskExecutorFactory;
import com.facebook.presto.util.ThrowingPeriodicTaskExecutorFactory;
import com.facebook.presto.version.EmbedVersion;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableList;
Expand Down Expand Up @@ -457,6 +461,7 @@ public void configure(Binder moduleBinder)
jaxrsBinder(moduleBinder).bind(ResourceManagerHeartbeatResource.class);
}
moduleBinder.bind(ClusterStatusSender.class).to(ResourceManagerClusterStatusSender.class).in(Scopes.SINGLETON);
binder.bind(PeriodicTaskExecutorFactory.class).to(SimplePeriodicTaskExecutorFactory.class).in(Scopes.SINGLETON);
if (serverConfig.isCoordinator()) {
moduleBinder.bind(ClusterMemoryManagerService.class).in(Scopes.SINGLETON);
moduleBinder.bind(ClusterQueryTrackerService.class).in(Scopes.SINGLETON);
Expand Down Expand Up @@ -486,10 +491,28 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon
daemonThreadsNamed("resource-manager-executor-%s"));
return listeningDecorator(executor);
}

@Provides
@Singleton
@ForPeriodicTaskExecutor
public Optional<ScheduledExecutorService> scheduledExecutorService(ResourceManagerConfig config)
{
return Optional.of(createConcurrentScheduledExecutor("resource-manager-heartbeats", config.getHeartbeatConcurrency(), config.getHeartbeatThreads()));
}

@Provides
@Singleton
@ForPeriodicTaskExecutor
public ExecutorService executorService(ResourceManagerConfig config, @ForPeriodicTaskExecutor Optional<ScheduledExecutorService> scheduledExecutorService)
{
checkArgument(scheduledExecutorService.isPresent());
return scheduledExecutorService.get();
}
},
moduleBinder -> {
moduleBinder.bind(ClusterStatusSender.class).toInstance(execution -> {});
moduleBinder.bind(ResourceGroupService.class).to(NoopResourceGroupService.class).in(Scopes.SINGLETON);
moduleBinder.bind(PeriodicTaskExecutorFactory.class).to(ThrowingPeriodicTaskExecutorFactory.class);
}));

FeaturesConfig featuresConfig = buildConfigObject(FeaturesConfig.class);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.testing;

import com.facebook.presto.util.PeriodicTaskExecutor;
import com.facebook.presto.util.PeriodicTaskExecutorFactory;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

public class TestingPeriodicTaskExecutorFactory
implements PeriodicTaskExecutorFactory
{
private final List<PeriodicTaskExecutor> periodicTaskExecutors = new ArrayList<>();

@Override
public PeriodicTaskExecutor createPeriodicTaskExecutor(long delayTargetMillis, long initDelayMillis, Runnable runnable)
{
PeriodicTaskExecutor periodicTaskExecutor = new PeriodicTaskExecutor(0, 0, MoreExecutors.newDirectExecutorService(), Optional.empty(), runnable, i -> i);
periodicTaskExecutors.add(periodicTaskExecutor);
return periodicTaskExecutor;
}

public void tick()
{
periodicTaskExecutors.forEach(PeriodicTaskExecutor::tick);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -78,6 +79,7 @@ public class ConfidenceBasedNodeTtlFetcherManager
private final CounterStat refreshFailures = new CounterStat();
private final Consumer<AllNodes> nodeChangeListener = this::refreshTtlInfo;
private PeriodicTaskExecutor periodicTtlRefresher;
private ScheduledExecutorService scheduledExecutor = newSingleThreadScheduledExecutor(threadsNamed("refresh-node-ttl-executor-%s"));

@Inject
public ConfidenceBasedNodeTtlFetcherManager(InternalNodeManager nodeManager, NodeSchedulerConfig schedulerConfig, NodeTtlFetcherManagerConfig nodeTtlFetcherManagerConfig)
Expand All @@ -98,7 +100,8 @@ public void scheduleRefresh()
periodicTtlRefresher = new PeriodicTaskExecutor(
ttlFetcher.get().getRefreshInterval().toMillis(),
nodeTtlFetcherManagerConfig.getInitialDelayBeforeRefresh().toMillis(),
newSingleThreadScheduledExecutor(threadsNamed("refresh-node-ttl-executor-%s")),
scheduledExecutor,
Optional.of(scheduledExecutor),
this::refreshTtlInfo,
ConfidenceBasedNodeTtlFetcherManager::jitterForPeriodicRefresh);
periodicTtlRefresher.start();
Expand All @@ -107,6 +110,7 @@ public void scheduleRefresh()
@PreDestroy
public void stop()
{
scheduledExecutor.shutdownNow();
nodeManager.removeNodeChangeListener(nodeChangeListener);
if (periodicTtlRefresher != null) {
periodicTtlRefresher.stop();
Expand Down
Loading
Loading