Implementation:Spotify Luigi DatadogMetricsCollector
| Knowledge Sources | |
|---|---|
| Domains | Monitoring, Metrics |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
DatadogMetricsCollector is a Luigi metrics collector that integrates with the Datadog monitoring platform. It sends task lifecycle events (started, failed, disabled, done) as both StatsD increment metrics and Datadog API events, enabling real-time pipeline observability through Datadog dashboards and alerting.
Description
The module provides two classes:
- datadog (extends Config): A Luigi configuration class that holds Datadog connection settings. Parameters are read from Luigi configuration (e.g., luigi.cfg) under the [datadog] section.
- DatadogMetricsCollector (extends MetricsCollector): The collector implementation that initializes the Datadog SDK and handles four task lifecycle events.
Each event handler sends:
- A StatsD increment metric (e.g., luigi.task.started) via statsd.increment().
- A Datadog API event via api.Event.create() with a descriptive title, text, and tags.
The handle_task_done handler additionally sends a gauge metric for task execution time (luigi.task.execution_time).
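The metric naming and the execution-time value can be illustrated with a small sketch. It assumes the metric name is the configured namespace joined to the metric suffix with a dot, and that execution time is the difference between task.updated and task.time_running (the two fields listed in the I/O contract). TaskTimes, namespaced_metric, and execution_time are hypothetical names used only for illustration, not part of the module.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class TaskTimes:
    """Hypothetical stand-in for the task record passed to the collector."""
    time_running: Optional[float]  # epoch seconds when the task started running
    updated: float                 # epoch seconds of the last status update


def namespaced_metric(namespace: str, metric_name: str) -> str:
    # Prefix the metric with the configured namespace,
    # e.g. "luigi" + "task.started" -> "luigi.task.started".
    return "{}.{}".format(namespace, metric_name)


def execution_time(task: TaskTimes) -> Optional[float]:
    # Assumption: without a recorded start time there is nothing to measure.
    if task.time_running is None:
        return None
    return task.updated - task.time_running


task = TaskTimes(time_running=1000.0, updated=1042.5)
metric = namespaced_metric("luigi", "task.execution_time")
elapsed = execution_time(task)
```

In a real deployment the pair (metric, elapsed) would be what ends up in a statsd.gauge() call; the sketch stops short of that so it runs without the Datadog SDK.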
All metrics and events are tagged with the task name, task parameters, environment, and any user-defined default tags.
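As a rough sketch of the tagging scheme described above, the function below assembles a tag list from a task name, its parameters, the environment, and a comma-separated default_tags string. The function name build_tags and the exact ordering of tags are assumptions made for illustration; only the key:value tag shapes follow from the settings and outputs documented on this page.

```python
from typing import Dict, List


def build_tags(task_family: str, task_params: Dict[str, str],
               environment: str, default_tags: str) -> List[str]:
    """Hypothetical helper: combine task and config values into Datadog-style tags."""
    # Task name first, then one "key:value" tag per task parameter.
    tags = ["task_name:{}".format(task_family)]
    tags += ["{}:{}".format(key, value) for key, value in task_params.items()]
    # Environment tag derived from the [datadog] environment setting.
    tags.append("environment:{}".format(environment))
    # default_tags is a single comma-separated string in luigi.cfg.
    if default_tags:
        tags += default_tags.split(",")
    return tags


tags = build_tags(
    task_family="MyTask",
    task_params={"date": "2024-01-01"},
    environment="production",
    default_tags="team:data-eng,service:etl",
)
```

Because default_tags is stored as one string, a value such as team:data-eng,service:etl yields two separate tags.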
Usage
Configure Datadog settings in luigi.cfg and set the metrics collector in the scheduler configuration. The collector is automatically invoked by the Luigi scheduler during task lifecycle transitions.
Code Reference
Source Location
luigi/contrib/datadog_metric.py (127 lines)
Signature
class datadog(Config):
    api_key = Parameter(default='dummy_api_key')
    app_key = Parameter(default='dummy_app_key')
    default_tags = Parameter(default='application:luigi')
    environment = Parameter(default='development')
    metric_namespace = Parameter(default='luigi')
    statsd_host = Parameter(default='localhost')
    statsd_port = IntParameter(default=8125)


class DatadogMetricsCollector(MetricsCollector):
    def __init__(self, *args, **kwargs):
        """Initializes Datadog SDK with config values."""

    def handle_task_started(self, task):
        """Sends increment and event for task start."""

    def handle_task_failed(self, task):
        """Sends increment and event for task failure (alert_type='error')."""

    def handle_task_disabled(self, task, config):
        """Sends increment and event for task disable (alert_type='error')."""

    def handle_task_done(self, task):
        """Sends increment, execution_time gauge, and event for task completion."""
Import
from luigi.contrib.datadog_metric import DatadogMetricsCollector
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| task | luigi.Task | The task instance whose lifecycle event is being reported. Used to extract task.family (name), task.params (parameters), task.time_running, and task.updated. |
| config | Scheduler config | Passed to handle_task_disabled; provides disable_persist, retry_count, and disable_window. |
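To show how the three config fields might feed into the disabled-task event, here is a minimal sketch. The wording of the message and the helper name disabled_event_text are illustrative assumptions, and SimpleNamespace stands in for the real scheduler config object; only the attribute names disable_persist, retry_count, and disable_window come from the table above.

```python
from types import SimpleNamespace


def disabled_event_text(task_family, config):
    """Hypothetical helper: describe why and for how long a task was disabled."""
    template = (
        "Task {name} was disabled after {failures} failures within "
        "{window} seconds; it stays disabled for {persist} seconds."
    )
    return template.format(
        name=task_family,
        failures=config.retry_count,     # failures that triggered the disable
        window=config.disable_window,    # look-back window in seconds
        persist=config.disable_persist,  # how long the task stays disabled
    )


# Stand-in for the scheduler config passed to handle_task_disabled.
config = SimpleNamespace(disable_persist=86400, retry_count=3, disable_window=3600)
text = disabled_event_text("MyTask", config)
```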
Outputs
| Output | Type | Description |
|---|---|---|
| StatsD metrics | statsd.increment / statsd.gauge | Namespaced metrics (e.g., luigi.task.started, luigi.task.execution_time) sent to the configured StatsD host. |
| Datadog events | api.Event.create | Events with title, descriptive text, tags (task_name, task_state, environment, task parameters), and alert type (info or error). |
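The shape of an event can be sketched as a plain dictionary of keyword arguments. The helper build_event is hypothetical, and the title and text strings are placeholders; the sketch only assumes the fields named in the table (title, text, tags including task_state, and an alert type of info or error) and does not call the Datadog SDK.

```python
from typing import Dict, List


def build_event(title: str, text: str, tags: List[str],
                state: str, alert_type: str) -> Dict[str, object]:
    """Hypothetical helper: assemble keyword arguments for a Datadog event."""
    # The table above lists only these two alert types.
    if alert_type not in ("info", "error"):
        raise ValueError("alert_type must be 'info' or 'error'")
    return {
        "title": title,
        "text": text,
        # Build a new list so the caller's metric tags are not mutated.
        "tags": tags + ["task_state:{}".format(state)],
        "alert_type": alert_type,
    }


metric_tags = ["task_name:MyTask", "environment:production"]
event = build_event(
    title="A task has failed",
    text="Task MyTask failed in the pipeline.",
    tags=metric_tags,
    state="FAILED",
    alert_type="error",
)
```

With the datadog package installed and initialized, a dictionary like this could be unpacked into api.Event.create(**event); that call is left out here so the example has no external dependencies.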
Usage Examples
# In luigi.cfg:
# [datadog]
# api_key = your_datadog_api_key
# app_key = your_datadog_app_key
# environment = production
# metric_namespace = my_pipeline
# statsd_host = datadog-agent.internal
# statsd_port = 8125
# default_tags = team:data-eng,service:etl
# The collector is configured via the scheduler's metrics_collector setting.
# Luigi will automatically call handle_task_started, handle_task_done, etc.
Related Pages
- Spotify_Luigi_Metrics_Collection -- Principle governing metrics collection in Luigi pipelines
- luigi.metrics.MetricsCollector -- Abstract base class defining the metrics collector interface
- luigi.task.Config -- Base class for Luigi configuration sections