Implementation:Online ml River Stream Cache
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Data_Streaming, Caching |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
A utility class for caching data streams to disk using Python's pickle protocol for faster subsequent iterations.
Description
The Cache class saves an iterable to disk the first time it is consumed and reads it back on later passes. Stream elements are serialized with pickle, which can make re-iteration noticeably faster when the original stream is expensive to produce, for example when it is parsed from a CSV file or passes through several transformations. When no directory is given, the cache picks a default temporary directory based on the operating system (/tmp on Linux and macOS, C:\TEMP on Windows).
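The pickle-based round trip described above can be sketched with the standard library alone. This is a minimal illustration, not the code in river/stream/cache.py: the helper names `write_through` and `read_back`, the file name, and the toy samples are all hypothetical.

```python
import os
import pickle
import tempfile


def write_through(stream, path):
    """Yield each element of `stream` while also pickling it to `path`."""
    with open(path, "wb") as f:
        pickler = pickle.Pickler(f)
        for element in stream:
            pickler.dump(element)
            yield element


def read_back(path):
    """Yield the elements stored by `write_through`, in the same order."""
    with open(path, "rb") as f:
        unpickler = pickle.Unpickler(f)
        while True:
            try:
                yield unpickler.load()
            except EOFError:
                # End of file: every pickled element has been read.
                return


# Toy (features, label) pairs standing in for a real dataset.
samples = [({"x": i, "x_squared": i * i}, i % 2 == 0) for i in range(5)]

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "toy.pkl")
    first_pass = list(write_through(iter(samples), path))   # consumes the source
    second_pass = list(read_back(path))                     # reads only the file
```

The second pass never touches the original stream, which is where the time saving comes from when the source is slow.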
Usage
Use this when you need to repeatedly iterate over the same data stream and want to avoid recomputing transformations or reloading from slower sources like CSV files. It's particularly useful during model development and experimentation when you iterate over the same dataset multiple times.
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/stream/cache.py
Signature
class Cache:
    def __init__(self, directory=None):
        ...

    def __call__(self, stream, key=None):
        ...

    def __getitem__(self, key):
        ...

    def clear(self, key: str):
        ...

    def clear_all(self):
        ...
Import
from river import stream
cache = stream.Cache()
I/O Contract
| Parameter | Type | Description |
|---|---|---|
| directory | str or None | Directory path for storing cached files. Auto-detected if None. |
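As a rough illustration of how a `directory=None` default could be resolved, the sketch below maps the operating system name to the directories listed in the description. The function name `resolve_directory`, its `system` argument, and the error raised for an unknown system are assumptions made for this example; they are not taken from the library.

```python
import platform

# Defaults taken from the description; the lookup itself is a sketch.
DEFAULT_DIRECTORIES = {
    "Linux": "/tmp",
    "Darwin": "/tmp",  # platform.system() reports macOS as "Darwin"
    "Windows": "C:\\TEMP",
}


def resolve_directory(directory=None, system=None):
    """Return `directory` if given, else the default for the operating system."""
    if directory is not None:
        return directory
    if system is None:
        system = platform.system()
    if system not in DEFAULT_DIRECTORIES:
        # Assumption: fail loudly rather than guess a location.
        raise ValueError(f"No default cache directory for {system!r}; pass one explicitly")
    return DEFAULT_DIRECTORIES[system]


explicit = resolve_directory("/data/river_cache")     # caller's choice wins
mac_default = resolve_directory(system="Darwin")      # "/tmp"
windows_default = resolve_directory(system="Windows") # "C:\\TEMP"
```

Passing an explicit directory is the safer choice when the cache should outlive a reboot, since temporary directories may be cleaned by the system.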
Methods:
| Method | Parameters | Returns | Description |
|---|---|---|---|
| __call__ | stream, key=None | Iterator | Wraps a stream; the first pass writes its elements to disk, later calls with the same key read from the cache |
| __getitem__ | key | Iterator | Iterates over the cached stream stored under key |
| clear | key: str | None | Deletes the cached stream stored under key |
| clear_all | (none) | None | Deletes all cached streams |
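To make the contract in the table concrete, here is a toy stand-in with the same four methods. It is a sketch under stated assumptions, not river's implementation: `ToyCache`, its `keys` set, the `.pkl` file naming, and the rule that a key is registered only after a complete pass are all choices made for this example.

```python
import os
import pickle
import tempfile


class ToyCache:
    """Hypothetical stand-in mirroring the method table; not river's code."""

    def __init__(self, directory):
        self.directory = directory
        self.keys = set()

    def _path(self, key):
        return os.path.join(self.directory, f"{key}.pkl")

    def __call__(self, stream, key):
        if key in self.keys:  # already cached: `stream` is never advanced
            yield from self[key]
            return
        with open(self._path(key), "wb") as f:
            pickler = pickle.Pickler(f)
            for element in stream:
                pickler.dump(element)
                yield element
        self.keys.add(key)  # registered only after a complete pass

    def __getitem__(self, key):
        with open(self._path(key), "rb") as f:
            unpickler = pickle.Unpickler(f)
            while True:
                try:
                    yield unpickler.load()
                except EOFError:
                    return

    def clear(self, key):
        os.remove(self._path(key))
        self.keys.remove(key)

    def clear_all(self):
        for key in list(self.keys):
            self.clear(key)


produced = []  # records every element the source actually generates


def source():
    for i in range(3):
        produced.append(i)
        yield {"value": i}, i


with tempfile.TemporaryDirectory() as tmp_dir:
    cache = ToyCache(tmp_dir)
    first = list(cache(source(), key="toy"))   # runs the source, writes to disk
    second = list(cache(source(), key="toy"))  # served from disk
    third = list(cache["toy"])                 # direct lookup by key
    cache.clear("toy")
    remaining = set(cache.keys)
```

The `produced` list ends up holding each index once, which shows that the second call replays the file instead of re-running the generator.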
Usage Examples
import time

from river import datasets, stream

# Create a cache instance
cache = stream.Cache()

# Cache a dataset - the first iteration writes it to disk
dataset = datasets.Phishing()
tic = time.time()
for x, y in cache(dataset, key='phishing'):
    pass
toc = time.time()
print(f"First iteration: {toc - tic:.4f}s")

# Second iteration is faster - it reads from the cache
tic = time.time()
for x, y in cache(dataset, key='phishing'):
    pass
toc = time.time()
print(f"Second iteration: {toc - tic:.4f}s")

# View cache contents
print(cache)

# Clear a specific cached stream
cache.clear('phishing')

# Or clear all cached streams
cache.clear_all()