Heuristic:Spotify Luigi Marker Table Idempotency
| Knowledge Sources | |
|---|---|
| Domains | Data_Integrity, Database |
| Last Updated | 2026-02-10 07:00 GMT |
Overview
An idempotent database loading pattern: a marker table keyed by `update_id` records which data loads have completed.
Description
Luigi's database contrib modules (PostgreSQL, MySQL, SQLAlchemy) use a marker table pattern to achieve idempotent data loading. Instead of checking if the target table contains data (which is fragile and slow), a separate marker table (default: `table_updates`) records each successful data load identified by an `update_id`. The `exists()` check queries this marker table, and `touch()` inserts a record after successful loading. The entire load + marker insert happens in a single transaction, ensuring atomicity.
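The pattern described above can be sketched independently of Luigi. The following is a minimal illustration using Python's built-in `sqlite3` module, not Luigi's implementation: the `events` table, the `load_once` helper, and the marker table columns are assumptions made for the example.

```python
import sqlite3

MARKER_TABLE = "table_updates"  # same default name as mentioned in the text


def create_marker_table(conn):
    # Column layout is an assumption chosen for this sketch.
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {MARKER_TABLE} ("
        "update_id TEXT PRIMARY KEY, "
        "target_table TEXT, "
        "inserted TIMESTAMP DEFAULT CURRENT_TIMESTAMP)"
    )


def exists(conn, update_id):
    """Return True if this update_id has been recorded as complete."""
    try:
        row = conn.execute(
            f"SELECT 1 FROM {MARKER_TABLE} WHERE update_id = ? LIMIT 1",
            (update_id,),
        ).fetchone()
    except sqlite3.OperationalError as e:
        # A missing marker table simply means nothing has been loaded yet.
        if "no such table" in str(e):
            return False
        raise
    return row is not None


def load_once(conn, update_id, rows):
    """Load rows and record the marker in one transaction; skip if already done."""
    if exists(conn, update_id):
        return False
    create_marker_table(conn)
    with conn:  # commits on success, rolls back if an exception is raised
        conn.executemany("INSERT INTO events (day, value) VALUES (?, ?)", rows)
        conn.execute(
            f"INSERT INTO {MARKER_TABLE} (update_id, target_table) VALUES (?, ?)",
            (update_id, "events"),
        )
    return True


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (day TEXT, value INTEGER)")

first_run = load_once(conn, "load_2024-01-01", [("2024-01-01", 1)])
second_run = load_once(conn, "load_2024-01-01", [("2024-01-01", 1)])
```

In this sketch the second call returns `False` without touching `events`, which is the behaviour the marker table is meant to provide.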
Usage
This pattern is used automatically by Luigi's database target classes (`PostgresTarget`, `MySqlTarget`, `SQLAlchemyTarget`). The key insight for users is that the `update_id` must be unique per logical data load. For `PostgresQuery`, the `update_id` should include dynamic parameters; otherwise the query will only execute once.
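The effect of a static versus a dynamic `update_id` can be shown with a small simulation. This sketch does not use Luigi: `FakeMarkerTable` and `run_if_needed` are hypothetical stand-ins for the marker table and for a task's run logic, and the `daily_report` identifiers are invented for illustration.

```python
from datetime import date


class FakeMarkerTable:
    """In-memory stand-in for the marker table (hypothetical, not Luigi's API)."""

    def __init__(self):
        self._done = set()

    def exists(self, update_id):
        return update_id in self._done

    def touch(self, update_id):
        self._done.add(update_id)


def run_if_needed(markers, update_id):
    """Run the (imaginary) query only if update_id has no marker yet."""
    if markers.exists(update_id):
        return False  # considered complete, so the query is skipped
    # ... the actual query would execute here ...
    markers.touch(update_id)
    return True


days = [date(2024, 1, 1), date(2024, 1, 2)]

# Static update_id: the second day is skipped because the marker already exists.
static_markers = FakeMarkerTable()
static_runs = [run_if_needed(static_markers, "daily_report") for _ in days]

# Dynamic update_id: each day gets its own marker, so each day's query runs.
dynamic_markers = FakeMarkerTable()
dynamic_runs = [
    run_if_needed(dynamic_markers, f"daily_report_{d.isoformat()}") for d in days
]
```

With the static identifier only the first day's query executes; including the date in the identifier lets each day run once.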
The Insight (Rule of Thumb)
- Action: Always ensure `update_id` is dynamic (includes parameters like dates, batch IDs) when using database tasks. Override the `update_id` property if the default `task_id` is insufficient.
- Value: Each load identified by a given `update_id` is committed at most once; re-running a pipeline safely skips loads whose marker row already exists.
- Trade-off: The marker table grows over time (one row per successful load). Consider periodic cleanup for long-running pipelines.
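The periodic cleanup mentioned in the trade-off could look roughly like the following. This is a sketch using `sqlite3`, assuming the marker table has an `inserted` timestamp column stored as ISO-style text; `prune_markers` is a hypothetical helper, not part of Luigi. Deleting a marker makes the corresponding load look incomplete again, so only markers for loads that will never be re-scheduled should be pruned.

```python
import sqlite3
from datetime import datetime, timedelta


def prune_markers(conn, marker_table, older_than):
    """Delete marker rows inserted before `older_than`; return how many were removed.

    `marker_table` is interpolated into the SQL, so it must be a trusted identifier.
    """
    cutoff = older_than.strftime("%Y-%m-%d %H:%M:%S")
    with conn:  # commit the delete, or roll back on error
        cur = conn.execute(
            f"DELETE FROM {marker_table} WHERE inserted < ?", (cutoff,)
        )
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE table_updates ("
    "update_id TEXT PRIMARY KEY, target_table TEXT, inserted TEXT)"
)
now = datetime(2024, 6, 1, 12, 0, 0)
old = now - timedelta(days=400)
with conn:
    conn.executemany(
        "INSERT INTO table_updates VALUES (?, ?, ?)",
        [
            ("load_old", "events", old.strftime("%Y-%m-%d %H:%M:%S")),
            ("load_recent", "events", now.strftime("%Y-%m-%d %H:%M:%S")),
        ],
    )

# Remove markers older than one year relative to `now`.
removed = prune_markers(conn, "table_updates", now - timedelta(days=365))
remaining = [r[0] for r in conn.execute("SELECT update_id FROM table_updates")]
```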
Reasoning
Database tables, unlike files, cannot easily be checked for "completeness" of a specific data load. A table might contain data from multiple loads, making it impossible to determine if a particular load succeeded just by checking row counts. The marker table provides a simple boolean answer: "Has this specific `update_id` been recorded as complete?"
The pattern also handles table creation gracefully: if the target table doesn't exist on the first attempt, Luigi catches the `UNDEFINED_TABLE` error, creates the table, and retries the load. The marker table itself is also auto-created.
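The create-and-retry behaviour described above can be illustrated with a two-attempt loop. This is a sketch against `sqlite3` rather than PostgreSQL: the `events` table and `copy_with_retry` are assumptions, and the missing-table condition is detected from SQLite's error message instead of a PostgreSQL error code.

```python
import sqlite3


def copy_with_retry(conn, rows):
    """Insert rows, creating the target table on the first failure.

    Returns the number of attempts that were needed (1 or 2).
    """
    for attempt in range(2):
        try:
            with conn:  # commit on success, roll back on error
                conn.executemany(
                    "INSERT INTO events (day, value) VALUES (?, ?)", rows
                )
        except sqlite3.OperationalError as e:
            if "no such table" in str(e) and attempt == 0:
                # First attempt failed because the table is missing: create it
                # and let the loop retry once.
                conn.execute("CREATE TABLE events (day TEXT, value INTEGER)")
            else:
                raise
        else:
            break
    return attempt + 1


conn = sqlite3.connect(":memory:")
attempts_first = copy_with_retry(conn, [("2024-01-01", 1)])   # table is created
attempts_second = copy_with_retry(conn, [("2024-01-02", 2)])  # table already exists
```

Limiting the loop to two attempts means any error other than a missing table on the first try, or any error at all on the second try, is re-raised rather than retried indefinitely.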
Critical gotcha: `PostgresQuery` documentation explicitly warns that without a dynamic `update_id` (e.g., via parameters), the query will only execute once:
class PostgresQuery(rdbms.Query):
    """
    Task instances require a dynamic `update_id`, e.g. via parameter(s),
    otherwise the query will only execute once
    """
Code Evidence
Marker table configuration from `luigi/contrib/postgres.py:171`:
marker_table = luigi.configuration.get_config().get(
    'postgres', 'marker-table', 'table_updates')
Existence check from `luigi/contrib/postgres.py:226-244`:
def exists(self, connection=None):
    if connection is None:
        connection = self.connect()
        connection.autocommit = True
    cursor = connection.cursor()
    try:
        cursor.execute("""SELECT 1 FROM {marker_table}
            WHERE update_id = %s
            LIMIT 1""".format(marker_table=self.marker_table),
                       (self.update_id,))
        row = cursor.fetchone()
    except dbapi.DatabaseError as e:
        if db_error_code(e) == ERROR_UNDEFINED_TABLE:
            row = None
        else:
            raise
    return row is not None
Atomic load + touch in single transaction from `luigi/contrib/postgres.py:400-430`:
for attempt in range(2):
    try:
        cursor = connection.cursor()
        self.init_copy(connection)
        self.copy(cursor, tmp_file)
        self.post_copy(connection)
    except dbapi.DatabaseError as e:
        if db_error_code(e) == ERROR_UNDEFINED_TABLE and attempt == 0:
            logger.info("Creating table %s", self.table)
            if hasattr(connection, 'reset'):
                connection.reset()
            else:
                _pg8000_connection_reset(connection)
            self.create_table(connection)
        else:
            raise
    else:
        break
# mark as complete in same transaction
self.output().touch(connection)
connection.commit()
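Why the single transaction matters can be seen by simulating a failure between the load and the marker insert. The sketch below uses `sqlite3` and a hypothetical `load_and_mark` helper with an artificial `fail_before_touch` switch; it is an illustration of the atomicity argument, not Luigi's code.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE events (day TEXT, value INTEGER)")
conn.execute("CREATE TABLE table_updates (update_id TEXT PRIMARY KEY)")


def count(conn, table):
    return conn.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]


def load_and_mark(conn, update_id, rows, fail_before_touch=False):
    """Insert rows and the marker, committing only if both succeed."""
    try:
        conn.executemany("INSERT INTO events VALUES (?, ?)", rows)
        if fail_before_touch:
            raise RuntimeError("simulated crash between load and marker insert")
        conn.execute("INSERT INTO table_updates VALUES (?)", (update_id,))
        conn.commit()
    except Exception:
        conn.rollback()  # discards the loaded rows as well as any marker
        raise


rows = [("2024-01-01", 1), ("2024-01-01", 2)]

try:
    load_and_mark(conn, "load_1", rows, fail_before_touch=True)
except RuntimeError:
    pass
rows_after_failure = count(conn, "events")
markers_after_failure = count(conn, "table_updates")

# The retry starts from a clean state, so no rows are duplicated.
load_and_mark(conn, "load_1", rows)
rows_after_retry = count(conn, "events")
markers_after_retry = count(conn, "table_updates")
```

Because the failed attempt leaves neither rows nor a marker behind, a later retry loads the data exactly as if the first attempt had never happened.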