Implementation:Spotify Luigi CopyToTable Schema
| Knowledge Sources | Spotify Luigi Repository |
|---|---|
| Domains | Pipeline_Orchestration, Database, ETL |
| Last Updated | 2026-02-10 00:00 GMT |
Overview
A concrete tool, provided by Luigi, for defining the database table schema of a pipeline's output.
Description
Luigi's CopyToTable classes across the rdbms, postgres, mysqldb, and sqla modules each provide a columns attribute and a create_table() method that together define and create the target table's schema.
In the abstract base (rdbms.CopyToTable), the columns attribute defaults to an empty list and can be overridden with either plain column-name strings or (name, type) tuples. The create_table(connection) method inspects the length of the first element of columns: if it has two parts (name and type), the method builds a CREATE TABLE statement from comma-separated column definitions and executes it on the connection; if only a name is given, it raises NotImplementedError.
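The branching described above can be sketched without Luigi or a database. The helper name `build_create_table_sql` is hypothetical; it only mirrors the logic of create_table() and returns the SQL string instead of executing it. The explicit string check is an addition for this sketch and is not part of the original method.

```python
def build_create_table_sql(table, columns):
    """Build a CREATE TABLE statement from (name, type) tuples.

    Hypothetical helper mirroring rdbms.CopyToTable.create_table();
    it returns the SQL instead of running it on a connection.
    """
    first = columns[0]
    # Treat a bare string as a name-only column. The real method only
    # checks len(columns[0]); the isinstance check is added here.
    if isinstance(first, str) or len(first) == 1:
        raise NotImplementedError(
            "column types not specified for %r" % table
        )
    coldefs = ",".join("{} {}".format(name, type_) for name, type_ in columns)
    return "CREATE TABLE {} ({})".format(table, coldefs)


sql = build_create_table_sql("top10", [("artist", "TEXT"), ("streams", "INT")])
print(sql)
```

Passing name-only columns such as `["artist", "streams"]` to this helper raises NotImplementedError, which matches the documented behavior when no types are supplied.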
In the PostgreSQL backend (postgres.CopyToTable), the run() method uses a two-attempt loop. On the first attempt, it tries to copy the data. If an undefined_table database error occurs, it resets the connection, calls the create_table() method inherited from the base class, and then retries the copy.
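The create-on-failure pattern can be illustrated with the standard-library sqlite3 module standing in for PostgreSQL. This is an analogy under stated assumptions, not Luigi's code: the function name `copy_with_retry` is hypothetical, and sqlite3 signals a missing table through an OperationalError message rather than the undefined_table error code.

```python
import sqlite3


def copy_with_retry(connection, table, coldefs, rows):
    """Insert rows; on a missing table, create it and retry once.

    Returns the index of the attempt that succeeded (0 or 1).
    """
    placeholders = ",".join("?" for _ in coldefs)
    insert_sql = "INSERT INTO {} VALUES ({})".format(table, placeholders)
    for attempt in range(2):
        try:
            connection.executemany(insert_sql, rows)
            connection.commit()
            return attempt
        except sqlite3.OperationalError as exc:
            # Only recover from a missing table, and only on the first attempt.
            if attempt == 0 and "no such table" in str(exc):
                connection.rollback()
                ddl = ",".join("{} {}".format(n, t) for n, t in coldefs)
                connection.execute("CREATE TABLE {} ({})".format(table, ddl))
            else:
                raise


conn = sqlite3.connect(":memory:")
attempts_used = copy_with_retry(
    conn, "top10", [("artist", "TEXT"), ("streams", "INT")], [("a", 1), ("b", 2)]
)
row_count = conn.execute("SELECT COUNT(*) FROM top10").fetchone()[0]
```

Because the table does not exist at first, the second attempt is the one that succeeds here; any other error, or a second failure, propagates to the caller.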
In the SQLAlchemy backend (sqla.CopyToTable), columns are specified as ([name, SQLAlchemyType], {kwargs}) tuples. The create_table(engine) method constructs sqlalchemy.Column objects from these tuples and uses metadata.create_all(engine) to issue DDL. If the table already exists, the method calls metadata.reflect() to bind to the existing table. A reflect = True flag can skip column declaration entirely, loading the schema from the database.
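The validation that runs before any DDL is issued can be sketched in plain Python. The function name `needs_setup` is borrowed from the local variable in the method shown under Signature; the standalone function and the string type placeholders are assumptions made so the sketch runs without SQLAlchemy installed.

```python
def needs_setup(columns, reflect):
    """Return True when column specs are missing or malformed.

    Mirrors the check at the top of sqla.CopyToTable.create_table():
    with reflect enabled, the columns are never inspected.
    """
    if reflect:
        return False
    # Every spec must be a 2-tuple: ([name, type, ...], {kwargs}).
    return len(columns) == 0 or any(len(c) != 2 for c in columns)


# Strings stand in for SQLAlchemy types such as String(64).
valid = [
    (["item", "String(64)"], {"primary_key": True}),
    (["property", "String(64)"], {}),
]

print(needs_setup(valid, reflect=False))
print(needs_setup([], reflect=False))
print(needs_setup([], reflect=True))
```

The third call shows why reflect = True allows an empty columns list: the check is bypassed and the schema is expected to come from the database.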
The _MetadataColumnsMixin provides additional behavior: when enable_metadata_columns is set to True, the mixin can add extra columns to existing tables via ALTER TABLE statements during the init_copy() phase.
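The idea of extending an existing table can be sketched with sqlite3. This is an illustration only: the helper `add_missing_columns`, the column name `created_tz`, and the PRAGMA-based existence check are assumptions for the sketch and do not reproduce the mixin's actual queries.

```python
import sqlite3


def add_missing_columns(connection, table, extra_columns):
    """Add each (name, type) column the table lacks; return the names added."""
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    existing = {
        row[1] for row in connection.execute("PRAGMA table_info({})".format(table))
    }
    added = []
    for name, type_ in extra_columns:
        if name not in existing:
            connection.execute(
                "ALTER TABLE {} ADD COLUMN {} {}".format(table, name, type_)
            )
            added.append(name)
    return added


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE top10 (artist TEXT, streams INT)")
first = add_missing_columns(conn, "top10", [("created_tz", "TIMESTAMP")])
second = add_missing_columns(conn, "top10", [("created_tz", "TIMESTAMP")])
```

Checking for the column before altering the table keeps the operation idempotent, so repeated task runs do not fail on a column that was already added.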
Usage
Use columns and create_table() when:
- You need the pipeline to create the target table automatically if it does not exist.
- You want column definitions co-located with the task that produces the data.
- You are using luigi.contrib.postgres.CopyToTable, luigi.contrib.mysqldb.CopyToTable, or luigi.contrib.sqla.CopyToTable.
- You want to use SQLAlchemy's reflect mode to bind to a pre-existing table without redeclaring its schema.
Code Reference
Source Location
- Abstract columns and create_table: luigi/contrib/rdbms.py, lines 199-224
- Postgres CopyToTable (inherits create_table): luigi/contrib/postgres.py, lines 302-368
- SQLAlchemy create_table: luigi/contrib/sqla.py, lines 311-362
- Metadata columns mixin: luigi/contrib/rdbms.py, lines 30-146
Signature
rdbms.CopyToTable.columns (attribute):
# List of column name strings:
columns = ['id', 'username', 'inserted']
# Or list of (name, type) tuples:
columns = [('id', 'SERIAL PRIMARY KEY'), ('username', 'VARCHAR(255)'), ('inserted', 'DATETIME')]
rdbms.CopyToTable.create_table (method):
def create_table(self, connection):
    if len(self.columns[0]) == 1:
        raise NotImplementedError(
            "create_table() not implemented for %r and columns types not specified" % self.table
        )
    elif len(self.columns[0]) == 2:
        coldefs = ','.join(
            '{name} {type}'.format(name=name, type=type) for name, type in self.columns
        )
        query = "CREATE TABLE {table} ({coldefs})".format(table=self.table, coldefs=coldefs)
        connection.cursor().execute(query)
sqla.CopyToTable.create_table (method):
def create_table(self, engine):
    def construct_sqla_columns(columns):
        retval = [sqlalchemy.Column(*c[0], **c[1]) for c in columns]
        return retval

    needs_setup = (len(self.columns) == 0) or (
        False in [len(c) == 2 for c in self.columns]
    ) if not self.reflect else False
    if needs_setup:
        raise NotImplementedError(...)
    else:
        with engine.begin() as con:
            metadata = sqlalchemy.MetaData(schema=self.schema) if self.schema else sqlalchemy.MetaData()
            try:
                if not con.dialect.has_table(con, self.table, self.schema or None):
                    sqla_columns = construct_sqla_columns(self.columns)
                    self.table_bound = sqlalchemy.Table(self.table, metadata, *sqla_columns)
                    metadata.create_all(engine)
                else:
                    metadata.reflect(only=[self.table], bind=engine)
                    full_table = '.'.join([self.schema, self.table]) if self.schema else self.table
                    self.table_bound = metadata.tables[full_table]
            except Exception as e:
                self._logger.exception(self.table + str(e))
Import
from luigi.contrib import rdbms
from luigi.contrib import postgres
from luigi.contrib import sqla
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| columns | list | List of column specifications; format varies by backend (see Signature above) |
| table | str | Name of the target database table |
| connection | DBAPI 2.0 connection or SQLAlchemy engine | Database connection used to execute DDL statements |
| reflect | bool | (SQLAlchemy only) If True, load schema from existing table instead of creating it |
| schema | str | (SQLAlchemy only) Database schema name for the target table |
Outputs
| Name | Type | Description |
|---|---|---|
| Created table | Database table | The target table is created in the database with the declared column definitions |
| table_bound | sqlalchemy.Table | (SQLAlchemy only) A bound table object representing the created or reflected table |
Usage Examples
PostgreSQL Schema Definition
import luigi
import luigi.contrib.postgres


class LoadTopArtists(luigi.contrib.postgres.CopyToTable):
    date_interval = luigi.DateIntervalParameter()

    host = "localhost"
    database = "toplists"
    user = "luigi"
    password = "abc123"
    table = "top10"

    # Schema defined as (name, type) tuples
    columns = [
        ("date_from", "DATE"),
        ("date_to", "DATE"),
        ("artist", "TEXT"),
        ("streams", "INT"),
    ]

    def requires(self):
        # Top10Artists is an upstream task assumed to be defined elsewhere
        return Top10Artists(self.date_interval)
SQLAlchemy Schema Definition
from sqlalchemy import String, Integer
import luigi
from luigi.contrib import sqla


class LoadItemProperties(sqla.CopyToTable):
    # Columns as ([name, type], {kwargs}) tuples for sqlalchemy.Column()
    columns = [
        (["item", String(64)], {"primary_key": True}),
        (["property", String(64)], {}),
    ]
    connection_string = "sqlite:///example.db"
    table = "item_property"

    def rows(self):
        for row in [("item1", "property1"), ("item2", "property2")]:
            yield row
SQLAlchemy with Reflect (Pre-existing Table)
from luigi.contrib import sqla
import luigi


class LoadIntoExistingTable(sqla.CopyToTable):
    # Skip column declarations; load schema from the database
    reflect = True
    connection_string = "postgresql://user:pass@localhost/mydb"
    table = "existing_table"

    def rows(self):
        # Assumes requires() (not shown) supplies a tab-separated input file
        with self.input().open('r') as fobj:
            for line in fobj:
                yield line.strip('\n').split('\t')