Skip to content

Commit

Permalink
EXP: provide manysearch binary. (#70)
Browse files Browse the repository at this point in the history
* [MRG] Add `-c` `--cores` (#57)

* add -c --cores

* no need to change other functions

* clean up spaces ;)

* refactoring & actual_cores

---------

Co-authored-by: C. Titus Brown <[email protected]>

* MRG: add a simple command line test of `--cores` (#65)

* add -c --cores

* no need to change other functions

* clean up spaces ;)

* refactoring & actual_cores

* add a simple test of -c

* check num cores

---------

Co-authored-by: Mohamed Abuelanin <[email protected]>

* MRG: switch to AGPL license (#66)

* switch to AGPL

* Update README.md

* Update README.md

---------

Co-authored-by: Tessa Pierce Ward <[email protected]>
Co-authored-by: Mohamed Abuelanin <[email protected]>

* add manysearch cmd line for profiling help

* provide 'manysearch' binary

* disable jaccard

---------

Co-authored-by: Mohamed Abuelanin <[email protected]>
Co-authored-by: Tessa Pierce Ward <[email protected]>
  • Loading branch information
3 people authored Sep 1, 2023
1 parent 3647625 commit 3051256
Show file tree
Hide file tree
Showing 9 changed files with 813 additions and 10 deletions.
5 changes: 5 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ edition = "2021"
name = "pyo3_branchwater"
crate-type = ["cdylib"]

[[bin]]
name = "manysearch"
path = "src/main.rs"

[dependencies]
pyo3 = { version = "0.18.1", features = ["extension-module", "anyhow"] }
rayon = "1.5.1"
Expand All @@ -31,3 +35,4 @@ tempfile = "3.3.0"
#target-cpu=native
lto = "thin"
opt-level = 3
debug = true
661 changes: 661 additions & 0 deletions LICENSE.txt

Large diffs are not rendered by default.

11 changes: 10 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -121,4 +121,13 @@ maturin build

---

CTB Aug 2023
## License

This software is under the AGPL license. Please see [LICENSE.txt](LICENSE.txt).

## Authors

Luiz Irber
C. Titus Brown
Mohamed Abuelanin
N. Tessa Pierce-Ward
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@ fastmultigather = "pyo3_branchwater:Branchwater_Fastmultigather"
python-source = "src/python"

[metadata]
license = { text = "BSD 3-Clause License" }
license = { text = "GNU Affero General Public License v3" }
10 changes: 7 additions & 3 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,15 +320,19 @@ fn do_multigather(query_filenames: String,
}

#[pyfunction]
fn get_num_threads() -> PyResult<usize> {
Ok(rayon::current_num_threads())
fn set_global_thread_pool(num_threads: usize) -> PyResult<usize> {
if let Ok(_) = std::panic::catch_unwind(|| rayon::ThreadPoolBuilder::new().num_threads(num_threads).build_global()) {
Ok(rayon::current_num_threads())
} else {
Err(PyErr::new::<pyo3::exceptions::PyRuntimeError, _>("Could not set the number of threads. Global thread pool might already be initialized."))
}
}

#[pymodule]
fn pyo3_branchwater(_py: Python, m: &PyModule) -> PyResult<()> {
m.add_function(wrap_pyfunction!(do_manysearch, m)?)?;
m.add_function(wrap_pyfunction!(do_countergather, m)?)?;
m.add_function(wrap_pyfunction!(do_multigather, m)?)?;
m.add_function(wrap_pyfunction!(get_num_threads, m)?)?;
m.add_function(wrap_pyfunction!(set_global_thread_pool, m)?)?;
Ok(())
}
55 changes: 55 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
use std::path::PathBuf;

use anyhow::Result;

pub mod utils;
pub mod manysearch;

use manysearch::manysearch;

use clap::Parser;

#[derive(Parser, Debug)]
#[clap(author, version, about, long_about = None)]
struct Cli {
/// List of queries (one sig path per line in the file)
#[clap(parse(from_os_str))]
querylist: PathBuf,

/// List of signatures to search
#[clap(parse(from_os_str))]
siglist: PathBuf,

/// ksize
#[clap(short, long, default_value = "31")]
ksize: u8,

/// threshold
#[clap(short, long, default_value = "0.85")]
threshold: f64,

/// scaled
#[clap(short, long, default_value = "1000")]
scaled: usize,

/// The path for output
#[clap(parse(from_os_str), short, long)]
output: Option<PathBuf>,
}

fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();

let opts = Cli::parse();

manysearch(
opts.querylist,
opts.siglist,
opts.threshold,
opts.ksize,
opts.scaled,
opts.output,
)?;

Ok(())
}
7 changes: 5 additions & 2 deletions src/manysearch.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use std::path::Path;
use std::sync::atomic;
use std::sync::atomic::AtomicUsize;

use anyhow::Result;
use anyhow::{Result, bail};

use crate::utils::{prepare_query,
load_sketchlist_filenames, load_sketches};
Expand Down Expand Up @@ -121,12 +121,15 @@ pub fn manysearch<P: AsRef<Path>>(
let overlap = q.minhash.count_common(&search_sm.minhash, false).unwrap() as f64;
let query_size = q.minhash.size() as f64;

/*
let mut merged = q.minhash.clone();
merged.merge(&search_sm.minhash).ok();
let total_size = merged.size() as f64;
*/

let containment = overlap / query_size;
let jaccard = overlap / total_size;
// let jaccard = overlap / total_size;
let jaccard = 0;
if containment > threshold {
results.push((q.name.clone(),
q.md5sum.clone(),
Expand Down
46 changes: 43 additions & 3 deletions src/python/pyo3_branchwater/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,33 @@
import argparse
from sourmash.plugins import CommandLinePlugin
from sourmash.logging import notify
import os

from . import pyo3_branchwater


def get_max_cores():
try:
if 'SLURM_CPUS_ON_NODE' in os.environ:
return int(os.environ['SLURM_CPUS_ON_NODE'])
elif 'SLURM_JOB_CPUS_PER_NODE' in os.environ:
cpus_per_node_str = os.environ['SLURM_JOB_CPUS_PER_NODE']
return int(cpus_per_node_str.split('x')[0])
else:
return os.cpu_count()
except Exception:
return os.cpu_count()


def set_thread_pool(user_cores):
avail_threads = get_max_cores()
num_threads = min(avail_threads, user_cores) if user_cores else avail_threads
if user_cores and user_cores > avail_threads:
notify(f"warning: only {avail_threads} threads available, using {avail_threads}")
actual_rayon_cores = pyo3_branchwater.set_global_thread_pool(num_threads)
return actual_rayon_cores


class Branchwater_Manysearch(CommandLinePlugin):
command = 'manysearch'
description = 'massively parallel sketch search'
Expand All @@ -24,11 +48,16 @@ def __init__(self, p):
help='k-mer size at which to select sketches')
p.add_argument('-s', '--scaled', default=1000, type=int,
help='scaled factor at which to do comparisons')
p.add_argument('-c', '--cores', default=0, type=int,
help='number of cores to use (default is all available)')

def main(self, args):
notify(f"ksize: {args.ksize} / scaled: {args.scaled} / threshold: {args.threshold}")
num_threads = pyo3_branchwater.get_num_threads()

num_threads = set_thread_pool(args.cores)

notify(f"searching all sketches in '{args.query_paths}' against '{args.against_paths}' using {num_threads} threads")

super().main(args)
status = pyo3_branchwater.do_manysearch(args.query_paths,
args.against_paths,
Expand Down Expand Up @@ -59,10 +88,16 @@ def __init__(self, p):
help='k-mer size at which to do comparisons (default: 31)')
p.add_argument('-s', '--scaled', default=1000, type=int,
help='scaled factor at which to do comparisons (default: 1000)')
p.add_argument('-c', '--cores', default=0, type=int,
help='number of cores to use (default is all available)')


def main(self, args):
notify(f"ksize: {args.ksize} / scaled: {args.scaled} / threshold bp: {args.threshold_bp}")
num_threads = pyo3_branchwater.get_num_threads()

num_threads = set_thread_pool(args.cores)


notify(f"gathering all sketches in '{args.query_sig}' against '{args.against_paths}' using {num_threads} threads")
super().main(args)
status = pyo3_branchwater.do_countergather(args.query_sig,
Expand Down Expand Up @@ -93,10 +128,15 @@ def __init__(self, p):
help='k-mer size at which to do comparisons (default: 31)')
p.add_argument('-s', '--scaled', default=1000, type=int,
help='scaled factor at which to do comparisons (default: 1000)')
p.add_argument('-c', '--cores', default=0, type=int,
help='number of cores to use (default is all available)')


def main(self, args):
notify(f"ksize: {args.ksize} / scaled: {args.scaled} / threshold bp: {args.threshold_bp}")
num_threads = pyo3_branchwater.get_num_threads()

num_threads = set_thread_pool(args.cores)

notify(f"gathering all sketches in '{args.query_paths}' against '{args.against_paths}' using {num_threads} threads")
super().main(args)
status = pyo3_branchwater.do_multigather(args.query_paths,
Expand Down
26 changes: 26 additions & 0 deletions src/python/tests/test_search.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,32 @@ def test_simple(runtmp):
assert cont == 0.4885


def test_simple_with_cores(runtmp, capfd):
# test basic execution with -c argument (that it runs, at least!)
query_list = runtmp.output('query.txt')
against_list = runtmp.output('against.txt')

sig2 = get_test_data('2.fa.sig.gz')
sig47 = get_test_data('47.fa.sig.gz')
sig63 = get_test_data('63.fa.sig.gz')

make_file_list(query_list, [sig2, sig47, sig63])
make_file_list(against_list, [sig2, sig47, sig63])

output = runtmp.output('out.csv')

runtmp.sourmash('scripts', 'manysearch', query_list, against_list,
'-o', output, '-c', '4')
assert os.path.exists(output)

df = pandas.read_csv(output)
assert len(df) == 5

result = runtmp.last_result
print(result.err)
assert " using 4 threads" in result.err


def test_simple_threshold(runtmp):
# test with a simple threshold => only 3 results
query_list = runtmp.output('query.txt')
Expand Down

0 comments on commit 3051256

Please sign in to comment.