Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

set up namespace support #3

Merged
merged 2 commits into from
Sep 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion data/templates/callback.txt
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,12 @@ set -euxo pipefail
module load python-data/3.10-23.07

# run the monitor in the background
python3 {workflow_monitor_path} &
python3 {workflow_monitor_path} \
--callback_token $CALLBACK_TOKEN \
--namespace {namespace} &

# grab workflow monitor pid
workflow_monitor_pid=$!

# ------------------------------------------------------------------------------

2 changes: 1 addition & 1 deletion data/templates/nxf.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ nextflow run {pgsc_calc_dir} -profile singularity \
-c {work_dir}/{name}/allas.config \
-params-file {work_dir}/{name}/params.json \
--input {work_dir}/{name}/input.json \
--outdir s3://intervene-dev/{name} \
--outdir s3://intervene-{namespace}/{name} \
--min_overlap 0.01 \
--max_cpus 40 \
--max_memory "32.GB" \
Expand Down
17 changes: 12 additions & 5 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,19 +12,21 @@
#![warn(missing_docs)]

use std::fs;
use std::path::{PathBuf};
use std::path::PathBuf;

use clap::Parser;
use log::info;
use rusqlite::Connection;

use crate::db::ingest::message::ingest_message;
use crate::db::job::load::get_valid_jobs;
use crate::namespace::PlatformNamespace;
use crate::slurm::job_request::JobRequest;

mod db;
mod request;
mod slurm;
mod namespace;

#[derive(Parser, Debug)]
#[command(name = "hattivatti")]
Expand All @@ -48,7 +50,10 @@ struct Args {
dry_run: bool,
/// Path to the globus file handler jar
#[arg(short, long)]
globus_jar_path: PathBuf
globus_jar_path: PathBuf,
/// Which platform namespace do you want to deploy to? [dev, test, prod]
#[arg(short, long, value_enum)]
namespace: PlatformNamespace
}

/// A directory for storing working data
Expand All @@ -70,15 +75,17 @@ async fn main() {
info!("terve! starting up :)");

let args = Args::parse();
let wd = WorkingDirectory { path: args.work_dir };

let wd = WorkingDirectory { path: args.work_dir.join(args.namespace.to_string()) };
info!("Setting work directory to: {:?}", &wd.path);
fs::create_dir_all(&wd.path).expect("Can't create working directory");

let conn: Connection = db::open::open_db(&wd)
.expect("Database connection");

let schema = request::schema::load_schema(args.schema_dir.as_path());
let s3_client = request::message::make_allas_client();
let messages = request::message::fetch_all(&s3_client, &schema).await;
let messages = request::message::fetch_all(&s3_client, &schema, &args.namespace).await;

if let Some(messages) = messages {
for message in messages {
Expand All @@ -98,7 +105,7 @@ async fn main() {

if let Some(jobs) = jobs {
for job in jobs {
let job_path = job.create(&wd, &args.globus_jar_path);
let job_path = job.create(&wd, &args.globus_jar_path, &args.namespace);
if !args.dry_run {
job.stage(&conn);
job.submit(&conn, job_path);
Expand Down
19 changes: 19 additions & 0 deletions src/namespace.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
use std::fmt;
use clap::ValueEnum;

/// Deployment namespace for the platform (selected with `--namespace` on the
/// command line via the clap `ValueEnum` derive).
///
/// The chosen namespace is rendered with `Display` (lowercase) and used to
/// select the Allas bucket (`intervene-{namespace}`) and to suffix the
/// working directory. NOTE(review): the derived `PartialOrd`/`Ord` order
/// follows declaration order (Dev < Test < Prod) — presumably intentional,
/// but no code in view relies on it; confirm before depending on it.
#[derive(Copy, Clone, Debug, PartialEq, Eq, PartialOrd, Ord, ValueEnum)]
pub enum PlatformNamespace {
/// Development environment
Dev,
/// Testing environment
Test,
/// Production environment
Prod
}

/// Render the namespace as its lowercase name ("dev", "test" or "prod"),
/// matching the strings used in bucket names and template contexts.
impl fmt::Display for PlatformNamespace {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        // Map each variant to its canonical lowercase label, then emit it once.
        let label = match self {
            PlatformNamespace::Dev => "dev",
            PlatformNamespace::Test => "test",
            PlatformNamespace::Prod => "prod",
        };
        f.write_str(label)
    }
}
9 changes: 5 additions & 4 deletions src/request/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use log::{info, warn};
use rusoto_s3::S3;
use serde_json::Value;
use tokio::io::AsyncReadExt;
use crate::namespace::PlatformNamespace;

/// Create an s3 client capable of connecting to Allas
///
Expand Down Expand Up @@ -41,13 +42,13 @@ pub struct AllasMessage {
}

/// Fetch all messages in the work queue on Allas
pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema) -> Option<Vec<AllasMessage>> {
let bucket = "intervene-dev";
pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema, namespace: &PlatformNamespace) -> Option<Vec<AllasMessage>> {
let bucket = format!("intervene-{namespace}");
let prefix = "job-queue";
info!("Checking Allas queue {bucket}/{prefix} for new messages");

let list_request = rusoto_s3::ListObjectsV2Request {
bucket: bucket.into(),
bucket: bucket.clone(),
prefix: Some(prefix.into()),
..Default::default()
};
Expand All @@ -66,7 +67,7 @@ pub async fn fetch_all(s3_client: &rusoto_s3::S3Client, schema: &JSONSchema) ->
for object in objects {
let key = object.key.unwrap();
info!("Object key: {}", key);
let content = read_job(&s3_client, bucket, &key).await;
let content = read_job(&s3_client, &bucket, &key).await;
// info!("Object content: {content}");
jobs.push(AllasMessage::new(content,
bucket.to_string(),
Expand Down
21 changes: 13 additions & 8 deletions src/slurm/job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use chrono::Utc;
use log::{info, warn};
use serde::Serialize;
use tinytemplate::TinyTemplate;
use crate::namespace::PlatformNamespace;

use crate::slurm::job_request::{GlobusDetails, JobRequest, NxfParamsFile, PipelineParam, TargetGenome};
use crate::WorkingDirectory;
Expand All @@ -23,7 +24,7 @@ pub struct JobPath {
}

impl JobRequest {
pub fn create(&self, wd: &WorkingDirectory, globus_path: &PathBuf) -> JobPath {
pub fn create(&self, wd: &WorkingDirectory, globus_path: &PathBuf, namespace: &PlatformNamespace) -> JobPath {
let instance_wd = WorkingDirectory { path: wd.path.join(&&self.pipeline_param.id) };
info!("Creating job {} in working directory {}", &&self.pipeline_param.id, &instance_wd.path.display());

Expand All @@ -34,9 +35,9 @@ impl JobRequest {
fs::create_dir(&instance_wd.path).expect("Create working directory");

let header: Header = render_header(&&self.pipeline_param);
let callback: Callback = render_callback(&&self.pipeline_param);
let callback: Callback = render_callback(&&self.pipeline_param, &namespace);
let vars: EnvVars = read_environment_variables();
let workflow: Workflow = render_nxf(&globus_path, &&self.pipeline_param, &wd.path);
let workflow: Workflow = render_nxf(&globus_path, &&self.pipeline_param, &wd.path, &namespace);
let job = JobTemplate { header, callback, vars, workflow };

let path = &instance_wd.path.join("job.sh");
Expand Down Expand Up @@ -148,6 +149,7 @@ struct EnvVarContext {
struct NextflowContext {
name: String,
work_dir: String,
namespace: String,
pgsc_calc_dir: String,
globus_path: String,
globus_parent_path: String
Expand All @@ -157,7 +159,8 @@ struct NextflowContext {
#[derive(Serialize)]
struct CallbackContext {
name: String,
workflow_monitor_path: String
workflow_monitor_path: String,
namespace: String
}

/// Write nextflow parameters to working directory
Expand Down Expand Up @@ -211,18 +214,18 @@ fn read_environment_variables() -> EnvVars {
}

/// Render the workflow commands using TinyTemplate
fn render_nxf(globus_path: &PathBuf, param: &PipelineParam, work_dir: &Path) -> Workflow {
fn render_nxf(globus_path: &PathBuf, param: &PipelineParam, work_dir: &Path, namespace: &PlatformNamespace) -> Workflow {
/// included workflow template
static NXF: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/nxf.txt"));
let mut tt = TinyTemplate::new();
tt.add_template("nxf", NXF).expect("Template");
let name: &String = &param.id;
let wd = work_dir.to_str().expect("path").to_string();
// todo: make dynamic based on deployment namespace
/// installation directory of pgsc_calc (TODO: make this a parameter)
static PGSC_CALC_DIR: &str = "/scratch/project_2004504/pgsc_calc/";
let context = NextflowContext { name: name.clone(),
work_dir: wd,
namespace: namespace.to_string(),
pgsc_calc_dir: PGSC_CALC_DIR.to_string(),
globus_path: globus_path.to_str().expect("Globus path").to_string(),
globus_parent_path: globus_path.parent().expect("Globus parent").to_str().expect("Globus parent path").to_string()
Expand All @@ -231,15 +234,17 @@ fn render_nxf(globus_path: &PathBuf, param: &PipelineParam, work_dir: &Path) ->
}

/// Render the callback using TinyTemplate
fn render_callback(param: &PipelineParam) -> Callback {
fn render_callback(param: &PipelineParam, namespace: &PlatformNamespace) -> Callback {
/// included callback template
static CALLBACK: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/callback.txt"));
let mut tt = TinyTemplate::new();
tt.add_template("callback", CALLBACK).expect("Template");
let name: &String = &param.id;
static WORKFLOW_MONITOR_PATH: &str = "/scratch/project_2004504/bwingfield/workflow-monitor/main.py";
let context = CallbackContext { name: name.clone(),
workflow_monitor_path: WORKFLOW_MONITOR_PATH.to_string() };
workflow_monitor_path: WORKFLOW_MONITOR_PATH.to_string(),
namespace: namespace.to_string()
};
Callback { content: tt.render("callback", &context).expect("Rendered callback") }
}

Expand Down