Implementation:Evidentlyai Evidently Legacy Collector Storage

From Leeroopedia
Knowledge Sources
Domains Collector, Storage, Data_Monitoring, Legacy
Last Updated 2026-02-14 12:00 GMT

Overview

legacy/collector/storage.py defines the abstract CollectorStorage base class and its in-memory implementation, InMemoryStorage. Together they buffer collected data, manage per-collector asyncio locks, store generated reports, and track log events for the collector service.

Description

This module provides the storage layer for the collector service:

Event Models:

  • LogEvent -- Pydantic model for log entries with type, report_id, ok flag, and optional error message.
  • CreateReportEvent -- Specialized LogEvent for report creation events (type defaults to "CreateReport").
  • UploadReportEvent -- Specialized LogEvent for report upload events (type defaults to "UploadReport").

Report Management:

  • ReportPopper -- A context manager wrapping one ReportBase taken from a collector's report list. Entering it returns the report. On a clean exit the report stays consumed; if the block raises, the report is re-inserted at the front of the list so it is retried first.
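A stdlib-only sketch of the retry-on-failure flow, assuming that take_reports removes reports from the pending list before handing out poppers and that the popper lets the exception propagate. ReportPopperSketch, take_reports_sketch, and upload are hypothetical names used for illustration, and strings stand in for ReportBase objects.

```python
from typing import Generic, List, Sequence, TypeVar

T = TypeVar("T")


class ReportPopperSketch(Generic[T]):
    """Hands out one report and re-queues it at the front of the list on failure."""

    def __init__(self, value: T, snapshot_list: List[T]) -> None:
        self.value = value
        self.snapshot_list = snapshot_list  # the shared pending-report list

    def __enter__(self) -> T:
        return self.value

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_type is not None:
            self.snapshot_list.insert(0, self.value)  # put back for retry
        return False  # let the exception propagate


def take_reports_sketch(pending: List[T]) -> Sequence[ReportPopperSketch[T]]:
    # Assumption: reports are removed up front, so a successful `with`
    # block simply never puts them back.
    taken = list(pending)
    pending.clear()
    return [ReportPopperSketch(report, pending) for report in taken]


def upload(report: str) -> None:
    if report == "r2":
        raise ConnectionError("upload failed")  # simulated failure


pending = ["r1", "r2", "r3"]
uploaded = []
for popper in take_reports_sketch(pending):
    try:
        with popper as report:
            upload(report)
            uploaded.append(report)
    except ConnectionError:
        pass  # the failed report is back in `pending`

print(uploaded, pending)  # ['r1', 'r3'] ['r2']
```

Because the failed report goes back to the front of the list, the next take_reports call would retry it before any newer reports.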

Abstract Storage:

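  • CollectorStorage -- Abstract base class (a PolymorphicModel subclass) providing:
    • init(id) / lock(id) -- init(id) registers an asyncio lock for a collector ID; lock(id) returns that lock so work on one collector can be serialized.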
    • init_all(config) -- Initializes locks for all collectors in a config.
    • Abstract methods: append, get_buffer_size, get_and_flush, log, get_logs, add_report, take_reports.
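The per-collector locks keep concurrent operations on the same collector (for example, an append arriving while a flush is running) from interleaving. A self-contained sketch of that idea follows; LockRegistrySketch is hypothetical, and its init_all takes a plain list of IDs instead of the real config object.

```python
import asyncio
from typing import Dict, Iterable, List


class LockRegistrySketch:
    """Per-collector asyncio locks, modeled on lock(id) / init(id) / init_all(config)."""

    def __init__(self) -> None:
        self._locks: Dict[str, asyncio.Lock] = {}

    def init(self, id: str) -> None:
        self._locks[id] = asyncio.Lock()

    def init_all(self, collector_ids: Iterable[str]) -> None:
        # Assumption: takes IDs directly; the real init_all receives a config object.
        for collector_id in collector_ids:
            self.init(collector_id)

    def lock(self, id: str) -> asyncio.Lock:
        return self._locks[id]


async def main() -> List[str]:
    registry = LockRegistrySketch()
    registry.init_all(["c1", "c2"])
    trace: List[str] = []

    async def flush(name: str) -> None:
        async with registry.lock("c1"):  # only one task works on c1 at a time
            trace.append(f"{name}:start")
            await asyncio.sleep(0.01)  # simulated slow flush or upload
            trace.append(f"{name}:end")

    await asyncio.gather(flush("a"), flush("b"))
    return trace


trace = asyncio.run(main())
print(trace)  # ['a:start', 'a:end', 'b:start', 'b:end']
```

Without the lock, the two coroutines would interleave (a:start, b:start, a:end, b:end).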

In-Memory Implementation:

  • InMemoryStorage -- Concrete implementation storing everything in dictionaries:
    • append -- Appends data dicts to a per-collector buffer list.
    • get_buffer_size -- Returns the number of buffered items.
    • get_and_flush -- Concatenates buffered data into a single pd.DataFrame and clears the buffer. Returns None if empty.
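    • log -- Inserts each event at the front of the collector's log list (newest first), then trims the list to max_log_events entries.
    • add_report / take_reports -- add_report stores a report; take_reports returns a sequence of ReportPopper context managers, one per stored report.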
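The buffer and log behavior described above can be sketched with the standard library alone. InMemoryBufferSketch is a hypothetical stand-in: it uses plain dicts of column lists instead of pandas and strings instead of LogEvent objects, whereas the real get_and_flush returns a pd.DataFrame.

```python
from typing import Any, Dict, List, Optional

Chunk = Dict[str, List[Any]]  # one appended payload: column name -> values


class InMemoryBufferSketch:
    """Stdlib stand-in for InMemoryStorage's buffer and log handling."""

    def __init__(self, max_log_events: int = 10) -> None:
        self.max_log_events = max_log_events
        self._buffers: Dict[str, List[Chunk]] = {}
        self._logs: Dict[str, List[str]] = {}

    def init(self, id: str) -> None:
        self._buffers[id] = []
        self._logs[id] = []

    def append(self, id: str, data: Chunk) -> None:
        self._buffers[id].append(data)

    def get_buffer_size(self, id: str) -> int:
        return len(self._buffers[id])  # number of appended payloads, not rows

    def get_and_flush(self, id: str) -> Optional[Chunk]:
        buffer = self._buffers.get(id)
        if not buffer:
            return None  # nothing buffered
        # Column-wise concatenation (assumes every payload has the same columns);
        # the real class concatenates into a pd.DataFrame instead.
        merged: Chunk = {}
        for chunk in buffer:
            for column, values in chunk.items():
                merged.setdefault(column, []).extend(values)
        buffer.clear()
        return merged

    def log(self, id: str, event: str) -> None:
        logs = self._logs[id]
        logs.insert(0, event)  # newest first
        del logs[self.max_log_events:]  # drop everything beyond the limit

    def get_logs(self, id: str) -> List[str]:
        return self._logs.get(id, [])


storage = InMemoryBufferSketch(max_log_events=2)
storage.init("c1")
storage.append("c1", {"col1": [1, 2], "col2": [3, 4]})
storage.append("c1", {"col1": [5, 6], "col2": [7, 8]})
print(storage.get_buffer_size("c1"))  # 2
frame = storage.get_and_flush("c1")
print(frame)  # {'col1': [1, 2, 5, 6], 'col2': [3, 4, 7, 8]}
print(storage.get_and_flush("c1"))  # None
for event in ["e1", "e2", "e3"]:
    storage.log("c1", event)
print(storage.get_logs("c1"))  # ['e3', 'e2']
```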

Usage

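CollectorStorage is used by the collector app to buffer incoming data and manage the report lifecycle. The default InMemoryStorage is suitable for development and single-process deployments: it keeps all state in process memory, so buffered data, logs, and pending reports are not shared between worker processes and are lost on restart. For production deployments with multiple workers, implement a custom CollectorStorage subclass backed by persistent storage.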
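The main job of a persistent backend is to move the buffer out of process memory. The sketch below is hypothetical and not part of Evidently: it implements only the three buffer methods against SQLite from the Python standard library and returns a list of dicts rather than a pd.DataFrame. A real backend would subclass CollectorStorage and implement all of its abstract methods, including logs and reports.

```python
import json
import sqlite3
from typing import Any, Dict, List, Optional


class SqliteBufferSketch:
    """Hypothetical persistent buffer (not part of Evidently).

    Rows live in SQLite, so worker processes that open the same database
    file share one buffer per collector ID.
    """

    def __init__(self, path: str) -> None:
        # isolation_level=None: autocommit; transactions are opened explicitly.
        self._conn = sqlite3.connect(path, isolation_level=None)
        self._conn.execute(
            "CREATE TABLE IF NOT EXISTS buffer ("
            "seq INTEGER PRIMARY KEY AUTOINCREMENT, "
            "collector_id TEXT NOT NULL, "
            "payload TEXT NOT NULL)"
        )

    def append(self, id: str, data: Dict[str, Any]) -> None:
        # A single INSERT commits on its own in autocommit mode.
        self._conn.execute(
            "INSERT INTO buffer (collector_id, payload) VALUES (?, ?)",
            (id, json.dumps(data)),
        )

    def get_buffer_size(self, id: str) -> int:
        (count,) = self._conn.execute(
            "SELECT COUNT(*) FROM buffer WHERE collector_id = ?", (id,)
        ).fetchone()
        return count

    def get_and_flush(self, id: str) -> Optional[List[Dict[str, Any]]]:
        # BEGIN IMMEDIATE takes the write lock before reading, so two
        # workers cannot flush the same rows.
        self._conn.execute("BEGIN IMMEDIATE")
        try:
            rows = self._conn.execute(
                "SELECT payload FROM buffer WHERE collector_id = ? ORDER BY seq",
                (id,),
            ).fetchall()
            self._conn.execute("DELETE FROM buffer WHERE collector_id = ?", (id,))
            self._conn.execute("COMMIT")
        except Exception:
            self._conn.execute("ROLLBACK")
            raise
        if not rows:
            return None
        # Decoded dicts keep the sketch stdlib-only (the real method returns a DataFrame).
        return [json.loads(payload) for (payload,) in rows]


buf = SqliteBufferSketch(":memory:")  # use a file path to share across processes
buf.append("c1", {"col1": [1, 2]})
buf.append("c1", {"col1": [3, 4]})
buf.append("c2", {"col1": [9]})
print(buf.get_buffer_size("c1"))  # 2
flushed = buf.get_and_flush("c1")
print(flushed)  # [{'col1': [1, 2]}, {'col1': [3, 4]}]
print(buf.get_buffer_size("c1"), buf.get_buffer_size("c2"))  # 0 1
```

The asyncio locks provided by CollectorStorage only coordinate tasks within one process, which is why this sketch relies on SQLite's own locking for safety across workers.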

Code Reference

Source Location

legacy/collector/storage.py (importable as evidently.legacy.collector.storage)

Signature

class LogEvent(BaseModel):
    type: str
    report_id: str
    ok: bool
    error: str = ""

class CreateReportEvent(LogEvent):
    type: str = "CreateReport"

class UploadReportEvent(LogEvent):
    type: str = "UploadReport"

class ReportPopper:
    def __init__(self, value: ReportBase, snapshot_list: List[ReportBase]): ...
    def __enter__(self) -> ReportBase: ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class CollectorStorage(PolymorphicModel):
    def lock(self, id: str): ...
    def init(self, id: str): ...
    def init_all(self, config): ...
    @abc.abstractmethod
    def append(self, id: str, data: Any): ...
    @abc.abstractmethod
    def get_buffer_size(self, id: str): ...
    @abc.abstractmethod
    def get_and_flush(self, id: str): ...
    @abc.abstractmethod
    def log(self, id: str, event: LogEvent): ...
    @abc.abstractmethod
    def get_logs(self, id: str) -> List[LogEvent]: ...
    @abc.abstractmethod
    def add_report(self, id: str, report: ReportBase): ...
    @abc.abstractmethod
    def take_reports(self, id: str) -> Sequence[ReportPopper]: ...

class InMemoryStorage(CollectorStorage):
    max_log_events: int = 10
    def init(self, id: str): ...
    def append(self, id: str, data: Any): ...
    def get_buffer_size(self, id: str): ...
    def get_and_flush(self, id: str): ...
    def log(self, id: str, event: LogEvent): ...
    def get_logs(self, id: str) -> List[LogEvent]: ...
    def add_report(self, id: str, report: ReportBase): ...
    def take_reports(self, id: str) -> Sequence[ReportPopper]: ...

Import

from evidently.legacy.collector.storage import (
    CollectorStorage,
    InMemoryStorage,
    LogEvent,
    CreateReportEvent,
    UploadReportEvent,
    ReportPopper,
)

I/O Contract

Inputs

  • id (str, required) -- Collector identifier used as the storage key.
  • data (Any, required by append) -- Data dictionary to append to the collector buffer.
  • event (LogEvent, required by log) -- Log event to record for the collector.
  • report (ReportBase, required by add_report) -- Generated report to store for later upload.
  • max_log_events (int, optional, default 10) -- InMemoryStorage field: maximum number of log events retained per collector.

Outputs

  • get_buffer_size -> int -- Number of data items buffered for the given collector.
  • get_and_flush -> Optional[pd.DataFrame] -- All buffered data concatenated into one DataFrame, or None if the buffer is empty.
  • get_logs -> List[LogEvent] -- Log events for the given collector, newest first.
  • take_reports -> Sequence[ReportPopper] -- One context manager per pending report, each yielding a report for upload.

Usage Examples

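from evidently.legacy.collector.storage import CreateReportEvent, InMemoryStorage

storage = InMemoryStorage(max_log_events=20)
storage.init("my_collector")  # prepare storage for this collector ID

# Buffer data: each call stores one dict of column -> values
storage.append("my_collector", {"col1": [1, 2], "col2": [3, 4]})
storage.append("my_collector", {"col1": [5, 6], "col2": [7, 8]})

# Check buffer size
size = storage.get_buffer_size("my_collector")  # returns 2

# Flush the buffer into a single DataFrame; the buffer is emptied
df = storage.get_and_flush("my_collector")
storage.get_buffer_size("my_collector")  # returns 0
storage.get_and_flush("my_collector")  # returns None (nothing buffered)

# Record an event; get_logs returns events newest first
storage.log("my_collector", CreateReportEvent(report_id="report-1", ok=True))
logs = storage.get_logs("my_collector")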
