Implementation: Online ML River Compose Pipeline
| Knowledge Sources | River Docs |
|---|---|
| Domains | Online_Learning Software_Design Classification |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Concrete tool for chaining multiple transformers and a final estimator into a single composite model that manages the full learn/transform/predict lifecycle for online learning.
Description
The compose.Pipeline class is the central abstraction for building multi-step models in River. It stores an ordered sequence of estimators (typically transformers followed by a final classifier or regressor) and orchestrates data flow through the chain during learning and prediction.
The pipeline can be constructed in two ways:
- Using the pipe operator (`|`): `scaler | classifier` creates a pipeline via Python's `__or__` dunder method.
- Using the constructor: `compose.Pipeline(scaler, classifier)`.
Internally, steps are stored in a collections.OrderedDict keyed by automatically inferred names (class names, with numeric suffixes for duplicates). The pipeline delegates to the appropriate methods of its constituent steps:
- `learn_one(x, y)`: Feeds `x` through each transformer (updating unsupervised transformers before transforming), then trains the final estimator on the transformed features.
- `predict_one(x)`: Transforms `x` through all transformers, then calls `predict_one` on the final step.
- `predict_proba_one(x)`: Same as above, but returns class probabilities.
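This delegation logic can be sketched in plain Python. The following is a toy illustration of the pattern, not River's actual implementation; `ToyPipeline`, `Doubler`, and `MeanPredictor` are made-up names for demonstration.

```python
# Toy sketch of pipe-operator chaining and learn/predict delegation.
class Step:
    def __or__(self, other):
        # `a | b` builds a pipeline from two steps.
        return ToyPipeline(self, other)

class ToyPipeline(Step):
    def __init__(self, *steps):
        self.steps = list(steps)

    def __or__(self, other):
        # `pipeline | step` appends to the existing chain.
        return ToyPipeline(*self.steps, other)

    def learn_one(self, x, y=None):
        # Update each (unsupervised) transformer first, then transform,
        # then train the final estimator on the transformed features.
        for step in self.steps[:-1]:
            step.learn_one(x)
            x = step.transform_one(x)
        self.steps[-1].learn_one(x, y)

    def predict_one(self, x):
        # Transform through every step, then predict with the last one.
        for step in self.steps[:-1]:
            x = step.transform_one(x)
        return self.steps[-1].predict_one(x)

class Doubler(Step):
    """Stateless transformer that doubles every feature."""
    def learn_one(self, x):
        pass
    def transform_one(self, x):
        return {k: 2 * v for k, v in x.items()}

class MeanPredictor(Step):
    """Final estimator that predicts the running mean of y."""
    def __init__(self):
        self.n, self.total = 0, 0.0
    def learn_one(self, x, y):
        self.n += 1
        self.total += y
    def predict_one(self, x):
        return self.total / self.n if self.n else 0.0

model = Doubler() | Doubler() | MeanPredictor()
model.learn_one({'a': 1.0}, y=3.0)
print(model.predict_one({'a': 1.0}))  # 3.0
```

The `__or__` overload is what makes `step1 | step2 | step3` left-associate into a single flat pipeline rather than a nested one.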
The pipeline also supports mini-batch operations (learn_many, predict_many, predict_proba_many) and includes a debug_one method for inspecting how data flows through each step.
Usage
Import this class when you need to:
- Chain preprocessing steps (scalers, encoders) with a classifier or regressor.
- Create a single model object for use with `evaluate.progressive_val_score`.
- Build complex feature engineering pipelines with unions, prefixers, or function transformers.
- Ensure consistent transformation during both training and prediction.
Code Reference
Source Location
| File | Lines |
|---|---|
| river/compose/pipeline.py | L121-L757 |
Signature
class Pipeline(base.Estimator):
def __init__(self, *steps) -> None
# Operator overloads
def __or__(self, other) -> Pipeline # model | step
def __ror__(self, other) -> Pipeline # step | model
# Single-instance methods
def learn_one(self, x: dict, y=None, **params)
def predict_one(self, x: dict, **params)
def predict_proba_one(self, x: dict, **params)
def transform_one(self, x: dict, **params)
def score_one(self, x: dict, **params)
# Mini-batch methods
def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None, **params)
def predict_many(self, X: pd.DataFrame)
def predict_proba_many(self, X: pd.DataFrame)
def transform_many(self, X: pd.DataFrame)
# Debugging
def debug_one(self, x: dict, show_types=True, n_decimals=5) -> str
Import
from river import compose
# Via constructor
model = compose.Pipeline(step1, step2, step3)
# Via pipe operator (preferred)
model = step1 | step2 | step3
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| *steps | Estimators or (name, estimator) tuples | Sequence of estimators to chain. Transformers should precede the final estimator. Names are auto-inferred if not provided. |
| x (to learn_one/predict_one) | dict | A dictionary of features. |
| y (to learn_one) | any | The target value for supervised learning. |
| X (to mini-batch methods) | pd.DataFrame | A DataFrame of features for mini-batch operations. |
Outputs
| Method | Return Type | Description |
|---|---|---|
| predict_one(x) | Depends on final step | Predicted label (e.g., bool for binary classification). |
| predict_proba_one(x) | dict | Class probability distribution (e.g., {False: 0.3, True: 0.7}). |
| transform_one(x) | dict | Transformed feature dictionary (if the final step is a transformer). |
| debug_one(x) | str | Human-readable report showing data state at each pipeline step. |
Usage Examples
Creating a binary classification pipeline:
from river import compose, linear_model, preprocessing
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()
# Equivalent to:
model = compose.Pipeline(
preprocessing.StandardScaler(),
linear_model.LogisticRegression()
)
Training and predicting:
from river import datasets, linear_model, preprocessing
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()
for x, y in datasets.Phishing():
y_pred = model.predict_proba_one(x)
model.learn_one(x, y)
Accessing pipeline steps:
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()
# Access by name
scaler = model['StandardScaler']
# Access by index
first_step = model[0]
last_step = model[-1]
Debugging data flow:
from river import compose, datasets, linear_model, preprocessing
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()
for x, y in datasets.Phishing():
model.learn_one(x, y)
break
report = model.debug_one(x)
print(report)