Skip to content

Commit

Permalink
Add more comments
Browse files Browse the repository at this point in the history
  • Loading branch information
nfcampos committed Sep 17, 2024
1 parent bd2ecba commit f59435a
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 10 deletions.
2 changes: 1 addition & 1 deletion libs/langgraph/langgraph/pregel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1417,7 +1417,7 @@ def output() -> Iterator:
loop.tasks.values(),
timeout=self.step_timeout,
retry_policy=self.retry_policy,
extra=lambda: aioloop.create_task(stream.wait()),
get_waiter=lambda: aioloop.create_task(stream.wait()),
):
# emit output
for o in output():
Expand Down
16 changes: 8 additions & 8 deletions libs/langgraph/langgraph/pregel/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -100,15 +100,15 @@ async def atick(
reraise: bool = True,
timeout: Optional[float] = None,
retry_policy: Optional[RetryPolicy] = None,
extra: Optional[Callable[[], asyncio.Future[None]]] = None,
get_waiter: Optional[Callable[[], asyncio.Future[None]]] = None,
) -> AsyncIterator[None]:
loop = asyncio.get_event_loop()
# give control back to the caller
yield
# add extra task if requested
if extra is not None:
# add waiter task if requested
if get_waiter is not None:
futures: dict[asyncio.Future, Optional[PregelExecutableTask]] = {
extra(): None
get_waiter(): None
}
else:
futures = {}
Expand All @@ -130,7 +130,7 @@ async def atick(
] = task
all_futures = futures.copy()
end_time = timeout + loop.time() if timeout else None
while len(futures) > (1 if extra is not None else 0):
while len(futures) > (1 if get_waiter is not None else 0):
done, _ = await asyncio.wait(
futures,
return_when=asyncio.FIRST_COMPLETED,
Expand All @@ -141,8 +141,8 @@ async def atick(
for fut in done:
task = futures.pop(fut)
if task is None:
# extra task finished, schedule another
futures[extra()] = None
# waiter task finished, schedule another
futures[get_waiter()] = None
continue
if exc := _exception(fut):
if isinstance(exc, GraphInterrupt):
Expand All @@ -168,7 +168,7 @@ async def atick(
break
# give control back to the caller
yield
# cancel extra task
# cancel waiter task
for fut in futures:
fut.cancel()
# panic on failure or timeout
Expand Down
6 changes: 5 additions & 1 deletion libs/langgraph/langgraph/utils/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,11 @@

class Queue(asyncio.Queue):
async def wait(self):
"""If queue is empty, wait until an item is available."""
"""If queue is empty, wait until an item is available.
Copied from Queue.get(), removing the call to .get_nowait(),
ie. this doesn't consume the item, just waits for it.
"""
while self.empty():
if PY_310:
getter = self._get_loop().create_future()
Expand Down

0 comments on commit f59435a

Please sign in to comment.