From 3c4547b66603995c15234e02cd550c4464af55fb Mon Sep 17 00:00:00 2001 From: ptaylor Date: Thu, 24 Oct 2024 15:08:05 -0700 Subject: [PATCH 1/6] consistently report job_id and optionally server_id in scheduler and server logs --- src/bin/sccache-dist/build.rs | 189 ++++++++++++++++++-------- src/bin/sccache-dist/build_freebsd.rs | 113 ++++++++++----- src/bin/sccache-dist/main.rs | 94 +++++++++---- src/dist/http.rs | 27 +++- src/dist/mod.rs | 1 + src/server.rs | 19 ++- 6 files changed, 314 insertions(+), 129 deletions(-) diff --git a/src/bin/sccache-dist/build.rs b/src/bin/sccache-dist/build.rs index 815674661..cf6874644 100644 --- a/src/bin/sccache-dist/build.rs +++ b/src/bin/sccache-dist/build.rs @@ -17,8 +17,8 @@ use flate2::read::GzDecoder; use fs_err as fs; use libmount::Overlay; use sccache::dist::{ - BuildResult, BuilderIncoming, CompileCommand, InputsReader, OutputData, ProcessOutput, TcCache, - Toolchain, + BuildResult, BuilderIncoming, CompileCommand, InputsReader, JobId, OutputData, ProcessOutput, + TcCache, Toolchain, }; use sccache::lru_disk_cache::Error as LruError; use std::collections::{hash_map, HashMap}; @@ -167,6 +167,7 @@ impl OverlayBuilder { fn prepare_overlay_dirs( &self, + job_id: JobId, tc: &Toolchain, tccache: &Mutex, ) -> Result { @@ -186,7 +187,11 @@ impl OverlayBuilder { entry.build_count += 1; entry.clone() } else { - trace!("Creating toolchain directory for {}", tc.archive_id); + trace!( + "[prepare_overlay_dirs({})]: Creating toolchain directory for {}", + job_id, + tc.archive_id + ); fs::create_dir(&toolchain_dir)?; let mut tccache = tccache.lock().unwrap(); @@ -203,7 +208,10 @@ impl OverlayBuilder { tar::Archive::new(GzDecoder::new(toolchain_rdr)) .unpack(&toolchain_dir) .or_else(|e| { - warn!("Failed to unpack toolchain: {:?}", e); + warn!( + "[prepare_overlay_dirs({})]: Failed to unpack toolchain: {:?}", + job_id, e + ); fs::remove_dir_all(&toolchain_dir) .context("Failed to remove unpacked toolchain")?; tccache @@ -229,9 +237,9 @@ impl OverlayBuilder { entries.sort_by(|a, b| (a.1).ctime.cmp(&(b.1).ctime)); entries.truncate(entries.len() / 2); for (tc, _) in entries { - warn!("Removing old un-compressed toolchain: {:?}", tc); + warn!("[prepare_overlay_dirs({})]: Removing old un-compressed toolchain: {:?}", job_id, tc); assert!(toolchain_dir_map.remove(tc).is_some()); - fs::remove_dir_all(&self.dir.join("toolchains").join(&tc.archive_id)) + fs::remove_dir_all(self.dir.join("toolchains").join(&tc.archive_id)) .context("Failed to remove old toolchain directory")?; } } @@ -239,7 +247,12 @@ impl OverlayBuilder { } }; - trace!("Creating build directory for {}-{}", tc.archive_id, id); + trace!( + "[prepare_overlay_dirs({})]: Creating build directory for {}-{}", + job_id, + tc.archive_id, + id + ); let build_dir = self .dir .join("builds") @@ -252,15 +265,21 @@ impl OverlayBuilder { } fn perform_build( + job_id: JobId, bubblewrap: &Path, compile_command: CompileCommand, inputs_rdr: InputsReader, output_paths: Vec, overlay: &OverlaySpec, ) -> Result { - trace!("Compile environment: {:?}", compile_command.env_vars); trace!( - "Compile command: {:?} {:?}", + "[perform_build({})]: Compile environment: {:?}", + job_id, + compile_command.env_vars + ); + trace!( + "[perform_build({})]: Compile command: {:?} {:?}", + job_id, compile_command.executable, compile_command.arguments ); @@ -306,7 +325,7 @@ impl OverlayBuilder { .mount() .map_err(|e| anyhow!("Failed to mount overlay FS: {}", e.to_string()))?; - trace!("copying in inputs"); + trace!("[perform_build({})]: copying 
in inputs", job_id); // Note that we don't unpack directly into the upperdir since there overlayfs has some // special marker files that we don't want to create by accident (or malicious intent) tar::Archive::new(inputs_rdr) @@ -321,7 +340,7 @@ impl OverlayBuilder { } = compile_command; let cwd = Path::new(&cwd); - trace!("creating output directories"); + trace!("[perform_build({})]: creating output directories", job_id); fs::create_dir_all(join_suffix(&target_dir, cwd)) .context("Failed to create cwd")?; for path in output_paths.iter() { @@ -335,7 +354,7 @@ impl OverlayBuilder { .context("Failed to create an output directory")?; } - trace!("performing compile"); + trace!("[perform_build({})]: performing compile", job_id); // Bubblewrap notes: // - We're running as uid 0 (to do the mounts above), and so bubblewrap is run as uid 0 // - There's special handling in bubblewrap to compare uid and euid - of interest to us, @@ -370,7 +389,10 @@ impl OverlayBuilder { for (k, v) in env_vars { if k.contains('=') { - warn!("Skipping environment variable: {:?}", k); + warn!( + "[perform_build({})]: Skipping environment variable: {:?}", + job_id, k + ); continue; } cmd.arg("--setenv").arg(k).arg(v); @@ -381,10 +403,14 @@ impl OverlayBuilder { let compile_output = cmd .output() .context("Failed to retrieve output from compile")?; - trace!("compile_output: {:?}", compile_output); + trace!( + "[perform_build({})]: compile_output: {:?}", + job_id, + compile_output + ); let mut outputs = vec![]; - trace!("retrieving {:?}", output_paths); + trace!("[perform_build({})]: retrieving {:?}", job_id, output_paths); for path in output_paths { let abspath = join_suffix(&target_dir, cwd.join(&path)); // Resolve in case it's relative since we copy it from the root level match fs::File::open(abspath) { @@ -395,7 +421,10 @@ impl OverlayBuilder { } Err(e) => { if e.kind() == io::ErrorKind::NotFound { - debug!("Missing output path {:?}", path) + debug!( + "[perform_build({})]: Missing output path {:?}", + job_id, path + ) } else { return Err( Error::from(e).context("Failed to open output file") @@ -419,7 +448,7 @@ impl OverlayBuilder { // Failing during cleanup is pretty unexpected, but we can still return the successful compile // TODO: if too many of these fail, we should mark this builder as faulty - fn finish_overlay(&self, _tc: &Toolchain, overlay: OverlaySpec) { + fn finish_overlay(&self, job_id: JobId, _tc: &Toolchain, overlay: OverlaySpec) { // TODO: collect toolchain directories let OverlaySpec { @@ -428,7 +457,8 @@ impl OverlayBuilder { } = overlay; if let Err(e) = fs::remove_dir_all(&build_dir) { error!( - "Failed to remove build directory {}: {}", + "[finish_overlay({})]: Failed to remove build directory {}: {}", + job_id, build_dir.display(), e ); @@ -439,21 +469,29 @@ impl OverlayBuilder { impl BuilderIncoming for OverlayBuilder { fn run_build( &self, + job_id: JobId, tc: Toolchain, command: CompileCommand, outputs: Vec, inputs_rdr: InputsReader, tccache: &Mutex, ) -> Result { - debug!("Preparing overlay"); + debug!("[run_build({})]: Preparing overlay", job_id); let overlay = self - .prepare_overlay_dirs(&tc, tccache) + .prepare_overlay_dirs(job_id, &tc, tccache) .context("failed to prepare overlay dirs")?; - debug!("Performing build in {:?}", overlay); - let res = Self::perform_build(&self.bubblewrap, command, inputs_rdr, outputs, &overlay); - debug!("Finishing with overlay"); - self.finish_overlay(&tc, overlay); - debug!("Returning result"); + debug!("[run_build({})]: Performing build in {:?}", job_id, 
overlay); + let res = Self::perform_build( + job_id, + &self.bubblewrap, + command, + inputs_rdr, + outputs, + &overlay, + ); + debug!("[run_build({})]: Finishing with overlay", job_id); + self.finish_overlay(job_id, &tc, overlay); + debug!("[run_build({})]: Returning result", job_id); res.context("Compilation execution failed") } } @@ -572,7 +610,12 @@ impl DockerBuilder { // If we have a spare running container, claim it and remove it from the available list, // otherwise try and create a new container (possibly creating the Docker image along // the way) - fn get_container(&self, tc: &Toolchain, tccache: &Mutex) -> Result { + fn get_container( + &self, + job_id: JobId, + tc: &Toolchain, + tccache: &Mutex, + ) -> Result { let container = { let mut map = self.container_lists.lock().unwrap(); map.entry(tc.clone()).or_default().pop() @@ -587,8 +630,8 @@ impl DockerBuilder { match map.entry(tc.clone()) { hash_map::Entry::Occupied(e) => e.get().clone(), hash_map::Entry::Vacant(e) => { - info!("Creating Docker image for {:?} (may block requests)", tc); - let image = Self::make_image(tc, tccache)?; + info!("[get_container({})]: Creating Docker image for {:?} (may block requests)", job_id, tc); + let image = Self::make_image(job_id, tc, tccache)?; e.insert(image.clone()); image } @@ -599,7 +642,7 @@ impl DockerBuilder { } } - fn clean_container(&self, cid: &str) -> Result<()> { + fn clean_container(&self, job_id: JobId, cid: &str) -> Result<()> { // Clean up any running processes Command::new("docker") .args(["exec", cid, "/busybox", "kill", "-9", "-1"]) @@ -645,7 +688,10 @@ impl DockerBuilder { .check_run() { // We do a final check anyway, so just continue - warn!("Failed to remove added path in a container: {}", e) + warn!( + "[clean_container({})]: Failed to remove added path in a container: {}", + job_id, e + ) } } @@ -664,15 +710,18 @@ impl DockerBuilder { // Failing during cleanup is pretty unexpected, but we can still return the successful compile // TODO: if too many of these fail, we should mark this builder as faulty - fn finish_container(&self, tc: &Toolchain, cid: String) { + fn finish_container(&self, job_id: JobId, tc: &Toolchain, cid: String) { // TODO: collect images - if let Err(e) = self.clean_container(&cid) { - info!("Failed to clean container {}: {}", cid, e); + if let Err(e) = self.clean_container(job_id, &cid) { + info!( + "[finish_container({})]: Failed to clean container {}: {}", + job_id, cid, e + ); if let Err(e) = docker_rm(&cid) { warn!( - "Failed to remove container {} after failed clean: {}", - cid, e + "[finish_container({})]: Failed to remove container {} after failed clean: {}", + job_id, cid, e ); } return; @@ -680,20 +729,26 @@ impl DockerBuilder { // Good as new, add it back to the container list if let Some(entry) = self.container_lists.lock().unwrap().get_mut(tc) { - debug!("Reclaimed container {}", cid); + debug!( + "[finish_container({})]: Reclaimed container {}", + job_id, cid + ); entry.push(cid) } else { warn!( - "Was ready to reclaim container {} but toolchain went missing", - cid + "[finish_container({})]: Was ready to reclaim container {} but toolchain went missing", + job_id, cid ); if let Err(e) = docker_rm(&cid) { - warn!("Failed to remove container {}: {}", cid, e); + warn!( + "[finish_container({})]: Failed to remove container {}: {}", + job_id, cid, e + ); } } } - fn make_image(tc: &Toolchain, tccache: &Mutex) -> Result { + fn make_image(job_id: JobId, tc: &Toolchain, tccache: &Mutex) -> Result { let cid = Command::new("docker") 
.args(["create", BASE_DOCKER_IMAGE, "/busybox", "true"]) .check_stdout_trim() @@ -711,7 +766,7 @@ impl DockerBuilder { } }; - trace!("Copying in toolchain"); + trace!("[make_image({})]: Copying in toolchain", job_id); Command::new("docker") .args(["cp", "-", &format!("{}:/", cid)]) .check_piped(&mut |stdin| { @@ -751,19 +806,25 @@ impl DockerBuilder { } fn perform_build( + job_id: JobId, compile_command: CompileCommand, mut inputs_rdr: InputsReader, output_paths: Vec, cid: &str, ) -> Result { - trace!("Compile environment: {:?}", compile_command.env_vars); trace!( - "Compile command: {:?} {:?}", + "[perform_build({})]: Compile environment: {:?}", + job_id, + compile_command.env_vars + ); + trace!( + "[perform_build({})]: Compile command: {:?} {:?}", + job_id, compile_command.executable, compile_command.arguments ); - trace!("copying in inputs"); + trace!("[perform_build({})]: copying in inputs", job_id); Command::new("docker") .args(["cp", "-", &format!("{}:/", cid)]) .check_piped(&mut |stdin| { @@ -781,7 +842,7 @@ impl DockerBuilder { } = compile_command; let cwd = Path::new(&cwd); - trace!("creating output directories"); + trace!("[perform_build({})]: creating output directories", job_id); assert!(!output_paths.is_empty()); let mut cmd = Command::new("docker"); cmd.args(["exec", cid, "/busybox", "mkdir", "-p"]).arg(cwd); @@ -797,13 +858,16 @@ impl DockerBuilder { cmd.check_run() .context("Failed to create directories required for compile in container")?; - trace!("performing compile"); + trace!("[perform_build({})]: performing compile", job_id); // TODO: likely shouldn't perform the compile as root in the container let mut cmd = Command::new("docker"); cmd.arg("exec"); for (k, v) in env_vars { if k.contains('=') { - warn!("Skipping environment variable: {:?}", k); + warn!( + "[perform_build({})]: Skipping environment variable: {:?}", + job_id, k + ); continue; } let mut env = k; @@ -818,10 +882,14 @@ impl DockerBuilder { cmd.arg(executable); cmd.args(arguments); let compile_output = cmd.output().context("Failed to start executing compile")?; - trace!("compile_output: {:?}", compile_output); + trace!( + "[perform_build({})]: compile_output: {:?}", + job_id, + compile_output + ); let mut outputs = vec![]; - trace!("retrieving {:?}", output_paths); + trace!("[perform_build({})]: retrieving {:?}", job_id, output_paths); for path in output_paths { let abspath = cwd.join(&path); // Resolve in case it's relative since we copy it from the root level // TODO: this isn't great, but cp gives it out as a tar @@ -835,7 +903,10 @@ impl DockerBuilder { .expect("Failed to read compress output stdout"); outputs.push((path, output)) } else { - debug!("Missing output path {:?}", path) + debug!( + "[perform_build({})]: Missing output path {:?}", + job_id, path + ) } } @@ -852,22 +923,26 @@ impl BuilderIncoming for DockerBuilder { // From Server fn run_build( &self, + job_id: JobId, tc: Toolchain, command: CompileCommand, outputs: Vec, inputs_rdr: InputsReader, tccache: &Mutex, ) -> Result { - debug!("Finding container"); + debug!("[run_build({})]: Finding container", job_id); let cid = self - .get_container(&tc, tccache) + .get_container(job_id, &tc, tccache) .context("Failed to get a container for build")?; - debug!("Performing build with container {}", cid); - let res = Self::perform_build(command, inputs_rdr, outputs, &cid) + debug!( + "[run_build({})]: Performing build with container {}", + job_id, cid + ); + let res = Self::perform_build(job_id, command, inputs_rdr, outputs, &cid) .context("Failed to 
perform build")?; - debug!("Finishing with container {}", cid); - self.finish_container(&tc, cid); - debug!("Returning result"); + debug!("[run_build({})]: Finishing with container {}", job_id, cid); + self.finish_container(job_id, &tc, cid); + debug!("[run_build({})]: Returning result", job_id); Ok(res) } } diff --git a/src/bin/sccache-dist/build_freebsd.rs b/src/bin/sccache-dist/build_freebsd.rs index 7dc6b4d55..3b139f878 100644 --- a/src/bin/sccache-dist/build_freebsd.rs +++ b/src/bin/sccache-dist/build_freebsd.rs @@ -15,8 +15,8 @@ use anyhow::{bail, Context, Error, Result}; use flate2::read::GzDecoder; use sccache::dist::{ - BuildResult, BuilderIncoming, CompileCommand, InputsReader, OutputData, ProcessOutput, TcCache, - Toolchain, + BuildResult, BuilderIncoming, CompileCommand, InputsReader, JobId, OutputData, ProcessOutput, + TcCache, Toolchain, }; use sccache::lru_disk_cache::Error as LruError; use std::collections::{hash_map, HashMap}; @@ -146,7 +146,12 @@ impl PotBuilder { // If we have a spare running container, claim it and remove it from the available list, // otherwise try and create a new container (possibly creating the Pot image along // the way) - fn get_container(&self, tc: &Toolchain, tccache: &Mutex) -> Result { + fn get_container( + &self, + job_id: JobId, + tc: &Toolchain, + tccache: &Mutex, + ) -> Result { let container = { let mut map = self.container_lists.lock().unwrap(); map.entry(tc.clone()).or_insert_with(Vec::new).pop() @@ -161,8 +166,9 @@ impl PotBuilder { match map.entry(tc.clone()) { hash_map::Entry::Occupied(e) => e.get().clone(), hash_map::Entry::Vacant(e) => { - info!("Creating pot image for {:?} (may block requests)", tc); + info!("[get_container({})]: Creating pot image for {:?} (may block requests)", job_id, tc); let image = Self::make_image( + job_id, tc, tccache, &self.pot_fs_root, @@ -201,17 +207,21 @@ impl PotBuilder { // Failing during cleanup is pretty unexpected, but we can still return the successful compile // TODO: if too many of these fail, we should mark this builder as faulty fn finish_container( + job_id: JobId, container_lists: Arc>>>, tc: Toolchain, cid: String, pot_cmd: &PathBuf, ) { if let Err(e) = Self::clean_container(&cid) { - info!("Failed to clean container {}: {}", cid, e); + info!( + "[finish_container({})]: Failed to clean container {}: {}", + job_id, cid, e + ); if let Err(e) = pot_rm(&cid, pot_cmd) { warn!( - "Failed to remove container {} after failed clean: {}", - cid, e + "[finish_container({})]: Failed to remove container {} after failed clean: {}", + job_id, cid, e ); } return; @@ -219,20 +229,27 @@ impl PotBuilder { // Good as new, add it back to the container list if let Some(entry) = container_lists.lock().unwrap().get_mut(&tc) { - debug!("Reclaimed container {}", cid); + debug!( + "[finish_container({})]: Reclaimed container {}", + job_id, cid + ); entry.push(cid) } else { warn!( - "Was ready to reclaim container {} but toolchain went missing", - cid + "[finish_container({})]: Was ready to reclaim container {} but toolchain went missing", + job_id, cid ); if let Err(e) = pot_rm(&cid, pot_cmd) { - warn!("Failed to remove container {}: {}", cid, e); + warn!( + "[finish_container({})]: Failed to remove container {}: {}", + job_id, cid, e + ); } } } fn make_image( + job_id: JobId, tc: &Toolchain, tccache: &Mutex, pot_fs_root: &Path, @@ -241,7 +258,11 @@ impl PotBuilder { pot_clone_args: &[String], ) -> Result { let imagename = format!("sccache-image-{}", &tc.archive_id); - trace!("Creating toolchain image: {}", 
imagename); + trace!( + "[make_image({})]: Creating toolchain image: {}", + job_id, + imagename + ); let mut clone_args: Vec<&str> = ["clone", "-p", &imagename, "-P", clone_from].to_vec(); clone_args.append(&mut pot_clone_args.iter().map(|s| s as &str).collect()); Command::new(pot_cmd) @@ -258,11 +279,14 @@ impl PotBuilder { Err(e) => return Err(Error::from(e).context("failed to get toolchain from cache")), }; - trace!("Copying in toolchain"); + trace!("[make_image({})]: Copying in toolchain", job_id); tar::Archive::new(GzDecoder::new(toolchain_rdr)) .unpack(pot_fs_root.join("jails").join(&imagename).join("m")) .or_else(|e| { - warn!("Failed to unpack toolchain: {:?}", e); + warn!( + "[make_image({})]: Failed to unpack toolchain: {:?}", + job_id, e + ); tccache .remove(tc) .context("Failed to remove corrupt toolchain")?; @@ -303,20 +327,26 @@ impl PotBuilder { } fn perform_build( + job_id: JobId, compile_command: CompileCommand, inputs_rdr: InputsReader, output_paths: Vec, cid: &str, pot_fs_root: &Path, ) -> Result { - trace!("Compile environment: {:?}", compile_command.env_vars); trace!( - "Compile command: {:?} {:?}", + "[perform_build({})]: Compile environment: {:?}", + job_id, + compile_command.env_vars + ); + trace!( + "[perform_build({})]: Compile command: {:?} {:?}", + job_id, compile_command.executable, compile_command.arguments ); - trace!("copying in inputs"); + trace!("[perform_build({})]: copying in inputs", job_id); // not elegant tar::Archive::new(inputs_rdr) .unpack(pot_fs_root.join("jails").join(cid).join("m")) @@ -330,7 +360,7 @@ impl PotBuilder { } = compile_command; let cwd = Path::new(&cwd); - trace!("creating output directories"); + trace!("[perform_build({})]: creating output directories", job_id); assert!(!output_paths.is_empty()); let mut cmd = Command::new("jexec"); cmd.args(&[cid, "mkdir", "-p"]).arg(cwd); @@ -346,14 +376,17 @@ impl PotBuilder { cmd.check_run() .context("Failed to create directories required for compile in container")?; - trace!("performing compile"); + trace!("[perform_build({})]: performing compile", job_id); // TODO: likely shouldn't perform the compile as root in the container let mut cmd = Command::new("jexec"); cmd.arg(cid); cmd.arg("env"); for (k, v) in env_vars { if k.contains('=') { - warn!("Skipping environment variable: {:?}", k); + warn!( + "[perform_build({})]: Skipping environment variable: {:?}", + job_id, k + ); continue; } let mut env = k; @@ -368,10 +401,14 @@ impl PotBuilder { cmd.arg(executable); cmd.args(arguments); let compile_output = cmd.output().context("Failed to start executing compile")?; - trace!("compile_output: {:?}", compile_output); + trace!( + "[perform_build({})]: compile_output: {:?}", + job_id, + compile_output + ); let mut outputs = vec![]; - trace!("retrieving {:?}", output_paths); + trace!("[perform_build({})]: retrieving {:?}", job_id, output_paths); for path in output_paths { let abspath = cwd.join(&path); // Resolve in case it's relative since we copy it from the root level // TODO: this isn't great, but cp gives it out as a tar @@ -385,7 +422,10 @@ impl PotBuilder { .expect("Failed to read compress output stdout"); outputs.push((path, output)) } else { - debug!("Missing output path {:?}", path) + debug!( + "[perform_build({})]: Missing output path {:?}", + job_id, path + ) } } @@ -402,20 +442,31 @@ impl BuilderIncoming for PotBuilder { // From Server fn run_build( &self, + job_id: JobId, tc: Toolchain, command: CompileCommand, outputs: Vec, inputs_rdr: InputsReader, tccache: &Mutex, ) -> Result { - 
debug!("Finding container"); + debug!("[run_build({})]: Finding container", job_id); let cid = self - .get_container(&tc, tccache) + .get_container(job_id, &tc, tccache) .context("Failed to get a container for build")?; - debug!("Performing build with container {}", cid); - let res = Self::perform_build(command, inputs_rdr, outputs, &cid, &self.pot_fs_root) - .context("Failed to perform build")?; - debug!("Finishing with container {}", cid); + debug!( + "[run_build({})]: Performing build with container {}", + job_id, cid + ); + let res = Self::perform_build( + job_id, + command, + inputs_rdr, + outputs, + &cid, + &self.pot_fs_root, + ) + .context("Failed to perform build")?; + debug!("[run_build({})]: Finishing with container {}", job_id, cid); let cloned = self.clone(); let tc = tc; while cloned.cleanup_thread_count.fetch_add(1, Ordering::SeqCst) @@ -425,10 +476,10 @@ impl BuilderIncoming for PotBuilder { hint::spin_loop(); } thread::spawn(move || { - Self::finish_container(cloned.container_lists, tc, cid, &cloned.pot_cmd); + Self::finish_container(job_id, cloned.container_lists, tc, cid, &cloned.pot_cmd); cloned.cleanup_thread_count.fetch_sub(1, Ordering::SeqCst); }); - debug!("Returning result"); + debug!("[run_build({})]: Returning result", job_id); Ok(res) } } diff --git a/src/bin/sccache-dist/main.rs b/src/bin/sccache-dist/main.rs index 83d530060..f54d414ce 100644 --- a/src/bin/sccache-dist/main.rs +++ b/src/bin/sccache-dist/main.rs @@ -656,45 +656,76 @@ impl SchedulerIncoming for Scheduler { let mut jobs = self.jobs.lock().unwrap(); let mut servers = self.servers.lock().unwrap(); - if let btree_map::Entry::Occupied(mut entry) = jobs.entry(job_id) { - let job_detail = entry.get(); - if job_detail.server_id != server_id { + if let btree_map::Entry::Occupied(mut job) = jobs.entry(job_id) { + let cur_state = job.get().state; + + if job.get().server_id != server_id { bail!( - "Job id {} is not registered on server {:?}", + "[update_job_state({}, {})]: Job state updated from {:?} to {:?}, but job is not registered to server", job_id, - server_id + server_id.addr(), + cur_state, job_state ) } let now = Instant::now(); - let mut server_details = servers.get_mut(&server_id); - if let Some(ref mut details) = server_details { - details.last_seen = now; + + let server = match servers.get_mut(&server_id) { + Some(server) => { + server.last_seen = now; + server + } + None => { + let (job_id, _) = job.remove_entry(); + bail!( + "[update_job_state({}, {})]: Job state updated from {:?} to {:?}, but server is not known to scheduler", + job_id, server_id.addr(), cur_state, job_state + ) + } }; - match (job_detail.state, job_state) { - (JobState::Pending, JobState::Ready) => entry.get_mut().state = job_state, + match (cur_state, job_state) { + (JobState::Pending, JobState::Ready) => { + // Update the job's `last_seen` time to ensure it isn't + // pruned for taking longer than UNCLAIMED_READY_TIMEOUT + server.jobs_unclaimed.entry(job_id).and_modify(|e| *e = now); + job.get_mut().state = job_state; + } (JobState::Ready, JobState::Started) => { - if let Some(details) = server_details { - details.jobs_unclaimed.remove(&job_id); - } else { - warn!("Job state updated, but server is not known to scheduler") - } - entry.get_mut().state = job_state + server.jobs_unclaimed.remove(&job_id); + job.get_mut().state = job_state; } (JobState::Started, JobState::Complete) => { - let (job_id, _) = entry.remove_entry(); - if let Some(entry) = server_details { - assert!(entry.jobs_assigned.remove(&job_id)) - } else { - 
bail!("Job was marked as finished, but server is not known to scheduler") + let (job_id, _) = job.remove_entry(); + if !server.jobs_assigned.remove(&job_id) { + bail!( + "[update_job_state({}, {})]: Job was marked as finished, but job is not known to scheduler", + job_id, server_id.addr() + ) } } - (from, to) => bail!("Invalid job state transition from {} to {}", from, to), + (from, to) => bail!( + "[update_job_state({}, {})]: Invalid job state transition from {:?} to {:?}", + job_id, + server_id.addr(), + from, + to, + ), } - info!("Job {} updated state to {:?}", job_id, job_state); + info!( + "[update_job_state({}, {})]: Job state updated from {:?} to {:?}", + job_id, + server_id.addr(), + cur_state, + job_state + ); } else { - bail!("Unknown job") + bail!( + "[update_job_state({}, {})]: Cannot update unknown job state to {:?}", + job_id, + server_id.addr(), + job_state + ) } Ok(UpdateJobStateResult::Success) } @@ -739,12 +770,19 @@ impl Server { impl ServerIncoming for Server { fn handle_assign_job(&self, job_id: JobId, tc: Toolchain) -> Result { let need_toolchain = !self.cache.lock().unwrap().contains_toolchain(&tc); - assert!(self + if let Some(other_tc) = self .job_toolchains .lock() .unwrap() - .insert(job_id, tc) - .is_none()); + .insert(job_id, tc.clone()) + { + bail!( + "[{}]: Failed to replace toolchain {:?} with {:?}", + job_id, + other_tc, + tc + ); + }; let state = if need_toolchain { JobState::Pending } else { @@ -800,7 +838,7 @@ impl ServerIncoming for Server { Some(tc) => { match self .builder - .run_build(tc, command, outputs, inputs_rdr, &self.cache) + .run_build(job_id, tc, command, outputs, inputs_rdr, &self.cache) { Err(e) => Err(e.context("run build failed")), Ok(res) => Ok(RunJobResult::Complete(JobComplete { diff --git a/src/dist/http.rs b/src/dist/http.rs index 621fe8277..27be7afeb 100644 --- a/src/dist/http.rs +++ b/src/dist/http.rs @@ -973,14 +973,14 @@ mod server { (POST) (/api/v1/distserver/assign_job/{job_id: JobId}) => { job_auth_or_401!(request, &job_authorizer, job_id); let toolchain = try_or_400_log!(req_id, bincode_input(request)); - trace!("Req {}: assign_job({}): {:?}", req_id, job_id, toolchain); + debug!("Req {}: assign_job({}): {:?}", req_id, job_id, toolchain); let res: AssignJobResult = try_or_500_log!(req_id, handler.handle_assign_job(job_id, toolchain)); prepare_response(request, &res) }, (POST) (/api/v1/distserver/submit_toolchain/{job_id: JobId}) => { job_auth_or_401!(request, &job_authorizer, job_id); - trace!("Req {}: submit_toolchain({})", req_id, job_id); + debug!("Req {}: submit_toolchain({})", req_id, job_id); let body = request.data().expect("body was already read in submit_toolchain"); let toolchain_rdr = ToolchainReader(Box::new(body)); @@ -997,7 +997,28 @@ mod server { let mut bincode_reader = body.take(bincode_length); let runjob = try_or_500_log!(req_id, bincode::deserialize_from(&mut bincode_reader) .context("failed to deserialize run job request")); - trace!("Req {}: run_job({}): {:?}", req_id, job_id, runjob); + + if log_enabled!(log::Level::Trace) { + trace!("Req {}: run_job({}): {:?}", req_id, job_id, runjob); + } else if log_enabled!(log::Level::Debug) { + let RunJobHttpRequest { command, outputs: _ } = &runjob; + let dist::CompileCommand { + env_vars: _, + executable, + arguments, + cwd + } = &command; + debug!( + "Req {}: run_job({}): cwd={:?}, cmd={:?}", + req_id, + job_id, + cwd, + [vec![executable.clone()], arguments.to_vec()] + .concat() + .join(" ") + ); + } + let RunJobHttpRequest { command, outputs } = runjob; let 
body = bincode_reader.into_inner(); let inputs_rdr = InputsReader(Box::new(ZlibReadDecoder::new(body))); diff --git a/src/dist/mod.rs b/src/dist/mod.rs index 93a6859cf..b96c230b5 100644 --- a/src/dist/mod.rs +++ b/src/dist/mod.rs @@ -706,6 +706,7 @@ pub trait BuilderIncoming: Send + Sync { // From Server fn run_build( &self, + job_id: JobId, toolchain: Toolchain, command: CompileCommand, outputs: Vec, diff --git a/src/server.rs b/src/server.rs index 03c2b6a63..cc296f302 100644 --- a/src/server.rs +++ b/src/server.rs @@ -56,7 +56,6 @@ use std::task::{Context, Poll, Waker}; use std::time::Duration; #[cfg(feature = "dist-client")] use std::time::Instant; -use std::u64; use tokio::sync::Mutex; use tokio::sync::RwLock; use tokio::{ @@ -1278,18 +1277,18 @@ where let mut stats = me.stats.lock().await; match compiled { CompileResult::Error => { - debug!("compile result: cache error"); + debug!("[{}]: compile result: cache error", out_pretty); stats.cache_errors.increment(&kind, &lang); } CompileResult::CacheHit(duration) => { - debug!("compile result: cache hit"); + debug!("[{}]: compile result: cache hit", out_pretty); stats.cache_hits.increment(&kind, &lang); stats.cache_read_hit_duration += duration; } CompileResult::CacheMiss(miss_type, dist_type, duration, future) => { - debug!("compile result: cache miss"); + debug!("[{}]: compile result: cache miss", out_pretty); match dist_type { DistType::NoDist => {} @@ -1315,17 +1314,17 @@ where } stats.cache_misses.increment(&kind, &lang); stats.compiler_write_duration += duration; - debug!("stats after compile result: {stats:?}"); + trace!("[{}]: stats after compile result: {:?}", out_pretty, stats); cache_write = Some(future); } CompileResult::NotCacheable => { - debug!("compile result: not cacheable"); + debug!("[{}]: compile result: not cacheable", out_pretty); stats.cache_misses.increment(&kind, &lang); stats.non_cacheable_compilations += 1; } CompileResult::CompileFailed => { - debug!("compile result: compile failed"); + debug!("[{}]: compile result: compile failed", out_pretty); stats.compile_fails += 1; } @@ -1338,7 +1337,7 @@ where stdout, stderr, } = out; - trace!("CompileFinished retcode: {}", status); + debug!("[{}]: CompileFinished retcode: {}", out_pretty, status); match status.code() { Some(code) => res.retcode = Some(code), None => res.signal = Some(get_signal(status)), @@ -1350,7 +1349,7 @@ where let mut stats = me.stats.lock().await; match err.downcast::() { Ok(ProcessError(output)) => { - debug!("Compilation failed: {:?}", output); + debug!("[{}]: Compilation failed: {:?}", out_pretty, output); stats.compile_fails += 1; match output.status.code() { Some(code) => res.retcode = Some(code), @@ -1397,7 +1396,7 @@ where if let Some(cache_write) = cache_write { match cache_write.await { Err(e) => { - debug!("Error executing cache write: {}", e); + debug!("[{}]: Error executing cache write: {}", out_pretty, e); me.stats.lock().await.cache_write_errors += 1; } //TODO: save cache stats! 
From df2e4a1e7a4619db14fc23c5e85b36c7058d9fdd Mon Sep 17 00:00:00 2001 From: ptaylor Date: Thu, 24 Oct 2024 15:10:59 -0700 Subject: [PATCH 2/6] ensure client tracks certs by server_id, and both the scheduler and client remove certs when a server's cert changes --- src/dist/http.rs | 20 ++++++++++++++------ 1 file changed, 14 insertions(+), 6 deletions(-) diff --git a/src/dist/http.rs b/src/dist/http.rs index 27be7afeb..96ee1a8d6 100644 --- a/src/dist/http.rs +++ b/src/dist/http.rs @@ -736,6 +736,8 @@ mod server { reqwest::Certificate::from_pem(&cert_pem) .context("failed to interpret pem as certificate")?, ); + // Remove the old entry first so it isn't added to the client in the following loop + certs.remove(&server_id); for (_, cert_pem) in certs.values() { client_builder = client_builder.add_root_certificate( reqwest::Certificate::from_pem(cert_pem).expect("previously valid cert"), @@ -1106,7 +1108,7 @@ mod client { auth_token: String, scheduler_url: reqwest::Url, // cert_digest -> cert_pem - server_certs: Arc, Vec>>>, + server_certs: Arc, Vec)>>>, client: Arc>, pool: tokio::runtime::Handle, tc_cache: Arc, @@ -1149,7 +1151,8 @@ mod client { fn update_certs( client: &mut reqwest::Client, - certs: &mut HashMap, Vec>, + certs: &mut HashMap, Vec)>, + server_id: dist::ServerId, cert_digest: Vec, cert_pem: Vec, ) -> Result<()> { @@ -1159,7 +1162,9 @@ mod client { reqwest::Certificate::from_pem(&cert_pem) .context("failed to interpret pem as certificate")?, ); - for cert_pem in certs.values() { + // Remove the old entry first so it isn't added to the client in the following loop + certs.remove(&server_id); + for (_, cert_pem) in certs.values() { client_async_builder = client_async_builder.add_root_certificate( reqwest::Certificate::from_pem(cert_pem).expect("previously valid cert"), ); @@ -1174,7 +1179,7 @@ mod client { .context("failed to create an async HTTP client")?; // Use the updated certificates *client = new_client_async; - certs.insert(cert_digest, cert_pem); + certs.insert(server_id, (cert_digest, cert_pem)); Ok(()) } } @@ -1201,8 +1206,10 @@ mod client { job_alloc, need_toolchain, }); - if server_certs.lock().unwrap().contains_key(&cert_digest) { - return alloc_job_res; + if let Some((digest, _)) = server_certs.lock().unwrap().get(&server_id) { + if cert_digest == *digest { + return alloc_job_res; + } } info!( "Need to request new certificate for server {}", @@ -1227,6 +1234,7 @@ mod client { Self::update_certs( &mut client.lock().unwrap(), &mut server_certs.lock().unwrap(), + server_id, res.cert_digest, res.cert_pem, ) From fe83892a493565e7bb732d049cf8defb25bf9c6c Mon Sep 17 00:00:00 2001 From: ptaylor Date: Thu, 24 Oct 2024 15:12:01 -0700 Subject: [PATCH 3/6] rewrite scheduler handle_alloc_job to be resilient to server errors and try other candidates instead of failing --- src/bin/sccache-dist/main.rs | 324 +++++++++++++++++++++++------------ 1 file changed, 213 insertions(+), 111 deletions(-) diff --git a/src/bin/sccache-dist/main.rs b/src/bin/sccache-dist/main.rs index f54d414ce..c87ecb043 100644 --- a/src/bin/sccache-dist/main.rs +++ b/src/bin/sccache-dist/main.rs @@ -3,6 +3,7 @@ extern crate log; use anyhow::{bail, Context, Error, Result}; use base64::Engine; +use itertools::Itertools; use rand::{rngs::OsRng, RngCore}; use sccache::config::{ scheduler as scheduler_config, server as server_config, INSECURE_DIST_CLIENT_TOKEN, @@ -399,141 +400,242 @@ impl Default for Scheduler { } } +fn error_chain_to_string(err: &Error) -> String { + let mut err_msg = err.to_string(); + let mut 
maybe_cause = err.source(); + while let Some(cause) = maybe_cause { + err_msg.push_str(", caused by: "); + err_msg.push_str(&cause.to_string()); + maybe_cause = cause.source(); + } + err_msg +} + impl SchedulerIncoming for Scheduler { fn handle_alloc_job( &self, requester: &dyn SchedulerOutgoing, tc: Toolchain, ) -> Result { - let (job_id, server_id, auth) = { + // Attempt to allocate a job to the best server. The best server is the server + // with the fewest assigned jobs and least-recently-reported error. Servers + // whose load exceeds `MAX_PER_CORE_LOAD` are not considered candidates for + // job assignment. + // + // If we fail to assign a job to a server, attempt to assign the job to the next + // best candidate until either the job has been assigned successfully, or the + // candidate list has been exhausted. + // + // Special care is taken to not lock `self.servers` or `self.jobs` while network + // requests are in-flight, as that will block other request-handling threads and + // deadlock the scheduler. + // + // Do not assert!() anywhere, as that permanently corrupts the scheduler. + // All error conditions must fail gracefully. + + let make_auth_token = |job_id: JobId, server_id: ServerId| { // LOCKS let mut servers = self.servers.lock().unwrap(); + if let Some(details) = servers.get_mut(&server_id) { + let auth = details + .job_authorizer + .generate_token(job_id) + .map_err(Error::from) + .context("Could not create an auth token")?; + + // + // Eagerly associate this job with the server so other threads consider this job + // when computing load for this server, and potentially select another server for + // assignment. + // + + // Throw an error if a job with the same ID has already been assigned to this server. + if details.jobs_assigned.contains(&job_id) + || details.jobs_unclaimed.contains_key(&job_id) + { + bail!("Failed to assign job to server {}", server_id.addr()); + } - let res = { - let mut best = None; - let mut best_err = None; - let mut best_load: f64 = MAX_PER_CORE_LOAD; - let now = Instant::now(); - for (&server_id, details) in servers.iter_mut() { - let load = details.jobs_assigned.len() as f64 / details.num_cpus as f64; + details.jobs_assigned.insert(job_id); + details.jobs_unclaimed.insert(job_id, Instant::now()); - if let Some(last_error) = details.last_error { - if load < MAX_PER_CORE_LOAD { - if now.duration_since(last_error) > SERVER_REMEMBER_ERROR_TIMEOUT { - details.last_error = None; - } - match best_err { - Some(( - _, - &mut ServerDetails { - last_error: Some(best_last_err), - .. 
- }, - )) => { - if last_error < best_last_err { - trace!( - "Selected {:?}, its most recent error is {:?} ago", - server_id, - now - last_error - ); - best_err = Some((server_id, details)); - } - } - _ => { - trace!( - "Selected {:?}, its most recent error is {:?} ago", - server_id, - now - last_error - ); - best_err = Some((server_id, details)); - } - } - } - } else if load < best_load { - best = Some((server_id, details)); - trace!("Selected {:?} as the server with the best load", server_id); - best_load = load; - if load == 0f64 { - break; - } - } - } + Ok(auth) + } else { + bail!("Failed to assign job to unknown server") + } + }; - // Assign the job to our best choice - if let Some((server_id, server_details)) = best.or(best_err) { - let job_count = self.job_count.fetch_add(1, Ordering::SeqCst) as u64; - let job_id = JobId(job_count); - assert!(server_details.jobs_assigned.insert(job_id)); - assert!(server_details - .jobs_unclaimed - .insert(job_id, Instant::now()) - .is_none()); - - info!( - "Job {} created and will be assigned to server {:?}", - job_id, server_id - ); - let auth = server_details - .job_authorizer - .generate_token(job_id) - .map_err(Error::from) - .context("Could not create an auth token for this job")?; - Some((job_id, server_id, auth)) - } else { - None + let try_alloc_job = |job_id: JobId, server_id: ServerId, auth: String, tc: Toolchain| { + let AssignJobResult { + state, + need_toolchain, + } = match requester.do_assign_job(server_id, job_id, tc, auth.clone()) { + Ok(res) => res, + Err(err) => { + // LOCKS + let mut servers = self.servers.lock().unwrap(); + // Couldn't assign the job, so undo the eager assignment above + if let Some(details) = servers.get_mut(&server_id) { + details.jobs_assigned.remove(&job_id); + details.jobs_unclaimed.remove(&job_id); + details.last_error = Some(Instant::now()); + } + return Err(err); } }; - if let Some(res) = res { - res - } else { - let msg = format!( - "Insufficient capacity across {} available servers", - servers.len() + // LOCKS + let mut jobs = self.jobs.lock().unwrap(); + if jobs.contains_key(&job_id) { + bail!( + "Failed to assign job to server {} with state {}", + server_id.addr(), + state ); - return Ok(AllocJobResult::Fail { msg }); } - }; - let AssignJobResult { - state, - need_toolchain, - } = requester - .do_assign_job(server_id, job_id, tc, auth.clone()) - .with_context(|| { + + jobs.insert(job_id, JobDetail { server_id, state }); + + if log_enabled!(log::Level::Trace) { // LOCKS let mut servers = self.servers.lock().unwrap(); - if let Some(entry) = servers.get_mut(&server_id) { - entry.last_error = Some(Instant::now()); - entry.jobs_unclaimed.remove(&job_id); - if !entry.jobs_assigned.remove(&job_id) { - "assign job failed and job not known to the server" - } else { - "assign job failed, job un-assigned from the server" + if let Some(details) = servers.get_mut(&server_id) { + if let Some(last_error) = details.last_error { + trace!( + "[alloc_job({})]: Assigned job to server {:?} whose most recent error was {:?} ago", + job_id, + server_id.addr(), + Instant::now() - last_error + ); } - } else { - "assign job failed and server not known" } - })?; - { - // LOCKS - let mut jobs = self.jobs.lock().unwrap(); + } info!( - "Job {} successfully assigned and saved with state {:?}", - job_id, state + "[alloc_job({})]: Job created and assigned to server {:?} with state {:?}", + job_id, + server_id.addr(), + state ); - assert!(jobs - .insert(job_id, JobDetail { server_id, state }) - .is_none()); - } - let job_alloc = 
JobAlloc { - auth, - job_id, - server_id, + + Ok(AllocJobResult::Success { + job_alloc: JobAlloc { + auth, + job_id, + server_id, + }, + need_toolchain, + }) }; - Ok(AllocJobResult::Success { - job_alloc, - need_toolchain, + + let sort_servers_by_least_load_and_oldest_error = || { + let now = Instant::now(); + // LOCKS + let mut servers = self.servers.lock().unwrap(); + + // Compute instantaneous load and update shared server state + servers + .iter_mut() + .map(|(server_id, details)| { + // Assume all jobs assigned to this server will eventually be handled. + let load = details.jobs_assigned.len() as f64 / details.num_cpus as f64; + // Forget errors that are too old to care about anymore + if let Some(last_error) = details.last_error { + // TODO: Explain why we only reset errors when load < MAX_LOAD_PER_CORE? + if load < MAX_PER_CORE_LOAD + && now.duration_since(last_error) > SERVER_REMEMBER_ERROR_TIMEOUT + { + details.last_error = None; + } + } + (server_id, details, load) + }) + // Sort servers by least load and oldest error + .sorted_by(|(_, details_a, load_a), (_, details_b, load_b)| { + match (details_a.last_error, details_b.last_error) { + // If neither server has a recent error, prefer the one with lowest load + (None, None) => load_a.total_cmp(load_b), + // Prefer servers with no recent errors over servers with recent errors + (None, Some(_)) => std::cmp::Ordering::Less, + (Some(_), None) => std::cmp::Ordering::Greater, + // If both servers have an error, prefer the one with the oldest error + (Some(err_a), Some(err_b)) => err_b.cmp(&err_a), + } + }) + // Collect to avoid retaining the lock on `self.servers`. + // Use `server_id` as the key for `self.servers` lookups later. + .map(|(server_id, _, _)| *server_id) + .collect::>() + }; + + // Create a list of server candidates sorted by least load and oldest error + let sorted_servers = sort_servers_by_least_load_and_oldest_error(); + let num_servers = sorted_servers.len(); + + let job_id = self.job_count.fetch_add(1, Ordering::SeqCst) as u64; + let job_id = JobId(job_id); + + let mut result = None; + + // Loop through candidate servers. + // Exit the loop once we've allocated the job. + // Try the next candidate if we encounter an error. + for server_id in sorted_servers { + // Compute load again local to the loop. + // Since alloc_job in other threads can recover from errors and assign jobs to the + // next-best candidate, the load initially computed in `sort_servers()` can drift. + // Computing load again ensures we allocate accurately based on the current stats. + let load = { + // LOCKS + let mut servers = self.servers.lock().unwrap(); + if let Some(details) = servers.get_mut(&server_id) { + details.jobs_assigned.len() as f64 / details.num_cpus as f64 + } else { + MAX_PER_CORE_LOAD + } + }; + + // Never assign jobs to overloaded servers + if load >= MAX_PER_CORE_LOAD { + continue; + } + + // Generate job auth token for this server + let auth = match make_auth_token(job_id, server_id) { + Ok(auth) => auth, + Err(err) => { + warn!("[alloc_job({})]: {}", job_id, error_chain_to_string(&err)); + result = Some(Err(err)); + continue; + } + }; + + // Attempt to allocate the job to this server. If alloc_job fails, + // store the error and attempt to allocate to the next server. + // If all servers error, return the last error to the client. 
+ match try_alloc_job(job_id, server_id, auth, tc.clone()) { + Ok(res) => { + // If alloc_job succeeded, return the result + result = Some(Ok(res)); + break; + } + Err(err) => { + // If alloc_job failed, try the next best server + warn!("[alloc_job({})]: {}", job_id, error_chain_to_string(&err)); + result = Some(Err(err)); + continue; + } + } + } + + result.unwrap_or_else(|| { + // Fallback to the default failure case + Ok(AllocJobResult::Fail { + msg: format!( + "[alloc_job({})]: Insufficient capacity across {} available servers", + job_id, num_servers + ), + }) }) } From 4657454dd3481b4461e394bc2df97d4892ba7c1d Mon Sep 17 00:00:00 2001 From: ptaylor Date: Fri, 25 Oct 2024 16:21:50 +0000 Subject: [PATCH 4/6] support retrying distributed compilations --- Cargo.toml | 2 +- docs/Configuration.md | 3 + src/compiler/c.rs | 5 + src/compiler/compiler.rs | 387 ++++++++++++++++++++++++--------------- src/compiler/rust.rs | 4 + src/dist/mod.rs | 4 +- src/server.rs | 4 +- 7 files changed, 261 insertions(+), 148 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index ce3426c05..5a76aa8ab 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -54,6 +54,7 @@ hyper-util = { version = "0.1.3", optional = true, features = [ "server", ] } is-terminal = "0.4.12" +itertools = "0.12" jobserver = "0.1" jwt = { package = "jsonwebtoken", version = "9", optional = true } libc = "0.2.153" @@ -126,7 +127,6 @@ assert_cmd = "2.0.13" cc = "1.0" chrono = "0.4.33" filetime = "0.2" -itertools = "0.12" predicates = "=3.1.0" serial_test = "3.1" temp-env = "0.3.6" diff --git a/docs/Configuration.md b/docs/Configuration.md index 953440dc1..5cb1d328f 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -123,6 +123,9 @@ The latest `cache.XXX` entries may be found here: https://github.com/mozilla/scc Whatever is set by a file based configuration, it is overruled by the env configuration variables +### dist +* `SCCACHE_DIST_RETRY_LIMIT` Number of times the client should retry failed distributed compilations. The default is `0` (no retries). + ### misc * `SCCACHE_ALLOW_CORE_DUMPS` to enable core dumps by the server diff --git a/src/compiler/c.rs b/src/compiler/c.rs index be335d9a6..9a23298b8 100644 --- a/src/compiler/c.rs +++ b/src/compiler/c.rs @@ -130,6 +130,7 @@ impl ParsedArguments { } /// A generic implementation of the `Compilation` trait for C/C++ compilers. 
+#[derive(Debug, Clone)] struct CCompilation { parsed_args: ParsedArguments, #[cfg(feature = "dist-client")] @@ -1173,6 +1174,10 @@ impl Compilation for CCompilation { }), ) } + + fn box_clone(&self) -> Box { + Box::new((*self).clone()) + } } #[cfg(feature = "dist-client")] diff --git a/src/compiler/compiler.rs b/src/compiler/compiler.rs index f74f972cd..07d94e582 100644 --- a/src/compiler/compiler.rs +++ b/src/compiler/compiler.rs @@ -540,6 +540,19 @@ where .map(move |o| (cacheable, DistType::NoDist, o)) } +#[cfg(feature = "dist-client")] +/// Number of times to retry distributed compilations +const DEFAULT_DIST_RETRY_LIMIT: u64 = 0; + +#[cfg(feature = "dist-client")] +/// Get the number of times to retry distributed compilations +fn get_dist_retry_limit() -> u64 { + std::env::var("SCCACHE_DIST_RETRY_LIMIT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_DIST_RETRY_LIMIT) +} + #[cfg(feature = "dist-client")] async fn dist_or_local_compile( dist_client: Option>, @@ -552,7 +565,7 @@ async fn dist_or_local_compile( where T: CommandCreatorSync, { - use std::io; + // use std::io; let rewrite_includes_only = match dist_client { Some(ref client) => client.rewrite_includes_only(), @@ -574,152 +587,49 @@ where } }; - debug!("[{}]: Attempting distributed compilation", out_pretty); - let out_pretty2 = out_pretty.clone(); - - let local_executable = compile_cmd.executable.clone(); - let local_executable2 = compile_cmd.executable.clone(); - - let do_dist_compile = async move { - let mut dist_compile_cmd = - dist_compile_cmd.context("Could not create distributed compile command")?; - debug!("[{}]: Creating distributed compile request", out_pretty); - let dist_output_paths = compilation - .outputs() - .map(|output| path_transformer.as_dist_abs(&cwd.join(output.path))) - .collect::>() - .context("Failed to adapt an output path for distributed compile")?; - let (inputs_packager, toolchain_packager, outputs_rewriter) = - compilation.into_dist_packagers(path_transformer)?; + let dist_retry_limit = get_dist_retry_limit() + 1; + let mut num_dist_attempts = 1; + let dist_compile_attempts = loop { debug!( - "[{}]: Identifying dist toolchain for {:?}", - out_pretty, local_executable + "[{}]: Run distributed compilation (attempt {} of {})", + out_pretty, num_dist_attempts, dist_retry_limit ); - let (dist_toolchain, maybe_dist_compile_executable) = dist_client - .put_toolchain(local_executable, weak_toolchain_key, toolchain_packager) - .await?; - let mut tc_archive = None; - if let Some((dist_compile_executable, archive_path)) = maybe_dist_compile_executable { - dist_compile_cmd.executable = dist_compile_executable; - tc_archive = Some(archive_path); - } - - debug!("[{}]: Requesting allocation", out_pretty); - let jares = dist_client.do_alloc_job(dist_toolchain.clone()).await?; - let job_alloc = match jares { - dist::AllocJobResult::Success { - job_alloc, - need_toolchain: true, - } => { - debug!( - "[{}]: Sending toolchain {} for job {}", - out_pretty, dist_toolchain.archive_id, job_alloc.job_id - ); - match dist_client - .do_submit_toolchain(job_alloc.clone(), dist_toolchain) - .await - .map_err(|e| e.context("Could not submit toolchain"))? 
+ match dist_compile( + path_transformer.clone(), + dist_client.clone(), + cwd.clone(), + compilation.clone(), + weak_toolchain_key.clone(), + compile_cmd.clone(), + dist_compile_cmd.clone(), + out_pretty.clone(), + ) + .await + { + Ok(res) => { + break Ok(res); + } + Err(err) => { + if num_dist_attempts < dist_retry_limit + && err.downcast_ref::().is_none() { - dist::SubmitToolchainResult::Success => Ok(job_alloc), - dist::SubmitToolchainResult::JobNotFound => { - bail!("Job {} not found on server", job_alloc.job_id) - } - dist::SubmitToolchainResult::CannotCache => bail!( - "Toolchain for job {} could not be cached by server", - job_alloc.job_id - ), + warn!( + "[{}]: Error running distributed compilation (attempt {} of {}), retrying. {:#}", + out_pretty, num_dist_attempts, dist_retry_limit, err + ); + num_dist_attempts += 1; + continue; } + break Err(err); } - dist::AllocJobResult::Success { - job_alloc, - need_toolchain: false, - } => Ok(job_alloc), - dist::AllocJobResult::Fail { msg } => { - Err(anyhow!("Failed to allocate job").context(msg)) - } - }?; - let job_id = job_alloc.job_id; - let server_id = job_alloc.server_id; - debug!("[{}]: Running job", out_pretty); - let ((job_id, server_id), (jres, path_transformer)) = dist_client - .do_run_job( - job_alloc, - dist_compile_cmd, - dist_output_paths, - inputs_packager, - ) - .await - .map(move |res| ((job_id, server_id), res)) - .with_context(|| { - format!( - "could not run distributed compilation job on {:?}", - server_id - ) - })?; - - let jc = match jres { - dist::RunJobResult::Complete(jc) => jc, - dist::RunJobResult::JobNotFound => bail!("Job {} not found on server", job_id), - }; - info!( - "fetched {:?}", - jc.outputs - .iter() - .map(|(p, bs)| (p, bs.lens().to_string())) - .collect::>() - ); - let mut output_paths: Vec = vec![]; - macro_rules! try_or_cleanup { - ($v:expr) => {{ - match $v { - Ok(v) => v, - Err(e) => { - // Do our best to clear up. 
We may end up deleting a file that we just wrote over - // the top of, but it's better to clear up too much than too little - for local_path in output_paths.iter() { - if let Err(e) = fs::remove_file(local_path) { - if e.kind() != io::ErrorKind::NotFound { - warn!("{} while attempting to clear up {}", e, local_path.display()) - } - } - } - return Err(e) - }, - } - }}; } - - for (path, output_data) in jc.outputs { - let len = output_data.lens().actual; - let local_path = try_or_cleanup!(path_transformer - .to_local(&path) - .with_context(|| format!("unable to transform output path {}", path))); - output_paths.push(local_path); - // Do this first so cleanup works correctly - let local_path = output_paths.last().expect("nothing in vec after push"); - - let mut file = try_or_cleanup!(File::create(local_path) - .with_context(|| format!("Failed to create output file {}", local_path.display()))); - let count = try_or_cleanup!(io::copy(&mut output_data.into_reader(), &mut file) - .with_context(|| format!("Failed to write output to {}", local_path.display()))); - - assert!(count == len); - } - let extra_inputs = match tc_archive { - Some(p) => vec![p], - None => vec![], - }; - try_or_cleanup!(outputs_rewriter - .handle_outputs(&path_transformer, &output_paths, &extra_inputs) - .with_context(|| "failed to rewrite outputs from compile")); - Ok((DistType::Ok(server_id), jc.output.into())) }; - use futures::TryFutureExt; - do_dist_compile - .or_else(move |e| async move { + match dist_compile_attempts { + Ok((dt, o)) => Ok((cacheable, dt, o)), + Err(e) => { if let Some(HttpClientError(_)) = e.downcast_ref::() { Err(e) } else if let Some(lru_disk_cache::Error::FileTooLarge) = @@ -728,24 +638,208 @@ where Err(anyhow!( "Could not cache dist toolchain for {:?} locally. Increase `toolchain_cache_size` or decrease the toolchain archive size.", - local_executable2 + compile_cmd.executable )) } else { // `{:#}` prints the error and the causes in a single line. 
let errmsg = format!("{:#}", e); warn!( "[{}]: Could not perform distributed compile, falling back to local: {}", - out_pretty2, errmsg + out_pretty, errmsg ); compile_cmd .execute(&creator) .await - .map(|o| (DistType::Error, o)) + .map(|o| (cacheable, DistType::Error, o)) + } + } + } +} + +#[cfg(feature = "dist-client")] +#[allow(clippy::too_many_arguments)] +async fn dist_compile( + mut path_transformer: dist::PathTransformer, + dist_client: Arc, + cwd: PathBuf, + compilation: Box, + weak_toolchain_key: String, + compile_cmd: CompileCommand, + dist_compile_cmd: Option, + out_pretty: String, +) -> Result<(DistType, std::process::Output)> { + use std::io; + + let mut dist_compile_cmd = + dist_compile_cmd.context("Could not create distributed compile command")?; + + debug!("[{}]: Creating distributed compile request", out_pretty); + + let dist_output_paths = compilation + .outputs() + .map(|output| path_transformer.as_dist_abs(&cwd.join(output.path))) + .collect::>() + .context("Failed to adapt an output path for distributed compile")?; + let (inputs_packager, toolchain_packager, outputs_rewriter) = + compilation.into_dist_packagers(path_transformer)?; + + debug!( + "[{}]: Identifying dist toolchain for {:?}", + out_pretty, compile_cmd.executable + ); + let (dist_toolchain, maybe_dist_compile_executable) = dist_client + .put_toolchain( + compile_cmd.executable, + weak_toolchain_key, + toolchain_packager, + ) + .await?; + let mut tc_archive = None; + if let Some((dist_compile_executable, archive_path)) = maybe_dist_compile_executable { + dist_compile_cmd.executable = dist_compile_executable; + tc_archive = Some(archive_path); + } + + debug!("[{}]: Requesting allocation", out_pretty); + let jares = dist_client.do_alloc_job(dist_toolchain.clone()).await?; + let job_alloc = match jares { + dist::AllocJobResult::Success { + job_alloc, + need_toolchain: true, + } => { + debug!( + "[{}]: Successfully allocated job {}", + out_pretty, job_alloc.job_id + ); + debug!( + "[{}]: Sending toolchain {} for job {}", + out_pretty, dist_toolchain.archive_id, job_alloc.job_id + ); + + let archive_id = dist_toolchain.archive_id.clone(); + + match dist_client + .do_submit_toolchain(job_alloc.clone(), dist_toolchain) + .await + .map_err(|e| e.context("Could not submit toolchain"))? 
+ { + dist::SubmitToolchainResult::Success => { + debug!( + "[{}]: Successfully sent toolchain {} for job {}", + out_pretty, archive_id, job_alloc.job_id + ); + Ok(job_alloc) + } + dist::SubmitToolchainResult::JobNotFound => { + bail!( + "[{}]: Job {} not found on server", + out_pretty, + job_alloc.job_id + ) + } + dist::SubmitToolchainResult::CannotCache => bail!( + "[{}]: Toolchain for job {} could not be cached by server", + out_pretty, + job_alloc.job_id + ), } - }) - .map_ok(move |(dt, o)| (cacheable, dt, o)) + } + dist::AllocJobResult::Success { + job_alloc, + need_toolchain: false, + } => { + debug!( + "[{}]: Successfully allocated job {}", + out_pretty, job_alloc.job_id + ); + Ok(job_alloc) + } + dist::AllocJobResult::Fail { msg } => Err(anyhow!("Failed to allocate job").context(msg)), + }?; + let job_id = job_alloc.job_id; + let server_id = job_alloc.server_id; + debug!( + "[{}]: Running job {} on server {:?}", + out_pretty, + job_id, + server_id.addr() + ); + let ((job_id, server_id), (jres, path_transformer)) = dist_client + .do_run_job( + job_alloc, + dist_compile_cmd, + dist_output_paths, + inputs_packager, + ) .await + .map(move |res| ((job_id, server_id), res)) + .with_context(|| { + format!( + "Could not run distributed compilation job on {:?}", + server_id.addr() + ) + })?; + + let jc = match jres { + dist::RunJobResult::Complete(jc) => jc, + dist::RunJobResult::JobNotFound => { + bail!("[{}]: Job {} not found on server", out_pretty, job_id) + } + }; + debug!( + "[{}]: Fetched {:?}", + out_pretty, + jc.outputs + .iter() + .map(|(p, bs)| (p, bs.lens().to_string())) + .collect::>() + ); + let mut output_paths: Vec = vec![]; + macro_rules! try_or_cleanup { + ($v:expr) => {{ + match $v { + Ok(v) => v, + Err(e) => { + // Do our best to clear up. We may end up deleting a file that we just wrote over + // the top of, but it's better to clear up too much than too little + for local_path in output_paths.iter() { + if let Err(e) = fs::remove_file(local_path) { + if e.kind() != io::ErrorKind::NotFound { + warn!("[{}]: {} while attempting to clear up {}", out_pretty, e, local_path.display()) + } + } + } + return Err(e) + }, + } + }}; + } + + for (path, output_data) in jc.outputs { + let len = output_data.lens().actual; + let local_path = try_or_cleanup!(path_transformer.to_local(&path).with_context( + || format!("[{}]: unable to transform output path {}", out_pretty, path) + )); + output_paths.push(local_path); + // Do this first so cleanup works correctly + let local_path = output_paths.last().expect("nothing in vec after push"); + + let mut file = try_or_cleanup!(File::create(local_path) + .with_context(|| format!("Failed to create output file {}", local_path.display()))); + let count = try_or_cleanup!(io::copy(&mut output_data.into_reader(), &mut file) + .with_context(|| format!("Failed to write output to {}", local_path.display()))); + + assert!(count == len); + } + let extra_inputs = match tc_archive { + Some(p) => vec![p], + None => vec![], + }; + try_or_cleanup!(outputs_rewriter + .handle_outputs(&path_transformer, &output_paths, &extra_inputs) + .with_context(|| "Failed to rewrite outputs from compile")); + Ok((DistType::Ok(server_id), jc.output.into())) } impl Clone for Box> { @@ -776,6 +870,14 @@ pub trait Compilation: Send { /// Each item is a descriptive (and unique) name of the output paired with /// the path where it'll show up. 
fn outputs<'a>(&'a self) -> Box + 'a>; + + fn box_clone(&self) -> Box; +} + +impl Clone for Box { + fn clone(&self) -> Box { + self.box_clone() + } } #[cfg(feature = "dist-client")] @@ -1464,7 +1566,6 @@ mod test { use std::io::{Cursor, Write}; use std::sync::Arc; use std::time::Duration; - use std::u64; use test_case::test_case; use tokio::runtime::Runtime; diff --git a/src/compiler/rust.rs b/src/compiler/rust.rs index c72fdc51b..efa88ba42 100644 --- a/src/compiler/rust.rs +++ b/src/compiler/rust.rs @@ -1787,6 +1787,10 @@ impl Compilation for RustCompilation { optional: v.optional, })) } + + fn box_clone(&self) -> Box { + Box::new((*self).clone()) + } } // TODO: we do end up with slashes facing the wrong way, but Windows is agnostic so it's diff --git a/src/dist/mod.rs b/src/dist/mod.rs index b96c230b5..b72892d0d 100644 --- a/src/dist/mod.rs +++ b/src/dist/mod.rs @@ -92,7 +92,7 @@ mod path_transform { } } - #[derive(Debug)] + #[derive(Debug, Clone)] pub struct PathTransformer { dist_to_local_path: HashMap, } @@ -269,7 +269,7 @@ mod path_transform { use std::iter; use std::path::{Path, PathBuf}; - #[derive(Debug)] + #[derive(Debug, Clone)] pub struct PathTransformer; impl PathTransformer { diff --git a/src/server.rs b/src/server.rs index cc296f302..dbf9df309 100644 --- a/src/server.rs +++ b/src/server.rs @@ -843,14 +843,14 @@ where me.handle_compile(compile).await } Request::GetStats => { - debug!("handle_client: get_stats"); + trace!("handle_client: get_stats"); me.get_info() .await .map(|i| Response::Stats(Box::new(i))) .map(Message::WithoutBody) } Request::DistStatus => { - debug!("handle_client: dist_status"); + trace!("handle_client: dist_status"); me.get_dist_status() .await .map(Response::DistStatus) From 6cd9ff37faafba4e592c08f0ac5bffcd29b1f2bc Mon Sep 17 00:00:00 2001 From: ptaylor Date: Fri, 25 Oct 2024 18:41:10 +0000 Subject: [PATCH 5/6] Allow configuring connection and request timeouts --- docs/Configuration.md | 2 ++ src/dist/http.rs | 51 +++++++++++++++++++++++++++++++++---------- src/util.rs | 4 ++++ 3 files changed, 45 insertions(+), 12 deletions(-) diff --git a/docs/Configuration.md b/docs/Configuration.md index 5cb1d328f..8af4e4430 100644 --- a/docs/Configuration.md +++ b/docs/Configuration.md @@ -124,6 +124,8 @@ Whatever is set by a file based configuration, it is overruled by the env configuration variables ### dist +* `SCCACHE_DIST_CONNECT_TIMEOUT` Timeout in seconds for connections to an sccache-dist server. Default is `5`. +* `SCCACHE_DIST_REQUEST_TIMEOUT` Timeout in seconds for compile requests to an sccache-dist server. Default is `600`. * `SCCACHE_DIST_RETRY_LIMIT` Number of times the client should retry failed distributed compilations. The default is `0` (no retries). 
### misc diff --git a/src/dist/http.rs b/src/dist/http.rs index 96ee1a8d6..0d9d3fba0 100644 --- a/src/dist/http.rs +++ b/src/dist/http.rs @@ -20,6 +20,37 @@ pub use self::server::{ ClientAuthCheck, ClientVisibleMsg, Scheduler, ServerAuthCheck, HEARTBEAT_TIMEOUT, }; +use std::env; +use std::time::Duration; + +/// Default timeout for connections to an sccache-dist server +const DEFAULT_DIST_CONNECT_TIMEOUT: u64 = 5; + +/// Timeout for connections to an sccache-dist server +pub fn get_connect_timeout() -> Duration { + Duration::new( + env::var("SCCACHE_DIST_CONNECT_TIMEOUT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_DIST_CONNECT_TIMEOUT), + 0, + ) +} + +/// Default timeout for compile requests to an sccache-dist server +const DEFAULT_DIST_REQUEST_TIMEOUT: u64 = 600; + +/// Timeout for compile requests to an sccache-dist server +pub fn get_request_timeout() -> Duration { + Duration::new( + env::var("SCCACHE_DIST_REQUEST_TIMEOUT") + .ok() + .and_then(|s| s.parse().ok()) + .unwrap_or(DEFAULT_DIST_REQUEST_TIMEOUT), + 0, + ) +} + mod common { use reqwest::header; use serde::{Deserialize, Serialize}; @@ -264,7 +295,7 @@ mod server { AllocJobHttpResponse, HeartbeatServerHttpRequest, JobJwt, ReqwestRequestBuilderExt, RunJobHttpRequest, ServerCertificateHttpResponse, }; - use super::urls; + use super::{get_connect_timeout, get_request_timeout, urls}; use crate::dist::{ self, AllocJobResult, AssignJobResult, HeartbeatServerResult, InputsReader, JobAuthorizer, JobId, JobState, RunJobResult, SchedulerStatusResult, ServerId, ServerNonce, @@ -745,6 +776,8 @@ mod server { } // Finish the client let new_client = client_builder + .timeout(get_request_timeout()) + .connect_timeout(get_connect_timeout()) // Disable connection pool to avoid broken connection // between runtime .pool_max_idle_per_host(0) @@ -1092,18 +1125,14 @@ mod client { use std::io::Write; use std::path::{Path, PathBuf}; use std::sync::{Arc, Mutex}; - use std::time::Duration; use super::common::{ bincode_req_fut, AllocJobHttpResponse, ReqwestRequestBuilderExt, RunJobHttpRequest, ServerCertificateHttpResponse, }; - use super::urls; + use super::{get_connect_timeout, get_request_timeout, urls}; use crate::errors::*; - const REQUEST_TIMEOUT_SECS: u64 = 600; - const CONNECT_TIMEOUT_SECS: u64 = 5; - pub struct Client { auth_token: String, scheduler_url: reqwest::Url, @@ -1125,11 +1154,9 @@ mod client { auth_token: String, rewrite_includes_only: bool, ) -> Result { - let timeout = Duration::new(REQUEST_TIMEOUT_SECS, 0); - let connect_timeout = Duration::new(CONNECT_TIMEOUT_SECS, 0); let client = reqwest::ClientBuilder::new() - .timeout(timeout) - .connect_timeout(connect_timeout) + .timeout(get_request_timeout()) + .connect_timeout(get_connect_timeout()) // Disable connection pool to avoid broken connection // between runtime .pool_max_idle_per_host(0) @@ -1170,9 +1197,9 @@ mod client { ); } // Finish the client - let timeout = Duration::new(REQUEST_TIMEOUT_SECS, 0); let new_client_async = client_async_builder - .timeout(timeout) + .timeout(get_request_timeout()) + .connect_timeout(get_connect_timeout()) // Disable keep-alive .pool_max_idle_per_host(0) .build() diff --git a/src/util.rs b/src/util.rs index 001aedd2f..075dfd35e 100644 --- a/src/util.rs +++ b/src/util.rs @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. 
+#[cfg(any(feature = "dist-server", feature = "dist-client"))] +use crate::dist::http::{get_connect_timeout, get_request_timeout}; use crate::mock_command::{CommandChild, RunCommand}; use blake3::Hasher as blake3_Hasher; use byteorder::{BigEndian, ByteOrder}; @@ -941,6 +943,8 @@ pub fn daemonize() -> Result<()> { pub fn new_reqwest_blocking_client() -> reqwest::blocking::Client { reqwest::blocking::Client::builder() .pool_max_idle_per_host(0) + .timeout(get_request_timeout()) + .connect_timeout(get_connect_timeout()) .build() .expect("http client must build with success") } From 5f1d50e36b885a2ad9035061aaa1cf4e6a1ad132 Mon Sep 17 00:00:00 2001 From: ptaylor Date: Mon, 28 Oct 2024 23:34:52 +0000 Subject: [PATCH 6/6] track started job mtimes and assume jobs that take longer than `SCCACHE_DIST_REQUEST_TIMEOUT` seconds are stale and should be removed --- src/bin/sccache-dist/main.rs | 34 +++++++++++++++++++++++++++++++++- src/dist/http.rs | 22 +++++++++++----------- src/util.rs | 6 +++--- 3 files changed, 47 insertions(+), 15 deletions(-) diff --git a/src/bin/sccache-dist/main.rs b/src/bin/sccache-dist/main.rs index c87ecb043..50cd0033e 100644 --- a/src/bin/sccache-dist/main.rs +++ b/src/bin/sccache-dist/main.rs @@ -8,6 +8,7 @@ use rand::{rngs::OsRng, RngCore}; use sccache::config::{ scheduler as scheduler_config, server as server_config, INSECURE_DIST_CLIENT_TOKEN, }; +use sccache::dist::http::get_dist_request_timeout; use sccache::dist::{ self, AllocJobResult, AssignJobResult, BuilderIncoming, CompileCommand, HeartbeatServerResult, InputsReader, JobAlloc, JobAuthorizer, JobComplete, JobId, JobState, RunJobResult, @@ -317,6 +318,7 @@ const UNCLAIMED_READY_TIMEOUT: Duration = Duration::from_secs(60); #[derive(Copy, Clone)] struct JobDetail { + mtime: Instant, server_id: ServerId, state: JobState, } @@ -494,7 +496,14 @@ impl SchedulerIncoming for Scheduler { ); } - jobs.insert(job_id, JobDetail { server_id, state }); + jobs.insert( + job_id, + JobDetail { + server_id, + state, + mtime: Instant::now(), + }, + ); if log_enabled!(log::Level::Trace) { // LOCKS @@ -686,6 +695,27 @@ impl SchedulerIncoming for Scheduler { } } + // If the server has jobs assigned that have taken longer than `SCCACHE_DIST_REQUEST_TIMEOUT` seconds, + // either the client retried the compilation (due to a server error), or the client abandoned the job + // after calling `run_job/` (for example, when the user kills the build). + // + // In both cases the scheduler moves the job from pending/ready to started, and is expecting to hear + // from the build server that the job has been completed. That message will never arrive, so we track + // the time when jobs are started, and prune them if they take longer than the configured request + // timeout. + // + // Note: this assumes the scheduler's `SCCACHE_DIST_REQUEST_TIMEOUT` is >= the client's value, but + // that should be easy for the user to configure. 
+ for &job_id in details.jobs_assigned.iter() { + if let Some(detail) = jobs.get(&job_id) { + if now.duration_since(detail.mtime) >= get_dist_request_timeout() { + // insert into jobs_unclaimed to avoid the warning below + details.jobs_unclaimed.insert(job_id, detail.mtime); + stale_jobs.push(job_id); + } + } + } + if !stale_jobs.is_empty() { warn!( "The following stale jobs will be de-allocated: {:?}", @@ -791,10 +821,12 @@ impl SchedulerIncoming for Scheduler { // Update the job's `last_seen` time to ensure it isn't // pruned for taking longer than UNCLAIMED_READY_TIMEOUT server.jobs_unclaimed.entry(job_id).and_modify(|e| *e = now); + job.get_mut().mtime = now; job.get_mut().state = job_state; } (JobState::Ready, JobState::Started) => { server.jobs_unclaimed.remove(&job_id); + job.get_mut().mtime = now; job.get_mut().state = job_state; } (JobState::Started, JobState::Complete) => { diff --git a/src/dist/http.rs b/src/dist/http.rs index 0d9d3fba0..429e6b58b 100644 --- a/src/dist/http.rs +++ b/src/dist/http.rs @@ -24,10 +24,10 @@ use std::env; use std::time::Duration; /// Default timeout for connections to an sccache-dist server -const DEFAULT_DIST_CONNECT_TIMEOUT: u64 = 5; +const DEFAULT_DIST_CONNECT_TIMEOUT: u64 = 30; /// Timeout for connections to an sccache-dist server -pub fn get_connect_timeout() -> Duration { +pub fn get_dist_connect_timeout() -> Duration { Duration::new( env::var("SCCACHE_DIST_CONNECT_TIMEOUT") .ok() @@ -41,7 +41,7 @@ pub fn get_connect_timeout() -> Duration { const DEFAULT_DIST_REQUEST_TIMEOUT: u64 = 600; /// Timeout for compile requests to an sccache-dist server -pub fn get_request_timeout() -> Duration { +pub fn get_dist_request_timeout() -> Duration { Duration::new( env::var("SCCACHE_DIST_REQUEST_TIMEOUT") .ok() @@ -295,7 +295,7 @@ mod server { AllocJobHttpResponse, HeartbeatServerHttpRequest, JobJwt, ReqwestRequestBuilderExt, RunJobHttpRequest, ServerCertificateHttpResponse, }; - use super::{get_connect_timeout, get_request_timeout, urls}; + use super::{get_dist_connect_timeout, get_dist_request_timeout, urls}; use crate::dist::{ self, AllocJobResult, AssignJobResult, HeartbeatServerResult, InputsReader, JobAuthorizer, JobId, JobState, RunJobResult, SchedulerStatusResult, ServerId, ServerNonce, @@ -776,8 +776,8 @@ mod server { } // Finish the client let new_client = client_builder - .timeout(get_request_timeout()) - .connect_timeout(get_connect_timeout()) + .timeout(get_dist_request_timeout()) + .connect_timeout(get_dist_connect_timeout()) // Disable connection pool to avoid broken connection // between runtime .pool_max_idle_per_host(0) @@ -1130,7 +1130,7 @@ mod client { bincode_req_fut, AllocJobHttpResponse, ReqwestRequestBuilderExt, RunJobHttpRequest, ServerCertificateHttpResponse, }; - use super::{get_connect_timeout, get_request_timeout, urls}; + use super::{get_dist_connect_timeout, get_dist_request_timeout, urls}; use crate::errors::*; pub struct Client { @@ -1155,8 +1155,8 @@ mod client { rewrite_includes_only: bool, ) -> Result { let client = reqwest::ClientBuilder::new() - .timeout(get_request_timeout()) - .connect_timeout(get_connect_timeout()) + .timeout(get_dist_request_timeout()) + .connect_timeout(get_dist_connect_timeout()) // Disable connection pool to avoid broken connection // between runtime .pool_max_idle_per_host(0) @@ -1198,8 +1198,8 @@ mod client { } // Finish the client let new_client_async = client_async_builder - .timeout(get_request_timeout()) - .connect_timeout(get_connect_timeout()) + .timeout(get_dist_request_timeout()) + 
.connect_timeout(get_dist_connect_timeout()) // Disable keep-alive .pool_max_idle_per_host(0) .build() diff --git a/src/util.rs b/src/util.rs index 075dfd35e..80c7c4451 100644 --- a/src/util.rs +++ b/src/util.rs @@ -13,7 +13,7 @@ // limitations under the License. #[cfg(any(feature = "dist-server", feature = "dist-client"))] -use crate::dist::http::{get_connect_timeout, get_request_timeout}; +use crate::dist::http::{get_dist_connect_timeout, get_dist_request_timeout}; use crate::mock_command::{CommandChild, RunCommand}; use blake3::Hasher as blake3_Hasher; use byteorder::{BigEndian, ByteOrder}; @@ -943,8 +943,8 @@ pub fn daemonize() -> Result<()> { pub fn new_reqwest_blocking_client() -> reqwest::blocking::Client { reqwest::blocking::Client::builder() .pool_max_idle_per_host(0) - .timeout(get_request_timeout()) - .connect_timeout(get_connect_timeout()) + .timeout(get_dist_request_timeout()) + .connect_timeout(get_dist_connect_timeout()) .build() .expect("http client must build with success") }
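
Reviewer note: the staleness rule added in patch 6/6 is just an age check against the configured request timeout (`now.duration_since(detail.mtime) >= get_dist_request_timeout()`). The following is a minimal standalone sketch of that rule for illustration only; `job_is_stale` and the values used here are hypothetical and not part of the patch, and the 600-second figure is the `SCCACHE_DIST_REQUEST_TIMEOUT` default documented above.

    use std::time::{Duration, Instant};

    /// Illustrative helper (not in the patch): a started job is considered stale
    /// once the time since its recorded mtime reaches the request timeout.
    fn job_is_stale(mtime: Instant, now: Instant, request_timeout: Duration) -> bool {
        now.duration_since(mtime) >= request_timeout
    }

    fn main() {
        // Assume the default SCCACHE_DIST_REQUEST_TIMEOUT of 600 seconds.
        let request_timeout = Duration::from_secs(600);
        let started_at = Instant::now();

        // A job that has just been started is not stale...
        assert!(!job_is_stale(started_at, started_at, request_timeout));

        // ...but one whose mtime is at least `request_timeout` old would be
        // pruned by the scheduler's stale-job sweep.
        let much_later = started_at + Duration::from_secs(601);
        assert!(job_is_stale(started_at, much_later, request_timeout));
    }

This also illustrates why the scheduler's `SCCACHE_DIST_REQUEST_TIMEOUT` should be at least as large as the clients' value: if the scheduler's timeout were smaller, it could prune a job the client is still legitimately waiting on.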