[native] Report different thread pool executors' stats
tanjialiang authored and xiaoxmeng committed Nov 6, 2024
1 parent b036356 commit 2d1be02
Showing 6 changed files with 214 additions and 123 deletions.
274 changes: 165 additions & 109 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.cpp
@@ -69,8 +69,142 @@ folly::StringPiece getCounterForBlockingReason(
default:
return {};
}
return {};
}

class ThreadPoolExecutorStatsReporter {
public:
ThreadPoolExecutorStatsReporter(
folly::ThreadPoolExecutor* executor,
const std::string& poolName,
uint32_t estimatedMaxNumTasks)
: executor_(executor),
numThreadsMetricName_(
fmt::format(kCounterThreadPoolNumThreadsFormat, poolName)),
numActiveThreadsMetricName_(
fmt::format(kCounterThreadPoolNumActiveThreadsFormat, poolName)),
numPendingTasksMetricName_(
fmt::format(kCounterThreadPoolNumPendingTasksFormat, poolName)),
numTotalTasksMetricName_(
fmt::format(kCounterThreadPoolNumTotalTasksFormat, poolName)),
maxIdleTimeNsMetricName_(
fmt::format(kCounterThreadPoolMaxIdleTimeNsFormat, poolName)) {
VELOX_CHECK_NOT_NULL(executor_);
const auto numThreads = executor_->numThreads();
const auto numHistogramBuckets = 100;
DEFINE_METRIC(numThreadsMetricName_, facebook::velox::StatType::AVG);
DEFINE_HISTOGRAM_METRIC(
numActiveThreadsMetricName_, 1, 0, numThreads, 50, 90, 100);
DEFINE_HISTOGRAM_METRIC(
numPendingTasksMetricName_,
estimatedMaxNumTasks / numHistogramBuckets,
0,
estimatedMaxNumTasks,
50,
90,
100);
DEFINE_HISTOGRAM_METRIC(
numTotalTasksMetricName_,
estimatedMaxNumTasks / numHistogramBuckets,
0,
estimatedMaxNumTasks,
50,
90,
100);
DEFINE_HISTOGRAM_METRIC(
maxIdleTimeNsMetricName_,
10'000'000'000 /* 10s */,
0,
300'000'000'000 /* 300s */,
50,
90,
100);
}

void report() const {
const auto poolStats = executor_->getPoolStats();
RECORD_METRIC_VALUE(numThreadsMetricName_, poolStats.threadCount);
RECORD_HISTOGRAM_METRIC_VALUE(
numActiveThreadsMetricName_, poolStats.activeThreadCount);
RECORD_HISTOGRAM_METRIC_VALUE(
numPendingTasksMetricName_, poolStats.pendingTaskCount);
RECORD_HISTOGRAM_METRIC_VALUE(
numTotalTasksMetricName_, poolStats.totalTaskCount);
RECORD_HISTOGRAM_METRIC_VALUE(
maxIdleTimeNsMetricName_, poolStats.maxIdleTime.count());
}

private:
folly::ThreadPoolExecutor* const executor_{nullptr};

const std::string numThreadsMetricName_;
const std::string numActiveThreadsMetricName_;
const std::string numPendingTasksMetricName_;
const std::string numTotalTasksMetricName_;
const std::string maxIdleTimeNsMetricName_;
};
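
For reference, a minimal usage sketch (not part of this diff); the executor, pool name, and task estimate below are illustrative. Note the bucket math: with estimatedMaxNumTasks = 5'000 and 100 buckets, the pending- and total-task histograms use 50-wide buckets over [0, 5'000] and export the 50th, 90th, and 100th percentiles.

#include <folly/executors/CPUThreadPoolExecutor.h>

void exampleReportOnce() {
  // Any folly::ThreadPoolExecutor subclass works; a CPU pool is shown here.
  folly::CPUThreadPoolExecutor pool(/*numThreads=*/8);
  ThreadPoolExecutorStatsReporter reporter(
      &pool, "example_executor", /*estimatedMaxNumTasks=*/5'000);
  // A periodic task (see addExecutorStatsTask below) calls this on every
  // tick to snapshot ThreadPoolExecutor::getPoolStats() into the metrics.
  reporter.report();
}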

class HiveConnectorStatsReporter {
public:
explicit HiveConnectorStatsReporter(
std::shared_ptr<velox::connector::hive::HiveConnector> connector)
: connector_(std::move(connector)),
numElementsMetricName_(fmt::format(
kCounterHiveFileHandleCacheNumElementsFormat,
connector_->connectorId())),
pinnedSizeMetricName_(fmt::format(
kCounterHiveFileHandleCachePinnedSizeFormat,
connector_->connectorId())),
curSizeMetricName_(fmt::format(
kCounterHiveFileHandleCacheCurSizeFormat,
connector_->connectorId())),
numAccumulativeHitsMetricName_(fmt::format(
kCounterHiveFileHandleCacheNumAccumulativeHitsFormat,
connector_->connectorId())),
numAccumulativeLookupsMetricName_(fmt::format(
kCounterHiveFileHandleCacheNumAccumulativeLookupsFormat,
connector_->connectorId())),
numHitsMetricName_(fmt::format(
kCounterHiveFileHandleCacheNumHitsFormat,
connector_->connectorId())),
numLookupsMetricName_(fmt::format(
kCounterHiveFileHandleCacheNumLookupsFormat,
connector_->connectorId())) {
DEFINE_METRIC(numElementsMetricName_, velox::StatType::AVG);
DEFINE_METRIC(pinnedSizeMetricName_, velox::StatType::AVG);
DEFINE_METRIC(curSizeMetricName_, velox::StatType::AVG);
DEFINE_METRIC(numAccumulativeHitsMetricName_, velox::StatType::AVG);
DEFINE_METRIC(numAccumulativeLookupsMetricName_, velox::StatType::AVG);
DEFINE_METRIC(numHitsMetricName_, velox::StatType::AVG);
DEFINE_METRIC(numLookupsMetricName_, velox::StatType::AVG);
}

void report() {
auto stats = connector_->fileHandleCacheStats();
RECORD_METRIC_VALUE(numElementsMetricName_, stats.numElements);
RECORD_METRIC_VALUE(pinnedSizeMetricName_, stats.pinnedSize);
RECORD_METRIC_VALUE(curSizeMetricName_, stats.curSize);
RECORD_METRIC_VALUE(numAccumulativeHitsMetricName_, stats.numHits);
RECORD_METRIC_VALUE(numAccumulativeLookupsMetricName_, stats.numLookups);
RECORD_METRIC_VALUE(numHitsMetricName_, stats.numHits - lastNumHits_);
lastNumHits_ = stats.numHits;
RECORD_METRIC_VALUE(
numLookupsMetricName_, stats.numLookups - lastNumLookups_);
lastNumLookups_ = stats.numLookups;
}

private:
const std::shared_ptr<velox::connector::hive::HiveConnector> connector_;
const std::string numElementsMetricName_;
const std::string pinnedSizeMetricName_;
const std::string curSizeMetricName_;
const std::string numAccumulativeHitsMetricName_;
const std::string numAccumulativeLookupsMetricName_;
const std::string numHitsMetricName_;
const std::string numLookupsMetricName_;
size_t lastNumHits_{0};
size_t lastNumLookups_{0};
};
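
report() above publishes cumulative counters alongside per-window deltas. A small illustrative helper (not from this diff) showing the arithmetic the deltas enable: if one report sees numHits = 900 and numLookups = 1'000 and the next sees 1'500 and 1'600, the second window records 600 hits over 600 lookups.

// Illustrative only: per-window hit rate implied by the deltas above.
double windowHitRate(
    size_t numHits,
    size_t lastNumHits,
    size_t numLookups,
    size_t lastNumLookups) {
  const size_t lookups = numLookups - lastNumLookups;
  // A window with no lookups has no meaningful hit rate; report 0.
  return lookups == 0
      ? 0.0
      : static_cast<double>(numHits - lastNumHits) / lookups;
}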

} // namespace

// Every two seconds we export server counters.
@@ -92,7 +226,10 @@ static constexpr size_t kHttpClientPeriodGlobalCounters{
PeriodicTaskManager::PeriodicTaskManager(
folly::CPUThreadPoolExecutor* driverCPUExecutor,
folly::CPUThreadPoolExecutor* spillerExecutor,
-    folly::IOThreadPoolExecutor* httpExecutor,
+    folly::IOThreadPoolExecutor* httpSrvIoExecutor,
+    folly::CPUThreadPoolExecutor* httpSrvCpuExecutor,
+    folly::IOThreadPoolExecutor* exchangeHttpIoExecutor,
+    folly::CPUThreadPoolExecutor* exchangeHttpCpuExecutor,
TaskManager* taskManager,
const velox::memory::MemoryAllocator* memoryAllocator,
const velox::cache::AsyncDataCache* asyncDataCache,
@@ -102,7 +239,10 @@ PeriodicTaskManager::PeriodicTaskManager(
PrestoServer* server)
: driverCPUExecutor_(driverCPUExecutor),
spillerExecutor_(spillerExecutor),
-      httpExecutor_(httpExecutor),
+      httpSrvIoExecutor_(httpSrvIoExecutor),
+      httpSrvCpuExecutor_(httpSrvCpuExecutor),
+      exchangeHttpIoExecutor_(exchangeHttpIoExecutor),
+      exchangeHttpCpuExecutor_(exchangeHttpCpuExecutor),
taskManager_(taskManager),
memoryAllocator_(memoryAllocator),
asyncDataCache_(asyncDataCache),
@@ -120,7 +260,7 @@ void PeriodicTaskManager::start() {
velox::startPeriodicStatsReporter(opts);

// If executors are null, don't bother starting this task.
-  if ((driverCPUExecutor_ != nullptr) || (httpExecutor_ != nullptr)) {
+  if ((driverCPUExecutor_ != nullptr) || (httpSrvIoExecutor_ != nullptr)) {
addExecutorStatsTask();
}

@@ -158,47 +298,29 @@ void PeriodicTaskManager::stop() {
repeatedRunner_.stop();
}

-void PeriodicTaskManager::updateExecutorStats() {
-  if (driverCPUExecutor_ != nullptr) {
-    // Report the current queue size of the thread pool.
-    RECORD_METRIC_VALUE(
-        kCounterDriverCPUExecutorQueueSize,
-        driverCPUExecutor_->getTaskQueueSize());
-
-    // Report driver execution latency.
-    folly::stop_watch<std::chrono::milliseconds> timer;
-    driverCPUExecutor_->add([timer = timer]() {
-      RECORD_METRIC_VALUE(
-          kCounterDriverCPUExecutorLatencyMs, timer.elapsed().count());
-    });
-  }
-
-  if (spillerExecutor_ != nullptr) {
-    // Report the current queue size of the spiller thread pool.
-    RECORD_METRIC_VALUE(
-        kCounterSpillerExecutorQueueSize, spillerExecutor_->getTaskQueueSize());
-
-    // Report spiller execution latency.
-    folly::stop_watch<std::chrono::milliseconds> timer;
-    spillerExecutor_->add([timer = timer]() {
-      RECORD_METRIC_VALUE(
-          kCounterSpillerExecutorLatencyMs, timer.elapsed().count());
-    });
-  }
-
-  if (httpExecutor_ != nullptr) {
-    // Report the latency between scheduling the task and its execution.
-    folly::stop_watch<std::chrono::milliseconds> timer;
-    httpExecutor_->add([timer = timer]() {
-      RECORD_METRIC_VALUE(
-          kCounterHTTPExecutorLatencyMs, timer.elapsed().count());
-    });
-  }
-}

void PeriodicTaskManager::addExecutorStatsTask() {
+  std::vector<ThreadPoolExecutorStatsReporter> reporters;
+  auto addExecutorFunc = [&](folly::ThreadPoolExecutor* executor,
+                             const std::string& executorName,
+                             uint32_t estimatedMaxNumTasks) {
+    if (executor != nullptr) {
+      reporters.push_back(ThreadPoolExecutorStatsReporter(
+          executor, executorName, estimatedMaxNumTasks));
+    }
+  };
+  addExecutorFunc(driverCPUExecutor_, "driver_cpu_executor", 5'000);
+  addExecutorFunc(spillerExecutor_, "spiller_executor", 5'000);
+  addExecutorFunc(httpSrvIoExecutor_, "http_srv_io_executor", 50'000);
+  addExecutorFunc(httpSrvCpuExecutor_, "http_srv_cpu_executor", 50'000);
+  addExecutorFunc(exchangeHttpIoExecutor_, "exchange_http_io_executor", 50'000);
+  addExecutorFunc(
+      exchangeHttpCpuExecutor_, "exchange_http_cpu_executor", 50'000);
   addTask(
-      [this]() { updateExecutorStats(); },
+      [this, reporters = std::move(reporters)]() {
+        for (auto& reporter : reporters) {
+          reporter.report();
+        }
+      },
kTaskPeriodGlobalCounters,
"executor_counters");
}
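
One detail worth noting: the lambda move-captures the reporters, and by-value lambda captures are const by default, so this compiles only because ThreadPoolExecutorStatsReporter::report() is const. A stateful reporter such as HiveConnectorStatsReporter, whose report() updates lastNumHits_ and lastNumLookups_, needs a mutable lambda. A sketch assuming the same addTask API (the task name here is illustrative):

addTask(
    [reporters = std::move(reporters)]() mutable {
      for (auto& reporter : reporters) {
        reporter.report();
      }
    },
    kTaskPeriodGlobalCounters,
    "connector_counters");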
@@ -277,72 +399,6 @@ void PeriodicTaskManager::addPrestoExchangeSourceMemoryStatsTask() {
"exchange_source_counters");
}

void PeriodicTaskManager::addConnectorStatsTask() {
std::vector<HiveConnectorStatsReporter> reporters;
for (const auto& itr : connectors_) {
18 changes: 13 additions & 5 deletions presto-native-execution/presto_cpp/main/PeriodicTaskManager.h
@@ -44,7 +44,10 @@ class PeriodicTaskManager {
explicit PeriodicTaskManager(
folly::CPUThreadPoolExecutor* driverCPUExecutor,
folly::CPUThreadPoolExecutor* spillerExecutor,
-      folly::IOThreadPoolExecutor* httpExecutor,
+      folly::IOThreadPoolExecutor* httpSrvIoExecutor,
+      folly::CPUThreadPoolExecutor* httpSrvCpuExecutor,
+      folly::IOThreadPoolExecutor* exchangeHttpIoExecutor,
+      folly::CPUThreadPoolExecutor* exchangeHttpCpuExecutor,
TaskManager* taskManager,
const velox::memory::MemoryAllocator* memoryAllocator,
const velox::cache::AsyncDataCache* asyncDataCache,
@@ -96,7 +99,6 @@

private:
void addExecutorStatsTask();
-  void updateExecutorStats();

void addTaskStatsTask();
void updateTaskStats();
@@ -123,9 +125,15 @@
void detachWorker(const char* reason);
void maybeAttachWorker();

-  folly::CPUThreadPoolExecutor* driverCPUExecutor_;
-  folly::CPUThreadPoolExecutor* spillerExecutor_;
-  folly::IOThreadPoolExecutor* httpExecutor_;
+  folly::CPUThreadPoolExecutor* driverCPUExecutor_{nullptr};
+  folly::CPUThreadPoolExecutor* spillerExecutor_{nullptr};
+
+  folly::IOThreadPoolExecutor* httpSrvIoExecutor_{nullptr};
+  folly::CPUThreadPoolExecutor* httpSrvCpuExecutor_{nullptr};
+
+  folly::IOThreadPoolExecutor* exchangeHttpIoExecutor_{nullptr};
+  folly::CPUThreadPoolExecutor* exchangeHttpCpuExecutor_{nullptr};

TaskManager* taskManager_;
const velox::memory::MemoryAllocator* memoryAllocator_;
const velox::cache::AsyncDataCache* asyncDataCache_;
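
The remaining changed files, including the PrestoServer call site and tests, are not expanded on this page. A hypothetical sketch of the updated construction; the executor member names below are illustrative, not taken from the diff:

// Hypothetical call site; executor member names are illustrative.
periodicTaskManager_ = std::make_unique<PeriodicTaskManager>(
    driverExecutor_.get(),
    spillerExecutor_.get(),
    httpSrvIoExecutor_.get(),
    httpSrvCpuExecutor_.get(),
    exchangeHttpIoExecutor_.get(),
    exchangeHttpCpuExecutor_.get(),
    taskManager_.get(),
    memoryAllocator_,
    asyncDataCache_,
    /* remaining arguments unchanged, ending with the PrestoServer* */ this);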
