Implementation:Apache Airflow DagFileProcessorManager Orchestration
| Knowledge Sources | |
|---|---|
| Domains | DAG_Processing, Concurrency |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool, provided by the Airflow dag-processor component, for orchestrating parallel DAG file parsing.
Description
The DagFileProcessorManager is an attrs-based class that coordinates parallel DAG parsing across multiple DagFileProcessorProcess workers. It manages the file queue, the worker process lifecycle, callback execution, and per-file statistics. To monitor the worker processes, it uses the standard-library selectors module for I/O multiplexing.
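The manager's core pattern is a bounded pool of child processes whose pipes are watched with the `selectors` module. It can be sketched in plain Python. This is a minimal illustration under stated assumptions, not Airflow's code. `run_pool`, `WORKER_CODE`, and the `timeout` handling (a stand-in for `processor_timeout`) are hypothetical, and each child only echoes its path instead of parsing a DAG. The sketch also assumes a POSIX platform, because `selectors` cannot watch pipes on Windows.

```python
import os
import selectors
import subprocess
import sys
import time
from collections import deque

# Hypothetical worker body. A real DAG processor would import and serialize the
# DAG file; this child process only reports the path it was handed.
WORKER_CODE = "import sys; print('parsed', sys.argv[1])"


def run_pool(paths, parallelism=2, timeout=10.0):
    """Process `paths` with at most `parallelism` children, watched via selectors."""
    queue = deque(paths)
    sel = selectors.DefaultSelector()
    running = {}  # path -> (Popen, start time, list of output chunks)
    results = {}  # path -> (return code, decoded stdout)

    while queue or running:
        # Launch workers from the queue while there is spare capacity.
        while queue and len(running) < parallelism:
            path = queue.popleft()
            proc = subprocess.Popen(
                [sys.executable, "-c", WORKER_CODE, path], stdout=subprocess.PIPE
            )
            running[path] = (proc, time.monotonic(), [])
            sel.register(proc.stdout, selectors.EVENT_READ, data=path)

        # Block briefly until some worker writes output or closes its pipe.
        for key, _events in sel.select(timeout=0.1):
            path = key.data
            proc, _started, chunks = running[path]
            chunk = os.read(key.fd, 4096)
            if chunk:
                chunks.append(chunk)
                continue
            # An empty read means EOF: the child is done, so reap it.
            sel.unregister(key.fileobj)
            key.fileobj.close()
            results[path] = (proc.wait(), b"".join(chunks).decode())
            del running[path]

        # Kill and reap any worker that has run longer than `timeout` seconds.
        now = time.monotonic()
        for path, (proc, started, _chunks) in list(running.items()):
            if now - started > timeout:
                proc.kill()
                sel.unregister(proc.stdout)
                proc.stdout.close()
                results[path] = (proc.wait(), "timed out")
                del running[path]

    sel.close()
    return results
```

Calling `run_pool(["a.py", "b.py", "c.py"], parallelism=2)` starts two children and launches the third as soon as one of them finishes. It returns a dict that maps each path to its exit code and captured output.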
Usage
The DagFileProcessorManager runs inside the dag-processor component, which is launched with the airflow dag-processor command. User code does not need to instantiate it directly.
Code Reference
Source Location
- Repository: Apache Airflow
- File: airflow-core/src/airflow/dag_processing/manager.py
- Lines: L153-1298
Signature
```python
@attrs.define()
class DagFileProcessorManager(LoggingMixin):
    max_runs: int
    bundle_names_to_parse: list[str] | None = None
    processor_timeout: float = conf.getfloat("dag_processor", "processor_timeout")
    _parallelism: int = conf.getint("dag_processor", "parsing_processes")
    _file_process_interval: float = conf.getfloat("dag_processor", "file_process_interval")
    stale_dag_threshold: float = conf.getfloat("dag_processor", "stale_dag_threshold")
    max_callbacks_per_loop: int = conf.getint("dag_processor", "max_callbacks_per_loop")
    # ... additional internal fields for process management
```
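The `_file_process_interval` field controls how often each file is re-parsed. Below is a minimal sketch of a due-file check. It assumes a hypothetical `FileState` record and a never-parsed-first ordering. Neither is taken from `manager.py`.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class FileState:
    """Hypothetical per-file bookkeeping record (not Airflow's DagFileStat)."""

    path: str
    last_finish_time: Optional[float] = None  # seconds; None if never parsed


def files_due(states: List[FileState], now: float, file_process_interval: float) -> List[str]:
    """Return paths whose last parse finished at least `file_process_interval` seconds ago."""
    due = [
        st
        for st in states
        if st.last_finish_time is None or now - st.last_finish_time >= file_process_interval
    ]
    # Never-parsed files first, then the least recently parsed ones.
    due.sort(key=lambda st: (st.last_finish_time is not None, st.last_finish_time or 0.0))
    return [st.path for st in due]
```

With `file_process_interval=30.0`, a file that finished parsing 20 seconds ago is left out until a later loop iteration.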
DagFileProcessorProcess:
```python
class DagFileProcessorProcess(WatchedSubprocess):
    @classmethod
    def start(
        cls,
        *,
        path: str | os.PathLike[str],
        bundle_path: Path,
        bundle_name: str,
        callbacks: list[CallbackRequest],
        client: Client,
        **kwargs,
    ) -> Self:
        ...
```
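`start()` takes the callbacks for a single file, and the manager limits callback work per iteration with `max_callbacks_per_loop`. One plausible way to connect the two is to pop a bounded number of pending callbacks on each loop and group them by file path. This sketch is an assumption, not Airflow's actual logic. `Callback` is a hypothetical stand-in for `CallbackRequest`, and `drain_callbacks` is an illustrative name.

```python
from collections import defaultdict, deque
from dataclasses import dataclass
from typing import Deque, Dict, List


@dataclass(frozen=True)
class Callback:
    """Hypothetical stand-in for CallbackRequest; only the target file matters here."""

    path: str
    msg: str


def drain_callbacks(pending: Deque[Callback], max_callbacks_per_loop: int) -> Dict[str, List[Callback]]:
    """Pop at most `max_callbacks_per_loop` callbacks, grouped by DAG file path."""
    by_file: Dict[str, List[Callback]] = defaultdict(list)
    for _ in range(min(max_callbacks_per_loop, len(pending))):
        cb = pending.popleft()
        by_file[cb.path].append(cb)
    # Anything left in `pending` waits for the next loop iteration.
    return dict(by_file)
```

Each list in the result would then be passed as the `callbacks=` argument when starting the processor for that file. Callbacks beyond the cap stay queued.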
Import
```python
from airflow.dag_processing.manager import DagFileProcessorManager
from airflow.dag_processing.processor import DagFileProcessorProcess
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| max_runs | int | Yes | Maximum parsing cycles before exit |
| bundle_names_to_parse | list[str] or None | No | Specific bundles to parse |
| DAG files | Python files | Yes | DAG files in configured bundle directories |
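`max_runs` bounds how long the manager keeps parsing. The minimal sketch below assumes the loop exits once every tracked file has been parsed at least `max_runs` times. `run_manager`, `all_files_done`, and the injected `parse` callable are hypothetical, and parsing is sequential here for brevity.

```python
from typing import Callable, Dict, List, Tuple


def all_files_done(run_counts: Dict[str, int], max_runs: int) -> bool:
    """True once every tracked file has been parsed at least `max_runs` times."""
    return all(count >= max_runs for count in run_counts.values())


def run_manager(
    paths: List[str], max_runs: int, parse: Callable[[str], None]
) -> Tuple[int, Dict[str, int]]:
    """Parse every file once per loop until all files reach `max_runs` parses."""
    run_counts = {path: 0 for path in paths}
    loops = 0
    while not all_files_done(run_counts, max_runs):
        for path in paths:
            parse(path)
            run_counts[path] += 1
        loops += 1
    return loops, run_counts
```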
Outputs
| Name | Type | Description |
|---|---|---|
| Serialized DAGs | Database rows | DAGs persisted to metadata database |
| File statistics | DagFileStat | Parse timing and error counts per file |
| Import errors | dict | Tracked errors per file path |
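The statistics and import-error outputs can be pictured as two structures that are updated after every parse. The minimal sketch below assumes a hypothetical `FileStats` record (loosely modeled on, but not identical to, `DagFileStat`) and an `import_errors` dict keyed by file path. In this sketch, a successful parse clears that file's earlier error.

```python
import time
from dataclasses import dataclass
from typing import Callable, Dict, Optional


@dataclass
class FileStats:
    """Hypothetical per-file statistics (not Airflow's DagFileStat)."""

    run_count: int = 0
    last_duration: Optional[float] = None  # seconds
    import_errors: int = 0  # errors seen in the most recent parse


def parse_and_record(
    path: str,
    parse: Callable[[str], None],
    stats: Dict[str, FileStats],
    import_errors: Dict[str, str],
) -> FileStats:
    """Time one parse of `path` and update its statistics and import-error entry."""
    start = time.monotonic()
    try:
        parse(path)
        error = None
    except Exception as exc:  # any failure while importing the file counts as an import error
        error = f"{type(exc).__name__}: {exc}"
    st = stats.setdefault(path, FileStats())
    st.run_count += 1
    st.last_duration = time.monotonic() - start
    if error is None:
        import_errors.pop(path, None)  # a clean parse clears any earlier error
        st.import_errors = 0
    else:
        import_errors[path] = error
        st.import_errors = 1
    return st
```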
Usage Examples
CLI Usage
```bash
# Start the dag-processor component
airflow dag-processor
# The DagFileProcessorManager is started internally
# and continuously processes DAG files
```