

Implementation:Apache Airflow SafeStatsdLogger

From Leeroopedia


Knowledge Sources
Domains: Observability, Metrics, StatsD
Last Updated: 2026-02-08 21:00 GMT

Overview

A StatsD metrics logger that wraps a statsd client and provides validated counters, gauges, and timers, with rate sampling for counters and gauges, allow/block list filtering, and optional InfluxDB tag support.

Description

SafeStatsdLogger is the StatsD backend implementation for the Airflow metrics system. It wraps a standard StatsClient and adds several layers of protection and extension:

  • Metric validation: Each metric name passes through the validate_stat decorator, which calls the configured stat name handler to check allowed characters and length. If the handler raises InvalidStatsNameException, the error is logged and the metric is not emitted.
  • Allow/block list filtering: Before emission, the name is tested with metrics_validator, a ListValidator (either PatternAllowListValidator or PatternBlockListValidator). Metrics that fail the test are skipped.
  • InfluxDB tag support: When influxdb_tags_enabled is True, the prepare_stat_with_tags decorator appends each tag to the metric name as ,key=value (InfluxDB wire format). Tag keys are filtered by the separate metric_tags_validator.
  • Rate sampling: incr, decr, and gauge accept a rate parameter (a fraction between 0 and 1) that is forwarded to the statsd client for client-side sampling.
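To make the allow/block list behaviour concrete, here is a minimal sketch of two pattern validators. The class names SketchAllowList and SketchBlockList are hypothetical, and the matching rule (comma-separated, lower-cased patterns checked with re.search) is an assumption; the real PatternAllowListValidator and PatternBlockListValidator may differ in detail.

```python
from __future__ import annotations

import re


class SketchAllowList:
    """Hypothetical allow-list validator; not Airflow's PatternAllowListValidator."""

    def __init__(self, patterns: str | None = None) -> None:
        # Assumed format: comma-separated patterns, trimmed and lower-cased.
        self.patterns = (
            tuple(p.strip().lower() for p in patterns.split(",")) if patterns else None
        )

    def _matches(self, name: str) -> bool:
        # A name matches if any pattern is found anywhere in it (regex search).
        return any(re.search(p, name.lower()) for p in self.patterns)

    def test(self, name: str) -> bool:
        if self.patterns is None:
            return True  # no list configured: every metric is allowed
        return self._matches(name)


class SketchBlockList(SketchAllowList):
    """Hypothetical block-list validator: rejects names matching any pattern."""

    def test(self, name: str) -> bool:
        if self.patterns is None:
            return True  # no list configured: nothing is blocked
        return not self._matches(name)


allow = SketchAllowList("dag,task,scheduler")
print(allow.test("task_instance.completed"))  # True
print(allow.test("pool.open_slots"))  # False
print(SketchBlockList("pool").test("pool.open_slots"))  # False
```

With no list configured both sketches accept everything, which mirrors the "allow all" default described for metrics_validator.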

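The get_statsd_logger() factory function creates the statsd client from stats_class and the connection settings (host, port, prefix, ipv6), builds the metric and tag validators from the list settings, and returns a configured SafeStatsdLogger.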

Usage

This logger is instantiated by the Stats singleton when StatsD is the configured metrics backend. It is not typically used directly by application code.

Code Reference

Source Location

  • Repository: Apache Airflow
  • File: shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py (184 lines)

Signature

class SafeStatsdLogger:
    """StatsD Logger."""

    def __init__(
        self,
        statsd_client: StatsClient,
        metrics_validator: ListValidator = PatternAllowListValidator(),
        influxdb_tags_enabled: bool = False,
        metric_tags_validator: ListValidator = PatternAllowListValidator(),
        stat_name_handler: Callable[[str], str] | None = None,
        statsd_influxdb_enabled: bool = False,
    ) -> None: ...

    def incr(self, stat: str, count: int = 1, rate: float = 1,
             *, tags: dict[str, str] | None = None) -> None: ...

    def decr(self, stat: str, count: int = 1, rate: float = 1,
             *, tags: dict[str, str] | None = None) -> None: ...

    def gauge(self, stat: str, value: int | float, rate: float = 1,
              delta: bool = False, *, tags: dict[str, str] | None = None) -> None: ...

    def timing(self, stat: str, dt: DeltaType,
               *, tags: dict[str, str] | None = None) -> None: ...

    def timer(self, stat: str | None = None, *args,
              tags: dict[str, str] | None = None, **kwargs) -> Timer: ...


def get_statsd_logger(
    *,
    stats_class: type[StatsClient],
    host: str | None = None,
    port: int | None = None,
    prefix: str | None = None,
    ipv6: bool = False,
    influxdb_tags_enabled: bool = False,
    statsd_disabled_tags: str | None = None,
    metrics_allow_list: str | None = None,
    metrics_block_list: str | None = None,
    stat_name_handler: Callable[[str], str] | None = None,
    statsd_influxdb_enabled: bool = False,
) -> SafeStatsdLogger: ...

Import

from airflow_shared.observability.metrics.statsd_logger import SafeStatsdLogger, get_statsd_logger

I/O Contract

Constructor Inputs

Name | Type | Required | Description
statsd_client | StatsClient | Yes | Underlying statsd client for network emission
metrics_validator | ListValidator | No | Allow/block list for filtering metric names (default: allow all)
influxdb_tags_enabled | bool | No | Enable InfluxDB-style tag appending to metric names
metric_tags_validator | ListValidator | No | Validator for InfluxDB tag keys
stat_name_handler | Callable | No | Custom stat name validation/transformation function
statsd_influxdb_enabled | bool | No | Enable InfluxDB-compatible character set in validation

Factory Inputs (get_statsd_logger)

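Name | Type | Required | Description
stats_class | type[StatsClient] | Yes | StatsClient class to instantiate
host | str | No | StatsD server hostname
port | int | No | StatsD server port
prefix | str | No | Metric name prefix (e.g., "airflow")
ipv6 | bool | No | Use IPv6 for the StatsD connection
influxdb_tags_enabled | bool | No | Enable InfluxDB-style tag appending to metric names
statsd_disabled_tags | str | No | Comma-separated tag keys to exclude from InfluxDB tags
metrics_allow_list | str | No | Comma-separated patterns to allow
metrics_block_list | str | No | Comma-separated patterns to block
stat_name_handler | Callable | No | Custom stat name validation/transformation function
statsd_influxdb_enabled | bool | No | Enable InfluxDB-compatible character set in validation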

Outputs

Name | Type | Description
(return value) | SafeStatsdLogger | Configured logger instance ready for metrics emission

Decorator Pipeline

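Each metric method is wrapped by two decorators. The outer decorator runs first, so the name checked by the stat name handler already includes any InfluxDB tags; this is why the InfluxDB-compatible character set (statsd_influxdb_enabled) is needed when tags are enabled.

Order | Decorator | Purpose
1 (outer) | @prepare_stat_with_tags | Appends InfluxDB tags to the stat name if enabled, validating tag keys and values
2 (inner) | @validate_stat | Runs stat_name_handler for character/length validation; catches InvalidStatsNameException, logs it, and skips emission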
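The self-contained sketch below mimics this decorator order on a toy logger so the effect of running the tag step first can be observed. Everything in it (SketchLogger, RecordingClient, InvalidStatName, check_name, the allowed character set, and the 250-character limit) is hypothetical and only approximates the real decorators; tag key/value validation is omitted for brevity.

```python
from __future__ import annotations

import logging
import string
from functools import wraps

log = logging.getLogger(__name__)

# Assumed default character set; the real handler's rules may differ.
ALLOWED = frozenset(string.ascii_letters + string.digits + "_.-")


class InvalidStatName(Exception):
    """Hypothetical stand-in for InvalidStatsNameException."""


def check_name(stat: str, allowed: frozenset[str], max_length: int = 250) -> str:
    # Reject names that are too long or contain characters outside `allowed`.
    if len(stat) > max_length or any(c not in allowed for c in stat):
        raise InvalidStatName(stat)
    return stat


def validate_stat(fn):
    @wraps(fn)
    def wrapper(self, stat=None, *args, **kwargs):
        try:
            if stat is not None:
                stat = check_name(stat, self.allowed)
            return fn(self, stat, *args, **kwargs)
        except InvalidStatName:
            log.error("Invalid stat name: %s", stat)
            return None  # metric is skipped after logging the error

    return wrapper


def prepare_stat_with_tags(fn):
    @wraps(fn)
    def wrapper(self, stat=None, *args, tags=None, **kwargs):
        if self.tags_enabled and stat is not None and tags:
            for key, value in tags.items():
                stat += f",{key}={value}"  # tags are appended before validation
        return fn(self, stat, *args, tags=tags, **kwargs)

    return wrapper


class RecordingClient:
    """Fake statsd client that records calls instead of sending packets."""

    def __init__(self) -> None:
        self.sent: list[tuple[str, int, float]] = []

    def incr(self, stat: str, count: int = 1, rate: float = 1) -> None:
        self.sent.append((stat, count, rate))


class SketchLogger:
    def __init__(self, client, tags_enabled=False, allowed=ALLOWED):
        self.client = client
        self.tags_enabled = tags_enabled
        self.allowed = allowed

    @prepare_stat_with_tags  # outer: runs first
    @validate_stat  # inner: sees the already-tagged name
    def incr(self, stat, count=1, rate=1, *, tags=None):
        self.client.incr(stat, count, rate)


client = RecordingClient()
tags = {"dag_id": "example_dag"}
# "," and "=" are not allowed, so the tagged name is rejected and nothing is sent.
SketchLogger(client, tags_enabled=True).incr("ti.finished", tags=tags)
# Widening the character set lets the tagged name through.
influx_logger = SketchLogger(client, tags_enabled=True, allowed=ALLOWED | {",", "="})
influx_logger.incr("ti.finished", tags=tags)
print(client.sent)  # [('ti.finished,dag_id=example_dag', 1, 1)]
```

If the decorators were stacked the other way round, validation would see only the bare name and the tagged result would never be checked.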

InfluxDB Tag Format

# Without tags:
airflow.task_instance.completed

# With InfluxDB tags enabled:
airflow.task_instance.completed,dag_id=example_dag,task_id=my_task

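Tags whose key or value contains a comma or an equals sign are dropped with an error log, because those characters delimit tags in the wire format. Tag keys rejected by metric_tags_validator are omitted as well.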
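A minimal sketch of the tag-appending rule as a standalone function. The name append_influx_tags and the key_allowed callback (standing in for metric_tags_validator) are hypothetical. The sketch appends each tag as ,key=value and drops any tag whose key or value contains a comma or equals sign, logging an error. The "airflow." prefix is added later by the statsd client and is not shown.

```python
from __future__ import annotations

import logging
from typing import Callable

log = logging.getLogger(__name__)


def append_influx_tags(
    stat: str,
    tags: dict[str, str] | None,
    key_allowed: Callable[[str], bool] = lambda key: True,
) -> str:
    """Hypothetical helper: append tags to a stat name as ",key=value" pairs."""
    for key, value in (tags or {}).items():
        if not key_allowed(key):
            continue  # key filtered out, e.g. by a disabled-tags block list
        if any(c in ",=" for c in key + value):
            # A comma or equals sign would corrupt the wire format.
            log.error("Dropping invalid tag: %s=%s", key, value)
            continue
        stat += f",{key}={value}"
    return stat


print(append_influx_tags(
    "task_instance.completed", {"dag_id": "example_dag", "task_id": "my_task"}
))  # task_instance.completed,dag_id=example_dag,task_id=my_task
print(append_influx_tags("task_instance.completed", {"dag_id": "a,b"}))
# task_instance.completed  (the invalid tag is dropped)
```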

Usage Examples

Factory Construction

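from statsd import StatsClient
from airflow_shared.observability.metrics.statsd_logger import get_statsd_logger

# Build a logger whose StatsClient sends UDP packets to localhost:9125
# and prefixes every metric name with "airflow."
logger = get_statsd_logger(
    stats_class=StatsClient,
    host="localhost",
    port=9125,
    prefix="airflow",
    # Only metric names matching one of these patterns are emitted
    metrics_allow_list="dag,task,scheduler",
)

# Counter, emitted as airflow.task_instance.completed (matches "task")
logger.incr("task_instance.completed", count=1, rate=1.0)
# Gauge, emitted as airflow.scheduler.open_slots (matches "scheduler")
logger.gauge("scheduler.open_slots", value=42)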
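The example above uses rate=1.0, so every call is sent. To show what a lower rate does, here is a minimal sketch of client-side sampling for a counter, assuming the common StatsD text format name:value|c with an optional |@rate suffix. The function sample_counter_packet is hypothetical and is not the statsd package's API; the statsd client performs a comparable step internally.

```python
from __future__ import annotations

import random


def sample_counter_packet(
    stat: str,
    count: int = 1,
    rate: float = 1.0,
    prefix: str | None = None,
    rng: random.Random | None = None,
) -> str | None:
    """Hypothetical sketch: build a sampled StatsD counter packet, or None if skipped."""
    rng = rng or random.Random()
    if rate < 1 and rng.random() > rate:
        return None  # call dropped; on average only `rate` of calls are sent
    value = f"{count}|c"
    if rate < 1:
        value += f"|@{rate}"  # lets the server scale counts back up by 1/rate
    name = f"{prefix}.{stat}" if prefix else stat
    return f"{name}:{value}"


print(sample_counter_packet("task_instance.completed", prefix="airflow"))
# airflow.task_instance.completed:1|c

# With rate=0.5, about half of the calls produce a packet.
rng = random.Random(0)
packets = [
    sample_counter_packet("task_instance.completed", rate=0.5, rng=rng)
    for _ in range(1000)
]
sent = [p for p in packets if p is not None]
print(len(sent), "of 1000 calls sent")
```

Sampling trades some precision for less network traffic on hot code paths; the |@rate annotation lets the server estimate the true count.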
