From 15e474c913459e0316e11af80c0453b25da8c79f Mon Sep 17 00:00:00 2001 From: Tim Meehan Date: Tue, 16 May 2023 16:46:55 -0400 Subject: [PATCH] Refactor PeriodicTaskExecutor and add PeriodicTaskExecutorFactory --- .../InternalResourceGroupManager.java | 20 +++-- .../ForPeriodicTaskExecutor.java | 31 ++++++++ .../ResourceManagerClusterStatusSender.java | 22 +++--- .../presto/server/ServerMainModule.java | 23 ++++++ .../TestingPeriodicTaskExecutorFactory.java | 41 ++++++++++ .../ConfidenceBasedNodeTtlFetcherManager.java | 6 +- .../presto/util/PeriodicTaskExecutor.java | 57 +++++++++----- .../util/PeriodicTaskExecutorFactory.java | 24 ++++++ .../SimplePeriodicTaskExecutorFactory.java | 75 +++++++++++++++++++ .../ThrowingPeriodicTaskExecutorFactory.java | 24 ++++++ .../TestInternalResourceGroupManager.java | 5 +- ...estResourceManagerClusterStatusSender.java | 39 ++++------ .../presto/util/TestPeriodicTaskExecutor.java | 38 +++++----- presto-native-execution/velox | 2 +- .../presto/spark/PrestoSparkModule.java | 3 + .../TestDistributedTaskInfoResource.java | 23 ++++-- .../presto/tests/BlackHoleQueryRunner.java | 16 +++- 17 files changed, 359 insertions(+), 90 deletions(-) create mode 100644 presto-main/src/main/java/com/facebook/presto/resourcemanager/ForPeriodicTaskExecutor.java create mode 100644 presto-main/src/main/java/com/facebook/presto/testing/TestingPeriodicTaskExecutorFactory.java create mode 100644 presto-main/src/main/java/com/facebook/presto/util/PeriodicTaskExecutorFactory.java create mode 100644 presto-main/src/main/java/com/facebook/presto/util/SimplePeriodicTaskExecutorFactory.java create mode 100644 presto-main/src/main/java/com/facebook/presto/util/ThrowingPeriodicTaskExecutorFactory.java diff --git a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java index 96b7951e365c..8c0c31766b61 100644 --- a/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java +++ b/presto-main/src/main/java/com/facebook/presto/execution/resourceGroups/InternalResourceGroupManager.java @@ -32,6 +32,7 @@ import com.facebook.presto.spi.resourceGroups.SelectionContext; import com.facebook.presto.spi.resourceGroups.SelectionCriteria; import com.facebook.presto.util.PeriodicTaskExecutor; +import com.facebook.presto.util.PeriodicTaskExecutorFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -90,7 +91,7 @@ public final class InternalResourceGroupManager private static final int REFRESH_EXECUTOR_POOL_SIZE = 2; private final ScheduledExecutorService refreshExecutor = newScheduledThreadPool(REFRESH_EXECUTOR_POOL_SIZE, daemonThreadsNamed("resource-group-manager-refresher-%d-" + REFRESH_EXECUTOR_POOL_SIZE)); - private final PeriodicTaskExecutor resourceGroupRuntimeExecutor; + private final Optional resourceGroupRuntimeExecutor; private final List rootGroups = new CopyOnWriteArrayList<>(); private final ConcurrentMap groups = new ConcurrentHashMap<>(); private final AtomicReference> configurationManager; @@ -122,7 +123,8 @@ public InternalResourceGroupManager( MBeanExporter exporter, ResourceGroupService resourceGroupService, ServerConfig serverConfig, - InternalNodeManager nodeManager) + InternalNodeManager nodeManager, + PeriodicTaskExecutorFactory periodicTaskExecutorFactory) { this.queryManagerConfig = requireNonNull(queryManagerConfig, "queryManagerConfig is null"); this.exporter = requireNonNull(exporter, "exporter is null"); @@ -135,9 +137,15 @@ public InternalResourceGroupManager( this.concurrencyThreshold = queryManagerConfig.getConcurrencyThresholdToEnableResourceGroupRefresh(); this.resourceGroupRuntimeInfoRefreshInterval = queryManagerConfig.getResourceGroupRunTimeInfoRefreshInterval(); this.isResourceManagerEnabled = requireNonNull(serverConfig, "serverConfig is null").isResourceManagerEnabled(); - this.resourceGroupRuntimeExecutor = new PeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), refreshExecutor, this::refreshResourceGroupRuntimeInfo); configurationManagerFactories.putIfAbsent(LegacyResourceGroupConfigurationManager.NAME, new LegacyResourceGroupConfigurationManager.Factory()); this.isConfigurationManagerLoaded = new AtomicBoolean(false); + if (isResourceManagerEnabled) { + this.resourceGroupRuntimeExecutor = Optional.of(requireNonNull(periodicTaskExecutorFactory, "periodicTaskExecutorFactory is null") + .createPeriodicTaskExecutor(resourceGroupRuntimeInfoRefreshInterval.toMillis(), this::refreshResourceGroupRuntimeInfo)); + } + else { + this.resourceGroupRuntimeExecutor = Optional.empty(); + } } @Override @@ -256,7 +264,7 @@ public ResourceGroupConfigurationManager getConfigurationManager() public void destroy() { refreshExecutor.shutdownNow(); - resourceGroupRuntimeExecutor.stop(); + resourceGroupRuntimeExecutor.ifPresent(PeriodicTaskExecutor::stop); } @PostConstruct @@ -273,9 +281,7 @@ public void start() throw t; } }, 1, 1, MILLISECONDS); - if (isResourceManagerEnabled) { - resourceGroupRuntimeExecutor.start(); - } + resourceGroupRuntimeExecutor.ifPresent(PeriodicTaskExecutor::start); } } diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ForPeriodicTaskExecutor.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ForPeriodicTaskExecutor.java new file mode 100644 index 000000000000..187312c28e62 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ForPeriodicTaskExecutor.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.resourcemanager; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForPeriodicTaskExecutor +{ +} diff --git a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java index 08b7ab06d82d..2e672a3f6edc 100644 --- a/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java +++ b/presto-main/src/main/java/com/facebook/presto/resourcemanager/ResourceManagerClusterStatusSender.java @@ -24,6 +24,7 @@ import com.facebook.presto.spi.HostAddress; import com.facebook.presto.spi.QueryId; import com.facebook.presto.util.PeriodicTaskExecutor; +import com.facebook.presto.util.PeriodicTaskExecutorFactory; import io.airlift.units.Duration; import javax.annotation.PostConstruct; @@ -34,7 +35,6 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Supplier; @@ -48,7 +48,7 @@ public class ResourceManagerClusterStatusSender private final InternalNodeManager internalNodeManager; private final ResourceGroupManager resourceGroupManager; private final Supplier statusSupplier; - private final ScheduledExecutorService executor; + private final PeriodicTaskExecutorFactory periodicTaskExecutorFactory; private final Duration queryHeartbeatInterval; private final Map queries = new ConcurrentHashMap<>(); @@ -61,7 +61,7 @@ public ResourceManagerClusterStatusSender( @ForResourceManager DriftClient resourceManagerClient, InternalNodeManager internalNodeManager, StatusResource statusResource, - @ForResourceManager ScheduledExecutorService executor, + PeriodicTaskExecutorFactory periodicTaskExecutorFactory, ResourceManagerConfig resourceManagerConfig, ServerConfig serverConfig, ResourceGroupManager resourceGroupManager) @@ -70,7 +70,7 @@ public ResourceManagerClusterStatusSender( resourceManagerClient, internalNodeManager, requireNonNull(statusResource, "statusResource is null")::getStatus, - executor, + periodicTaskExecutorFactory, resourceManagerConfig, serverConfig, resourceGroupManager); @@ -80,7 +80,7 @@ public ResourceManagerClusterStatusSender( DriftClient resourceManagerClient, InternalNodeManager internalNodeManager, Supplier statusResource, - ScheduledExecutorService executor, + PeriodicTaskExecutorFactory periodicTaskExecutorFactory, ResourceManagerConfig resourceManagerConfig, ServerConfig serverConfig, ResourceGroupManager resourceGroupManager) @@ -88,11 +88,14 @@ public ResourceManagerClusterStatusSender( this.resourceManagerClient = requireNonNull(resourceManagerClient, "resourceManagerService is null"); this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null"); this.statusSupplier = requireNonNull(statusResource, "statusResource is null"); - this.executor = requireNonNull(executor, "executor is null"); + this.periodicTaskExecutorFactory = requireNonNull(periodicTaskExecutorFactory, "periodicTaskExecutorFactory is null"); this.queryHeartbeatInterval = requireNonNull(resourceManagerConfig, "resourceManagerConfig is null").getQueryHeartbeatInterval(); - this.nodeHeartbeatSender = new PeriodicTaskExecutor(resourceManagerConfig.getNodeHeartbeatInterval().toMillis(), executor, this::sendNodeHeartbeat); + this.nodeHeartbeatSender = periodicTaskExecutorFactory.createPeriodicTaskExecutor(resourceManagerConfig.getNodeHeartbeatInterval().toMillis(), this::sendNodeHeartbeat); this.resourceRuntimeHeartbeatSender = serverConfig.isCoordinator() ? Optional.of( - new PeriodicTaskExecutor(resourceManagerConfig.getResourceGroupRuntimeHeartbeatInterval().toMillis(), executor, this::sendResourceGroupRuntimeHeartbeat)) : Optional.empty(); + periodicTaskExecutorFactory.createPeriodicTaskExecutor( + resourceManagerConfig.getResourceGroupRuntimeHeartbeatInterval().toMillis(), + this::sendResourceGroupRuntimeHeartbeat)) + : Optional.empty(); this.resourceGroupManager = requireNonNull(resourceGroupManager, "resourceGroupManager is null"); } @@ -123,9 +126,8 @@ public void registerQuery(ManagedQueryExecution queryExecution) QueryId queryId = queryExecution.getBasicQueryInfo().getQueryId(); queries.computeIfAbsent(queryId, unused -> { AtomicLong sequenceId = new AtomicLong(); - PeriodicTaskExecutor taskExecutor = new PeriodicTaskExecutor( + PeriodicTaskExecutor taskExecutor = periodicTaskExecutorFactory.createPeriodicTaskExecutor( queryHeartbeatInterval.toMillis(), - executor, () -> sendQueryHeartbeat(queryExecution, sequenceId.incrementAndGet())); taskExecutor.start(); return taskExecutor; diff --git a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java index 808b17e0dbc8..f354b5220d1a 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java +++ b/presto-main/src/main/java/com/facebook/presto/server/ServerMainModule.java @@ -124,6 +124,7 @@ import com.facebook.presto.resourcemanager.ClusterMemoryManagerService; import com.facebook.presto.resourcemanager.ClusterQueryTrackerService; import com.facebook.presto.resourcemanager.ClusterStatusSender; +import com.facebook.presto.resourcemanager.ForPeriodicTaskExecutor; import com.facebook.presto.resourcemanager.ForResourceManager; import com.facebook.presto.resourcemanager.NoopResourceGroupService; import com.facebook.presto.resourcemanager.RaftConfig; @@ -222,6 +223,9 @@ import com.facebook.presto.type.TypeDeserializer; import com.facebook.presto.util.FinalizerService; import com.facebook.presto.util.GcStatusMonitor; +import com.facebook.presto.util.PeriodicTaskExecutorFactory; +import com.facebook.presto.util.SimplePeriodicTaskExecutorFactory; +import com.facebook.presto.util.ThrowingPeriodicTaskExecutorFactory; import com.facebook.presto.version.EmbedVersion; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableList; @@ -457,6 +461,7 @@ public void configure(Binder moduleBinder) jaxrsBinder(moduleBinder).bind(ResourceManagerHeartbeatResource.class); } moduleBinder.bind(ClusterStatusSender.class).to(ResourceManagerClusterStatusSender.class).in(Scopes.SINGLETON); + binder.bind(PeriodicTaskExecutorFactory.class).to(SimplePeriodicTaskExecutorFactory.class).in(Scopes.SINGLETON); if (serverConfig.isCoordinator()) { moduleBinder.bind(ClusterMemoryManagerService.class).in(Scopes.SINGLETON); moduleBinder.bind(ClusterQueryTrackerService.class).in(Scopes.SINGLETON); @@ -486,10 +491,28 @@ public ListeningExecutorService createResourceManagerExecutor(ResourceManagerCon daemonThreadsNamed("resource-manager-executor-%s")); return listeningDecorator(executor); } + + @Provides + @Singleton + @ForPeriodicTaskExecutor + public Optional scheduledExecutorService(ResourceManagerConfig config) + { + return Optional.of(createConcurrentScheduledExecutor("resource-manager-heartbeats", config.getHeartbeatConcurrency(), config.getHeartbeatThreads())); + } + + @Provides + @Singleton + @ForPeriodicTaskExecutor + public ExecutorService executorService(ResourceManagerConfig config, @ForPeriodicTaskExecutor Optional scheduledExecutorService) + { + checkArgument(scheduledExecutorService.isPresent()); + return scheduledExecutorService.get(); + } }, moduleBinder -> { moduleBinder.bind(ClusterStatusSender.class).toInstance(execution -> {}); moduleBinder.bind(ResourceGroupService.class).to(NoopResourceGroupService.class).in(Scopes.SINGLETON); + moduleBinder.bind(PeriodicTaskExecutorFactory.class).to(ThrowingPeriodicTaskExecutorFactory.class); })); FeaturesConfig featuresConfig = buildConfigObject(FeaturesConfig.class); diff --git a/presto-main/src/main/java/com/facebook/presto/testing/TestingPeriodicTaskExecutorFactory.java b/presto-main/src/main/java/com/facebook/presto/testing/TestingPeriodicTaskExecutorFactory.java new file mode 100644 index 000000000000..f4104dde9908 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/testing/TestingPeriodicTaskExecutorFactory.java @@ -0,0 +1,41 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.testing; + +import com.facebook.presto.util.PeriodicTaskExecutor; +import com.facebook.presto.util.PeriodicTaskExecutorFactory; +import com.google.common.util.concurrent.MoreExecutors; + +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; + +public class TestingPeriodicTaskExecutorFactory + implements PeriodicTaskExecutorFactory +{ + private final List periodicTaskExecutors = new ArrayList<>(); + + @Override + public PeriodicTaskExecutor createPeriodicTaskExecutor(long delayTargetMillis, long initDelayMillis, Runnable runnable) + { + PeriodicTaskExecutor periodicTaskExecutor = new PeriodicTaskExecutor(0, 0, MoreExecutors.newDirectExecutorService(), Optional.empty(), runnable, i -> i); + periodicTaskExecutors.add(periodicTaskExecutor); + return periodicTaskExecutor; + } + + public void tick() + { + periodicTaskExecutors.forEach(PeriodicTaskExecutor::tick); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/ttl/nodettlfetchermanagers/ConfidenceBasedNodeTtlFetcherManager.java b/presto-main/src/main/java/com/facebook/presto/ttl/nodettlfetchermanagers/ConfidenceBasedNodeTtlFetcherManager.java index a716fc45095e..e7731d47ef56 100644 --- a/presto-main/src/main/java/com/facebook/presto/ttl/nodettlfetchermanagers/ConfidenceBasedNodeTtlFetcherManager.java +++ b/presto-main/src/main/java/com/facebook/presto/ttl/nodettlfetchermanagers/ConfidenceBasedNodeTtlFetcherManager.java @@ -43,6 +43,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; @@ -78,6 +79,7 @@ public class ConfidenceBasedNodeTtlFetcherManager private final CounterStat refreshFailures = new CounterStat(); private final Consumer nodeChangeListener = this::refreshTtlInfo; private PeriodicTaskExecutor periodicTtlRefresher; + private ScheduledExecutorService scheduledExecutor = newSingleThreadScheduledExecutor(threadsNamed("refresh-node-ttl-executor-%s")); @Inject public ConfidenceBasedNodeTtlFetcherManager(InternalNodeManager nodeManager, NodeSchedulerConfig schedulerConfig, NodeTtlFetcherManagerConfig nodeTtlFetcherManagerConfig) @@ -98,7 +100,8 @@ public void scheduleRefresh() periodicTtlRefresher = new PeriodicTaskExecutor( ttlFetcher.get().getRefreshInterval().toMillis(), nodeTtlFetcherManagerConfig.getInitialDelayBeforeRefresh().toMillis(), - newSingleThreadScheduledExecutor(threadsNamed("refresh-node-ttl-executor-%s")), + scheduledExecutor, + Optional.of(scheduledExecutor), this::refreshTtlInfo, ConfidenceBasedNodeTtlFetcherManager::jitterForPeriodicRefresh); periodicTtlRefresher.start(); @@ -107,6 +110,7 @@ public void scheduleRefresh() @PreDestroy public void stop() { + scheduledExecutor.shutdownNow(); nodeManager.removeNodeChangeListener(nodeChangeListener); if (periodicTtlRefresher != null) { periodicTtlRefresher.stop(); diff --git a/presto-main/src/main/java/com/facebook/presto/util/PeriodicTaskExecutor.java b/presto-main/src/main/java/com/facebook/presto/util/PeriodicTaskExecutor.java index 4be58fcf9393..a7c795ddbd5d 100644 --- a/presto-main/src/main/java/com/facebook/presto/util/PeriodicTaskExecutor.java +++ b/presto-main/src/main/java/com/facebook/presto/util/PeriodicTaskExecutor.java @@ -13,6 +13,8 @@ */ package com.facebook.presto.util; +import java.util.Optional; +import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadLocalRandom; @@ -28,7 +30,8 @@ public class PeriodicTaskExecutor implements AutoCloseable { private final long delayTargetMillis; - private final ScheduledExecutorService executor; + private final ExecutorService executor; + private final Optional scheduledExecutor; private final Runnable runnable; private final LongUnaryOperator nextDelayFunction; private final AtomicBoolean started = new AtomicBoolean(); @@ -37,25 +40,28 @@ public class PeriodicTaskExecutor private volatile ScheduledFuture scheduledFuture; private volatile boolean stopped; - public PeriodicTaskExecutor(long delayTargetMillis, ScheduledExecutorService executor, Runnable runnable) + public PeriodicTaskExecutor(long delayTargetMillis, ScheduledExecutorService scheduledExecutor, Runnable runnable) { - this(delayTargetMillis, 0, executor, runnable, PeriodicTaskExecutor::nextDelayWithJitterMillis); + this(delayTargetMillis, 0, scheduledExecutor, Optional.of(scheduledExecutor), runnable, PeriodicTaskExecutor::nextDelayWithJitterMillis); } - public PeriodicTaskExecutor(long delayTargetMillis, long initDelayMillis, ScheduledExecutorService executor, Runnable runnable) + public PeriodicTaskExecutor(long delayTargetMillis, long initDelayMillis, ExecutorService executor, Optional scheduledExecutor, Runnable runnable, LongUnaryOperator nextDelayFunction) { - this(delayTargetMillis, initDelayMillis, executor, runnable, PeriodicTaskExecutor::nextDelayWithJitterMillis); - } - - public PeriodicTaskExecutor(long delayTargetMillis, long initDelayMillis, ScheduledExecutorService executor, Runnable runnable, LongUnaryOperator nextDelayFunction) - { - checkArgument(delayTargetMillis > 0, "delayTargetMillis must be > 0"); - checkArgument(initDelayMillis >= 0, "initDelayMillis must be > 0"); + checkArgument(delayTargetMillis >= 0, "delayTargetMillis must be >= 0"); + checkArgument(initDelayMillis >= 0, "initDelayMillis must be >= 0"); this.delayTargetMillis = delayTargetMillis; this.executor = requireNonNull(executor, "executor is null"); + this.scheduledExecutor = requireNonNull(scheduledExecutor, "scheduledExecutor is null"); this.runnable = requireNonNull(runnable, "runnable is null"); this.nextDelayFunction = requireNonNull(nextDelayFunction, "nextDelayFunction is null"); this.delayMillis = initDelayMillis; + + if (delayTargetMillis == 0) { + checkArgument(!scheduledExecutor.isPresent(), "scheduledExecutor must not be present when delayTargetMillis is 0"); + } + else { + checkArgument(scheduledExecutor.isPresent(), "scheduledExecutor must be present when delayTargetMillis is not 0"); + } } public void start() @@ -65,30 +71,45 @@ public void start() } } - private void tick() + public void tick() { - scheduledFuture = executor.schedule(this::run, delayMillis, MILLISECONDS); + if (!stopped) { + if (scheduledExecutor.isPresent()) { + scheduledFuture = scheduledExecutor.get().schedule(this::run, delayMillis, MILLISECONDS); + } + else { + forceRun(); + } + } } private void run() { forceRun(); delayMillis = nextDelayFunction.applyAsLong(delayTargetMillis); - if (!stopped) { - tick(); - } + tick(); } public void forceRun() { - executor.execute(runnable); + executor.execute(() -> { + try { + runnable.run(); + } + catch (Throwable t) { + t.printStackTrace(); + } + }); } public void stop() { if (started.get()) { stopped = true; - scheduledFuture.cancel(false); + + if (scheduledFuture != null) { + scheduledFuture.cancel(false); + } } } diff --git a/presto-main/src/main/java/com/facebook/presto/util/PeriodicTaskExecutorFactory.java b/presto-main/src/main/java/com/facebook/presto/util/PeriodicTaskExecutorFactory.java new file mode 100644 index 000000000000..6c173435f7f6 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/util/PeriodicTaskExecutorFactory.java @@ -0,0 +1,24 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.util; + +public interface PeriodicTaskExecutorFactory +{ + default PeriodicTaskExecutor createPeriodicTaskExecutor(long delayTargetMillis, Runnable runnable) + { + return createPeriodicTaskExecutor(delayTargetMillis, 0, runnable); + } + + PeriodicTaskExecutor createPeriodicTaskExecutor(long delayTargetMillis, long initDelayMillis, Runnable runnable); +} diff --git a/presto-main/src/main/java/com/facebook/presto/util/SimplePeriodicTaskExecutorFactory.java b/presto-main/src/main/java/com/facebook/presto/util/SimplePeriodicTaskExecutorFactory.java new file mode 100644 index 000000000000..c0943dd8cc0b --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/util/SimplePeriodicTaskExecutorFactory.java @@ -0,0 +1,75 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.util; + +import com.facebook.presto.resourcemanager.ForPeriodicTaskExecutor; + +import javax.inject.Inject; + +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadLocalRandom; +import java.util.function.LongUnaryOperator; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.Math.round; +import static java.util.Objects.requireNonNull; + +public class SimplePeriodicTaskExecutorFactory + implements AutoCloseable, PeriodicTaskExecutorFactory +{ + private final ExecutorService executor; + private final Optional scheduledExecutor; + private final LongUnaryOperator nextDelayFunction; + + @Inject + public SimplePeriodicTaskExecutorFactory(@ForPeriodicTaskExecutor ExecutorService executor, @ForPeriodicTaskExecutor Optional scheduledExecutor) + { + this(executor, scheduledExecutor, SimplePeriodicTaskExecutorFactory::nextDelayWithJitterMillis); + } + + public SimplePeriodicTaskExecutorFactory(ExecutorService executor, Optional scheduledExecutor, LongUnaryOperator nextDelayFunction) + { + this.executor = requireNonNull(executor, "executor is null"); + this.scheduledExecutor = requireNonNull(scheduledExecutor, "scheduledExecutor is null"); + this.nextDelayFunction = requireNonNull(nextDelayFunction, "nextDelayFunction is null"); + } + + @Override + public PeriodicTaskExecutor createPeriodicTaskExecutor(long delayTargetMillis, long initDelayMillis, Runnable runnable) + { + if (delayTargetMillis == 0) { + checkArgument(!scheduledExecutor.isPresent(), "scheduledExecutor must not be present when delayTargetMillis is 0"); + } + else { + checkArgument(scheduledExecutor.isPresent(), "scheduledExecutor must be present when delayTargetMillis is not 0"); + } + return new PeriodicTaskExecutor(delayTargetMillis, initDelayMillis, executor, scheduledExecutor, runnable, nextDelayFunction); + } + + private static long nextDelayWithJitterMillis(long delayTargetMillis) + { + double minSleepTimeMillis = delayTargetMillis / 2.0; + return round((minSleepTimeMillis + ThreadLocalRandom.current().nextDouble() * delayTargetMillis)); + } + + @Override + public void close() + throws Exception + { + executor.shutdownNow(); + scheduledExecutor.ifPresent(ExecutorService::shutdownNow); + } +} diff --git a/presto-main/src/main/java/com/facebook/presto/util/ThrowingPeriodicTaskExecutorFactory.java b/presto-main/src/main/java/com/facebook/presto/util/ThrowingPeriodicTaskExecutorFactory.java new file mode 100644 index 000000000000..780e6a7a6b87 --- /dev/null +++ b/presto-main/src/main/java/com/facebook/presto/util/ThrowingPeriodicTaskExecutorFactory.java @@ -0,0 +1,24 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.util; + +public class ThrowingPeriodicTaskExecutorFactory + implements PeriodicTaskExecutorFactory +{ + @Override + public PeriodicTaskExecutor createPeriodicTaskExecutor(long delayTargetMillis, long initDelayMillis, Runnable runnable) + { + throw new UnsupportedOperationException(); + } +} diff --git a/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestInternalResourceGroupManager.java b/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestInternalResourceGroupManager.java index bc4cf9910353..5c63c511c7a2 100644 --- a/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestInternalResourceGroupManager.java +++ b/presto-main/src/test/java/com/facebook/presto/execution/resourceGroups/TestInternalResourceGroupManager.java @@ -22,6 +22,7 @@ import com.facebook.presto.spi.PrestoException; import com.facebook.presto.spi.resourceGroups.ResourceGroupId; import com.facebook.presto.spi.resourceGroups.SelectionContext; +import com.facebook.presto.testing.TestingPeriodicTaskExecutorFactory; import com.google.common.collect.ImmutableMap; import org.testng.annotations.Test; import org.weakref.jmx.MBeanExporter; @@ -33,7 +34,7 @@ public class TestInternalResourceGroupManager public void testQueryFailsWithInitializingConfigurationManager() { InternalResourceGroupManager> internalResourceGroupManager = new InternalResourceGroupManager<>((poolId, listener) -> {}, - new QueryManagerConfig(), new NodeInfo("test"), new MBeanExporter(new TestingMBeanServer()), () -> null, new ServerConfig(), new InMemoryNodeManager()); + new QueryManagerConfig(), new NodeInfo("test"), new MBeanExporter(new TestingMBeanServer()), () -> null, new ServerConfig(), new InMemoryNodeManager(), new TestingPeriodicTaskExecutorFactory()); internalResourceGroupManager.submit(new MockManagedQueryExecution(0), new SelectionContext<>(new ResourceGroupId("global"), ImmutableMap.of()), command -> {}); } @@ -42,7 +43,7 @@ public void testQuerySucceedsWhenConfigurationManagerLoaded() throws Exception { InternalResourceGroupManager> internalResourceGroupManager = new InternalResourceGroupManager<>((poolId, listener) -> {}, - new QueryManagerConfig(), new NodeInfo("test"), new MBeanExporter(new TestingMBeanServer()), () -> null, new ServerConfig(), new InMemoryNodeManager()); + new QueryManagerConfig(), new NodeInfo("test"), new MBeanExporter(new TestingMBeanServer()), () -> null, new ServerConfig(), new InMemoryNodeManager(), new TestingPeriodicTaskExecutorFactory()); internalResourceGroupManager.loadConfigurationManager(); internalResourceGroupManager.submit(new MockManagedQueryExecution(0), new SelectionContext<>(new ResourceGroupId("global"), ImmutableMap.of()), command -> {}); } diff --git a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java index a3381a2a28ed..967fde3a91f5 100644 --- a/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java +++ b/presto-main/src/test/java/com/facebook/presto/resourcemanager/TestResourceManagerClusterStatusSender.java @@ -22,6 +22,7 @@ import com.facebook.presto.server.NodeStatus; import com.facebook.presto.server.ServerConfig; import com.facebook.presto.spi.ConnectorId; +import com.facebook.presto.testing.TestingPeriodicTaskExecutorFactory; import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; import io.airlift.units.Duration; @@ -33,11 +34,9 @@ import java.util.OptionalInt; import static io.airlift.units.DataSize.Unit.MEGABYTE; -import static java.lang.String.format; -import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; -import static org.testng.Assert.assertTrue; +import static org.testng.Assert.assertEquals; public class TestResourceManagerClusterStatusSender { @@ -57,12 +56,10 @@ public class TestResourceManagerClusterStatusSender 1, 2, 3); - private static final int HEARTBEAT_INTERVAL = 100; - private static final int SLEEP_DURATION = 1000; - private static final int TARGET_HEARTBEATS = SLEEP_DURATION / HEARTBEAT_INTERVAL; - + private static final int HEARTBEAT_INTERVAL = 0; private ResourceManagerClusterStatusSender sender; private TestingResourceManagerClient resourceManagerClient; + private TestingPeriodicTaskExecutorFactory periodicTaskExecutorFactory; @BeforeTest public void setup() @@ -80,12 +77,12 @@ public void setup() true, false, false)); - + periodicTaskExecutorFactory = new TestingPeriodicTaskExecutorFactory(); sender = new ResourceManagerClusterStatusSender( (addressSelectionContext, headers) -> resourceManagerClient, nodeManager, () -> NODE_STATUS, - newSingleThreadScheduledExecutor(), + periodicTaskExecutorFactory, new ResourceManagerConfig() .setNodeHeartbeatInterval(new Duration(HEARTBEAT_INTERVAL, MILLISECONDS)) .setQueryHeartbeatInterval(new Duration(HEARTBEAT_INTERVAL, MILLISECONDS)), @@ -101,15 +98,12 @@ public void tearDown() @Test(timeOut = 2_000) public void testNodeStatus() - throws Exception { sender.init(); - Thread.sleep(SLEEP_DURATION); + assertEquals(resourceManagerClient.getNodeHeartbeats(), 1); - int nodeHeartbeats = resourceManagerClient.getNodeHeartbeats(); - assertTrue(nodeHeartbeats > TARGET_HEARTBEATS * 0.5 && nodeHeartbeats <= TARGET_HEARTBEATS * 1.5, - format("Expect number of heartbeats to fall within target range (%s), +/- 50%%. Was: %s", TARGET_HEARTBEATS, nodeHeartbeats)); + periodicTaskExecutorFactory.tick(); } @Test(timeOut = 6_000) @@ -119,18 +113,15 @@ public void testQueryHeartbeat() MockManagedQueryExecution queryExecution = new MockManagedQueryExecution(1); sender.registerQuery(queryExecution); - Thread.sleep(SLEEP_DURATION); - - int queryHeartbeats = resourceManagerClient.getQueryHeartbeats(); - assertTrue(queryHeartbeats > TARGET_HEARTBEATS * 0.5 && queryHeartbeats <= TARGET_HEARTBEATS * 1.5, - format("Expect number of heartbeats to fall within target range (%s), +/- 50%%. Was: %s", TARGET_HEARTBEATS, queryHeartbeats)); + assertEquals(resourceManagerClient.getQueryHeartbeats(), 1); + periodicTaskExecutorFactory.tick(); + assertEquals(resourceManagerClient.getQueryHeartbeats(), 2); // Completing the query stops the heartbeats queryExecution.complete(); - queryHeartbeats = resourceManagerClient.getQueryHeartbeats(); - - Thread.sleep(SLEEP_DURATION); - - assertTrue(resourceManagerClient.getQueryHeartbeats() <= (queryHeartbeats + 1)); + periodicTaskExecutorFactory.tick(); + assertEquals(resourceManagerClient.getQueryHeartbeats(), 3); + periodicTaskExecutorFactory.tick(); + assertEquals(resourceManagerClient.getQueryHeartbeats(), 3); } } diff --git a/presto-main/src/test/java/com/facebook/presto/util/TestPeriodicTaskExecutor.java b/presto-main/src/test/java/com/facebook/presto/util/TestPeriodicTaskExecutor.java index 7499952ff9f1..562cd43e53d8 100644 --- a/presto-main/src/test/java/com/facebook/presto/util/TestPeriodicTaskExecutor.java +++ b/presto-main/src/test/java/com/facebook/presto/util/TestPeriodicTaskExecutor.java @@ -13,27 +13,26 @@ */ package com.facebook.presto.util; -import com.google.common.base.Stopwatch; +import com.google.common.util.concurrent.MoreExecutors; import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Executors; -import java.util.concurrent.ScheduledExecutorService; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicInteger; -import static java.util.concurrent.TimeUnit.SECONDS; import static org.testng.Assert.assertEquals; @Test(singleThreaded = true) public class TestPeriodicTaskExecutor { - private ScheduledExecutorService executorService; + private ExecutorService executorService; @BeforeMethod public void setUp() { - executorService = Executors.newSingleThreadScheduledExecutor(); + executorService = MoreExecutors.newDirectExecutorService(); } @AfterMethod @@ -42,24 +41,25 @@ public void tearDown() executorService.shutdownNow(); } - @Test(enabled = false) + @Test public void testTick() throws Exception { - int numberOfTicks = 2; - long durationBetweenTicksInSeconds = 2; + AtomicInteger counter = new AtomicInteger(); + Runnable runnable = counter::incrementAndGet; - CountDownLatch latch = new CountDownLatch(3); - Runnable runnable = latch::countDown; - - try (PeriodicTaskExecutor executor = new PeriodicTaskExecutor(SECONDS.toMillis(durationBetweenTicksInSeconds), 500, executorService, runnable, i -> i)) { + PeriodicTaskExecutor executor = new PeriodicTaskExecutor(0, 0, executorService, Optional.empty(), runnable, i -> i); + try { executor.start(); - Stopwatch stopwatch = Stopwatch.createStarted(); - latch.await(10, SECONDS); - stopwatch.stop(); + executor.tick(); + executor.tick(); - assertEquals((numberOfTicks * durationBetweenTicksInSeconds), stopwatch.elapsed(SECONDS)); - assertEquals(latch.getCount(), 0); // latch was counted down 3 times + assertEquals(counter.get(), 3); // counter was incremented 3 times + } + finally { + executor.close(); } + executor.tick(); + assertEquals(counter.get(), 3); // no-op after executor is closed } } diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 7f3588ef388d..867aefe47bdf 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 7f3588ef388d08f332d9aecee1d99f1fe22ea24e +Subproject commit 867aefe47bdf9f8594e251eb8ff8bdfed357de77 diff --git a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java index c26cba7e264f..b2637a933513 100644 --- a/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java +++ b/presto-spark-base/src/main/java/com/facebook/presto/spark/PrestoSparkModule.java @@ -204,7 +204,9 @@ import com.facebook.presto.ttl.nodettlfetchermanagers.NodeTtlFetcherManager; import com.facebook.presto.ttl.nodettlfetchermanagers.ThrowingNodeTtlFetcherManager; import com.facebook.presto.type.TypeDeserializer; +import com.facebook.presto.util.PeriodicTaskExecutorFactory; import com.facebook.presto.util.PrestoDataDefBindingHelper; +import com.facebook.presto.util.ThrowingPeriodicTaskExecutorFactory; import com.facebook.presto.version.EmbedVersion; import com.google.inject.Binder; import com.google.inject.Provides; @@ -517,6 +519,7 @@ protected void setup(Binder binder) binder.bind(NodeTtlFetcherManager.class).to(ThrowingNodeTtlFetcherManager.class).in(Scopes.SINGLETON); binder.bind(ClusterTtlProviderManager.class).to(ThrowingClusterTtlProviderManager.class).in(Scopes.SINGLETON); binder.bind(NodeStatusNotificationManager.class).in(Scopes.SINGLETON); + binder.bind(PeriodicTaskExecutorFactory.class).to(ThrowingPeriodicTaskExecutorFactory.class); // TODO: Decouple and remove: required by SessionPropertyDefaults, PluginManager, InternalResourceGroupManager, ConnectorManager configBinder(binder).bindConfig(NodeConfig.class); diff --git a/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedTaskInfoResource.java b/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedTaskInfoResource.java index c4998106fd97..d96294676926 100644 --- a/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedTaskInfoResource.java +++ b/presto-tests/src/test/java/com/facebook/presto/resourcemanager/TestDistributedTaskInfoResource.java @@ -25,20 +25,21 @@ import com.facebook.presto.spi.resourceGroups.ResourceGroupId; import com.facebook.presto.tests.DistributedQueryRunner; import com.google.common.collect.ImmutableMap; +import org.intellij.lang.annotations.Language; import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import java.util.Map; import java.util.Optional; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeoutException; import static com.facebook.airlift.json.JsonCodec.jsonCodec; import static com.facebook.airlift.testing.Closeables.closeQuietly; -import static com.facebook.presto.tests.tpch.TpchQueryRunner.createQueryRunner; +import static com.facebook.presto.tests.BlackHoleQueryRunner.createQueryRunner; import static com.facebook.presto.utils.QueryExecutionClientUtil.getResponseEntity; -import static com.facebook.presto.utils.QueryExecutionClientUtil.runToCompletion; -import static com.facebook.presto.utils.QueryExecutionClientUtil.runToFirstResult; import static com.facebook.presto.utils.ResourceUtils.getResourceFilePath; import static com.google.common.base.Preconditions.checkState; import static java.lang.String.format; @@ -56,34 +57,42 @@ public class TestDistributedTaskInfoResource private TestingPrestoServer coordinator1; private TestingPrestoServer coordinator2; private TestingPrestoServer resourceManager; + private DistributedQueryRunner runner; + private ScheduledExecutorService executor; @BeforeClass public void setup() throws Exception { client = new JettyHttpClient(); - DistributedQueryRunner runner = createQueryRunner(ImmutableMap.of("query.client.timeout", "20s"), COORDINATOR_COUNT); + runner = createQueryRunner(ImmutableMap.of("query.client.timeout", "20s"), COORDINATOR_COUNT); coordinator1 = runner.getCoordinator(0); coordinator2 = runner.getCoordinator(1); Optional resourceManager = runner.getResourceManager(); checkState(resourceManager.isPresent(), "resource manager not present"); this.resourceManager = resourceManager.get(); + executor = Executors.newSingleThreadScheduledExecutor(); coordinator1.getResourceGroupManager().get().addConfigurationManagerFactory(new FileResourceGroupConfigurationManagerFactory()); coordinator1.getResourceGroupManager().get() .forceSetConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json"))); coordinator2.getResourceGroupManager().get().addConfigurationManagerFactory(new FileResourceGroupConfigurationManagerFactory()); coordinator2.getResourceGroupManager().get() .forceSetConfigurationManager("file", ImmutableMap.of("resource-groups.config-file", getResourceFilePath("resource_groups_config_simple.json"))); + + @Language("SQL") String createTableSql = + "CREATE TABLE blackhole.default.test_table (x bigint) WITH (split_count = 1, pages_per_split = 1, rows_per_page = 1, page_processing_delay = '1h')"; + runner.execute(0, createTableSql); + runner.execute(1, createTableSql); } - @Test(timeOut = 220_000) + @Test(timeOut = 220_000_000) public void testDistributedGetTaskInfo() throws Exception { sleep(SECONDS.toMillis(5)); waitUntilCoordinatorsDiscoveredHealthyInRM(SECONDS.toMillis(15)); - runToCompletion(client, coordinator1, "SELECT 1"); - runToFirstResult(client, coordinator1, "SELECT * from tpch.sf101.orders"); + + executor.execute(() -> runner.execute(0, "SELECT * FROM blackhole.default.test_table")); Map resourceGroupRuntimeInfoSnapshot; int globalRunningQueries = 0; diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/BlackHoleQueryRunner.java b/presto-tests/src/test/java/com/facebook/presto/tests/BlackHoleQueryRunner.java index 263c4ee7bf75..5fe45dd118e5 100644 --- a/presto-tests/src/test/java/com/facebook/presto/tests/BlackHoleQueryRunner.java +++ b/presto-tests/src/test/java/com/facebook/presto/tests/BlackHoleQueryRunner.java @@ -37,13 +37,27 @@ public static DistributedQueryRunner createQueryRunner() public static DistributedQueryRunner createQueryRunner(Map extraProperties) throws Exception + { + return createQueryRunner(extraProperties, 1); + } + + public static DistributedQueryRunner createQueryRunner(Map extraProperties, int coordinatorCount) + throws Exception { Session session = testSessionBuilder() .setCatalog("blackhole") .setSchema("default") .build(); - DistributedQueryRunner queryRunner = new DistributedQueryRunner(session, 4, extraProperties); + DistributedQueryRunner.Builder builder = DistributedQueryRunner.builder(session) + .setNodeCount(4) + .setCoordinatorCount(coordinatorCount) + .setExtraProperties(extraProperties); + if (coordinatorCount > 1) { + builder = builder.setResourceManagerEnabled(true) + .setResourceManagerCount(1); + } + DistributedQueryRunner queryRunner = builder.build(); try { queryRunner.installPlugin(new BlackHolePlugin());