Implementation:Online ml River Anomaly Score One
| Knowledge Sources | River River Docs |
|---|---|
| Domains | Online Machine Learning, Anomaly Detection, API Design, Design Patterns |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Concrete reference for the AnomalyDetector and AnomalyFilter base class interfaces in the River library, documenting the abstract methods and deployment patterns that all anomaly detection implementations must follow.
Description
This is a Pattern Doc that documents the abstract interface defined in river/anomaly/base.py. It covers two base classes:
- AnomalyDetector (lines 10-44): The base class for all unsupervised anomaly detectors. It extends base.Estimator and declares two abstract methods: learn_one(x: dict) -> None and score_one(x: dict) -> float.
- AnomalyFilter (lines 80-151): A wrapper class that extends both base.Wrapper and base.Estimator. It wraps any AnomalyDetector and classifies its scores as anomalous or normal. Subclasses must implement the abstract method classify(score: float) -> bool. The class provides default implementations of score_one, which delegates to the wrapped detector, and learn_one, which updates the wrapped detector only when protect_anomaly_detector is true and the observation is classified as normal.
There is also a SupervisedAnomalyDetector (lines 47-77) for anomaly detectors that require labeled data, but it is less commonly used in the streaming anomaly detection workflow.
Usage
Reference this pattern doc when:
- You are implementing a custom anomaly detector and need to know which methods to provide
- You are implementing a custom anomaly filter and need to understand the classify contract
- You need to understand the learn/score lifecycle and protection mechanism
- You want to verify that a particular model conforms to the anomaly detection interface
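For the last item in the list above, a duck-typed check is one lightweight option. The helper below is a hypothetical sketch (conforms_to_detector is not part of River) and runs without River installed. It only confirms that an object exposes callable learn_one and score_one methods. With River available, isinstance(model, anomaly.base.AnomalyDetector) is the more direct test.

```python
def conforms_to_detector(model) -> bool:
    """Return True if `model` exposes callable learn_one and score_one.

    Hypothetical helper, not part of River: it checks method presence only,
    not signatures or return types.
    """
    return all(
        callable(getattr(model, name, None))
        for name in ("learn_one", "score_one")
    )


class ConstantDetector:
    """Toy stand-in that always returns the same score."""

    def learn_one(self, x: dict) -> None:
        pass

    def score_one(self, x: dict) -> float:
        return 0.0


class NotADetector:
    """Lacks score_one, so it fails the check."""

    def learn_one(self, x: dict) -> None:
        pass
```

A presence check like this catches a missing method early, but it cannot confirm that score_one follows the "high score means anomalous" convention; that still needs a behavioral test.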
Code Reference
Source Location
river/anomaly/base.py, lines 10-44 (AnomalyDetector), lines 80-151 (AnomalyFilter).
AnomalyDetector Interface
```python
# Imports used by the excerpt
import abc

from river import base


class AnomalyDetector(base.Estimator):
    """An anomaly detector."""

    @property
    def _supervised(self):
        return False

    @abc.abstractmethod
    def learn_one(self, x: dict) -> None:
        """Update the model.

        Parameters
        ----------
        x
            A dictionary of features.

        """

    @abc.abstractmethod
    def score_one(self, x: dict) -> float:
        """Return an outlier score.

        A high score is indicative of an anomaly.
        A low score corresponds to a normal observation.

        Parameters
        ----------
        x
            A dictionary of features.

        Returns
        -------
        An anomaly score. A high score is indicative of an anomaly.
        A low score corresponds to a normal observation.

        """
```
AnomalyFilter Interface
```python
# Excerpt; relies on `import abc` and `from river import base` from the same module.
class AnomalyFilter(base.Wrapper, base.Estimator):
    """Anomaly filter base class."""

    def __init__(self, anomaly_detector: AnomalyDetector, protect_anomaly_detector=True):
        self.anomaly_detector = anomaly_detector
        self.protect_anomaly_detector = protect_anomaly_detector

    @abc.abstractmethod
    def classify(self, score: float) -> bool:
        """Classify an anomaly score as anomalous or not.

        Parameters
        ----------
        score
            An anomaly score to classify.

        Returns
        -------
        A boolean value indicating whether the anomaly score
        is anomalous or not.

        """

    def score_one(self, *args, **kwargs):
        """Delegates to the wrapped anomaly detector."""
        return self.anomaly_detector.score_one(*args, **kwargs)

    def learn_one(self, *args, **learn_kwargs) -> None:
        """Update the anomaly filter and the underlying anomaly detector.

        If protect_anomaly_detector is True and the observation is
        classified as normal, the wrapped detector is updated.
        If the observation is anomalous, the detector is NOT updated.

        """
        # As written, the detector is updated only when protection is
        # enabled AND the observation's score is classified as normal.
        if self.protect_anomaly_detector and not self.classify(self.score_one(*args)):
            self.anomaly_detector.learn_one(*args, **learn_kwargs)
```
Import
```python
from river import anomaly

# Using built-in detectors (they implement AnomalyDetector):
detector = anomaly.HalfSpaceTrees()
detector = anomaly.OneClassSVM()

# Using built-in filters (they implement AnomalyFilter):
filter_model = anomaly.ThresholdFilter(detector, threshold=0.9)
filter_model = anomaly.QuantileFilter(detector, q=0.95)
```
I/O Contract
AnomalyDetector
| Method | Inputs | Output | Description |
|---|---|---|---|
| learn_one | x: dict (feature dict) | None | Updates internal model state. |
| score_one | x: dict (feature dict) | float | Returns anomaly score. High = anomalous. |
AnomalyFilter
| Method | Inputs | Output | Description |
|---|---|---|---|
| classify | score: float | bool | True if anomalous, False if normal. |
| score_one | x: dict (feature dict) | float | Delegates to wrapped detector. |
| learn_one | x: dict (feature dict) | None | Updates the wrapped detector only if protect_anomaly_detector is true and the observation is classified as normal. |
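The conditional update in the learn_one row can be traced with a dependency-free sketch. CountingDetector and FixedThresholdFilter are hypothetical stand-ins written for this illustration, not River classes. The filter copies the condition from the AnomalyFilter.learn_one excerpt, and the `score >= threshold` rule in classify is an assumption chosen for the example.

```python
class CountingDetector:
    """Toy detector: uses x["value"] as the score and counts updates."""

    def __init__(self):
        self.n_updates = 0

    def learn_one(self, x: dict) -> None:
        self.n_updates += 1

    def score_one(self, x: dict) -> float:
        return float(x["value"])


class FixedThresholdFilter:
    """Stand-in filter that mirrors the base learn_one condition."""

    def __init__(self, anomaly_detector, threshold: float, protect_anomaly_detector=True):
        self.anomaly_detector = anomaly_detector
        self.threshold = threshold
        self.protect_anomaly_detector = protect_anomaly_detector

    def classify(self, score: float) -> bool:
        # Assumed rule for this sketch: at or above the threshold is anomalous.
        return score >= self.threshold

    def score_one(self, x: dict) -> float:
        return self.anomaly_detector.score_one(x)

    def learn_one(self, x: dict) -> None:
        # Same condition as the excerpt: learn only if protected and normal.
        if self.protect_anomaly_detector and not self.classify(self.score_one(x)):
            self.anomaly_detector.learn_one(x)


detector = CountingDetector()
filt = FixedThresholdFilter(detector, threshold=0.9)

for value in (0.1, 0.95, 0.2, 0.99):
    filt.learn_one({"value": value})

print(detector.n_updates)  # 2: only the scores below 0.9 reached the detector
```

Keeping anomalous observations out of the update path stops the detector from gradually absorbing outliers into its notion of normal, at the cost of ignoring genuine distribution shifts that first appear as high scores.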
Usage Examples
Canonical deployment pattern:
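```python
from river import anomaly, compose, preprocessing

# Placeholder stream for illustration; replace with any iterable of feature dicts.
stream_of_observations = [{"x": 0.50}, {"x": 0.45}, {"x": 0.90}]

# Build model
detector = compose.Pipeline(
    preprocessing.MinMaxScaler(),
    anomaly.HalfSpaceTrees(seed=42),
)

# Score-classify-alert-learn loop
for x in stream_of_observations:
    # 1. Score the observation
    score = detector.score_one(x)

    # 2. Classify (manual threshold)
    is_anomaly = score > 0.9

    # 3. Alert
    if is_anomaly:
        print(f"ALERT: anomaly detected with score {score:.3f}")

    # 4. Learn (after scoring, so the observation does not influence its own score)
    detector.learn_one(x)
```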
Using AnomalyFilter for the same pattern:
```python
from river import anomaly, compose, preprocessing

# Placeholder stream for illustration; replace with any iterable of feature dicts.
stream_of_observations = [{"x": 0.50}, {"x": 0.45}, {"x": 0.90}]

# Build model with filter
model = anomaly.QuantileFilter(
    compose.Pipeline(
        preprocessing.MinMaxScaler(),
        anomaly.HalfSpaceTrees(seed=42),
    ),
    q=0.95,
)

for x in stream_of_observations:
    score = model.score_one(x)
    is_anomaly = model.classify(score)

    if is_anomaly:
        print(f"ALERT: anomaly detected with score {score:.3f}")

    # learn_one handles protection automatically
    model.learn_one(x)
```
Implementing a custom AnomalyDetector:
```python
from river import anomaly


class MyDetector(anomaly.base.AnomalyDetector):
    def learn_one(self, x: dict) -> None:
        # Update internal state from the features in x
        ...

    def score_one(self, x: dict) -> float:
        # Compute and return an anomaly score (high = anomalous)
        score = 0.0  # placeholder; replace with the detector's scoring logic
        return score
```
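To make the skeleton concrete, here is a minimal sketch of a detector that fills in both methods. It is written without a River import so it runs standalone; in River the class would subclass anomaly.base.AnomalyDetector. ZScoreDetector, its `feature` parameter, and the z-score scoring rule are assumptions for illustration, not part of the library.

```python
import math


class ZScoreDetector:
    """Scores one numeric feature by its absolute z-score.

    Illustrative only: tracks a running mean and variance with
    Welford's online algorithm, one observation at a time.
    """

    def __init__(self, feature: str):
        self.feature = feature
        self.n = 0
        self.mean = 0.0
        self.m2 = 0.0  # running sum of squared deviations from the mean

    def learn_one(self, x: dict) -> None:
        value = x[self.feature]
        self.n += 1
        delta = value - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (value - self.mean)

    def score_one(self, x: dict) -> float:
        if self.n < 2:
            return 0.0  # not enough data to estimate the spread
        std = math.sqrt(self.m2 / (self.n - 1))
        if std == 0.0:
            return 0.0  # simplification: no spread observed yet
        return abs(x[self.feature] - self.mean) / std


detector = ZScoreDetector("temperature")
for t in (20.0, 21.0, 19.0, 20.5, 19.5):
    detector.learn_one({"temperature": t})

normal = detector.score_one({"temperature": 20.2})
outlier = detector.score_one({"temperature": 35.0})
print(normal < outlier)  # True: the distant reading scores higher
```

Note that score_one does not modify state, which matches the contract's separation between scoring and learning. Unlike HalfSpaceTrees, this score is unbounded, so a threshold tuned for scores in [0, 1] would not carry over.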