Implementation:Dagster io Dagster DagsterDbtTranslator API
| Attribute | Value |
|---|---|
| Title | DagsterDbtTranslator API |
| Category | Implementation |
| Domains | Data_Engineering, dbt |
| Repository | Dagster_io_Dagster |
| Source | python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py:L78 |
Overview
Concrete translator class, provided by the dagster-dbt library, that converts dbt manifest metadata into Dagster asset specs.
Description
The DagsterDbtTranslator class holds a set of methods that derive Dagster asset definition metadata given a representation of a dbt resource (models, tests, sources, etc.). Each method can be overridden in a subclass to customize how Dagster asset metadata is derived from dbt resource properties.
The companion DagsterDbtTranslatorSettings dataclass provides feature flags that control translation behavior without requiring subclassing.
Usage
Use the default translator when the standard mapping is sufficient. Subclass when custom asset key schemes, group assignments, or dependency patterns are needed.
Code Reference
Source Location
python_modules/libraries/dagster-dbt/dagster_dbt/dagster_dbt_translator.py:L78
Signature
class DagsterDbtTranslator:
    """Holds a set of methods that derive Dagster asset definition metadata
    given a representation of a dbt resource."""

    def __init__(self, settings: Optional[DagsterDbtTranslatorSettings] = None):
        self._settings = settings or DagsterDbtTranslatorSettings()

    def get_asset_spec(
        self,
        manifest: Mapping[str, Any],
        unique_id: str,
        project: Optional["DbtProject"],
    ) -> AssetSpec:
        """Returns an AssetSpec representing a specific dbt resource."""
        ...

    def get_asset_check_spec(
        self,
        asset_spec: AssetSpec,
        manifest: Mapping[str, Any],
        unique_id: str,
        project: Optional["DbtProject"],
    ) -> Optional[AssetCheckSpec]:
        """Returns an AssetCheckSpec for a dbt test, or None."""
        ...

    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        """Returns the Dagster asset key for a dbt resource."""
        ...

    def get_group_name(self, dbt_resource_props: Mapping[str, Any]) -> Optional[str]:
        """Returns the Dagster group name for a dbt resource."""
        ...

    def get_description(self, dbt_resource_props: Mapping[str, Any]) -> str:
        """Returns the Dagster description for a dbt resource."""
        ...

    def get_metadata(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, Any]:
        """Returns Dagster metadata for a dbt resource."""
        ...

    def get_tags(self, dbt_resource_props: Mapping[str, Any]) -> Mapping[str, str]:
        """Returns Dagster tags for a dbt resource."""
        ...

    def get_owners(self, dbt_resource_props: Mapping[str, Any]) -> Sequence[str]:
        """Returns Dagster owners for a dbt resource."""
        ...


@dataclass(frozen=True)
class DagsterDbtTranslatorSettings(Resolvable):
    enable_asset_checks: bool = True
    enable_duplicate_source_asset_keys: bool = False
    enable_code_references: bool = False
    enable_dbt_selection_by_name: bool = False
    enable_source_tests_as_checks: bool = False
    enable_source_metadata: bool = False
Import
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings
I/O Contract
Inputs
- manifest: Parsed dbt manifest JSON (Mapping[str, Any]) containing all dbt resource metadata.
- unique_id: The dbt unique identifier for a specific resource (e.g., "model.my_project.stg_orders").
- project: Optional DbtProject instance for code reference resolution.
- dbt_resource_props: Per-resource metadata dictionary extracted from the manifest via get_resource_props().
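To make the relationship between manifest, unique_id, and dbt_resource_props concrete, the sketch below looks up a resource in a hand-written, heavily trimmed manifest. The helper find_resource_props and the sample data are hypothetical. It only approximates the kind of lookup get_resource_props() performs and is not dagster-dbt's implementation. It assumes the manifest keeps resources under the top-level "nodes" and "sources" sections.

```python
from typing import Any, Mapping

# Hand-written, heavily trimmed stand-in for a parsed dbt manifest.json.
SAMPLE_MANIFEST: Mapping[str, Any] = {
    "nodes": {
        "model.my_project.stg_orders": {
            "name": "stg_orders",
            "resource_type": "model",
            "description": "Staged orders.",
            "tags": ["staging"],
        },
    },
    "sources": {
        "source.my_project.raw.orders": {
            "name": "orders",
            "source_name": "raw",
            "resource_type": "source",
        },
    },
}


def find_resource_props(manifest: Mapping[str, Any], unique_id: str) -> Mapping[str, Any]:
    """Hypothetical helper: return the per-resource dictionary for unique_id."""
    # Only two manifest sections are searched here; this is an assumption of the sketch.
    for section in ("nodes", "sources"):
        resources = manifest.get(section, {})
        if unique_id in resources:
            return resources[unique_id]
    raise KeyError(f"No dbt resource with unique_id {unique_id!r}")


props = find_resource_props(SAMPLE_MANIFEST, "model.my_project.stg_orders")
print(props["name"], props["resource_type"])
```

The returned dictionary is the shape of value that the per-resource methods such as get_asset_key and get_group_name receive as dbt_resource_props.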
Outputs
- AssetSpec: One per dbt model, seed, or snapshot, containing key, deps, description, metadata, group, tags, owners, and kinds.
- AssetCheckSpec: One per dbt test when a check is produced (get_asset_check_spec may return None), linked to the corresponding asset.
Usage Examples
Custom Translator with Key Prefix
from typing import Any, Mapping

from dagster import AssetKey
from dagster_dbt import DagsterDbtTranslator


class CustomDagsterDbtTranslator(DagsterDbtTranslator):
    def get_asset_key(self, dbt_resource_props: Mapping[str, Any]) -> AssetKey:
        return super().get_asset_key(dbt_resource_props).with_prefix("prefix")
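To illustrate the effect of the prefix without installing Dagster, the sketch below models an asset key as a plain list of path components. The function prefixed_path is a hypothetical stand-in for the prepend behavior of AssetKey.with_prefix, not Dagster's code. The default key path ["stg_orders"] is an assumption made for the example.

```python
from typing import List, Sequence, Union


def prefixed_path(path: Sequence[str], prefix: Union[str, Sequence[str]]) -> List[str]:
    """Hypothetical stand-in: prepend one or more components to a key path."""
    prefix_parts = [prefix] if isinstance(prefix, str) else list(prefix)
    return prefix_parts + list(path)


# Assumed default key path for a model named stg_orders.
default_path = ["stg_orders"]

print(prefixed_path(default_path, "prefix"))           # ['prefix', 'stg_orders']
print(prefixed_path(default_path, ["warehouse", "dbt"]))  # ['warehouse', 'dbt', 'stg_orders']
```

Because the override calls super().get_asset_key() first, any key customization already applied by the base class is kept and the prefix is added in front of it.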
Remapping Source Dependencies
from dagster_dbt import DagsterDbtTranslator
import dagster as dg


class CustomTranslator(DagsterDbtTranslator):
    def get_asset_spec(self, manifest, unique_id, project=None):
        default_spec = super().get_asset_spec(manifest, unique_id, project)
        resource_props = self.get_resource_props(manifest, unique_id)

        # Map dbt sources to ingestion asset keys
        source_name = resource_props.get("source_name")
        if source_name == "raw_taxis":
            table_name = resource_props["name"]
            return default_spec.replace_attributes(
                deps=[dg.AssetDep(dg.AssetKey(table_name))]
            )
        return default_spec
Configuring Translator Settings
from dagster_dbt import DagsterDbtTranslator, DagsterDbtTranslatorSettings

translator = DagsterDbtTranslator(
    settings=DagsterDbtTranslatorSettings(
        enable_asset_checks=True,
        enable_code_references=True,
        enable_source_tests_as_checks=False,
    )
)
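Because the settings class is declared with @dataclass(frozen=True), its flags cannot be reassigned after construction. The sketch below shows that behavior using a hypothetical stand-in, SettingsSketch, which mirrors three of the listed flags and omits the Resolvable base class. Deriving a modified copy with dataclasses.replace is standard dataclass behavior. Whether it works identically on the real DagsterDbtTranslatorSettings is an assumption not verified here.

```python
from dataclasses import FrozenInstanceError, dataclass, replace


@dataclass(frozen=True)
class SettingsSketch:
    """Hypothetical stand-in mirroring three flags from the signature above."""

    enable_asset_checks: bool = True
    enable_code_references: bool = False
    enable_source_tests_as_checks: bool = False


base = SettingsSketch()

try:
    # Frozen dataclasses reject attribute assignment.
    base.enable_code_references = True
except FrozenInstanceError:
    mutation_rejected = True
else:
    mutation_rejected = False

# Build a new instance with one flag changed; the original is untouched.
with_refs = replace(base, enable_code_references=True)

print(mutation_rejected, base.enable_code_references, with_refs.enable_code_references)
```

In practice this means choosing flag values when the settings object is constructed and passing it to the translator, as in the example above, rather than toggling flags on an existing instance.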