Implementation:Iterative Dvc Database
| Knowledge Sources | |
|---|---|
| Domains | Database, Data_Import |
| Last Updated | 2026-02-10 10:00 GMT |
Overview
Database client and serialization utilities for importing SQL query results into DVC-tracked files, provided by the DVC library.
Description
The dvc/database.py module (134 lines) provides a lightweight database abstraction layer built on SQLAlchemy, enabling DVC to import data from SQL databases into CSV or JSON files. The module defines two dataclasses, two context managers, and a URL construction utility.
Serializer is a dataclass that wraps a SQL query and a database connection. It provides to_csv (which streams results in chunks using pandas.read_sql with chunksize for memory efficiency and writes to an atomic temporary file) and to_json (which loads the full result set and writes JSON with orient="records"). The export method dispatches to the appropriate serializer based on a format string.
Client is a dataclass wrapping a SQLAlchemy Engine. Its test_connection method executes SELECT 1 to verify database connectivity, invoking an optional onerror callback and raising a DvcException with a descriptive message on failure. Its export method creates a connection (with stream_results=True for CSV to enable server-side cursors), constructs a Serializer, and delegates the export. Both methods accept an optional progress callback for reporting row-level progress.
url_from_config constructs a SQLAlchemy URL from either a connection string or a dictionary of connection parameters (via make_url). The client context manager is the primary entry point: it takes a URL string, URL object, or config dictionary, resolves it to a URL, creates a SQLAlchemy engine (with optional echo mode controlled by the DVC_SQLALCHEMY_ECHO environment variable), yields a Client instance, and disposes of the engine on exit. The handle_error context manager wraps engine creation to produce user-friendly error messages when database drivers are missing.
The module also provides atomic_file, an internal context manager that writes to a temporary file in the same directory as the target and then atomically replaces the target using os.replace, ensuring that partial writes never corrupt the output file.
Usage
Use client and url_from_config when importing data from SQL databases into DVC-tracked files -- for example, via dvc import-db or custom data pipeline stages that pull tabular data from a data warehouse. The streaming CSV export is suitable for large result sets, while the JSON export is convenient for smaller datasets or API-style outputs.
Code Reference
Source Location
- Repository: DVC
- File:
dvc/database.py - Lines: L1-134
Signature
@dataclass
class Serializer:
sql: Union[str, Selectable]
con: Union[str, Connectable]
chunksize: int = 10_000
def to_csv(self, file: StrOrBytesPath, progress=noop):
...
def to_json(self, file: StrOrBytesPath, progress=noop):
...
def export(self, file: StrOrBytesPath, format: str = "csv", progress=noop):
...
@dataclass
class Client:
engine: Engine
def test_connection(self, onerror: Optional[Callable[[], Any]] = None) -> None:
...
def export(
self,
sql: Union[str, Selectable],
file: StrOrBytesPath,
format: str = "csv",
progress=noop,
) -> None:
...
def url_from_config(config: Union[str, URL, dict[str, str]]) -> URL:
"""Construct a SQLAlchemy URL from a string or config dictionary."""
...
@contextmanager
def client(
url_or_config: Union[str, URL, dict[str, str]],
**engine_kwargs: Any,
) -> Iterator[Client]:
"""Create a database Client from a URL or config.
Args:
url_or_config: connection string, SQLAlchemy URL, or dict of
connection parameters.
**engine_kwargs: additional keyword arguments passed to
sqlalchemy.create_engine.
Yields:
Client: a connected database client.
"""
...
Import
from dvc.database import client, url_from_config
I/O Contract
client Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| url_or_config | Union[str, URL, dict[str, str]] |
Yes | Database connection specification. Can be a SQLAlchemy connection string (e.g., "postgresql://user:pass@host/db"), a SQLAlchemy URL object, or a dictionary of connection parameters (keys like drivername, username, password, host, database).
|
| **engine_kwargs | Any |
No | Additional keyword arguments passed through to sqlalchemy.create_engine (e.g., pool_size, connect_args).
|
client Output
| Name | Type | Description |
|---|---|---|
| (yielded value) | Client |
A connected database client wrapping a SQLAlchemy Engine. The engine is automatically disposed when the context manager exits. |
Client.export Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| sql | Union[str, Selectable] |
Yes | SQL query string or SQLAlchemy Selectable expression to execute. |
| file | StrOrBytesPath |
Yes | Output file path. The file is written atomically via a temporary file. |
| format | str |
No | Output format: "csv" (default, with streaming/chunked writes) or "json" (records orientation).
|
| progress | Callable |
No | Progress callback invoked with the number of rows written per chunk. Defaults to a no-op function. |
url_from_config Input
| Name | Type | Required | Description |
|---|---|---|---|
| config | Union[str, URL, dict[str, str]] |
Yes | A connection string, SQLAlchemy URL object, or dictionary of connection parameters to convert into a SQLAlchemy URL. |
Usage Examples
Exporting Query Results to CSV
from dvc.database import client
# Export a SQL query to a CSV file
with client("postgresql://user:pass@localhost/mydb") as db:
db.test_connection()
db.export(
sql="SELECT * FROM training_data WHERE split = 'train'",
file="data/train.csv",
format="csv",
)
Using a Config Dictionary
from dvc.database import client, url_from_config
# Build a URL from a config dictionary
config = {
"drivername": "postgresql",
"username": "user",
"password": "pass",
"host": "db.example.com",
"database": "analytics",
}
url = url_from_config(config)
# Export to JSON
with client(url) as db:
db.test_connection()
db.export(
sql="SELECT id, score, label FROM predictions LIMIT 1000",
file="data/predictions.json",
format="json",
)
With Progress Reporting
from dvc.database import client
total_rows = 0
def on_progress(n):
global total_rows
total_rows += n
print(f"Exported {total_rows} rows...")
with client("sqlite:///local.db") as db:
db.export(
sql="SELECT * FROM measurements",
file="output/measurements.csv",
progress=on_progress,
)
print(f"Done. Total rows exported: {total_rows}")