Implementation:Spotify Luigi Event Constants
Overview
The Event class in luigi/event.py defines the complete set of event constants used throughout Luigi's event-driven callback system. These string constants identify the various lifecycle events that occur during task dependency resolution and execution. Components across Luigi use these constants with the Task.event_handler decorator to register callbacks that respond to pipeline activity.
Source Location
| Property | Value |
|---|---|
| Source File | luigi/event.py
|
| Lines of Code | 36 |
| Module | luigi.event
|
| Domain | Event_System, Core |
Import Statement
from luigi.event import Event
Or equivalently:
from luigi import Event
Class: Event
Event
A simple class containing string constants for all Luigi lifecycle events. It has no methods, no constructor, and no instance behavior. It serves purely as a namespace for event identifier constants.
Event Constants
| Constant | String Value | Description |
|---|---|---|
Event.DEPENDENCY_DISCOVERED |
"event.core.dependency.discovered" |
Triggered for every (task, upstream task) pair discovered in a job flow during dependency resolution. |
Event.DEPENDENCY_MISSING |
"event.core.dependency.missing" |
Triggered when a required dependency's output does not exist. |
Event.DEPENDENCY_PRESENT |
"event.core.dependency.present" |
Triggered when a required dependency's output is found to exist. |
Event.BROKEN_TASK |
"event.core.task.broken" |
Triggered when a task is broken (e.g., its requires() or complete() raises an exception).
|
Event.START |
"event.core.start" |
Triggered when a task begins execution. |
Event.PROGRESS |
"event.core.progress" |
Can be fired by the task itself during execution. Used for reporting progress, metadata, or generic information so that event handlers can track the running task's state. |
Event.FAILURE |
"event.core.failure" |
Triggered when a task's run() method raises an exception.
|
Event.SUCCESS |
"event.core.success" |
Triggered when a task completes successfully. |
Event.PROCESSING_TIME |
"event.core.processing_time" |
Triggered to report how long a task took to execute. |
Event.TIMEOUT |
"event.core.timeout" |
Triggered when a task exceeds its allowed execution time. |
Event.PROCESS_FAILURE |
"event.core.process_failure" |
Triggered when the process running a task dies unexpectedly. |
Event Lifecycle
Events are fired in the following typical order during a pipeline run:
DEPENDENCY_DISCOVERED- Fired during the scheduler's dependency resolution phase for each task-upstream pair.DEPENDENCY_PRESENTorDEPENDENCY_MISSING- Fired as each dependency's output is checked.START- Fired when the worker begins executing a task'srun()method.PROGRESS- Optionally fired by the task during execution to report intermediate progress.SUCCESSorFAILURE- Fired upon task completion or failure.PROCESSING_TIME- Fired to report the total execution time.
Error scenarios:
BROKEN_TASK- Fired whenrequires()orcomplete()raises an exception, preventing normal execution.TIMEOUT- Fired when a task exceeds its time limit.PROCESS_FAILURE- Fired when the worker process crashes.
Usage Example
import luigi
from luigi import Event
class MyTask(luigi.Task):
def run(self):
# Task work here
pass
def output(self):
return luigi.LocalTarget('/tmp/output.txt')
# Register event handlers using the decorator
@MyTask.event_handler(Event.SUCCESS)
def on_success(task):
print("Task %s completed successfully!" % task)
@MyTask.event_handler(Event.FAILURE)
def on_failure(task, exception):
print("Task %s failed with: %s" % (task, exception))
@MyTask.event_handler(Event.START)
def on_start(task):
print("Task %s is starting" % task)
@MyTask.event_handler(Event.PROCESSING_TIME)
def on_processing_time(task, processing_time):
print("Task %s took %s seconds" % (task, processing_time))
Integration Points
The Event constants are used by:
- Worker (
luigi/worker.py): FiresSTART,SUCCESS,FAILURE,PROCESSING_TIME,TIMEOUT, andPROCESS_FAILUREduring task execution. - Scheduler (
luigi/scheduler.py): FiresDEPENDENCY_DISCOVERED,DEPENDENCY_MISSING,DEPENDENCY_PRESENT, andBROKEN_TASKduring dependency resolution. - MetricsCollectors (e.g.,
PrometheusMetricsCollector): Listen for task lifecycle events to track metrics. - Task.event_handler decorator: The primary mechanism for user code to subscribe to events.
Dependencies
- No external dependencies. This module is entirely self-contained.
Related Principles
See Also
- Spotify_Luigi_PrometheusMetricsCollector - Metrics collector that responds to task events
luigi.task.Task.event_handler- Decorator for registering event callbacks