Implementation:Online ml River FeatureExtraction Agg
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Feature_Engineering, Aggregation |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Streaming aggregate feature extraction grouped by categorical variables, similar to SQL GROUP BY operations.
Description
The Agg transformer computes a running aggregate statistic of one feature (on), optionally grouped by one or more categorical features (by). It keeps a separate statistic per group (for example Mean, Max, or Mode) and updates it incrementally as new data arrives. TargetAgg is a supervised variant that aggregates the target value instead of a feature, which makes it useful for target encoding. Both accept a statistic wrapped in utils.Rolling or utils.TimeRolling to compute windowed aggregates. Output feature names are generated automatically from the aggregation, for example revenue_mean_by_place.
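
The per-group bookkeeping described above can be sketched in plain Python. This is a minimal illustration and not River's implementation: the RunningMean and GroupedAgg classes are hypothetical, and joining several grouping keys with "_and_" in the feature name is an assumption of this sketch.

```python
from collections import defaultdict


class RunningMean:
    """Incrementally updated mean; stores only a count and the current mean."""

    name = "mean"

    def __init__(self):
        self.n = 0
        self.mean = 0.0

    def update(self, value):
        self.n += 1
        self.mean += (value - self.mean) / self.n

    def get(self):
        return self.mean


class GroupedAgg:
    """Hypothetical stand-in for Agg: one statistic object per group key."""

    def __init__(self, on, by, stat_factory):
        self.on = on
        self.by = [by] if isinstance(by, str) else list(by)
        # A fresh statistic is created the first time a group is seen.
        self.groups = defaultdict(stat_factory)
        # Joining several keys with "_and_" is an assumption of this sketch.
        self.feature_name = f"{on}_{stat_factory.name}_by_{'_and_'.join(self.by)}"

    def _key(self, x):
        return tuple(x[k] for k in self.by)

    def learn_one(self, x):
        self.groups[self._key(x)].update(x[self.on])

    def transform_one(self, x):
        return {self.feature_name: self.groups[self._key(x)].get()}


agg = GroupedAgg(on="revenue", by="place", stat_factory=RunningMean)
rows = [
    {"place": "Taco Bell", "revenue": 42},
    {"place": "Burger King", "revenue": 16},
    {"place": "Burger King", "revenue": 24},
]
outputs = []
for x in rows:
    agg.learn_one(x)
    outputs.append(agg.transform_one(x))
print(outputs[-1])  # {'revenue_mean_by_place': 20.0}
```

Because each group stores only a count and a mean, memory grows with the number of distinct groups and not with the number of rows seen.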
Usage
Use Agg to create aggregate features from streaming data, such as per-user averages, per-category statistics, or temporal patterns. TargetAgg is particularly useful for target encoding of categorical variables: pairing it with a Bayesian mean (stats.BayesianMean) shrinks the estimate for rarely seen categories toward a prior, which reduces overfitting. Common applications include recommendation systems, fraud detection, and any scenario where group-level statistics carry predictive signal. Combine multiple Agg instances with compose.TransformerUnion to extract several aggregate features in one step.
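
To make the shrinkage concrete, here is a small worked sketch of a Bayesian mean, assuming the usual formula (prior * prior_weight + sum of values) / (prior_weight + count). The bayesian_mean helper is hypothetical and written for illustration; it is not a River function. With prior 3 and weight 1 it matches the values shown in the TargetAgg example on this page.

```python
def bayesian_mean(values, prior, prior_weight):
    """Mean of `values` shrunk toward `prior`.

    The prior counts as `prior_weight` pseudo-observations, so groups with
    few observations stay close to it (assumed formula, see lead-in).
    """
    return (prior * prior_weight + sum(values)) / (prior_weight + len(values))


# No observations yet: the estimate is just the prior.
print(bayesian_mean([], prior=3, prior_weight=1))    # 3.0
# One observation: halfway between the prior and the observed value.
print(bayesian_mean([16], prior=3, prior_weight=1))  # 9.5
print(bayesian_mean([42], prior=3, prior_weight=1))  # 22.5

# A heavier prior pulls the same single observation much closer to 3.
strongly_shrunk = bayesian_mean([16], prior=3, prior_weight=10)
```

A larger prior_weight means more observations are needed before a category's own mean dominates the encoded value.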
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/feature_extraction/agg.py
Signature
class Agg(base.Transformer):
    def __init__(
        self,
        on: str,
        by: str | list[str] | None,
        how: stats.base.Univariate | utils.Rolling | utils.TimeRolling,
    )

class TargetAgg(base.SupervisedTransformer, Agg):
    def __init__(
        self,
        by: str | list[str] | None,
        how: stats.base.Univariate | utils.Rolling | utils.TimeRolling,
        target_name="y",
    )
Import
from river import feature_extraction
from river import stats
I/O Contract
| Input | Output |
|---|---|
| Dict[str, Any] - Features | Dict[str, float] - Aggregate statistics |
Usage Examples
from river import feature_extraction as fx
from river import stats

X = [
    {'country': 'France', 'place': 'Taco Bell', 'revenue': 42},
    {'country': 'Sweden', 'place': 'Burger King', 'revenue': 16},
    {'country': 'France', 'place': 'Burger King', 'revenue': 24},
    {'country': 'Sweden', 'place': 'Taco Bell', 'revenue': 58},
]

# Average revenue per place
agg = fx.Agg(
    on='revenue',
    by='place',
    how=stats.Mean()
)

# Learn first, then transform: each output includes the current row's revenue.
for x in X:
    agg.learn_one(x)
    print(agg.transform_one(x))

# {'revenue_mean_by_place': 42.0}
# {'revenue_mean_by_place': 16.0}
# {'revenue_mean_by_place': 20.0}
# {'revenue_mean_by_place': 50.0}

# TargetAgg for target encoding
dataset = [
    ({'country': 'France', 'place': 'Taco Bell'}, 42),
    ({'country': 'Sweden', 'place': 'Burger King'}, 16),
    ({'country': 'France', 'place': 'Burger King'}, 24),
    ({'country': 'Sweden', 'place': 'Taco Bell'}, 58),
]

agg = fx.TargetAgg(
    by='place',
    how=stats.BayesianMean(
        prior=3,
        prior_weight=1
    )
)

# Transform before learning so a row's own target never leaks into its feature.
for x, y in dataset:
    print(agg.transform_one(x))
    agg.learn_one(x, y)

# {'y_bayes_mean_by_place': 3.0}
# {'y_bayes_mean_by_place': 3.0}
# {'y_bayes_mean_by_place': 9.5}
# {'y_bayes_mean_by_place': 22.5}
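
The Description mentions windowed aggregates. The idea can be sketched in plain Python with a fixed-size buffer per group. This is a count-based window over the last window_size values, not River's utils.Rolling or utils.TimeRolling implementation; the RollingGroupMean class and its methods are hypothetical names used only for illustration.

```python
from collections import defaultdict, deque


class RollingGroupMean:
    """Hypothetical sketch: mean of the last `window_size` values per group."""

    def __init__(self, on, by, window_size):
        self.on = on
        self.by = by
        # deque(maxlen=...) discards the oldest value once the window is full.
        self.windows = defaultdict(lambda: deque(maxlen=window_size))

    def learn_one(self, x):
        self.windows[x[self.by]].append(x[self.on])

    def get(self, group):
        window = self.windows[group]
        # An unseen group has an empty window; return 0.0 by convention here.
        return sum(window) / len(window) if window else 0.0


rolling = RollingGroupMean(on="revenue", by="place", window_size=2)
for row in [
    {"place": "Taco Bell", "revenue": 42},
    {"place": "Burger King", "revenue": 16},
    {"place": "Burger King", "revenue": 24},
    {"place": "Taco Bell", "revenue": 58},
    {"place": "Burger King", "revenue": 20},
]:
    rolling.learn_one(row)

print(rolling.get("Burger King"))  # 22.0, the mean of the last two values (24, 20)
print(rolling.get("Taco Bell"))    # 50.0, the mean of 42 and 58
```

A time-based window would instead evict values older than a given duration, which requires a timestamp for every observation.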