From 0fc7f10613e6f79203fac05fc8200e1e31194b49 Mon Sep 17 00:00:00 2001 From: Benjamin Wingfield Date: Mon, 18 Sep 2023 15:40:01 +0100 Subject: [PATCH] add workflow-monitor-path parameter --- data/templates/callback.txt | 2 +- src/main.rs | 7 +++++-- src/slurm/job.rs | 10 +++++----- 3 files changed, 11 insertions(+), 8 deletions(-) diff --git a/data/templates/callback.txt b/data/templates/callback.txt index 94fa74c..32dd7bd 100644 --- a/data/templates/callback.txt +++ b/data/templates/callback.txt @@ -8,7 +8,7 @@ set -euxo pipefail module load python-data/3.10-23.07 # run the monitor in the background -python3 {workflow_monitor_path} \ +{workflow_monitor_path} \ --callback_token $CALLBACK_TOKEN \ --namespace {namespace} & diff --git a/src/main.rs b/src/main.rs index b9d77fb..afdf150 100644 --- a/src/main.rs +++ b/src/main.rs @@ -53,7 +53,10 @@ struct Args { globus_jar_path: PathBuf, /// Which platform namespace do you want to deploy to? [dev, test, prod] #[arg(short, long, value_enum)] - namespace: PlatformNamespace + namespace: PlatformNamespace, + /// Path to the workflow monitor binary (installed in a venv) + #[arg(long)] + workflow_monitor_path: PathBuf, } /// A directory for storing working data @@ -105,7 +108,7 @@ async fn main() { if let Some(jobs) = jobs { for job in jobs { - let job_path = job.create(&wd, &args.globus_jar_path, &args.namespace); + let job_path = job.create(&wd, &args.globus_jar_path, &args.namespace, &args.workflow_monitor_path); if !args.dry_run { job.stage(&conn); job.submit(&conn, job_path); diff --git a/src/slurm/job.rs b/src/slurm/job.rs index 77b8490..4fc98ce 100644 --- a/src/slurm/job.rs +++ b/src/slurm/job.rs @@ -24,7 +24,8 @@ pub struct JobPath { } impl JobRequest { - pub fn create(&self, wd: &WorkingDirectory, globus_path: &PathBuf, namespace: &PlatformNamespace) -> JobPath { + pub fn create(&self, wd: &WorkingDirectory, globus_path: &PathBuf, namespace: &PlatformNamespace, + monitor_path: &PathBuf) -> JobPath { let instance_wd = WorkingDirectory { path: wd.path.join(&&self.pipeline_param.id) }; info!("Creating job {} in working directory {}", &&self.pipeline_param.id, &instance_wd.path.display()); @@ -35,7 +36,7 @@ impl JobRequest { fs::create_dir(&instance_wd.path).expect("Create working directory"); let header: Header = render_header(&&self.pipeline_param); - let callback: Callback = render_callback(&&self.pipeline_param, &namespace); + let callback: Callback = render_callback(&&self.pipeline_param, &namespace, &monitor_path); let vars: EnvVars = read_environment_variables(); let workflow: Workflow = render_nxf(&globus_path, &&self.pipeline_param, &wd.path, &namespace); let job = JobTemplate { header, callback, vars, workflow }; @@ -234,15 +235,14 @@ fn render_nxf(globus_path: &PathBuf, param: &PipelineParam, work_dir: &Path, nam } /// Render the callback using TinyTemplate -fn render_callback(param: &PipelineParam, namespace: &PlatformNamespace) -> Callback { +fn render_callback(param: &PipelineParam, namespace: &PlatformNamespace, monitor_path: &&PathBuf) -> Callback { /// included callback template static CALLBACK: &str = include_str!(concat!(env!("CARGO_MANIFEST_DIR"), "/data/templates/callback.txt")); let mut tt = TinyTemplate::new(); tt.add_template("callback", CALLBACK).expect("Template"); let name: &String = ¶m.id; - static WORKFLOW_MONITOR_PATH: &str = "/scratch/project_2004504/bwingfield/workflow-monitor/main.py"; let context = CallbackContext { name: name.clone(), - workflow_monitor_path: WORKFLOW_MONITOR_PATH.to_string(), + workflow_monitor_path: monitor_path.to_string_lossy().to_string(), namespace: namespace.to_string() }; Callback { content: tt.render("callback", &context).expect("Rendered callback") }