Implementation:Online ml River Datasets Base
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Dataset_Management, Data_Streaming, API_Design |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Base classes for River's dataset infrastructure supporting synthetic generation, local files, and remote dataset downloads.
Description
This module defines the foundational classes for River's dataset system: Dataset, SyntheticDataset, FileDataset, and RemoteDataset. These abstract base classes establish a consistent interface for accessing data in a streaming fashion, regardless of the data source.
Dataset is the abstract base class that all datasets inherit from. It stores metadata (task type, number of features, samples, classes, and outputs, plus a sparsity flag) and provides common functionality such as the take method, which limits iteration to the first k samples, and the desc property, which returns the description from the class docstring. Subclasses must implement __iter__ to provide streaming access.
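The contract described above can be sketched without River installed. The snippet below is a minimal illustration, not River's source: MiniDataset and CountingStream are hypothetical names, and implementing take with itertools.islice is an assumption about one reasonable way to bound a stream lazily.

```python
import abc
import itertools


class MiniDataset(abc.ABC):
    """Hypothetical, simplified stand-in for a streaming dataset base class."""

    def __init__(self, task, n_features, n_samples=None):
        self.task = task
        self.n_features = n_features
        self.n_samples = n_samples  # None signals an unbounded stream

    @abc.abstractmethod
    def __iter__(self):
        """Yield (x, y) pairs one at a time."""

    def take(self, k):
        # Stop lazily after k samples without materialising the stream.
        return itertools.islice(self, k)


class CountingStream(MiniDataset):
    """Toy unbounded stream: x holds one counter feature, y is its double."""

    def __init__(self):
        super().__init__(task="Regression", n_features=1)

    def __iter__(self):
        for i in itertools.count():
            yield {"i": i}, 2 * i


first_three = list(CountingStream().take(3))
print(first_three)
```

Because take only wraps the iterator, it is safe to call on an infinite stream; nothing is generated beyond the k samples actually consumed.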
SyntheticDataset is for algorithmically generated datasets that can produce infinite streams. These datasets include generators for testing and benchmarking like Friedman regression and concept drift scenarios.
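As a rough illustration of an infinite, seeded generator, the sketch below streams samples from the Friedman #1 regression formula using only the standard library. It is not River's implementation: friedman_stream is a hypothetical name, and the integer feature keys and unit-variance noise are assumptions made for the example.

```python
import itertools
import math
import random


def friedman_stream(seed=None):
    """Endless generator of (x, y) pairs following the Friedman #1 formula."""
    rng = random.Random(seed)  # private RNG so the stream is reproducible
    while True:
        # Ten uniform features; only the first five influence the target.
        x = {i: rng.uniform(0, 1) for i in range(10)}
        y = (
            10 * math.sin(math.pi * x[0] * x[1])
            + 20 * (x[2] - 0.5) ** 2
            + 10 * x[3]
            + 5 * x[4]
            + rng.gauss(0, 1)  # additive noise (assumed standard normal)
        )
        yield x, y


# The stream never ends on its own, so bound it explicitly.
run_a = list(itertools.islice(friedman_stream(seed=42), 5))
run_b = list(itertools.islice(friedman_stream(seed=42), 5))
print(run_a == run_b)
```

Seeding a dedicated random.Random instance, rather than the global generator, keeps two streams with the same seed identical even when other code draws random numbers in between.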
FileDataset handles locally stored datasets that ship with River or are stored on disk. It manages file paths relative to the datasets module location or a specified directory.
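The path handling described above amounts to choosing a base directory and joining the filename onto it. The following sketch shows that idea under stated assumptions: resolve_path and PACKAGE_DIR are hypothetical, and PACKAGE_DIR merely stands in for the datasets module location.

```python
import pathlib

# Hypothetical default location, standing in for the datasets module directory.
PACKAGE_DIR = pathlib.Path("/opt/pkg/datasets")


def resolve_path(filename, directory=None):
    """Join filename onto the given directory, or onto the default location."""
    base = pathlib.Path(directory) if directory else PACKAGE_DIR
    return base.joinpath(filename)


bundled = resolve_path("bundled.csv")
custom = resolve_path("my_data.csv", directory="/path/to/data")
print(bundled)
print(custom)
```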
RemoteDataset extends FileDataset for datasets that must be downloaded from a URL. It handles downloading, caching in a user data directory (controlled by the RIVER_DATA environment variable), unpacking compressed archives (zip, tar.gz), and verifying a download by comparing the file size on disk with the expected size.
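Two of these steps, locating the cache directory and checking the size of a downloaded file, can be sketched with the standard library. This is an illustration under assumptions, not River's code: data_home and looks_complete are hypothetical helpers, and the ~/river_data fallback is assumed rather than taken from the text above.

```python
import os
import pathlib
import tempfile


def data_home(environ=os.environ, default="~/river_data"):
    """Resolve the cache directory from RIVER_DATA, creating it if needed."""
    home = pathlib.Path(environ.get("RIVER_DATA", default)).expanduser()
    home.mkdir(parents=True, exist_ok=True)
    return home


def looks_complete(path, expected_size):
    """Treat a file as downloaded only if it exists with the expected byte count."""
    path = pathlib.Path(path)
    return path.is_file() and path.stat().st_size == expected_size


# Demonstrate in a throwaway directory so the real cache is never touched.
with tempfile.TemporaryDirectory() as tmp:
    target = data_home(environ={"RIVER_DATA": tmp}) / "ExampleDataset" / "data.csv"
    target.parent.mkdir(parents=True, exist_ok=True)

    before = looks_complete(target, expected_size=11)     # file is missing
    target.write_bytes(b"a,b\n1,2\n3,4")                  # exactly 11 bytes
    after = looks_complete(target, expected_size=11)      # sizes match
    truncated = looks_complete(target, expected_size=99)  # sizes differ

print(before, after, truncated)
```

A size comparison is cheap and catches interrupted downloads, but it does not detect corrupted content of the right length; a checksum would be needed for that.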
Usage
Use Dataset subclasses to access data in a streaming manner for online learning. SyntheticDataset is for generating test data, FileDataset for bundled datasets, and RemoteDataset for large datasets that need to be downloaded once and cached. All datasets support the standard iteration protocol for streaming access.
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/datasets/base.py
Signature
class Dataset(abc.ABC):
    def __init__(
        self,
        task,
        n_features,
        n_samples=None,
        n_classes=None,
        n_outputs=None,
        sparse=False,
    ):
        ...

class SyntheticDataset(Dataset):
    ...

class FileDataset(Dataset):
    def __init__(self, filename, directory=None, **desc):
        ...

class RemoteDataset(FileDataset):
    def __init__(self, url, size, unpack=True, filename=None, **desc):
        ...
Import
from river import datasets
I/O Contract
Dataset constructor parameters

| Parameter | Type | Description |
|---|---|---|
| task | str | Type of task: "Regression", "Binary classification", etc. |
| n_features | int | Number of features in the dataset |
| n_samples | int or None | Number of samples (None for infinite) |
| n_classes | int or None | Number of classes (classification only) |
| n_outputs | int or None | Number of outputs (multi-output only) |
| sparse | bool | Whether dataset is sparse |

FileDataset constructor parameters

| Parameter | Type | Description |
|---|---|---|
| filename | str | Name of the data file |
| directory | str or None | Directory containing file (default: datasets module) |
| **desc | dict | Additional dataset metadata |

RemoteDataset constructor parameters

| Parameter | Type | Description |
|---|---|---|
| url | str | URL where dataset is located |
| size | int | Expected download size in bytes |
| unpack | bool | Whether to unpack compressed file (default: True) |
| filename | str or None | Filename after unpacking (inferred if None) |
| **desc | dict | Additional dataset metadata |
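
The effect of the unpack parameter can be pictured as dispatching on the archive suffix. The sketch below is a hypothetical helper, not River's download code; the supported suffixes follow the zip and tar.gz formats named in the description, and the error raised for other suffixes is an assumption.

```python
import pathlib
import tarfile
import tempfile
import zipfile


def unpack_archive(archive_path, directory):
    """Extract a zip or tar.gz archive into directory, based on its suffix."""
    archive_path = pathlib.Path(archive_path)
    name = archive_path.name
    if name.endswith(".zip"):
        with zipfile.ZipFile(archive_path) as zf:
            zf.extractall(directory)
    elif name.endswith((".tar.gz", ".tgz")):
        # Only extract archives from sources you trust.
        with tarfile.open(archive_path, "r:gz") as tf:
            tf.extractall(directory)
    else:
        raise ValueError(f"Unhandled archive type: {name}")


# Build a small zip archive in a temporary directory and unpack it again.
with tempfile.TemporaryDirectory() as tmp:
    tmp = pathlib.Path(tmp)
    archive = tmp / "data.zip"
    with zipfile.ZipFile(archive, "w") as zf:
        zf.writestr("data.csv", "a,b\n1,2\n")

    unpack_archive(archive, tmp)
    extracted = (tmp / "data.csv").read_text()

print(extracted)
```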
| Method | Parameters | Return Type | Description |
|---|---|---|---|
| __iter__() | None | Iterator | Streams dataset samples |
| take(k) | k: int | Iterator | Returns first k samples |
| download(force, verbose) | force: bool, verbose: bool | None | Downloads remote dataset |
| Property | Type | Description |
|---|---|---|
| desc | str | Dataset description from docstring |
| path | Path | Path to dataset file (FileDataset/RemoteDataset) |
| is_downloaded | bool | Whether remote dataset is downloaded (RemoteDataset) |
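
One way to derive a description from a docstring, as the desc property does, is to keep only the text that precedes the first numpydoc-style section. The sketch below assumes that docstring layout; description_from_docstring and the Example class are hypothetical, and the regular expression is not taken from River's source.

```python
import inspect
import re


def description_from_docstring(doc):
    """Return the docstring text that precedes the first numpydoc section."""
    doc = inspect.cleandoc(doc or "")
    # A numpydoc section is a title line followed by a line of dashes.
    parts = re.split(r"^\w[\w ]*\n-{3,}", doc, maxsplit=1, flags=re.MULTILINE)
    return parts[0].strip()


class Example:
    """Toy dataset about widgets.

    Each sample describes one widget.

    Parameters
    ----------
    seed
        Random seed.

    """


summary = description_from_docstring(Example.__doc__)
print(summary)
```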
Usage Examples
from river import datasets
from river import linear_model
from river import preprocessing
# Example 1: Using a synthetic dataset
dataset = datasets.synth.Friedman(seed=42)
# Synthetic datasets can be infinite, so bound the stream with take()
for i, (x, y) in enumerate(dataset.take(100)):
    print(f"Sample {i}: x={x}, y={y}")
# Check dataset properties
print(f"Task: {dataset.task}")
print(f"Features: {dataset.n_features}")
print(f"Samples: {dataset.n_samples}")  # None for infinite
# Example 2: Using a bundled file dataset
dataset = datasets.Phishing()
print(dataset.desc) # Print description
print(f"Path: {dataset.path}")
print(f"Samples: {dataset.n_samples}")
print(f"Features: {dataset.n_features}")
# Stream through data
model = preprocessing.StandardScaler() | linear_model.LogisticRegression()
for x, y in dataset:
    y_pred = model.predict_one(x)  # predict before the model sees the label
    model.learn_one(x, y)
# Example 3: Using a remote dataset
dataset = datasets.Bikes()
# Check if downloaded
if not dataset.is_downloaded:
    print("Dataset not yet downloaded")
    print(f"Will download from: {dataset.url}")
    print(f"Expected size: {dataset.size} bytes")
# Iterating triggers automatic download
for x, y in dataset.take(10):
    print(x, y)
# Manual download with control (a no-op if the data is already cached)
dataset.download(force=False, verbose=True)
# Example 4: Using take() to limit samples
dataset = datasets.TrumpApproval()
# Get only first 100 samples
for x, y in dataset.take(100):
    print(x, y)
# Example 5: Accessing dataset metadata
dataset = datasets.Music()
print(f"Task type: {dataset.task}")
print(f"Number of features: {dataset.n_features}")
print(f"Number of samples: {dataset.n_samples}")
print(f"Number of outputs: {dataset.n_outputs}")
print(f"Is sparse: {dataset.sparse}")
# Get formatted representation
print(dataset)
# Example 6: Custom dataset using FileDataset
import csv
from river.datasets import base

class CustomDataset(base.FileDataset):
    def __init__(self):
        super().__init__(
            filename="my_data.csv",
            directory="/path/to/data",
            task=base.REG,
            n_features=5,
            n_samples=1000,
            sparse=False,
        )

    def __iter__(self):
        # Stream rows one at a time; every column except 'target' is a feature
        with open(self.path, newline="") as f:
            reader = csv.DictReader(f)
            for row in reader:
                x = {k: float(v) for k, v in row.items() if k != 'target'}
                y = float(row['target'])
                yield x, y
# Example 7: Using get_data_home() for custom storage
import os
import tempfile
from river.datasets.base import get_data_home
data_home = get_data_home()
print(f"River data directory: {data_home}")
# Change the data directory via the RIVER_DATA environment variable.
# The target must be writable; a temporary directory stands in for a custom path.
os.environ['RIVER_DATA'] = tempfile.mkdtemp()
print(f"New data directory: {get_data_home()}")
# Example 8: Working with dataset representation
dataset = datasets.Bikes()
# Get description
print(dataset.desc)
# Get formatted info
print(dataset._repr_content)
# Example 9: Streaming with multiple passes
dataset = datasets.Phishing()
# First pass: fit scaler
scaler = preprocessing.StandardScaler()
for x, y in dataset:
    scaler.learn_one(x)
# Second pass: train model (each for loop calls __iter__ again and restarts the stream)
model = linear_model.LogisticRegression()
for x, y in dataset:
    x_scaled = scaler.transform_one(x)
    model.learn_one(x_scaled, y)
# Example 10: Dataset constants
import random
from river.datasets import base
from river.datasets.base import REG, BINARY_CLF, MULTI_CLF
print(f"Regression: {REG}")
print(f"Binary classification: {BINARY_CLF}")
print(f"Multi-class classification: {MULTI_CLF}")

# Use in custom dataset
class MyDataset(base.Dataset):
    def __init__(self):
        super().__init__(
            task=REG,
            n_features=10,
            n_samples=500,
        )

    def __iter__(self):
        # Yield n_samples random feature dicts; the target is the sum of the features
        for _ in range(self.n_samples):
            x = {f'x{i}': random.random() for i in range(self.n_features)}
            y = sum(x.values())
            yield x, y
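
When __iter__ is written as a generator method, as in the custom datasets above, every for loop over the dataset object starts a fresh pass, while a single iterator obtained from it is consumed as it advances. The sketch below illustrates this with a hypothetical toy class, Squares, that does not depend on River.

```python
class Squares:
    """Toy finite dataset whose __iter__ is a generator method."""

    def __init__(self, n_samples):
        self.n_samples = n_samples

    def __iter__(self):
        # Each call builds a brand-new generator starting at the first sample.
        for i in range(self.n_samples):
            yield {"i": i}, i * i


dataset = Squares(3)
first_pass = list(dataset)
second_pass = list(dataset)  # the same object can be iterated again

iterator = iter(dataset)
next(iterator)               # consume one sample
remaining = list(iterator)   # only the unconsumed samples are left

print(first_pass == second_pass, remaining)
```

This distinction matters for take as well: it returns an iterator, so its result supports a single pass, whereas the dataset it came from can be streamed again.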