Implementation:Evidentlyai Evidently Legacy Collector Storage
| Knowledge Sources | |
|---|---|
| Domains | Collector, Storage, Data_Monitoring, Legacy |
| Last Updated | 2026-02-14 12:00 GMT |
Overview
legacy/collector/storage.py defines the abstract CollectorStorage base class and the InMemoryStorage implementation for buffering collected data, managing async locks, storing reports, and tracking log events within the collector service.
Description
This module provides the storage layer for the collector service:
Event Models:
- `LogEvent` -- Pydantic model for log entries with `type`, `report_id`, an `ok` flag, and an optional `error` message.
- `CreateReportEvent` -- Specialized `LogEvent` for report creation events (`type` defaults to `"CreateReport"`).
- `UploadReportEvent` -- Specialized `LogEvent` for report upload events (`type` defaults to `"UploadReport"`).
Report Management:
- `ReportPopper` -- A context manager that yields a `ReportBase` taken from a report list. On successful exit the report stays consumed; if the `with` block raises, the report is re-inserted at the front of the list so it is retried first.
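The retry-on-failure behavior of `ReportPopper` can be sketched in plain Python. `Popper` and `take_all` below are hypothetical stand-ins, not Evidently APIs. The sketch assumes, as the description implies, that a report is removed from the list when it is handed out and put back at the front only if the `with` block raises.

```python
from typing import Generic, Iterator, List, TypeVar

T = TypeVar("T")


class Popper(Generic[T]):
    """Hands out one item; re-queues it at the front if the with-block raises."""

    def __init__(self, value: T, items: List[T]) -> None:
        self.value = value
        self.items = items

    def __enter__(self) -> T:
        return self.value

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        if exc_type is not None:
            # Processing failed: put the item back so it is retried first
            self.items.insert(0, self.value)
        return False  # never swallow the exception


def take_all(items: List[T]) -> Iterator[Popper[T]]:
    # Remove each item before handing it out; success leaves it removed
    while items:
        yield Popper(items.pop(0), items)


queue = ["report-1", "report-2", "report-3"]
uploaded: List[str] = []
try:
    for popper in take_all(queue):
        with popper as report:
            if report == "report-2":
                raise ConnectionError("upload failed")  # simulated upload failure
            uploaded.append(report)
except ConnectionError:
    pass

print(uploaded, queue)  # ['report-1'] ['report-2', 'report-3']
```

Because `__exit__` returns `False` in this sketch, the exception still propagates: the caller stops at the first failure, and the failed report is the first one taken on the next attempt.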
Abstract Storage:
- `CollectorStorage` -- Abstract polymorphic base class providing:
  - `lock(id)` / `init(id)` -- asyncio lock management per collector ID.
  - `init_all(config)` -- Initializes locks for all collectors in a config.
  - Abstract methods: `append`, `get_buffer_size`, `get_and_flush`, `log`, `get_logs`, `add_report`, `take_reports`.
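The lock helpers keep one `asyncio.Lock` per collector ID. The sketch below mirrors that idea with a hypothetical `LockRegistry` class. It uses only the stdlib, and as a simplification `init_all` takes a plain list of IDs where the real method receives a collector config. It shows two coroutines serializing work on the same collector with `async with`; how the collector app actually acquires the lock is an assumption here.

```python
import asyncio
from typing import Dict, Iterable, List


class LockRegistry:
    """Hypothetical stand-in: one asyncio.Lock per collector ID."""

    def __init__(self) -> None:
        self._locks: Dict[str, asyncio.Lock] = {}

    def init(self, id: str) -> None:
        self._locks[id] = asyncio.Lock()

    def init_all(self, ids: Iterable[str]) -> None:
        # Simplification: the real init_all receives a collector config
        for collector_id in ids:
            self.init(collector_id)

    def lock(self, id: str) -> asyncio.Lock:
        return self._locks[id]


async def main() -> List[str]:
    registry = LockRegistry()
    registry.init_all(["collector_a", "collector_b"])  # locks created inside the running loop
    events: List[str] = []

    async def flush(collector_id: str, tag: str) -> None:
        # Only one coroutine at a time may hold a given collector's lock
        async with registry.lock(collector_id):
            events.append(f"{tag}:start")
            await asyncio.sleep(0.01)  # simulated I/O while holding the lock
            events.append(f"{tag}:end")

    await asyncio.gather(flush("collector_a", "a1"), flush("collector_a", "a2"))
    return events


events = asyncio.run(main())
print(events)  # ['a1:start', 'a1:end', 'a2:start', 'a2:end']
```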
In-Memory Implementation:
- `InMemoryStorage` -- Concrete implementation that stores everything in dictionaries:
  - `append` -- Appends data dicts to a per-collector buffer list.
  - `get_buffer_size` -- Returns the number of buffered items.
  - `get_and_flush` -- Concatenates the buffered data into a single `pd.DataFrame` and clears the buffer. Returns `None` if the buffer is empty.
  - `log` -- Inserts log events at the front of the collector's log list and trims it to `max_log_events`.
  - `get_logs` -- Returns the collector's log events, newest first.
  - `add_report` / `take_reports` -- Stores reports and yields them via `ReportPopper` context managers.
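The buffer and log semantics above can be checked with a small stdlib mirror. `MiniInMemoryStorage` is hypothetical and differs from the real class in one labeled way: `get_and_flush` returns the raw list of buffered dicts instead of concatenating them into a `pd.DataFrame`. It shows newest-first logging with trimming to `max_log_events`.

```python
from typing import Any, Dict, List, Optional


class MiniInMemoryStorage:
    """Hypothetical stdlib mirror of InMemoryStorage's buffer and log behavior."""

    def __init__(self, max_log_events: int = 10) -> None:
        self.max_log_events = max_log_events
        self._buffers: Dict[str, List[Any]] = {}
        self._logs: Dict[str, List[Any]] = {}

    def init(self, id: str) -> None:
        # Create empty per-collector containers
        self._buffers[id] = []
        self._logs[id] = []

    def append(self, id: str, data: Any) -> None:
        self._buffers[id].append(data)

    def get_buffer_size(self, id: str) -> int:
        # Counts appended items, not the rows inside them
        return len(self._buffers[id])

    def get_and_flush(self, id: str) -> Optional[List[Any]]:
        buffer = self._buffers.get(id)
        if not buffer:
            return None  # nothing buffered
        flushed = list(buffer)  # the real class concatenates into a pd.DataFrame here
        buffer.clear()
        return flushed

    def log(self, id: str, event: Any) -> None:
        logs = self._logs[id]
        logs.insert(0, event)  # newest first
        del logs[self.max_log_events:]  # keep at most max_log_events entries

    def get_logs(self, id: str) -> List[Any]:
        return list(self._logs.get(id, []))


store = MiniInMemoryStorage(max_log_events=3)
store.init("c1")
store.append("c1", {"col1": [1, 2]})
store.append("c1", {"col1": [3]})
for name in ["e1", "e2", "e3", "e4", "e5"]:
    store.log("c1", name)
print(store.get_buffer_size("c1"))  # 2
print(store.get_logs("c1"))  # ['e5', 'e4', 'e3']
```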
Usage
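`CollectorStorage` is used by the collector app to buffer incoming data and manage the report lifecycle. The default `InMemoryStorage` keeps all state in process memory, so buffered data, logs, and pending reports are lost on restart and are not shared between worker processes. This makes it suitable for development and single-process deployments. For production with multiple workers, implement a custom `CollectorStorage` subclass backed by persistent storage.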
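As a rough illustration of a persistent backend, the sketch below stores the buffer in SQLite using only the standard library. `BufferBackend` and `SQLiteBufferBackend` are hypothetical names. A real backend would subclass `CollectorStorage` (a Pydantic `PolymorphicModel`) and implement all seven abstract methods, and its `get_and_flush` would return a `pd.DataFrame`. This sketch covers only the three buffer methods and returns plain dicts. Note also that the inherited per-ID `asyncio` locks only coordinate coroutines within one event loop, so they do not serialize access across workers; in this sketch, two workers flushing the same collector at the same moment could still read the same rows.

```python
import abc
import json
import sqlite3
from typing import Any, Dict, List, Optional


class BufferBackend(abc.ABC):
    """Hypothetical subset of the CollectorStorage interface (buffer methods only)."""

    @abc.abstractmethod
    def append(self, id: str, data: Any) -> None: ...

    @abc.abstractmethod
    def get_buffer_size(self, id: str) -> int: ...

    @abc.abstractmethod
    def get_and_flush(self, id: str) -> Optional[List[Dict[str, Any]]]: ...


class SQLiteBufferBackend(BufferBackend):
    """Stores each appended dict as a JSON row so the buffer can outlive the process."""

    def __init__(self, path: str = ":memory:") -> None:
        self._conn = sqlite3.connect(path)
        with self._conn:
            self._conn.execute(
                "CREATE TABLE IF NOT EXISTS buffer ("
                "seq INTEGER PRIMARY KEY AUTOINCREMENT, "
                "collector_id TEXT NOT NULL, "
                "payload TEXT NOT NULL)"
            )

    def append(self, id: str, data: Any) -> None:
        with self._conn:  # commits on success, rolls back on error
            self._conn.execute(
                "INSERT INTO buffer (collector_id, payload) VALUES (?, ?)",
                (id, json.dumps(data)),
            )

    def get_buffer_size(self, id: str) -> int:
        (count,) = self._conn.execute(
            "SELECT COUNT(*) FROM buffer WHERE collector_id = ?", (id,)
        ).fetchone()
        return count

    def get_and_flush(self, id: str) -> Optional[List[Dict[str, Any]]]:
        rows = self._conn.execute(
            "SELECT seq, payload FROM buffer WHERE collector_id = ? ORDER BY seq",
            (id,),
        ).fetchall()
        if not rows:
            return None
        with self._conn:
            # Delete only the rows that were read, so rows appended meanwhile survive
            self._conn.execute(
                "DELETE FROM buffer WHERE collector_id = ? AND seq <= ?",
                (id, rows[-1][0]),
            )
        return [json.loads(payload) for _, payload in rows]


backend = SQLiteBufferBackend()  # pass a file path instead of ":memory:" to persist
backend.append("my_collector", {"col1": [1, 2]})
backend.append("my_collector", {"col1": [3, 4]})
print(backend.get_buffer_size("my_collector"))  # 2
```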
Code Reference
Source Location
- Repository: Evidentlyai_Evidently
- File: src/evidently/legacy/collector/storage.py
Signature
```python
import abc
from typing import Any, List, Sequence

# BaseModel, PolymorphicModel and ReportBase come from Evidently internals (import paths omitted).

class LogEvent(BaseModel):
    type: str
    report_id: str
    ok: bool
    error: str = ""

class CreateReportEvent(LogEvent):
    type: str = "CreateReport"

class UploadReportEvent(LogEvent):
    type: str = "UploadReport"

class ReportPopper:
    def __init__(self, value: ReportBase, snapshot_list: List[ReportBase]): ...
    def __enter__(self) -> ReportBase: ...
    # Re-inserts the report at the front of snapshot_list if the with-block raised
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class CollectorStorage(PolymorphicModel):
    # Concrete helpers: per-collector asyncio lock management
    def lock(self, id: str): ...
    def init(self, id: str): ...
    def init_all(self, config): ...

    # Storage operations every backend must implement
    @abc.abstractmethod
    def append(self, id: str, data: Any): ...
    @abc.abstractmethod
    def get_buffer_size(self, id: str): ...
    @abc.abstractmethod
    def get_and_flush(self, id: str): ...
    @abc.abstractmethod
    def log(self, id: str, event: LogEvent): ...
    @abc.abstractmethod
    def get_logs(self, id: str) -> List[LogEvent]: ...
    @abc.abstractmethod
    def add_report(self, id: str, report: ReportBase): ...
    @abc.abstractmethod
    def take_reports(self, id: str) -> Sequence[ReportPopper]: ...

class InMemoryStorage(CollectorStorage):
    max_log_events: int = 10

    def init(self, id: str): ...
    def append(self, id: str, data: Any): ...
    def get_buffer_size(self, id: str): ...
    def get_and_flush(self, id: str): ...
    def log(self, id: str, event: LogEvent): ...
    def get_logs(self, id: str) -> List[LogEvent]: ...
    def add_report(self, id: str, report: ReportBase): ...
    def take_reports(self, id: str) -> Sequence[ReportPopper]: ...
```
Import
```python
from evidently.legacy.collector.storage import (
    CollectorStorage,
    InMemoryStorage,
    LogEvent,
    CreateReportEvent,
    UploadReportEvent,
    ReportPopper,
)
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| id | str | Yes | Collector identifier used as the storage key. |
| data | Any | Yes (`append`) | Data dictionary to append to the collector buffer. |
| event | LogEvent | Yes (`log`) | Log event to record for the collector. |
| report | ReportBase | Yes (`add_report`) | Generated report to store for later upload. |
| max_log_events | int | No (default 10) | Maximum number of log events retained per collector (`InMemoryStorage` field). |
Outputs
| Name | Type | Description |
|---|---|---|
| `get_buffer_size` return | int | Number of data items in the buffer for the given collector. |
| `get_and_flush` return | Optional[pd.DataFrame] | Concatenated DataFrame of all buffered data, or `None` if the buffer is empty. |
| `get_logs` return | List[LogEvent] | Log events for the given collector, newest first. |
| `take_reports` return | Sequence[ReportPopper] | Context managers, each yielding one stored report for upload. |
Usage Examples
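```python
from evidently.legacy.collector.storage import CreateReportEvent, InMemoryStorage

storage = InMemoryStorage(max_log_events=20)
storage.init("my_collector")  # register the collector ID before using it

# Buffer data: each call appends one dict of columns
storage.append("my_collector", {"col1": [1, 2], "col2": [3, 4]})
storage.append("my_collector", {"col1": [5, 6], "col2": [7, 8]})

# Check buffer size (counts appended items, not rows)
size = storage.get_buffer_size("my_collector")  # returns 2

# Flush buffer as a single DataFrame; the buffer is emptied
df = storage.get_and_flush("my_collector")
nothing = storage.get_and_flush("my_collector")  # returns None: buffer is empty

# Record a log event; get_logs returns events newest first
storage.log("my_collector", CreateReportEvent(report_id="r1", ok=True))
logs = storage.get_logs("my_collector")
```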