Implementation:Langchain ai Langgraph Topic Channel
| Knowledge Sources | |
|---|---|
| Domains | Channels, PubSub |
| Last Updated | 2026-02-11 16:00 GMT |
Overview
A configurable PubSub channel that collects values from multiple producers into a list, with an optional accumulate mode that preserves values across steps.
Description
The Topic channel extends `BaseChannel` to implement a publish-subscribe pattern where multiple nodes can write values that are collected into a list. Unlike single-value channels such as `LastValue` or `AnyValue`, `Topic` aggregates all updates from a step into a sequence, making it ideal for fan-in patterns where multiple nodes contribute results that should all be preserved.
The channel supports two operating modes controlled by the `accumulate` parameter. When `accumulate=False` (the default), the values list is cleared at the beginning of each step before new values are appended. This means `get()` only returns values written during the current step. When `accumulate=True`, values persist across steps, building up an ever-growing list. This is useful for message history channels where the full conversation must be preserved.
Updates accept both individual values and lists of values, with automatic flattening via the internal `_flatten` helper. This allows nodes to write either a single item or a batch of items in a single update call. The `get()` method returns a copy of the values list (not a reference) to prevent external mutation, and raises `EmptyChannelError` when no values are present. Checkpointing saves the full values list, and `from_checkpoint` includes backwards-compatible handling for an older tuple-based checkpoint format.
Usage
Use `Topic` for channels that collect outputs from multiple nodes into a list, such as message accumulation in chatbot graphs, event logging, or fan-in aggregation patterns. Set `accumulate=True` for message history channels that must preserve the full conversation across steps, or leave it as `False` for per-step collection channels.
Code Reference
Source Location
- Repository: Langchain_ai_Langgraph
- File: libs/langgraph/langgraph/channels/topic.py
Signature
class Topic(
Generic[Value],
BaseChannel[Sequence[Value], Value | list[Value], list[Value]],
):
def __init__(self, typ: type[Value], accumulate: bool = False) -> None: ...
@property
def ValueType(self) -> Any: ...
@property
def UpdateType(self) -> Any: ...
def copy(self) -> Self: ...
def checkpoint(self) -> list[Value]: ...
def from_checkpoint(self, checkpoint: list[Value]) -> Self: ...
def update(self, values: Sequence[Value | list[Value]]) -> bool: ...
def get(self) -> Sequence[Value]: ...
def is_available(self) -> bool: ...
Import
from langgraph.channels.topic import Topic
I/O Contract
Constructor Parameters
| Parameter | Type | Required | Description |
|---|---|---|---|
| typ | `type[Value]` | Yes | The type of individual values in the topic |
| accumulate | `bool` | No | If `True`, values persist across steps; if `False` (default), values are cleared each step |
Type Mapping
| Property | Type | Description |
|---|---|---|
| `ValueType` | `Sequence[Value]` | The read type is always a sequence of values |
| `UpdateType` | list[Value]` | Accepts individual values or lists (auto-flattened) |
update
| Accumulate | Input | Behavior |
|---|---|---|
| `False` | Any values | Clears existing values first, then appends new flattened values |
| `True` | Any values | Appends new flattened values to existing list |
| Either | Empty sequence | Clears values (if `accumulate=False` and had values) or no-op |
get
| State | Behavior |
|---|---|
| Values present | Returns a copy of the values list |
| Empty | Raises `EmptyChannelError` |
Usage Examples
from langgraph.channels.topic import Topic
# Non-accumulating topic (per-step collection)
events = Topic(str, accumulate=False)
# Step 1: Multiple nodes write events
events.update(["event_a", "event_b"])
events.update(["event_c"])
print(events.get()) # ["event_a", "event_b", "event_c"]
# Step 2: Previous values cleared, new values added
events.update(["event_d"])
print(events.get()) # ["event_d"]
# Accumulating topic (message history)
messages = Topic(str, accumulate=True)
# Step 1
messages.update(["Hello"])
print(messages.get()) # ["Hello"]
# Step 2: Previous messages preserved
messages.update(["How are you?"])
print(messages.get()) # ["Hello", "How are you?"]
# Supports mixed individual and list updates
mixed = Topic(int, accumulate=True)
mixed.update([1, [2, 3], 4]) # Lists are auto-flattened
print(mixed.get()) # [1, 2, 3, 4]
# Checkpoint and restore
cp = messages.checkpoint()
restored = messages.from_checkpoint(cp)
print(restored.get()) # ["Hello", "How are you?"]