Implementation:Online ml River Cluster CluStream

From Leeroopedia


Knowledge Sources: River; River Docs; A Framework for Clustering Evolving Data Streams (Aggarwal et al., 2003)
Domains: Online Clustering, Temporal Micro-Clusters
Last Updated: 2026-02-08 16:00 GMT

Overview

Concrete tool for two-phase CluStream clustering of evolving data streams: the online phase maintains temporal micro-clusters, and incremental K-Means is applied periodically to the micro-cluster centers for macro-clustering.

Description

The cluster.CluStream class implements the CluStream framework. During initialization, it creates micro-clusters one by one until max_micro_clusters of them exist. After initialization, each new point is either absorbed by the nearest micro-cluster (if the point lies within that micro-cluster's radius) or triggers a maintenance operation that makes room for a new micro-cluster: either an old micro-cluster is replaced, or the two closest micro-clusters are merged.
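The absorb-or-maintain decision described above can be sketched in plain Python. This is a simplified illustration under stated assumptions, not River's code: MicroCluster, decide, and the last_update field are hypothetical names, and "old" is approximated here by comparing the last update time against now - time_window.

```python
import math
from dataclasses import dataclass


@dataclass
class MicroCluster:
    """Toy micro-cluster (hypothetical): center, boundary radius, last update time."""
    center: tuple
    radius: float
    last_update: int


def decide(point, clusters, now, time_window):
    """Return the action the online phase would take for `point`.

    Assumes `clusters` is non-empty and already at its size limit.
    """
    nearest = min(clusters, key=lambda mc: math.dist(point, mc.center))
    if math.dist(point, nearest.center) <= nearest.radius:
        return "absorb"
    # No micro-cluster can absorb the point, so room must be made for a new one.
    if any(mc.last_update < now - time_window for mc in clusters):
        return "replace_old"
    return "merge_closest_pair"
```

In this sketch the function only names the action; carrying it out (deleting, merging, and seeding a new micro-cluster from the point) is left out to keep the decision rule visible.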

Every time_gap time steps, the algorithm extracts the micro-cluster centers and applies cluster.KMeans with n_macro_clusters to them to produce the final macro-cluster solution. The implementation computes variance incrementally with Welford's online algorithm (via River's stats.Var), which is more numerically stable than the traditional cluster feature (CF) vector approach of storing sums and sums of squares.
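To illustrate the numerical-stability point, here is a minimal pure-Python sketch of Welford's update next to a variance computed from CF-style sums. RunningVariance and cf_variance are illustrative names, not River classes; River's stats.Var is not used here so that the example stays self-contained.

```python
class RunningVariance:
    """Welford's online algorithm for the mean and sample variance."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0
        self._m2 = 0.0  # running sum of squared deviations from the current mean

    def update(self, x):
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self._m2 += delta * (x - self.mean)

    @property
    def variance(self):
        return self._m2 / (self.n - 1) if self.n > 1 else 0.0


def cf_variance(values):
    """Sample variance from CF-style sums (linear sum and sum of squares)."""
    n = len(values)
    linear_sum = sum(values)
    squared_sum = sum(v * v for v in values)
    return (squared_sum - linear_sum * linear_sum / n) / (n - 1)


# Large values with a small spread; the exact sample variance is 30.
values = [1e9 + 4, 1e9 + 7, 1e9 + 13, 1e9 + 16]

welford = RunningVariance()
for v in values:
    welford.update(v)

print(welford.variance)     # 30.0
print(cf_variance(values))  # noticeably off, due to cancellation between two huge sums
```

The CF-style formula subtracts two numbers of magnitude around 4e18, so rounding in the squares dominates the result, whereas Welford's update only ever works with small deviations from the running mean.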

Usage

Import cluster.CluStream when you need a temporally aware online clustering algorithm with a fixed memory budget and a known number of output clusters. Additional keyword arguments (such as halflife) are passed through to the internal cluster.KMeans instance.

Code Reference

Source Location

river/cluster/clustream.py:L9-L265

Signature

class CluStream(base.Clusterer):
    def __init__(
        self,
        n_macro_clusters: int = 5,
        max_micro_clusters: int = 100,
        micro_cluster_r_factor: int = 2,
        time_window: int = 1000,
        time_gap: int = 100,
        seed: int | None = None,
        **kwargs
    )

Import

from river import cluster

Key Parameters

Parameter | Default | Description
n_macro_clusters | 5 | Number of macro-clusters (k) for the K-Means offline phase.
max_micro_clusters | 100 | Maximum number of micro-clusters maintained in memory.
micro_cluster_r_factor | 2 | Multiplier for the micro-cluster radius (the radius is the RMS deviation times this factor).
time_window | 1000 | With T the current time, only data that arrived within the period (T - time_window, T) is considered for relevance stamps.
time_gap | 100 | Interval, in time steps, at which incremental K-Means is applied to the micro-cluster centers.
seed | None | Random seed for the internal K-Means algorithm.
**kwargs | -- | Additional parameters forwarded to the internal cluster.KMeans (e.g., halflife).
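As a rough illustration of how micro_cluster_r_factor scales the absorption boundary, the sketch below computes an RMS deviation from raw points and multiplies it by the factor. It assumes the RMS deviation is the root of the mean squared Euclidean distance to the center; River's exact formula may differ, and boundary_radius is a hypothetical helper rather than a library function.

```python
import math


def boundary_radius(points, center, r_factor=2.0):
    """RMS distance of `points` from `center`, scaled by `r_factor` (illustrative)."""
    squared = [math.dist(p, center) ** 2 for p in points]
    rms = math.sqrt(sum(squared) / len(squared))
    return r_factor * rms


points = [(0.0, 0.0), (2.0, 0.0), (0.0, 2.0), (2.0, 2.0)]
center = (1.0, 1.0)
radius = boundary_radius(points, center)  # about 2.83 for r_factor=2

# A point is a candidate for absorption only if it falls inside the boundary.
inside = math.dist((3.0, 1.0), center) <= radius   # distance 2.0
outside = math.dist((4.5, 1.0), center) <= radius  # distance 3.5
```

A larger factor makes micro-clusters absorb more distant points and so triggers maintenance less often; a smaller factor does the opposite.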

Methods

Method | Signature | Description
learn_one | learn_one(x: dict, w=1.0) -> None | Absorbs observation x into the nearest micro-cluster or triggers maintenance; periodically applies K-Means on micro-cluster centers.
predict_one | predict_one(x: dict) -> int | Finds the nearest micro-cluster to x, then returns the macro-cluster assignment of that micro-cluster's center via the internal K-Means model. Returns 0 before macro-clustering has been performed.
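The two-stage lookup that predict_one performs can be sketched without River. In this simplified version, nearest_key and predict are hypothetical helpers, centers are plain dicts keyed by feature name, and the macro assignment is a nearest-center search standing in for the internal K-Means model.

```python
import math


def nearest_key(x, centers):
    """Return the key of the center closest to x (Euclidean distance)."""
    features = sorted(x)

    def distance(key):
        center = centers[key]
        return math.dist([x[f] for f in features], [center[f] for f in features])

    return min(centers, key=distance)


def predict(x, micro_centers, macro_centers):
    """Map x to a micro-cluster first, then map that micro-cluster's center to a macro-cluster."""
    if not macro_centers:
        return 0  # mirrors the documented fallback before macro-clustering has run
    micro_id = nearest_key(x, micro_centers)
    return nearest_key(micro_centers[micro_id], macro_centers)


micro_centers = {
    0: {"a": 0.0, "b": 0.0},
    1: {"a": 1.0, "b": 0.5},
    2: {"a": 10.0, "b": 10.0},
}
macro_centers = {0: {"a": 9.0, "b": 9.0}, 1: {"a": 0.5, "b": 0.2}}

far = predict({"a": 8, "b": 11}, micro_centers, macro_centers)
near = predict({"a": 0.9, "b": 0.4}, micro_centers, macro_centers)
```

Routing through the micro-cluster center means the prediction reflects the summarized stream rather than the raw query point.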

Key Attributes

Attribute | Type | Description
centers | dict[int, defaultdict] | Centers of the macro-clusters, updated after each K-Means application.
micro_clusters | dict[int, CluStreamMicroCluster] | Current set of micro-clusters with their incrementally maintained statistics (the role CF vectors play in the original framework).

I/O Contract

Inputs

Parameter | Type | Description
x | dict | A dictionary mapping feature names to numeric values.
w | float | Optional sample weight (default 1.0).

Outputs

Output | Type | Description
predict_one return | int | The macro-cluster index (0 to n_macro_clusters-1) assigned to the observation.

Usage Examples

from river import cluster
from river import stream

X = [
    [1, 2],
    [1, 4],
    [1, 0],
    [-4, 2],
    [-4, 4],
    [-4, 0],
    [5, 0],
    [5, 2],
    [5, 4]
]

clustream = cluster.CluStream(
    n_macro_clusters=3,
    max_micro_clusters=5,
    time_gap=3,
    seed=0,
    halflife=0.4
)

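# Feed the stream one observation at a time; iter_array yields (features, target) pairs
for x, _ in stream.iter_array(X):
    clustream.learn_one(x)

# Each call returns the index of the macro-cluster assigned to the point
clustream.predict_one({0: 1, 1: 1})
# 1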

clustream.predict_one({0: -4, 1: 3})
# 2

clustream.predict_one({0: 4, 1: 3.5})
# 0
