Implementation:Apache Airflow DagBag Loader
| Knowledge Sources | |
|---|---|
| Domains | Testing, DAG_Processing |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
A concrete tool, provided by Airflow's DagBag class, for loading and validating DAG files.
Description
The DagBag class loads DAGs from a specified folder by importing Python files, extracting DAG objects, and recording any import errors per file. It supports safe mode (parsing only files whose contents include both of the strings "DAG" and "airflow", matched case-insensitively), optional inclusion of the built-in example DAGs, and pool validation against a set of known pool names. DagBag is used both for local testing and by the DagFileProcessorManager for production DAG discovery.
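The safe-mode filter described above can be approximated in a few lines of plain Python. This is a minimal sketch, not Airflow's own code: the function name might_contain_dag is hypothetical, and it assumes the heuristic requires both markers to be present and ignores case. It also skips details such as zipped DAG packages.

```python
from pathlib import Path


def might_contain_dag(path: Path) -> bool:
    """Approximate the safe-mode check (hypothetical helper, not Airflow's API).

    Returns True only when the file content contains both markers,
    compared case-insensitively.
    """
    content = path.read_bytes().lower()
    return all(marker in content for marker in (b"dag", b"airflow"))
```

A file that mentions only one of the two markers is skipped under this sketch, which is why a helper module that imports airflow but never references a DAG would not be parsed in safe mode.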
Usage
Use DagBag directly in tests or scripts to validate DAG files. The airflow dags test CLI command internally uses DagBag for DAG loading before executing a test run.
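For script use, one possible shape is a small gate that returns a non-zero exit code when any file fails to import. The sketch below is illustrative only: check_dag_folder and its bag_factory parameter are hypothetical names introduced here so the logic can be exercised without an Airflow installation. When no factory is passed, it falls back to the DagBag import shown in this page.

```python
from __future__ import annotations

import sys
from typing import Any, Callable


def check_dag_folder(
    dag_folder: str,
    bag_factory: Callable[..., Any] | None = None,
) -> int:
    """Return 0 if every file imported cleanly, 1 otherwise (hypothetical helper)."""
    if bag_factory is None:
        # Imported lazily so the helper can be tested with a stand-in factory.
        from airflow.dag_processing.dagbag import DagBag

        bag_factory = DagBag

    bag = bag_factory(dag_folder=dag_folder, include_examples=False)
    # import_errors maps file path -> error text; print one line per failing file.
    for path, error in sorted(bag.import_errors.items()):
        print(f"{path}: {error}", file=sys.stderr)
    return 1 if bag.import_errors else 0
```

In a CI job this could be called as sys.exit(check_dag_folder("/path/to/dags")), so that a broken DAG file fails the build.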
Code Reference
Source Location
- Repository: Apache Airflow
- File: airflow-core/src/airflow/dag_processing/dagbag.py
- Lines: L162-600
Signature
class DagBag(LoggingMixin):
    def __init__(
        self,
        dag_folder: str | Path | None = None,
        include_examples: bool | ArgNotSet = NOTSET,
        safe_mode: bool | ArgNotSet = NOTSET,
        load_op_links: bool = True,
        collect_dags: bool = True,
        known_pools: set[str] | None = None,
        bundle_path: Path | None = None,
        bundle_name: str | None = None,
    ):
        ...
Import
from airflow.dag_processing.dagbag import DagBag
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| dag_folder | str or Path or None | No | Path to DAGs directory (defaults to config) |
| include_examples | bool | No | Include built-in example DAGs (defaults to config) |
| safe_mode | bool | No | Only parse files containing both "DAG" and "airflow", case-insensitive (defaults to config) |
| collect_dags | bool | No | Auto-collect DAGs on init (default True) |
| known_pools | set[str] or None | No | Set of valid pool names for validation |
Outputs
| Name | Type | Description |
|---|---|---|
| dags | dict[str, DAG] | Successfully loaded DAGs keyed by dag_id |
| import_errors | dict[str, str] | Files with errors keyed by file path |
| dag_ids | list[str] | List of all discovered dag_ids (the keys of dags) |
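The values in import_errors are typically multi-line traceback strings, which can be noisy in test output. The following sketch condenses each entry to its final non-empty line, usually the exception message. The helper name summarize_import_errors is hypothetical and works on any dict of that shape, so it does not depend on Airflow itself.

```python
def summarize_import_errors(import_errors: dict[str, str]) -> dict[str, str]:
    """Map each file path to the last non-empty line of its error text."""
    summary: dict[str, str] = {}
    for path, error_text in import_errors.items():
        # Drop blank lines, then keep only the final line of the traceback.
        lines = [line.strip() for line in error_text.splitlines() if line.strip()]
        summary[path] = lines[-1] if lines else ""
    return summary
```

Passing dagbag.import_errors to this helper gives a compact per-file summary that fits in an assertion message.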
Usage Examples
Validate DAGs in Tests
from airflow.dag_processing.dagbag import DagBag


def test_no_import_errors():
    # Skip the bundled example DAGs so only project files are checked.
    dagbag = DagBag(dag_folder="/path/to/dags", include_examples=False)
    assert len(dagbag.import_errors) == 0, f"Errors: {dagbag.import_errors}"


def test_dag_loaded():
    dagbag = DagBag(dag_folder="/path/to/dags")
    # dags is keyed by dag_id; .get returns None if the DAG was not loaded.
    dag = dagbag.dags.get("my_dag_id")
    assert dag is not None
    assert len(dag.tasks) > 0