diff --git a/DESCRIPTION b/DESCRIPTION index 1912a7b..69a8da1 100644 --- a/DESCRIPTION +++ b/DESCRIPTION @@ -16,9 +16,9 @@ Imports: jsonlite, lgr, mlr3misc, + parallel, processx, redux, - rlecuyer, uuid Suggests: callr, @@ -28,6 +28,6 @@ Suggests: withr Encoding: UTF-8 Roxygen: list(markdown = TRUE) -RoxygenNote: 7.2.3 +RoxygenNote: 7.2.3.9000 Config/testthat/edition: 3 Config/testthat/parallel: false diff --git a/NAMESPACE b/NAMESPACE index 2e74e07..0c93c06 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -10,13 +10,15 @@ export(rush_available) export(rush_config) export(rush_plan) export(start_worker) +export(worker_loop_callr) export(worker_loop_default) import(checkmate) import(data.table) import(mlr3misc) import(redux) -import(rlecuyer) importFrom(jsonlite,fromJSON) +importFrom(parallel,nextRNGStream) +importFrom(parallel,nextRNGSubStream) importFrom(processx,process) importFrom(utils,str) importFrom(uuid,UUIDgenerate) diff --git a/R/Rush.R b/R/Rush.R index d45b36a..a0d2025 100644 --- a/R/Rush.R +++ b/R/Rush.R @@ -87,6 +87,13 @@ #' Saving log messages adds a small overhead but is useful for debugging. #' By default, no log messages are stored. #' +#' @section Seed: +#' Setting a seed is important for reproducibility. +#' The tasks can be evaluated with a specific L'Ecuyer-CMRG seed. +#' If an initial seed is passed, the seed is used to generate L'Ecuyer-CMRG seeds for each task. +#' Each task is then evaluated with a separate RNG stream. +#' See [parallel::nextRNGStream] for more details. +#' #' @template param_network_id #' @template param_config #' @template param_worker_loop @@ -99,7 +106,6 @@ #' @template param_lgr_buffer_size #' @template param_seed #' @template param_data_format -#' @template param_max_tries #' #' #' @export @@ -124,7 +130,7 @@ Rush = R6::R6Class("Rush", #' @description #' Creates a new instance of this [R6][R6::R6Class] class. 
- initialize = function(network_id = NULL, config = NULL) { + initialize = function(network_id = NULL, config = NULL, seed = NULL) { self$network_id = assert_string(network_id, null.ok = TRUE) %??% uuid::UUIDgenerate() self$config = assert_class(config, "redis_config", null.ok = TRUE) %??% rush_env$config if (is.null(self$config)) self$config = redux::redis_config() @@ -133,6 +139,26 @@ Rush = R6::R6Class("Rush", } self$connector = redux::hiredis(self$config) private$.hostname = get_hostname() + + if (!is.null(seed)) { + if (is_lecyer_cmrg_seed(seed)) { + # use supplied L'Ecuyer-CMRG seed + private$.seed = seed + } else { + # generate new L'Ecuyer-CMRG seed + assert_count(seed) + + # save old rng state and kind and switch to L'Ecuyer-CMRG + oseed = get_random_seed() + okind = RNGkind("L'Ecuyer-CMRG")[1] + + # restore old rng state and kind + on.exit(set_random_seed(oseed, kind = okind), add = TRUE) + + set.seed(seed) + private$.seed = get_random_seed() + } + } }, #' @description @@ -183,8 +209,6 @@ Rush = R6::R6Class("Rush", heartbeat_expire = NULL, lgr_thresholds = NULL, lgr_buffer_size = 0, - max_tries = 0, - seed = NULL, supervise = TRUE, worker_loop = worker_loop_default, ... @@ -194,9 +218,6 @@ Rush = R6::R6Class("Rush", assert_flag(supervise) r = self$connector - # set global maximum retries of tasks - private$.max_tries = assert_count(max_tries) - # push worker config to redis private$.push_worker_config( globals = globals, @@ -205,8 +226,6 @@ Rush = R6::R6Class("Rush", heartbeat_expire = heartbeat_expire, lgr_thresholds = lgr_thresholds, lgr_buffer_size = lgr_buffer_size, - max_tries = max_tries, - seed = seed, worker_loop = worker_loop, ... 
) @@ -216,7 +235,7 @@ Rush = R6::R6Class("Rush", processx::process$new("Rscript", args = c("-e", sprintf("rush::start_worker(network_id = '%s', worker_id = '%s', hostname = '%s', url = '%s')", self$network_id, worker_id, private$.hostname, self$config$url)), - supervise = supervise) # , stdout = "|", stderr = "|" + supervise = supervise, stdout = "|", stderr = "|") # , stdout = "|", stderr = "|" }), worker_ids)) if (wait_for_workers) self$wait_for_workers(n_workers) @@ -375,7 +394,7 @@ Rush = R6::R6Class("Rush", # check workers with a heartbeat heartbeat_keys = r$SMEMBERS(private$.get_key("heartbeat_keys")) - lost_workers = if (length(heartbeat_keys)) { + lost_workers_heartbeat = if (length(heartbeat_keys)) { lg$debug("Checking %i worker(s) with heartbeat", length(heartbeat_keys)) running = as.logical(r$pipeline(.commands = map(heartbeat_keys, function(heartbeat_key) c("EXISTS", heartbeat_key)))) if (all(running)) return(invisible(self)) @@ -397,7 +416,7 @@ Rush = R6::R6Class("Rush", # check local workers without a heartbeat local_workers = r$SMEMBERS(private$.get_key("local_workers")) - lost_workers = if (length(local_workers)) { + lost_workers_local = if (length(local_workers)) { lg$debug("Checking %i worker(s) with process id", length(local_workers)) running = map_lgl(local_workers, function(worker_id) self$processes[[worker_id]]$is_alive()) if (all(running)) return(invisible(self)) @@ -423,6 +442,7 @@ Rush = R6::R6Class("Rush", } # mark lost tasks + lost_workers = c(lost_workers_heartbeat, lost_workers_local) if (length(lost_workers)) { running_tasks = self$fetch_running_tasks(fields = "worker_extra") if (!nrow(running_tasks)) return(invisible(self)) @@ -431,29 +451,8 @@ Rush = R6::R6Class("Rush", lg$error("Lost %i task(s): %s", length(keys), str_collapse(keys)) - if (restart_tasks) { - - # check whether the tasks should be retried - retry = self$n_tries(keys) < private$.max_tries - keys = keys[retry] - - if (length(keys)) { - lg$error("Retry %i lost task(s): 
%s", length(keys), str_collapse(keys)) - cmds = map(keys, function(key) { - c("HINCRBY", key, "n_tries", 1) - }) - cmds = c(cmds, list( - c("RPUSH", private$.get_key("queued_tasks"), keys), - c("SREM", private$.get_key("running_tasks"), keys) - )) - r$pipeline(.commands = cmds) - } - } else { - cmds = list( - c("RPUSH", private$.get_key("failed_tasks"), keys), - c("SREM", private$.get_key("running_tasks"), keys)) - r$pipeline(.commands = cmds) - } + conditions = list(list(message = "Worker has crashed or was killed")) + self$push_failed(keys, conditions = conditions) } return(invisible(self)) @@ -539,22 +538,53 @@ Rush = R6::R6Class("Rush", #' #' @param xss (list of named `list()`)\cr #' Lists of arguments for the function e.g. `list(list(x1, x2), list(x1, x2)))`. - #' @param extra (`list`)\cr + #' @param extra (`list()`)\cr #' List of additional information stored along with the task e.g. `list(list(timestamp), list(timestamp)))`. + #' @param seeds (`list()`)\cr + #' List of L'Ecuyer-CMRG seeds for each task e.g `list(list(c(104071, 490840688, 1690070564, -495119766, 503491950, 1801530932, -1629447803)))`. + #' If `NULL` but an initial seed is set, L'Ecuyer-CMRG seeds are generated from the initial seed. + #' If `NULL` and no initial seed is set, no seeds are used for the random number generator. + #' @param timeouts (`integer()`)\cr + #' Timeouts for each task in seconds e.g. `c(10, 15)`. + #' A single number is used as the timeout for all tasks. + #' If `NULL` no timeout is set. + #' @param max_retries (`integer()`)\cr + #' Number of retries for each task. + #' A single number is used as the number of retries for all tasks. + #' If `NULL` tasks are not retried. #' @param terminate_workers (`logical(1)`)\cr #' Whether to stop the workers after evaluating the tasks. #' #' @return (`character()`)\cr #' Keys of the tasks. 
- push_tasks = function(xss, extra = NULL, terminate_workers = FALSE) { + push_tasks = function(xss, extra = NULL, seeds = NULL, timeouts = NULL, max_retries = NULL, terminate_workers = FALSE) { assert_list(xss, types = "list") assert_list(extra, types = "list", null.ok = TRUE) + assert_list(seeds, types = "numeric", null.ok = TRUE) + assert_numeric(timeouts, null.ok = TRUE) + assert_numeric(max_retries, null.ok = TRUE) assert_flag(terminate_workers) r = self$connector lg$debug("Pushing %i task(s) to the shared queue", length(xss)) - keys = self$write_hashes(xs = xss, xs_extra = extra) + if (!is.null(private$.seed) && is.null(seeds)) { + + lg$debug("Creating %i L'Ecuyer-CMRG seeds", length(xss)) + + seeds = make_rng_seeds(length(xss), private$.seed) + # store last seed for next push + private$.seed = seeds[[length(seeds)]] + } + + # write tasks to hashes + keys = self$write_hashes( + xs = xss, + xs_extra = extra, + seed = seeds, + timeout = timeouts, + max_retries = max_retries) + cmds = list( c("RPUSH", private$.get_key("all_tasks"), keys), c("LPUSH", private$.get_key("queued_tasks"), keys)) @@ -606,6 +636,76 @@ Rush = R6::R6Class("Rush", return(invisible(keys)) }, + #' @description + #' Pushes failed tasks to the data base. + #' + #' @param keys (`character(1)`)\cr + #' Keys of the associated tasks. + #' @param conditions (named `list()`)\cr + #' List of lists of conditions. + push_failed = function(keys, conditions) { + assert_character(keys) + assert_list(conditions, types = "list") + r = self$connector + + # write condition to hash + self$write_hashes(condition = conditions, keys = keys) + + # move key from running to failed + r$pipeline(.commands = map(keys, function(key) { + c("SMOVE", private$.get_key("running_tasks"), private$.get_key("failed_tasks"), key) + })) + + return(invisible(self)) + }, + + #' @description + #' Retry failed tasks. + #' + #' @param keys (`character()`)\cr + #' Keys of the tasks to be retried. 
+ #' @param ignore_max_retires (`logical(1)`)\cr + #' Whether to ignore the maximum number of retries. + #' @param next_seed (`logical(1)`)\cr + #' Whether to change the seed of the task. + retry_tasks = function(keys, ignore_max_retires = FALSE, next_seed = FALSE) { + assert_character(keys) + assert_flag(ignore_max_retires) + assert_flag(next_seed) + tasks = self$read_hashes(keys, fields = c("seed", "max_retries", "n_retries"), flatten = FALSE) + seeds = map(tasks, "seed") + n_retries = map_int(tasks, function(task) task$n_retries %??% 0L) + max_retries = map_dbl(tasks, function(task) task$max_retries %??% Inf) + failed = self$is_failed_task(keys) + retrieable = n_retries < max_retries + + if (!all(failed)) lg$error("Not all task(s) failed: %s", str_collapse(keys[!failed])) + + if (ignore_max_retires) { + keys = keys[failed] + } else { + if (!all(retrieable)) lg$error("Task(s) reached the maximum number of retries: %s", str_collapse(keys[!retrieable])) + keys = keys[failed & retrieable] + } + + if (length(keys)) { + + lg$debug("Retry %i task(s): %s", length(keys), str_collapse(keys)) + + # generate new L'Ecuyer-CMRG seeds + seeds = if (next_seed) map(seeds, function(seed) parallel::nextRNGSubStream(seed)) + + self$write_hashes(n_retries = n_retries + 1L, seed = seeds, keys = keys) + r = self$connector + r$pipeline(.commands = list( + c("SREM", private$.get_key("failed_tasks"), keys), + c("RPUSH", private$.get_key("queued_tasks"), keys) + )) + } + + return(invisible(self)) + }, + #' @description #' Fetch latest results from the data base. #' @@ -833,20 +933,15 @@ Rush = R6::R6Class("Rush", }, #' @description - #' Writes a list to redis hashes. - #' The function serializes each element and writes it to a new hash. + #' Writes R objects to Redis hashes. + #' The function takes the vectors in `...` as input and writes each element as a field-value pair to a new hash. #' The name of the argument defines the field into which the serialized element is written. 
#' For example, `xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4))` writes `serialize(list(x1 = 1, x2 = 2))` at field `xs` into a hash and `serialize(list(x1 = 3, x2 = 4))` at field `xs` into another hash. - #' The function can iterate over multiple lists simultaneously. + #' The function can iterate over multiple vectors simultaneously. #' For example, `xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7))` creates two hashes with the fields `xs` and `ys`. - #' Different lengths are recycled. - #' The stored elements must be lists themselves. - #' The reading functions combine the hashes to a table where the names of the inner lists are the column names. - #' For example, `xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7))` becomes `data.table(x1 = c(1, 3), x2 = c(2, 4), y = c(3, 7))`. - #' Vectors in list columns must be wrapped in lists. - #' Otherwise, `$read_values()` will expand the table by the length of the vectors. - #' For example, `xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = c("A", "B", "C"))) does not work. - #' Pass `xs_extra = list(list(extra = list(c("A", "B", "C"))))` instead. + #' The vectors are recycled to the length of the longest vector. + #' Both lists and atomic vectors are supported. + #' Arguments that are `NULL` are ignored. #' #' @param ... (named `list()`)\cr #' Lists to be written to the hashes. @@ -861,16 +956,21 @@ Rush = R6::R6Class("Rush", #' @return (`character()`)\cr #' Keys of the hashes. 
write_hashes = function(..., .values = list(), keys = NULL) { + # discard empty lists values = discard(c(list(...), .values), function(l) !length(l)) - assert_list(values, names = "unique", types = "list", min.len = 1) fields = names(values) - keys = assert_character(keys %??% uuid::UUIDgenerate(n = length(values[[1]])), len = length(values[[1]]), .var.name = "keys") + n_hashes = max(map_int(values, length)) + if (is.null(keys)) { + keys = UUIDgenerate(n = n_hashes) + } else { + assert_character(keys, min.len = n_hashes) + } lg$debug("Writting %i hash(es) with %i field(s)", length(keys), length(fields)) # construct list of redis commands to write hashes cmds = pmap(c(list(key = keys), values), function(key, ...) { - # serialize lists + # serialize value of field bin_values = map(list(...), redux::object_to_bin) lg$debug("Serialzing %i value(s) to %s", length(bin_values), format(Reduce(`+`, map(bin_values, object.size)))) @@ -887,19 +987,24 @@ Rush = R6::R6Class("Rush", }, #' @description - #' Reads redis hashes written with `$write_hashes()`. - #' The function reads the values of the `fields` in the hashes stored at `keys`. - #' The values of a hash are deserialized and combined into a single list. + #' Reads R Objects from Redis hashes. + #' The function reads the field-value pairs of the hashes stored at `keys`. + #' The values of a hash are deserialized and combined to a list. + #' If `flatten` is `TRUE`, the values are flattened to a single list e.g. list(xs = list(x1 = 1, x2 = 2), ys = list(y = 3)) becomes list(x1 = 1, x2 = 2, y = 3). + #' The reading functions combine the hashes to a table where the names of the inner lists are the column names. + #' For example, `xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7))` becomes `data.table(x1 = c(1, 3), x2 = c(2, 4), y = c(3, 7))`. #' #' @param keys (`character()`)\cr #' Keys of the hashes. #' @param fields (`character()`)\cr #' Fields to be read from the hashes. 
+ #' @param flatten (`logical(1)`)\cr + #' Whether to flatten the list. #' #' @return (list of `list()`)\cr #' The outer list contains one element for each key. #' The inner list is the combination of the lists stored at the different fields. - read_hashes = function(keys, fields) { + read_hashes = function(keys, fields, flatten = TRUE) { lg$debug("Reading %i hash(es) with %i field(s)", length(keys), length(fields)) @@ -909,29 +1014,69 @@ Rush = R6::R6Class("Rush", # list of hashes # first level contains hashes # second level contains fields - # the values of the fields are serialized lists + # the values of the fields are serialized lists and atomics hashes = self$connector$pipeline(.commands = cmds) - # unserialize lists of the second level - # combine elements of the third level to one list - map(hashes, function(hash) unlist(map_if(hash, function(x) !is.null(x), redux::bin_to_object), recursive = FALSE)) + if (flatten) { + # unserialize elements of the second level + # flatten elements of the third level to one list + # using mapply instead of pmap is faster + map(hashes, function(hash) unlist(.mapply(function(bin_value, field) { + # unserialize value + value = safe_bin_to_object(bin_value) + # wrap atomic values in list and name by field + if (is.atomic(value) && !is.null(value)) { + # list column or column with type of value + if (length(value) > 1) value = list(value) + value = setNames(list(value), field) + } + value + }, list(bin_value = hash, field = fields), NULL), recursive = FALSE)) + } else { + # unserialize elements of the second level + map(hashes, function(hash) setNames(map(hash, function(bin_value) { + safe_bin_to_object(bin_value) + }), fields)) + } }, #' @description - #' Returns the number of attempts to evaluate a task. + #' Reads a single Redis hash and returns the values as a list named by the fields. #' #' @param keys (`character()`)\cr - #' Keys of the tasks. + #' Keys of the hashes. 
+ #' @param fields (`character()`)\cr + #' Fields to be read from the hashes. #' - #' @return (`integer()`)\cr - #' Number of attempts. - n_tries = function(keys) { - assert_character(keys) + #' @return (named `list()`)\cr + #' The list contains one element for each field and is named by `fields`. + #' Each element is the deserialized value stored at that field of the hash. + read_hash = function(key, fields) { + lg$debug("Reading hash with %i field(s)", length(fields)) + + setNames(map(self$connector$HMGET(key, fields), safe_bin_to_object), fields) + }, + + #' @description + #' Checks whether tasks have the status `"running"`. + #' + #' @param keys (`character()`)\cr + #' Keys of the tasks. + is_running_task = function(keys) { r = self$connector + if (!length(keys)) return(logical(0)) + as.logical(r$command(c("SMISMEMBER", private$.get_key("running_tasks"), keys))) + }, - # n_retries is not set when the task never failed before - n_tries = r$pipeline(.commands = map(keys, function(key) c("HGET", key, "n_tries"))) - map_int(n_tries, function(value) if (is.null(value)) 0L else as.integer(value)) + #' @description + #' Checks whether tasks have the status `"failed"`. + #' + #' @param keys (`character()`)\cr + #' Keys of the tasks. + is_failed_task = function(keys) { + r = self$connector + if (!length(keys)) return(logical(0)) + as.logical(r$command(c("SMISMEMBER", private$.get_key("failed_tasks"), keys))) } ), @@ -1020,10 +1165,9 @@ Rush = R6::R6Class("Rush", #' Keys of failed tasks. failed_tasks = function() { r = self$connector - unlist(r$LRANGE(private$.get_key("failed_tasks"), 0, -1)) + unlist(r$SMEMBERS(private$.get_key("failed_tasks"))) }, - #' @field n_queued_tasks (`integer(1)`)\cr #' Number of queued tasks. n_queued_tasks = function() { @@ -1057,7 +1201,7 @@ Rush = R6::R6Class("Rush", #' Number of failed tasks. 
n_failed_tasks = function() { r = self$connector - as.integer(r$LLEN(private$.get_key("failed_tasks"))) %??% 0 + as.integer(r$SCARD(private$.get_key("failed_tasks"))) %??% 0 }, #' @field n_tasks (`integer(1)`)\cr @@ -1155,10 +1299,9 @@ Rush = R6::R6Class("Rush", .snapshot_schedule = NULL, - # .hostname = NULL, - .max_tries = NULL, + .seed = NULL, # prefix key with instance id .get_key = function(key) { @@ -1179,8 +1322,6 @@ Rush = R6::R6Class("Rush", heartbeat_expire = NULL, lgr_thresholds = NULL, lgr_buffer_size = 0, - max_tries = 0, - seed = NULL, worker_loop = worker_loop_default, ... ) { @@ -1191,8 +1332,6 @@ Rush = R6::R6Class("Rush", if (!is.null(heartbeat_period)) require_namespaces("callr") assert_vector(lgr_thresholds, names = "named", null.ok = TRUE) assert_count(lgr_buffer_size) - assert_count(max_tries) - assert_int(seed, null.ok = TRUE) assert_function(worker_loop) dots = list(...) r = self$connector @@ -1211,9 +1350,7 @@ Rush = R6::R6Class("Rush", heartbeat_period = heartbeat_period, heartbeat_expire = heartbeat_expire, lgr_thresholds = lgr_thresholds, - lgr_buffer_size = lgr_buffer_size, - seed = seed, - max_tries = max_tries) + lgr_buffer_size = lgr_buffer_size) # arguments needed for initializing the worker start_args = list( @@ -1287,4 +1424,3 @@ Rush = R6::R6Class("Rush", } ) ) - diff --git a/R/RushWorker.R b/R/RushWorker.R index 3b02ca8..ad8617e 100644 --- a/R/RushWorker.R +++ b/R/RushWorker.R @@ -16,7 +16,6 @@ #' @template param_lgr_thresholds #' @template param_lgr_buffer_size #' @template param_seed -#' @template param_max_tries #' #' @export RushWorker = R6::R6Class("RushWorker", @@ -46,14 +45,12 @@ RushWorker = R6::R6Class("RushWorker", heartbeat_expire = NULL, lgr_thresholds = NULL, lgr_buffer_size = 0, - seed = NULL, - max_tries = 0 + seed = NULL ) { - super$initialize(network_id = network_id, config = config) + super$initialize(network_id = network_id, config = config, seed = seed) self$host = assert_choice(host, c("local", "remote")) 
self$worker_id = assert_string(worker_id %??% uuid::UUIDgenerate()) - private$.max_tries = assert_count(max_tries) r = self$connector # setup heartbeat @@ -104,17 +101,6 @@ RushWorker = R6::R6Class("RushWorker", } } - # initialize seed table - if (!is.null(seed)) { - private$.seed = TRUE - .lec.SetPackageSeed(seed) - walk(self$tasks, function(key) { - .lec.CreateStream(key) - }) - } else { - private$.seed = FALSE - } - # register worker ids r$SADD(private$.get_key("worker_ids"), self$worker_id) r$SADD(private$.get_key("running_worker_ids"), self$worker_id) @@ -166,7 +152,9 @@ RushWorker = R6::R6Class("RushWorker", #' #' @param timeout (`numeric(1)`)\cr #' Time to wait for task in seconds. - pop_task = function(timeout = 1) { + #' @param fields (`character()`)\cr + #' Fields to be returned. + pop_task = function(timeout = 1, fields = "xs") { r = self$connector key = r$command(c("BLMPOP", timeout, 2, private$.get_worker_key("queued_tasks"), private$.get_key("queued_tasks"), "RIGHT"))[[2]][[1]] @@ -176,7 +164,10 @@ RushWorker = R6::R6Class("RushWorker", # move key from queued to running r$command(c("SADD", private$.get_key("running_tasks"), key)) - list(key = key, xs = redux::bin_to_object(r$HGET(key, "xs"))) + + task = self$read_hash(key = key, fields = fields) + task$key = key + task }, #' @description @@ -188,23 +179,20 @@ RushWorker = R6::R6Class("RushWorker", #' List of lists of named results. #' @param extra (named `list()`)\cr #' List of lists of additional information stored along with the results. - #' @param conditions (named `list()`)\cr - #' List of lists of conditions. - #' @param state (`character(1)`)\cr - #' Status of the tasks. - #' If `"finished"` the tasks are moved to the finished tasks. - #' If `"error"` the tasks are moved to the failed tasks. 
- push_results = function(keys, yss = list(), extra = list(), conditions = list()) { + push_results = function(keys, yss, extra = NULL) { assert_string(keys) assert_list(yss, types = "list") - assert_list(extra, types = "list") + assert_list(extra, types = "list", null.ok = TRUE) r = self$connector - # write result to hash - self$write_hashes(ys = yss, ys_extra = extra, condition = conditions, keys = keys) + # write results to hashes + self$write_hashes( + ys = yss, + ys_extra = extra, + keys = keys) # move key from running to finished - # keys of finished and failed tasks are stored in a list i.e. the are ordered by time. + # keys of finished tasks are stored in a list i.e. they are ordered by time # each rush instance only needs to record how many results it has already seen # to cheaply get the latest results and cache the finished tasks # under some conditions a set would be more advantageous e.g. to check if a task is finished, @@ -217,52 +205,6 @@ RushWorker = R6::R6Class("RushWorker", return(invisible(self)) }, - #' @description - #' Pushes failed tasks to the data base. - #' - #' @param keys (`character(1)`)\cr - #' Keys of the associated tasks. - #' @param conditions (named `list()`)\cr - #' List of lists of conditions. - push_failed = function(keys, conditions) { - assert_string(keys) - assert_list(conditions, types = "list") - r = self$connector - - # write condition to hash - self$write_hashes(condition = conditions, keys = keys) - - # move key from running to failed - r$pipeline(.commands = list( - c("SREM", private$.get_key("running_tasks"), keys), - c("RPUSH", private$.get_key("failed_tasks"), keys) - )) - - return(invisible(self)) - }, - - #' @description - #' Sets the seed for `key`. - #' Updates the seed table if necessary. - #' - #' @param key (`character(1)`)\cr - #' Key of the task. 
- set_seed = function(key) { - if (!private$.seed) return(invisible(self)) - r = self$connector - - # update seed table - n_streams = length(.lec.Random.seed.table$name) - if (self$n_tasks > n_streams) { - keys = r$LRANGE(private$.get_key("all_tasks"), n_streams, -1) - walk(keys, function(key) .lec.CreateStream(key)) - } - - # set seed - .lec.CurrentStream(key) - return(invisible(self)) - }, - #' @description #' Mark the worker as terminated. #' Last step in the worker loop before the worker terminates. @@ -291,9 +233,5 @@ RushWorker = R6::R6Class("RushWorker", r = self$connector as.logical(r$EXISTS(private$.get_key("terminate_on_idle"))) && !as.logical(self$n_queued_tasks) } - ), - - private = list( - .seed = NULL ) ) diff --git a/R/helper.R b/R/helper.R index 4ec66f6..ca5cdfe 100644 --- a/R/helper.R +++ b/R/helper.R @@ -23,125 +23,53 @@ get_hostname = function() { host[1] } -# internal pid_exists functions from parallelly package by Henrik Bengtsson -# for more information see -# https://github.com/HenrikBengtsson/parallelly/blob/78a1b44021df2973d05224bfaa0b1a7abaf791ff/R/utils%2Cpid.R -choose_pid_exists = function() { - os = .Platform$OS.type - pid_check = NULL - - suppressWarnings({ - if (os == "unix") { - if (isTRUE(pid_exists_by_pskill(Sys.getpid())) && getRversion() >= "3.5.0") { - pid_check = pid_exists_by_pskill - } else if (isTRUE(pid_exists_by_ps(Sys.getpid()))) { - pid_check = pid_exists_by_ps - } - } else if (os == "windows") { - if (isTRUE(pid_exists_by_tasklist(Sys.getpid()))) { - pid_check = pid_exists_by_tasklist - } else if (isTRUE(pid_exists_by_tasklist_filter(Sys.getpid()))) { - pid_check = pid_exists_by_tasklist_filter - } - } - }) +is_lecyer_cmrg_seed = function(seed) { + is.numeric(seed) && length(seed) == 7L && all(is.finite(seed)) && (seed[1] %% 10000L == 407L) +} - pid_check +get_random_seed = function() { + env = globalenv() + env$.Random.seed } -pid_exists_by_tasklist_filter = function(pid, debug = FALSE) { - for (kk in 1:5) { - res = 
tryCatch({ - args = c("/FI", shQuote(sprintf("PID eq %.0f", pid)), "/NH") - out = system2("tasklist", args = args, stdout = TRUE, stderr = "") - if (debug) { - cat(sprintf("Call: tasklist %s\n", paste(args, collapse = " "))) - print(out) - str(out) - } - out = gsub("(^[ ]+|[ ]+$)", "", out) - out = out[nzchar(out)] - if (debug) { - cat("Trimmed:\n") - print(out) - str(out) - } - out = grepl(sprintf(" %.0f ", pid), out) - if (debug) { - cat("Contains PID: ", paste(out, collapse = ", "), "\n", sep = "") - } - any(out) - }, error = function(ex) NA) - if (isTRUE(res)) return(res) - Sys.sleep(0.1) +set_random_seed = function(seed, kind = NULL) { + env = globalenv() + old_seed = env$.Random.seed + if (is.null(seed)) { + if (!is.null(kind)) RNGkind(kind) + rm(list = ".Random.seed", envir = env, inherits = FALSE) + } else { + env$.Random.seed = seed } - res + invisible(old_seed) } -pid_exists_by_tasklist = function(pid, debug = FALSE) { - for (kk in 1:5) { - res = tryCatch({ - out = system2("tasklist", stdout = TRUE, stderr = "") - if (debug) { - cat("Call: tasklist\n") - print(out) - str(out) - } - out = gsub("(^[ ]+|[ ]+$)", "", out) - out = out[nzchar(out)] - skip = grep("^====", out)[1] - if (!is.na(skip)) out = out[seq(from = skip + 1L, to = length(out))] - if (debug) { - cat("Trimmed:\n") - print(out) - str(out) - } - out = strsplit(out, split = "[ ]+", fixed = FALSE) - - n = lengths(out) - n = sort(n)[round(length(n) / 2)] - drop = n - 2L - out = lapply(out, FUN = function(x) rev(x)[-seq_len(drop)][1]) - out = unlist(out, use.names = FALSE) - if (debug) { - cat("Extracted: ", paste(sQuote(out), collapse = ", "), "\n", sep = "") - } - out = as.integer(out) - if (debug) { - cat("Parsed: ", paste(sQuote(out), collapse = ", "), "\n", sep = "") - } - out = (out == pid) - if (debug) { - cat("Equals PID: ", paste(out, collapse = ", "), "\n", sep = "") - } - any(out) - }, error = function(ex) NA) - if (isTRUE(res)) return(res) - Sys.sleep(0.1) +# creates n L'Ecuyer-CMRG 
streams +make_rng_seeds = function(n, seed) { + seeds = vector("list", length = n) + for (ii in seq_len(n)) { + seeds[[ii]] = seed = parallel::nextRNGStream(seed) } - res + seeds } -pid_exists_by_pskill = function(pid, debug = FALSE) { - tryCatch({ - as.logical(tools::pskill(pid, signal = 0L)) - }, error = function(ex) NA) +# skips serialization of NULL +safe_bin_to_object = function(bin) { + if (is.null(bin)) return(NULL) + redux::bin_to_object(bin) } -pid_exists_by_ps = function(pid, debug = FALSE) { - tryCatch({ - out = suppressWarnings({ - system2("ps", args = pid, stdout = TRUE, stderr = FALSE) - }) +# runs code with a specific rng state +with_rng_state = function(fun, args, seed) { + if (!is.null(seed)) assign(".Random.seed", seed, envir = globalenv()) + mlr3misc::invoke(fun, .args = args) +} + + +is_retriable = function(task) { + if (is.null(task$max_retries)) return(FALSE) + assert_int(task$max_retries) + assert_int(task$n_failures, null.ok = TRUE) - status = attr(out, "status") - if (is.numeric(status) && status < 0) return(NA) - out = gsub("(^[ ]+|[ ]+$)", "", out) - out = out[nzchar(out)] - out = strsplit(out, split = "[ ]+", fixed = FALSE) - out = lapply(out, FUN = function(x) x[1]) - out = unlist(out, use.names = FALSE) - out = suppressWarnings(as.integer(out)) - any(out == pid) - }, error = function(ex) NA) + task$max_retries > task$n_failures %??% 0 } diff --git a/R/worker_loops.R b/R/worker_loops.R index a5fbeff..586806b 100644 --- a/R/worker_loops.R +++ b/R/worker_loops.R @@ -17,20 +17,65 @@ worker_loop_default = function(fun, constants = NULL, rush) { assert_function(fun) assert_list(constants, null.ok = TRUE, names = "named") - while(!rush$terminated) { - task = rush$pop_task() + while(!rush$terminated && !rush$terminated_on_idle) { + task = rush$pop_task(fields = c("xs", "seed")) if (!is.null(task)) { - # set seed - rush$set_seed(task$key) tryCatch({ - ys = mlr3misc::invoke(fun, .args = c(task$xs, constants)) + # evaluate task with seed + ys = 
with_rng_state(fun, args = c(task$xs, constants), seed = task$seed) rush$push_results(task$key, yss = list(ys)) }, error = function(e) { condition = list(message = e$message) rush$push_failed(task$key, conditions = list(condition)) }) - } else { - if (rush$terminated_on_idle) break + } + } + + return(NULL) +} + +#' @title Single Task Worker Loop with callr Encapsulation +#' +#' @description +#' Worker loop that pops a single task from the queue, executes the function in an external callr session and pushes the results. +#' Supports timeouts on the tasks. +#' +#' @param fun (`function`)\cr +#' Function to be executed. +#' @param constants (`list`)\cr +#' List of constants passed to `fun`. +#' @param rush ([RushWorker])\cr +#' Rush worker instance. +#' +#' @export +worker_loop_callr = function(fun, constants = NULL, rush) { + assert_function(fun) + assert_list(constants, null.ok = TRUE, names = "named") + + while(!rush$terminated && !rush$terminated_on_idle) { + task = rush$pop_task(fields = c("xs", "seed", "timeout")) + if (!is.null(task)) { + tryCatch({ + # use with_rng_state because callr moves rng state + ys = callr::r(with_rng_state, + args = list(fun = fun, args = c(task$xs, constants), seed = task$seed), + supervise = TRUE, + timeout = task$timeout %??% Inf) + rush$push_results(task$key, yss = list(ys)) + }, error = function(e) { + if (inherits(e, "callr_timeout_error")) { + # session has timed out + condition = list(message = sprintf("Task timed out after %s seconds", task$timeout)) + } else if (is.null(e$parent$message)) { + # session has crashed + condition = list(message = "External R session has crashed or was killed") + } else { + # simple error + condition = list(message = e$parent$message) + } + rush$push_failed(task$key, conditions = list(condition)) + } + ) } } diff --git a/R/zzz.R b/R/zzz.R index ae12073..79072aa 100644 --- a/R/zzz.R +++ b/R/zzz.R @@ -2,11 +2,11 @@ #' @import redux #' @import mlr3misc #' @import checkmate -#' @import rlecuyer #' 
@importFrom processx process #' @importFrom uuid UUIDgenerate #' @importFrom utils str #' @importFrom jsonlite fromJSON +#' @importFrom parallel nextRNGStream nextRNGSubStream "_PACKAGE" .onLoad = function(libname, pkgname) { diff --git a/man-roxygen/param_seed.R b/man-roxygen/param_seed.R index b4c08a9..3015a26 100644 --- a/man-roxygen/param_seed.R +++ b/man-roxygen/param_seed.R @@ -1,2 +1,5 @@ -#' @param seed (`integer(1)`)\cr -#' Seed for the random number generator. +#' @param seed (`integer()`)\cr +#' Initial seed for the random number generator. +#' Either a L'Ecuyer-CMRG seed (`integer(7)`) or a regular RNG seed (`integer(1)`). +#' The latter is converted to a L'Ecuyer-CMRG seed. +#' If `NULL`, no seed is used for the random number generator. diff --git a/man-roxygen/param_worker_loop.R b/man-roxygen/param_worker_loop.R index 6934bf3..79e213e 100644 --- a/man-roxygen/param_worker_loop.R +++ b/man-roxygen/param_worker_loop.R @@ -2,3 +2,4 @@ #' Loop run on the workers. #' Defaults to [worker_loop_default] which is called with `fun`. #' Pass `fun` in `...`. +#' Use [worker_loop_callr] to run `fun` in an external callr session. diff --git a/man/Rush.Rd b/man/Rush.Rd index ae6cdcd..05596f6 100644 --- a/man/Rush.Rd +++ b/man/Rush.Rd @@ -108,6 +108,15 @@ Saving log messages adds a small overhead but is useful for debugging. By default, no log messages are stored. } +\section{Seed}{ + +Setting a seed is important for reproducibility. +The tasks can be evaluated with a specific L'Ecuyer-CMRG seed. +If an initial seed is passed, the seed is used to generate L'Ecuyer-CMRG seeds for each task. +Each task is then evaluated with a separate RNG stream. +See \link[parallel:RngStream]{parallel::nextRNGStream} for more details. +} + \section{Public fields}{ \if{html}{\out{
}} \describe{ @@ -217,8 +226,11 @@ For more details see \href{https://redis.io/docs/management/persistence/#snapsho \item \href{#method-Rush-detect_lost_workers}{\code{Rush$detect_lost_workers()}} \item \href{#method-Rush-reset}{\code{Rush$reset()}} \item \href{#method-Rush-read_log}{\code{Rush$read_log()}} +\item \href{#method-Rush-read_tasks}{\code{Rush$read_tasks()}} \item \href{#method-Rush-push_tasks}{\code{Rush$push_tasks()}} \item \href{#method-Rush-push_priority_tasks}{\code{Rush$push_priority_tasks()}} +\item \href{#method-Rush-push_failed}{\code{Rush$push_failed()}} +\item \href{#method-Rush-retry_tasks}{\code{Rush$retry_tasks()}} \item \href{#method-Rush-fetch_latest_results}{\code{Rush$fetch_latest_results()}} \item \href{#method-Rush-wait_for_latest_results}{\code{Rush$wait_for_latest_results()}} \item \href{#method-Rush-fetch_results}{\code{Rush$fetch_results()}} @@ -232,7 +244,8 @@ For more details see \href{https://redis.io/docs/management/persistence/#snapsho \item \href{#method-Rush-wait_for_tasks}{\code{Rush$wait_for_tasks()}} \item \href{#method-Rush-write_hashes}{\code{Rush$write_hashes()}} \item \href{#method-Rush-read_hashes}{\code{Rush$read_hashes()}} -\item \href{#method-Rush-n_tries}{\code{Rush$n_tries()}} +\item \href{#method-Rush-is_running_task}{\code{Rush$is_running_task()}} +\item \href{#method-Rush-is_failed_task}{\code{Rush$is_failed_task()}} \item \href{#method-Rush-clone}{\code{Rush$clone()}} } } @@ -242,7 +255,7 @@ For more details see \href{https://redis.io/docs/management/persistence/#snapsho \subsection{Method \code{new()}}{ Creates a new instance of this \link[R6:R6Class]{R6} class. \subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$new(network_id = NULL, config = NULL)}\if{html}{\out{
}} +\if{html}{\out{
}}\preformatted{Rush$new(network_id = NULL, config = NULL, seed = NULL)}\if{html}{\out{
}} } \subsection{Arguments}{ @@ -259,6 +272,12 @@ If \code{NULL}, configuration set by \code{\link[=rush_plan]{rush_plan()}} is us If \code{rush_plan()} has not been called, the \code{REDIS_URL} environment variable is parsed. If \code{REDIS_URL} is not set, a default configuration is used. See \link[redux:redis_config]{redux::redis_config} for details.} + +\item{\code{seed}}{(\code{integer()})\cr +Initial seed for the random number generator. +Either a L'Ecuyer-CMRG seed (\code{integer(7)}) or a regular RNG seed (\code{integer(1)}). +The latter is converted to a L'Ecuyer-CMRG seed. +If \code{NULL}, no seed is used for the random number generator.} } \if{html}{\out{
}} } @@ -315,8 +334,6 @@ This function takes the arguments \code{fun} and optionally \code{constants} whi heartbeat_expire = NULL, lgr_thresholds = NULL, lgr_buffer_size = 0, - max_tries = 0, - seed = NULL, supervise = TRUE, worker_loop = worker_loop_default, ... @@ -352,19 +369,14 @@ By default (\code{lgr_buffer_size = 0}), the log messages are directly saved in If \code{lgr_buffer_size > 0}, the log messages are buffered and saved in the Redis data store when the buffer is full. This improves the performance of the logging.} -\item{\code{max_tries}}{(\code{integer(1)})\cr -Maximum number of tries for a task before it is considered failed.} - -\item{\code{seed}}{(\code{integer(1)})\cr -Seed for the random number generator.} - \item{\code{supervise}}{(\code{logical(1)})\cr Whether to kill the workers when the main R process is shut down.} \item{\code{worker_loop}}{(\code{function})\cr Loop run on the workers. Defaults to \link{worker_loop_default} which is called with \code{fun}. -Pass \code{fun} in \code{...}.} +Pass \code{fun} in \code{...}. +Use \link{worker_loop_callr} to run \code{fun} in an external callr session.} \item{\code{...}}{(\code{any})\cr Arguments passed to \code{worker_loop}.} @@ -435,7 +447,8 @@ This improves the performance of the logging.} \item{\code{worker_loop}}{(\code{function})\cr Loop run on the workers. Defaults to \link{worker_loop_default} which is called with \code{fun}. -Pass \code{fun} in \code{...}.} +Pass \code{fun} in \code{...}. +Use \link{worker_loop_callr} to run \code{fun} in an external callr session.} \item{\code{...}}{(\code{any})\cr Arguments passed to \code{worker_loop}.} @@ -552,6 +565,18 @@ If \code{NULL} all worker ids are used.} } \if{html}{\out{}} } +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-Rush-read_tasks}{}}} +\subsection{Method \code{read_tasks()}}{ +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Rush$read_tasks( + keys, + fields = c("status", "seed", "timeout", "max_retries", "n_retries") +)}\if{html}{\out{
}} +} + } \if{html}{\out{
}} \if{html}{\out{}} @@ -560,7 +585,14 @@ If \code{NULL} all worker ids are used.} Pushes a task to the queue. Task is added to queued tasks. \subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$push_tasks(xss, extra = NULL, terminate_workers = FALSE)}\if{html}{\out{
}} +\if{html}{\out{
}}\preformatted{Rush$push_tasks( + xss, + extra = NULL, + seeds = NULL, + timeouts = NULL, + max_retries = NULL, + terminate_workers = FALSE +)}\if{html}{\out{
}} } \subsection{Arguments}{ @@ -569,9 +601,24 @@ Task is added to queued tasks. \item{\code{xss}}{(list of named \code{list()})\cr Lists of arguments for the function e.g. \verb{list(list(x1, x2), list(x1, x2)))}.} -\item{\code{extra}}{(\code{list})\cr +\item{\code{extra}}{(\code{list()})\cr List of additional information stored along with the task e.g. \verb{list(list(timestamp), list(timestamp)))}.} +\item{\code{seeds}}{(\code{list()})\cr +List of L'Ecuyer-CMRG seeds for each task e.g \code{list(list(c(104071, 490840688, 1690070564, -495119766, 503491950, 1801530932, -1629447803)))}. +If \code{NULL} but an initial seed is set, L'Ecuyer-CMRG seeds are generated from the initial seed. +If \code{NULL} and no initial seed is set, no seeds are used for the random number generator.} + +\item{\code{timeouts}}{(\code{integer()})\cr +Timeouts for each task in seconds e.g. \code{c(10, 15)}. +A single number is used as the timeout for all tasks. +If \code{NULL} no timeout is set.} + +\item{\code{max_retries}}{(\code{integer()})\cr +Number of retries for each task. +A single number is used as the number of retries for all tasks. +If \code{NULL} tasks are not retried.} + \item{\code{terminate_workers}}{(\code{logical(1)})\cr Whether to stop the workers after evaluating the tasks.} } @@ -615,6 +662,48 @@ Keys of the tasks. } } \if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-Rush-push_failed}{}}} +\subsection{Method \code{push_failed()}}{ +Pushes failed tasks to the data base. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Rush$push_failed(keys, conditions)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{keys}}{(\code{character(1)})\cr +Keys of the associated tasks.} + +\item{\code{conditions}}{(named \code{list()})\cr +List of lists of conditions.} +} +\if{html}{\out{
}} +} +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-Rush-retry_tasks}{}}} +\subsection{Method \code{retry_tasks()}}{ +Retry failed tasks. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Rush$retry_tasks(keys, ignore_max_retires = FALSE, next_seed = FALSE)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{keys}}{(\code{character()})\cr +Keys of the tasks to be retried.} + +\item{\code{next_seed}}{(\code{logical(1)})\cr +Whether to change the seed of the task.} +} +\if{html}{\out{
}} +} +} +\if{html}{\out{
}} \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-Rush-fetch_latest_results}{}}} \subsection{Method \code{fetch_latest_results()}}{ @@ -966,19 +1055,15 @@ Comes with an overhead.} \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-Rush-write_hashes}{}}} \subsection{Method \code{write_hashes()}}{ -Writes a list to redis hashes. -The function serializes each element and writes it to a new hash. +Writes R objects to Redis hashes. +The function takes the vectors in \code{...} as input and writes each element as a field-value pair to a new hash. The name of the argument defines the field into which the serialized element is written. For example, \code{xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4))} writes \code{serialize(list(x1 = 1, x2 = 2))} at field \code{xs} into a hash and \code{serialize(list(x1 = 3, x2 = 4))} at field \code{xs} into another hash. -The function can iterate over multiple lists simultaneously. +The function can iterate over multiple vectors simultaneously. For example, \verb{xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7))} creates two hashes with the fields \code{xs} and \code{ys}. -Different lengths are recycled. -The stored elements must be lists themselves. -The reading functions combine the hashes to a table where the names of the inner lists are the column names. -For example, \verb{xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7))} becomes \code{data.table(x1 = c(1, 3), x2 = c(2, 4), y = c(3, 7))}. -Vectors in list columns must be wrapped in lists. -Otherwise, \verb{$read_values()} will expand the table by the length of the vectors. -For example, \verb{xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = c("A", "B", "C"))) does not work. Pass }xs_extra = list(list(extra = list(c("A", "B", "C"))))` instead. +The vectors are recycled to the length of the longest vector. +Both lists and atomic vectors are supported. 
+Arguments that are \code{NULL} are ignored. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$write_hashes(..., .values = list(), keys = NULL)}\if{html}{\out{
}} } @@ -1009,9 +1094,15 @@ Keys of the hashes. \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-Rush-read_hashes}{}}} \subsection{Method \code{read_hashes()}}{ -Reads redis hashes written with \verb{$write_hashes()}. +Reads Redis hashes and combines the values of the fields into a list. The function reads the values of the \code{fields} in the hashes stored at \code{keys}. The values of a hash are deserialized and combined into a single list. + +The reading functions combine the hashes to a table where the names of the inner lists are the column names. +For example, \verb{xs = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 4)), ys = list(list(y = 3), list(y = 7))} becomes \code{data.table(x1 = c(1, 3), x2 = c(2, 4), y = c(3, 7))}. +Vectors in list columns must be wrapped in lists. +Otherwise, \verb{$read_values()} will expand the table by the length of the vectors. +For example, \verb{xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = c("A", "B", "C"))) does not work. Pass }xs_extra = list(list(extra = list(c("A", "B", "C"))))` instead. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$read_hashes(keys, fields)}\if{html}{\out{
}} } @@ -1034,12 +1125,12 @@ The inner list is the combination of the lists stored at the different fields. } } \if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-Rush-n_tries}{}}} -\subsection{Method \code{n_tries()}}{ -Returns the number of attempts to evaluate a task. +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-Rush-is_running_task}{}}} +\subsection{Method \code{is_running_task()}}{ +Checks whether tasks have the status \code{"running"}. \subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$n_tries(keys)}\if{html}{\out{
}} +\if{html}{\out{
}}\preformatted{Rush$is_running_task(keys)}\if{html}{\out{
}} } \subsection{Arguments}{ @@ -1050,9 +1141,23 @@ Keys of the tasks.} } \if{html}{\out{}} } -\subsection{Returns}{ -(\code{integer()})\cr -Number of attempts. +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-Rush-is_failed_task}{}}} +\subsection{Method \code{is_failed_task()}}{ +Checks whether tasks have the status \code{"failed"}. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Rush$is_failed_task(keys)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{keys}}{(\code{character()})\cr +Keys of the tasks.} +} +\if{html}{\out{
}} } } \if{html}{\out{
}} diff --git a/man/RushWorker.Rd b/man/RushWorker.Rd index 3313255..87eff74 100644 --- a/man/RushWorker.Rd +++ b/man/RushWorker.Rd @@ -47,8 +47,7 @@ Used in the worker loop to determine whether to continue.} \item \href{#method-RushWorker-push_running_task}{\code{RushWorker$push_running_task()}} \item \href{#method-RushWorker-pop_task}{\code{RushWorker$pop_task()}} \item \href{#method-RushWorker-push_results}{\code{RushWorker$push_results()}} -\item \href{#method-RushWorker-push_failed}{\code{RushWorker$push_failed()}} -\item \href{#method-RushWorker-set_seed}{\code{RushWorker$set_seed()}} +\item \href{#method-RushWorker-retry_task}{\code{RushWorker$retry_task()}} \item \href{#method-RushWorker-set_terminated}{\code{RushWorker$set_terminated()}} \item \href{#method-RushWorker-clone}{\code{RushWorker$clone()}} } @@ -67,14 +66,18 @@ Used in the worker loop to determine whether to continue.}
  • rush::Rush$fetch_running_tasks()
  • rush::Rush$fetch_tasks()
  • rush::Rush$format()
  • -
  • rush::Rush$n_tries()
  • +
  • rush::Rush$is_failed_task()
  • +
  • rush::Rush$is_running_task()
  • rush::Rush$print()
  • +
  • rush::Rush$push_failed()
  • rush::Rush$push_priority_tasks()
  • rush::Rush$push_tasks()
  • rush::Rush$read_hashes()
  • rush::Rush$read_log()
  • +
  • rush::Rush$read_tasks()
  • rush::Rush$reset()
  • rush::Rush$restart_workers()
  • +
  • rush::Rush$retry_tasks()
  • rush::Rush$start_workers()
  • rush::Rush$stop_workers()
  • rush::Rush$wait_for_finished_tasks()
  • @@ -100,8 +103,7 @@ Creates a new instance of this \link[R6:R6Class]{R6} class. heartbeat_expire = NULL, lgr_thresholds = NULL, lgr_buffer_size = 0, - seed = NULL, - max_tries = 0 + seed = NULL )}\if{html}{\out{}} } @@ -142,11 +144,11 @@ By default (\code{lgr_buffer_size = 0}), the log messages are directly saved in If \code{lgr_buffer_size > 0}, the log messages are buffered and saved in the Redis data store when the buffer is full. This improves the performance of the logging.} -\item{\code{seed}}{(\code{integer(1)})\cr -Seed for the random number generator.} - -\item{\code{max_tries}}{(\code{integer(1)})\cr -Maximum number of tries for a task before it is considered failed.} +\item{\code{seed}}{(\code{integer()})\cr +Initial seed for the random number generator. +Either a L'Ecuyer-CMRG seed (\code{integer(7)}) or a regular RNG seed (\code{integer(1)}). +The later is converted to a L'Ecuyer-CMRG seed. +If \code{NULL}, no seed is used for the random number generator.} } \if{html}{\out{}} } @@ -183,7 +185,7 @@ Keys of the tasks. Pop a task from the queue. Task is moved to the running tasks. \subsection{Usage}{ -\if{html}{\out{
    }}\preformatted{RushWorker$pop_task(timeout = 1)}\if{html}{\out{
    }} +\if{html}{\out{
    }}\preformatted{RushWorker$pop_task(timeout = 1, fields = "xs")}\if{html}{\out{
    }} } \subsection{Arguments}{ @@ -191,6 +193,9 @@ Task is moved to the running tasks. \describe{ \item{\code{timeout}}{(\code{numeric(1)})\cr Time to wait for task in seconds.} + +\item{\code{fields}}{(\code{character()})\cr +Fields to be returned.} } \if{html}{\out{}} } @@ -233,41 +238,22 @@ If \code{"error"} the tasks are moved to the failed tasks.} } } \if{html}{\out{
    }} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-RushWorker-push_failed}{}}} -\subsection{Method \code{push_failed()}}{ -Pushes failed tasks to the data base. +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-RushWorker-retry_task}{}}} +\subsection{Method \code{retry_task()}}{ +Retry failed task. \subsection{Usage}{ -\if{html}{\out{
    }}\preformatted{RushWorker$push_failed(keys, conditions)}\if{html}{\out{
    }} +\if{html}{\out{
    }}\preformatted{RushWorker$retry_task(task, next_seed = FALSE)}\if{html}{\out{
    }} } \subsection{Arguments}{ \if{html}{\out{
    }} \describe{ -\item{\code{keys}}{(\code{character(1)})\cr -Keys of the associated tasks.} +\item{\code{task}}{(\code{list()})\cr +Task to be retried.} -\item{\code{conditions}}{(named \code{list()})\cr -List of lists of conditions.} -} -\if{html}{\out{
    }} -} -} -\if{html}{\out{
    }} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-RushWorker-set_seed}{}}} -\subsection{Method \code{set_seed()}}{ -Sets the seed for \code{key}. -Updates the seed table if necessary. -\subsection{Usage}{ -\if{html}{\out{
    }}\preformatted{RushWorker$set_seed(key)}\if{html}{\out{
    }} -} - -\subsection{Arguments}{ -\if{html}{\out{
    }} -\describe{ -\item{\code{key}}{(\code{character(1)})\cr -Key of the task.} +\item{\code{next_seed}}{(\code{logical(1)})\cr +Whether to change the seed of the task.} } \if{html}{\out{
    }} } diff --git a/man/worker_loop_callr.Rd b/man/worker_loop_callr.Rd new file mode 100644 index 0000000..391b51c --- /dev/null +++ b/man/worker_loop_callr.Rd @@ -0,0 +1,22 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/worker_loops.R +\name{worker_loop_callr} +\alias{worker_loop_callr} +\title{Single Task Worker Loop with callr Encapsulation} +\usage{ +worker_loop_callr(fun, constants = NULL, rush) +} +\arguments{ +\item{fun}{(\code{function})\cr +Function to be executed.} + +\item{constants}{(\code{list})\cr +List of constants passed to \code{fun}.} + +\item{rush}{(\link{RushWorker})\cr +Rush worker instance.} +} +\description{ +Worker loop that pops a single task from the queue, executes the function in an external callr session and pushes the results. +Supports timeouts on the tasks. +} diff --git a/pkgdown/_pkgdown.yml b/pkgdown/_pkgdown.yml index d840c71..5e816ba 100644 --- a/pkgdown/_pkgdown.yml +++ b/pkgdown/_pkgdown.yml @@ -51,6 +51,7 @@ reference: contents: - start_worker - worker_loop_default + - worker_loop_callr - heartbeat - AppenderRedis - title: Misc diff --git a/tests/testthat/test-Rush.R b/tests/testthat/test-Rush.R index 55389e2..89cdf86 100644 --- a/tests/testthat/test-Rush.R +++ b/tests/testthat/test-Rush.R @@ -261,6 +261,157 @@ test_that("a remote worker is killed via the heartbeat", { expect_rush_reset(rush) }) +# low level read and write ----------------------------------------------------- + +test_that("reading and writing a hash works with flatten", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + + # one field with list + key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2))) + expect_equal(rush$read_hashes(key, "xs"), list(list(x1 = 1, x2 = 2))) + + # one field with atomic + key = rush$write_hashes(timeout = 1) + expect_equal(rush$read_hashes(key, "timeout"), list(list(timeout = 1))) + + # two 
fields with lists + key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list(list(y = 3))) + expect_equal(rush$read_hashes(key, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3))) + + # two fields with list and empty list + key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list()) + expect_equal(rush$read_hashes(key, c("xs", "ys")), list(list(x1 = 1, x2 = 2))) + + # two fields with list and atomic + key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), timeout = 1) + expect_equal(rush$read_hashes(key, c("xs", "timeout")), list(list(x1 = 1, x2 = 2, timeout = 1))) +}) + +test_that("reading and writing a hash works without flatten", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = Rush$new(network_id = "test-rush", config = config) + + # one field with list + key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2))) + expect_equal(rush$read_hashes(key, "xs", flatten = FALSE), list(list(xs = list(x1 = 1, x2 = 2)))) + + # one field with atomic + key = rush$write_hashes(timeout = 1) + expect_equal(rush$read_hashes(key, "timeout", flatten = FALSE), list(list(timeout = 1))) + + # two fields with lists + key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list(list(y = 3))) + expect_equal(rush$read_hashes(key, c("xs", "ys"), flatten = FALSE), list(list(xs = list(x1 = 1, x2 = 2), ys = list(y = 3)))) + + # two fields with list and empty list + key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list()) + expect_equal(rush$read_hashes(key, c("xs", "ys"), flatten = FALSE), list(list(xs = list(x1 = 1, x2 = 2), ys = NULL))) + + # two fields with list and atomic + key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), timeout = 1) + expect_equal(rush$read_hashes(key, c("xs", "timeout"), flatten = FALSE), list(list(xs = list(x1 = 1, x2 = 2), timeout = 1))) +}) + +test_that("reading and writing hashes works", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = RushWorker$new(network_id = 
"test-rush", config = config, host = "local") + + # one field with list + keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))) + expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))) + + # one field atomic + keys = rush$write_hashes(timeout = c(1, 1)) + expect_equal(rush$read_hashes(keys, "timeout"), list(list(timeout = 1), list(timeout = 1))) + + # two fields with list and recycled atomic + keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = 1) + expect_equal(rush$read_hashes(keys, c("xs", "timeout")), list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1))) + + # two fields + keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list(list(y = 3), list(y = 4))) + expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3), list(x1 = 1, x2 = 3, y = 4))) + + # two fields with list and atomic + keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = c(1, 1)) + expect_equal(rush$read_hashes(keys, c("xs", "timeout")), list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1))) + + # two fields with list and recycled atomic + keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), timeout = 1) + expect_equal(rush$read_hashes(keys, c("xs", "timeout")), list(list(x1 = 1, x2 = 2, timeout = 1), list(x1 = 1, x2 = 3, timeout = 1))) + + # two fields, one empty + keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list()) + expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))) + + # recycle + keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list(list(y = 3))) + expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3), list(x1 = 1, x2 = 3, y = 3))) +}) + +test_that("writing hashes to 
specific keys works", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + + # one element + keys = uuid::UUIDgenerate() + rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), keys = keys) + expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2))) + + # two elements + keys = uuid::UUIDgenerate(n = 2) + rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), keys = keys) + expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))) + + # wrong number of keys + keys = uuid::UUIDgenerate() + expect_error(rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), keys = keys), "Assertion on 'keys' failed") +}) + + +test_that("writing list columns works", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + + keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list("A")))) + rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys)) + + expect_list(rush$fetch_finished_tasks()$extra, len = 1) + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + + keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list(letters[1:3])))) + rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys)) + + expect_list(rush$fetch_finished_tasks()$extra, len = 1) + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + + keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2)), xs_extra = list(list(extra = list("A")), list(extra = list("B")))) + rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys)) + rush$read_hashes(keys, c("xs", "xs_extra")) + + 
expect_list(rush$fetch_finished_tasks()$extra, len = 2) +}) + # task evaluation -------------------------------------------------------------- test_that("evaluating a task works", { @@ -392,9 +543,7 @@ test_that("a segfault on a local worker is detected", { config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) - fun = function(x1, x2, ...) { - tools::pskill(Sys.getpid()) - } + fun = function(x1, x2, ...) get("attach")(structure(list(), class = "UserDefinedDatabase")) worker_ids = rush$start_workers(fun = fun, n_workers = 1, wait_for_workers = TRUE) xss = list(list(x1 = 1, x2 = 2)) @@ -404,6 +553,7 @@ test_that("a segfault on a local worker is detected", { expect_null(rush$lost_worker_ids) rush$detect_lost_workers() expect_equal(rush$lost_worker_ids, worker_ids) + rush$fetch_failed_tasks() expect_rush_reset(rush) }) @@ -415,9 +565,7 @@ test_that("a segfault on a worker is detected via the heartbeat", { config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) - fun = function(x1, x2, ...) { - tools::pskill(Sys.getpid()) - } + fun = function(x1, x2, ...) get("attach")(structure(list(), class = "UserDefinedDatabase")) withr::with_envvar(list("HOST" = "remote_host"), { worker_ids = rush$start_workers(fun = fun, n_workers = 1, heartbeat_period = 1, heartbeat_expire = 2, wait_for_workers = TRUE) @@ -495,9 +643,7 @@ test_that("a lost task is detected", { # no task is running expect_class(rush$detect_lost_workers(), "Rush") - fun = function(x1, x2, ...) { - tools::pskill(Sys.getpid()) - } + fun = function(x1, x2, ...) 
get("attach")(structure(list(), class = "UserDefinedDatabase")) rush$start_workers(fun = fun, n_workers = 1, wait_for_workers = TRUE) xss = list(list(x1 = 1, x2 = 2)) keys = rush$push_tasks(xss) @@ -525,8 +671,9 @@ test_that("a lost task is detected", { expect_data_table(rush$fetch_tasks(), nrows = 1) data = rush$fetch_failed_tasks() - expect_names(names(data), must.include = c("x1", "x2", "worker_id", "keys")) + expect_names(names(data), must.include = c("x1", "x2", "worker_id", "message", "keys")) expect_data_table(data, nrows = 1) + expect_equal(data$message, "Worker has crashed or was killed") expect_class(rush$detect_lost_workers(), "Rush") @@ -544,9 +691,7 @@ test_that("a lost task is detected when waiting", { # no task is running expect_class(rush$detect_lost_workers(), "Rush") - fun = function(x1, x2, ...) { - get("attach")(structure(list(), class = "UserDefinedDatabase")) - } + fun = function(x1, x2, ...) get("attach")(structure(list(), class = "UserDefinedDatabase")) rush$start_workers(fun = fun, n_workers = 1, wait_for_workers = TRUE) xss = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2)) keys = rush$push_tasks(xss) @@ -604,30 +749,6 @@ test_that("restarting a worker works", { expect_rush_reset(rush) }) -test_that("a task is restarted when a worker is lost", { - skip_on_cran() - skip_on_ci() - set.seed(1) # make log messages reproducible - skip_if(TRUE) # does not work in testthat on environment - - config = start_flush_redis() - rush = Rush$new(network_id = "test-rush", config = config) - fun = function(x1, x2, ...) 
{ - tools::pskill(Sys.getpid()) - } - - rush$start_workers(fun = fun, n_workers = 1, max_tries = 1, wait_for_workers = TRUE) - - xss = list(list(x1 = 1, x2 = 2)) - keys = rush$push_tasks(xss) - - rush$detect_lost_workers(restart_workers = TRUE, restart_tasks = TRUE) - - expect_equal(rush$n_tries(keys), 1) - - expect_rush_reset(rush) -}) - # receiving results ------------------------------------------------------------ test_that("blocking on new results works", { @@ -802,30 +923,58 @@ test_that("network without controller works", { # seed ------------------------------------------------------------------------- +test_that("seeds are generated from regular rng seed", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = Rush$new(network_id = "test-rush", config = config, seed = 123) + rush$push_tasks(list(list(x1 = 1, x2 = 2))) + tab = rush$fetch_tasks(fields = c("xs", "seed")) + expect_true(is_lecyer_cmrg_seed(tab$seed[[1]])) + + rush$push_tasks(list(list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2))) + tab = rush$fetch_tasks(fields = c("xs", "seed")) + expect_true(tab$seed[[1]][2] != tab$seed[[2]][2]) + expect_true(tab$seed[[2]][2] != tab$seed[[3]][2]) +}) + +test_that("seed are generated from L'Ecuyer-CMRG seed", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = Rush$new(network_id = "test-rush", config = config, seed = c(10407L, 1801422725L, -2057975723L, 1156894209L, 1595475487L, 210384600L, -1655729657L)) + rush$push_tasks(list(list(x1 = 1, x2 = 2))) + tab = rush$fetch_tasks(fields = c("xs", "seed")) + expect_true(is_lecyer_cmrg_seed(tab$seed[[1]])) + + rush$push_tasks(list(list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2))) + tab = rush$fetch_tasks(fields = c("xs", "seed")) + expect_true(tab$seed[[1]][2] != tab$seed[[2]][2]) + expect_true(tab$seed[[2]][2] != tab$seed[[3]][2]) +}) + test_that("seed is set correctly on two workers", { skip_on_cran() skip_on_ci() config = start_flush_redis() - rush = Rush$new(network_id = 
"test-rush", config = config) + rush = Rush$new(network_id = "test-rush", config = config, seed = 123) fun = function(x1, x2, ...) list(y = sample(10000, 1)) - worker_ids = rush$start_workers(fun = fun, n_workers = 2, seed = 123456, wait_for_workers = TRUE) + worker_ids = rush$start_workers(fun = fun, n_workers = 2, wait_for_workers = TRUE) .keys = rush$push_tasks(list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2), list(x1 = 2, x2 = 3), list(x1 = 2, x2 = 4))) rush$wait_for_tasks(.keys) finished_tasks = rush$fetch_finished_tasks() - expect_equal(finished_tasks[.keys[1], y, on = "keys"], 4492) - expect_equal(finished_tasks[.keys[2], y, on = "keys"], 9223) - expect_equal(finished_tasks[.keys[3], y, on = "keys"], 2926) - expect_equal(finished_tasks[.keys[4], y, on = "keys"], 4937) + expect_set_equal(finished_tasks$y, c(5971L, 4090L, 1754L, 9794L)) .keys = rush$push_tasks(list(list(x1 = 5, x2 = 3), list(x1 = 5, x2 = 4))) rush$wait_for_tasks(.keys) finished_tasks = rush$fetch_finished_tasks() - expect_equal(finished_tasks[.keys[1], y, on = "keys"], 7814) - expect_equal(finished_tasks[.keys[2], y, on = "keys"], 713) + expect_set_equal(finished_tasks$y, c(1754L, 9794L, 4090L, 5971L, 8213L, 3865L)) expect_rush_reset(rush, type = "terminate") }) diff --git a/tests/testthat/test-RushWorker.R b/tests/testthat/test-RushWorker.R index 0e9ac4c..b0871f3 100644 --- a/tests/testthat/test-RushWorker.R +++ b/tests/testthat/test-RushWorker.R @@ -98,110 +98,6 @@ test_that("a heartbeat is started", { expect_rush_reset(rush, type = "terminate") }) -test_that("reading and writing a hash works", { - skip_on_cran() - skip_on_ci() - - config = start_flush_redis() - rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") - - # one field - key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2))) - expect_equal(rush$read_hashes(key, "xs"), list(list(x1 = 1, x2 = 2))) - - # two fields - key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list(list(y = 3))) - 
expect_equal(rush$read_hashes(key, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3))) - - # two fields, one empty - key = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), ys = list()) - expect_equal(rush$read_hashes(key, c("xs", "ys")), list(list(x1 = 1, x2 = 2))) - - # one field with empty list - key = rush$write_hashes(xs = list(list())) - expect_equal(rush$read_hashes(key, "xs"), list(list())) - - # one empty field - expect_error(rush$write_hashes(xs = list()), "Assertion on 'values' failed") -}) - -test_that("reading and writing hashes works", { - skip_on_cran() - skip_on_ci() - - config = start_flush_redis() - rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") - - # one field - keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))) - expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))) - - # two fields - keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list(list(y = 3), list(y = 4))) - expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3), list(x1 = 1, x2 = 3, y = 4))) - - # two fields, one empty - keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list()) - expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))) - - # recycle - keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), ys = list(list(y = 3))) - expect_equal(rush$read_hashes(keys, c("xs", "ys")), list(list(x1 = 1, x2 = 2, y = 3), list(x1 = 1, x2 = 3, y = 3))) -}) - -test_that("writing hashes to specific keys works", { - skip_on_cran() - skip_on_ci() - - config = start_flush_redis() - rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") - - # one element - keys = uuid::UUIDgenerate() - rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), keys = keys) - expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 
1, x2 = 2))) - - # two elements - keys = uuid::UUIDgenerate(n = 2) - rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), keys = keys) - expect_equal(rush$read_hashes(keys, "xs"), list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3))) - - # wrong number of keys - keys = uuid::UUIDgenerate() - expect_error(rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)), keys = keys), "Assertion on 'keys' failed") -}) - - -test_that("writing list columns works", { - skip_on_cran() - skip_on_ci() - - config = start_flush_redis() - rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") - - keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list("A")))) - rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys)) - - expect_list(rush$fetch_finished_tasks()$extra, len = 1) - - config = start_flush_redis() - rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") - - keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2)), xs_extra = list(list(extra = list(letters[1:3])))) - rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys)) - - expect_list(rush$fetch_finished_tasks()$extra, len = 1) - - config = start_flush_redis() - rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") - - keys = rush$write_hashes(xs = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2)), xs_extra = list(list(extra = list("A")), list(extra = list("B")))) - rush$connector$command(c("LPUSH", "test-rush:finished_tasks", keys)) - rush$read_hashes(keys, c("xs", "xs_extra")) - - expect_list(rush$fetch_finished_tasks()$extra, len = 2) -}) - test_that("pushing a task to the queue works", { skip_on_cran() skip_on_ci() @@ -235,6 +131,10 @@ test_that("pushing a task to the queue works", { expect_data_table(data, nrows = 1) expect_data_table(rush$fetch_tasks(), nrows = 1) + # status checks + expect_false(rush$is_running_task(keys)) + 
expect_false(rush$is_failed_task(keys)) + expect_rush_reset(rush, type = "terminate") }) @@ -274,6 +174,10 @@ test_that("pushing a task with extras to the queue works", { expect_equal(data$timestamp, timestamp) expect_data_table(rush$fetch_tasks(), nrows = 1) + # status checks + expect_false(rush$is_running_task(keys)) + expect_false(rush$is_failed_task(keys)) + expect_rush_reset(rush, type = "terminate") }) @@ -311,6 +215,10 @@ test_that("pushing tasks to the queue works", { expect_character(data$keys, unique = TRUE, len = 2) expect_data_table(rush$fetch_tasks(), nrows = 2) + # status checks + expect_false(any(rush$is_running_task(keys))) + expect_false(any(rush$is_failed_task(keys))) + expect_rush_reset(rush, type = "terminate") }) @@ -351,6 +259,10 @@ test_that("pushing tasks with extras to the queue works", { expect_equal(data$timestamp, c(timestamp, timestamp)) expect_data_table(rush$fetch_tasks(), nrows = 2) + # status checks + expect_false(any(rush$is_running_task(keys))) + expect_false(any(rush$is_failed_task(keys))) + expect_rush_reset(rush, type = "terminate") }) @@ -390,6 +302,59 @@ test_that("popping a task from the queue works", { expect_data_table(data, nrows = 1) expect_data_table(rush$fetch_tasks(), nrows = 1) + # status checks + expect_true(rush$is_running_task(task$key)) + expect_false(rush$is_failed_task(task$key)) + + expect_rush_reset(rush, type = "terminate") +}) + +test_that("popping a task with seed, max_retries and timeout works", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + xss = list(list(x1 = 1, x2 = 2)) + seed = 123456 + max_retries = 2 + timeout = 1 + rush$push_tasks(xss, seed = list(seed), max_retries = max_retries, timeout = timeout) + + # check task + task = rush$pop_task(fields = c("xs", "seed", "max_retries", "timeout")) + expect_equal(task$seed, seed) + expect_equal(task$max_retries, max_retries) + expect_equal(task$timeout, 
timeout) + expect_rush_task(task) + + # check task count + expect_equal(rush$n_tasks, 1) + expect_equal(rush$n_queued_tasks, 0) + expect_equal(rush$n_running_tasks, 1) + expect_equal(rush$n_finished_tasks, 0) + expect_equal(rush$n_failed_tasks, 0) + + # check keys in sets + expect_string(rush$tasks) + expect_null(rush$queued_tasks) + expect_string(rush$running_tasks) + expect_null(rush$finished_tasks) + expect_null(rush$failed_tasks) + + # check fetching + expect_data_table(rush$fetch_queued_tasks(), nrows = 0) + expect_data_table(rush$fetch_finished_tasks(), nrows = 0) + expect_data_table(rush$fetch_failed_tasks(), nrows = 0) + data = rush$fetch_running_tasks() + expect_names(names(data), must.include = c("x1", "x2", "worker_id", "keys")) + expect_data_table(data, nrows = 1) + expect_data_table(rush$fetch_tasks(), nrows = 1) + + # status checks + expect_true(rush$is_running_task(task$key)) + expect_false(rush$is_failed_task(task$key)) + expect_rush_reset(rush, type = "terminate") }) @@ -428,6 +393,10 @@ test_that("pushing a finished task works", { expect_data_table(data, nrows = 1) expect_data_table(rush$fetch_tasks(), nrows = 1) + # status checks + expect_false(rush$is_running_task(task$key)) + expect_false(rush$is_failed_task(task$key)) + expect_rush_reset(rush, type = "terminate") }) @@ -466,6 +435,136 @@ test_that("pushing a failed tasks works", { expect_data_table(data, nrows = 1) expect_data_table(rush$fetch_tasks(), nrows = 1) + # status checks + expect_false(rush$is_running_task(task$key)) + expect_true(rush$is_failed_task(task$key)) + + expect_rush_reset(rush, type = "terminate") +}) + +test_that("retry a failed task works", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + xss = list(list(x1 = 1, x2 = 2)) + keys = rush$push_tasks(xss) + task = rush$pop_task() + + expect_output(rush$retry_tasks(keys), "Not all task") + + rush$push_failed(task$key, 
condition = list(list(message = "error"))) + + expect_equal(rush$n_queued_tasks, 0) + expect_equal(rush$n_failed_tasks, 1) + expect_true(rush$is_failed_task(task$key)) + + rush$retry_tasks(keys) + + expect_equal(rush$n_queued_tasks, 1) + expect_equal(rush$n_failed_tasks, 0) + expect_false(rush$is_failed_task(task$key)) + + expect_rush_reset(rush, type = "terminate") +}) + +test_that("retry a failed task works and setting a new seed works", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + xss = list(list(x1 = 1, x2 = 2)) + seed = c(10407L, 1280795612L, -169270483L, -442010614L, -603558397L, -222347416L, 1489374793L) + keys = rush$push_tasks(xss, seed = list(seed)) + task = rush$pop_task(fields = c("xs", "seed")) + expect_equal(task$seed, seed) + + rush$push_failed(task$key, condition = list(list(message = "error"))) + + expect_equal(rush$n_queued_tasks, 0) + expect_equal(rush$n_failed_tasks, 1) + expect_true(rush$is_failed_task(task$key)) + + rush$retry_tasks(keys, next_seed = TRUE) + task_info = rush$read_hash(keys, "seed") + expect_true(is_lecyer_cmrg_seed(task_info$seed)) + expect_true(task_info$seed[2] != seed[2]) + + expect_rush_reset(rush, type = "terminate") +}) + +test_that("retry a failed task works with a maximum of retries", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + xss = list(list(x1 = 1, x2 = 2)) + keys = rush$push_tasks(xss, max_retries = 1) + task = rush$pop_task(fields = c("max_retries", "n_retries")) + + expect_equal(task$max_retries, 1) + expect_null(task$n_retries) + expect_output(rush$retry_tasks(keys), "Not all task") + + rush$push_failed(task$key, condition = list(list(message = "error"))) + + expect_equal(rush$n_queued_tasks, 0) + expect_equal(rush$n_failed_tasks, 1) + expect_true(rush$is_failed_task(task$key)) + + 
rush$retry_tasks(keys) + + task_info = rush$read_hash(keys, fields = c("max_retries", "n_retries")) + expect_equal(task_info$max_retries, 1) + expect_equal(task_info$n_retries, 1) + expect_equal(rush$n_queued_tasks, 1) + expect_equal(rush$n_failed_tasks, 0) + expect_false(rush$is_failed_task(task$key)) + task = rush$pop_task() + + rush$push_failed(task$key, condition = list(list(message = "error"))) + expect_output(rush$retry_tasks(keys), "reached the maximum number of retries") + + rush$retry_tasks(keys, ignore_max_retires = TRUE) + task_info = rush$read_hash(keys, fields = c("max_retries", "n_retries")) + expect_equal(task_info$max_retries, 1) + expect_equal(task_info$n_retries, 2) + expect_equal(rush$n_queued_tasks, 1) + expect_equal(rush$n_failed_tasks, 0) + expect_false(rush$is_failed_task(task$key)) + + expect_rush_reset(rush, type = "terminate") +}) + +test_that("retry failed tasks works", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + xss = list(list(x1 = 1, x2 = 2), list(x1 = 1, x2 = 3)) + rush$push_tasks(xss) + task_1 = rush$pop_task() + task_2 = rush$pop_task() + keys = c(task_1$key, task_2$key) + + expect_output(rush$retry_tasks(keys), "Not all task") + + rush$push_failed(keys, condition = list(list(message = "error"))) + + expect_equal(rush$n_queued_tasks, 0) + expect_equal(rush$n_failed_tasks, 2) + expect_true(all(rush$is_failed_task(keys))) + + rush$retry_tasks(keys) + + expect_equal(rush$n_queued_tasks, 2) + expect_equal(rush$n_failed_tasks, 0) + expect_false(any(rush$is_failed_task(keys))) + expect_rush_reset(rush, type = "terminate") }) @@ -852,30 +951,6 @@ test_that("pushing tasks and terminating worker works", { expect_rush_reset(rush, type = "terminate") }) -test_that("n_retries method works", { - skip_on_cran() - skip_on_ci() - - config = start_flush_redis() - rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") 
- - xss = list(list(x1 = 1, x2 = 2)) - keys = rush$push_tasks(xss) - - expect_equal(rush$n_tries(keys), 0) - - xss = list(list(x1 = 1, x2 = 2), list(x1 = 3, x2 = 2)) - keys = rush$push_tasks(xss) - - expect_equal(rush$n_tries(keys), c(0, 0)) - - rush$connector$command(c("HINCRBY", keys[1], "n_tries", 1)) - - expect_equal(rush$n_tries(keys), c(1L, 0)) - - expect_rush_reset(rush, type = "terminate") -}) - test_that("terminate on idle works", { skip_on_cran() skip_on_ci() @@ -896,26 +971,18 @@ test_that("terminate on idle works", { # seed ------------------------------------------------------------------------- -test_that("seed is set correctly", { +test_that("popping a task with seed from the queue works", { skip_on_cran() skip_on_ci() - on.exit({ - .lec.exit() - }) - config = start_flush_redis() - rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123456) - - expect_null(.lec.Random.seed.table$name) - - rush$push_tasks(list(list(x1 = 1, x2 = 2))) - task = rush$pop_task() - rush$set_seed(task$key) - - expect_equal(.lec.Random.seed.table$name, task$key) + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123) + xss = list(list(x1 = 1, x2 = 2)) + rush$push_tasks(xss) - expect_equal(sample(seq(100000), 1), 86412) + # check task seed + task = rush$pop_task(fields = c("xs", "seed")) + expect_true(is_lecyer_cmrg_seed(task$seed)) expect_rush_reset(rush, type = "terminate") }) diff --git a/tests/testthat/test-worker_loops.R b/tests/testthat/test-worker_loops.R index a3bdb94..954b1cd 100644 --- a/tests/testthat/test-worker_loops.R +++ b/tests/testthat/test-worker_loops.R @@ -1,3 +1,5 @@ +# default ---------------------------------------------------------------------- + test_that("worker_loop_default works", { config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -16,43 +18,110 @@ test_that("worker_loop_default works with failed task", { rush = 
RushWorker$new(network_id = "test-rush", config = config, host = "local") xss = list(list(x1 = 1, x2 = 2)) rush$push_tasks(xss, terminate_workers = TRUE) - fun = function(x1, x2, ...) stop("failed") + fun = function(x1, x2, ...) stop("Simple R error") expect_null(worker_loop_default(fun, rush = rush)) expect_equal(rush$n_failed_tasks, 1L) + expect_equal(rush$fetch_failed_tasks()$message, "Simple R error") expect_rush_reset(rush, type = "terminate") }) -test_that("worker_loop_default works with terminate ", { +test_that("worker_loop_default retries failed task", { config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") xss = list(list(x1 = 1, x2 = 2)) + rush$push_tasks(xss, max_retries = 2, terminate_workers = TRUE) + fun = function(x1, x2, ...) stop("Simple R error") + + expect_null(worker_loop_default(fun, rush = rush)) + expect_equal(rush$n_failed_tasks, 1L) + expect_equal(rush$fetch_failed_tasks()$message, "Simple R error") + + expect_rush_reset(rush, type = "terminate") +}) + +test_that("worker_loop_default sets seed is set correctly", { + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123456) + xss = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2)) rush$push_tasks(xss, terminate_workers = TRUE) - fun = function(x1, x2, ...) stop("failed") + fun = function(x1, x2, ...) list(y = sample(10000, 1)) expect_null(worker_loop_default(fun, rush = rush)) + + expect_equal(rush$fetch_finished_tasks()$y, c(7521, 1616, 551)) + + expect_rush_reset(rush, type = "terminate") +}) + +# callr ------------------------------------------------------------------------ + +test_that("worker_loop_callr works", { + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + xss = list(list(x1 = 1, x2 = 2)) + rush$push_tasks(xss, terminate_workers = TRUE) + fun = function(x1, x2, ...) 
list(y = x1 + x2) + + expect_null(worker_loop_callr(fun, rush = rush)) + expect_equal(rush$n_finished_tasks, 1L) + + expect_rush_reset(rush, type = "terminate") +}) + +test_that("worker_loop_callr works with failed task", { + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + xss = list(list(x1 = 1, x2 = 2)) + rush$push_tasks(xss, terminate_workers = TRUE) + fun = function(x1, x2, ...) stop("Simple R error") + + expect_null(worker_loop_callr(fun, rush = rush)) + expect_equal(rush$n_failed_tasks, 1L) + expect_equal(rush$fetch_failed_tasks()$message, "Simple R error") + + expect_rush_reset(rush, type = "terminate") +}) + +test_that("worker_loop_callr works with lost task", { + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + xss = list(list(x1 = 1, x2 = 2)) + rush$push_tasks(xss, terminate_workers = TRUE) + fun = function(x1, x2, ...) get("attach")(structure(list(), class = "UserDefinedDatabase")) + + expect_null(worker_loop_callr(fun, rush = rush)) expect_equal(rush$n_failed_tasks, 1L) + expect_equal(rush$fetch_failed_tasks()$message, "External R session has crashed or was killed") expect_rush_reset(rush, type = "terminate") }) -test_that("seed is set correctly", { +test_that("worker_loop_callr works with timeout", { + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") + xss = list(list(x1 = 1, x2 = 2)) + rush$push_tasks(xss, timeouts = 1, terminate_workers = TRUE) + fun = function(x1, x2, ...) 
Sys.sleep(10) - on.exit({ - .lec.exit() - }) + expect_null(worker_loop_callr(fun, rush = rush)) + expect_equal(rush$n_failed_tasks, 1L) + expect_equal(rush$fetch_failed_tasks()$message, "Task timed out after 1 seconds") + expect_rush_reset(rush, type = "terminate") +}) + +test_that("worker_loop_callr sets seed correctly", { config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123456) - xss = list(list(x1 = 1, x2 = 2)) + xss = list(list(x1 = 1, x2 = 2), list(x1 = 2, x2 = 2), list(x1 = 3, x2 = 2)) rush$push_tasks(xss, terminate_workers = TRUE) fun = function(x1, x2, ...) list(y = sample(10000, 1)) - expect_null(worker_loop_default(fun, rush = rush)) + expect_null(worker_loop_callr(fun, rush = rush)) - expect_equal(rush$fetch_finished_tasks()$y, 4492) + expect_equal(rush$fetch_finished_tasks()$y, c(7521, 1616, 551)) expect_rush_reset(rush, type = "terminate") }) -