Implementation:Langchain ai Langgraph NamedBarrierValue Channel

From Leeroopedia
Revision as of 11:26, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Langchain_ai_Langgraph_NamedBarrierValue_Channel.md)
Knowledge Sources
Domains Channels, Synchronization
Last Updated 2026-02-11 16:00 GMT

Overview

Barrier synchronization channels that gate availability until all expected named values have been received, used to coordinate parallel node execution in the Pregel engine.

Description

The `named_barrier_value` module provides two channel classes that implement barrier synchronization for LangGraph's Pregel execution engine. Both classes hold a set of expected `names` and record each received name in a `seen` set. A channel becomes available only when every expected name has been seen: `is_available()` then returns `True`, and `get()` returns `None` instead of raising `EmptyChannelError`. The channel carries no payload; it acts purely as a signal.

The `NamedBarrierValue` class becomes available as soon as `seen == names`. When `consume()` is called (after a subscribed node runs), the `seen` set is reset to empty, allowing the barrier to be reused in subsequent steps. This creates a repeating barrier pattern: all named producers must contribute before any consumer can proceed, and after consumption the barrier resets for the next cycle.
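
The repeating cycle described above can be sketched with a small stand-in class. `BarrierSketch` is a hypothetical name used only for illustration; it models the documented behavior in plain Python and is not the library's implementation (for instance, it raises `ValueError` where the library raises `InvalidUpdateError`).

```python
class BarrierSketch:
    """Simplified stand-in for a repeating named barrier (not the library class)."""

    def __init__(self, names: set[str]) -> None:
        self.names = set(names)
        self.seen: set[str] = set()

    def update(self, values: list[str]) -> bool:
        """Record names; return True only if at least one name was new."""
        updated = False
        for value in values:
            if value not in self.names:
                # Assumption: the sketch uses ValueError in place of InvalidUpdateError.
                raise ValueError(f"{value!r} is not an expected name")
            if value not in self.seen:
                self.seen.add(value)
                updated = True
        return updated

    def is_available(self) -> bool:
        return self.seen == self.names

    def consume(self) -> bool:
        """Reset the barrier, but only if it is currently open."""
        if self.seen == self.names:
            self.seen = set()
            return True
        return False


barrier = BarrierSketch({"node_a", "node_b"})

# Cycle 1: one producer is not enough, and consume() does nothing while closed.
first_write = barrier.update(["node_a"])
duplicate_write = barrier.update(["node_a"])
consumed_while_closed = barrier.consume()

barrier.update(["node_b"])
open_in_cycle_1 = barrier.is_available()
consumed_when_open = barrier.consume()

# Cycle 2: after the reset, every producer has to report again.
closed_after_reset = not barrier.is_available()
barrier.update(["node_b", "node_a"])
open_in_cycle_2 = barrier.is_available()
```

The order in which producers report does not matter; only set equality between `seen` and `names` opens the barrier.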

The `NamedBarrierValueAfterFinish` class adds a second gating condition: even after all names are seen, the channel remains unavailable until the Pregel engine calls `finish()`. A `finish()` call made before all names are seen has no effect and returns `False`. This two-phase pattern supports end-of-run synchronization, where final actions must wait until all producers have completed and the run has entered its finishing phase. After `consume()`, both `seen` and `finished` are reset.
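
A minimal sketch of the two-phase gate follows. `AfterFinishSketch` is a hypothetical stand-in written for illustration, assuming the `finish()` and `consume()` semantics listed in the behavior tables on this page; it is not the library class.

```python
class AfterFinishSketch:
    """Simplified stand-in for a barrier that also waits for finish()."""

    def __init__(self, names: set[str]) -> None:
        self.names = set(names)
        self.seen: set[str] = set()
        self.finished = False

    def update(self, values: list[str]) -> None:
        for value in values:
            if value not in self.names:
                # Assumption: ValueError stands in for InvalidUpdateError.
                raise ValueError(f"{value!r} is not an expected name")
            self.seen.add(value)

    def finish(self) -> bool:
        """Arm the gate; only takes effect once every name has been seen."""
        if not self.finished and self.seen == self.names:
            self.finished = True
            return True
        return False

    def is_available(self) -> bool:
        return self.finished and self.seen == self.names

    def consume(self) -> bool:
        """Reset both phases, but only if the gate is currently open."""
        if self.finished and self.seen == self.names:
            self.finished = False
            self.seen = set()
            return True
        return False


gate = AfterFinishSketch({"writer_1", "writer_2"})

gate.update(["writer_1"])
early_finish = gate.finish()              # ignored: writer_2 has not reported

gate.update(["writer_2"])
open_before_finish = gate.is_available()  # all names seen, but not finished

late_finish = gate.finish()               # now the gate is armed
repeat_finish = gate.finish()             # a second call changes nothing
open_after_finish = gate.is_available()

gate.consume()
state_after_consume = (gate.seen, gate.finished)
```

Because an early `finish()` is ignored rather than remembered, the sketch requires `finish()` to be called again once the last producer has reported.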

Both classes validate updates against the expected names set, raising `InvalidUpdateError` if an unexpected name is received. The checkpoint format differs between the two: `NamedBarrierValue` checkpoints the `seen` set, while `NamedBarrierValueAfterFinish` checkpoints a tuple of `(seen, finished)`.
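
The validation rule and the two checkpoint shapes can be illustrated with plain data structures. Everything named here (`BarrierState`, `UnexpectedNameError`, and the `snapshot_*` / `restore_*` helpers) is hypothetical and exists only for this sketch; the library exposes the same ideas through `update()`, `checkpoint()`, and `from_checkpoint()`. The helpers copy the `seen` set on the way in and out, which is a choice made for the sketch rather than a statement about the library.

```python
from dataclasses import dataclass, field


class UnexpectedNameError(ValueError):
    """Hypothetical stand-in for the library's InvalidUpdateError."""


@dataclass
class BarrierState:
    """Hypothetical container for the state both channel classes track."""

    names: frozenset
    seen: set = field(default_factory=set)
    finished: bool = False

    def update(self, values: list) -> None:
        for value in values:
            if value not in self.names:
                raise UnexpectedNameError(
                    f"Value {value!r} not in {sorted(self.names)}"
                )
            self.seen.add(value)


def snapshot_plain(state: BarrierState) -> set:
    """Checkpoint shape of the plain barrier: just the names seen so far."""
    return set(state.seen)


def snapshot_after_finish(state: BarrierState) -> tuple:
    """Checkpoint shape of the after-finish variant: (seen, finished)."""
    return (set(state.seen), state.finished)


def restore_plain(names: frozenset, checkpoint: set) -> BarrierState:
    return BarrierState(names, seen=set(checkpoint))


def restore_after_finish(names: frozenset, checkpoint: tuple) -> BarrierState:
    seen, finished = checkpoint
    return BarrierState(names, seen=set(seen), finished=finished)


names = frozenset({"node_a", "node_b"})
state = BarrierState(names)
state.update(["node_a"])

# An unexpected name is rejected and leaves the state untouched.
try:
    state.update(["node_z"])
    rejected = False
except UnexpectedNameError:
    rejected = True

plain_cp = snapshot_plain(state)
tuple_cp = snapshot_after_finish(state)

# Resuming from a checkpoint keeps partial progress toward the barrier.
resumed = restore_after_finish(names, tuple_cp)
resumed.update(["node_b"])
```

Restoring from a checkpoint keeps partial progress, so a resumed run only needs the producers that had not yet reported.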

Usage

Use `NamedBarrierValue` when you need to synchronize multiple parallel nodes so that a downstream node only runs after all upstream nodes have completed. Use `NamedBarrierValueAfterFinish` for end-of-run barriers that should only trigger during the graph's finishing phase. These channels are primarily used internally by the Pregel engine for orchestration.

Code Reference

Source Location

Signature

```python
class NamedBarrierValue(Generic[Value], BaseChannel[Value, Value, set[Value]]):
    def __init__(self, typ: type[Value], names: set[Value]) -> None: ...

    @property
    def ValueType(self) -> type[Value]: ...
    @property
    def UpdateType(self) -> type[Value]: ...

    def copy(self) -> Self: ...
    def checkpoint(self) -> set[Value]: ...
    def from_checkpoint(self, checkpoint: set[Value]) -> Self: ...
    def update(self, values: Sequence[Value]) -> bool: ...
    def get(self) -> Value: ...
    def is_available(self) -> bool: ...
    def consume(self) -> bool: ...

class NamedBarrierValueAfterFinish(Generic[Value], BaseChannel[Value, Value, set[Value]]):
    def __init__(self, typ: type[Value], names: set[Value]) -> None: ...

    def copy(self) -> Self: ...
    def checkpoint(self) -> tuple[set[Value], bool]: ...
    def from_checkpoint(self, checkpoint: tuple[set[Value], bool]) -> Self: ...
    def update(self, values: Sequence[Value]) -> bool: ...
    def get(self) -> Value: ...
    def is_available(self) -> bool: ...
    def consume(self) -> bool: ...
    def finish(self) -> bool: ...
```

Import

```python
from langgraph.channels.named_barrier_value import NamedBarrierValue
from langgraph.channels.named_barrier_value import NamedBarrierValueAfterFinish
```

I/O Contract

Constructor Parameters

| Parameter | Type | Required | Description |
|---|---|---|---|
| typ | `type[Value]` | Yes | The type of the named values |
| names | `set[Value]` | Yes | The set of expected names that must all be seen before the barrier opens |

NamedBarrierValue Behavior

| Method | Precondition | Behavior |
|---|---|---|
| `update(values)` | Value in `names` | Adds to `seen`; returns `True` if newly added |
| `update(values)` | Value not in `names` | Raises `InvalidUpdateError` |
| `get()` | `seen == names` | Returns `None` (barrier open) |
| `get()` | `seen != names` | Raises `EmptyChannelError` |
| `consume()` | `seen == names` | Resets `seen` to empty; returns `True` |
| `consume()` | `seen != names` | Returns `False` |

NamedBarrierValueAfterFinish Additional Behavior

| Method | Precondition | Behavior |
|---|---|---|
| `get()` | `finished and seen == names` | Returns `None` (barrier open) |
| `get()` | Otherwise | Raises `EmptyChannelError` |
| `finish()` | `seen == names and not finished` | Sets `finished = True`; returns `True` |
| `finish()` | Otherwise | Returns `False` |
| `consume()` | `finished and seen == names` | Resets both `finished` and `seen`; returns `True` |
| `consume()` | Otherwise | Returns `False` |

Checkpoint Format

| Class | Checkpoint Type | Description |
|---|---|---|
| `NamedBarrierValue` | `set[Value]` | The set of names seen so far |
| `NamedBarrierValueAfterFinish` | `tuple[set[Value], bool]` | Tuple of `(seen_set, finished_flag)` |

Usage Examples

```python
from langgraph.channels.named_barrier_value import (
    NamedBarrierValue,
    NamedBarrierValueAfterFinish,
)
from langgraph.errors import EmptyChannelError

# Basic barrier: wait for all three nodes
barrier = NamedBarrierValue(str, names={"node_a", "node_b", "node_c"})

# Not yet available
assert not barrier.is_available()

# Receive updates from nodes
barrier.update(["node_a"])
barrier.update(["node_b"])
assert not barrier.is_available()

# While the barrier is closed, get() raises EmptyChannelError
try:
    barrier.get()
except EmptyChannelError:
    pass

# All names received - barrier opens
barrier.update(["node_c"])
assert barrier.is_available()
assert barrier.get() is None  # Barrier value is always None

# Consume resets the barrier
barrier.consume()
assert not barrier.is_available()

# AfterFinish variant: requires explicit finish() call
finish_barrier = NamedBarrierValueAfterFinish(
    str, names={"writer_1", "writer_2"}
)
finish_barrier.update(["writer_1", "writer_2"])
assert not finish_barrier.is_available()  # All seen but not finished

finish_barrier.finish()
assert finish_barrier.is_available()  # Now available

finish_barrier.consume()
assert not finish_barrier.is_available()  # Reset after consume
```

Related Pages
