Implementation:Online ml River Utils Rolling
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Sliding_Windows, Time_Series |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Generic wrapper for computing statistics over fixed-size or time-based rolling windows.
Description
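Provides Rolling for fixed-size windows and TimeRolling for time-based windows. Each wraps any object that implements update and revert methods and keeps a queue of the observations currently in the window. Rolling calls revert on the oldest observation once the window is full; TimeRolling calls revert on observations that have aged out of the period. Both are useful for computing windowed statistics on streams.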
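The update/revert mechanism described above can be sketched in plain Python. This is a minimal illustration, not River's actual code: RunningMean and FixedWindow are hypothetical names invented for this example, and the wrapper only handles a single positional argument.

```python
from collections import deque


class RunningMean:
    """Hypothetical statistic exposing the update/revert pair."""

    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x):
        self.n += 1
        self.total += x

    def revert(self, x):
        self.n -= 1
        self.total -= x

    def get(self):
        return self.total / self.n if self.n else 0.0


class FixedWindow:
    """Illustrative fixed-size wrapper (assumption: not River's code)."""

    def __init__(self, obj, window_size):
        self.obj = obj
        self.window = deque(maxlen=window_size)

    def update(self, x):
        # Window full: undo the oldest observation before adding the new one.
        if len(self.window) == self.window.maxlen:
            self.obj.revert(self.window[0])
        self.obj.update(x)
        # deque(maxlen=...) silently drops the oldest entry when full.
        self.window.append(x)

    def get(self):
        return self.obj.get()


rolling = FixedWindow(RunningMean(), window_size=3)
means = []
for x in [1, 3, 5, 7]:
    rolling.update(x)
    means.append(rolling.get())
print(means)  # [1.0, 2.0, 3.0, 5.0]
```

Because the wrapped object only ever sees update and revert calls, each step costs constant time regardless of the window size; the window itself stores just enough to replay the oldest observation into revert.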
Usage
Use it to compute rolling means, rolling variances, or any other statistic that implements both update and revert, over recent observations. It is particularly useful for detecting recent changes, smoothing noisy data, or tracking short-term patterns.
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/utils/rolling.py
Signature
class Rolling(BaseRolling):
    def __init__(self, obj: Rollable, window_size: int):
        ...

    def update(self, *args, **kwargs):
        ...


class TimeRolling(BaseRolling):
    def __init__(self, obj: Rollable, period: dt.timedelta):
        ...

    def update(self, *args, t: dt.datetime, **kwargs):
        ...
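The keyword-only t argument is what distinguishes TimeRolling.update: eviction is driven by timestamps rather than by a count. The sketch below illustrates the idea under stated assumptions. RunningMean and TimeWindow are hypothetical names, timestamps are assumed to arrive in increasing order, and the boundary rule (drop anything at least one full period older than the newest timestamp) is chosen to reproduce the example output shown on this page rather than taken from River's source.

```python
import datetime as dt
from collections import deque


class RunningMean:
    """Hypothetical statistic exposing the update/revert pair."""

    def __init__(self):
        self.n = 0
        self.total = 0.0

    def update(self, x):
        self.n += 1
        self.total += x

    def revert(self, x):
        self.n -= 1
        self.total -= x

    def get(self):
        return self.total / self.n if self.n else 0.0


class TimeWindow:
    """Illustrative time-based wrapper (assumption: in-order timestamps)."""

    def __init__(self, obj, period):
        self.obj = obj
        self.period = period
        self.events = deque()  # (timestamp, value) pairs, oldest first

    def update(self, x, *, t):
        self.obj.update(x)
        self.events.append((t, x))
        # Evict every event that is at least one full period old.
        while self.events and self.events[0][0] <= t - self.period:
            _, old_x = self.events.popleft()
            self.obj.revert(old_x)

    def get(self):
        return self.obj.get()


window = TimeWindow(RunningMean(), period=dt.timedelta(days=3))
time_means = []
for day, x in zip(range(1, 5), [1, 5, 9, 13]):
    window.update(x, t=dt.datetime(2019, 1, day))
    time_means.append(window.get())
print(time_means)  # [1.0, 3.0, 5.0, 9.0]
```

Unlike the fixed-size case, a single update may evict zero, one, or many observations, depending on how much time has passed since the oldest stored event.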
Import
from river import utils
Usage Examples
from river import stats, utils
# Fixed-size rolling window
X = [1, 3, 5, 7]
rmean = utils.Rolling(stats.Mean(), window_size=3)
for x in X:
    rmean.update(x)
    print(f"Rolling mean: {rmean.get():.1f}")
# Output:
# Rolling mean: 1.0
# Rolling mean: 2.0
# Rolling mean: 3.0
# Rolling mean: 5.0 (mean of 3, 5, 7)
# Time-based rolling window
import datetime as dt
X_time = {
    dt.datetime(2019, 1, 1): 1,
    dt.datetime(2019, 1, 2): 5,
    dt.datetime(2019, 1, 3): 9,
    dt.datetime(2019, 1, 4): 13,
}
time_rmean = utils.TimeRolling(
    stats.Mean(),
    period=dt.timedelta(days=3),
)
for t, x in X_time.items():
    time_rmean.update(x, t=t)
    print(f"{t.date()}: {time_rmean.get():.1f}")
# Output:
# 2019-01-01: 1.0
# 2019-01-02: 3.0
# 2019-01-03: 5.0
# 2019-01-04: 9.0 (mean of 5, 9, 13)
# Works with any Rollable object
rolling_var = utils.Rolling(stats.Var(), window_size=5)
for i in range(10):
    rolling_var.update(i)
    # Sample variance needs at least two observations (i counts from 0)
    if i >= 1:
        print(f"Rolling variance: {rolling_var.get():.2f}")