Implementation:Spotify Luigi PrometheusMetricsCollector
Overview
PrometheusMetricsCollector is a metrics collector in the luigi.contrib.prometheus_metric module that publishes Luigi task lifecycle events to the Prometheus monitoring system. It counts task starts, failures, disables, and completions with Prometheus counters and records task execution time with a gauge. Labels are configurable through the prometheus config class, so metrics can be broken down per task family and per task parameter.
Source Location
| Property | Value |
|---|---|
| Source File | luigi/contrib/prometheus_metric.py |
| Lines of Code | 84 |
| Module | luigi.contrib.prometheus_metric |
| Domain | Monitoring, Metrics |
Import Statement
```python
from luigi.contrib.prometheus_metric import PrometheusMetricsCollector
```
Classes
prometheus (Config)
prometheus(Config)
Configuration class for Prometheus metric label behavior. Values are read from the [prometheus] section in luigi.cfg.
| Parameter | Type | Default | Description |
|---|---|---|---|
| use_task_family_in_labels | BoolParameter (explicit parsing) | True | Whether to include the task family name as the "family" label on all metrics. |
| task_parameters_to_use_in_labels | ListParameter | [] | Additional task parameter names to include as metric labels. Parameters are read from task.params. |
Note: The label list must not be empty. If use_task_family_in_labels is False and task_parameters_to_use_in_labels is empty, a ValueError is raised when the collector is constructed.
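The rule in the Note above can be expressed as a small, dependency-free function. This is a sketch that mirrors the documented behavior; build_label_names is a hypothetical helper name, not part of Luigi's API.

```python
from typing import List, Sequence


def build_label_names(use_task_family_in_labels: bool,
                      task_parameters_to_use_in_labels: Sequence[str]) -> List[str]:
    """Hypothetical helper mirroring the documented label rules."""
    # Parameter-derived labels come first, in configuration order.
    labels = list(task_parameters_to_use_in_labels)
    # The task family label is appended last when enabled.
    if use_task_family_in_labels:
        labels.append("family")
    # An empty label set is rejected, matching the documented ValueError.
    if not labels:
        raise ValueError("Prometheus labels cannot be empty (see [prometheus] configuration)")
    return labels


print(build_label_names(True, ["date", "environment"]))  # ['date', 'environment', 'family']
```

Disabling the family label while leaving the parameter list empty is the only configuration that fails.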
PrometheusMetricsCollector
PrometheusMetricsCollector(MetricsCollector)
Implements the MetricsCollector interface to publish Luigi task metrics to Prometheus.
Constructor
PrometheusMetricsCollector.__init__(self, *args, **kwargs)
Creates a fresh CollectorRegistry and initializes the following Prometheus instruments:
| Instrument | Prometheus Name | Type | Description |
|---|---|---|---|
| task_started_counter | luigi_task_started_total | Counter | Number of started Luigi tasks. |
| task_failed_counter | luigi_task_failed_total | Counter | Number of failed Luigi tasks. |
| task_disabled_counter | luigi_task_disabled_total | Counter | Number of disabled Luigi tasks. |
| task_done_counter | luigi_task_done_total | Counter | Number of completed Luigi tasks. |
| task_execution_time | luigi_task_execution_time_seconds | Gauge | Task execution time in seconds. |
All instruments are registered in the collector's own registry and share the configured label set.
Event Handler Methods
| Method | Signature | Description |
|---|---|---|
| handle_task_started | handle_task_started(self, task) | Increments task_started_counter for the task's labels. Also creates the task_execution_time gauge series for that label set without setting a value. |
| handle_task_failed | handle_task_failed(self, task) | Increments task_failed_counter. Sets task_execution_time to task.updated - task.time_running. |
| handle_task_disabled | handle_task_disabled(self, task, config) | Increments task_disabled_counter. Sets task_execution_time to task.updated - task.time_running. |
| handle_task_done | handle_task_done(self, task) | Increments task_done_counter. Sets task_execution_time only if task.time_running is not None (it can be None when the task was already complete). |
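The handler semantics in the table above can be traced without a Prometheus installation. The sketch below is an assumption-laden stand-in: FakeSchedulerTask and InMemoryCollector are hypothetical classes that replace the scheduler's task object and the prometheus_client instruments with plain dicts. It mirrors the counter increments, the execution-time formula, and the None check in handle_task_done. In prometheus_client a newly created gauge series reads 0, which the sketch imitates with setdefault.

```python
from collections import defaultdict
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass
class FakeSchedulerTask:
    """Hypothetical stand-in exposing only the attributes the handlers read."""
    family: str
    params: Dict[str, str] = field(default_factory=dict)
    updated: float = 0.0
    time_running: Optional[float] = None


class InMemoryCollector:
    """Hypothetical dict-based stand-in for the Prometheus counters and gauge."""

    def __init__(self, labels):
        self.labels = list(labels)
        # counter name -> label key -> count
        self.counters = defaultdict(lambda: defaultdict(float))
        # label key -> last recorded execution time in seconds
        self.execution_time = {}

    def _key(self, task):
        # Same mapping as _generate_task_labels, frozen into a hashable tuple.
        return tuple(
            (label, task.family if label == "family" else task.params.get(label))
            for label in self.labels
        )

    def handle_task_started(self, task):
        key = self._key(task)
        self.counters["started"][key] += 1
        # Touching the gauge creates the series without overwriting an existing value.
        self.execution_time.setdefault(key, 0.0)

    def handle_task_failed(self, task):
        key = self._key(task)
        self.counters["failed"][key] += 1
        self.execution_time[key] = task.updated - task.time_running

    def handle_task_disabled(self, task, config):
        key = self._key(task)
        self.counters["disabled"][key] += 1
        self.execution_time[key] = task.updated - task.time_running

    def handle_task_done(self, task):
        key = self._key(task)
        self.counters["done"][key] += 1
        # time_running is None if the task was already complete: skip the duration.
        if task.time_running is not None:
            self.execution_time[key] = task.updated - task.time_running


collector = InMemoryCollector(["family"])
task = FakeSchedulerTask("MyTask", updated=110.0, time_running=100.0)
collector.handle_task_started(task)
collector.handle_task_done(task)
print(collector.execution_time)  # {(('family', 'MyTask'),): 10.0}
```

A task reported as done with time_running=None still increments the done counter but leaves the gauge untouched.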
Utility Methods
| Method | Signature | Description |
|---|---|---|
| generate_latest | generate_latest(self) | Returns the registry's current metrics in the Prometheus text exposition format (as bytes) by calling prometheus_client.generate_latest(self.registry). |
| configure_http_handler | configure_http_handler(self, http_handler) | Sets the Content-Type header on the HTTP handler to prometheus_client's CONTENT_TYPE_LATEST value. |
| _generate_task_labels | _generate_task_labels(self, task) | Internal method that builds the label dictionary for a task. Maps "family" to task.family and every other configured label to its value from task.params. |
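To show roughly what generate_latest produces for a labeled series, here is a simplified, hypothetical renderer for one metric family in the Prometheus text exposition format. It is an illustration only: render_text_format and escape_label_value are not prometheus_client functions, and the library's real output can differ in details (for example, extra samples such as counters' _created series, and bytes rather than str).

```python
from typing import Dict, Iterable, Tuple


def escape_label_value(value: str) -> str:
    # In the text format, label values escape backslash, double quote, and newline.
    return value.replace("\\", "\\\\").replace('"', '\\"').replace("\n", "\\n")


def render_text_format(name: str, help_text: str, metric_type: str,
                       samples: Iterable[Tuple[Dict[str, str], float]]) -> str:
    """Hypothetical, simplified renderer for one metric family."""
    lines = [f"# HELP {name} {help_text}", f"# TYPE {name} {metric_type}"]
    for labels, value in samples:
        pairs = ",".join(f'{key}="{escape_label_value(str(val))}"' for key, val in labels.items())
        lines.append(f"{name}{{{pairs}}} {float(value)!r}")
    return "\n".join(lines) + "\n"


print(render_text_format(
    "luigi_task_execution_time_seconds",
    "Task execution time in seconds.",
    "gauge",
    [({"family": "MyTask", "date": "2024-01-01"}, 12.5)],
))
```

Each distinct combination of label values becomes its own sample line, which is why high-cardinality parameters in task_parameters_to_use_in_labels multiply the number of series.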
Configuration
```ini
[prometheus]
use_task_family_in_labels: True
task_parameters_to_use_in_labels: ["date", "environment"]
```
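A minimal sketch of how the two values in this section could be read with Python's standard configparser. It assumes, as with Luigi's ListParameter, that the list value is parsed as JSON, so entries need double quotes. configparser.getboolean is only a stand-in for Luigi's own BoolParameter parsing and is not the code path Luigi uses.

```python
import configparser
import json

# The [prometheus] section from the example above, inlined for a self-contained run.
LUIGI_CFG = """
[prometheus]
use_task_family_in_labels: True
task_parameters_to_use_in_labels: ["date", "environment"]
"""

parser = configparser.ConfigParser()
parser.read_string(LUIGI_CFG)
section = parser["prometheus"]

# Stand-in boolean parsing: accepts True/False, yes/no, on/off, 1/0 (case-insensitive).
use_family = section.getboolean("use_task_family_in_labels")
# Assumed JSON list syntax for the parameter names.
param_labels = json.loads(section["task_parameters_to_use_in_labels"])

print(use_family, param_labels)  # True ['date', 'environment']
```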
Usage Example
```python
# Typically configured via Luigi's metrics system rather than instantiated directly.
# In luigi.cfg:
#   [scheduler]
#   metrics_collector = prometheus

# Direct usage:
from luigi.contrib.prometheus_metric import PrometheusMetricsCollector

collector = PrometheusMetricsCollector()

# After task events fire, retrieve the metrics (bytes in the Prometheus text format):
metrics_output = collector.generate_latest()
```
Metric Label Generation
The label generation logic works as follows:
- The `labels` list is built from `task_parameters_to_use_in_labels`.
- If `use_task_family_in_labels` is `True`, `"family"` is appended.
- For each task event, `_generate_task_labels(task)` constructs a dict:
  - The key `"family"` maps to `task.family`.
  - All other keys are looked up via `task.params.get(label)`.
- This dict is passed to the Prometheus instrument's `.labels(**kwargs)` method.
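The per-event mapping described above fits in a single dict comprehension. This sketch uses a hypothetical free function, generate_task_labels, and a types.SimpleNamespace standing in for the scheduler's task object; only the family and params attributes are assumed.

```python
from types import SimpleNamespace
from typing import Dict, List, Optional


def generate_task_labels(task, labels: List[str]) -> Dict[str, Optional[str]]:
    """Hypothetical free-function version of _generate_task_labels."""
    return {
        # "family" is special-cased; every other label is a task parameter lookup.
        label: task.family if label == "family" else task.params.get(label)
        for label in labels
    }


task = SimpleNamespace(
    family="DailyReport",
    params={"date": "2024-01-01", "environment": "prod", "retries": "3"},
)
print(generate_task_labels(task, ["date", "environment", "family"]))
# {'date': '2024-01-01', 'environment': 'prod', 'family': 'DailyReport'}
```

Parameters that are not configured as labels (retries here) are ignored, and because the lookup uses dict.get, a configured label missing from a task's parameters yields None instead of raising.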
External Dependencies
- prometheus_client: Required for `CollectorRegistry`, `Counter`, `Gauge`, `generate_latest`, and `CONTENT_TYPE_LATEST`.
- Luigi core: `luigi.metrics.MetricsCollector`, `luigi.task.Config`, `luigi.parameter`.
See Also
- Spotify_Luigi_Event_Constants - Events that trigger metric collection
- luigi.metrics.MetricsCollector - Base interface for metrics collectors