

Implementation:Online ml River Compose TransformerUnion

From Leeroopedia
Revision as of 16:06, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Online_ml_River_Compose_TransformerUnion.md)


Knowledge Sources
Domains Online_Learning, Feature_Engineering, Pipeline, Data_Transformation
Last Updated 2026-02-08 16:00 GMT

Overview

TransformerUnion applies multiple transformers in parallel and merges their outputs into a single feature dictionary.

Description

TransformerUnion is a composition pattern in which multiple transformers process the same input and their outputs are combined. Unlike a Pipeline, where each transformer consumes the output of the previous one, a TransformerUnion hands the original input to every transformer and merges the resulting feature dictionaries. "Parallel" here describes the data flow rather than concurrent execution: the transformers are visited one after another.
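The difference in data flow can be sketched in plain Python, without River. The helpers below (`apply_union`, `apply_pipeline`, `add_length`, `add_upper`, `upper_length`) are hypothetical names used only for illustration; this is not River's implementation.

```python
def add_length(x):
    # Hypothetical transformer: length of the 'place' string.
    return {"place_len": len(x["place"])}


def add_upper(x):
    # Hypothetical transformer: upper-cased 'place' string.
    return {"place_upper": x["place"].upper()}


def upper_length(x):
    # Hypothetical transformer that depends on the output of add_upper.
    return {"upper_len": len(x["place_upper"])}


def apply_union(transformers, x):
    # Union-style flow: every transformer sees the same original input,
    # and the individual outputs are merged into one dictionary.
    merged = {}
    for t in transformers:
        merged.update(t(x))
    return merged


def apply_pipeline(transformers, x):
    # Pipeline-style flow: each transformer sees the previous output.
    for t in transformers:
        x = t(x)
    return x


x = {"place": "Taco Bell"}
union_out = apply_union([add_length, add_upper], x)
pipeline_out = apply_pipeline([add_upper, upper_length], x)
print(union_out)
print(pipeline_out)
```

In the union, both outputs appear side by side; in the pipeline, only the output of the last step remains.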

The class maintains a dictionary of named transformers and iterates through each one during learn_one and transform_one. During transformation, it uses collections.ChainMap to merge the output dictionaries into a single dictionary. Every feature is preserved as long as the transformers emit distinct keys; when two outputs share a key, only one of the values survives the merge.
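The merge step can be illustrated with the standard library alone. The dictionaries below are made-up stand-ins for transformer outputs, and the second half shows standard ChainMap behaviour on a key collision, which is worth keeping in mind when two transformers might emit the same feature name.

```python
import collections

# Stand-ins for the outputs of two transformers on the same sample.
mean_out = {"revenue_mean_by_place": 42.0}
count_out = {"revenue_count_by_place": 1}

# ChainMap presents several dicts as one view; dict() materialises it.
merged = dict(collections.ChainMap(mean_out, count_out))

# On a key collision, ChainMap returns the value from the first mapping
# that contains the key, so later duplicates are shadowed.
first = {"score": 1}
second = {"score": 2}
collided = dict(collections.ChainMap(first, second))

print(merged)
print(collided)
```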

The union supports both supervised and unsupervised transformers, automatically detecting which ones require target values during learning. It also provides a convenient + operator for composing unions inline. TransformerUnion implements both single-sample and mini-batch interfaces for flexibility.
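A minimal sketch of the dispatch logic described above, assuming each transformer exposes a boolean `_supervised` attribute. The classes `RunningCount`, `TargetMean`, and `MiniUnion` are hypothetical and written only for this illustration; they are not River classes, and River's own handling of the + operator may differ in detail.

```python
class RunningCount:
    _supervised = False  # assumed flag: does not need y

    def __init__(self):
        self.n = 0

    def learn_one(self, x):
        self.n += 1

    def transform_one(self, x):
        return {"count": self.n}


class TargetMean:
    _supervised = True  # assumed flag: needs y

    def __init__(self):
        self.total = 0.0
        self.n = 0

    def learn_one(self, x, y):
        self.total += y
        self.n += 1

    def transform_one(self, x):
        return {"target_mean": self.total / self.n if self.n else 0.0}


class MiniUnion:
    def __init__(self, *transformers):
        self.transformers = list(transformers)

    def learn_one(self, x, y=None):
        # Pass y only to the transformers that declare they need it.
        for t in self.transformers:
            if t._supervised:
                t.learn_one(x, y)
            else:
                t.learn_one(x)

    def transform_one(self, x):
        out = {}
        for t in self.transformers:
            out.update(t.transform_one(x))
        return out

    def __add__(self, other):
        # The + operator returns a new union with one more transformer.
        return MiniUnion(*self.transformers, other)


union = MiniUnion(RunningCount()) + TargetMean()
for x, y in [({"a": 1}, 10.0), ({"a": 2}, 20.0)]:
    union.learn_one(x, y)
features = union.transform_one({"a": 3})
print(features)
```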

Usage

Use TransformerUnion when you need to apply different transformations to different parts of your input data in parallel. Common use cases include applying one-hot encoding to categorical features while scaling numeric features, or computing multiple aggregate statistics over different groupings of the same data.

Code Reference

Source Location

Signature

class TransformerUnion(base.MiniBatchTransformer):
    def __init__(self, *transformers) -> None:
        ...

Import

from river import compose

I/O Contract

Input

Parameter | Type | Description
transformers | tuple | Variable number of transformers or (name, transformer) tuples
x | dict | Feature dictionary for single-sample methods
X | DataFrame | Feature dataframe for mini-batch methods
y | optional | Target value(s) for supervised transformers

Output

Method | Return Type | Description
transform_one(x) | dict | Merged dictionary of all transformer outputs
transform_many(X) | DataFrame | Concatenated dataframe of all transformer outputs

Key Methods

Method | Parameters | Description
learn_one(x, y=None) | x: dict, y: optional | Updates all transformers with a single sample
transform_one(x) | x: dict | Applies all transformers and merges their outputs
learn_many(X, y=None) | X: DataFrame, y: optional | Updates all transformers with a mini-batch
transform_many(X) | X: DataFrame | Applies all transformers and concatenates their outputs
__getitem__(key) | key: str or int | Accesses a transformer by name or index
__add__(other) | other: transformer | Adds a transformer to the union (+ operator)
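Name-based and index-based access can be sketched as follows. This is an illustration under assumptions, not River's code: `NamedUnion`, `Agg`, and `Scaler` are hypothetical classes, and the naming rule (class name, plus a numeric suffix on collision) is inferred from the 'Agg' and 'Agg1' names used in the usage examples on this page.

```python
class Agg:
    """Hypothetical stand-in for a transformer class."""


class Scaler:
    """Another hypothetical stand-in."""


class NamedUnion:
    def __init__(self, *transformers):
        self.transformers = {}
        for t in transformers:
            if isinstance(t, tuple):
                # Explicit (name, transformer) pair.
                name, t = t
            else:
                name = self._auto_name(t)
            self.transformers[name] = t

    def _auto_name(self, t):
        # Assumed rule: class name, then class name + counter if taken.
        base = type(t).__name__
        name, suffix = base, 0
        while name in self.transformers:
            suffix += 1
            name = f"{base}{suffix}"
        return name

    def __getitem__(self, key):
        # Integers index by insertion order; anything else is a name.
        if isinstance(key, int):
            return list(self.transformers.values())[key]
        return self.transformers[key]


mean, count, scaler = Agg(), Agg(), Scaler()
union = NamedUnion(mean, count, ("scaled revenue", scaler))
names = list(union.transformers)
print(names)
```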

Usage Examples

from river import compose
from river import feature_extraction
from river import stats

X = [
    {'place': 'Taco Bell', 'revenue': 42},
    {'place': 'Burger King', 'revenue': 16},
    {'place': 'Burger King', 'revenue': 24},
    {'place': 'Taco Bell', 'revenue': 58},
]

# Create union of multiple aggregates
mean = feature_extraction.Agg(
    on='revenue', by='place',
    how=stats.Mean()
)
count = feature_extraction.Agg(
    on='revenue', by='place',
    how=stats.Count()
)
agg = compose.TransformerUnion(mean, count)

# Or use + operator shorthand
agg = mean + count

# Process data
from pprint import pprint
for x in X:
    agg.learn_one(x)
    pprint(agg.transform_one(x))

# Output:
# {'revenue_count_by_place': 1, 'revenue_mean_by_place': 42.0}
# {'revenue_count_by_place': 1, 'revenue_mean_by_place': 16.0}
# {'revenue_count_by_place': 2, 'revenue_mean_by_place': 20.0}
# {'revenue_count_by_place': 2, 'revenue_mean_by_place': 50.0}

# Build complex pipeline
from river import linear_model
from river import preprocessing

model = (
    (mean + count) |
    preprocessing.StandardScaler() |
    linear_model.LogisticRegression()
)

# Access transformers by name
model['TransformerUnion']['Agg']  # First Agg (mean)
model['TransformerUnion']['Agg1']  # Second Agg (count)

# Custom names
agg = compose.TransformerUnion(
    ('Mean revenue by place', mean),
    ('# by place', count)
)

# Mini-batch example
import pandas as pd

X = pd.DataFrame([
    {"place": 2, "revenue": 42},
    {"place": 3, "revenue": 16},
    {"place": 3, "revenue": 24},
])

agg = (
    compose.Select("place") +
    (compose.Select("revenue") | preprocessing.StandardScaler())
)

agg.learn_many(X)
print(agg.transform_many(X))
