Implementation:Spotify Luigi CopyToTable Schema

From Leeroopedia


Knowledge Sources: Spotify Luigi Repository
Domains: Pipeline_Orchestration, Database, ETL
Last Updated: 2026-02-10 00:00 GMT

Overview

A concrete tool, provided by Luigi, for defining the schema of the database tables that receive pipeline output.

Description

Luigi's CopyToTable classes across the rdbms, postgres, mysqldb, and sqla modules each provide a columns attribute and a create_table() method that together define and create the target table's schema.

In the abstract base (rdbms.CopyToTable), the columns attribute defaults to an empty list and can be overridden with either plain column names or (name, type) tuples. The create_table(connection) method inspects the length of the first entry in columns: if it has two parts (name and type), the method builds a CREATE TABLE statement with comma-separated column definitions and executes it; if only a name is given, the method raises NotImplementedError.
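The statement-building step can be tried without a database server. The following is a minimal sketch that loosely mirrors the branching described above, using Python's built-in sqlite3 module. The helper name build_create_table and the in-memory database are assumptions made for illustration; neither is part of Luigi.

```python
import sqlite3


def build_create_table(table, columns):
    """Return a CREATE TABLE statement built from (name, type) tuples.

    Hypothetical helper for illustration only; it is not part of Luigi.
    """
    if len(columns[0]) != 2:
        # Without a type for each column there is nothing to put in the DDL.
        raise NotImplementedError("column types not specified for %r" % table)
    coldefs = ",".join("{} {}".format(name, type_) for name, type_ in columns)
    return "CREATE TABLE {} ({})".format(table, coldefs)


query = build_create_table("top10", [("artist", "TEXT"), ("streams", "INT")])

# Run the generated DDL against a throwaway in-memory database.
connection = sqlite3.connect(":memory:")
connection.cursor().execute(query)
```

Passing name-only entries such as [("artist",)] to this helper raises NotImplementedError, which matches the behavior described for the base class.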

In the PostgreSQL backend (postgres.CopyToTable), the run() method uses a two-attempt loop. On the first attempt, it tries to copy the data. If that attempt fails with an undefined_table database error, it resets the connection, calls the create_table() method inherited from the base class, and then retries the copy.
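The create-on-failure pattern can be sketched in isolation. The example below is an analogy rather than Luigi's code: it uses sqlite3 in place of PostgreSQL, detects the missing table by matching SQLite's "no such table" message instead of a PostgreSQL error code, and the function names copy_rows, create_table, and run are hypothetical.

```python
import sqlite3


def copy_rows(connection, table, rows):
    """Insert rows; raises sqlite3.OperationalError if the table is missing."""
    connection.executemany("INSERT INTO {} VALUES (?, ?)".format(table), rows)


def create_table(connection, table):
    connection.execute("CREATE TABLE {} (artist TEXT, streams INT)".format(table))


def run(connection, table, rows):
    # Two attempts: the second happens only after the table has been created.
    for attempt in range(2):
        try:
            copy_rows(connection, table, rows)
        except sqlite3.OperationalError as e:
            if "no such table" in str(e) and attempt == 0:
                connection.rollback()  # discard the failed first attempt
                create_table(connection, table)
            else:
                raise
        else:
            break
    connection.commit()


conn = sqlite3.connect(":memory:")
run(conn, "top10", [("artist_a", 10), ("artist_b", 7)])
```

Restricting table creation to the first attempt means a failure on the second attempt is re-raised in this sketch rather than retried again.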

In the SQLAlchemy backend (sqla.CopyToTable), columns are specified as ([name, SQLAlchemyType], {kwargs}) tuples. The create_table(engine) method constructs sqlalchemy.Column objects from these tuples and uses metadata.create_all(engine) to issue DDL. If the table already exists, the method calls metadata.reflect() to bind to the existing table. A reflect = True flag can skip column declaration entirely, loading the schema from the database.

The _MetadataColumnsMixin provides additional behavior: when enable_metadata_columns is set to True, the mixin can add extra columns to existing tables via ALTER TABLE statements during the init_copy() phase.
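As a rough illustration of adding columns to an existing table, the sketch below checks for each column and issues ALTER TABLE only when it is missing. It uses sqlite3 and hypothetical helper names (column_exists, add_missing_columns) and a hypothetical column name (created_tz); it does not reproduce the mixin's actual methods or queries.

```python
import sqlite3


def column_exists(connection, table, column):
    # PRAGMA table_info returns one row per column; index 1 holds the name.
    rows = connection.execute("PRAGMA table_info({})".format(table)).fetchall()
    return any(row[1] == column for row in rows)


def add_missing_columns(connection, table, extra_columns):
    """Add each (name, type) pair that the table does not have yet."""
    for name, type_ in extra_columns:
        if not column_exists(connection, table, name):
            connection.execute(
                "ALTER TABLE {} ADD COLUMN {} {}".format(table, name, type_)
            )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE top10 (artist TEXT, streams INT)")

# The second call is a no-op because the column already exists.
add_missing_columns(conn, "top10", [("created_tz", "TIMESTAMP")])
add_missing_columns(conn, "top10", [("created_tz", "TIMESTAMP")])
```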

Usage

Use columns and create_table() when:

  • You need the pipeline to create the target table automatically if it does not exist.
  • You want column definitions co-located with the task that produces the data.
  • You are using luigi.contrib.postgres.CopyToTable, luigi.contrib.mysqldb.CopyToTable, or luigi.contrib.sqla.CopyToTable.
  • You want to use SQLAlchemy's reflect mode to bind to a pre-existing table without redeclaring its schema.

Code Reference

Source Location

  • Abstract columns and create_table: luigi/contrib/rdbms.py, lines 199-224
  • Postgres CopyToTable (inherits create_table): luigi/contrib/postgres.py, lines 302-368
  • SQLAlchemy create_table: luigi/contrib/sqla.py, lines 311-362
  • Metadata columns mixin: luigi/contrib/rdbms.py, lines 30-146

Signature

rdbms.CopyToTable.columns (attribute):

# List of column name strings:
columns = ['id', 'username', 'inserted']

# Or list of (name, type) tuples:
columns = [('id', 'SERIAL PRIMARY KEY'), ('username', 'VARCHAR(255)'), ('inserted', 'DATETIME')]

rdbms.CopyToTable.create_table (method):

def create_table(self, connection):
    if len(self.columns[0]) == 1:
        raise NotImplementedError(
            "create_table() not implemented for %r and columns types not specified" % self.table
        )
    elif len(self.columns[0]) == 2:
        coldefs = ','.join(
            '{name} {type}'.format(name=name, type=type) for name, type in self.columns
        )
        query = "CREATE TABLE {table} ({coldefs})".format(table=self.table, coldefs=coldefs)
        connection.cursor().execute(query)

sqla.CopyToTable.create_table (method):

def create_table(self, engine):
    def construct_sqla_columns(columns):
        retval = [sqlalchemy.Column(*c[0], **c[1]) for c in columns]
        return retval

    needs_setup = (len(self.columns) == 0) or (
        False in [len(c) == 2 for c in self.columns]
    ) if not self.reflect else False

    if needs_setup:
        raise NotImplementedError(...)
    else:
        with engine.begin() as con:
            metadata = sqlalchemy.MetaData(schema=self.schema) if self.schema else sqlalchemy.MetaData()
            try:
                if not con.dialect.has_table(con, self.table, self.schema or None):
                    sqla_columns = construct_sqla_columns(self.columns)
                    self.table_bound = sqlalchemy.Table(self.table, metadata, *sqla_columns)
                    metadata.create_all(engine)
                else:
                    metadata.reflect(only=[self.table], bind=engine)
                    full_table = '.'.join([self.schema, self.table]) if self.schema else self.table
                    self.table_bound = metadata.tables[full_table]
            except Exception as e:
                self._logger.exception(self.table + str(e))
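The needs_setup expression above packs three cases into one conditional expression. The sketch below unpacks it into explicit branches as a standalone function. The function form and its parameters are an assumption for illustration (in Luigi the value is computed inline from self.columns and self.reflect), and the column types are written as plain strings so the example has no SQLAlchemy dependency.

```python
def needs_setup(columns, reflect):
    """Return True when the column specs cannot be used to build a table.

    Written to be equivalent to the conditional expression shown above.
    """
    if reflect:
        # The schema is loaded from the database, so columns are not inspected.
        return False
    if len(columns) == 0:
        return True
    # Every entry must be a two-part (positional args, keyword args) pair.
    return any(len(c) != 2 for c in columns)


valid = [(["item", "String(64)"], {"primary_key": True})]
missing_kwargs = [(["item", "String(64)"],)]

ok_with_columns = not needs_setup(valid, reflect=False)
ok_with_reflect = not needs_setup([], reflect=True)
rejected = needs_setup(missing_kwargs, reflect=False)
```

Note that an entry without a kwargs dictionary is rejected, so a column with no options still needs an empty {} as its second element.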

Import

from luigi.contrib import rdbms
from luigi.contrib import postgres
from luigi.contrib import sqla

I/O Contract

Inputs

Name | Type | Description
columns | list | List of column specifications; format varies by backend (see Signature above)
table | str | Name of the target database table
connection | DBAPI 2.0 connection or SQLAlchemy engine | Database connection used to execute DDL statements
reflect | bool | (SQLAlchemy only) If True, load schema from existing table instead of creating it
schema | str | (SQLAlchemy only) Database schema name for the target table

Outputs

Name | Type | Description
Created table | Database table | The target table is created in the database with the declared column definitions
table_bound | sqlalchemy.Table | (SQLAlchemy only) A bound table object representing the created or reflected table

Usage Examples

PostgreSQL Schema Definition

import luigi
import luigi.contrib.postgres


class LoadTopArtists(luigi.contrib.postgres.CopyToTable):
    date_interval = luigi.DateIntervalParameter()

    host = "localhost"
    database = "toplists"
    user = "luigi"
    password = "abc123"
    table = "top10"

    # Schema defined as (name, type) tuples
    columns = [
        ("date_from", "DATE"),
        ("date_to", "DATE"),
        ("artist", "TEXT"),
        ("streams", "INT"),
    ]

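    def requires(self):
        # Top10Artists is the upstream task that produces the rows to load;
        # it is assumed to be defined or imported elsewhere in the pipeline.
        return Top10Artists(self.date_interval)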

SQLAlchemy Schema Definition

from sqlalchemy import String, Integer
import luigi
from luigi.contrib import sqla


class LoadItemProperties(sqla.CopyToTable):
    # Columns as ([name, type], {kwargs}) tuples for sqlalchemy.Column()
    columns = [
        (["item", String(64)], {"primary_key": True}),
        (["property", String(64)], {}),
    ]
    connection_string = "sqlite:///example.db"
    table = "item_property"

    def rows(self):
        for row in [("item1", "property1"), ("item2", "property2")]:
            yield row

SQLAlchemy with Reflect (Pre-existing Table)

from luigi.contrib import sqla
import luigi


class LoadIntoExistingTable(sqla.CopyToTable):
    # Skip column declarations; load schema from the database
    reflect = True
    connection_string = "postgresql://user:pass@localhost/mydb"
    table = "existing_table"

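    def rows(self):
        # Assumes requires() (not shown) returns an upstream task whose
        # output is a tab-separated text file.
        with self.input().open('r') as fobj: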
            for line in fobj:
                yield line.strip('\n').split('\t')
