Implementation:Langchain ai Langgraph Pregel Invoke For Functional
| Attribute | Value |
|---|---|
| API | Pregel.invoke() and Pregel.stream() -- functional API angle
|
| Workflow | Functional_API_Workflow |
| Type | API Doc |
| Repository | Langchain_ai_Langgraph |
| Source File | libs/langgraph/langgraph/pregel/main.py
|
| Source Lines | L3024-3112 (invoke), L2407-2566+ (stream)
|
Overview
The Pregel.invoke() and Pregel.stream() methods are the primary execution interfaces for LangGraph workflows. When used with the functional API, these methods operate on the Pregel instance produced by the @entrypoint decorator. The functional API imposes specific semantics: the input is a single value (not a state dictionary), the default stream mode is "updates", and the return value is the entrypoint function's output rather than a state snapshot.
This page documents these methods from the functional API perspective, covering how single-value inputs are dispatched, how the entrypoint's return value is surfaced, and how interrupt handling works.
Description
Pregel.invoke()
The invoke() method (lines 3024-3112) runs the graph to completion and returns the final output. It is implemented as a wrapper around stream():
- It calls
self.stream()with the provided input and configuration. - When
stream_mode="values"(the default forinvoke()), it internally usesstream_mode=["updates", "values"]to capture both interrupt events and final values. - It iterates over all streamed chunks:
- For
"values"mode: tracks the latest value payload and collects any interrupts from"updates"events. - For other modes: collects all chunks into a list.
- For
- Returns the latest value (for
"values"mode) or the list of chunks (for other modes). - If interrupts occurred during execution, they are included in the return value.
For functional API workflows, the return value is the entrypoint function's return value (or the value field of entrypoint.final). This is a single value, not a state dictionary.
Pregel.stream()
The stream() method (lines 2407-2566+) is a generator that yields output events as the workflow executes:
- Resolves defaults for stream mode, output keys, interrupt configuration, checkpointer, store, cache, and durability mode via
self._defaults(). - Creates a
SyncQueuefor collecting stream events. - Sets up a
CallbackManagerfor tracing and monitoring. - Configures stream mode-specific handlers:
"messages"mode: attaches aStreamMessagesHandlerto intercept LLM token events."custom"mode: creates astream_writercallback that tasks can use to emit custom data.
- Executes the Pregel graph loop, yielding events from the stream queue.
- Handles cleanup and error reporting via the callback manager.
For functional API workflows, the key differences from graph-based streaming are:
- The default stream mode is
"updates"(set byentrypoint.__call__when constructing the Pregel instance). stream_eager=Trueis set, so events are emitted as soon as they are available.- The
"values"stream mode emits values once at the end of the workflow (not after each node, since there is only one node). - The input channel is a single
EphemeralValue, so the input is consumed once and not merged into a state dictionary.
Input Handling for Functional API
When the Pregel engine receives an input for a functional API workflow:
- The input value is written directly to the
STARTchannel (anEphemeralValue). - This triggers the single
PregelNode(the entrypoint function). - The entrypoint receives the input as its first positional argument.
For resume after interrupt, the input is a Command(resume=value) object. The Pregel engine detects this and routes the resume value to the interrupted continuation rather than treating it as a new workflow input.
Return Value Semantics
The return value flow for functional API workflows:
- The entrypoint function returns a value (or
entrypoint.finalinstance). _pluck_return_valueextracts the caller-facing value and writes it to theENDchannel._pluck_save_valueextracts the checkpoint value and writes it to thePREVIOUSchannel.invoke()reads theENDchannel value and returns it directly.stream()yields the value as an"updates"event keyed by the entrypoint function name.
Usage
from langgraph.func import entrypoint, task
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
@task
def process(x: int) -> int:
return x * 2
@entrypoint(checkpointer=InMemorySaver())
def my_workflow(x: int) -> int:
return process(x).result()
config = {"configurable": {"thread_id": "t1"}}
# invoke() -- returns the final value directly
result = my_workflow.invoke(5, config) # Returns 10
# stream() -- yields update events
for chunk in my_workflow.stream(5, config):
print(chunk) # Prints updates as they occur
# stream() with multiple modes
for mode, data in my_workflow.stream(5, config, stream_mode=["updates", "custom"]):
print(f"{mode}: {data}")
Code Reference
Source Location
| File | libs/langgraph/langgraph/pregel/main.py
|
invoke() |
Lines 3024-3112 |
stream() |
Lines 2407-2566+ |
ainvoke() |
Lines 3114+ (async counterpart) |
astream() |
(async counterpart of stream())
|
Signature
class Pregel:
def invoke(
self,
input: InputT | Command | None,
config: RunnableConfig | None = None,
*,
context: ContextT | None = None,
stream_mode: StreamMode = "values",
print_mode: StreamMode | Sequence[StreamMode] = (),
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
durability: Durability | None = None,
**kwargs: Any,
) -> dict[str, Any] | Any
def stream(
self,
input: InputT | Command | None,
config: RunnableConfig | None = None,
*,
context: ContextT | None = None,
stream_mode: StreamMode | Sequence[StreamMode] | None = None,
print_mode: StreamMode | Sequence[StreamMode] = (),
output_keys: str | Sequence[str] | None = None,
interrupt_before: All | Sequence[str] | None = None,
interrupt_after: All | Sequence[str] | None = None,
durability: Durability | None = None,
subgraphs: bool = False,
debug: bool | None = None,
) -> Iterator[dict[str, Any] | Any]
Import
# Methods are called on the entrypoint-decorated function, which IS a Pregel instance
from langgraph.func import entrypoint
@entrypoint()
def my_workflow(x: int) -> int:
return x
# my_workflow is a Pregel instance
my_workflow.invoke(42)
for chunk in my_workflow.stream(42):
print(chunk)
I/O Contract
invoke() Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
input |
Command | None | (required) | The workflow input. For functional API, this is a single value of any type. Use Command(resume=value) to resume after interrupt. Use None for no-input invocations.
|
config |
None | None |
Runtime configuration. Must include {"configurable": {"thread_id": "..."}} when using a checkpointer.
|
context |
None | None |
Static context object, typed according to context_schema.
|
stream_mode |
StreamMode |
"values" |
How to process stream output. For invoke(), "values" returns the final value directly.
|
print_mode |
Sequence[StreamMode] | () |
Stream modes to print to console for debugging. Does not affect the return value. |
output_keys |
Sequence[str] | None | None |
Specific output channels to include. Defaults to all output channels. |
interrupt_before |
Sequence[str] | None | None |
Node names to interrupt before. Use "*" for all nodes.
|
interrupt_after |
Sequence[str] | None | None |
Node names to interrupt after. Use "*" for all nodes.
|
durability |
None | None |
Checkpoint persistence mode: "sync", "async", or "exit".
|
invoke() Return Value
| stream_mode | Return Type | Description |
|---|---|---|
"values" |
Any |
The entrypoint's return value. For entrypoint.final, the value field. If interrupts occurred, returns {INTERRUPT: [...]} or merges interrupts into the value dict.
|
| Other modes | list[Any] |
List of all streamed chunks. |
stream() Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
input |
Command | None | (required) | Same as invoke().
|
config |
None | None |
Same as invoke().
|
context |
None | None |
Same as invoke().
|
stream_mode |
Sequence[StreamMode] | None | None |
Defaults to self.stream_mode, which is "updates" for functional API workflows. Pass a list for multiple modes.
|
print_mode |
Sequence[StreamMode] | () |
Debug printing modes. |
output_keys |
Sequence[str] | None | None |
Channels to stream. |
interrupt_before |
Sequence[str] | None | None |
Nodes to interrupt before. |
interrupt_after |
Sequence[str] | None | None |
Nodes to interrupt after. |
durability |
None | None |
Checkpoint persistence mode. |
subgraphs |
bool |
False |
Whether to include events from nested subgraphs. If True, events are emitted as (namespace, data) tuples.
|
debug |
None | None |
Enable debug output (sets print_mode=["updates", "values"]).
|
stream() Yield Value
| Condition | Yield Type | Description |
|---|---|---|
| Single stream mode | Any |
The event data for the configured stream mode. |
| Multiple stream modes | tuple[StreamMode, Any] |
Tuple of (mode_name, event_data).
|
| Subgraphs enabled | tuple[tuple[str, ...], StreamMode, Any] |
Tuple of (namespace_path, mode_name, event_data).
|
Usage Examples
Simple Invoke
from langgraph.func import entrypoint
@entrypoint()
def double(x: int) -> int:
return x * 2
result = double.invoke(21) # Returns 42
Streaming Updates
from langgraph.func import entrypoint, task
@task
def step_a(x: int) -> int:
return x + 1
@task
def step_b(x: int) -> int:
return x * 2
@entrypoint()
def pipeline(x: int) -> int:
a = step_a(x).result()
return step_b(a).result()
# Default stream_mode for functional API is "updates"
for chunk in pipeline.stream(5):
print(chunk)
# Output includes task updates and the final entrypoint return
Interrupt and Resume
from langgraph.func import entrypoint, task
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver
@task
def generate(prompt: str) -> str:
return f"Generated: {prompt}"
@entrypoint(checkpointer=InMemorySaver())
def approval_workflow(prompt: str) -> dict:
content = generate(prompt).result()
approval = interrupt({"content": content, "question": "Approve?"})
return {"content": content, "approved": approval}
config = {"configurable": {"thread_id": "approval-1"}}
# First invocation -- pauses at interrupt
for chunk in approval_workflow.stream("Write a report", config):
print(chunk)
# Resume with approval
for chunk in approval_workflow.stream(Command(resume="yes"), config):
print(chunk)
# Final output: {"content": "Generated: Write a report", "approved": "yes"}
Multiple Stream Modes
from langgraph.func import entrypoint, task
@task
def compute(x: int) -> int:
return x ** 2
@entrypoint()
def workflow(x: int) -> int:
return compute(x).result()
# Stream with multiple modes -- yields (mode, data) tuples
for mode, data in workflow.stream(4, stream_mode=["updates", "values"]):
print(f"[{mode}] {data}")
Async Execution
import asyncio
from langgraph.func import entrypoint, task
@task
async def async_task(x: int) -> int:
return x + 1
@entrypoint()
async def async_workflow(x: int) -> int:
return await async_task(x)
# Use ainvoke / astream for async entrypoints
result = await async_workflow.ainvoke(5) # Returns 6
async for chunk in async_workflow.astream(5):
print(chunk)