

Implementation:Spotify Luigi DbTaskHistory

From Leeroopedia



Overview

A concrete Luigi tool that persistently stores task execution history for auditing and debugging.

Description

DbTaskHistory is Luigi's database-backed implementation of the TaskHistory interface. It uses SQLAlchemy to record every task lifecycle event (scheduled, started, completed, failed) into a relational database, creating a durable audit trail that supports historical queries, the web visualizer's history pages, and operational debugging.

The implementation consists of four components:

  • DbTaskHistory class (luigi/db_task_history.py): The main history recorder. On initialization, it reads the database connection string from the [task_history] configuration section, creates a SQLAlchemy engine and session factory, and ensures all required tables exist (with automatic schema upgrades for older databases). It implements three callback methods -- task_scheduled(), task_started(), and task_finished() -- that the scheduler calls at each lifecycle transition.
  • TaskRecord model: SQLAlchemy ORM model for the tasks table, storing the Luigi task ID, task name, and host. Its parameters and events relationships link each record to its TaskParameter and TaskEvent rows through foreign keys on tasks.id.
  • TaskEvent model: SQLAlchemy ORM model for the task_events table, recording event name (PENDING, RUNNING, DONE, FAILED) and timestamp for each lifecycle transition.
  • TaskParameter model: SQLAlchemy ORM model for the task_parameters table, storing key-value pairs of Luigi task parameters associated with each task record.
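The three callbacks map onto event names in a fixed way: task_scheduled() records PENDING, task_started() records RUNNING along with the worker host, and task_finished() records DONE or FAILED. A minimal in-memory sketch of that mapping follows. InMemoryTaskHistory is hypothetical: it takes plain task ID strings rather than task objects and appends tuples to a list, whereas DbTaskHistory writes TaskRecord and TaskEvent rows through SQLAlchemy.

```python
import datetime
from dataclasses import dataclass, field

# Event names recorded by the lifecycle callbacks.
PENDING, RUNNING, DONE, FAILED = "PENDING", "RUNNING", "DONE", "FAILED"


@dataclass
class InMemoryTaskHistory:
    """Hypothetical stand-in for DbTaskHistory that keeps events in a list."""
    events: list = field(default_factory=list)  # (task_id, event_name, host, ts)
    hosts: dict = field(default_factory=dict)   # task_id -> last known host

    def _record(self, task_id, event_name):
        ts = datetime.datetime.now()
        self.events.append((task_id, event_name, self.hosts.get(task_id), ts))

    def task_scheduled(self, task_id):
        self._record(task_id, PENDING)

    def task_started(self, task_id, worker_host):
        # Only the RUNNING transition carries host information.
        self.hosts[task_id] = worker_host
        self._record(task_id, RUNNING)

    def task_finished(self, task_id, successful):
        self._record(task_id, DONE if successful else FAILED)


history = InMemoryTaskHistory()
history.task_scheduled("MyDailyETL_2026_02_10")
history.task_started("MyDailyETL_2026_02_10", "worker-1")
history.task_finished("MyDailyETL_2026_02_10", successful=True)
print([name for _, name, _, _ in history.events])  # ['PENDING', 'RUNNING', 'DONE']
```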

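History recording is enabled by setting record_task_history = True in the [scheduler] configuration section. In Scheduler.__init__(), an explicitly passed task_history_impl takes precedence; otherwise, if the flag is set, the scheduler imports and instantiates DbTaskHistory. The import happens only at that point, so SQLAlchemy is required only when history is enabled. If neither applies, the scheduler falls back to NopHistory, which discards all events.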
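The backend choice described above can be sketched as a small function. NopHistory, FakeDbTaskHistory, and select_history_backend here are simplified stand-ins written for illustration (FakeDbTaskHistory touches no database); only the order of the checks is meant to mirror Scheduler.__init__().

```python
class NopHistory:
    """Stand-in for the no-op backend: silently discards every event."""

    def task_scheduled(self, task):
        pass

    def task_started(self, task, worker_host):
        pass

    def task_finished(self, task, successful):
        pass


class FakeDbTaskHistory(NopHistory):
    """Hypothetical placeholder for DbTaskHistory (no database involved)."""


def select_history_backend(record_task_history, task_history_impl=None):
    # 1. An explicitly injected implementation always wins.
    if task_history_impl is not None:
        return task_history_impl
    # 2. Otherwise the [scheduler] record_task_history flag decides.
    if record_task_history:
        return FakeDbTaskHistory()
    # 3. Default: discard all events.
    return NopHistory()


print(type(select_history_backend(False)).__name__)  # NopHistory
print(type(select_history_backend(True)).__name__)   # FakeDbTaskHistory
```

Injecting an implementation is also a convenient seam for tests, since it bypasses the configuration flag entirely.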

Separately, the scheduler persists its in-memory state (tasks, workers, dependency graph) to a pickle file at state_path via the SimpleTaskState class. The Scheduler.load() and Scheduler.dump() methods manage this state persistence on startup and shutdown respectively.
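The pickle round trip behind load() and dump() can be illustrated with the stdlib pickle module. StateSketch is a hypothetical, heavily simplified analogue of SimpleTaskState: it stores only two dictionaries and skips the error handling a real scheduler needs.

```python
import os
import pickle
import tempfile


class StateSketch:
    """Hypothetical, simplified analogue of SimpleTaskState's persistence."""

    def __init__(self, state_path):
        self._state_path = state_path
        self.tasks = {}    # task_id -> status
        self.workers = {}  # worker_id -> last-seen info

    def dump(self):
        # Write the in-memory state to disk (done on shutdown in the scheduler).
        with open(self._state_path, "wb") as fobj:
            pickle.dump((self.tasks, self.workers), fobj)

    def load(self):
        # Restore a previous dump if one exists; otherwise start empty.
        if not os.path.exists(self._state_path):
            return
        with open(self._state_path, "rb") as fobj:
            self.tasks, self.workers = pickle.load(fobj)


path = os.path.join(tempfile.mkdtemp(), "state.pickle")
before = StateSketch(path)
before.tasks["MyDailyETL_2026_02_10"] = "DONE"
before.dump()

after = StateSketch(path)
after.load()
print(after.tasks)  # {'MyDailyETL_2026_02_10': 'DONE'}
```

Because unpickling can execute arbitrary code, state_path should point to a location that only the scheduler process can write.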

Usage

Use DbTaskHistory when:

  • You need a persistent record of all task executions for auditing or compliance.
  • You want to use the web visualizer's history pages (/history, /tasklist).
  • You need to query historical task execution data programmatically (e.g., for SLA monitoring).
  • You need to debug task failures by examining the sequence of events over time.
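For SLA-style monitoring, run durations can be derived from lifecycle events by pairing each RUNNING event with the next DONE or FAILED event. The sketch below works on plain (event_name, ts) tuples, which you could build from a TaskRecord as [(e.event_name, e.ts) for e in record.events]. run_durations and the 30-minute SLA threshold are hypothetical.

```python
import datetime


def run_durations(events):
    """Pair each RUNNING event with the next DONE or FAILED event.

    `events` is a hypothetical simplified shape: (event_name, ts) tuples,
    in any order.
    """
    durations = []
    started = None
    for name, ts in sorted(events, key=lambda e: e[1]):  # oldest first
        if name == "RUNNING":
            started = ts
        elif name in ("DONE", "FAILED") and started is not None:
            durations.append((name, ts - started))
            started = None
    return durations


t0 = datetime.datetime(2026, 2, 10, 2, 0)
events = [
    ("DONE", t0 + datetime.timedelta(minutes=45)),
    ("PENDING", t0 - datetime.timedelta(minutes=5)),
    ("RUNNING", t0),
]
sla = datetime.timedelta(minutes=30)  # hypothetical SLA threshold
for outcome, took in run_durations(events):
    print(outcome, took.total_seconds(), "SLA breach" if took > sla else "ok")
# DONE 2700.0 SLA breach
```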

Code Reference

Source Location

File                       Lines      Role
luigi/db_task_history.py   L57-99     DbTaskHistory: initialization, lifecycle callbacks
luigi/db_task_history.py   L198-246   TaskParameter, TaskEvent, TaskRecord ORM models
luigi/scheduler.py         L122-158   scheduler config class: state_path, record_task_history
luigi/scheduler.py         L667-705   Scheduler.__init__(): history backend selection, load()/dump()

Signature

# luigi/db_task_history.py
class DbTaskHistory(task_history.TaskHistory):
    CURRENT_SOURCE_VERSION = 1

    def __init__(self):
        """
        Reads db_connection from [task_history] config section.
        Creates SQLAlchemy engine, session factory, and ensures schema exists.
        """

    def task_scheduled(self, task):
        """Record a PENDING event for the task."""

    def task_started(self, task, worker_host):
        """Record a RUNNING event for the task with host information."""

    def task_finished(self, task, successful):
        """Record a DONE or FAILED event for the task."""

    def find_all_runs(self, session=None):
        """Return all tasks that have been recorded."""

    def find_latest_runs(self, session=None):
        """Return tasks updated in the past 24 hours."""

    def find_all_by_name(self, task_name, session=None):
        """Find all tasks with the given task_name."""

    def find_all_by_parameters(self, task_name, session=None, **task_params):
        """Find tasks matching name and parameter values."""

    def find_task_by_id(self, id, session=None):
        """Find a task by its record ID."""

    def find_task_by_task_id(self, task_id, session=None):
        """Find a task by its Luigi task ID string."""

    def find_all_events(self, session=None):
        """Return all lifecycle events."""

# luigi/scheduler.py
class scheduler(Config):
    state_path = parameter.Parameter(default='/var/lib/luigi-server/state.pickle')
    record_task_history = parameter.BoolParameter(default=False)

class Scheduler:
    def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs):
        """Initializes history backend based on config.record_task_history."""

    def load(self):
        """Load scheduler state from state_path pickle file."""

    def dump(self):
        """Save scheduler state to state_path pickle file."""

Import

from luigi.db_task_history import DbTaskHistory
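Each find_* method in the signature above takes an optional session argument. One plausible reading of that pattern, sketched with stdlib sqlite3 connections standing in for SQLAlchemy sessions, is: reuse a caller-supplied session, otherwise open a new one that commits on success and rolls back on error. session_scope, count_tasks, and the temporary database are hypothetical, and this sketch also closes the connections it opens.

```python
import os
import sqlite3
import tempfile
from contextlib import contextmanager

DB_PATH = os.path.join(tempfile.mkdtemp(), "history.db")


@contextmanager
def session_scope(session=None):
    """Reuse a caller-supplied connection, or open one that commits on success."""
    if session is not None:
        yield session  # the caller owns commit/rollback
        return
    conn = sqlite3.connect(DB_PATH)
    try:
        yield conn
    except BaseException:
        conn.rollback()
        raise
    else:
        conn.commit()
    finally:
        conn.close()


def count_tasks(session=None):
    # Follows the find_*(..., session=None) signature style.
    with session_scope(session) as conn:
        return conn.execute("SELECT COUNT(*) FROM tasks").fetchone()[0]


with session_scope() as conn:
    conn.execute("CREATE TABLE tasks (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute("INSERT INTO tasks (name) VALUES ('MyDailyETL')")

print(count_tasks())  # opens, commits, and closes its own connection: 1
with session_scope() as shared:
    print(count_tasks(session=shared))  # reuses the shared connection: 1
```

Passing one session to several queries keeps them in a single transaction; omitting it gives each call its own short-lived session.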

I/O Contract

Inputs

Parameter                         Type   Description
[task_history] db_connection      str    SQLAlchemy connection string (e.g., sqlite:///luigi-task-hist.db or postgresql://user:pass@host/dbname).
[scheduler] record_task_history   bool   Enable/disable task history recording (default: False).
[scheduler] state_path            str    Path to the pickle file used for scheduler state persistence (default: /var/lib/luigi-server/state.pickle).
task (to lifecycle methods)       Task   The scheduler's internal task object (luigi.scheduler.Task), exposing id, family, and params.
worker_host (to task_started)     str    Hostname of the worker executing the task.
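To show where each configuration input lives, the sketch below reads the keys with the stdlib configparser. Luigi has its own configuration loader, so this is only an approximation: the spellings configparser.getboolean accepts (true/false, yes/no, on/off, 1/0) are not guaranteed to match Luigi's BoolParameter parsing.

```python
import configparser

LUIGI_CFG = """
[scheduler]
record_task_history = True
state_path = /var/lib/luigi-server/state.pickle

[task_history]
db_connection = sqlite:////var/lib/luigi-server/task-history.db
"""

config = configparser.ConfigParser()
config.read_string(LUIGI_CFG)

# Same defaults as the Inputs table when a key is absent.
record = config.getboolean("scheduler", "record_task_history", fallback=False)
state_path = config.get("scheduler", "state_path",
                        fallback="/var/lib/luigi-server/state.pickle")
db_url = config.get("task_history", "db_connection", fallback=None)
print(record, db_url)  # True sqlite:////var/lib/luigi-server/task-history.db
```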

Outputs

Method                     Return type               Description
find_all_runs()            list[TaskRecord]          All recorded task records.
find_latest_runs()         list[TaskRecord]          Task records with an event in the past 24 hours.
find_all_by_name()         generator of TaskRecord   All records matching a task name.
find_all_by_parameters()   generator of TaskRecord   Records matching a task name and parameter values.
find_task_by_id()          TaskRecord or None        Single record by database primary key.
find_task_by_task_id()     TaskRecord or None        Most recent record (highest primary key) for a Luigi task ID.
find_all_events()          list[TaskEvent]           All lifecycle events across all tasks.
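The query semantics in the Outputs table can be mirrored in plain Python to make them concrete. Record, latest_runs, and by_task_id are hypothetical stand-ins that operate on in-memory data rather than the database, and the task ID strings are made up; the 24-hour cutoff and the highest-primary-key rule follow the table above.

```python
import datetime
from dataclasses import dataclass, field


@dataclass
class Record:
    """Hypothetical stand-in for a TaskRecord plus the timestamps of its events."""
    id: int
    task_id: str
    event_times: list = field(default_factory=list)


def latest_runs(records, now):
    # Like find_latest_runs(): keep records with any event in the last 24 hours.
    cutoff = now - datetime.timedelta(days=1)
    return [r for r in records if any(ts >= cutoff for ts in r.event_times)]


def by_task_id(records, task_id):
    # Like find_task_by_task_id(): the highest primary key wins; None if absent.
    matches = [r for r in records if r.task_id == task_id]
    return max(matches, key=lambda r: r.id) if matches else None


now = datetime.datetime(2026, 2, 11, 12, 0)
records = [
    Record(1, "MyDailyETL_2026_02_09", [now - datetime.timedelta(days=2)]),
    Record(2, "MyDailyETL_2026_02_10", [now - datetime.timedelta(hours=3)]),
    Record(3, "MyDailyETL_2026_02_10", [now - datetime.timedelta(hours=1)]),
]
print([r.id for r in latest_runs(records, now)])       # [2, 3]
print(by_task_id(records, "MyDailyETL_2026_02_10").id)  # 3
```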

Database Schema

DbTaskHistory manages three tables:

-- Main task records
CREATE TABLE tasks (
    id INTEGER PRIMARY KEY,
    task_id VARCHAR(200),       -- Luigi task ID string
    name VARCHAR(128),          -- Task family name
    host VARCHAR(128)           -- Worker host that last ran this task
);
CREATE INDEX ix_tasks_task_id ON tasks (task_id);  -- may be named ix_task_id on databases upgraded from an older schema
CREATE INDEX ix_tasks_name ON tasks (name);

-- Lifecycle events (PENDING, RUNNING, DONE, FAILED)
CREATE TABLE task_events (
    id INTEGER PRIMARY KEY,
    task_id INTEGER REFERENCES tasks(id),
    event_name VARCHAR(20),     -- PENDING, RUNNING, DONE, or FAILED
    ts TIMESTAMP NOT NULL       -- Event timestamp
);
CREATE INDEX ix_task_events_task_id ON task_events (task_id);
CREATE INDEX ix_task_events_ts ON task_events (ts);

-- Task parameters as key-value pairs
CREATE TABLE task_parameters (
    task_id INTEGER REFERENCES tasks(id),
    name VARCHAR(128),          -- Parameter name
    value TEXT,                 -- Parameter value
    PRIMARY KEY (task_id, name)
);
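To experiment with this schema without a Luigi deployment, the DDL can be loaded into an in-memory SQLite database with the stdlib sqlite3 module. The sketch inserts one hypothetical task run, then performs a parameter-filtered lookup with one join on task_parameters per parameter. This approximates what find_all_by_parameters() does through the ORM; indexes are omitted for brevity.

```python
import sqlite3

SCHEMA = """
CREATE TABLE tasks (
    id INTEGER PRIMARY KEY,
    task_id VARCHAR(200),
    name VARCHAR(128),
    host VARCHAR(128)
);
CREATE TABLE task_events (
    id INTEGER PRIMARY KEY,
    task_id INTEGER REFERENCES tasks(id),
    event_name VARCHAR(20),
    ts TIMESTAMP NOT NULL
);
CREATE TABLE task_parameters (
    task_id INTEGER REFERENCES tasks(id),
    name VARCHAR(128),
    value TEXT,
    PRIMARY KEY (task_id, name)
);
"""

conn = sqlite3.connect(":memory:")
conn.executescript(SCHEMA)

# One run of a hypothetical task with one parameter and three events.
cur = conn.execute(
    "INSERT INTO tasks (task_id, name, host) VALUES (?, ?, ?)",
    ("MyDailyETL_2026_02_10", "MyDailyETL", "worker-1"),
)
record_id = cur.lastrowid
conn.execute("INSERT INTO task_parameters VALUES (?, 'date', '2026-02-10')", (record_id,))
conn.executemany(
    "INSERT INTO task_events (task_id, event_name, ts) VALUES (?, ?, ?)",
    [(record_id, "PENDING", "2026-02-10 01:55:00"),
     (record_id, "RUNNING", "2026-02-10 02:00:00"),
     (record_id, "DONE", "2026-02-10 02:45:00")],
)

# Filter by task name and the 'date' parameter, joining events for ordering.
rows = conn.execute(
    """
    SELECT t.id, e.event_name, e.ts
    FROM tasks t
    JOIN task_parameters p ON p.task_id = t.id
    JOIN task_events e ON e.task_id = t.id
    WHERE t.name = ? AND p.name = 'date' AND p.value = ?
    ORDER BY e.ts
    """,
    ("MyDailyETL", "2026-02-10"),
).fetchall()
print([name for _, name, _ in rows])  # ['PENDING', 'RUNNING', 'DONE']
```

Parameter values are stored as text, so filters should compare against string values such as '2026-02-10'.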

Usage Examples

Enabling Task History

Add the following to luigi.cfg:

[scheduler]
record_task_history = True

[task_history]
db_connection = sqlite:////var/lib/luigi-server/task-history.db

Then start the scheduler:

luigid --port 8082
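The SQLite URL in the configuration above has four slashes because an SQLAlchemy SQLite URL is sqlite:/// followed by the file path, and an absolute path contributes its own leading slash. A relative path (three slashes in total) is resolved against the working directory of the process that opens the database, here luigid. sqlite_url is a hypothetical helper for illustration.

```python
def sqlite_url(db_path):
    """Build an SQLAlchemy-style SQLite URL: 'sqlite:///' followed by the path.

    An absolute path starts with '/', giving four slashes in total;
    a relative path gives three.
    """
    return "sqlite:///" + db_path


print(sqlite_url("/var/lib/luigi-server/task-history.db"))
# sqlite:////var/lib/luigi-server/task-history.db
print(sqlite_url("luigi-task-hist.db"))
# sqlite:///luigi-task-hist.db
```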

Using PostgreSQL for Task History

[scheduler]
record_task_history = True

[task_history]
db_connection = postgresql://luigi:password@db.example.com:5432/luigi_history
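If the database password contains characters such as @, :, or /, they must be percent-encoded before being placed in the URL, or the URL will be split in the wrong places. A minimal helper using urllib.parse.quote_plus follows; postgres_url and the sample password are hypothetical.

```python
from urllib.parse import quote_plus


def postgres_url(user, password, host, port, dbname):
    # Percent-encode credentials so '@', ':' or '/' inside them cannot be
    # mistaken for URL delimiters.
    return "postgresql://{}:{}@{}:{}/{}".format(
        quote_plus(user), quote_plus(password), host, port, dbname)


print(postgres_url("luigi", "p@ss:word", "db.example.com", 5432, "luigi_history"))
# postgresql://luigi:p%40ss%3Aword@db.example.com:5432/luigi_history
```

A postgresql:// URL also requires a PostgreSQL driver (psycopg2 by default) to be installed alongside SQLAlchemy. If your configuration loader applies %-interpolation, as the stdlib configparser does by default, each % in luigi.cfg has to be written as %%.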

Querying Task History Programmatically

from luigi.db_task_history import DbTaskHistory

# DbTaskHistory reads its connection string from the [task_history] section of the Luigi config
# and creates any missing tables (applying schema upgrades) when it is constructed.
history = DbTaskHistory()

# Find all runs of a specific task
for task_record in history.find_all_by_name('MyDailyETL'):
    print(f"Task: {task_record.name}, Host: {task_record.host}")
    for event in task_record.events:
        print(f"  {event.event_name} at {event.ts}")

# Find tasks by parameters
for task_record in history.find_all_by_parameters('MyDailyETL', date='2026-02-10'):
    print(f"Task ID: {task_record.task_id}")
    for param_name, param in task_record.parameters.items():
        print(f"  {param_name} = {param.value}")

# Find tasks updated in the past 24 hours
recent_tasks = history.find_latest_runs()
for task_record in recent_tasks:
    print(f"Recent: {task_record.name} on {task_record.host}")
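Building on the query example, a record's current outcome is its newest event. The helper below picks that event by timestamp instead of relying on list order. It is a hypothetical sketch, exercised here with SimpleNamespace stubs shaped like TaskRecord and TaskEvent; with a live DbTaskHistory you could pass it records from history.find_latest_runs().

```python
import datetime
from types import SimpleNamespace as NS


def latest_status(task_record):
    """Return the event_name of the newest event, or None if there are none."""
    if not task_record.events:
        return None
    return max(task_record.events, key=lambda e: e.ts).event_name


def failure_count(task_records):
    # Count records whose most recent event is FAILED.
    return sum(1 for r in task_records if latest_status(r) == "FAILED")


t = datetime.datetime(2026, 2, 10, 2, 0)
ok = NS(name="MyDailyETL", events=[
    NS(event_name="RUNNING", ts=t),
    NS(event_name="DONE", ts=t + datetime.timedelta(minutes=45)),
])
bad = NS(name="MyDailyETL", events=[
    NS(event_name="FAILED", ts=t + datetime.timedelta(minutes=5)),
    NS(event_name="RUNNING", ts=t),
])
print(latest_status(ok), latest_status(bad))  # DONE FAILED
print(failure_count([ok, bad]))               # 1
```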

Configuring Scheduler State Persistence

[scheduler]
state_path = /var/lib/luigi-server/state.pickle

Or via command line:

luigid --state-path /tmp/luigi-state.pickle --port 8082
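The command-line flag overrides the state_path value from luigi.cfg. The sketch below imitates that precedence with argparse and configparser; it is a simplified illustration, not luigid's actual argument handling, and it parses a fixed argument list instead of sys.argv.

```python
import argparse
import configparser

config = configparser.ConfigParser()
config.read_string("[scheduler]\nstate_path = /var/lib/luigi-server/state.pickle\n")

parser = argparse.ArgumentParser(description="luigid-like argument handling (sketch)")
parser.add_argument("--state-path")
parser.add_argument("--port", type=int, default=8082)

opts = parser.parse_args(["--state-path", "/tmp/luigi-state.pickle", "--port", "8082"])

# A command-line value, when given, replaces the configured one.
if opts.state_path:
    config.set("scheduler", "state_path", opts.state_path)

print(config.get("scheduler", "state_path"))  # /tmp/luigi-state.pickle
```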
