From 386b93cd8a5442457fa1e8d1dd287307ef800bf4 Mon Sep 17 00:00:00 2001
From: "C. Titus Brown"
Date: Sat, 12 Aug 2023 09:36:10 -0700
Subject: [PATCH] MRG: Add threaded multigather to pyo3_branchwater code; add
 `fastmultigather` CLI plugin. (#3)

* add method for parallel foo
* remove comment
* rename prefetch to find_overlapping
* add multigather
* start adding error handling
* add some error handling
* turn on parallel
* update printouts
* catch error when loading a bad sig
* print when no matches
* closer to fix merge
* update pyproject
* bump maturin version
* minor updates
* change quoting around names in CSV output
* add number of threads output
* compiles
* more compile
* more cleanup, add plugin
* update docs
---
 doc/README.md                              |  44 +++--
 pyproject.toml                             |   1 +
 python/pyo3_branchwater/sourmash_plugin.py |  26 +++
 src/lib.rs                                 | 186 ++++++++++++++++++++-
 4 files changed, 239 insertions(+), 18 deletions(-)

diff --git a/doc/README.md b/doc/README.md
index 99154eb2..b9215cdb 100644
--- a/doc/README.md
+++ b/doc/README.md
@@ -1,12 +1,12 @@
-# fastgather and manysearch - an introduction
+# fastgather, fastmultigather, and manysearch - an introduction
 
-This repository implements two sourmash plugins, `fastgather` and `manysearch`. These plugins make use of multithreading in Rust to provide very fast implementations of `search` and `gather`. With large databases, these commands can be hundreds to thousands of times faster, and 10-50x lower memory.
+This repository implements three sourmash plugins, `fastgather`, `fastmultigather`, and `manysearch`. These plugins make use of multithreading in Rust to provide very fast implementations of `search` and `gather`. With large databases, these commands can be hundreds to thousands of times faster, and use 10-50x less memory.
 
 The main *drawback* to these plugin commands is that their inputs and outputs are not as rich as the native sourmash commands. In particular, this means that input databases need to be prepared differently, and the output may be most useful as a prefilter in conjunction with regular sourmash commands.
 
 ## Preparing the database
 
-Both `manysearch` and `fastgather` use
+All three commands use
 _text files containing lists of signature files_, or "fromfiles", for the search database, and `manysearch` uses these "fromfiles" for queries, too.
 
 (Yes, this plugin will eventually be upgraded to support zip files; keep an eye on [sourmash#2230](https://github.com/sourmash-bio/sourmash/pull/2230).)
@@ -25,24 +25,20 @@ find gtdb-reps-rs214-k21/ -name "*.sig" -exec gzip {} \;
 find gtdb-reps-rs214-k21/ -type f > list.gtdb-reps-rs214-k21.txt
 ```
 
+(Note that once [sourmash#2712](https://github.com/sourmash-bio/sourmash/pull/2712) is merged, you will be able to do `sourmash sig split ... -E .sig.gz` to skip the additional find/gzip command.)
+
 ## Running the commands
 
-`manysearch` takes two file lists as input, and outputs a CSV
-```
-sourmash scripts manysearch query-list.txt podar-ref-list.txt -o results.csv
-```
+### Running `manysearch`
 
-`fastgather` takes a query metagenome and a file list as the database, and outputs a CSV:
-```
-sourmash scripts fastgather query.sig.gz podar-ref-lists.txt -o results.csv
-```
-## Using the output
+The `manysearch` command finds overlaps between one or more query genomes, and one or more subject genomes.
 
-### `manysearch` output
-The `manysearch` command finds overlaps between one or more query genomes, and one or more subject genomes.
+
+`manysearch` takes two file lists as input, and outputs a CSV:
+```
+sourmash scripts manysearch query-list.txt podar-ref-list.txt -o results.csv
+```
 
 To run it, you need to provide two "fromfiles" containing lists of paths to signature files (`.sig` or `.sig.gz`). If you create a fromfile as above with GTDB reps, you can generate a query fromfile like so:
 
@@ -57,7 +53,14 @@ sourmash scripts manysearch list.query.txt list.gtdb-rs214-k21.txt -o query.x.g
 
 The results file here, `query.x.gtdb-reps.csv`, will have five columns: `query` and `query_md5`, `match` and `match_md5`, and `containment`.
 
-### Using `fastgather` to create a picklist for `sourmash gather`
+### Running `fastgather`
+
+`fastgather` takes a query metagenome and a file list as the database, and outputs a CSV:
+```
+sourmash scripts fastgather query.sig.gz podar-ref-list.txt -o results.csv
+```
+
+#### Using `fastgather` to create a picklist for `sourmash gather`
 
 The `fastgather` command is a much faster version of sourmash gather.
 
@@ -80,6 +83,15 @@ sourmash gather SRR606249.trim.sig.gz /group/ctbrowngrp/sourmash-db/gtdb-rs214/g
 
 Here it's important that the picklist be used on a sourmash collection that contains a manifest - this will prevent sourmash from loading any sketches other than the ones in the fastgather CSV file. (Zip file collections with manifests are produced automatically when `-o filename.zip` is used with `sketch dna`, and can also be prepared with `sourmash sig cat`.)
 
-### Example
+#### Example of picklist
 
 A complete example Snakefile implementing the above workflow is available [in the 2023-swine-usda](https://github.com/ctb/2023-swine-usda/blob/main/Snakefile) repository.
+
+### Running `fastmultigather`
+
+`fastmultigather` takes a file list of query metagenomes and a file list for the database, and outputs one gather CSV per query:
+```
+sourmash scripts fastmultigather query-list.txt podar-ref-list.txt
+```
+
+The main advantage that `fastmultigather` has over `fastgather` is that you only load the database files once, which can be a significant time savings for large databases!
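
For downstream processing, the per-query CSVs written by `fastmultigather` can be combined with a few lines of Python. This is only an illustrative sketch (not part of the plugin); it assumes the `gather-<i>.csv` naming and the `rank,match,md5sum,overlap` header used by the Rust implementation in `src/lib.rs` below:

```
import csv
import glob

# Combine the per-query gather CSVs written by fastmultigather.
# Assumes the gather-<i>.csv naming and the rank,match,md5sum,overlap
# header used by the current Rust implementation.
combined = []
for path in sorted(glob.glob("gather-*.csv")):
    with open(path, newline="") as fp:
        for row in csv.DictReader(fp):
            row["query_csv"] = path   # record which per-query file this row came from
            combined.append(row)

with open("gather-combined.csv", "w", newline="") as out:
    fieldnames = ["query_csv", "rank", "match", "md5sum", "overlap"]
    writer = csv.DictWriter(out, fieldnames=fieldnames)
    writer.writeheader()
    writer.writerows(combined)
```
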
diff --git a/pyproject.toml b/pyproject.toml
index 142d58ee..75ec5c35 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -19,6 +19,7 @@ build-backend = "maturin"
 [project.entry-points."sourmash.cli_script"]
 manysearch = "pyo3_branchwater.sourmash_plugin:Branchwater_Manysearch"
 fastgather = "pyo3_branchwater.sourmash_plugin:Branchwater_Fastgather"
+fastmultigather = "pyo3_branchwater.sourmash_plugin:Branchwater_Fastmultigather"
 
 [tool.maturin]
 python-source = "python"
diff --git a/python/pyo3_branchwater/sourmash_plugin.py b/python/pyo3_branchwater/sourmash_plugin.py
index 133bf6c7..7ca60466 100755
--- a/python/pyo3_branchwater/sourmash_plugin.py
+++ b/python/pyo3_branchwater/sourmash_plugin.py
@@ -71,3 +71,29 @@ def main(self, args):
         if args.output_prefetch:
             notify(f"prefetch results in '{args.output_prefetch}'")
         return status
+
+
+class Branchwater_Fastmultigather(CommandLinePlugin):
+    command = 'fastmultigather'
+    description = 'massively parallel sketch multigather'
+
+    def __init__(self, p):
+        super().__init__(p)
+        p.add_argument('query_paths', help="a text file containing paths to .sig/.sig.gz files to query")
+        p.add_argument('against_paths', help="a text file containing paths to .sig/.sig.gz files to search against")
+        p.add_argument('-t', '--threshold-bp', default=100000, type=float)
+        p.add_argument('-k', '--ksize', default=31, type=int)
+        p.add_argument('-s', '--scaled', default=1000, type=int)
+
+    def main(self, args):
+        num_threads = pyo3_branchwater.get_num_threads()
+        notify(f"gathering all sketches in '{args.query_paths}' against '{args.against_paths}' using {num_threads} threads")
+        super().main(args)
+        status = pyo3_branchwater.do_multigather(args.query_paths,
+                                                 args.against_paths,
+                                                 int(args.threshold_bp),
+                                                 args.ksize,
+                                                 args.scaled)
+        if status == 0:
+            notify(f"...fastmultigather is done!")
+        return status
diff --git a/src/lib.rs b/src/lib.rs
index d79b7a07..e1da72d0 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -3,6 +3,9 @@
 // not orig. This is different from sourmash...
 use pyo3::prelude::*;
 
+use pyo3::create_exception;
+use pyo3::exceptions::PyException;
+
 use rayon::prelude::*;
 
 use std::fs::File;
@@ -24,11 +27,24 @@ use sourmash::signature::{Signature, SigsTrait};
 use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash};
 use sourmash::sketch::Sketch;
 
+create_exception!(pymagsearch, SomeError, pyo3::exceptions::PyException);
+
+impl std::convert::From<SomeError> for PyErr {
+    fn from(err: SomeError) -> PyErr {
+        PyException::new_err(err.to_string())
+    }
+}
+
+/// check to see if two KmerMinHash are compatible.
+///
+/// CTB note: despite the name, downsampling is not performed?
+/// Although it checks if they are compatible in one direction...
+
 fn check_compatible_downsample(
     me: &KmerMinHash,
     other: &KmerMinHash,
 ) -> Result<(), sourmash::Error> {
-    /*
+    /*
     // ignore num minhashes.
     if self.num != other.num {
         return Err(Error::MismatchNum {
             n1: self.num,
@@ -55,6 +71,10 @@ fn check_compatible_downsample(
     Ok(())
 }
 
+/// Given a search Signature containing one or more sketches, and a template
+/// Sketch, return a compatible (& now downsampled) Sketch from the search
+/// Signature.
+
 fn prepare_query(search_sig: &Signature, template: &Sketch) -> Option<KmerMinHash> {
     let mut search_mh = None;
     if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) {
@@ -279,6 +299,8 @@ impl PartialEq for PrefetchResult {
 
 impl Eq for PrefetchResult {}
 
+// Find overlapping above specified threshold.
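+//
+// Given the current query and a heap of candidate PrefetchResults, prefetch
+// re-counts each candidate's overlap against the (possibly reduced) query in
+// parallel and keeps only the candidates whose overlap is still at or above
+// threshold_hashes.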
+
 fn prefetch(
     query: &KmerMinHash,
     sketchlist: BinaryHeap<PrefetchResult>,
@@ -389,9 +411,45 @@ fn load_sketches_above_threshold(
     Ok(matchlist)
 }
 
+fn consume_query_by_gather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
+    mut query: KmerMinHash,
+    matchlist: BinaryHeap<PrefetchResult>,
+    threshold_hashes: u64,
+    gather_output: Option<P>,
+    query_label: String
+) -> Result<(), Box<dyn std::error::Error>> {
+    // Set up a writer for gather output
+    let gather_out: Box<dyn Write> = match gather_output {
+        Some(path) => Box::new(BufWriter::new(File::create(path).unwrap())),
+        None => Box::new(std::io::stdout()),
+    };
+    let mut writer = BufWriter::new(gather_out);
+    writeln!(&mut writer, "rank,match,md5sum,overlap").ok();
+
+    let mut matching_sketches = matchlist;
+    let mut rank = 0;
+
+    while !matching_sketches.is_empty() {
+        eprintln!("remaining: {} {}", query.size(), matching_sketches.len());
+        let best_element = matching_sketches.peek().unwrap();
+
+        // remove!
+        query.remove_from(&best_element.minhash)?;
+
+        writeln!(&mut writer, "{},\"{}\",{},{}", rank, best_element.name, best_element.minhash.md5sum(), best_element.overlap).ok();
+
+        // recalculate remaining overlaps between query and all sketches.
+        // note: this is parallelized.
+        matching_sketches = prefetch(&query, matching_sketches, threshold_hashes);
+        rank = rank + 1;
+    }
+    Ok(())
+}
+
+
 /// Run counter-gather with a query against a list of files.
-fn countergather<P: AsRef<Path> + std::fmt::Debug>(
+fn countergather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
     query_filename: P,
     matchlist_filename: P,
     threshold_bp: usize,
@@ -464,6 +522,7 @@ fn countergather<P: AsRef<Path> + std::fmt::Debug>(
         None => Box::new(Vec::new()),
     };
     let mut writer = BufWriter::new(prefetch_out);
+    writeln!(&mut writer, "match,md5sum,overlap").unwrap();
 
     for m in &matchlist {
         writeln!(&mut writer, "\"{}\",{},{}", m.name, m.minhash.md5sum(), m.overlap).ok();
@@ -499,7 +558,111 @@ fn countergather<P: AsRef<Path> + std::fmt::Debug>(
         matching_sketches = prefetch(&query, matching_sketches, threshold_hashes);
         rank = rank + 1;
     }
+    Ok(())
+}
+
+/// Run counter-gather for multiple queries against a list of files.
+fn multigather<P: AsRef<Path> + std::fmt::Debug + Clone>(
+    query_filenames: P,
+    matchlist_filename: P,
+    threshold_bp: usize,
+    ksize: u8,
+    scaled: usize,
+) -> Result<(), Box<dyn std::error::Error>> {
+    let max_hash = max_hash_for_scaled(scaled as u64);
+    let template_mh = KmerMinHash::builder()
+        .num(0u32)
+        .ksize(ksize as u32)
+        .max_hash(max_hash)
+        .build();
+    let template = Sketch::MinHash(template_mh);
+
+    // load the list of query paths
+    let querylist_paths = load_sketchlist_filenames(query_filenames)?;
+    println!("Loaded {} sig paths in querylist", querylist_paths.len());
+
+    // build the list of paths to match against.
+    println!("Loading matchlist");
+    let matchlist_paths = load_sketchlist_filenames(matchlist_filename)?;
+    println!("Loaded {} sig paths in matchlist", matchlist_paths.len());
+
+    let threshold_hashes : u64 = {
+        let x = threshold_bp / scaled;
+        if x > 0 {
+            x
+        } else {
+            1
+        }
+    }.try_into().unwrap();
+
+    println!("threshold overlap: {} {}", threshold_hashes, threshold_bp);
+
+    // Load all the against sketches
+    let sketchlist = load_sketches(matchlist_paths, &template).unwrap();
+
+    // Iterate over all queries => do prefetch and gather!
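+    // Each rayon worker takes one query path, loads the first compatible
+    // sketch from it, filters the preloaded sketchlist down to matches with
+    // overlap >= threshold_hashes, and then runs the serial gather loop
+    // (consume_query_by_gather), writing one gather-<i>.csv per query.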
+    let processed_queries = AtomicUsize::new(0);
+
+    querylist_paths
+        .par_iter()
+        .for_each(|q| {
+            let i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
+            let query_label = q.clone().into_os_string().into_string().unwrap();
+
+            if let Some(query) = {
+                // load query from q
+                let mut mm = None;
+                if let Ok(sigs) = Signature::from_path(dbg!(q)) {
+                    for sig in &sigs {
+                        if let Some(mh) = prepare_query(sig, &template) {
+                            mm = Some(mh);
+                            break;
+                        }
+                    }
+                }
+                mm
+            } {
+                // filter first set of matches out of sketchlist
+                let matchlist: BinaryHeap<PrefetchResult> = sketchlist
+                    .par_iter()
+                    .filter_map(|sm| {
+                        let mut mm = None;
+
+                        if let Ok(overlap) = sm.minhash.count_common(&query, false) {
+                            if overlap >= threshold_hashes {
+                                let result = PrefetchResult {
+                                    name: sm.name.clone(),
+                                    minhash: sm.minhash.clone(),
+                                    overlap,
+                                };
+                                mm = Some(result);
+                            }
+                        }
+                        mm
+                    })
+                    .collect();
+
+                if matchlist.len() > 0 {
+                    // CTB let prefetch_output = format!("prefetch-{i}.csv");
+                    let gather_output = format!("gather-{i}.csv");
+
+                    // save initial list of matches to prefetch output
+                    // CTB write_prefetch(query_label.clone(), Some(prefetch_output),
+                    // &matchlist);
+
+                    // now, do the gather!
+                    consume_query_by_gather(query, matchlist, threshold_hashes,
+                                            Some(gather_output), query_label);
+                } else {
+                    println!("No matches to '{}'", query_label);
+                }
+            } else {
+                println!("ERROR loading signature from '{}'", query_label);
+            }
+        });
+
+    Ok(())
 }
 
 
@@ -542,6 +705,23 @@ fn do_countergather(query_filename: String,
     }
 }
 
+#[pyfunction]
+fn do_multigather(query_filenames: String,
+                  siglist_path: String,
+                  threshold_bp: usize,
+                  ksize: u8,
+                  scaled: usize
+) -> PyResult<u8> {
+    match multigather(query_filenames, siglist_path, threshold_bp,
+                      ksize, scaled) {
+        Ok(_) => Ok(0),
+        Err(e) => {
+            eprintln!("Error: {e}");
+            Ok(1)
+        }
+    }
+}
+
 #[pyfunction]
 fn get_num_threads() -> PyResult<usize> {
     Ok(rayon::current_num_threads())
@@ -551,6 +731,8 @@ fn get_num_threads() -> PyResult<usize> {
 fn pyo3_branchwater(_py: Python, m: &PyModule) -> PyResult<()> {
     m.add_function(wrap_pyfunction!(do_search, m)?)?;
     m.add_function(wrap_pyfunction!(do_countergather, m)?)?;
+    m.add_function(wrap_pyfunction!(do_multigather, m)?)?;
+    m.add("SomeError", _py.get_type::<SomeError>())?;
     m.add_function(wrap_pyfunction!(get_num_threads, m)?)?;
     Ok(())
 }
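
For reference, the compiled `pyo3_branchwater` module registered above can also be driven directly from Python, without going through `sourmash scripts`. The sketch below is illustrative only; the file names are placeholders and the numeric arguments mirror the plugin defaults (`threshold_bp=100000`, `ksize=31`, `scaled=1000`):

```
import pyo3_branchwater

# Call the compiled extension directly, bypassing the sourmash CLI plugin
# classes. File names are placeholders; numeric arguments mirror the plugin
# defaults (threshold_bp=100000, ksize=31, scaled=1000).
print("rayon threads:", pyo3_branchwater.get_num_threads())

status = pyo3_branchwater.do_multigather("query-list.txt",
                                         "list.gtdb-reps-rs214-k21.txt",
                                         100000, 31, 1000)
if status != 0:
    raise SystemExit("fastmultigather reported an error")
```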