
Implementation:Apache Airflow SafeDatadogLogger

From Leeroopedia
Revision as of 14:11, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Apache_Airflow_SafeDatadogLogger.md)


Knowledge Sources
Domains: Observability, Metrics, DataDog
Last Updated: 2026-02-08 21:00 GMT

Overview

Provides the SafeDogStatsdLogger class, a DataDog-specific metrics logger that wraps the DogStatsd client for emitting counters, gauges, and timers with DataDog tag support and metric validation.

Description

The datadog_logger.py module implements the DataDog StatsD integration for Airflow's metrics subsystem.

SafeDogStatsdLogger

A wrapper around the datadog.DogStatsd client that provides a safe, validated interface for emitting metrics. It accepts the following configuration during construction:

  • dogstatsd_client: DogStatsd -- The underlying DataDog StatsD client instance.
  • metrics_validator: ListValidator -- Controls which metric names are allowed (defaults to PatternAllowListValidator()).
  • metrics_tags: bool -- Whether to emit DataDog tags with metrics (default False).
  • metric_tags_validator: ListValidator -- Controls which tag keys are allowed (defaults to PatternAllowListValidator(); the factory supplies a PatternBlockListValidator).
  • stat_name_handler: Callable[[str], str] | None -- Optional function to transform metric names before emission (default None).
  • statsd_influxdb_enabled: bool -- Whether InfluxDB-style tags are enabled (default False).

All metric methods (incr, decr, gauge, timing, timer) are decorated with @validate_stat, which provides validation and error handling. Each method performs the following steps:

  1. Checks if metrics_tags is enabled and tags are provided, then builds a tag list in DataDog's key:value format, filtering through metric_tags_validator.
  2. Validates the metric name against metrics_validator before emitting.
  3. Delegates to the underlying DogStatsd client method.
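
The three steps above can be sketched in plain Python. This is an illustration only, not the module's code: AllowAll, BlockKeys, RecordingClient, and emit_incr are hypothetical stand-ins introduced here, and the sketch assumes validators expose a test(name) method. The real validators and DogStatsd client have richer behavior.

```python
from __future__ import annotations


class AllowAll:
    """Hypothetical validator that accepts every name."""

    def test(self, name: str) -> bool:
        return True


class BlockKeys:
    """Hypothetical tag validator that rejects a fixed set of tag keys."""

    def __init__(self, blocked: set[str]) -> None:
        self.blocked = blocked

    def test(self, name: str) -> bool:
        return name not in self.blocked


class RecordingClient:
    """Stand-in for a DogStatsd client; records calls instead of sending them."""

    def __init__(self) -> None:
        self.calls: list[tuple] = []

    def increment(self, metric, value=1, tags=None, sample_rate=1):
        self.calls.append((metric, value, tags, sample_rate))


def emit_incr(client, stat, count=1, rate=1, *, tags=None, metrics_tags=False,
              metrics_validator=AllowAll(), metric_tags_validator=AllowAll()):
    # Step 1: build "key:value" tags only when tagging is enabled and tags exist,
    # dropping any key the tag validator rejects.
    if metrics_tags and tags:
        tags_list = [f"{k}:{v}" for k, v in tags.items() if metric_tags_validator.test(k)]
    else:
        tags_list = []
    # Step 2: validate the metric name before emitting.
    if not metrics_validator.test(stat):
        return None
    # Step 3: delegate to the underlying client.
    return client.increment(metric=stat, value=count, tags=tags_list, sample_rate=rate)
```

With a BlockKeys({"run_id"}) tag validator, a call carrying both dag_id and run_id tags would reach the client with only the dag_id tag attached.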

The timing method automatically converts datetime.timedelta to milliseconds.
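
A minimal sketch of that conversion, assuming DeltaType also admits plain numbers that are already in milliseconds (the page does not define DeltaType); to_milliseconds is a hypothetical helper name, not part of the module:

```python
import datetime


def to_milliseconds(dt):
    """Convert a timedelta to milliseconds; pass other values through unchanged."""
    if isinstance(dt, datetime.timedelta):
        return dt.total_seconds() * 1000.0
    return dt
```

Under this sketch, datetime.timedelta(seconds=12.5) becomes 12500.0.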

The timer method returns a Timer instance wrapping the DataDog client's timed() context manager when the stat passes validation, or an empty Timer() if the stat is filtered out.
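
The "wrapped or empty" behavior can be illustrated with a small stand-in. SketchTimer, FakeTimed, and make_timer are hypothetical names for this sketch; the real Timer class and the client's timed() object may differ in interface and in how they start and stop.

```python
import time


class SketchTimer:
    """Hypothetical stand-in for Timer: measures locally, optionally reports."""

    def __init__(self, real_timer=None):
        self.real_timer = real_timer  # e.g. the object returned by a client's timed()
        self.duration = None
        self._start = None

    def __enter__(self):
        self._start = time.perf_counter()
        if self.real_timer is not None:
            self.real_timer.__enter__()
        return self

    def __exit__(self, exc_type, exc, tb):
        # Duration in milliseconds, recorded even when nothing is reported.
        self.duration = (time.perf_counter() - self._start) * 1000.0
        if self.real_timer is not None:
            self.real_timer.__exit__(exc_type, exc, tb)
        return False


class FakeTimed:
    """Stand-in for the client's timed() context manager."""

    def __init__(self):
        self.entered = False
        self.exited = False

    def __enter__(self):
        self.entered = True
        return self

    def __exit__(self, exc_type, exc, tb):
        self.exited = True
        return False


def make_timer(stat, is_allowed, timed_factory):
    # Wrap the client's timer only when the stat passes validation;
    # otherwise return an empty timer that reports nothing.
    if stat and is_allowed(stat):
        return SketchTimer(timed_factory(stat))
    return SketchTimer()
```

In this sketch, calling code can use the returned object in a with block either way, so callers do not need to know whether the stat was filtered out.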

get_dogstatsd_logger() Factory

A factory function that constructs a SafeDogStatsdLogger with full configuration support including:

  • Custom host, port, and namespace for the DogStatsd client
  • Constant tags parsed from a string
  • Metric allow/block list configuration
  • Tag validation via PatternBlockListValidator
  • Optional stat name handler and InfluxDB tag support
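
As an illustration of the constant-tags item, the sketch below assumes the string is a comma-separated list of key:value pairs such as "env:prod,team:data". That format is an assumption, and parse_constant_tags is a hypothetical helper, not the factory's actual parsing code.

```python
def parse_constant_tags(tags_in_string):
    """Split a comma-separated tag string into a list of DataDog-style tags."""
    if not tags_in_string:
        return []
    # Strip surrounding whitespace and skip empty entries (e.g. trailing commas).
    return [tag.strip() for tag in tags_in_string.split(",") if tag.strip()]
```

A list of this shape is what a DogStatsd client typically accepts as its constant tags, which are then attached to every metric it sends.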

Usage

This logger is activated when is_statsd_datadog_enabled is set in the Airflow metrics configuration. It is typically instantiated via get_dogstatsd_logger() during the stats initialization process.

Code Reference

Source Location

  • Repository: Apache_Airflow
  • File: shared/observability/src/airflow_shared/observability/metrics/datadog_logger.py (198 lines)

Signature

class SafeDogStatsdLogger:
    """DogStatsd Logger."""

    def __init__(
        self,
        dogstatsd_client: DogStatsd,
        metrics_validator: ListValidator = PatternAllowListValidator(),
        metrics_tags: bool = False,
        metric_tags_validator: ListValidator = PatternAllowListValidator(),
        stat_name_handler: Callable[[str], str] | None = None,
        statsd_influxdb_enabled: bool = False,
    ) -> None: ...

    @validate_stat
    def incr(self, stat: str, count: int = 1, rate: float = 1,
             *, tags: dict[str, str] | None = None) -> None: ...

    @validate_stat
    def decr(self, stat: str, count: int = 1, rate: float = 1,
             *, tags: dict[str, str] | None = None) -> None: ...

    @validate_stat
    def gauge(self, stat: str, value: int | float, rate: float = 1,
              delta: bool = False, *, tags: dict[str, str] | None = None) -> None: ...

    @validate_stat
    def timing(self, stat: str, dt: DeltaType,
               *, tags: dict[str, str] | None = None) -> None: ...

    @validate_stat
    def timer(self, stat: str | None = None,
              tags: dict[str, str] | None = None, **kwargs) -> Timer: ...


def get_dogstatsd_logger(
    cls, *, tags_in_string: str | None = None, host: str | None = None,
    port: int | None = None, namespace: str | None = None,
    datadog_metrics_tags: bool = True, statsd_disabled_tags: str | None = None,
    metrics_allow_list: str | None = None, metrics_block_list: str | None = None,
    stat_name_handler: Callable[[str], str] | None = None,
    statsd_influxdb_enabled: bool = False,
) -> SafeDogStatsdLogger: ...

Import

from airflow_shared.observability.metrics.datadog_logger import SafeDogStatsdLogger, get_dogstatsd_logger

I/O Contract

Method | Input | Output | Side Effects
incr | stat: str, count: int, rate: float, tags: dict or None | None | Calls DogStatsd.increment() if stat passes validation
decr | stat: str, count: int, rate: float, tags: dict or None | None | Calls DogStatsd.decrement() if stat passes validation
gauge | stat: str, value: int or float, rate: float, delta: bool, tags: dict or None | None | Calls DogStatsd.gauge() if stat passes validation
timing | stat: str, dt: DeltaType, tags: dict or None | None | Converts timedelta to ms; calls DogStatsd.timing()
timer | stat: str or None, tags: dict or None | Timer | Returns Timer wrapping DogStatsd.timed(), or empty Timer if filtered
get_dogstatsd_logger | Configuration parameters (host, port, namespace, tags, validators) | SafeDogStatsdLogger | Creates DogStatsd client instance

Usage Examples

Creating a DataDog Logger via Factory

from airflow_shared.observability.metrics.datadog_logger import get_dogstatsd_logger

logger = get_dogstatsd_logger(
    cls=MyStatsClass,  # placeholder for the class expected as cls (not defined here)
    host="localhost",
    port=8125,
    namespace="airflow",
    datadog_metrics_tags=True,
)

Emitting Metrics with Tags

import datetime

# Increment a counter with DataDog tags
logger.incr("dag.task.success", count=1, tags={"dag_id": "etl_pipeline", "task_id": "extract"})

# Record a gauge
logger.gauge("scheduler.open_slots", value=16, tags={"pool": "default_pool"})

# Record a timing (timedelta automatically converted to ms)
logger.timing("task.execution_time", dt=datetime.timedelta(seconds=12.5), tags={"task_id": "transform"})

Using the Timer Context Manager

with logger.timer("scheduler.loop_duration", tags={"scheduler_id": "scheduler-0"}) as t:
    run_scheduler_loop()  # stands in for the code being timed
# On exit, the duration is sent to DataDog if the stat passed validation
