Implementation:Spotify Luigi CopyToTable Connection

From Leeroopedia
Revision as of 16:46, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Spotify_Luigi_CopyToTable_Connection.md)


Knowledge Sources: Spotify Luigi Repository
Domains: Pipeline_Orchestration, Database, ETL
Last Updated: 2026-02-10 00:00 GMT

Overview

A concrete tool, provided by Luigi, for configuring the database connections that pipeline tasks use to load data.

Description

Luigi's rdbms.CopyToTable base class (in luigi/contrib/rdbms.py) defines an abstract task interface that requires subclasses to specify database connection parameters as properties: host, database, user, password, table, and optionally port. Each backend-specific module -- luigi.contrib.postgres, luigi.contrib.mysqldb, and luigi.contrib.sqla -- provides its own concrete CopyToTable class and a corresponding Target class that uses the connection settings to establish a live database connection.

For PostgreSQL, the PostgresTarget class accepts host, database, user, password, table, update_id, and port, then calls psycopg2.connect() (or pg8000.dbapi.connect()) in its connect() method. The host parameter supports a host:port format string, which is automatically parsed. The default port is 5432.
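
The host:port handling described above can be illustrated with a small standalone sketch. This is not Luigi's code: split_host_port and DEFAULT_POSTGRES_PORT are hypothetical names, and letting a port embedded in the host string take precedence over an explicit port argument is an assumption made for the example.

```python
DEFAULT_POSTGRES_PORT = 5432  # default port stated in the text above


def split_host_port(host, port=None, default_port=DEFAULT_POSTGRES_PORT):
    """Return (hostname, port) for a 'host' or 'host:port' string.

    Hypothetical helper for illustration only. It does not handle
    IPv6 literals, which contain colons themselves.
    """
    if ":" in host:
        # Assumption: a port embedded in the host string wins.
        hostname, _, port_text = host.partition(":")
        return hostname, int(port_text)
    # No embedded port: use the explicit port, else the default.
    return host, (port if port is not None else default_port)


print(split_host_port("db.example.com:6432"))
print(split_host_port("db.example.com"))
print(split_host_port("db.example.com", port=5433))
```

Normalizing the host once, at construction time, means the later connect() call can pass host and port as separate arguments regardless of how the caller wrote them.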

For MySQL, the MySqlTarget class follows the same pattern using mysql.connector.connect(), defaulting to port 3306, and supports additional keyword arguments passed through cnx_kwargs.

For SQLAlchemy, the SQLAlchemyTarget class replaces individual parameters with a single connection_string (a SQLAlchemy connection URI) and uses sqlalchemy.create_engine() to build a reusable engine instance, cached per connection string and process ID.

Usage

Use these connection classes when your Luigi pipeline task needs to load data into a relational database. Subclass the appropriate CopyToTable variant and set the connection properties:

  • Use luigi.contrib.postgres.CopyToTable for PostgreSQL databases.
  • Use luigi.contrib.mysqldb.CopyToTable for MySQL databases.
  • Use luigi.contrib.sqla.CopyToTable for any SQLAlchemy-supported database.

Code Reference

Source Location

  • Abstract base: luigi/contrib/rdbms.py, lines 148-266, class CopyToTable
  • PostgreSQL target: luigi/contrib/postgres.py, lines 165-299, class PostgresTarget
  • MySQL target: luigi/contrib/mysqldb.py, lines 34-154, class MySqlTarget
  • SQLAlchemy target: luigi/contrib/sqla.py, lines 153-272, class SQLAlchemyTarget

Signature

rdbms.CopyToTable abstract properties:

class CopyToTable(luigi.task.MixinNaiveBulkComplete, _MetadataColumnsMixin, luigi.Task):
    @property
    @abc.abstractmethod
    def host(self):
        return None

    @property
    @abc.abstractmethod
    def database(self):
        return None

    @property
    @abc.abstractmethod
    def user(self):
        return None

    @property
    @abc.abstractmethod
    def password(self):
        return None

    @property
    @abc.abstractmethod
    def table(self):
        return None

    @property
    def port(self):
        return None
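
The usage examples later on this page set host, database, and the other values as plain class attributes rather than as properties. A minimal sketch of why that satisfies abstract properties, using only the standard abc module: ConnectionSpec, AnalyticsSpec, and IncompleteSpec are hypothetical stand-ins, not Luigi classes, and only two of the properties are reproduced.

```python
import abc


class ConnectionSpec(abc.ABC):
    """Hypothetical stand-in for the abstract connection properties."""

    @property
    @abc.abstractmethod
    def host(self):
        return None

    @property
    @abc.abstractmethod
    def database(self):
        return None

    @property
    def port(self):
        # Not abstract, so subclasses may leave it unset.
        return None


class AnalyticsSpec(ConnectionSpec):
    # Plain class attributes override the abstract properties.
    host = "db.example.com"
    database = "analytics"


class IncompleteSpec(ConnectionSpec):
    host = "db.example.com"  # `database` is still abstract here


spec = AnalyticsSpec()
print(spec.host, spec.database, spec.port)

try:
    IncompleteSpec()
except TypeError as exc:
    # abc refuses to instantiate a class with abstract members left.
    print("cannot instantiate:", exc)
```

The practical effect is that a forgotten connection setting surfaces as a TypeError when the class is instantiated, rather than as a None value passed to the database driver.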

PostgresTarget.__init__:

def __init__(self, host, database, user, password, table, update_id, port=None):

PostgresTarget.connect:

def connect(self):
    connection = dbapi.connect(
        host=self.host,
        port=self.port,
        database=self.database,
        user=self.user,
        password=self.password)
    connection.set_client_encoding('utf-8')
    return connection

SQLAlchemyTarget.__init__:

def __init__(self, connection_string, target_table, update_id, echo=False, connect_args=None):

Import

from luigi.contrib import rdbms
from luigi.contrib import postgres
from luigi.contrib import mysqldb
from luigi.contrib import sqla

I/O Contract

Inputs

Name | Type | Description
host | str | Database server hostname; may include port as "host:port"
database | str | Name of the target database
user | str | Database user for authentication
password | str | Password for the database user
table | str | Name of the target table for data loading
port | int or None | Database server port (defaults to 5432 for Postgres, 3306 for MySQL)
connection_string | str | SQLAlchemy connection URI (used by sqla.CopyToTable instead of individual parameters)
connect_args | dict | Additional keyword arguments passed to the DBAPI connect() call

Outputs

Name | Type | Description
connection | DBAPI 2.0 connection object | A live database connection used for table creation, data copying, and marker table operations
engine | sqlalchemy.engine.Engine | A SQLAlchemy engine instance (for sqla backend), cached per connection string and PID

Usage Examples

PostgreSQL Connection Configuration

import luigi
import luigi.contrib.postgres


class LoadSalesData(luigi.contrib.postgres.CopyToTable):
    date = luigi.DateParameter()

    host = "db.example.com"
    database = "analytics"
    user = "etl_user"
    password = "secure_password"
    table = "daily_sales"
    port = 5432

    columns = [
        ("sale_date", "DATE"),
        ("product_id", "INT"),
        ("revenue", "NUMERIC"),
    ]

    def requires(self):
        # ProcessSalesData is an upstream task assumed to be defined elsewhere.
        return ProcessSalesData(date=self.date)

SQLAlchemy Connection Configuration

from sqlalchemy import String, Integer, Float
import luigi
from luigi.contrib import sqla


class LoadSalesDataSQLA(sqla.CopyToTable):
    date = luigi.DateParameter()

    connection_string = "postgresql://etl_user:secure_password@db.example.com:5432/analytics"
    table = "daily_sales"

    columns = [
        (["sale_date", String(10)], {}),
        (["product_id", Integer()], {}),
        (["revenue", Float()], {}),
    ]

    def requires(self):
        # ProcessSalesData is an upstream task assumed to be defined elsewhere.
        return ProcessSalesData(date=self.date)
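
The connection_string in the example above packs the same fields that the PostgreSQL variant sets individually. As a rough sketch, such a URI could be assembled from separate values with the standard library; build_connection_string is a hypothetical helper, not part of Luigi or SQLAlchemy, and it percent-encodes the user and password so that characters such as "@" or "/" do not break the URI.

```python
from urllib.parse import quote_plus


def build_connection_string(dialect, user, password, host, port, database):
    """Assemble a SQLAlchemy-style URI from individual settings.

    Hypothetical helper for illustration; credentials are
    percent-encoded so reserved characters stay unambiguous.
    """
    return (
        f"{dialect}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


uri = build_connection_string(
    "postgresql", "etl_user", "secure_password",
    "db.example.com", 5432, "analytics",
)
print(uri)

# A password with reserved characters is escaped rather than copied raw.
print(build_connection_string(
    "postgresql", "etl_user", "p@ss/word",
    "db.example.com", 5432, "analytics",
))
```

Building the string from separate values also makes it easier to keep the password out of source code, for example by reading it from the environment before calling the helper.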

Real-World Example from top_artists.py

class ArtistToplistToDatabase(luigi.contrib.postgres.CopyToTable):
    date_interval = luigi.DateIntervalParameter()
    use_spark = luigi.BoolParameter()

    host = "localhost"
    database = "toplists"
    user = "luigi"
    password = "abc123"
    table = "top10"

    columns = [("date_from", "DATE"),
               ("date_to", "DATE"),
               ("artist", "TEXT"),
               ("streams", "INT")]

    def requires(self):
        return Top10Artists(self.date_interval, self.use_spark)
