Implementation:Online ml River Anomaly QuantileFilter
| Knowledge Sources | River River Docs |
|---|---|
| Domains | Online Machine Learning, Anomaly Detection, Adaptive Thresholding |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Concrete tool for converting continuous anomaly scores into binary anomaly labels using an adaptive quantile-based threshold in the River library, wrapping any anomaly detector with a streaming quantile estimator.
Description
The anomaly.QuantileFilter class wraps any anomaly detector and adds adaptive thresholding via the classify method. It maintains a stats.Quantile estimator that tracks the q-th quantile of all anomaly scores seen so far. An observation is classified as anomalous if its score is greater than or equal to the current quantile estimate.
The class inherits from anomaly.base.AnomalyFilter. Its learn_one method has a notable difference from the base class: it always updates the quantile estimator with the score, even when the wrapped detector is protected from learning on anomalous observations. This ensures the quantile threshold remains well-calibrated.
When no quantile estimate is available yet (e.g., before any observations), classify uses math.inf as the threshold, ensuring no false positives during the warm-up phase.
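The behaviour described above can be sketched in plain Python. This is a minimal illustration, not River's source: ExactQuantile, SketchQuantileFilter, and ToyDetector are hypothetical names, and the exact sorted-list quantile stands in for the streaming estimator that stats.Quantile provides.

```python
import bisect
import math


class ExactQuantile:
    """Hypothetical stand-in for a streaming quantile estimator.

    It keeps every score in a sorted list, which is exact but uses
    unbounded memory. It is not how stats.Quantile works internally.
    """

    def __init__(self, q: float):
        self.q = q
        self._sorted = []

    def update(self, value: float) -> None:
        bisect.insort(self._sorted, value)

    def get(self):
        if not self._sorted:
            return None  # no estimate before the first update
        index = min(len(self._sorted) - 1, int(self.q * len(self._sorted)))
        return self._sorted[index]


class SketchQuantileFilter:
    """Illustrative restatement of the classify / learn_one logic."""

    def __init__(self, detector, q: float, protect_anomaly_detector: bool = True):
        self.detector = detector
        self.quantile = ExactQuantile(q)
        self.protect_anomaly_detector = protect_anomaly_detector

    def score_one(self, x: dict) -> float:
        return self.detector.score_one(x)

    def classify(self, score: float) -> bool:
        threshold = self.quantile.get()
        if threshold is None:
            threshold = math.inf  # nothing is flagged before the first score
        return score >= threshold

    def learn_one(self, x: dict) -> None:
        score = self.score_one(x)
        # The wrapped detector is skipped only when protection is on
        # and the observation is classified as anomalous.
        if not self.protect_anomaly_detector or not self.classify(score):
            self.detector.learn_one(x)
        # The quantile estimator always sees the score.
        self.quantile.update(score)


class ToyDetector:
    """Hypothetical detector whose score is the feature value itself."""

    def __init__(self):
        self.n_learned = 0

    def score_one(self, x: dict) -> float:
        return x["value"]

    def learn_one(self, x: dict) -> None:
        self.n_learned += 1


model = SketchQuantileFilter(ToyDetector(), q=0.5)
flagged_before_learning = model.classify(1e9)  # threshold is math.inf here
for value in [1.0, 2.0, 3.0]:
    model.learn_one({"value": value})
```

In this run the toy detector learns only from the first observation, because the next two score at or above the running median and are treated as anomalous, while the quantile tracker receives all three scores.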
Usage
Import and use anomaly.QuantileFilter when:
- You want an adaptive threshold that tracks the score distribution
- You do not have a priori knowledge of the correct fixed threshold
- You want to flag approximately a (1 - q) fraction of observations as anomalous
- You need to wrap detectors with unbounded scores (like OneClassSVM)
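The "(1 - q) fraction" point in the list above can be checked with a small simulation. The sketch below assumes stationary synthetic scores and an exact running quantile kept in a sorted list; flagged_fraction is a hypothetical helper, and River's streaming estimate is approximate, so real runs will only roughly match.

```python
import bisect
import random


def flagged_fraction(scores, q):
    """Fraction of scores at or above the running q-th quantile of earlier scores."""
    seen = []  # sorted scores observed so far (exact, unbounded memory)
    flagged = 0
    for score in scores:
        if seen:  # before any score is seen, nothing is flagged
            threshold = seen[min(len(seen) - 1, int(q * len(seen)))]
            if score >= threshold:
                flagged += 1
        bisect.insort(seen, score)
    return flagged / len(scores)


rng = random.Random(0)
scores = [rng.random() for _ in range(5000)]  # synthetic, stationary scores
fraction = flagged_fraction(scores, q=0.95)
print(f"flagged {fraction:.1%} of scores; 1 - q = 5.0%")
```

If the score distribution drifts, the flagged fraction can depart from 1 - q until the quantile estimate catches up.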
Code Reference
Source Location
river/anomaly/filter.py, lines 109-191.
Signature
class QuantileFilter(anomaly.base.AnomalyFilter):
    def __init__(
        self,
        anomaly_detector,
        q: float,
        protect_anomaly_detector=True,
    ):
Import
from river import anomaly
model = anomaly.QuantileFilter(
    anomaly.HalfSpaceTrees(seed=42),
    q=0.95
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| anomaly_detector | AnomalyDetector | (required) | The wrapped anomaly detector instance. |
| q | float | (required) | The quantile level (between 0 and 1). Scores at or above the q-th quantile are classified as anomalous. |
| protect_anomaly_detector | bool | True | If True, the anomaly detector is not updated when the observation is classified as anomalous. |
Methods
- classify(score: float) -> bool -- Returns True if score >= quantile_estimate. Returns False if no quantile estimate is available yet (uses math.inf as the threshold).
- score_one(x: dict) -> float -- Delegates to the wrapped anomaly detector's score_one.
- learn_one(x: dict) -> None -- Scores the observation, conditionally updates the wrapped detector, and always updates the quantile estimator.
Internal State
- self.quantile -- A stats.Quantile instance that maintains the streaming quantile estimate.
- The q property is a convenience accessor for self.quantile.q.
I/O Contract
Inputs
| Method | Parameter | Type | Description |
|---|---|---|---|
| classify | score | float | An anomaly score to classify. |
| score_one | x | dict | A dictionary mapping feature names to numeric values. |
| learn_one | x | dict | A dictionary mapping feature names to numeric values. |
Outputs
| Method | Return Type | Description |
|---|---|---|
| classify | bool | True if the score is at or above the current quantile threshold (anomaly), False otherwise. |
| score_one | float | The anomaly score from the wrapped detector. |
| learn_one | None | Updates the wrapped detector (conditionally) and the quantile estimator (always). |
Usage Examples
QuantileFilter with HalfSpaceTrees in a pipeline:
from river import anomaly, compose, datasets, metrics, preprocessing
model = compose.Pipeline(
    preprocessing.MinMaxScaler(),
    anomaly.QuantileFilter(
        anomaly.HalfSpaceTrees(seed=42),
        q=0.95
    )
)
report = metrics.ClassificationReport()
for x, y in datasets.CreditCard().take(2000):
    score = model.score_one(x)
    is_anomaly = model['QuantileFilter'].classify(score)
    model.learn_one(x)
    report.update(y, is_anomaly)
print(report)
QuantileFilter with OneClassSVM:
from river import anomaly, datasets, metrics
model = anomaly.QuantileFilter(
    anomaly.OneClassSVM(nu=0.2),
    q=0.995
)
auc = metrics.ROCAUC()
for x, y in datasets.CreditCard().take(2500):
    score = model.score_one(x)
    is_anomaly = model.classify(score)
    model.learn_one(x)
    auc.update(y, is_anomaly)
print(auc)
# ROCAUC: 74.68%