

Implementation:Spotify Luigi DatadogMetricsCollector

From Leeroopedia
Revision as of 16:46, 16 February 2026 by Admin (auto-imported from implementations/Spotify_Luigi_DatadogMetricsCollector.md)


Knowledge Sources
Domains: Monitoring, Metrics
Last Updated: 2026-02-10 08:00 GMT

Overview

DatadogMetricsCollector is a Luigi metrics collector that integrates with the Datadog monitoring platform. It sends task lifecycle events (started, failed, disabled, done) as both StatsD increment metrics and Datadog API events, enabling real-time pipeline observability through Datadog dashboards and alerting.

Description

The module provides two classes:

  • datadog (extends Config): A Luigi configuration class that holds Datadog connection settings. Parameters are read from Luigi configuration (e.g., luigi.cfg) under the [datadog] section.
  • DatadogMetricsCollector (extends MetricsCollector): The collector implementation that initializes the Datadog SDK and handles four task lifecycle events.

Each event handler sends:

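  1. A StatsD increment metric named <metric_namespace>.task.<event> (e.g., luigi.task.started with the default namespace) via statsd.increment().
  2. A Datadog API event via api.Event.create() with a descriptive title, text, tags, and an alert type: info for started and done, error for failed and disabled.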
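The two-step pattern shared by every handler can be sketched without a Datadog account by swapping in recording stand-ins. This is a minimal sketch, not the module's code: RecordingStatsd, RecordingEvents and report_task_started are hypothetical names, and the title and text strings are illustrative rather than copied from luigi/contrib/datadog_metric.py.

```python
from dataclasses import dataclass, field


@dataclass
class RecordingStatsd:
    """Hypothetical stand-in for datadog.statsd: records calls instead of sending UDP packets."""
    calls: list = field(default_factory=list)

    def increment(self, metric, value=1, tags=None):
        self.calls.append(("increment", metric, value, tags or []))


@dataclass
class RecordingEvents:
    """Hypothetical stand-in for datadog.api.Event: records create() calls."""
    created: list = field(default_factory=list)

    def create(self, **params):
        self.created.append(params)


def report_task_started(family, tags, statsd, events, namespace="luigi"):
    """Mirror the two steps a handler performs: a counter, then an event."""
    # Step 1: namespaced StatsD counter, e.g. "luigi.task.started".
    statsd.increment(f"{namespace}.task.started", 1, tags=tags)
    # Step 2: an event with the same tags plus the lifecycle state (illustrative text).
    events.create(
        title="Luigi: task started",
        text=f"A task has been started in the pipeline named: {family}",
        tags=tags + ["task_state:STARTED"],
        alert_type="info",
    )
```

Injecting the clients as arguments keeps the sketch testable; the real collector instead uses the module-level statsd and api objects that datadog.initialize() configures.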

The handle_task_done handler additionally sends a gauge metric, <metric_namespace>.task.execution_time, whose value is the task's running time in seconds (task.updated minus task.time_running).
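The execution-time value reduces to a subtraction of two scheduler timestamps (epoch seconds). A minimal sketch, assuming the task object exposes time_running and updated as described in the I/O contract; execution_time_seconds is a hypothetical helper, and returning None for a task with no recorded start time is a choice made for this sketch.

```python
from types import SimpleNamespace


def execution_time_seconds(task):
    """Seconds between the task starting to run and its last status update.

    Assumes `task` has `time_running` and `updated` attributes in epoch seconds.
    Returns None (a sketch-only convention) when no start time was recorded.
    """
    if task.time_running is None:
        return None
    return task.updated - task.time_running


# A task that started at t=100.0 and was marked done at t=142.5 ran for 42.5 s;
# that value is what would be reported through statsd.gauge().
print(execution_time_seconds(SimpleNamespace(time_running=100.0, updated=142.5)))  # 42.5
```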

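All metrics and events are tagged with the task name (task_name:<family>), one key:value tag per task parameter, the environment (environment:<environment>), and any user-defined default tags. Events additionally carry a task_state tag (e.g., task_state:FAILED).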
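The tag list can be sketched as a pure function. build_tags is a hypothetical name, and the ordering (task tags first, then environment, then default_tags) is an assumption of this sketch. It splits default_tags on commas without trimming whitespace, so the setting should be written without spaces after the commas.

```python
def build_tags(family, params, environment, default_tags):
    """Assemble the tags attached to every metric and event for one task."""
    # Task-specific tags: the task family plus one "key:value" tag per parameter.
    tags = [f"task_name:{family}"]
    tags += [f"{key}:{value}" for key, value in params.items()]
    # Collector-wide tags: the environment, then the comma-separated default_tags setting.
    tags.append(f"environment:{environment}")
    if default_tags:
        tags += default_tags.split(",")
    return tags


print(build_tags("LoadEvents", {"date": "2026-02-10"}, "production", "team:data-eng,service:etl"))
```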

Usage

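Configure the [datadog] section in luigi.cfg and select the collector with metrics_collector = datadog in the [scheduler] section. The datadog Python package must be installed where the scheduler runs. Once configured, the scheduler invokes the collector's handlers automatically as tasks change state; no changes to task code are needed.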
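How the scheduler reaches the handlers can be pictured as a dispatch from a task's new status to one of the four hooks. This is a simplified, hypothetical sketch: Luigi's scheduler makes these calls from its own status-handling code, and RecordingCollector and notify_collector are illustrative names, not Luigi APIs.

```python
class RecordingCollector:
    """Hypothetical collector exposing the four hooks; it only records calls."""

    def __init__(self):
        self.calls = []

    def handle_task_started(self, task):
        self.calls.append(("started", task))

    def handle_task_failed(self, task):
        self.calls.append(("failed", task))

    def handle_task_disabled(self, task, config):
        self.calls.append(("disabled", task))

    def handle_task_done(self, task):
        self.calls.append(("done", task))


def notify_collector(collector, task, new_status, config=None):
    """Simplified, hypothetical mapping from a status transition to a collector hook."""
    if new_status == "DISABLED":
        # Only the disabled hook receives the scheduler configuration.
        collector.handle_task_disabled(task, config)
        return
    hooks = {
        "RUNNING": collector.handle_task_started,
        "FAILED": collector.handle_task_failed,
        "DONE": collector.handle_task_done,
    }
    hook = hooks.get(new_status)
    if hook is not None:  # other statuses (e.g. PENDING) report nothing
        hook(task)
```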

Code Reference

Source Location

luigi/contrib/datadog_metric.py (127 lines)

Signature

from luigi.metrics import MetricsCollector
from luigi.parameter import IntParameter, Parameter
from luigi.task import Config

class datadog(Config):
    api_key = Parameter(default='dummy_api_key')
    app_key = Parameter(default='dummy_app_key')
    default_tags = Parameter(default='application:luigi')
    environment = Parameter(default='development')
    metric_namespace = Parameter(default='luigi')
    statsd_host = Parameter(default='localhost')
    statsd_port = IntParameter(default=8125)

class DatadogMetricsCollector(MetricsCollector):
    def __init__(self, *args, **kwargs):
        """Initializes Datadog SDK with config values."""

    def handle_task_started(self, task):
        """Sends increment and event for task start."""

    def handle_task_failed(self, task):
        """Sends increment and event for task failure (alert_type='error')."""

    def handle_task_disabled(self, task, config):
        """Sends increment and event for task disable (alert_type='error')."""

    def handle_task_done(self, task):
        """Sends increment, execution_time gauge, and event for task completion."""

Import

from luigi.contrib.datadog_metric import DatadogMetricsCollector

I/O Contract

Inputs

  • task (the scheduler's task record, luigi.scheduler.Task) -- The task whose lifecycle event is being reported. The collector reads task.family (name), task.params (parameters), task.time_running, and task.updated.
  • config (the [scheduler] configuration) -- Passed only to handle_task_disabled; supplies disable_persist, retry_count, and disable_window for the event text.
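The three scheduler settings are what make the disabled event self-explanatory. A minimal sketch, assuming a config object with retry_count, disable_window and disable_persist attributes; disabled_event_text is a hypothetical helper, and its wording approximates the event text rather than quoting the module.

```python
from types import SimpleNamespace


def disabled_event_text(family, config):
    """Explain why the scheduler disabled a task, using the [scheduler] settings."""
    return (
        f"A task has been disabled in the pipeline named: {family}. "
        f"The task has failed {config.retry_count} times in the last "
        f"{config.disable_window} seconds, so it is being disabled for "
        f"{config.disable_persist} seconds."
    )


cfg = SimpleNamespace(retry_count=3, disable_window=3600, disable_persist=86400)
print(disabled_event_text("LoadEvents", cfg))
```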

Outputs

  • StatsD metrics (statsd.increment / statsd.gauge) -- Namespaced metrics (e.g., luigi.task.started, luigi.task.execution_time) sent to the configured statsd_host and statsd_port.
  • Datadog events (api.Event.create) -- Events with a title, descriptive text, tags (task_name, task_state, environment, task parameters, default tags), and an alert type of info (started, done) or error (failed, disabled).
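On the wire, each StatsD metric is a short text datagram sent over UDP. The sketch below formats it in the DogStatsD convention (name:value|type, then |# and comma-joined tags) and sends it. The datadog library does this internally, so dogstatsd_datagram and send_datagram are illustrative helpers for inspecting traffic, not part of Luigi.

```python
import socket


def dogstatsd_datagram(metric, value, metric_type, tags=()):
    """Format one metric as a DogStatsD datagram: name:value|type|#tag1,tag2."""
    datagram = f"{metric}:{value}|{metric_type}"
    if tags:
        datagram += "|#" + ",".join(tags)
    return datagram


def send_datagram(datagram, host="localhost", port=8125):
    """Fire-and-forget UDP send to a StatsD/Datadog agent (no reply is expected)."""
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.sendto(datagram.encode("utf-8"), (host, port))


# "c" marks a counter (statsd.increment); "g" marks a gauge (statsd.gauge).
print(dogstatsd_datagram("luigi.task.started", 1, "c", ["task_name:LoadEvents"]))
```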

Usage Examples

# luigi.cfg
[scheduler]
# Select the Datadog collector (requires the datadog Python package)
metrics_collector = datadog

[datadog]
api_key = your_datadog_api_key
app_key = your_datadog_app_key
environment = production
metric_namespace = my_pipeline
statsd_host = datadog-agent.internal
statsd_port = 8125
# Comma-separated, no spaces; the environment tag is added automatically
default_tags = team:data-eng,service:etl

# No code changes are needed: the scheduler calls handle_task_started,
# handle_task_failed, handle_task_disabled, and handle_task_done itself.
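Before restarting the scheduler, a luigi.cfg like the one above can be sanity-checked with the standard library's configparser, on which Luigi's own config parser builds. This sketch includes a [scheduler] section selecting the collector (metrics_collector = datadog) and omits the API keys; the checks themselves are only suggestions.

```python
import configparser

# Example luigi.cfg contents (API keys omitted for the check).
LUIGI_CFG = """
[scheduler]
metrics_collector = datadog

[datadog]
environment = production
metric_namespace = my_pipeline
statsd_host = datadog-agent.internal
statsd_port = 8125
default_tags = team:data-eng,service:etl
"""

parser = configparser.ConfigParser()
parser.read_string(LUIGI_CFG)

# The scheduler must select the Datadog collector by name.
collector = parser.get("scheduler", "metrics_collector")
# statsd_port is declared as an IntParameter, so it must parse as an integer.
port = parser.getint("datadog", "statsd_port")
# Metric names take the form "<metric_namespace>.task.<event>".
started_metric = f"{parser.get('datadog', 'metric_namespace')}.task.started"
print(collector, port, started_metric)  # datadog 8125 my_pipeline.task.started
```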

Related Pages

  • Spotify_Luigi_Metrics_Collection -- Principle governing metrics collection in Luigi pipelines
  • luigi.metrics.MetricsCollector -- Abstract base class defining the metrics collector interface
  • luigi.task.Config -- Base class for Luigi configuration sections
