Workflow:Spotify Luigi Local Batch Pipeline

From Leeroopedia
Revision as of 11:05, 16 February 2026 by Admin (talk | contribs) (Auto-imported from workflows/Spotify_Luigi_Local_Batch_Pipeline.md)


Knowledge Sources
Domains: Data_Engineering, Batch_Processing, Pipeline_Orchestration
Last Updated: 2026-02-10 12:00 GMT

Overview

End-to-end process for building a local file-based batch data pipeline using Luigi's core Task, Target, and Parameter abstractions.

Description

This workflow outlines the standard procedure for constructing a multi-step data processing pipeline that runs on a single machine using local filesystem targets. It demonstrates Luigi's fundamental pattern: declare dependencies via requires(), declare outputs via output(), and implement processing logic in run(). Before running a task, Luigi checks whether its output target already exists and skips the task if it does, so rerunning the pipeline is idempotent. Luigi resolves dependencies automatically, running upstream tasks before their downstream consumers.

Usage

Execute this workflow when you need to build a batch data processing pipeline that reads from local files, transforms data through multiple stages, and writes results to local filesystem targets. This is the starting point for most Luigi projects and is appropriate when processing volumes fit on a single machine (or when Luigi is used purely for orchestration of external compute).

Execution Steps

Step 1: Define External Data Sources

Declare external data inputs as ExternalTask subclasses. These represent data that exists outside the pipeline (e.g., log files, data dumps, API exports). The task only implements output() to point at the expected file location; it has no run() method because it does not produce the data itself. Luigi uses the target's exists() check to determine if the external data is available.

Key considerations:

  • ExternalTask has no run() method; it only declares where data lives
  • Parameters (e.g., DateParameter) allow the same task class to represent data across different time periods
  • If the external target does not exist, Luigi cannot create it, so dependent tasks are not run and the execution summary reports the missing external dependency

Step 2: Define Processing Tasks

Create Task subclasses that implement the three core methods: requires() to declare dependencies on upstream tasks, output() to specify the result target (typically a LocalTarget), and run() to contain the data transformation logic. Each task reads from its input targets (provided by self.input()) and writes to its output target.

Key considerations:

  • self.input() returns Target objects corresponding to the outputs of required tasks
  • self.output() should return a LocalTarget with a path derived from task parameters
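  • File writes should use the target's open('w') context manager: LocalTarget writes to a temporary file and moves it into place on close, so a partially written output is never mistaken for a finished one
  • Tasks are idempotent: if the target returned by output() exists, the task is considered complete and skipped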

Step 3: Chain Dependencies into a Pipeline

Connect tasks by having downstream tasks reference upstream ones in their requires() method. Luigi's dependency resolver walks the graph from the requested task backwards, identifying all tasks that need to run. Tasks with satisfied outputs are skipped. Multiple dependencies can be expressed as lists, and date-based fan-out is supported via DateIntervalParameter.

Key considerations:

  • Dependencies are expressed in Python code, enabling conditional logic and date algebra
  • WrapperTask can aggregate multiple tasks without producing its own output
  • The dependency graph can include fan-in (one task depending on many upstream tasks) and fan-out (several downstream tasks sharing one upstream task) patterns

Step 4: Parameterize the Pipeline

Add Parameter descriptors (DateParameter, IntParameter, BoolParameter, etc.) to task classes to make the pipeline configurable. Parameters are automatically parsed from the command line, config files, or programmatic invocation. They also determine task identity: two task instances with different parameter values are distinct tasks in the dependency graph.

Key considerations:

  • Parameters define the task's identity (used in task_id hashing)
  • DateIntervalParameter enables iteration over date ranges
  • Parameters can have defaults and can be read from luigi.cfg / luigi.toml configuration files
  • significant=False excludes a parameter from task identity (useful for runtime-only settings)

Step 5: Execute the Pipeline

Run the pipeline from the command line using the luigi CLI or programmatically via luigi.build(). In local-scheduler mode, Luigi uses an in-process scheduler instead of contacting a central scheduler daemon, resolves dependencies, and by default executes tasks with a single worker. The execution summary at the end reports which tasks completed, were skipped (already done), or failed.

Key considerations:

  • Use --local-scheduler for development and testing
  • The --workers flag sets the number of worker processes, and therefore how many independent tasks can run in parallel
  • luigi.build() provides a programmatic alternative to CLI invocation
  • Execution summary reports task status breakdown (completed, already done, failed)
