Implementation:Spotify Luigi MSSqlTarget
| Knowledge Sources | |
|---|---|
| Domains | Database, SQL_Server |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
MSSqlTarget is a Luigi target class for tracking task completion against a Microsoft SQL Server database. It uses the marker table pattern for idempotent execution tracking, recording an update_id in a dedicated marker table to indicate that a particular data load or transformation has been completed.
Description
The MSSqlTarget class extends luigi.Target and provides:
- __init__(self, host, database, user, password, table, update_id): Initializes the target. The host parameter supports host:port format; if no port is specified, the port defaults to 1433.
- touch(connection=None): Marks the update as complete by inserting or updating a row in the marker table, using an IF NOT EXISTS ... INSERT ... ELSE UPDATE pattern. Creates the marker table first if it does not exist, and asserts exists() after the operation.
- exists(connection=None): Checks whether the update_id exists in the marker table. If the marker table itself does not exist (SQL Server error 208), returns False.
- connect(): Creates and returns a pymssql._mssql connection object.
- create_marker_table(): Creates the marker table if it does not already exist. Uses a separate connection and handles SQL Server error 2714 (table already exists) gracefully.
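The host:port handling described for __init__ can be sketched roughly as follows. This is a minimal illustration, not the library's code: the helper name split_host_port and the constant DEFAULT_PORT are hypothetical, and only the default port of 1433 comes from the description above.

```python
# Hypothetical helper illustrating the documented host:port behaviour.
DEFAULT_PORT = 1433  # default SQL Server port, as stated in the description


def split_host_port(host):
    """Return (hostname, port), falling back to DEFAULT_PORT when no port is given."""
    if ":" in host:
        name, port = host.split(":")
        return name, int(port)
    return host, DEFAULT_PORT
```

For example, split_host_port('sqlserver.internal:1433') and split_host_port('sqlserver.internal') would both resolve to port 1433, the second through the default.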
The marker table name is configurable via luigi.cfg under the [mssql] section with the key marker-table (default: table_updates). Its schema includes: id (BIGINT IDENTITY), update_id (VARCHAR(128), PRIMARY KEY), target_table (VARCHAR(128)), and inserted (DATETIME, default GETDATE()).
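To make the marker table pattern concrete, here is a rough sketch that uses Python's built-in sqlite3 module as a stand-in for SQL Server. It is an assumption-laden illustration rather than the MSSqlTarget implementation: the free functions, the SQLite column types, and the "no such table" check (standing in for SQL Server error 208) are all substitutions, and the IDENTITY id column is omitted.

```python
import sqlite3

MARKER_TABLE = "table_updates"  # default marker table name documented above


def create_marker_table(conn):
    # SQLite approximation of the documented schema (no IDENTITY id column).
    conn.execute(
        "CREATE TABLE IF NOT EXISTS {t} ("
        " update_id TEXT PRIMARY KEY,"
        " target_table TEXT,"
        " inserted TEXT DEFAULT CURRENT_TIMESTAMP)".format(t=MARKER_TABLE)
    )


def exists(conn, update_id):
    try:
        row = conn.execute(
            "SELECT 1 FROM {t} WHERE update_id = ?".format(t=MARKER_TABLE),
            (update_id,),
        ).fetchone()
    except sqlite3.OperationalError as e:
        # A missing marker table means nothing has been recorded yet.
        if "no such table" not in str(e):
            raise
        return False
    return row is not None


def touch(conn, update_id, table):
    create_marker_table(conn)
    if exists(conn, update_id):
        # Marker already present: refresh the target table and timestamp.
        conn.execute(
            "UPDATE {t} SET target_table = ?, inserted = CURRENT_TIMESTAMP"
            " WHERE update_id = ?".format(t=MARKER_TABLE),
            (table, update_id),
        )
    else:
        conn.execute(
            "INSERT INTO {t} (update_id, target_table)"
            " VALUES (?, ?)".format(t=MARKER_TABLE),
            (update_id, table),
        )
    conn.commit()
    # Mirror the documented behaviour: verify the marker after writing it.
    assert exists(conn, update_id)
```

Calling touch() twice with the same update_id leaves a single row in the marker table, which is what makes repeated runs idempotent.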
Usage
Use MSSqlTarget as the return value from output() in Luigi tasks that load data into SQL Server. Call self.output().touch() at the end of run() to mark the task as complete.
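The reason for calling touch() last is that a task counts as complete only once its output target reports exists(). The sketch below illustrates that flow without Luigi or a database; InMemoryMarkerTarget and run_if_needed are hypothetical stand-ins invented for this example, not part of Luigi.

```python
class InMemoryMarkerTarget:
    """Hypothetical stand-in for MSSqlTarget; markers live in a Python set."""

    def __init__(self, markers, update_id):
        self.markers = markers
        self.update_id = update_id

    def exists(self):
        return self.update_id in self.markers

    def touch(self):
        self.markers.add(self.update_id)


def run_if_needed(target, load):
    """Run the load only when the marker is absent, then record the marker."""
    if target.exists():
        return False  # already complete, skip the load
    load()
    target.touch()  # mark complete only after the load has succeeded
    return True
```

If load() raises, touch() is never reached, so the marker stays absent and the load is attempted again on the next run.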
Code Reference
Source Location
luigi/contrib/mssqldb.py (157 lines)
Signature
class MSSqlTarget(luigi.Target):

    marker_table = luigi.configuration.get_config().get(
        'mssql', 'marker-table', 'table_updates')

    def __init__(self, host, database, user, password, table, update_id):
        """
        :param host: SQL Server address (supports host:port format, default port 1433)
        :param database: Database name
        :param user: Database user
        :param password: User password
        :param table: Target table name
        :param update_id: Unique identifier for this data set / update
        """

    def touch(self, connection=None):
        """Mark this update as complete in the marker table."""

    def exists(self, connection=None):
        """Check if update_id exists in the marker table."""

    def connect(self):
        """Create and return a pymssql connection."""

    def create_marker_table(self):
        """Create marker table if it does not exist."""
Import
from luigi.contrib.mssqldb import MSSqlTarget
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| host | str | SQL Server hostname, optionally with :port (default port: 1433). |
| database | str | Target database name. |
| user | str | Database username. |
| password | str | Database password. |
| table | str | Name of the target table being loaded. |
| update_id | str | Unique identifier for the specific update or data load. |
Outputs
| Output | Type | Description |
|---|---|---|
| exists() | bool | True if the update_id has been recorded in the marker table. |
| touch() | Side effect | Inserts or updates a row in the marker table with the update_id, target_table, and current timestamp. |
| Marker table | SQL Server table | Auto-created table (default: table_updates) storing completion records. |
Usage Examples
from luigi.contrib.mssqldb import MSSqlTarget
import luigi


class LoadSalesData(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return MSSqlTarget(
            host='sqlserver.internal:1433',
            database='warehouse',
            user='etl_user',
            password='secret',
            table='sales_fact',
            update_id='load_sales_{}'.format(self.date)
        )

    def run(self):
        # Perform the data load into sales_fact table...
        connection = self.output().connect()
        try:
            # Execute INSERT/bulk load operations...
            pass
        finally:
            connection.close()

        # Mark the load as complete
        self.output().touch()
Related Pages
- Spotify_Luigi_Database_Load_Verification -- Principle governing database load verification via marker tables
- luigi.Target -- Base class for all Luigi targets
- pymssql -- Microsoft SQL Server Python driver used for connectivity
- luigi.contrib.mysqldb.MySqlTarget -- Similar marker-table pattern for MySQL
- luigi.contrib.postgres.PostgresTarget -- Similar marker-table pattern for PostgreSQL