Implementation:Langchain ai Langgraph Pregel Algo
| Knowledge Sources | |
|---|---|
| Domains | Internal, Execution |
| Last Updated | 2026-02-11 16:00 GMT |
Overview
The Pregel algorithm module implements the core task preparation, channel write application, and interrupt evaluation logic that drives each superstep of the LangGraph Pregel execution loop.
Description
`_algo.py` is the computational heart of the Pregel execution engine. It is responsible for determining which graph nodes should fire in the next superstep, preparing fully configured executable tasks for those nodes, applying the writes produced by completed tasks back into the checkpoint and channel state, and deciding whether the graph should pause for a human-in-the-loop interrupt.
The module distinguishes between two categories of tasks: PULL tasks, which are triggered when a node's input channels receive new data via graph edges, and PUSH tasks, which originate from explicit `Send` messages or functional `Call` invocations. For each task, `prepare_single_task` computes a deterministic task ID (using either SHA-1 based UUID5 or XXH3 hashing depending on the checkpoint version), assembles the `RunnableConfig` with injected `local_read` and `local_write` callbacks, and wires up retry policies, cache policies, and subgraph checkpointer references.
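To illustrate what a deterministic task ID buys, here is a minimal sketch using only the standard library. The function name `sketch_task_id`, its parameters, and the way the fields are serialized are assumptions made for this example. They are not the module's actual hashing scheme, which also has an XXH3 path that is not shown here.

```python
import json
import uuid


def sketch_task_id(checkpoint_id: str, checkpoint_ns: str, step: int,
                   node: str, kind: str, path: tuple) -> str:
    """Derive a repeatable ID from the fields that identify a task (illustrative)."""
    # Serialize the identifying fields in a fixed order so the same task
    # always produces the same name string.
    payload = json.dumps([checkpoint_ns, step, node, kind, list(path)])
    # UUID5 is SHA-1 based: the same namespace and name always give the same UUID.
    namespace = uuid.uuid5(uuid.NAMESPACE_OID, checkpoint_id)
    return str(uuid.uuid5(namespace, payload))


# Hypothetical inputs: the "pull"/"push" labels and paths are placeholders.
pull_id = sketch_task_id("ckpt-1", "", 3, "agent", "pull", ("pull", "agent"))
again = sketch_task_id("ckpt-1", "", 3, "agent", "pull", ("pull", "agent"))
push_id = sketch_task_id("ckpt-1", "", 3, "agent", "push", ("push", 0))
```

Because the ID depends only on its inputs, re-preparing the same task after a resume yields the same ID, so writes already recorded under that ID can be matched back to the task.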
`apply_writes` is called after a superstep completes to fold task outputs into channel state. It sorts tasks deterministically by path, updates `versions_seen` in the checkpoint, consumes triggered channels, groups pending writes by channel, and applies them. Channels that were not written to are notified of the step bump so they can expire stale values. If no channel can trigger further nodes, the module signals a finish event to all channels.
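The sort, group, and apply steps can be sketched with toy data structures. Everything below (`LastValue`, `sketch_apply_writes`, the tuple layout of `tasks`, and the integer versioning) is a simplified stand-in assumed for illustration. It omits the `versions_seen` update, channel consumption, step-bump notifications, and the finish signal.

```python
from collections import defaultdict


class LastValue:
    """Toy channel that keeps the most recent write (hypothetical stand-in)."""

    def __init__(self):
        self.value = None

    def update(self, values):
        # Report whether the channel actually changed.
        if not values:
            return False
        self.value = values[-1]
        return True


def sketch_apply_writes(versions, channels, tasks):
    """tasks: iterable of (path, [(channel, value), ...]) pairs."""
    # Sort by path so the order of application does not depend on
    # the order in which tasks happened to finish.
    ordered = sorted(tasks, key=lambda t: t[0])
    # Group pending writes by channel.
    grouped = defaultdict(list)
    for _path, writes in ordered:
        for chan, value in writes:
            grouped[chan].append(value)
    # Apply each group and bump the version of channels that changed.
    next_version = max(versions.values(), default=0) + 1
    updated = set()
    for chan, values in grouped.items():
        if chan in channels and channels[chan].update(values):
            versions[chan] = next_version
            updated.add(chan)
    return updated


channels = {"a": LastValue(), "b": LastValue()}
versions = {"a": 1, "b": 1}
tasks = [(("pull", "n2"), [("a", "from n2")]), (("pull", "n1"), [("a", "from n1")])]
updated = sketch_apply_writes(versions, channels, tasks)
```

Sorting before grouping is what makes the result reproducible: the two tasks arrive in the order `n2`, `n1`, but the writes are always applied in path order.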
Usage
This module is used internally by the Pregel runner and the `Pregel` graph class. Application developers do not call these functions directly. They are relevant when debugging graph execution, understanding why a particular node fired or did not fire, or when extending the framework with custom channel types or task scheduling behavior.
Code Reference
Source Location
- Repository: Langchain_ai_Langgraph
- File: libs/langgraph/langgraph/pregel/_algo.py
Signature
def prepare_next_tasks(
    checkpoint: Checkpoint,
    pending_writes: list[PendingWrite],
    processes: Mapping[str, PregelNode],
    channels: Mapping[str, BaseChannel],
    managed: ManagedValueMapping,
    config: RunnableConfig,
    step: int,
    stop: int,
    *,
    for_execution: bool,
    store: BaseStore | None = None,
    checkpointer: BaseCheckpointSaver | None = None,
    manager: None | ParentRunManager | AsyncParentRunManager = None,
    trigger_to_nodes: Mapping[str, Sequence[str]] | None = None,
    updated_channels: set[str] | None = None,
    retry_policy: Sequence[RetryPolicy] = (),
    cache_policy: CachePolicy | None = None,
) -> dict[str, PregelTask] | dict[str, PregelExecutableTask]: ...
def prepare_single_task(
    task_path: tuple[Any, ...],
    task_id_checksum: str | None,
    *,
    checkpoint: Checkpoint,
    checkpoint_id_bytes: bytes,
    checkpoint_null_version: V | None,
    pending_writes: list[PendingWrite],
    processes: Mapping[str, PregelNode],
    channels: Mapping[str, BaseChannel],
    managed: ManagedValueMapping,
    config: RunnableConfig,
    step: int,
    stop: int,
    for_execution: bool,
    store: BaseStore | None = None,
    checkpointer: BaseCheckpointSaver | None = None,
    manager: None | ParentRunManager | AsyncParentRunManager = None,
    input_cache: dict[INPUT_CACHE_KEY_TYPE, Any] | None = None,
    cache_policy: CachePolicy | None = None,
    retry_policy: Sequence[RetryPolicy] = (),
) -> None | PregelTask | PregelExecutableTask: ...
def apply_writes(
    checkpoint: Checkpoint,
    channels: Mapping[str, BaseChannel],
    tasks: Iterable[WritesProtocol],
    get_next_version: GetNextVersion | None,
    trigger_to_nodes: Mapping[str, Sequence[str]],
) -> set[str]: ...
def should_interrupt(
    checkpoint: Checkpoint,
    interrupt_nodes: All | Sequence[str],
    tasks: Iterable[PregelExecutableTask],
) -> list[PregelExecutableTask]: ...
def local_read(
    scratchpad: PregelScratchpad,
    channels: Mapping[str, BaseChannel],
    managed: ManagedValueMapping,
    task: WritesProtocol,
    select: list[str] | str,
    fresh: bool = False,
) -> dict[str, Any] | Any: ...
Import
# Internal module -- not part of the public API
from langgraph.pregel._algo import (
    prepare_next_tasks,
    prepare_single_task,
    apply_writes,
    should_interrupt,
    local_read,
)
I/O Contract
`prepare_next_tasks`
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | checkpoint | `Checkpoint` | Current checkpoint state with channel versions and versions_seen |
| Input | pending_writes | `list[PendingWrite]` | Unprocessed writes from prior steps or resume values |
| Input | processes | `Mapping[str, PregelNode]` | Registry of graph nodes keyed by name |
| Input | channels | `Mapping[str, BaseChannel]` | Current channel instances |
| Input | config | `RunnableConfig` | Runnable configuration including checkpoint namespace |
| Input | step / stop | `int` | Current step counter and maximum step limit |
| Input | for_execution | `bool` | If `True`, returns `PregelExecutableTask` with full config; if `False`, returns lightweight `PregelTask` |
| Input | updated_channels | `set[str] \| None` | Optional optimization hint listing channels updated in prior step |
| Output | (return) | `dict[str, PregelTask] \| dict[str, PregelExecutableTask]` | Dictionary of tasks keyed by deterministic task ID |
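As a rough illustration of how channel versions and `versions_seen` could decide which PULL tasks fire, the sketch below compares each node's trigger channels against the versions that node last saw. The function `sketch_triggered_nodes`, the plain-dict data layout, and the exact comparison rule are assumptions for this example and should not be read as the module's actual check.

```python
def sketch_triggered_nodes(channel_versions, versions_seen, node_triggers):
    """Return nodes with at least one trigger channel newer than last seen."""
    fired = []
    for node, triggers in node_triggers.items():
        seen = versions_seen.get(node, {})
        # A node fires when any of its trigger channels has advanced
        # past the version recorded the last time the node ran.
        if any(channel_versions.get(ch, 0) > seen.get(ch, 0) for ch in triggers):
            fired.append(node)
    return fired


# Hypothetical graph: "writer" listens on "input", "reviewer" on "draft".
channel_versions = {"input": 2, "draft": 1}
versions_seen = {"writer": {"input": 1}, "reviewer": {"draft": 1}}
node_triggers = {"writer": ["input"], "reviewer": ["draft"]}
fired = sketch_triggered_nodes(channel_versions, versions_seen, node_triggers)
```

Here only `writer` fires, because `input` moved from version 1 to 2 while `draft` is unchanged. An `updated_channels` hint would let a scheduler skip nodes whose triggers are known not to have changed.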
`apply_writes`
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | checkpoint | `Checkpoint` | Checkpoint to mutate with new versions_seen and channel_versions |
| Input | channels | `Mapping[str, BaseChannel]` | Channel instances to receive write values |
| Input | tasks | `Iterable[WritesProtocol]` | Completed tasks whose writes should be applied |
| Input | get_next_version | `GetNextVersion \| None` | Versioning function (typically `increment`) |
| Input | trigger_to_nodes | `Mapping[str, Sequence[str]]` | Mapping from channel names to triggerable node names |
| Output | (return) | `set[str]` | Set of channel names that were updated in this step |
`should_interrupt`
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | checkpoint | `Checkpoint` | Current checkpoint |
| Input | interrupt_nodes | `All \| Sequence[str]` | Node names (or `"*"`) that trigger an interrupt |
| Input | tasks | `Iterable[PregelExecutableTask]` | Tasks about to execute |
| Output | (return) | `list[PregelExecutableTask]` | Tasks that should cause an interrupt (empty list means no interrupt) |
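The node-matching part of this contract can be sketched as a simple filter. `ToyTask` and `sketch_should_interrupt` are hypothetical names for this example. The sketch deliberately ignores the `checkpoint` argument, which the real function also takes into account.

```python
from typing import NamedTuple


class ToyTask(NamedTuple):
    """Hypothetical stand-in for a prepared task; only the node name matters here."""
    name: str


def sketch_should_interrupt(interrupt_nodes, tasks):
    """Return the tasks whose node name matches the interrupt setting."""
    # "*" means every node is an interrupt point.
    if interrupt_nodes == "*":
        return list(tasks)
    return [t for t in tasks if t.name in interrupt_nodes]


pending = [ToyTask("plan"), ToyTask("act")]
only_act = sketch_should_interrupt(["act"], pending)
everything = sketch_should_interrupt("*", pending)
nothing = sketch_should_interrupt([], pending)
```

An empty result means execution proceeds. A non-empty result identifies the tasks that caused the pause.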
Usage Examples
# Internally called by the Pregel loop -- illustrative pseudocode.
# Names such as checkpoint, channels, current_step, completed_tasks, and
# increment are placeholders that the surrounding loop is assumed to provide.
from langgraph.errors import GraphInterrupt
from langgraph.pregel._algo import prepare_next_tasks, apply_writes, should_interrupt
# 1. Prepare tasks for the next superstep
tasks = prepare_next_tasks(
    checkpoint,
    pending_writes,
    processes,
    channels,
    managed,
    config,
    step=current_step,
    stop=max_steps,
    for_execution=True,
    store=store,
    checkpointer=checkpointer,
    manager=run_manager,
    trigger_to_nodes=trigger_map,
    updated_channels=changed,
)
# 2. Check if we should interrupt before execution
if interrupt_before:
    interrupts = should_interrupt(checkpoint, interrupt_before, tasks.values())
    if interrupts:
        raise GraphInterrupt(interrupts)
# 3. Execute tasks (handled by PregelRunner) ...
# 4. Apply writes from completed tasks
updated = apply_writes(
    checkpoint, channels, completed_tasks, increment, trigger_map
)