Implementation:Langchain ai Langgraph Pregel Stream With Interrupts

From Leeroopedia
API: Pregel.stream(self, input, config, *, stream_mode=None, ...) with interrupt handling
Type: API Doc
Workflow: Human_in_the_Loop_Agent
Pipeline Stage: Execution / Streaming
Repository: Langchain_ai_Langgraph
Source: libs/langgraph/langgraph/pregel/main.py:L2407-2506
Also covers: GraphInterrupt exception at libs/langgraph/langgraph/errors.py:L84-90

Overview

Pregel.stream() executes a compiled graph and returns an iterator of output events. When the graph encounters an interrupt, whether from compile-time interrupt_before/interrupt_after configuration or from an in-node interrupt() call, the method checkpoints the state, yields an __interrupt__ event containing the interrupt data, and then terminates.

The GraphInterrupt exception is a subclass of GraphBubbleUp that carries interrupt data from nodes to the Pregel execution loop. It is never surfaced directly to the user; instead, it is caught internally and translated into __interrupt__ stream events.

Description

The stream() method sets up the execution environment, resolves defaults for stream modes, checkpointer, store, and interrupt lists, then enters the Pregel execution loop. The interrupt-relevant behavior occurs within this loop:

  1. The method accepts input, which can be the initial graph input, a Command (for resuming), or None (for continuing after a compile-time interrupt).
  2. At each super-step, the loop checks interrupt_before and interrupt_after conditions.
  3. If a node raises GraphInterrupt, the exception is caught, the interrupt data is recorded, and the loop terminates.
  4. The __interrupt__ event is yielded to the caller containing a tuple of Interrupt objects.

The GraphInterrupt exception is defined as:

class GraphInterrupt(GraphBubbleUp):
    """Raised when a subgraph is interrupted, suppressed by the root graph.
    Never raised directly, or surfaced to the user."""

    def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None:
        super().__init__(interrupts)
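
The four steps above can be modeled with a small standard-library sketch. This is a toy illustration of the control flow, not the code in pregel/main.py: ToyInterrupt, ToyGraphInterrupt, and toy_stream are hypothetical names, each node runs as its own "super-step", and yielding an empty tuple for the compile-time case is an assumption of this toy.

```python
from dataclasses import dataclass
from typing import Any, Callable, Iterator, Sequence


@dataclass(frozen=True)
class ToyInterrupt:
    """Hypothetical stand-in for langgraph.types.Interrupt."""
    value: Any
    id: str


class ToyGraphInterrupt(Exception):
    """Hypothetical stand-in for GraphInterrupt; the payload lands in args[0]."""

    def __init__(self, interrupts: Sequence[ToyInterrupt] = ()) -> None:
        super().__init__(interrupts)


def toy_stream(
    nodes: Sequence[tuple[str, Callable[[dict], dict]]],
    state: dict,
    interrupt_before: Sequence[str] = (),
) -> Iterator[dict[str, Any]]:
    """Run nodes in order and stop at the first interrupt."""
    for name, fn in nodes:
        # Step 2: compile-time style check before the node runs.
        if name in interrupt_before:
            yield {"__interrupt__": ()}  # toy assumption: no payload
            return
        try:
            update = fn(state)
        except ToyGraphInterrupt as exc:
            # Steps 3 and 4: catch, translate into an event, end the loop.
            yield {"__interrupt__": tuple(exc.args[0])}
            return
        state.update(update)
        yield {name: update}


def draft(state: dict) -> dict:
    return {"draft": "hello"}


def review(state: dict) -> dict:
    raise ToyGraphInterrupt([ToyInterrupt("Approve?", "i-1")])


def publish(state: dict) -> dict:
    return {"published": True}


events = list(toy_stream([("draft", draft), ("review", review), ("publish", publish)], {}))
```

In this model the exception never reaches the caller: the consumer only sees an ordinary event whose key is __interrupt__, and the publish node does not run.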

Usage

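from langgraph.checkpoint.memory import InMemorySaver  # checkpointer passed to compile()
from langgraph.types import Command

# Assumes `graph` is a compiled graph with a checkpointer and `user_input`
# is the user's message; neither is defined in this snippet.
config = {"configurable": {"thread_id": "my-thread"}}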

# Run graph until interrupt
for event in graph.stream({"messages": [user_input]}, config):
    if "__interrupt__" in event:
        # Handle interrupt
        interrupts = event["__interrupt__"]
        for intr in interrupts:
            print(f"Interrupt: {intr.value} (id={intr.id})")
    else:
        print(event)

# Resume after human provides input
for event in graph.stream(Command(resume="approved"), config):
    print(event)
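
The run-then-resume pattern above can be wrapped in a small driver loop. The sketch below is one possible shape, not part of LangGraph: run_with_human, stream_fn, and make_resume are hypothetical names, and the helper depends only on the event shape described on this page (a dict with an __interrupt__ key), so a stand-in stream function is enough to exercise it.

```python
from typing import Any, Callable, Iterable, Iterator


def run_with_human(
    stream_fn: Callable[[Any], Iterable[Any]],
    first_input: Any,
    make_resume: Callable[[tuple], Any],
    max_rounds: int = 10,
) -> Iterator[Any]:
    """Stream; after each interrupt, stream again with make_resume's result."""
    next_input = first_input
    for _ in range(max_rounds):
        pending: tuple = ()
        interrupted = False
        for event in stream_fn(next_input):
            if isinstance(event, dict) and "__interrupt__" in event:
                # Remember the interrupt payload instead of forwarding it.
                interrupted = True
                pending = tuple(event["__interrupt__"])
            else:
                yield event
        if not interrupted:
            return
        # Ask the caller (for example, a human prompt) for the next input.
        next_input = make_resume(pending)
    raise RuntimeError("still interrupted after max_rounds")


# Stand-in for graph.stream, used only to exercise the loop.
calls = []


def fake_stream(inp: Any) -> Iterator[dict]:
    calls.append(inp)
    if inp == "start":
        yield {"a": 1}
        yield {"__interrupt__": ("q1",)}
    else:
        yield {"b": inp}


out = list(run_with_human(fake_stream, "start", lambda p: "resume:" + p[0]))
```

With a real graph, stream_fn would plausibly be lambda inp: graph.stream(inp, config), and make_resume would return a Command(resume=...) built from the human's answer. The max_rounds cap is a design choice of this sketch to avoid looping forever when a node keeps interrupting.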

Code Reference

Source Location

Pregel.stream(): libs/langgraph/langgraph/pregel/main.py, Lines 2407-2506+
GraphInterrupt: libs/langgraph/langgraph/errors.py, Lines 84-90
GraphBubbleUp: libs/langgraph/langgraph/errors.py, Lines 80-81

Signature

def stream(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    debug: bool | None = None,
) -> Iterator[dict[str, Any] | Any]:
    ...

class GraphInterrupt(GraphBubbleUp):
    def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None:
        ...

Import

from langgraph.errors import GraphInterrupt

I/O Contract

Pregel.stream() key parameters

Parameter: Type (default) -- Description
input: InputT | Command | None (required) -- The graph input. Pass a Command to resume from an interrupt. Pass None to continue after a compile-time interrupt without modifications.
config: RunnableConfig | None (default None) -- Configuration including thread_id for checkpointing.
stream_mode: StreamMode | Sequence[StreamMode] | None (default None) -- Output mode(s). Interrupts are always surfaced via __interrupt__ regardless of mode.
interrupt_before: All | Sequence[str] | None (default None) -- Override compile-time interrupt-before settings for this invocation.
interrupt_after: All | Sequence[str] | None (default None) -- Override compile-time interrupt-after settings for this invocation.

Yields: dict[str, Any] | Any -- Stream events. When an interrupt occurs, yields {"__interrupt__": (Interrupt(value=..., id=...),)}.
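
Because the yield type is dict[str, Any] | Any, a consumer that wants to separate interrupt payloads from ordinary events should check that an event is a dict before looking for the __interrupt__ key. The helper below is a minimal sketch; split_events is a hypothetical name, and the sample events use plain strings in place of Interrupt objects.

```python
from typing import Any, Iterable


def split_events(events: Iterable[Any]) -> tuple[list[Any], list[Any]]:
    """Partition stream events into (interrupt payloads, other events)."""
    interrupts: list[Any] = []
    others: list[Any] = []
    for event in events:
        # Non-dict events are passed through untouched.
        if isinstance(event, dict) and "__interrupt__" in event:
            interrupts.extend(event["__interrupt__"])
        else:
            others.append(event)
    return interrupts, others


sample = [{"ask": {"x": 1}}, {"__interrupt__": ("a", "b")}, ("updates", {})]
found, rest = split_events(sample)
```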

GraphInterrupt

Parameter: Type (default) -- Description
interrupts: Sequence[Interrupt] (default ()) -- A sequence of Interrupt objects containing the interrupt values and IDs.

Behavior: Raised internally by the interrupt() function or by the Pregel loop for compile-time interrupts. Never surfaces to the user; always caught and converted to __interrupt__ stream events.

Usage Examples

Streaming with interrupt detection

import uuid
from typing import TypedDict
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START
from langgraph.types import interrupt, Command

class State(TypedDict):
    question: str
    answer: str | None

def ask_human(state: State):
    response = interrupt(f"Please answer: {state['question']}")
    return {"answer": response}

builder = StateGraph(State)
builder.add_node("ask", ask_human)
builder.add_edge(START, "ask")

graph = builder.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# First stream: runs until interrupt
events = list(graph.stream({"question": "What color?"}, config))
# events[-1] == {"__interrupt__": (Interrupt(value="Please answer: What color?", id="..."),)}

# Second stream: resume with human input
events = list(graph.stream(Command(resume="Blue"), config))
# events[-1] == {"ask": {"answer": "Blue"}}

Handling compile-time interrupt_before

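# Assumes `builder` defines a node named "tools" and `input_data` matches
# the graph's input schema; neither is defined in this snippet.
graph = builder.compile(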
    checkpointer=InMemorySaver(),
    interrupt_before=["tools"],
)

config = {"configurable": {"thread_id": "t1"}}

# Run until tools interrupt
for event in graph.stream(input_data, config):
    print(event)
# Graph pauses before "tools" node

# Continue execution without modifying state
for event in graph.stream(None, config):
    print(event)
