Implementation:Spotify Luigi PrometheusMetricsCollector

From Leeroopedia


Overview

PrometheusMetricsCollector is a metrics collector in the luigi.contrib.prometheus_metric module that publishes Luigi task lifecycle events to the Prometheus monitoring system. It counts task starts, failures, disablements, and completions with Prometheus counters, and records task execution time with a gauge. The label set is configured through the prometheus config class, which allows metrics to be broken down by task family and by selected task parameters.

Source Location

| Property | Value |
| --- | --- |
| Source File | luigi/contrib/prometheus_metric.py |
| Lines of Code | 84 |
| Module | luigi.contrib.prometheus_metric |
| Domain | Monitoring, Metrics |

Import Statement

from luigi.contrib.prometheus_metric import PrometheusMetricsCollector

Classes

prometheus (Config)

prometheus(Config)

Configuration class for Prometheus metric label behavior. Values are read from the [prometheus] section in luigi.cfg.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| use_task_family_in_labels | BoolParameter (explicit parsing) | True | Whether to include the task family name as the "family" label on all metrics. |
| task_parameters_to_use_in_labels | ListParameter | [] | Additional task parameter names to include as metric labels. Parameters are read from task.params. |

Note: The labels list must not be empty. If use_task_family_in_labels is False and task_parameters_to_use_in_labels is empty, a ValueError is raised.
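
The rule in the note can be sketched in plain Python. The helper name build_label_names is hypothetical and is not part of Luigi's API; it only mirrors the behavior described above (parameter labels first, then "family", then the emptiness check).

```python
def build_label_names(use_task_family_in_labels, task_parameters_to_use_in_labels):
    """Hypothetical helper: return the label names per the rule described above."""
    # Start from the configured task parameter names.
    labels = list(task_parameters_to_use_in_labels)
    # Append "family" when the task family should be used as a label.
    if use_task_family_in_labels:
        labels.append("family")
    # An empty label set is rejected.
    if not labels:
        raise ValueError("Prometheus labels cannot be empty")
    return labels


print(build_label_names(True, []))        # ['family']
print(build_label_names(False, ["date"])) # ['date']
```

With both settings at their "off" values (False and an empty list), the sketch raises ValueError, which matches the constraint stated in the note.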

PrometheusMetricsCollector

PrometheusMetricsCollector(MetricsCollector)

Implements the MetricsCollector interface to publish Luigi task metrics to Prometheus.

Constructor

PrometheusMetricsCollector.__init__(self, *args, **kwargs)

Creates a fresh CollectorRegistry and initializes the following Prometheus instruments:

| Instrument | Prometheus Name | Type | Description |
| --- | --- | --- | --- |
| task_started_counter | luigi_task_started_total | Counter | Number of started Luigi tasks. |
| task_failed_counter | luigi_task_failed_total | Counter | Number of failed Luigi tasks. |
| task_disabled_counter | luigi_task_disabled_total | Counter | Number of disabled Luigi tasks. |
| task_done_counter | luigi_task_done_total | Counter | Number of completed Luigi tasks. |
| task_execution_time | luigi_task_execution_time_seconds | Gauge | Task execution time in seconds. |

All instruments are created with the configured label set.

Event Handler Methods

| Method | Signature | Description |
| --- | --- | --- |
| handle_task_started | handle_task_started(self, task) | Increments task_started_counter for the task's labels. Also initializes the task_execution_time gauge for the same label set. |
| handle_task_failed | handle_task_failed(self, task) | Increments task_failed_counter. Sets task_execution_time to task.updated - task.time_running. |
| handle_task_disabled | handle_task_disabled(self, task, config) | Increments task_disabled_counter. Sets task_execution_time to task.updated - task.time_running. |
| handle_task_done | handle_task_done(self, task) | Increments task_done_counter. Sets task_execution_time to task.updated - task.time_running only if task.time_running is not None (it is None when the task was already complete). |
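
The execution-time rule used by the handlers above can be illustrated with a small standalone sketch. The function name execution_time_seconds and the SimpleNamespace stand-ins for scheduler tasks are assumptions made for illustration; only the attributes task.updated and task.time_running come from the description above.

```python
from types import SimpleNamespace


def execution_time_seconds(task):
    """Hypothetical helper: task.updated - task.time_running, or None if the task never ran."""
    # time_running is None when the task was already complete,
    # in which case there is no duration to report.
    if task.time_running is None:
        return None
    return task.updated - task.time_running


# Stand-in objects carrying only the two timestamps the rule needs.
ran = SimpleNamespace(time_running=100.0, updated=142.5)
already_complete = SimpleNamespace(time_running=None, updated=142.5)

print(execution_time_seconds(ran))               # 42.5
print(execution_time_seconds(already_complete))  # None
```

In the collector itself, the None case corresponds to leaving the gauge untouched rather than setting a value.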

Utility Methods

| Method | Signature | Description |
| --- | --- | --- |
| generate_latest | generate_latest(self) | Returns the current metrics in the Prometheus exposition format by calling prometheus_client's generate_latest(self.registry). |
| configure_http_handler | configure_http_handler(self, http_handler) | Sets the Content-Type header on the HTTP handler to the Prometheus CONTENT_TYPE_LATEST value. |
| _generate_task_labels | _generate_task_labels(self, task) | Internal method that builds the label dictionary for a task. Maps "family" to task.family and all other configured labels to values from task.params. |

Configuration

[prometheus]
use_task_family_in_labels: True
task_parameters_to_use_in_labels: ["date", "environment"]
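
To show how the two values in this section are shaped, the sketch below parses the same text with Python's standard configparser and json modules. This is not Luigi's own parameter parsing; it assumes only that the list value is written as a JSON array, as in the example above.

```python
import configparser
import json

# Same section as above, embedded as a string for a self-contained example.
CFG_TEXT = """
[prometheus]
use_task_family_in_labels: True
task_parameters_to_use_in_labels: ["date", "environment"]
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)
section = parser["prometheus"]

# The boolean is spelled out explicitly; the list is a JSON array.
use_family = section.getboolean("use_task_family_in_labels")
param_labels = json.loads(section["task_parameters_to_use_in_labels"])

print(use_family)    # True
print(param_labels)  # ['date', 'environment']
```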

Usage Example

# Typically configured via Luigi's metrics system rather than instantiated directly.
# In luigi.cfg:
#   [scheduler]
#   metrics_collector = prometheus

# Direct usage:
from luigi.contrib.prometheus_metric import PrometheusMetricsCollector

collector = PrometheusMetricsCollector()

# After task events fire, retrieve metrics:
metrics_output = collector.generate_latest()

Metric Label Generation

The label generation logic works as follows:

  1. The labels list is built from task_parameters_to_use_in_labels.
  2. If use_task_family_in_labels is True, "family" is appended.
  3. For each task event, _generate_task_labels(task) constructs a dict:
    • The key "family" maps to task.family.
    • All other keys are looked up via task.params.get(label).
  4. This dict is passed to the Prometheus instrument's .labels(**kwargs) method.
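
Steps 3 and 4 can be sketched as a standalone function. The name generate_task_labels, the example task, and its parameter values are hypothetical; the mapping rule ("family" from task.family, everything else from task.params.get) is the one described in the list above.

```python
from types import SimpleNamespace


def generate_task_labels(label_names, task):
    """Hypothetical stand-in for the label dict built per task event."""
    # "family" comes from task.family; every other label is looked up in task.params.
    return {
        name: task.family if name == "family" else task.params.get(name)
        for name in label_names
    }


# Stand-in task with a family name and a params dict (illustrative values).
task = SimpleNamespace(
    family="DailyReport",
    params={"date": "2024-01-01", "environment": "prod", "owner": "data"},
)

labels = generate_task_labels(["date", "environment", "family"], task)
print(labels)
# {'date': '2024-01-01', 'environment': 'prod', 'family': 'DailyReport'}
```

Parameters that are not listed as labels (here "owner") are ignored, and a configured label that the task does not define resolves to None because dict.get is used for the lookup.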

External Dependencies

  • prometheus_client: Required for CollectorRegistry, Counter, Gauge, generate_latest, and CONTENT_TYPE_LATEST.
  • Luigi core: luigi.metrics.MetricsCollector, luigi.task.Config, luigi.parameter
