Implementation:Langchain ai Langgraph Pregel Invoke

From Leeroopedia
Metadata
Type: Implementation (API Doc)
Library: langgraph
Source File: libs/langgraph/langgraph/pregel/main.py
Lines: L3024-3112 (invoke), L2407-2473 (stream)
Workflow: Building_a_Stateful_Graph

Overview

Pregel.invoke runs a compiled graph to completion with a single input and returns the final result. Pregel.stream runs the same graph but returns an iterator that yields intermediate results as they are produced. These are the two primary execution methods on the object returned by StateGraph.compile(), a CompiledStateGraph, which extends Pregel.

Description

invoke() is a convenience wrapper around stream(). It iterates through all streamed events, collecting either the latest "values" output (when stream_mode="values", the default) or all chunks (for other stream modes). It also aggregates any interrupts and includes them in the return value.

stream() is the core execution method. It sets up the streaming infrastructure (queue, callback managers, stream mode handlers), resolves defaults for interrupt/durability/output settings, and enters the Pregel execution loop. Each superstep's output is yielded to the caller as it becomes available.
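
The relationship between the two methods can be illustrated with a toy model. The sketch below is not langgraph's implementation: toy_stream and toy_invoke are hypothetical names, the "graph" is just a list of functions run one per superstep, and only the "values" and "updates" modes are modelled. It shows the shape of the idea only: a generator that yields after each step, and a wrapper that drains it.

```python
from typing import Any, Callable, Iterator

Node = Callable[[dict[str, Any]], dict[str, Any]]


def toy_stream(
    nodes: list[Node], state: dict[str, Any], stream_mode: str = "values"
) -> Iterator[dict[str, Any]]:
    """Toy stand-in for stream(): run one node per superstep, yield lazily."""
    state = dict(state)  # work on a copy so the caller's input is untouched
    if stream_mode == "values":
        yield dict(state)  # "values" mode starts with the initial state
    for node in nodes:
        update = node(state)
        state.update(update)
        if stream_mode == "values":
            yield dict(state)  # full state after this superstep
        else:
            yield {node.__name__: update}  # "updates": only what the node wrote


def toy_invoke(
    nodes: list[Node], state: dict[str, Any], stream_mode: str = "values"
) -> Any:
    """Toy stand-in for invoke(): drain the stream and shape the result."""
    latest = None
    chunks = []
    for chunk in toy_stream(nodes, state, stream_mode):
        if stream_mode == "values":
            latest = chunk  # keep only the most recent full state
        else:
            chunks.append(chunk)  # keep every chunk
    return latest if stream_mode == "values" else chunks


def add_one(state: dict[str, Any]) -> dict[str, Any]:
    return {"x": state["x"] + 1}


print(toy_invoke([add_one], {"x": 0}))
print(toy_invoke([add_one], {"x": 0}, stream_mode="updates"))
```

Because toy_stream is a generator, no node runs until the caller asks for the next item, which mirrors the "yielded as it becomes available" behaviour described above.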

Both methods also have async counterparts: ainvoke() and astream().

Usage

from typing_extensions import TypedDict
from langgraph.graph import StateGraph, START, END

class State(TypedDict):
    x: int

def add_one(state: State) -> dict:
    return {"x": state["x"] + 1}

builder = StateGraph(State)
builder.add_node(add_one)
builder.add_edge(START, "add_one")
builder.add_edge("add_one", END)
graph = builder.compile()

# invoke: get final result
result = graph.invoke({"x": 0})
# {'x': 1}

# stream: iterate over intermediate results
for chunk in graph.stream({"x": 0}, stream_mode="updates"):
    print(chunk)

Code Reference

Source Location

Item: Path, Lines
invoke: libs/langgraph/langgraph/pregel/main.py, L3024-3112
stream: libs/langgraph/langgraph/pregel/main.py, L2407-2473 (signature + docstring)
Pregel class: libs/langgraph/langgraph/pregel/main.py, L324

Signature

def invoke(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode = "values",
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    **kwargs: Any,
) -> dict[str, Any] | Any:

def stream(
    self,
    input: InputT | Command | None,
    config: RunnableConfig | None = None,
    *,
    context: ContextT | None = None,
    stream_mode: StreamMode | Sequence[StreamMode] | None = None,
    print_mode: StreamMode | Sequence[StreamMode] = (),
    output_keys: str | Sequence[str] | None = None,
    interrupt_before: All | Sequence[str] | None = None,
    interrupt_after: All | Sequence[str] | None = None,
    durability: Durability | None = None,
    subgraphs: bool = False,
    debug: bool | None = None,
) -> Iterator[dict[str, Any] | Any]:

Import

These methods are called on a compiled graph instance. No separate import is needed beyond the graph construction imports:

from langgraph.graph import StateGraph, START, END

# Build and compile
builder = StateGraph(MyState)
# ... add nodes and edges ...
graph = builder.compile()

# Then call invoke/stream directly
result = graph.invoke(input_data)

I/O Contract

invoke Parameters

Parameter: Type, Default -- Description
input: InputT | Command | None, required -- The input data. Typically a dict matching the input schema. Pass None to resume from a checkpoint.
config: RunnableConfig | None, default None -- Runtime configuration including thread_id for checkpointing, callbacks, tags, etc.
context: ContextT | None, default None -- Static runtime context (immutable data available to all nodes).
stream_mode: StreamMode, default "values" -- Controls the shape of the return value. "values" returns the final state; other modes return a list of chunks.
print_mode: StreamMode | Sequence[StreamMode], default () -- Stream modes to print to the console for debugging. Does not affect the return value.
output_keys: str | Sequence[str] | None, default None -- Specific state keys to include in the output. Defaults to all output channels.
interrupt_before: All | Sequence[str] | None, default None -- Nodes to pause before (overrides the compile-time setting for this invocation).
interrupt_after: All | Sequence[str] | None, default None -- Nodes to pause after (overrides the compile-time setting for this invocation).
durability: Durability | None, default None -- Checkpoint persistence timing: "sync", "async", or "exit".

Returns: dict[str, Any] | Any -- When stream_mode="values", returns the final state dict. For other modes, returns a list of output chunks.

stream Parameters

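All named parameters from invoke are supported (stream does not accept **kwargs), with these differences and additions:

Parameter: Type, Default -- Description
stream_mode: StreamMode | Sequence[StreamMode] | None, default None -- Accepts a single mode or a sequence of modes. With a sequence, each yielded item is a (mode, data) tuple. None falls back to the graph's configured stream mode.
subgraphs: bool, default False -- If True, emit events from subgraphs as (namespace, data) tuples.
debug: bool | None, default None -- Enable debug output for this stream invocation.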

Yields: dict[str, Any] | Any -- Output events whose shape depends on stream_mode.

Usage Examples

Basic invoke

result = graph.invoke({"x": 0})
# {'x': 1}

invoke with Checkpointer

from langgraph.checkpoint.memory import InMemorySaver

graph = builder.compile(checkpointer=InMemorySaver())
config = {"configurable": {"thread_id": "thread-1"}}

result = graph.invoke({"x": 0}, config)
# State is persisted under thread-1. Keys without a reducer (such as x here)
# are overwritten by later inputs rather than accumulated.
result = graph.invoke(None, config)  # None input: continue from the last checkpoint

Streaming with "updates" Mode

for event in graph.stream({"x": 0}, stream_mode="updates"):
    print(event)
# {'add_one': {'x': 1}}

Streaming with "values" Mode

for state in graph.stream({"x": 0}, stream_mode="values"):
    print(state)
# {'x': 0}  (initial state)
# {'x': 1}  (after add_one)

Multiple Stream Modes

for mode, data in graph.stream({"x": 0}, stream_mode=["values", "updates"]):
    print(f"{mode}: {data}")
# values: {'x': 0}
# updates: {'add_one': {'x': 1}}
# values: {'x': 1}

invoke with Context

# Assumes the graph was built with a context schema that has a "multiplier" field.
result = graph.invoke({"x": 0}, context={"multiplier": 2.0})

Async Execution

import asyncio

async def main():
    result = await graph.ainvoke({"x": 0})
    print(result)

    async for event in graph.astream({"x": 0}, stream_mode="updates"):
        print(event)

asyncio.run(main())
