Implementation:Langchain ai Langgraph NamedBarrierValue Channel
| Knowledge Sources | |
|---|---|
| Domains | Channels, Synchronization |
| Last Updated | 2026-02-11 16:00 GMT |
Overview
Barrier synchronization channels that gate availability until all expected named values have been received, used to coordinate parallel node execution in the Pregel engine.
Description
The NamedBarrierValue module provides two channel classes that implement barrier synchronization patterns for LangGraph's Pregel execution engine. Both classes track a set of expected `names` and accumulate received values in a `seen` set, only becoming available (returning a value from `get()`) when all expected names have been seen.
The `NamedBarrierValue` class becomes available as soon as `seen == names`. When `consume()` is called (after a subscribed node runs), the `seen` set is reset to empty, allowing the barrier to be reused in subsequent steps. This creates a repeating barrier pattern: all named producers must contribute before any consumer can proceed, and after consumption the barrier resets for the next cycle.
The `NamedBarrierValueAfterFinish` class adds an additional gating condition: even after all names are seen, the channel remains unavailable until `finish()` is explicitly called by the Pregel engine. This two-phase pattern is used for end-of-run synchronization where you need to ensure all producers have completed AND the run is in its finishing phase before triggering final actions. After `consume()`, both `seen` and `finished` are reset.
Both classes validate updates against the expected names set, raising `InvalidUpdateError` if an unexpected name is received. The checkpoint format differs between the two: `NamedBarrierValue` checkpoints the `seen` set, while `NamedBarrierValueAfterFinish` checkpoints a tuple of `(seen, finished)`.
Usage
Use `NamedBarrierValue` when you need to synchronize multiple parallel nodes so that a downstream node only runs after all upstream nodes have completed. Use `NamedBarrierValueAfterFinish` for end-of-run barriers that should only trigger during the graph's finishing phase. These channels are primarily used internally by the Pregel engine for orchestration.
Code Reference
Source Location
Signature
class NamedBarrierValue(Generic[Value], BaseChannel[Value, Value, set[Value]]):
def __init__(self, typ: type[Value], names: set[Value]) -> None: ...
@property
def ValueType(self) -> type[Value]: ...
@property
def UpdateType(self) -> type[Value]: ...
def copy(self) -> Self: ...
def checkpoint(self) -> set[Value]: ...
def from_checkpoint(self, checkpoint: set[Value]) -> Self: ...
def update(self, values: Sequence[Value]) -> bool: ...
def get(self) -> Value: ...
def is_available(self) -> bool: ...
def consume(self) -> bool: ...
class NamedBarrierValueAfterFinish(Generic[Value], BaseChannel[Value, Value, set[Value]]):
def __init__(self, typ: type[Value], names: set[Value]) -> None: ...
def copy(self) -> Self: ...
def checkpoint(self) -> tuple[set[Value], bool]: ...
def from_checkpoint(self, checkpoint: tuple[set[Value], bool]) -> Self: ...
def update(self, values: Sequence[Value]) -> bool: ...
def get(self) -> Value: ...
def is_available(self) -> bool: ...
def consume(self) -> bool: ...
def finish(self) -> bool: ...
Import
from langgraph.channels.named_barrier_value import NamedBarrierValue
from langgraph.channels.named_barrier_value import NamedBarrierValueAfterFinish
I/O Contract
Constructor Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| typ | `type[Value]` | Yes | The type of the named values |
| names | `set[Value]` | Yes | The set of expected names that must all be seen before the barrier opens |
NamedBarrierValue Behavior
| Method | Precondition | Behavior |
|---|---|---|
| `update(values)` | Value in `names` | Adds to `seen`; returns `True` if newly added |
| `update(values)` | Value not in `names` | Raises `InvalidUpdateError` |
| `get()` | `seen == names` | Returns `None` (barrier open) |
| `get()` | `seen != names` | Raises `EmptyChannelError` |
| `consume()` | `seen == names` | Resets `seen` to empty; returns `True` |
| `consume()` | `seen != names` | Returns `False` |
NamedBarrierValueAfterFinish Additional Behavior
| Method | Precondition | Behavior |
|---|---|---|
| `get()` | `finished and seen == names` | Returns `None` (barrier open) |
| `get()` | Otherwise | Raises `EmptyChannelError` |
| `finish()` | `seen == names and not finished` | Sets `finished = True`; returns `True` |
| `finish()` | Otherwise | Returns `False` |
| `consume()` | `finished and seen == names` | Resets both `finished` and `seen`; returns `True` |
Checkpoint Format
| Class | Checkpoint Type | Description |
|---|---|---|
| `NamedBarrierValue` | `set[Value]` | The set of names seen so far |
| `NamedBarrierValueAfterFinish` | `tuple[set[Value], bool]` | Tuple of `(seen_set, finished_flag)` |
Usage Examples
from langgraph.channels.named_barrier_value import (
NamedBarrierValue,
NamedBarrierValueAfterFinish,
)
from langgraph.errors import EmptyChannelError
# Basic barrier: wait for all three nodes
barrier = NamedBarrierValue(str, names={"node_a", "node_b", "node_c"})
# Not yet available
assert not barrier.is_available()
# Receive updates from nodes
barrier.update(["node_a"])
barrier.update(["node_b"])
assert not barrier.is_available()
# All names received - barrier opens
barrier.update(["node_c"])
assert barrier.is_available()
assert barrier.get() is None # Barrier value is always None
# Consume resets the barrier
barrier.consume()
assert not barrier.is_available()
# AfterFinish variant: requires explicit finish() call
finish_barrier = NamedBarrierValueAfterFinish(
str, names={"writer_1", "writer_2"}
)
finish_barrier.update(["writer_1", "writer_2"])
assert not finish_barrier.is_available() # All seen but not finished
finish_barrier.finish()
assert finish_barrier.is_available() # Now available
finish_barrier.consume()
assert not finish_barrier.is_available() # Reset after consume