Skip to content

Commit

Permalink
MRG: Add threaded multigather to pyo3_branchwater code; add `fastmult…
Browse files Browse the repository at this point in the history
…igather` CLI plugin. (#3)

* add method for parallel foo

* remove comment

* rename prefetch to find_overlapping

* add multigather

* start adding error handling

* add some error handling

* turn on parallel

* update printouts

* catch error when loading a bad sig

* print when no matches

* closer to fix merge

* update pyproject

* bump maturin version

* minor updates

* change quoting around names in CSV output

* add number of threads output

* compiles

* more compile

* more cleanup, add plugin

* update docs
  • Loading branch information
ctb authored Aug 12, 2023
1 parent 4a4b8d0 commit 386b93c
Show file tree
Hide file tree
Showing 4 changed files with 239 additions and 18 deletions.
44 changes: 28 additions & 16 deletions doc/README.md
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
# fastgather and manysearch - an introduction
# fastgather, fastmultigather, and manysearch - an introduction

This repository implements two sourmash plugins, `fastgather` and `manysearch`. These plugins make use of multithreading in Rust to provide very fast implementations of `search` and `gather`. With large databases, these commands can be hundreds to thousands of times faster, and 10-50x lower memory.
This repository implements three sourmash plugins, `fastgather`, `fastmultigather`, and `manysearch`. These plugins make use of multithreading in Rust to provide very fast implementations of `search` and `gather`. With large databases, these commands can be hundreds to thousands of times faster, and 10-50x lower memory.

The main *drawback* to these plugin commands is that their inputs and outputs are not as rich as the native sourmash commands. In particular, this means that input databases need to be prepared differently, and the output may be most useful as a prefilter in conjunction with regular sourmash commands.

## Preparing the database

Both `manysearch` and `fastgather` use
All three commands use
_text files containing lists of signature files_, or "fromfiles", for the search database, and `manysearch` uses these "fromfiles" for queries, too.

(Yes, this plugin will eventually be upgraded to support zip files; keep an eye on [sourmash#2230](https://github.com/sourmash-bio/sourmash/pull/2230).)
Expand All @@ -25,24 +25,20 @@ find gtdb-reps-rs214-k21/ -name "*.sig" -exec gzip {} \;
find gtdb-reps-rs214-k21/ -type f > list.gtdb-reps-rs214-k21.txt
```
(Note that once [sourmash#2712](https://github.com/sourmash-bio/sourmash/pull/2712) is merged, you will be able to do `sourmash sig split ... -E .sig.gz` to skip the additional find/gzip command.)

## Running the commands

`manysearch` takes two file lists as input, and outputs a CSV
```
sourmash scripts manysearch query-list.txt podar-ref-list.txt -o results.csv
```
### Running `manysearch`

`fastgather` takes a query metagenome and a file list as the database, and outputs a CSV:
```
sourmash scripts fastgather query.sig.gz podar-ref-lists.txt -o results.csv
```

## Using the output
The `manysearch` command finds overlaps between one or more query genomes, and one or more subject genomes.

### `manysearch` output

The `manysearch` command finds overlaps between one or more query genomes, and one or more subject genomes.
`manysearch` takes two file lists as input, and outputs a CSV
```
sourmash scripts manysearch query-list.txt podar-ref-list.txt -o results.csv
```

To run it, you need to provide two "fromfiles" containing lists of paths to signature files (`.sig` or `.sig.gz`). If you create a fromfile as above with GTDB reps, you can generate a query fromfile like so:

Expand All @@ -57,7 +53,14 @@ sourmash scripts manysearch list.query.txt list.gtdb-rs214-k21.txt -o query.x.g

The results file here, `query.x.gtdb-reps.csv`, will have five columns: `query` and `query_md5`, `match` and `match_md5`, and `containment.`

### Using `fastgather` to create a picklist for `sourmash gather`
### Running `fastgather`

`fastgather` takes a query metagenome and a file list as the database, and outputs a CSV:
```
sourmash scripts fastgather query.sig.gz podar-ref-list.txt -o results.csv
```

#### Using `fastgather` to create a picklist for `sourmash gather`

The `fastgather` command is a much faster version of sourmash gather.

Expand All @@ -80,6 +83,15 @@ sourmash gather SRR606249.trim.sig.gz /group/ctbrowngrp/sourmash-db/gtdb-rs214/g

Here it's important that the picklist be used on a sourmash collection that contains a manifest - this will prevent sourmash from loading any sketches other than the ones in the fastgather CSV file. (Zip file collections with manifests are produced automatically when `-o filename.zip` is used with `sketch dna`, and can also be prepared with `sourmash sig cat`.)

### Example
#### Example of picklist

A complete example Snakefile implementing the above workflow is available [in the 2023-swine-usda](https://github.com/ctb/2023-swine-usda/blob/main/Snakefile) repository.

### Running `fastmultigather`

`fastmultigather` takes a file list of query metagenomes and a file list for the database, and outputs many CSVs:
```
sourmash scripts fastmultigather query-list.txt podar-ref-lists.txt
```

The main advantage that `fastmultigather` has over `fastgather` is that you only load the database files once, which can be a significant time savings for large databases!
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ build-backend = "maturin"
[project.entry-points."sourmash.cli_script"]
manysearch = "pyo3_branchwater.sourmash_plugin:Branchwater_Manysearch"
fastgather = "pyo3_branchwater.sourmash_plugin:Branchwater_Fastgather"
fastmultigather = "pyo3_branchwater.sourmash_plugin:Branchwater_Fastmultigather"

[tool.maturin]
python-source = "python"
Expand Down
26 changes: 26 additions & 0 deletions python/pyo3_branchwater/sourmash_plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,29 @@ def main(self, args):
if args.output_prefetch:
notify(f"prefetch results in '{args.output_prefetch}'")
return status


class Branchwater_Fastmultigather(CommandLinePlugin):
command = 'fastmultigather'
description = 'massively parallel sketch multigather'

def __init__(self, p):
super().__init__(p)
p.add_argument('query_paths', help="a text file containing paths to .sig/.sig.gz files to query")
p.add_argument('against_paths', help="a text file containing paths to .sig/.sig.gz files to search against")
p.add_argument('-t', '--threshold-bp', default=100000, type=float)
p.add_argument('-k', '--ksize', default=31, type=int)
p.add_argument('-s', '--scaled', default=1000, type=int)

def main(self, args):
num_threads = pyo3_branchwater.get_num_threads()
notify(f"gathering all sketches in '{args.query_paths}' against '{args.against_paths}' using {num_threads} threads")
super().main(args)
status = pyo3_branchwater.do_multigather(args.query_paths,
args.against_paths,
int(args.threshold_bp),
args.ksize,
args.scaled)
if status == 0:
notify(f"...fastmultigather is done!")
return status
186 changes: 184 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@
// not orig. This is different from sourmash...

use pyo3::prelude::*;
use pyo3::create_exception;
use pyo3::exceptions::PyException;

use rayon::prelude::*;

use std::fs::File;
Expand All @@ -24,11 +27,24 @@ use sourmash::signature::{Signature, SigsTrait};
use sourmash::sketch::minhash::{max_hash_for_scaled, KmerMinHash};
use sourmash::sketch::Sketch;

create_exception!(pymagsearch, SomeError, pyo3::exceptions::PyException);

impl std::convert::From<SomeError> for PyErr {
fn from(err: SomeError) -> PyErr {
PyException::new_err(err.to_string())
}
}

/// check to see if two KmerMinHash are compatible.
///
/// CTB note: despite the name, downsampling is not performed?
/// Although it checks if they are compatible in one direction...

fn check_compatible_downsample(
me: &KmerMinHash,
other: &KmerMinHash,
) -> Result<(), sourmash::Error> {
/*
/* // ignore num minhashes.
if self.num != other.num {
return Err(Error::MismatchNum {
n1: self.num,
Expand All @@ -55,6 +71,10 @@ fn check_compatible_downsample(
Ok(())
}

/// Given a search Signature containing one or more sketches, and a template
/// Sketch, return a compatible (& now downsampled) Sketch from the search
/// Signature.

fn prepare_query(search_sig: &Signature, template: &Sketch) -> Option<KmerMinHash> {
let mut search_mh = None;
if let Some(Sketch::MinHash(mh)) = search_sig.select_sketch(template) {
Expand Down Expand Up @@ -279,6 +299,8 @@ impl PartialEq for PrefetchResult {

impl Eq for PrefetchResult {}

// Find overlapping above specified threshold.

fn prefetch(
query: &KmerMinHash,
sketchlist: BinaryHeap<PrefetchResult>,
Expand Down Expand Up @@ -389,9 +411,45 @@ fn load_sketches_above_threshold(
Ok(matchlist)
}

fn consume_query_by_gather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
mut query: KmerMinHash,
matchlist: BinaryHeap<PrefetchResult>,
threshold_hashes: u64,
gather_output: Option<P>,
query_label: String
) -> Result<(), Box<dyn std::error::Error>> {
// Set up a writer for gather output
let gather_out: Box<dyn Write> = match gather_output {
Some(path) => Box::new(BufWriter::new(File::create(path).unwrap())),
None => Box::new(std::io::stdout()),
};
let mut writer = BufWriter::new(gather_out);
writeln!(&mut writer, "rank,match,md5sum,overlap").ok();

let mut matching_sketches = matchlist;
let mut rank = 0;

while !matching_sketches.is_empty() {
eprintln!("remaining: {} {}", query.size(), matching_sketches.len());
let best_element = matching_sketches.peek().unwrap();

// remove!
query.remove_from(&best_element.minhash)?;

writeln!(&mut writer, "{},\"{}\",{},{}", rank, best_element.name, best_element.minhash.md5sum(), best_element.overlap).ok();

// recalculate remaining overlaps between query and all sketches.
// note: this is parallelized.
matching_sketches = prefetch(&query, matching_sketches, threshold_hashes);
rank = rank + 1;
}
Ok(())
}


/// Run counter-gather with a query against a list of files.

fn countergather<P: AsRef<Path> + std::fmt::Debug>(
fn countergather<P: AsRef<Path> + std::fmt::Debug + std::fmt::Display + Clone>(
query_filename: P,
matchlist_filename: P,
threshold_bp: usize,
Expand Down Expand Up @@ -464,6 +522,7 @@ fn countergather<P: AsRef<Path> + std::fmt::Debug>(
None => Box::new(Vec::new()),
};
let mut writer = BufWriter::new(prefetch_out);

writeln!(&mut writer, "match,md5sum,overlap").unwrap();
for m in &matchlist {
writeln!(&mut writer, "\"{}\",{},{}", m.name, m.minhash.md5sum(), m.overlap).ok();
Expand Down Expand Up @@ -499,7 +558,111 @@ fn countergather<P: AsRef<Path> + std::fmt::Debug>(
matching_sketches = prefetch(&query, matching_sketches, threshold_hashes);
rank = rank + 1;
}
Ok(())
}

/// Run counter-gather for multiple queries against a list of files.

fn multigather<P: AsRef<Path> + std::fmt::Debug + Clone>(
query_filenames: P,
matchlist_filename: P,
threshold_bp: usize,
ksize: u8,
scaled: usize,
) -> Result<(), Box<dyn std::error::Error>> {
let max_hash = max_hash_for_scaled(scaled as u64);
let template_mh = KmerMinHash::builder()
.num(0u32)
.ksize(ksize as u32)
.max_hash(max_hash)
.build();
let template = Sketch::MinHash(template_mh);

// load the list of query paths
let querylist_paths = load_sketchlist_filenames(query_filenames)?;
println!("Loaded {} sig paths in querylist", querylist_paths.len());

// build the list of paths to match against.
println!("Loading matchlist");
let matchlist_paths = load_sketchlist_filenames(matchlist_filename)?;
println!("Loaded {} sig paths in matchlist", matchlist_paths.len());

let threshold_hashes : u64 = {
let x = threshold_bp / scaled;
if x > 0 {
x
} else {
1
}
}.try_into().unwrap();

println!("threshold overlap: {} {}", threshold_hashes, threshold_bp);

// Load all the against sketches
let sketchlist = load_sketches(matchlist_paths, &template).unwrap();

// Iterate over all queries => do prefetch and gather!
let processed_queries = AtomicUsize::new(0);

querylist_paths
.par_iter()
.for_each(|q| {
let i = processed_queries.fetch_add(1, atomic::Ordering::SeqCst);
let query_label = q.clone().into_os_string().into_string().unwrap();

if let Some(query) = {
// load query from q
let mut mm = None;
if let Ok(sigs) = Signature::from_path(dbg!(q)) {
for sig in &sigs {
if let Some(mh) = prepare_query(sig, &template) {
mm = Some(mh);
break;
}
}
}
mm
} {
// filter first set of matches out of sketchlist
let matchlist: BinaryHeap<PrefetchResult> = sketchlist
.par_iter()
.filter_map(|sm| {
let mut mm = None;

if let Ok(overlap) = sm.minhash.count_common(&query, false) {
if overlap >= threshold_hashes {
let result = PrefetchResult {
name: sm.name.clone(),
minhash: sm.minhash.clone(),
overlap,
};
mm = Some(result);
}
}
mm
})
.collect();

if matchlist.len() > 0 {
// CTB let prefetch_output = format!("prefetch-{i}.csv");
let gather_output = format!("gather-{i}.csv");

// save initial list of matches to prefetch output
// CTB write_prefetch(query_label.clone(), Some(prefetch_output),
// &matchlist);

// now, do the gather!
consume_query_by_gather(query, matchlist, threshold_hashes,
Some(gather_output), query_label);
} else {
println!("No matches to '{}'", query_label);
}
} else {
println!("ERROR loading signature from '{}'", query_label);
}
});


Ok(())
}

Expand Down Expand Up @@ -542,6 +705,23 @@ fn do_countergather(query_filename: String,
}
}

#[pyfunction]
fn do_multigather(query_filenames: String,
siglist_path: String,
threshold_bp: usize,
ksize: u8,
scaled: usize
) -> PyResult<u8> {
match multigather(query_filenames, siglist_path, threshold_bp,
ksize, scaled) {
Ok(_) => Ok(0),
Err(e) => {
eprintln!("Error: {e}");
Ok(1)
}
}
}

#[pyfunction]
fn get_num_threads() -> PyResult<usize> {
Ok(rayon::current_num_threads())
Expand All @@ -551,6 +731,8 @@ fn get_num_threads() -> PyResult<usize> {
fn pyo3_branchwater(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(do_search, m)?)?;
m.add_function(wrap_pyfunction!(do_countergather, m)?)?;
m.add_function(wrap_pyfunction!(do_multigather, m)?)?;
m.add("SomeError", _py.get_type::<SomeError>())?;
m.add_function(wrap_pyfunction!(get_num_threads, m)?)?;
Ok(())
}

0 comments on commit 386b93c

Please sign in to comment.