Workflow: Building a Stateful Graph with LangGraph
| Knowledge Sources | |
|---|---|
| Domains | LLM_Ops, Agent_Orchestration, Graph_Computing |
| Last Updated | 2026-02-11 15:00 GMT |
Overview
End-to-end process for constructing, compiling, and executing a stateful multi-node graph using LangGraph's StateGraph builder API.
Description
This workflow covers the primary usage pattern of LangGraph: defining a typed state schema, building a directed graph of processing nodes, connecting them with edges (including conditional routing), and compiling the graph into an executable runtime. The compiled graph supports both synchronous invocation and asynchronous streaming, with optional checkpointing for durable execution. The state flows between nodes via typed channels, where each field can have a custom reducer function (e.g., append, replace, or aggregate) controlling how updates from multiple nodes are merged.
Usage
Execute this workflow when you need to build a multi-step processing pipeline where nodes share and transform state. This is the foundational pattern for all LangGraph applications, whether building chatbots, data processing pipelines, or complex agent systems. Use it when you have discrete processing steps that need to communicate through shared state, and when you want control over the execution flow via explicit edges and conditional routing.
Execution Steps
Step 1: Define the State Schema
Create a typed state schema using Python's TypedDict or Pydantic BaseModel. Each field in the schema represents a channel that nodes can read from and write to. Fields can be annotated with reducer functions (via `Annotated`) to control how values from multiple nodes are merged. For example, a list field with `operator.add` will append new items, while an unannotated field uses last-write-wins semantics.
Key considerations:
- Use `TypedDict` for simple schemas or `BaseModel` for validation
- Annotate list fields with `operator.add` for append behavior
- Use `add_messages` reducer from `langgraph.graph` for chat message lists
- Consider separate input and output schemas if the graph's input differs from its full state
Step 2: Instantiate the StateGraph Builder
Create a `StateGraph` instance by passing the state schema. Optionally provide separate `input_schema` and `output_schema` if the graph accepts a subset of state as input or returns a subset as output. A `context_schema` can supply immutable configuration data accessible to all nodes.
Key considerations:
- The state schema defines the channels available to all nodes
- Input/output schemas default to the full state schema
- Context schema provides read-only data like database connections or user IDs
Step 3: Add Processing Nodes
Register processing functions as nodes in the graph using `add_node()`. Each node is a function that receives the current state and returns a dictionary of state updates. A node can be named explicitly; otherwise the function name is used. Options include retry policies, cache policies, and deferred execution.
Key considerations:
- Node functions receive state, return partial state updates (dict)
- Use `add_sequence()` for simple linear pipelines
- Set `retry_policy` for nodes calling external services
- Set `cache_policy` for deterministic nodes to avoid re-computation
Step 4: Connect Nodes with Edges
Define the execution flow by adding edges between nodes. Use `add_edge()` for unconditional routing (including fan-in from multiple sources to one target). Use `add_conditional_edges()` with a routing function for dynamic branching based on state. Use the special `START` and `END` constants to define entry and exit points.
Key considerations:
- `START` marks the entry point; `END` marks the terminal node
- Multiple edges into one node create a join (waits for all predecessors)
- The routing function passed to `add_conditional_edges()` returns the next node name(s) as strings
- Nodes can return `Command` objects for dynamic routing without pre-declared edges
Step 5: Compile the Graph
Call `compile()` on the builder to produce an executable `CompiledStateGraph`. Optionally pass a checkpointer for persistence, a store for cross-thread data, and interrupt points for human-in-the-loop patterns. The compiled graph is a LangChain Runnable and supports `invoke()`, `stream()`, `batch()`, and their async variants.
Key considerations:
- Pass a checkpointer (e.g., `InMemorySaver`) to enable state persistence
- Set `interrupt_before` or `interrupt_after` for human-in-the-loop workflows
- The compiled graph validates the structure (no orphan nodes, valid edges)
- Provide a `name` for identification in tracing and debugging
Step 6: Execute the Graph
Run the compiled graph with input state using `invoke()` for a single final result or `stream()` for incremental output. Provide a `config` dict with a `thread_id` when checkpointing is enabled. Execution proceeds in discrete super-steps: at each step the runtime runs every node whose incoming edges have completed, then merges the returned updates into the state through channels and their reducers.
Key considerations:
- `invoke()` returns the final state after all nodes complete
- `stream()` yields state updates after each node execution
- Use `config={"configurable": {"thread_id": "..."}}` for persistent threads
- Stream modes include `values`, `updates`, `messages`, and `debug`