From f59435a892e96fb9092e0055a6df2e1dfc5111a9 Mon Sep 17 00:00:00 2001 From: Nuno Campos Date: Tue, 17 Sep 2024 09:31:48 -0700 Subject: [PATCH] Add more comments --- libs/langgraph/langgraph/pregel/__init__.py | 2 +- libs/langgraph/langgraph/pregel/runner.py | 16 ++++++++-------- libs/langgraph/langgraph/utils/aio.py | 6 +++++- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/libs/langgraph/langgraph/pregel/__init__.py b/libs/langgraph/langgraph/pregel/__init__.py index 338842cde..dc803c735 100644 --- a/libs/langgraph/langgraph/pregel/__init__.py +++ b/libs/langgraph/langgraph/pregel/__init__.py @@ -1417,7 +1417,7 @@ def output() -> Iterator: loop.tasks.values(), timeout=self.step_timeout, retry_policy=self.retry_policy, - extra=lambda: aioloop.create_task(stream.wait()), + get_waiter=lambda: aioloop.create_task(stream.wait()), ): # emit output for o in output(): diff --git a/libs/langgraph/langgraph/pregel/runner.py b/libs/langgraph/langgraph/pregel/runner.py index d4064f9f3..b3bea32c4 100644 --- a/libs/langgraph/langgraph/pregel/runner.py +++ b/libs/langgraph/langgraph/pregel/runner.py @@ -100,15 +100,15 @@ async def atick( reraise: bool = True, timeout: Optional[float] = None, retry_policy: Optional[RetryPolicy] = None, - extra: Optional[Callable[[], asyncio.Future[None]]] = None, + get_waiter: Optional[Callable[[], asyncio.Future[None]]] = None, ) -> AsyncIterator[None]: loop = asyncio.get_event_loop() # give control back to the caller yield - # add extra task if requested - if extra is not None: + # add waiter task if requested + if get_waiter is not None: futures: dict[asyncio.Future, Optional[PregelExecutableTask]] = { - extra(): None + get_waiter(): None } else: futures = {} @@ -130,7 +130,7 @@ async def atick( ] = task all_futures = futures.copy() end_time = timeout + loop.time() if timeout else None - while len(futures) > (1 if extra is not None else 0): + while len(futures) > (1 if get_waiter is not None else 0): done, _ = await asyncio.wait( futures, return_when=asyncio.FIRST_COMPLETED, @@ -141,8 +141,8 @@ async def atick( for fut in done: task = futures.pop(fut) if task is None: - # extra task finished, schedule another - futures[extra()] = None + # waiter task finished, schedule another + futures[get_waiter()] = None continue if exc := _exception(fut): if isinstance(exc, GraphInterrupt): @@ -168,7 +168,7 @@ async def atick( break # give control back to the caller yield - # cancel extra task + # cancel waiter task for fut in futures: fut.cancel() # panic on failure or timeout diff --git a/libs/langgraph/langgraph/utils/aio.py b/libs/langgraph/langgraph/utils/aio.py index bcddd11e3..afe02c050 100644 --- a/libs/langgraph/langgraph/utils/aio.py +++ b/libs/langgraph/langgraph/utils/aio.py @@ -6,7 +6,11 @@ class Queue(asyncio.Queue): async def wait(self): - """If queue is empty, wait until an item is available.""" + """If queue is empty, wait until an item is available. + + Copied from Queue.get(), removing the call to .get_nowait(), + ie. this doesn't consume the item, just waits for it. + """ while self.empty(): if PY_310: getter = self._get_loop().create_future()