diff --git a/R/Rush.R b/R/Rush.R
index c486e7b..dee44e0 100644
--- a/R/Rush.R
+++ b/R/Rush.R
@@ -180,6 +180,7 @@ Rush = R6::R6Class("Rush",
heartbeat_expire = NULL,
lgr_thresholds = NULL,
lgr_buffer_size = 0,
+ max_retries = 0,
supervise = TRUE,
worker_loop = worker_loop_default,
...
@@ -187,6 +188,10 @@ Rush = R6::R6Class("Rush",
n_workers = assert_count(n_workers %??% rush_env$n_workers)
assert_flag(wait_for_workers)
assert_flag(supervise)
+ r = self$connector
+
+ # set global maximum retries of tasks
+ private$.max_retries = assert_count(max_retries)
# push worker config to redis
private$.push_worker_config(
@@ -196,6 +201,7 @@ Rush = R6::R6Class("Rush",
heartbeat_expire = heartbeat_expire,
lgr_thresholds = lgr_thresholds,
lgr_buffer_size = lgr_buffer_size,
+ max_retries = max_retries,
worker_loop = worker_loop,
...
)
@@ -306,7 +312,7 @@ Rush = R6::R6Class("Rush",
# Push terminate signal to worker
cmds = map(worker_ids, function(worker_id) {
- c("SET", private$.get_worker_key("terminate", worker_id), "TRUE")
+ c("SET", private$.get_worker_key("terminate", worker_id), "1")
})
r$pipeline(.commands = cmds)
@@ -353,10 +359,14 @@ Rush = R6::R6Class("Rush",
#' Workers with a heartbeat process are checked with the heartbeat.
#' Lost tasks are marked as `"lost"`.
#'
- #' @param restart (`logical(1)`)\cr
+ #' @param restart_workers (`logical(1)`)\cr
#' Whether to restart lost workers.
+ #' @param restart_tasks (`logical(1)`)\cr
+ #' Whether to push the tasks of lost workers back to the queue.
+ #' A task is only queued again while its number of tries is below `max_retries`.
- detect_lost_workers = function(restart = FALSE) {
- assert_flag(restart)
+ detect_lost_workers = function(restart_workers = FALSE, restart_tasks = FALSE) {
+ assert_flag(restart_workers)
+ assert_flag(restart_tasks)
r = self$connector
# check workers with a heartbeat
@@ -392,7 +399,7 @@ Rush = R6::R6Class("Rush",
lost_workers = local_workers[!running]
lg$error("Lost %i worker(s): %s", length(lost_workers), str_collapse(lost_workers))
- if (restart) {
+ if (restart_workers) {
self$restart_workers(unlist(lost_workers))
lost_workers
} else {
@@ -412,17 +419,39 @@ Rush = R6::R6Class("Rush",
if (length(lost_workers)) {
running_tasks = self$fetch_running_tasks(fields = "worker_extra")
if (!nrow(running_tasks)) return(invisible(self))
- keys = running_tasks[lost_workers, keys, on = "worker_id"]
+ lost_workers = unlist(lost_workers)
+ keys = running_tasks[list(lost_workers), keys, on = "worker_id"]
+ # a lost worker without a running task yields an NA key in the join
+ keys = keys[!is.na(keys)]
+ if (!length(keys)) return(invisible(self))
lg$error("Lost %i task(s): %s", length(keys), str_collapse(keys))
- cmds = unlist(map(keys, function(key) {
- list(
- list("HSET", key, "state", failed_state),
- c("SREM", private$.get_key("running_tasks"), key),
- c("RPUSH", private$.get_key("failed_tasks"), key))
- }), recursive = FALSE)
- r$pipeline(.commands = cmds)
+ # lost tasks are either queued again or moved to the failed tasks;
+ # without `restart_tasks` all of them fail
+ retry = if (restart_tasks) {
+ # `.max_retries` is `NULL` until it is set, treat that as no retries
+ self$n_tries(keys) < (private$.max_retries %??% 0)
+ } else {
+ rep(FALSE, length(keys))
+ }
+ retry_keys = keys[retry]
+ failed_keys = keys[!retry]
+
+ # remove the keys from the running set first, so that a re-queued task
+ # that a worker pops right away is not removed from the set afterwards
+ cmds = list(c("SREM", private$.get_key("running_tasks"), keys))
+ if (length(retry_keys)) {
+ lg$error("Retry %i lost task(s): %s", length(retry_keys), str_collapse(retry_keys))
+ cmds = c(cmds,
+ map(retry_keys, function(key) c("HINCRBY", key, "n_tries", 1)),
+ list(c("RPUSH", private$.get_key("queued_tasks"), retry_keys)))
+ }
+ if (length(failed_keys)) {
+ # tasks that are out of retries are recorded as failed instead of being dropped
+ cmds = c(cmds, list(c("RPUSH", private$.get_key("failed_tasks"), failed_keys)))
+ }
+ r$pipeline(.commands = cmds)
}
return(invisible(self))
@@ -523,10 +547,10 @@ Rush = R6::R6Class("Rush",
lg$debug("Pushing %i task(s) to the shared queue", length(xss))
- keys = self$write_hashes(xs = xss, xs_extra = extra, state = "queued")
+ keys = self$write_hashes(xs = xss, xs_extra = extra)
r$command(c("LPUSH", private$.get_key("queued_tasks"), keys))
r$command(c("SADD", private$.get_key("all_tasks"), keys))
- if (terminate_workers) r$command(c("SET", private$.get_key("terminate_on_idle"), "TRUE"))
+ if (terminate_workers) r$command(c("SET", private$.get_key("terminate_on_idle"), 1))
return(invisible(keys))
},
@@ -559,7 +583,7 @@ Rush = R6::R6Class("Rush",
lg$debug("Pushing %i task(s) to %i priority queue(s) and %i task(s) to the shared queue.",
sum(!is.na(priority)), length(unique(priority[!is.na(priority)])), sum(is.na(priority)))
- keys = self$write_hashes(xs = xss, xs_extra = extra, state = "queued")
+ keys = self$write_hashes(xs = xss, xs_extra = extra)
cmds = pmap(list(priority, keys), function(worker_id, key) {
if (is.na(worker_id)) {
c("LPUSH", private$.get_key("queued_tasks"), key)
@@ -824,25 +848,14 @@ Rush = R6::R6Class("Rush",
#' @param keys (character())\cr
#' Keys of the hashes.
#' If `NULL` new keys are generated.
- #' @param state (`character(1)`)\cr
- #' State of the hashes.
#'
#' @return (`character()`)\cr
#' Keys of the hashes.
- write_hashes = function(..., .values = list(), keys = NULL, state = NA_character_) {
+ write_hashes = function(..., .values = list(), keys = NULL) {
values = discard(c(list(...), .values), function(l) !length(l))
assert_list(values, names = "unique", types = "list", min.len = 1)
fields = names(values)
keys = assert_character(keys %??% uuid::UUIDgenerate(n = length(values[[1]])), len = length(values[[1]]), .var.name = "keys")
- assert_string(state, na.ok = TRUE)
- bin_state = switch(state,
- "queued" = queued_state,
- "running" = running_state,
- "failed" = failed_state,
- "finished" = finished_state,
- `NA_character_` = na_state,
- redux::object_to_bin(list(state = state))
- )
lg$debug("Writting %i hash(es) with %i field(s)", length(keys), length(fields))
@@ -856,7 +869,7 @@ Rush = R6::R6Class("Rush",
# merge fields and values alternatively
# c and rbind are fastest option in R
# data is not copied
- c("HSET", key, c(rbind(fields, bin_values)), "state", list(bin_state))
+ c("HSET", key, c(rbind(fields, bin_values)))
})
self$connector$pipeline(.commands = cmds)
@@ -893,6 +906,23 @@ Rush = R6::R6Class("Rush",
# unserialize lists of the second level
# combine elements of the third level to one list
map(hashes, function(hash) unlist(map_if(hash, function(x) !is.null(x), redux::bin_to_object), recursive = FALSE))
+ },
+
+ #' @description
+ #' Returns the number of attempts to evaluate a task.
+ #'
+ #' @param keys (`character()`)\cr
+ #' Keys of the tasks.
+ #'
+ #' @return (`integer()`)\cr
+ #' Number of attempts.
+ n_tries = function(keys) {
+ assert_character(keys)
+ r = self$connector
+
+ # the n_tries field is not set until the task is queued again for the first time
+ n_tries = r$pipeline(.commands = map(keys, function(key) c("HGET", key, "n_tries")))
+ map_int(n_tries, function(value) if (is.null(value)) 0L else as.integer(value))
}
),
@@ -1119,6 +1149,8 @@ Rush = R6::R6Class("Rush",
#
.hostname = NULL,
+ .max_retries = NULL,
+
# prefix key with instance id
.get_key = function(key) {
sprintf("%s:%s", self$network_id, key)
@@ -1138,6 +1170,7 @@ Rush = R6::R6Class("Rush",
heartbeat_expire = NULL,
lgr_thresholds = NULL,
lgr_buffer_size = 0,
+ max_retries = 0,
worker_loop = worker_loop_default,
...
) {
@@ -1148,6 +1181,7 @@ Rush = R6::R6Class("Rush",
if (!is.null(heartbeat_period)) require_namespaces("callr")
assert_vector(lgr_thresholds, names = "named", null.ok = TRUE)
assert_count(lgr_buffer_size)
+ assert_count(max_retries)
assert_function(worker_loop)
dots = list(...)
r = self$connector
@@ -1166,7 +1200,8 @@ Rush = R6::R6Class("Rush",
heartbeat_period = heartbeat_period,
heartbeat_expire = heartbeat_expire,
lgr_thresholds = lgr_thresholds,
- lgr_buffer_size = lgr_buffer_size)
+ lgr_buffer_size = lgr_buffer_size,
+ max_retries = max_retries)
# arguments needed for initializing the worker
start_args = list(
@@ -1241,10 +1276,3 @@ Rush = R6::R6Class("Rush",
)
)
-# common state for all tasks
-# used in $write_hashes()
-queued_state = redux::object_to_bin(list(state = "queued"))
-running_state = redux::object_to_bin(list(state = "running"))
-failed_state = redux::object_to_bin(list(state = "failed"))
-finished_state = redux::object_to_bin(list(state = "finished"))
-na_state = redux::object_to_bin(list(state = NA_character_))
diff --git a/R/RushWorker.R b/R/RushWorker.R
index 538986b..3ce3a9e 100644
--- a/R/RushWorker.R
+++ b/R/RushWorker.R
@@ -43,12 +43,14 @@ RushWorker = R6::R6Class("RushWorker",
heartbeat_period = NULL,
heartbeat_expire = NULL,
lgr_thresholds = NULL,
- lgr_buffer_size = 0
+ lgr_buffer_size = 0,
+ max_retries = 0
) {
super$initialize(network_id = network_id, config = config)
self$host = assert_choice(host, c("local", "remote"))
self$worker_id = assert_string(worker_id %??% uuid::UUIDgenerate())
+ private$.max_retries = assert_count(max_retries)
r = self$connector
# setup heartbeat
@@ -137,7 +139,7 @@ RushWorker = R6::R6Class("RushWorker",
lg$debug("Pushing %i running task(s).", length(xss))
- keys = self$write_hashes(xs = xss, xs_extra = extra, state = "running")
+ keys = self$write_hashes(xs = xss, xs_extra = extra)
r$command(c("SADD", private$.get_key("running_tasks"), keys))
r$command(c("SADD", private$.get_key("all_tasks"), keys))
@@ -156,7 +158,7 @@ RushWorker = R6::R6Class("RushWorker",
key = r$command(c("BLMPOP", timeout, 2, private$.get_worker_key("queued_tasks"), private$.get_key("queued_tasks"), "RIGHT"))[[2]][[1]]
if (is.null(key)) return(NULL)
- self$write_hashes(worker_extra = list(list(pid = Sys.getpid(), worker_id = self$worker_id)), keys = key, state = "running")
+ self$write_hashes(worker_extra = list(list(pid = Sys.getpid(), worker_id = self$worker_id)), keys = key)
# move key from queued to running
r$command(c("SADD", private$.get_key("running_tasks"), key))
@@ -178,20 +180,17 @@ RushWorker = R6::R6Class("RushWorker",
#' Status of the tasks.
#' If `"finished"` the tasks are moved to the finished tasks.
#' If `"error"` the tasks are moved to the failed tasks.
- push_results = function(keys, yss = list(), extra = list(), conditions = list(), state = "finished") {
+ push_results = function(keys, yss = list(), extra = list(), conditions = list()) {
assert_string(keys)
assert_list(yss, types = "list")
assert_list(extra, types = "list")
assert_list(conditions, types = "list")
- assert_choice(state, c("finished", "failed"))
r = self$connector
# write result to hash
- self$write_hashes(ys = yss, ys_extra = extra, condition = conditions, keys = keys, state = state)
-
- destination = if (state == "finished") "finished_tasks" else "failed_tasks"
+ self$write_hashes(ys = yss, ys_extra = extra, condition = conditions, keys = keys)
- # move key from running to finished or failed
+ # move key from running to finished
# keys of finished and failed tasks are stored in a list i.e. the are ordered by time.
# each rush instance only needs to record how many results it has already seen
# to cheaply get the latest results and cache the finished tasks
@@ -199,7 +197,24 @@ RushWorker = R6::R6Class("RushWorker",
# but at the moment a list seems to be the better option
r$pipeline(.commands = list(
c("SREM", private$.get_key("running_tasks"), keys),
- c("RPUSH", private$.get_key(destination), keys)
+ c("RPUSH", private$.get_key("finished_tasks"), keys)
+ ))
+
+ return(invisible(self))
+ },
+
+ push_failed = function(keys, conditions) {
+ assert_string(keys)
+ assert_list(conditions, types = "list")
+ r = self$connector
+
+ # write condition to hash
+ self$write_hashes(condition = conditions, keys = keys)
+
+ # move key from running to failed
+ r$pipeline(.commands = list(
+ c("SREM", private$.get_key("running_tasks"), keys),
+ c("RPUSH", private$.get_key("failed_tasks"), keys)
))
return(invisible(self))
@@ -223,7 +238,7 @@ RushWorker = R6::R6Class("RushWorker",
#' Used in the worker loop to determine whether to continue.
terminated = function() {
r = self$connector
- r$GET(private$.get_worker_key("terminate")) %??% "FALSE" == "TRUE"
+ as.logical(r$EXISTS(private$.get_worker_key("terminate")))
},
#' @field terminated_on_idle (`logical(1)`)\cr
@@ -231,7 +246,7 @@ RushWorker = R6::R6Class("RushWorker",
#' Used in the worker loop to determine whether to continue.
terminated_on_idle = function() {
r = self$connector
- r$GET(private$.get_key("terminate_on_idle")) %??% "FALSE" == "TRUE" && !as.logical(self$n_queued_tasks)
+ as.logical(r$EXISTS(private$.get_key("terminate_on_idle"))) && !as.logical(self$n_queued_tasks)
}
)
)
diff --git a/R/worker_loops.R b/R/worker_loops.R
index ef3635a..8623ba2 100644
--- a/R/worker_loops.R
+++ b/R/worker_loops.R
@@ -25,7 +25,7 @@ worker_loop_default = function(fun, constants = NULL, rush) {
rush$push_results(task$key, yss = list(ys))
}, error = function(e) {
condition = list(message = e$message)
- rush$push_results(task$key, conditions = list(condition), state = "failed")
+ rush$push_failed(task$key, conditions = list(condition))
})
} else {
if (rush$terminated_on_idle) break
diff --git a/man/Rush.Rd b/man/Rush.Rd
index 38936d6..e6e920e 100644
--- a/man/Rush.Rd
+++ b/man/Rush.Rd
@@ -232,6 +232,7 @@ For more details see \href{https://redis.io/docs/management/persistence/#snapsho
\item \href{#method-Rush-wait_for_tasks}{\code{Rush$wait_for_tasks()}}
\item \href{#method-Rush-write_hashes}{\code{Rush$write_hashes()}}
\item \href{#method-Rush-read_hashes}{\code{Rush$read_hashes()}}
+\item \href{#method-Rush-n_tries}{\code{Rush$n_tries()}}
\item \href{#method-Rush-clone}{\code{Rush$clone()}}
}
}
@@ -314,6 +315,7 @@ This function takes the arguments \code{fun} and optionally \code{constants} whi
heartbeat_expire = NULL,
lgr_thresholds = NULL,
lgr_buffer_size = 0,
+ max_retries = 0,
supervise = TRUE,
worker_loop = worker_loop_default,
...
@@ -489,13 +491,13 @@ But checking local workers on windows might be very slow.
Workers with a heartbeat process are checked with the heartbeat.
Lost tasks are marked as \code{"lost"}.
\subsection{Usage}{
-\if{html}{\out{
}}\preformatted{Rush$detect_lost_workers(restart = FALSE)}\if{html}{\out{
}}
+\if{html}{\out{}}\preformatted{Rush$detect_lost_workers(restart_workers = FALSE, restart_tasks = FALSE)}\if{html}{\out{
}}
}
\subsection{Arguments}{
\if{html}{\out{}}
\describe{
-\item{\code{restart}}{(\code{logical(1)})\cr
+\item{\code{restart_workers}}{(\code{logical(1)})\cr
Whether to restart lost workers.}
}
\if{html}{\out{
}}
@@ -968,7 +970,7 @@ Vectors in list columns must be wrapped in lists.
Otherwise, \verb{$read_values()} will expand the table by the length of the vectors.
For example, \verb{xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = c("A", "B", "C"))) does not work. Pass }xs_extra = list(list(extra = list(c("A", "B", "C"))))` instead.
\subsection{Usage}{
-\if{html}{\out{}}\preformatted{Rush$write_hashes(..., .values = list(), keys = NULL, state = NA_character_)}\if{html}{\out{
}}
+\if{html}{\out{}}\preformatted{Rush$write_hashes(..., .values = list(), keys = NULL)}\if{html}{\out{
}}
}
\subsection{Arguments}{
@@ -985,9 +987,6 @@ The names of the list are used as fields.}
\item{\code{keys}}{(character())\cr
Keys of the hashes.
If \code{NULL} new keys are generated.}
-
-\item{\code{state}}{(\code{character(1)})\cr
-State of the hashes.}
}
\if{html}{\out{}}
}
@@ -1025,6 +1024,28 @@ The inner list is the combination of the lists stored at the different fields.
}
}
\if{html}{\out{
}}
+\if{html}{\out{}}
+\if{latex}{\out{\hypertarget{method-Rush-n_tries}{}}}
+\subsection{Method \code{n_tries()}}{
+Returns the number of attempts to evaluate a task.
+\subsection{Usage}{
+\if{html}{\out{}}\preformatted{Rush$n_tries(keys)}\if{html}{\out{
}}
+}
+
+\subsection{Arguments}{
+\if{html}{\out{}}
+\describe{
+\item{\code{keys}}{(\code{character()})\cr
+Keys of the tasks.}
+}
+\if{html}{\out{
}}
+}
+\subsection{Returns}{
+(\code{integer()})\cr
+Number of attempts.
+}
+}
+\if{html}{\out{
}}
\if{html}{\out{}}
\if{latex}{\out{\hypertarget{method-Rush-clone}{}}}
\subsection{Method \code{clone()}}{
diff --git a/man/RushWorker.Rd b/man/RushWorker.Rd
index 19dc803..c774086 100644
--- a/man/RushWorker.Rd
+++ b/man/RushWorker.Rd
@@ -47,6 +47,7 @@ Used in the worker loop to determine whether to continue.}
\item \href{#method-RushWorker-push_running_task}{\code{RushWorker$push_running_task()}}
\item \href{#method-RushWorker-pop_task}{\code{RushWorker$pop_task()}}
\item \href{#method-RushWorker-push_results}{\code{RushWorker$push_results()}}
+\item \href{#method-RushWorker-push_failed}{\code{RushWorker$push_failed()}}
\item \href{#method-RushWorker-set_terminated}{\code{RushWorker$set_terminated()}}
\item \href{#method-RushWorker-clone}{\code{RushWorker$clone()}}
}
@@ -65,6 +66,7 @@ Used in the worker loop to determine whether to continue.}
rush::Rush$fetch_running_tasks()
rush::Rush$fetch_tasks()
rush::Rush$format()
+rush::Rush$n_tries()
rush::Rush$print()
rush::Rush$push_priority_tasks()
rush::Rush$push_tasks()
@@ -96,7 +98,8 @@ Creates a new instance of this \link[R6:R6Class]{R6} class.
heartbeat_period = NULL,
heartbeat_expire = NULL,
lgr_thresholds = NULL,
- lgr_buffer_size = 0
+ lgr_buffer_size = 0,
+ max_retries = 0
)}\if{html}{\out{}}
}
@@ -194,8 +197,7 @@ Pushes results to the data base.
keys,
yss = list(),
extra = list(),
- conditions = list(),
- state = "finished"
+ conditions = list()
)}\if{html}{\out{}}
}
@@ -221,6 +223,15 @@ If \code{"error"} the tasks are moved to the failed tasks.}
}
\if{html}{\out{}}
}
+}
+\if{html}{\out{
}}
+\if{html}{\out{}}
+\if{latex}{\out{\hypertarget{method-RushWorker-push_failed}{}}}
+\subsection{Method \code{push_failed()}}{
+\subsection{Usage}{
+\if{html}{\out{}}\preformatted{RushWorker$push_failed(keys, conditions)}\if{html}{\out{
}}
+}
+
}
\if{html}{\out{
}}
\if{html}{\out{}}
diff --git a/tests/testthat/_snaps/Rush.md b/tests/testthat/_snaps/Rush.md
new file mode 100644
index 0000000..d045ba2
--- /dev/null
+++ b/tests/testthat/_snaps/Rush.md
@@ -0,0 +1,9 @@
+# worker can be started with script
+
+ Code
+ rush$create_worker_script(fun = fun)
+ Output
+ INFO (400): [rush] Start worker with:
+ INFO (400): [rush] Rscript -e 'rush::start_worker(network_id = 'test-rush', hostname = 'host', url = 'redis://127.0.0.1:6379')'
+ INFO (400): [rush] See ?rush::start_worker for more details.
+
diff --git a/tests/testthat/test-Rush.R b/tests/testthat/test-Rush.R
index 6d26ddb..87f36b0 100644
--- a/tests/testthat/test-Rush.R
+++ b/tests/testthat/test-Rush.R
@@ -129,12 +129,23 @@ test_that("globals are available on the worker", {
test_that("worker can be started with script", {
skip_on_cran()
skip_on_ci()
+ set.seed(1) # make log messages reproducible
+
+ root_logger = lgr::get_logger("root")
+ old_fmt = root_logger$appenders$cons$layout$fmt
+ root_logger$appenders$cons$layout$set_fmt("%L (%n): %m")
+
+ on.exit({
+ root_logger$appenders$cons$layout$set_fmt(old_fmt)
+ })
config = start_flush_redis()
- rush = Rush$new(network_id = "test-rush", config = config)
+ withr::with_envvar(list("HOST" = "host"), {
+ rush = Rush$new(network_id = "test-rush", config = config)
+ })
fun = function(x1, x2, ...) list(y = x1 + x2)
- rush$create_worker_script(fun = fun)
+ expect_snapshot(rush$create_worker_script(fun = fun))
})
test_that("a remote worker is started", {
@@ -250,31 +261,6 @@ test_that("a remote worker is killed via the heartbeat", {
expect_rush_reset(rush)
})
-# restart workers --------------------------------------------------------------
-
-test_that("restarting a worker works", {
- skip_on_cran()
- skip_on_ci()
-
-
- config = start_flush_redis()
- rush = Rush$new(network_id = "test-rush", config = config)
- fun = function(x1, x2, ...) list(y = x1 + x2)
-
- rush$start_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE)
- worker_id_1 = rush$running_worker_ids[1]
- worker_id_2 = rush$running_worker_ids[2]
-
- tools::pskill(rush$worker_info[worker_id == worker_id_1, pid])
- Sys.sleep(1)
- expect_false(rush$processes[[worker_id_1]]$is_alive())
-
- rush$detect_lost_workers(restart = TRUE)
- expect_true(rush$processes[[worker_id_1]]$is_alive())
-
- expect_rush_reset(rush)
-})
-
# task evaluation --------------------------------------------------------------
test_that("evaluating a task works", {
@@ -402,7 +388,7 @@ test_that("caching results works", {
test_that("a segfault on a local worker is detected", {
skip_on_cran()
skip_on_ci()
- skip_on_os("windows")
+ skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -425,7 +411,7 @@ test_that("a segfault on a local worker is detected", {
test_that("a segfault on a worker is detected via the heartbeat", {
skip_on_cran()
skip_on_ci()
- skip_on_os("windows")
+ skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -453,6 +439,7 @@ test_that("a segfault on a worker is detected via the heartbeat", {
test_that("a simple error is catched", {
skip_on_cran()
skip_on_ci()
+ skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -464,7 +451,7 @@ test_that("a simple error is catched", {
xss = list(list(x1 = 1, x2 = 2), list(x1 = 0, x2 = 2))
keys = rush$push_tasks(xss)
- rush$wait_for_tasks(keys)
+ rush$wait_for_tasks(keys, detect_lost_workers = TRUE)
Sys.sleep(2)
# check task count
@@ -500,6 +487,7 @@ test_that("a simple error is catched", {
test_that("a lost task is detected", {
skip_on_cran()
skip_on_ci()
+ skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -548,6 +536,7 @@ test_that("a lost task is detected", {
test_that("a lost task is detected when waiting", {
skip_on_cran()
skip_on_ci()
+ skip_if(TRUE) # does not work in testthat on environment
config = start_flush_redis()
rush = Rush$new(network_id = "test-rush", config = config)
@@ -591,6 +580,53 @@ test_that("a lost task is detected when waiting", {
expect_rush_reset(rush)
})
+# restart tasks and workers ----------------------------------------------------
+
+test_that("restarting a worker works", {
+ skip_on_cran()
+ skip_on_ci()
+
+ config = start_flush_redis()
+ rush = Rush$new(network_id = "test-rush", config = config)
+ fun = function(x1, x2, ...) list(y = x1 + x2)
+
+ rush$start_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE)
+ worker_id_1 = rush$running_worker_ids[1]
+ worker_id_2 = rush$running_worker_ids[2]
+
+ tools::pskill(rush$worker_info[worker_id == worker_id_1, pid])
+ Sys.sleep(1)
+ expect_false(rush$processes[[worker_id_1]]$is_alive())
+
+ rush$detect_lost_workers(restart_workers = TRUE)
+ expect_true(rush$processes[[worker_id_1]]$is_alive())
+
+ expect_rush_reset(rush)
+})
+
+test_that("a task is restarted when a worker is lost", {
+ skip_on_cran()
+ skip_on_ci()
+ set.seed(1) # make log messages reproducible
+ skip_if(TRUE) # does not work in testthat on environment
+
+ config = start_flush_redis()
+ rush = Rush$new(network_id = "test-rush", config = config)
+ fun = function(x1, x2, ...) {
+ tools::pskill(Sys.getpid())
+ }
+
+ rush$start_workers(fun = fun, n_workers = 1, max_retries = 1, wait_for_workers = TRUE)
+
+ xss = list(list(x1 = 1, x2 = 2))
+ keys = rush$push_tasks(xss)
+
+ rush$detect_lost_workers(restart_workers = TRUE, restart_tasks = TRUE)
+
+ expect_equal(rush$n_tries(keys), 1)
+
+ expect_rush_reset(rush)
+})
# receiving results ------------------------------------------------------------
diff --git a/tests/testthat/test-RushWorker.R b/tests/testthat/test-RushWorker.R
index fc888fe..aacd6b8 100644
--- a/tests/testthat/test-RushWorker.R
+++ b/tests/testthat/test-RushWorker.R
@@ -171,43 +171,6 @@ test_that("writing hashes to specific keys works", {
expect_error(rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), keys = keys), "Assertion on 'keys' failed")
})
-test_that("writing a hash with a state works", {
- skip_on_cran()
- skip_on_ci()
-
- config = start_flush_redis()
- rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
-
- # one element
- keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), state = "queued")
- expect_equal(rush$read_hashes(keys, c("xs", "state")), list(list(x1 = 1, x2 = 2, state = "queued")))
-
- # two elements
- keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), state = "queued")
- expect_equal(rush$read_hashes(keys, c("xs", "state")), list(list(x1 = 1, x2 = 2, state = "queued"), list(x1 = 1, x2 = 3, state = "queued")))
-})
-
-test_that("writing a hash with a NA state works", {
- skip_on_cran()
- skip_on_ci()
-
- config = start_flush_redis()
- rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
-
- keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), state = NA_character_)
- expect_equal(rush$read_hashes(keys, c("xs", "state")), list(list(x1 = 1, x2 = 2, state = NA_character_)))
-})
-
-test_that("writing a hash with a arbitrary state works", {
- skip_on_cran()
- skip_on_ci()
-
- config = start_flush_redis()
- rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
-
- keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), state = "test")
- expect_equal(rush$read_hashes(keys, c("xs", "state")), list(list(x1 = 1, x2 = 2, state = "test")))
-})
test_that("writing list columns works", {
skip_on_cran()
@@ -216,7 +179,7 @@ test_that("writing list columns works", {
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
- keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list("A"))), state = "finished")
+ keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list("A"))))
rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys))
expect_list(rush$fetch_finished_tasks()$extra, len = 1)
@@ -224,7 +187,7 @@ test_that("writing list columns works", {
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
- keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list(letters[1:3]))), state = "finished")
+ keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list(letters[1:3]))))
rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys))
expect_list(rush$fetch_finished_tasks()$extra, len = 1)
@@ -232,7 +195,7 @@ test_that("writing list columns works", {
config = start_flush_redis()
rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
- keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2)), xs_extra = list(list(extra = list("A")), list(extra = list("B"))), state = "finished")
+ keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2)), xs_extra = list(list(extra = list("A")), list(extra = list("B"))))
rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys))
rush$read_hashes(keys, c("xs", "xs_extra"))
@@ -478,7 +441,7 @@ test_that("pushing a failed tasks works", {
rush$push_tasks(xss)
task = rush$pop_task()
- rush$push_results(task$key, condition = list(list(message = "error")), state = "failed")
+ rush$push_failed(task$key, condition = list(list(message = "error")))
# check task count
expect_equal(rush$n_tasks, 1)
@@ -557,7 +520,7 @@ test_that("moving and fetching tasks works", {
# push failed task
task = rush$pop_task()
- rush$push_results(task$key, condition = list(list(message = "error")), state = "failed")
+ rush$push_failed(task$key, condition = list(list(message = "error")))
queued_tasks = rush$fetch_queued_tasks()
expect_data_table(queued_tasks, nrows = 1)
expect_character(queued_tasks$keys, unique = TRUE)
@@ -643,7 +606,7 @@ test_that("fetching as list works", {
# push failed task
task = rush$pop_task()
- rush$push_results(task$key, condition = list(list(message = "error")), state = "failed")
+ rush$push_failed(task$key, condition = list(list(message = "error")))
failed_tasks = rush$fetch_failed_tasks(data_format = "list")
expect_list(failed_tasks, len = 1)
expect_names(names(failed_tasks), identical.to = task$key)
@@ -889,3 +852,42 @@ test_that("pushing tasks and terminating worker works", {
expect_rush_reset(rush, type = "terminate")
})
+test_that("n_tries method works", {
+ skip_on_cran()
+ skip_on_ci()
+
+ config = start_flush_redis()
+ rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
+
+ xss = list(list(x1 = 1, x2 = 2))
+ keys = rush$push_tasks(xss)
+
+ expect_equal(rush$n_tries(keys), 0)
+
+ xss = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 2))
+ keys = rush$push_tasks(xss)
+
+ expect_equal(rush$n_tries(keys), c(0, 0))
+
+ rush$connector$command(c("HINCRBY", keys[1], "n_tries", 1))
+
+ expect_equal(rush$n_tries(keys), c(1L, 0))
+
+ expect_rush_reset(rush, type = "terminate")
+})
+
+test_that("terminate on idle works", {
+ skip_on_cran()
+
+ config = start_flush_redis()
+ rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
+
+ xss = list(list(x1 = 1, x2 = 2))
+ keys = rush$push_tasks(xss, terminate_workers = TRUE)
+ expect_false(rush$terminated_on_idle)
+
+ rush$pop_task()
+ expect_true(rush$terminated_on_idle)
+
+ expect_rush_reset(rush, type = "terminate")
+})
diff --git a/tests/testthat/test-worker_loops.R b/tests/testthat/test-worker_loops.R
new file mode 100644
index 0000000..de09f52
--- /dev/null
+++ b/tests/testthat/test-worker_loops.R
@@ -0,0 +1,47 @@
+test_that("worker_loop_default works", {
+ skip_on_cran()
+
+ config = start_flush_redis()
+ rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
+ xss = list(list(x1 = 1, x2 = 2))
+ rush$push_tasks(xss, terminate_workers = TRUE)
+ fun = function(x1, x2, ...) list(y = x1 + x2)
+
+ expect_null(worker_loop_default(fun, rush = rush))
+ expect_equal(rush$n_finished_tasks, 1L)
+
+ expect_rush_reset(rush, type = "terminate")
+})
+
+test_that("worker_loop_default works with failed task", {
+ skip_on_cran()
+
+ config = start_flush_redis()
+ rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
+ xss = list(list(x1 = 1, x2 = 2))
+ rush$push_tasks(xss, terminate_workers = TRUE)
+ fun = function(x1, x2, ...) stop("failed")
+
+ expect_null(worker_loop_default(fun, rush = rush))
+ expect_equal(rush$n_failed_tasks, 1L)
+
+ expect_rush_reset(rush, type = "terminate")
+})
+
+# NOTE(review): the body duplicates the failed-task test above; the terminate signal itself is not exercised yet
+test_that("worker_loop_default works with terminate", {
+ skip_on_cran()
+
+ config = start_flush_redis()
+ rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
+ xss = list(list(x1 = 1, x2 = 2))
+ rush$push_tasks(xss, terminate_workers = TRUE)
+ fun = function(x1, x2, ...) stop("failed")
+
+ expect_null(worker_loop_default(fun, rush = rush))
+ expect_equal(rush$n_failed_tasks, 1L)
+
+ expect_rush_reset(rush, type = "terminate")
+})
+
+