Implementation:Apache Airflow SafeStatsdLogger
| Knowledge Sources | |
|---|---|
| Domains | Observability, Metrics, StatsD |
| Last Updated | 2026-02-08 21:00 GMT |
Overview
StatsD metrics logger with optional InfluxDB tag support that wraps a statsd client, providing validated counters, gauges, and timers with rate sampling and allow/block list filtering.
Description
SafeStatsdLogger is the StatsD backend implementation for the Airflow metrics system. It wraps a standard StatsClient and adds several layers of protection and extension:
- Metric validation: Each metric name is validated through the validate_stat decorator, which invokes the configured stat name handler to check character validity and length constraints.
- Allow/block list filtering: Each emission is checked against a ListValidator (either PatternAllowListValidator or PatternBlockListValidator) to determine whether the metric should be emitted.
- InfluxDB tag support: When influxdb_tags_enabled is True, the prepare_stat_with_tags decorator appends key=value tags to the metric name in InfluxDB wire format (comma-separated), with validation against a separate tag validator.
- Rate sampling: Counter and gauge methods accept a rate parameter for statistical sampling.
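SafeStatsdLogger does not perform the sampling itself. It forwards rate to the wrapped statsd client. The sketch below illustrates the usual StatsD convention, assuming a client that sends a call with probability rate and annotates the line with |@rate so the server can scale the count back up. sample_counter and its rng parameter are hypothetical illustrations. They are not part of Airflow or of the statsd package.

```python
from __future__ import annotations

import random
from typing import Callable


def sample_counter(
    stat: str,
    count: int = 1,
    rate: float = 1.0,
    prefix: str | None = None,
    rng: Callable[[], float] = random.random,
) -> str | None:
    """Build the StatsD counter line for one call, or return None if it is sampled out."""
    value = f"{count}|c"  # "c" marks a counter
    if rate < 1:
        # Keep roughly `rate` of all calls; the server scales counts by 1/rate.
        if rng() > rate:
            return None
        value = f"{value}|@{rate}"  # sample-rate annotation
    if prefix:
        stat = f"{prefix}.{stat}"
    return f"{stat}:{value}"


print(sample_counter("task_instance.completed", prefix="airflow"))
# airflow.task_instance.completed:1|c
print(sample_counter("task_instance.completed", rate=0.1, prefix="airflow", rng=lambda: 0.05))
# airflow.task_instance.completed:1|c|@0.1
print(sample_counter("task_instance.completed", rate=0.1, rng=lambda: 0.5))
# None
```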
The get_statsd_logger() factory function constructs a configured SafeStatsdLogger instance with the appropriate validators and client settings.
Usage
This logger is instantiated by the Stats singleton when StatsD is the configured metrics backend. It is not typically used directly by application code.
Code Reference
Source Location
- Repository: Apache Airflow
- File: shared/observability/src/airflow_shared/observability/metrics/statsd_logger.py (184 lines)
Signature
```python
class SafeStatsdLogger:
    """StatsD Logger."""

    def __init__(
        self,
        statsd_client: StatsClient,
        metrics_validator: ListValidator = PatternAllowListValidator(),
        influxdb_tags_enabled: bool = False,
        metric_tags_validator: ListValidator = PatternAllowListValidator(),
        stat_name_handler: Callable[[str], str] | None = None,
        statsd_influxdb_enabled: bool = False,
    ) -> None: ...

    def incr(self, stat: str, count: int = 1, rate: float = 1,
             *, tags: dict[str, str] | None = None) -> None: ...

    def decr(self, stat: str, count: int = 1, rate: float = 1,
             *, tags: dict[str, str] | None = None) -> None: ...

    def gauge(self, stat: str, value: int | float, rate: float = 1,
              delta: bool = False, *, tags: dict[str, str] | None = None) -> None: ...

    def timing(self, stat: str, dt: DeltaType,
               *, tags: dict[str, str] | None = None) -> None: ...

    def timer(self, stat: str | None = None, *args,
              tags: dict[str, str] | None = None, **kwargs) -> Timer: ...


def get_statsd_logger(
    *,
    stats_class: type[StatsClient],
    host: str | None = None,
    port: int | None = None,
    prefix: str | None = None,
    ipv6: bool = False,
    influxdb_tags_enabled: bool = False,
    statsd_disabled_tags: str | None = None,
    metrics_allow_list: str | None = None,
    metrics_block_list: str | None = None,
    stat_name_handler: Callable[[str], str] | None = None,
    statsd_influxdb_enabled: bool = False,
) -> SafeStatsdLogger: ...
```
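Unlike the other methods, timer() does not emit a metric immediately. It returns a Timer object, which suits a context-manager style of use around the code being measured. The following is a minimal sketch of that pattern and is not Airflow's Timer class. TimerSketch and its send callback are hypothetical. The sketch assumes the duration is reported in milliseconds when the block exits, which is the usual StatsD convention for timers.

```python
from __future__ import annotations

import time
from typing import Any, Callable


class TimerSketch:
    """Context manager that times a block and reports elapsed milliseconds on exit."""

    def __init__(self, stat: str, send: Callable[[str, float], None] | None = None) -> None:
        self.stat = stat
        # send=None models a filtered-out metric: the block is still timed, nothing is sent.
        self.send = send
        self.duration_ms: float | None = None
        self._start = 0.0

    def __enter__(self) -> TimerSketch:
        self._start = time.perf_counter()
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.duration_ms = (time.perf_counter() - self._start) * 1000.0
        if self.send is not None:
            self.send(self.stat, self.duration_ms)
        # Returning None lets any exception from the block propagate.


sent: list[tuple[str, float]] = []
with TimerSketch("dag.loading", send=lambda stat, ms: sent.append((stat, ms))):
    time.sleep(0.01)
print(sent[0][0])  # dag.loading
```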
Import
```python
from airflow_shared.observability.metrics.statsd_logger import SafeStatsdLogger, get_statsd_logger
```
I/O Contract
Constructor Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| statsd_client | StatsClient | Yes | Underlying statsd client for network emission |
| metrics_validator | ListValidator | No | Allow/block list for filtering metric names (default: allow all) |
| influxdb_tags_enabled | bool | No | Enable InfluxDB-style tag appending to metric names |
| metric_tags_validator | ListValidator | No | Validator for InfluxDB tag keys; tags whose key fails it are not appended (default: allow all) |
| stat_name_handler | Callable | No | Custom stat name validation/transformation function |
| statsd_influxdb_enabled | bool | No | Use the InfluxDB-compatible character set, which also accepts , and =, when validating stat names |
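With the default handler, name validation is a character-set check combined with a length check. statsd_influxdb_enabled matters because a tagged name contains , and =. The following is a minimal sketch under stated assumptions. The base character set, the 250-character limit, the precedence rule, InvalidStatsName, default_handler, and pick_handler are all illustrative stand-ins, not Airflow's definitions.

```python
from __future__ import annotations

import string
from collections.abc import Callable, Iterable
from functools import partial


class InvalidStatsName(ValueError):
    """Hypothetical stand-in for Airflow's InvalidStatsNameException."""


# Assumed base character set and length cap; Airflow's actual values may differ.
BASE_ALLOWED = frozenset(string.ascii_letters + string.digits + "_.-")
MAX_LENGTH = 250


def default_handler(
    stat_name: str, max_length: int = MAX_LENGTH, allowed_chars: Iterable[str] = BASE_ALLOWED
) -> str:
    """Return stat_name unchanged if valid; raise InvalidStatsName otherwise."""
    if len(stat_name) > max_length:
        raise InvalidStatsName(f"longer than {max_length} characters: {stat_name!r}")
    allowed = set(allowed_chars)
    bad = sorted({c for c in stat_name if c not in allowed})
    if bad:
        raise InvalidStatsName(f"invalid characters {bad} in {stat_name!r}")
    return stat_name


def pick_handler(
    stat_name_handler: Callable[[str], str] | None, statsd_influxdb_enabled: bool
) -> Callable[[str], str]:
    # Assumed precedence: a custom handler is used as-is.
    if stat_name_handler is not None:
        return stat_name_handler
    # Otherwise InfluxDB mode also admits the tag separators "," and "=".
    if statsd_influxdb_enabled:
        return partial(default_handler, allowed_chars=BASE_ALLOWED | {",", "="})
    return default_handler


def is_accepted(handler: Callable[[str], str], name: str) -> bool:
    try:
        handler(name)
    except InvalidStatsName:
        return False
    return True


tagged = "task_instance.completed,dag_id=example_dag"
print(is_accepted(pick_handler(None, statsd_influxdb_enabled=False), tagged))  # False
print(is_accepted(pick_handler(None, statsd_influxdb_enabled=True), tagged))  # True
```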
Factory Inputs (get_statsd_logger)
| Name | Type | Required | Description |
|---|---|---|---|
| stats_class | type[StatsClient] | Yes | StatsClient class to instantiate |
| host | str | No | StatsD server hostname |
| port | int | No | StatsD server port |
| prefix | str | No | Metric name prefix (e.g., "airflow") |
| ipv6 | bool | No | Use IPv6 for StatsD connection |
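| metrics_allow_list | str | No | Comma-separated patterns; only matching metric names are emitted |
| metrics_block_list | str | No | Comma-separated patterns; matching metric names are not emitted |
| influxdb_tags_enabled | bool | No | Passed to the logger; enables InfluxDB tag appending |
| statsd_disabled_tags | str | No | Comma-separated tag keys (e.g. job_id,run_id) to exclude from InfluxDB tags; used to build metric_tags_validator |
| stat_name_handler | Callable | No | Passed to the logger as the custom stat name handler |
| statsd_influxdb_enabled | bool | No | Passed to the logger; enables the InfluxDB-compatible character set |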
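The following sketch shows how comma-separated pattern strings could be turned into allow-list and block-list validators. It assumes that each pattern is a regular expression searched anywhere in the lower-cased metric name, and that an unset list lets everything through. AllowListSketch and BlockListSketch are hypothetical stand-ins for PatternAllowListValidator and PatternBlockListValidator, not their actual code.

```python
from __future__ import annotations

import re


class PatternListSketch:
    """Hypothetical base: parses a comma-separated pattern string."""

    def __init__(self, patterns: str | None = None) -> None:
        # None or "" means no list is configured.
        self.patterns: tuple[str, ...] | None = (
            tuple(p.strip().lower() for p in patterns.split(",") if p.strip()) if patterns else None
        )

    def _matches(self, name: str) -> bool:
        # Assumption: each pattern is a regex searched anywhere in the lower-cased name.
        return any(re.search(p, name.strip().lower()) for p in self.patterns or ())


class AllowListSketch(PatternListSketch):
    def test(self, name: str) -> bool:
        # No list configured: everything is allowed.
        return self.patterns is None or self._matches(name)


class BlockListSketch(PatternListSketch):
    def test(self, name: str) -> bool:
        # No list configured: nothing is blocked.
        return self.patterns is None or not self._matches(name)


allow = AllowListSketch("dag,task,scheduler")
print(allow.test("task_instance.completed"), allow.test("pool.open_slots"))  # True False
block = BlockListSketch("pool")
print(block.test("pool.open_slots"), block.test("scheduler.heartbeat"))  # False True
```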
Outputs
| Name | Type | Description |
|---|---|---|
| SafeStatsdLogger | SafeStatsdLogger | Configured logger instance ready for metrics emission |
Decorator Pipeline
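Each metric method (incr, decr, gauge, timing, and timer) is wrapped by two stacked decorators. The outer decorator runs first on each call, so validate_stat checks the name after tags have been appended. The allow/block list check (metrics_validator) runs last, inside the method body.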
| Order | Decorator | Purpose |
|---|---|---|
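| 1 (outer) | @prepare_stat_with_tags | If InfluxDB tags are enabled, appends comma-separated key=value pairs to the stat name. It skips keys rejected by metric_tags_validator and drops tags containing , or = |
| 2 (inner) | @validate_stat | Passes the stat name through the stat name handler (character and length checks). On InvalidStatsNameException, it logs the error and skips emission |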
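A toy logger can demonstrate the call order. In this sketch the decorator bodies are simplified, hypothetical versions: tag filtering is omitted, and check_name is an assumed handler. RecordingLogger records each line instead of sending it. Only the stacking order is meant to mirror the table.

```python
from __future__ import annotations

import functools
import logging

log = logging.getLogger(__name__)


class InvalidStatsName(Exception):
    """Hypothetical stand-in for InvalidStatsNameException."""


def check_name(stat: str) -> str:
    # Hypothetical handler: letters, digits and "_.,=" only.
    if not all(c.isalnum() or c in "_.,=" for c in stat):
        raise InvalidStatsName(stat)
    return stat


def prepare_stat_with_tags(fn):
    @functools.wraps(fn)
    def wrapper(self, stat, *args, tags=None, **kwargs):
        # Outer layer: append ",key=value" pairs before the name is validated.
        if self.influxdb_tags_enabled and tags:
            stat += "".join(f",{k}={v}" for k, v in tags.items())
        return fn(self, stat, *args, tags=tags, **kwargs)

    return wrapper


def validate_stat(fn):
    @functools.wraps(fn)
    def wrapper(self, stat, *args, **kwargs):
        # Inner layer: validate the (possibly tagged) name; skip emission if invalid.
        try:
            stat = check_name(stat)
        except InvalidStatsName:
            log.error("Invalid stat name: %s", stat)
            return None
        return fn(self, stat, *args, **kwargs)

    return wrapper


class RecordingLogger:
    """Toy logger that records what it would send instead of using the network."""

    def __init__(self, influxdb_tags_enabled: bool = False) -> None:
        self.influxdb_tags_enabled = influxdb_tags_enabled
        self.sent: list[tuple[str, int]] = []

    @prepare_stat_with_tags  # listed first = outermost = runs first
    @validate_stat
    def incr(self, stat: str, count: int = 1, *, tags: dict[str, str] | None = None) -> None:
        self.sent.append((stat, count))


rec = RecordingLogger(influxdb_tags_enabled=True)
rec.incr("task_instance.completed", tags={"dag_id": "example_dag"})
rec.incr("bad name!")  # rejected by validate_stat; only logged
print(rec.sent)  # [('task_instance.completed,dag_id=example_dag', 1)]
```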
InfluxDB Tag Format
```text
# Without tags:
airflow.task_instance.completed
# With InfluxDB tags enabled:
airflow.task_instance.completed,dag_id=example_dag,task_id=my_task
```
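Tags whose key or value contains a comma or an equals sign are dropped, with an error logged, because either character would corrupt the wire format. Tags whose key is rejected by metric_tags_validator are also skipped.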
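The tag rule above can be sketched as a standalone function. append_influx_tags is hypothetical, and its key_allowed callback stands in for metric_tags_validator.test. The prefix (airflow. in the example) is not added here. The sketch assumes the statsd client prepends the prefix when sending, as the statsd package's client does.

```python
from __future__ import annotations

import logging
from collections.abc import Callable

log = logging.getLogger(__name__)


def append_influx_tags(
    stat: str,
    tags: dict[str, str] | None,
    key_allowed: Callable[[str], bool] = lambda key: True,
) -> str:
    """Append tags to a stat name in InfluxDB style: name,key1=value1,key2=value2."""
    for key, value in (tags or {}).items():
        if not key_allowed(key):
            continue  # key rejected by the tag validator
        if any(sep in f"{key}{value}" for sep in ",="):
            # A separator inside a key or value would corrupt the wire format.
            log.error("Dropping invalid tag: %s=%s.", key, value)
            continue
        stat += f",{key}={value}"
    return stat


print(append_influx_tags("task_instance.completed", {"dag_id": "example_dag", "task_id": "my_task"}))
# task_instance.completed,dag_id=example_dag,task_id=my_task
print(append_influx_tags("task_instance.completed", {"dag_id": "a,b", "run_id": "r1"},
                         key_allowed=lambda k: k != "run_id"))
# task_instance.completed
```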
Usage Examples
Factory Construction
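```python
from statsd import StatsClient

from airflow_shared.observability.metrics.statsd_logger import get_statsd_logger

# Build a logger that emits over UDP to a local StatsD listener.
logger = get_statsd_logger(
    stats_class=StatsClient,
    host="localhost",
    port=9125,
    prefix="airflow",
    # Only metric names matching one of these patterns are emitted.
    metrics_allow_list="dag,task,scheduler",
)

# Matches "task": sent as airflow.task_instance.completed (rate=1.0 means no sampling).
logger.incr("task_instance.completed", count=1, rate=1.0)
# Matches "scheduler": sent as the gauge airflow.scheduler.open_slots with value 42.
logger.gauge("scheduler.open_slots", value=42)
```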