Batched processing of dataframe based task: reduce memory consumption and parallelize #205

Open · windiana42 opened this issue Jul 4, 2024 · 3 comments
Labels: enhancement (New feature or request), speed, usability

@windiana42 (Member):

In the case of dataframe-based task code, it is sometimes possible to batch the computation based on some grouping columns. This can be used to reduce the memory footprint (the whole input and output dataset no longer needs to reside in memory at once), and it can be used for parallelization (e.g. Snowflake bulk upload speed increases greatly with >100 COPY INTO requests for small file snippets in flight).

To first order, the solution is simple. Instead of:

  1. dematerialize inputs
  2. call task function
  3. materialize outputs

One would like to do:

  for each chunk:
    1. dematerialize chunk inputs
    2. call task function
    3. materialize chunk output
  finalize output table based on chunks
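
As a standalone illustration (outside of pipedag, with made-up file names and a trivial task body), such a chunked loop could look roughly like this in polars, keeping only one chunk in memory at a time:

import polars as pl


def task_body(df: pl.DataFrame) -> pl.DataFrame:
    # placeholder for the real task logic, which sees only one chunk at a time
    return df


lf = pl.scan_parquet("input.parquet")  # assumed input location
group_keys = lf.select(pl.col("id").unique()).collect().to_series()

chunk_files = []
for i, key in enumerate(group_keys):
    chunk = lf.filter(pl.col("id") == key).collect()  # 1. dematerialize chunk inputs
    out = task_body(chunk)                            # 2. call task function
    path = f"output_chunk_{i}.parquet"
    out.write_parquet(path)                           # 3. materialize chunk output
    chunk_files.append(path)

# finalize output table based on chunks
final = pl.concat([pl.scan_parquet(p) for p in chunk_files])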

The main problem is that there are a million ways to configure chunking. It is also quite interwoven with other features because user input (grouping columns, number of batches, temporary storage directory, ...) needs to reach (de-)materialization hooks.

One idea would be to establish the concept of an invocation hook that can be special-case hacked in user space and that integrates into pipedag's task calling mechanics:

@materialize(
    version=AUTO_VERSION,
    input_type=pl.LazyFrame,
    invocation_hook=Batched(n=50, columns=["id"]),
)
def task(tbl1: pl.LazyFrame, tbl2: pl.LazyFrame):
    ...
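
Neither Batched nor the invocation_hook argument exist today; purely as a sketch of what such a configuration object could carry, chunk membership might be derived from the grouping columns like this:

from dataclasses import dataclass

import polars as pl


@dataclass(frozen=True)
class Batched:
    # hypothetical invocation hook configuration: split work into n chunks,
    # keeping all rows that share the given grouping columns in the same chunk
    n: int
    columns: list[str]

    def with_chunk_assignment(self, lf: pl.LazyFrame) -> pl.LazyFrame:
        key = pl.concat_str([pl.col(c).cast(pl.Utf8) for c in self.columns])
        return lf.with_columns((key.hash() % self.n).alias("__chunk__"))

A pipedag-integrated version would additionally need to translate such a chunk assignment into per-chunk predicates for SQL dematerialization, which is where the interaction with the (de-)materialization hooks listed below comes in.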

This feature needs to be aligned with:

  • imperative materialization
  • (de-)materialization hooks
  • local table cache

Future features:

  • delta loading / processing
  • an inline feature which delivers the input query to dataframe task dematerialization

@windiana42 (Member Author) commented:

For parallelization, explicit state handling for ContextVar-based contexts might be required, especially when using multi-threading.
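
A minimal sketch of such explicit state handling, assuming worker threads: snapshot the current contextvars.Context in the submitting thread and run each chunk inside that copy, since ThreadPoolExecutor does not propagate ContextVars on its own:

import contextvars
from concurrent.futures import Future, ThreadPoolExecutor


def submit_with_context(pool: ThreadPoolExecutor, fn, *args, **kwargs) -> Future:
    # copy_context() must run in the submitting thread so the snapshot contains
    # the ContextVar-based config/run contexts that are set there
    ctx = contextvars.copy_context()
    return pool.submit(ctx.run, fn, *args, **kwargs)


def map_chunks(process_chunk, chunks, max_workers: int = 8) -> list:
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = [submit_with_context(pool, process_chunk, chunk) for chunk in chunks]
        return [f.result() for f in futures]

With the spawn start method, ContextVar state does not transfer to worker processes at all, so for multi-processing the relevant context would need to be serialized and re-established explicitly in each worker.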

Snowflake is quite slow when internalizing a huge table via COPY INTO from S3. Thus it would be nice to support n-way multi-processing for the dematerialize -> task function call -> write parquet -> upload to S3 chain, and m-way multi-threading for keeping >100 COPY INTO chunks in flight.
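
A rough sketch of that fan-out, where produce_chunk and copy_into_snowflake are hypothetical placeholders (not pipedag or Snowflake connector APIs): n processes run the dematerialize/task/parquet/upload chain, while a larger thread pool keeps the COPY INTO statements in flight:

from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed


def produce_chunk(chunk_id: int) -> str:
    # hypothetical: dematerialize chunk inputs, call the task function,
    # write a parquet file, and upload it to S3; returns the uploaded path
    return f"s3://bucket/stage/chunk_{chunk_id}.parquet"


def copy_into_snowflake(s3_path: str) -> None:
    # hypothetical: issue a single COPY INTO statement for one chunk file
    pass


def run(n_chunks: int, n_procs: int = 8, n_threads: int = 128) -> None:
    with ProcessPoolExecutor(max_workers=n_procs) as procs:
        with ThreadPoolExecutor(max_workers=n_threads) as threads:
            uploads = [procs.submit(produce_chunk, i) for i in range(n_chunks)]
            copies = [
                threads.submit(copy_into_snowflake, fut.result())
                for fut in as_completed(uploads)
            ]
            for c in copies:
                c.result()

When the spawn start method is used, run() would be called under an if __name__ == "__main__" guard.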

@windiana42 (Member Author) commented:

This is the code region which requires refactoring for this feature:
https://github.com/pydiverse/pydiverse.pipedag/blob/main/src/pydiverse/pipedag/materialize/core.py#L742

            def imperative_materialize(
                table: Table,
                config_context: ConfigContext | None,
                return_as_type: type | None = None,
                return_nothing: bool = False,
            ):
                my_store = config_context.store if config_context is not None else store
                state = task_cache_info.imperative_materialization_state
                if id(table) in state.table_ids:
                    raise RuntimeError(
                        "The table has already been imperatively materialized."
                    )
                table.assumed_dependencies = (
                    list(state.assumed_dependencies)
                    if len(state.assumed_dependencies) > 0
                    else []
                )
                _ = my_store.materialize_task(
                    task, task_cache_info, table, disable_task_finalization=True
                )
                if not return_nothing:

                    def get_return_obj(return_as_type):
                        if return_as_type is None:
                            return_as_type = task.input_type
                            if (
                                return_as_type is None
                                or not my_store.table_store.get_r_table_hook(
                                    return_as_type
                                ).retrieve_as_reference(return_as_type)
                            ):
                                # dematerialize as sa.Table if it would transfer all
                                # rows to python when dematerializing with input_type
                                return_as_type = sa.Table
                        obj = my_store.dematerialize_item(
                            table, return_as_type, run_context
                        )
                        state.add_table_lookup(obj, table)
                        return obj

                    if isinstance(return_as_type, Iterable):
                        return tuple(get_return_obj(t) for t in return_as_type)
                    else:
                        return get_return_obj(return_as_type)

            task_context.imperative_materialize_callback = imperative_materialize
            result = self.fn(*args, **kwargs)
            task_context.imperative_materialize_callback = None
            if task.debug_tainted:
                raise RuntimeError(
                    f"The task {task.name} has been tainted by interactive debugging."
                    f" Aborting."
                )

            def result_finalization_mutator(x):
                state = task_cache_info.imperative_materialization_state
                object_lookup = state.object_lookup
                if id(x) in object_lookup:
                    # substitute imperatively materialized object references with
                    # their respective table objects
                    x = object_lookup[id(x)]
                if isinstance(x, (Table, RawSql)):
                    # fill assumed_dependencies for Tables that were not yet
                    # materialized
                    if len(state.assumed_dependencies) > 0:
                        if x.assumed_dependencies is None:
                            x.assumed_dependencies = list(state.assumed_dependencies)
                return x

            result = deep_map(result, result_finalization_mutator)
            result = store.materialize_task(task, task_cache_info, result)

It would be nice to abstract this a bit more so that custom invocation hooks only need to deal with:

  1. chunking preparation
  2. looping over chunks (some support should be given to simplify multi-processing / multi-threading)
  3. feeding information to (de-)materialization hooks (query manipulation and temporary chunk storage management)
  4. finalizing materialization
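
As a sketch only (no such interface exists in pipedag), the four points above could map onto an abstract hook interface roughly like this:

from abc import ABC, abstractmethod
from typing import Any, Callable, Iterable


class InvocationHook(ABC):
    @abstractmethod
    def prepare_chunks(self, inputs: dict[str, Any]) -> Iterable[Any]:
        # 1. chunking preparation: derive chunk descriptors from the task inputs
        ...

    @abstractmethod
    def run_chunk(self, chunk: Any, call_task: Callable[..., Any]) -> Any:
        # 2. loop body: dematerialize the chunk inputs, call the task function,
        #    and materialize the chunk output; pipedag could run this in a
        #    thread/process pool to provide the multi-processing/threading support
        ...

    def dematerialization_args(self, chunk: Any) -> dict[str, Any]:
        # 3. information for the (de-)materialization hooks, e.g. a predicate
        #    restricting the input query to the chunk, or a temporary chunk directory
        return {}

    @abstractmethod
    def finalize(self, chunk_results: list[Any]) -> Any:
        # 4. finalizing materialization: combine chunk outputs into the final table
        ...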

@windiana42 (Member Author) commented:

In the case of Snowflake, fast bulk loading requires writing temporary parquet files. This overlaps with the activity of the local table cache. It could make sense to allow an invocation hook to bind tightly to the local table cache such that:

  1. parquet files end up in the directory configured for the local table cache
  2. the local table cache stores each chunk individually (changes in the chunking configuration automatically invalidate the cache; unchunked access to the cached chunks can still be offered via polars scan functionality)

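Assuming the cache then holds one parquet file per chunk in a per-table directory (the paths below are made up for illustration), unchunked access is just a lazy scan over all chunk files:

import polars as pl

cache_dir = "/tmp/pipedag_cache/my_instance/my_schema/my_table"  # assumed cache layout
lf = pl.scan_parquet(f"{cache_dir}/chunk_*.parquet")  # glob over all chunk files
df = lf.collect()  # or stay lazy so filters are pushed down to the parquet reader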