Implementation:Online ml River Compat SklearnToRiver
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Scikit_Learn, Compatibility, Machine_Learning |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Compatibility wrappers that adapt scikit-learn estimators with partial_fit methods to River's online learning interface.
Description
The sklearn-to-river compatibility layer enables the use of scikit-learn models that support incremental learning (via partial_fit) within River's streaming framework. The module provides two main wrapper classes: SKL2RiverRegressor and SKL2RiverClassifier.
These wrappers translate between River's one-sample-at-a-time interface (learn_one, predict_one) and scikit-learn's batch interface (partial_fit, predict). The wrappers maintain feature name ordering to ensure consistent feature alignment across calls, and handle the conversion between dictionaries (River's format) and arrays or DataFrames (scikit-learn's format).
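The translation described above can be illustrated with a minimal, library-free sketch. It is not River's implementation: `OrderedRowAdapter` and `RunningMeanRegressor` are hypothetical names introduced here, and fixing the feature order on the first sample by sorting the keys is an assumption made for illustration only.

```python
# Simplified sketch of the dict-to-row adapter idea.
# OrderedRowAdapter and RunningMeanRegressor are hypothetical names,
# not part of River or scikit-learn.


class RunningMeanRegressor:
    """Toy stand-in for an estimator exposing partial_fit/predict."""

    def __init__(self):
        self.n = 0
        self.total = 0.0

    def partial_fit(self, X, y):
        # X is a list of rows and y a list of targets (batch interface).
        for target in y:
            self.n += 1
            self.total += target
        return self

    def predict(self, X):
        mean = self.total / self.n if self.n else 0.0
        return [mean for _ in X]


class OrderedRowAdapter:
    """Exposes learn_one/predict_one on top of a batch-style estimator."""

    def __init__(self, estimator):
        self.estimator = estimator
        self.feature_names = None  # fixed when the first sample arrives

    def _to_row(self, x):
        if self.feature_names is None:
            self.feature_names = sorted(x)
        # Emit values in the stored order, whatever the dict's own order is.
        return [x[name] for name in self.feature_names]

    def learn_one(self, x, y):
        # A single sample becomes a batch of size one.
        self.estimator.partial_fit([self._to_row(x)], [y])

    def predict_one(self, x):
        return self.estimator.predict([self._to_row(x)])[0]


adapter = OrderedRowAdapter(RunningMeanRegressor())
adapter.learn_one({"b": 2.0, "a": 1.0}, 10.0)
adapter.learn_one({"a": 3.0, "b": 4.0}, 20.0)
row = adapter._to_row({"b": 5.0, "a": 6.0})
prediction = adapter.predict_one({"a": 0.0, "b": 0.0})
```

The key point is that each dictionary is wrapped into a one-row batch, and that the column order is decided once so later samples with a different key order still line up with the columns the estimator was trained on.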
For classifiers, the classes parameter must be provided at initialization, because scikit-learn's partial_fit needs the full set of class labels on its first call. The wrappers also implement the mini-batch methods learn_many and predict_many for efficient processing when several samples are available at once.
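To see why the class list matters, consider the sketch below. It uses a toy incremental classifier rather than a real scikit-learn estimator: `CountingClassifier` and `proba_one` are hypothetical names, and returning zero probabilities before the first fit is an assumption of this sketch, not a documented guarantee of the wrapper.

```python
# Hypothetical sketch: CountingClassifier and proba_one are illustrative
# names and do not exist in River or scikit-learn.


class CountingClassifier:
    """Toy incremental classifier predicting smoothed class frequencies."""

    def __init__(self):
        self.classes_ = None
        self.counts = None

    def partial_fit(self, X, y, classes=None):
        if self.classes_ is None:
            # The output layout is fixed on the first call, so every
            # possible label has to be known at that point.
            if classes is None:
                raise ValueError("classes must be passed on the first call")
            self.classes_ = list(classes)
            self.counts = [0] * len(self.classes_)
        for label in y:
            self.counts[self.classes_.index(label)] += 1
        return self

    def predict_proba(self, X):
        # Laplace smoothing keeps unseen classes at a non-zero probability.
        total = sum(self.counts) + len(self.classes_)
        row = [(count + 1) / total for count in self.counts]
        return [list(row) for _ in X]


def proba_one(estimator, classes, x_row):
    """Return {class: probability}; all zeros if nothing was learned yet."""
    if estimator.classes_ is None:
        return {c: 0.0 for c in classes}
    probas = estimator.predict_proba([x_row])[0]
    return dict(zip(estimator.classes_, probas))


classes = [False, True]
clf = CountingClassifier()
before = proba_one(clf, classes, [0.5])
clf.partial_fit([[0.5]], [True], classes=classes)  # only True seen so far
after = proba_one(clf, classes, [0.5])
```

Because a stream may show only one label for a long time, the first single-sample update cannot infer the label set from the data. Passing the classes up front lets the probability output cover every class from the start.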
Usage
Use these wrappers when you want to run scikit-learn's incremental learning algorithms (such as SGDRegressor or SGDClassifier) inside River pipelines or streaming workflows. This lets you combine scikit-learn's mature implementations with River's streaming-first design and its other components, such as transformers.
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/compat/sklearn_to_river.py
Signature
def convert_sklearn_to_river(estimator: sklearn_base.BaseEstimator, classes: list | None = None):
    ...
class SKL2RiverRegressor(SKL2RiverBase, base.Regressor):
    def __init__(self, estimator: sklearn_base.BaseEstimator):
        ...
class SKL2RiverClassifier(SKL2RiverBase, base.Classifier):
    def __init__(self, estimator: sklearn_base.ClassifierMixin, classes: list):
        ...
Import
from river import compat
I/O Contract
| Parameter | Type | Description |
|---|---|---|
| estimator | sklearn_base.BaseEstimator | scikit-learn estimator with partial_fit method |
| classes | list or None | Required for classifiers, list of all possible classes |
| Method | Return Type | Description |
|---|---|---|
| convert_sklearn_to_river(estimator, classes) | River wrapper | Returns River-compatible wrapper |
| Method | Parameters | Return Type | Description |
|---|---|---|---|
| learn_one(x, y) | x: dict, y: target | None | Updates model with single sample |
| predict_one(x) | x: dict | float or class | Predicts single sample |
| learn_many(X, y) | X: DataFrame, y: Series | None | Updates model with batch |
| predict_many(X) | X: DataFrame | Series | Predicts batch of samples |
| Method | Parameters | Return Type | Description |
|---|---|---|---|
| predict_proba_one(x) | x: dict | dict | Classifier only: returns class probabilities for a single sample |
| predict_proba_many(X) | X: DataFrame | DataFrame | Classifier only: returns class probabilities for a batch |
Usage Examples
from river import compat
from river import evaluate
from river import metrics
from river import preprocessing
from river import stream
from sklearn import linear_model
from sklearn import datasets
# Convert sklearn regressor to River
dataset = stream.iter_sklearn_dataset(
    dataset=datasets.load_diabetes(),
    shuffle=True,
    seed=42
)
scaler = preprocessing.StandardScaler()
sgd_reg = compat.convert_sklearn_to_river(linear_model.SGDRegressor())
model = scaler | sgd_reg
metric = metrics.MAE()
evaluate.progressive_val_score(dataset, model, metric)
# MAE: 84.501421
# Example with classifier
dataset = stream.iter_sklearn_dataset(
    dataset=datasets.load_breast_cancer(),
    shuffle=True,
    seed=42
)
model = preprocessing.StandardScaler()
model |= compat.convert_sklearn_to_river(
    estimator=linear_model.SGDClassifier(
        loss='log_loss',
        eta0=0.01,
        learning_rate='constant'
    ),
    classes=[False, True]
)
metric = metrics.LogLoss()
evaluate.progressive_val_score(dataset, model, metric)
# LogLoss: 0.198029
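# Use in River pipeline
pipeline = (
    preprocessing.StandardScaler() |
    compat.convert_sklearn_to_river(
        linear_model.SGDRegressor(max_iter=1),
    )
)
# Recreate the regression stream: the previous `dataset` generator has
# already been consumed and holds classification data.
dataset = stream.iter_sklearn_dataset(
    dataset=datasets.load_diabetes(),
    shuffle=True,
    seed=42
)
# Learn one sample at a time: predict first, then update the model
for x, y in dataset:
    y_pred = pipeline.predict_one(x)
    pipeline.learn_one(x, y)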