Principle:Spotify Luigi Database Load Verification
| Knowledge Sources | Spotify Luigi Repository |
|---|---|
| Domains | Pipeline_Orchestration, Database, ETL |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
Database Load Verification is the technique of recording each successful data load into a dedicated marker table and querying that table on subsequent runs to determine whether a given load has already been completed, thereby ensuring idempotent pipeline execution.
Description
In a pipeline orchestration system, every task must be able to answer one fundamental question: "Has this work already been done?" For file-based tasks, the answer is straightforward -- check whether the output file exists. For database-loading tasks, however, the situation is more nuanced. The data itself does not have a simple "exists or does not exist" quality -- rows may be partially loaded, the table may be shared across multiple task instances, or the data may be append-only with no way to distinguish one load from another.
The marker table pattern solves this by introducing an auxiliary table (commonly called table_updates) that acts as a registry of completed loads. Each entry in the marker table records:
- update_id: A unique identifier for the specific task execution (typically the task's unique ID, derived from its class name and parameters).
- target_table: The name of the table that was loaded.
- inserted: A timestamp recording when the load was completed.
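Here is a minimal sketch of the marker table and of a stable update_id. It uses Python's built-in sqlite3 module as a stand-in for PostgreSQL. The make_update_id helper is hypothetical. It hashes the class name together with the sorted parameters, which captures the idea of an ID derived from both, but it is not Luigi's exact task-ID format. The column types, the PRIMARY KEY on update_id, and SQLite's CURRENT_TIMESTAMP default (in place of NOW()) are also assumptions.

```python
import hashlib
import json
import sqlite3


def make_update_id(task_family: str, params: dict) -> str:
    """Hypothetical helper: derive a stable ID from a class name and its parameters."""
    # sort_keys makes the ID independent of the order the parameters were given in.
    param_str = json.dumps(params, sort_keys=True, separators=(",", ":"))
    digest = hashlib.sha256(param_str.encode("utf-8")).hexdigest()[:10]
    return f"{task_family}_{digest}"


# Assumed schema with the three columns described above (SQLite syntax).
MARKER_DDL = """
CREATE TABLE IF NOT EXISTS table_updates (
    update_id    TEXT PRIMARY KEY,
    target_table TEXT,
    inserted     TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""

conn = sqlite3.connect(":memory:")
conn.execute(MARKER_DDL)

uid = make_update_id("LoadEvents", {"date": "2026-02-09"})
# "inserted" is omitted, so the database fills it from the column default.
conn.execute(
    "INSERT INTO table_updates (update_id, target_table) VALUES (?, ?)",
    (uid, "events"),
)
conn.commit()
```

Because the parameter keys are sorted before hashing, two instances with the same parameters always map to the same marker row, while a different date yields a different one. Declaring update_id as the primary key is a design choice, not a requirement of the description above. It makes a second marker for the same task instance fail loudly rather than silently create a duplicate.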
The verification process has two sides:
- Checking completion (exists()): Before a task runs, the scheduler queries the marker table for a row matching the task's update_id. If a matching row is found, the task is considered complete and is skipped. This is the read side of the pattern.
- Recording completion (touch()): After a task successfully loads its data, it inserts a row into the marker table within the same transaction. This is the write side of the pattern. Because the marker row insertion is part of the same transaction as the data load, either both succeed (data loaded and marked complete) or both are rolled back (no partial state).
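The read and write sides can be sketched as a small class. This is an illustration under stated assumptions, not Luigi's own target class:

- MarkerTarget and its signatures are hypothetical.
- sqlite3 stands in for PostgreSQL.
- Both methods share the caller's connection for brevity.

Note that touch() does not commit. The caller commits the data and the marker together.

```python
import sqlite3


class MarkerTarget:
    """Hypothetical marker-table target: one row per completed load."""

    def __init__(self, conn: sqlite3.Connection, update_id: str,
                 target_table: str, marker_table: str = "table_updates"):
        self.conn = conn
        self.update_id = update_id
        self.target_table = target_table
        self.marker_table = marker_table  # interpolated into SQL: must be trusted config

    def exists(self) -> bool:
        """Read side: is there already a marker row for this update_id?"""
        row = self.conn.execute(
            f"SELECT 1 FROM {self.marker_table} WHERE update_id = ? LIMIT 1",
            (self.update_id,),
        ).fetchone()
        return row is not None

    def touch(self) -> None:
        """Write side: insert the marker inside the caller's open transaction."""
        self.conn.execute(
            f"INSERT INTO {self.marker_table} (update_id, target_table) VALUES (?, ?)",
            (self.update_id, self.target_table),
        )
        # Deliberately no commit: the caller commits data and marker together.


# isolation_level=None lets us issue BEGIN/COMMIT/ROLLBACK explicitly.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE table_updates (update_id TEXT PRIMARY KEY, target_table TEXT)")
conn.execute("CREATE TABLE events (day TEXT, n INTEGER)")

target = MarkerTarget(conn, "LoadEvents_2026-02-09", "events")
if not target.exists():  # read side: skip the load if it is already recorded
    conn.execute("BEGIN")
    try:
        conn.execute("INSERT INTO events VALUES (?, ?)", ("2026-02-09", 42))
        target.touch()  # write side, in the same transaction as the data
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
```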
Additional robustness features include:
- Automatic marker table creation: If the marker table does not yet exist, the framework creates it automatically using a separate auto-committed connection, so that the main transaction is not disrupted.
- Graceful error handling: If querying the marker table fails because the table does not exist (e.g., on a fresh database), the exists() method catches that specific error and returns False rather than propagating the exception.
- Configurable table name: The marker table name can be customized via configuration (e.g., [postgres] marker-table = table_updates), allowing different pipelines or environments to use separate tracking tables.
- Timestamp source selection: Timestamps can come from the database server (via DEFAULT NOW()) or from the client, depending on the use_db_timestamps flag.
Usage
Use Database Load Verification when:
- Your pipeline loads data into a database and needs idempotent task execution -- running the same task twice should not duplicate data or fail.
- You need a reliable way to track which specific task instances (identified by their parameters) have completed their database loads.
- You want the completion record to be transactionally consistent with the data load itself, so that a crash mid-load does not leave a false "completed" marker.
- Your pipeline has many task instances writing to the same table (e.g., daily partition loads) and each must be independently tracked.
- You need the scheduler to efficiently skip already-completed tasks without scanning the data table itself.
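Idempotent re-runs and per-partition tracking can be illustrated with a hypothetical daily task that writes into a shared events table. The load_day function, its ID format, and the sqlite3 stand-in for PostgreSQL are assumptions. The key point is that the completion check consults only the marker table, never the events table itself.

```python
import sqlite3

conn = sqlite3.connect(":memory:", isolation_level=None)  # explicit transactions
conn.execute("CREATE TABLE table_updates (update_id TEXT PRIMARY KEY, target_table TEXT)")
conn.execute("CREATE TABLE events (day TEXT, user_id INTEGER)")


def load_day(day: str, user_ids: list) -> bool:
    """Hypothetical daily-partition task. Returns True if it loaded, False if skipped."""
    update_id = f"LoadEvents(day={day})"  # illustrative ID: class name plus parameter
    done = conn.execute(
        "SELECT 1 FROM table_updates WHERE update_id = ? LIMIT 1", (update_id,)
    ).fetchone()
    if done:
        return False  # already complete: skip without scanning the events table
    conn.execute("BEGIN")
    try:
        conn.executemany("INSERT INTO events VALUES (?, ?)", [(day, u) for u in user_ids])
        conn.execute(
            "INSERT INTO table_updates (update_id, target_table) VALUES (?, ?)",
            (update_id, "events"),
        )
        conn.execute("COMMIT")
    except Exception:
        conn.execute("ROLLBACK")
        raise
    return True


print(load_day("2026-02-08", [1, 2, 3]))  # True: first load of this partition
print(load_day("2026-02-09", [4, 5]))     # True: independent partition, same table
print(load_day("2026-02-08", [1, 2, 3]))  # False: re-run is skipped, no duplicate rows
```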
Theoretical Basis
Database Load Verification borrows the commit-record idea behind Write-Ahead Logging and Two-Phase Commit and adapts it for pipeline orchestration. The marker table serves as a commit log: the data load and the marker insertion are performed within a single database transaction. This makes the two atomic with respect to each other, without any distributed coordination.
The algorithm for the marker table pattern proceeds as follows:
- Check phase (exists()):
  - Open a connection to the database (using auto-commit for the check query).
  - Execute: SELECT 1 FROM marker_table WHERE update_id = ? LIMIT 1.
  - If a row is returned, the task is complete; return True.
  - If the query raises an "undefined table" error (the marker table does not exist), catch the error and return False.
  - If no row is returned, the task has not been completed; return False.
- Load phase (run()):
  - Execute the data loading operations (init_copy, copy, post_copy) within a transaction.
  - Call touch(connection) within the same transaction.
- Record phase (touch()):
  - Ensure the marker table exists by calling create_marker_table(), which uses a separate auto-committed connection and wraps the CREATE TABLE statement in error handling for "table already exists."
  - Insert a row: INSERT INTO marker_table (update_id, target_table, inserted) VALUES (?, ?, ?).
  - The insertion is not separately committed; it is committed along with the data load when run() calls connection.commit().
- Commit phase:
  - The run() method commits the transaction, making both the loaded data and the marker row visible simultaneously.
  - If the commit fails, both the data and the marker are rolled back.
This ensures that the marker table is always consistent with the actual data: a marker row exists if and only if the corresponding data has been successfully loaded and committed.
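The four phases and the resulting invariant can be combined into one runnable sketch. sqlite3 stands in for PostgreSQL. The CopyTask class, its constructor arguments, and the fail_in_post_copy flag are hypothetical; only the method names follow the description above. Two further simplifications are assumptions:

- The marker table is created up front rather than from touch() on a separate connection.
- exists() reuses the task's connection.

The failing run raises an exception after its rows have been copied, and the rollback removes both those rows and any marker.

```python
import sqlite3


class CopyTask:
    """Hypothetical task that follows the check / load / record / commit phases."""

    marker_table = "table_updates"
    table = "events"

    def __init__(self, conn, update_id, rows, fail_in_post_copy=False):
        self.conn = conn  # expected to be opened with isolation_level=None
        self.update_id = update_id
        self.rows = rows
        self.fail_in_post_copy = fail_in_post_copy  # simulates a crash mid-load

    def exists(self):
        """Check phase: look up the marker; a missing table means 'not done'."""
        try:
            row = self.conn.execute(
                f"SELECT 1 FROM {self.marker_table} WHERE update_id = ? LIMIT 1",
                (self.update_id,),
            ).fetchone()
        except sqlite3.OperationalError as e:
            if "no such table" in str(e):
                return False
            raise
        return row is not None

    def init_copy(self):
        self.conn.execute(f"CREATE TABLE IF NOT EXISTS {self.table} (day TEXT, n INTEGER)")

    def copy(self):
        self.conn.executemany(f"INSERT INTO {self.table} VALUES (?, ?)", self.rows)

    def post_copy(self):
        if self.fail_in_post_copy:
            raise RuntimeError("simulated failure after copy")

    def touch(self):
        """Record phase: insert the marker without committing."""
        self.conn.execute(
            f"INSERT INTO {self.marker_table} (update_id, target_table) VALUES (?, ?)",
            (self.update_id, self.table),
        )

    def run(self):
        """Load phase plus commit phase, all inside one transaction."""
        self.conn.execute("BEGIN")
        try:
            self.init_copy()
            self.copy()
            self.post_copy()
            self.touch()
            self.conn.execute("COMMIT")  # data and marker become visible together
        except Exception:
            self.conn.execute("ROLLBACK")  # neither data nor marker survives
            raise


conn = sqlite3.connect(":memory:", isolation_level=None)
# Simplification: create the marker table up front instead of from touch().
conn.execute(f"CREATE TABLE {CopyTask.marker_table} (update_id TEXT PRIMARY KEY, target_table TEXT)")

ok = CopyTask(conn, "LoadEvents_2026-02-09", [("2026-02-09", 1), ("2026-02-09", 2)])
if not ok.exists():
    ok.run()

bad = CopyTask(conn, "LoadEvents_2026-02-10", [("2026-02-10", 3)], fail_in_post_copy=True)
try:
    bad.run()
except RuntimeError:
    pass  # the failed load was rolled back

print(ok.exists(), bad.exists())                                  # True False
print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0])  # 2
```

Because run() issues ROLLBACK on any exception, the failed task leaves neither rows nor a marker behind. A later retry with the same update_id therefore starts from a clean state.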