Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Langchain ai Langgraph Pregel Runner

From Leeroopedia
Revision as of 11:26, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Langchain_ai_Langgraph_Pregel_Runner.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)
Knowledge Sources
Domains Internal, Execution
Last Updated 2026-02-11 16:00 GMT

Overview

`PregelRunner` orchestrates the concurrent execution, retry, commit, and error handling of tasks within a single Pregel superstep, supporting both synchronous and asynchronous execution modes.

Description

The `PregelRunner` class is the task-level scheduler inside the Pregel execution loop. After `_algo.prepare_next_tasks` determines which nodes should fire, `PregelRunner` takes those `PregelExecutableTask` objects and executes them concurrently using either thread-pool futures (sync path via `tick`) or `asyncio` tasks (async path via `atick`). It manages a `FuturesDict` that tracks in-flight work, provides cooperative yielding back to the caller for streaming intermediate results, and handles timeout enforcement.

Each task is executed through `run_with_retry` (or `arun_with_retry` for async), which wraps the task's runnable with the configured `RetryPolicy`. When a task completes -- whether successfully, with an error, or via interrupt -- the `commit` method is called. For successful tasks, `commit` persists task writes through `put_writes` and appends a `NO_WRITES` sentinel if the task produced no output. For `GraphInterrupt` exceptions, interrupt data is written to the checkpointer so the graph can be resumed later. For other errors, the exception is serialized alongside any partial writes and persisted as an `ERROR` entry. The runner also supports the functional API's `Call` mechanism through `_call` and `_acall` helpers, which schedule child PUSH tasks from within a running parent task, chaining futures so that the parent blocks until the child completes.

The `FuturesDict` helper class extends `dict` with an event-based notification system: it tracks a counter of in-flight tasks and signals an event when all tasks have completed or when any task fails with a non-interrupt error (via `_should_stop_others`). The `_panic_or_proceed` function inspects all completed futures at the end of a tick, cancels any remaining inflight work on failure, aggregates `GraphInterrupt` exceptions, and raises timeout errors.

Usage

`PregelRunner` is instantiated and used internally by the `Pregel` class during `stream`, `invoke`, `astream`, and `ainvoke` calls. It is not part of the public API and should not be instantiated directly by application code. Understanding its behavior is useful when debugging concurrency issues, timeout behavior, or retry logic in graph execution.

Code Reference

Source Location

Signature

class PregelRunner:
    def __init__(
        self,
        *,
        submit: weakref.ref[Submit],
        put_writes: weakref.ref[Callable[[str, Sequence[tuple[str, Any]]], None]],
        use_astream: bool = False,
        node_finished: Callable[[str], None] | None = None,
    ) -> None: ...

    def tick(
        self,
        tasks: Iterable[PregelExecutableTask],
        *,
        reraise: bool = True,
        timeout: float | None = None,
        retry_policy: Sequence[RetryPolicy] | None = None,
        get_waiter: Callable[[], concurrent.futures.Future[None]] | None = None,
        schedule_task: Callable[
            [PregelExecutableTask, int, Call | None],
            PregelExecutableTask | None,
        ],
    ) -> Iterator[None]: ...

    async def atick(
        self,
        tasks: Iterable[PregelExecutableTask],
        *,
        reraise: bool = True,
        timeout: float | None = None,
        retry_policy: Sequence[RetryPolicy] | None = None,
        get_waiter: Callable[[], asyncio.Future[None]] | None = None,
        schedule_task: Callable[
            [PregelExecutableTask, int, Call | None],
            Awaitable[PregelExecutableTask | None],
        ],
    ) -> AsyncIterator[None]: ...

    def commit(
        self,
        task: PregelExecutableTask,
        exception: BaseException | None,
    ) -> None: ...

Import

# Internal module -- not part of the public API
from langgraph.pregel._runner import PregelRunner

I/O Contract

`tick` / `atick`

Direction Name Type Description
Input tasks `Iterable[PregelExecutableTask]` Executable tasks to run concurrently in this superstep
Input reraise `bool` Whether to re-raise exceptions from failed tasks (default `True`)
Input timeout None` Maximum wall-clock seconds for the superstep
Input retry_policy None` Retry policies to apply to each task execution
Input get_waiter None` Factory for waiter futures (used for external event integration)
Input schedule_task `Callable` Callback to schedule child PUSH tasks spawned via `Call`
Output (yields) `None` Yields control back to the caller after each batch of completed tasks, enabling streaming

`commit`

Direction Name Type Description
Input task `PregelExecutableTask` The task that completed
Input exception None` The exception if the task failed, or `None` on success
Side Effect put_writes -- Writes task outputs (or errors/interrupts) to the checkpointer via the injected `put_writes` callback

Usage Examples

# Illustrative internal usage within the Pregel loop

import weakref
from langgraph.pregel._runner import PregelRunner

runner = PregelRunner(
    submit=weakref.ref(executor.submit),
    put_writes=weakref.ref(checkpointer.put_writes),
    use_astream=False,
    node_finished=on_node_done,
)

# Synchronous execution of a superstep
for _ in runner.tick(
    tasks,
    reraise=True,
    timeout=30.0,
    retry_policy=graph_retry_policy,
    schedule_task=schedule_callback,
):
    # Each yield point is an opportunity to emit streaming events
    emit_stream_events(tasks)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment