Adding block & panic kwargs
Yomguithereal committed Aug 1, 2023
1 parent f1548e3 commit f026fbe
Showing 4 changed files with 164 additions and 56 deletions.
92 changes: 48 additions & 44 deletions README.md
@@ -18,19 +18,19 @@ pip install quenouille

## Usage

* [imap, imap_unordered](#imap_unordered-imap)
* [ThreadPoolExecutor](#threadpoolexecutor)
* [#.imap, #.imap_unordered](#executor-imap)
* [#.shutdown](#shutdown)
* [NamedLocks](#namedlocks)
* [Miscellaneous notes](#miscellaneous-notes)
* [The None group](#the-none-group)
* [Parallelism > workers](#parallelism--workers)
* [Callable parallelism guarantees](#callable-parallelism-guarantees)
* [Parallelism vs. throttling](#parallelism-vs-throttling)
* [Adding entropy to throttle](#adding-entropy-to-throttle)
* [Caveats regarding exception raising](#caveats-regarding-exception-raising)
* [Caveats of using imap with queues](#caveats-of-using-imap-with-queues)
- [imap, imap_unordered](#imap_unordered-imap)
- [ThreadPoolExecutor](#threadpoolexecutor)
- [#.imap, #.imap_unordered](#executor-imap)
- [#.shutdown](#shutdown)
- [NamedLocks](#namedlocks)
- [Miscellaneous notes](#miscellaneous-notes)
- [The None group](#the-none-group)
- [Parallelism > workers](#parallelism--workers)
- [Callable parallelism guarantees](#callable-parallelism-guarantees)
- [Parallelism vs. throttling](#parallelism-vs-throttling)
- [Adding entropy to throttle](#adding-entropy-to-throttle)
- [Caveats regarding exception raising](#caveats-regarding-exception-raising)
- [Caveats of using imap with queues](#caveats-of-using-imap-with-queues)

### imap, imap_unordered

@@ -95,21 +95,23 @@ for html in imap(urls(), fetch, 10, key=get_domain_name, throttle=5, buffer_size
print(html)
```

*Arguments*
_Arguments_

* **iterable** *(iterable)*: Any python iterable.
* **func** *(callable)*: Function used to perform the desired tasks. The function takes an item yielded by the given iterable as its single argument. Since this function will be dispatched to worker threads, you should ensure it is thread-safe.
* **threads** *(int, optional)*: Maximum number of threads to use. Defaults to `min(32, os.cpu_count() + 1)`. Note that it can be `0`, in which case no threads will be used and everything will run synchronously (this can be useful for debugging or to avoid duplicating code sometimes).
* **key** *(callable, optional)*: Function returning to which "group" a given item is supposed to belong. This will be used to ensure maximum parallelism is respected.
* **parallelism** *(int or callable, optional)* [`1`]: Number of threads allowed to work on a same group at once. Can also be a function taking a group and returning its parallelism.
* **buffer_size** *(int, optional)* [`1024`]: Maximum number of items the function will buffer into memory while attempting to find an item that can be passed to a worker immediately, all while respecting throttling and group parallelism.
* **throttle** *(int or float or callable, optional)*: Optional throttle time, in seconds, to wait before processing the next item of a given group. Can also be a function taking last group, item and result and returning next throttle time for this group.
* **initializer** *(callable, optional)*: Function to run at the start of each thread worker. Can be useful to setup [thread-local data](https://docs.python.org/3/library/threading.html#thread-local-data), for instance. Remember this function must be threadsafe and should not block because the thread pool will wait for each thread to be correctly booted before being able to proceed. If one of the function calls fails, the thread pool will raise a `quenouille.exceptions.BrokenThreadPool` error and terminate immediately.
* **initargs** *(iterable, optional)*: Arguments to pass to the `initializer` function.
* **wait** *(bool, optional)* [`True`]: Whether to join worker threads, i.e. wait for them to end, when shutting down the executor. Set this to `False` if you need to go on quickly without waiting for your worker threads to end when cleaning up the executor's resources. Just note that if you spawn other thread-intensive tasks or other executors afterwards in rapid succession, you might start too many threads at once.
* **daemonic** *(bool, optional)* [`False`]: Whether to spawn daemonic workers. If your workers are daemonic, the interpreter will not wait for them to end when exiting. This can be useful, combined with `wait=False`, for instance, if you want your program to exit as soon as you hit ctrl+C (you might want to avoid this if your threads need to clean things up on exit, as they will be abruptly shut down).
- **iterable** _(iterable)_: Any python iterable.
- **func** _(callable)_: Function used to perform the desired tasks. The function takes an item yielded by the given iterable as its single argument. Since this function will be dispatched to worker threads, you should ensure it is thread-safe.
- **threads** _(int, optional)_: Maximum number of threads to use. Defaults to `min(32, os.cpu_count() + 1)`. Note that it can be `0`, in which case no threads will be used and everything will run synchronously (this can be useful for debugging or to avoid duplicating code sometimes).
- **key** _(callable, optional)_: Function returning to which "group" a given item is supposed to belong. This will be used to ensure maximum parallelism is respected.
- **parallelism** _(int or callable, optional)_ [`1`]: Number of threads allowed to work on a same group at once. Can also be a function taking a group and returning its parallelism.
- **buffer_size** _(int, optional)_ [`1024`]: Maximum number of items the function will buffer into memory while attempting to find an item that can be passed to a worker immediately, all while respecting throttling and group parallelism.
- **throttle** _(int or float or callable, optional)_: Optional throttle time, in seconds, to wait before processing the next item of a given group. Can also be a function taking last group, item and result and returning next throttle time for this group.
- **block** _(bool, optional)_ [`False`]: Whether to block when using the `get` method of the given queue.
- **panic** _(callable, optional)_: Function that will be called when the process has errored. Useful to unblock some functions and avoid deadlock in specific situations. Note that this function will not be called when synchronous (i.e. when not using additional threads).
- **initializer** _(callable, optional)_: Function to run at the start of each thread worker. Can be useful to setup [thread-local data](https://docs.python.org/3/library/threading.html#thread-local-data), for instance. Remember this function must be threadsafe and should not block because the thread pool will wait for each thread to be correctly booted before being able to proceed. If one of the function calls fails, the thread pool will raise a `quenouille.exceptions.BrokenThreadPool` error and terminate immediately.
- **initargs** _(iterable, optional)_: Arguments to pass to the `initializer` function.
- **wait** _(bool, optional)_ [`True`]: Whether to join worker threads, i.e. wait for them to end, when shutting down the executor. Set this to `False` if you need to go on quickly without waiting for your worker threads to end when cleaning up the executor's resources. Just note that if you spawn other thread-intensive tasks or other executors afterwards in rapid succession, you might start too many threads at once.
- **daemonic** _(bool, optional)_ [`False`]: Whether to spawn daemonic workers. If your workers are daemonic, the interpreter will not wait for them to end when exiting. This can be useful, combined with `wait=False`, for instance, if you want your program to exit as soon as you hit ctrl+C (you might want to avoid this if your threads need to clean things up on exit, as they will be abruptly shut down).
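
As a rough illustration of the callable forms accepted by `key`, `parallelism` and `throttle`, here is a minimal sketch. The helper names, the `SLOW_DOMAINS` set and the numeric values are all hypothetical; only the argument shapes (a group for `parallelism`; the last group, item and result for `throttle`) come from the descriptions above.

```python
from urllib.parse import urlsplit

# Hypothetical set of domains we want to be gentle with (made up for this sketch).
SLOW_DOMAINS = {"slow.example.org"}


def get_domain_name(url):
    """Return the group of an item: here, the host part of a URL."""
    return urlsplit(url).hostname


def domain_parallelism(group):
    """Take a group and return how many threads may work on it at once."""
    return 1 if group in SLOW_DOMAINS else 2


def domain_throttle(group, item, result):
    """Take the last group, item and result; return the next wait, in seconds."""
    return 5.0 if group in SLOW_DOMAINS else 0.5
```

Such helpers could then presumably be passed as `key=get_domain_name`, `parallelism=domain_parallelism` and `throttle=domain_throttle`.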

*Using a queue rather than an iterable*
_Using a queue rather than an iterable_

If you need to add new items to process as a result of performing tasks (when designing a web crawler for instance, where each downloaded page yields new pages to explore further down), know that the `imap` and `imap_unordered` functions also accept queues as input:

@@ -171,32 +173,34 @@ executor.shutdown(wait=False)

Note that your throttling state is kept between multiple `imap` and `imap_unordered` calls so you don't end up performing some tasks too soon. But keep in mind that, to remain consistent, this state is tied to the `key` function you provide, so if you change the `key` used, the throttling state will be reset.

*Arguments*
_Arguments_

* **max_workers** *(int, optional)*: Maximum number of threads to use. Defaults to `min(32, os.cpu_count() + 1)`. Note that it can be `0`, in which case no threads will be used and everything will run synchronously (this can be useful for debugging or to avoid duplicating code sometimes).
* **initializer** *(callable, optional)*: Function to run at the start of each thread worker. Can be useful to setup [thread-local data](https://docs.python.org/3/library/threading.html#thread-local-data), for instance. Remember this function must be threadsafe and should not block because the thread pool will wait for each thread to be correctly booted before being able to proceed. If one of the function calls fails, the thread pool will raise a `quenouille.exceptions.BrokenThreadPool` error and terminate immediately.
* **initargs** *(iterable, optional)*: Arguments to pass to the `initializer` function.
* **wait** *(bool, optional)* [`True`]: Whether to join worker threads, i.e. wait for them to end, when closing the executor. Set this to `False` if you need to go on quickly without waiting for your worker threads to end when cleaning up the executor's resources. Just note that if you spawn other thread-intensive tasks or other executors afterwards in rapid succession, you might start too many threads at once.
* **daemonic** *(bool, optional)* [`False`]: Whether to spawn daemonic workers. If your workers are daemonic, the interpreter will not wait for them to end when exiting. This can be useful, combined with `wait=False`, for instance, if you want your program to exit as soon as you hit ctrl+C (you might want to avoid this if your threads need to clean things up on exit, as they will be abruptly shut down).
- **max_workers** _(int, optional)_: Maximum number of threads to use. Defaults to `min(32, os.cpu_count() + 1)`. Note that it can be `0`, in which case no threads will be used and everything will run synchronously (this can be useful for debugging or to avoid duplicating code sometimes).
- **initializer** _(callable, optional)_: Function to run at the start of each thread worker. Can be useful to setup [thread-local data](https://docs.python.org/3/library/threading.html#thread-local-data), for instance. Remember this function must be threadsafe and should not block because the thread pool will wait for each thread to be correctly booted before being able to proceed. If one of the function calls fails, the thread pool will raise a `quenouille.exceptions.BrokenThreadPool` error and terminate immediately.
- **initargs** _(iterable, optional)_: Arguments to pass to the `initializer` function.
- **wait** _(bool, optional)_ [`True`]: Whether to join worker threads, i.e. wait for them to end, when closing the executor. Set this to `False` if you need to go on quickly without waiting for your worker threads to end when cleaning up the executor's resources. Just note that if you spawn other thread-intensive tasks or other executors afterwards in rapid succession, you might start too many threads at once.
- **daemonic** _(bool, optional)_ [`False`]: Whether to spawn daemonic workers. If your workers are daemonic, the interpreter will not wait for them to end when exiting. This can be useful, combined with `wait=False`, for instance, if you want your program to exit as soon as you hit ctrl+C (you might want to avoid this if your threads need to clean things up on exit, as they will be abruptly shut down).

<h4 id="executor-imap">#.imap, #.imap_unordered</h4>

Basically the same as described [here](#imap_unordered-imap) with the following arguments:

* **iterable** *(iterable)*: Any python iterable.
* **func** *(callable)*: Function used to perform the desired tasks. The function takes an item yielded by the given iterable as its single argument. Since this function will be dispatched to worker threads, you should ensure it is thread-safe.
* **key** *(callable, optional)*: Function returning to which "group" a given item is supposed to belong. This will be used to ensure maximum parallelism is respected.
* **parallelism** *(int or callable, optional)* [`1`]: Number of threads allowed to work on a same group at once. Can also be a function taking a group and returning its parallelism.
* **buffer_size** *(int, optional)* [`1024`]: Maximum number of items the function will buffer into memory while attempting to find an item that can be passed to a worker immediately, all while respecting throttling and group parallelism.
* **throttle** *(int or float or callable, optional)*: Optional throttle time, in seconds, to wait before processing the next item of a given group. Can also be a function taking last group, item and result and returning next throttle time for this group.
- **iterable** _(iterable)_: Any python iterable.
- **func** _(callable)_: Function used to perform the desired tasks. The function takes an item yielded by the given iterable as its single argument. Since this function will be dispatched to worker threads, you should ensure it is thread-safe.
- **key** _(callable, optional)_: Function returning to which "group" a given item is supposed to belong. This will be used to ensure maximum parallelism is respected.
- **parallelism** _(int or callable, optional)_ [`1`]: Number of threads allowed to work on a same group at once. Can also be a function taking a group and returning its parallelism.
- **buffer_size** _(int, optional)_ [`1024`]: Maximum number of items the function will buffer into memory while attempting to find an item that can be passed to a worker immediately, all while respecting throttling and group parallelism.
- **throttle** _(int or float or callable, optional)_: Optional throttle time, in seconds, to wait before processing the next item of a given group. Can also be a function taking last group, item and result and returning next throttle time for this group.
- **block** _(bool, optional)_ [`False`]: Whether to block when using the `get` method of the given queue.
- **panic** _(callable, optional)_: Function that will be called when the process has errored. Useful to unblock some functions and avoid deadlock in specific situations. Note that this function will not be called when synchronous (i.e. when not using additional threads).
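
To give an idea of what a `panic` callback might look like, here is a minimal sketch using only the standard library. The names `stop`, `jobs`, `panic` and `feed` are hypothetical, and the way the callback gets invoked is an assumption based solely on the description above: the callback sets an event, and a helper that would otherwise block on a full queue checks that event and gives up.

```python
import queue
import threading

# Hypothetical shared state for this sketch.
stop = threading.Event()
jobs = queue.Queue(maxsize=1)


def panic():
    """Hypothetical panic callback: tell blocked helpers to give up."""
    stop.set()


def feed(item, poll=0.05):
    """Put an item on the bounded queue, giving up once `stop` is set.

    Returns True if the item was queued, False if we stopped first.
    """
    while not stop.is_set():
        try:
            # Use a short timeout instead of blocking forever on a full queue.
            jobs.put(item, timeout=poll)
            return True
        except queue.Full:
            continue
    return False
```

One might then pass something like `block=True, panic=panic` when iterating over `jobs`, and call `feed` wherever new items are produced.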

#### #.shutdown

Method used to explicitly shutdown the executor.

*Arguments*
_Arguments_

* **wait** *(bool, optional)* [`True`]: Whether to join worker threads, i.e. wait for them to end, when shutting down the executor. Set this to `False` if you need to go on quickly without waiting for your worker threads to end when cleaning up the executor's resources. Just note that if you spawn other thread-intensive tasks or other executors afterwards in rapid succession, you might start too many threads at once.
- **wait** _(bool, optional)_ [`True`]: Whether to join worker threads, i.e. wait for them to end, when shutting down the executor. Set this to `False` if you need to go on quickly without waiting for your worker threads to end when cleaning up the executor's resources. Just note that if you spawn other thread-intensive tasks or other executors afterwards in rapid succession, you might start too many threads at once.
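
For comparison, the standard library's `concurrent.futures.ThreadPoolExecutor` (not `quenouille`'s executor) exposes a similar `wait` flag on `shutdown`. The sketch below only illustrates the general idea of not joining worker threads; it assumes nothing about `quenouille`'s internals, and `slow_task` is a made-up function.

```python
import time
from concurrent.futures import ThreadPoolExecutor  # stdlib executor, not quenouille's


def slow_task():
    """Made-up task that takes a little while to finish."""
    time.sleep(0.2)
    return "done"


executor = ThreadPoolExecutor(max_workers=1)
future = executor.submit(slow_task)

# wait=False returns right away instead of joining the worker thread.
executor.shutdown(wait=False)

# The already submitted task still completes; result() blocks until it does.
outcome = future.result()
```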

### NamedLocks

@@ -253,7 +257,7 @@ def throttle(group, item, result):

#### Caveats regarding exception raising

*Deferred generator usage exception deadlocks*
_Deferred generator usage exception deadlocks_

If you consume a generator returned by `imap/imap_unordered` somewhere other than where you created it, you may end up in a deadlock if you raise an exception.

@@ -283,7 +287,7 @@ for item in it:

#### Caveats of using imap with queues

*Typical deadlocks*
_Typical deadlocks_

Even if `imap` can process an input queue, you should avoid situations where adding to the queue might block execution, or you risk ending up in a deadlock. It is easy to shoot yourself in the foot if your queue has a `maxsize`, for instance:

@@ -306,12 +310,12 @@ for i in imap(job_queue, worker):
print(i)
```
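
The example above is only partly visible in this hunk, so here is a separate, standard-library-only sketch of the underlying failure mode: a `put` on a full bounded queue blocks until someone consumes an item. The sizes and values are made up; a timeout is used so the sketch terminates instead of hanging.

```python
import queue

job_queue = queue.Queue(maxsize=2)
job_queue.put(1)
job_queue.put(2)

# A plain job_queue.put(3) would block here for as long as nothing consumes
# the queue. With a timeout, the problem surfaces as queue.Full instead.
try:
    job_queue.put(3, timeout=0.1)
    queued = True
except queue.Full:
    queued = False
```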

*Design choices*
_Design choices_

So that you can add items to the queue in the loop body, and so that it can safely detect when your queue is drained without race conditions, `quenouille` acknowledges that a task is finished only once the code you execute in the loop body is done.

This means that it can sometimes be more performant to add items to the queue only from the worker functions rather than from the loop body.

*queue.task_done*
_queue.task_done_

For now, `quenouille` does not call `queue.task_done` for you, so this remains your responsibility if you want to be able to call `queue.join` down the line.
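
As a reminder of the bookkeeping involved, here is a minimal sketch with the standard library's `queue.Queue` alone. The items and the doubling step are made up; where exactly the `task_done` call belongs when consuming through `imap` is left to you.

```python
import queue

job_queue = queue.Queue()
for n in (1, 2, 3):
    job_queue.put(n)

results = []
while True:
    try:
        item = job_queue.get_nowait()
    except queue.Empty:
        break
    results.append(item * 2)
    # One task_done() call per retrieved item, otherwise join() never returns.
    job_queue.task_done()

# Returns immediately because every retrieved item was acknowledged.
job_queue.join()
```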