Implementation:Online ml River Compose FuncTransformer
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Feature_Engineering, Pipeline, Data_Transformation |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
FuncTransformer wraps a Python function to make it usable as a transformer in River pipelines.
Description
FuncTransformer integrates custom transformation logic into River pipelines by wrapping any Python function that takes a dictionary of features and returns a dictionary of features. Arbitrary feature engineering can then be composed with other transformers and models.
The wrapped function may be pure (it builds and returns a new dict) or impure (it modifies the input dict in place and returns it). Pure functions are recommended: an in-place change is visible to every other piece of code that holds a reference to the same dict, which makes bugs hard to trace.
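The difference between the two styles can be seen with plain Python, without River. The following is a minimal sketch; the function names `add_ratio_pure` and `add_ratio_impure` and the feature keys are hypothetical and chosen only for illustration.

```python
def add_ratio_impure(x):
    # Mutates the caller's dict in place, then returns the same object.
    x["ratio"] = x["a"] / x["b"]
    return x


def add_ratio_pure(x):
    # Builds a new dict; the caller's dict is left untouched.
    return {**x, "ratio": x["a"] / x["b"]}


original = {"a": 1, "b": 4}

pure_out = add_ratio_pure(original)
print(original)   # {'a': 1, 'b': 4}
print(pure_out)   # {'a': 1, 'b': 4, 'ratio': 0.25}

impure_out = add_ratio_impure(original)
print(impure_out is original)  # True
print(original)                # {'a': 1, 'b': 4, 'ratio': 0.25}
```

Either function could be passed to compose.FuncTransformer; only the impure one changes the dict that the caller still holds.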
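FuncTransformer supports both single-sample processing (transform_one) and mini-batch processing (transform_many). In both cases it calls the same wrapped function on the input it receives: a dict for transform_one and a DataFrame for transform_many. A function intended for mini-batch use therefore has to accept and return a DataFrame. When a plain function is placed in a pipeline without explicit wrapping, River wraps it in FuncTransformer automatically.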
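One way to picture this delegation is with a small stand-in class. The sketch below is not River's source code: `SketchFuncTransformer` and `add_total` are hypothetical names, and a plain list of dicts stands in for the DataFrame that River would pass to transform_many, so that the example runs with the standard library alone.

```python
class SketchFuncTransformer:
    """Hypothetical stand-in, not River's class: both methods call the same function."""

    def __init__(self, func):
        self.func = func

    def transform_one(self, x):
        # Single sample: the function receives the feature dict.
        return self.func(x)

    def transform_many(self, X):
        # Mini-batch: the function receives the whole batch object.
        return self.func(X)


def add_total(data):
    # Written to accept either shape used in this sketch.
    if isinstance(data, list):
        return [add_total(row) for row in data]
    return {**data, "total": data["a"] + data["b"]}


t = SketchFuncTransformer(add_total)
one = t.transform_one({"a": 1, "b": 2})
many = t.transform_many([{"a": 1, "b": 2}, {"a": 3, "b": 4}])
print(one)   # {'a': 1, 'b': 2, 'total': 3}
print(many)  # [{'a': 1, 'b': 2, 'total': 3}, {'a': 3, 'b': 4, 'total': 7}]
```

The point of the sketch is that the wrapper adds no logic of its own, so the wrapped function is responsible for handling whichever input shape it will be given.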
Usage
Use FuncTransformer when you need to apply custom transformation logic that doesn't fit into River's existing transformers. Common use cases include parsing dates, creating domain-specific features, applying mathematical transformations, or any other custom feature engineering that can be expressed as a function from dict to dict.
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/compose/func.py
Signature
class FuncTransformer(base.MiniBatchTransformer):
    def __init__(self, func: typing.Callable[[dict], dict]):
        ...
Import
from river import compose
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| func | Callable[[dict], dict] | Function that takes a dict and returns a dict |
| x | dict | Feature dictionary for single-sample transformation |
| X | DataFrame | Feature dataframe for mini-batch transformation |

Methods
| Method | Parameters | Return Type | Description |
|---|---|---|---|
| transform_one(x) | x: dict | dict | Applies the wrapped function to a single sample and returns its result |
| transform_many(X) | X: DataFrame | DataFrame | Applies the wrapped function to a mini-batch and returns its result |
Usage Examples
from pprint import pprint
import datetime as dt
from river import compose
x = {'date': '2019-02-14'}
# Example 1: Parse date (impure - modifies input)
def parse_date_impure(x):
    date = dt.datetime.strptime(x['date'], '%Y-%m-%d')
    x['is_weekend'] = date.weekday() in (5, 6)  # Saturday or Sunday
    x['hour'] = date.hour  # 0 here, since the format has no time component
    return x
t = compose.FuncTransformer(parse_date_impure)
pprint(t.transform_one(x))
# {'date': '2019-02-14', 'hour': 0, 'is_weekend': False}
# Note: x itself has been mutated and now also contains 'hour' and 'is_weekend'
# Example 2: Parse date (pure - returns new dict)
def parse_date_pure(x):
    date = dt.datetime.strptime(x['date'], '%Y-%m-%d')
    return {'is_weekend': date.weekday() in (5, 6), 'hour': date.hour}
t = compose.FuncTransformer(parse_date_pure)
pprint(t.transform_one(x))
# {'hour': 0, 'is_weekend': False}
# Example 3: Pure function that includes original features
def parse_date_with_original(x):
    date = dt.datetime.strptime(x['date'], '%Y-%m-%d')
    # **x is unpacked last, so original features win on any key collision
    return {'is_weekend': date.weekday() in (5, 6), 'hour': date.hour, **x}
t = compose.FuncTransformer(parse_date_with_original)
pprint(t.transform_one(x))
# {'date': '2019-02-14', 'hour': 0, 'is_weekend': False}
# Use in pipeline with automatic wrapping
from river import naive_bayes
# Explicit wrapping
pipeline = compose.FuncTransformer(parse_date_pure) | naive_bayes.MultinomialNB()
# Automatic wrapping (River detects it's a function)
pipeline = parse_date_pure | naive_bayes.MultinomialNB()
# Both produce:
# Pipeline (
#   FuncTransformer (
#     func="parse_date_pure"
#   ),
#   MultinomialNB (
#     alpha=1.
#   )
# )
# Example with numeric transformations
import math
def log_transform(x):
    # Keep only numeric values; math.log(v + 1) requires v > -1
    return {f'log_{k}': math.log(v + 1) for k, v in x.items() if isinstance(v, (int, float))}
t = compose.FuncTransformer(log_transform)
result = t.transform_one({'a': 10, 'b': 100, 'c': 'text'})
pprint(result)
# {'log_a': 2.397895..., 'log_b': 4.615120...}