Implementation:Online ml River Stats Shift
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Statistics |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Shift returns past values from a data stream by shifting observations backward in time.
Description
This statistic maintains a buffer of recent values and returns a value from a specified number of steps in the past. It is primarily used as a building block for other statistics to avoid data leakage when computing statistics over target values. The shift amount determines how many steps back to retrieve, and a fill_value is returned when not enough observations have been seen yet.
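The buffering behaviour described above can be sketched in plain Python. This is a minimal illustration built only from the constructor shown in the Signature section; the class name `ShiftSketch` is hypothetical, and the bodies of `update` and `get` are assumptions about how such a statistic could work, not a copy of River's source.

```python
import collections


class ShiftSketch:
    """Hypothetical stand-in for a shift statistic; not River's actual code."""

    def __init__(self, amount=1, fill_value=None):
        self.amount = amount
        self.fill_value = fill_value
        # Keep the current value plus the `amount` values before it.
        self.buffer = collections.deque(maxlen=amount + 1)

    def update(self, x):
        # Appending to a full deque silently drops the oldest value.
        self.buffer.append(x)

    def get(self):
        # Until amount + 1 values have been seen, nothing is old enough.
        if len(self.buffer) <= self.amount:
            return self.fill_value
        return self.buffer[0]


shift = ShiftSketch(amount=2, fill_value=-1)
history = []
for x in [10, 20, 30, 40]:
    shift.update(x)
    history.append(shift.get())

print(history)  # [-1, -1, 10, 20]
```

Sizing the deque at `amount + 1` means the oldest element in a full buffer is always the value from exactly `amount` steps back, so no index arithmetic is needed.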
Usage
Use Shift when you need to access past values in streaming computations, particularly to prevent leakage when calculating statistics on target variables. It is commonly composed with other statistics using the pipe operator (|) to create lagged versions of statistics, such as computing yesterday's average or last week's statistics without including today's data.
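To make the leakage argument concrete, the following sketch computes a lagged running mean without River. The function name `lagged_means` is hypothetical, and returning 0.0 before any lagged value is available is an assumption chosen to mirror the outputs shown under Usage Examples; it is not taken from the library's code.

```python
import collections


def lagged_means(values, amount=1):
    """Yield, at each step, the mean of values seen at least `amount` steps earlier."""
    buffer = collections.deque(maxlen=amount + 1)
    total, count = 0.0, 0
    for x in values:
        buffer.append(x)
        if len(buffer) > amount:
            # Only the value from `amount` steps ago enters the mean,
            # so the current value never leaks into the result.
            total += buffer[0]
            count += 1
        yield total / count if count else 0.0


sales = [10, 15, 20]
result = list(lagged_means(sales))
print(result)  # [0.0, 10.0, 12.5]
```

At every step the mean covers only earlier observations, which is the property needed when the aggregated value is also the prediction target.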
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/stats/shift.py
Signature
class Shift(stats.base.Univariate):
    def __init__(self, amount=1, fill_value=None):
        self.amount = amount
        self.fill_value = fill_value
        self.buffer = collections.deque(maxlen=self.amount + 1)
Import
from river import stats
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| x | Any | Yes | Value to add to the buffer |
| amount | int | No (init) | Number of steps to shift back (default: 1) |
| fill_value | Any | No (init) | Value to return before buffer is filled (default: None) |
Outputs
| Name | Type | Description |
|---|---|---|
| get() | Any | Value from 'amount' steps ago (or fill_value if fewer than amount + 1 values have been seen) |
Usage Examples
from river import stats
# Basic shift by 1 step
shift = stats.Shift(1) | stats.Mean()
for i in range(5):
    shift.update(i)
    print(f"Current: {i}, Shifted Mean: {shift.get():.1f}")
# Output:
# Current: 0, Shifted Mean: 0.0
# Current: 1, Shifted Mean: 0.0
# Current: 2, Shifted Mean: 0.5
# Current: 3, Shifted Mean: 1.0
# Current: 4, Shifted Mean: 1.5
# Using with feature aggregation
from river import feature_extraction
# Average sales per shop, excluding today's sales
agg = feature_extraction.Agg(
    on='sales',
    how=stats.Shift(1) | stats.Mean(),
    by='shop'
)
X = [
    {'shop': 'Ikea', 'sales': 10},
    {'shop': 'Ikea', 'sales': 15},
    {'shop': 'Ikea', 'sales': 20}
]
# First observation
agg.learn_one(X[0])
print(agg.transform_one(X[0]))
# Output: {'sales_mean_of_shift_1_by_shop': 0.0}
# Second observation
agg.learn_one(X[1])
print(agg.transform_one(X[0]))
# Output: {'sales_mean_of_shift_1_by_shop': 10.0}
# Third observation
agg.learn_one(X[2])
print(agg.transform_one(X[0]))
# Output: {'sales_mean_of_shift_1_by_shop': 12.5}
# Shift by multiple steps
shift_3 = stats.Shift(amount=3, fill_value=-1)
for i in range(7):
    shift_3.update(i * 10)
    print(f"Current: {i*10}, 3 steps ago: {shift_3.get()}")
# Output:
# Current: 0, 3 steps ago: -1
# Current: 10, 3 steps ago: -1
# Current: 20, 3 steps ago: -1
# Current: 30, 3 steps ago: 0
# Current: 40, 3 steps ago: 10
# Current: 50, 3 steps ago: 20
# Current: 60, 3 steps ago: 30
# Running average of all values up to and including yesterday (shift by 1 day)
daily_avg = stats.Shift(1) | stats.Mean()
for day in range(1, 8):
    value = day * 100
    daily_avg.update(value)
    print(f"Day {day}: Avg through yesterday = {daily_avg.get():.1f}")