Implementation:Dagster io Dagster Dbt Incremental Support
| Attribute | Value |
|---|---|
| Title | Dbt Incremental Support |
| Category | Implementation |
| Domains | Data_Engineering, dbt, Incremental |
| Repository | Dagster_io_Dagster |
| Source | python_modules/libraries/dagster-dbt/dagster_dbt/components/dbt_project/component.py:L405-446 |
Overview
A concrete mechanism, provided by the dagster-dbt library, for connecting Dagster partitions to dbt incremental models through template variables and CLI argument injection.
Description
The incremental support is implemented through the get_cli_args() method of DbtProjectComponent, which resolves Jinja templates in CLI arguments at execution time. When a partitioned asset is materialized, the method extracts the partition key, partition key range, and partition time window from the AssetExecutionContext, then injects these values into the resolution scope for Jinja template processing.
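The resolved CLI arguments are passed to the dbt CLI, with the partition-derived values supplied through --vars. A dbt model can then read those values with var() inside its is_incremental() block and filter its input to the partition's time window.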
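The date values that reach --vars can be illustrated with a small standard-library sketch. It assumes a daily partition whose key is a `YYYY-MM-DD` string and uses a hypothetical `TimeWindow` stand-in (naive datetimes, inclusive start, exclusive end) instead of the object Dagster supplies. The helper names `daily_window` and `window_to_vars` are illustrative and not part of dagster-dbt.

```python
from datetime import datetime, timedelta
from typing import NamedTuple


class TimeWindow(NamedTuple):
    """Hypothetical stand-in for the partition time window (start inclusive, end exclusive)."""

    start: datetime
    end: datetime


def daily_window(partition_key: str) -> TimeWindow:
    """Build the one-day window covered by a daily partition key such as '2023-01-15'."""
    start = datetime.strptime(partition_key, "%Y-%m-%d")
    return TimeWindow(start=start, end=start + timedelta(days=1))


def window_to_vars(window: TimeWindow) -> dict[str, str]:
    """Apply the same strftime formatting that the Jinja templates in cli_args use."""
    return {
        "min_date": window.start.strftime("%Y-%m-%d"),
        "max_date": window.end.strftime("%Y-%m-%d"),
    }


partition_time_window = daily_window("2023-01-15")
dbt_vars = window_to_vars(partition_time_window)
print(dbt_vars)  # {'min_date': '2023-01-15', 'max_date': '2023-01-16'}
```

In the real component the formatting happens inside the Jinja templates; the sketch only mirrors the arithmetic so the resulting values are easy to check.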
Usage
Configure via YAML with template variables for partition-aware dbt execution. Two pieces are required: a partitions definition (defined with @template_var and assigned to assets through post_processing) and Jinja templates in cli_args that reference the partition scope.
Code Reference
Source Location
python_modules/libraries/dagster-dbt/dagster_dbt/components/dbt_project/component.py:L405-446
Signature
def get_cli_args(self, context: dg.AssetExecutionContext) -> list[str]:
    """Resolve CLI arguments with partition-related scope variables."""
    partition_key = context.partition_key if context.has_partition_key else None
    partition_key_range = (
        context.partition_key_range if context.has_partition_key_range else None
    )
    try:
        partition_time_window = context.partition_time_window
    except Exception:
        partition_time_window = None
    # resolve the cli args with additional partition-related scope
    resolved_args = (
        _resolution_context.get()
        .with_scope(
            partition_key=partition_key,
            partition_key_range=partition_key_range,
            partition_time_window=partition_time_window,
        )
        .resolve_value(self.cli_args, as_type=list[str])
    )
    ...
Import
from dagster import template_var, DailyPartitionsDefinition
The incremental support is used indirectly through YAML configuration of DbtProjectComponent.
I/O Contract
Inputs
- Partition time window: context.partition_time_window, providing start and end timestamps from the Dagster partition definition.
- template_vars_module: Path to a Python module containing @template_var-decorated functions that define partition definitions.
- cli_args with Jinja templates: YAML configuration containing templates like {{ partition_time_window.start.strftime('%Y-%m-%d') }}.
- dbt model tags: dbt tags (e.g., tag:daily) used to scope partition definitions to specific models.
Outputs
- Partitioned dbt assets: Assets with DailyPartitionsDefinition (or other partition types) assigned via post_processing.
- Resolved CLI arguments: The dbt CLI receives --vars with min_date / max_date (or similar) values computed from the partition time window.
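As a rough picture of the resolved argument list, the sketch below builds the kind of `list[str]` a dbt invocation could receive. It assumes the nested --vars mapping is serialized into a single JSON string (dbt accepts a YAML/JSON dictionary for --vars); the exact serialization performed by dagster-dbt is not shown in the source excerpt, and `build_dbt_args` is a hypothetical helper.

```python
import json


def build_dbt_args(min_date: str, max_date: str, selector: str = "tag:daily") -> list[str]:
    """Assemble a flat argument list with the partition dates passed through --vars.

    Assumption: the vars mapping is encoded as one JSON string argument.
    """
    dbt_vars = {"min_date": min_date, "max_date": max_date}
    return ["build", "--vars", json.dumps(dbt_vars), "--select", selector]


args = build_dbt_args("2023-01-15", "2023-01-16")
print(args)
```

Keeping the dates inside --vars, rather than baking them into the model SQL, means the same dbt project can be run for any partition without changes.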
Usage Examples
Template Variables Module
# template_vars.py
import dagster as dg

@dg.template_var
def daily_partitions_def():
    return dg.DailyPartitionsDefinition(start_date="2023-01-01")
YAML Configuration
# defs.yaml
type: dagster_dbt.DbtProjectComponent
attributes:
  project: ../analytics
  cli_args:
    - "build"
    - "--vars":
        min_date: "{{ partition_time_window.start.strftime('%Y-%m-%d') }}"
        max_date: "{{ partition_time_window.end.strftime('%Y-%m-%d') }}"
    - "--select"
    - "tag:daily"
template_vars_module: .template_vars
post_processing:
  assets:
    - target: "tag:daily"
      attributes:
        partitions_def: "{{ daily_partitions_def }}"
Corresponding dbt Model
-- models/stg_trips.sql
{{ config(materialized='incremental', tags=['daily']) }}
SELECT * FROM {{ source('raw_taxis', 'trips') }}
{% if is_incremental() %}
  WHERE pickup_datetime >= '{{ var("min_date") }}'
    AND pickup_datetime < '{{ var("max_date") }}'
{% endif %}
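The WHERE clause in the model uses a half-open interval: min_date is inclusive and max_date is exclusive. The sketch below mirrors that predicate in plain Python to show that a row stamped exactly at midnight falls into one daily partition only. `in_window` is a hypothetical helper written for this illustration and is not part of dbt or dagster-dbt.

```python
from datetime import datetime


def in_window(pickup: datetime, min_date: str, max_date: str) -> bool:
    """Mirror of the model's filter: min_date <= pickup_datetime < max_date."""
    start = datetime.strptime(min_date, "%Y-%m-%d")
    end = datetime.strptime(max_date, "%Y-%m-%d")
    return start <= pickup < end


midnight = datetime(2023, 1, 16, 0, 0, 0)

# The 2023-01-15 partition excludes the boundary row...
print(in_window(midnight, "2023-01-15", "2023-01-16"))  # False
# ...and the 2023-01-16 partition includes it.
print(in_window(midnight, "2023-01-16", "2023-01-17"))  # True
```

Because adjacent windows share a boundary but only one side includes it, running every daily partition once covers each row exactly once.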