Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Spotify Luigi Event Constants

From Leeroopedia
Revision as of 16:47, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Spotify_Luigi_Event_Constants.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Overview

The Event class in luigi/event.py defines the complete set of event constants used throughout Luigi's event-driven callback system. These string constants identify the various lifecycle events that occur during task dependency resolution and execution. Components across Luigi use these constants with the Task.event_handler decorator to register callbacks that respond to pipeline activity.

Source Location

Property Value
Source File luigi/event.py
Lines of Code 36
Module luigi.event
Domain Event_System, Core

Import Statement

from luigi.event import Event

Or equivalently:

from luigi import Event

Class: Event

Event

A simple class containing string constants for all Luigi lifecycle events. It has no methods, no constructor, and no instance behavior. It serves purely as a namespace for event identifier constants.

Event Constants

Constant String Value Description
Event.DEPENDENCY_DISCOVERED "event.core.dependency.discovered" Triggered for every (task, upstream task) pair discovered in a job flow during dependency resolution.
Event.DEPENDENCY_MISSING "event.core.dependency.missing" Triggered when a required dependency's output does not exist.
Event.DEPENDENCY_PRESENT "event.core.dependency.present" Triggered when a required dependency's output is found to exist.
Event.BROKEN_TASK "event.core.task.broken" Triggered when a task is broken (e.g., its requires() or complete() raises an exception).
Event.START "event.core.start" Triggered when a task begins execution.
Event.PROGRESS "event.core.progress" Can be fired by the task itself during execution. Used for reporting progress, metadata, or generic information so that event handlers can track the running task's state.
Event.FAILURE "event.core.failure" Triggered when a task's run() method raises an exception.
Event.SUCCESS "event.core.success" Triggered when a task completes successfully.
Event.PROCESSING_TIME "event.core.processing_time" Triggered to report how long a task took to execute.
Event.TIMEOUT "event.core.timeout" Triggered when a task exceeds its allowed execution time.
Event.PROCESS_FAILURE "event.core.process_failure" Triggered when the process running a task dies unexpectedly.

Event Lifecycle

Events are fired in the following typical order during a pipeline run:

  1. DEPENDENCY_DISCOVERED - Fired during the scheduler's dependency resolution phase for each task-upstream pair.
  2. DEPENDENCY_PRESENT or DEPENDENCY_MISSING - Fired as each dependency's output is checked.
  3. START - Fired when the worker begins executing a task's run() method.
  4. PROGRESS - Optionally fired by the task during execution to report intermediate progress.
  5. SUCCESS or FAILURE - Fired upon task completion or failure.
  6. PROCESSING_TIME - Fired to report the total execution time.

Error scenarios:

  • BROKEN_TASK - Fired when requires() or complete() raises an exception, preventing normal execution.
  • TIMEOUT - Fired when a task exceeds its time limit.
  • PROCESS_FAILURE - Fired when the worker process crashes.

Usage Example

import luigi
from luigi import Event

class MyTask(luigi.Task):
    def run(self):
        # Task work here
        pass

    def output(self):
        return luigi.LocalTarget('/tmp/output.txt')

# Register event handlers using the decorator
@MyTask.event_handler(Event.SUCCESS)
def on_success(task):
    print("Task %s completed successfully!" % task)

@MyTask.event_handler(Event.FAILURE)
def on_failure(task, exception):
    print("Task %s failed with: %s" % (task, exception))

@MyTask.event_handler(Event.START)
def on_start(task):
    print("Task %s is starting" % task)

@MyTask.event_handler(Event.PROCESSING_TIME)
def on_processing_time(task, processing_time):
    print("Task %s took %s seconds" % (task, processing_time))

Integration Points

The Event constants are used by:

  • Worker (luigi/worker.py): Fires START, SUCCESS, FAILURE, PROCESSING_TIME, TIMEOUT, and PROCESS_FAILURE during task execution.
  • Scheduler (luigi/scheduler.py): Fires DEPENDENCY_DISCOVERED, DEPENDENCY_MISSING, DEPENDENCY_PRESENT, and BROKEN_TASK during dependency resolution.
  • MetricsCollectors (e.g., PrometheusMetricsCollector): Listen for task lifecycle events to track metrics.
  • Task.event_handler decorator: The primary mechanism for user code to subscribe to events.

Dependencies

  • No external dependencies. This module is entirely self-contained.

Related Principles

See Also

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment