Implementation (Online ML): River evaluate.iter_progressive_val_score
| Knowledge Sources | River documentation; "Beating the Hold-Out: Bounds for K-fold and Progressive Cross-Validation" |
|---|---|
| Domains | Online Learning, Evaluation, Monitoring |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Concrete tool for performing progressive validation as a Python generator, yielding intermediate evaluation results at configurable step intervals for monitoring and learning curve analysis.
Description
The evaluate.iter_progressive_val_score function performs the same predict-then-learn evaluation protocol as evaluate.progressive_val_score, but instead of consuming all results internally and returning only the final metric, it yields intermediate checkpoint dictionaries at intervals specified by the step parameter.
Each checkpoint dictionary contains:
- The current metric state (e.g., {'ROCAUC': ROCAUC: 92.25%})
- The Step count (total observations processed)
- Optionally Time (elapsed datetime.timedelta), Memory (model memory in bytes), and Prediction (the most recent prediction)
This function is the lower-level primitive that progressive_val_score is built upon. It uses the shared _progressive_validation internal function, passing itertools.count(step, step) as the checkpoint schedule.
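The checkpoint schedule is therefore just an arithmetic progression. A stdlib-only sketch of what itertools.count(step, step) produces for step=200:

```python
import itertools

step = 200
# The checkpoint schedule passed to the internal validation loop:
# an infinite arithmetic progression 200, 400, 600, ...
schedule = itertools.count(step, step)

# Peek at the first five checkpoint positions without exhausting it.
checkpoints = list(itertools.islice(schedule, 5))
print(checkpoints)  # [200, 400, 600, 800, 1000]
```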
The generator nature of this function provides several advantages:
- Lazy evaluation: Results are computed only when consumed.
- Early stopping: The consumer can stop iteration at any time.
- Streaming output: Results can be processed, plotted, or logged as they arrive without waiting for the full evaluation to complete.
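These properties follow from generator semantics alone. A minimal stand-in (no river required) shows why stopping early costs nothing; the infinite stream below mimics the shape of the real checkpoint dictionaries:

```python
import itertools

def fake_eval():
    """Stand-in for iter_progressive_val_score: an infinite stream of
    checkpoint-like dicts. No checkpoint is computed until it is pulled."""
    for step in itertools.count(100, 100):
        yield {'Step': step}

consumed = []
for checkpoint in fake_eval():
    consumed.append(checkpoint['Step'])
    if checkpoint['Step'] >= 500:  # early-stopping condition
        break

# Only five checkpoints were ever produced; the rest never ran.
print(consumed)  # [100, 200, 300, 400, 500]
```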
Usage
Import this function when you need to:
- Plot learning curves showing metric evolution over time.
- Implement early stopping logic based on metric values.
- Log intermediate results to a monitoring system or experiment tracker.
- Access individual predictions alongside metric snapshots.
Code Reference
Source Location
| File | Lines |
|---|---|
| river/evaluate/progressive_validation.py | L106-L228 |
Signature
def iter_progressive_val_score(
dataset: base.typing.Dataset,
model,
metric: metrics.base.Metric,
moment: str | typing.Callable | None = None,
delay: str | int | dt.timedelta | typing.Callable | None = None,
step=1,
measure_time=False,
measure_memory=False,
yield_predictions=False,
) -> typing.Generator
Import
from river import evaluate
steps = evaluate.iter_progressive_val_score(
dataset=dataset, model=model, metric=metric, step=200
)
I/O Contract
Inputs
| Parameter | Type | Default | Description |
|---|---|---|---|
| dataset | base.typing.Dataset | (required) | Iterable stream of (x, y) tuples. |
| model | Estimator | (required) | The model to evaluate. |
| metric | metrics.base.Metric | (required) | The metric used to evaluate predictions. |
| moment | str \| Callable \| None | None | Attribute name or function used to extract the event time (for delayed feedback). |
| delay | str \| int \| timedelta \| Callable \| None | None | Delay before revealing labels to the model. |
| step | int | 1 | Yield a checkpoint every step observations; 1 yields after every observation. |
| measure_time | bool | False | Whether to include elapsed time in checkpoint dictionaries. |
| measure_memory | bool | False | Whether to include model memory usage in checkpoint dictionaries. |
| yield_predictions | bool | False | Whether to include the most recent prediction in checkpoint dictionaries. |
Outputs
| Output | Type | Description |
|---|---|---|
| Return value | typing.Generator | Generator yielding checkpoint dictionaries. Each dictionary contains metric state, step count, and optional time/memory/prediction data. |
Checkpoint dictionary structure:
{
'MetricName': <Metric object>, # e.g., 'ROCAUC': ROCAUC: 92.25%
'Step': int, # Number of observations processed
'Samples used': int, # (only for active learners)
'Time': datetime.timedelta, # (only if measure_time=True)
'Memory': int, # (only if measure_memory=True)
'Prediction': dict | bool, # (only if yield_predictions=True)
}
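Reading a checkpoint is plain dictionary access; the metric entry is a live object, so call .get() for the scalar value. A stub metric keeps this sketch self-contained (real river metrics expose the same .get() method):

```python
class StubMetric:
    """Minimal stand-in for a river metric: holds a running value and
    exposes .get(), as river's metrics.base.Metric does."""
    def __init__(self, value):
        self._value = value

    def get(self):
        return self._value

# Shape of a checkpoint dictionary yielded by the generator.
checkpoint = {'ROCAUC': StubMetric(0.9225), 'Step': 400}

# The metric entry is an object, not a float; .get() yields the scalar.
value = checkpoint['ROCAUC'].get()
print(round(value, 4))  # 0.9225
```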
Usage Examples
Basic learning curve monitoring:
from river import datasets, evaluate, linear_model, metrics, preprocessing
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()
steps = evaluate.iter_progressive_val_score(
model=model,
dataset=datasets.Phishing(),
metric=metrics.ROCAUC(),
step=200,
)
for step in steps:
print(step)
# {'ROCAUC': ROCAUC: 90.20%, 'Step': 200}
# {'ROCAUC': ROCAUC: 92.25%, 'Step': 400}
# {'ROCAUC': ROCAUC: 93.23%, 'Step': 600}
# {'ROCAUC': ROCAUC: 94.05%, 'Step': 800}
# {'ROCAUC': ROCAUC: 94.79%, 'Step': 1000}
# {'ROCAUC': ROCAUC: 95.07%, 'Step': 1200}
# {'ROCAUC': ROCAUC: 95.07%, 'Step': 1250}
With predictions and time tracking:
import itertools
from river import datasets, evaluate, linear_model, metrics, preprocessing
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()
steps = evaluate.iter_progressive_val_score(
model=model,
dataset=datasets.Phishing(),
metric=metrics.ROCAUC(),
step=1,
yield_predictions=True,
)
for step in itertools.islice(steps, 100, 105):
print(step)
# {'ROCAUC': ROCAUC: 94.68%, 'Step': 101, 'Prediction': {False: 0.966..., True: 0.033...}}
# ...
Early stopping:
from river import datasets, evaluate, linear_model, metrics, preprocessing
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()
steps = evaluate.iter_progressive_val_score(
model=model,
dataset=datasets.Phishing(),
metric=metrics.ROCAUC(),
step=100,
)
for step in steps:
roc_auc = step['ROCAUC'].get()
if roc_auc > 0.94:
print(f"Target ROCAUC reached at step {step['Step']}: {roc_auc:.4f}")
break
Collecting results for plotting:
from river import datasets, evaluate, linear_model, metrics, preprocessing
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()
results = list(evaluate.iter_progressive_val_score(
model=model,
dataset=datasets.Phishing(),
metric=metrics.ROCAUC(),
step=50,
))
steps_list = [r['Step'] for r in results]
aucs = [r['ROCAUC'].get() for r in results]
# Now steps_list and aucs can be plotted with matplotlib
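Raw progressive metrics can be noisy at small step sizes, so a simple moving average is a common pre-plotting step. A stdlib-only sketch, with the aucs list faked as plain floats to stay self-contained (with river, use r['ROCAUC'].get() as above):

```python
# Hypothetical metric values collected from checkpoint dictionaries.
aucs = [0.902, 0.9225, 0.9323, 0.9405, 0.9479, 0.9507]

def moving_average(values, window=3):
    """Trailing moving average; early points use a shorter window."""
    out = []
    for i in range(len(values)):
        lo = max(0, i - window + 1)
        chunk = values[lo:i + 1]
        out.append(sum(chunk) / len(chunk))
    return out

smoothed = moving_average(aucs)
# smoothed has the same length as aucs and can be plotted directly.
```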