Skip to content

Commit

Permalink
Fix: wait for other tasks, don't cancel
Browse files Browse the repository at this point in the history
  • Loading branch information
SoulTch committed Mar 2, 2024
1 parent a7dd3ee commit 31a444c
Showing 1 changed file with 9 additions and 13 deletions.
22 changes: 9 additions & 13 deletions asynq/asynq_to_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,14 +27,12 @@ def is_asyncio_mode():
return _asyncio_mode > 0


async def _gather(*awaitables):
"""Gather awaitables, but cancel all of them if one of them fails."""
try:
futures = asyncio.gather(*awaitables)
return await futures
except Exception:
futures.cancel()
raise
async def _gather(awaitables):
"""Gather awaitables, but wait all other awaitables to finish even if some of them fail."""

tasks = [asyncio.ensure_future(awaitable) for awaitable in awaitables]
done, _ = await asyncio.wait(tasks, return_when=asyncio.ALL_COMPLETED)
return [task.result() for task in done]


async def resolve_awaitables(x: Any):
Expand All @@ -48,13 +46,11 @@ async def resolve_awaitables(x: Any):
if isinstance(x, BatchItemBase):
raise RuntimeError("asynq BatchItem is not supported in asyncio mode")
if isinstance(x, list):
return await _gather(*[resolve_awaitables(item) for item in x])
return await _gather([resolve_awaitables(item) for item in x])
if isinstance(x, tuple):
return tuple(await _gather(*[resolve_awaitables(item) for item in x]))
return tuple(await _gather([resolve_awaitables(item) for item in x]))
if isinstance(x, dict):
resolutions = await _gather(
*[resolve_awaitables(value) for value in x.values()]
)
resolutions = await _gather([resolve_awaitables(value) for value in x.values()])
return {key: resolution for (key, resolution) in zip(x.keys(), resolutions)}
if x is None:
return None
Expand Down

0 comments on commit 31a444c

Please sign in to comment.