Implementation:Online ml River Compose TransformerUnion
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Feature_Engineering, Pipeline, Data_Transformation |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
TransformerUnion applies multiple transformers in parallel and merges their outputs into a single feature dictionary.
Description
TransformerUnion is a composition pattern that allows multiple transformers to process the same input simultaneously and combine their outputs. Unlike a Pipeline where transformers are applied sequentially, a TransformerUnion applies all transformers in parallel to the input and merges the resulting feature dictionaries.
The class maintains a dictionary of named transformers and iterates through each one during learn_one and transform_one operations. During transformation, it uses collections.ChainMap to merge all output dictionaries into a single dictionary. Every feature is preserved as long as the transformers emit distinct feature names; if two transformers emit the same key, ChainMap keeps the value from the transformer that appears first.
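The merge step described above can be sketched with the standard library alone. This is a minimal illustration, not River's actual code: `double_revenue`, `flag_taco_bell`, and `union_transform_one` are hypothetical names, and plain functions stand in for transformers.

```python
from collections import ChainMap


def double_revenue(x):
    # Hypothetical stand-in for a transformer's transform_one
    return {"revenue_doubled": x["revenue"] * 2}


def flag_taco_bell(x):
    # Hypothetical stand-in for a second transformer
    return {"is_taco_bell": x["place"] == "Taco Bell"}


def union_transform_one(transformers, x):
    # Every transformer receives the same input x; the outputs are merged
    # into one flat dictionary through a ChainMap.
    return dict(ChainMap(*(t(x) for t in transformers)))


x = {"place": "Taco Bell", "revenue": 42}
merged = union_transform_one([double_revenue, flag_taco_bell], x)

# On a key collision, ChainMap returns the value from the first mapping.
collision = dict(ChainMap({"a": 1}, {"a": 2}))
```

Here `merged` contains both `revenue_doubled` and `is_taco_bell`, while `collision` keeps only the value from the first mapping. Giving each transformer distinct output names avoids silently dropping features.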
The union supports both supervised and unsupervised transformers, automatically detecting which ones require target values during learning. It also provides a convenient + operator for composing unions inline. TransformerUnion implements both single-sample and mini-batch interfaces for flexibility.
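The supervised/unsupervised dispatch can be pictured roughly as follows. This sketch assumes each transformer exposes a boolean flag, here called `supervised`; the flag name, the two toy classes, and `union_learn_one` are hypothetical and only illustrate the idea, not the library's internals.

```python
class RunningCount:
    """Toy unsupervised transformer: counts the samples it has seen."""

    supervised = False  # hypothetical flag name

    def __init__(self):
        self.n = 0

    def learn_one(self, x):
        self.n += 1

    def transform_one(self, x):
        return {"count": self.n}


class TargetSum:
    """Toy supervised transformer: accumulates the target values."""

    supervised = True  # hypothetical flag name

    def __init__(self):
        self.total = 0

    def learn_one(self, x, y):
        self.total += y

    def transform_one(self, x):
        return {"target_sum": self.total}


def union_learn_one(transformers, x, y=None):
    # Pass the target only to the transformers that declare they need it.
    for t in transformers:
        if t.supervised:
            t.learn_one(x, y)
        else:
            t.learn_one(x)


count, target_sum = RunningCount(), TargetSum()
stream = [({"revenue": 42}, 1), ({"revenue": 16}, 0), ({"revenue": 24}, 1)]
for x, y in stream:
    union_learn_one([count, target_sum], x, y)
```

After the loop, `count.n` is 3 and `target_sum.total` is 2: both transformers saw every sample, but only the supervised one received the target.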
Usage
Use TransformerUnion when you need to apply different transformations to different parts of your input data in parallel. Common use cases include applying one-hot encoding to categorical features while scaling numeric features, or computing multiple aggregate statistics over different groupings of the same data.
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/compose/union.py
Signature
class TransformerUnion(base.MiniBatchTransformer):
    def __init__(self, *transformers) -> None:
        ...
Import
from river import compose
I/O Contract
| Parameter | Type | Description |
|---|---|---|
| transformers | tuple | Variable number of transformers or (name, transformer) tuples |
| x | dict | Feature dictionary for single-sample methods |
| X | DataFrame | Feature dataframe for mini-batch methods |
| y | optional | Target value(s) for supervised transformers |
| Method | Return Type | Description |
|---|---|---|
| transform_one(x) | dict | Merged dictionary of all transformer outputs |
| transform_many(X) | DataFrame | Concatenated dataframe of all transformer outputs |
| Method | Parameters | Description |
|---|---|---|
| learn_one(x, y=None) | x: dict, y: optional | Updates all transformers with single sample |
| transform_one(x) | x: dict | Applies all transformers and merges outputs |
| learn_many(X, y=None) | X: DataFrame, y: optional | Updates all transformers with batch |
| transform_many(X) | X: DataFrame | Applies all transformers and concatenates outputs |
| __getitem__(key) | key: str or int | Accesses transformer by name or index |
| __add__(other) | other: transformer | Adds transformer to union (+ operator) |
Usage Examples
from river import compose
from river import feature_extraction
from river import stats
X = [
    {'place': 'Taco Bell', 'revenue': 42},
    {'place': 'Burger King', 'revenue': 16},
    {'place': 'Burger King', 'revenue': 24},
    {'place': 'Taco Bell', 'revenue': 58},
]
# Create union of multiple aggregates
mean = feature_extraction.Agg(
    on='revenue', by='place',
    how=stats.Mean()
)
count = feature_extraction.Agg(
    on='revenue', by='place',
    how=stats.Count()
)
agg = compose.TransformerUnion(mean, count)
# Or use + operator shorthand
agg = mean + count
# Process data
from pprint import pprint
for x in X:
    agg.learn_one(x)
    pprint(agg.transform_one(x))
# Output:
# {'revenue_count_by_place': 1, 'revenue_mean_by_place': 42.0}
# {'revenue_count_by_place': 1, 'revenue_mean_by_place': 16.0}
# {'revenue_count_by_place': 2, 'revenue_mean_by_place': 20.0}
# {'revenue_count_by_place': 2, 'revenue_mean_by_place': 50.0}
# Build complex pipeline
from river import linear_model
from river import preprocessing
model = (
    (mean + count) |
    preprocessing.StandardScaler() |
    linear_model.LogisticRegression()
)
# Access transformers by name
model['TransformerUnion']['Agg'] # First Agg (mean)
model['TransformerUnion']['Agg1'] # Second Agg (count)
# Custom names
agg = compose.TransformerUnion(
    ('Mean revenue by place', mean),
    ('# by place', count)
)
# Mini-batch example
import pandas as pd
X = pd.DataFrame([
    {"place": 2, "revenue": 42},
    {"place": 3, "revenue": 16},
    {"place": 3, "revenue": 24},
])
agg = (
    compose.Select("place") +
    (compose.Select("revenue") | preprocessing.StandardScaler())
)
agg.learn_many(X)
print(agg.transform_many(X))