Implementation:Dagster io Dagster Dbt Asset Check Integration
| Attribute | Value |
|---|---|
| Title | Dbt Asset Check Integration |
| Category | Implementation |
| Domains | Data_Engineering, dbt, Data_Quality |
| Repository | Dagster_io_Dagster |
| Source | python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py:L241-254, python_modules/libraries/dagster-dbt/dagster_dbt/components/dbt_project/component.py:L370-403 |
Overview
Concrete mechanism, provided by the dagster-dbt library, for converting dbt tests into Dagster asset checks.
Description
The dbt-to-asset-check integration is implemented across two key locations:
- DagsterDbtTranslator.get_asset_check_spec(): Converts a single dbt test into a Dagster AssetCheckSpec by delegating to the default_asset_check_fn() utility.
- DbtProjectComponent.build_defs_from_state(): Calls build_dbt_specs() to extract both asset specs and check specs from the dbt manifest, then passes the check specs to the @multi_asset decorator.
DagsterDbtTranslatorSettings controls which tests are converted:
- enable_asset_checks (default True): Controls whether dbt model tests become Dagster asset checks.
- enable_source_tests_as_checks (default False): Controls whether dbt source tests become asset checks or observations.
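The effect of the two flags can be sketched as a small decision function. This is a simplified model built only from the flag descriptions above, not the dagster-dbt implementation: SketchSettings, classify_test, and the "no_check" label are hypothetical names introduced for illustration, and the sketch assumes the two flags act independently.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class SketchSettings:
    """Hypothetical stand-in carrying only the two flags discussed above."""

    enable_asset_checks: bool = True
    enable_source_tests_as_checks: bool = False


def classify_test(parent_resource_type: str, settings: SketchSettings) -> str:
    """Return how a dbt test would be surfaced, per the flag descriptions.

    "no_check" is a placeholder label for this sketch, not a Dagster concept.
    """
    if parent_resource_type == "source":
        # Source tests: checks only when explicitly enabled, else observations.
        if settings.enable_source_tests_as_checks:
            return "asset_check"
        return "observation"
    # Model tests: checks unless enable_asset_checks is turned off.
    return "asset_check" if settings.enable_asset_checks else "no_check"


defaults = SketchSettings()
print(classify_test("model", defaults))   # asset_check
print(classify_test("source", defaults))  # observation
print(classify_test("source", SketchSettings(enable_source_tests_as_checks=True)))  # asset_check
```

With the default settings, only tests attached to models are treated as asset checks; source tests need the second flag.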
Usage
Automatic when using DbtProjectComponent. By default, any dbt test declared on a model in a schema YAML file becomes a Dagster asset check; source tests are converted only when enable_source_tests_as_checks is set. Configure via DagsterDbtTranslatorSettings to customize behavior.
Code Reference
Source Location
- python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py:L241-254
- python_modules/libraries/dagster-dbt/dagster_dbt/components/dbt_project/component.py:L370-403
Signature
# DagsterDbtTranslator method
def get_asset_check_spec(
    self,
    asset_spec: AssetSpec,
    manifest: Mapping[str, Any],
    unique_id: str,
    project: Optional["DbtProject"],
) -> Optional[AssetCheckSpec]:
    return default_asset_check_fn(
        manifest=manifest,
        dagster_dbt_translator=self,
        asset_key=asset_spec.key,
        test_unique_id=unique_id,
        project=project,
    )

# DagsterDbtTranslatorSettings
@dataclass(frozen=True)
class DagsterDbtTranslatorSettings(Resolvable):
    enable_asset_checks: bool = True
    enable_duplicate_source_asset_keys: bool = False
    enable_code_references: bool = False
    enable_dbt_selection_by_name: bool = False
    enable_source_tests_as_checks: bool = False
    enable_source_metadata: bool = False

# In DbtProjectComponent.build_defs_from_state():
def build_defs_from_state(self, context, state_path):
    project = self._project_manager.get_project(state_path)
    asset_specs, check_specs = build_dbt_specs(
        translator=validate_translator(self.translator),
        manifest=validate_manifest(project.manifest_path),
        select=self.select,
        exclude=self.exclude,
        selector=self.selector,
        project=project,
        io_manager_key=None,
    )

    @dg.multi_asset(
        specs=asset_specs,
        check_specs=check_specs,
        can_subset=True,
        ...
    )
    def _fn(context: dg.AssetExecutionContext):
        yield from self.execute(context=context, dbt=DbtCliResource(project))

    return dg.Definitions(assets=[_fn])
Import
# Automatic when using DbtProjectComponent
# Configure via DagsterDbtTranslatorSettings:
from dagster_dbt import DagsterDbtTranslatorSettings
I/O Contract
Inputs
- dbt manifest: Parsed JSON containing test nodes with their unique_id, test_metadata, and associations to models/sources.
- DagsterDbtTranslatorSettings: Feature flags controlling which tests become checks.
- dbt schema YAML: Test declarations on models and columns.
Example dbt schema YAML:
models:
  - name: stg_trips
    columns:
      - name: trip_id
        tests:
          - not_null
          - unique
      - name: pickup_zone
        tests:
          - accepted_values:
              values: ['Manhattan', 'Brooklyn', 'Queens', 'Bronx', 'Staten Island']
Outputs
- AssetCheckSpec objects: Auto-linked to the corresponding Dagster asset. Each spec includes the check name (derived from the dbt test name), the asset key, and any additional metadata.
- AssetCheckResult events: Emitted during dbt build execution when tests pass or fail.
Usage Examples
Default Behavior (No Configuration Needed)
By default, any dbt test declared on a model in schema.yml automatically becomes a Dagster asset check:
# dbt schema.yml
models:
  - name: stg_trips
    columns:
      - name: trip_id
        tests:
          - not_null
          - unique
These tests appear as Dagster asset checks on the stg_trips asset in the UI.
Enabling Source Tests as Checks
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings

translator = DagsterDbtTranslator(
    settings=DagsterDbtTranslatorSettings(
        enable_asset_checks=True,
        enable_source_tests_as_checks=True,
    )
)
Custom Check Spec via Translator Override
from dagster_dbt import DagsterDbtTranslator
from dagster import AssetCheckSpec

class CustomTranslator(DagsterDbtTranslator):
    def get_asset_check_spec(self, asset_spec, manifest, unique_id, project=None):
        default_check = super().get_asset_check_spec(
            asset_spec, manifest, unique_id, project
        )
        if default_check is None:
            return None
        # Add custom metadata or modify the check spec
        return default_check