Implementation:Apache Airflow TimerProtocol
| Knowledge Sources | |
|---|---|
| Domains | Observability, Metrics |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Defines the TimerProtocol protocol and its concrete Timer implementation for measuring code execution durations, supporting both context manager and manual start/stop usage patterns.
Description
The protocols.py module provides the timing infrastructure for Airflow's metrics subsystem.
DeltaType
A type alias defined as int | float | datetime.timedelta, representing the accepted types for timing values throughout the metrics system.
TimerProtocol (Protocol)
A structural typing protocol that defines the interface all timer implementations must satisfy:
- __enter__(self) -> Self -- Enter the context manager, returning the timer instance.
- __exit__(self, exc_type, exc_value, traceback) -> None -- Exit the context manager.
- start(self) -> Self -- Start the timer manually (outside context manager usage).
- stop(self, send: bool = True) -> None -- Stop the timer and optionally submit the metric to the stats backend.
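Because the protocol is structural, a class can satisfy it without inheriting from it. The sketch below shows this with a local stand-in protocol, TimerLike, that declares the same four methods; both TimerLike and RecordingTimer are hypothetical names used only for illustration, and the return annotations use the class name where the real protocol uses Self.

```python
from typing import Protocol, runtime_checkable


@runtime_checkable
class TimerLike(Protocol):
    """Local stand-in declaring the same four methods as TimerProtocol."""

    def __enter__(self) -> "TimerLike": ...
    def __exit__(self, exc_type, exc_value, traceback) -> None: ...
    def start(self) -> "TimerLike": ...
    def stop(self, send: bool = True) -> None: ...


class RecordingTimer:
    """Hypothetical timer that logs calls; it does not inherit from TimerLike."""

    def __init__(self) -> None:
        self.events: list[str] = []

    def __enter__(self) -> "RecordingTimer":
        return self.start()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.stop()

    def start(self) -> "RecordingTimer":
        self.events.append("start")
        return self

    def stop(self, send: bool = True) -> None:
        self.events.append("stop" if send else "stop(no send)")


recorder = RecordingTimer()
with recorder:
    pass  # the timed work would go here
```

The runtime isinstance check only verifies that the methods exist; a static type checker is what compares the signatures.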
Timer (Implementation)
The concrete Timer class implements TimerProtocol with the following behavior:
- Constructor: Timer(real_timer: Timer | None = None) -- Accepts an optional "real" timer (typically from a stats backend like pystatsd or dogstatsd). This wrapper pattern is necessary because pystatsd and dogstatsd present different timer APIs, so Timer provides a unified interface.
- start() -- Records the start time using time.perf_counter() for high-resolution timing. If a real_timer is present, delegates start() to it as well. Returns self for method chaining.
- stop(send=True) -- Computes the elapsed duration in milliseconds from the start time using time.perf_counter(). Stores the result in self.duration. If send is True and a real_timer is present, delegates stop() to submit the metric to the backend.
- Context manager: __enter__ calls start() and __exit__ calls stop(), enabling the idiomatic with Stats.timer("metric") as t: pattern.
- duration: float | None -- The measured duration in milliseconds, available after stop() is called.
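The behavior listed above can be sketched as a small standalone class. This is an illustrative re-implementation written from the description, not the Airflow source; SketchTimer and FakeBackendTimer are hypothetical names, and the fake backend only records which of its methods were called.

```python
import time


class FakeBackendTimer:
    """Hypothetical stand-in for a backend timer exposing start() and stop()."""

    def __init__(self) -> None:
        self.calls = []

    def start(self):
        self.calls.append("start")
        return self

    def stop(self):
        self.calls.append("stop")


class SketchTimer:
    """Illustrative timer following the description above."""

    def __init__(self, real_timer=None) -> None:
        self.real_timer = real_timer
        self._start_time = None
        self.duration = None  # stays None until stop() is called

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.stop()

    def start(self):
        if self.real_timer is not None:
            self.real_timer.start()
        self._start_time = time.perf_counter()
        return self

    def stop(self, send: bool = True) -> None:
        if self._start_time is not None:
            # perf_counter() returns seconds; store milliseconds.
            self.duration = 1000.0 * (time.perf_counter() - self._start_time)
        if send and self.real_timer is not None:
            self.real_timer.stop()


# Context manager usage: the backend timer is started and stopped.
backend = FakeBackendTimer()
with SketchTimer(backend) as sent:
    time.sleep(0.01)

# Manual usage with send=False: duration is measured, backend stop() is skipped.
local_only = SketchTimer(FakeBackendTimer()).start()
local_only.stop(send=False)
```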
Key design decisions:
- Uses time.perf_counter() instead of datetime arithmetic for accurate duration measurement.
- Duration is stored in milliseconds (multiplied by 1000.0 from the seconds returned by perf_counter()).
- The real_timer wrapper pattern accommodates different backend timer APIs without requiring backend-specific subclasses.
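One way to see why perf_counter() suits interval timing is to inspect the clock properties that the standard library reports. The sketch below uses time.get_clock_info() and then performs the seconds-to-milliseconds conversion mentioned above; the variable names are illustrative only.

```python
import time

# perf_counter is a monotonic clock, so the difference between two readings
# is never negative.
perf_info = time.get_clock_info("perf_counter")

# The wall clock behind time.time() and datetime.now() can be adjusted
# (for example by NTP or an administrator) while a measurement is running.
wall_info = time.get_clock_info("time")

start = time.perf_counter()
elapsed_ms = 1000.0 * (time.perf_counter() - start)  # seconds -> milliseconds
```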
Usage
Timer is used throughout the metrics subsystem. It is returned by StatsLogger.timer(), NoStatsLogger.timer(), and SafeDogStatsdLogger.timer(). It can be used as a context manager for automatic timing, or manually started and stopped.
Code Reference
Source Location
- Repository: Apache_Airflow
- File: shared/observability/src/airflow_shared/observability/metrics/protocols.py (122 lines)
Signature
DeltaType = int | float | datetime.timedelta

class TimerProtocol(Protocol):
    """Type protocol for StatsLogger.timer."""

    def __enter__(self) -> Self: ...
    def __exit__(self, exc_type, exc_value, traceback) -> None: ...
    def start(self) -> Self: ...
    def stop(self, send: bool = True) -> None: ...

class Timer(TimerProtocol):
    """Timer that records duration, and optionally sends to StatsD backend."""

    _start_time: float | None
    duration: float | None

    def __init__(self, real_timer: Timer | None = None) -> None: ...
    def __enter__(self) -> Self: ...
    def __exit__(self, exc_type, exc_value, traceback) -> None: ...
    def start(self) -> Self: ...
    def stop(self, send: bool = True) -> None: ...
Import
from airflow_shared.observability.metrics.protocols import Timer, TimerProtocol, DeltaType
I/O Contract
| Method | Input | Output | Side Effects |
|---|---|---|---|
| Timer.__init__ | real_timer: optional Timer (default None) | Timer instance | None |
| Timer.start | (none) | Self | Records start time via time.perf_counter(); starts real_timer if present |
| Timer.stop | send: bool = True | None | Computes duration in ms; stops real_timer if send=True and a real_timer is present |
| Timer.__enter__ | (none) | Self | Calls start() |
| Timer.__exit__ | exception info | None | Calls stop() |
| Timer.duration | (attribute) | float or None | Milliseconds elapsed between start and stop; None before stop |
Usage Examples
Context Manager Usage
from airflow_shared.observability.metrics.protocols import Timer
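# Using Stats.timer() which returns a Timer.
# Stats, log, and frob_the_foos are assumed to be defined by the calling code.
with Stats.timer("foos.frob") as t:
    frob_the_foos()
# duration is set once the with block exits and stop() has run
log.info("Frobbing the foos took %.2f ms", t.duration)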
Manual Start/Stop Usage
from airflow_shared.observability.metrics.protocols import Timer
timer = Timer()
timer.start()
# ... perform work ...
frob_the_foos()
timer.stop(send=False) # Don't send to backend, just measure locally
log.info("Operation took %.2f ms", timer.duration)
Context Manager Without Sending to Backend
from airflow_shared.observability.metrics.protocols import Timer
# Empty timer (no real_timer) -- useful for local measurement only
with Timer() as t:
    perform_operation()
print(f"Duration: {t.duration:.2f} ms")
Wrapping a Backend Timer
from airflow_shared.observability.metrics.protocols import Timer
# Wrap a DogStatsd timed() context manager.
# dogstatsd_client is assumed to be an already configured DogStatsd client.
dogstatsd_timed = dogstatsd_client.timed("my.metric", tags=["env:prod"])
timer = Timer(real_timer=dogstatsd_timed)
with timer as t:
    do_work()
# Both the local duration (t.duration) and the backend metric are recorded