Implementation:Online ml River Covariance EmpiricalCovariance
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Statistics, Covariance, Linear_Algebra |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
EmpiricalCovariance computes and maintains a covariance matrix online from streaming data samples.
Description
EmpiricalCovariance incrementally computes the covariance matrix between all features in streaming data. For each pair of features it maintains a running covariance statistic (stats.Cov), and for each diagonal entry (variance) it maintains a separate running variance statistic (stats.Var).
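The per-pair bookkeeping can be pictured with a small standalone sketch. This is not River's code: `update_pair` and its state tuple are hypothetical names, and the recurrence shown is the usual single-pass (Welford-style) co-moment update, assumed here purely for illustration.

```python
def update_pair(state, x, y):
    """Fold one (x, y) observation into a running co-moment state.

    state is (n, mean_x, mean_y, c), where c is the running sum of
    (x - mean_x) * (y - mean_y). Hypothetical helper, not part of River.
    """
    n, mean_x, mean_y, c = state
    n += 1
    dx = x - mean_x              # deviation from the old mean of x
    mean_x += dx / n
    mean_y += (y - mean_y) / n
    c += dx * (y - mean_y)       # uses the updated mean of y
    return n, mean_x, mean_y, c


state = (0, 0.0, 0.0, 0.0)
for x, y in [(1, 2), (2, 4), (3, 6)]:
    state = update_pair(state, x, y)

n, _, _, c = state
sample_cov = c / (n - 1)         # sample covariance, i.e. ddof=1
print(sample_cov)                # 2.0
```

Only four numbers are kept per pair, so memory does not grow with the number of samples seen.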
The implementation uses itertools.combinations to identify all feature pairs and stores covariances in a dictionary indexed by (feature_i, feature_j) tuples. The matrix is symmetric, so only one ordering needs to be stored. The __getitem__ method automatically handles both orderings for convenient access.
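The storage scheme can be sketched as follows. `PairStore` is a hypothetical toy container, not River's class, and the value it accumulates (a sum of products) is only a placeholder for a real covariance statistic. The point is the key handling: each unordered pair is stored once, and lookups fall back to the mirrored key.

```python
import itertools


class PairStore:
    """Toy symmetric container that stores each unordered pair once."""

    def __init__(self):
        self._data = {}

    def add_sample(self, x):
        # Sorting the feature names fixes one canonical ordering per pair.
        for i, j in itertools.combinations(sorted(x), r=2):
            self._data[i, j] = self._data.get((i, j), 0.0) + x[i] * x[j]

    def __getitem__(self, key):
        i, j = key
        try:
            return self._data[i, j]
        except KeyError:
            # Fall back to the mirrored entry; raises KeyError if neither exists.
            return self._data[j, i]


store = PairStore()
store.add_sample({"red": 1.0, "green": 2.0, "blue": 3.0})
print(sorted(store._data))
# [('blue', 'green'), ('blue', 'red'), ('green', 'red')]
print(store["green", "blue"] == store["blue", "green"])  # True
```

With d features this keeps d(d-1)/2 off-diagonal entries instead of d(d-1).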
The class supports both single-sample updates (update) and mini-batch updates (update_many). The mini-batch version computes the covariance of the batch with NumPy's cov function, then merges the batch statistics into the running covariance matrix incrementally, without revisiting earlier samples.
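The text does not spell out the merge step. One standard way to combine the summary of earlier data with the summary of a new batch is the pairwise co-moment rule sketched below. `batch_stats` and `merge_stats` are hypothetical names, and this is an illustration of the idea under that assumption rather than a copy of River's implementation.

```python
def batch_stats(xs, ys):
    """Return (n, mean_x, mean_y, c) for one batch, computed directly."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    c = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    return n, mean_x, mean_y, c


def merge_stats(a, b):
    """Combine two (n, mean_x, mean_y, c) summaries into one."""
    n_a, mx_a, my_a, c_a = a
    n_b, mx_b, my_b, c_b = b
    n = n_a + n_b
    dx = mx_b - mx_a
    dy = my_b - my_a
    # The cross term corrects for the two groups having different means.
    c = c_a + c_b + dx * dy * n_a * n_b / n
    return n, mx_a + dx * n_b / n, my_a + dy * n_b / n, c


xs = [1.0, 2.0, 3.0, 4.0, 5.0]
ys = [2.0, 1.0, 4.0, 3.0, 6.0]

running = batch_stats(xs[:2], ys[:2])   # statistics seen so far
batch = batch_stats(xs[2:], ys[2:])     # statistics of the new mini-batch
merged = merge_stats(running, batch)
full = batch_stats(xs, ys)              # reference: everything at once

ddof = 1
print(merged[3] / (merged[0] - ddof), full[3] / (full[0] - ddof))
```

The two printed values agree up to floating-point rounding, which is why a mini-batch update can match a sequence of single-sample updates.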
A ddof (delta degrees of freedom) parameter controls whether the sample covariance (ddof=1, the default) or the population covariance (ddof=0) is computed. The class also supports reverting samples, that is, removing the contribution of a previously seen sample, and can be initialized from precomputed state using the _from_state class method.
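Both ideas can be illustrated on a single feature pair. The sketch below is an assumption-laden illustration, not River's code: `direct_state`, `revert_pair`, and `covariance_from_state` are hypothetical helpers, and the downdate simply inverts the usual running-mean and co-moment recurrences.

```python
def direct_state(xs, ys):
    """Return (n, mean_x, mean_y, c) computed directly from the data."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    c = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    return n, mean_x, mean_y, c


def revert_pair(state, x, y):
    """Remove one previously added (x, y) observation from the state."""
    n, mean_x, mean_y, c = state
    if n <= 1:
        return 0, 0.0, 0.0, 0.0
    # Recover the means as they were before (x, y) was added.
    old_mean_x = (n * mean_x - x) / (n - 1)
    old_mean_y = (n * mean_y - y) / (n - 1)
    c -= (x - old_mean_x) * (y - mean_y)
    return n - 1, old_mean_x, old_mean_y, c


def covariance_from_state(state, ddof=1):
    """Divide the co-moment by (n - ddof); return 0.0 if undefined."""
    n, _, _, c = state
    return c / (n - ddof) if n > ddof else 0.0


state = direct_state([1, 2, 3], [2, 4, 6])
sample = covariance_from_state(state, ddof=1)       # divides by n - 1
population = covariance_from_state(state, ddof=0)   # divides by n

reverted = revert_pair(state, 3, 6)                 # drop the last sample
after_revert = covariance_from_state(reverted, ddof=1)
print(sample, population, after_revert)
```

The sample value is larger than the population value by a factor of n / (n - 1), and the reverted state matches what would have been obtained from the first two samples alone.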
Usage
Use EmpiricalCovariance when you need to track relationships between features in streaming data for dimensionality reduction, anomaly detection, or understanding feature dependencies. It's particularly useful for online principal component analysis, multivariate statistical tests, or adaptive feature selection based on correlation structure.
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/covariance/emp.py
Signature
class EmpiricalCovariance(SymmetricMatrix):
    def __init__(self, ddof=1):
        ...
Import
from river import covariance
I/O Contract
| Parameter | Type | Description |
|---|---|---|
| ddof | int | Delta degrees of freedom (0 for population, 1 for sample) |
| x | dict | Dictionary of features for single sample |
| X | DataFrame | DataFrame of features for mini-batch |
| Method | Return Type | Description |
|---|---|---|
| update(x) | None | Updates covariance matrix with single sample |
| update_many(X) | None | Updates covariance matrix with batch |
| revert(x) | None | Removes sample's contribution to covariance |
| __getitem__((i, j)) | stats.Cov or stats.Var | Returns covariance or variance statistic |
| Method | Parameters | Description |
|---|---|---|
| update(x) | x: dict | Updates with single sample |
| update_many(X) | X: DataFrame | Updates with batch of samples |
| revert(x) | x: dict | Downdates by removing sample |
| __getitem__(key) | key: tuple[str, str] | Accesses covariance element |
| Property | Type | Description |
|---|---|---|
| matrix | dict | Dictionary of covariance/variance statistics |
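Because matrix is a dictionary of statistic objects rather than an array, it can be handy to convert it to a dense form. The following is a minimal sketch that assumes only what the tables above state: entries are keyed by feature pairs and their values expose a get() method. `to_nested_lists` and `_Stat` are hypothetical, and the stand-in class keeps the example independent of River.

```python
def to_nested_lists(matrix, features):
    """Build a dense, row-major table from a pair-keyed mapping.

    `matrix` maps (i, j) tuples to objects exposing .get(); only one
    ordering of each off-diagonal pair needs to be present.
    Hypothetical helper, not part of River.
    """
    def lookup(i, j):
        stat = matrix[i, j] if (i, j) in matrix else matrix[j, i]
        return stat.get()

    return [[lookup(i, j) for j in features] for i in features]


class _Stat:
    """Stand-in for a statistic object with a .get() method."""

    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value


demo = {("a", "a"): _Stat(1.0), ("a", "b"): _Stat(0.5), ("b", "b"): _Stat(2.0)}
dense = to_nested_lists(demo, ["a", "b"])
print(dense)  # [[1.0, 0.5], [0.5, 2.0]]
```

Passing the feature order explicitly keeps rows and columns aligned with whatever ordering the caller expects.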
Usage Examples
import numpy as np
import pandas as pd
from river import covariance
# Example 1: Single-sample updates
np.random.seed(42)
X = pd.DataFrame(
    np.random.random((8, 3)),
    columns=["red", "green", "blue"],
)
cov = covariance.EmpiricalCovariance()
for x in X.to_dict(orient="records"):
    cov.update(x)
print(cov)
# blue green red
# blue 0.076 0.020 -0.010
# green 0.020 0.113 -0.053
# red -0.010 -0.053 0.079
# Access individual covariances
print(cov["blue", "green"]) # Cov: 0.020292
print(cov["green", "blue"]) # Same, order doesn't matter
# Diagonal entries are variances
print(cov["blue", "blue"]) # Var: 0.076119
# Example 2: Mini-batch updates
cov_batch = covariance.EmpiricalCovariance()
cov_batch.update_many(X)
print(cov_batch)
# Results match the incremental updates up to floating-point rounding
# Example 3: Using ddof parameter
# Sample covariance (ddof=1, default)
cov_sample = covariance.EmpiricalCovariance(ddof=1)
cov_sample.update_many(X)
# Population covariance (ddof=0)
cov_pop = covariance.EmpiricalCovariance(ddof=0)
cov_pop.update_many(X)
print(f"Sample variance: {cov_sample['red', 'red'].get():.6f}")
print(f"Population variance: {cov_pop['red', 'red'].get():.6f}")
# Example 4: Streaming correlation monitoring
# Generate samples in which b is strongly correlated with a, and c is independent
np.random.seed(42)
n_samples = 1000
data = []
for _ in range(n_samples):
    a = np.random.randn()
    b = 0.8 * a + 0.2 * np.random.randn()  # Correlated with a
    c = np.random.randn()  # Independent
    data.append({'a': a, 'b': b, 'c': c})
cov_stream = covariance.EmpiricalCovariance()
for i, x in enumerate(data):
    cov_stream.update(x)
    if (i + 1) % 100 == 0:
        # Compute the correlation from the covariance and the two variances
        cov_ab = cov_stream['a', 'b'].get()
        var_a = cov_stream['a', 'a'].get()
        var_b = cov_stream['b', 'b'].get()
        corr_ab = cov_ab / (np.sqrt(var_a) * np.sqrt(var_b))
        print(f"After {i+1} samples, corr(a,b) = {corr_ab:.3f}")
# Example 5: Revert functionality
cov_rev = covariance.EmpiricalCovariance()
samples = [
    {'x': 1, 'y': 2},
    {'x': 2, 'y': 4},
    {'x': 3, 'y': 6},
]
for s in samples:
    cov_rev.update(s)
print(f"Before revert: {cov_rev['x', 'y'].get():.4f}")
# Remove the last sample
cov_rev.revert(samples[-1])
print(f"After revert: {cov_rev['x', 'y'].get():.4f}")
# Example 6: Online feature selection based on variance (the diagonal of the covariance matrix)
class CovarianceBasedSelector:
    def __init__(self, threshold=0.5):
        self.cov = covariance.EmpiricalCovariance()
        self.threshold = threshold
        self.selected = set()

    def update_and_select(self, x):
        self.cov.update(x)
        # Keep every feature whose running variance has exceeded the threshold
        for f in x:
            if self.cov[f, f].get() > self.threshold:
                self.selected.add(f)
        return {k: v for k, v in x.items() if k in self.selected}

selector = CovarianceBasedSelector(threshold=0.1)
for x in X.to_dict(orient="records"):
    selected = selector.update_and_select(x)
print(f"Selected features: {selector.selected}")
# Example 7: Compare with numpy
np_cov = np.cov(X.T, ddof=1)
river_cov_matrix = np.array([
    [cov["red", "red"].get(), cov["red", "green"].get(), cov["red", "blue"].get()],
    [cov["green", "red"].get(), cov["green", "green"].get(), cov["green", "blue"].get()],
    [cov["blue", "red"].get(), cov["blue", "green"].get(), cov["blue", "blue"].get()],
])
print("NumPy covariance:")
print(np_cov)
print("\nRiver covariance:")
print(river_cov_matrix)
print("\nClose?", np.allclose(np_cov, river_cov_matrix))