Implementation:Online ml River Covariance EmpiricalCovariance

From Leeroopedia


Knowledge Sources
Domains Online_Learning, Statistics, Covariance, Linear_Algebra
Last Updated 2026-02-08 16:00 GMT

Overview

EmpiricalCovariance computes and maintains a covariance matrix online from streaming data samples.

Description

EmpiricalCovariance incrementally computes the covariance matrix between all features in streaming data. For each pair of features it maintains a running covariance statistic (stats.Cov) that is updated one sample at a time. For diagonal entries (variances), it maintains separate variance statistics (stats.Var).
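As a rough illustration of what a per-pair running covariance involves, here is a minimal pure-Python sketch of a Welford-style update. The class name RunningCov and its attributes are hypothetical and chosen for this example; this is not River's stats.Cov code, only a sketch of the same general idea.

```python
class RunningCov:
    """Hypothetical helper: running covariance of two streams (not River's code)."""

    def __init__(self, ddof=1):
        self.ddof = ddof
        self.n = 0
        self.mean_x = 0.0
        self.mean_y = 0.0
        self.c = 0.0  # running sum of (x - mean_x) * (y - mean_y)

    def update(self, x, y):
        self.n += 1
        dx = x - self.mean_x  # deviation from the old mean of x
        self.mean_x += dx / self.n
        self.mean_y += (y - self.mean_y) / self.n
        self.c += dx * (y - self.mean_y)  # uses the new mean of y

    def get(self):
        # Not enough samples for the chosen ddof: report 0.0 by convention here.
        if self.n <= self.ddof:
            return 0.0
        return self.c / (self.n - self.ddof)


cov_xy = RunningCov()
for x, y in [(1, 2), (2, 4), (3, 6)]:
    cov_xy.update(x, y)
result = cov_xy.get()
print(result)
```

Each update touches only a handful of scalars, so memory stays constant per feature pair regardless of how many samples have been seen.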

The implementation uses itertools.combinations to identify all feature pairs and stores covariances in a dictionary indexed by (feature_i, feature_j) tuples. The matrix is symmetric, so only one ordering needs to be stored. The __getitem__ method automatically handles both orderings for convenient access.
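The pair enumeration and order-insensitive lookup described above can be sketched as follows. SymmetricStore is a hypothetical container written for this example; it canonicalises keys by sorting them, which is one way to get the behaviour and may differ from how River's __getitem__ resolves the two orderings.

```python
import itertools


class SymmetricStore:
    """Hypothetical container: stores one ordering per pair, answers both."""

    def __init__(self):
        self._data = {}

    def __setitem__(self, key, value):
        i, j = key
        # Keep a canonical (sorted) ordering so each pair is stored once.
        self._data[tuple(sorted((i, j)))] = value

    def __getitem__(self, key):
        i, j = key
        return self._data[tuple(sorted((i, j)))]


x = {"red": 0.3, "green": 0.9, "blue": 0.7}

# All unordered feature pairs, in a deterministic order.
pairs = list(itertools.combinations(sorted(x), r=2))

store = SymmetricStore()
for i, j in pairs:
    store[i, j] = x[i] * x[j]  # placeholder value standing in for a statistic

print(pairs)
print(store["green", "blue"] == store["blue", "green"])
```

For d features this stores d(d-1)/2 off-diagonal entries instead of d(d-1), which matters when the feature set is wide.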

The class supports both single-sample updates (update) and mini-batch updates (update_many). The mini-batch version computes the batch covariance with NumPy's cov function, then merges the batch's sample count, feature means, and covariances into the running statistics, so the batch does not have to be replayed one sample at a time.
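To make the merge step concrete, the sketch below combines two summaries of the form (n, mean_x, mean_y, C), where C is the sum of products of deviations, using the standard pairwise combination formula. The function names comoment and merge are hypothetical; the formula is the textbook one and is not copied from River's source.

```python
def comoment(xs, ys):
    """Return (n, mean_x, mean_y, C) with C = sum((x - mean_x) * (y - mean_y))."""
    n = len(xs)
    mx = sum(xs) / n
    my = sum(ys) / n
    c = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    return n, mx, my, c


def merge(a, b):
    """Combine two summaries without revisiting the raw samples."""
    n_a, mx_a, my_a, c_a = a
    n_b, mx_b, my_b, c_b = b
    n = n_a + n_b
    mx = (n_a * mx_a + n_b * mx_b) / n
    my = (n_a * my_a + n_b * my_b) / n
    # Cross term accounts for the shift between the two groups' means.
    c = c_a + c_b + (mx_a - mx_b) * (my_a - my_b) * n_a * n_b / n
    return n, mx, my, c


xs = [1.0, 2.0, 4.0, 7.0, 11.0]
ys = [2.0, 1.0, 5.0, 6.0, 9.0]

running = comoment(xs[:2], ys[:2])  # statistics seen so far
batch = comoment(xs[2:], ys[2:])    # statistics of the new mini-batch
merged = merge(running, batch)
direct = comoment(xs, ys)           # reference: computed over all samples

ddof = 1
cov_merged = merged[3] / (merged[0] - ddof)
print(cov_merged)
```

The merged summary agrees with the one computed over all samples at once, which is why a batch update and a sequence of single-sample updates lead to the same covariance up to floating-point error.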

A ddof (delta degrees of freedom) parameter controls whether to compute sample covariance (ddof=1, divide by n - 1) or population covariance (ddof=0, divide by n). The class also supports reverting a sample, which removes that sample's contribution from the running statistics, and can be initialized from pre-computed state using the _from_state class method.
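The effect of ddof and of reverting a sample can be seen on a single running variance. The RunningVar class below is a hypothetical pure-Python sketch, not River's stats.Var; revert simply applies the inverse of the Welford-style update.

```python
class RunningVar:
    """Hypothetical running variance with update/revert (not River's code)."""

    def __init__(self, ddof=1):
        self.ddof = ddof
        self.n = 0
        self.mean = 0.0
        self.s = 0.0  # running sum of squared deviations from the mean

    def update(self, x):
        self.n += 1
        old_mean = self.mean
        self.mean += (x - old_mean) / self.n
        self.s += (x - old_mean) * (x - self.mean)

    def revert(self, x):
        # Inverse of update: undo the contribution of a previously seen x.
        self.n -= 1
        if self.n == 0:
            self.mean, self.s = 0.0, 0.0
            return
        old_mean = self.mean
        self.mean = (old_mean * (self.n + 1) - x) / self.n
        self.s -= (x - old_mean) * (x - self.mean)

    def get(self):
        if self.n <= self.ddof:
            return 0.0
        return self.s / (self.n - self.ddof)


data = [2.0, 4.0, 4.0, 10.0]
sample, population = RunningVar(ddof=1), RunningVar(ddof=0)
for v in data:
    sample.update(v)
    population.update(v)

sample_var = sample.get()          # divides by n - 1
population_var = population.get()  # divides by n

sample.revert(10.0)                # forget the last observation
after_revert = sample.get()
print(sample_var, population_var, after_revert)
```

Reverting is only meaningful for a value that was actually added earlier; reverting an unseen value leaves the statistics in an inconsistent state.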

Usage

Use EmpiricalCovariance when you need to track relationships between features in streaming data for dimensionality reduction, anomaly detection, or understanding feature dependencies. It's particularly useful for online principal component analysis, multivariate statistical tests, or adaptive feature selection based on correlation structure.

Code Reference

Source Location

Signature

class EmpiricalCovariance(SymmetricMatrix):
    def __init__(self, ddof=1):
        ...

Import

from river import covariance

I/O Contract

Input

| Parameter | Type | Description |
|---|---|---|
| ddof | int | Delta degrees of freedom (0 for population, 1 for sample) |
| x | dict | Dictionary of features for single sample |
| X | DataFrame | DataFrame of features for mini-batch |

Output

| Method | Return Type | Description |
|---|---|---|
| update(x) | None | Updates covariance matrix with single sample |
| update_many(X) | None | Updates covariance matrix with batch |
| revert(x) | None | Removes sample's contribution to covariance |
| __getitem__((i, j)) | stats.Cov or stats.Var | Returns covariance or variance statistic |

Key Methods

| Method | Parameters | Description |
|---|---|---|
| update(x) | x: dict | Updates with single sample |
| update_many(X) | X: DataFrame | Updates with batch of samples |
| revert(x) | x: dict | Downdates by removing sample |
| __getitem__(key) | key: tuple[str, str] | Accesses covariance element |

Properties

| Property | Type | Description |
|---|---|---|
| matrix | dict | Dictionary of covariance/variance statistics |

Usage Examples

import numpy as np
import pandas as pd
from river import covariance

# Example 1: Single-sample updates
np.random.seed(42)
X = pd.DataFrame(
    np.random.random((8, 3)),
    columns=["red", "green", "blue"]
)

cov = covariance.EmpiricalCovariance()
for x in X.to_dict(orient="records"):
    cov.update(x)

print(cov)
#         blue     green    red
#  blue    0.076    0.020   -0.010
# green    0.020    0.113   -0.053
#   red   -0.010   -0.053    0.079

# Access individual covariances
print(cov["blue", "green"])  # Cov: 0.020292
print(cov["green", "blue"])  # Same, order doesn't matter

# Diagonal entries are variances
print(cov["blue", "blue"])  # Var: 0.076119

# Example 2: Mini-batch updates
cov_batch = covariance.EmpiricalCovariance()
cov_batch.update_many(X)
print(cov_batch)
# Results match the incremental updates (up to floating-point error)

# Example 3: Using ddof parameter
# Sample covariance (ddof=1, default)
cov_sample = covariance.EmpiricalCovariance(ddof=1)
cov_sample.update_many(X)

# Population covariance (ddof=0)
cov_pop = covariance.EmpiricalCovariance(ddof=0)
cov_pop.update_many(X)

print(f"Sample variance: {cov_sample['red', 'red'].get():.6f}")
print(f"Population variance: {cov_pop['red', 'red'].get():.6f}")

# Example 4: Streaming correlation monitoring
# Generate samples in which b is correlated with a and c is independent
np.random.seed(42)
n_samples = 1000

data = []
for _ in range(n_samples):
    a = np.random.randn()
    b = 0.8 * a + 0.2 * np.random.randn()  # Correlated with a
    c = np.random.randn()  # Independent
    data.append({'a': a, 'b': b, 'c': c})

cov_stream = covariance.EmpiricalCovariance()

for i, x in enumerate(data):
    cov_stream.update(x)

    if (i + 1) % 100 == 0:
        # Compute correlation from covariance
        cov_ab = cov_stream['a', 'b'].get()
        var_a = cov_stream['a', 'a'].get()
        var_b = cov_stream['b', 'b'].get()
        corr_ab = cov_ab / (np.sqrt(var_a) * np.sqrt(var_b))
        print(f"After {i+1} samples, corr(a,b) = {corr_ab:.3f}")

# Example 5: Revert functionality
cov_rev = covariance.EmpiricalCovariance()

samples = [
    {'x': 1, 'y': 2},
    {'x': 2, 'y': 4},
    {'x': 3, 'y': 6},
]

for s in samples:
    cov_rev.update(s)

print(f"Before revert: {cov_rev['x', 'y'].get():.4f}")

# Remove the last sample
cov_rev.revert(samples[-1])
print(f"After revert: {cov_rev['x', 'y'].get():.4f}")

# Example 6: Online feature selection based on variance (diagonal entries)
class CovarianceBasedSelector:
    def __init__(self, threshold=0.5):
        self.cov = covariance.EmpiricalCovariance()
        self.threshold = threshold
        self.selected = set()

    def update_and_select(self, x):
        self.cov.update(x)

        # Find features with high variance
        features = list(x.keys())
        for f in features:
            if self.cov[f, f].get() > self.threshold:
                self.selected.add(f)

        return {k: v for k, v in x.items() if k in self.selected}

selector = CovarianceBasedSelector(threshold=0.1)
for x in X.to_dict(orient="records"):
    selected = selector.update_and_select(x)

print(f"Selected features: {selector.selected}")

# Example 7: Compare with numpy
np_cov = np.cov(X.T, ddof=1)
river_cov_matrix = np.array([
    [cov["red", "red"].get(), cov["red", "green"].get(), cov["red", "blue"].get()],
    [cov["green", "red"].get(), cov["green", "green"].get(), cov["green", "blue"].get()],
    [cov["blue", "red"].get(), cov["blue", "green"].get(), cov["blue", "blue"].get()],
])

print("NumPy covariance:")
print(np_cov)
print("\nRiver covariance:")
print(river_cov_matrix)
print("\nClose?", np.allclose(np_cov, river_cov_matrix))
