Implementation:Apache Airflow TimerProtocol

From Leeroopedia


Knowledge Sources
Domains Observability, Metrics
Last Updated 2026-02-08 21:00 GMT

Overview

Defines TimerProtocol, a structural typing protocol, and its concrete Timer implementation for measuring code execution durations. Both context manager and manual start/stop usage patterns are supported.

Description

The protocols.py module provides the timing infrastructure for Airflow's metrics subsystem.

DeltaType

A type alias defined as int | float | datetime.timedelta, representing the accepted types for timing values throughout the metrics system.

TimerProtocol (Protocol)

A structural typing protocol that defines the interface all timer implementations must satisfy:

  • __enter__(self) -> Self -- Enter the context manager, returning the timer instance.
  • __exit__(self, exc_type, exc_value, traceback) -> None -- Exit the context manager.
  • start(self) -> Self -- Start the timer manually (outside context manager usage).
  • stop(self, send: bool = True) -> None -- Stop the timer and optionally submit the metric to the stats backend.
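Because the protocol is structural, a class satisfies it by having the right method shapes, without inheriting from it. The sketch below uses a local stand-in named TimerLike rather than importing the real TimerProtocol, and CountingTimer and run_timed are hypothetical names introduced only for illustration.

```python
from typing import Protocol


class TimerLike(Protocol):
    """Local stand-in mirroring the four methods listed above."""

    def __enter__(self) -> "TimerLike": ...
    def __exit__(self, exc_type, exc_value, traceback) -> None: ...
    def start(self) -> "TimerLike": ...
    def stop(self, send: bool = True) -> None: ...


class CountingTimer:
    """Hypothetical timer; note that it does not inherit from TimerLike."""

    def __init__(self) -> None:
        self.stops = 0

    def __enter__(self) -> "CountingTimer":
        return self.start()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.stop()

    def start(self) -> "CountingTimer":
        return self

    def stop(self, send: bool = True) -> None:
        self.stops += 1


def run_timed(timer: TimerLike) -> None:
    """Accepts any object whose methods match the protocol."""
    with timer:
        pass


counter = CountingTimer()
run_timed(counter)
print(counter.stops)  # 1
```

A static type checker accepts CountingTimer wherever TimerLike is expected, which is the property that lets different logger backends return their own timer objects.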

Timer (Implementation)

The concrete Timer class implements TimerProtocol with the following behavior:

  • Constructor: Timer(real_timer: Timer | None = None) -- Accepts an optional "real" timer (typically from a stats backend like pystatsd or dogstatsd). This wrapper pattern is necessary because pystatsd and dogstatsd present different timer APIs, so Timer provides a unified interface.
  • start() -- Records the start time using time.perf_counter() for high-resolution timing. If a real_timer is present, delegates start() to it as well. Returns self for method chaining.
  • stop(send=True) -- Computes the elapsed duration in milliseconds from the start time using time.perf_counter(). Stores the result in self.duration. If send is True and a real_timer is present, delegates stop() to submit the metric to the backend.
  • Context manager: __enter__ calls start() and __exit__ calls stop(), enabling the idiomatic with Stats.timer("metric") as t: pattern.
  • duration: float | None -- The measured duration in milliseconds, available after stop() is called.
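The behavior in the list above can be sketched as follows. This is an illustrative re-creation based only on the description on this page, not the Airflow source; SketchTimer and RecordingBackendTimer are hypothetical names, and the backend stand-in simply records which methods were called.

```python
import time


class RecordingBackendTimer:
    """Hypothetical stand-in for a backend timer exposing start()/stop()."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    def start(self) -> None:
        self.calls.append("start")

    def stop(self) -> None:
        self.calls.append("stop")


class SketchTimer:
    """Illustrative wrapper following the description above."""

    def __init__(self, real_timer=None) -> None:
        self.real_timer = real_timer
        self._start_time = None
        self.duration = None

    def __enter__(self):
        return self.start()

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.stop()

    def start(self):
        if self.real_timer is not None:
            self.real_timer.start()
        self._start_time = time.perf_counter()
        return self  # enables SketchTimer().start() chaining

    def stop(self, send: bool = True) -> None:
        if self._start_time is not None:
            # perf_counter() returns seconds; scale to milliseconds.
            self.duration = 1000.0 * (time.perf_counter() - self._start_time)
        if send and self.real_timer is not None:
            self.real_timer.stop()


# Context manager usage: the backend timer is started and stopped.
backend = RecordingBackendTimer()
with SketchTimer(backend) as t:
    time.sleep(0.01)
print(backend.calls)  # ['start', 'stop']

# Manual usage with send=False: the backend timer is started but never stopped.
local_only = RecordingBackendTimer()
manual = SketchTimer(local_only).start()
manual.stop(send=False)
print(local_only.calls)  # ['start']
```

In this sketch, send=False still records the local duration; only the delegation to the wrapped timer is skipped.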

Key design decisions:

  • Uses time.perf_counter() instead of datetime arithmetic for accurate duration measurement.
  • Duration is stored in milliseconds (multiplied by 1000.0 from the seconds returned by perf_counter()).
  • The real_timer wrapper pattern accommodates different backend timer APIs without requiring backend-specific subclasses.
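One way to see the motivation for the first two decisions is to inspect the clocks with the standard library's time.get_clock_info(). The snippet below is a small sketch: it reports whether each clock is monotonic or adjustable on the current platform, then applies the same seconds-to-milliseconds scaling described above. The variable names are illustrative.

```python
import time

# Clock metadata as reported by the running interpreter.
perf = time.get_clock_info("perf_counter")
wall = time.get_clock_info("time")

print("perf_counter monotonic:", perf.monotonic)
print("wall clock adjustable:", wall.adjustable)

# perf_counter() returns fractional seconds; multiply by 1000.0 for milliseconds.
start = time.perf_counter()
sum(range(100_000))  # some work to measure
elapsed_ms = 1000.0 * (time.perf_counter() - start)
print(f"elapsed: {elapsed_ms:.3f} ms")
```

A monotonic clock cannot go backwards, so the difference between two readings is never negative. A wall clock can be adjusted while the measured code runs (for example by NTP), which can distort a duration computed from datetime values.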

Usage

Timer is used throughout the metrics subsystem. It is returned by StatsLogger.timer(), NoStatsLogger.timer(), and SafeDogStatsdLogger.timer(). It can be used as a context manager for automatic timing, or manually started and stopped.

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/observability/src/airflow_shared/observability/metrics/protocols.py (122 lines)

Signature

DeltaType = int | float | datetime.timedelta


class TimerProtocol(Protocol):
    """Type protocol for StatsLogger.timer."""

    def __enter__(self) -> Self: ...
    def __exit__(self, exc_type, exc_value, traceback) -> None: ...
    def start(self) -> Self: ...
    def stop(self, send: bool = True) -> None: ...


class Timer(TimerProtocol):
    """Timer that records duration, and optionally sends to StatsD backend."""

    _start_time: float | None
    duration: float | None

    def __init__(self, real_timer: Timer | None = None) -> None: ...
    def __enter__(self) -> Self: ...
    def __exit__(self, exc_type, exc_value, traceback) -> None: ...
    def start(self) -> Self: ...
    def stop(self, send: bool = True) -> None: ...

Import

from airflow_shared.observability.metrics.protocols import Timer, TimerProtocol, DeltaType

I/O Contract

Method           Input                            Output          Side Effects
Timer.__init__   real_timer: Timer | None = None  Timer instance  None
Timer.start      (none)                           Self            Records start time via time.perf_counter(); starts real_timer if present
Timer.stop       send: bool = True                None            Computes duration in ms; stops real_timer if send=True and a real_timer is present
Timer.__enter__  (none)                           Self            Calls start()
Timer.__exit__   exception info                   None            Calls stop()
Timer.duration   (attribute)                      float | None    Milliseconds elapsed between start and stop; None before stop

Usage Examples

Context Manager Usage

import logging

log = logging.getLogger(__name__)

# Stats is assumed to be Airflow's stats facade, already imported by the
# surrounding code; Stats.timer() returns a Timer.
with Stats.timer("foos.frob") as t:
    frob_the_foos()

log.info("Frobbing the foos took %.2f ms", t.duration)

Manual Start/Stop Usage

import logging

from airflow_shared.observability.metrics.protocols import Timer

log = logging.getLogger(__name__)

timer = Timer()
timer.start()

# ... perform work ...
frob_the_foos()

# No real_timer is wrapped here, so nothing is sent either way;
# send=False makes the local-only intent explicit.
timer.stop(send=False)
log.info("Operation took %.2f ms", timer.duration)

Context Manager Without Sending to Backend

from airflow_shared.observability.metrics.protocols import Timer

# Empty timer (no real_timer) -- useful for local measurement only
with Timer() as t:
    perform_operation()

print(f"Duration: {t.duration:.2f} ms")

Wrapping a Backend Timer

from airflow_shared.observability.metrics.protocols import Timer

# dogstatsd_client is assumed to be an already-configured DogStatsd client.
# Its timed() helper returns an object that exposes start() and stop().
dogstatsd_timed = dogstatsd_client.timed("my.metric", tags=["env:prod"])
timer = Timer(real_timer=dogstatsd_timed)

with timer as t:
    do_work()
# Both the local duration (t.duration) and the backend metric are recorded
