Implementation:Dagster io Dagster DuckDB Resource
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Databases |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete resource, provided by the dagster-duckdb integration library, for interacting with DuckDB databases.
Description
The DuckDBResource class provides a managed connection to a DuckDB database within Dagster pipelines. It extends Dagster's ConfigurableResource base class, accepting a database path and optional connection configuration. The resource exposes a get_connection() context manager that yields a duckdb.DuckDBPyConnection object, automatically handling connection setup and teardown.
DuckDB is an in-process analytical database, which makes it well-suited for local development and testing. The resource plugs into Dagster's resource injection system, so assets can interact with DuckDB without managing the connection lifecycle manually.
Usage
Import from the dagster_duckdb package. Use when assets need to read from or write to DuckDB databases. Register the resource in the Definitions object with a key (conventionally "duckdb") and declare a parameter of type DuckDBResource in asset functions.
Code Reference
Source Location
- Repository: dagster
- File: python_modules/libraries/dagster-duckdb/dagster_duckdb/resource.py:L11
Signature
class DuckDBResource(ConfigurableResource):
    database: str = Field(
        description="Path to the DuckDB database. Setting database=':memory:' will use an in-memory database"
    )
    connection_config: dict[str, Any] = Field(
        description="DuckDB connection configuration options",
        default={},
    )

    @contextmanager
    def get_connection(self) -> Generator[duckdb.DuckDBPyConnection, None, None]:
        ...
Import
from dagster_duckdb import DuckDBResource
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| database | str | Yes | Path to the DuckDB database file. Use ":memory:" for an in-memory database. |
| connection_config | dict[str, Any] | No | Additional DuckDB connection configuration options (e.g., threads, memory limit). Defaults to empty dict. |
Outputs
| Name | Type | Description |
|---|---|---|
| get_connection() | context manager yielding duckdb.DuckDBPyConnection | A DuckDB connection that is automatically closed when the context manager exits. |
Usage Examples
Basic Resource Configuration
import dagster as dg
from dagster_duckdb import DuckDBResource

# my_asset is the asset function defined in the next example.
defs = dg.Definitions(
    resources={"duckdb": DuckDBResource(database="/tmp/my_database.duckdb")},
    assets=[my_asset],
)
Asset Using DuckDB Resource
import dagster as dg
from dagster_duckdb import DuckDBResource

@dg.asset(kinds={"duckdb"})
def my_asset(duckdb: DuckDBResource):
    with duckdb.get_connection() as conn:
        conn.execute("SELECT 1")
Full Pipeline Example
import dagster as dg
from dagster_duckdb import DuckDBResource

@dg.asset(kinds={"duckdb"}, group_name="ingestion")
def raw_events(duckdb: DuckDBResource) -> None:
    with duckdb.get_connection() as conn:
        conn.execute(
            "CREATE TABLE IF NOT EXISTS raw_events AS "
            "SELECT * FROM read_csv('https://example.com/events.csv')"
        )

defs = dg.Definitions(
    resources={"duckdb": DuckDBResource(database="data/pipeline.duckdb")},
    assets=[raw_events],
)