From 7f1618a01a67b5e70f8f1c6468db28c977803fb4 Mon Sep 17 00:00:00 2001 From: be-marc Date: Sun, 28 Apr 2024 17:37:40 +0200 Subject: [PATCH] fix: partial matches --- NAMESPACE | 1 + R/Rush.R | 2 +- R/rush_plan.R | 13 +++++++++++ man/Rush.Rd | 10 ++++++++- man/remove_rush_plan.Rd | 11 ++++++++++ man/rush_plan.Rd | 6 +++++- tests/testthat/_snaps/Rush.md | 11 ---------- tests/testthat/helper.R | 3 +-- tests/testthat/setup.R | 13 +++++++++++ tests/testthat/teardown.R | 2 ++ tests/testthat/test-Rush.R | 35 ++++++++++++++++-------------- tests/testthat/test-RushWorker.R | 37 ++++++++++++++++++++------------ tests/testthat/test-rush_plan.R | 9 +++++--- 13 files changed, 104 insertions(+), 49 deletions(-) create mode 100644 man/remove_rush_plan.Rd delete mode 100644 tests/testthat/_snaps/Rush.md create mode 100644 tests/testthat/setup.R create mode 100644 tests/testthat/teardown.R diff --git a/NAMESPACE b/NAMESPACE index 7708194..479e082 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -9,6 +9,7 @@ export(assert_rush_workers) export(assert_rushs) export(get_hostname) export(heartbeat) +export(remove_rush_plan) export(rsh) export(rush_available) export(rush_config) diff --git a/R/Rush.R b/R/Rush.R index 700659f..ef32b44 100644 --- a/R/Rush.R +++ b/R/Rush.R @@ -327,7 +327,7 @@ Rush = R6::R6Class("Rush", wait_for_workers = function(n, timeout = Inf) { assert_count(n) assert_number(timeout) - timeout = if (is.finite(timeout)) timeout else rush_config()$start_worker_timeout + timeout = if (is.finite(timeout)) timeout else rush_config()$start_worker_timeout %??% Inf start_time = Sys.time() while(self$n_workers < n) { diff --git a/R/rush_plan.R b/R/rush_plan.R index 35c2a4b..f2d32f5 100644 --- a/R/rush_plan.R +++ b/R/rush_plan.R @@ -9,6 +9,8 @@ #' If `NULL`, the `REDIS_URL` environment variable is parsed. #' If `REDIS_URL` is not set, a default configuration is used. #' See [redux::redis_config] for details. +#' @param start_worker_timeout (`numeric(1)`)\cr +#' The time in seconds to wait for a worker to start. #' #' @template param_n_workers #' @template param_lgr_thresholds @@ -55,6 +57,17 @@ rush_config = function() { start_worker_timeout = rush_env$start_worker_timeout) } +#' @title Remove Rush Plan +#' +#' @description +#' Removes the rush plan that was set by [rush_plan()]. +#' +#' @export +remove_rush_plan = function() { + rm(list = ls(envir = rush_env), envir = rush_env) + invisible(NULL) +} + #' @title Rush Available #' #' @description diff --git a/man/Rush.Rd b/man/Rush.Rd index 1ca0882..0daabda 100644 --- a/man/Rush.Rd +++ b/man/Rush.Rd @@ -346,6 +346,7 @@ This function takes the arguments \code{fun} and optionally \code{constants} whi \if{html}{\out{
}}\preformatted{Rush$start_workers( n_workers = NULL, wait_for_workers = TRUE, + timeout = Inf, globals = NULL, packages = NULL, heartbeat_period = NULL, @@ -367,6 +368,9 @@ Number of workers to be started.} \item{\code{wait_for_workers}}{(\code{logical(1)})\cr Whether to wait until all workers are available.} +\item{\code{timeout}}{(\code{numeric(1)})\cr +Timeout to wait for workers in seconds.} + \item{\code{globals}}{(\code{character()})\cr Global variables to be loaded to the workers global environment.} @@ -481,7 +485,7 @@ Arguments passed to \code{worker_loop}.} \subsection{Method \code{wait_for_workers()}}{ Wait until \code{n} workers are available. \subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$wait_for_workers(n)}\if{html}{\out{
}} +\if{html}{\out{
}}\preformatted{Rush$wait_for_workers(n, timeout = Inf)}\if{html}{\out{
}} } \subsection{Arguments}{ @@ -489,6 +493,10 @@ Wait until \code{n} workers are available. \describe{ \item{\code{n}}{(\code{integer(1)})\cr Number of workers to wait for.} + +\item{\code{timeout}}{(\code{numeric(1)})\cr +Timeout in seconds. +Default is \code{Inf}.} } \if{html}{\out{
}} } diff --git a/man/remove_rush_plan.Rd b/man/remove_rush_plan.Rd new file mode 100644 index 0000000..db94a73 --- /dev/null +++ b/man/remove_rush_plan.Rd @@ -0,0 +1,11 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/rush_plan.R +\name{remove_rush_plan} +\alias{remove_rush_plan} +\title{Remove Rush Plan} +\usage{ +remove_rush_plan() +} +\description{ +Removes the rush plan that was set by \code{\link[=rush_plan]{rush_plan()}}. +} diff --git a/man/rush_plan.Rd b/man/rush_plan.Rd index 2b94e57..e8e3fcf 100644 --- a/man/rush_plan.Rd +++ b/man/rush_plan.Rd @@ -8,7 +8,8 @@ rush_plan( n_workers, config = NULL, lgr_thresholds = NULL, - large_objects_path = NULL + large_objects_path = NULL, + start_worker_timeout = Inf ) } \arguments{ @@ -26,6 +27,9 @@ Logger threshold on the workers e.g. \code{c(rush = "debug")}.} \item{large_objects_path}{(\code{character(1)})\cr The path to the directory where large objects are stored.} + +\item{start_worker_timeout}{(\code{numeric(1)})\cr +The time in seconds to wait for a worker to start.} } \description{ Stores the number of workers and Redis configuration options (\link[redux:redis_config]{redux::redis_config}) for \link{Rush}. diff --git a/tests/testthat/_snaps/Rush.md b/tests/testthat/_snaps/Rush.md deleted file mode 100644 index c2214ac..0000000 --- a/tests/testthat/_snaps/Rush.md +++ /dev/null @@ -1,11 +0,0 @@ -# worker can be started with script - - Code - rush$create_worker_script(fun = fun) - Output - DEBUG (500): [rush] Pushing worker config to Redis - DEBUG (500): [rush] Serializing worker configuration to 2384528 bytes - INFO (400): [rush] Start worker with: - INFO (400): [rush] Rscript -e 'rush::start_worker(network_id = 'test-rush', hostname = 'host', url = 'redis://127.0.0.1:6379')' - INFO (400): [rush] See ?rush::start_worker for more details. - diff --git a/tests/testthat/helper.R b/tests/testthat/helper.R index 93a7e56..9c7b6cc 100644 --- a/tests/testthat/helper.R +++ b/tests/testthat/helper.R @@ -12,10 +12,9 @@ expect_rush_task = function(task) { } expect_rush_reset = function(rush, type = "kill") { + remove_rush_plan() processes = rush$processes rush$reset(type = type) expect_list(rush$connector$command(c("KEYS", "*")), len = 0) walk(processes, function(p) p$kill()) } - -lg$set_threshold("debug") diff --git a/tests/testthat/setup.R b/tests/testthat/setup.R new file mode 100644 index 0000000..af02590 --- /dev/null +++ b/tests/testthat/setup.R @@ -0,0 +1,13 @@ +old_opts = options( + warnPartialMatchArgs = TRUE, + warnPartialMatchAttr = TRUE, + warnPartialMatchDollar = TRUE +) + +# https://github.com/HenrikBengtsson/Wishlist-for-R/issues/88 +old_opts = lapply(old_opts, function(x) if (is.null(x)) FALSE else x) + +lg_rush = lgr::get_logger("rush") +old_threshold_rush = lg_rush$threshold +lg_rush$set_threshold(0) + diff --git a/tests/testthat/teardown.R b/tests/testthat/teardown.R new file mode 100644 index 0000000..de41d02 --- /dev/null +++ b/tests/testthat/teardown.R @@ -0,0 +1,2 @@ +options(old_opts) +lg_rush$set_threshold(old_threshold_rush) diff --git a/tests/testthat/test-Rush.R b/tests/testthat/test-Rush.R index f136f05..b41222e 100644 --- a/tests/testthat/test-Rush.R +++ b/tests/testthat/test-Rush.R @@ -1,7 +1,7 @@ # start workers with processx -------------------------------------------------- test_that("constructing a rush controller works", { - + skip_on_cran() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -12,7 +12,7 @@ test_that("constructing a rush controller works", { }) test_that("workers are started", { - + skip_on_cran() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -20,7 +20,7 @@ test_that("workers are started", { expect_data_table(rush$worker_info, nrows = 0) - worker_ids = rush$start_workers(fun = fun, n_workers = 2, lgr_threshold = c(rush = "debug"), wait_for_workers = TRUE) + worker_ids = rush$start_workers(fun = fun, n_workers = 2, lgr_thresholds = c(rush = "debug"), wait_for_workers = TRUE) expect_equal(rush$n_workers, 2) # check fields @@ -39,7 +39,7 @@ test_that("workers are started", { }) test_that("workers are started with a heartbeat", { - + skip_on_cran() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -55,7 +55,7 @@ test_that("workers are started with a heartbeat", { }) test_that("additional workers are started", { - + skip_on_cran() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -81,7 +81,7 @@ test_that("additional workers are started", { }) test_that("packages are available on the worker", { - + skip_on_cran() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -99,7 +99,7 @@ test_that("packages are available on the worker", { }) test_that("globals are available on the worker", { - + skip_on_cran() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -119,7 +119,7 @@ test_that("globals are available on the worker", { }) test_that("named globals are available on the worker", { - + skip_on_cran() config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -141,8 +141,8 @@ test_that("named globals are available on the worker", { # start workers with script ---------------------------------------------------- test_that("worker can be started with script", { - - skip_if(TRUE) + skip_on_cran() + #devskip_if(TRUE) set.seed(1) # make log messages reproducible root_logger = lgr::get_logger("root") @@ -150,7 +150,7 @@ test_that("worker can be started with script", { root_logger$appenders$cons$layout$set_fmt("%L (%n): %m") on.exit({ - root_logger$appenders$cons$layout$set_fmt(old_fmt) + root_logger$appenders$console$layout$set_fmt(old_fmt) }) config = start_flush_redis() @@ -239,7 +239,6 @@ test_that("a local worker is killed", { }) test_that("a remote worker is killed via the heartbeat", { - skip_on_os("windows") config = start_flush_redis() @@ -740,8 +739,7 @@ test_that("restarting a worker works", { }) test_that("restarting a worker kills the worker", { - - skip_on_windows() + skip_on_os("windows") config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) @@ -753,6 +751,9 @@ test_that("restarting a worker kills the worker", { expect_true(tools::pskill(pid, signal = 0)) rush$restart_workers(worker_ids = worker_id) + + Sys.sleep(1) + expect_false(pid == rush$worker_info$pid) expect_false(tools::pskill(pid, signal = 0)) @@ -855,8 +856,6 @@ test_that("snapshot option works", { }) test_that("terminating workers on idle works", { - - config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) fun = function(x1, x2, ...) list(y = x1 + x2) @@ -982,6 +981,10 @@ test_that("seed is set correctly on two workers", { # log -------------------------------------------------------------------------- test_that("printing logs with redis appender works", { + lg_rush = lgr::get_logger("rush") + old_threshold_rush = lg_rush$threshold + on.exit(lg_rush$set_threshold(old_threshold_rush)) + lg_rush$set_threshold("info") skip_if(TRUE) # does not work in testthat on environment diff --git a/tests/testthat/test-RushWorker.R b/tests/testthat/test-RushWorker.R index 92afa8f..128eaa7 100644 --- a/tests/testthat/test-RushWorker.R +++ b/tests/testthat/test-RushWorker.R @@ -307,7 +307,7 @@ test_that("popping a task with seed, max_retries and timeout works", { seed = 123456 max_retries = 2 timeout = 1 - rush$push_tasks(xss, seed = list(seed), max_retries = max_retries, timeout = timeout) + rush$push_tasks(xss, seeds = list(seed), max_retries = max_retries, timeouts = timeout) # check task task = rush$pop_task(fields = c("xs", "seed", "max_retries", "timeout")) @@ -396,7 +396,7 @@ test_that("pushing a failed tasks works", { rush$push_tasks(xss) task = rush$pop_task() - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) # check task count expect_equal(rush$n_tasks, 1) @@ -429,7 +429,10 @@ test_that("pushing a failed tasks works", { }) test_that("retry a failed task works", { - + lg_rush = lgr::get_logger("rush") + old_threshold_rush = lg_rush$threshold + on.exit(lg_rush$set_threshold(old_threshold_rush)) + lg_rush$set_threshold("info") config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -439,7 +442,7 @@ test_that("retry a failed task works", { expect_output(rush$retry_tasks(keys), "Not all task") - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) expect_equal(rush$n_queued_tasks, 0) expect_equal(rush$n_failed_tasks, 1) @@ -465,7 +468,7 @@ test_that("retry a failed task works and setting a new seed works", { task = rush$pop_task(fields = c("xs", "seed")) expect_equal(task$seed, seed) - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) expect_equal(rush$n_queued_tasks, 0) expect_equal(rush$n_failed_tasks, 1) @@ -480,7 +483,10 @@ test_that("retry a failed task works and setting a new seed works", { }) test_that("retry a failed task works with a maximum of retries", { - + lg_rush = lgr::get_logger("rush") + old_threshold_rush = lg_rush$threshold + on.exit(lg_rush$set_threshold(old_threshold_rush)) + lg_rush$set_threshold("info") config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -492,7 +498,7 @@ test_that("retry a failed task works with a maximum of retries", { expect_null(task$n_retries) expect_output(rush$retry_tasks(keys), "Not all task") - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) expect_equal(rush$n_queued_tasks, 0) expect_equal(rush$n_failed_tasks, 1) @@ -508,7 +514,7 @@ test_that("retry a failed task works with a maximum of retries", { expect_false(rush$is_failed_task(task$key)) task = rush$pop_task() - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) expect_output(rush$retry_tasks(keys), "reached the maximum number of retries") rush$retry_tasks(keys, ignore_max_retries = TRUE) @@ -523,7 +529,10 @@ test_that("retry a failed task works with a maximum of retries", { }) test_that("retry failed tasks works", { - + lg_rush = lgr::get_logger("rush") + old_threshold_rush = lg_rush$threshold + on.exit(lg_rush$set_threshold(old_threshold_rush)) + lg_rush$set_threshold("info") config = start_flush_redis() rush = RushWorker$new(network_id = "test-rush", config = config, host = "local") @@ -535,7 +544,7 @@ test_that("retry failed tasks works", { expect_output(rush$retry_tasks(keys), "Not all task") - rush$push_failed(keys, condition = list(list(message = "error"))) + rush$push_failed(keys, conditions = list(list(message = "error"))) expect_equal(rush$n_queued_tasks, 0) expect_equal(rush$n_failed_tasks, 2) @@ -600,7 +609,7 @@ test_that("moving and fetching tasks works", { # push failed task task = rush$pop_task() - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) queued_tasks = rush$fetch_queued_tasks() expect_data_table(queued_tasks, nrows = 1) expect_character(queued_tasks$keys, unique = TRUE) @@ -673,7 +682,7 @@ test_that("fetching as list works", { # push failed task task = rush$pop_task() - rush$push_failed(task$key, condition = list(list(message = "error"))) + rush$push_failed(task$key, conditions = list(list(message = "error"))) failed_tasks = rush$fetch_failed_tasks(data_format = "list") expect_list(failed_tasks, len = 1) expect_names(names(failed_tasks), identical.to = task$key) @@ -727,7 +736,7 @@ test_that("fetch task with states works", { xss = list(list(x1 = 2, x2 = 2)) rush$push_tasks(xss) task_2 = rush$pop_task() - rush$push_failed(task_2$key, condition = list(list(message = "error"))) + rush$push_failed(task_2$key, conditions = list(list(message = "error"))) tab = rush$fetch_tasks_with_state() expect_data_table(tab, nrows = 2) expect_equal(tab$state, c("finished", "failed")) @@ -1046,7 +1055,7 @@ test_that("task in states works", { xss = list(list(x1 = 2, x2 = 2)) keys = rush$push_tasks(xss) task_2 = rush$pop_task() - rush$push_failed(task_2$key, condition = list(list(message = "error"))) + rush$push_failed(task_2$key, conditions = list(list(message = "error"))) keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed")) expect_list(keys_list, len = 4) expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed")) diff --git a/tests/testthat/test-rush_plan.R b/tests/testthat/test-rush_plan.R index 934436a..317133f 100644 --- a/tests/testthat/test-rush_plan.R +++ b/tests/testthat/test-rush_plan.R @@ -36,7 +36,10 @@ test_that("start workers", { }) test_that("set threshold", { - + lg_rush = lgr::get_logger("rush") + old_threshold_rush = lg_rush$threshold + on.exit(lg_rush$set_threshold(old_threshold_rush)) + lg_rush$set_threshold("debug") config = start_flush_redis() rush_plan(n_workers = 2, config, lgr_thresholds = c(rush = "debug")) @@ -52,8 +55,6 @@ test_that("set threshold", { }) test_that("set start worker timeout", { - - config = start_flush_redis() rush_plan(n_workers = 2, config, start_worker_timeout = -Inf) @@ -62,4 +63,6 @@ test_that("set start worker timeout", { rush = rsh("test-rush") fun = function(x1, x2, ...) list(y = x1 + x2) expect_error(rush$start_workers(fun = fun), "Timeout waiting") + + expect_rush_reset(rush) })