# Implementation: Online ML River Drift State Inspection Pattern
| Knowledge Sources | Domains | Last Updated |
|---|---|---|
| River Docs | Online Machine Learning, Concept Drift, Model Monitoring | 2026-02-08 16:00 GMT |
## Overview
Concrete pattern for accessing and logging drift detector internal state during model evaluation, enabling users to identify when and where concept drift events occur in non-stationary data streams.
## Description

This Pattern Doc describes how users inspect drift events across different drift-adaptive components in River. Rather than describing a single class, it covers the conventions and APIs for observing drift detection behavior in:

- `DriftRetrainingClassifier`: the `drift_detector` attribute exposes `drift_detected` (bool) and `warning_detected` (bool) after each `learn_one` call. Users can check these properties within their training loop to log drift timestamps.
- `ARFClassifier`: the `n_warnings_detected(tree_id=None)` and `n_drifts_detected(tree_id=None)` methods provide aggregate and per-tree counts. The internal `_drift_tracker` and `_warning_tracker` dictionaries store per-tree event counts.
- Standalone drift detectors (`ADWIN`, `KSWIN`, `PageHinkley`): the `drift_detected` property is available after each `update()` call. `ADWIN` additionally exposes `width`, `estimation`, `variance`, and `n_detections`.
## Usage
Use this pattern when you want to go beyond aggregate performance metrics and understand the internal drift detection behavior of your adaptive models. This is essential for debugging, parameter tuning, and communicating drift dynamics.
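To make the loop-level check concrete before the River-specific examples below, here is a minimal sketch of a reusable drift-event recorder that works with any model whose `drift_detector` exposes a boolean `drift_detected` flag after `learn_one` (the `DriftRetrainingClassifier` convention). `StubModel` and `StubDetector` are hypothetical stand-ins used to exercise the helper; they are not River classes.

```python
# Sketch: record the stream indices at which a model flags drift.
# StubDetector/StubModel are hypothetical stand-ins that mimic the
# learn_one / drift_detector.drift_detected convention, not River classes.

class StubDetector:
    def __init__(self, drift_steps):
        self._drift_steps = set(drift_steps)
        self._step = -1
        self.drift_detected = False

    def advance(self):
        self._step += 1
        self.drift_detected = self._step in self._drift_steps


class StubModel:
    """Mimics the learn_one / drift_detector interface."""
    def __init__(self, drift_steps):
        self.drift_detector = StubDetector(drift_steps)

    def learn_one(self, x, y):
        self.drift_detector.advance()


def record_drifts(model, stream):
    """Return the indices of steps where the model flagged drift."""
    drift_points = []
    for i, (x, y) in enumerate(stream):
        model.learn_one(x, y)
        if model.drift_detector.drift_detected:
            drift_points.append(i)
    return drift_points


model = StubModel(drift_steps=[3, 7])
stream = [({"f": v}, v % 2) for v in range(10)]
print(record_drifts(model, stream))  # [3, 7]
```

The same `record_drifts` helper works unchanged with a real `DriftRetrainingClassifier`, since it only relies on the duck-typed `drift_detector.drift_detected` attribute.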
## Code Reference

### Source Location

- `DriftRetrainingClassifier` state: `river/drift/retrain.py:L6-L100` (the `drift_detector` attribute and `_update_detector` method)
- ARF internal detection: `river/forest/adaptive_random_forest.py:L448-L713` (the `n_warnings_detected` and `n_drifts_detected` methods, and the `_drift_tracker`/`_warning_tracker` dicts)
### Pattern: Access Drift State During Evaluation

```python
# General pattern: check drift_detected after each learn_one
model.learn_one(x, y)
if model.drift_detector.drift_detected:
    # Log or react to drift event
    pass
```
### Pattern: ARF Per-Tree Inspection

```python
# After evaluation, inspect per-tree statistics
for tree_id in range(model.n_models):
    warnings = model.n_warnings_detected(tree_id)
    drifts = model.n_drifts_detected(tree_id)
    print(f"Tree {tree_id}: {warnings} warnings, {drifts} drifts")
```
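The per-tree counting convention (aggregate count when `tree_id` is `None`, a single tree's count otherwise) can be sketched with a minimal stand-in. `StubForest` below is illustrative only, not River's `ARFClassifier`; its `_drift_tracker`-style dict mirrors the per-tree layout described in the Source Location notes.

```python
# Sketch of the per-tree drift-count convention used by ARFClassifier:
# tree_id=None returns the aggregate, an integer id returns that tree's
# count. StubForest is a hypothetical stand-in, not a River class.

class StubForest:
    def __init__(self, per_tree_drifts):
        # Maps tree_id -> cumulative drift count (like a _drift_tracker dict).
        self._drift_tracker = dict(per_tree_drifts)
        self.n_models = len(per_tree_drifts)

    def n_drifts_detected(self, tree_id=None):
        if tree_id is None:
            return sum(self._drift_tracker.values())
        return self._drift_tracker.get(tree_id, 0)


forest = StubForest({0: 2, 1: 0, 2: 5})
print(forest.n_drifts_detected())   # 7 (aggregate over all trees)
print(forest.n_drifts_detected(2))  # 5 (single tree)
```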
### Import

```python
# No special import needed -- inspection uses attributes of existing objects
from river import drift, forest, evaluate, metrics
```
## I/O Contract

### Inputs

| Component | Access Pattern | Description |
|---|---|---|
| `DriftRetrainingClassifier` | `model.drift_detector.drift_detected` | Check after each `learn_one` call |
| `DriftRetrainingClassifier` | `model.drift_detector.warning_detected` | Check for warning state (pre-drift signal) |
| `ARFClassifier` | `model.n_drifts_detected(tree_id)` | Query after evaluation or during loop |
| `ARFClassifier` | `model.n_warnings_detected(tree_id)` | Query after evaluation or during loop |
| Standalone detector | `detector.drift_detected` | Check after each `update` call |
### Outputs

| Access Pattern | Return Type | Description |
|---|---|---|
| `drift_detected` | `bool` | Whether drift was detected on the most recent step |
| `warning_detected` | `bool` | Whether a warning was detected on the most recent step |
| `n_drifts_detected()` | `int` | Cumulative count of drift events |
| `n_warnings_detected()` | `int` | Cumulative count of warning events |
| `ADWIN.n_detections` | `int` | Total number of detections in ADWIN's lifetime |
| `ADWIN.width` | `int` | Current adaptive window size (shrinks on drift) |
## Usage Examples

### Logging Drift Events with DriftRetrainingClassifier

```python
from river import datasets, drift, metrics, tree

model = drift.DriftRetrainingClassifier(
    model=tree.HoeffdingTreeClassifier(),
    drift_detector=drift.binary.DDM(),
)
metric = metrics.Accuracy()
drift_points = []

for i, (x, y) in enumerate(datasets.Elec2().take(10000)):
    y_pred = model.predict_one(x)
    if y_pred is not None:
        metric.update(y, y_pred)
    model.learn_one(x, y)
    if model.drift_detector.drift_detected:
        drift_points.append(i)
        print(f"Step {i}: Drift detected. Current accuracy: {metric}")

print(f"\nTotal drifts: {len(drift_points)}")
print(f"Drift points: {drift_points}")
print(f"Final accuracy: {metric}")
```
### Inspecting ARF Per-Tree Drift Statistics

```python
from river import datasets, evaluate, forest, metrics

dataset = datasets.Elec2().take(15000)
model = forest.ARFClassifier(n_models=10, seed=42)
metric = metrics.Accuracy()

evaluate.progressive_val_score(dataset, model, metric)

print(f"Overall accuracy: {metric}")
print(f"Total warnings: {model.n_warnings_detected()}")
print(f"Total drifts: {model.n_drifts_detected()}")
print()
for i in range(model.n_models):
    w = model.n_warnings_detected(i)
    d = model.n_drifts_detected(i)
    if w > 0 or d > 0:
        print(f"Tree {i}: {w} warnings, {d} drifts")
```
### Monitoring Drift During iter_progressive_val_score

```python
from river import datasets, drift, evaluate, metrics, tree

model = drift.DriftRetrainingClassifier(
    model=tree.HoeffdingTreeClassifier(),
    drift_detector=drift.binary.DDM(),
)
metric = metrics.Accuracy()

# Use iter_progressive_val_score for step-by-step control
for step_info in evaluate.iter_progressive_val_score(
    dataset=datasets.Elec2().take(5000),
    model=model,
    metric=metric,
    step=1000,
):
    step = step_info["Step"]
    print(f"Step {step}: {metric}")
```
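For readers who want to see what `iter_progressive_val_score` automates, the sketch below hand-rolls the underlying test-then-train (progressive validation) loop with a checkpoint every `step` samples. `ToyModel` and the inline accuracy counter are illustrative stand-ins, not River APIs.

```python
# Sketch of the test-then-train loop behind progressive validation,
# with periodic checkpoints. ToyModel is a hypothetical stand-in.

def progressive_val(stream, model, step=3):
    correct = total = 0
    checkpoints = []
    for i, (x, y) in enumerate(stream, start=1):
        y_pred = model.predict_one(x)   # test first...
        if y_pred is not None:
            correct += int(y_pred == y)
            total += 1
        model.learn_one(x, y)           # ...then train
        if i % step == 0:
            checkpoints.append((i, correct / max(total, 1)))
    return checkpoints


class ToyModel:
    """Predicts the last label seen (a trivial online model)."""
    def __init__(self):
        self.last = None

    def predict_one(self, x):
        return self.last

    def learn_one(self, x, y):
        self.last = y


stream = [({}, label) for label in [0, 0, 0, 1, 1, 1]]
print(progressive_val(stream, ToyModel(), step=3))  # [(3, 1.0), (6, 0.8)]
```

Because the model is tested before it trains on each sample, every prediction is out-of-sample, which is what makes progressive validation a fair evaluation on a stream.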
### Standalone ADWIN State Inspection

```python
import random

from river import drift

rng = random.Random(42)
adwin = drift.ADWIN(delta=0.002)
data = rng.choices([0, 1], k=500) + rng.choices(range(4, 8), k=500)

for i, val in enumerate(data):
    adwin.update(val)
    if adwin.drift_detected:
        print(f"Drift at step {i}")
        print(f"  Window width: {adwin.width}")
        print(f"  Mean estimate: {adwin.estimation:.4f}")
        print(f"  Total detections: {adwin.n_detections}")
```
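The standalone detector contract (call `update()`, then read `drift_detected`) can be satisfied by any object. The toy mean-shift detector below is a hypothetical illustration of that protocol only; it is not ADWIN's algorithm and its window/threshold parameters are made up for the example.

```python
# Sketch of the standalone detector protocol: update() per value, then
# read the boolean drift_detected flag. This toy detector compares the
# means of two adjacent fixed windows -- a deliberately naive scheme,
# not ADWIN's adaptive-window algorithm.

class ToyMeanShiftDetector:
    def __init__(self, window=50, threshold=2.0):
        self.window = window
        self.threshold = threshold
        self._values = []
        self.drift_detected = False

    def update(self, x):
        self.drift_detected = False
        self._values.append(x)
        if len(self._values) >= 2 * self.window:
            old = self._values[-2 * self.window:-self.window]
            new = self._values[-self.window:]
            if abs(sum(new) / self.window - sum(old) / self.window) > self.threshold:
                self.drift_detected = True
                self._values.clear()  # restart history after a detection


detector = ToyMeanShiftDetector(window=10, threshold=1.0)
data = [0.0] * 50 + [5.0] * 50
drift_steps = []
for i, v in enumerate(data):
    detector.update(v)
    if detector.drift_detected:
        drift_steps.append(i)
print(drift_steps)  # [52]
```

Any code written against this protocol (such as the logging loops above) works identically with River's real detectors, since they expose the same `update()`/`drift_detected` surface.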