Skip to content

Commit

Permalink
feat: add constants to worker (#16)
Browse files Browse the repository at this point in the history
* refactor: export globals with redis

* refactor: remove future

* feat: add rush_plan

* feat: add constants

* docs: worker loop

* test: constants

* fix: pkgdown

* chore: rename heartbeat
  • Loading branch information
be-marc authored Nov 26, 2023
1 parent 66d3f37 commit ee7a9e6
Show file tree
Hide file tree
Showing 11 changed files with 74 additions and 13 deletions.
2 changes: 1 addition & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@

export(Rush)
export(RushWorker)
export(fun_heartbeat)
export(get_hostname)
export(heartbeat)
export(rsh)
export(rush_available)
export(rush_config)
Expand Down
2 changes: 2 additions & 0 deletions R/Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,8 @@ Rush = R6::R6Class("Rush",
#' Start workers locally with `processx`.
#' The [processx::process] are stored in `$processes`.
#' Alternatively, use `$create_worker_script()` to create a script for starting workers on remote machines.
#' By default, [worker_loop_default()] is used as worker loop.
#' This function takes the arguments `fun` and optionally `constants` which are passed in `...`.
#'
#' @param n_workers (`integer(1)`)\cr
#' Number of workers to be started.
Expand Down
2 changes: 1 addition & 1 deletion R/RushWorker.R
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ RushWorker = R6::R6Class("RushWorker",
heartbeat_expire = heartbeat_expire,
pid = Sys.getpid()
)
self$heartbeat = callr::r_bg(fun_heartbeat, args = heartbeat_args, supervise = TRUE)
self$heartbeat = callr::r_bg(heartbeat, args = heartbeat_args, supervise = TRUE)

# wait until heartbeat process is able to work
Sys.sleep(1)
Expand Down
2 changes: 1 addition & 1 deletion R/heartbeat_loops.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
#' @template param_heartbeat_expire
#'
#' @export
fun_heartbeat = function(network_id, config, worker_id, heartbeat_period, heartbeat_expire, pid) {
heartbeat = function(network_id, config, worker_id, heartbeat_period, heartbeat_expire, pid) {
r = redux::hiredis(config)
worker_id_key = sprintf("%s:%s", network_id, worker_id)
heartbeat_key = sprintf("%s:%s:heartbeat", network_id, worker_id)
Expand Down
7 changes: 5 additions & 2 deletions R/worker_loops.R
Original file line number Diff line number Diff line change
Expand Up @@ -7,18 +7,21 @@
#'
#' @param fun (`function`)\cr
#' Function to be executed.
#' @param constants (`list`)\cr
#' List of constants passed to `fun`.
#' @param rush ([RushWorker])\cr
#' Rush worker instance.
#'
#' @export
worker_loop_default = function(fun, rush) {
worker_loop_default = function(fun, constants = NULL, rush) {
assert_function(fun)
assert_list(constants, null.ok = TRUE, names = "named")

while(!rush$terminated) {
task = rush$pop_task()
if (!is.null(task)) {
tryCatch({
ys = mlr3misc::invoke(fun, .args = task$xs)
ys = mlr3misc::invoke(fun, .args = c(task$xs, constants))
rush$push_results(task$key, yss = list(ys))
}, error = function(e) {
condition = list(message = e$message)
Expand Down
2 changes: 2 additions & 0 deletions man/Rush.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions man/fun_heartbeat.Rd → man/heartbeat.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion man/worker_loop_default.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 10 additions & 3 deletions pkgdown/_pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,16 @@ reference:
- Rush
- RushWorker
- rsh
- title: Misc
- title: Plan
- rush_plan
- rush_available
- rush_config
- title: Worker Loop
contents:
- worker_loop_default
- start_worker
- fun_heartbeat
- worker_loop_default
- heartbeat
- title: Misc
contents:
- get_hostname
- rush-package
45 changes: 45 additions & 0 deletions tests/testthat/test-Rush.R
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,34 @@ test_that("a remote worker is started", {
expect_set_equal(rush$worker_info$host, "remote")
})

# start workers with script ----------------------------------------------------

test_that("worker can be started with script", {
# skip_on_cran()
skip_on_ci()
skip_if(TRUE)

config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
fun = function(x1, x2, ...) list(y = x1 + x2)

rush$create_worker_script(fun = fun)
})

test_that("a remote worker is started", {
# skip_on_cran()

config = start_flush_redis()
fun = function(x1, x2, ...) list(y = x1 + x2)
rush = Rush$new(network_id = "test-rush", config = config)

withr::with_envvar(list("HOST" = "remote_host"), {
rush$start_workers(fun = fun, n_workers = 2, heartbeat_period = 1, heartbeat_expire = 2, await_workers = TRUE)
})

expect_set_equal(rush$worker_info$host, "remote")
})

# stop workers -----------------------------------------------------------------

test_that("a worker is terminated", {
Expand Down Expand Up @@ -683,6 +711,23 @@ test_that("terminating workers on idle works", {
expect_rush_reset(rush)
})

test_that("constants works", {
# skip_on_cran()

config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
fun = function(x1, x2, x3, ...) list(y = x1 + x2 + x3)
rush$start_workers(fun = fun, n_workers = 4, constants = list(x3 = 5), await_workers = TRUE)

xss = list(list(x1 = 1, x2 = 2))
keys = rush$push_tasks(xss)
rush$await_tasks(keys)

expect_equal(rush$fetch_finished_tasks()$y, 8)

expect_rush_reset(rush)
})

# rush network without controller ----------------------------------------------

test_that("network without controller works", {
Expand Down
1 change: 0 additions & 1 deletion tests/testthat/test-rush_plan.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ test_that("rush_plan family works", {
skip_on_cran()
skip_on_ci()


expect_false(rush_available())
config = redis_config()
rush_plan(n_workers = 2, config)
Expand Down

0 comments on commit ee7a9e6

Please sign in to comment.