Implementation:Online ml River Stream Iter Sql
| Knowledge Sources | |
|---|---|
| Domains | Online_Learning, Data_Streaming, SQL, Databases |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Streams results from SQL queries for online machine learning using SQLAlchemy connections.
Description
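The iter_sql function executes an SQL query and yields the results row by row as (x, y) pairs for online learning. It works with any SQLAlchemy-compatible database connection and accepts both raw SQL strings (which are wrapped in sqlalchemy.text before execution) and SQLAlchemy query objects. Although rows are yielded one at a time, SQLAlchemy prefetches results by default, so they are actually loaded in batches. For true streaming, set the stream_results execution option on the connection you pass in; not every database backend supports it.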
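The difference between yielding rows one at a time and fetching them from the database can be illustrated without SQLAlchemy. The sketch below is an assumption-laden illustration, not how river or SQLAlchemy buffer results: it uses Python's built-in sqlite3 module and a hypothetical helper, iter_rows_batched, that yields rows individually while pulling them from the DB-API cursor in fixed-size chunks with fetchmany, so at most batch_size rows are held in memory at once.

```python
import sqlite3
from typing import Iterator


def iter_rows_batched(cursor: sqlite3.Cursor, batch_size: int = 2) -> Iterator[dict]:
    """Hypothetical helper: yield rows one by one, fetching `batch_size` at a time."""
    # Column names come from the DB-API cursor description (first field of each entry).
    columns = [col[0] for col in cursor.description]
    while True:
        batch = cursor.fetchmany(batch_size)  # fetch the next chunk of rows
        if not batch:
            break  # result set exhausted
        for row in batch:
            yield dict(zip(columns, row))


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (shop TEXT, amount INTEGER)")
conn.executemany(
    "INSERT INTO sales VALUES (?, ?)",
    [("Hema", 20), ("Ikea", 18), ("Hema", 22), ("Ikea", 14)],
)
cursor = conn.execute("SELECT shop, amount FROM sales ORDER BY rowid")
rows = list(iter_rows_batched(cursor, batch_size=3))
print(rows[0])  # {'shop': 'Hema', 'amount': 20}
```

A smaller batch_size lowers peak memory at the cost of more round trips to the database; the consumer sees the same one-row-at-a-time stream either way.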
Usage
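Use this when your data is stored in a relational database and you want to train models online without loading the whole table into memory, for example when the dataset is too large to fit in memory. It can also help you keep learning as new rows are added to the database; note, however, that the query is executed once per call, so rows inserted after it runs are only picked up by running the query again.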
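One way to keep learning from new rows is to re-run a query that only selects rows past the last one already processed (a "watermark"). The sketch below assumes this polling pattern; it is not something iter_sql provides itself. It uses the built-in sqlite3 module, relies on SQLite's rowid as the watermark, and uses a hypothetical RunningMean class as a stand-in for an online model (a real river model would be trained with its own learn_one method). The helper poll_new_rows is also hypothetical.

```python
import sqlite3


class RunningMean:
    """Hypothetical stand-in for an online model: learns the mean of y incrementally."""

    def __init__(self) -> None:
        self.n = 0
        self.mean = 0.0

    def update(self, y: float) -> None:
        self.n += 1
        self.mean += (y - self.mean) / self.n  # incremental mean update


def poll_new_rows(conn: sqlite3.Connection, last_id: int):
    """Hypothetical helper: yield (rowid, amount) for rows added after `last_id`."""
    cursor = conn.execute(
        "SELECT rowid, amount FROM sales WHERE rowid > ? ORDER BY rowid", (last_id,)
    )
    yield from cursor


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (shop TEXT, amount INTEGER)")
model = RunningMean()
last_id = 0

# A first batch of rows arrives and is learned one row at a time.
conn.executemany("INSERT INTO sales VALUES (?, ?)", [("Hema", 20), ("Ikea", 18)])
for row_id, amount in poll_new_rows(conn, last_id):
    model.update(amount)
    last_id = row_id  # advance the watermark

# More rows arrive later; re-running the query reads only the new ones.
conn.executemany("INSERT INTO sales VALUES (?, ?)", [("Hema", 22), ("Ikea", 14)])
for row_id, amount in poll_new_rows(conn, last_id):
    model.update(amount)
    last_id = row_id

print(model.n, model.mean)  # 4 18.5
```

Because each row is seen exactly once, the model state after both polls matches what a single pass over all four rows would produce.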
Code Reference
Source Location
- Repository: Online_ml_River
- File: river/stream/iter_sql.py
Signature
```python
def iter_sql(
    query: str | sqlalchemy.TextClause | sqlalchemy.Select,
    conn: sqlalchemy.Connection,
    target_name: str | None = None,
) -> base.typing.Stream:
    ...
```
Import
```python
from river import stream
```
I/O Contract
| Parameter | Type | Description |
|---|---|---|
| query | str, TextClause, or Select | SQL query to execute; plain strings are wrapped in sqlalchemy.text |
| conn | sqlalchemy.Connection | SQLAlchemy connection object |
| target_name | str or None | Name of the column to use as the target y; this column is removed from x. If None, y is None |
Returns:
| Type | Description |
|---|---|
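| Iterator[(dict, Any)] | Stream of (x, y) tuples: x maps column names to values (excluding the target column) and y is the target value, or None if target_name is not set |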
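The (x, y) contract can be expressed in a few lines of plain Python. This is a minimal sketch, not river's source: the helper rows_to_xy is hypothetical and takes already-fetched row mappings instead of a database connection. It shows the two cases in the table: with a target_name the target column is popped out of x and returned as y, and without one every column stays in x and y is None.

```python
from typing import Any, Iterable, Iterator, Mapping, Optional, Tuple


def rows_to_xy(
    rows: Iterable[Mapping[str, Any]], target_name: Optional[str] = None
) -> Iterator[Tuple[dict, Any]]:
    """Hypothetical helper: turn row mappings into (x, y) pairs."""
    for row in rows:
        x = dict(row)  # copy so the caller's row is not mutated
        if target_name is None:
            yield x, None  # no target: every column is a feature
        else:
            y = x.pop(target_name)  # the target is removed from the features
            yield x, y


rows = [{"shop": "Hema", "amount": 20}, {"shop": "Ikea", "amount": 18}]
pairs = list(rows_to_xy(rows, target_name="amount"))
print(pairs[0])  # ({'shop': 'Hema'}, 20)
```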
Usage Examples
```python
import datetime as dt

import sqlalchemy
from river import stream

# Create an in-memory SQLite database
engine = sqlalchemy.create_engine('sqlite://')

# Define the table schema
metadata = sqlalchemy.MetaData()
t_sales = sqlalchemy.Table(
    'sales',
    metadata,
    sqlalchemy.Column('shop', sqlalchemy.String, primary_key=True),
    sqlalchemy.Column('date', sqlalchemy.Date, primary_key=True),
    sqlalchemy.Column('amount', sqlalchemy.Integer),
)
metadata.create_all(engine)

# Insert sample data
sales = [
    {'shop': 'Hema', 'date': dt.date(2016, 8, 2), 'amount': 20},
    {'shop': 'Ikea', 'date': dt.date(2016, 8, 2), 'amount': 18},
    {'shop': 'Hema', 'date': dt.date(2016, 8, 3), 'amount': 22},
    {'shop': 'Ikea', 'date': dt.date(2016, 8, 3), 'amount': 14},
]
with engine.connect() as conn:
    conn.execute(t_sales.insert(), sales)
    conn.commit()

# Stream data using an SQLAlchemy query object;
# the target column ('amount') is removed from x and returned as y
with engine.connect() as conn:
    query = sqlalchemy.sql.select(t_sales)
    dataset = stream.iter_sql(query, conn, target_name='amount')
    for x, y in dataset:
        print(f"Features: {x}, Target: {y}")

# Using a raw SQL string. Without column type information, SQLite
# returns the date as a string (e.g. '2016-08-02') rather than a date object.
with engine.connect() as conn:
    query = "SELECT * FROM sales WHERE shop = 'Hema'"
    dataset = stream.iter_sql(query, conn, target_name='amount')
    print("\nFiltered results:")
    for x, y in dataset:
        print(f"Shop: {x['shop']}, Date: {x['date']}, Amount: {y}")

# For true streaming (supported by some database drivers only), enable
# server-side cursors on the connection before passing it to iter_sql:
# with engine.connect().execution_options(stream_results=True) as conn:
#     dataset = stream.iter_sql(query, conn, target_name='amount')
```