
Implementation:Apache Airflow DagFileProcessorManager Orchestration

From Leeroopedia


Knowledge Sources
Domains: DAG_Processing, Concurrency
Last Updated: 2026-02-08 00:00 GMT

Overview

The DagFileProcessorManager is the concrete mechanism for orchestrating parallel DAG file parsing, provided by Airflow's dag-processor component.

Description

The DagFileProcessorManager (attrs-based) manages parallel DAG parsing by coordinating multiple DagFileProcessorProcess workers. It handles file queue management, process lifecycle, callback execution, and statistics tracking. The manager uses the selectors module for I/O multiplexing to monitor worker processes.
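The selectors-based multiplexing can be illustrated with a minimal, self-contained sketch. This is not Airflow's actual code: the child command and function names below are stand-ins for DagFileProcessorProcess workers, used only to show the register/select/service loop.

```python
# Sketch of the manager's I/O-multiplexing pattern: spawn one child
# process per file, register each child's stdout pipe with a selector,
# and service whichever worker has output ready.
import selectors
import subprocess
import sys


def run_workers(files: list[str]) -> dict[str, str]:
    """Run one stand-in 'parser' subprocess per file; collect one line each."""
    sel = selectors.DefaultSelector()
    results: dict[str, str] = {}
    procs = []
    for f in files:
        # Stand-in for a DagFileProcessorProcess: a child that "parses" a file.
        p = subprocess.Popen(
            [sys.executable, "-c", f"print('parsed {f}')"],
            stdout=subprocess.PIPE,
            text=True,
        )
        sel.register(p.stdout, selectors.EVENT_READ, data=(f, p))
        procs.append(p)
    # Service workers as their pipes become readable, regardless of order.
    while len(results) < len(files):
        for key, _ in sel.select(timeout=5.0):
            f, p = key.data
            line = key.fileobj.readline()
            if line:
                results[f] = line.strip()
                sel.unregister(key.fileobj)
    sel.close()
    for p in procs:
        p.wait()
    return results
```

The real manager additionally enforces per-worker timeouts and restarts workers across parsing cycles; the sketch shows only the multiplexing core.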

Usage

The DagFileProcessorManager runs as the dag-processor component. It is started automatically by Airflow and does not require direct instantiation in user code.
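Because the manager is driven entirely by configuration rather than direct instantiation, tuning happens in airflow.cfg. The [dag_processor] option names below come from the signature shown on this page; the values are illustrative examples, not authoritative defaults.

```ini
[dag_processor]
# Number of parallel parsing worker processes (illustrative value)
parsing_processes = 2
# Seconds before a stuck worker process is considered timed out
processor_timeout = 50
# Minimum seconds between re-parses of the same DAG file
file_process_interval = 30
# Seconds before a DAG with no source file is treated as stale
stale_dag_threshold = 50
# Callback requests drained per manager loop iteration
max_callbacks_per_loop = 20
```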

Code Reference

Source Location

  • Repository: Apache Airflow
  • File: airflow-core/src/airflow/dag_processing/manager.py
  • Lines: L153-1298

Signature

@attrs.define()
class DagFileProcessorManager(LoggingMixin):
    max_runs: int
    bundle_names_to_parse: list[str] | None = None
    processor_timeout: float = conf.getfloat("dag_processor", "processor_timeout")
    _parallelism: int = conf.getint("dag_processor", "parsing_processes")
    _file_process_interval: float = conf.getfloat("dag_processor", "file_process_interval")
    stale_dag_threshold: float = conf.getfloat("dag_processor", "stale_dag_threshold")
    max_callbacks_per_loop: int = conf.getint("dag_processor", "max_callbacks_per_loop")
    # ... additional internal fields for process management
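Note that the field defaults above call conf.getfloat/conf.getint at class-definition time, so configuration is read once when the module is imported. Below is a minimal sketch of that pattern, using stdlib dataclasses in place of attrs and a hypothetical FakeConf standing in for Airflow's real conf object (an assumption for illustration only):

```python
# Sketch of the "defaults read from config at class-definition time"
# pattern, with dataclasses substituting for attrs.define.
from dataclasses import dataclass


class FakeConf:
    """Stand-in for airflow.configuration.conf (illustrative, not real)."""
    _data = {
        ("dag_processor", "parsing_processes"): "2",
        ("dag_processor", "processor_timeout"): "50",
    }

    def getint(self, section: str, key: str) -> int:
        return int(self._data[(section, key)])

    def getfloat(self, section: str, key: str) -> float:
        return float(self._data[(section, key)])


conf = FakeConf()


@dataclass
class ManagerSketch:
    max_runs: int
    # Evaluated once, when the class body executes; later config changes
    # do not affect already-defined defaults.
    processor_timeout: float = conf.getfloat("dag_processor", "processor_timeout")
    parallelism: int = conf.getint("dag_processor", "parsing_processes")
```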

DagFileProcessorProcess:

class DagFileProcessorProcess(WatchedSubprocess):
    @classmethod
    def start(
        cls,
        *,
        path: str | os.PathLike[str],
        bundle_path: Path,
        bundle_name: str,
        callbacks: list[CallbackRequest],
        client: Client,
        **kwargs,
    ) -> Self:
        ...

Import

from airflow.dag_processing.manager import DagFileProcessorManager
from airflow.dag_processing.processor import DagFileProcessorProcess

I/O Contract

Inputs

  • max_runs (int, required): maximum parsing cycles before exit
  • bundle_names_to_parse (list[str] | None, optional): specific bundles to parse
  • DAG files (Python files, required): DAG files in configured bundle directories
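Queued DAG files are not re-parsed on every cycle: the file_process_interval setting from the signature above gates how soon a file may re-enter the parse queue. A hedged sketch of that re-queueing logic, with illustrative names that are not Airflow's actual internals:

```python
# Sketch of interval-gated re-queueing: a file is queued only when at
# least FILE_PROCESS_INTERVAL seconds have passed since its last parse.
import time
from collections import deque

FILE_PROCESS_INTERVAL = 30.0


def refresh_queue(files, last_finish, queue, now=None):
    """Append files whose parse interval has elapsed and are not queued."""
    now = time.monotonic() if now is None else now
    for f in files:
        # Files never parsed have no entry, so they are always eligible.
        elapsed = now - last_finish.get(f, float("-inf"))
        if elapsed >= FILE_PROCESS_INTERVAL and f not in queue:
            queue.append(f)
```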

Outputs

  • Serialized DAGs (database rows): DAGs persisted to the metadata database
  • File statistics (DagFileStat): parse timing and error counts per file
  • Import errors (dict): tracked errors per file path
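The per-file statistics output can be pictured as one small record per path. The sketch below is hypothetical: the field and function names are assumptions for illustration, not DagFileStat's actual attributes.

```python
# Hypothetical per-file statistics record in the spirit of DagFileStat:
# parse duration, DAG count, import-error count, and run count per file.
from dataclasses import dataclass


@dataclass
class FileStat:
    num_dags: int = 0
    import_errors: int = 0
    last_duration: float = 0.0
    run_count: int = 0


stats: dict[str, FileStat] = {}


def record_parse(path: str, duration: float, num_dags: int, errors: int) -> None:
    """Update the stats record for one file after a parse finishes."""
    st = stats.setdefault(path, FileStat())
    st.last_duration = duration
    st.num_dags = num_dags
    st.import_errors = errors
    st.run_count += 1
```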

Usage Examples

CLI Usage

# Start the dag-processor component
airflow dag-processor

# The DagFileProcessorManager is started internally
# and continuously processes DAG files
