Implementation: Langchain_ai_Langgraph Pregel Stream With Interrupts
| Property | Value |
|---|---|
| API | `Pregel.stream(self, input, config, *, stream_mode=None, ...)` with interrupt handling |
| Type | API Doc |
| Workflow | Human_in_the_Loop_Agent |
| Pipeline Stage | Execution / Streaming |
| Repository | Langchain_ai_Langgraph |
| Source | libs/langgraph/langgraph/pregel/main.py:L2407-2506 |
| Also covers | `GraphInterrupt` exception at libs/langgraph/langgraph/errors.py:L84-90 |
Overview
Pregel.stream() executes a compiled graph and yields output events as an iterator. When the graph encounters an interrupt -- whether from compile-time interrupt_before/interrupt_after configuration or from an in-node interrupt() call -- the method checkpoints the state and yields an __interrupt__ event containing the interrupt data before terminating.
The GraphInterrupt exception is a subclass of GraphBubbleUp that carries interrupt data from nodes to the Pregel execution loop. It is never surfaced directly to the user; instead, it is caught internally and translated into __interrupt__ stream events.
Description
The stream() method sets up the execution environment, resolves defaults for stream modes, checkpointer, store, and interrupt lists, then enters the Pregel execution loop. The interrupt-relevant behavior occurs within this loop:
- The method accepts `input`, which can be the initial graph input, a `Command` (for resuming), or `None` (for continuing after a compile-time interrupt).
- At each super-step, the loop checks `interrupt_before` and `interrupt_after` conditions.
- If a node raises `GraphInterrupt`, the exception is caught, the interrupt data is recorded, and the loop terminates.
- The `__interrupt__` event is yielded to the caller, containing a tuple of `Interrupt` objects.
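The steps above can be sketched as a plain-Python analogue of the catch-and-translate loop. `Interrupt`, `GraphInterrupt`, and `run_steps` below are simplified stand-ins for illustration, not LangGraph's actual implementation:

```python
from dataclasses import dataclass
from typing import Any, Iterator, Sequence

# Simplified stand-ins mirroring langgraph.types.Interrupt and
# langgraph.errors.GraphInterrupt -- a sketch, not the real classes.
@dataclass(frozen=True)
class Interrupt:
    value: Any
    id: str

class GraphInterrupt(Exception):
    def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None:
        super().__init__(interrupts)
        self.interrupts = tuple(interrupts)

def run_steps(steps) -> Iterator[dict[str, Any]]:
    """Run (name, node) pairs in order; on GraphInterrupt, record the
    interrupt data, yield an __interrupt__ event, and stop the loop."""
    for name, node in steps:
        try:
            yield {name: node()}
        except GraphInterrupt as exc:
            # The real loop checkpoints state here before yielding.
            yield {"__interrupt__": exc.interrupts}
            return

def ask():
    raise GraphInterrupt([Interrupt(value="need approval", id="i-1")])

events = list(run_steps([
    ("fetch", lambda: {"ok": True}),
    ("ask", ask),
    ("never_runs", lambda: {"ok": True}),
]))
# events == [{"fetch": {"ok": True}},
#            {"__interrupt__": (Interrupt(value="need approval", id="i-1"),)}]
```

Note that the third node never runs: the loop terminates as soon as the interrupt is surfaced, exactly as the real Pregel loop does.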
The GraphInterrupt exception is defined as:
```python
class GraphInterrupt(GraphBubbleUp):
    """Raised when a subgraph is interrupted, suppressed by the root graph.

    Never raised directly, or surfaced to the user."""

    def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None:
        super().__init__(interrupts)
```
Usage
```python
from langgraph.checkpoint.memory import InMemorySaver
from langgraph.types import Command

config = {"configurable": {"thread_id": "my-thread"}}

# Run graph until interrupt
for event in graph.stream({"messages": [user_input]}, config):
    if "__interrupt__" in event:
        # Handle interrupt
        interrupts = event["__interrupt__"]
        for intr in interrupts:
            print(f"Interrupt: {intr.value} (id={intr.id})")
    else:
        print(event)

# Resume after human provides input
for event in graph.stream(Command(resume="approved"), config):
    print(event)
```
Code Reference
Source Location
| Item | Path |
|---|---|
| `Pregel.stream()` | libs/langgraph/langgraph/pregel/main.py, Lines 2407-2506+ |
| `GraphInterrupt` | libs/langgraph/langgraph/errors.py, Lines 84-90 |
| `GraphBubbleUp` | libs/langgraph/langgraph/errors.py, Lines 80-81 |
Signature
```python
def stream(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    debug: bool | None = None,
) -> Iterator[dict[str, Any] | Any]: ...

class GraphInterrupt(GraphBubbleUp):
    def __init__(self, interrupts: Sequence[Interrupt] = ()) -> None: ...
```
Import
```python
from langgraph.errors import GraphInterrupt
```
I/O Contract
Pregel.stream() key parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| `input` | `InputT \| Command \| None` | (required) | The graph input. Pass a `Command` to resume from an interrupt. Pass `None` to continue after a compile-time interrupt without modifications. |
| `config` | `RunnableConfig \| None` | `None` | Configuration including `thread_id` for checkpointing. |
| `stream_mode` | `StreamMode \| Sequence[StreamMode] \| None` | `None` | Output mode(s). Interrupts are always surfaced via `__interrupt__` regardless of mode. |
| `interrupt_before` | `All \| Sequence[str] \| None` | `None` | Override compile-time interrupt-before settings for this invocation. |
| `interrupt_after` | `All \| Sequence[str] \| None` | `None` | Override compile-time interrupt-after settings for this invocation. |
Yields: `dict[str, Any] | Any` -- Stream events. When an interrupt occurs, yields `{"__interrupt__": (Interrupt(value=..., id=...),)}`.
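Given that contract, callers can partition a collected event list by key. `split_events` below is a hypothetical helper written for this doc, not part of the LangGraph API, and the sample events are hand-built to match the shape described above:

```python
from typing import Any

def split_events(events: list) -> tuple[list, list[dict[str, Any]]]:
    """Partition stream events into interrupt payloads and regular updates."""
    interrupts, updates = [], []
    for ev in events:
        if isinstance(ev, dict) and "__interrupt__" in ev:
            # The __interrupt__ value is a tuple of Interrupt objects.
            interrupts.extend(ev["__interrupt__"])
        else:
            updates.append(ev)
    return interrupts, updates

# Hand-built events shaped like the yield contract above:
events = [{"node_a": {"x": 1}}, {"__interrupt__": ("fake-interrupt",)}]
interrupts, updates = split_events(events)
# interrupts == ["fake-interrupt"]; updates == [{"node_a": {"x": 1}}]
```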
GraphInterrupt
| Parameter | Type | Default | Description |
|---|---|---|---|
| `interrupts` | `Sequence[Interrupt]` | `()` | A sequence of `Interrupt` objects containing the interrupt values and IDs. |
Behavior: Raised internally by the `interrupt()` function or by the Pregel loop for compile-time interrupts. Never surfaces to the user; always caught and converted to `__interrupt__` stream events.
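Because `interrupt()` signals the loop by raising this exception, a broad `except Exception` inside a node can accidentally swallow the interrupt before the execution loop ever sees it. A minimal self-contained sketch of the pitfall, using simplified stand-in exception classes rather than the real ones:

```python
# Simplified stand-ins for langgraph.errors.GraphBubbleUp / GraphInterrupt.
class GraphBubbleUp(Exception):
    pass

class GraphInterrupt(GraphBubbleUp):
    pass

def interrupt(value):
    # Sketch only: the real interrupt() also consults resume values.
    raise GraphInterrupt(value)

def bad_node():
    try:
        return interrupt("please review")
    except Exception:
        return "swallowed"  # the execution loop never sees the interrupt

def good_node():
    try:
        return interrupt("please review")
    except GraphBubbleUp:
        raise               # let it bubble up to the execution loop
    except Exception:
        return "recovered"  # only handle genuinely unexpected errors
```

Re-raising the bubble-up base class before any catch-all handler preserves the interrupt semantics while still allowing ordinary error recovery in the node.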
Usage Examples
Streaming with interrupt detection
```python
import uuid
from typing import TypedDict

from langgraph.checkpoint.memory import InMemorySaver
from langgraph.graph import StateGraph, START
from langgraph.types import interrupt, Command

class State(TypedDict):
    question: str
    answer: str | None

def ask_human(state: State):
    response = interrupt(f"Please answer: {state['question']}")
    return {"answer": response}

builder = StateGraph(State)
builder.add_node("ask", ask_human)
builder.add_edge(START, "ask")
graph = builder.compile(checkpointer=InMemorySaver())

config = {"configurable": {"thread_id": str(uuid.uuid4())}}

# First stream: runs until interrupt
events = list(graph.stream({"question": "What color?"}, config))
# events[-1] == {"__interrupt__": (Interrupt(value="Please answer: What color?", id="..."),)}

# Second stream: resume with human input
events = list(graph.stream(Command(resume="Blue"), config))
# events[-1] == {"ask": {"answer": "Blue"}}
```
Handling compile-time interrupt_before
```python
graph = builder.compile(
    checkpointer=InMemorySaver(),
    interrupt_before=["tools"],
)

config = {"configurable": {"thread_id": "t1"}}

# Run until tools interrupt
for event in graph.stream(input_data, config):
    print(event)
# Graph pauses before "tools" node

# Continue execution without modifying state
for event in graph.stream(None, config):
    print(event)
```