Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Langchain ai Langgraph Pregel Debug

From Leeroopedia
Knowledge Sources
Domains Debugging, Execution
Last Updated 2026-02-11 16:00 GMT

Overview

The Pregel debug module provides structured event emitters that produce `task`, `task_result`, and `checkpoint` payloads for the `stream_mode=debug` streaming interface.

Description

`debug.py` implements the debug event pipeline for LangGraph's Pregel execution engine. When a graph is streamed with `stream_mode="debug"`, this module's generator functions are called at key lifecycle points to produce structured event payloads that expose internal execution details.

`map_debug_tasks` iterates over executable tasks at the start of a superstep and yields `TaskPayload` dicts containing the task ID, node name, input value, and trigger channels. Tasks tagged with `TAG_HIDDEN` are excluded from debug output. `map_debug_task_results` is called after a task completes and produces `TaskResultPayload` dicts that include the task ID, name, any error string, the channel writes folded into a result dict, and a list of serialized interrupt objects. The `map_task_result_writes` helper handles the aggregation of multiple writes to the same channel by wrapping them in a `{"$writes": [...]}` envelope.

`map_debug_checkpoint` produces a comprehensive `CheckpointPayload` at the end of each superstep, containing the current config, parent config, channel values, metadata, the list of next node names, and enriched task details (including errors, interrupts, results, and subgraph state references). It delegates to the `tasks_w_writes` helper, which joins tasks with their pending writes from the checkpointer to construct complete `PregelTask` tuples with error, interrupt, result, and state information.

The module also defines several TypedDict classes (`TaskPayload`, `TaskResultPayload`, `CheckpointTask`, `CheckpointPayload`) that formalize the shape of debug events, and provides terminal coloring utilities (`get_colored_text`, `get_bolded_text`) used for console-based debug output in development environments.

Usage

These functions are called by the Pregel streaming infrastructure when `stream_mode="debug"` is active. Application developers consume the resulting events through the stream iterator but do not call these functions directly. The TypedDict definitions are useful for understanding and typing debug event consumers.

Code Reference

Source Location

Signature

def map_debug_tasks(
    tasks: Iterable[PregelExecutableTask],
) -> Iterator[TaskPayload]: ...

def map_debug_task_results(
    task_tup: tuple[PregelExecutableTask, Sequence[tuple[str, Any]]],
    stream_keys: str | Sequence[str],
) -> Iterator[TaskResultPayload]: ...

def map_debug_checkpoint(
    config: RunnableConfig,
    channels: Mapping[str, BaseChannel],
    stream_channels: str | Sequence[str],
    metadata: CheckpointMetadata,
    tasks: Iterable[PregelExecutableTask],
    pending_writes: list[PendingWrite],
    parent_config: RunnableConfig | None,
    output_keys: str | Sequence[str],
) -> Iterator[CheckpointPayload]: ...

def tasks_w_writes(
    tasks: Iterable[PregelTask | PregelExecutableTask],
    pending_writes: list[PendingWrite] | None,
    states: dict[str, RunnableConfig | StateSnapshot] | None,
    output_keys: str | Sequence[str],
) -> tuple[PregelTask, ...]: ...

Import

# Internal module -- not part of the public API
from langgraph.pregel.debug import (
    map_debug_tasks,
    map_debug_task_results,
    map_debug_checkpoint,
    tasks_w_writes,
)

I/O Contract

`map_debug_tasks`

Direction Name Type Description
Input tasks `Iterable[PregelExecutableTask]` Tasks about to execute in the current superstep
Output (yields) `TaskPayload` Dicts with keys `id`, `name`, `input`, `triggers`

`map_debug_task_results`

Direction Name Type Description
Input task_tup `tuple[PregelExecutableTask, Sequence[tuple[str, Any]]]` A task paired with its writes
Input stream_keys Sequence[str]` Channel names to include in result output
Output (yields) `TaskResultPayload` Dicts with keys `id`, `name`, `error`, `result`, `interrupts`

`map_debug_checkpoint`

Direction Name Type Description
Input config `RunnableConfig` Current run configuration
Input channels `Mapping[str, BaseChannel]` Current channel state
Input stream_channels Sequence[str]` Channels to read for checkpoint values
Input metadata `CheckpointMetadata` Metadata for the checkpoint
Input tasks `Iterable[PregelExecutableTask]` Tasks from the current superstep
Input pending_writes `list[PendingWrite]` Writes pending in the checkpointer
Input parent_config None` Parent checkpoint configuration
Input output_keys Sequence[str]` Output channel keys for result mapping
Output (yields) `CheckpointPayload` Dict with keys `config`, `parent_config`, `values`, `metadata`, `next`, `tasks`

Usage Examples

# Consuming debug events through the graph stream interface

from langgraph.graph import StateGraph

graph = StateGraph(...)
# ... build graph ...
app = graph.compile()

# Stream with debug mode to see internal execution details
for mode, payload in app.stream(
    {"input": "hello"},
    stream_mode="debug",
):
    if mode == "debug":
        event_type = payload.get("type")
        if event_type == "task":
            print(f"Task starting: {payload['payload']['name']}")
        elif event_type == "task_result":
            print(f"Task finished: {payload['payload']['name']}, error={payload['payload']['error']}")
        elif event_type == "checkpoint":
            print(f"Checkpoint: next={payload['payload']['next']}")

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment