feat(): Task links #216
Conversation
I would prefer if the syntax of this PR were similar to #217. This might look as follows:

with Flow() as f:
    with Stage("sql_table_origin"):
        _ = make_external_table()
    with Stage("sql_table_linked"):
        table = duplicate_table_reference()
        _ = consume(table)
        external_link = table

with StageLockContext():
    # Linked execution. The body of duplicate_table_reference should not be executed;
    # instead, its output is referenced from the linked table.
    result = f.run(
        inputs={
            external_link: ExternalTableReference(
                "external_table",
                schema="external_schema",
            )
        }
    )
We could even support a feature for the use case of referencing the same table in another pipeline instance:

cfg = PipedagConfig.default.get(instance)
input_cfg = PipedagConfig.default.get("full")

with Flow() as f:
    with Stage("sql_table_origin"):
        _ = make_external_table()
    with Stage("sql_table_linked"):
        table = duplicate_table_reference()
        _ = consume(table)
        external_link = table

with StageLockContext():
    # Linked execution. The body of duplicate_table_reference should not be executed;
    # instead, its output is referenced from the linked table.
    result = f.run(
        inputs={
            external_link: input_cfg
        },
        config=cfg,  # still optional
    )

This syntax would even allow dematerializing dicts/lists/constants and multiple tables that appear in the output of duplicate_table_reference from the cache of the "full" instance.
Sorry for the lengthy review. I think it would be good to replace not a task itself but the communication links between tasks when defining inputs.
src/pydiverse/pipedag/core/flow.py
Outdated
:param inputs:
    Optionally provide the outputs for a subset of tasks.
    The format is expected as
    dict[task, ExternalTableReference].
dict[table reference, ExternalTableReference], with a table reference at wiring time technically being a Task | TaskGetItem.
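For illustration (a sketch, not code from this PR, assuming that indexing a multi-output task yields a TaskGetItem; make_table, make_tables, and consume are made-up tasks):

with Flow() as f:
    with Stage("stage"):
        table = make_table()             # key type: Task
        tables = make_tables()           # hypothetical task returning several tables
        _ = consume(table, tables["a"])  # tables["a"] would be a TaskGetItem

result = f.run(
    inputs={
        table: ExternalTableReference("tbl", schema="external_schema"),
        tables["a"]: ExternalTableReference("tbl_a", schema="external_schema"),
    }
)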
Tasks producing any mentioned table reference will not be executed; instead, the ExternalTableReference will be used wherever the mentioned table reference is consumed.
I doubt the current code would be able to deal with TaskGetItem references. I am still wondering whether we might find a way to actually rewire the tasks before they are executed.
src/pydiverse/pipedag/engine/dask.py
Outdated
if inputs:
    raise NotImplementedError(
        "The inputs argument is currently not supported for the Dask engine."
    )
I don't see a fundamental problem with implementing the ExternalTableReference replacement for other orchestration engines. It is fine to leave it as a NotImplementedError, but a comment noting that there is no fundamental issue with supporting it would be good. We could also create an issue right away.
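For illustration, the guard together with such a comment might read roughly like this (a sketch based on the snippet above, not the exact wording in the PR):

if inputs:
    # Note: there is no fundamental obstacle to supporting `inputs` with this
    # engine as well; it is simply not implemented yet.
    raise NotImplementedError(
        "The inputs argument is currently not supported for the Dask engine."
    )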
Agree. I added it for Prefect and Dask as well
@@ -34,6 +40,7 @@ def run(self, flow: Subflow, ignore_position_hashes: bool = False, **run_kwargs)
    run_context=run_context,
    config_context=config_context,
    ignore_position_hashes=ignore_position_hashes,
    override=inputs.get(task) if inputs else None,
Can't we replace the inputs in row 36 rather than overriding a task? I actually wonder where TaskGetItem is lazily resolved. It might be inside results[in_t].
We could even implement an algorithm to avoid executing tasks that only feed into references mentioned in inputs. Sketch of the idea (not tested!):
# Tasks whose referenced outputs are provided via `inputs` (unwrap TaskGetItem)
input_tasks = {t.task if isinstance(t, TaskGetItem) else t for t in inputs.keys()}
# Grow: start from those tasks and walk upstream through their inputs
skip_candidates = {task for task in flow.get_tasks() if task in input_tasks}
last_added = skip_candidates
while len(last_added) > 0:
    last_added = {task for added_task in last_added for task in added_task.input_tasks.values()}
    skip_candidates.update(last_added)
# Shrink: drop candidates that also feed into tasks outside the skip set
last_removed = skip_candidates
while len(last_removed) > 0:
    last_removed = skip_candidates & {
        task
        for active_task in flow.get_tasks()
        if active_task not in skip_candidates
        for task in active_task.input_tasks.values()
    }
    skip_candidates -= last_removed
I switched the logic to replace task inputs instead of task outputs
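Roughly speaking (a hypothetical illustration, not the actual PR code, assuming task.input_tasks maps argument names to upstream references), replacing task inputs means something like:

# Hypothetical sketch: before a task runs, any upstream reference that has an
# entry in `inputs` is swapped for its ExternalTableReference, so the producing
# task's output is never needed.
resolved_inputs = {
    name: inputs.get(ref, ref)
    for name, ref in task.input_tasks.items()
}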
Looks much better!
Idea for a new feature:
Add the option to pass task_links to flow.run(). task_links defines a mapping from tasks to (external) tables in the database. When the flow is executed, all tasks run as usual except for the tasks listed in the mapping. These tasks are not executed; instead, an ExternalTableReference is established for them. If the option is not used, flow.run() works as usual.
Usecase: Development on a task requires testing it on a large dataset. Development in the original database is discouraged as it is used by many users. Duplicating / rerunning the entire database takes too long / too many computational resources.
-> This feature allows using the tables in the original database without overhead, running only the tasks/stages relevant for the development at hand, and leaving the other users undisturbed.
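To make the use case concrete, a hypothetical run might look like this (table, schema, and variable names are made up; the syntax follows the examples above):

with StageLockContext():
    # Only the tasks under development run; the large input table is taken
    # directly from the original database instead of being recomputed.
    result = f.run(
        inputs={
            large_input_table: ExternalTableReference(
                "large_table",
                schema="original_schema",
            )
        }
    )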
Note: Most of the changes actually originate from moving container.py to another submodule, which was needed to solve a circular import (maybe there is a better way?).
@windiana42 / @nicolasmueller WDYT?
Checklist
docs/source/changelog.md entry
docs/source/
docs/source/examples.md