Implementation: langchain-ai/langgraph Pregel Invoke for Agents
| Attribute | Value |
|---|---|
| API | CompiledGraph.invoke and CompiledGraph.stream |
| Workflow | ReAct_Agent_Creation |
| Type | API Doc |
| Repository | Langchain_ai_Langgraph |
| Source File | libs/langgraph/langgraph/pregel/main.py |
| Source Lines | L3024-3112 (invoke), L2407-2470 (stream) |
Overview
CompiledGraph.invoke and CompiledGraph.stream are the primary execution methods for running a compiled ReAct agent graph. invoke runs the agent to completion and returns the final state, while stream returns an iterator yielding intermediate results as the agent progresses through its reasoning-and-action loop. Both methods accept agent input in the {"messages": [...]} format and support configuration for checkpointing, interrupts, streaming modes, and durability.
This page documents these methods in the context of agent execution with message-based input.
Description
invoke
The invoke method runs the graph to completion and returns the final output. Internally, it calls stream with stream_mode=["updates", "values"] and collects results:
- When stream_mode="values" (the default), it tracks the latest "values" payload and returns it as the final result.
- When a different stream_mode is specified, it collects all chunks into a list and returns them.
- If interrupts occur during execution, they are included in the returned dictionary under the INTERRUPT key.
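The collection logic above can be sketched in a few lines. This is a hypothetical illustration, not the actual implementation: FakeGraph and its canned chunks stand in for a real compiled graph.

```python
# Hypothetical stand-in for a compiled graph; not LangGraph API.
class FakeGraph:
    def stream(self, input, stream_mode="values"):
        # Pretend the graph runs two steps: "values" emits the full state
        # after each step, "updates" emits per-node deltas.
        if stream_mode == "values":
            yield {"messages": ["HumanMessage"]}
            yield {"messages": ["HumanMessage", "AIMessage"]}
        else:
            yield {"agent": {"messages": ["AIMessage"]}}


def invoke_sketch(graph, input, stream_mode="values"):
    """Mimic invoke: keep only the last 'values' payload,
    or collect all chunks for any other mode."""
    if stream_mode == "values":
        latest = None
        for chunk in graph.stream(input, stream_mode=stream_mode):
            latest = chunk
        return latest
    return list(graph.stream(input, stream_mode=stream_mode))
```

With stream_mode="values" the sketch returns only the final state dict; with any other mode it returns the list of intermediate chunks, mirroring the behavior described above.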
For a ReAct agent, a typical invocation looks like:
result = agent.invoke({"messages": [("user", "What is 2 + 2?")]})
# result = {"messages": [HumanMessage(...), AIMessage(...), ...]}
stream
The stream method returns an iterator that yields intermediate outputs as the graph executes step by step. The stream_mode parameter controls what is emitted:
"values": Emits the full state after each step."updates": Emits node names and their outputs for each step."messages": Emits LLM tokens as(token, metadata)tuples during generation."custom": Emits custom data written viaStreamWriterinside nodes."checkpoints": Emits checkpoint events."tasks": Emits task start/finish events."debug": Emits detailed debug information.
Multiple modes can be combined in a list, producing (mode, data) tuples. When subgraphs=True, events from nested subgraphs are also emitted with namespace prefixes.
For a ReAct agent streaming step-by-step:
for chunk in agent.stream(
    {"messages": [("user", "Search for X")]},
    stream_mode="updates",
):
    print(chunk)
# {"agent": {"messages": [AIMessage(...)]}}
# {"tools": {"messages": [ToolMessage(...)]}}
# {"agent": {"messages": [AIMessage(...)]}}
Interrupt and Resume
When interrupt_before or interrupt_after is configured (either at graph compilation or at invocation time), execution pauses at the specified nodes. This is critical for human-in-the-loop agent workflows:
# Pause before tool execution
result = agent.invoke(
    {"messages": [("user", "Delete all files")]},
    config={"configurable": {"thread_id": "1"}},
    interrupt_before=["tools"],
)
# Review the proposed tool calls in result["messages"][-1].tool_calls
# Resume execution
result = agent.invoke(None, config={"configurable": {"thread_id": "1"}})
Code Reference
Source Location
| Item | Location |
|---|---|
| File | libs/langgraph/langgraph/pregel/main.py |
| invoke | Lines 3024-3112 |
| stream | Lines 2407-2470 (signature and docstring) |
| Class | Pregel (parent of CompiledStateGraph) |
Signature
def invoke(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode = "values",
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    **kwargs: Any,
) -> dict[str, Any] | Any

def stream(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    debug: bool | None = None,
    **kwargs: Any,
) -> Iterator[dict[str, Any] | Any]
Import
# These methods are available on any compiled graph
from langgraph.prebuilt import create_react_agent
agent = create_react_agent(model, tools)
# agent.invoke(...) and agent.stream(...) are available directly
I/O Contract
invoke Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| input | InputT \| Command \| None | (required) | Input data. For agents: {"messages": [...]}. Use None to resume from an interrupt. Use Command for programmatic control. |
| config | RunnableConfig \| None | None | Run configuration. Use {"configurable": {"thread_id": "..."}} for checkpointed runs. |
| context | ContextT \| None | None | Static runtime context accessible by nodes via Runtime.context. |
| stream_mode | StreamMode | "values" | Output mode. "values" returns final state; other modes return a list of chunks. |
| print_mode | StreamMode \| Sequence[StreamMode] | () | Modes to print to console for debugging. Does not affect return value. |
| output_keys | str \| Sequence[str] \| None | None | Specific state keys to include in output. Defaults to all non-context channels. |
| interrupt_before | All \| Sequence[str] \| None | None | Nodes to pause before. For agents: ["agent"] or ["tools"]. |
| interrupt_after | All \| Sequence[str] \| None | None | Nodes to pause after. |
| durability | Durability \| None | None | Persistence mode: "sync", "async", or "exit". |
stream Additional Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| subgraphs | bool | False | If True, emit events from nested subgraphs with namespace prefixes. |
| debug | bool \| None | None | Enable debug-level event emission. |
Output Format
invoke with stream_mode="values":
{
    "messages": [
        HumanMessage("What is 2 + 2?"),
        AIMessage(content="", tool_calls=[{"name": "calc", "args": {"expr": "2+2"}, ...}]),
        ToolMessage(content="4", tool_call_id="..."),
        AIMessage(content="2 + 2 equals 4."),
    ],
    "remaining_steps": 22,
}
stream with stream_mode="updates":
# Step 1: Agent reasons and calls tool
{"agent": {"messages": [AIMessage(tool_calls=[...])]}}
# Step 2: Tool executes
{"tools": {"messages": [ToolMessage(content="4")]}}
# Step 3: Agent produces final response
{"agent": {"messages": [AIMessage(content="2 + 2 equals 4.")]}}
Usage Examples
Basic Invoke
from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool
@tool
def get_weather(city: str) -> str:
"""Get weather for a city."""
return f"Sunny in {city}"
agent = create_react_agent("openai:gpt-4", tools=[get_weather])
result = agent.invoke({"messages": [("user", "Weather in Paris?")]})
final_answer = result["messages"][-1].content
print(final_answer)
Streaming with Updates
for event in agent.stream(
    {"messages": [("user", "Weather in Paris?")]},
    stream_mode="updates",
):
    for node_name, output in event.items():
        print(f"[{node_name}]", output["messages"][-1])
Token-Level Streaming
for token, metadata in agent.stream(
    {"messages": [("user", "Tell me about Paris weather")]},
    stream_mode="messages",
):
    if token.content:
        print(token.content, end="", flush=True)
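In multi-node graphs you often want only the agent's tokens, not tool chatter. The metadata dict identifies the emitting node; the "langgraph_node" key used here is based on LangGraph's streaming documentation, so verify it against your installed version. The sketch below filters simulated events (SimpleNamespace stands in for real AIMessageChunk objects):

```python
from types import SimpleNamespace

# Simulated (token, metadata) pairs as yielded by stream_mode="messages".
events = [
    (SimpleNamespace(content="Sun"), {"langgraph_node": "agent"}),
    (SimpleNamespace(content=""), {"langgraph_node": "tools"}),
    (SimpleNamespace(content="ny"), {"langgraph_node": "agent"}),
]

# Keep only non-empty tokens emitted by the "agent" node.
text = "".join(
    tok.content
    for tok, meta in events
    if meta.get("langgraph_node") == "agent" and tok.content
)
```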
Checkpointed Conversation
from langgraph.checkpoint.memory import MemorySaver
agent = create_react_agent(
    "openai:gpt-4",
    tools=[get_weather],
    checkpointer=MemorySaver(),
)
config = {"configurable": {"thread_id": "conversation-1"}}
# First turn
result = agent.invoke(
    {"messages": [("user", "Weather in Paris?")]},
    config=config,
)
# Second turn - conversation history is preserved
result = agent.invoke(
    {"messages": [("user", "And in London?")]},
    config=config,
)
# Agent remembers the context from the first turn
Interrupt and Resume
from langgraph.checkpoint.memory import MemorySaver
agent = create_react_agent(
    "openai:gpt-4",
    tools=[get_weather],
    checkpointer=MemorySaver(),
    interrupt_before=["tools"],
)
config = {"configurable": {"thread_id": "review-1"}}
# Execution pauses before tool node
result = agent.invoke(
    {"messages": [("user", "Check weather in NYC")]},
    config=config,
)
# Inspect proposed tool calls
print(result["messages"][-1].tool_calls)
# Approve and resume
result = agent.invoke(None, config=config)
print(result["messages"][-1].content)