Implementation:Dagster io Dagster Dbt Incremental Support

From Leeroopedia


Title: Dbt Incremental Support
Category: Implementation
Domains: Data_Engineering, dbt, Incremental
Repository: Dagster_io_Dagster
Source: python_modules/libraries/dagster-dbt/dagster_dbt/components/dbt_project/component.py:L405-446

Overview

Concrete mechanism for connecting Dagster partitions to dbt incremental models via template variables and CLI argument injection provided by the dagster-dbt library.

Description

Incremental support is implemented in the get_cli_args() method of DbtProjectComponent, which resolves the Jinja templates in cli_args at execution time. When a partitioned asset is materialized, the method reads the partition key, partition key range, and partition time window from the AssetExecutionContext. Each value falls back to None when the run does not provide it. The method then adds these values to the resolution scope, so templates can reference partition_key, partition_key_range, and partition_time_window directly.

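The resolved arguments are passed to the dbt CLI. The --vars entry carries the partition-derived values (for example min_date and max_date). The model reads them with var() inside an is_incremental() block, which restricts the rows it processes to the partition's time window. Note that is_incremental() evaluates to false when the target table does not yet exist or when dbt runs with --full-refresh, so in those cases the filter is skipped and the full source is processed.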
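To illustrate the values that reach dbt, the sketch below uses only the standard library. It computes the time window of a daily partition key and formats it the way the YAML templates on this page do. The helper names `daily_time_window` and `render_vars` are hypothetical. The window logic is an assumption that mirrors a UTC daily partition: each window starts at midnight and ends, exclusively, at the next midnight. It is not Dagster's implementation.

```python
from datetime import datetime, timedelta, timezone


def daily_time_window(partition_key: str) -> tuple[datetime, datetime]:
    """Hypothetical helper: map a 'YYYY-MM-DD' daily partition key to a
    half-open [start, end) window in UTC (assumed to mirror daily partitions)."""
    start = datetime.strptime(partition_key, "%Y-%m-%d").replace(tzinfo=timezone.utc)
    return start, start + timedelta(days=1)


def render_vars(window: tuple[datetime, datetime]) -> dict[str, str]:
    """Format the window the same way the cli_args templates do,
    i.e. start/end passed through strftime('%Y-%m-%d')."""
    start, end = window
    return {
        "min_date": start.strftime("%Y-%m-%d"),
        "max_date": end.strftime("%Y-%m-%d"),
    }


if __name__ == "__main__":
    print(render_vars(daily_time_window("2023-01-31")))
    # → {'min_date': '2023-01-31', 'max_date': '2023-02-01'}
```

Because max_date is the start of the following day, a model that filters with `< max_date` covers the whole partition day without reaching into the next one.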

Usage

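Configure partition-aware dbt execution in YAML using template variables. Two pieces are required. The first is a partition definition, typically returned by a @template_var function and assigned to the relevant assets through post_processing. The second is a set of Jinja templates in cli_args that reference the partition scope.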

Code Reference

Source Location

python_modules/libraries/dagster-dbt/dagster_dbt/components/dbt_project/component.py:L405-446

Signature

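def get_cli_args(self, context: dg.AssetExecutionContext) -> list[str]:
    """Resolve CLI arguments with partition-related scope variables."""
    # Partition key of the run, or None when the run has no single key.
    partition_key = context.partition_key if context.has_partition_key else None
    # Partition key range of the run, or None when the run has no range.
    partition_key_range = (
        context.partition_key_range if context.has_partition_key_range else None
    )
    # Only time-window partitioned runs have a time window; the accessor
    # raises otherwise, so any error is treated as "no window".
    try:
        partition_time_window = context.partition_time_window
    except Exception:
        partition_time_window = None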

    # resolve the cli args with additional partition-related scope
    resolved_args = (
        _resolution_context.get()
        .with_scope(
            partition_key=partition_key,
            partition_key_range=partition_key_range,
            partition_time_window=partition_time_window,
        )
        .resolve_value(self.cli_args, as_type=list[str])
    )
    ...
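You can exercise the fallback pattern in get_cli_args() without Dagster by substituting a stand-in context. In the sketch below, `StubContext` and `partition_scope` are hypothetical. The stub imitates only the attributes the excerpt reads (`has_partition_key`, `partition_key`, `has_partition_key_range`, `partition_key_range`, and `partition_time_window`, which raises when no window exists). Its purpose is to show how each scope variable degrades to None for an unpartitioned run.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Optional


@dataclass
class StubContext:
    """Hypothetical stand-in for dg.AssetExecutionContext (not Dagster's class)."""

    key: Optional[str] = None
    key_range: Optional[tuple[str, str]] = None
    window: Optional[tuple[datetime, datetime]] = None

    @property
    def has_partition_key(self) -> bool:
        return self.key is not None

    @property
    def partition_key(self) -> Optional[str]:
        return self.key

    @property
    def has_partition_key_range(self) -> bool:
        return self.key_range is not None

    @property
    def partition_key_range(self) -> Optional[tuple[str, str]]:
        return self.key_range

    @property
    def partition_time_window(self) -> tuple[datetime, datetime]:
        # Imitate the real accessor raising when the run has no time window.
        if self.window is None:
            raise ValueError("run is not time-window partitioned")
        return self.window


def partition_scope(context: StubContext) -> dict[str, Any]:
    """Build the extra template scope the same way the excerpt does."""
    partition_key = context.partition_key if context.has_partition_key else None
    partition_key_range = (
        context.partition_key_range if context.has_partition_key_range else None
    )
    try:
        partition_time_window = context.partition_time_window
    except Exception:
        partition_time_window = None
    return {
        "partition_key": partition_key,
        "partition_key_range": partition_key_range,
        "partition_time_window": partition_time_window,
    }


if __name__ == "__main__":
    print(partition_scope(StubContext()))
    # → {'partition_key': None, 'partition_key_range': None, 'partition_time_window': None}
```

Catching a broad exception keeps unpartitioned assets working. The trade-off is that a template referencing partition_time_window on such an asset would then fail during template evaluation rather than at the context accessor.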

Import

from dagster import template_var, DailyPartitionsDefinition

The incremental support is used indirectly through YAML configuration of DbtProjectComponent.

I/O Contract

Inputs

  • Partition time window: context.partition_time_window, a time window whose start and end datetimes come from the Dagster partition definition. The end is exclusive and marks the start of the next partition.
  • template_vars_module: Path to a Python module containing @template_var-decorated functions that define partition definitions.
  • cli_args with Jinja templates: YAML configuration containing templates such as {{ partition_time_window.start.strftime('%Y-%m-%d') }}.
  • dbt model tags: dbt tags (e.g., tag:daily) used to scope partition definitions to specific models.

Outputs

  • Partitioned dbt assets: Assets with DailyPartitionsDefinition (or other partition types) assigned via post_processing.
  • Resolved CLI arguments: The dbt CLI receives --vars with min_date/max_date (or similar) values computed from the partition time window.

Usage Examples

Template Variables Module

# template_vars.py
import dagster as dg


@dg.template_var
def daily_partitions_def():
    return dg.DailyPartitionsDefinition(start_date="2023-01-01")

YAML Configuration

# defs.yaml
type: dagster_dbt.DbtProjectComponent
attributes:
  project: ../analytics
  template_vars_module: .template_vars
  post_processing:
    assets:
      - target: "tag:daily"
        attributes:
          partitions_def: "{{ daily_partitions_def }}"
  cli_args:
    - "build"
    - "--vars":
        min_date: "{{ partition_time_window.start.strftime('%Y-%m-%d') }}"
        max_date: "{{ partition_time_window.end.strftime('%Y-%m-%d') }}"
    - "--select"
    - "tag:daily"
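The cli_args list mixes plain strings with a mapping entry for --vars. This page does not show exactly how DbtProjectComponent turns that mapping into a command-line token. The sketch below assumes the mapping is serialized to JSON. dbt's --vars flag accepts a YAML dictionary string, and a JSON object such as {"key": "value"} parses as one. The function name `flatten_cli_args` is hypothetical, and the template values are shown already resolved for a 2023-01-31 partition.

```python
import json
from typing import Any, Union

CliArg = Union[str, dict[str, Any]]


def flatten_cli_args(cli_args: list[CliArg]) -> list[str]:
    """Hypothetical: turn resolved cli_args into a flat argv for the dbt CLI.

    Plain strings pass through unchanged. A mapping entry such as
    {"--vars": {...}} becomes the flag followed by a JSON-encoded value
    (an assumption about serialization, not documented Dagster behavior).
    """
    argv: list[str] = []
    for arg in cli_args:
        if isinstance(arg, str):
            argv.append(arg)
        else:
            for flag, value in arg.items():
                argv.extend([flag, json.dumps(value)])
    return argv


if __name__ == "__main__":
    resolved = [
        "build",
        {"--vars": {"min_date": "2023-01-31", "max_date": "2023-02-01"}},
        "--select",
        "tag:daily",
    ]
    print(flatten_cli_args(resolved))
    # → ['build', '--vars', '{"min_date": "2023-01-31", "max_date": "2023-02-01"}', '--select', 'tag:daily']
```

Keeping the whole vars dictionary in a single argv element means no shell quoting is needed when the list is passed directly to a process launcher.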

Corresponding dbt Model

-- models/stg_trips.sql
{{ config(materialized='incremental', tags=['daily']) }}

SELECT * FROM {{ source('raw_taxis', 'trips') }}
{% if is_incremental() %}
WHERE pickup_datetime >= '{{ var("min_date") }}'
  AND pickup_datetime < '{{ var("max_date") }}'
{% endif %}
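The WHERE clause selects a half-open interval: rows at or after min_date and strictly before max_date. Because each daily window ends exactly where the next begins, consecutive partitions neither overlap nor leave gaps. The sketch below checks that property on a few illustrative timestamps. `in_window` is a hypothetical helper that mirrors the SQL predicate. It assumes the 'YYYY-MM-DD' literals compare as midnight timestamps, which is the usual behavior when a date literal meets a timestamp column, though casting rules vary by warehouse.

```python
from datetime import datetime


def in_window(ts: datetime, min_date: str, max_date: str) -> bool:
    """Hypothetical mirror of
    pickup_datetime >= min_date AND pickup_datetime < max_date,
    with the date strings treated as midnight timestamps."""
    lo = datetime.strptime(min_date, "%Y-%m-%d")
    hi = datetime.strptime(max_date, "%Y-%m-%d")
    return lo <= ts < hi


if __name__ == "__main__":
    midnight = datetime(2023, 2, 1, 0, 0)
    late = datetime(2023, 1, 31, 23, 59)
    # A row at exactly midnight belongs to the later partition only.
    print(in_window(midnight, "2023-01-31", "2023-02-01"))  # → False
    print(in_window(midnight, "2023-02-01", "2023-02-02"))  # → True
    print(in_window(late, "2023-01-31", "2023-02-01"))      # → True
```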
