Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Apache Airflow TriggererJobRunner Loop

From Leeroopedia


Knowledge Sources
Domains Async_Execution, Core_Infrastructure
Last Updated 2026-02-08 00:00 GMT

Overview

Concrete tool for running asynchronous triggers and managing deferrable tasks provided by the TriggererJobRunner and Trigger model.

Description

The TriggererJobRunner manages the async trigger event loop. It runs as two threads: the main thread handles database operations and heartbeats, while a subthread runs all async trigger code. The Trigger model persists trigger state in the database with encrypted kwargs for security.

Usage

Start the triggerer via airflow triggerer. It must be running for any deferrable operators to function. Configure capacity to control the maximum number of concurrent triggers.

Code Reference

Source Location

  • Repository: Apache Airflow
  • File: airflow-core/src/airflow/jobs/triggerer_job_runner.py
  • Lines: L105-1217

Signature

class TriggererJobRunner(BaseJobRunner, LoggingMixin):
    job_type = "TriggererJob"

    def __init__(
        self,
        job: Job,
        capacity: int | None = None,   # defaults to config [triggerer].capacity
        queues: set[str] | None = None,
    ):
        ...

Trigger Model:

class Trigger(Base):
    __tablename__ = "trigger"

    id: Mapped[int]
    classpath: Mapped[str]          # Trigger class path (e.g., "airflow.triggers.temporal.TimeDeltaTrigger")
    encrypted_kwargs: Mapped[str]   # Fernet-encrypted kwargs
    created_date: Mapped[datetime]
    triggerer_id: Mapped[int | None]
    queue: Mapped[str | None]

    def __init__(
        self,
        classpath: str,
        kwargs: dict[str, Any],
        created_date: datetime | None = None,
        queue: str | None = None,
    ): ...

Import

from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models.trigger import Trigger

I/O Contract

Inputs

Name Type Required Description
job Job Yes Job object for heartbeat tracking
capacity int or None No Max concurrent triggers (from config)
queues set[str] or None No Trigger queue filter
Deferred TaskInstances Database Yes Tasks with trigger_id set

Outputs

Name Type Description
Trigger events TriggerEvent Events that resume deferred tasks
State transitions Database updates deferred → scheduled state changes
Trigger cleanup Database deletes Completed triggers removed

Usage Examples

Start Triggerer

# Start the triggerer component
airflow triggerer

# The triggerer will pick up deferred tasks automatically

Deferrable Operator Pattern

from airflow.triggers.temporal import TimeDeltaTrigger
from datetime import timedelta

class MyDeferrableOperator(BaseOperator):
    def execute(self, context):
        self.defer(
            trigger=TimeDeltaTrigger(timedelta(hours=1)),
            method_name="resume_after_wait",
        )

    def resume_after_wait(self, context, event=None):
        # Continue execution after trigger fires
        ...

Related Pages

Implements Principle

Requires Environment

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment