Principle:Spotify Luigi Pipeline Execution
Overview
Pipeline execution is the process of triggering, scheduling, and running a complete pipeline -- resolving the dependency graph, dispatching tasks to workers, and reporting results.
Description
Defining tasks and their dependencies is only half the story. The other half is executing the pipeline: taking the declared DAG of tasks and running it end-to-end. Pipeline execution encompasses several responsibilities:
- Entry point -- The pipeline must be invokable, either from the command line (for scheduled batch jobs) or programmatically (for testing, embedding in applications, or orchestrating from notebooks).
- Dependency resolution -- Starting from the requested "root" task(s), the execution engine walks the dependency graph through each task's requires() and checks each task's completeness with complete(). A task that is already complete is not expanded further. Every incomplete task is registered with the scheduler as work to do.
- Scheduling -- The scheduler decides which tasks may execute next, respecting dependency constraints and resource limits. It is either a local in-memory scheduler or a remote central scheduler (the `luigid` daemon) that coordinates work across the workers connected to it.
- Worker management -- One or more worker processes pull tasks from the scheduler and execute them. Workers handle parallelism, timeouts, retries, and error reporting.
- Result reporting -- After execution, the engine summarizes which tasks succeeded, failed, or were already complete.
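Dependency resolution does most of the work before anything runs, and a small self-contained model illustrates it. This is a sketch, not Luigi's code: `ToyTask` and `resolve()` are hypothetical names, and completeness is an in-memory flag rather than Luigi's usual check that a task's output targets exist. It does mirror one real behaviour of Luigi's worker: a task found to be complete is not expanded, so its own requirements are never visited.

```python
from __future__ import annotations

from dataclasses import dataclass, field


@dataclass
class ToyTask:
    """Hypothetical stand-in for a task: a name, its requirements, a done flag."""
    name: str
    deps: list[ToyTask] = field(default_factory=list)
    done: bool = False

    def requires(self) -> list[ToyTask]:
        return self.deps

    def complete(self) -> bool:
        return self.done


def resolve(roots: list[ToyTask]) -> tuple[dict[str, list[str]], set[str]]:
    """Walk the graph from the roots, pruning at tasks that are already complete.

    Returns (pending, already_done): `pending` maps each incomplete task's name
    to the names of its requirements; `already_done` holds complete tasks,
    whose own requirements are never visited.
    """
    pending: dict[str, list[str]] = {}
    already_done: set[str] = set()
    seen: set[str] = set()
    stack = list(roots)
    while stack:
        task = stack.pop()
        if task.name in seen:            # shared upstream tasks are visited once
            continue
        seen.add(task.name)
        if task.complete():
            already_done.add(task.name)  # prune: do not expand its requirements
            continue
        reqs = task.requires()
        pending[task.name] = [r.name for r in reqs]
        stack.extend(reqs)
    return pending, already_done


raw = ToyTask("raw", done=True)
clean = ToyTask("clean", deps=[raw])
report = ToyTask("report", deps=[clean])
pending, already_done = resolve([report])
print(pending)       # {'report': ['clean'], 'clean': ['raw']}
print(already_done)  # {'raw'}
```

Pruning at complete tasks is what makes re-running a partially finished pipeline cheap: finished branches are recognised at their top and skipped without walking further upstream.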
The execution model supports two main modes:
- Command-line mode -- The pipeline is invoked via a CLI entry point (the `luigi` command). The framework parses arguments, imports the module named by `--module`, looks up the requested task class by name, and runs the pipeline. This is the standard mode for production batch jobs.
- Programmatic mode -- The pipeline is invoked from Python code by passing instantiated task objects to a build function (`luigi.build`). This mode is useful for testing, interactive exploration, and embedding Luigi pipelines within larger applications.
Both modes ultimately funnel into the same internal machinery: a scheduler, a worker, and a run loop.
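The "same machinery" point can be made concrete with a hypothetical miniature framework in which a CLI entry point and a programmatic `build()` both call one private `_run()`. Every name here (`REGISTRY`, `register`, `Greet`, `build`, `main`, `_run`) is an illustrative assumption, not Luigi's API. In Luigi itself the corresponding entry points are the `luigi` command (for example `luigi --module my_module MyTask --local-scheduler`) and `luigi.build([MyTask()], local_scheduler=True)`.

```python
from __future__ import annotations

import argparse

# Hypothetical registry mapping class names to task classes, so the CLI can
# load a task class from a name given on the command line.
REGISTRY: dict[str, type] = {}


def register(cls: type) -> type:
    REGISTRY[cls.__name__] = cls
    return cls


@register
class Greet:
    def __init__(self, name: str = "world") -> None:
        self.name = name

    def run(self) -> str:
        return f"hello {self.name}"


def _run(tasks: list) -> list[str]:
    """Shared machinery that both entry points funnel into (trivial here)."""
    return [task.run() for task in tasks]


def build(tasks: list) -> list[str]:
    """Programmatic mode: the caller passes already-instantiated task objects."""
    return _run(tasks)


def main(argv: list[str] | None = None) -> list[str]:
    """Command-line mode: parse argv, look up the task class by name, run it."""
    parser = argparse.ArgumentParser(prog="toy-pipeline")
    parser.add_argument("task", choices=sorted(REGISTRY))
    parser.add_argument("--name", default="world")
    args = parser.parse_args(argv)
    task_cls = REGISTRY[args.task]
    return _run([task_cls(name=args.name)])


print(build([Greet("notebook")]))         # ['hello notebook']
print(main(["Greet", "--name", "cron"]))  # ['hello cron']
```

Because both paths end in `_run()`, anything verified through the programmatic path in a test exercises the same loop that a cron-driven CLI run uses.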
Usage
Use pipeline execution when:
- You need to run a batch pipeline on a schedule (e.g., a cron job that invokes the CLI entry point).
- You want to programmatically trigger pipeline runs from tests, scripts, or application code.
- You need to control execution parameters such as the number of parallel workers, local vs. remote scheduling, and logging configuration.
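The execution parameters above can be gathered into one configuration object that also decides which scheduler a run talks to. The sketch borrows the flag names `--workers`, `--local-scheduler`, `--scheduler-host`, `--scheduler-port`, and `--log-level` from Luigi's command line, but the `ExecutionConfig` class, its parsing, and the defaults shown are hypothetical assumptions, not Luigi's own configuration code.

```python
from __future__ import annotations

import argparse
import logging
from dataclasses import dataclass

LEVELS = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]


@dataclass(frozen=True)
class ExecutionConfig:
    """Hypothetical bundle of run-level settings (not Luigi's own config class)."""
    workers: int
    local_scheduler: bool
    scheduler_host: str
    scheduler_port: int
    log_level: str

    @classmethod
    def from_args(cls, argv: list[str]) -> ExecutionConfig:
        # Flag names borrowed from Luigi's CLI; the defaults here are assumptions.
        p = argparse.ArgumentParser(prog="toy-pipeline")
        p.add_argument("--workers", type=int, default=1)
        p.add_argument("--local-scheduler", action="store_true")
        p.add_argument("--scheduler-host", default="localhost")
        p.add_argument("--scheduler-port", type=int, default=8082)
        p.add_argument("--log-level", default="INFO", choices=LEVELS)
        a = p.parse_args(argv)
        if a.workers < 1:
            p.error("--workers must be at least 1")  # exits via SystemExit
        return cls(a.workers, a.local_scheduler, a.scheduler_host,
                   a.scheduler_port, a.log_level)

    def scheduler_target(self) -> str:
        """Describe which scheduler a run with this configuration would use."""
        if self.local_scheduler:
            return "local in-memory scheduler"
        return f"remote scheduler at {self.scheduler_host}:{self.scheduler_port}"


cfg = ExecutionConfig.from_args(["--workers", "4", "--local-scheduler", "--log-level", "DEBUG"])
logging.getLogger("toy-pipeline").setLevel(cfg.log_level)  # level names are accepted
print(cfg.workers, cfg.scheduler_target())  # 4 local in-memory scheduler
```

Freezing the dataclass keeps the settings fixed for the whole run, so the scheduler choice made at start-up cannot drift while tasks execute.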
Theoretical Basis
The execution algorithm follows this high-level flow:
FUNCTION execute_pipeline(root_tasks, config):
    scheduler = create_scheduler(config)          -- local or remote
    worker = create_worker(scheduler, config.num_workers)

    -- Phase 1: Dependency resolution and scheduling
    FOR EACH task IN root_tasks:
        worker.add(task)
        -- Recursively walks task.requires(), checks complete(),
        -- and registers all discovered tasks with the scheduler

    -- Phase 2: Task execution loop
    WHILE scheduler HAS pending_tasks:
        next_task = scheduler.get_work(worker)
        IF next_task IS NOT NULL:
            worker.execute(next_task)
            report_result(next_task.status)
        ELSE IF worker HAS running_tasks:
            WAIT for a running task to finish
        ELSE:
            BREAK                                 -- nothing left to do

    -- Phase 3: Summary
    RETURN execution_summary(worker)
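The pseudocode can be made runnable as a single-worker Python model. It is a simplification with labeled assumptions: `Task`, `ToyScheduler`, `ToyWorker`, and the status strings are hypothetical; tasks run one at a time in-process, so the WAIT branch never arises; and when a task fails, its dependents stay pending and the loop stops because nothing runnable remains. Already-complete tasks are reported separately, matching the summary categories listed under Description.

```python
from __future__ import annotations

from typing import Callable

PENDING, DONE, FAILED, ALREADY_DONE = "PENDING", "DONE", "FAILED", "ALREADY_DONE"


class Task:
    """Hypothetical task: a name, its requirements, and a callable doing the work."""

    def __init__(self, name: str, requires: list[Task] | None = None,
                 action: Callable[[], None] = lambda: None) -> None:
        self.name = name
        self.requires = requires or []
        self.action = action
        self.done = False

    def complete(self) -> bool:
        return self.done


class ToyScheduler:
    """In-memory stand-in for a scheduler: per-task status plus dependency edges."""

    def __init__(self) -> None:
        self.status: dict[str, str] = {}
        self.deps: dict[str, list[str]] = {}
        self.tasks: dict[str, Task] = {}

    def register(self, task: Task, status: str) -> None:
        self.tasks[task.name] = task
        self.status[task.name] = status
        self.deps[task.name] = [r.name for r in task.requires]

    def get_work(self) -> Task | None:
        # Hand out a PENDING task only if every requirement is finished.
        for name, st in self.status.items():
            if st == PENDING and all(self.status[d] in (DONE, ALREADY_DONE)
                                     for d in self.deps[name]):
                return self.tasks[name]
        return None


class ToyWorker:
    def __init__(self, scheduler: ToyScheduler) -> None:
        self.scheduler = scheduler

    def add(self, task: Task) -> None:
        """Phase 1: walk requirements, check complete(), register each task."""
        if task.name in self.scheduler.status:
            return
        if task.complete():
            self.scheduler.register(task, ALREADY_DONE)  # requirements not expanded
            return
        self.scheduler.register(task, PENDING)
        for req in task.requires:
            self.add(req)

    def execute(self, task: Task) -> str:
        try:
            task.action()
        except Exception:
            return FAILED
        task.done = True
        return DONE


def execute_pipeline(roots: list[Task]) -> dict[str, str]:
    scheduler = ToyScheduler()
    worker = ToyWorker(scheduler)
    for task in roots:                              # Phase 1: resolve and register
        worker.add(task)
    while PENDING in scheduler.status.values():     # Phase 2: run loop
        task = scheduler.get_work()
        if task is None:
            break                                   # nothing runnable is left
        scheduler.status[task.name] = worker.execute(task)
    return dict(scheduler.status)                   # Phase 3: per-task summary


log: list[str] = []
extract = Task("extract", action=lambda: log.append("extract"))
transform = Task("transform", [extract], action=lambda: log.append("transform"))
load = Task("load", [transform], action=lambda: log.append("load"))
print(execute_pipeline([load]))  # {'load': 'DONE', 'transform': 'DONE', 'extract': 'DONE'}
print(log)                       # ['extract', 'transform', 'load']
```

Running `execute_pipeline([load])` a second time reports only `{'load': 'ALREADY_DONE'}`: the root is now complete, so Phase 1 does not expand it and Phase 2 has nothing to do.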
The worker's task execution loop is the core of the run phase. It repeatedly asks the scheduler for work, runs each task it receives, and reports the result back. When more than one worker process is configured (for example, `--workers 4`), Luigi runs each task in a separate subprocess for parallelism and isolation; with a single worker, tasks run by default in the worker's own process. The scheduler maintains the global state of which tasks are pending, running, done, or failed, and dispatches a task only once all of its dependencies are complete.
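The rule that a task is dispatched only after all of its dependencies are complete can be sketched with the standard library. `graphlib.TopologicalSorter` (Python 3.9+) hands out nodes whose predecessors have all been marked done, which plays the scheduler's role, while a thread pool stands in for parallel workers. This is an analogy under stated assumptions, not Luigi's scheduler: threads replace Luigi's worker subprocesses to keep the example self-contained, and `run_parallel`, `graph`, and `run_task` are hypothetical names.

```python
from __future__ import annotations

import graphlib  # Python 3.9+
from collections.abc import Callable
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait


def run_parallel(graph: dict[str, set[str]], run_task: Callable[[str], object],
                 max_workers: int = 2) -> list[str]:
    """Run tasks concurrently, dispatching each only after its requirements finish.

    `graph` maps each task name to the names it requires. Returns the task
    names in completion order.
    """
    sorter = graphlib.TopologicalSorter(graph)
    sorter.prepare()                       # raises graphlib.CycleError on cycles
    finished: list[str] = []
    running: dict[Future, str] = {}
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        while sorter.is_active():
            for name in sorter.get_ready():    # all requirements already done
                running[pool.submit(run_task, name)] = name
            done, _ = wait(running, return_when=FIRST_COMPLETED)
            for fut in done:
                name = running.pop(fut)
                fut.result()                   # re-raise the task's error, if any
                sorter.done(name)              # may make downstream tasks ready
                finished.append(name)
    return finished


graph = {
    "raw": set(),
    "clean_a": {"raw"},
    "clean_b": {"raw"},
    "report": {"clean_a", "clean_b"},
}
order = run_parallel(graph, run_task=lambda name: name, max_workers=2)
print(order[0], order[-1])  # raw report
```

Here `clean_a` and `clean_b` may run at the same time and finish in either order, but `report` is never submitted until both have been marked done.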