Implementation:Apache Airflow SafeDatadogLogger
| Knowledge Sources | |
|---|---|
| Domains | Observability, Metrics, DataDog |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
Provides the SafeDogStatsdLogger class, a DataDog-specific metrics logger that wraps the DogStatsd client for emitting counters, gauges, and timers with DataDog tag support and metric validation.
Description
The datadog_logger.py module implements the DataDog StatsD integration for Airflow's metrics subsystem.
SafeDogStatsdLogger
A wrapper around the datadog.DogStatsd client that provides a safe, validated interface for emitting metrics. It accepts the following configuration during construction:
- dogstatsd_client: DogStatsd -- The underlying DataDog StatsD client instance.
- metrics_validator: ListValidator -- Controls which metric names are allowed (defaults to PatternAllowListValidator()).
- metrics_tags: bool -- Whether to emit DataDog tags with metrics (default False).
- metric_tags_validator: ListValidator -- Controls which tag keys are allowed.
- stat_name_handler: Callable[[str], str] | None -- Optional function to transform metric names before emission.
- statsd_influxdb_enabled: bool -- Whether InfluxDB-style tags are enabled.
All metric methods (incr, decr, gauge, timing, timer) are decorated with @validate_stat, which provides validation and error handling. Each method:
- Checks if metrics_tags is enabled and tags are provided, then builds a tag list in DataDog's key:value format, filtering through metric_tags_validator.
- Validates the metric name against metrics_validator before emitting.
- Delegates to the underlying DogStatsd client method.
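The tag-handling step in the list above can be sketched in isolation. This is a minimal illustration rather than the module's code: BlockKeysValidator and build_datadog_tags are hypothetical names, and the sketch assumes a validator exposes a test(name) method that returns a boolean.

```python
from __future__ import annotations


class BlockKeysValidator:
    """Hypothetical stand-in for a tag validator: rejects listed keys."""

    def __init__(self, blocked: set[str]) -> None:
        self.blocked = blocked

    def test(self, name: str) -> bool:
        # Assumed validator interface: True means the key is allowed.
        return name not in self.blocked


def build_datadog_tags(
    tags: dict[str, str] | None,
    metrics_tags: bool,
    validator: BlockKeysValidator,
) -> list[str]:
    """Return tags as "key:value" strings, or an empty list when tagging is off."""
    if not (metrics_tags and tags):
        return []
    return [f"{key}:{value}" for key, value in tags.items() if validator.test(key)]


validator = BlockKeysValidator(blocked={"run_id"})
tag_list = build_datadog_tags(
    {"dag_id": "etl_pipeline", "run_id": "manual__1"}, True, validator
)
print(tag_list)
```

Here the run_id key is dropped by the validator, so only the dag_id tag would be passed on to the client.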
The timing method automatically converts datetime.timedelta to milliseconds.
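The conversion can be illustrated with a small helper. This is a sketch under assumptions: to_milliseconds is a hypothetical name, and treating non-timedelta values as already being in milliseconds is an assumption about how numeric inputs are handled.

```python
from __future__ import annotations

import datetime


def to_milliseconds(dt: datetime.timedelta | float) -> float:
    """Convert a timedelta to milliseconds; pass numeric values through."""
    if isinstance(dt, datetime.timedelta):
        return dt.total_seconds() * 1000.0
    # Assumption: plain numbers are already expressed in milliseconds.
    return float(dt)


ms = to_milliseconds(datetime.timedelta(seconds=12.5))
print(ms)
```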
The timer method returns a Timer instance wrapping the DataDog client's timed() context manager when the stat passes validation, or an empty Timer() if the stat is filtered out.
get_dogstatsd_logger() Factory
A factory function that constructs a SafeDogStatsdLogger with full configuration support including:
- Custom host, port, and namespace for the DogStatsd client
- Constant tags parsed from a string
- Metric allow/block list configuration
- Tag validation via PatternBlockListValidator
- Optional stat name handler and InfluxDB tag support
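As an illustration of the constant-tags item above, the sketch below parses a tag string into a list. The source says only that constant tags are parsed from a string; the comma-separated key:value format and the helper name parse_constant_tags are assumptions made for this example.

```python
from __future__ import annotations


def parse_constant_tags(tags_in_string: str | None) -> list[str]:
    """Split a comma-separated "key:value" string into a list of tags."""
    if not tags_in_string:
        return []
    # Strip whitespace around each tag and skip empty segments.
    return [tag.strip() for tag in tags_in_string.split(",") if tag.strip()]


constant_tags = parse_constant_tags("env:prod, team:data")
print(constant_tags)
```

A list of this shape is what a DogStatsd client would typically accept as its constant tags, which are then attached to every metric it emits.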
Usage
This logger is activated when is_statsd_datadog_enabled is set in the Airflow metrics configuration. It is typically instantiated via get_dogstatsd_logger() during the stats initialization process.
Code Reference
Source Location
- Repository: Apache_Airflow
- File: shared/observability/src/airflow_shared/observability/metrics/datadog_logger.py (198 lines)
Signature
class SafeDogStatsdLogger:
    """DogStatsd Logger."""
    def __init__(
        self,
        dogstatsd_client: DogStatsd,
        metrics_validator: ListValidator = PatternAllowListValidator(),
        metrics_tags: bool = False,
        metric_tags_validator: ListValidator = PatternAllowListValidator(),
        stat_name_handler: Callable[[str], str] | None = None,
        statsd_influxdb_enabled: bool = False,
    ) -> None: ...
    @validate_stat
    def incr(self, stat: str, count: int = 1, rate: float = 1,
             *, tags: dict[str, str] | None = None) -> None: ...
    @validate_stat
    def decr(self, stat: str, count: int = 1, rate: float = 1,
             *, tags: dict[str, str] | None = None) -> None: ...
    @validate_stat
    def gauge(self, stat: str, value: int | float, rate: float = 1,
              delta: bool = False, *, tags: dict[str, str] | None = None) -> None: ...
    @validate_stat
    def timing(self, stat: str, dt: DeltaType,
               *, tags: dict[str, str] | None = None) -> None: ...
    @validate_stat
    def timer(self, stat: str | None = None,
              tags: dict[str, str] | None = None, **kwargs) -> Timer: ...
def get_dogstatsd_logger(
    cls, *, tags_in_string: str | None = None, host: str | None = None,
    port: int | None = None, namespace: str | None = None,
    datadog_metrics_tags: bool = True, statsd_disabled_tags: str | None = None,
    metrics_allow_list: str | None = None, metrics_block_list: str | None = None,
    stat_name_handler: Callable[[str], str] | None = None,
    statsd_influxdb_enabled: bool = False,
) -> SafeDogStatsdLogger: ...
Import
from airflow_shared.observability.metrics.datadog_logger import SafeDogStatsdLogger, get_dogstatsd_logger
I/O Contract
| Method | Input | Output | Side Effects |
|---|---|---|---|
| incr | stat: str, count: int, rate: float, tags: dict or None | None | Calls DogStatsd.increment() if stat passes validation |
| decr | stat: str, count: int, rate: float, tags: dict or None | None | Calls DogStatsd.decrement() if stat passes validation |
| gauge | stat: str, value: int or float, rate: float, delta: bool, tags: dict or None | None | Calls DogStatsd.gauge() if stat passes validation |
| timing | stat: str, dt: DeltaType, tags: dict or None | None | Converts timedelta to ms; calls DogStatsd.timing() |
| timer | stat: str or None, tags: dict or None | Timer | Returns Timer wrapping DogStatsd.timed(), or empty Timer if filtered |
| get_dogstatsd_logger | Configuration parameters (host, port, namespace, tags, validators) | SafeDogStatsdLogger | Creates DogStatsd client instance |
Usage Examples
Creating a DataDog Logger via Factory
from airflow_shared.observability.metrics.datadog_logger import get_dogstatsd_logger
# MyStatsClass is a placeholder for the class passed as cls; it is not defined in this snippet.
logger = get_dogstatsd_logger(
    cls=MyStatsClass,
    host="localhost",
    port=8125,
    namespace="airflow",
    datadog_metrics_tags=True,
)
Emitting Metrics with Tags
import datetime
# Increment a counter with DataDog tags
logger.incr("dag.task.success", count=1, tags={"dag_id": "etl_pipeline", "task_id": "extract"})
# Record a gauge
logger.gauge("scheduler.open_slots", value=16, tags={"pool": "default_pool"})
# Record a timing (timedelta automatically converted to ms)
logger.timing("task.execution_time", dt=datetime.timedelta(seconds=12.5), tags={"task_id": "transform"})
Using the Timer Context Manager
with logger.timer("scheduler.loop_duration", tags={"scheduler_id": "scheduler-0"}) as t:
    run_scheduler_loop()
# Duration sent to DataDog automatically