Skip to content

Commit

Permalink
Merge pull request #2065 from langchain-ai/dqbd/debug-stream-checkpoi…
Browse files Browse the repository at this point in the history
…nt-map

fix(debug): send checkpoint_map as well
  • Loading branch information
nfcampos authored Oct 14, 2024
2 parents f2dc537 + ff310cc commit edec5c0
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 8 deletions.
4 changes: 2 additions & 2 deletions libs/langgraph/langgraph/pregel/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,7 +484,7 @@ def _prepare_state_snapshot(
patch_checkpoint_map(saved.config, saved.metadata),
saved.metadata,
saved.checkpoint["ts"],
saved.parent_config,
patch_checkpoint_map(saved.parent_config, saved.metadata),
tasks_w_writes(
next_tasks.values(),
saved.pending_writes,
Expand Down Expand Up @@ -565,7 +565,7 @@ async def _aprepare_state_snapshot(
patch_checkpoint_map(saved.config, saved.metadata),
saved.metadata,
saved.checkpoint["ts"],
saved.parent_config,
patch_checkpoint_map(saved.parent_config, saved.metadata),
tasks_w_writes(
next_tasks.values(),
saved.pending_writes,
Expand Down
5 changes: 3 additions & 2 deletions libs/langgraph/langgraph/pregel/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from langgraph.pregel.io import read_channels
from langgraph.pregel.utils import find_subgraph_pregel
from langgraph.types import PregelExecutableTask, PregelTask, StateSnapshot
from langgraph.utils.config import patch_checkpoint_map


class TaskPayload(TypedDict):
Expand Down Expand Up @@ -177,8 +178,8 @@ def map_debug_checkpoint(
"timestamp": checkpoint["ts"],
"step": step,
"payload": {
"config": config,
"parent_config": parent_config,
"config": patch_checkpoint_map(config, metadata),
"parent_config": patch_checkpoint_map(parent_config, metadata),
"values": read_channels(channels, stream_channels),
"metadata": metadata,
"next": [t.name for t in tasks],
Expand Down
5 changes: 3 additions & 2 deletions libs/langgraph/langgraph/pregel/loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -502,7 +502,7 @@ def _first(self, *, input_keys: Union[str, Sequence[str]]) -> None:
)

def _put_checkpoint(self, metadata: CheckpointMetadata) -> None:
# assign step
# assign step and parents
metadata["step"] = self.step
metadata["parents"] = self.config[CONF].get(CONFIG_KEY_CHECKPOINT_MAP, {})
# debug flag
Expand All @@ -518,13 +518,14 @@ def _put_checkpoint(self, metadata: CheckpointMetadata) -> None:
self.checkpoint = create_checkpoint(self.checkpoint, self.channels, self.step)
# bail if no checkpointer
if self._checkpointer_put_after_previous is not None:
self.checkpoint_metadata = metadata

self.prev_checkpoint_config = (
self.checkpoint_config
if CONFIG_KEY_CHECKPOINT_ID in self.checkpoint_config[CONF]
and self.checkpoint_config[CONF][CONFIG_KEY_CHECKPOINT_ID]
else None
)
self.checkpoint_metadata = metadata
self.checkpoint_config = {
**self.checkpoint_config,
CONF: {
Expand Down
6 changes: 4 additions & 2 deletions libs/langgraph/langgraph/utils/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@ def patch_configurable(


def patch_checkpoint_map(
config: RunnableConfig, metadata: Optional[CheckpointMetadata]
config: Optional[RunnableConfig], metadata: Optional[CheckpointMetadata]
) -> RunnableConfig:
if parents := (metadata.get("parents") if metadata else None):
if config is None:
return config
elif parents := (metadata.get("parents") if metadata else None):
conf = config[CONF]
return patch_configurable(
config,
Expand Down
81 changes: 81 additions & 0 deletions libs/langgraph/tests/test_pregel.py
Original file line number Diff line number Diff line change
Expand Up @@ -9092,6 +9092,9 @@ def outer_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("inner:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
),
Expand Down Expand Up @@ -9250,6 +9253,9 @@ def outer_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("inner:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
tasks=(PregelTask(AnyStr(), "inner_2", (PULL, "inner_2")),),
Expand Down Expand Up @@ -9279,6 +9285,9 @@ def outer_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("inner:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
tasks=(
Expand Down Expand Up @@ -9707,6 +9716,13 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr(),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("child:"): AnyStr(),
AnyStr(re.compile(r"child:.+|child1:")): AnyStr(),
}
),
}
},
)
Expand Down Expand Up @@ -9770,6 +9786,15 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr(),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("child:"): AnyStr(),
AnyStr(
re.compile(r"child:.+|child1:")
): AnyStr(),
}
),
}
},
),
Expand Down Expand Up @@ -9798,6 +9823,9 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("child:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
),
Expand Down Expand Up @@ -10056,6 +10084,9 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("child:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
tasks=(),
Expand Down Expand Up @@ -10085,6 +10116,9 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr("child:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{"": AnyStr(), AnyStr("child:"): AnyStr()}
),
}
},
tasks=(
Expand Down Expand Up @@ -10170,6 +10204,13 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr(),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("child:"): AnyStr(),
AnyStr(re.compile(r"child:.+|child1:")): AnyStr(),
}
),
}
},
tasks=(),
Expand Down Expand Up @@ -10208,6 +10249,13 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr(),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("child:"): AnyStr(),
AnyStr(re.compile(r"child:.+|child1:")): AnyStr(),
}
),
}
},
tasks=(
Expand Down Expand Up @@ -10253,6 +10301,13 @@ def parent_2(state: State):
"thread_id": "1",
"checkpoint_ns": AnyStr(),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("child:"): AnyStr(),
AnyStr(re.compile(r"child:.+|child1:")): AnyStr(),
}
),
}
},
tasks=(
Expand Down Expand Up @@ -10444,6 +10499,12 @@ def edit(state: JokeState):
"thread_id": "1",
"checkpoint_ns": AnyStr("generate_joke:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("generate_joke:"): AnyStr(),
}
),
}
},
tasks=(PregelTask(id=AnyStr(""), name="generate", path=(PULL, "generate")),),
Expand Down Expand Up @@ -10476,6 +10537,12 @@ def edit(state: JokeState):
"thread_id": "1",
"checkpoint_ns": AnyStr("generate_joke:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("generate_joke:"): AnyStr(),
}
),
}
},
tasks=(PregelTask(id=AnyStr(""), name="generate", path=(PULL, "generate")),),
Expand Down Expand Up @@ -10931,6 +10998,12 @@ def weather_graph(state: RouterState):
"thread_id": "14",
"checkpoint_ns": AnyStr("weather_graph:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("weather_graph:"): AnyStr(),
}
),
}
},
tasks=(
Expand Down Expand Up @@ -11020,6 +11093,12 @@ def weather_graph(state: RouterState):
"thread_id": "14",
"checkpoint_ns": AnyStr("weather_graph:"),
"checkpoint_id": AnyStr(),
"checkpoint_map": AnyDict(
{
"": AnyStr(),
AnyStr("weather_graph:"): AnyStr(),
}
),
}
},
tasks=(),
Expand Down Expand Up @@ -11917,6 +11996,8 @@ def normalize_config(config: Optional[dict]) -> Optional[dict]:
clean_config["thread_id"] = config["configurable"]["thread_id"]
clean_config["checkpoint_id"] = config["configurable"]["checkpoint_id"]
clean_config["checkpoint_ns"] = config["configurable"]["checkpoint_ns"]
if "checkpoint_map" in config["configurable"]:
clean_config["checkpoint_map"] = config["configurable"]["checkpoint_map"]

return clean_config

Expand Down
Loading

0 comments on commit edec5c0

Please sign in to comment.