

Heuristic:Spotify Luigi Marker Table Idempotency

From Leeroopedia
Revision as of 10:54, 16 February 2026 by Admin (auto-imported from heuristics/Spotify_Luigi_Marker_Table_Idempotency.md)



Knowledge Sources
Domains Data_Integrity, Database
Last Updated 2026-02-10 07:00 GMT

Overview

Idempotent database loading pattern using a marker table with `update_id` to track which data loads have been completed.

Description

Luigi's database contrib modules (PostgreSQL, MySQL, SQLAlchemy) use a marker table pattern to achieve idempotent data loading. They do not check whether the target table already contains data, which is fragile and slow. Instead, they record each successful load in a separate marker table (default: `table_updates`), keyed by an `update_id`. The `exists()` check queries this marker table, and `touch()` inserts the marker row after a successful load. The load and the marker insert happen in a single transaction, so either both are committed or neither is.
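The sketch below shows the pattern using Python's built-in `sqlite3` module as a stand-in for PostgreSQL. It is an illustration under simplifying assumptions, not Luigi's code. `marker_exists`, `load_once` and the `events` table are hypothetical; only the marker table name `table_updates` follows Luigi's default. The data rows and the marker row are committed in one transaction, so a failure part-way through leaves neither behind.

```python
import sqlite3

MARKER_TABLE = "table_updates"  # Luigi's default marker table name


def marker_exists(conn: sqlite3.Connection, update_id: str) -> bool:
    """Return True if this update_id has been recorded as complete."""
    try:
        row = conn.execute(
            f"SELECT 1 FROM {MARKER_TABLE} WHERE update_id = ? LIMIT 1",
            (update_id,),
        ).fetchone()
    except sqlite3.OperationalError as exc:
        # A missing marker table just means nothing has been recorded yet.
        if "no such table" in str(exc):
            return False
        raise
    return row is not None


def load_once(conn: sqlite3.Connection, update_id: str, rows: list) -> bool:
    """Load rows into the hypothetical `events` table unless update_id is marked.

    Returns True if a load ran, False if it was skipped as already complete.
    """
    if marker_exists(conn, update_id):
        return False
    # Idempotent schema setup, kept outside the load transaction.
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {MARKER_TABLE} "
        "(update_id TEXT PRIMARY KEY, target_table TEXT)"
    )
    conn.execute("CREATE TABLE IF NOT EXISTS events (name TEXT, value INTEGER)")
    # `with conn` commits on success and rolls back on error, so the
    # data rows and the marker row become visible together or not at all.
    with conn:
        conn.executemany("INSERT INTO events VALUES (?, ?)", rows)
        conn.execute(
            f"INSERT INTO {MARKER_TABLE} VALUES (?, ?)", (update_id, "events")
        )
    return True


conn = sqlite3.connect(":memory:")
print(load_once(conn, "events_2024-01-01", [("a", 1), ("b", 2)]))  # True
print(load_once(conn, "events_2024-01-01", [("a", 1), ("b", 2)]))  # False (skipped)
print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0])   # 2
```

The second call is a no-op because the marker already exists. Re-running the pipeline therefore does not duplicate the two rows.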

Usage

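This pattern is built into Luigi's database target classes (`PostgresTarget`, `MySqlTarget`, `SQLAlchemyTarget`), so tasks that use them get it automatically. The key requirement for users is that `update_id` be unique per logical data load. By default `update_id` is the task's `task_id`, which is derived from the task family and its parameter values. For `PostgresQuery`, the `update_id` must therefore vary with the task's parameters. A task without distinguishing parameters has a constant `update_id`, so its query will execute only once.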

The Insight (Rule of Thumb)

  • Action: Always ensure `update_id` is dynamic (includes parameters like dates, batch IDs) when using database tasks. Override the `update_id` property if the default `task_id` is insufficient.
  • Value: Each logical load is committed at most once. Re-running a pipeline skips loads whose `update_id` is already recorded instead of inserting their rows again.
  • Trade-off: The marker table grows over time (one row per successful load). Consider periodic cleanup for long-running pipelines.
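The sketch below illustrates the cleanup trade-off, again with `sqlite3` as a stand-in. It deletes marker rows older than a retention window. The columns `update_id`, `target_table` and `inserted` are modelled on Luigi's PostgreSQL marker table, but treat them as an assumption and check the schema in your own database. The `prune_markers` helper is hypothetical. Deleting a marker makes that load look incomplete again. Only prune `update_id`s that no pipeline will request again; otherwise a re-run loads the same data a second time.

```python
import sqlite3
from datetime import datetime, timedelta


def prune_markers(conn: sqlite3.Connection, keep: timedelta, now: datetime) -> int:
    """Delete marker rows inserted before (now - keep); return how many were deleted.

    PostgreSQL equivalent (assumed schema):
        DELETE FROM table_updates WHERE inserted < NOW() - INTERVAL '365 days'
    """
    # ISO-8601 strings in one fixed format compare correctly as text.
    cutoff = (now - keep).isoformat(sep=" ")
    with conn:
        cur = conn.execute("DELETE FROM table_updates WHERE inserted < ?", (cutoff,))
    return cur.rowcount


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE table_updates "
    "(update_id TEXT PRIMARY KEY, target_table TEXT, inserted TEXT)"
)
with conn:
    conn.executemany(
        "INSERT INTO table_updates VALUES (?, ?, ?)",
        [
            ("events_2023-01-01", "events", "2023-01-01 00:00:00"),
            ("events_2024-05-30", "events", "2024-05-30 00:00:00"),
        ],
    )
print(prune_markers(conn, keep=timedelta(days=365), now=datetime(2024, 6, 1)))  # 1
```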

Reasoning

Database tables, unlike files, cannot easily be checked for "completeness" of a specific data load. A table might contain data from multiple loads, making it impossible to determine if a particular load succeeded just by checking row counts. The marker table provides a simple boolean answer: "Has this specific `update_id` been recorded as complete?"

The pattern also handles table creation gracefully. If the target table does not exist on the first attempt, Luigi catches PostgreSQL's undefined-table error (`ERROR_UNDEFINED_TABLE`), creates the table, and retries the load once. The marker table is also created automatically by `touch()` if it does not exist yet.
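The create-and-retry step can be sketched the same way. This mirrors the two-attempt loop in the PostgreSQL code evidence further down, but it is not Luigi's code. SQLite reports a missing table as `sqlite3.OperationalError` with the message `no such table`. That plays the role of PostgreSQL's `undefined_table` error (SQLSTATE `42P01`). The names `copy_rows` and `metrics` are hypothetical. Explicit `BEGIN`/`COMMIT` keep the table creation, the load and the marker insert in one transaction.

```python
import sqlite3


def copy_rows(conn: sqlite3.Connection, table: str, values: list, update_id: str) -> None:
    """Insert values into `table` (creating it if missing), then record update_id.

    Table names are trusted constants here; never interpolate user input into SQL.
    """
    conn.execute("BEGIN")
    try:
        for attempt in range(2):
            try:
                conn.executemany(
                    f"INSERT INTO {table} (v) VALUES (?)", [(v,) for v in values]
                )
            except sqlite3.OperationalError as exc:
                # First attempt failed because the table is missing:
                # create it inside the same transaction and retry once.
                if "no such table" in str(exc) and attempt == 0:
                    conn.execute(f"CREATE TABLE {table} (v INTEGER)")
                else:
                    raise
            else:
                break
        # The marker table is also created on demand, in the same transaction.
        conn.execute(
            "CREATE TABLE IF NOT EXISTS table_updates "
            "(update_id TEXT PRIMARY KEY, target_table TEXT)"
        )
        conn.execute("INSERT INTO table_updates VALUES (?, ?)", (update_id, table))
        conn.execute("COMMIT")
    except Exception:
        # Undo the table creation, the rows and the marker together.
        conn.execute("ROLLBACK")
        raise


conn = sqlite3.connect(":memory:", isolation_level=None)  # explicit BEGIN/COMMIT
copy_rows(conn, "metrics", [1, 2, 3], "metrics_2024-01-01")
print(conn.execute("SELECT COUNT(*) FROM metrics").fetchone()[0])  # 3
```

Calling `copy_rows` again with an `update_id` that is already recorded raises `sqlite3.IntegrityError`. The rows inserted in that call are rolled back too, which is the atomicity the marker pattern relies on.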

Critical gotcha: `PostgresQuery` documentation explicitly warns that without a dynamic `update_id` (e.g., via parameters), the query will only execute once:

class PostgresQuery(rdbms.Query):
    """
    Task instances require a dynamic `update_id`, e.g. via parameter(s),
    otherwise the query will only execute once
    """

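The gotcha can be demonstrated without a database. In this sketch a Python set stands in for the marker table. `StaticReport`, `DailyReport` and `run` are hypothetical names, not Luigi classes. The `update_id` strings are illustrative and do not follow Luigi's `task_id` format. In Luigi the default `update_id` is the `task_id`, which is built from the task's parameters. The static case therefore corresponds to a task with no parameters, or to an override that ignores them.

```python
from dataclasses import dataclass
from datetime import date, timedelta

completed: set[str] = set()  # stands in for the marker table


@dataclass(frozen=True)
class StaticReport:
    day: date

    @property
    def update_id(self) -> str:
        # Ignores `day`, so every instance shares one marker row.
        return "daily_report"


@dataclass(frozen=True)
class DailyReport:
    day: date

    @property
    def update_id(self) -> str:
        # Includes the parameter, so each day is a distinct logical load.
        return f"daily_report_{self.day.isoformat()}"


def run(task) -> bool:
    """Execute the task unless its update_id is already marked; return True if it ran."""
    if task.update_id in completed:
        return False
    # ... the query would execute here ...
    completed.add(task.update_id)
    return True


days = [date(2024, 1, 1) + timedelta(days=i) for i in range(3)]
print([run(StaticReport(d)) for d in days])  # [True, False, False]
print([run(DailyReport(d)) for d in days])   # [True, True, True]
```

Only the first `StaticReport` executes. The later days are skipped because their shared `update_id` is already marked.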
Code Evidence

Marker table configuration from `luigi/contrib/postgres.py:171`:

marker_table = luigi.configuration.get_config().get(
    'postgres', 'marker-table', 'table_updates')
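The name is read from the `[postgres]` section under the key `marker-table`. It can therefore be overridden in Luigi's configuration file (for example `luigi.cfg`) rather than in code. A minimal fragment, using a hypothetical table name:

```ini
[postgres]
marker-table=etl_table_updates
```

Changing this on an existing deployment points `exists()` at a table that lacks the old markers, so previously completed loads would run again.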

Existence check from `luigi/contrib/postgres.py:226-244`:

def exists(self, connection=None):
    # PostgresTarget method: True if this update_id is recorded in the marker table.
    if connection is None:
        connection = self.connect()
        connection.autocommit = True
    cursor = connection.cursor()
    try:
        cursor.execute("""SELECT 1 FROM {marker_table}
            WHERE update_id = %s
            LIMIT 1""".format(marker_table=self.marker_table),
                       (self.update_id,))
        row = cursor.fetchone()
    except dbapi.DatabaseError as e:
        if db_error_code(e) == ERROR_UNDEFINED_TABLE:
            # No marker table yet, so no load has been recorded.
            row = None
        else:
            raise
    return row is not None

Atomic load + touch in single transaction from `luigi/contrib/postgres.py:400-430`:

# Excerpt from CopyToTable.run(); `connection` and `tmp_file` are set up earlier.
for attempt in range(2):
    try:
        cursor = connection.cursor()
        self.init_copy(connection)
        self.copy(cursor, tmp_file)
        self.post_copy(connection)
    except dbapi.DatabaseError as e:
        # Only on the first attempt: create a missing target table, then retry the load.
        if db_error_code(e) == ERROR_UNDEFINED_TABLE and attempt == 0:
            logger.info("Creating table %s", self.table)
            # Discard the aborted transaction before issuing CREATE TABLE.
            if hasattr(connection, 'reset'):
                connection.reset()
            else:
                _pg8000_connection_reset(connection)
            self.create_table(connection)
        else:
            raise
    else:
        break

# mark as complete in same transaction
self.output().touch(connection)
connection.commit()
