# Implementation: Online ML (River) evaluate.progressive_val_score with Drift-Adaptive Models
| Knowledge Sources | Domains | Last Updated |
|---|---|---|
| River Docs | Online Machine Learning, Concept Drift, Model Evaluation | 2026-02-08 16:00 GMT |
## Overview
Concrete tool for evaluating drift-adaptive classifiers using progressive validation on non-stationary data streams, revealing performance dynamics around concept drift points.
## Description
This is an angle-specific Implementation of `evaluate.progressive_val_score` that focuses on its use with drift-adaptive models such as `DriftRetrainingClassifier` and `ARFClassifier`. While it is the same progressive validation routine used for all models, running it over a drift-adaptive model reveals additional dynamics.
When used with drift-adaptive models:

- The model's internal drift detection runs automatically during `learn_one` calls within the evaluation loop.
- Performance metrics naturally degrade around drift points and recover as the model adapts.
- The `print_every` parameter produces periodic snapshots that make these performance changes visible.
- Using `iter_progressive_val_score` with `step=1` enables fine-grained monitoring of per-sample performance alongside drift state inspection.

The key difference from evaluating non-adaptive models is that the model's internal state changes during evaluation: trees are replaced, models are reset, background models are swapped in. These state changes are reflected in the performance trajectory.
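The degradation-around-drift dynamic can be illustrated without River at all. The following is a minimal sketch of the test-then-train (progressive validation) loop over a toy stream whose dominant label flips abruptly; `progressive_val`, `MajorityClass`, and the toy `Accuracy` metric are hypothetical stand-ins, not River classes. A non-adaptive model keeps predicting the old concept after the flip, which drags the accumulated accuracy down exactly as described above.

```python
def progressive_val(stream, model, metric):
    """Test-then-train: predict first, score the prediction, then learn."""
    for x, y in stream:
        y_pred = model.predict_one(x)
        if y_pred is not None:  # skip scoring before the first fit
            metric.update(y, y_pred)
        model.learn_one(x, y)
    return metric

class MajorityClass:
    """Toy non-adaptive classifier: predicts the most frequent label so far."""
    def __init__(self):
        self.counts = {}
    def predict_one(self, x):
        if not self.counts:
            return None
        return max(self.counts, key=self.counts.get)
    def learn_one(self, x, y):
        self.counts[y] = self.counts.get(y, 0) + 1

class Accuracy:
    """Toy accumulating accuracy metric."""
    def __init__(self):
        self.correct = 0
        self.total = 0
    def update(self, y_true, y_pred):
        self.correct += int(y_true == y_pred)
        self.total += 1
    def get(self):
        return self.correct / self.total if self.total else 0.0

# Abrupt concept change at step 10: the dominant label flips from 0 to 1.
stream = [({}, 0)] * 10 + [({}, 1)] * 10
metric = progressive_val(stream, MajorityClass(), Accuracy())
print(f"{metric.get():.4f}")  # 0.4737 (9 of 19 scored predictions correct)
```

Every post-drift prediction is wrong because the majority count never catches up within the stream; a drift-adaptive model would reset its state at the change point and recover instead.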
## Usage
Use progressive validation with drift-adaptive models when you want to benchmark their accuracy on non-stationary streams, compare them against non-adaptive baselines, or analyze the temporal dynamics of drift adaptation.
## Code Reference

### Source Location

- `river/evaluate/progressive_validation.py:L231-L409` (`progressive_val_score`)
- `river/evaluate/progressive_validation.py:L106-L228` (`iter_progressive_val_score`)
### Signature

```python
def progressive_val_score(
    dataset: base.typing.Dataset,
    model,
    metric: metrics.base.Metric,
    moment: str | typing.Callable | None = None,
    delay: str | int | dt.timedelta | typing.Callable | None = None,
    print_every=0,
    show_time=False,
    show_memory=False,
    **print_kwargs,
) -> metrics.base.Metric

def iter_progressive_val_score(
    dataset: base.typing.Dataset,
    model,
    metric: metrics.base.Metric,
    moment: str | typing.Callable | None = None,
    delay: str | int | dt.timedelta | typing.Callable | None = None,
    step=1,
    measure_time=False,
    measure_memory=False,
    yield_predictions=False,
) -> typing.Generator
```
### Import

```python
from river import evaluate
```
## Key Parameters (Drift-Relevant)

| Parameter | Type | Default | Description |
|---|---|---|---|
| `dataset` | `Dataset` | (required) | A non-stationary data stream (e.g., `datasets.Elec2()` or `datasets.Insects()`) |
| `model` | `Classifier` | (required) | A drift-adaptive model (e.g., `DriftRetrainingClassifier`, `ARFClassifier`) |
| `metric` | `Metric` | (required) | Performance metric (e.g., `metrics.Accuracy()`) |
| `print_every` | `int` | `0` | Print the metric every N steps; reveals performance changes around drift points |
| `step` | `int` | `1` | (iter variant) Yield results every N steps for fine-grained analysis |
## I/O Contract

### Inputs

| Parameter | Type | Description |
|---|---|---|
| `dataset` | Iterable of `(x, y)` | Non-stationary data stream; temporal ordering is preserved |
| `model` | `base.Classifier` | Drift-adaptive model with internal drift detection |
| `metric` | `metrics.base.Metric` | Accumulating metric that tracks performance over the full stream |

### Outputs

| Function | Return Type | Description |
|---|---|---|
| `progressive_val_score` | `metrics.base.Metric` | The final metric after processing the entire stream; prints intermediate results if `print_every > 0` |
| `iter_progressive_val_score` | Generator of `dict` | Yields dicts with metric state, step number, and optionally predictions/time/memory at each checkpoint |
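The generator contract can be sketched in plain Python. This is a simplified stand-in, not River's implementation: `iter_progressive_val`, `ConstantOne`, and the toy `Accuracy` metric are hypothetical, and only the `"Step"` key (which the examples below also read) is modeled; the real generator's checkpoints additionally carry the metric and optional prediction/time/memory fields. The caller inspects the shared metric object at each checkpoint.

```python
class ConstantOne:
    """Toy model that always predicts 1 and learns nothing."""
    def predict_one(self, x):
        return 1
    def learn_one(self, x, y):
        pass

class Accuracy:
    """Toy accumulating accuracy metric."""
    def __init__(self):
        self.correct = 0
        self.total = 0
    def update(self, y_true, y_pred):
        self.correct += int(y_true == y_pred)
        self.total += 1
    def get(self):
        return self.correct / self.total if self.total else 0.0

def iter_progressive_val(stream, model, metric, step=1):
    """Test-then-train loop that yields a checkpoint dict every `step` samples."""
    for i, (x, y) in enumerate(stream, start=1):
        y_pred = model.predict_one(x)
        metric.update(y, y_pred)
        model.learn_one(x, y)
        if i % step == 0:
            yield {"Step": i}

metric = Accuracy()
stream = [({}, y) for y in [1, 1, 0, 1, 0, 1, 1, 1, 0, 1]]
for checkpoint in iter_progressive_val(stream, ConstantOne(), metric, step=5):
    print(checkpoint["Step"], round(metric.get(), 2))
# 5 0.6
# 10 0.7
```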
## Usage Examples

### Evaluating DriftRetrainingClassifier on Elec2

```python
from river import datasets, drift, evaluate, metrics, tree

dataset = datasets.Elec2().take(10000)
model = drift.DriftRetrainingClassifier(
    model=tree.HoeffdingTreeClassifier(),
    drift_detector=drift.binary.DDM(),
)
metric = metrics.Accuracy()

evaluate.progressive_val_score(dataset, model, metric, print_every=2000)
# [2,000] Accuracy: XX.XX%
# [4,000] Accuracy: XX.XX%
# [6,000] Accuracy: XX.XX%
# [8,000] Accuracy: XX.XX%
# [10,000] Accuracy: XX.XX%
```
### Comparing Adaptive vs. Non-Adaptive on the Same Stream

```python
from river import datasets, evaluate, forest, metrics, tree

# Non-adaptive baseline
dataset1 = datasets.Elec2().take(10000)
baseline = tree.HoeffdingTreeClassifier()
metric_baseline = metrics.Accuracy()
evaluate.progressive_val_score(dataset1, baseline, metric_baseline)
print(f"Baseline: {metric_baseline}")

# Drift-adaptive model
dataset2 = datasets.Elec2().take(10000)
adaptive = forest.ARFClassifier(n_models=10, seed=42)
metric_adaptive = metrics.Accuracy()
evaluate.progressive_val_score(dataset2, adaptive, metric_adaptive)
print(f"Adaptive: {metric_adaptive}")

# Adaptation benefit
print(f"Benefit: {metric_adaptive.get() - metric_baseline.get():.4f}")
```
### Fine-Grained Monitoring with iter_progressive_val_score

```python
from river import datasets, drift, evaluate, metrics, tree

model = drift.DriftRetrainingClassifier(
    model=tree.HoeffdingTreeClassifier(),
    drift_detector=drift.binary.DDM(),
)
metric = metrics.Accuracy()

accuracy_over_time = []
for step_info in evaluate.iter_progressive_val_score(
    dataset=datasets.Elec2().take(5000),
    model=model,
    metric=metric,
    step=500,
):
    step = step_info["Step"]
    acc = metric.get()
    accuracy_over_time.append((step, acc))
    print(f"Step {step}: Accuracy = {acc:.4f}")
```
### Evaluating ARFClassifier on Insects with Drift Inspection

```python
from river import datasets, evaluate, forest, metrics

dataset = datasets.Insects(variant="abrupt_balanced")
model = forest.ARFClassifier(n_models=10, seed=42)
metric = metrics.Accuracy()

evaluate.progressive_val_score(dataset, model, metric, print_every=10000)
print(f"\nFinal accuracy: {metric}")
print(f"Total drifts detected: {model.n_drifts_detected()}")
print(f"Total warnings detected: {model.n_warnings_detected()}")
```
### Combined Evaluation and Drift Logging

```python
from river import datasets, drift, metrics, tree

model = drift.DriftRetrainingClassifier(
    model=tree.HoeffdingTreeClassifier(),
    drift_detector=drift.binary.DDM(),
)
metric = metrics.Accuracy()

drift_log = []
for i, (x, y) in enumerate(datasets.Elec2().take(10000)):
    y_pred = model.predict_one(x)
    if y_pred is not None:
        metric.update(y, y_pred)
    model.learn_one(x, y)
    if model.drift_detector.drift_detected:
        drift_log.append({"step": i, "accuracy": metric.get()})

print(f"Final: {metric}")
print(f"Drift events: {len(drift_log)}")
for event in drift_log:
    print(f"  Step {event['step']}: accuracy was {event['accuracy']:.4f}")
```