diff --git a/NAMESPACE b/NAMESPACE
index 8a971a5..51a2c55 100644
--- a/NAMESPACE
+++ b/NAMESPACE
@@ -2,8 +2,8 @@
export(Rush)
export(RushWorker)
-export(fun_heartbeat)
export(get_hostname)
+export(heartbeat)
export(rsh)
export(rush_available)
export(rush_config)
diff --git a/R/Rush.R b/R/Rush.R
index f4544a4..04e4827 100644
--- a/R/Rush.R
+++ b/R/Rush.R
@@ -159,6 +159,8 @@ Rush = R6::R6Class("Rush",
#' Start workers locally with `processx`.
#' The [processx::process] are stored in `$processes`.
#' Alternatively, use `$create_worker_script()` to create a script for starting workers on remote machines.
+ #' By default, [worker_loop_default()] is used as worker loop.
+ #' This function takes the arguments `fun` and optionally `constants` which are passed in `...`.
#'
#' @param n_workers (`integer(1)`)\cr
#' Number of workers to be started.
diff --git a/R/RushWorker.R b/R/RushWorker.R
index b54aad9..d0f7041 100644
--- a/R/RushWorker.R
+++ b/R/RushWorker.R
@@ -60,7 +60,7 @@ RushWorker = R6::R6Class("RushWorker",
heartbeat_expire = heartbeat_expire,
pid = Sys.getpid()
)
- self$heartbeat = callr::r_bg(fun_heartbeat, args = heartbeat_args, supervise = TRUE)
+ self$heartbeat = callr::r_bg(heartbeat, args = heartbeat_args, supervise = TRUE)
# wait until heartbeat process is able to work
Sys.sleep(1)
diff --git a/R/heartbeat_loops.R b/R/heartbeat_loops.R
index 8768f87..198c196 100644
--- a/R/heartbeat_loops.R
+++ b/R/heartbeat_loops.R
@@ -15,7 +15,7 @@
#' @template param_heartbeat_expire
#'
#' @export
-fun_heartbeat = function(network_id, config, worker_id, heartbeat_period, heartbeat_expire, pid) {
+heartbeat = function(network_id, config, worker_id, heartbeat_period, heartbeat_expire, pid) {
r = redux::hiredis(config)
worker_id_key = sprintf("%s:%s", network_id, worker_id)
heartbeat_key = sprintf("%s:%s:heartbeat", network_id, worker_id)
diff --git a/R/worker_loops.R b/R/worker_loops.R
index ae0b7f7..b1064e8 100644
--- a/R/worker_loops.R
+++ b/R/worker_loops.R
@@ -7,18 +7,21 @@
#'
#' @param fun (`function`)\cr
#' Function to be executed.
+#' @param constants (`list`)\cr
+#' List of constants passed to `fun`.
#' @param rush ([RushWorker])\cr
#' Rush worker instance.
#'
#' @export
-worker_loop_default = function(fun, rush) {
+worker_loop_default = function(fun, constants = NULL, rush) {
assert_function(fun)
+ assert_list(constants, null.ok = TRUE, names = "named")
while(!rush$terminated) {
task = rush$pop_task()
if (!is.null(task)) {
tryCatch({
- ys = mlr3misc::invoke(fun, .args = task$xs)
+ ys = mlr3misc::invoke(fun, .args = c(task$xs, constants))
rush$push_results(task$key, yss = list(ys))
}, error = function(e) {
condition = list(message = e$message)
diff --git a/man/Rush.Rd b/man/Rush.Rd
index 1bc86f1..f06adea 100644
--- a/man/Rush.Rd
+++ b/man/Rush.Rd
@@ -302,6 +302,8 @@ Print method.
Start workers locally with \code{processx}.
The \link[processx:process]{processx::process} are stored in \verb{$processes}.
Alternatively, use \verb{$create_worker_script()} to create a script for starting workers on remote machines.
+By default, \code{\link[=worker_loop_default]{worker_loop_default()}} is used as worker loop.
+This function takes the arguments \code{fun} and optionally \code{constants} which are passed in \code{...}.
\subsection{Usage}{
\if{html}{\out{
}}\preformatted{Rush$start_workers(
n_workers = NULL,
diff --git a/man/fun_heartbeat.Rd b/man/heartbeat.Rd
similarity index 94%
rename from man/fun_heartbeat.Rd
rename to man/heartbeat.Rd
index 6554904..098d95c 100644
--- a/man/fun_heartbeat.Rd
+++ b/man/heartbeat.Rd
@@ -1,10 +1,10 @@
% Generated by roxygen2: do not edit by hand
% Please edit documentation in R/heartbeat_loops.R
-\name{fun_heartbeat}
-\alias{fun_heartbeat}
+\name{heartbeat}
+\alias{heartbeat}
\title{Heartbeat Loop}
\usage{
-fun_heartbeat(
+heartbeat(
network_id,
config,
worker_id,
diff --git a/man/worker_loop_default.Rd b/man/worker_loop_default.Rd
index fcfb3c8..4e33ce3 100644
--- a/man/worker_loop_default.Rd
+++ b/man/worker_loop_default.Rd
@@ -4,12 +4,15 @@
\alias{worker_loop_default}
\title{Single Task Worker Loop}
\usage{
-worker_loop_default(fun, rush)
+worker_loop_default(fun, constants = NULL, rush)
}
\arguments{
\item{fun}{(\code{function})\cr
Function to be executed.}
+\item{constants}{(\code{list})\cr
+List of constants passed to \code{fun}.}
+
\item{rush}{(\link{RushWorker})\cr
Rush worker instance.}
}
diff --git a/pkgdown/_pkgdown.yml b/pkgdown/_pkgdown.yml
index 923d5e2..b6118df 100644
--- a/pkgdown/_pkgdown.yml
+++ b/pkgdown/_pkgdown.yml
@@ -42,9 +42,16 @@ reference:
- Rush
- RushWorker
- rsh
- - title: Misc
+ - title: Plan
+ - rush_plan
+ - rush_available
+ - rush_config
+ - title: Worker Loop
contents:
- - worker_loop_default
- start_worker
- - fun_heartbeat
+ - worker_loop_default
+ - heartbeat
+ - title: Misc
+ contents:
+ - get_hostname
- rush-package
diff --git a/tests/testthat/test-Rush.R b/tests/testthat/test-Rush.R
index f635e8a..491cc40 100644
--- a/tests/testthat/test-Rush.R
+++ b/tests/testthat/test-Rush.R
@@ -186,6 +186,34 @@ test_that("a remote worker is started", {
expect_set_equal(rush$worker_info$host, "remote")
})
+# start workers with script ----------------------------------------------------
+
+test_that("worker can be started with script", {
+ # skip_on_cran()
+ skip_on_ci()
+ skip_if(TRUE)
+
+ config = start_flush_redis()
+ rush = Rush$new(network_id = "test-rush", config = config)
+ fun = function(x1, x2, ...) list(y = x1 + x2)
+
+ rush$create_worker_script(fun = fun)
+})
+
+test_that("a remote worker is started", {
+ # skip_on_cran()
+
+ config = start_flush_redis()
+ fun = function(x1, x2, ...) list(y = x1 + x2)
+ rush = Rush$new(network_id = "test-rush", config = config)
+
+ withr::with_envvar(list("HOST" = "remote_host"), {
+ rush$start_workers(fun = fun, n_workers = 2, heartbeat_period = 1, heartbeat_expire = 2, await_workers = TRUE)
+ })
+
+ expect_set_equal(rush$worker_info$host, "remote")
+})
+
# stop workers -----------------------------------------------------------------
test_that("a worker is terminated", {
@@ -683,6 +711,23 @@ test_that("terminating workers on idle works", {
expect_rush_reset(rush)
})
+test_that("constants works", {
+ # skip_on_cran()
+
+ config = start_flush_redis()
+ rush = Rush$new(network_id = "test-rush", config = config)
+ fun = function(x1, x2, x3, ...) list(y = x1 + x2 + x3)
+ rush$start_workers(fun = fun, n_workers = 4, constants = list(x3 = 5), await_workers = TRUE)
+
+ xss = list(list(x1 = 1, x2 = 2))
+ keys = rush$push_tasks(xss)
+ rush$await_tasks(keys)
+
+ expect_equal(rush$fetch_finished_tasks()$y, 8)
+
+ expect_rush_reset(rush)
+})
+
# rush network without controller ----------------------------------------------
test_that("network without controller works", {
diff --git a/tests/testthat/test-rush_plan.R b/tests/testthat/test-rush_plan.R
index 77a5984..02adc3f 100644
--- a/tests/testthat/test-rush_plan.R
+++ b/tests/testthat/test-rush_plan.R
@@ -2,7 +2,6 @@ test_that("rush_plan family works", {
skip_on_cran()
skip_on_ci()
-
expect_false(rush_available())
config = redis_config()
rush_plan(n_workers = 2, config)