Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Langchain ai Langgraph Pregel Algo

From Leeroopedia
Knowledge Sources
Domains Internal, Execution
Last Updated 2026-02-11 16:00 GMT

Overview

The Pregel algorithm module implements the core task preparation, channel write application, and interrupt evaluation logic that drives each superstep of the LangGraph Pregel execution loop.

Description

`_algo.py` is the computational heart of the Pregel execution engine. It is responsible for determining which graph nodes should fire in the next superstep, preparing fully configured executable tasks for those nodes, applying the writes produced by completed tasks back into the checkpoint and channel state, and deciding whether the graph should pause for a human-in-the-loop interrupt.

The module distinguishes between two categories of tasks: PULL tasks, which are triggered when a node's input channels receive new data via graph edges, and PUSH tasks, which originate from explicit `Send` messages or functional `Call` invocations. For each task, `prepare_single_task` computes a deterministic task ID (using either SHA-1 based UUID5 or XXH3 hashing depending on the checkpoint version), assembles the `RunnableConfig` with injected `local_read` and `local_write` callbacks, and wires up retry policies, cache policies, and subgraph checkpointer references.

`apply_writes` is called after a superstep completes to fold task outputs into channel state. It sorts tasks deterministically by path, updates `versions_seen` in the checkpoint, consumes triggered channels, groups pending writes by channel, and applies them. Channels that were not written to are notified of a step bump so they can expire stale values, and if no channels can trigger further nodes the module signals a finish event to all channels.

Usage

This module is used internally by the Pregel runner and the `Pregel` graph class. Application developers do not call these functions directly. They are relevant when debugging graph execution, understanding why a particular node fired or did not fire, or when extending the framework with custom channel types or task scheduling behavior.

Code Reference

Source Location

Signature

def prepare_next_tasks(
    checkpoint: Checkpoint,
    pending_writes: list[PendingWrite],
    processes: Mapping[str, PregelNode],
    channels: Mapping[str, BaseChannel],
    managed: ManagedValueMapping,
    config: RunnableConfig,
    step: int,
    stop: int,
    *,
    for_execution: bool,
    store: BaseStore | None = None,
    checkpointer: BaseCheckpointSaver | None = None,
    manager: None | ParentRunManager | AsyncParentRunManager = None,
    trigger_to_nodes: Mapping[str, Sequence[str]] | None = None,
    updated_channels: set[str] | None = None,
    retry_policy: Sequence[RetryPolicy] = (),
    cache_policy: CachePolicy | None = None,
) -> dict[str, PregelTask] | dict[str, PregelExecutableTask]: ...

def prepare_single_task(
    task_path: tuple[Any, ...],
    task_id_checksum: str | None,
    *,
    checkpoint: Checkpoint,
    checkpoint_id_bytes: bytes,
    checkpoint_null_version: V | None,
    pending_writes: list[PendingWrite],
    processes: Mapping[str, PregelNode],
    channels: Mapping[str, BaseChannel],
    managed: ManagedValueMapping,
    config: RunnableConfig,
    step: int,
    stop: int,
    for_execution: bool,
    store: BaseStore | None = None,
    checkpointer: BaseCheckpointSaver | None = None,
    manager: None | ParentRunManager | AsyncParentRunManager = None,
    input_cache: dict[INPUT_CACHE_KEY_TYPE, Any] | None = None,
    cache_policy: CachePolicy | None = None,
    retry_policy: Sequence[RetryPolicy] = (),
) -> None | PregelTask | PregelExecutableTask: ...

def apply_writes(
    checkpoint: Checkpoint,
    channels: Mapping[str, BaseChannel],
    tasks: Iterable[WritesProtocol],
    get_next_version: GetNextVersion | None,
    trigger_to_nodes: Mapping[str, Sequence[str]],
) -> set[str]: ...

def should_interrupt(
    checkpoint: Checkpoint,
    interrupt_nodes: All | Sequence[str],
    tasks: Iterable[PregelExecutableTask],
) -> list[PregelExecutableTask]: ...

def local_read(
    scratchpad: PregelScratchpad,
    channels: Mapping[str, BaseChannel],
    managed: ManagedValueMapping,
    task: WritesProtocol,
    select: list[str] | str,
    fresh: bool = False,
) -> dict[str, Any] | Any: ...

Import

# Internal module -- not part of the public API
from langgraph.pregel._algo import (
    prepare_next_tasks,
    prepare_single_task,
    apply_writes,
    should_interrupt,
    local_read,
)

I/O Contract

`prepare_next_tasks`

Direction Name Type Description
Input checkpoint `Checkpoint` Current checkpoint state with channel versions and versions_seen
Input pending_writes `list[PendingWrite]` Unprocessed writes from prior steps or resume values
Input processes `Mapping[str, PregelNode]` Registry of graph nodes keyed by name
Input channels `Mapping[str, BaseChannel]` Current channel instances
Input config `RunnableConfig` Runnable configuration including checkpoint namespace
Input step / stop `int` Current step counter and maximum step limit
Input for_execution `bool` If `True`, returns `PregelExecutableTask` with full config; if `False`, returns lightweight `PregelTask`
Input updated_channels None` Optional optimization hint listing channels updated in prior step
Output (return) dict[str, PregelExecutableTask]` Dictionary of tasks keyed by deterministic task ID

`apply_writes`

Direction Name Type Description
Input checkpoint `Checkpoint` Checkpoint to mutate with new versions_seen and channel_versions
Input channels `Mapping[str, BaseChannel]` Channel instances to receive write values
Input tasks `Iterable[WritesProtocol]` Completed tasks whose writes should be applied
Input get_next_version None` Versioning function (typically `increment`)
Input trigger_to_nodes `Mapping[str, Sequence[str]]` Mapping from channel names to triggerable node names
Output (return) `set[str]` Set of channel names that were updated in this step

`should_interrupt`

Direction Name Type Description
Input checkpoint `Checkpoint` Current checkpoint
Input interrupt_nodes Sequence[str]` Node names (or `"*"`) that trigger an interrupt
Input tasks `Iterable[PregelExecutableTask]` Tasks about to execute
Output (return) `list[PregelExecutableTask]` Tasks that should cause an interrupt (empty list means no interrupt)

Usage Examples

# Internally called by the Pregel loop -- illustrative pseudocode

from langgraph.pregel._algo import prepare_next_tasks, apply_writes, should_interrupt

# 1. Prepare tasks for the next superstep
tasks = prepare_next_tasks(
    checkpoint,
    pending_writes,
    processes,
    channels,
    managed,
    config,
    step=current_step,
    stop=max_steps,
    for_execution=True,
    store=store,
    checkpointer=checkpointer,
    manager=run_manager,
    trigger_to_nodes=trigger_map,
    updated_channels=changed,
)

# 2. Check if we should interrupt before execution
if interrupt_before:
    interrupts = should_interrupt(checkpoint, interrupt_before, tasks.values())
    if interrupts:
        raise GraphInterrupt(interrupts)

# 3. Execute tasks (handled by PregelRunner) ...

# 4. Apply writes from completed tasks
updated = apply_writes(
    checkpoint, channels, completed_tasks, increment, trigger_map
)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment