diff --git a/R/Rush.R b/R/Rush.R index 17e4619..7263b83 100644 --- a/R/Rush.R +++ b/R/Rush.R @@ -107,7 +107,6 @@ #' @template param_seed #' @template param_data_format #' -#' #' @export Rush = R6::R6Class("Rush", public = list( @@ -731,72 +730,6 @@ Rush = R6::R6Class("Rush", return(invisible(self)) }, - #' @description - #' Fetch latest results from the data base. - #' - #' @param fields (`character()`)\cr - #' Fields to be read from the hashes. - #' - #' @return `data.table()`\cr - #' Latest results. - fetch_latest_results = function(fields = "ys", data_format = "data.table") { - assert_character(fields) - assert_choice(data_format, c("data.table", "list")) - r = self$connector - - # return empty data.table or list if all results are fetched - if (self$n_finished_tasks == private$.n_seen_results) { - data = if (data_format == "list") list() else data.table() - return(data) - } - keys = r$command(c("LRANGE", private$.get_key("finished_tasks"), private$.n_seen_results, -1)) - - # increase seen results counter - private$.n_seen_results = private$.n_seen_results + length(keys) - - # read results from hashes - data = self$read_hashes(keys, "ys") - if (data_format == "list") return(set_names(data, keys)) - rbindlist(data, use.names = TRUE, fill = TRUE) - }, - - #' @description - #' Block process until a new result is available. - #' Returns latest results or `NULL` if no result is available after `timeout` seconds. - #' - #' @param fields (`character()`)\cr - #' Fields to be read from the hashes. - #' @param timeout (`numeric(1)`)\cr - #' Time to wait for a result in seconds. - #' - #' @return `data.table()`\cr - #' Latest results. - wait_for_latest_results = function(fields = "ys", timeout = Inf, data_format = "data.table") { - start_time = Sys.time() - while(start_time + timeout > Sys.time()) { - latest_results = self$fetch_latest_results(fields, data_format = data_format) - if (length(latest_results)) break - Sys.sleep(0.01) - } - latest_results - }, - - #' @description - #' Fetch results from the data base. - #' Results are cached. - #' - #' @param fields (`character()`)\cr - #' Fields to be read from the hashes. - #' Defaults to `"ys"`. - #' @param reset_cache (`logical(1)`)\cr - #' Whether to reset the cache. - #' - #' @return `data.table()` - #' Results. - fetch_results = function(fields = "ys", reset_cache = FALSE, data_format = "data.table") { - private$.fetch_cached(fields, cache = ".cached_results", data_format, reset_cache) - }, - #' @description #' Fetch queued tasks from the data base. #' @@ -808,7 +741,7 @@ Rush = R6::R6Class("Rush", #' Table of queued tasks. fetch_queued_tasks = function(fields = c("xs", "xs_extra"), data_format = "data.table") { keys = self$queued_tasks - private$.fetch_default(keys, fields, data_format) + private$.fetch_tasks(keys, fields, data_format) }, #' @description @@ -832,7 +765,7 @@ Rush = R6::R6Class("Rush", } keys = unlist(r$pipeline(.commands = cmds)) - private$.fetch_default(keys, fields, data_format) + private$.fetch_tasks(keys, fields, data_format) }, #' @description @@ -846,7 +779,7 @@ Rush = R6::R6Class("Rush", #' Table of running tasks. fetch_running_tasks = function(fields = c("xs", "xs_extra", "worker_extra"), data_format = "data.table") { keys = self$running_tasks - private$.fetch_default(keys, fields, data_format) + private$.fetch_tasks(keys, fields, data_format) }, #' @description @@ -861,9 +794,12 @@ Rush = R6::R6Class("Rush", #' #' @return `data.table()`\cr #' Table of finished tasks. 
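+    #' @examples
+    #' # Minimal sketch (not run): assumes a local Redis server described by
+    #' # `redux::redis_config()` and a purely illustrative network id.
+    #' \dontrun{
+    #' rush = Rush$new(network_id = "example", config = redux::redis_config())
+    #' # the first call reads all finished tasks from Redis and fills the cache
+    #' finished = rush$fetch_finished_tasks()
+    #' # later calls only read tasks that finished since the previous call
+    #' finished = rush$fetch_finished_tasks()
+    #' }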
- fetch_finished_tasks = function(fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra"), reset_cache = FALSE, data_format = "data.table") { - lg$debug("Fetching finished tasks") - private$.fetch_cached(fields, cache = ".cached_tasks", data_format, reset_cache) + fetch_finished_tasks = function(fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"), reset_cache = FALSE, data_format = "data.table") { + keys = if (self$n_finished_tasks > length(private$.cached_tasks)) { + r = self$connector + r$command(c("LRANGE", private$.get_key("finished_tasks"), length(private$.cached_tasks), -1)) + } + private$.fetch_cached_tasks(keys, fields, reset_cache, data_format) }, #' @description @@ -878,9 +814,16 @@ Rush = R6::R6Class("Rush", #' #' @return `data.table()`\cr #' Table of finished tasks. - wait_for_finished_tasks = function(fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra"), timeout = Inf, data_format = "data.table") { + wait_for_finished_tasks = function( + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra"), + timeout = Inf, + data_format = "data.table" + ) { + assert_number(timeout, lower = 0) start_time = Sys.time() + lg$debug("Wait for new tasks for at least %s seconds", as.character(timeout)) + while(start_time + timeout > Sys.time()) { if (self$n_finished_tasks > length(private$.cached_tasks)) { return(self$fetch_finished_tasks(fields, data_format = data_format)) @@ -890,41 +833,69 @@ Rush = R6::R6Class("Rush", NULL }, - fetch_active_tasks = function(fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra"), data_format = "data.table") { + #' @description + #' Fetch finished tasks from the data base that finished after the last fetch. + #' Updates the cache of the finished tasks. + #' + #' @param fields (`character()`)\cr + #' Fields to be read from the hashes. + #' + #' @return `data.table()`\cr + #' Latest results. + fetch_new_tasks = function( + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"), + data_format = "data.table" + ) { + assert_character(fields) + assert_choice(data_format, c("data.table", "list")) r = self$connector + start_time = Sys.time() - lg$debug("Fetching active tasks") - lg$debug("Reading %i cached task(s)", length(private$.cached_tasks)) + # return empty data.table or list if all results are fetched + n_new_results = self$n_finished_tasks - private$.n_seen_results + if (!n_new_results) { + data = if (data_format == "list") list() else data.table() + return(data) + } - if (self$n_finished_tasks > length(private$.cached_tasks)) { + # increase seen results counter + private$.n_seen_results = private$.n_seen_results + n_new_results + + # fetch finished tasks + data = self$fetch_finished_tasks(fields, data_format = data_format) + tail(data, n_new_results) + }, - # get keys in atomic operation - r$MULTI() - r$LRANGE(private$.get_key("queued_tasks"), 0, -1) - r$SMEMBERS(private$.get_key("running_tasks")) - r$LRANGE(private$.get_key("finished_tasks"), length(private$.cached_tasks), -1) - keys = r$EXEC() + #' @description + #' Block process until a new finished task is available. + #' Returns new tasks or `NULL` if no new task is available after `timeout` seconds. + #' + #' @param fields (`character()`)\cr + #' Fields to be read from the hashes. + #' Defaults to `c("xs", "xs_extra", "worker_extra", "ys", "ys_extra")`. + #' @param timeout (`numeric(1)`)\cr + #' Time to wait for new result in seconds. + #' + #' @return `data.table() | list()`. 
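+    #' @examples
+    #' # Minimal sketch (not run): assumes a local Redis server described by
+    #' # `redux::redis_config()`, an illustrative network id and workers that
+    #' # push results in the background.
+    #' \dontrun{
+    #' rush = Rush$new(network_id = "example", config = redux::redis_config())
+    #' # block for at most 10 seconds and return the tasks finished in the meantime
+    #' new_tasks = rush$wait_for_new_tasks(timeout = 10)
+    #' }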
+ wait_for_new_tasks = function( + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"), + timeout = Inf, + data_format = "data.table" + ) { + assert_number(timeout, lower = 0) + start_time = Sys.time() - lg$debug("Caching %i new task(s)", length(keys[[3]])) + lg$debug("Wait for new tasks for at least %s seconds", as.character(timeout)) - # bind new results to cached results - data_finished = set_names(self$read_hashes(keys[[3]], fields), keys[[3]]) - private$.cached_tasks = c(private$.cached_tasks, data_finished ) - } else { - r$MULTI() - r$LRANGE(private$.get_key("queued_tasks"), 0, -1) - r$SMEMBERS(private$.get_key("running_tasks")) - keys = r$EXEC() + while(start_time + timeout > Sys.time()) { + n_new_results = self$n_finished_tasks - private$.n_seen_results + if (n_new_results) { + return(self$fetch_new_tasks(fields, data_format = data_format)) + } + Sys.sleep(0.01) } - data_queued = private$.fetch_default(keys[[1]], fields, data_format) - data_running = private$.fetch_default(keys[[2]], fields, data_format) - data_finished = private$.cached_tasks - - if (data_format == "list") return(c(data_queued, data_running, data_finished)) - data_finished = rbindlist(private$.cached_tasks, use.names = TRUE, fill = TRUE) - if (nrow(data_finished)) set(data_finished, j = "keys", value = names(private$.cached_tasks)) - data = rbindlist(list(data_queued, data_running, data_finished), use.names = TRUE, fill = TRUE) - data[] + if (data_format == "list") return(NULL) + data.table() }, #' @description @@ -938,7 +909,7 @@ Rush = R6::R6Class("Rush", #' Table of failed tasks. fetch_failed_tasks = function(fields = c("xs", "worker_extra", "condition"), data_format = "data.table") { keys = self$failed_tasks - private$.fetch_default(keys, fields, data_format) + private$.fetch_tasks(keys, fields, data_format) }, #' @description @@ -950,9 +921,46 @@ Rush = R6::R6Class("Rush", #' #' @return `data.table()`\cr #' Table of all tasks. - fetch_tasks = function(fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition", "state"), data_format = "data.table") { + fetch_tasks = function(fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"), data_format = "data.table") { keys = self$tasks - private$.fetch_default(keys, fields, data_format) + private$.fetch_tasks(keys, fields, data_format) + }, + + #' @description + #' Fetch tasks with different states from the data base. + #' If tasks with different states are to be queried at the same time, this function prevents tasks from appearing twice. + #' This could be the case if a worker changes the state of a task while the tasks are being fetched. + #' + #' @param fields (`character()`)\cr + #' Fields to be read from the hashes. + #' Defaults to `c("xs", "ys", "xs_extra", "worker_extra", "ys_extra")`. + #' @param states (`character()`)\cr + #' States of the tasks to be fetched. + #' Defaults to `c("queued", "running", "finished", "failed")`. + #' @param reset_cache (`logical(1)`)\cr + #' Whether to reset the cache of the finished tasks. 
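+    #' @return `data.table()`\cr
+    #' Table of tasks and their states.
+    #'
+    #' @examples
+    #' # Minimal sketch (not run): assumes a local Redis server described by
+    #' # `redux::redis_config()` and a purely illustrative network id.
+    #' \dontrun{
+    #' rush = Rush$new(network_id = "example", config = redux::redis_config())
+    #' # one table over all requested states with a "state" column
+    #' tab = rush$fetch_tasks_with_state(states = c("queued", "finished"))
+    #' # or a named list with one entry per requested state
+    #' tabs = rush$fetch_tasks_with_state(states = c("queued", "finished"), data_format = "list")
+    #' }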
+ fetch_tasks_with_state = function( + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"), + states = c("queued", "running", "finished", "failed"), + reset_cache = FALSE, + data_format = "data.table" + ) { + r = self$connector + assert_character(states) + + all_keys = private$.tasks_with_state(states, only_new_keys = TRUE) + + data = imap(all_keys, function(keys, state) { + if (state == "finished") { + private$.fetch_cached_tasks(keys, fields, reset_cache, data_format) + } else { + private$.fetch_tasks(keys, fields, data_format) + } + }) + + if (data_format == "list") return(data) + data = rbindlist(data, use.names = TRUE, fill = TRUE, idcol = "state") + data[] }, #' @description @@ -968,6 +976,8 @@ Rush = R6::R6Class("Rush", assert_character(keys, min.len = 1) assert_flag(detect_lost_workers) + lg$debug("Wait for %i task(s)", length(keys)) + while (any(keys %nin% c(self$finished_tasks, self$failed_tasks)) && self$n_running_workers > 0) { if (detect_lost_workers) self$detect_lost_workers() Sys.sleep(0.01) @@ -1121,6 +1131,19 @@ Rush = R6::R6Class("Rush", r = self$connector if (!length(keys)) return(logical(0)) as.logical(r$command(c("SMISMEMBER", private$.get_key("failed_tasks"), keys))) + }, + + #' @description + #' Returns keys of requested states. + #' + #' @param states (`character()`)\cr + #' States of the tasks. + #' + #' @return (Named list of `character()`). + tasks_with_state = function(states) { + r = self$connector + assert_subset(states, c("queued", "running", "finished", "failed")) + private$.tasks_with_state(states) } ), @@ -1321,11 +1344,9 @@ Rush = R6::R6Class("Rush", } ), - private = list( - - # cache of the finished tasks and results - .cached_results = list(), + private = list( + # cache for finished tasks .cached_tasks = list(), # counter of the seen results for the latest results methods @@ -1337,6 +1358,7 @@ Rush = R6::R6Class("Rush", .seed = NULL, + # counter for printed logs # zero based .log_counter = list(), @@ -1412,47 +1434,68 @@ Rush = R6::R6Class("Rush", r$command(list("SET", private$.get_key("start_args"), bin_start_args)) }, + # get task keys + # finished tasks keys can be restricted to uncached tasks + .tasks_with_state = function(states, only_new_keys = FALSE) { + r = self$connector + start_finished_tasks = if (only_new_keys) length(private$.cached_tasks) else 0 + r$MULTI() + if ("queued" %in% states) r$LRANGE(private$.get_key("queued_tasks"), 0, -1) + if ("running" %in% states) r$SMEMBERS(private$.get_key("running_tasks")) + if ("finished" %in% states) r$LRANGE(private$.get_key("finished_tasks"), start_finished_tasks, -1) + if ("failed" %in% states) r$SMEMBERS(private$.get_key("failed_tasks")) + keys = r$EXEC() + keys = map(keys, unlist) + set_names(keys, states) + }, + # fetch tasks - .fetch_default = function(keys, fields, data_format = "data.table") { + .fetch_tasks = function(keys, fields, data_format = "data.table") { r = self$connector assert_character(fields) assert_choice(data_format, c("data.table", "list")) - if (is.null(keys) || !length(keys)) { + if (!length(keys)) { data = if (data_format == "list") list() else data.table() return(data) } data = self$read_hashes(keys, fields) + + lg$debug("Fetching %i task(s)", length(data)) + if (data_format == "list") return(set_names(data, keys)) - data = rbindlist(data, use.names = TRUE, fill = TRUE) - data[, keys := unlist(keys)] - data[] + tab = rbindlist(data, use.names = TRUE, fill = TRUE) + tab[, keys := unlist(keys)] + tab[] }, + # fetch and cache tasks - .fetch_cached = 
function(fields, cache, data_format = "data.table", reset_cache = FALSE) { + .fetch_cached_tasks = function(new_keys, fields, reset_cache = FALSE, data_format = "data.table") { r = self$connector - if (reset_cache) private[[cache]] = list() + assert_flag(reset_cache) + assert_choice(data_format, c("data.table", "list")) - lg$debug("Reading %i cached task(s)", length(private[[cache]])) + if (reset_cache) private$.cached_tasks = list() - if (self$n_finished_tasks > length(private[[cache]])) { + lg$debug("Reading %i cached task(s)", length(private$.cached_tasks)) - # get keys of new results - keys = r$command(c("LRANGE", private$.get_key("finished_tasks"), length(private[[cache]]), -1)) + if (length(new_keys)) { - lg$debug("Caching %i new task(s)", length(keys)) + lg$debug("Caching %i new task(s)", length(new_keys)) # bind new results to cached results - data = set_names(self$read_hashes(keys, fields), keys) - private[[cache]] = c(private[[cache]], data) + data = set_names(self$read_hashes(new_keys, fields), new_keys) + private$.cached_tasks = c(private$.cached_tasks, data) } - if (data_format == "list") return(private[[cache]]) - data = rbindlist(private[[cache]], use.names = TRUE, fill = TRUE) - if (nrow(data)) data[, keys := names(private[[cache]])] - data[] + lg$debug("Fetching %i task(s)", length(private$.cached_tasks)) + + if (data_format == "list") return(private$.cached_tasks) + tab = rbindlist(private$.cached_tasks, use.names = TRUE, fill = TRUE) + if (nrow(tab)) tab[, keys := names(private$.cached_tasks)] + tab[] } ) ) diff --git a/man/Rush.Rd b/man/Rush.Rd index 8547517..aaf6fab 100644 --- a/man/Rush.Rd +++ b/man/Rush.Rd @@ -191,9 +191,6 @@ Number of failed tasks.} \item{\code{n_tasks}}{(\code{integer(1)})\cr Number of all tasks.} -\item{\code{data}}{(\link[data.table:data.table]{data.table::data.table})\cr -Contains all performed function calls.} - \item{\code{worker_info}}{(\code{\link[data.table:data.table]{data.table::data.table()}})\cr Contains information about the workers.} @@ -209,6 +206,9 @@ For example, \code{c(60, 1000)} saves the data base every 60 seconds if there ar Overwrites the redis configuration file. Set to \code{NULL} to disable snapshots. 
For more details see \href{https://redis.io/docs/management/persistence/#snapshotting}{redis.io}.} + +\item{\code{redis_info}}{(\code{list()})\cr +Information about the Redis server.} } \if{html}{\out{}} } @@ -231,22 +231,23 @@ For more details see \href{https://redis.io/docs/management/persistence/#snapsho \item \href{#method-Rush-push_priority_tasks}{\code{Rush$push_priority_tasks()}} \item \href{#method-Rush-push_failed}{\code{Rush$push_failed()}} \item \href{#method-Rush-retry_tasks}{\code{Rush$retry_tasks()}} -\item \href{#method-Rush-fetch_latest_results}{\code{Rush$fetch_latest_results()}} -\item \href{#method-Rush-wait_for_latest_results}{\code{Rush$wait_for_latest_results()}} -\item \href{#method-Rush-fetch_results}{\code{Rush$fetch_results()}} \item \href{#method-Rush-fetch_queued_tasks}{\code{Rush$fetch_queued_tasks()}} \item \href{#method-Rush-fetch_priority_tasks}{\code{Rush$fetch_priority_tasks()}} \item \href{#method-Rush-fetch_running_tasks}{\code{Rush$fetch_running_tasks()}} \item \href{#method-Rush-fetch_finished_tasks}{\code{Rush$fetch_finished_tasks()}} \item \href{#method-Rush-wait_for_finished_tasks}{\code{Rush$wait_for_finished_tasks()}} +\item \href{#method-Rush-fetch_new_tasks}{\code{Rush$fetch_new_tasks()}} +\item \href{#method-Rush-wait_for_new_tasks}{\code{Rush$wait_for_new_tasks()}} \item \href{#method-Rush-fetch_failed_tasks}{\code{Rush$fetch_failed_tasks()}} \item \href{#method-Rush-fetch_tasks}{\code{Rush$fetch_tasks()}} +\item \href{#method-Rush-fetch_tasks_with_state}{\code{Rush$fetch_tasks_with_state()}} \item \href{#method-Rush-wait_for_tasks}{\code{Rush$wait_for_tasks()}} \item \href{#method-Rush-write_hashes}{\code{Rush$write_hashes()}} \item \href{#method-Rush-read_hashes}{\code{Rush$read_hashes()}} \item \href{#method-Rush-read_hash}{\code{Rush$read_hash()}} \item \href{#method-Rush-is_running_task}{\code{Rush$is_running_task()}} \item \href{#method-Rush-is_failed_task}{\code{Rush$is_failed_task()}} +\item \href{#method-Rush-tasks_with_state}{\code{Rush$tasks_with_state()}} \item \href{#method-Rush-clone}{\code{Rush$clone()}} } } @@ -706,104 +707,6 @@ Whether to change the seed of the task.} } } \if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-Rush-fetch_latest_results}{}}} -\subsection{Method \code{fetch_latest_results()}}{ -Fetch latest results from the data base. -\subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$fetch_latest_results(fields = "ys", data_format = "data.table")}\if{html}{\out{
}} -} - -\subsection{Arguments}{ -\if{html}{\out{
}} -\describe{ -\item{\code{fields}}{(\code{character()})\cr -Fields to be read from the hashes.} - -\item{\code{data_format}}{(\code{character()})\cr -Returned data format. -Choose \code{"data.table"} or "list". -The default is \code{"data.table"} but \code{"list"} is easier when list columns are present.} -} -\if{html}{\out{
}} -} -\subsection{Returns}{ -\code{data.table()}\cr -Latest results. -} -} -\if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-Rush-wait_for_latest_results}{}}} -\subsection{Method \code{wait_for_latest_results()}}{ -Block process until a new result is available. -Returns latest results or \code{NULL} if no result is available after \code{timeout} seconds. -\subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$wait_for_latest_results( - fields = "ys", - timeout = Inf, - data_format = "data.table" -)}\if{html}{\out{
}} -} - -\subsection{Arguments}{ -\if{html}{\out{
}} -\describe{ -\item{\code{fields}}{(\code{character()})\cr -Fields to be read from the hashes.} - -\item{\code{timeout}}{(\code{numeric(1)})\cr -Time to wait for a result in seconds.} - -\item{\code{data_format}}{(\code{character()})\cr -Returned data format. -Choose \code{"data.table"} or "list". -The default is \code{"data.table"} but \code{"list"} is easier when list columns are present.} -} -\if{html}{\out{
}} -} -\subsection{Returns}{ -\code{data.table()}\cr -Latest results. -} -} -\if{html}{\out{
}} -\if{html}{\out{}} -\if{latex}{\out{\hypertarget{method-Rush-fetch_results}{}}} -\subsection{Method \code{fetch_results()}}{ -Fetch results from the data base. -Results are cached. -\subsection{Usage}{ -\if{html}{\out{
}}\preformatted{Rush$fetch_results( - fields = "ys", - reset_cache = FALSE, - data_format = "data.table" -)}\if{html}{\out{
}} -} - -\subsection{Arguments}{ -\if{html}{\out{
}} -\describe{ -\item{\code{fields}}{(\code{character()})\cr -Fields to be read from the hashes. -Defaults to \code{"ys"}.} - -\item{\code{reset_cache}}{(\code{logical(1)})\cr -Whether to reset the cache.} - -\item{\code{data_format}}{(\code{character()})\cr -Returned data format. -Choose \code{"data.table"} or "list". -The default is \code{"data.table"} but \code{"list"} is easier when list columns are present.} -} -\if{html}{\out{
}} -} -\subsection{Returns}{ -\code{data.table()} -Results. -} -} -\if{html}{\out{
}} \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-Rush-fetch_queued_tasks}{}}} \subsection{Method \code{fetch_queued_tasks()}}{ @@ -904,7 +807,7 @@ Fetch finished tasks from the data base. Finished tasks are cached. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$fetch_finished_tasks( - fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra"), + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"), reset_cache = FALSE, data_format = "data.table" )}\if{html}{\out{
}} @@ -969,6 +872,72 @@ Table of finished tasks. } } \if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-Rush-fetch_new_tasks}{}}} +\subsection{Method \code{fetch_new_tasks()}}{ +Fetch finished tasks from the data base that finished after the last fetch. +Updates the cache of the finished tasks. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Rush$fetch_new_tasks( + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"), + data_format = "data.table" +)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{fields}}{(\code{character()})\cr +Fields to be read from the hashes.} + +\item{\code{data_format}}{(\code{character()})\cr +Returned data format. +Choose \code{"data.table"} or "list". +The default is \code{"data.table"} but \code{"list"} is easier when list columns are present.} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +\code{data.table()}\cr +Latest results. +} +} +\if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-Rush-wait_for_new_tasks}{}}} +\subsection{Method \code{wait_for_new_tasks()}}{ +Block process until a new finished task is available. +Returns new tasks or \code{NULL} if no new task is available after \code{timeout} seconds. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Rush$wait_for_new_tasks( + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"), + timeout = Inf, + data_format = "data.table" +)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{fields}}{(\code{character()})\cr +Fields to be read from the hashes. +Defaults to \code{c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition")}.} + +\item{\code{timeout}}{(\code{numeric(1)})\cr +Time to wait for a new result in seconds.} + +\item{\code{data_format}}{(\code{character()})\cr +Returned data format. +Choose \code{"data.table"} or "list". +The default is \code{"data.table"} but \code{"list"} is easier when list columns are present.} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +\code{data.table() | list()}. +} +} +\if{html}{\out{
}} \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-Rush-fetch_failed_tasks}{}}} \subsection{Method \code{fetch_failed_tasks()}}{ @@ -1006,7 +975,7 @@ Table of failed tasks. Fetch all tasks from the data base. \subsection{Usage}{ \if{html}{\out{
}}\preformatted{Rush$fetch_tasks( - fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition", "state"), + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"), data_format = "data.table" )}\if{html}{\out{
}} } @@ -1031,6 +1000,44 @@ Table of all tasks. } } \if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-Rush-fetch_tasks_with_state}{}}} +\subsection{Method \code{fetch_tasks_with_state()}}{ +Fetch tasks with different states from the data base. +If tasks with different states are to be queried at the same time, this function prevents tasks from appearing twice. +This could be the case if a worker changes the state of a task while the tasks are being fetched. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Rush$fetch_tasks_with_state( + fields = c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition"), + states = c("queued", "running", "finished", "failed"), + reset_cache = FALSE, + data_format = "data.table" +)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{fields}}{(\code{character()})\cr +Fields to be read from the hashes. +Defaults to \code{c("xs", "ys", "xs_extra", "worker_extra", "ys_extra", "condition")}.} + +\item{\code{states}}{(\code{character()})\cr +States of the tasks to be fetched. +Defaults to \code{c("queued", "running", "finished", "failed")}.} + +\item{\code{reset_cache}}{(\code{logical(1)})\cr +Whether to reset the cache of the finished tasks.} + +\item{\code{data_format}}{(\code{character()})\cr +Returned data format. +Choose \code{"data.table"} or "list". +The default is \code{"data.table"} but \code{"list"} is easier when list columns are present.} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +\code{data.table()}\cr +Table of tasks and their states. +} +} +\if{html}{\out{
}} \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-Rush-wait_for_tasks}{}}} \subsection{Method \code{wait_for_tasks()}}{ @@ -1189,6 +1196,27 @@ Keys of the tasks.} } } \if{html}{\out{
}} +\if{html}{\out{}} +\if{latex}{\out{\hypertarget{method-Rush-tasks_with_state}{}}} +\subsection{Method \code{tasks_with_state()}}{ +Returns keys of requested states. +\subsection{Usage}{ +\if{html}{\out{
}}\preformatted{Rush$tasks_with_state(states)}\if{html}{\out{
}} +} + +\subsection{Arguments}{ +\if{html}{\out{
}} +\describe{ +\item{\code{states}}{(\code{character()})\cr +States of the tasks.} +} +\if{html}{\out{
}} +} +\subsection{Returns}{ +(Named list of \code{character()}). +} +} +\if{html}{\out{
}} \if{html}{\out{}} \if{latex}{\out{\hypertarget{method-Rush-clone}{}}} \subsection{Method \code{clone()}}{ diff --git a/man/RushWorker.Rd b/man/RushWorker.Rd index 4809f3b..0bd7942 100644 --- a/man/RushWorker.Rd +++ b/man/RushWorker.Rd @@ -58,12 +58,12 @@ Used in the worker loop to determine whether to continue.}
   • rush::Rush$detect_lost_workers()
   • rush::Rush$fetch_failed_tasks()
   • rush::Rush$fetch_finished_tasks()
-  • rush::Rush$fetch_latest_results()
+  • rush::Rush$fetch_new_tasks()
   • rush::Rush$fetch_priority_tasks()
   • rush::Rush$fetch_queued_tasks()
-  • rush::Rush$fetch_results()
   • rush::Rush$fetch_running_tasks()
   • rush::Rush$fetch_tasks()
+  • rush::Rush$fetch_tasks_with_state()
   • rush::Rush$format()
   • rush::Rush$is_failed_task()
   • rush::Rush$is_running_task()
@@ -80,8 +80,9 @@ Used in the worker loop to determine whether to continue.}
   • rush::Rush$retry_tasks()
   • rush::Rush$start_workers()
   • rush::Rush$stop_workers()
+  • rush::Rush$tasks_with_state()
   • rush::Rush$wait_for_finished_tasks()
-  • rush::Rush$wait_for_latest_results()
+  • rush::Rush$wait_for_new_tasks()
   • rush::Rush$wait_for_tasks()
   • rush::Rush$wait_for_workers()
   • rush::Rush$write_hashes()
  • diff --git a/tests/testthat/_snaps/Rush.md b/tests/testthat/_snaps/Rush.md index d045ba2..c2214ac 100644 --- a/tests/testthat/_snaps/Rush.md +++ b/tests/testthat/_snaps/Rush.md @@ -3,6 +3,8 @@ Code rush$create_worker_script(fun = fun) Output + DEBUG (500): [rush] Pushing worker config to Redis + DEBUG (500): [rush] Serializing worker configuration to 2384528 bytes INFO (400): [rush] Start worker with: INFO (400): [rush] Rscript -e 'rush::start_worker(network_id = 'test-rush', hostname = 'host', url = 'redis://127.0.0.1:6379')' INFO (400): [rush] See ?rush::start_worker for more details. diff --git a/tests/testthat/helper.R b/tests/testthat/helper.R index 185401d..93a7e56 100644 --- a/tests/testthat/helper.R +++ b/tests/testthat/helper.R @@ -18,4 +18,4 @@ expect_rush_reset = function(rush, type = "kill") { walk(processes, function(p) p$kill()) } -#lg$set_threshold(0) +lg$set_threshold("debug") diff --git a/tests/testthat/test-Rush.R b/tests/testthat/test-Rush.R index fa1f792..06078fe 100644 --- a/tests/testthat/test-Rush.R +++ b/tests/testthat/test-Rush.R @@ -150,6 +150,7 @@ test_that("named globals are available on the worker", { test_that("worker can be started with script", { skip_on_cran() skip_on_ci() + skip_if(TRUE) set.seed(1) # make log messages reproducible root_logger = lgr::get_logger("root") @@ -773,7 +774,7 @@ test_that("blocking on new results works", { config = start_flush_redis() rush = Rush$new(network_id = "test-rush", config = config) - fun = function(x1, x2, ...) { + fun = function(x1, x2, ...) { Sys.sleep(5) list(y = x1 + x2) } @@ -781,9 +782,9 @@ test_that("blocking on new results works", { xss = list(list(x1 = 1, x2 = 2)) keys = rush$push_tasks(xss) - expect_data_table(rush$wait_for_latest_results(timeout = 1), nrows = 0) - expect_data_table(rush$wait_for_latest_results(timeout = 10), nrows = 1) - expect_data_table(rush$wait_for_latest_results(timeout = 1), nrows = 0) + expect_data_table(rush$wait_for_new_tasks(timeout = 1), nrows = 0) + expect_data_table(rush$wait_for_new_tasks(timeout = 10), nrows = 1) + expect_data_table(rush$wait_for_new_tasks(timeout = 1), nrows = 0) expect_rush_reset(rush) }) @@ -1032,6 +1033,6 @@ test_that("redis info works", { skip_on_ci() config = start_flush_redis() - rush = Rush$new(network_id = "test-rush", config = config, seed = 123) + rush = Rush$new(network_id = "test-rush", config = config) expect_list(rush$redis_info) }) diff --git a/tests/testthat/test-RushWorker.R b/tests/testthat/test-RushWorker.R index 6f8dcab..4da51ba 100644 --- a/tests/testthat/test-RushWorker.R +++ b/tests/testthat/test-RushWorker.R @@ -673,35 +673,23 @@ test_that("fetching as list works", { expect_list(finished_tasks, len = 1) expect_names(names(finished_tasks), permutation.of = task_1$key) - results = rush$fetch_results(data_format = "list") - expect_list(results, len = 1) - expect_names(names(results), permutation.of = task_1$key) - rush$push_results(task_2$key, list(list(y = 3))) finished_tasks = rush$fetch_finished_tasks(data_format = "list") expect_list(finished_tasks, len = 2) expect_names(names(finished_tasks), permutation.of = c(task_1$key, task_2$key)) - results = rush$fetch_results(data_format = "list") - expect_list(results, len = 2) - expect_names(names(results), permutation.of = c(task_1$key, task_2$key)) - expect_null(rush$wait_for_finished_tasks(timeout = 0.1, data_format = "list")) - latest_results = rush$fetch_latest_results(data_format = "list") + latest_results = rush$fetch_new_tasks(data_format = "list") expect_list(latest_results, len 
= 2) expect_names(names(latest_results), permutation.of = c(task_1$key, task_2$key)) task_3 = rush$pop_task() rush$push_results(task_3$key, list(list(y = 3))) - latest_results = rush$wait_for_latest_results(data_format = "list") + latest_results = rush$wait_for_new_tasks(data_format = "list") expect_list(latest_results, len = 1) expect_names(names(latest_results), permutation.of = task_3$key) - finished_tasks = rush$wait_for_finished_tasks(data_format = "list") - expect_list(finished_tasks, len = 3) - expect_names(names(finished_tasks), permutation.of = c(task_1$key, task_2$key, task_3$key)) - # push failed task task = rush$pop_task() rush$push_failed(task$key, condition = list(list(message = "error"))) @@ -712,6 +700,64 @@ test_that("fetching as list works", { expect_rush_reset(rush, type = "terminate") }) +test_that("fetch task with states works", { + skip_on_cran() + skip_on_ci() + + config = start_flush_redis() + rush = RushWorker$new(network_id = "test-rush", config = config, host = "local", seed = 123) + xss = list(list(x1 = 1, x2 = 2)) + keys = rush$push_tasks(xss) + + # queued + expect_equal(rush$n_queued_tasks, 1) + expect_data_table(rush$fetch_tasks_with_state(states = "finished"), nrows = 0) + expect_data_table(rush$fetch_tasks_with_state(states = c("running", "finished")), nrows = 0) + tab = rush$fetch_tasks_with_state(states = "queued") + expect_data_table(tab, nrows = 1) + expect_names(names(tab), must.include = "state") + + expect_list(rush$fetch_tasks_with_state(states = "finished", data_format = "list"), len = 1) + expect_list(rush$fetch_tasks_with_state(states = c("running", "finished"), data_format = "list"), len = 2) + l = rush$fetch_tasks_with_state(states = "queued", data_format = "list") + expect_list(l, len = 1) + expect_list(l$queued, len = 1) + + # running + task = rush$pop_task(fields = c("xs", "seed")) + tab = rush$fetch_tasks_with_state() + expect_data_table(tab, nrows = 1) + expect_equal(tab$state, "running") + + l = rush$fetch_tasks_with_state(data_format = "list") + expect_list(l, len = 4) + expect_list(l$running, len = 1) + + # finished + rush$push_results(task$key, list(list(y = 3))) + tab = rush$fetch_tasks_with_state() + expect_data_table(tab, nrows = 1) + expect_equal(tab$state, "finished") + + l = rush$fetch_tasks_with_state(data_format = "list") + expect_list(l, len = 4) + expect_list(l$finished, len = 1) + + # failed + xss = list(list(x1 = 2, x2 = 2)) + rush$push_tasks(xss) + task_2 = rush$pop_task() + rush$push_failed(task_2$key, condition = list(list(message = "error"))) + tab = rush$fetch_tasks_with_state() + expect_data_table(tab, nrows = 2) + expect_equal(tab$state, c("finished", "failed")) + + l = rush$fetch_tasks_with_state(data_format = "list") + expect_list(l, len = 4) + expect_list(l$finished, len = 1) + expect_list(l$failed, len = 1) +}) + test_that("latest results are fetched", { skip_on_cran() skip_on_ci() @@ -724,20 +770,20 @@ test_that("latest results are fetched", { task = rush$pop_task() rush$push_results(task$key, list(list(y = 3))) - latest_results = rush$fetch_latest_results() + latest_results = rush$fetch_new_tasks() expect_data_table(latest_results, nrows = 1) expect_set_equal(latest_results$y, 3) - expect_data_table(rush$fetch_latest_results(), nrows = 0) + expect_data_table(rush$fetch_new_tasks(), nrows = 0) # add 1 task keys = rush$push_tasks(list(list(x1 = 1, x2 = 3))) task = rush$pop_task() rush$push_results(task$key, list(list(y = 4))) - latest_results = rush$fetch_latest_results() + latest_results = rush$fetch_new_tasks() 
expect_data_table(latest_results, nrows = 1) expect_set_equal(latest_results$y, 4) - expect_data_table(rush$fetch_latest_results(), nrows = 0) + expect_data_table(rush$fetch_new_tasks(), nrows = 0) # add 2 tasks keys = rush$push_tasks(list(list(x1 = 1, x2 = 4))) @@ -747,10 +793,10 @@ test_that("latest results are fetched", { task = rush$pop_task() rush$push_results(task$key, list(list(y = 6))) - latest_results = rush$fetch_latest_results() + latest_results = rush$fetch_new_tasks() expect_data_table(latest_results, nrows = 2) expect_set_equal(latest_results$y, c(5, 6)) - expect_data_table(rush$fetch_latest_results(), nrows = 0) + expect_data_table(rush$fetch_new_tasks(), nrows = 0) expect_rush_reset(rush, type = "terminate") }) @@ -986,7 +1032,10 @@ test_that("popping a task with seed from the queue works", { expect_rush_reset(rush, type = "terminate") }) -test_that("fetch active tasks works", { +# atomic operations ----------------------------------------------------------- + + +test_that("task in states works", { skip_on_cran() skip_on_ci() @@ -995,23 +1044,59 @@ test_that("fetch active tasks works", { xss = list(list(x1 = 1, x2 = 2)) keys = rush$push_tasks(xss) - expect_data_table(rush$fetch_active_tasks(), nrows = 1) - expect_equal(rush$n_queued_tasks, 1) + keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed")) + expect_list(keys_list, len = 4) + expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed")) + expect_equal(keys_list$queued, keys) + expect_null(keys_list$running) + expect_null(keys_list$finished) + expect_null(keys_list$failed) - task = rush$pop_task(fields = c("xs", "seed")) - expect_data_table(rush$fetch_active_tasks(), nrows = 1) - expect_equal(rush$n_queued_tasks, 0) - expect_equal(rush$n_running_tasks, 1) - - xss = list(list(x1 = 2, x2 = 2)) - rush$push_tasks(xss) - expect_data_table(rush$fetch_active_tasks(), nrows = 2) - expect_equal(rush$n_queued_tasks, 1) - expect_equal(rush$n_running_tasks, 1) + task = rush$pop_task() + keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed")) + expect_list(keys_list, len = 4) + expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed")) + expect_equal(keys_list$running, keys) + expect_null(keys_list$queued) + expect_null(keys_list$finished) + expect_null(keys_list$failed) rush$push_results(task$key, list(list(y = 3))) - expect_data_table(rush$fetch_active_tasks(), nrows = 2) - expect_equal(rush$n_queued_tasks, 1) - expect_equal(rush$n_running_tasks, 0) - expect_equal(rush$n_finished_tasks, 1) + keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed")) + expect_list(keys_list, len = 4) + expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed")) + expect_null(keys_list$queued) + expect_null(keys_list$running) + expect_equal(keys_list$finished, task$key) + expect_null(keys_list$failed) + + xss = list(list(x1 = 2, x2 = 2)) + keys = rush$push_tasks(xss) + task_2 = rush$pop_task() + rush$push_failed(task_2$key, condition = list(list(message = "error"))) + keys_list = rush$tasks_with_state(c("queued", "running", "finished", "failed")) + expect_list(keys_list, len = 4) + expect_names(names(keys_list), identical.to = c("queued", "running", "finished", "failed")) + expect_null(keys_list$queued) + expect_null(keys_list$running) + expect_equal(keys_list$finished, task$key) + expect_equal(keys_list$failed, task_2$key) + + keys_list = rush$tasks_with_state(c("queued")) + 
expect_list(keys_list, len = 1) + expect_names(names(keys_list), identical.to = c("queued")) + expect_null(keys_list$queued) + + keys_list = rush$tasks_with_state(c("queued", "running")) + expect_list(keys_list, len = 2) + expect_names(names(keys_list), identical.to = c("queued", "running")) + expect_null(keys_list$queued) + expect_null(keys_list$running) + + keys_list = rush$tasks_with_state(c("queued", "running", "finished")) + expect_list(keys_list, len = 3) + expect_names(names(keys_list), identical.to = c("queued", "running", "finished")) + expect_null(keys_list$queued) + expect_null(keys_list$running) + expect_equal(keys_list$finished, task$key) })
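+
+# Minimal sketch of an additional check, assuming the helpers used above
+# (start_flush_redis(), expect_rush_reset()) are available: without new results,
+# wait_for_new_tasks() falls through after the timeout and returns an empty
+# data.table (or NULL for the list format).
+test_that("waiting for new tasks times out", {
+  skip_on_cran()
+  skip_on_ci()
+
+  config = start_flush_redis()
+  rush = RushWorker$new(network_id = "test-rush", config = config, host = "local")
+
+  expect_data_table(rush$wait_for_new_tasks(timeout = 0.1), nrows = 0)
+  expect_null(rush$wait_for_new_tasks(timeout = 0.1, data_format = "list"))
+
+  expect_rush_reset(rush, type = "terminate")
+})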