Workflow:Spotify Luigi Database Ingestion Pipeline
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, ETL, Database, Data_Warehousing |
| Last Updated | 2026-02-10 12:00 GMT |
Overview
End-to-end process for loading processed data into relational databases or search indexes using Luigi's database contrib modules (PostgreSQL, MySQL, Redshift, Elasticsearch, SQLAlchemy).
Description
This workflow covers the final stage of a typical ETL pipeline: ingesting processed data into a target database or search engine for serving, analytics, or exploration. Luigi provides specialized CopyToTable task subclasses for several database backends, plus a CopyToIndex task for Elasticsearch. These tasks handle connection management, bulk data loading, schema definition, and idempotent completion tracking via marker tables. The marker table pattern ensures that a load that has already completed for a given task and parameter set is skipped, rather than repeated, when the pipeline is re-run.
Usage
Execute this workflow when you have processed data (from upstream Luigi tasks) that needs to be bulk-loaded into a relational database (PostgreSQL, MySQL, Redshift) or a search index (Elasticsearch). This is the final step in pipelines that transform raw data into queryable storage for dashboards, APIs, or ad-hoc analysis.
Execution Steps
Step 1: Define the Upstream Data Source
Identify the upstream Luigi task that produces the data to be loaded. This task's output (typically a LocalTarget or HdfsTarget containing TSV, CSV, or JSON data) becomes the input to the database loading task. The upstream task must be complete before ingestion begins.
Key considerations:
- The upstream task output format must match the database loader's expected input
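- CopyToTable obtains its data from the rows() method, which must return an iterable of rows (tuples or lists), one value per column
- By default, rows() reads tab-separated lines from the upstream task's output file; override it to supply rows from another source or format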
- Data should be clean and match the target table's schema before loading
Step 2: Configure Database Connection
Set database connection parameters on the CopyToTable subclass: host, database, user, password, and table name. For cloud databases (Redshift), additional parameters include AWS credentials and S3 staging paths. Connection parameters can be hardcoded, read from Luigi configuration, or passed as task parameters.
Key considerations:
- Connection credentials should be stored in Luigi config files, not in code
- For PostgreSQL: host, database, user, password, table
- For Redshift: adds aws_access_key_id, aws_secret_access_key, s3_load_path
- For SQLAlchemy: uses a connection_string parameter for database-agnostic access
- For Elasticsearch: host, port, index name, doc_type
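Luigi's default configuration files use INI syntax, so the idea of keeping credentials out of task code can be sketched with the standard library's configparser. The section name [warehouse], its keys, and the helper load_connection_settings are hypothetical; in a real pipeline the values would live in the Luigi config file and be read through Luigi's own configuration handling or task parameters.

```python
import configparser

# Hypothetical config fragment. In practice this would live in a file such
# as luigi.cfg that is kept out of version control.
CONFIG_TEXT = """
[warehouse]
host = db.example.internal
database = analytics
user = loader
password = change-me
table = daily_plays
"""

REQUIRED_KEYS = ("host", "database", "user", "password", "table")


def load_connection_settings(text, section="warehouse"):
    """Return the connection settings found in an INI-style config string."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    missing = [key for key in REQUIRED_KEYS if not parser.has_option(section, key)]
    if missing:
        # Fail early instead of attempting a connection with partial settings.
        raise KeyError("missing config keys: {}".format(", ".join(missing)))
    return {key: parser.get(section, key) for key in REQUIRED_KEYS}


settings = load_connection_settings(CONFIG_TEXT)
```

The resulting dictionary can then feed the host, database, user, password, and table attributes of the loader task.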
Step 3: Define Table Schema
Specify the target table's column definitions on the task class. The columns attribute is a list of (name, type) tuples that define the table schema. For databases that support it, the task can auto-create the table if it does not exist. Primary keys and column constraints can also be specified.
Key considerations:
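- columns = [("col_name", "TYPE"), ...] defines the schema for the PostgreSQL, MySQL, and Redshift loaders; the SQLAlchemy loader instead takes the positional and keyword arguments of sqlalchemy.Column for each column, which is where options such as primary_key are set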
- The table is auto-created if it does not exist (for supported backends)
- Column order must match the order of values in each row
- Redshift supports COPY from S3 with column mapping
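To illustrate how a list of (name, type) tuples maps onto a table definition, and why row values must follow the same order, the sketch below uses the standard library's sqlite3 as a stand-in database. The helper build_create_table, the table name, and the column names are hypothetical, and this is not Luigi's own implementation.

```python
import sqlite3

# Same shape as the columns attribute: a list of (name, type) tuples.
columns = [("play_date", "TEXT"), ("track_id", "INTEGER"), ("plays", "INTEGER")]


def build_create_table(table, columns):
    """Render a CREATE TABLE statement from (name, type) tuples."""
    coldefs = ", ".join("{} {}".format(name, sqltype) for name, sqltype in columns)
    return "CREATE TABLE IF NOT EXISTS {} ({})".format(table, coldefs)


ddl = build_create_table("daily_plays", columns)

conn = sqlite3.connect(":memory:")
conn.execute(ddl)

# Each row lists its values in the same order as `columns`.
row = ("2026-02-10", 42, 1000)
placeholders = ", ".join("?" for _ in columns)
conn.execute("INSERT INTO daily_plays VALUES ({})".format(placeholders), row)

stored = conn.execute(
    "SELECT play_date, track_id, plays FROM daily_plays"
).fetchone()
```

Swapping two entries in columns without also swapping the values in each row would silently store data under the wrong column, which is why the ordering rule matters.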
Step 4: Implement Data Loading
Subclass the appropriate CopyToTable variant and either implement rows() to yield data tuples, or rely on the default behavior that reads from the upstream task's output file. The base class handles opening a database connection, executing the bulk load (COPY for PostgreSQL and Redshift, INSERT statements for MySQL and SQLAlchemy), and recording completion in a marker table.
Key considerations:
- Default behavior reads tab-separated lines from self.input().open('r')
- Override rows() for custom data transformation before insertion
- The marker table (table_updates by default) records an update_id for each completed load; by default this is the task ID, which identifies the task together with its parameter values
- On the relational backends, the data load and the marker table update are committed together, so a failed load is rolled back rather than left partially applied
Step 5: Execute and Verify
Run the pipeline, which triggers the full dependency chain from data generation through transformation to database loading. After execution, verify the data in the target database. Re-running the pipeline skips already-loaded data (idempotency via marker table checks).
Key considerations:
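- The marker table check happens when Luigi calls the task's complete() method, which by default asks the task's output target whether this load has already been recorded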
- Re-running is safe: completed loads are skipped automatically
- The execution summary shows which database load tasks succeeded or failed
- For Elasticsearch, verify via the search API; for databases, query the target table
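The idempotency described in this step can be sketched with the standard library's sqlite3 as a stand-in database. This illustrates the marker table pattern only and is not Luigi's implementation; the function load_once, the daily_plays table, and the update_id strings are hypothetical.

```python
import sqlite3


def load_once(conn, update_id, rows):
    """Insert rows and record update_id together; skip if already recorded."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS table_updates (update_id TEXT PRIMARY KEY)"
    )
    conn.execute(
        "CREATE TABLE IF NOT EXISTS daily_plays (track_id INTEGER, plays INTEGER)"
    )
    done = conn.execute(
        "SELECT 1 FROM table_updates WHERE update_id = ?", (update_id,)
    ).fetchone()
    if done:
        return False  # already loaded: plays the role of a completed task
    with conn:  # commits on success, rolls back if an exception escapes
        conn.executemany("INSERT INTO daily_plays VALUES (?, ?)", rows)
        conn.execute("INSERT INTO table_updates VALUES (?)", (update_id,))
    return True


conn = sqlite3.connect(":memory:")
first = load_once(conn, "LoadPlays-2026-02-10", [(1, 10), (2, 20)])
second = load_once(conn, "LoadPlays-2026-02-10", [(1, 10), (2, 20)])
row_count = conn.execute("SELECT COUNT(*) FROM daily_plays").fetchone()[0]
```

Here the second call returns without inserting anything, so row_count reflects a single load. Because the marker row is written in the same transaction as the data, a load that fails partway leaves neither rows nor a marker behind, and a later re-run starts it again from scratch.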