Implementation:Apache Airflow SafeOtelLogger

From Leeroopedia


Knowledge Sources
Domains: Observability, Metrics, OpenTelemetry
Last Updated: 2026-02-08 21:00 GMT

Overview

SafeOtelLogger is an OpenTelemetry-based metrics logger that provides a safe, validated interface for emitting counters, gauges, and timers as OTel instruments. It supports metric name validation, sampling rates, and backward-compatible gauge deltas.

Description

The SafeOtelLogger class is the primary interface for emitting metrics in Airflow when OpenTelemetry is enabled. It wraps the OTel SDK's MeterProvider and manages a MetricsMap that lazily creates and caches OTel instrument instances (counters, gauges) as they are first referenced.
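The lazy create-and-cache behaviour described above can be illustrated with a small, dependency-free sketch. This is not the MetricsMap source: the class name LazyInstrumentCache, the factory callback, and the key layout are hypothetical and only show the general pattern of keying an instrument by metric name plus attributes.

```python
from __future__ import annotations

from typing import Any, Callable, Mapping


def _cache_key(name: str, attributes: Mapping[str, Any] | None) -> tuple:
    # Sort attribute items so that key order does not produce distinct cache entries.
    # Assumption: attribute values are hashable (strings, numbers, booleans).
    return (name, tuple(sorted((attributes or {}).items())))


class LazyInstrumentCache:
    """Hypothetical stand-in for a MetricsMap-like cache (illustration only)."""

    def __init__(self, factory: Callable[[str], Any]) -> None:
        self._factory = factory          # builds an instrument for a metric name
        self._items: dict[tuple, Any] = {}

    def get(self, name: str, attributes: Mapping[str, Any] | None = None) -> Any:
        key = _cache_key(name, attributes)
        if key not in self._items:
            # Created on first reference only; later lookups reuse the same object.
            self._items[key] = self._factory(name)
        return self._items[key]

    def clear(self) -> None:
        self._items.clear()
```

With a real OTel meter, the factory would plausibly be something like meter.create_counter; here it is left as a plain callable so the sketch runs without the SDK installed.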

Key design decisions:

  • Counters vs. UpDownCounters: Most counters are monotonic OTel Counters. The special metric airflow.dag_processing.processes is registered as an UpDownCounter because it can decrease. This is tracked via the UP_DOWN_COUNTERS set.
  • Timers as Gauges: OTel has no native timer instrument. Timer values are stored as Gauge readings representing elapsed milliseconds.
  • Name Safety: All metric names are validated against OTel name length limits (OTEL_NAME_MAX_LENGTH) and pattern allow/block lists before emission.
  • Sampling: The rate parameter on incr, decr, and gauge supports probabilistic sampling to reduce metric volume.
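Two of the decisions above, sampling and the counter/up-down-counter split, can be sketched in plain Python. The helper names should_emit and instrument_kind are hypothetical, and the exact comparison the real logger uses for sampling is an assumption; only the UP_DOWN_COUNTERS name and its single member come from the text.

```python
from __future__ import annotations

import random

# Taken from the description above; the real set lives in otel_logger.py.
UP_DOWN_COUNTERS = {"airflow.dag_processing.processes"}


def should_emit(rate: float, rng: random.Random | None = None) -> bool:
    """Hypothetical sampling gate: emit roughly `rate` of all calls."""
    if not 0 <= rate <= 1:
        raise ValueError("rate must be between 0 and 1")
    if rate >= 1:
        return True  # no sampling requested, always emit
    draw = (rng or random).random()  # uniform float in [0.0, 1.0)
    return draw < rate


def instrument_kind(full_metric_name: str) -> str:
    """Pick the instrument type: up-down counter only for listed names."""
    if full_metric_name in UP_DOWN_COUNTERS:
        return "up_down_counter"
    return "counter"
```

Passing a seeded random.Random instance as rng keeps the gate deterministic in tests, while production callers would omit it.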

Supporting classes:

  • MetricsMap -- Internal cache that stores and retrieves OTel instruments (counters and InternalGauge instances) keyed by metric name and attributes.
  • InternalGauge -- Wraps an OTel sync gauge with a stored current value to support delta mode (adding to the previous value rather than replacing it).
  • _OtelTimer -- Context manager implementing the Timer protocol; records elapsed duration as a gauge value on stop().
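The delta mode attributed to InternalGauge amounts to remembering the last value and either replacing it or adding to it. A minimal sketch, assuming a starting value of 0 and using the hypothetical class name DeltaGaugeSketch (the real class also forwards the result to an OTel sync gauge, which is omitted here):

```python
from __future__ import annotations


class DeltaGaugeSketch:
    """Illustrative gauge wrapper that keeps its current value between calls."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.value: int | float = 0  # assumed initial value

    def set_value(self, new_value: int | float, delta: bool) -> int | float:
        if delta:
            # Delta mode: add to the previously stored reading.
            self.value += new_value
        else:
            # Absolute mode: overwrite the stored reading.
            self.value = new_value
        return self.value
```

Storing the value in the wrapper is what makes delta mode possible, since a gauge reading on its own carries no memory of the previous one.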

Usage

from airflow_shared.observability.metrics.otel_logger import SafeOtelLogger, get_otel_logger

# Create a logger via the factory function
logger = get_otel_logger(host="localhost", port=4318, prefix="airflow")

# Increment a counter
logger.incr("operator_successes", count=1, tags={"operator_name": "PythonOperator"})

# Set a gauge value
logger.gauge("pool.open_slots", value=10, tags={"pool_name": "default_pool"})

# Record a timing
logger.timing("task.duration", dt=2345.6, tags={"dag_id": "my_dag", "task_id": "extract"})

# Use the timer context manager
with logger.timer("scheduler.scheduler_loop_duration") as t:
    # ... do work ...
    pass

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/observability/src/airflow_shared/observability/metrics/otel_logger.py

Key Classes

SafeOtelLogger (lines 165-291)

class SafeOtelLogger:
    """Otel Logger."""

    def __init__(
        self,
        otel_provider,
        prefix: str = DEFAULT_METRIC_NAME_PREFIX,
        metrics_validator: ListValidator = PatternAllowListValidator(),
        stat_name_handler: Callable[[str], str] | None = None,
        statsd_influxdb_enabled: bool = False,
    ): ...

    def incr(self, stat: str, count: int = 1, rate: float = 1, tags: Attributes = None): ...
    def decr(self, stat: str, count: int = 1, rate: float = 1, tags: Attributes = None): ...
    def gauge(self, stat: str, value: int | float, rate: float = 1, delta: bool = False,
              *, tags: Attributes = None, back_compat_name: str = "") -> None: ...
    def timing(self, stat: str, dt: DeltaType, *, tags: Attributes = None) -> None: ...
    def timer(self, stat: str | None = None, *args, tags: Attributes = None, **kwargs) -> Timer: ...

MetricsMap (lines 313-373)

class MetricsMap:
    """Stores Otel Instruments."""

    def __init__(self, meter): ...
    def clear(self) -> None: ...
    def get_counter(self, name: str, attributes: Attributes = None): ...
    def del_counter(self, name: str, attributes: Attributes = None) -> None: ...
    def set_gauge_value(self, name: str, value: int | float, delta: bool, tags: Attributes): ...

InternalGauge (lines 294-310)

class InternalGauge:
    """Stores sync gauge instrument and current value to support delta feature."""

    def __init__(self, meter, name: str, tags: Attributes): ...
    def set_value(self, new_value: int | float, delta: bool): ...

_OtelTimer (lines 141-162)

class _OtelTimer(Timer):
    """An implementation of Stats.Timer() which records the result in the OTel Metrics Map."""

    def __init__(self, otel_logger: SafeOtelLogger, name: str | None, tags: Attributes): ...
    def stop(self, send: bool = True) -> None: ...

Key Functions

get_otel_logger (lines 376-431)

def get_otel_logger(
    *,
    host: str | None = None,
    port: int | None = None,
    prefix: str | None = None,
    ssl_active: bool = False,
    conf_interval: float | None = None,
    debug: bool = False,
    service_name: str | None = None,
    metrics_allow_list: str | None = None,
    metrics_block_list: str | None = None,
    stat_name_handler: Callable[[str], str] | None = None,
    statsd_influxdb_enabled: bool = False,
) -> SafeOtelLogger: ...

Utility Functions

def full_name(name: str, *, prefix: str = DEFAULT_METRIC_NAME_PREFIX) -> str:
    """Assembles the prefix, delimiter, and name and returns it as a string."""

def name_is_otel_safe(prefix: str, name: str) -> bool:
    """Return True if the provided name and prefix would result in a name
    that meets the OpenTelemetry standard."""
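To make the two utility docstrings concrete, the following sketch shows one plausible shape for them. Everything here is an assumption for illustration: the "." delimiter, the 63-character limit standing in for OTEL_NAME_MAX_LENGTH, and the allowed-character pattern are not confirmed by this page, and the functions carry a _sketch suffix to avoid being mistaken for the real ones.

```python
import re

ASSUMED_PREFIX = "airflow"       # matches the default prefix listed on this page
ASSUMED_DELIMITER = "."          # assumption: prefix and name are joined with a dot
ASSUMED_MAX_LENGTH = 63          # assumption: placeholder for OTEL_NAME_MAX_LENGTH

# Assumption: a name starts with a letter, followed by letters, digits,
# or one of "_", ".", "-", "/".
_ALLOWED_NAME = re.compile(r"^[A-Za-z][A-Za-z0-9_.\-/]*$")


def full_name_sketch(name: str, *, prefix: str = ASSUMED_PREFIX) -> str:
    """Join prefix, delimiter, and name into one metric name."""
    return f"{prefix}{ASSUMED_DELIMITER}{name}"


def name_is_safe_sketch(prefix: str, name: str) -> bool:
    """Check length and character set of the fully assembled name."""
    candidate = full_name_sketch(name, prefix=prefix)
    if len(candidate) > ASSUMED_MAX_LENGTH:
        return False
    return bool(_ALLOWED_NAME.match(candidate))
```

Note that the check runs on the assembled name, so a long prefix reduces the room left for the metric name itself.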

Import

from airflow_shared.observability.metrics.otel_logger import SafeOtelLogger, get_otel_logger
from airflow_shared.observability.metrics.otel_logger import full_name, name_is_otel_safe

I/O Contract

Inputs

Name | Type | Required | Description
otel_provider | MeterProvider | Yes | OpenTelemetry MeterProvider instance
prefix | str | No | Metric name prefix (default: "airflow")
metrics_validator | ListValidator | No | Allow/block list validator for metric names
stat | str | Yes (per method) | The metric name to emit
count | int | No | Increment/decrement value (default: 1)
rate | float | No | Sample rate between 0 and 1 (default: 1.0)
tags | Attributes | No | OTel attributes (key-value pairs) for dimensional metrics
delta | bool | No | If True, gauge value is added to previous reading

Outputs

Name | Type | Description
OTel Counter | opentelemetry Counter/UpDownCounter | Counter instrument created and cached in MetricsMap
OTel Gauge | InternalGauge | Gauge wrapper with stored value and delta support
Timer | _OtelTimer | Context manager that records duration as a gauge on exit
SafeOtelLogger | SafeOtelLogger | Returned by get_otel_logger() factory

Usage Examples

Creating an OTel Logger with Factory

from airflow_shared.observability.metrics.otel_logger import get_otel_logger

# Typical initialization from Airflow config
logger = get_otel_logger(
    host="otel-collector.monitoring.svc",
    port=4318,
    prefix="airflow",
    ssl_active=False,
    service_name="airflow-scheduler",
    metrics_allow_list="operator_failures,task.duration,pool.*",
)

Using Delta Gauges

# Set absolute value
logger.gauge("pool.open_slots", value=10, tags={"pool_name": "default"})

# Add delta to existing value (10 + 3 = 13)
logger.gauge("pool.open_slots", value=3, delta=True, tags={"pool_name": "default"})

Timer Context Manager

with logger.timer("scheduler.critical_section_duration") as timer:
    # Perform critical section work
    process_task_instances()  # placeholder for the work being timed
# Duration is automatically recorded as a gauge value in milliseconds
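For readers who want to see what a timer context manager of this kind does internally, here is a self-contained sketch. It is not the _OtelTimer source: TimerSketch and its dict-based sink are hypothetical, and the use of time.perf_counter is an assumption. It only demonstrates measuring elapsed milliseconds and recording them as a gauge-style value when the block exits.

```python
from __future__ import annotations

import time


class TimerSketch:
    """Illustrative timer: records elapsed milliseconds into a dict on stop()."""

    def __init__(self, sink: dict[str, float], name: str | None) -> None:
        self.sink = sink                  # stand-in for the gauge storage
        self.name = name
        self.duration: float | None = None
        self._start: float | None = None

    def __enter__(self) -> "TimerSketch":
        return self.start()

    def __exit__(self, exc_type, exc, tb) -> None:
        # Record the duration even if the timed block raised.
        self.stop()

    def start(self) -> "TimerSketch":
        self._start = time.perf_counter()
        return self

    def stop(self, send: bool = True) -> None:
        if self._start is None:
            raise RuntimeError("timer was never started")
        self.duration = (time.perf_counter() - self._start) * 1000.0
        if send and self.name:
            # Last-value semantics, like a gauge: overwrite any earlier reading.
            self.sink[self.name] = self.duration


readings: dict[str, float] = {}
with TimerSketch(readings, "scheduler.critical_section_duration") as timer:
    time.sleep(0.01)  # stand-in for real work
```

After the block exits, readings holds one entry for the metric name and timer.duration exposes the same value to the caller.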
