Implementation:Spotify Luigi DbTaskHistory
Overview
A concrete tool provided by Luigi for persistently storing task execution history, used for auditing and debugging.
Description
DbTaskHistory is Luigi's database-backed implementation of the TaskHistory interface. It uses SQLAlchemy to record every task lifecycle event (scheduled, started, completed, failed) into a relational database, creating a durable audit trail that supports historical queries, the web visualizer's history pages, and operational debugging.
The implementation consists of four components:
- DbTaskHistory class (luigi/db_task_history.py): The main history recorder. On initialization, it reads the database connection string from the [task_history] configuration section, creates a SQLAlchemy engine and session factory, and ensures all required tables exist (with automatic schema upgrades for older databases). It implements three callback methods (task_scheduled(), task_started(), and task_finished()), which the scheduler calls at each lifecycle transition.
- TaskRecord model: SQLAlchemy ORM model for the tasks table, storing the task ID, task name, and host. TaskParameter and TaskEvent rows reference it via foreign keys.
- TaskEvent model: SQLAlchemy ORM model for the task_events table, recording the event name (PENDING, RUNNING, DONE, FAILED) and a timestamp for each lifecycle transition.
- TaskParameter model: SQLAlchemy ORM model for the task_parameters table, storing each task record's Luigi task parameters as key-value pairs.
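The mapping from the three callbacks to stored events can be sketched without a database. The classes below (SketchTaskRecord, SketchTaskHistory) are hypothetical stand-ins and not Luigi's code. They assume one record per task ID. They also flatten the scheduler's task object into plain (task_id, name, params) arguments.

```python
# Hypothetical in-memory sketch of the callback -> event mapping; not Luigi's implementation.
from dataclasses import dataclass, field
from datetime import datetime
from typing import Dict, List, Optional, Tuple

# Event names as described for the task_events table.
PENDING, RUNNING, DONE, FAILED = "PENDING", "RUNNING", "DONE", "FAILED"


@dataclass
class SketchTaskRecord:
    """Stand-in for one tasks row plus its parameter and event rows."""
    task_id: str
    name: str
    host: Optional[str] = None
    parameters: Dict[str, str] = field(default_factory=dict)
    events: List[Tuple[str, datetime]] = field(default_factory=list)


class SketchTaskHistory:
    """Same three callbacks as DbTaskHistory, but with simplified arguments."""

    def __init__(self):
        self.records: Dict[str, SketchTaskRecord] = {}

    def _record(self, task_id, name, params, host=None):
        # Create the record the first time a task ID is seen; reuse it afterwards.
        rec = self.records.get(task_id)
        if rec is None:
            rec = self.records[task_id] = SketchTaskRecord(task_id, name, host, dict(params))
        elif host:
            rec.host = host  # keep the host of the most recent worker
        return rec

    def task_scheduled(self, task_id, name, params):
        self._record(task_id, name, params).events.append((PENDING, datetime.now()))

    def task_started(self, task_id, name, params, worker_host):
        self._record(task_id, name, params, worker_host).events.append((RUNNING, datetime.now()))

    def task_finished(self, task_id, name, params, successful):
        status = DONE if successful else FAILED
        self._record(task_id, name, params).events.append((status, datetime.now()))
```

Calling task_scheduled, task_started, and task_finished in order for one task ID leaves a single record with PENDING, RUNNING, and DONE (or FAILED) events, and the host supplied by task_started.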
The history system is activated when record_task_history = True is set in the [scheduler] configuration section. Unless a task_history_impl object is passed in explicitly, Scheduler.__init__() checks this flag. If it is enabled, the scheduler imports and instantiates DbTaskHistory. Otherwise, it falls back to NopHistory, which discards all events.
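The selection order can be sketched as a small function. This sketch assumes that an explicitly passed task_history_impl takes precedence, as the task_history_impl parameter in the Scheduler signature suggests. The names select_history_backend and NopHistorySketch, and the db_factory hook used to avoid needing Luigi in tests, are hypothetical. The deferred import presumably exists so that SQLAlchemy is only required when history recording is enabled.

```python
# Hypothetical sketch of history backend selection; not Luigi's Scheduler code.


class NopHistorySketch:
    """No-op backend: every callback silently discards its event."""

    def task_scheduled(self, task):
        pass

    def task_started(self, task, worker_host):
        pass

    def task_finished(self, task, successful):
        pass


def select_history_backend(record_task_history, task_history_impl=None, db_factory=None):
    # 1. An explicitly injected backend is used as-is (assumed precedence).
    if task_history_impl is not None:
        return task_history_impl
    # 2. With recording enabled, build the database backend. The import is
    #    deferred so luigi.db_task_history (and SQLAlchemy) load only when needed.
    if record_task_history:
        if db_factory is None:
            from luigi.db_task_history import DbTaskHistory
            db_factory = DbTaskHistory
        return db_factory()
    # 3. Otherwise, discard all events.
    return NopHistorySketch()
```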
Separately, the scheduler persists its in-memory state (tasks, workers, dependency graph) to a pickle file at state_path via the SimpleTaskState class. The Scheduler.load() and Scheduler.dump() methods manage this state persistence on startup and shutdown respectively.
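The load()/dump() cycle follows a common pickle-to-file pattern. The sketch below is a hypothetical stand-in: PickleStateSketch is not a Luigi class, and it assumes the state is just a pair of dicts. Luigi's SimpleTaskState holds richer task and worker objects.

```python
# Hypothetical sketch of pickle-based state persistence; not Luigi's SimpleTaskState.
import logging
import os
import pickle

logger = logging.getLogger(__name__)


class PickleStateSketch:
    def __init__(self, state_path):
        self.state_path = state_path
        self.tasks = {}    # task_id -> task state (simplified)
        self.workers = {}  # worker_id -> worker info (simplified)

    def dump(self):
        # On shutdown: write the in-memory state to state_path.
        try:
            with open(self.state_path, "wb") as fobj:
                pickle.dump((self.tasks, self.workers), fobj)
        except OSError:
            logger.warning("Failed saving scheduler state", exc_info=True)

    def load(self):
        # On startup: restore state only if a previous dump exists.
        if not os.path.exists(self.state_path):
            logger.info("No prior state file at %s; starting empty", self.state_path)
            return
        try:
            with open(self.state_path, "rb") as fobj:
                tasks, workers = pickle.load(fobj)
        except Exception:
            logger.exception("Could not load state; starting empty")
            return
        self.tasks, self.workers = tasks, workers
```

pickle.load can execute arbitrary code when given a crafted file, so the state_path location should be writable only by the user running luigid.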
Usage
Use DbTaskHistory when:
- You need a persistent record of all task executions for auditing or compliance.
- You want to use the web visualizer's history pages (/history, /tasklist).
- You need to query historical task execution data programmatically (e.g., for SLA monitoring).
- You need to debug task failures by examining the sequence of events over time.
Code Reference
Source Location
| File | Lines | Role |
|---|---|---|
| luigi/db_task_history.py | L57-99 | DbTaskHistory: initialization, lifecycle callbacks |
| luigi/db_task_history.py | L198-246 | TaskParameter, TaskEvent, TaskRecord ORM models |
| luigi/scheduler.py | L122-158 | scheduler config class: state_path, record_task_history |
| luigi/scheduler.py | L667-705 | Scheduler.__init__(): history backend selection, load()/dump() |
Signature
```python
# luigi/db_task_history.py
class DbTaskHistory(task_history.TaskHistory):
    CURRENT_SOURCE_VERSION = 1

    def __init__(self):
        """
        Reads db_connection from [task_history] config section.
        Creates SQLAlchemy engine, session factory, and ensures schema exists.
        """

    def task_scheduled(self, task):
        """Record a PENDING event for the task."""

    def task_started(self, task, worker_host):
        """Record a RUNNING event for the task with host information."""

    def task_finished(self, task, successful):
        """Record a DONE or FAILED event for the task."""

    def find_all_runs(self, session=None):
        """Return all tasks that have been recorded."""

    def find_latest_runs(self, session=None):
        """Return tasks updated in the past 24 hours."""

    def find_all_by_name(self, task_name, session=None):
        """Find all tasks with the given task_name."""

    def find_all_by_parameters(self, task_name, session=None, **task_params):
        """Find tasks matching name and parameter values."""

    def find_task_by_id(self, id, session=None):
        """Find a task by its record ID."""

    def find_task_by_task_id(self, task_id, session=None):
        """Find a task by its Luigi task ID string."""

    def find_all_events(self, session=None):
        """Return all lifecycle events."""


# luigi/scheduler.py
class scheduler(Config):
    state_path = parameter.Parameter(default='/var/lib/luigi-server/state.pickle')
    record_task_history = parameter.BoolParameter(default=False)


class Scheduler:
    def __init__(self, config=None, resources=None, task_history_impl=None, **kwargs):
        """Initializes history backend based on config.record_task_history."""

    def load(self):
        """Load scheduler state from state_path pickle file."""

    def dump(self):
        """Save scheduler state to state_path pickle file."""
```
Import
```python
from luigi.db_task_history import DbTaskHistory
```
I/O Contract
Inputs
| Parameter | Type | Description |
|---|---|---|
| [task_history] db_connection | str | SQLAlchemy connection string (e.g., sqlite:///luigi-task-hist.db or postgresql://user:pass@host/dbname). |
| [scheduler] record_task_history | bool | Enable/disable task history recording (default: False). |
| [scheduler] state_path | str | Path to the pickle file for scheduler state persistence (default: /var/lib/luigi-server/state.pickle). |
| task (to lifecycle methods) | Task | The scheduler's internal task object (luigi.scheduler.Task), carrying the task's id, family, and params. |
| worker_host (to task_started) | str | Hostname of the worker executing the task. |
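The configuration inputs above can be sanity-checked with the standard library's configparser before starting luigid. This is a hypothetical helper: read_history_settings is not part of Luigi, which uses its own configuration loader. It assumes the defaults listed in the table. As a sketch-level choice, it treats a missing db_connection as an error when recording is enabled.

```python
# Hypothetical pre-flight check of luigi.cfg history settings; not Luigi's config loader.
import configparser


def read_history_settings(cfg_text):
    # interpolation=None keeps values literal (e.g. a '%' inside a database password).
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(cfg_text)
    enabled = parser.getboolean("scheduler", "record_task_history", fallback=False)
    state_path = parser.get("scheduler", "state_path",
                            fallback="/var/lib/luigi-server/state.pickle")
    db_connection = parser.get("task_history", "db_connection", fallback=None)
    if enabled and not db_connection:
        raise ValueError("record_task_history = True but [task_history] db_connection is missing")
    return {
        "record_task_history": enabled,
        "state_path": state_path,
        "db_connection": db_connection,
    }
```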
Outputs
| Method | Return Type | Description |
|---|---|---|
| find_all_runs() | list[TaskRecord] | All recorded task records. |
| find_latest_runs() | list[TaskRecord] | Task records updated in the past 24 hours. |
| find_all_by_name() | generator of TaskRecord | All records matching a task name. |
| find_all_by_parameters() | generator of TaskRecord | Records matching task name and parameter values. |
| find_task_by_id() | TaskRecord or None | Single record by database primary key, or None if no row matches. |
| find_task_by_task_id() | TaskRecord or None | Most recent record for a Luigi task ID, or None if none exists. |
| find_all_events() | list[TaskEvent] | All lifecycle events across all tasks. |
Database Schema
DbTaskHistory manages three tables:
```sql
-- Main task records
CREATE TABLE tasks (
    id      INTEGER PRIMARY KEY,
    task_id VARCHAR(200),  -- Luigi task ID string
    name    VARCHAR(128),  -- Task family name
    host    VARCHAR(128)   -- Worker host that last ran this task
);
CREATE INDEX ix_task_id ON tasks (task_id);
CREATE INDEX ix_tasks_name ON tasks (name);

-- Lifecycle events (PENDING, RUNNING, DONE, FAILED)
CREATE TABLE task_events (
    id         INTEGER PRIMARY KEY,
    task_id    INTEGER REFERENCES tasks(id),
    event_name VARCHAR(20),         -- PENDING, RUNNING, DONE, or FAILED
    ts         TIMESTAMP NOT NULL   -- Event timestamp
);
CREATE INDEX ix_task_events_task_id ON task_events (task_id);
CREATE INDEX ix_task_events_ts ON task_events (ts);

-- Task parameters as key-value pairs
CREATE TABLE task_parameters (
    task_id INTEGER REFERENCES tasks(id),
    name    VARCHAR(128),  -- Parameter name
    value   TEXT,          -- Parameter value
    PRIMARY KEY (task_id, name)
);
```
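To see how the three tables fit together, the schema can be exercised with the standard library's sqlite3 module. The helpers below (record_event, latest_runs, runs_with_param) are hypothetical raw-SQL analogues of find_latest_runs() and a single-parameter find_all_by_parameters(). They are not the SQLAlchemy queries DbTaskHistory actually issues. Indexes are omitted, and timestamps are assumed to be stored as fixed-format text.

```python
# Hypothetical raw-SQL sketch over the schema above; not DbTaskHistory's queries.
import sqlite3
from datetime import datetime, timedelta

# Fixed text format so that string comparison on ts orders chronologically.
TS_FORMAT = "%Y-%m-%d %H:%M:%S"

SCHEMA = """
CREATE TABLE tasks (id INTEGER PRIMARY KEY, task_id VARCHAR(200),
                    name VARCHAR(128), host VARCHAR(128));
CREATE TABLE task_events (id INTEGER PRIMARY KEY,
                          task_id INTEGER REFERENCES tasks(id),
                          event_name VARCHAR(20), ts TIMESTAMP NOT NULL);
CREATE TABLE task_parameters (task_id INTEGER REFERENCES tasks(id),
                              name VARCHAR(128), value TEXT,
                              PRIMARY KEY (task_id, name));
"""


def record_event(conn, task_pk, event_name, ts: datetime):
    # Store timestamps in TS_FORMAT so they compare correctly as text.
    conn.execute(
        "INSERT INTO task_events (task_id, event_name, ts) VALUES (?, ?, ?)",
        (task_pk, event_name, ts.strftime(TS_FORMAT)),
    )


def latest_runs(conn, now: datetime):
    # Tasks with at least one event in the 24 hours before `now`.
    cutoff = (now - timedelta(days=1)).strftime(TS_FORMAT)
    return conn.execute(
        """
        SELECT DISTINCT t.id, t.name, t.host
        FROM tasks t JOIN task_events e ON e.task_id = t.id
        WHERE e.ts >= ?
        ORDER BY t.id
        """,
        (cutoff,),
    ).fetchall()


def runs_with_param(conn, task_name, key, value):
    # Task rows of a family whose stored parameter `key` equals `value` (as text).
    return conn.execute(
        """
        SELECT t.id, t.task_id
        FROM tasks t JOIN task_parameters p ON p.task_id = t.id
        WHERE t.name = ? AND p.name = ? AND p.value = ?
        ORDER BY t.id
        """,
        (task_name, key, value),
    ).fetchall()
```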
Usage Examples
Enabling Task History
Add the following to luigi.cfg:
```ini
[scheduler]
record_task_history = True

[task_history]
db_connection = sqlite:////var/lib/luigi-server/task-history.db
```
Then start the scheduler:
```bash
luigid --port 8082
```
Using PostgreSQL for Task History
```ini
[scheduler]
record_task_history = True

[task_history]
db_connection = postgresql://luigi:password@db.example.com:5432/luigi_history
```
Querying Task History Programmatically
```python
from luigi.db_task_history import DbTaskHistory

# DbTaskHistory reads its connection string from the [task_history] section of luigi.cfg
history = DbTaskHistory()

# Find all runs of a specific task
for task_record in history.find_all_by_name('MyDailyETL'):
    print(f"Task: {task_record.name}, Host: {task_record.host}")
    for event in task_record.events:
        print(f"  {event.event_name} at {event.ts}")

# Find tasks by parameters (values are compared as stored strings)
for task_record in history.find_all_by_parameters('MyDailyETL', date='2026-02-10'):
    print(f"Task ID: {task_record.task_id}")
    for param_name, param in task_record.parameters.items():
        print(f"  {param_name} = {param.value}")

# Find tasks updated in the past 24 hours
recent_tasks = history.find_latest_runs()
for task_record in recent_tasks:
    print(f"Recent: {task_record.name} on {task_record.host}")
```
Configuring Scheduler State Persistence
```ini
[scheduler]
state_path = /var/lib/luigi-server/state.pickle
```
Or via command line:
```bash
luigid --state-path /tmp/luigi-state.pickle --port 8082
```