Principle: Online ML / River CSV Stream Ingestion
| Knowledge Sources | River Docs |
|---|---|
| Domains | Online_Learning Data_Ingestion ETL |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
CSV stream ingestion is a technique for converting CSV files into observation-by-observation data streams, bridging the gap between batch file storage and the streaming paradigm required by online learning.
Description
Many real-world datasets are stored as CSV files on disk. Online learning, however, requires data to arrive as a sequential stream of individual observations. CSV stream ingestion resolves this impedance mismatch by reading a CSV file row by row and converting each row into an (x, y) tuple, where x is a feature dictionary and y is the target value.
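As a minimal illustration of the row-to-tuple conversion, here is a sketch using only Python's standard `csv` module (not River's actual implementation; the column names are made up):

```python
import csv
import io

# A small in-memory CSV standing in for a file on disk (hypothetical data).
raw = io.StringIO("ordinal_date,gallup,target\n736389,43.8,44.0\n736390,43.7,43.9\n")

def rows_to_stream(file_obj, target):
    """Yield (x, y) tuples, one per CSV row."""
    for row in csv.DictReader(file_obj):
        y = row.pop(target)   # separate the target column
        yield row, y          # x is the remaining feature dict

for x, y in rows_to_stream(raw, target="target"):
    print(x, y)
```

Note that without converters (covered below), every value — including y — is still a string at this point.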
The ingestion process supports several critical transformations during streaming:
- Type conversion: CSV files store all values as strings. A converters dictionary maps column names to callables (e.g., `float`, `int`, or custom lambdas) that cast values to the appropriate Python types on the fly.
- Date parsing: Columns can be parsed into `datetime` objects using format strings.
- Column dropping: Irrelevant columns can be excluded from the feature dictionary.
- Sampling: A fraction parameter enables random sub-sampling of rows during iteration, which is useful for quick experimentation on large files.
- Compression: On-disk files compressed with gzip or zip are transparently decompressed during streaming.
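A stdlib-only sketch of the first three transformations (type conversion, date parsing, column dropping); the function name and parameters mirror the description above, not River's exact internals, and the data is invented:

```python
import csv
import io
from datetime import datetime

def iter_csv_sketch(file_obj, target, converters=None, parse_dates=None, drop=None):
    """Stream (x, y) tuples, transforming each row on the fly."""
    converters = converters or {}
    parse_dates = parse_dates or {}
    drop = drop or []
    for row in csv.DictReader(file_obj):
        for col in drop:                      # exclude irrelevant columns
            del row[col]
        for col, fmt in parse_dates.items():  # parse date strings
            row[col] = datetime.strptime(row[col], fmt)
        for col, conv in converters.items():  # cast strings to proper types
            row[col] = conv(row[col])
        y = row.pop(target)                   # separate target from features
        yield row, y

raw = io.StringIO("date,temp,station,target\n2024-01-01,3.5,A,1\n2024-01-02,4.0,B,0\n")
for x, y in iter_csv_sketch(
    raw,
    target="target",
    converters={"temp": float, "target": int},
    parse_dates={"date": "%Y-%m-%d"},
    drop=["station"],
):
    print(x, y)
```

Because the target converter runs before the target is popped, y arrives as an `int` rather than a string, which is what a classifier expects.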
The key architectural benefit is that CSV stream ingestion maintains a constant memory footprint regardless of file size, since only one row is materialized at a time.
Usage
Use CSV stream ingestion when:
- You have data stored in CSV format and need to feed it to an online learning model.
- You want fine-grained control over type conversion, date parsing, or column selection during streaming.
- You need to sub-sample a large CSV file for rapid prototyping.
- You are building a custom dataset class that wraps a CSV file (as River's built-in datasets do internally).
Theoretical Basis
CSV stream ingestion implements a generator pattern that lazily yields transformed rows. The transformation pipeline applied to each row can be expressed as:
function iter_csv(filepath, target, converters, drop, fraction, seed):
    reader = CSVDictReader(filepath, fraction, seed)
    for row in reader:
        # Drop unwanted columns
        for col in drop:
            del row[col]
        # Apply type converters
        for col, converter in converters.items():
            row[col] = converter(row[col])
        # Separate target from features
        y = row.pop(target)
        x = row
        yield (x, y)
The sampling mechanism works by probabilistically skipping rows: for each row, a random number is drawn, and the row is skipped if the number exceeds the specified fraction. This is equivalent to Bernoulli sampling with parameter p = fraction, where each row is independently included with probability p.
The separation of features and target follows a convention central to River: features are always represented as Python dictionaries (dict) rather than arrays. This enables heterogeneous feature types, dynamic feature sets, and natural handling of missing values (keys simply absent from the dictionary).
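The dictionary convention can be seen in a tiny sketch (the feature names and values here are invented):

```python
# Features are plain dicts: heterogeneous types coexist, and a missing value
# is simply an absent key rather than a NaN placeholder.
x1 = {"temp": 21.5, "city": "Paris", "is_weekend": False}  # float, str, bool together
x2 = {"temp": 19.0}  # "city" and "is_weekend" are absent for this observation

# A consumer can handle absent keys explicitly:
city = x2.get("city", "unknown")
print(city)  # prints "unknown"
```

This also means the feature set can grow over time: a new key appearing mid-stream is just another dictionary entry, with no need to reshape an array.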