✨ MapperAsyncIterDataPipe for applying custom async functions #9

Merged: 4 commits merged on Aug 3, 2023

@weiji14 (Owner) commented Aug 1, 2023

An asynchronous iterable-style DataPipe for applying a custom asynchronous function over an asynchronous iterable! Uses asyncio.TaskGroup from Python 3.11+ to run several tasks concurrently.

Preview at https://bambooflow--9.org.readthedocs.build/en/9/api.html#bambooflow.datapipes.Mapper

Rationale:

  • Parallelism/multiprocessing is suited for CPU-bound tasks
  • Concurrency is suited for tasks that spend a lot of time waiting, e.g. IO-bound tasks
  • Asyncio is cooperative multitasking (tasks explicitly yield control at each await), a better form of concurrency than threading
  • Asynchronous dataloading is one take on the future of dataloading; see "Future of torchdata and dataloading" (pytorch/data#1196)

TODO:

  • Initial implementation
  • Add unit test
  • Handle task exceptions using except*

Included a doctest, added a new section in the API docs under 'Mapping DataPipes', and set show_toc_level to 3 so that the new section shows in the right sidebar.
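For reference, `show_toc_level` is an `html_theme_options` key in pydata-sphinx-theme, which sphinx-book-theme builds on. It controls how many heading levels the right-hand "On this page" sidebar expands by default. The fragment below assumes such a theme. The project's actual `conf.py` is not shown here and may set other options too.

```python
# Sphinx conf.py fragment; assumes a pydata-sphinx-theme-based HTML theme.
html_theme_options = {
    # Expand headings down to level 3 in the right-hand in-page TOC,
    # so that deeper API subsections are listed there by default.
    "show_toc_level": 3,
}
```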
@weiji14 weiji14 added the feature New feature or request label Aug 1, 2023
@weiji14 weiji14 added this to the 0.1.0 milestone Aug 1, 2023
@weiji14 weiji14 self-assigned this Aug 1, 2023
Ensure that tasks are processed concurrently. Included a timer to double-check that all 3 tasks (each made up of 2 sub-tasks) complete in 0.5 seconds instead of 1.5 seconds!
To better handle errors from tasks in a TaskGroup, wrap the TaskGroup context manager in a try-except* clause following PEP 654 (Exception Groups and except*). Based on the nice examples from https://github.com/jrfk/talk/tree/main/EuroPython2023. Added a unit test to ensure that a ValueError raised in 1 out of 3 tasks is captured and surfaced.
Mention PEP 654 so that people know what an ExceptionGroup is and how it can be handled.
@weiji14 weiji14 marked this pull request as ready for review August 3, 2023 06:23
@weiji14 weiji14 merged commit e17af34 into main Aug 3, 2023
4 checks passed
@weiji14 weiji14 deleted the asynciterdatapipe/mapper branch August 3, 2023 06:25