Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Online ml River Anomaly Score One

From Leeroopedia


Knowledge Sources River River Docs
Domains Online Machine Learning, Anomaly Detection, API Design, Design Patterns
Last Updated 2026-02-08 16:00 GMT

Overview

Concrete reference for the AnomalyDetector and AnomalyFilter base class interfaces in the River library, documenting the abstract methods and deployment patterns that all anomaly detection implementations must follow.

Description

This is a Pattern Doc that documents the abstract interface defined in river/anomaly/base.py. It covers two base classes:

AnomalyDetector (lines 10-44): The base class for all unsupervised anomaly detectors. It extends base.Estimator and requires implementing two abstract methods: learn_one(x: dict) -> None and score_one(x: dict) -> float.

AnomalyFilter (lines 80-151): A wrapper class that extends both base.Wrapper and base.Estimator. It wraps any AnomalyDetector and adds the ability to classify scores as anomalous or normal. It requires implementing the abstract method classify(score: float) -> bool and provides default implementations of score_one (delegates to wrapped detector) and learn_one (conditionally updates wrapped detector based on protection setting).

There is also a SupervisedAnomalyDetector (lines 47-77) for anomaly detectors that require labeled data, but it is less commonly used in the streaming anomaly detection workflow.

Usage

Reference this pattern doc when:

  • You are implementing a custom anomaly detector and need to know which methods to provide
  • You are implementing a custom anomaly filter and need to understand the classify contract
  • You need to understand the learn/score lifecycle and protection mechanism
  • You want to verify that a particular model conforms to the anomaly detection interface

Code Reference

Source Location

river/anomaly/base.py, lines 10-44 (AnomalyDetector), lines 80-151 (AnomalyFilter).

AnomalyDetector Interface

class AnomalyDetector(base.Estimator):
    """An anomaly detector."""

    @property
    def _supervised(self):
        return False

    @abc.abstractmethod
    def learn_one(self, x: dict) -> None:
        """Update the model.

        Parameters
        ----------
        x
            A dictionary of features.
        """

    @abc.abstractmethod
    def score_one(self, x: dict) -> float:
        """Return an outlier score.

        A high score is indicative of an anomaly.
        A low score corresponds to a normal observation.

        Parameters
        ----------
        x
            A dictionary of features.

        Returns
        -------
        An anomaly score. A high score is indicative of an anomaly.
        A low score corresponds a normal observation.
        """

AnomalyFilter Interface

class AnomalyFilter(base.Wrapper, base.Estimator):
    """Anomaly filter base class."""

    def __init__(self, anomaly_detector: AnomalyDetector, protect_anomaly_detector=True):
        self.anomaly_detector = anomaly_detector
        self.protect_anomaly_detector = protect_anomaly_detector

    @abc.abstractmethod
    def classify(self, score: float) -> bool:
        """Classify an anomaly score as anomalous or not.

        Parameters
        ----------
        score
            An anomaly score to classify.

        Returns
        -------
        A boolean value indicating whether the anomaly score
        is anomalous or not.
        """

    def score_one(self, *args, **kwargs):
        """Delegates to the wrapped anomaly detector."""
        return self.anomaly_detector.score_one(*args, **kwargs)

    def learn_one(self, *args, **learn_kwargs) -> None:
        """Update the anomaly filter and the underlying anomaly detector.

        If protect_anomaly_detector is True and the observation is
        classified as normal, the wrapped detector is updated.
        If the observation is anomalous, the detector is NOT updated.
        """
        if self.protect_anomaly_detector and not self.classify(self.score_one(*args)):
            self.anomaly_detector.learn_one(*args, **learn_kwargs)

Import

from river import anomaly

# Using built-in detectors (they implement AnomalyDetector):
detector = anomaly.HalfSpaceTrees()
detector = anomaly.OneClassSVM()

# Using built-in filters (they implement AnomalyFilter):
filter_model = anomaly.ThresholdFilter(detector, threshold=0.9)
filter_model = anomaly.QuantileFilter(detector, q=0.95)

I/O Contract

AnomalyDetector

Method Inputs Output Description
learn_one x: dict (feature dict) None Updates internal model state.
score_one x: dict (feature dict) float Returns anomaly score. High = anomalous.

AnomalyFilter

Method Inputs Output Description
classify score: float bool True if anomalous, False if normal.
score_one x: dict (feature dict) float Delegates to wrapped detector.
learn_one x: dict (feature dict) None Conditionally updates wrapped detector.

Usage Examples

Canonical deployment pattern:

from river import anomaly, preprocessing, compose

# Build model
detector = compose.Pipeline(
    preprocessing.MinMaxScaler(),
    anomaly.HalfSpaceTrees(seed=42)
)

# Score-classify-alert-learn loop
for x in stream_of_observations:
    # 1. Score the observation
    score = detector.score_one(x)

    # 2. Classify (manual threshold)
    is_anomaly = score > 0.9

    # 3. Alert
    if is_anomaly:
        print(f"ALERT: anomaly detected with score {score:.3f}")

    # 4. Learn
    detector.learn_one(x)

Using AnomalyFilter for the same pattern:

from river import anomaly, preprocessing, compose

# Build model with filter
model = anomaly.QuantileFilter(
    compose.Pipeline(
        preprocessing.MinMaxScaler(),
        anomaly.HalfSpaceTrees(seed=42)
    ),
    q=0.95
)

for x in stream_of_observations:
    score = model.score_one(x)
    is_anomaly = model.classify(score)

    if is_anomaly:
        print(f"ALERT: anomaly detected with score {score:.3f}")

    # learn_one handles protection automatically
    model.learn_one(x)

Implementing a custom AnomalyDetector:

from river import anomaly

class MyDetector(anomaly.base.AnomalyDetector):
    def learn_one(self, x: dict) -> None:
        # Update internal state
        ...

    def score_one(self, x: dict) -> float:
        # Return anomaly score (high = anomalous)
        ...
        return score

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment