Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

encapsulate train and predict of PipeOps #541

Draft
wants to merge 5 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

# mlr3pipelines 0.3.1

* PipeOps are now encapsulated during train and predict and gained the active bindings
  `timings`, `log`, `warnings`, and `errors`
* GraphLearners gained the `timings_pipeops` active binding
* Changed PipeOps:
- PipeOpMissInd now also allows for setting type = integer
- PipeOpNMF: now exposes all parameters previously in .options
Expand Down
2 changes: 1 addition & 1 deletion R/Graph.R
Original file line number Diff line number Diff line change
Expand Up @@ -452,7 +452,7 @@ Graph = R6Class("Graph",
state = function(val) {
if (!missing(val)) {
assert_list(val, names = "unique", null.ok = TRUE)
assert_subset(names(val), names(self$pipeops))
assert_subset(names(val), c(names(self$pipeops), "log", "train_time", "predict_time"))
imap(self$pipeops, function(pipeop, pname) pipeop$state = val[[pname]])
val
} else {
Expand Down
23 changes: 21 additions & 2 deletions R/GraphLearner.R
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,9 @@
#' Setting a new predict type will try to set the `predict_type` in all relevant
#' [`PipeOp`] / [`Learner`][mlr3::Learner] encapsulated within the [`Graph`].
#' Similarly, the predict_type of a Graph will always be the smallest denominator in the [`Graph`].
#'
#' In the `timings_pipeops` active binding, the `timings` of the individual [`PipeOp`]s are stored as a named list
#' with names `"train"` and `"predict"`.
#' @family Learners
#' @export
GraphLearner = R6Class("GraphLearner", inherit = Learner,
Expand All @@ -23,6 +26,12 @@ GraphLearner = R6Class("GraphLearner", inherit = Learner,
initialize = function(graph, id = NULL, param_vals = list(), task_type = NULL, predict_type = NULL) {

graph = as_graph(graph, clone = TRUE)

# set the encapsulate of all pipeops to "none"
for (i in seq_along(graph$pipeops)) {
graph$pipeops[[i]]$encapsulate = c(train = "none", predict = "none")
}

id = assert_string(id, null.ok = TRUE) %??% paste(graph$ids(sorted = TRUE), collapse = ".")
self$graph = graph
output = graph$output
Expand All @@ -46,7 +55,6 @@ GraphLearner = R6Class("GraphLearner", inherit = Learner,
}
if (length(task_type) == 0L) {
# recursively walk backwards through the graph
# FIXME: think more about target transformation graphs here
get_po_task_type = function(x) {
task_type = c(
match(c(x$output$train, x$output$predict), class_table$prediction),
Expand Down Expand Up @@ -101,11 +109,21 @@ GraphLearner = R6Class("GraphLearner", inherit = Learner,
stop("param_set is read-only.")
}
self$graph$param_set
},
# Read-only active binding: per-PipeOp train/predict timings of this GraphLearner.
# Returns a named list with entries "train" and "predict", each a numeric vector
# named by PipeOp id and reordered by the topologically sorted graph ids;
# NA_real_ where no timing is available (e.g. when the learner is untrained).
timings_pipeops = function(rhs) {
  assert_ro_binding(rhs)
  if (is.null(self$state$model)) {
    # untrained: return all-NA vectors so the shape of the result is stable
    timing = stats::setNames(rep(NA_real_, length(self$graph$pipeops)), nm = names(self$graph$pipeops))
    return(list(train = timing, predict = timing)) # early exit
  }
  # reorder based on topologically sorted ids
  # NOTE(review): assumes self$state$model has exactly one entry per pipeop in
  # names(self$graph$pipeops) order; extra state entries (Graph.R now also
  # allows "log", "train_time", "predict_time" in the state list) would make
  # the setNames() lengths disagree -- TODO confirm.
  list(train = stats::setNames(map_dbl(self$state$model, function(pipeop) pipeop$train_time %??% NA_real_), nm = names(self$graph$pipeops))[self$graph$ids(TRUE)],
    predict = stats::setNames(map_dbl(self$state$model, function(pipeop) pipeop$predict_time %??% NA_real_), nm = names(self$graph$pipeops))[self$graph$ids(TRUE)])
}
),
private = list(
deep_clone = function(name, value) {
# FIXME this repairs the mlr3::Learner deep_clone() method which is broken.
# FIXME: this repairs the mlr3::Learner deep_clone() method which is broken.
if (is.environment(value) && !is.null(value[[".__enclos_env__"]])) {
return(value$clone(deep = TRUE))
}
Expand All @@ -122,6 +140,7 @@ GraphLearner = R6Class("GraphLearner", inherit = Learner,
on.exit({self$graph$state = NULL})
self$graph$state = self$model
prediction = self$graph$predict(task)
self$state$model = self$graph$state # needed to get each pipeop's predict_time in the state
assert_list(prediction, types = "Prediction", len = 1,
.var.name = sprintf("Prediction returned by Graph %s", self$id))
prediction[[1]]
Expand Down
209 changes: 166 additions & 43 deletions R/PipeOp.R
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,8 @@
#' Set of all required packages for the [`PipeOp`]'s `$train` and `$predict` methods. See `$packages` slot.
#' Default is `character(0)`.
#' * `tags` ::`character`\cr
#' A set of tags associated with the `PipeOp`. Tags describe a PipeOp's purpose.
#' Can be used to filter `as.data.table(mlr_pipeops)`. Default is `"abstract"`, indicating an abstract `PipeOp`.
#' A set of tags associated with the [`PipeOp`]. Tags describe a [`PipeOp`]'s purpose.
#' Can be used to filter `as.data.table(mlr_pipeops)`. Default is `"abstract"`, indicating an abstract [`PipeOp`].
#'
#' @section Internals:
#' [`PipeOp`] is an abstract class with abstract functions `private$.train()` and `private$.predict()`. To create a functional
Expand Down Expand Up @@ -111,14 +111,29 @@
#' Number of output channels. This equals `nrow($output)`.
#' * `is_trained` :: `logical(1)` \cr
#' Indicate whether the [`PipeOp`] was already trained and can therefore be used for prediction.
#' * `tags` ::`character`\cr
#' A set of tags associated with the `PipeOp`. Tags describe a PipeOp's purpose.
#' * `tags` :: `character`\cr
#' A set of tags associated with the [`PipeOp`]. Tags describe a [`PipeOp`]'s purpose.
#' Can be used to filter `as.data.table(mlr_pipeops)`.
#' PipeOp tags are inherited and child classes can introduce additional tags.
#' [`PipeOp`] tags are inherited and child classes can introduce additional tags.
#' * `hash` :: `character(1)` \cr
#' Checksum calculated on the [`PipeOp`], depending on the [`PipeOp`]'s `class` and the slots `$id` and `$param_set$values`. If a
#' [`PipeOp`]'s functionality may change depending on more than these values, it should inherit the `$hash` active
#' binding and calculate the hash as `digest(list(super$hash, <OTHER THINGS>), algo = "xxhash64")`.
#' * `timings` :: `numeric(2)` \cr
#' Elapsed time in seconds for the steps `"train"` and `"predict"`.
#' Measured via [mlr3misc::encapsulate()].
#' * `log` :: [`data.table`]\cr
#' Returns the output (including warnings and errors) as a table with columns `"stage"` ("train" or "predict"),
#' `"class"` ("output", "warning", or "error"), and `"msg"` (`character()`).
#' * `warnings` :: `character()`\cr
#' Logged warnings as vector.
#' * `errors` :: `character()`\cr
#' Logged errors as vector.
#' * `encapsulate` :: named `character()`\cr
#' Controls how to execute the code in internal train and predict methods.
#' Must be a named character vector with names `"train"` and `"predict"`.
#' Possible values are `"none"`, `"evaluate"` (requires package \CRANpkg{evaluate}) and `"callr"` (requires package \CRANpkg{callr}).
#' See [mlr3misc::encapsulate()] for more details.
#' * `.result` :: `list` \cr
#' If the [`Graph`]'s `$keep_results` flag is set to `TRUE`, then the intermediate Results of `$train()` and `$predict()`
#' are saved to this slot, exactly as they are returned by these functions. This is mainly for debugging purposes
Expand All @@ -127,7 +142,7 @@
#' @section Methods:
#' * `train(input)`\cr
#' (`list`) -> named `list`\cr
#' Train [`PipeOp`] on `inputs`, transform it to output and store the learned `$state`. If the PipeOp is already
#' Train [`PipeOp`] on `inputs`, transform it to output and store the learned `$state`. If the [`PipeOp`] is already
#' trained, already present `$state` is overwritten. Input list is typechecked against the `$input` `train` column.
#' Return value is a list with as many entries as `$output` has
#' rows, with each entry named after the `$output` `name` column and class according to the `$output` `train` column.
Expand Down Expand Up @@ -177,11 +192,11 @@
#' # a list as output.
#' .train = function(input) {
#' sum = input[[1]] + input[[2]]
#' self$state = sum
#' self$state$sum = sum
#' list(sum)
#' },
#' .predict = function(input) {
#' list(letters[self$state])
#' list(letters[self$state$sum])
#' }
#' )
#' )
Expand Down Expand Up @@ -222,6 +237,7 @@ PipeOp = R6Class("PipeOp",
self$output = assert_connection_table(output)
self$packages = assert_character(packages, any.missing = FALSE, unique = TRUE)
self$tags = assert_subset(tags, mlr_reflections$pipeops$valid_tags)
private$.encapsulate = private$.learner$encapsulate %??% c(train = "none", predict = "none") # propagate a learner's encapsulate in case of as_pipeop.Learner calls etc.
},

print = function(...) {
Expand All @@ -241,42 +257,10 @@ PipeOp = R6Class("PipeOp",
},

train = function(input) {
self$state = NULL # reset to untrained state first
require_namespaces(self$packages)

if (every(input, is_noop)) {
self$state = NO_OP
return(named_list(self$output$name, NO_OP))
}
unpacked = unpack_multiplicities(input, multiplicity_type_nesting_level(self$input$train), self$input$name, self$id)
if (!is.null(unpacked)) {
return(evaluate_multiplicities(self, unpacked, "train", NULL))
}
input = check_types(self, input, "input", "train")
on.exit({self$state = NULL}) # if any of the followi fails, make sure to reset self$state
output = private$.train(input)
output = check_types(self, output, "output", "train")
on.exit() # don't reset state any more
output
pipeop_train(self, input)
},
predict = function(input) {
# need to load packages in train *and* predict, because they might run in different R instances
require_namespaces(self$packages)

if (every(input, is_noop)) {
return(named_list(self$output$name, NO_OP))
}
if (is_noop(self$state)) {
stopf("Pipeop %s got NO_OP during train but no NO_OP during predict.", self$id)
}
unpacked = unpack_multiplicities(input, multiplicity_type_nesting_level(self$input$predict), self$input$name, self$id)
if (!is.null(unpacked)) {
return(evaluate_multiplicities(self, unpacked, "predict", self$state))
}
input = check_types(self, input, "input", "predict")
output = private$.predict(input)
output = check_types(self, output, "output", "predict")
output
pipeop_predict(self, input)
}
),

Expand Down Expand Up @@ -333,6 +317,30 @@ PipeOp = R6Class("PipeOp",
val
}
})), algo = "xxhash64")
},
# Read-only: elapsed time in seconds as named numeric(2) c(train = , predict = );
# NA_real_ for steps that have not run. Values are written into the state by
# the encapsulated train/predict code paths.
timings = function(rhs) {
  assert_ro_binding(rhs)
  set_names(c(self$state$train_time %??% NA_real_, self$state$predict_time %??% NA_real_), c("train", "predict"))
},
# Read-only: the log table captured during encapsulated train/predict
# (columns "stage", "class", "msg"), stored in the state.
log = function(rhs) {
  assert_ro_binding(rhs)
  self$state$log
},
# Read-only: logged warning messages as a character vector.
warnings = function(rhs) {
  assert_ro_binding(rhs)
  get_log_condition(self$state, "warning")
},
# Read-only: logged error messages as a character vector.
errors = function(rhs) {
  assert_ro_binding(rhs)
  get_log_condition(self$state, "error")
},
# Read/write: named character with names "train"/"predict" controlling
# mlr3misc::encapsulate(); names missing from the assigned value default to "none".
# NOTE(review): the values are not validated against
# c("none", "evaluate", "callr") here -- invalid values only surface when
# encapsulate() is eventually called. Consider an assert_subset on the values.
encapsulate = function(rhs) {
  if (missing(rhs)) {
    return(private$.encapsulate)
  }
  assert_character(rhs)
  assert_names(names(rhs), subset.of = c("train", "predict"))
  private$.encapsulate = insert_named(c(train = "none", predict = "none"), rhs)
}
),

Expand All @@ -355,7 +363,8 @@ PipeOp = R6Class("PipeOp",
.predict = function(input) stop("abstract"),
.param_set = NULL,
.param_set_source = NULL,
.id = NULL
.id = NULL,
.encapsulate = NULL
)
)

Expand Down Expand Up @@ -497,3 +506,117 @@ evaluate_multiplicities = function(self, unpacked, evalcall, instate) {

map(transpose_list(map(result, "output")), as.Multiplicity)
}

# Internal implementation of PipeOp$train().
#
# Handles NO_OP propagation and multiplicity unpacking, then runs the PipeOp's
# private$.train() through mlr3misc::encapsulate() so that output, warnings,
# and errors are captured into the state's log and the elapsed time is stored
# in state$train_time.
#
# @param pipeop (`PipeOp`) operator to train; its `$state` is modified by reference.
# @param input (`list`) input channels; typechecked against `pipeop$input$train`.
# @return named `list` of outputs, one entry per row of `pipeop$output`.
pipeop_train = function(pipeop, input) {
  # This wrapper calls the PipeOp's private .train() and additionally checks that
  # training returned something. Exceptions raised here are possibly encapsulated,
  # so that they get captured and turned into log messages.
  train_wrapper = function(pipeop, input) {
    output = get_private(pipeop)$.train(input)

    if (is.null(output)) {
      # NOTE(review): deparse(substitute(input)) here always yields the literal
      # "input" (this wrapper's formal), not the caller's expression -- confirm intended.
      stopf("PipeOp '%s' on input '%s' returned NULL during internal train()", pipeop$id, deparse(substitute(input)))
    }

    output
  }

  pipeop$state = NULL # reset to untrained state first
  # packages are loaded via the .pkgs argument of encapsulate() below instead
  #require_namespaces(pipeop$packages)

  # all inputs NO_OP: mark the state and short-circuit with NO_OP outputs
  if (every(input, is_noop)) {
    pipeop$state = NO_OP
    return(named_list(pipeop$output$name, NO_OP))
  }

  # multiplicity inputs are evaluated element-wise via evaluate_multiplicities()
  unpacked = unpack_multiplicities(input, multiplicity_type_nesting_level(pipeop$input$train), pipeop$input$name, pipeop$id)
  if (!is.null(unpacked)) {
    return(evaluate_multiplicities(pipeop, unpacked, "train", NULL))
  }

  input = check_types(pipeop, input, "input", "train")
  on.exit({pipeop$state = NULL}) # if any of the following fails, make sure to reset pipeop$state

  lg$debug("Calling train method of PipeOp '%s' on input '%s'",
    pipeop$id, deparse(substitute(input)), pipeop = pipeop$clone())

  # call train_wrapper with encapsulation
  # NOTE(review): with encapsulate = "callr" the .train() call runs in a separate
  # R session; any state .train() assigns to the (serialized) pipeop there is not
  # propagated back to this process -- TODO confirm how the trained state is recovered.
  result = encapsulate(pipeop$encapsulate["train"],
    .f = train_wrapper,
    .args = list(pipeop = pipeop, input = input),
    .pkgs = pipeop$packages,
    .seed = NA_integer_
  )

  # NOTE(review): if the encapsulated call errored, result$result is NULL and
  # check_types() runs before the log is stored below -- verify this does not
  # throw here and thereby lose the captured log.
  output = check_types(pipeop, result$result, "output", "train")
  on.exit() # don't reset state any more

  # NOTE(review): assumes pipeop$state is NULL or list-like at this point so
  # that $log / $train_time assignment works -- confirm for non-list states.
  pipeop$state$log = append_log(pipeop$state$log, "train", result$log$class, result$log$msg)
  pipeop$state$train_time = result$elapsed

  if (is.null(output)) {
    lg$debug("PipeOp '%s' on input '%s' failed to return a state",
      pipeop$id, deparse(substitute(input)), pipeop = pipeop$clone(), messages = result$log$msg)
  } else {
    lg$debug("PipeOp '%s' on input '%s' succeeded to return a state",
      pipeop$id, deparse(substitute(input)), pipeop = pipeop$clone(), messages = result$log$msg)
  }

  output
}

# Internal implementation of PipeOp$predict().
#
# Handles NO_OP propagation and multiplicity unpacking, then runs the PipeOp's
# private$.predict() through mlr3misc::encapsulate() so that output, warnings,
# and errors are captured into the state's log and the elapsed time is stored
# in state$predict_time.
#
# @param pipeop (`PipeOp`) trained operator; its `$state` is modified by reference.
# @param input (`list`) input channels; typechecked against `pipeop$input$predict`.
# @return named `list` of outputs, one entry per row of `pipeop$output`.
pipeop_predict = function(pipeop, input) {
  # This wrapper calls the PipeOp's private .predict(). Exceptions raised here are
  # possibly encapsulated, so that they get captured and turned into log messages.
  predict_wrapper = function(pipeop, input) {
    # NOTE: may actually be sensible to check this
    #if (is.null(pipeop$state)) {
    #  stopf("No trained state available for PipeOp '%s' on input '%s'", pipeop$id, deparse(substitute(input)))
    #}

    get_private(pipeop)$.predict(input)
  }

  # packages are loaded via the .pkgs argument of encapsulate() below instead;
  # they are needed in train *and* predict, because these might run in different R instances
  #require_namespaces(pipeop$packages)

  # all inputs NO_OP: short-circuit with NO_OP outputs
  if (every(input, is_noop)) {
    return(named_list(pipeop$output$name, NO_OP))
  }

  # state is NO_OP (train saw only NO_OP inputs) but predict inputs are not: inconsistent graph
  if (is_noop(pipeop$state)) {
    stopf("Pipeop %s got NO_OP during train but no NO_OP during predict.", pipeop$id)
  }

  # multiplicity inputs are evaluated element-wise via evaluate_multiplicities()
  unpacked = unpack_multiplicities(input, multiplicity_type_nesting_level(pipeop$input$predict), pipeop$input$name, pipeop$id)
  if (!is.null(unpacked)) {
    return(evaluate_multiplicities(pipeop, unpacked, "predict", pipeop$state))
  }

  input = check_types(pipeop, input, "input", "predict")

  # call predict_wrapper with encapsulation
  lg$debug("Calling predict method of PipeOp '%s' on input '%s'",
    pipeop$id, deparse(substitute(input)), pipeop = pipeop$clone())

  result = encapsulate(
    pipeop$encapsulate["predict"],
    .f = predict_wrapper,
    .args = list(pipeop = pipeop, input = input),
    .pkgs = pipeop$packages,
    .seed = NA_integer_
  )

  # NOTE(review): if the encapsulated call errored, result$result is NULL and
  # check_types() runs before the log is stored below -- verify this does not
  # throw here and thereby lose the captured log.
  output = check_types(pipeop, result$result, "output", "predict")

  # NOTE(review): assumes pipeop$state is list-like so that $log / $predict_time
  # assignment works -- confirm for states that are not plain lists.
  pipeop$state$log = append_log(pipeop$state$log, "predict", result$log$class, result$log$msg)
  pipeop$state$predict_time = result$elapsed

  output
}

# FIXME: these helpers are internal to mlr3; accessing them via `:::` triggers
# an R CMD check NOTE and silently breaks if mlr3 renames or removes them.
# Before merging, they should be exported from mlr3 (or mlr3misc) or copied here.
assert_ro_binding = mlr3:::assert_ro_binding
get_private = mlr3:::get_private
append_log = mlr3:::append_log
get_log_condition = mlr3:::get_log_condition

5 changes: 4 additions & 1 deletion R/PipeOpImputeLearner.R
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,10 @@ PipeOpImputeLearner = R6Class("PipeOpImputeLearner",
)
)

mlr_pipeops$add("imputelearner", PipeOpImputeLearner, list(R6Class("Learner", public = list(id = "learner", task_type = "classif", param_set = ParamSet$new()))$new()))
# Register PipeOpImputeLearner in the mlr_pipeops dictionary. The list() holds
# example constructor arguments: a minimal mock "Learner" providing only the
# fields the constructor touches. `encapsulate` is supplied as a read-only
# active binding because PipeOp$initialize() now reads
# private$.learner$encapsulate to propagate a wrapped learner's setting.
mlr_pipeops$add("imputelearner", PipeOpImputeLearner,
  list(R6Class("Learner",
    public = list(id = "learner", task_type = "classif", param_set = ParamSet$new()),
    active = list(encapsulate = function() c(train = "none", predict = "none")))$new()))

# See mlr-org/mlr#470
convert_to_task = function(id = "imputing", data, target, task_type, ...) {
Expand Down
5 changes: 4 additions & 1 deletion R/PipeOpLearner.R
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,10 @@ PipeOpLearner = R6Class("PipeOpLearner", inherit = PipeOp,
)
)

mlr_pipeops$add("learner", PipeOpLearner, list(R6Class("Learner", public = list(id = "learner", task_type = "classif", param_set = ParamSet$new()))$new()))
# Register PipeOpLearner in the mlr_pipeops dictionary. The list() holds
# example constructor arguments: a minimal mock "Learner" providing only the
# fields the constructor touches. `encapsulate` is supplied as a read-only
# active binding because PipeOp$initialize() now reads
# private$.learner$encapsulate to propagate a wrapped learner's setting.
mlr_pipeops$add("learner", PipeOpLearner,
  list(R6Class("Learner",
    public = list(id = "learner", task_type = "classif", param_set = ParamSet$new()),
    active = list(encapsulate = function() c(train = "none", predict = "none")))$new()))

#' @export
as_learner.PipeOp = function(x, clone = FALSE) {
Expand Down
5 changes: 4 additions & 1 deletion R/PipeOpLearnerCV.R
Original file line number Diff line number Diff line change
Expand Up @@ -204,4 +204,7 @@ PipeOpLearnerCV = R6Class("PipeOpLearnerCV",
)
)

mlr_pipeops$add("learner_cv", PipeOpLearnerCV, list(R6Class("Learner", public = list(id = "learner_cv", task_type = "classif", param_set = ParamSet$new()))$new()))
# Register PipeOpLearnerCV in the mlr_pipeops dictionary. The list() holds
# example constructor arguments: a minimal mock "Learner" whose `encapsulate`
# is a read-only active binding, since PipeOp$initialize() now reads
# private$.learner$encapsulate.
# Fix: the mock learner's id must stay "learner_cv" (this PR accidentally
# changed it to "learnercv"), consistent with the dictionary key, with the
# pre-PR code, and with the "learner" / "imputelearner" registrations above.
mlr_pipeops$add("learner_cv", PipeOpLearnerCV,
  list(R6Class("Learner",
    public = list(id = "learner_cv", task_type = "classif", param_set = ParamSet$new()),
    active = list(encapsulate = function() c(train = "none", predict = "none")))$new()))
Loading