Implementation:Langchain ai Langgraph Pregel Invoke For Functional

From Leeroopedia
Revision as of 11:26, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Langchain_ai_Langgraph_Pregel_Invoke_For_Functional.md)
| Attribute | Value |
|---|---|
| API | Pregel.invoke() and Pregel.stream() -- functional API angle |
| Workflow | Functional_API_Workflow |
| Type | API Doc |
| Repository | Langchain_ai_Langgraph |
| Source File | libs/langgraph/langgraph/pregel/main.py |
| Source Lines | L3024-3112 (invoke), L2407-2566+ (stream) |

Overview

The Pregel.invoke() and Pregel.stream() methods are the primary execution interfaces for LangGraph workflows. When used with the functional API, these methods operate on the Pregel instance produced by the @entrypoint decorator. The functional API imposes specific semantics: the input is a single value (not a state dictionary), the default stream mode is "updates", and the return value is the entrypoint function's output rather than a state snapshot.

This page documents these methods from the functional API perspective, covering how single-value inputs are dispatched, how the entrypoint's return value is surfaced, and how interrupt handling works.

Description

Pregel.invoke()

The invoke() method (lines 3024-3112) runs the graph to completion and returns the final output. It is implemented as a wrapper around stream():

  1. It calls self.stream() with the provided input and configuration.
  2. When stream_mode="values" (the default for invoke()), it internally uses stream_mode=["updates", "values"] to capture both interrupt events and final values.
  3. It iterates over all streamed chunks:
    • For "values" mode: tracks the latest value payload and collects any interrupts from "updates" events.
    • For other modes: collects all chunks into a list.
  4. Returns the latest value (for "values" mode) or the list of chunks (for other modes).
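  5. In "values" mode, any interrupts collected during execution are attached to the return value: merged under the INTERRUPT key when the latest value is a dict, or returned as {INTERRUPT: [...]} otherwise.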

For functional API workflows, the return value is the entrypoint function's return value (or the value field of entrypoint.final). This is a single value, not a state dictionary.
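The wrapper logic described above can be sketched with the standard library alone. This is a simplified model, not the code in main.py: invoke_sketch and fake_stream are hypothetical names, and the "__interrupt__" key is an assumption about how interrupts appear in "updates" payloads.

```python
from typing import Any, Callable, Iterator

INTERRUPT = "__interrupt__"  # assumed key for interrupts inside "updates" payloads


def invoke_sketch(
    stream: Callable[..., Iterator[Any]],
    input: Any,
    stream_mode: str = "values",
) -> Any:
    """Drain a stream() generator and reduce it to a single result."""
    if stream_mode != "values":
        # Other modes: return every chunk in arrival order.
        return list(stream(input, stream_mode=stream_mode))

    latest: Any = None
    interrupts: list = []
    # "values" mode: request both modes so interrupts are visible too.
    for mode, payload in stream(input, stream_mode=["updates", "values"]):
        if mode == "values":
            latest = payload
        elif mode == "updates" and isinstance(payload, dict):
            interrupts.extend(payload.get(INTERRUPT, ()))
    if not interrupts:
        return latest
    if isinstance(latest, dict):
        return {**latest, INTERRUPT: interrupts}
    return {INTERRUPT: interrupts}


def fake_stream(x: int, stream_mode: Any) -> Iterator[Any]:
    """Hypothetical stand-in for stream(): one node that doubles its input."""
    if isinstance(stream_mode, list):
        yield ("updates", {"double": x * 2})
        yield ("values", x * 2)
    else:
        yield {"double": x * 2}


print(invoke_sketch(fake_stream, 21))                         # 42
print(invoke_sketch(fake_stream, 21, stream_mode="updates"))  # [{'double': 42}]
```

The sketch shows why "values" mode quietly asks for two stream modes: the "values" events carry the result, while the "updates" events are the only place interrupts show up.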

Pregel.stream()

The stream() method (lines 2407-2566+) is a generator that yields output events as the workflow executes:

  1. Resolves defaults for stream mode, output keys, interrupt configuration, checkpointer, store, cache, and durability mode via self._defaults().
  2. Creates a SyncQueue for collecting stream events.
  3. Sets up a CallbackManager for tracing and monitoring.
  4. Configures stream mode-specific handlers:
    • "messages" mode: attaches a StreamMessagesHandler to intercept LLM token events.
    • "custom" mode: creates a stream_writer callback that tasks can use to emit custom data.
  5. Executes the Pregel graph loop, yielding events from the stream queue.
  6. Handles cleanup and error reporting via the callback manager.

For functional API workflows, the key differences from graph-based streaming are:

  • The default stream mode is "updates" (set by entrypoint.__call__ when constructing the Pregel instance).
  • stream_eager=True is set, so events are emitted as soon as they are available.
  • The "values" stream mode emits values once at the end of the workflow (not after each node, since there is only one node).
  • The input channel is a single EphemeralValue, so the input is consumed once and not merged into a state dictionary.
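As a rough illustration of the queue-and-handler structure, the following standard-library sketch runs a single node and drains its events from a queue. It is a toy model under stated assumptions: stream_sketch and double are hypothetical names, the node receives its writer as an explicit argument, and events are drained only after the node finishes, whereas the real engine (with stream_eager=True) emits them as soon as they are available.

```python
import queue
from typing import Any, Callable, Iterator, Sequence


def stream_sketch(
    node: Callable[[Any, Callable[[Any], None]], Any],
    input: Any,
    stream_mode: Sequence[str] = ("updates",),
) -> Iterator[tuple]:
    """Run a single node and yield (mode, data) events drained from a queue."""
    events: queue.SimpleQueue = queue.SimpleQueue()

    def stream_writer(data: Any) -> None:
        # Only enqueue custom data when the caller asked for "custom" mode.
        if "custom" in stream_mode:
            events.put(("custom", data))

    result = node(input, stream_writer)
    if "updates" in stream_mode:
        # Updates are keyed by the name of the node that produced them.
        events.put(("updates", {node.__name__: result}))
    if "values" in stream_mode:
        # With a single node, "values" is emitted once, at the end.
        events.put(("values", result))

    while not events.empty():
        yield events.get()


def double(x: int, writer: Callable[[Any], None]) -> int:
    writer({"progress": "doubling"})
    return x * 2


print(list(stream_sketch(double, 5, stream_mode=["updates", "custom"])))
```

Requesting only "values" yields a single event, which matches the single-node behaviour noted in the list above.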

Input Handling for Functional API

When the Pregel engine receives an input for a functional API workflow:

  • The input value is written directly to the START channel (an EphemeralValue).
  • This triggers the single PregelNode (the entrypoint function).
  • The entrypoint receives the input as its first positional argument.

For resume after interrupt, the input is a Command(resume=value) object. The Pregel engine detects this and routes the resume value to the interrupted continuation rather than treating it as a new workflow input.
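The two input paths can be modelled in a few lines. This is an illustrative sketch only: ResumeCommand, EphemeralSlot, and dispatch are hypothetical stand-ins for Command(resume=...), the EphemeralValue channel, and the engine's routing, and they make no claim about how the library implements them.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class ResumeCommand:
    """Hypothetical stand-in for Command(resume=value)."""
    resume: Any


class EphemeralSlot:
    """Toy single-use channel: a value can be read once, then it is gone."""
    _EMPTY = object()

    def __init__(self) -> None:
        self._value: Any = self._EMPTY

    def write(self, value: Any) -> None:
        self._value = value

    def consume(self) -> Any:
        if self._value is self._EMPTY:
            raise LookupError("channel is empty")
        value, self._value = self._value, self._EMPTY
        return value


def dispatch(input: Any, start: EphemeralSlot, pending_resumes: list) -> str:
    """Route a resume command to the paused run; treat anything else as new input."""
    if isinstance(input, ResumeCommand):
        pending_resumes.append(input.resume)
        return "resume"
    start.write(input)
    return "start"


start, resumes = EphemeralSlot(), []
print(dispatch("Write a report", start, resumes))      # start
print(start.consume())                                 # Write a report
print(dispatch(ResumeCommand("yes"), start, resumes))  # resume
print(resumes)                                         # ['yes']
```

The point of the single-use slot is that a plain input starts exactly one run, while a resume command never touches the start channel at all.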

Return Value Semantics

The return value flow for functional API workflows:

  1. The entrypoint function returns a value (or entrypoint.final instance).
  2. _pluck_return_value extracts the caller-facing value and writes it to the END channel.
  3. _pluck_save_value extracts the checkpoint value and writes it to the PREVIOUS channel.
  4. invoke() reads the END channel value and returns it directly.
  5. stream() yields the value as an "updates" event keyed by the entrypoint function name.
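Steps 1 to 3 can be sketched as two small mapper functions. The Final dataclass below is a hypothetical stand-in for entrypoint.final; its save field name is an assumption (the text above only names the value field), and the "END" and "PREVIOUS" dictionary keys are just labels for the two channels.

```python
from dataclasses import dataclass
from typing import Any, Generic, TypeVar

R = TypeVar("R")
S = TypeVar("S")


@dataclass
class Final(Generic[R, S]):
    """Hypothetical stand-in for entrypoint.final."""
    value: R  # what the caller receives
    save: S   # what is checkpointed for the next run (assumed field name)


def pluck_return_value(result: Any) -> Any:
    """Caller-facing value: unwrap Final.value, pass anything else through."""
    return result.value if isinstance(result, Final) else result


def pluck_save_value(result: Any) -> Any:
    """Checkpoint value: unwrap Final.save, pass anything else through."""
    return result.save if isinstance(result, Final) else result


def write_outputs(result: Any) -> dict:
    """Model the two channel writes made after the entrypoint returns."""
    return {"END": pluck_return_value(result), "PREVIOUS": pluck_save_value(result)}


print(write_outputs(10))
# {'END': 10, 'PREVIOUS': 10}
print(write_outputs(Final(value=10, save={"count": 1})))
# {'END': 10, 'PREVIOUS': {'count': 1}}
```

A plain return value lands in both channels unchanged; only a Final-style wrapper lets the returned value and the saved value diverge.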

Usage

from langgraph.func import entrypoint, task
from langgraph.checkpoint.memory import InMemorySaver

@task
def process(x: int) -> int:
    return x * 2

@entrypoint(checkpointer=InMemorySaver())
def my_workflow(x: int) -> int:
    return process(x).result()

config = {"configurable": {"thread_id": "t1"}}

# invoke() -- returns the final value directly
result = my_workflow.invoke(5, config)  # Returns 10

# stream() -- yields update events
for chunk in my_workflow.stream(5, config):
    print(chunk)  # Prints updates as they occur

# stream() with multiple modes
for mode, data in my_workflow.stream(5, config, stream_mode=["updates", "custom"]):
    print(f"{mode}: {data}")

Code Reference

Source Location

File libs/langgraph/langgraph/pregel/main.py
invoke() Lines 3024-3112
stream() Lines 2407-2566+
ainvoke() Lines 3114+ (async counterpart)
astream() (async counterpart of stream())

Signature

class Pregel:
    def invoke(
        self,
        input: InputT | Command | None,
        config: RunnableConfig | None = None,
        *,
        context: ContextT | None = None,
        stream_mode: StreamMode = "values",
        print_mode: StreamMode | Sequence[StreamMode] = (),
        output_keys: str | Sequence[str] | None = None,
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        durability: Durability | None = None,
        **kwargs: Any,
    ) -> dict[str, Any] | Any

    def stream(
        self,
        input: InputT | Command | None,
        config: RunnableConfig | None = None,
        *,
        context: ContextT | None = None,
        stream_mode: StreamMode | Sequence[StreamMode] | None = None,
        print_mode: StreamMode | Sequence[StreamMode] = (),
        output_keys: str | Sequence[str] | None = None,
        interrupt_before: All | Sequence[str] | None = None,
        interrupt_after: All | Sequence[str] | None = None,
        durability: Durability | None = None,
        subgraphs: bool = False,
        debug: bool | None = None,
    ) -> Iterator[dict[str, Any] | Any]

Import

# Methods are called on the entrypoint-decorated function, which IS a Pregel instance
from langgraph.func import entrypoint

@entrypoint()
def my_workflow(x: int) -> int:
    return x

# my_workflow is a Pregel instance
my_workflow.invoke(42)
for chunk in my_workflow.stream(42):
    print(chunk)

I/O Contract

invoke() Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| input | InputT \| Command \| None | (required) | The workflow input. For the functional API, this is a single value of any type. Use Command(resume=value) to resume after an interrupt. Use None for no-input invocations. |
| config | RunnableConfig \| None | None | Runtime configuration. Must include {"configurable": {"thread_id": "..."}} when using a checkpointer. |
| context | ContextT \| None | None | Static context object, typed according to context_schema. |
| stream_mode | StreamMode | "values" | How to process stream output. For invoke(), "values" returns the final value directly. |
| print_mode | StreamMode \| Sequence[StreamMode] | () | Stream modes to print to the console for debugging. Does not affect the return value. |
| output_keys | str \| Sequence[str] \| None | None | Specific output channels to include. Defaults to all output channels. |
| interrupt_before | All \| Sequence[str] \| None | None | Node names to interrupt before. Use "*" for all nodes. |
| interrupt_after | All \| Sequence[str] \| None | None | Node names to interrupt after. Use "*" for all nodes. |
| durability | Durability \| None | None | Checkpoint persistence mode: "sync", "async", or "exit". |

invoke() Return Value

| stream_mode | Return Type | Description |
|---|---|---|
| "values" | Any | The entrypoint's return value. For entrypoint.final, the value field. If interrupts occurred, returns {INTERRUPT: [...]} or merges interrupts into the value dict. |
| Other modes | list[Any] | List of all streamed chunks. |
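Because interrupts share the return channel with the workflow's value in "values" mode, callers may want to separate the two. A minimal sketch, assuming the INTERRUPT key is the string "__interrupt__"; split_result is a hypothetical helper, not a library function.

```python
from typing import Any

INTERRUPT = "__interrupt__"  # assumed value of the INTERRUPT key


def split_result(result: Any) -> tuple:
    """Separate the workflow value from any interrupts attached to it."""
    if isinstance(result, dict) and INTERRUPT in result:
        value = {k: v for k, v in result.items() if k != INTERRUPT}
        # A bare {INTERRUPT: [...]} means the run paused before returning a value.
        return (value or None), list(result[INTERRUPT])
    return result, []


print(split_result(42))
# (42, [])
print(split_result({INTERRUPT: ["Approve?"]}))
# (None, ['Approve?'])
print(split_result({"content": "draft", INTERRUPT: ["Approve?"]}))
# ({'content': 'draft'}, ['Approve?'])
```

One limitation of this sketch: an empty dict returned alongside interrupts is reported as None, since the two cases look identical once the interrupt key is removed.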

stream() Parameters

| Parameter | Type | Default | Description |
|---|---|---|---|
| input | InputT \| Command \| None | (required) | Same as invoke(). |
| config | RunnableConfig \| None | None | Same as invoke(). |
| context | ContextT \| None | None | Same as invoke(). |
| stream_mode | StreamMode \| Sequence[StreamMode] \| None | None | Defaults to self.stream_mode, which is "updates" for functional API workflows. Pass a list for multiple modes. |
| print_mode | StreamMode \| Sequence[StreamMode] | () | Debug printing modes. |
| output_keys | str \| Sequence[str] \| None | None | Channels to stream. |
| interrupt_before | All \| Sequence[str] \| None | None | Nodes to interrupt before. |
| interrupt_after | All \| Sequence[str] \| None | None | Nodes to interrupt after. |
| durability | Durability \| None | None | Checkpoint persistence mode. |
| subgraphs | bool | False | Whether to include events from nested subgraphs. If True, events are emitted as (namespace, data) tuples. |
| debug | bool \| None | None | Enable debug output (sets print_mode=["updates", "values"]). |

stream() Yield Value

| Condition | Yield Type | Description |
|---|---|---|
| Single stream mode | Any | The event data for the configured stream mode. |
| Multiple stream modes | tuple[StreamMode, Any] | Tuple of (mode_name, event_data). |
| Subgraphs enabled, multiple stream modes | tuple[tuple[str, ...], StreamMode, Any] | Tuple of (namespace_path, mode_name, event_data). With a single stream mode the tuple is (namespace_path, event_data). |
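Since the chunk shape depends on how stream() was called, a consumer that handles several configurations may find it convenient to normalize every chunk to one shape. The helper below is a hypothetical sketch (normalize_event is not part of the library); it assumes the caller knows which options it passed to stream().

```python
from typing import Any


def normalize_event(
    chunk: Any,
    *,
    multi_mode: bool,
    subgraphs: bool,
    default_mode: str = "updates",
) -> tuple:
    """Return (namespace, mode, data) for any of the documented yield shapes."""
    namespace: tuple = ()
    mode = default_mode
    if subgraphs and multi_mode:
        namespace, mode, data = chunk   # (namespace_path, mode_name, event_data)
    elif subgraphs:
        namespace, data = chunk         # (namespace_path, event_data)
    elif multi_mode:
        mode, data = chunk              # (mode_name, event_data)
    else:
        data = chunk                    # bare event data
    return namespace, mode, data


print(normalize_event({"pipeline": 12}, multi_mode=False, subgraphs=False))
# ((), 'updates', {'pipeline': 12})
print(normalize_event(("values", 12), multi_mode=True, subgraphs=False))
# ((), 'values', 12)
print(normalize_event((("child",), "updates", {"step": 1}), multi_mode=True, subgraphs=True))
# (('child',), 'updates', {'step': 1})
```

Passing the flags explicitly avoids guessing the shape from the chunk itself, which would be ambiguous when the event data happens to be a tuple.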

Usage Examples

Simple Invoke

from langgraph.func import entrypoint

@entrypoint()
def double(x: int) -> int:
    return x * 2

result = double.invoke(21)  # Returns 42

Streaming Updates

from langgraph.func import entrypoint, task

@task
def step_a(x: int) -> int:
    return x + 1

@task
def step_b(x: int) -> int:
    return x * 2

@entrypoint()
def pipeline(x: int) -> int:
    a = step_a(x).result()
    return step_b(a).result()

# Default stream_mode for functional API is "updates"
for chunk in pipeline.stream(5):
    print(chunk)
# Output includes task updates and the final entrypoint return

Interrupt and Resume

from langgraph.func import entrypoint, task
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

@task
def generate(prompt: str) -> str:
    return f"Generated: {prompt}"

@entrypoint(checkpointer=InMemorySaver())
def approval_workflow(prompt: str) -> dict:
    content = generate(prompt).result()
    approval = interrupt({"content": content, "question": "Approve?"})
    return {"content": content, "approved": approval}

config = {"configurable": {"thread_id": "approval-1"}}

# First invocation -- pauses at interrupt
for chunk in approval_workflow.stream("Write a report", config):
    print(chunk)

# Resume with approval
for chunk in approval_workflow.stream(Command(resume="yes"), config):
    print(chunk)
# Last "updates" chunk, keyed by the entrypoint function name:
# {"approval_workflow": {"content": "Generated: Write a report", "approved": "yes"}}

Multiple Stream Modes

from langgraph.func import entrypoint, task

@task
def compute(x: int) -> int:
    return x ** 2

@entrypoint()
def workflow(x: int) -> int:
    return compute(x).result()

# Stream with multiple modes -- yields (mode, data) tuples
for mode, data in workflow.stream(4, stream_mode=["updates", "values"]):
    print(f"[{mode}] {data}")

Async Execution

import asyncio
from langgraph.func import entrypoint, task

@task
async def async_task(x: int) -> int:
    return x + 1

@entrypoint()
async def async_workflow(x: int) -> int:
    return await async_task(x)

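async def main() -> None:
    # Use ainvoke / astream for async entrypoints
    result = await async_workflow.ainvoke(5)  # Returns 6
    print(result)

    async for chunk in async_workflow.astream(5):
        print(chunk)

# Top-level await only works in a REPL or notebook; a script needs an event loop
asyncio.run(main())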
