
Implementation:Online ml River Compose Pipeline

From Leeroopedia


Knowledge Sources: River, River Docs
Domains: Online_Learning, Software_Design, Classification
Last Updated: 2026-02-08 16:00 GMT

Overview

A concrete tool for chaining multiple transformers and a final estimator into a single composite model that manages the full learn/transform/predict lifecycle in online learning.

Description

The compose.Pipeline class is the central abstraction for building multi-step models in River. It stores an ordered sequence of estimators (typically transformers followed by a final classifier or regressor) and orchestrates data flow through the chain during learning and prediction.

The pipeline can be constructed in two ways:

  • Using the pipe operator (|): scaler | classifier creates a pipeline via Python's __or__ dunder method.
  • Using the constructor: compose.Pipeline(scaler, classifier).

Internally, steps are stored in a collections.OrderedDict keyed by automatically inferred names (class names, with numeric suffixes for duplicates). The pipeline delegates to the appropriate methods of its constituent steps:

  • learn_one(x, y): Feeds x through each transformer (updating unsupervised transformers before transforming), then trains the final estimator on the transformed features.
  • predict_one(x): Transforms x through all transformers, then calls predict_one on the final step.
  • predict_proba_one(x): Same as above but returns class probabilities.

The pipeline also supports mini-batch operations (learn_many, predict_many, predict_proba_many) and includes a debug_one method for inspecting how data flows through each step.

Usage

Import this class when you need to:

  • Chain preprocessing steps (scalers, encoders) with a classifier or regressor.
  • Create a single model object for use with evaluate.progressive_val_score.
  • Build complex feature engineering pipelines with unions, prefixers, or function transformers.
  • Ensure consistent transformation during both training and prediction.

Code Reference

Source Location

File Lines
river/compose/pipeline.py L121-L757

Signature

class Pipeline(base.Estimator):
    def __init__(self, *steps) -> None

    # Operator overloads
    def __or__(self, other) -> Pipeline     # model | step
    def __ror__(self, other) -> Pipeline    # step | model

    # Single-instance methods
    def learn_one(self, x: dict, y=None, **params)
    def predict_one(self, x: dict, **params)
    def predict_proba_one(self, x: dict, **params)
    def transform_one(self, x: dict, **params)
    def score_one(self, x: dict, **params)

    # Mini-batch methods
    def learn_many(self, X: pd.DataFrame, y: pd.Series | None = None, **params)
    def predict_many(self, X: pd.DataFrame)
    def predict_proba_many(self, X: pd.DataFrame)
    def transform_many(self, X: pd.DataFrame)

    # Debugging
    def debug_one(self, x: dict, show_types=True, n_decimals=5) -> str

Import

from river import compose

# Via constructor
model = compose.Pipeline(step1, step2, step3)

# Via pipe operator (preferred)
model = step1 | step2 | step3

I/O Contract

Inputs

Parameter | Type | Description
*steps | estimators or (name, estimator) tuples | Sequence of estimators to chain. Transformers should precede the final estimator. Names are auto-inferred if not provided.
x (to learn_one/predict_one) | dict | A dictionary of features.
y (to learn_one) | any | The target value for supervised learning.
X (to mini-batch methods) | pd.DataFrame | A DataFrame of features for mini-batch operations.

Outputs

Method | Return Type | Description
predict_one(x) | depends on final step | Predicted label (e.g., bool for binary classification).
predict_proba_one(x) | dict | Class probability distribution (e.g., {False: 0.3, True: 0.7}).
transform_one(x) | dict | Transformed feature dictionary (if the final step is a transformer).
debug_one(x) | str | Human-readable report showing the data state at each pipeline step.

Usage Examples

Creating a binary classification pipeline:

from river import compose, linear_model, preprocessing

model = preprocessing.StandardScaler() | linear_model.LogisticRegression()

# Equivalent to:
model = compose.Pipeline(
    preprocessing.StandardScaler(),
    linear_model.LogisticRegression()
)

Training and predicting:

from river import datasets, linear_model, preprocessing

model = preprocessing.StandardScaler() | linear_model.LogisticRegression()

for x, y in datasets.Phishing():
    y_pred = model.predict_proba_one(x)  # predict first (test-then-train)
    model.learn_one(x, y)                # then update on the ground truth

Accessing pipeline steps:

from river import linear_model, preprocessing

model = preprocessing.StandardScaler() | linear_model.LogisticRegression()

# Access by name
scaler = model['StandardScaler']

# Access by index
first_step = model[0]
last_step = model[-1]

Debugging data flow:

from river import compose, datasets, linear_model, preprocessing

model = preprocessing.StandardScaler() | linear_model.LogisticRegression()

for x, y in datasets.Phishing():
    model.learn_one(x, y)
    break

report = model.debug_one(x)
print(report)
