Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Online ml River Compose Grouper

From Leeroopedia


Knowledge Sources
Domains Online_Learning, Feature_Engineering, Grouping, Data_Transformation
Last Updated 2026-02-08 16:00 GMT

Overview

Grouper applies a transformer independently within different groups discovered online from streaming data.

Description

Grouper is a meta-transformer that partitions incoming data into groups based on one or more key features and maintains a separate copy of the provided transformer for each group. This enables group-specific transformations where each group's transformer evolves independently based only on data from that group.

The groups are discovered dynamically as data streams in - when a new group key is encountered, a deep copy of the base transformer is created for that group. The grouper uses a defaultdict with functools.partial to lazily instantiate transformers for new groups.

Keys are extracted from the input dictionary based on the 'by' parameter, which can be a single feature name or a list of feature names. Multiple keys are concatenated with underscores to form a unique group identifier. Each group maintains its own internal state completely separate from other groups.

Usage

Use Grouper when you need to apply transformations that should be learned separately for different categories or segments in your data. Common use cases include computing group-specific statistics, scaling features independently per category, or maintaining separate models for different user segments or product categories.

Code Reference

Source Location

Signature

class Grouper(base.Transformer):
    def __init__(
        self,
        transformer: base.BaseTransformer,
        by: base.typing.FeatureName | list[base.typing.FeatureName],
    ):
        ...

Import

from river import compose

I/O Contract

Input
Parameter Type Description
transformer base.BaseTransformer Transformer to apply within each group
by str or list[str] Feature name(s) defining groups
x dict Feature dictionary containing grouping keys
Output
Method Return Type Description
transform_one(x) dict Transformed features using group-specific transformer
Key Methods
Method Parameters Description
learn_one(x) x: dict Updates the transformer for x's group
transform_one(x) x: dict Transforms x using its group's transformer
Attributes
Attribute Type Description
transformers defaultdict Dictionary mapping group keys to transformer instances
by list[str] List of feature names used for grouping

Usage Examples

from river import compose
from river import preprocessing
from river import stats

# Example: Group-specific standardization
data = [
    {'category': 'A', 'value': 10},
    {'category': 'B', 'value': 100},
    {'category': 'A', 'value': 12},
    {'category': 'B', 'value': 95},
    {'category': 'A', 'value': 8},
    {'category': 'B', 'value': 105},
]

# Create grouper that scales values separately per category
scaler = preprocessing.StandardScaler()
grouper = compose.Grouper(
    transformer=scaler,
    by='category'
)

for x in data:
    grouper.learn_one(x)
    transformed = grouper.transform_one(x)
    print(f"Original: {x}, Transformed: {transformed}")

# Each group maintains its own statistics:
# Category A values (10, 12, 8) are scaled using A's mean/std
# Category B values (100, 95, 105) are scaled using B's mean/std

# Example: Group-specific feature extraction
from river import feature_extraction

# Compute statistics grouped by multiple keys
agg = feature_extraction.Agg(
    on='value',
    by=['category', 'subcategory'],
    how=stats.Mean()
)

data_multi = [
    {'category': 'A', 'subcategory': 'X', 'value': 10},
    {'category': 'A', 'subcategory': 'Y', 'value': 20},
    {'category': 'A', 'subcategory': 'X', 'value': 15},
    {'category': 'B', 'subcategory': 'X', 'value': 30},
]

grouper_multi = compose.Grouper(
    transformer=preprocessing.MinMaxScaler(),
    by=['category', 'subcategory']
)

for x in data_multi:
    grouper_multi.learn_one(x)
    result = grouper_multi.transform_one(x)
    print(result)

# Access transformers for specific groups
# Keys are formed as "value1_value2_..."
print(grouper_multi.transformers['A_X'])  # Transformer for group (A, X)
print(grouper_multi.transformers['A_Y'])  # Transformer for group (A, Y)

# Example: Group-specific polynomial features
from river import feature_extraction as fx

data_poly = [
    {'store': 'NYC', 'x': 1, 'y': 2},
    {'store': 'LA', 'x': 3, 'y': 4},
    {'store': 'NYC', 'x': 2, 'y': 3},
]

poly_grouper = compose.Grouper(
    transformer=fx.PolynomialExtender(degree=2),
    by='store'
)

for x in data_poly:
    result = poly_grouper.transform_one(x)
    print(result)
    poly_grouper.learn_one(x)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment