Implementation:Apache Airflow DagFileProcessorManager Discovery
| Knowledge Sources | |
|---|---|
| Domains | DAG_Processing, Scheduling |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
The DagFileProcessorManager is the concrete component that continuously discovers DAG files and parses them in parallel, so that the scheduler has current DAGs to schedule.
Description
The DagFileProcessorManager orchestrates DAG file discovery by managing a queue of files to parse, spawning DagFileProcessorProcess workers, and collecting results. In the scheduler context, it ensures DAGs are available for scheduling by monitoring file changes and triggering re-parses. It also handles callback requests from completed DAG runs and tracks file processing statistics.
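The queue-and-workers pattern described above can be illustrated with a minimal sketch. This is not Airflow's implementation: `toy_parse` and `run_parse_loop` are hypothetical names, and a thread pool stands in for the DagFileProcessorProcess workers. It only shows how a file queue is drained while a parallelism limit caps the number of parses in flight.

```python
from collections import deque
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def toy_parse(path: str) -> tuple[str, int]:
    """Hypothetical stand-in for a parser worker; returns (path, length of path)."""
    return path, len(path)


def run_parse_loop(paths: list[str], parallelism: int = 2) -> dict[str, int]:
    """Drain a file queue while keeping at most `parallelism` parses in flight."""
    file_queue = deque(paths)  # files waiting to be parsed
    in_flight = {}             # future -> path, analogous to the running processors
    results: dict[str, int] = {}

    with ThreadPoolExecutor(max_workers=parallelism) as pool:
        while file_queue or in_flight:
            # Start workers until the parallelism limit is reached.
            while file_queue and len(in_flight) < parallelism:
                path = file_queue.popleft()
                in_flight[pool.submit(toy_parse, path)] = path
            # Block until at least one worker finishes, then collect its result.
            done, _ = wait(list(in_flight), return_when=FIRST_COMPLETED)
            for future in done:
                path, value = future.result()
                results[path] = value
                del in_flight[future]
    return results
```

Calling `run_parse_loop(["a.py", "bb.py"], parallelism=2)` returns one entry per file. The real manager additionally re-queues files on change and enforces a per-processor timeout, which this sketch omits.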
Usage
This implementation is used internally by the scheduler and dag-processor components. It runs continuously and requires no direct user interaction.
Code Reference
Source Location
- Repository: Apache Airflow
- File: airflow-core/src/airflow/dag_processing/manager.py
- Lines: L153-L1298
Signature
@attrs.define()
class DagFileProcessorManager(LoggingMixin):
    max_runs: int
    bundle_names_to_parse: list[str] | None = None
    processor_timeout: float = conf.getfloat("dag_processor", "processor_timeout")
    _parallelism: int = conf.getint("dag_processor", "parsing_processes")
    _file_process_interval: float = conf.getfloat("dag_processor", "file_process_interval")
    stale_dag_threshold: float = conf.getfloat("dag_processor", "stale_dag_threshold")
    parsing_cleanup_interval: float = conf.getfloat("dag_processor", "parsing_cleanup_interval")
    max_callbacks_per_loop: int = conf.getint("dag_processor", "max_callbacks_per_loop")
    _file_queue: deque[DagFileInfo] = attrs.field(factory=deque)
    _file_stats: dict[DagFileInfo, DagFileStat] = attrs.field(factory=dict)
    _processors: dict[DagFileInfo, DagFileProcessorProcess] = attrs.field(factory=dict)
Import
from airflow.dag_processing.manager import DagFileProcessorManager
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| max_runs | int | Yes | Number of times to parse each file (-1 for unlimited) |
| DAG bundle directories | Filesystem paths | Yes | Configured DAG source directories |
| Callback requests | list[CallbackRequest] | No | Pending callbacks to execute during parsing |
Outputs
| Name | Type | Description |
|---|---|---|
| Parsed DAGs | Database records | Serialized DAGs in metadata DB |
| File statistics | dict[DagFileInfo, DagFileStat] | Parse duration, error counts, last parse time |
| Import errors | dict[str, str] | File path to error message mapping |
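To make the shape of the statistics and import-error outputs more concrete, here is a small sketch of per-file bookkeeping. `ToyFileStat` and `record_parse` are hypothetical; the field names are illustrative and are not taken from Airflow's `DagFileStat`. Plain string paths are used as keys in place of `DagFileInfo`.

```python
from __future__ import annotations

from dataclasses import dataclass
from datetime import datetime, timezone


@dataclass
class ToyFileStat:
    """Hypothetical per-file record; field names are illustrative only."""
    run_count: int = 0
    error_count: int = 0
    last_duration: float | None = None
    last_finish_time: datetime | None = None


def record_parse(
    stats: dict[str, ToyFileStat],
    import_errors: dict[str, str],
    path: str,
    duration: float,
    error: str | None = None,
) -> None:
    """Update the statistics and the path-to-error mapping after one parse."""
    stat = stats.setdefault(path, ToyFileStat())
    stat.run_count += 1
    stat.last_duration = duration
    stat.last_finish_time = datetime.now(timezone.utc)
    if error is None:
        # A clean parse clears any earlier import error for this file.
        import_errors.pop(path, None)
    else:
        stat.error_count += 1
        import_errors[path] = error
```

In this sketch a successful re-parse removes the file's entry from the error mapping, so the mapping only ever reflects the most recent parse of each file.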
Usage Examples
Internal Scheduler Usage
# This is internal to the dag-processor component
# Not typically called directly by users
manager = DagFileProcessorManager(max_runs=-1)
manager.run()