From 92878c3ab8c104f748868aa71f01689559d3d3c3 Mon Sep 17 00:00:00 2001 From: Maxim Kuvyrkov Date: Wed, 14 Jun 2023 09:26:23 +0000 Subject: [PATCH] Match concurrency to available CPU bandwidth This change allows ninja to throttle number of parallel tasks based on feedback from Linux kernel's PSI (Pressure Stall Information) interfacts. It extends "-l" parameter to accept negative values; "-l-NN" means that ninja should limit concurrency when processes in current cgroup spend more than NN% of their time stalled on CPU. E.g., running "ninja -j100 -l-10" on a 32-core machine will quickly settle on parallelism of 32-34. This option is designed to make ninja use all CPU bandwidth available to a cgroup-based container, while not starting excessive number of processes, which could eat up all RAM. The motivation for this feature is to automatically reduce parallelism when the system is about to run out of RAM. If the system has swap enabled, "ninja -l-10" will dance with parallelism on the edge of just using a bit of swap. As soon as a process starts swapping, CPU "stalled" cycles increase, and parallelism is reduced. The same argument works when a process is waiting for its turn to use IO and/or network. --- src/build.cc | 42 +++++++++++++++++++--- src/debug_flags.cc | 2 ++ src/debug_flags.h | 2 ++ src/ninja.cc | 14 ++++++-- src/util.cc | 90 ++++++++++++++++++++++++++++++++++++++++++++++ src/util.h | 4 +++ 6 files changed, 148 insertions(+), 6 deletions(-) diff --git a/src/build.cc b/src/build.cc index 76ff93af03..b0c2e28dbb 100644 --- a/src/build.cc +++ b/src/build.cc @@ -475,10 +475,44 @@ void RealCommandRunner::Abort() { bool RealCommandRunner::CanRunMore() const { size_t subproc_number = - subprocs_.running_.size() + subprocs_.finished_.size(); - return (int)subproc_number < config_.parallelism - && ((subprocs_.running_.empty() || config_.max_load_average <= 0.0f) - || GetLoadAverage() < config_.max_load_average); + subprocs_.running_.size() + subprocs_.finished_.size(); + + if ((int)subproc_number >= config_.parallelism) + return false; + + if (subprocs_.running_.empty()) + return true; + + if (config_.max_load_average > 0.0f) { + double loadavg = GetLoadAverage(); + + if (loadavg < config_.max_load_average) + return true; + + if (g_syslimits) + fprintf (stderr, "\nninja syslimits: loadavg %.0f >= %.0f\n", + loadavg, config_.max_load_average); + + return false; + } else if (config_.max_load_average < -0.1f) { + double wait_ratio = GetCPUWaitRatio(subproc_number, config_.parallelism); + + if (wait_ratio < -0.1f) { + fprintf (stderr, "\nninja syslimits: system does not support PSI\n"); + return false; + } + + if (wait_ratio < -config_.max_load_average) + return true; + + if (g_syslimits) + fprintf (stderr, + "\nninja syslimits: wait_ratio %.0f >= %.0f; subprocs: %zu\n", + wait_ratio, -config_.max_load_average, subproc_number); + + return false; + } else + return true; } bool RealCommandRunner::StartCommand(Edge* edge) { diff --git a/src/debug_flags.cc b/src/debug_flags.cc index 44b14c483b..9a178e6540 100644 --- a/src/debug_flags.cc +++ b/src/debug_flags.cc @@ -14,6 +14,8 @@ bool g_explaining = false; +bool g_syslimits = false; + bool g_keep_depfile = false; bool g_keep_rsp = false; diff --git a/src/debug_flags.h b/src/debug_flags.h index e08a43b438..43199d988f 100644 --- a/src/debug_flags.h +++ b/src/debug_flags.h @@ -30,4 +30,6 @@ extern bool g_keep_rsp; extern bool g_experimental_statcache; +extern bool g_syslimits; + #endif // NINJA_EXPLAIN_H_ diff --git a/src/ninja.cc b/src/ninja.cc index 887d89f8d8..b752d869bb 100644 --- a/src/ninja.cc +++ b/src/ninja.cc @@ -228,7 +228,13 @@ void Usage(const BuildConfig& config) { "\n" " -j N run N jobs in parallel (0 means infinity) [default=%d on this system]\n" " -k N keep going until N jobs fail (0 means infinity) [default=1]\n" -" -l N do not start new jobs if the load average is greater than N\n" +" -l N do not start new jobs if system load is greater than N;\n" +" if N is positive,\n" +" then compare against system load average (absolute value);\n" +" if N is negative,\n" +" then compare against process stalled time (percentage);\n" +" e.g., -l-10 will not start new jobs if existing processes\n" +" spend, on average, 10%% of their time waiting for CPU slice;\n" " -n dry run (don't run commands but act like they succeeded)\n" "\n" " -d MODE enable debugging (use '-d list' to list modes)\n" @@ -1161,6 +1167,7 @@ bool DebugEnable(const string& name) { #ifdef _WIN32 " nostatcache don't batch stat() calls per directory and cache them\n" #endif +" syslimits print notes when parallelism is limited by system pressure\n" "multiple modes can be enabled via -d FOO -d BAR\n"); return false; } else if (name == "stats") { @@ -1178,11 +1185,14 @@ bool DebugEnable(const string& name) { } else if (name == "nostatcache") { g_experimental_statcache = false; return true; + } else if (name == "syslimits") { + g_syslimits = true; + return true; } else { const char* suggestion = SpellcheckString(name.c_str(), "stats", "explain", "keepdepfile", "keeprsp", - "nostatcache", NULL); + "nostatcache", "syslimits", NULL); if (suggestion) { Error("unknown debug setting '%s', did you mean '%s'?", name.c_str(), suggestion); diff --git a/src/util.cc b/src/util.cc index eefa3f50cd..900245850a 100644 --- a/src/util.cc +++ b/src/util.cc @@ -49,6 +49,7 @@ #include #elif defined(linux) || defined(__GLIBC__) #include +#include #include #include #include "string_piece_util.h" @@ -59,6 +60,7 @@ #endif #include "edit_distance.h" +#include "metrics.h" using namespace std; @@ -835,6 +837,94 @@ double GetLoadAverage() { } #endif // _WIN32 +double GetCPUWaitRatio(size_t subproc_number, int parallelism) { +#if defined(linux) || defined(__GLIBC__) + static double oncpu_ratio = 100.0f; + static uint64_t prev_stalled(0); + static int64_t prev_timestamp(0); + + // We use kernel's PSI infrastructure to calculate amount of time + // we are waiting for CPU. It would be great to just use 10-second + // average (avg10 below), but, unfortunately, that's too "slow" + // an average to provide satisfactory results. Using avg10 we will + // oscillate too far into overloading and underloading the system. + // Instead, we use raw total stalled count and divide it by time + // elapsed since previous measurement. + // + // The "total" units are microseconds, but documentation does not say + // whether it's cumulative across all CPUs or not. Apparently, it's + // not cumulative. IIUC, on an 8-core system if we have 6 processes + // running at 100% and another 2 stalled at 100% -- then every second + // the "total" stalled count will be increased by 1000000 [microseconds]. + // The count will be increased by the same 1000000 [microseconds] if all + // 8 processes are 100% stalled. + + ifstream cpupressure("/sys/fs/cgroup/cpu.pressure", ifstream::in); + string token; + uint64_t stalled(0); + bool psi_ok(false); + while (cpupressure >> token) { + // Extract "total" from + // some avg10=0.01 avg60=4.76 avg300=6.17 total=11527181835 + if (token == "some") { + cpupressure >> token; // avg10= + cpupressure >> token; // avg60= + cpupressure >> token; // avg300= + cpupressure >> token; // total= + + // Parse total=NUM + token = token.substr(token.find("=") + 1); + stalled = (uint64_t) strtoull(token.c_str(), NULL, 10); + psi_ok = true; + break; + } + } + + if (! psi_ok) + // Unsupported. + return -1.0f; + + // We could use micro-second HighResTimer(), if we wanted to, + // but milliseconds provide good-enough granularity. + int64_t timestamp = GetTimeMillis(); + + if (prev_timestamp == 0) { + prev_timestamp = timestamp; + prev_stalled = stalled; + return 0.0f; + } + + uint64_t stalled_ticks = stalled - prev_stalled; + uint64_t clock_ticks = 1000 * (timestamp - prev_timestamp); + + if (stalled_ticks < clock_ticks) { + // Clock advanced, so update oncpu_ratio with latest measurements. + // Pass new measurements through a simple noise filter. + oncpu_ratio *= ((double) subproc_number + / (subproc_number + 1)); + oncpu_ratio += ((100.0f * (clock_ticks - stalled_ticks) / clock_ticks) + / (subproc_number + 1)); + + if (0 < stalled_ticks) { + // Again, to reduce noise in oncpu_ratio we update prev_* values only + // we get a new "stalled" reading. + prev_timestamp = timestamp; + prev_stalled = stalled; + } + } else { + // Clock didn't advance, this usually happens during initial + // startup, when we start config_.parallelism tasks in rapid + // succession. Slightly reduce oncpu_ratio to throttle startup + // of new processes until we get an updated measurement. + oncpu_ratio *= (double) parallelism / (parallelism + 1); + } + + return 100.0f - oncpu_ratio; +#else + return -1.0f; +#endif +} + string ElideMiddle(const string& str, size_t width) { switch (width) { case 0: return ""; diff --git a/src/util.h b/src/util.h index 4a7fea2258..f9d730cd6b 100644 --- a/src/util.h +++ b/src/util.h @@ -103,6 +103,10 @@ int GetProcessorCount(); /// on error. double GetLoadAverage(); +/// @return percentage of time tasks are waiting for CPU. +/// A negative value is returned for unsupported platforms. +double GetCPUWaitRatio(size_t subproc_number, int parallelism); + /// Elide the given string @a str with '...' in the middle if the length /// exceeds @a width. std::string ElideMiddle(const std::string& str, size_t width);