Principle:Langchain ai Langgraph Functional Workflow Execution

From Leeroopedia
Revision as of 17:29, 16 February 2026 by Admin (talk | contribs) (Auto-imported from principles/Langchain_ai_Langgraph_Functional_Workflow_Execution.md)
Attribute | Value
Concept | Executing functional API workflows with invoke/stream including interrupt support
Workflow | Functional_API_Workflow
Type | Principle
Repository | Langchain_ai_Langgraph
Source | libs/langgraph/langgraph/pregel/main.py, libs/langgraph/langgraph/func/__init__.py

Overview

Once a workflow is defined using @entrypoint and @task decorators, it is executed through the Pregel execution engine's invoke() and stream() methods. Because the @entrypoint decorator compiles the decorated function into a Pregel instance, the resulting object directly exposes these methods. The functional API imposes specific semantics on these execution methods: the input is a single value (not a state dictionary), the output is the entrypoint's return value, and the stream mode defaults to "updates".

This principle covers how functional API workflows are dispatched, how streaming works for functional workflows, how interrupts pause and resume execution, and how the execution model differs from the graph-based API.

Description

Functional Graph Execution Model

A functional API workflow compiled via @entrypoint produces a Pregel graph with a single node (the entrypoint function). Execution proceeds as follows:

  1. The caller invokes workflow.invoke(input, config) or iterates over workflow.stream(input, config).
  2. The Pregel engine writes the input to the START channel (an EphemeralValue).
  3. The single PregelNode is triggered, executing the entrypoint function.
  4. Inside the entrypoint, @task calls register sub-operations with the runtime. These tasks run as part of the same Pregel step.
  5. When the entrypoint returns, its value is written to the END channel (via _pluck_return_value) and to the PREVIOUS channel (via _pluck_save_value).
  6. The Pregel engine yields the output and, if a checkpointer is active, persists the checkpoint.

Unlike graph-based workflows where multiple nodes execute across multiple Pregel "supersteps," a functional API workflow typically completes in a single superstep (unless interrupted). The tasks within the entrypoint are sub-operations of that single step, not separate graph nodes.

Single-Input Dispatch

The functional API uses a fundamentally different input model than the graph-based API:

  • Graph API: Input is a dictionary matching the state schema (e.g., {"messages": [HumanMessage("hi")]}). The dictionary is merged into the state.
  • Functional API: Input is a single value of any type. It is passed directly as the first parameter of the entrypoint function. It can be a string, integer, list, dictionary, or custom object.

This single-input model simplifies the API for workflows that do not need the incremental state-merging semantics of the graph API. When resuming after an interrupt, the input is a Command(resume=value) object instead of the original input type.

Stream Modes for Functional API

The functional API defaults to stream_mode="updates" (set during graph construction in entrypoint.__call__) with stream_eager=True. The available stream modes and their behavior for functional workflows are:

Stream Mode | Behavior with Functional API
"updates" | Emits task names and their return values after each task completes. The entrypoint's return value is emitted under its function name.
"values" | Emits the final output value once at the end of the workflow (not after each task). When used with invoke(), returns the final value directly.
"custom" | Emits custom data written via StreamWriter from inside tasks or the entrypoint.
"messages" | Emits LLM message tokens and metadata for any LLM invocations inside tasks.
"checkpoints" | Emits checkpoint events in the format returned by get_state().
"tasks" | Emits events when tasks start and finish, including results and errors.
"debug" | Emits detailed debug information for each step.

Multiple stream modes can be combined by passing a list: stream_mode=["updates", "custom"]. When multiple modes are used, streamed outputs are tuples of (mode, data).

Interrupt and Resume

Functional API workflows support the same interrupt/resume pattern as graph-based workflows:

  1. Inside the entrypoint (or a task), the code calls interrupt(value).
  2. The Pregel engine raises an internal exception that pauses execution and persists the current state to the checkpoint.
  3. The caller receives an Interrupt event in the stream (or via the return value in invoke()).
  4. To resume, the caller invokes the workflow again with Command(resume=response_value) and the same thread configuration.
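  5. The Pregel engine restores from the checkpoint and re-runs the entrypoint function from its beginning. Previously completed tasks are not re-executed (their results come from the checkpoint), so execution effectively continues from the point of interruption, where interrupt() now returns the resume value. Code in the entrypoint body that precedes interrupt() and is not wrapped in a task does run again.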

This enables human-in-the-loop patterns where the workflow pauses to collect input, approval, or corrections before continuing.

Invoke vs. Stream

The invoke() method is built on top of stream():

  • When called with stream_mode="values" (the default for invoke()), invoke() internally calls stream() with stream_mode=["updates", "values"] and collects the outputs; for any other mode it passes the requested mode through unchanged.
  • For "values" mode, invoke() returns the latest value from the stream.
  • For other modes, invoke() returns a list of all chunks.
  • If interrupts occurred, they are included in the return value.

stream() is the more general interface, yielding outputs incrementally as the workflow progresses.

Usage

from langgraph.func import entrypoint, task
from langgraph.types import interrupt, Command
from langgraph.checkpoint.memory import InMemorySaver

@task
def analyze(text: str) -> str:
    return f"Analysis of: {text}"

@entrypoint(checkpointer=InMemorySaver())
def review_workflow(text: str) -> dict:
    analysis = analyze(text).result()
    feedback = interrupt({"analysis": analysis, "prompt": "Approve?"})
    return {"analysis": analysis, "feedback": feedback}

config = {"configurable": {"thread_id": "review-1"}}

# Stream the workflow -- will pause at interrupt
for chunk in review_workflow.stream("important document", config):
    print(chunk)

# Resume with human feedback
for chunk in review_workflow.stream(Command(resume="Approved"), config):
    print(chunk)

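# Or use invoke for a single return value. On a new thread this run also
# pauses at interrupt(), so resume it to obtain the final dictionary.
thread_2 = {"configurable": {"thread_id": "review-2"}}
review_workflow.invoke("another document", thread_2)
result = review_workflow.invoke(Command(resume="Approved"), thread_2)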

Theoretical Basis

The execution model for functional API workflows integrates several theoretical concepts:

  • Single-Step Execution with Sub-Tasks: Unlike traditional Pregel/BSP (Bulk Synchronous Parallel) models that operate over multiple supersteps with barrier synchronization, the functional API collapses the workflow into a single superstep. Tasks within the entrypoint execute as sub-operations of that step, with their own internal concurrency. This simplification is possible because the functional API's single-node graph does not require inter-node communication or multi-step state propagation.
  • Checkpoint-Based Continuation: The interrupt/resume pattern behaves like a delimited continuation backed by persistent storage. When interrupt() is called, the results recorded so far (completed task outputs and the pending interrupt) are saved to a checkpoint; the Python call stack itself is not serialized. Resuming re-runs the entrypoint against that checkpoint until it reaches the interruption point again. This is similar to how Temporal's workflow engine handles long-running processes with durable execution guarantees.
  • Transparent Replay: On resume, the runtime uses checkpoint data to skip re-execution of completed tasks, so each completed task's result is computed once and reused. This relies on the entrypoint issuing the same task calls in the same order on every run (deterministic control flow). It is not an exactly-once guarantee for side effects: code outside a task, or a task that had not finished when execution stopped, can run again, so such operations should be idempotent or wrapped in tasks.
  • Stream Processing: The streaming interface follows the push-based reactive model where the producer (Pregel engine) pushes events to the consumer (the caller iterating over the stream). Different stream modes provide different projections of the same underlying execution, similar to how database views provide different perspectives on the same data.
  • Command Pattern: The Command(resume=value) mechanism follows the command pattern from object-oriented design, encapsulating the resume action and its associated data as a first-class object that can be passed through the same input channel as the original workflow input.
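
The replay idea in the bullets above can be modeled in a few lines of plain Python. This is a toy sketch under simplifying assumptions, not LangGraph's implementation: ToyRuntime, Interrupted, and every other name here are hypothetical, the "checkpoint" is an in-memory list, and task results are matched purely by call order.

```python
class Interrupted(Exception):
    """Raised by the toy runtime to pause a run."""

    def __init__(self, value):
        super().__init__(value)
        self.value = value


class ToyRuntime:
    def __init__(self):
        self.log = []      # recorded task results in call order (the "checkpoint")
        self.cursor = 0    # position in the log during the current run
        self.resume = []   # resume values waiting to be consumed

    def task(self, fn, *args):
        if self.cursor < len(self.log):
            result = self.log[self.cursor]  # replay: reuse the recorded result
        else:
            result = fn(*args)              # first execution: run and record
            self.log.append(result)
        self.cursor += 1
        return result

    def interrupt(self, value):
        if self.resume:
            return self.resume.pop(0)       # resumed run: hand back the answer
        raise Interrupted(value)            # first run: pause

    def run(self, workflow, arg, resume=None):
        self.cursor = 0                     # every run starts from the top
        if resume is not None:
            self.resume.append(resume)
        return workflow(self, arg)


executions = []


def fetch(x):
    executions.append(x)  # visible side effect, to count real executions
    return x * 10


def workflow(rt, x):
    data = rt.task(fetch, x)
    ok = rt.interrupt(f"approve {data}?")
    return (data, ok)


rt = ToyRuntime()
try:
    rt.run(workflow, 4)
except Interrupted as pause:
    question = pause.value
result = rt.run(workflow, 4, resume="yes")
print(question, result, executions)
```

The workflow function runs twice, but fetch runs once: the second pass replays its recorded result, which is why the model depends on the workflow issuing the same task calls in the same order on every run.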
