Implementation:Online ml River Stream Cache

From Leeroopedia


Knowledge Sources
Domains Online_Learning, Data_Streaming, Caching
Last Updated 2026-02-08 16:00 GMT

Overview

A utility class for caching data streams to disk using Python's pickle protocol for faster subsequent iterations.

Description

The Cache class saves an iterable to disk and loads it back on later passes. Stream elements are serialized with pickle, so later iterations read the stored elements instead of recomputing transformations or re-parsing the source. The benefit grows with the amount of work the original stream does per element. When no directory is given, the cache picks a temporary directory based on the operating system: /tmp on Linux and macOS, C:\TEMP on Windows.

Usage

Use this when you iterate over the same data stream more than once and want to avoid recomputing transformations or reloading from slower sources such as CSV files. It is particularly useful during model development and experimentation, where the same dataset is replayed across many runs.
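To make the CSV case concrete, here is a minimal standard-library sketch of the underlying idea, not River's implementation. The CSV payload, the `parse_rows` helper, and the in-memory buffer standing in for a cache file are all assumptions made for illustration.

```python
import csv
import io
import pickle

# Hypothetical CSV payload; in practice this would be a file on disk.
CSV_TEXT = "age,income,label\n31,42000.5,1\n47,58100.0,0\n"


def parse_rows(text):
    """Parse CSV text into (features, label) pairs, converting every field."""
    for row in csv.DictReader(io.StringIO(text)):
        label = row["label"] == "1"
        yield {"age": int(row["age"]), "income": float(row["income"])}, label


# First pass: pay the parsing cost once and keep the typed records as bytes.
buffer = io.BytesIO()  # stands in for a cache file on disk
parsed = []
for record in parse_rows(CSV_TEXT):
    pickle.dump(record, buffer)
    parsed.append(record)

# Later passes: read the typed records straight back, with no CSV parsing.
buffer.seek(0)
replayed = []
while True:
    try:
        replayed.append(pickle.load(buffer))
    except EOFError:  # raised once every stored record has been read
        break
```

The replayed records come back with their Python types intact (ints, floats, booleans), which is why a pickled copy can be cheaper to re-read than the original text source.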

Code Reference

Source Location

Signature

class Cache:
    def __init__(self, directory=None):
        ...

    def __call__(self, stream, key=None):
        ...

    def __getitem__(self, key):
        ...

    def clear(self, key: str):
        ...

    def clear_all(self):
        ...

Import

from river import stream
cache = stream.Cache()

I/O Contract

Parameter | Type | Description
directory | str or None | Directory in which cached files are stored. If None, a default is chosen from the operating system (/tmp on Linux and macOS, C:\TEMP on Windows).
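The directory fallback can be sketched as follows. This is an illustration based only on the defaults listed on this page; the function name `resolve_cache_directory`, its `system` argument, and the choice to raise `ValueError` for other platforms are assumptions of the sketch, not River's confirmed behavior.

```python
import platform

# Defaults taken from the description on this page; other systems have none.
DEFAULT_DIRECTORIES = {"Linux": "/tmp", "Darwin": "/tmp", "Windows": "C:\\TEMP"}


def resolve_cache_directory(directory=None, system=None):
    """Return `directory` if given, else the default for the operating system."""
    if directory is not None:
        return directory
    # platform.system() reports "Darwin" on macOS.
    system = system or platform.system()
    if system not in DEFAULT_DIRECTORIES:
        # Assumption of this sketch: fail loudly rather than guess a location.
        raise ValueError(f"No default cache directory for {system!r}; pass one explicitly")
    return DEFAULT_DIRECTORIES[system]
```

Passing an explicit directory sidesteps the platform lookup entirely, which is the safer choice on systems outside the three listed above or when the cache should survive a cleanup of the temporary directory.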

Methods:

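Method | Parameters | Returns | Description
__call__ | stream, key=None | Iterator | Wraps a stream; the first pass writes its elements to disk under key, later passes read them back
__getitem__ | key | Iterator | Iterates over the cached stream stored under key
clear | key: str | None | Deletes the cached stream stored under key
clear_all | (none) | None | Deletes all cached streams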
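The method contract above can be mimicked with a small stand-in class. `MiniCache` is hypothetical: it follows the table's method names, but the file naming scheme, the required `key` argument, and the internals are assumptions and should not be read as River's source.

```python
import os
import pickle
import tempfile


class MiniCache:
    """Hypothetical stand-in that mirrors the method table, not River's code."""

    def __init__(self, directory):
        self.directory = directory
        self.keys = set()

    def _path(self, key):
        # Assumed naming scheme: one pickle file per key.
        return os.path.join(self.directory, f"{key}.pkl")

    def __call__(self, stream, key):
        path = self._path(key)
        if os.path.exists(path):
            # Cache hit: ignore `stream` and replay the stored elements.
            yield from self[key]
            return
        # Cache miss: store each element as it is consumed.
        with open(path, "wb") as f:
            for element in stream:
                pickle.dump(element, f)
                yield element
        self.keys.add(key)

    def __getitem__(self, key):
        with open(self._path(key), "rb") as f:
            while True:
                try:
                    yield pickle.load(f)
                except EOFError:  # no more stored elements
                    return

    def clear(self, key):
        os.remove(self._path(key))
        self.keys.discard(key)

    def clear_all(self):
        for key in list(self.keys):
            self.clear(key)


with tempfile.TemporaryDirectory() as tmp_dir:
    cache = MiniCache(tmp_dir)
    first = list(cache(iter(range(3)), key="numbers"))  # miss: writes to disk
    second = list(cache(iter([]), key="numbers"))       # hit: the empty stream is ignored
    by_key = list(cache["numbers"])
    cache.clear_all()
    remaining = os.listdir(tmp_dir)
```

In this sketch the file is written lazily, as the wrapped iterator is consumed, so abandoning the first pass early would leave a partial file behind. Whether the real class behaves the same way is worth checking before relying on it.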

Usage Examples

import time
from river import datasets, stream

# Create a cache instance
cache = stream.Cache()

# Cache a dataset - first time caches to disk
dataset = datasets.Phishing()
tic = time.perf_counter()  # monotonic clock, better suited to timing intervals
for x, y in cache(dataset, key='phishing'):
    pass
toc = time.perf_counter()
print(f"First iteration: {toc - tic:.4f}s")

# Second iteration is faster - reads from cache
tic = time.perf_counter()
for x, y in cache(dataset, key='phishing'):
    pass
toc = time.perf_counter()
print(f"Second iteration: {toc - tic:.4f}s")

# View cache contents
print(cache)

# Clear specific cache
cache.clear('phishing')

# Or clear all
cache.clear_all()
