Implementation:Online ml River Datasets Base

From Leeroopedia


Knowledge Sources
Domains: Online_Learning, Dataset_Management, Data_Streaming, API_Design
Last Updated: 2026-02-08 16:00 GMT

Overview

Base classes for River's dataset infrastructure supporting synthetic generation, local files, and remote dataset downloads.

Description

This module defines the foundational classes for River's dataset system: Dataset, SyntheticDataset, FileDataset, and RemoteDataset. These abstract base classes establish a consistent interface for accessing data in streaming fashion, regardless of the data source.

Dataset is the abstract base class that all datasets inherit from. It defines metadata fields (task type, number of features, samples, classes) and provides common functionality like the take method for limiting samples and the desc property for accessing documentation. The __iter__ method must be implemented by subclasses to provide streaming access.
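
The contract described above can be sketched without River installed. The snippet below is a minimal illustration, not River's actual code: the class names StreamBase and Counter are hypothetical, and it assumes that take can be expressed with itertools.islice and that desc is derived from the class docstring.

```python
import abc
import inspect
import itertools


class StreamBase(abc.ABC):
    """Hypothetical stand-in for a dataset base class (not River's code)."""

    def __init__(self, task, n_features, n_samples=None):
        self.task = task
        self.n_features = n_features
        self.n_samples = n_samples  # None signals an unbounded stream

    @abc.abstractmethod
    def __iter__(self):
        """Yield (x, y) pairs one at a time."""

    def take(self, k):
        # Lazily stop after k samples without materialising the stream.
        return itertools.islice(self, k)

    @property
    def desc(self):
        # Normalise the indentation of the subclass docstring.
        return inspect.cleandoc(self.__doc__ or "")


class Counter(StreamBase):
    """Toy stream.

    Yields an increasing counter forever.
    """

    def __init__(self):
        super().__init__(task="Regression", n_features=1)

    def __iter__(self):
        for i in itertools.count():
            yield {"i": i}, float(i)


first_three = list(Counter().take(3))
description = Counter().desc
```

Because take is lazy, it is safe to call on a stream that never ends; only the requested samples are generated.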

SyntheticDataset is for algorithmically generated datasets, which can produce infinite streams. Its subclasses include generators used for testing and benchmarking, such as Friedman regression and concept drift scenarios.
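
To make the idea of an infinite, seeded stream concrete, here is a small standalone generator. It assumes the commonly cited Friedman #1 formula with ten uniform features (the last five being pure noise) and unit Gaussian noise; the function name friedman_stream is hypothetical and River's own generator may differ in detail.

```python
import itertools
import math
import random


def friedman_stream(seed=None):
    """Endless (x, y) pairs following the Friedman #1 formula (illustrative)."""
    rng = random.Random(seed)  # private RNG so the stream is reproducible
    while True:
        x = {i: rng.uniform(0, 1) for i in range(10)}
        y = (
            10 * math.sin(math.pi * x[0] * x[1])
            + 20 * (x[2] - 0.5) ** 2
            + 10 * x[3]
            + 5 * x[4]
            + rng.gauss(0, 1)
        )
        yield x, y


# The generator never terminates, so it has to be bounded explicitly.
sample_a = list(itertools.islice(friedman_stream(seed=42), 5))
sample_b = list(itertools.islice(friedman_stream(seed=42), 5))
```

Using the same seed twice yields the same samples, which is what makes synthetic streams convenient for repeatable benchmarks.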

FileDataset handles locally stored datasets that ship with River or are stored on disk. It manages file paths relative to the datasets module location or a specified directory.
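
The path rule described above (use the given directory if there is one, otherwise fall back to the package's own folder) could look roughly like this. The helper resolve_path and the default module_dir value are hypothetical and only serve the illustration.

```python
import pathlib


def resolve_path(filename, directory=None, module_dir="/opt/pkg/datasets"):
    """Return where a data file is expected to live.

    module_dir is a made-up stand-in for the folder of the datasets module.
    """
    base = pathlib.PurePosixPath(directory if directory else module_dir)
    return base / filename


bundled = resolve_path("phishing.csv.gz")
custom = resolve_path("my_data.csv", directory="/path/to/data")
```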

RemoteDataset extends FileDataset for datasets that must be downloaded from a URL. It handles downloading, caching in a user data directory (controlled by the RIVER_DATA environment variable), unpacking compressed files (zip, tar.gz), and verifying a download by comparing the size on disk with the expected size.
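
The caching and size check can be approximated in a few lines. This is a sketch under stated assumptions, not River's implementation: the helper names data_home and looks_downloaded are hypothetical, and the fallback directory ~/river_data is assumed rather than confirmed by this page.

```python
import os
import pathlib
import tempfile


def data_home(environ=os.environ):
    """Resolve the cache directory, preferring the RIVER_DATA variable."""
    # Assumed default location; only RIVER_DATA itself comes from the text above.
    raw = environ.get("RIVER_DATA", os.path.join("~", "river_data"))
    return pathlib.Path(raw).expanduser()


def looks_downloaded(path, expected_size):
    """Treat a download as complete when its size on disk matches."""
    path = pathlib.Path(path)
    if not path.exists():
        return False
    if path.is_file():
        return path.stat().st_size == expected_size
    # For an unpacked archive, add up the sizes of all contained files.
    total = sum(f.stat().st_size for f in path.rglob("*") if f.is_file())
    return total == expected_size


with tempfile.TemporaryDirectory() as tmp:
    home = data_home({"RIVER_DATA": tmp})
    target = home / "toy.csv"
    target.write_bytes(b"a,b\n1,2\n")  # 8 bytes
    complete = looks_downloaded(target, expected_size=8)
    truncated = looks_downloaded(target, expected_size=9)
    missing = looks_downloaded(home / "absent.csv", expected_size=8)
```

A size comparison is cheap and catches interrupted downloads, but it does not detect corrupted content of the right length; a checksum would be needed for that.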

Usage

Use Dataset subclasses to access data in a streaming manner for online learning. SyntheticDataset is for generating test data, FileDataset for bundled datasets, and RemoteDataset for large datasets that need to be downloaded once and cached. All datasets support the standard iteration protocol for streaming access.
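
The usual way to consume such a stream in online learning is to predict on each sample before learning from it. The following standalone sketch shows that loop with a toy stream and a toy model; stream and RunningMean are hypothetical names, and the predict_one / learn_one methods simply mirror the naming used in the River examples further down.

```python
def stream():
    """Toy finite stream of (x, y) pairs."""
    for i in range(1, 6):
        yield {"x": float(i)}, 2.0 * i


class RunningMean:
    """Toy model that always predicts the mean of the targets seen so far."""

    def __init__(self):
        self.n = 0
        self.mean = 0.0

    def predict_one(self, x):
        return self.mean

    def learn_one(self, x, y):
        self.n += 1
        self.mean += (y - self.mean) / self.n


model = RunningMean()
abs_errors = []
for x, y in stream():
    # Predict first, then learn: every sample is used for evaluation
    # before the model has seen its label.
    abs_errors.append(abs(y - model.predict_one(x)))
    model.learn_one(x, y)
```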

Code Reference

Source Location

Signature

class Dataset(abc.ABC):
    def __init__(
        self,
        task,
        n_features,
        n_samples=None,
        n_classes=None,
        n_outputs=None,
        sparse=False,
    ):
        ...

class SyntheticDataset(Dataset):
    ...

class FileDataset(Dataset):
    def __init__(self, filename, directory=None, **desc):
        ...

class RemoteDataset(FileDataset):
    def __init__(self, url, size, unpack=True, filename=None, **desc):
        ...

Import

from river import datasets

I/O Contract

Dataset Parameters

| Parameter | Type | Description |
|---|---|---|
| task | str | Type of task: "Regression", "Binary classification", etc. |
| n_features | int | Number of features in the dataset |
| n_samples | int or None | Number of samples (None for infinite) |
| n_classes | int or None | Number of classes (classification only) |
| n_outputs | int or None | Number of outputs (multi-output only) |
| sparse | bool | Whether dataset is sparse |

FileDataset Parameters

| Parameter | Type | Description |
|---|---|---|
| filename | str | Name of the data file |
| directory | str or None | Directory containing file (default: datasets module) |
| **desc | dict | Additional dataset metadata |

RemoteDataset Parameters

| Parameter | Type | Description |
|---|---|---|
| url | str | URL where dataset is located |
| size | int | Expected download size in bytes |
| unpack | bool | Whether to unpack compressed file (default: True) |
| filename | str or None | Filename after unpacking (inferred if None) |
| **desc | dict | Additional dataset metadata |

Key Methods

| Method | Parameters | Return Type | Description |
|---|---|---|---|
| __iter__() | None | Iterator | Streams dataset samples |
| take(k) | k: int | Iterator | Returns first k samples |
| download(force, verbose) | force: bool, verbose: bool | None | Downloads remote dataset |

Properties

| Property | Type | Description |
|---|---|---|
| desc | str | Dataset description from docstring |
| path | Path | Path to dataset file (FileDataset/RemoteDataset) |
| is_downloaded | bool | Whether remote dataset is downloaded (RemoteDataset) |

Usage Examples

from river import datasets
from river import linear_model
from river import preprocessing

# Example 1: Using a synthetic dataset
dataset = datasets.synth.Friedman(seed=42)

# Synthetic datasets can be infinite, so take() is used to bound the stream
for i, (x, y) in enumerate(dataset.take(100)):
    print(f"Sample {i}: x={x}, y={y}")

# Check dataset properties
print(f"Task: {dataset.task}")
print(f"Features: {dataset.n_features}")
print(f"Samples: {dataset.n_samples}")  # None for infinite

# Example 2: Using a bundled file dataset
dataset = datasets.Phishing()

print(dataset.desc)  # Print description
print(f"Path: {dataset.path}")
print(f"Samples: {dataset.n_samples}")
print(f"Features: {dataset.n_features}")

# Stream through data
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()
for x, y in dataset:
    y_pred = model.predict_one(x)
    model.learn_one(x, y)

# Example 3: Using a remote dataset
# Bikes subclasses RemoteDataset, so it exposes url, size and is_downloaded
dataset = datasets.Bikes()

# Check if downloaded
if not dataset.is_downloaded:
    print("Dataset not yet downloaded")
    print(f"Will download from: {dataset.url}")
    print(f"Expected size: {dataset.size} bytes")

# Manual download with control (force=True would download again even if cached)
dataset.download(force=False, verbose=True)

# Iterating also triggers the download automatically when it is still missing
for x, y in dataset.take(10):
    print(x, y)

# Example 4: Using take() to limit samples
dataset = datasets.TrumpApproval()

# Get only first 100 samples
for x, y in dataset.take(100):
    print(x, y)

# Example 5: Accessing dataset metadata
dataset = datasets.Music()

print(f"Task type: {dataset.task}")
print(f"Number of features: {dataset.n_features}")
print(f"Number of samples: {dataset.n_samples}")
print(f"Number of outputs: {dataset.n_outputs}")
print(f"Is sparse: {dataset.sparse}")

# Get formatted representation
print(dataset)

# Example 6: Custom dataset using FileDataset
import csv
from river.datasets import base

class CustomDataset(base.FileDataset):
    def __init__(self):
        super().__init__(
            filename="my_data.csv",
            directory="/path/to/data",
            task=base.REG,
            n_features=5,
            n_samples=1000,
            sparse=False,
        )

    def __iter__(self):
        # Yield (features, target) pairs; assumes numeric columns and a 'target' column
        with open(self.path, newline="") as f:
            for row in csv.DictReader(f):
                x = {k: float(v) for k, v in row.items() if k != "target"}
                y = float(row["target"])
                yield x, y

# Example 7: Using get_data_home() for custom storage
import os
from river.datasets.base import get_data_home

data_home = get_data_home()
print(f"River data directory: {data_home}")

# Change data directory via environment variable.
# The path should be writable, since get_data_home() may create it if it is missing.
os.environ['RIVER_DATA'] = '/custom/path'
print(f"New data directory: {get_data_home()}")

# Example 8: Working with dataset representation
dataset = datasets.Bikes()

# Get description
print(dataset.desc)

# Get the metadata shown in the repr (private attribute, may change between versions)
print(dataset._repr_content)

# Example 9: Streaming with multiple passes
dataset = datasets.Phishing()

# First pass: fit scaler
scaler = preprocessing.StandardScaler()
for x, y in dataset:
    scaler.learn_one(x)

# Second pass: train model. The same dataset object can be iterated again,
# because each for loop calls __iter__ and restarts the stream.
model = linear_model.LogisticRegression()
for x, y in dataset:
    x_scaled = scaler.transform_one(x)
    model.learn_one(x_scaled, y)

# Example 10: Dataset constants
import random
from river.datasets import base
from river.datasets.base import REG, BINARY_CLF, MULTI_CLF

print(f"Regression: {REG}")
print(f"Binary classification: {BINARY_CLF}")
print(f"Multi-class classification: {MULTI_CLF}")

# Use in custom dataset
class MyDataset(base.Dataset):
    def __init__(self):
        super().__init__(
            task=REG,
            n_features=10,
            n_samples=500,
        )

    def __iter__(self):
        # Finite stream: the target is the sum of the random features
        for _ in range(self.n_samples):
            x = {f'x{i}': random.random() for i in range(self.n_features)}
            y = sum(x.values())
            yield x, y
