Implementation:Online ml River Anomaly ThresholdFilter
| Knowledge Sources | River, River Docs |
|---|---|
| Domains | Online Machine Learning, Anomaly Detection, Binary Classification |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
A concrete tool in the River library that converts continuous anomaly scores into binary anomaly labels using a fixed threshold. It wraps any anomaly detector and can optionally protect that detector from learning on observations it has flagged as anomalous.
Description
The anomaly.ThresholdFilter class wraps any anomaly detector implementing the AnomalyDetector interface and adds a classify method that converts continuous scores to binary labels. It inherits from anomaly.base.AnomalyFilter.
The classification rule is simple: a score is classified as anomalous if it is greater than or equal to the specified threshold. When protect_anomaly_detector is enabled (default), the wrapped detector's learn_one is only called if the observation is classified as normal, preventing the detector from adapting to anomalous data.
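The rule and the protection behaviour described above can be sketched in plain Python. This is a minimal illustration, not River's source: ThresholdFilterSketch and CountingDetector are hypothetical names, and the stub detector simply returns a precomputed score so the boundary case (a score exactly equal to the threshold) is easy to see.

```python
class CountingDetector:
    """Hypothetical stand-in for a detector: echoes a precomputed score."""

    def __init__(self):
        self.n_learned = 0

    def score_one(self, x):
        return x["score"]

    def learn_one(self, x):
        self.n_learned += 1


class ThresholdFilterSketch:
    """Illustrative re-statement of the documented behaviour (assumption, not River code)."""

    def __init__(self, anomaly_detector, threshold, protect_anomaly_detector=True):
        self.anomaly_detector = anomaly_detector
        self.threshold = threshold
        self.protect_anomaly_detector = protect_anomaly_detector

    def classify(self, score):
        # Anomalous when the score is at or above the threshold
        return score >= self.threshold

    def score_one(self, x):
        return self.anomaly_detector.score_one(x)

    def learn_one(self, x):
        # With protection on, skip the update for observations flagged as anomalous
        if self.protect_anomaly_detector and self.classify(self.score_one(x)):
            return
        self.anomaly_detector.learn_one(x)


detector = CountingDetector()
flt = ThresholdFilterSketch(detector, threshold=0.95)

scores = (0.10, 0.95, 0.99, 0.50)
for s in scores:
    flt.learn_one({"score": s})

labels = [flt.classify(s) for s in scores]
print(labels, detector.n_learned)
```

Only the two observations scored below 0.95 reach the stub detector's learn_one; the observation scored exactly 0.95 is treated as anomalous.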
The ThresholdFilter can also be used as part of a pipeline using the pipe operator (|), allowing it to filter anomalous observations before they reach a downstream supervised model.
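The effect of placing a filter in front of a supervised model can be sketched as a gate on the downstream learn_one call. The snippet below is a simplified illustration in plain Python, not River's pipeline code: MeanRegressor and learn_through_filter are hypothetical names, and the anomaly scores are supplied by hand rather than computed by a detector.

```python
class MeanRegressor:
    """Hypothetical downstream model: predicts the mean of the targets it has seen."""

    def __init__(self):
        self.n = 0
        self.total = 0.0

    def learn_one(self, x, y):
        self.n += 1
        self.total += y

    def predict_one(self, x):
        return self.total / self.n if self.n else 0.0


def learn_through_filter(score, threshold, model, x, y):
    """Forward (x, y) to the model only if the score is classified as normal."""
    if score >= threshold:
        return False  # flagged as anomalous: the downstream model never sees it
    model.learn_one(x, y)
    return True


model = MeanRegressor()
# (anomaly score, target) pairs; the middle one is an obvious outlier
stream = [(0.10, 10.0), (0.99, 1000.0), (0.20, 12.0)]
passed = [learn_through_filter(s, 0.95, model, {}, y) for s, y in stream]

print(passed, model.predict_one({}))
```

Because the outlier is dropped before it reaches the model, the running mean stays at 11.0 instead of being pulled toward 1000.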
Usage
Import and use anomaly.ThresholdFilter when:
- You have a fixed score threshold for anomaly classification
- You want to protect the underlying detector from learning on anomalies
- You need to filter anomalous observations in a pipeline before a supervised model
Code Reference
Source Location
river/anomaly/filter.py, lines 8-107.
Signature
class ThresholdFilter(anomaly.base.AnomalyFilter):
    def __init__(
        self,
        anomaly_detector,
        threshold: float,
        protect_anomaly_detector=True,
    ):
Import
from river import anomaly

filter_model = anomaly.ThresholdFilter(
    anomaly_detector=anomaly.HalfSpaceTrees(),
    threshold=0.95,
)
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| anomaly_detector | AnomalyDetector | (required) | The wrapped anomaly detector instance. |
| threshold | float | (required) | The score threshold at or above which an observation is classified as anomalous. |
| protect_anomaly_detector | bool | True | If True, the anomaly detector is not updated when the score is classified as anomalous. |
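To make the effect of protect_anomaly_detector concrete, the sketch below runs the same short stream with the flag on and off. It uses a deliberately simple toy detector (MaxSeenDetector, a hypothetical name) that scores 1.0 for any value above the largest value it has learned; filtered_learn restates the learn_one rule from the table and is likewise an illustrative helper, not part of River.

```python
class MaxSeenDetector:
    """Toy detector (assumption): a value above the learned maximum scores 1.0."""

    def __init__(self):
        self.max_seen = float("-inf")

    def score_one(self, x):
        return 1.0 if x["v"] > self.max_seen else 0.0

    def learn_one(self, x):
        self.max_seen = max(self.max_seen, x["v"])


def filtered_learn(detector, x, threshold, protect):
    """Update the detector unless protection is on and x is flagged as anomalous."""
    if protect and detector.score_one(x) >= threshold:
        return
    detector.learn_one(x)


def run(protect):
    detector = MaxSeenDetector()
    # Warm up directly on the detector so it has a notion of "normal"
    for v in (1.0, 2.0):
        detector.learn_one({"v": v})
    scores = []
    # The same extreme value arrives twice
    for v in (100.0, 100.0):
        x = {"v": v}
        scores.append(detector.score_one(x))
        filtered_learn(detector, x, threshold=0.5, protect=protect)
    return scores


protected_scores = run(protect=True)
unprotected_scores = run(protect=False)
print(protected_scores, unprotected_scores)
```

With protection on, the second extreme value is still scored as anomalous because the detector never learned the first one. With protection off, the detector adapts after the first occurrence and the second one scores as normal.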
Methods
- classify(score: float) -> bool: Returns True if score >= threshold, False otherwise.
- score_one(x: dict) -> float: Delegates to the wrapped anomaly detector's score_one.
- learn_one(x: dict) -> None: Scores the observation; if protection is enabled and the observation is classified as anomalous, the detector is not updated.
I/O Contract
Inputs
| Method | Parameter | Type | Description |
|---|---|---|---|
| classify | score | float | An anomaly score to classify. |
| score_one | x | dict | A dictionary mapping feature names to numeric values. |
| learn_one | x | dict | A dictionary mapping feature names to numeric values. |
Outputs
| Method | Return Type | Description |
|---|---|---|
| classify | bool | True if the score indicates an anomaly (score >= threshold), False otherwise. |
| score_one | float | The anomaly score from the wrapped detector. |
| learn_one | None | Updates the wrapped detector (conditionally, based on protection setting). |
Usage Examples
Filtering anomalies in a time series pipeline:
from river import anomaly, datasets, metrics, time_series

dataset = datasets.WaterFlow()
metric = metrics.SMAPE()
period = 24  # 24 samples per day

model = (
    anomaly.ThresholdFilter(
        anomaly.GaussianScorer(
            window_size=period * 7,  # 7 days
            grace_period=30
        ),
        threshold=0.995
    ) |
    time_series.HoltWinters(
        alpha=0.3,
        beta=0.1,
        multiplicative=False
    )
)

time_series.evaluate(
    dataset,
    model,
    metric,
    horizon=period
)
Basic threshold classification with HalfSpaceTrees:
from river import anomaly, compose, preprocessing

# Scale features to [0, 1] before scoring, then wrap the whole pipeline in the filter
model = anomaly.ThresholdFilter(
    anomaly_detector=compose.Pipeline(
        preprocessing.MinMaxScaler(),
        anomaly.HalfSpaceTrees(seed=42),
    ),
    threshold=0.95,
    protect_anomaly_detector=True,
)

# Process one observation: score it, classify the score, then update the model
x = {'feature_a': 3.5, 'feature_b': 1.2}
score = model.score_one(x)
is_anomaly = model.classify(score)
model.learn_one(x)  # the detector is not updated if x is classified as anomalous
print(f"Score: {score}, Anomaly: {is_anomaly}")