Implementation:Apache Airflow TriggererJobRunner Loop
| Knowledge Sources | |
|---|---|
| Domains | Async_Execution, Core_Infrastructure |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool for running asynchronous triggers and managing deferrable tasks provided by the TriggererJobRunner and Trigger model.
Description
The TriggererJobRunner manages the async trigger event loop. It runs as two threads: the main thread handles database operations and heartbeats, while a subthread runs all async trigger code. The Trigger model persists trigger state in the database with encrypted kwargs for security.
Usage
Start the triggerer via airflow triggerer. It must be running for any deferrable operators to function. Configure capacity to control the maximum number of concurrent triggers.
Code Reference
Source Location
- Repository: Apache Airflow
- File: airflow-core/src/airflow/jobs/triggerer_job_runner.py
- Lines: L105-1217
Signature
class TriggererJobRunner(BaseJobRunner, LoggingMixin):
job_type = "TriggererJob"
def __init__(
self,
job: Job,
capacity: int | None = None, # defaults to config [triggerer].capacity
queues: set[str] | None = None,
):
...
Trigger Model:
class Trigger(Base):
__tablename__ = "trigger"
id: Mapped[int]
classpath: Mapped[str] # Trigger class path (e.g., "airflow.triggers.temporal.TimeDeltaTrigger")
encrypted_kwargs: Mapped[str] # Fernet-encrypted kwargs
created_date: Mapped[datetime]
triggerer_id: Mapped[int | None]
queue: Mapped[str | None]
def __init__(
self,
classpath: str,
kwargs: dict[str, Any],
created_date: datetime | None = None,
queue: str | None = None,
): ...
Import
from airflow.jobs.triggerer_job_runner import TriggererJobRunner
from airflow.models.trigger import Trigger
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| job | Job | Yes | Job object for heartbeat tracking |
| capacity | int or None | No | Max concurrent triggers (from config) |
| queues | set[str] or None | No | Trigger queue filter |
| Deferred TaskInstances | Database | Yes | Tasks with trigger_id set |
Outputs
| Name | Type | Description |
|---|---|---|
| Trigger events | TriggerEvent | Events that resume deferred tasks |
| State transitions | Database updates | deferred → scheduled state changes |
| Trigger cleanup | Database deletes | Completed triggers removed |
Usage Examples
Start Triggerer
# Start the triggerer component
airflow triggerer
# The triggerer will pick up deferred tasks automatically
Deferrable Operator Pattern
from airflow.triggers.temporal import TimeDeltaTrigger
from datetime import timedelta
class MyDeferrableOperator(BaseOperator):
def execute(self, context):
self.defer(
trigger=TimeDeltaTrigger(timedelta(hours=1)),
method_name="resume_after_wait",
)
def resume_after_wait(self, context, event=None):
# Continue execution after trigger fires
...