Implementation:Spotify Luigi CopyToTable Connection
| Knowledge Sources | Spotify Luigi Repository |
|---|---|
| Domains | Pipeline_Orchestration, Database, ETL |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
A concrete Luigi tool for configuring the database connections that pipeline tasks use when loading data into a relational database.
Description
Luigi's rdbms.CopyToTable base class (in luigi/contrib/rdbms.py) defines an abstract task interface that requires subclasses to specify database connection parameters as properties: host, database, user, password, table, and optionally port. Each backend-specific module (luigi.contrib.postgres, luigi.contrib.mysqldb, and luigi.contrib.sqla) provides a concrete CopyToTable subclass and a corresponding Target class that consumes these parameters to establish a live database connection.
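The abstract-property contract can be illustrated without Luigi itself. The sketch below is a minimal, standard-library-only analogue: the class names ConnectionTask, CompleteTask, and IncompleteTask are hypothetical, and only three of the connection properties are modeled for brevity. Plain class attributes in a subclass satisfy the abstract properties, while a subclass that leaves any of them out cannot be instantiated. Luigi's task classes are built on abc.ABCMeta, so the same mechanism is what lets CopyToTable subclasses supply connection settings as simple class attributes.

```python
import abc


class ConnectionTask(abc.ABC):
    """Hypothetical stand-in for the connection interface of rdbms.CopyToTable."""

    @property
    @abc.abstractmethod
    def host(self):
        return None

    @property
    @abc.abstractmethod
    def database(self):
        return None

    @property
    @abc.abstractmethod
    def table(self):
        return None

    @property
    def port(self):
        # Not abstract: subclasses may leave the port unset.
        return None


class CompleteTask(ConnectionTask):
    # Plain class attributes replace the abstract properties.
    host = "db.example.com"
    database = "analytics"
    table = "daily_sales"


class IncompleteTask(ConnectionTask):
    host = "db.example.com"  # database and table are missing


task = CompleteTask()
print(task.host, task.port)  # db.example.com None

try:
    IncompleteTask()
except TypeError as exc:
    # ABCMeta refuses to instantiate a class with unimplemented abstract members.
    print("cannot instantiate:", exc)
```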
For PostgreSQL, the PostgresTarget class accepts host, database, user, password, table, update_id, and port, then calls psycopg2.connect() (or pg8000.dbapi.connect()) in its connect() method. The host parameter supports a host:port format string, which is automatically parsed. The default port is 5432.
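The host:port handling can be sketched as a small standalone function. split_host_port is hypothetical and is not Luigi's code; it assumes that a port embedded in the host string takes precedence over an explicitly passed port, and it converts the embedded port to int. It does not handle bare IPv6 addresses, which also contain colons.

```python
DEFAULT_POSTGRES_PORT = 5432


def split_host_port(host, port=None, default_port=DEFAULT_POSTGRES_PORT):
    """Return (host, port); an embedded "host:port" takes precedence over `port`."""
    if ":" in host:
        # "db.example.com:6543" -> ("db.example.com", 6543)
        name, _, embedded_port = host.partition(":")
        return name, int(embedded_port)
    # No embedded port: use the explicit one, else the backend default.
    return host, (port if port is not None else default_port)


print(split_host_port("db.example.com"))         # ('db.example.com', 5432)
print(split_host_port("db.example.com:6543"))    # ('db.example.com', 6543)
print(split_host_port("db.example.com", 5433))   # ('db.example.com', 5433)
```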
For MySQL, the MySqlTarget class follows the same pattern using mysql.connector.connect(), defaulting to port 3306, and supports additional keyword arguments passed through cnx_kwargs.
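To make the cnx_kwargs pass-through concrete, here is a hypothetical helper, mysql_connect_kwargs, which is not part of Luigi. It assembles the keyword arguments that a mysql.connector.connect() call would receive, applies the 3306 default, and merges any extra options unchanged. The options charset and connection_timeout are ordinary mysql-connector-python arguments used here only as examples.

```python
DEFAULT_MYSQL_PORT = 3306


def mysql_connect_kwargs(host, database, user, password, port=None, **cnx_kwargs):
    """Build keyword arguments for a mysql.connector.connect()-style call."""
    kwargs = {
        "host": host,
        "port": port if port is not None else DEFAULT_MYSQL_PORT,
        "database": database,
        "user": user,
        "password": password,
    }
    # Extra driver options are forwarded untouched.
    kwargs.update(cnx_kwargs)
    return kwargs


kwargs = mysql_connect_kwargs(
    "db.example.com", "analytics", "etl_user", "secret",
    charset="utf8mb4", connection_timeout=10,
)
print(kwargs["port"], kwargs["charset"])  # 3306 utf8mb4
# In real code these would be passed on as mysql.connector.connect(**kwargs).
```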
For SQLAlchemy, the SQLAlchemyTarget class replaces individual parameters with a single connection_string (a SQLAlchemy connection URI) and uses sqlalchemy.create_engine() to build a reusable engine instance, cached per connection string and process ID.
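The per-process engine caching can be sketched with the standard library alone. EngineCache is hypothetical and takes a factory argument where real code would pass sqlalchemy.create_engine; it keeps one engine per connection string and builds a fresh one whenever os.getpid() changes, for example in a forked worker. Keying on the PID matters because an engine owns a connection pool, and pooled connections should not be shared across a fork.

```python
import os


class EngineCache:
    """Keep one engine per connection string, rebuilt in a new process."""

    def __init__(self, factory):
        self._factory = factory  # real code would pass sqlalchemy.create_engine
        self._engines = {}       # connection_string -> (pid, engine)

    def get(self, connection_string):
        pid = os.getpid()
        cached = self._engines.get(connection_string)
        if cached is None or cached[0] != pid:
            # First use in this process (e.g. after a fork): build a fresh engine.
            engine = self._factory(connection_string)
            self._engines[connection_string] = (pid, engine)
            return engine
        return cached[1]


created = []


def fake_create_engine(connection_string):
    # Stand-in for sqlalchemy.create_engine that records each call.
    created.append(connection_string)
    return {"url": connection_string}


cache = EngineCache(fake_create_engine)
a = cache.get("sqlite://")
b = cache.get("sqlite://")
c = cache.get("postgresql://etl_user@db.example.com/analytics")
print(a is b, a is c, len(created))  # True False 2
```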
Usage
Use these connection classes when your Luigi pipeline task needs to load data into a relational database. Subclass the appropriate CopyToTable variant and set the connection properties:
- Use luigi.contrib.postgres.CopyToTable for PostgreSQL databases.
- Use luigi.contrib.mysqldb.CopyToTable for MySQL databases.
- Use luigi.contrib.sqla.CopyToTable for any SQLAlchemy-supported database.
Code Reference
Source Location
- Abstract base: luigi/contrib/rdbms.py, lines 148-266, class CopyToTable
- PostgreSQL target: luigi/contrib/postgres.py, lines 165-299, class PostgresTarget
- MySQL target: luigi/contrib/mysqldb.py, lines 34-154, class MySqlTarget
- SQLAlchemy target: luigi/contrib/sqla.py, lines 153-272, class SQLAlchemyTarget
Signature
rdbms.CopyToTable abstract properties:
```python
class CopyToTable(luigi.task.MixinNaiveBulkComplete, _MetadataColumnsMixin, luigi.Task):

    @property
    @abc.abstractmethod
    def host(self):
        return None

    @property
    @abc.abstractmethod
    def database(self):
        return None

    @property
    @abc.abstractmethod
    def user(self):
        return None

    @property
    @abc.abstractmethod
    def password(self):
        return None

    @property
    @abc.abstractmethod
    def table(self):
        return None

    @property
    def port(self):
        # Not abstract: subclasses may omit it and rely on the backend default
        return None
```
PostgresTarget.__init__:
```python
def __init__(self, host, database, user, password, table, update_id, port=None):
    ...
```
PostgresTarget.connect:
```python
def connect(self):
    # dbapi is the PostgreSQL driver module (psycopg2 or pg8000.dbapi)
    connection = dbapi.connect(
        host=self.host,
        port=self.port,
        database=self.database,
        user=self.user,
        password=self.password)
    connection.set_client_encoding('utf-8')
    return connection
```
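The module-level dbapi name used in connect() refers to whichever PostgreSQL driver is importable. A minimal sketch of that kind of import fallback follows; first_available is a hypothetical helper rather than Luigi's own code, and it only tries the two driver modules named in the Description.

```python
import importlib


def first_available(module_names):
    """Import and return the first module in `module_names` that is installed."""
    for name in module_names:
        try:
            return importlib.import_module(name)
        except ImportError:
            # Not installed (ModuleNotFoundError is a subclass): try the next one.
            continue
    return None


# Prefer psycopg2, fall back to pg8000's DB-API module.
dbapi = first_available(["psycopg2", "pg8000.dbapi"])
print(dbapi.__name__ if dbapi else "no PostgreSQL driver installed")
```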
SQLAlchemyTarget.__init__:
```python
def __init__(self, connection_string, target_table, update_id, echo=False, connect_args=None):
    ...
```
Import
```python
from luigi.contrib import rdbms
from luigi.contrib import postgres
from luigi.contrib import mysqldb
from luigi.contrib import sqla
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| host | str | Database server hostname; may include the port as "host:port" |
| database | str | Name of the target database |
| user | str | Database user for authentication |
| password | str | Password for the database user |
| table | str | Name of the target table for data loading |
| port | int or None | Database server port (defaults to 5432 for PostgreSQL, 3306 for MySQL) |
| connection_string | str | SQLAlchemy connection URI (used by sqla.CopyToTable instead of the individual parameters) |
| connect_args | dict | Additional keyword arguments passed to the DBAPI connect() call (SQLAlchemy backend) |
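When moving from the individual parameters to connection_string, the URI can be assembled from the same values. The hypothetical helper build_connection_string below does this with urllib.parse.quote_plus, which percent-encodes characters such as @ or / in credentials that would otherwise break URL parsing. SQLAlchemy 1.4 and later also provide sqlalchemy.engine.URL.create() for the same purpose.

```python
from urllib.parse import quote_plus


def build_connection_string(dialect, user, password, host, database, port=None):
    """Assemble a SQLAlchemy-style URI, percent-encoding the credentials."""
    netloc = f"{quote_plus(user)}:{quote_plus(password)}@{host}"
    if port is not None:
        netloc += f":{port}"
    return f"{dialect}://{netloc}/{database}"


print(build_connection_string(
    "postgresql", "etl_user", "p@ss/word", "db.example.com", "analytics", 5432))
# postgresql://etl_user:p%40ss%2Fword@db.example.com:5432/analytics
```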
Outputs
| Name | Type | Description |
|---|---|---|
| connection | DBAPI 2.0 connection object | A live database connection used for table creation, data copying, and marker table operations |
| engine | sqlalchemy.engine.Engine | A SQLAlchemy engine instance (for the sqla backend), cached per connection string and PID |
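The connection output serves three jobs: table creation, data copying, and marker-table bookkeeping. The sketch below walks through those steps with Python's built-in sqlite3 module standing in for psycopg2 or mysql.connector, since all of them follow DB-API 2.0. The marker table name table_updates, its two-column schema, the update_id string, and the sample rows are illustrative assumptions; Luigi's real marker table and SQL differ by backend, and sqlite3 uses ? placeholders where psycopg2 uses %s.

```python
import sqlite3

# sqlite3 stands in for psycopg2 / mysql.connector; all follow DB-API 2.0.
connection = sqlite3.connect(":memory:")
cur = connection.cursor()

# 1. Table creation: the target table from the examples on this page.
cur.execute(
    "CREATE TABLE IF NOT EXISTS daily_sales "
    "(sale_date TEXT, product_id INTEGER, revenue REAL)"
)

# Hypothetical marker table: one row per completed load.
cur.execute(
    "CREATE TABLE IF NOT EXISTS table_updates "
    "(update_id TEXT PRIMARY KEY, target_table TEXT)"
)


def already_loaded(cursor, update_id):
    """Marker-table lookup: has this update_id been recorded?"""
    cursor.execute("SELECT 1 FROM table_updates WHERE update_id = ?", (update_id,))
    return cursor.fetchone() is not None


update_id = "LoadSalesData(date=2024-01-01)"  # hypothetical task identifier
sample_rows = [("2024-01-01", 1, 9.5), ("2024-01-01", 2, 3.0)]

if not already_loaded(cur, update_id):
    # 2. Data copying, then 3. the marker write, committed together.
    cur.executemany("INSERT INTO daily_sales VALUES (?, ?, ?)", sample_rows)
    cur.execute("INSERT INTO table_updates VALUES (?, ?)", (update_id, "daily_sales"))
    connection.commit()

print(already_loaded(cur, update_id))  # True
```

Because the rows and the marker are committed in one transaction, rerunning the block skips the insert instead of loading duplicate rows.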
Usage Examples
PostgreSQL Connection Configuration
```python
import luigi
import luigi.contrib.postgres


class LoadSalesData(luigi.contrib.postgres.CopyToTable):
    date = luigi.DateParameter()

    # Connection properties required by rdbms.CopyToTable
    host = "db.example.com"
    database = "analytics"
    user = "etl_user"
    password = "secure_password"
    table = "daily_sales"
    port = 5432

    # Target table schema as (column name, SQL type) pairs
    columns = [
        ("sale_date", "DATE"),
        ("product_id", "INT"),
        ("revenue", "NUMERIC"),
    ]

    def requires(self):
        # ProcessSalesData is an upstream task assumed to be defined elsewhere
        return ProcessSalesData(date=self.date)
```
SQLAlchemy Connection Configuration
```python
import luigi
from luigi.contrib import sqla
from sqlalchemy import Float, Integer, String


class LoadSalesDataSQLA(sqla.CopyToTable):
    date = luigi.DateParameter()

    # A single URI replaces host, database, user, password, and port
    connection_string = "postgresql://etl_user:secure_password@db.example.com:5432/analytics"
    table = "daily_sales"

    # Each entry is (args, kwargs) for sqlalchemy.Column(*args, **kwargs)
    columns = [
        (["sale_date", String(10)], {}),
        (["product_id", Integer()], {}),
        (["revenue", Float()], {}),
    ]

    def requires(self):
        # ProcessSalesData is an upstream task assumed to be defined elsewhere
        return ProcessSalesData(date=self.date)
```
Real-World Example from top_artists.py
```python
import luigi
import luigi.contrib.postgres


class ArtistToplistToDatabase(luigi.contrib.postgres.CopyToTable):
    date_interval = luigi.DateIntervalParameter()
    use_spark = luigi.BoolParameter()

    host = "localhost"
    database = "toplists"
    user = "luigi"
    password = "abc123"
    table = "top10"

    columns = [("date_from", "DATE"),
               ("date_to", "DATE"),
               ("artist", "TEXT"),
               ("streams", "INT")]

    def requires(self):
        # Top10Artists is defined earlier in the same example file
        return Top10Artists(self.date_interval, self.use_spark)
```