Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Spotify Luigi MSSqlTarget

From Leeroopedia


Knowledge Sources
Domains Database, SQL_Server
Last Updated 2026-02-10 08:00 GMT

Overview

MSSqlTarget is a Luigi target class for tracking task completion against a Microsoft SQL Server database. It uses the marker table pattern for idempotent execution tracking, recording an update_id in a dedicated marker table to indicate that a particular data load or transformation has been completed.

Description

The MSSqlTarget class extends luigi.Target and provides:

  • __init__(self, host, database, user, password, table, update_id): Initializes the target. The host parameter supports host:port format; if no port is specified, defaults to 1433.
  • touch(connection=None): Marks the update as complete by inserting or updating a row in the marker table. Uses an IF NOT EXISTS ... INSERT ... ELSE UPDATE pattern. Creates the marker table first if it does not exist. Asserts exists() after the operation.
  • exists(connection=None): Checks whether the update_id exists in the marker table. Handles the case where the marker table itself does not exist (SQL Server error 208) by returning False.
  • connect(): Creates and returns a pymssql._mssql connection object.
  • create_marker_table(): Creates the marker table if it does not already exist. Uses a separate connection. Handles SQL Server error 2714 (table already exists) gracefully.

The marker table name is configurable via luigi.cfg under the [mssql] section with the key marker-table (default: table_updates). Its schema includes: id (BIGINT IDENTITY), update_id (VARCHAR(128), PRIMARY KEY), target_table (VARCHAR(128)), and inserted (DATETIME, default GETDATE()).

Usage

Use MSSqlTarget as the return value from output() in Luigi tasks that load data into SQL Server. Call self.output().touch() at the end of run() to mark the task as complete.

Code Reference

Source Location

luigi/contrib/mssqldb.py (157 lines)

Signature

class MSSqlTarget(luigi.Target):
    marker_table = luigi.configuration.get_config().get(
        'mssql', 'marker-table', 'table_updates')

    def __init__(self, host, database, user, password, table, update_id):
        """
        :param host: SQL Server address (supports host:port format, default port 1433)
        :param database: Database name
        :param user: Database user
        :param password: User password
        :param table: Target table name
        :param update_id: Unique identifier for this data set / update
        """

    def touch(self, connection=None):
        """Mark this update as complete in the marker table."""

    def exists(self, connection=None):
        """Check if update_id exists in the marker table."""

    def connect(self):
        """Create and return a pymssql connection."""

    def create_marker_table(self):
        """Create marker table if it does not exist."""

Import

from luigi.contrib.mssqldb import MSSqlTarget

I/O Contract

Inputs

Input Type Description
host str SQL Server hostname, optionally with :port (default port: 1433).
database str Target database name.
user str Database username.
password str Database password.
table str Name of the target table being loaded.
update_id str Unique identifier for the specific update or data load.

Outputs

Output Type Description
exists() bool True if the update_id has been recorded in the marker table.
touch() Side effect Inserts or updates a row in the marker table with the update_id, target_table, and current timestamp.
Marker table SQL Server table Auto-created table (default: table_updates) storing completion records.

Usage Examples

from luigi.contrib.mssqldb import MSSqlTarget
import luigi

class LoadSalesData(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return MSSqlTarget(
            host='sqlserver.internal:1433',
            database='warehouse',
            user='etl_user',
            password='secret',
            table='sales_fact',
            update_id='load_sales_{}'.format(self.date)
        )

    def run(self):
        # Perform the data load into sales_fact table...
        connection = self.output().connect()
        try:
            # Execute INSERT/bulk load operations...
            pass
        finally:
            connection.close()

        # Mark the load as complete
        self.output().touch()

Related Pages

  • Spotify_Luigi_Database_Load_Verification -- Principle governing database load verification via marker tables
  • luigi.Target -- Base class for all Luigi targets
  • pymssql -- Microsoft SQL Server Python driver used for connectivity
  • luigi.contrib.mysqldb.MySqlTarget -- Similar marker-table pattern for MySQL
  • luigi.contrib.postgres.PostgresTarget -- Similar marker-table pattern for PostgreSQL

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment