Implementation:Langchain ai Langgraph Pregel Runner
| Knowledge Sources | |
|---|---|
| Domains | Internal, Execution |
| Last Updated | 2026-02-11 16:00 GMT |
Overview
`PregelRunner` orchestrates the concurrent execution, retry, commit, and error handling of tasks within a single Pregel superstep, supporting both synchronous and asynchronous execution modes.
Description
The `PregelRunner` class is the task-level scheduler inside the Pregel execution loop. After `_algo.prepare_next_tasks` determines which nodes should fire, `PregelRunner` takes those `PregelExecutableTask` objects and executes them concurrently using either thread-pool futures (sync path via `tick`) or `asyncio` tasks (async path via `atick`). It manages a `FuturesDict` that tracks in-flight work, provides cooperative yielding back to the caller for streaming intermediate results, and handles timeout enforcement.
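The sync scheduling pattern described above can be approximated with the standard library alone. The following is a minimal sketch, not the actual `tick` implementation: `tick_sketch`, its `tasks` mapping of names to zero-argument callables, and the `results` dictionary are all hypothetical names introduced for illustration. It submits every task to a thread pool, waits for the first batch to complete, and yields so the caller can stream intermediate results.

```python
import concurrent.futures
import time
from typing import Callable, Dict, Iterator, Optional


def tick_sketch(
    tasks: Dict[str, Callable[[], object]],
    results: Dict[str, object],
    timeout: Optional[float] = None,
) -> Iterator[None]:
    """Run all tasks concurrently, yielding after each batch that completes."""
    deadline = None if timeout is None else time.monotonic() + timeout
    with concurrent.futures.ThreadPoolExecutor() as pool:
        # Map each in-flight future back to the task name that produced it.
        pending = {pool.submit(fn): name for name, fn in tasks.items()}
        while pending:
            remaining = (
                None if deadline is None else max(0.0, deadline - time.monotonic())
            )
            done, _ = concurrent.futures.wait(
                pending,
                timeout=remaining,
                return_when=concurrent.futures.FIRST_COMPLETED,
            )
            if not done:
                # Deadline passed with work still in flight: cancel what we can.
                for fut in pending:
                    fut.cancel()
                raise TimeoutError("superstep timed out")
            for fut in done:
                # fut.result() re-raises any exception raised by the task.
                results[pending.pop(fut)] = fut.result()
            # Hand control back to the caller between batches.
            yield
```

A caller would drive it as `for _ in tick_sketch(tasks, results): ...`, inspecting `results` at each yield point. The real runner additionally handles retries, interrupts, and child tasks, which this sketch leaves out.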
Each task is executed through `run_with_retry` (or `arun_with_retry` for async), which wraps the task's runnable with the configured `RetryPolicy`.
When a task completes, whether successfully, with an error, or via interrupt, the `commit` method is called. For successful tasks, `commit` persists task writes through `put_writes` and appends a `NO_WRITES` sentinel if the task produced no output. For `GraphInterrupt` exceptions, interrupt data is written to the checkpointer so the graph can be resumed later. For other errors, the exception is serialized alongside any partial writes and persisted as an `ERROR` entry.
The runner also supports the functional API's `Call` mechanism through `_call` and `_acall` helpers, which schedule child PUSH tasks from within a running parent task, chaining futures so that the parent blocks until the child completes.
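To make the retry wrapping concrete, here is a minimal sketch of a retry loop with exponential backoff. It is not the library's `run_with_retry`: `SimplePolicy` is a hypothetical stand-in for `RetryPolicy`, and its field names and defaults are assumptions chosen for illustration.

```python
import time
from dataclasses import dataclass
from typing import Callable, Optional, Tuple, Type, TypeVar

T = TypeVar("T")


@dataclass
class SimplePolicy:
    """Hypothetical stand-in for a retry policy; fields are illustrative."""

    max_attempts: int = 3
    initial_interval: float = 0.01
    backoff_factor: float = 2.0
    retry_on: Tuple[Type[BaseException], ...] = (Exception,)


def run_with_retry_sketch(fn: Callable[[], T], policy: Optional[SimplePolicy]) -> T:
    """Call fn, retrying matching exceptions until max_attempts is reached."""
    attempts = 0
    interval = policy.initial_interval if policy else 0.0
    while True:
        try:
            return fn()
        except Exception as exc:
            attempts += 1
            # Give up when there is no policy, the error is not retryable,
            # or the attempt budget is spent.
            if (
                policy is None
                or not isinstance(exc, policy.retry_on)
                or attempts >= policy.max_attempts
            ):
                raise
            time.sleep(interval)
            interval *= policy.backoff_factor
```

With `max_attempts=3`, a callable that fails twice and then succeeds returns its value on the third call; a callable that keeps failing re-raises the last exception.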
The `FuturesDict` helper class extends `dict` with an event-based notification system: it tracks a counter of in-flight tasks and signals an event when all tasks have completed or when any task fails with a non-interrupt error (via `_should_stop_others`). The `_panic_or_proceed` function inspects all completed futures at the end of a tick, cancels any remaining inflight work on failure, aggregates `GraphInterrupt` exceptions, and raises timeout errors.
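The counter-plus-event idea can be sketched as follows. This is an illustration under assumptions, not the real `FuturesDict`: the class name `CountingFutures`, its attributes, and the `failed` predicate (standing in for `_should_stop_others`) are hypothetical, and the sketch covers only thread-pool futures.

```python
import concurrent.futures
import threading
from typing import Callable


class CountingFutures(dict):
    """Hypothetical dict of future -> task name that signals when to stop waiting."""

    def __init__(
        self, should_stop: Callable[[concurrent.futures.Future], bool]
    ) -> None:
        super().__init__()
        self.event = threading.Event()
        self.lock = threading.Lock()
        self.counter = 0  # number of futures still in flight
        self.should_stop = should_stop

    def __setitem__(self, fut: concurrent.futures.Future, name: str) -> None:
        super().__setitem__(fut, name)
        with self.lock:
            self.counter += 1
            self.event.clear()
        # Registered outside the lock: an already-finished future runs the
        # callback immediately in this thread.
        fut.add_done_callback(self._on_done)

    def _on_done(self, fut: concurrent.futures.Future) -> None:
        with self.lock:
            self.counter -= 1
            # Wake the waiter when everything is done or one task failed.
            if self.counter == 0 or self.should_stop(fut):
                self.event.set()


def failed(fut: concurrent.futures.Future) -> bool:
    """Illustrative stop predicate: any non-cancelled future with an exception."""
    return not fut.cancelled() and fut.exception() is not None
```

A scheduler can then block on `futures.event.wait(timeout)` instead of polling each future, and inspect the dictionary once the event fires.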
Usage
`PregelRunner` is instantiated and used internally by the `Pregel` class during `stream`, `invoke`, `astream`, and `ainvoke` calls. It is not part of the public API and should not be instantiated directly by application code. Understanding its behavior is useful when debugging concurrency issues, timeout behavior, or retry logic in graph execution.
Code Reference
Source Location
- Repository: Langchain_ai_Langgraph
- File: libs/langgraph/langgraph/pregel/_runner.py
Signature
class PregelRunner:
    def __init__(
        self,
        *,
        submit: weakref.ref[Submit],
        put_writes: weakref.ref[Callable[[str, Sequence[tuple[str, Any]]], None]],
        use_astream: bool = False,
        node_finished: Callable[[str], None] | None = None,
    ) -> None: ...

    def tick(
        self,
        tasks: Iterable[PregelExecutableTask],
        *,
        reraise: bool = True,
        timeout: float | None = None,
        retry_policy: Sequence[RetryPolicy] | None = None,
        get_waiter: Callable[[], concurrent.futures.Future[None]] | None = None,
        schedule_task: Callable[
            [PregelExecutableTask, int, Call | None],
            PregelExecutableTask | None,
        ],
    ) -> Iterator[None]: ...

    async def atick(
        self,
        tasks: Iterable[PregelExecutableTask],
        *,
        reraise: bool = True,
        timeout: float | None = None,
        retry_policy: Sequence[RetryPolicy] | None = None,
        get_waiter: Callable[[], asyncio.Future[None]] | None = None,
        schedule_task: Callable[
            [PregelExecutableTask, int, Call | None],
            Awaitable[PregelExecutableTask | None],
        ],
    ) -> AsyncIterator[None]: ...

    def commit(
        self,
        task: PregelExecutableTask,
        exception: BaseException | None,
    ) -> None: ...
Import
# Internal module -- not part of the public API
from langgraph.pregel._runner import PregelRunner
I/O Contract
`tick` / `atick`
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | tasks | `Iterable[PregelExecutableTask]` | Executable tasks to run concurrently in this superstep |
| Input | reraise | `bool` | Whether to re-raise exceptions from failed tasks (default `True`) |
| Input | timeout | `float \| None` | Maximum wall-clock seconds for the superstep |
| Input | retry_policy | `Sequence[RetryPolicy] \| None` | Retry policies to apply to each task execution |
| Input | get_waiter | `Callable[[], Future[None]] \| None` | Factory for waiter futures (used for external event integration) |
| Input | schedule_task | `Callable` | Callback to schedule child PUSH tasks spawned via `Call` |
| Output | (yields) | `None` | Yields control back to the caller after each batch of completed tasks, enabling streaming |
`commit`
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | task | `PregelExecutableTask` | The task that completed |
| Input | exception | `BaseException \| None` | The exception if the task failed, or `None` on success |
| Side Effect | put_writes | -- | Writes task outputs (or errors/interrupts) to the checkpointer via the injected `put_writes` callback |
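The three outcomes that `commit` distinguishes can be illustrated with a small standalone sketch. Everything here is an assumption made for illustration: `commit_sketch` takes a plain task id and write list rather than a `PregelExecutableTask`, `InterruptSignal` stands in for `GraphInterrupt`, the channel name strings are placeholders for the real constants, and `repr` stands in for whatever serialization the runner applies to errors.

```python
from typing import Any, Callable, List, Optional, Tuple

# Placeholder channel names; the real constants are defined by langgraph
# and may differ from these strings.
NO_WRITES, INTERRUPT, ERROR = "__no_writes__", "__interrupt__", "__error__"

Writes = List[Tuple[str, Any]]


class InterruptSignal(Exception):
    """Hypothetical stand-in for GraphInterrupt."""


def commit_sketch(
    task_id: str,
    writes: Writes,
    exception: Optional[BaseException],
    put_writes: Callable[[str, Writes], None],
) -> None:
    """Persist the outcome of one finished task via the put_writes callback."""
    if exception is None:
        # Success: mark tasks that produced nothing so they are not re-run.
        if not writes:
            writes.append((NO_WRITES, None))
        put_writes(task_id, writes)
    elif isinstance(exception, InterruptSignal):
        # Interrupt: save the interrupt payload so the graph can resume later.
        put_writes(task_id, [(INTERRUPT, arg) for arg in exception.args])
    else:
        # Failure: keep partial writes and record the error next to them.
        put_writes(task_id, [*writes, (ERROR, repr(exception))])
```

Passing a callback that records its arguments shows the branching: a successful task with no output stores a single `NO_WRITES` entry, while a failed task stores its partial writes followed by an `ERROR` entry.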
Usage Examples
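# Illustrative internal usage within the Pregel loop.
# `executor`, `checkpointer`, `on_node_done`, `tasks`, `graph_retry_policy`,
# `schedule_callback`, and `emit_stream_events` are placeholders supplied by the caller.
import weakref
from langgraph.pregel._runner import PregelRunner

runner = PregelRunner(
    # If these callables are bound methods, use WeakMethod: a plain
    # weakref.ref to a bound method is dead as soon as it is created.
    submit=weakref.WeakMethod(executor.submit),
    put_writes=weakref.WeakMethod(checkpointer.put_writes),
    use_astream=False,
    node_finished=on_node_done,
)

# Synchronous execution of a superstep
for _ in runner.tick(
    tasks,
    reraise=True,
    timeout=30.0,
    retry_policy=graph_retry_policy,
    schedule_task=schedule_callback,
):
    # Each yield point is an opportunity to emit streaming events
    emit_stream_events(tasks)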