Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Iterative Dvc Database

From Leeroopedia


Knowledge Sources
Domains Database, Data_Import
Last Updated 2026-02-10 10:00 GMT

Overview

Database client and serialization utilities for importing SQL query results into DVC-tracked files, provided by the DVC library.

Description

The dvc/database.py module (134 lines) provides a lightweight database abstraction layer built on SQLAlchemy, enabling DVC to import data from SQL databases into CSV or JSON files. The module defines two dataclasses, two context managers, and a URL construction utility.

Serializer is a dataclass that wraps a SQL query and a database connection. It provides to_csv (which streams results in chunks using pandas.read_sql with chunksize for memory efficiency and writes to an atomic temporary file) and to_json (which loads the full result set and writes JSON with orient="records"). The export method dispatches to the appropriate serializer based on a format string.

Client is a dataclass wrapping a SQLAlchemy Engine. Its test_connection method executes SELECT 1 to verify database connectivity, invoking an optional onerror callback and raising a DvcException with a descriptive message on failure. Its export method creates a connection (with stream_results=True for CSV to enable server-side cursors), constructs a Serializer, and delegates the export. Both methods accept an optional progress callback for reporting row-level progress.

url_from_config constructs a SQLAlchemy URL from either a connection string or a dictionary of connection parameters (via make_url). The client context manager is the primary entry point: it takes a URL string, URL object, or config dictionary, resolves it to a URL, creates a SQLAlchemy engine (with optional echo mode controlled by the DVC_SQLALCHEMY_ECHO environment variable), yields a Client instance, and disposes of the engine on exit. The handle_error context manager wraps engine creation to produce user-friendly error messages when database drivers are missing.

The module also provides atomic_file, an internal context manager that writes to a temporary file in the same directory as the target and then atomically replaces the target using os.replace, ensuring that partial writes never corrupt the output file.

Usage

Use client and url_from_config when importing data from SQL databases into DVC-tracked files -- for example, via dvc import-db or custom data pipeline stages that pull tabular data from a data warehouse. The streaming CSV export is suitable for large result sets, while the JSON export is convenient for smaller datasets or API-style outputs.

Code Reference

Source Location

  • Repository: DVC
  • File: dvc/database.py
  • Lines: L1-134

Signature

@dataclass
class Serializer:
    sql: Union[str, Selectable]
    con: Union[str, Connectable]
    chunksize: int = 10_000

    def to_csv(self, file: StrOrBytesPath, progress=noop):
        ...

    def to_json(self, file: StrOrBytesPath, progress=noop):
        ...

    def export(self, file: StrOrBytesPath, format: str = "csv", progress=noop):
        ...


@dataclass
class Client:
    engine: Engine

    def test_connection(self, onerror: Optional[Callable[[], Any]] = None) -> None:
        ...

    def export(
        self,
        sql: Union[str, Selectable],
        file: StrOrBytesPath,
        format: str = "csv",
        progress=noop,
    ) -> None:
        ...


def url_from_config(config: Union[str, URL, dict[str, str]]) -> URL:
    """Construct a SQLAlchemy URL from a string or config dictionary."""
    ...


@contextmanager
def client(
    url_or_config: Union[str, URL, dict[str, str]],
    **engine_kwargs: Any,
) -> Iterator[Client]:
    """Create a database Client from a URL or config.

    Args:
        url_or_config: connection string, SQLAlchemy URL, or dict of
            connection parameters.
        **engine_kwargs: additional keyword arguments passed to
            sqlalchemy.create_engine.

    Yields:
        Client: a connected database client.
    """
    ...

Import

from dvc.database import client, url_from_config

I/O Contract

client Inputs

Name Type Required Description
url_or_config Union[str, URL, dict[str, str]] Yes Database connection specification. Can be a SQLAlchemy connection string (e.g., "postgresql://user:pass@host/db"), a SQLAlchemy URL object, or a dictionary of connection parameters (keys like drivername, username, password, host, database).
**engine_kwargs Any No Additional keyword arguments passed through to sqlalchemy.create_engine (e.g., pool_size, connect_args).

client Output

Name Type Description
(yielded value) Client A connected database client wrapping a SQLAlchemy Engine. The engine is automatically disposed when the context manager exits.

Client.export Inputs

Name Type Required Description
sql Union[str, Selectable] Yes SQL query string or SQLAlchemy Selectable expression to execute.
file StrOrBytesPath Yes Output file path. The file is written atomically via a temporary file.
format str No Output format: "csv" (default, with streaming/chunked writes) or "json" (records orientation).
progress Callable No Progress callback invoked with the number of rows written per chunk. Defaults to a no-op function.

url_from_config Input

Name Type Required Description
config Union[str, URL, dict[str, str]] Yes A connection string, SQLAlchemy URL object, or dictionary of connection parameters to convert into a SQLAlchemy URL.

Usage Examples

Exporting Query Results to CSV

from dvc.database import client

# Export a SQL query to a CSV file
with client("postgresql://user:pass@localhost/mydb") as db:
    db.test_connection()
    db.export(
        sql="SELECT * FROM training_data WHERE split = 'train'",
        file="data/train.csv",
        format="csv",
    )

Using a Config Dictionary

from dvc.database import client, url_from_config

# Build a URL from a config dictionary
config = {
    "drivername": "postgresql",
    "username": "user",
    "password": "pass",
    "host": "db.example.com",
    "database": "analytics",
}
url = url_from_config(config)

# Export to JSON
with client(url) as db:
    db.test_connection()
    db.export(
        sql="SELECT id, score, label FROM predictions LIMIT 1000",
        file="data/predictions.json",
        format="json",
    )

With Progress Reporting

from dvc.database import client

total_rows = 0

def on_progress(n):
    global total_rows
    total_rows += n
    print(f"Exported {total_rows} rows...")

with client("sqlite:///local.db") as db:
    db.export(
        sql="SELECT * FROM measurements",
        file="output/measurements.csv",
        progress=on_progress,
    )

print(f"Done. Total rows exported: {total_rows}")

Related Pages

Implements Principle

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment