Implementation: Langchain_ai Langgraph Pregel Invoke
| Metadata | Value |
|---|---|
| Type | Implementation (API Doc) |
| Library | langgraph |
| Source File | libs/langgraph/langgraph/pregel/main.py |
| Lines | L3024-3112 (invoke), L2407-2473 (stream) |
| Workflow | Building_a_Stateful_Graph |
Overview
Pregel.invoke runs a compiled graph to completion with a single input and returns the final result. Pregel.stream runs the graph and yields intermediate results as an iterator. These are the two primary execution methods on a compiled StateGraph (a CompiledStateGraph, which extends Pregel).
Description
invoke() is a convenience wrapper around stream(). It iterates through all streamed events, collecting either the latest "values" output (when stream_mode="values", the default) or all chunks (for other stream modes). It also aggregates any interrupts and includes them in the return value.
stream() is the core execution method. It sets up the streaming infrastructure (queue, callback managers, stream mode handlers), resolves defaults for interrupt/durability/output settings, and enters the Pregel execution loop. Each superstep's output is yielded to the caller as it becomes available.
Both methods also have async counterparts: ainvoke() and astream().
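The wrapper relationship between invoke() and stream() described above can be illustrated with a simplified, hypothetical sketch in plain Python (this is not the library's actual code, just the collection logic it implements):

```python
from typing import Any, Iterator


def invoke_via_stream(stream: Iterator[Any], stream_mode: str = "values") -> Any:
    """Hypothetical sketch: collapse a stream of events into invoke()'s return value."""
    latest: Any = None
    chunks: list[Any] = []
    for event in stream:
        if stream_mode == "values":
            latest = event          # keep only the most recent full state
        else:
            chunks.append(event)    # accumulate every chunk
    return latest if stream_mode == "values" else chunks


# With "values" mode, only the final state survives:
print(invoke_via_stream(iter([{"x": 0}, {"x": 1}])))
# {'x': 1}

# With any other mode, all chunks are collected into a list:
print(invoke_via_stream(iter([{"add_one": {"x": 1}}]), "updates"))
# [{'add_one': {'x': 1}}]
```

This is why invoke() with a non-default stream_mode returns a list rather than a single state dict.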
Usage
```python
from typing_extensions import TypedDict

from langgraph.graph import StateGraph, START, END


class State(TypedDict):
    x: int


def add_one(state: State) -> dict:
    return {"x": state["x"] + 1}


builder = StateGraph(State)
builder.add_node(add_one)
builder.add_edge(START, "add_one")
builder.add_edge("add_one", END)
graph = builder.compile()

# invoke: get the final result
result = graph.invoke({"x": 0})
# {'x': 1}

# stream: iterate over intermediate results
for chunk in graph.stream({"x": 0}, stream_mode="updates"):
    print(chunk)
```
Code Reference
Source Location
| Item | Path | Lines |
|---|---|---|
| invoke | libs/langgraph/langgraph/pregel/main.py | L3024-3112 |
| stream | libs/langgraph/langgraph/pregel/main.py | L2407-2473 (signature + docstring) |
| Pregel class | libs/langgraph/langgraph/pregel/main.py | L324 |
Signature
```python
def invoke(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode = "values",
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    **kwargs: Any,
) -> dict[str, Any] | Any: ...

def stream(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    debug: bool | None = None,
) -> Iterator[dict[str, Any] | Any]: ...
```
Import
These methods are called on a compiled graph instance. No separate import is needed beyond the graph construction imports:
```python
from langgraph.graph import StateGraph, START, END

# Build and compile
builder = StateGraph(MyState)
# ... add nodes and edges ...
graph = builder.compile()

# Then call invoke/stream directly
result = graph.invoke(input_data)
```
I/O Contract
invoke Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| input | InputT \| Command \| None | required | The input data. Typically a dict matching the input schema. Pass None to resume from a checkpoint. |
| config | RunnableConfig \| None | None | Runtime configuration, including thread_id for checkpointing, callbacks, tags, etc. |
| context | ContextT \| None | None | Static runtime context (immutable data available to all nodes). |
| stream_mode | StreamMode | "values" | Controls the shape of the return value. "values" returns the final state; other modes return a list of chunks. |
| print_mode | StreamMode \| Sequence[StreamMode] | () | Stream modes to print to the console for debugging. Does not affect the return value. |
| output_keys | str \| Sequence[str] \| None | None | Specific state keys to include in the output. Defaults to all output channels. |
| interrupt_before | All \| Sequence[str] \| None | None | Nodes to pause before (overrides the compile-time setting for this invocation). |
| interrupt_after | All \| Sequence[str] \| None | None | Nodes to pause after (overrides the compile-time setting for this invocation). |
| durability | Durability \| None | None | Checkpoint persistence timing: "sync", "async", or "exit". |
Returns: dict[str, Any] | Any -- When stream_mode="values", returns the final state dict. For other modes, returns a list of output chunks.
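The output_keys filtering described in the table can be sketched as a small standalone helper (hypothetical code illustrating the documented semantics, not the library's implementation; the assumption that a single string key yields the bare channel value is inferred from the str | Sequence[str] type):

```python
from __future__ import annotations

from typing import Any, Sequence


def select_output(state: dict[str, Any], output_keys: str | Sequence[str] | None) -> Any:
    """Hypothetical sketch of output_keys semantics."""
    if output_keys is None:
        return state                                # default: all output channels
    if isinstance(output_keys, str):
        return state[output_keys]                   # single key: bare value
    return {k: state[k] for k in output_keys}       # sequence: filtered dict


state = {"x": 1, "messages": ["hi"]}
print(select_output(state, None))    # {'x': 1, 'messages': ['hi']}
print(select_output(state, "x"))     # 1
print(select_output(state, ["x"]))   # {'x': 1}
```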
stream Parameters
All parameters from invoke are supported (stream_mode additionally accepts a sequence of modes), plus:
| Parameter | Type | Default | Description |
|---|---|---|---|
| subgraphs | bool | False | If True, emit events from subgraphs as (namespace, data) tuples. |
| debug | bool \| None | None | Enable debug output for this stream invocation. |
Yields: dict[str, Any] | Any -- Output events whose shape depends on stream_mode.
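Because the event shape varies with the stream_mode and subgraphs settings, consumers often need a small dispatch step. A hypothetical sketch of normalizing the tuple shapes described above (the 3-tuple shape when both options are enabled is an assumption, not confirmed by this page):

```python
from typing import Any


def unpack(event: Any, multiple_modes: bool, subgraphs: bool) -> tuple:
    """Hypothetical sketch: normalize a stream event into (namespace, mode, data)."""
    if subgraphs and multiple_modes:
        namespace, mode, data = event       # assumed 3-tuple when both are enabled
    elif subgraphs:
        namespace, data = event             # (namespace, data) per the table above
        mode = None
    elif multiple_modes:
        mode, data = event                  # (mode, data) for multiple stream modes
        namespace = ()
    else:
        namespace, mode, data = (), None, event
    return namespace, mode, data


print(unpack((("child",), {"x": 1}), multiple_modes=False, subgraphs=True))
# (('child',), None, {'x': 1})
```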
Usage Examples
Basic invoke
```python
result = graph.invoke({"x": 0})
# {'x': 1}
```
invoke with Checkpointer
```python
from langgraph.checkpoint.memory import InMemorySaver

graph = builder.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": "thread-1"}}

result = graph.invoke({"x": 0}, config)
# State is persisted; subsequent invocations accumulate
result = graph.invoke(None, config)  # resume from the last checkpoint
```
Streaming with "updates" Mode
```python
for event in graph.stream({"x": 0}, stream_mode="updates"):
    print(event)
# {'add_one': {'x': 1}}
```
Streaming with "values" Mode
```python
for state in graph.stream({"x": 0}, stream_mode="values"):
    print(state)
# {'x': 0}  (initial state)
# {'x': 1}  (after add_one)
```
Multiple Stream Modes
```python
for mode, data in graph.stream({"x": 0}, stream_mode=["values", "updates"]):
    print(f"{mode}: {data}")
# values: {'x': 0}
# updates: {'add_one': {'x': 1}}
# values: {'x': 1}
```
Passing Runtime Context
```python
result = graph.invoke({"x": 0}, context={"multiplier": 2.0})
```
Async Execution
```python
import asyncio


async def main():
    result = await graph.ainvoke({"x": 0})
    print(result)

    async for event in graph.astream({"x": 0}, stream_mode="updates"):
        print(event)


asyncio.run(main())
```
Related Pages
- Langchain_ai_Langgraph_Graph_Execution
- Environment:Langchain_ai_Langgraph_Python_Runtime_Environment
- Heuristic:Langchain_ai_Langgraph_Retry_Policy_Configuration
- Heuristic:Langchain_ai_Langgraph_Recursion_Limit_Tuning
- Heuristic:Langchain_ai_Langgraph_Stream_Mode_Selection
- Langchain_ai_Langgraph_StateGraph_Compile
- Langchain_ai_Langgraph_StateGraph_Add_Edge
- Langchain_ai_Langgraph_Add_Messages