Implementation:Apache Airflow DagFileProcessorManager Discovery


Knowledge Sources

  • Domains: DAG_Processing, Scheduling
  • Last Updated: 2026-02-08 00:00 GMT

Overview

A concrete tool for continuous DAG file discovery and parallel parsing from the scheduler's perspective, provided by the DagFileProcessorManager class.

Description

The DagFileProcessorManager orchestrates DAG file discovery by managing a queue of files to parse, spawning DagFileProcessorProcess workers, and collecting results. In the scheduler context, it ensures DAGs are available for scheduling by monitoring file changes and triggering re-parses. It also handles callback requests from completed DAG runs and tracks file processing statistics.
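
The control flow can be pictured with a minimal, runnable Python sketch. This is illustrative only, not the real implementation: the file paths, the parse stub, and the synchronous "parallelism" are assumptions standing in for DAG bundles, DagFileProcessorProcess workers, and the manager's process pool.

from collections import deque

# Illustrative stand-ins; the real manager scans configured DAG bundles
# and spawns DagFileProcessorProcess workers instead of calling a function.
PARALLELISM = 2  # plays the role of [dag_processor] parsing_processes

def discover_files():
    # Stand-in for re-scanning DAG bundle directories for files to parse.
    return ["dags/etl.py", "dags/reporting.py", "dags/ml_pipeline.py"]

def parse(path):
    # Stand-in for one worker parsing a file and reporting its results.
    return {"file": path, "num_dags": 1, "import_errors": {}}

file_queue = deque(discover_files())
stats = {}
while file_queue:
    # Take up to PARALLELISM files per loop, as the manager does when it
    # refills its pool of processor workers each cycle.
    batch = [file_queue.popleft() for _ in range(min(PARALLELISM, len(file_queue)))]
    for result in map(parse, batch):
        stats[result["file"]] = result  # collected results, as in _file_stats

print(stats)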

Usage

This implementation is used internally by the scheduler and dag-processor components. It runs continuously and requires no direct user interaction.
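
Depending on the deployment, the manager is hosted either inside the scheduler or by the standalone dag-processor component, which is started with the airflow dag-processor CLI command.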

Code Reference

Source Location

  • Repository: Apache Airflow
  • File: airflow-core/src/airflow/dag_processing/manager.py
  • Lines: L153-L1298

Signature

@attrs.define()
class DagFileProcessorManager(LoggingMixin):
    max_runs: int
    bundle_names_to_parse: list[str] | None = None
    processor_timeout: float = conf.getfloat("dag_processor", "processor_timeout")
    _parallelism: int = conf.getint("dag_processor", "parsing_processes")
    _file_process_interval: float = conf.getfloat("dag_processor", "file_process_interval")
    stale_dag_threshold: float = conf.getfloat("dag_processor", "stale_dag_threshold")
    parsing_cleanup_interval: float = conf.getfloat("dag_processor", "parsing_cleanup_interval")
    max_callbacks_per_loop: int = conf.getint("dag_processor", "max_callbacks_per_loop")
    _file_queue: deque[DagFileInfo] = attrs.field(factory=deque)
    _file_stats: dict[DagFileInfo, DagFileStat] = attrs.field(factory=dict)
    _processors: dict[DagFileInfo, DagFileProcessorProcess] = attrs.field(factory=dict)
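
The defaults above are read from the [dag_processor] section of the Airflow configuration. A sketch of the corresponding airflow.cfg block follows; the values shown are illustrative placeholders, not the shipped defaults.

[dag_processor]
processor_timeout = 50
parsing_processes = 2
file_process_interval = 30
stale_dag_threshold = 50
parsing_cleanup_interval = 60
max_callbacks_per_loop = 20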

Import

from airflow.dag_processing.manager import DagFileProcessorManager

I/O Contract

Inputs

  • max_runs (int, required): Maximum parsing cycles (-1 for unlimited)
  • DAG bundle directories (filesystem paths, required): Configured DAG source directories
  • Callback requests (list[CallbackRequest], optional): Pending callbacks to execute during parsing

Outputs

  • Parsed DAGs (database records): Serialized DAGs in the metadata DB
  • File statistics (dict[DagFileInfo, DagFileStat]): Parse duration, error counts, last parse time
  • Import errors (dict[str, str]): File path to error message mapping
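
As an example of the statistics output, per-file results accumulate in the manager's _file_stats mapping shown in the signature above. A hedged sketch of reading them after a bounded run, assuming a fully configured Airflow environment with a reachable metadata database; note that _file_stats is a private attribute, so the DagFileStat field layout should not be relied upon.

from airflow.dag_processing.manager import DagFileProcessorManager

# Run a single parsing cycle, then dump whatever statistics were recorded.
# _file_stats is private; print the objects rather than assuming field names.
manager = DagFileProcessorManager(max_runs=1)
manager.run()
for dag_file, stat in manager._file_stats.items():
    print(dag_file, stat)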

Usage Examples

Internal Scheduler Usage

# This is internal to the dag-processor component
# Not typically called directly by users
manager = DagFileProcessorManager(max_runs=-1)
manager.run()
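
A variant restricting work to specific bundles, using the bundle_names_to_parse field from the signature. The bundle name below is a hypothetical example; substitute the names configured for your deployment.

# Parse only the named bundle for one cycle; "my-bundle" is hypothetical.
manager = DagFileProcessorManager(max_runs=1, bundle_names_to_parse=["my-bundle"])
manager.run()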

Related Pages

  • Implements Principle
  • Requires Environment
  • Uses Heuristic
