Implementation:Online ml River Cluster CluStream
| Knowledge Sources | Domains | Last Updated |
|---|---|---|
| River, River Docs, A Framework for Clustering Evolving Data Streams (Aggarwal et al., 2003) | Online Clustering, Temporal Micro-Clusters | 2026-02-08 16:00 GMT |
Overview
Concrete tool for performing CluStream two-phase clustering on evolving data streams, maintaining temporal micro-clusters in the online phase and periodically applying incremental K-Means for macro-clustering.
Description
The cluster.CluStream class implements the CluStream framework. During initialization, it creates micro-clusters one at a time from the incoming points until max_micro_clusters of them exist. After initialization, each new point is absorbed by the nearest micro-cluster if it falls within that micro-cluster's maximum boundary (radius). Otherwise the point seeds a new micro-cluster, and a maintenance step makes room for it: either an outdated micro-cluster (one whose relevance stamp falls outside time_window) is deleted, or the two closest micro-clusters are merged.
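The absorb-or-maintain decision can be sketched in plain Python. This is a simplified illustration, not River's code: MicroCluster and online_step are hypothetical names, statistics are kept as plain linear and squared sums for brevity, a single-point micro-cluster uses the distance to its nearest neighbour as its radius (a heuristic the CluStream paper uses for singletons), and staleness is tested on the last update time instead of CluStream's relevance stamp.

```python
import math


def dist(a, b):
    """Euclidean distance between two sparse feature dicts."""
    keys = set(a) | set(b)
    return math.sqrt(sum((a.get(k, 0.0) - b.get(k, 0.0)) ** 2 for k in keys))


class MicroCluster:
    """Hypothetical, simplified micro-cluster (not River's CluStreamMicroCluster)."""

    def __init__(self, x, t):
        self.n = 1                                  # number of absorbed points
        self.ls = dict(x)                           # per-feature linear sums
        self.ss = {k: v * v for k, v in x.items()}  # per-feature squared sums
        self.t_last = t                             # time of the latest update

    @property
    def center(self):
        return {k: s / self.n for k, s in self.ls.items()}

    def rms_deviation(self):
        # Root-mean-square distance of the member points from the centroid.
        c = self.center
        var = sum(self.ss[k] / self.n - c[k] ** 2 for k in c)
        return math.sqrt(max(var, 0.0))

    def absorb(self, x, t):
        self.n += 1
        for k, v in x.items():
            self.ls[k] = self.ls.get(k, 0.0) + v
            self.ss[k] = self.ss.get(k, 0.0) + v * v
        self.t_last = t

    def merge(self, other):
        self.n += other.n
        for k in set(self.ls) | set(other.ls):
            self.ls[k] = self.ls.get(k, 0.0) + other.ls.get(k, 0.0)
            self.ss[k] = self.ss.get(k, 0.0) + other.ss.get(k, 0.0)
        self.t_last = max(self.t_last, other.t_last)


def online_step(mcs, x, t, max_mcs, r_factor=2, time_window=1000):
    """Process one point; returns "created", "absorbed", "replaced" or "merged".

    Assumes max_mcs >= 2 so that a singleton always has a neighbour.
    """
    if len(mcs) < max_mcs:  # initialization: one new micro-cluster per point
        mcs.append(MicroCluster(x, t))
        return "created"

    i = min(range(len(mcs)), key=lambda j: dist(mcs[j].center, x))
    nearest = mcs[i]
    if nearest.n == 1:
        # No spread yet: use the distance to the closest other micro-cluster.
        radius = min(dist(nearest.center, m.center) for j, m in enumerate(mcs) if j != i)
    else:
        radius = r_factor * nearest.rms_deviation()

    if dist(nearest.center, x) <= radius:
        nearest.absorb(x, t)
        return "absorbed"

    # Maintenance: delete an outdated micro-cluster if one exists ...
    stale = [j for j, m in enumerate(mcs) if m.t_last < t - time_window]
    if stale:
        mcs.pop(stale[0])
        outcome = "replaced"
    else:
        # ... otherwise merge the two closest micro-clusters to free a slot.
        _, a, b = min(
            (dist(mcs[a].center, mcs[b].center), a, b)
            for a in range(len(mcs))
            for b in range(a + 1, len(mcs))
        )
        mcs[a].merge(mcs[b])
        mcs.pop(b)  # b > a, so index a stays valid
        outcome = "merged"

    mcs.append(MicroCluster(x, t))  # the new point seeds its own micro-cluster
    return outcome
```

Feeding points one by one through online_step shows all four outcomes; River keeps the same decision structure but stores variances with stats.Var rather than raw squared sums.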
Every time_gap time steps, the algorithm extracts the micro-cluster centers and applies cluster.KMeans with n_macro_clusters clusters to them, producing the final macro-cluster solution. Variances are computed incrementally with Welford's online algorithm (via River's stats.Var), which is more numerically stable than deriving variance from the sum and sum-of-squares terms stored in the original CF vectors.
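To see why the running-variance choice matters, the sketch below contrasts a weighted Welford update with variance recovered from CF-style sums (SS/n minus the squared mean). WelfordVar and naive_cf_variance are hypothetical illustrations, not River's stats.Var; the weighted form assumes a weight acts like a repeat count for the point.

```python
class WelfordVar:
    """Weighted running mean and variance via Welford's update (illustrative)."""

    def __init__(self):
        self.w = 0.0     # total weight seen so far
        self.mean = 0.0  # running weighted mean
        self.m2 = 0.0    # weighted sum of squared deviations from the mean

    def update(self, x, w=1.0):
        self.w += w
        delta = x - self.mean
        self.mean += w * delta / self.w
        # Combines the deviation before (delta) and after (x - mean) the update.
        self.m2 += w * delta * (x - self.mean)

    def get(self, ddof=1):
        # ddof=1 gives the sample variance, ddof=0 the population variance.
        return self.m2 / (self.w - ddof) if self.w > ddof else 0.0


def naive_cf_variance(xs):
    """Population variance from CF-style sums: SS/n - (LS/n)^2."""
    n, ls, ss = 0, 0.0, 0.0
    for x in xs:  # incremental updates, as a CF vector would receive them
        n += 1
        ls += x
        ss += x * x
    m = ls / n
    return ss / n - m * m


# Values sharing a large offset expose catastrophic cancellation.
xs = [1e9 + 4, 1e9 + 7, 1e9 + 13, 1e9 + 16]
v = WelfordVar()
for x in xs:
    v.update(x)
print(v.get(ddof=0))          # 22.5, the exact population variance
print(naive_cf_variance(xs))  # far from 22.5: rounding in SS swamps the true spread
```

The Welford update only ever subtracts nearby quantities, so the large common offset never enters a squared term.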
Usage
Import cluster.CluStream when you need a temporally aware online clustering algorithm with a fixed memory budget and a known number of output clusters. Additional keyword arguments (such as halflife) are passed through to the internal cluster.KMeans instance.
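Because extra keyword arguments such as halflife configure the internal cluster.KMeans, it helps to see what an online K-Means step looks like. The sketch below assumes a halflife-style update in which the closest center moves a fraction halflife of the way toward each new point, which is how River's documentation describes the halflife parameter. For determinism it takes fixed initial centers instead of seeded random ones. OnlineKMeans and maybe_recluster are hypothetical names, and the exact step at which a time_gap boundary triggers reclustering is an assumption.

```python
import math


def nearest(centers, x):
    """Index of the center closest to point x (Euclidean distance)."""
    return min(range(len(centers)), key=lambda c: math.dist(centers[c], x))


class OnlineKMeans:
    """Hypothetical online K-Means with a halflife-style center update."""

    def __init__(self, init_centers, halflife=0.5):
        self.centers = [list(c) for c in init_centers]
        self.halflife = halflife

    def learn_one(self, x):
        c = nearest(self.centers, x)
        # Move the closest center a fraction `halflife` of the way toward x.
        self.centers[c] = [ci + self.halflife * (xi - ci) for ci, xi in zip(self.centers[c], x)]

    def predict_one(self, x):
        return nearest(self.centers, x)


def maybe_recluster(t, time_gap, km, micro_centers):
    """At the end of every `time_gap` steps, feed the micro-cluster centers to km."""
    if (t + 1) % time_gap != 0:
        return False
    for center in micro_centers:
        km.learn_one(center)
    return True


km = OnlineKMeans([[0.0, 0.0], [10.0, 10.0]], halflife=0.5)
km.learn_one([2.0, 2.0])
print(km.centers)  # [[1.0, 1.0], [10.0, 10.0]]
```

A larger halflife makes the macro-centers track recent micro-cluster centers more aggressively; a smaller one smooths them over more updates.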
Code Reference
Source Location
river/cluster/clustream.py:L9-L265
Signature
```python
class CluStream(base.Clusterer):
    def __init__(
        self,
        n_macro_clusters: int = 5,
        max_micro_clusters: int = 100,
        micro_cluster_r_factor: int = 2,
        time_window: int = 1000,
        time_gap: int = 100,
        seed: int | None = None,
        **kwargs,
    ): ...
```
Import
```python
from river import cluster
```
Key Parameters
| Parameter | Default | Description |
|---|---|---|
| n_macro_clusters | 5 | Number of macro-clusters (k) for the K-Means offline phase. |
| max_micro_clusters | 100 | Maximum number of micro-clusters maintained in memory. |
| micro_cluster_r_factor | 2 | Multiplier for the micro-cluster radius (RMS deviation times this factor). |
| time_window | 1000 | If the current time is T, only data that arrived within the period (T - time_window, T) is considered when computing relevance stamps. |
| time_gap | 100 | Interval at which incremental K-Means is applied on micro-cluster centers. |
| seed | None | Random seed for the internal K-Means algorithm. |
| **kwargs | -- | Additional parameters forwarded to the internal cluster.KMeans (e.g., halflife). |
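
The space- and time-related parameters can be made concrete with a short sketch. max_boundary applies micro_cluster_r_factor to a micro-cluster's RMS deviation. relevance_stamp follows one reading of the CluStream paper: when a micro-cluster holds at least 2m points, the mean arrival time of its last m points is approximated by the timestamp that has a fraction m/(2n) of arrivals after it, assuming normally distributed timestamps. A micro-cluster is then treated as outdated when that stamp is older than T - time_window. The function names and the parameter m are hypothetical, and River's code may differ in detail.

```python
from statistics import NormalDist


def max_boundary(rms_deviation, r_factor=2):
    """Absorption radius: r_factor times the RMS deviation from the centroid."""
    return r_factor * rms_deviation


def relevance_stamp(mu_t, sigma_t, n, m):
    """Approximate mean arrival time of the last m of n points.

    mu_t and sigma_t are the mean and standard deviation of the micro-cluster's
    timestamps, which are assumed to be normally distributed.
    """
    if n < 2 * m:
        return mu_t  # too few points: fall back to the mean timestamp
    # Timestamp with a fraction m / (2n) of the arrivals after it.
    return mu_t + sigma_t * NormalDist().inv_cdf(1 - m / (2 * n))


def is_outdated(stamp, now, time_window):
    """Outdated if the stamp falls before the window (now - time_window, now)."""
    return stamp < now - time_window


stamp = relevance_stamp(mu_t=500.0, sigma_t=100.0, n=100, m=10)
print(round(stamp, 2))                                 # 664.49
print(is_outdated(stamp, now=2000, time_window=1000))  # True
```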
Methods
| Method | Signature | Description |
|---|---|---|
| learn_one | learn_one(x: dict, w=1.0) -> None | Absorbs observation x into the nearest micro-cluster or triggers maintenance; periodically applies K-Means on micro-cluster centers. |
| predict_one | predict_one(x: dict) -> int | Finds the nearest micro-cluster to x, then returns the macro-cluster assignment of that micro-cluster's center via the internal K-Means model. Returns 0 before macro-clustering has been performed. |
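
The two-level lookup behind predict_one can be sketched with plain coordinate lists. predict_macro is a hypothetical helper, not River's API: it assumes the micro-cluster and macro-cluster centers are already known and returns 0 when no macro-clustering has happened yet, mirroring the documented default. Because the point is routed through its nearest micro-cluster, the answer can differ from simply picking the macro-center closest to the point.

```python
import math


def predict_macro(x, micro_centers, macro_centers):
    """Route x to its nearest micro-cluster, then label that micro-cluster's center."""
    if not macro_centers:
        return 0  # no macro-clustering yet
    mc = min(micro_centers, key=lambda c: math.dist(c, x))
    return min(range(len(macro_centers)), key=lambda k: math.dist(macro_centers[k], mc))


micro = [[4.0, 0.0], [12.0, 0.0]]
macro = [[0.0, 0.0], [10.0, 0.0]]
print(predict_macro([6.0, 0.0], micro, macro))  # 0, via the micro-cluster at (4, 0)
```

Here (6, 0) is closer to macro-center 1, yet it is labelled 0 because its nearest micro-cluster sits closer to macro-center 0.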
Key Attributes
| Attribute | Type | Description |
|---|---|---|
| centers | dict[int, defaultdict] | Centers of the macro-clusters, updated after each K-Means application. |
| micro_clusters | dict[int, CluStreamMicroCluster] | Current set of micro-clusters and their summary statistics (weights, centers, and running variances). |
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| x | dict | A dictionary mapping feature names to numeric values. |
| w | float | Optional sample weight (default 1.0). |
Outputs
| Output | Type | Description |
|---|---|---|
| predict_one return | int | The macro-cluster index (0 to n_macro_clusters-1) assigned to the observation. |
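
Both methods take a single observation as a dict. When data starts out as array rows, stream.iter_array in the usage example yields dicts keyed by column index, which is why the queries there use keys 0 and 1. The plain-Python helper below, rows_to_feature_dicts, is a hypothetical stand-in that shows the same conversion, optionally with named features.

```python
def rows_to_feature_dicts(rows, feature_names=None):
    """Yield one {feature_name: value} dict per row, as learn_one/predict_one expect."""
    for row in rows:
        names = feature_names if feature_names is not None else range(len(row))
        yield dict(zip(names, row))


print(list(rows_to_feature_dicts([[1, 2], [-4, 0]])))
# [{0: 1, 1: 2}, {0: -4, 1: 0}]
print(list(rows_to_feature_dicts([[1, 2]], feature_names=["a", "b"])))
# [{'a': 1, 'b': 2}]
```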
Usage Examples
```python
from river import cluster
from river import stream

X = [
    [1, 2],
    [1, 4],
    [1, 0],
    [-4, 2],
    [-4, 4],
    [-4, 0],
    [5, 0],
    [5, 2],
    [5, 4],
]

clustream = cluster.CluStream(
    n_macro_clusters=3,
    max_micro_clusters=5,
    time_gap=3,
    seed=0,
    halflife=0.4,  # forwarded to the internal cluster.KMeans
)

for x, _ in stream.iter_array(X):
    clustream.learn_one(x)

clustream.predict_one({0: 1, 1: 1})
# 1
clustream.predict_one({0: -4, 1: 3})
# 2
clustream.predict_one({0: 4, 1: 3.5})
# 0
```