Implementation:Apache Airflow SafeOtelLogger
| Knowledge Sources | |
|---|---|
| Domains | Observability, Metrics, OpenTelemetry |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
OpenTelemetry-based metrics logger that provides a safe, validated interface for emitting counters, gauges, and timers as OTel instruments, with support for metric name validation, sampling rates, and backward-compatible gauge deltas.
Description
The SafeOtelLogger class is the primary interface for emitting metrics in Airflow when OpenTelemetry is enabled. It wraps the OTel SDK's MeterProvider and manages a MetricsMap that lazily creates and caches OTel instrument instances (counters, gauges) as they are first referenced.
Key design decisions:
- Counters vs. UpDownCounters: Most counters are monotonic OTel Counters. The special metric airflow.dag_processing.processes is registered as an UpDownCounter because it can decrease. This is tracked via the UP_DOWN_COUNTERS set.
- Timers as Gauges: OTel has no native timer instrument. Timer values are stored as Gauge readings representing elapsed milliseconds.
- Name Safety: All metric names are validated against OTel name length limits (OTEL_NAME_MAX_LENGTH) and pattern allow/block lists before emission.
- Sampling: The rate parameter on incr, decr, and gauge supports probabilistic sampling to reduce metric volume.
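Two of these decisions, the instrument choice and rate-based sampling, can be illustrated with a small standalone sketch. It does not reproduce the code in otel_logger.py: `instrument_kind` and `should_emit` are hypothetical helper names, the set below is a stand-in for UP_DOWN_COUNTERS, and the sampling rule shown (emit when a uniform random draw falls below the rate) is one common approach assumed here for illustration.

```python
import random

# Stand-in for the UP_DOWN_COUNTERS set mentioned in the text (assumed content).
UP_DOWN_COUNTERS = {"airflow.dag_processing.processes"}


def instrument_kind(full_metric_name: str) -> str:
    """Choose the instrument kind for a counter-style metric (hypothetical helper)."""
    if full_metric_name in UP_DOWN_COUNTERS:
        return "up_down_counter"  # value may go down as well as up
    return "counter"  # monotonic, only ever increases


def should_emit(rate: float, rng: random.Random) -> bool:
    """Return True for roughly `rate` of the calls; rate >= 1 always emits."""
    if rate >= 1:
        return True
    return rng.random() < rate


# Seeded generator so the sketch behaves the same on every run.
rng = random.Random(0)
emitted = sum(should_emit(0.25, rng) for _ in range(10_000))
print(f"emitted {emitted} of 10000 samples at rate=0.25")
```

With a rate of 0.25, about a quarter of the calls pass the check, which is how a sample rate reduces metric volume without changing the call sites.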
Supporting classes:
- MetricsMap -- Internal cache that stores and retrieves OTel instruments (counters and InternalGauge instances) keyed by metric name and attributes.
- InternalGauge -- Wraps an OTel sync gauge with a stored current value to support delta mode (adding to the previous value rather than replacing it).
- _OtelTimer -- Context manager implementing the Timer protocol; records elapsed duration as a gauge value on stop().
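The lazy create-and-cache behaviour described for MetricsMap can be sketched without the OTel SDK. Everything below is hypothetical (`InstrumentCache`, `cache_key`, `make_instrument`); the one design point it illustrates is that attributes arrive as a dict, so a hashable, order-independent key is needed before they can index a cache. How the real class builds its key is not shown in this page.

```python
from typing import Callable, Dict, Hashable, Mapping, Optional, Tuple

Key = Tuple[str, Hashable]


def cache_key(name: str, attributes: Optional[Mapping[str, str]]) -> Key:
    """Build a hashable key; the order of the attributes must not matter."""
    return name, frozenset((attributes or {}).items())


class InstrumentCache:
    """Hypothetical lazy cache, loosely modelled on the MetricsMap description."""

    def __init__(self, factory: Callable[[str], object]) -> None:
        self._factory = factory
        self._items: Dict[Key, object] = {}

    def get(self, name: str, attributes: Optional[Mapping[str, str]] = None) -> object:
        key = cache_key(name, attributes)
        if key not in self._items:
            # First reference: create the instrument and remember it.
            self._items[key] = self._factory(name)
        return self._items[key]

    def clear(self) -> None:
        self._items.clear()


created = []


def make_instrument(name: str) -> object:
    """Placeholder factory; a real one would ask an OTel meter for an instrument."""
    created.append(name)
    return object()


cache = InstrumentCache(make_instrument)
first = cache.get("pool.open_slots", {"pool_name": "default", "team": "a"})
second = cache.get("pool.open_slots", {"team": "a", "pool_name": "default"})
print(first is second, created)
```

The second lookup reuses the cached object even though the attribute dict was written in a different order.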
Usage
from airflow_shared.observability.metrics.otel_logger import SafeOtelLogger, get_otel_logger
# Create a logger via the factory function
logger = get_otel_logger(host="localhost", port=4318, prefix="airflow")
# Increment a counter
logger.incr("operator_successes", count=1, tags={"operator_name": "PythonOperator"})
# Set a gauge value
logger.gauge("pool.open_slots", value=10, tags={"pool_name": "default_pool"})
# Record a timing
logger.timing("task.duration", dt=2345.6, tags={"dag_id": "my_dag", "task_id": "extract"})
# Use the timer context manager
with logger.timer("scheduler.scheduler_loop_duration") as t:
    # ... do work ...
    pass
Code Reference
Source Location
- Repository: Apache_Airflow
- File: shared/observability/src/airflow_shared/observability/metrics/otel_logger.py
Key Classes
SafeOtelLogger (lines 165-291)
class SafeOtelLogger:
    """Otel Logger."""
    def __init__(
        self,
        otel_provider,
        prefix: str = DEFAULT_METRIC_NAME_PREFIX,
        metrics_validator: ListValidator = PatternAllowListValidator(),
        stat_name_handler: Callable[[str], str] | None = None,
        statsd_influxdb_enabled: bool = False,
    ): ...
    def incr(self, stat: str, count: int = 1, rate: float = 1, tags: Attributes = None): ...
    def decr(self, stat: str, count: int = 1, rate: float = 1, tags: Attributes = None): ...
    def gauge(self, stat: str, value: int | float, rate: float = 1, delta: bool = False,
              *, tags: Attributes = None, back_compat_name: str = "") -> None: ...
    def timing(self, stat: str, dt: DeltaType, *, tags: Attributes = None) -> None: ...
    def timer(self, stat: str | None = None, *args, tags: Attributes = None, **kwargs) -> Timer: ...
MetricsMap (lines 313-373)
class MetricsMap:
    """Stores Otel Instruments."""
    def __init__(self, meter): ...
    def clear(self) -> None: ...
    def get_counter(self, name: str, attributes: Attributes = None): ...
    def del_counter(self, name: str, attributes: Attributes = None) -> None: ...
    def set_gauge_value(self, name: str, value: int | float, delta: bool, tags: Attributes): ...
InternalGauge (lines 294-310)
class InternalGauge:
    """Stores sync gauge instrument and current value to support delta feature."""
    def __init__(self, meter, name: str, tags: Attributes): ...
    def set_value(self, new_value: int | float, delta: bool): ...
_OtelTimer (lines 141-162)
class _OtelTimer(Timer):
    """An implementation of Stats.Timer() which records the result in the OTel Metrics Map."""
    def __init__(self, otel_logger: SafeOtelLogger, name: str | None, tags: Attributes): ...
    def stop(self, send: bool = True) -> None: ...
Key Functions
get_otel_logger (lines 376-431)
def get_otel_logger(
    *,
    host: str | None = None,
    port: int | None = None,
    prefix: str | None = None,
    ssl_active: bool = False,
    conf_interval: float | None = None,
    debug: bool = False,
    service_name: str | None = None,
    metrics_allow_list: str | None = None,
    metrics_block_list: str | None = None,
    stat_name_handler: Callable[[str], str] | None = None,
    statsd_influxdb_enabled: bool = False,
) -> SafeOtelLogger: ...
Utility Functions
def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str:
    """Assembles the prefix, delimiter, and name and returns it as a string."""
def name_is_otel_safe(prefix: str, name: str) -> bool:
    """Return True if the provided name and prefix would result in a name
    that meets the OpenTelemetry standard."""
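As a rough illustration of what these two helpers do, the sketch below assembles a prefixed name and checks it against a length limit and a character pattern. It is not the code from otel_logger.py: the "." delimiter, the 63-character limit, and the regular expression are assumptions chosen for the example, and the `_sketch` function names are hypothetical.

```python
import re

# Assumed values for illustration only; the real constants live in otel_logger.py.
DEFAULT_PREFIX = "airflow"
NAME_MAX_LENGTH = 63
# Assumed pattern: an ASCII letter first, then letters, digits, '_', '.', '-' or '/'.
NAME_PATTERN = re.compile(r"[A-Za-z][A-Za-z0-9_.\-/]*")


def full_name_sketch(name: str, *, prefix: str = DEFAULT_PREFIX) -> str:
    """Join prefix and name with an assumed '.' delimiter."""
    return f"{prefix}.{name}"


def name_is_safe_sketch(prefix: str, name: str) -> bool:
    """Check the assembled name against the assumed length limit and pattern."""
    candidate = full_name_sketch(name, prefix=prefix)
    if len(candidate) > NAME_MAX_LENGTH:
        return False
    return NAME_PATTERN.fullmatch(candidate) is not None


print(full_name_sketch("task.duration"))
print(name_is_safe_sketch("airflow", "task.duration"))
print(name_is_safe_sketch("airflow", "x" * 100))
```

Checking the assembled name rather than the bare stat name matters because the prefix counts toward the length limit.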
Import
from airflow_shared.observability.metrics.otel_logger import SafeOtelLogger, get_otel_logger
from airflow_shared.observability.metrics.otel_logger import full_name, name_is_otel_safe
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| otel_provider | MeterProvider | Yes | OpenTelemetry MeterProvider instance |
| prefix | str | No | Metric name prefix (default: "airflow") |
| metrics_validator | ListValidator | No | Allow/block list validator for metric names |
| stat | str | Yes (per method) | The metric name to emit |
| count | int | No | Increment/decrement value (default: 1) |
| rate | float | No | Sample rate between 0 and 1 (default: 1.0) |
| tags | Attributes | No | OTel attributes (key-value pairs) for dimensional metrics |
| delta | bool | No | If True, gauge value is added to previous reading |
Outputs
| Name | Type | Description |
|---|---|---|
| OTel Counter | opentelemetry Counter/UpDownCounter | Counter instrument created and cached in MetricsMap |
| OTel Gauge | InternalGauge | Gauge wrapper with stored value and delta support |
| Timer | _OtelTimer | Context manager that records duration as a gauge on exit |
| SafeOtelLogger | SafeOtelLogger | Returned by get_otel_logger() factory |
Usage Examples
Creating an OTel Logger with Factory
from airflow_shared.observability.metrics.otel_logger import get_otel_logger
# Typical initialization from Airflow config
logger = get_otel_logger(
host="otel-collector.monitoring.svc",
port=4318,
prefix="airflow",
ssl_active=False,
service_name="airflow-scheduler",
metrics_allow_list="operator_failures,task.duration,pool.*",
)
Using Delta Gauges
# Set absolute value
logger.gauge("pool.open_slots", value=10, tags={"pool_name": "default"})
# Add delta to existing value (10 + 3 = 13)
logger.gauge("pool.open_slots", value=3, delta=True, tags={"pool_name": "default"})
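The arithmetic behind delta mode can be shown with a minimal stand-in for a gauge that remembers its last value. `DeltaGaugeSketch` is a hypothetical class written for this example; it keeps only the stored number and omits the OTel sync gauge that InternalGauge wraps. The starting value of 0 is an assumption.

```python
class DeltaGaugeSketch:
    """Hypothetical gauge wrapper that remembers its last value."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.value = 0  # assumed starting point before any reading

    def set_value(self, new_value, delta: bool):
        """Replace the stored value, or add to it when delta is True."""
        if delta:
            new_value = self.value + new_value
        self.value = new_value
        return self.value


gauge = DeltaGaugeSketch("pool.open_slots")
print(gauge.set_value(10, delta=False))  # absolute reading
print(gauge.set_value(3, delta=True))    # 10 + 3
print(gauge.set_value(-5, delta=True))   # 13 - 5
```

A later call with delta=False discards the accumulated value and starts again from the new absolute reading.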
Timer Context Manager
with logger.timer("scheduler.critical_section_duration") as timer:
    # Perform critical section work
    process_task_instances()
# Duration is automatically recorded as a gauge value in milliseconds
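For readers curious how a timer of this shape can work, here is a minimal standalone sketch of a context manager that measures elapsed milliseconds and hands the result to a callback on stop. `TimerSketch` and the `record` callback are hypothetical and stand in for _OtelTimer and the gauge update; the real class is not reproduced here.

```python
import time
from typing import Callable, Optional


class TimerSketch:
    """Hypothetical timer that reports elapsed milliseconds through a callback."""

    def __init__(self, record: Callable[[str, float], None], name: Optional[str] = None) -> None:
        self._record = record
        self._name = name
        self._start: Optional[float] = None
        self.duration: Optional[float] = None

    def __enter__(self) -> "TimerSketch":
        return self.start()

    def __exit__(self, exc_type, exc, tb) -> bool:
        self.stop()
        return False  # never swallow exceptions from the timed block

    def start(self) -> "TimerSketch":
        self._start = time.perf_counter()
        return self

    def stop(self, send: bool = True) -> None:
        if self._start is None:
            raise RuntimeError("stop() called before start()")
        # perf_counter() returns seconds; convert to milliseconds.
        self.duration = (time.perf_counter() - self._start) * 1000.0
        if send and self._name:
            self._record(self._name, self.duration)


readings = {}
with TimerSketch(readings.__setitem__, "scheduler.critical_section_duration") as t:
    time.sleep(0.01)  # stand-in for real work
print(readings)
```

The measured duration stays available on the timer object after the block exits, so callers can log it even when no metric name was given.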