Implementation: Astronomer Cosmos LegacyDbtProject Parser
| Knowledge Sources | |
|---|---|
| Domains | dbt Parsing, Project Discovery, DAG Construction |
| Last Updated | 2026-02-07 17:00 GMT |
Overview
Legacy dbt project parser that walks a dbt project directory tree to discover and extract models, snapshots, seeds, and tests along with their upstream dependencies and configuration metadata.
Description
The cosmos.dbt.parser.project module provides a file-system-based parser for dbt projects. It is considered the "legacy" parser because newer versions of Cosmos support parsing via dbt ls and manifest files. This parser operates entirely by reading SQL, Python, CSV, and YAML files from the project directory.
DbtModelType is an enumeration with four members:
- DBT_MODEL -- A dbt model (SQL or Python)
- DBT_SNAPSHOT -- A dbt snapshot
- DBT_SEED -- A dbt seed (CSV data)
- DBT_TEST -- A dbt test extracted from YAML column definitions
DbtModelConfig is a dataclass that holds:
- config_selectors -- A set of strings in "key:value" format (e.g., "materialized:table", "tags:daily")
- upstream_models -- A set of model names that this model depends on
DbtModelConfig supports addition via __add__, which merges two configs with an order-of-operations rule: SQL-file-level configs (materialized, schema) take precedence over properties.yml configs. Tags are always collected from both sources.
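The precedence rule can be illustrated with a small sketch. This is a simplified illustration, not the actual Cosmos implementation: SQL-level selectors win for keys like materialized and schema, while tags accumulate from both sources.

```python
# Simplified sketch of the DbtModelConfig merge rule (not the actual
# Cosmos implementation): SQL-file selectors override properties.yml
# selectors for keys like "materialized" and "schema", while "tags"
# are always collected from both sides.
def merge_selectors(sql_selectors: set[str], yml_selectors: set[str]) -> set[str]:
    merged = set(sql_selectors)
    sql_keys = {s.split(":", 1)[0] for s in sql_selectors}
    for selector in yml_selectors:
        key = selector.split(":", 1)[0]
        # tags always accumulate; other keys only fill gaps left by SQL
        if key == "tags" or key not in sql_keys:
            merged.add(selector)
    return merged

sql_cfg = {"materialized:table", "tags:daily"}
yml_cfg = {"materialized:view", "schema:analytics", "tags:core"}
print(sorted(merge_selectors(sql_cfg, yml_cfg)))
# ['materialized:table', 'schema:analytics', 'tags:core', 'tags:daily']
```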
DbtModel is a dataclass representing a single dbt entity. On initialization (__post_init__), it reads the file at its path and extracts:
- For SQL files: Uses Jinja2 template parsing to find ref() calls (upstream dependencies) and config() calls (materialized, schema, and tags selectors)
- For Python files: Uses Python AST parsing via extract_python_file_upstream_requirements() to find dbt.ref() calls within the model() function
- For snapshots: Extracts the snapshot name from the {% snapshot name %} Jinja block and parses the block body for refs
- For seeds and tests: No file parsing is performed
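As a rough illustration of the SQL path, the following regex-based sketch pulls ref() targets out of a model body. This is an approximation for clarity: the real parser walks the Jinja2 template AST rather than using regular expressions.

```python
import re

# Approximation of the SQL parsing step: the real parser walks the
# Jinja2 template AST; here a regex finds {{ ref('...') }} targets.
REF_PATTERN = re.compile(r"""ref\(\s*['"]([^'"]+)['"]\s*\)""")

def find_refs(sql: str) -> set[str]:
    """Return the set of model names referenced via ref() in a SQL body."""
    return set(REF_PATTERN.findall(sql))

sql_body = """
select *
from {{ ref('stg_orders') }} o
join {{ ref("stg_customers") }} c on o.customer_id = c.id
"""
print(sorted(find_refs(sql_body)))  # ['stg_customers', 'stg_orders']
```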
The standalone function extract_python_file_upstream_requirements(code) parses Python source code using ast.parse(), locates the model() function definition, walks its AST to find all .ref() method calls, and returns the list of referenced entity names.
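A minimal sketch of that AST approach, simplified from the real function (error handling and async-function support omitted):

```python
import ast

def extract_refs_sketch(code: str) -> list[str]:
    # Parse the source, locate the model() function definition, and
    # collect the first string argument of every .ref(...) method call
    # inside it -- a simplified version of
    # extract_python_file_upstream_requirements().
    tree = ast.parse(code)
    refs: list[str] = []
    for node in ast.walk(tree):
        if isinstance(node, ast.FunctionDef) and node.name == "model":
            for call in ast.walk(node):
                if (
                    isinstance(call, ast.Call)
                    and isinstance(call.func, ast.Attribute)
                    and call.func.attr == "ref"
                    and call.args
                    and isinstance(call.args[0], ast.Constant)
                ):
                    refs.append(call.args[0].value)
    return refs

code = 'def model(dbt, session):\n    return dbt.ref("stg_orders")\n'
print(extract_refs_sketch(code))  # ['stg_orders']
```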
LegacyDbtProject is the main entry point dataclass. Given a project_name and optional directory overrides, its __post_init__ method:
- Resolves directory paths (defaults: /usr/local/airflow/dags/dbt/{project_name}, models in models/, snapshots in snapshots/, seeds in seeds/)
- Recursively globs *.sql and *.py files under the models directory
- Recursively globs *.sql files under the snapshots directory
- Recursively globs *.csv files under the seeds directory
- Recursively globs *.yml config files under the models directory to extract column-level tests and merge config selectors
The YAML config handler (_handle_config_file) processes the models: key in properties.yml files. For each model entry, it extracts column-level tests (creating DbtModel entries of type DBT_TEST with upstream dependencies) and config selectors. If no materialized config is found, it defaults to "materialized:view" to match dbt's default behaviour.
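A simplified sketch of that properties.yml handling follows. The helper name and test-naming scheme are illustrative, not the actual Cosmos internals, and the parsed YAML is shown as a plain dict to keep the example self-contained.

```python
# Simplified sketch of the _handle_config_file logic: for each model
# entry, emit column-level test names and config selectors, defaulting
# to materialized:view when no materialization is declared.
def handle_models_entry(models_cfg: list[dict]) -> dict[str, dict]:
    parsed: dict[str, dict] = {}
    for model in models_cfg:
        selectors: set[str] = set()
        config = model.get("config", {})
        for tag in config.get("tags", []):
            selectors.add(f"tags:{tag}")
        if "materialized" in config:
            selectors.add(f"materialized:{config['materialized']}")
        else:
            selectors.add("materialized:view")  # dbt's default
        # Column-level tests become entries with an illustrative name
        # of the form <test>_<column>_<model>.
        tests = []
        for column in model.get("columns", []):
            for test in column.get("tests", []):
                tests.append(f"{test}_{column['name']}_{model['name']}")
        parsed[model["name"]] = {"selectors": selectors, "tests": tests}
    return parsed

yaml_models = [
    {
        "name": "stg_orders",
        "config": {"tags": ["daily"]},
        "columns": [{"name": "order_id", "tests": ["unique", "not_null"]}],
    }
]
print(handle_models_entry(yaml_models))
```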
Usage
Use this parser when you need to resolve dbt project structure without invoking the dbt CLI. It is suitable for environments where dbt is not installed or for fast graph resolution from the file system. For production use with complex projects, prefer the dbt ls or manifest-based load modes.
Code Reference
Source Location
- Repository: Astronomer_Astronomer_cosmos
- File: cosmos/dbt/parser/project.py
- Lines: 1-431
Signature
class DbtModelType(Enum):
    DBT_MODEL = "model"
    DBT_SNAPSHOT = "snapshot"
    DBT_SEED = "seed"
    DBT_TEST = "test"

@dataclass
class DbtModelConfig:
    config_types: ClassVar[list[str]] = ["materialized", "schema", "tags"]
    config_selectors: set[str] = field(default_factory=set)
    upstream_models: set[str] = field(default_factory=set)

    def __add__(self, other_config: DbtModelConfig) -> DbtModelConfig: ...

def extract_python_file_upstream_requirements(code: str) -> list[str]: ...
@dataclass
class DbtModel:
    name: str
    type: DbtModelType
    path: Path
    dbt_vars: dict[str, str] = field(default_factory=dict)
    config: DbtModelConfig = field(default_factory=DbtModelConfig)

@dataclass
class LegacyDbtProject:
    project_name: str
    dbt_root_path: str | None = None
    dbt_models_dir: str | None = None
    dbt_snapshots_dir: str | None = None
    dbt_seeds_dir: str | None = None
    models: dict[str, DbtModel] = field(default_factory=dict)
    snapshots: dict[str, DbtModel] = field(default_factory=dict)
    seeds: dict[str, DbtModel] = field(default_factory=dict)
    tests: dict[str, DbtModel] = field(default_factory=dict)
    dbt_vars: dict[str, str] = field(default_factory=dict)
Import
from cosmos.dbt.parser.project import LegacyDbtProject, DbtModel, DbtModelType, DbtModelConfig
from cosmos.dbt.parser.project import extract_python_file_upstream_requirements
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| project_name | str | Yes | Name of the dbt project directory (e.g., "jaffle_shop") |
| dbt_root_path | str or None | No | Root directory containing dbt projects. Defaults to "/usr/local/airflow/dags/dbt" |
| dbt_models_dir | str or None | No | Subdirectory name for models within the project. Defaults to "models" |
| dbt_snapshots_dir | str or None | No | Subdirectory name for snapshots within the project. Defaults to "snapshots" |
| dbt_seeds_dir | str or None | No | Subdirectory name for seeds within the project. Defaults to "seeds" |
| dbt_vars | dict[str, str] | No | Dictionary of dbt variables used to resolve var() calls in Jinja ref expressions |
Outputs
| Name | Type | Description |
|---|---|---|
| models | dict[str, DbtModel] | Dictionary mapping model names to their DbtModel instances, populated from SQL and Python files under the models directory |
| snapshots | dict[str, DbtModel] | Dictionary mapping snapshot names to their DbtModel instances, populated from SQL files under the snapshots directory |
| seeds | dict[str, DbtModel] | Dictionary mapping seed names to their DbtModel instances, populated from CSV files under the seeds directory |
| tests | dict[str, DbtModel] | Dictionary mapping test names to their DbtModel instances, extracted from column-level tests in YAML config files |
| project_dir | Path | Resolved absolute path to the dbt project directory |
| models_dir | Path | Resolved absolute path to the models subdirectory |
| snapshots_dir | Path | Resolved absolute path to the snapshots subdirectory |
| seeds_dir | Path | Resolved absolute path to the seeds subdirectory |
Usage Examples
Basic Example
from cosmos.dbt.parser.project import LegacyDbtProject

# Parse a dbt project located at /usr/local/airflow/dags/dbt/jaffle_shop
project = LegacyDbtProject(project_name="jaffle_shop")

# Access discovered models
for name, model in project.models.items():
    print(f"Model: {name}")
    print(f"  Path: {model.path}")
    print(f"  Upstream: {model.config.upstream_models}")
    print(f"  Config: {model.config.config_selectors}")

# Access discovered seeds
for name, seed in project.seeds.items():
    print(f"Seed: {name}")

# Access discovered tests
for name, test in project.tests.items():
    print(f"Test: {name}, depends on: {test.config.upstream_models}")
Custom Directory Layout Example
from cosmos.dbt.parser.project import LegacyDbtProject
project = LegacyDbtProject(
    project_name="my_project",
    dbt_root_path="/opt/dbt/projects",
    dbt_models_dir="src/models",
    dbt_snapshots_dir="src/snapshots",
    dbt_seeds_dir="data/seeds",
    dbt_vars={"schema_prefix": "prod"},
)
Extracting Python Model Dependencies
from cosmos.dbt.parser.project import extract_python_file_upstream_requirements
python_model_code = """
def model(dbt, session):
    upstream_df = dbt.ref("stg_customers")
    orders_df = dbt.ref("stg_orders")
    return upstream_df.join(orders_df, on="customer_id")
"""

deps = extract_python_file_upstream_requirements(python_model_code)
print(deps)  # ['stg_customers', 'stg_orders']
Related Pages
- cosmos.airflow.dag.DbtDag -- Uses project parsing to build Airflow DAGs
- cosmos.airflow.task_group.DbtTaskGroup -- Uses project parsing to build Airflow TaskGroups