Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Langchain ai Langgraph Pregel Invoke For Agents

From Leeroopedia
Attribute Value
API CompiledGraph.invoke and CompiledGraph.stream
Workflow ReAct_Agent_Creation
Type API Doc
Repository Langchain_ai_Langgraph
Source File libs/langgraph/langgraph/pregel/main.py
Source Lines L3024-3112 (invoke), L2407-2470 (stream)

Overview

CompiledGraph.invoke and CompiledGraph.stream are the primary execution methods for running a compiled ReAct agent graph. invoke runs the agent to completion and returns the final state, while stream returns an iterator yielding intermediate results as the agent progresses through its reasoning-and-action loop. Both methods accept agent input in the {"messages": [...]} format and support configuration for checkpointing, interrupts, streaming modes, and durability.

This page documents these methods from the angle of agent-specific execution with message-based input.

Description

invoke

The invoke method runs the graph to completion and returns the final output. Internally, it calls stream with stream_mode=["updates", "values"] and collects results:

  • When stream_mode="values" (the default), it tracks the latest "values" payload and returns it as the final result.
  • When a different stream_mode is specified, it collects all chunks into a list and returns them.
  • If interrupts occur during execution, they are included in the returned dictionary under the INTERRUPT key.

For a ReAct agent, a typical invocation looks like:

result = agent.invoke({"messages": [("user", "What is 2 + 2?")]})
# result = {"messages": [HumanMessage(...), AIMessage(...), ...]}

stream

The stream method returns an iterator that yields intermediate outputs as the graph executes step by step. The stream_mode parameter controls what is emitted:

  • "values": Emits the full state after each step.
  • "updates": Emits node names and their outputs for each step.
  • "messages": Emits LLM tokens as (token, metadata) tuples during generation.
  • "custom": Emits custom data written via StreamWriter inside nodes.
  • "checkpoints": Emits checkpoint events.
  • "tasks": Emits task start/finish events.
  • "debug": Emits detailed debug information.

Multiple modes can be combined in a list, producing (mode, data) tuples. When subgraphs=True, events from nested subgraphs are also emitted with namespace prefixes.

For a ReAct agent streaming step-by-step:

for chunk in agent.stream(
    {"messages": [("user", "Search for X")]},
    stream_mode="updates",
):
    print(chunk)
    # {"agent": {"messages": [AIMessage(...)]}}
    # {"tools": {"messages": [ToolMessage(...)]}}
    # {"agent": {"messages": [AIMessage(...)]}}

Interrupt and Resume

When interrupt_before or interrupt_after is configured (either at graph compilation or at invocation time), execution pauses at the specified nodes. This is critical for human-in-the-loop agent workflows:

# Pause before tool execution
result = agent.invoke(
    {"messages": [("user", "Delete all files")]},
    config={"configurable": {"thread_id": "1"}},
    interrupt_before=["tools"],
)
# Review the proposed tool calls in result["messages"][-1].tool_calls
# Resume execution
result = agent.invoke(None, config={"configurable": {"thread_id": "1"}})

Code Reference

Source Location

File libs/langgraph/langgraph/pregel/main.py
invoke Lines 3024-3112
stream Lines 2407-2470 (signature and docstring)
Class Pregel (parent of CompiledStateGraph)

Signature

def invoke(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode = "values",
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    **kwargs: Any,
) -> dict[str, Any] | Any
def stream(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    debug: bool | None = None,
    **kwargs: Any,
) -> Iterator[dict[str, Any] | Any]

Import

# These methods are available on any compiled graph
from langgraph.prebuilt import create_react_agent

agent = create_react_agent(model, tools)
# agent.invoke(...) and agent.stream(...) are available directly

I/O Contract

invoke Parameters

Parameter Type Default Description
input Command | None (required) Input data. For agents: {"messages": [...]}. Use None to resume from an interrupt. Use Command for programmatic control.
config None None Run configuration. Use {"configurable": {"thread_id": "..."}} for checkpointed runs.
context None None Static runtime context accessible by nodes via Runtime.context.
stream_mode StreamMode "values" Output mode. "values" returns final state; other modes return a list of chunks.
print_mode Sequence[StreamMode] () Modes to print to console for debugging. Does not affect return value.
output_keys Sequence[str] | None None Specific state keys to include in output. Defaults to all non-context channels.
interrupt_before Sequence[str] | None None Nodes to pause before. For agents: ["agent"] or ["tools"].
interrupt_after Sequence[str] | None None Nodes to pause after.
durability None None Persistence mode: "sync", "async", or "exit".

stream Additional Parameters

Parameter Type Default Description
subgraphs bool False If True, emit events from nested subgraphs with namespace prefixes.
debug None None Enable debug-level event emission.

Output Format

invoke with stream_mode="values":

{
    "messages": [
        HumanMessage("What is 2 + 2?"),
        AIMessage(content="", tool_calls=[{"name": "calc", "args": {"expr": "2+2"}, ...}]),
        ToolMessage(content="4", tool_call_id="..."),
        AIMessage(content="2 + 2 equals 4."),
    ],
    "remaining_steps": 22,
}

stream with stream_mode="updates":

# Step 1: Agent reasons and calls tool
{"agent": {"messages": [AIMessage(tool_calls=[...])]}}
# Step 2: Tool executes
{"tools": {"messages": [ToolMessage(content="4")]}}
# Step 3: Agent produces final response
{"agent": {"messages": [AIMessage(content="2 + 2 equals 4.")]}}

Usage Examples

Basic Invoke

from langgraph.prebuilt import create_react_agent
from langchain_core.tools import tool

@tool
def get_weather(city: str) -> str:
    """Get weather for a city."""
    return f"Sunny in {city}"

agent = create_react_agent("openai:gpt-4", tools=[get_weather])

result = agent.invoke({"messages": [("user", "Weather in Paris?")]})
final_answer = result["messages"][-1].content
print(final_answer)

Streaming with Updates

for event in agent.stream(
    {"messages": [("user", "Weather in Paris?")]},
    stream_mode="updates",
):
    for node_name, output in event.items():
        print(f"[{node_name}]", output["messages"][-1])

Token-Level Streaming

for token, metadata in agent.stream(
    {"messages": [("user", "Tell me about Paris weather")]},
    stream_mode="messages",
):
    if token.content:
        print(token.content, end="", flush=True)

Checkpointed Conversation

from langgraph.checkpoint.memory import MemorySaver

agent = create_react_agent(
    "openai:gpt-4",
    tools=[get_weather],
    checkpointer=MemorySaver(),
)

config = {"configurable": {"thread_id": "conversation-1"}}

# First turn
result = agent.invoke(
    {"messages": [("user", "Weather in Paris?")]},
    config=config,
)

# Second turn - conversation history is preserved
result = agent.invoke(
    {"messages": [("user", "And in London?")]},
    config=config,
)
# Agent remembers the context from the first turn

Interrupt and Resume

from langgraph.checkpoint.memory import MemorySaver

agent = create_react_agent(
    "openai:gpt-4",
    tools=[get_weather],
    checkpointer=MemorySaver(),
    interrupt_before=["tools"],
)

config = {"configurable": {"thread_id": "review-1"}}

# Execution pauses before tool node
result = agent.invoke(
    {"messages": [("user", "Check weather in NYC")]},
    config=config,
)
# Inspect proposed tool calls
print(result["messages"][-1].tool_calls)

# Approve and resume
result = agent.invoke(None, config=config)
print(result["messages"][-1].content)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment