Implementation:Online ml River Stream Iter Sql

From Leeroopedia
Revision as of 16:11, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Online_ml_River_Stream_Iter_Sql.md)


Knowledge Sources
Domains Online_Learning, Data_Streaming, SQL, Databases
Last Updated 2026-02-08 16:00 GMT

Overview

Streams results from SQL queries for online machine learning using SQLAlchemy connections.

Description

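The iter_sql function executes a SQL query and yields the results row by row as (features, target) pairs for online learning. It works with any SQLAlchemy connection and accepts either a raw SQL string or a SQLAlchemy query object (TextClause or Select). By default, SQLAlchemy prefetches results, so rows may be loaded in batches even though they are iterated one at a time. For true streaming, set the stream_results execution option on the connection passed to iter_sql; it is not an argument of iter_sql itself, and it only takes effect on database drivers that support it.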

Usage

Use this when your data is stored in a relational database and you want to train models online without loading the whole dataset into memory. It is particularly useful for datasets that do not fit in memory, or when you want to learn continuously from database updates.
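
To make "training online" concrete, here is a minimal sketch of a model that is updated one row at a time and never stores past rows. It is not River code: the hard-coded `rows` list stands in for the (features, target) pairs a row stream would yield, and the per-shop running mean is a deliberately trivial stand-in for a real online model.

```python
from collections import defaultdict

# Stand-in for the (features, target) pairs produced by a row stream.
rows = iter([
    ({"shop": "Hema"}, 20),
    ({"shop": "Ikea"}, 18),
    ({"shop": "Hema"}, 22),
    ({"shop": "Ikea"}, 14),
])

counts = defaultdict(int)    # number of rows seen per shop
means = defaultdict(float)   # running mean of the target per shop

for x, y in rows:
    shop = x["shop"]
    counts[shop] += 1
    # Incremental mean update: only the current row and the running state are needed.
    means[shop] += (y - means[shop]) / counts[shop]
```

Memory use depends on the number of distinct shops, not on the number of rows, which is the property that makes this pattern suitable for tables too large to load at once.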

Code Reference

Source Location

Signature

def iter_sql(
    query: str | sqlalchemy.TextClause | sqlalchemy.Select,
    conn: sqlalchemy.Connection,
    target_name: str | None = None,
) -> base.typing.Stream:
    ...

Import

from river import stream

I/O Contract

Parameter Type Description
query str, TextClause, or Select SQL query to execute
conn sqlalchemy.Connection SQLAlchemy connection object
target_name str or None Column name to use as target (y will be None if not specified)

Returns:

Type Description
Iterator[(dict, Any)] Stream of (row dict, target value) tuples
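
The contract above can be sketched with the standard library alone. The `iter_rows` helper below is hypothetical and is not River's implementation: it uses sqlite3 instead of SQLAlchemy so that it runs without dependencies, and it assumes that the target column is removed from the feature dict when target_name is given.

```python
import sqlite3


def iter_rows(cursor, target_name=None):
    """Hypothetical helper mimicking the (x, y) contract of a row stream."""
    columns = [col[0] for col in cursor.description]
    for row in cursor:  # the cursor fetches rows lazily as the loop advances
        x = dict(zip(columns, row))
        # Assumption: the target is popped out of the features when named.
        y = x.pop(target_name) if target_name is not None else None
        yield x, y


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE sales (shop TEXT, amount INTEGER)")
conn.executemany("INSERT INTO sales VALUES (?, ?)", [("Hema", 20), ("Ikea", 18)])

query = "SELECT shop, amount FROM sales ORDER BY shop"
with_target = list(iter_rows(conn.execute(query), target_name="amount"))
without_target = list(iter_rows(conn.execute(query)))
conn.close()
```

With a target name, each pair carries the target value separately from the features; without one, every column stays in the dict and y is None.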

Usage Examples

import datetime as dt
import sqlalchemy
from river import stream

# Create an in-memory SQLite database
engine = sqlalchemy.create_engine('sqlite://')

# Define table schema
metadata = sqlalchemy.MetaData()
t_sales = sqlalchemy.Table('sales', metadata,
    sqlalchemy.Column('shop', sqlalchemy.String, primary_key=True),
    sqlalchemy.Column('date', sqlalchemy.Date, primary_key=True),
    sqlalchemy.Column('amount', sqlalchemy.Integer)
)

metadata.create_all(engine)

# Insert sample data
sales = [
    {'shop': 'Hema', 'date': dt.date(2016, 8, 2), 'amount': 20},
    {'shop': 'Ikea', 'date': dt.date(2016, 8, 2), 'amount': 18},
    {'shop': 'Hema', 'date': dt.date(2016, 8, 3), 'amount': 22},
    {'shop': 'Ikea', 'date': dt.date(2016, 8, 3), 'amount': 14},
]

with engine.connect() as conn:
    conn.execute(t_sales.insert(), sales)
    conn.commit()

# Stream data using SQLAlchemy query
with engine.connect() as conn:
    query = sqlalchemy.sql.select(t_sales)
    dataset = stream.iter_sql(query, conn, target_name='amount')

    for x, y in dataset:
        print(f"Features: {x}, Target: {y}")

# Using raw SQL
with engine.connect() as conn:
    query = "SELECT * FROM sales WHERE shop = 'Hema'"
    dataset = stream.iter_sql(query, conn, target_name='amount')

    print("\nFiltered results:")
    for x, y in dataset:
        print(f"Shop: {x['shop']}, Date: {x['date']}, Amount: {y}")

# For true streaming (only on drivers that support it), pass iter_sql
# a connection configured with the stream_results execution option:
# conn = engine.connect().execution_options(stream_results=True)

Related Pages
