Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Langchain ai Langgraph Topic Channel

From Leeroopedia
Knowledge Sources
Domains Channels, PubSub
Last Updated 2026-02-11 16:00 GMT

Overview

A configurable PubSub channel that collects values from multiple producers into a list, with an optional accumulate mode that preserves values across steps.

Description

The Topic channel extends `BaseChannel` to implement a publish-subscribe pattern where multiple nodes can write values that are collected into a list. Unlike single-value channels such as `LastValue` or `AnyValue`, `Topic` aggregates all updates from a step into a sequence, making it ideal for fan-in patterns where multiple nodes contribute results that should all be preserved.

The channel supports two operating modes controlled by the `accumulate` parameter. When `accumulate=False` (the default), the values list is cleared at the beginning of each step before new values are appended. This means `get()` only returns values written during the current step. When `accumulate=True`, values persist across steps, building up an ever-growing list. This is useful for message history channels where the full conversation must be preserved.

Updates accept both individual values and lists of values, with automatic flattening via the internal `_flatten` helper. This allows nodes to write either a single item or a batch of items in a single update call. The `get()` method returns a copy of the values list (not a reference) to prevent external mutation, and raises `EmptyChannelError` when no values are present. Checkpointing saves the full values list, and `from_checkpoint` includes backwards-compatible handling for an older tuple-based checkpoint format.

Usage

Use `Topic` for channels that collect outputs from multiple nodes into a list, such as message accumulation in chatbot graphs, event logging, or fan-in aggregation patterns. Set `accumulate=True` for message history channels that must preserve the full conversation across steps, or leave it as `False` for per-step collection channels.

Code Reference

Source Location

Signature

class Topic(
    Generic[Value],
    BaseChannel[Sequence[Value], Value | list[Value], list[Value]],
):
    def __init__(self, typ: type[Value], accumulate: bool = False) -> None: ...

    @property
    def ValueType(self) -> Any: ...
    @property
    def UpdateType(self) -> Any: ...

    def copy(self) -> Self: ...
    def checkpoint(self) -> list[Value]: ...
    def from_checkpoint(self, checkpoint: list[Value]) -> Self: ...
    def update(self, values: Sequence[Value | list[Value]]) -> bool: ...
    def get(self) -> Sequence[Value]: ...
    def is_available(self) -> bool: ...

Import

from langgraph.channels.topic import Topic

I/O Contract

Constructor Parameters

Parameter Type Required Description
typ `type[Value]` Yes The type of individual values in the topic
accumulate `bool` No If `True`, values persist across steps; if `False` (default), values are cleared each step

Type Mapping

Property Type Description
`ValueType` `Sequence[Value]` The read type is always a sequence of values
`UpdateType` list[Value]` Accepts individual values or lists (auto-flattened)

update

Accumulate Input Behavior
`False` Any values Clears existing values first, then appends new flattened values
`True` Any values Appends new flattened values to existing list
Either Empty sequence Clears values (if `accumulate=False` and had values) or no-op

get

State Behavior
Values present Returns a copy of the values list
Empty Raises `EmptyChannelError`

Usage Examples

from langgraph.channels.topic import Topic

# Non-accumulating topic (per-step collection)
events = Topic(str, accumulate=False)

# Step 1: Multiple nodes write events
events.update(["event_a", "event_b"])
events.update(["event_c"])
print(events.get())  # ["event_a", "event_b", "event_c"]

# Step 2: Previous values cleared, new values added
events.update(["event_d"])
print(events.get())  # ["event_d"]

# Accumulating topic (message history)
messages = Topic(str, accumulate=True)

# Step 1
messages.update(["Hello"])
print(messages.get())  # ["Hello"]

# Step 2: Previous messages preserved
messages.update(["How are you?"])
print(messages.get())  # ["Hello", "How are you?"]

# Supports mixed individual and list updates
mixed = Topic(int, accumulate=True)
mixed.update([1, [2, 3], 4])  # Lists are auto-flattened
print(mixed.get())  # [1, 2, 3, 4]

# Checkpoint and restore
cp = messages.checkpoint()
restored = messages.from_checkpoint(cp)
print(restored.get())  # ["Hello", "How are you?"]

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment