Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Apache Airflow SchedulerJobRunner Loop

From Leeroopedia


Knowledge Sources
Domains Scheduling, Core_Infrastructure
Last Updated 2026-02-08 00:00 GMT

Overview

Concrete tool for the main scheduler loop and DagRun creation provided by the SchedulerJobRunner and DagRun model.

Description

The SchedulerJobRunner implements the main scheduler loop: evaluating timetables, creating DagRuns, scheduling TaskInstances, and dispatching them to executors. The DagRun model represents a single invocation of a DAG with its associated state, data interval, and task instances.

Usage

The SchedulerJobRunner is started via airflow scheduler and runs continuously. DagRun objects are created automatically by the scheduler and can also be created via the API for manual triggers.

Code Reference

Source Location

  • Repository: Apache Airflow
  • File: airflow-core/src/airflow/jobs/scheduler_job_runner.py
  • Lines: L216-3186

Signature

class SchedulerJobRunner(BaseJobRunner, LoggingMixin):
    job_type = "SchedulerJob"

    def __init__(
        self,
        job: Job,
        num_runs: int = conf.getint("scheduler", "num_runs"),
        scheduler_idle_sleep_time: float = conf.getfloat("scheduler", "scheduler_idle_sleep_time"),
        log: Logger | None = None,
    ):
        ...

DagRun Model:

class DagRun(Base, LoggingMixin):
    __tablename__ = "dag_run"

    id: Mapped[int]
    dag_id: Mapped[str]
    run_id: Mapped[str]
    run_type: Mapped[str]        # scheduled, manual, backfill
    logical_date: Mapped[datetime | None]
    data_interval_start: Mapped[datetime | None]
    data_interval_end: Mapped[datetime | None]
    run_after: Mapped[datetime]
    state: Mapped[str]           # queued, running, success, failed
    conf: Mapped[dict | None]
    last_scheduling_decision: Mapped[datetime | None]
    bundle_version: Mapped[str | None]

Import

from airflow.jobs.scheduler_job_runner import SchedulerJobRunner
from airflow.models.dagrun import DagRun

I/O Contract

Inputs

Name Type Required Description
job Job Yes The Job object for heartbeat tracking
num_runs int No Number of scheduler loops (default from config)
Serialized DAGs Database Yes DAGs parsed by dag-processor
Pool availability Database Yes Current pool slot allocations

Outputs

Name Type Description
DagRun records Database rows New DagRun entries in metadata DB
TaskInstance records Database rows Task instances created for each run
Scheduling decisions Timestamps last_scheduling_decision per DagRun

Usage Examples

Start Scheduler

# Start the scheduler
airflow scheduler

# With specific number of runs (useful for testing)
airflow scheduler --num-runs 10

Related Pages

Implements Principle

Requires Environment

Uses Heuristic

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment