Implementation:Spotify Luigi Marker Table Check
| Knowledge Sources | Spotify Luigi Repository |
|---|---|
| Domains | Pipeline_Orchestration, Database, ETL |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Concrete tool, provided by Luigi, for verifying database load completion with the marker table pattern.
Description
Luigi implements the marker table verification pattern through the touch() and exists() methods of its database target classes: PostgresTarget, MySqlTarget, and SQLAlchemyTarget. Each target also provides a create_marker_table() method that creates the marker table if it does not already exist. touch() calls it before writing a marker row.
PostgresTarget (luigi/contrib/postgres.py): The marker table name defaults to table_updates and can be changed with the marker-table option in the [postgres] section of the Luigi config. The table has three columns: update_id TEXT PRIMARY KEY, target_table TEXT, and inserted TIMESTAMP. When use_db_timestamps is True (the default), the inserted column is declared with DEFAULT NOW() and touch() omits the timestamp from the INSERT. When it is False, touch() supplies the client-side datetime.datetime.now() value explicitly. The exists() method executes SELECT 1 FROM table_updates WHERE update_id = %s LIMIT 1 and returns True if a row is found. If the marker table does not exist yet, the resulting DatabaseError (error code undefined_table) is caught and exists() returns False. The create_marker_table() method uses a separate auto-committed connection and ignores duplicate_table errors, so it is safe to call repeatedly.
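The Postgres logic above can be imitated with Python's built-in sqlite3 module, which makes the control flow easy to try without a database server. This is a minimal sketch, not Luigi code. The class name SqliteMarkerTarget is hypothetical, CURRENT_TIMESTAMP stands in for NOW(), and sqlite3 error messages stand in for the Postgres error codes.

```python
import datetime
import sqlite3


class SqliteMarkerTarget:
    """Hypothetical stand-in for PostgresTarget's marker logic, backed by sqlite3."""

    def __init__(self, conn, table, update_id,
                 marker_table="table_updates", use_db_timestamps=True):
        self.conn = conn
        self.table = table
        self.update_id = update_id
        self.marker_table = marker_table
        self.use_db_timestamps = use_db_timestamps

    def create_marker_table(self):
        # Same three columns as the Postgres DDL; CURRENT_TIMESTAMP plays NOW().
        default = " DEFAULT CURRENT_TIMESTAMP" if self.use_db_timestamps else ""
        sql = ("CREATE TABLE {} (update_id TEXT PRIMARY KEY, "
               "target_table TEXT, inserted TIMESTAMP{})").format(self.marker_table, default)
        try:
            self.conn.execute(sql)
        except sqlite3.OperationalError as e:
            # Analogue of ignoring duplicate_table: an existing table is fine.
            if "already exists" not in str(e):
                raise

    def exists(self):
        try:
            row = self.conn.execute(
                "SELECT 1 FROM {} WHERE update_id = ? LIMIT 1".format(self.marker_table),
                (self.update_id,)).fetchone()
        except sqlite3.OperationalError as e:
            # Analogue of undefined_table: no marker table means "not done yet".
            if "no such table" not in str(e):
                raise
            row = None
        return row is not None

    def touch(self):
        self.create_marker_table()
        if self.use_db_timestamps:
            # Let the database fill in the timestamp via the column default.
            self.conn.execute(
                "INSERT INTO {} (update_id, target_table) VALUES (?, ?)".format(self.marker_table),
                (self.update_id, self.table))
        else:
            # Supply a client-side timestamp (stored as ISO text in SQLite).
            self.conn.execute(
                "INSERT INTO {} (update_id, target_table, inserted) VALUES (?, ?, ?)".format(self.marker_table),
                (self.update_id, self.table, datetime.datetime.now().isoformat(sep=" ")))
        self.conn.commit()
```

As with the Postgres target, the plain INSERT means a second touch() for the same update_id fails on the primary key; the MySQL target handles re-runs differently.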
MySqlTarget (luigi/contrib/mysqldb.py): Follows the same pattern using MySQL-specific syntax. The marker table includes an AUTO_INCREMENT id column, and the touch() method uses INSERT ... ON DUPLICATE KEY UPDATE to handle re-runs. After inserting, it asserts self.exists(connection) to verify the marker was written. The create_marker_table() catches ER_TABLE_EXISTS_ERROR.
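The re-run tolerance of MySqlTarget.touch() can be sketched the same way. SQLite has no ON DUPLICATE KEY UPDATE, so this sketch assumes INSERT OR IGNORE as the closest equivalent: a second touch for the same update_id leaves the existing row alone instead of raising. The function names are hypothetical, and the table layout (an auto-increment id plus a unique update_id) only approximates the MySQL DDL.

```python
import sqlite3


def create_marker_table(conn, marker_table="table_updates"):
    # IF NOT EXISTS replaces catching the "table exists" error.
    conn.execute(
        "CREATE TABLE IF NOT EXISTS {} ("
        "id INTEGER PRIMARY KEY AUTOINCREMENT, "
        "update_id TEXT UNIQUE NOT NULL, "
        "target_table TEXT, "
        "inserted TIMESTAMP DEFAULT CURRENT_TIMESTAMP)".format(marker_table))


def marker_exists(conn, update_id, marker_table="table_updates"):
    row = conn.execute(
        "SELECT 1 FROM {} WHERE update_id = ? LIMIT 1".format(marker_table),
        (update_id,)).fetchone()
    return row is not None


def idempotent_touch(conn, update_id, table, marker_table="table_updates"):
    create_marker_table(conn, marker_table)
    # A duplicate update_id hits the unique key and the insert becomes a no-op,
    # similar in effect to ON DUPLICATE KEY UPDATE update_id = VALUES(update_id).
    conn.execute(
        "INSERT OR IGNORE INTO {} (update_id, target_table) VALUES (?, ?)".format(marker_table),
        (update_id, table))
    # Post-condition check, mirroring the assert in MySqlTarget.touch().
    assert marker_exists(conn, update_id, marker_table)
    conn.commit()
```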
SQLAlchemyTarget (luigi/contrib/sqla.py): Uses SQLAlchemy Core constructs (MetaData, Table, and insert/update/select statements) instead of raw SQL. The create_marker_table() method checks con.dialect.has_table() and either creates the table via metadata.create_all() or reflects the existing table. The touch() method uses table.insert() for new entries or table.update() for existing ones, and asserts self.exists() afterward. The exists() method uses a SQLAlchemy select() query filtered by both update_id and target_table.
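The check-then-write logic of SQLAlchemyTarget.touch() is sketched here with sqlite3 rather than SQLAlchemy, so it runs with the standard library alone; the function names are hypothetical. Note that exists() matches on both update_id and target_table, while the primary key covers update_id alone.

```python
import datetime
import sqlite3


def create_marker_table(conn, marker_table="table_updates"):
    conn.execute(
        "CREATE TABLE IF NOT EXISTS {} (update_id VARCHAR(128) PRIMARY KEY, "
        "target_table VARCHAR(128), inserted TIMESTAMP)".format(marker_table))


def exists(conn, update_id, target_table, marker_table="table_updates"):
    # Filter on both columns, as SQLAlchemyTarget.exists() does.
    row = conn.execute(
        "SELECT 1 FROM {} WHERE update_id = ? AND target_table = ? LIMIT 1".format(marker_table),
        (update_id, target_table)).fetchone()
    return row is not None


def touch(conn, update_id, target_table, marker_table="table_updates"):
    create_marker_table(conn, marker_table)
    now = datetime.datetime.now().isoformat(sep=" ")
    if not exists(conn, update_id, target_table, marker_table):
        conn.execute(
            "INSERT INTO {} (update_id, target_table, inserted) VALUES (?, ?, ?)".format(marker_table),
            (update_id, target_table, now))
    else:
        # Re-touching refreshes the timestamp instead of failing on the key.
        conn.execute(
            "UPDATE {} SET inserted = ? WHERE update_id = ? AND target_table = ?".format(marker_table),
            (now, update_id, target_table))
    conn.commit()
    assert exists(conn, update_id, target_table, marker_table)
```

One consequence in this sketch: reusing an update_id for a different target_table makes exists() return False, and the INSERT that follows then fails on the primary key.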
In the CopyToTable.run() method of each backend, self.output().touch(connection) is called within the same transaction as the data load, immediately before connection.commit(). This ensures atomic consistency between the loaded data and the completion marker.
Usage
Use the marker table pattern whenever you:
- Subclass any of Luigi's CopyToTable variants: the marker table check is built into the output() target they return by default.
- Need to verify task completion via task.complete(), which delegates to self.output().exists().
- Want to re-run a specific task instance by deleting its marker row from the table_updates table.
- Need to customize the marker table name or timestamp behavior.
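The scheduling consequence of complete() delegating to output().exists() fits in a few lines of plain Python. The classes below are hypothetical stand-ins: a set of update_ids plays the role of the marker table, and build() plays the role of the scheduler's completeness check. Luigi's own Task.complete() checks every output returned by output(), not just one.

```python
class InMemoryMarkerTarget:
    """Hypothetical target whose exists() looks up a shared set of update_ids."""

    def __init__(self, markers, update_id):
        self.markers = markers
        self.update_id = update_id

    def exists(self):
        return self.update_id in self.markers

    def touch(self):
        self.markers.add(self.update_id)


class SketchTask:
    """Hypothetical task: complete() just asks its output whether it exists."""

    def __init__(self, markers, update_id):
        self.markers = markers
        self.update_id = update_id
        self.runs = 0

    def output(self):
        return InMemoryMarkerTarget(self.markers, self.update_id)

    def complete(self):
        return self.output().exists()

    def run(self):
        self.runs += 1  # the data load would happen here
        self.output().touch()


def build(task):
    """Run the task only if it is not already complete."""
    if not task.complete():
        task.run()
```

Removing the id from the set is the in-memory analogue of deleting the marker row: the next build() runs the task again.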
Code Reference
Source Location
- PostgresTarget.touch: luigi/contrib/postgres.py, lines 206-233
- PostgresTarget.exists: luigi/contrib/postgres.py, lines 235-252
- PostgresTarget.create_marker_table: luigi/contrib/postgres.py, lines 267-296
- MySqlTarget.touch: luigi/contrib/mysqldb.py, lines 74-97
- MySqlTarget.exists: luigi/contrib/mysqldb.py, lines 99-116
- SQLAlchemyTarget.touch: luigi/contrib/sqla.py, lines 215-234
- SQLAlchemyTarget.exists: luigi/contrib/sqla.py, lines 236-245
- rdbms.CopyToTable (update_id property): luigi/contrib/rdbms.py, lines 226-231
Signature
PostgresTarget.touch:
def touch(self, connection=None):
    """Mark this update as complete."""
    self.create_marker_table()
    if connection is None:
        connection = self.connect()
        connection.autocommit = True
    if self.use_db_timestamps:
        connection.cursor().execute(
            """INSERT INTO {marker_table} (update_id, target_table)
               VALUES (%s, %s)
            """.format(marker_table=self.marker_table),
            (self.update_id, self.table))
    else:
        connection.cursor().execute(
            """INSERT INTO {marker_table} (update_id, target_table, inserted)
               VALUES (%s, %s, %s);
            """.format(marker_table=self.marker_table),
            (self.update_id, self.table, datetime.datetime.now()))
PostgresTarget.exists:
def exists(self, connection=None):
    if connection is None:
        connection = self.connect()
        connection.autocommit = True
    cursor = connection.cursor()
    try:
        cursor.execute(
            """SELECT 1 FROM {marker_table}
               WHERE update_id = %s
               LIMIT 1""".format(marker_table=self.marker_table),
            (self.update_id,))
        row = cursor.fetchone()
    except dbapi.DatabaseError as e:
        if db_error_code(e) == ERROR_UNDEFINED_TABLE:
            row = None
        else:
            raise
    return row is not None
PostgresTarget.create_marker_table:
def create_marker_table(self):
    """Create marker table if it doesn't exist."""
    connection = self.connect()
    connection.autocommit = True
    cursor = connection.cursor()
    if self.use_db_timestamps:
        sql = """ CREATE TABLE {marker_table} (
                  update_id TEXT PRIMARY KEY,
                  target_table TEXT,
                  inserted TIMESTAMP DEFAULT NOW())
              """.format(marker_table=self.marker_table)
    else:
        sql = """ CREATE TABLE {marker_table} (
                  update_id TEXT PRIMARY KEY,
                  target_table TEXT,
                  inserted TIMESTAMP);
              """.format(marker_table=self.marker_table)
    try:
        cursor.execute(sql)
    except dbapi.DatabaseError as e:
        if db_error_code(e) == ERROR_DUPLICATE_TABLE:
            pass
        else:
            raise
    connection.close()
rdbms.CopyToTable.update_id (property):
@property
def update_id(self):
    """This update id will be a unique identifier for this insert on this table."""
    return self.task_id
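Because update_id defaults to task_id, two task instances with the same class and parameter values share one marker row. The hypothetical make_update_id() below shows only that property: a canonical serialization of the parameters, hashed, yields the same id regardless of argument order. It is not Luigi's task_id algorithm, although Luigi's ids have a roughly similar shape (family name, a short parameter summary, and a hash).

```python
import hashlib
import json


def make_update_id(task_family, params):
    """Hypothetical helper: deterministic id from a class name and its parameters.

    Illustrates only that identical parameters give an identical id;
    it does not reproduce Luigi's exact task_id format.
    """
    # sort_keys makes the serialization independent of argument order.
    canonical = json.dumps(params, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(canonical.encode("utf-8")).hexdigest()[:10]
    return "{}_{}".format(task_family, digest)
```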
Import
from luigi.contrib.postgres import PostgresTarget
from luigi.contrib.mysqldb import MySqlTarget
from luigi.contrib.sqla import SQLAlchemyTarget
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| update_id | str | Unique identifier for the task execution; defaults to task_id (class name + parameters) |
| table | str | Name of the target table that was loaded |
| connection | DBAPI 2.0 connection or None | Optional connection to reuse; if None, a new auto-committed connection is created |
| marker_table | str | Name of the marker table (default: table_updates, configurable via Luigi config) |
| use_db_timestamps | bool | Whether to use database-side NOW() (default: True) or client-side timestamps |
Outputs
| Name | Type | Description |
|---|---|---|
| exists() result | bool | True if a marker row for this update_id is found; False otherwise |
| Marker table row | Database row | A row in table_updates recording the update_id, target_table, and inserted timestamp |
| Marker table (DDL) | Database table | The table_updates table itself, created automatically if it does not exist |
Usage Examples
Default Marker Table Behavior
import luigi
import luigi.contrib.postgres
class LoadDailyData(luigi.contrib.postgres.CopyToTable):
    date = luigi.DateParameter()
    host = "localhost"
    database = "warehouse"
    user = "etl"
    password = "secret"
    table = "daily_data"
    columns = [("date", "DATE"), ("value", "NUMERIC")]
    def requires(self):
        # ProcessData is an upstream task assumed to be defined elsewhere; the
        # default CopyToTable.rows() reads tab-separated lines from its output.
        return ProcessData(date=self.date)
    # output() returns a PostgresTarget with update_id = self.task_id.
    # First run: exists() returns False, so run() loads the data and calls touch().
    # Later runs: exists() returns True, so the task is skipped.
Custom update_id for Re-run Control
import luigi
import luigi.contrib.postgres
class LoadPartitionedData(luigi.contrib.postgres.CopyToTable):
    date = luigi.DateParameter()
    # significant=False keeps version out of task_id, so the custom update_id
    # below is what makes a new version count as a new, incomplete load.
    version = luigi.IntParameter(default=1, significant=False)
    host = "localhost"
    database = "warehouse"
    user = "etl"
    password = "secret"
    table = "partitioned_data"
    columns = [("date", "DATE"), ("metric", "TEXT"), ("value", "NUMERIC")]
    @property
    def update_id(self):
        """Include version in update_id so bumping it forces a reload for the same date."""
        return "{}_v{}".format(self.task_id, self.version)
    def requires(self):
        # ComputeMetrics is an upstream task assumed to be defined elsewhere.
        return ComputeMetrics(date=self.date)
Manually Checking and Resetting Completion
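import datetime
from luigi.contrib.postgres import PostgresTarget
# LoadDailyData is the task class from the first example. Its marker row is keyed
# by task.task_id (family name, parameter summary, and hash), not by str(task),
# so derive update_id from the task instead of typing it by hand.
task = LoadDailyData(date=datetime.date(2026, 1, 15))
# Check if a specific load has been completed
target = PostgresTarget(
    host="localhost",
    database="warehouse",
    user="etl",
    password="secret",
    table="daily_data",
    update_id=task.task_id,
)
if target.exists():
    print("Load already completed")
else:
    print("Load has not been completed")
# To force a re-run, delete the marker row, substituting the value of task.task_id:
# DELETE FROM table_updates WHERE update_id = '<task_id>';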
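The DELETE above can also be issued from Python. This sketch uses sqlite3 and a hypothetical reset_marker() helper, and it binds update_id as a query parameter rather than pasting it into the SQL string. With psycopg2 against Postgres the placeholder would be %s instead of ?.

```python
import sqlite3


def reset_marker(conn, update_id, marker_table="table_updates"):
    """Delete one marker row so the next scheduler pass re-runs the task.

    Returns the number of rows removed (0 if the task was never marked).
    """
    # Binding the id as a parameter avoids quoting problems in the SQL text.
    cur = conn.execute(
        "DELETE FROM {} WHERE update_id = ?".format(marker_table), (update_id,))
    conn.commit()
    return cur.rowcount
```

A return value of 0 is a quick signal that the id did not match any marker row, for example because it was typed by hand instead of taken from task_id.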