

Workflow:Spotify Luigi Database Ingestion Pipeline

From Leeroopedia


Knowledge Sources
Domains Data_Engineering, ETL, Database, Data_Warehousing
Last Updated 2026-02-10 12:00 GMT

Overview

End-to-end process for loading processed data into relational databases or search indexes using Luigi's database contrib modules (PostgreSQL, MySQL, Redshift, Elasticsearch, SQLAlchemy).

Description

This workflow covers the final stage of a typical ETL pipeline: ingesting processed data into a target database or search engine for serving, analytics, or exploration. Luigi provides CopyToTable task classes for several database backends (PostgreSQL, MySQL, Redshift via S3CopyToTable, and SQLAlchemy) and a CopyToIndex task for Elasticsearch. These tasks handle connection management, bulk data loading, table creation, and idempotent completion tracking via a marker table (a marker index for Elasticsearch). The marker pattern ensures that a completed load is skipped when the pipeline is re-run, as long as its marker record exists.

Usage

Execute this workflow when you have processed data (from upstream Luigi tasks) that needs to be bulk-loaded into a relational database (PostgreSQL, MySQL, Redshift) or a search index (Elasticsearch). This is the final step in pipelines that transform raw data into queryable storage for dashboards, APIs, or ad-hoc analysis.

Execution Steps

Step 1: Define the Upstream Data Source

Identify the upstream Luigi task that produces the data to be loaded. Its output (typically a LocalTarget or HdfsTarget containing TSV, CSV, or JSON data) becomes the input to the database loading task. Return that task from the loader's requires() method; Luigi then runs the load only after the upstream task is complete.

Key considerations:

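  • The upstream output format must match what the loader expects (tab-separated lines by default)
  • CopyToTable gets its data from the rows() method, which yields one tuple or list per row, with values in column order
  • The default rows() reads the upstream task's output file (self.input()) and splits each line on tabs; other formats such as CSV need a rows() override
  • Elasticsearch's CopyToIndex uses docs() instead, which yields JSON strings or dicts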
  • Data should be clean and match the target table's schema before loading
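The sketch below shows how row tuples are produced from an upstream file, using a plain file object in place of `self.input().open('r')`. `tsv_rows` mirrors the split that the PostgreSQL CopyToTable's default rows() performs; `csv_rows` is a hypothetical override for CSV input. Both function names are illustrative only and are not part of Luigi's API.

```python
import csv
import io
from typing import Iterator, List, TextIO


def tsv_rows(fobj: TextIO) -> Iterator[List[str]]:
    """Yield one list of fields per line, split on tabs.

    Mirrors the default rows() behaviour: strip the trailing newline,
    then split on the tab character.
    """
    for line in fobj:
        yield line.strip("\n").split("\t")


def csv_rows(fobj: TextIO) -> Iterator[List[str]]:
    """Hypothetical rows() override for comma-separated input.

    csv.reader handles quoted fields that themselves contain commas.
    """
    yield from csv.reader(fobj)


if __name__ == "__main__":
    tsv = io.StringIO("1\talice\n2\tbob\n")
    print(list(tsv_rows(tsv)))  # [['1', 'alice'], ['2', 'bob']]

    csv_data = io.StringIO('1,"Smith, Jane"\n')
    print(list(csv_rows(csv_data)))  # [['1', 'Smith, Jane']]
```

Inside a task, an override would open `self.input()` in a `with` block and delegate to the same parsing logic.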

Step 2: Configure Database Connection

Set database connection parameters on the CopyToTable subclass: host, database, user, password, and table name. For cloud databases (Redshift), additional parameters include AWS credentials and S3 staging paths. Connection parameters can be hardcoded, read from Luigi configuration, or passed as task parameters.

Key considerations:

  • Connection credentials should be stored in Luigi config files, not in code
  • For PostgreSQL: host, database, user, password, table
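  • For Redshift (S3CopyToTable): adds AWS credentials (aws_access_key_id and aws_secret_access_key, or an IAM role) and an s3_load_path() method that points at the staged data in S3
  • For SQLAlchemy: uses a single connection_string (an SQLAlchemy database URL) for database-agnostic access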
  • For Elasticsearch: host, port, index name, doc_type
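Luigi can fill a task's parameters from a config section named after the task class, which is one way to keep credentials out of code. The sketch below uses the standard library's `configparser` (Luigi's default config format is INI-style) to show the shape of such a section and to fail early when a key is missing. The `LoadUsers` section, host name, and values are hypothetical, and `connection_settings` is an illustrative helper, not a Luigi function.

```python
import configparser
from typing import Dict

# Hypothetical luigi.cfg contents; in practice this lives in a file kept
# outside version control (for example one pointed to by LUIGI_CONFIG_PATH).
CONFIG_TEXT = """
[LoadUsers]
host = db.example.internal
database = analytics
user = loader
password = change-me
table = users
"""

REQUIRED_KEYS = ("host", "database", "user", "password", "table")


def connection_settings(text: str, section: str) -> Dict[str, str]:
    """Read connection settings for one task from INI-style config text.

    Raises KeyError if any required key is missing, so a misconfigured
    task fails before it tries to connect.
    """
    parser = configparser.ConfigParser()
    parser.read_string(text)
    settings = dict(parser[section])
    missing = [key for key in REQUIRED_KEYS if key not in settings]
    if missing:
        raise KeyError(f"missing config keys in [{section}]: {missing}")
    return settings


if __name__ == "__main__":
    cfg = connection_settings(CONFIG_TEXT, "LoadUsers")
    print(cfg["host"], cfg["table"])  # db.example.internal users
```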

Step 3: Define Table Schema

Specify the target table's column definitions on the task class. The columns attribute is a list of (name, type) tuples that define the table schema. For databases that support it, the task can auto-create the table if it does not exist. Primary keys and column constraints can also be specified.

Key considerations:

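  • columns = [("col_name", "TYPE"), ...] defines the schema for the PostgreSQL, MySQL, and Redshift loaders; the SQLAlchemy loader takes SQLAlchemy column types instead
  • If the target table does not exist, the loader calls create_table(), which by default builds the table from columns; override it for custom DDL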
  • Column order must match the order of values in each row
  • Redshift supports COPY from S3 with column mapping
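The default create_table() in the relational loaders builds its statement by joining the (name, type) pairs into a CREATE TABLE clause. The sketch below reproduces that idea with the standard library's `sqlite3` as a stand-in database. The `users` table, its columns, and both helper functions are hypothetical, and the `?` placeholders are SQLite's (psycopg2 uses `%s`).

```python
import sqlite3
from typing import List, Tuple

# (name, type) tuples in the same order as the values in each row.
# A constraint such as PRIMARY KEY can ride along in the type string.
COLUMNS: List[Tuple[str, str]] = [
    ("user_id", "INTEGER PRIMARY KEY"),
    ("name", "TEXT"),
    ("signup_date", "DATE"),
]


def create_table_sql(table: str, columns: List[Tuple[str, str]]) -> str:
    """Build a CREATE TABLE statement from (name, type) tuples."""
    coldefs = ",".join(f"{name} {type_}" for name, type_ in columns)
    return f"CREATE TABLE {table} ({coldefs})"


def insert_sql(table: str, columns: List[Tuple[str, str]]) -> str:
    """Build a parameterised INSERT whose placeholders follow column order."""
    names = ",".join(name for name, _ in columns)
    placeholders = ",".join("?" for _ in columns)
    return f"INSERT INTO {table} ({names}) VALUES ({placeholders})"


if __name__ == "__main__":
    print(create_table_sql("users", COLUMNS))
    # CREATE TABLE users (user_id INTEGER PRIMARY KEY,name TEXT,signup_date DATE)
    conn = sqlite3.connect(":memory:")
    conn.execute(create_table_sql("users", COLUMNS))
    conn.execute(insert_sql("users", COLUMNS), (1, "alice", "2024-01-31"))
    print(conn.execute("SELECT name FROM users").fetchall())  # [('alice',)]
```

Because each row is bound positionally, a row whose values are out of column order would be silently stored in the wrong columns, which is why the ordering rule above matters.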

Step 4: Implement Data Loading

Subclass the appropriate CopyToTable variant and either implement rows() to yield data tuples, or rely on the default rows(), which reads the upstream task's output file. The base class opens a database connection, performs the bulk load (COPY for PostgreSQL, batched INSERT statements for MySQL and SQLAlchemy), and records completion in a marker table. Redshift's S3CopyToTable does not use rows(); it issues a COPY from the files at s3_load_path().

Key considerations:

  • Default behavior reads tab-separated lines from self.input().open('r')
  • Override rows() for custom data transformation before insertion
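  • The marker table (table_updates by default) stores one row per completed load, keyed by update_id, which defaults to the task_id (task family plus parameter values)
  • For the SQL backends, the data load and the marker-table update are committed in a single transaction, so a failed run leaves neither partial rows nor a marker; Elasticsearch indexing is not transactional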
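The sketch below illustrates the all-or-nothing behaviour with `sqlite3` standing in for the target database: the rows and the marker row are written inside one transaction, so a failure part-way through leaves neither behind. The `table_updates` name and its three columns are modeled on Luigi's PostgreSQL marker table; the `users` table, the `update_id` string, and the `load` helper are hypothetical.

```python
import sqlite3
from datetime import datetime, timezone
from typing import Iterable, Sequence

# Marker table modeled on Luigi's default "table_updates".
MARKER_TABLE = "table_updates"


def ensure_marker_table(conn: sqlite3.Connection) -> None:
    """Create the marker table if it is missing."""
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {MARKER_TABLE} "
        "(update_id TEXT PRIMARY KEY, target_table TEXT, inserted TEXT)"
    )


def load(conn: sqlite3.Connection, table: str, update_id: str,
         rows: Iterable[Sequence]) -> None:
    """Insert two-column rows and record the marker in one transaction.

    Using the connection as a context manager commits on success and
    rolls back if anything inside the block raises.
    """
    with conn:
        conn.executemany(f"INSERT INTO {table} VALUES (?, ?)", rows)
        conn.execute(
            f"INSERT INTO {MARKER_TABLE} VALUES (?, ?, ?)",
            (update_id, table, datetime.now(timezone.utc).isoformat()),
        )


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE users (user_id INTEGER PRIMARY KEY, name TEXT)")
    ensure_marker_table(conn)
    try:
        # Duplicate primary key makes the second row fail mid-load.
        load(conn, "users", "LoadUsers(date=2024-01-31)", [(1, "alice"), (1, "dup")])
    except sqlite3.IntegrityError:
        pass
    print(conn.execute("SELECT COUNT(*) FROM users").fetchone()[0])  # 0
    load(conn, "users", "LoadUsers(date=2024-01-31)", [(1, "alice"), (2, "bob")])
    print(conn.execute("SELECT COUNT(*) FROM users").fetchone()[0])  # 2
```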

Step 5: Execute and Verify

Run the pipeline, which triggers the full dependency chain from data generation through transformation to database loading. After execution, verify the data in the target database. Re-running the pipeline skips already-loaded data (idempotency via marker table checks).

Key considerations:

  • complete() calls the output target's exists(), which looks up the task's update_id in the marker table
  • Re-running is safe: completed loads are skipped automatically
  • The execution summary shows which database load tasks succeeded or failed
  • For Elasticsearch, verify via the search API; for databases, query the target table
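As a rough analogue of the re-run behaviour, the sketch below checks for a marker row before loading and skips the load when one exists, then verifies the result with a plain query against the target table. It uses `sqlite3` as a stand-in database; `is_complete`, `run_if_needed`, the `events` table, and the update_id string are hypothetical and only imitate what Luigi's complete() check achieves.

```python
import sqlite3
from typing import List, Tuple

MARKER_DDL = (
    "CREATE TABLE IF NOT EXISTS table_updates "
    "(update_id TEXT PRIMARY KEY, target_table TEXT)"
)


def is_complete(conn: sqlite3.Connection, update_id: str) -> bool:
    """Analogue of complete(): True if a marker row exists for update_id."""
    row = conn.execute(
        "SELECT 1 FROM table_updates WHERE update_id = ?", (update_id,)
    ).fetchone()
    return row is not None


def run_if_needed(conn: sqlite3.Connection, update_id: str,
                  rows: List[Tuple[int, str]]) -> bool:
    """Load rows unless already loaded; return True if a load happened."""
    if is_complete(conn, update_id):
        return False  # skipped, like Luigi skipping an already complete task
    with conn:  # rows and marker commit together
        conn.executemany("INSERT INTO events VALUES (?, ?)", rows)
        conn.execute(
            "INSERT INTO table_updates VALUES (?, ?)", (update_id, "events")
        )
    return True


if __name__ == "__main__":
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE events (event_id INTEGER, kind TEXT)")
    conn.execute(MARKER_DDL)
    data = [(1, "click"), (2, "view")]
    print(run_if_needed(conn, "LoadEvents(date=2024-01-31)", data))  # True
    print(run_if_needed(conn, "LoadEvents(date=2024-01-31)", data))  # False
    # Verification step: query the target table directly.
    print(conn.execute("SELECT COUNT(*) FROM events").fetchone()[0])  # 2
```

Deleting the marker row would make the next run load the data again, which is also how a deliberate reload is usually forced in this pattern.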
