docs: add common parameters to man-roxygen
be-marc committed Oct 10, 2023
1 parent f0db60e commit 54b3ed1
Showing 24 changed files with 255 additions and 198 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
@@ -12,5 +12,6 @@ import(future)
import(mlr3misc)
import(redux)
importFrom(callr,r_bg)
+ importFrom(globals,globalsByName)
importFrom(utils,str)
importFrom(uuid,UUIDgenerate)
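The newly imported `globalsByName()` is used in `create_worker_script()` to turn a character vector of variable names into a named list of their values, which is then serialized for the worker. A minimal base-R sketch of that idea follows. It uses `mget()` as a stand-in rather than the `globals` package itself, and `resolve_by_name()`, `learning_rate` and `n_rounds` are hypothetical names chosen for illustration.

```r
# Hypothetical helper: look up variables by name and return a named list.
# mget() is a base-R stand-in here, not the globals package API.
resolve_by_name = function(names, envir = parent.frame()) {
  mget(names, envir = envir, inherits = TRUE)
}

learning_rate = 0.1
n_rounds = 50L

values = resolve_by_name(c("learning_rate", "n_rounds"))

# The named list can be serialized and shipped to another R process,
# where the values are assigned back under the same names.
payload = serialize(values, connection = NULL)
restored = unserialize(payload)
worker_env = new.env()
list2env(restored, envir = worker_env)
```

Passing names instead of values keeps the method signature simple: the caller only lists what the worker needs, and the lookup happens once at script-creation time.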
126 changes: 59 additions & 67 deletions R/Rush.R
@@ -1,25 +1,23 @@
#' @title Rush Controller
#'
#' @description
- #' The [Rush] class is the controller of the asynchronous parallelization.
+ #' [Rush] is the controller of the asynchronous parallelization.
#' It manages the workers and the tasks.
#'
#' @section Local Workers:
#' A local worker runs on the same machine as the controller.
#' We recommend using the `future` package to spawn local workers.
#' The `future` backends `multisession` and `multicore` run workers on the local machine.
#' As many rush workers can be started as there are future workers available.
#' With `future`, crashed workers can be easily identified without a heartbeat process.
#' It is also possible to spawn local workers with a batch script but a heartbeat process is required to check for crashed workers.
#'
#' @section Remote Workers:
#' A remote worker runs on a different machine than the controller.
- #' Remote workers can be started with a batch script or with the `future` package.
+ #' Remote workers can be started with a script or with the `future` package.
#' Only a heartbeat process can kill remote workers.
#' The heartbeat process also monitors the remote workers for crashes.
#'
#' @section Stopping Workers:
- #' Local and remote workers can be terminated with the `stop_workers(type = "terminate")` method.
+ #' Local and remote workers can be terminated with the `$stop_workers(type = "terminate")` method.
#' The workers evaluate the currently running task and then terminate.
#' The option `type = "kill"` stops the workers immediately.
#' Killing a local worker is done with the `tools::pskill()` function.
@@ -32,33 +30,38 @@
#' @section Error Handling:
#' When evaluating tasks in a distributed system, many things can go wrong.
#'
+ #' @template param_instance_id
+ #' @template param_config
+ #' @template param_worker_loop
+ #' @template param_globals
+ #' @template param_packages
+ #' @template param_host
+ #' @template param_heartbeat_period
+ #' @template param_heartbeat_expire
+ #' @template param_lgr_thresholds
+ #'
#' @export
Rush = R6::R6Class("Rush",
public = list(

#' @field instance_id (`character(1)`)\cr
- #' Identifier of the instance.
+ #' Identifier of the rush network.
instance_id = NULL,

#' @field config ([redux::redis_config])\cr
- #' Redis configuration.
+ #' Redis configuration options.
config = NULL,

#' @field connector ([redux::redis_api])\cr
- #' Connector to redis.
+ #' Returns a connection to Redis.
connector = NULL,

#' @field promises ([future::Future()])\cr
- #' List of futures.
+ #' List of futures running [run_worker].
promises = NULL,

#' @description
#' Creates a new instance of this [R6][R6::R6Class] class.
- #'
- #' @param instance_id (`character(1)`)\cr
- #' Identifier of the instance.
- #' @param config ([redux::redis_config])\cr
- #' Redis configuration.
initialize = function(instance_id, config = redux::redis_config()) {
self$instance_id = assert_string(instance_id)
self$config = assert_class(config, "redis_config")
@@ -91,32 +94,31 @@ Rush = R6::R6Class("Rush",
},

#' @description
- #' Start workers.
+ #' Start workers with the future package.
+ #' Alternatively, use `$create_worker_script()` to create a script for starting workers.
#'
- #' @param worker_loop (`function`)\cr
- #' Start a worker loop with the future package.
- #' Defaults to [fun_loop].
- #' Pass `fun` in `...`.
#' @param n_workers (`integer(1)`)\cr
#' Number of workers to be started.
#' If `NULL` the maximum number of free workers is used.
- #' @param globals (`character()`)\cr
- #' Global variables to be loaded to the workers global environment.
- #' @param packages (`character()`)\cr
- #' Packages to be loaded by the workers.
- #' @param host (`character(1)`)\cr
- #' Local or remote host.
- #' @param heartbeat_period (`integer(1)`)\cr
- #' Period of the heartbeat in seconds.
- #' @param heartbeat_expire (`integer(1)`)\cr
- #' Time to live of the heartbeat in seconds.
- #' @param lgr_thresholds (named `character()` or `numeric()`)\cr
- #' Logger threshold on the workers e.g. `c(rush = "debug")`.
#' @param await_workers (`logical(1)`)\cr
#' Whether to wait until all workers are available.
#' @param ... (`any`)\cr
#' Arguments passed to `worker_loop`.
- start_workers = function(worker_loop = fun_loop, n_workers = NULL, globals = NULL, packages = NULL, host = "local", heartbeat_period = NULL, heartbeat_expire = NULL, lgr_thresholds = NULL, await_workers = TRUE, ...) {
+ #'
+ #' @return (`character()`)\cr
+ #' Worker ids.
+ start_workers = function(
+ worker_loop = fun_loop,
+ n_workers = NULL,
+ globals = NULL,
+ packages = NULL,
+ host = "local",
+ heartbeat_period = NULL,
+ heartbeat_expire = NULL,
+ lgr_thresholds = NULL,
+ await_workers = TRUE,
+ ...) {
+
assert_character(globals, null.ok = TRUE)
assert_character(packages, null.ok = TRUE)
assert_count(n_workers, positive = TRUE, null.ok = TRUE)
@@ -126,16 +128,14 @@ Rush = R6::R6Class("Rush",
assert_named(lgr_thresholds)
assert_flag(await_workers)
dots = list(...)
- if (!is.null(heartbeat_period)) require_namespaces("callr")
+ r = self$connector

# check free workers
if (is.null(n_workers)) n_workers = future::nbrOfWorkers()
if (n_workers > future::nbrOfFreeWorkers()) {
stopf("No more than %i rush workers can be started. For starting more workers, change the number of workers in the future plan.", future::nbrOfFreeWorkers())
}

- r = self$connector
-
# start workers
instance_id = self$instance_id
config = self$config
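A sketch of how the documented `$start_workers()` arguments fit together, assuming a Redis server is reachable with the default `redux::redis_config()` and that the `rush` and `future` packages are installed. The wrapper name `start_two_workers()`, the instance id and the body of `fun` are hypothetical; only `Rush$new()`, `$start_workers()` and `$stop_workers()` with the arguments shown are taken from this diff. The function is defined but not called here, since calling it needs a live Redis instance.

```r
start_two_workers = function(instance_id = "demo-instance") {
  # the future plan decides how many rush workers can be started
  future::plan("multisession", workers = 2)

  rush = rush::Rush$new(instance_id, config = redux::redis_config())

  # hypothetical task function; it reaches the worker loop
  # (default: fun_loop) through `...`
  fun = function(x1, x2, ...) list(y = x1 + x2)

  worker_ids = rush$start_workers(
    fun = fun,
    n_workers = 2,
    host = "local",
    await_workers = TRUE
  )

  # workers finish the task they are running and then terminate
  rush$stop_workers(type = "terminate")

  worker_ids
}
```

Requesting more workers than the plan provides triggers the `stopf()` error shown in the hunk above, so the plan has to be set before `$start_workers()` is called.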
@@ -167,25 +167,18 @@ Rush = R6::R6Class("Rush",
#' Create script to start workers.
#' The worker is started with [start_worker()].
#'
- #' @param worker_loop (`function`)\cr
- #' Worker loop.
- #' Defaults to [fun_loop].
- #' Pass `fun` in `...`.
- #' @param globals (`character()`)\cr
- #' Global variables to be loaded to the workers global environment.
- #' @param packages (`character()`)\cr
- #' Packages to be loaded by the workers.
- #' @param host (`character(1)`)\cr
- #' Local or remote host.
- #' @param heartbeat_period (`integer(1)`)\cr
- #' Period of the heartbeat in seconds.
- #' @param heartbeat_expire (`integer(1)`)\cr
- #' Time to live of the heartbeat in seconds.
- #' @param lgr_thresholds (named `character()` or `numeric()`)\cr
- #' Logger threshold on the workers e.g. `c(rush = "debug")`.
#' @param ... (`any`)\cr
#' Arguments passed to `worker_loop`.
- create_worker_script = function(worker_loop = fun_loop, globals = NULL, packages = NULL, host = "local", heartbeat_period = NULL, heartbeat_expire = NULL, lgr_thresholds = NULL, ...) {
+ create_worker_script = function(
+ worker_loop = fun_loop,
+ globals = NULL,
+ packages = NULL,
+ host = "local",
+ heartbeat_period = NULL,
+ heartbeat_expire = NULL,
+ lgr_thresholds = NULL,
+ ...) {
+
assert_function(worker_loop)
assert_character(globals, null.ok = TRUE)
assert_character(packages, null.ok = TRUE)
@@ -194,13 +187,11 @@ Rush = R6::R6Class("Rush",
assert_count(heartbeat_expire, positive = TRUE, null.ok = TRUE)
assert_named(lgr_thresholds)
dots = list(...)
- if (!is.null(heartbeat_period)) require_namespaces("callr")
-
r = self$connector

# identify globals by name
# returns a named list of values of the globals
- globals = globals::globalsByName(globals)
+ globals = globalsByName(globals)

# serialize arguments needed for starting the worker
args = list(
@@ -218,6 +209,8 @@ Rush = R6::R6Class("Rush",
catn("Start worker with:")
catn(sprintf("Rscript -e 'rush::start_worker(%s, url = \"%s\")'", self$instance_id, self$config$url))
catn("See ?rush::start_worker for more details.")
+
+ return(invisible(self))
},

#' @description
@@ -228,6 +221,8 @@ Rush = R6::R6Class("Rush",
await_workers = function(n) {
assert_count(n)
while(self$n_workers < n) Sys.sleep(0.01)
+
+ return(invisible(self))
},

#' @description
@@ -288,18 +283,15 @@ Rush = R6::R6Class("Rush",
# reset cache to update worker info
private$.cached_worker_info = data.table()

- invisible(self)
+ return(invisible(self))
},

#' @description
#' Detect lost workers.
- #' Updates the state of lost workers in `$worker_info`.
- #'
- #' 1) Local workers without a heartbeat are checked with `tools::pskill()`.
- #' 2) Workers with a heartbeat process are checked with `EXISTS` on the heartbeat key.
- #'
- #' The status of the worker is changed to `"lost"`.
+ #' Local workers without a heartbeat are checked with `tools::pskill()`.
+ #' Checking local workers on Windows might be very slow.
#'
+ #' Workers with a heartbeat process are checked with the heartbeat.
detect_lost_workers = function() {
r = self$connector
worker_info = self$worker_info
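The liveness check for local workers without a heartbeat can be sketched with `tools::pskill()` and signal 0, which on Unix-alikes delivers no signal and only reports whether the process can be signalled. This is an illustration, not the code inside `detect_lost_workers()`; `is_alive()` is a hypothetical helper. The sketch refuses to run on Windows because, according to the R documentation, `pskill()` there always terminates the target process.

```r
# Hypothetical helper: TRUE if a process with this pid can be signalled.
is_alive = function(pid) {
  # signal 0 probing is only safe on Unix-alikes
  stopifnot(.Platform$OS.type == "unix")
  isTRUE(tools::pskill(pid, signal = 0L))
}

# the current R session is certainly running
alive_self = is_alive(Sys.getpid())
```

A worker whose pid no longer passes such a check would be the kind of worker the method documents as lost.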
@@ -339,7 +331,7 @@ Rush = R6::R6Class("Rush",
private$.cached_worker_info = data.table()
}

- invisible(self)
+ return(invisible(self))
},

#' @description
@@ -367,7 +359,7 @@ Rush = R6::R6Class("Rush",
r$pipeline(.commands = cmds)
}

- invisible(self)
+ return(invisible(self))
},

#' @description
@@ -412,7 +404,7 @@ Rush = R6::R6Class("Rush",
private$.cached_worker_info = data.table()
private$.n_seen_results = 0

- invisible(self)
+ return(invisible(self))
},

#' @description
@@ -452,7 +444,7 @@ Rush = R6::R6Class("Rush",
r$command(c("LPUSH", private$.get_key("queued_tasks"), keys))
r$command(c("SADD", private$.get_key("all_tasks"), keys))

- invisible(keys)
+ return(invisible(keys))
},

#' @description
@@ -491,7 +483,7 @@ Rush = R6::R6Class("Rush",
r$pipeline(.commands = cmds)
r$command(c("SADD", private$.get_key("all_tasks"), keys))

- invisible(keys)
+ return(invisible(keys))
},

#' @description
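Several hunks in this file replace a trailing `invisible(self)` with `return(invisible(self))`. Both forms return the object without printing it, so the change is stylistic. What matters to callers is that methods returning `self` can be chained. A minimal sketch, with a base-R environment standing in for an R6 object (`new_counter()` and `add()` are hypothetical names):

```r
# Base-R stand-in for an R6 object: an environment holding state and methods.
new_counter = function() {
  self = new.env()
  self$n = 0L
  self$add = function(k = 1L) {
    self$n = self$n + k
    return(invisible(self))  # nothing is printed, but the object is returned
  }
  self
}

counter = new_counter()
counter$add(2L)$add(3L)  # chaining works because each call returns the object
total = counter$n
```

The explicit `return()` mainly makes the exit point visible in long methods, where the last expression is otherwise easy to overlook.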
52 changes: 26 additions & 26 deletions R/RushWorker.R
@@ -1,7 +1,25 @@
- #' @title Rush
+ #' @title Rush Worker
#'
#' @description
- #' Rush
+ #' [RushWorker] runs on a worker and executes tasks.
+ #' The rush worker inherits from [Rush] and adds methods to pop tasks from the queue and push results to the database.
+ #'
+ #' @note
+ #' The worker registers itself in the database of the rush network.
+ #'
+ #' @section Logging:
+ #' The worker logs all messages written with the `lgr` package to the database.
+ #' The `lgr_thresholds` argument defines the logging level for each logger e.g. `c(rush = "debug")`.
+ #' Saving log messages adds a small overhead but is useful for debugging.
+ #' By default, no log messages are stored.
+ #'
+ #' @template param_instance_id
+ #' @template param_config
+ #' @template param_host
+ #' @template param_worker_id
+ #' @template param_heartbeat_period
+ #' @template param_heartbeat_expire
+ #' @template param_lgr_thresholds
#'
#' @export
RushWorker = R6::R6Class("RushWorker",
@@ -12,15 +30,11 @@ RushWorker = R6::R6Class("RushWorker",
#' Identifier of the worker.
worker_id = NULL,

- #' @field constants (`list()`)\cr
- #' List of constants.
- constants = NULL,
-
#' @field host (`character(1)`)\cr
- #' Local or remote host.
+ #' Worker is started on a local or remote host.
host = NULL,

- #' @field heartbeat (`callr::RBackgroundProcess`)\cr
+ #' @field heartbeat ([callr::r_process])\cr
#' Background process for the heartbeat.
heartbeat = NULL,

@@ -30,22 +44,6 @@ RushWorker = R6::R6Class("RushWorker",

#' @description
#' Creates a new instance of this [R6][R6::R6Class] class.
- #'
- #' @param instance_id (`character(1)`)\cr
- #' Identifier of the rush instance.
- #' @param config ([redux::redis_config])\cr
- #' Redis configuration.
- #' @param host (`character(1)`)\cr
- #' Local or remote host.
- #' @param worker_id (`character(1)`)\cr
- #' Identifier of the worker.
- #' @param heartbeat_period (`numeric(1)`)\cr
- #' Period of the heartbeat.
- #' @param heartbeat_expire (`numeric(1)`)\cr
- #' Expiration of the heartbeat.
- #' @param lgr_thresholds (named `character()` or `numeric()`)\cr
- #' Logger thresholds.
- #' If `NULL`, no log messages are saved.
initialize = function(instance_id, config = redux::redis_config(), host, worker_id = NULL, heartbeat_period = NULL, heartbeat_expire = NULL, lgr_thresholds = NULL) {
self$host = assert_choice(host, c("local", "remote"))
self$worker_id = assert_string(worker_id %??% uuid::UUIDgenerate())
@@ -127,8 +125,8 @@ RushWorker = R6::R6Class("RushWorker",
#' List of lists of conditions.
#' @param status (`character(1)`)\cr
#' Status of the tasks.
- #' If `finished` the tasks are moved to the finished tasks.
- #' If `error` the tasks are moved to the failed tasks.
+ #' If `"finished"` the tasks are moved to the finished tasks.
+ #' If `"error"` the tasks are moved to the failed tasks.
push_results = function(keys, yss = list(), extra = list(), conditions = list(), status = "finished") {
assert_string(keys)
assert_list(yss, types = "list")
@@ -163,6 +161,8 @@ RushWorker = R6::R6Class("RushWorker",
self$lgr_buffer$flush()
}
}
+
+ return(invisible(self))
}
),
