Implementation:Langchain ai Langgraph Pregel Debug
| Knowledge Sources | |
|---|---|
| Domains | Debugging, Execution |
| Last Updated | 2026-02-11 16:00 GMT |
Overview
The Pregel debug module provides structured event emitters that produce `task`, `task_result`, and `checkpoint` payloads for the `stream_mode="debug"` streaming interface.
Description
`debug.py` implements the debug event pipeline for LangGraph's Pregel execution engine. When a graph is streamed with `stream_mode="debug"`, this module's generator functions are called at key lifecycle points to produce structured event payloads that expose internal execution details.
`map_debug_tasks` iterates over executable tasks at the start of a superstep and yields `TaskPayload` dicts containing the task ID, node name, input value, and trigger channels. Tasks tagged with `TAG_HIDDEN` are excluded from debug output. `map_debug_task_results` is called after a task completes and produces `TaskResultPayload` dicts that include the task ID, name, any error string, the channel writes folded into a result dict, and a list of serialized interrupt objects. The `map_task_result_writes` helper handles the aggregation of multiple writes to the same channel by wrapping them in a `{"$writes": [...]}` envelope.
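The write-folding behaviour described above can be illustrated with a minimal sketch. This is not the library's source: `fold_writes` is a hypothetical stand-in for `map_task_result_writes`, and the only detail taken from the text is that repeated writes to one channel end up inside a `{"$writes": [...]}` envelope while a single write is stored as-is.

```python
from typing import Any, Sequence

WRITES_KEY = "$writes"  # envelope key named in the description above


def fold_writes(writes: Sequence[tuple[str, Any]]) -> dict[str, Any]:
    """Hypothetical sketch: fold (channel, value) pairs into a result dict."""
    counts: dict[str, int] = {}   # how many writes each channel has seen so far
    result: dict[str, Any] = {}
    for channel, value in writes:
        seen = counts.get(channel, 0)
        if seen == 0:
            # First write to this channel: store the bare value.
            result[channel] = value
        elif seen == 1:
            # Second write: wrap the earlier value and the new one in an envelope.
            result[channel] = {WRITES_KEY: [result[channel], value]}
        else:
            # Third or later write: append to the existing envelope.
            result[channel][WRITES_KEY].append(value)
        counts[channel] = seen + 1
    return result


folded = fold_writes([("messages", "a"), ("count", 1), ("messages", "b")])
```

Counting writes per channel, rather than inspecting the stored value, keeps the sketch correct even when a node legitimately writes `None` or a dict that happens to contain a `"$writes"` key.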
`map_debug_checkpoint` produces a comprehensive `CheckpointPayload` at the end of each superstep, containing the current config, parent config, channel values, metadata, the list of next node names, and enriched task details (including errors, interrupts, results, and subgraph state references). It delegates to the `tasks_w_writes` helper, which joins tasks with their pending writes from the checkpointer to construct complete `PregelTask` tuples with error, interrupt, result, and state information.
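As a rough illustration of the join that `tasks_w_writes` is described as performing, the sketch below pairs each task with the pending writes that carry its ID. Everything here is an assumption made for illustration: `SketchTask`, `EnrichedTask`, and `join_tasks_with_writes` are hypothetical names, pending writes are modelled as `(task_id, channel, value)` tuples, and the `"__error__"` and `"__interrupt__"` channel names are assumed sentinel values. Subgraph state and `output_keys` handling are omitted.

```python
from typing import Any, NamedTuple, Optional, Sequence

# Assumed sentinel channel names, used here only for illustration.
ERROR = "__error__"
INTERRUPT = "__interrupt__"


class SketchTask(NamedTuple):
    """Hypothetical stand-in for a task before enrichment."""
    id: str
    name: str


class EnrichedTask(NamedTuple):
    """Hypothetical stand-in for the enriched task tuple."""
    id: str
    name: str
    error: Optional[Any]
    interrupts: tuple
    result: Optional[dict]


def join_tasks_with_writes(
    tasks: Sequence[SketchTask],
    pending_writes: Sequence[tuple[str, str, Any]],
) -> tuple[EnrichedTask, ...]:
    enriched = []
    for task in tasks:
        # Keep only the writes recorded for this task.
        mine = [(ch, val) for tid, ch, val in pending_writes if tid == task.id]
        error = next((val for ch, val in mine if ch == ERROR), None)
        interrupts = tuple(val for ch, val in mine if ch == INTERRUPT)
        # Everything else is treated as an ordinary channel write.
        regular = {ch: val for ch, val in mine if ch not in (ERROR, INTERRUPT)}
        enriched.append(
            EnrichedTask(task.id, task.name, error, interrupts, regular or None)
        )
    return tuple(enriched)


joined = join_tasks_with_writes(
    [SketchTask("t1", "fetch"), SketchTask("t2", "parse")],
    [("t1", "docs", ["a"]), ("t2", ERROR, "boom"), ("t1", INTERRUPT, "pause")],
)
```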
The module also defines several TypedDict classes (`TaskPayload`, `TaskResultPayload`, `CheckpointTask`, `CheckpointPayload`) that formalize the shape of debug events, and provides terminal coloring utilities (`get_colored_text`, `get_bolded_text`) used for console-based debug output in development environments.
Usage
These functions are called by the Pregel streaming infrastructure when `stream_mode="debug"` is active. Application developers consume the resulting events through the stream iterator but do not call these functions directly. The TypedDict definitions are useful for understanding and typing debug event consumers.
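One way to use the payload shapes when typing a consumer is sketched below. The two TypedDicts are local mirrors, not imports from the library: their key names come from the I/O contract on this page, while the field types are assumptions. `describe_task` and `describe_result` are hypothetical helper names.

```python
from typing import Any, Optional, TypedDict


class TaskPayloadShape(TypedDict):
    """Local mirror of the `task` payload keys; field types are assumed."""
    id: str
    name: str
    input: Any
    triggers: list[str]


class TaskResultPayloadShape(TypedDict):
    """Local mirror of the `task_result` payload keys; field types are assumed."""
    id: str
    name: str
    error: Optional[str]
    result: dict[str, Any]
    interrupts: list[dict]


def describe_task(payload: TaskPayloadShape) -> str:
    triggers = ", ".join(payload["triggers"])
    return f"task {payload['name']} ({payload['id']}) triggered by {triggers}"


def describe_result(payload: TaskResultPayloadShape) -> str:
    status = f"failed: {payload['error']}" if payload["error"] else "ok"
    channels = sorted(payload["result"])
    return f"result {payload['name']} ({payload['id']}): {status}, wrote {channels}"


# Hand-written sample payloads, not captured from a real run.
sample_task: TaskPayloadShape = {
    "id": "1", "name": "agent", "input": {"q": "hi"}, "triggers": ["start:agent"],
}
sample_result: TaskResultPayloadShape = {
    "id": "1", "name": "agent", "error": None, "result": {"answer": "hello"}, "interrupts": [],
}
```

With annotations like these, a static type checker can flag a misspelled key in a consumer before the code runs.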
Code Reference
Source Location
- Repository: Langchain_ai_Langgraph
- File: libs/langgraph/langgraph/pregel/debug.py
Signature
def map_debug_tasks(
    tasks: Iterable[PregelExecutableTask],
) -> Iterator[TaskPayload]: ...

def map_debug_task_results(
    task_tup: tuple[PregelExecutableTask, Sequence[tuple[str, Any]]],
    stream_keys: str | Sequence[str],
) -> Iterator[TaskResultPayload]: ...

def map_debug_checkpoint(
    config: RunnableConfig,
    channels: Mapping[str, BaseChannel],
    stream_channels: str | Sequence[str],
    metadata: CheckpointMetadata,
    tasks: Iterable[PregelExecutableTask],
    pending_writes: list[PendingWrite],
    parent_config: RunnableConfig | None,
    output_keys: str | Sequence[str],
) -> Iterator[CheckpointPayload]: ...

def tasks_w_writes(
    tasks: Iterable[PregelTask | PregelExecutableTask],
    pending_writes: list[PendingWrite] | None,
    states: dict[str, RunnableConfig | StateSnapshot] | None,
    output_keys: str | Sequence[str],
) -> tuple[PregelTask, ...]: ...
Import
# Internal module -- not part of the public API
from langgraph.pregel.debug import (
    map_debug_tasks,
    map_debug_task_results,
    map_debug_checkpoint,
    tasks_w_writes,
)
I/O Contract
`map_debug_tasks`
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | tasks | `Iterable[PregelExecutableTask]` | Tasks about to execute in the current superstep |
| Output | (yields) | `TaskPayload` | Dicts with keys `id`, `name`, `input`, `triggers` |
`map_debug_task_results`
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | task_tup | `tuple[PregelExecutableTask, Sequence[tuple[str, Any]]]` | A task paired with its writes |
| Input | stream_keys | `str \| Sequence[str]` | Channel names to include in result output |
| Output | (yields) | `TaskResultPayload` | Dicts with keys `id`, `name`, `error`, `result`, `interrupts` |
`map_debug_checkpoint`
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | config | `RunnableConfig` | Current run configuration |
| Input | channels | `Mapping[str, BaseChannel]` | Current channel state |
| Input | stream_channels | `str \| Sequence[str]` | Channels to read for checkpoint values |
| Input | metadata | `CheckpointMetadata` | Metadata for the checkpoint |
| Input | tasks | `Iterable[PregelExecutableTask]` | Tasks from the current superstep |
| Input | pending_writes | `list[PendingWrite]` | Writes pending in the checkpointer |
| Input | parent_config | `RunnableConfig \| None` | Parent checkpoint configuration |
| Input | output_keys | `str \| Sequence[str]` | Output channel keys for result mapping |
| Output | (yields) | `CheckpointPayload` | Dict with keys `config`, `parent_config`, `values`, `metadata`, `next`, `tasks` |
Usage Examples
# Consuming debug events through the graph stream interface
from langgraph.graph import StateGraph

graph = StateGraph(...)
# ... build graph ...
app = graph.compile()

# Stream with debug mode to see internal execution details.
# With a single string stream_mode, each yielded item is the event dict itself;
# (mode, payload) tuples are only produced when stream_mode is a list.
for event in app.stream(
    {"input": "hello"},
    stream_mode="debug",
):
    event_type = event.get("type")
    payload = event["payload"]
    if event_type == "task":
        print(f"Task starting: {payload['name']}")
    elif event_type == "task_result":
        print(f"Task finished: {payload['name']}, error={payload['error']}")
    elif event_type == "checkpoint":
        print(f"Checkpoint: next={payload['next']}")