Workflow:Spotify Luigi Spark Processing Pipeline
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Big_Data, Spark, PySpark |
| Last Updated | 2026-02-10 12:00 GMT |
Overview
End-to-end process for running Apache Spark and PySpark data processing jobs orchestrated by Luigi using SparkSubmitTask and PySparkTask.
Description
This workflow covers how to execute Spark jobs through the luigi.contrib.spark module. It provides two primary task classes: SparkSubmitTask, which submits an external Spark application (Java, Scala, or Python) via spark-submit, and PySparkTask, which defines the PySpark logic inline in the Luigi task class. Luigi builds the spark-submit invocation, passes the application arguments, and tracks completion by checking whether the output targets exist. Input and output can reside on the local filesystem, HDFS, or S3.
Usage
Execute this workflow when you need to process large-scale datasets using Spark's distributed compute engine, orchestrated within a Luigi pipeline. This is appropriate for workloads such as large-scale aggregations, ML feature engineering, collaborative filtering (ALS), or any transformation that benefits from Spark's in-memory distributed processing. Requires a configured Spark installation and spark-submit on the PATH.
Execution Steps
Step 1: Configure Spark Environment
Set up the Spark connection by configuring the [spark] section in Luigi's configuration file. Key settings include the path to spark-submit, the cluster master URL (local, yarn, spark://host:port, or k8s://), deploy mode (client or cluster), and default resource allocations (driver memory, executor memory, executor cores).
Key considerations:
- The spark-submit path is read from the [spark] section; if it is not set there, the spark-submit executable must be on the PATH
- Master URL determines where Spark runs: local[*] for development, yarn for production clusters
- Deploy mode affects where the driver runs: client (on the machine that invokes spark-submit, typically the Luigi worker) or cluster (inside the cluster)
- Additional Spark properties can be set via conf parameter or spark-defaults.conf
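As a minimal sketch, the settings above might be written to the [spark] section as follows. The option names mirror the dashed spark-submit flag names that luigi.contrib.spark looks up; the install path and resource values are illustrative assumptions, and render_luigi_cfg is a hypothetical helper, not part of Luigi.

```python
import configparser
import io

# Illustrative values only; adjust the path and sizes for your cluster.
SPARK_SETTINGS = {
    "spark-submit": "/usr/local/spark/bin/spark-submit",  # assumed install path
    "master": "yarn",
    "deploy-mode": "client",
    "driver-memory": "2g",
    "executor-memory": "4g",
    "executor-cores": "2",
    # Extra Spark properties as one pipe-separated key=value string.
    "conf": "spark.sql.shuffle.partitions=200|spark.speculation=false",
}


def render_luigi_cfg(settings):
    """Return the text of a luigi.cfg that contains only a [spark] section."""
    parser = configparser.ConfigParser()
    parser["spark"] = settings
    buffer = io.StringIO()
    parser.write(buffer)
    return buffer.getvalue()


cfg_text = render_luigi_cfg(SPARK_SETTINGS)
print(cfg_text)
```

Saving the printed text as luigi.cfg in the working directory makes these values the defaults for every Spark task. Attributes set on an individual task class take precedence over them.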
Step 2: Define Data Sources and Targets
Declare input and output targets for the Spark pipeline. Inputs can be HDFS paths (HdfsTarget), S3 paths (S3Target), or local files (LocalTarget). Outputs typically reside on the same storage system as inputs. When using Spark in cluster mode, all paths must be accessible from the cluster (ruling out local filesystem for data).
Key considerations:
- S3Target supports s3://, s3n://, and s3a:// URI schemes
- HdfsTarget handles HDFS paths with atomic write semantics
- For cluster mode, both input and output paths must be cluster-accessible
- Multiple input targets can be combined by joining their paths into one comma-separated string, which Spark's textFile() accepts
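The path checks described above can be sketched with the standard library alone. Both helper names are hypothetical, and treating only hdfs and the three S3 schemes as cluster-accessible is a simplifying assumption (a scheme-less path that resolves to HDFS through the cluster's default filesystem would be rejected here).

```python
from urllib.parse import urlparse

# Schemes assumed to be reachable from every node of the cluster.
CLUSTER_SCHEMES = {"hdfs", "s3", "s3n", "s3a"}


def is_cluster_accessible(path):
    """Return True when the path uses a scheme served by shared storage."""
    return urlparse(path).scheme in CLUSTER_SCHEMES


def join_input_paths(paths, deploy_mode="client"):
    """Join input paths into the comma-separated form sc.textFile() accepts.

    In cluster deploy mode, refuse paths that look local to one machine.
    """
    if deploy_mode == "cluster":
        local = [p for p in paths if not is_cluster_accessible(p)]
        if local:
            raise ValueError("not reachable from the cluster: " + ", ".join(local))
    return ",".join(paths)


print(join_input_paths(["hdfs:///data/2026-02-09", "s3a://bucket/2026-02-10"], "cluster"))
```

Inside a Luigi task with several upstream targets, the list would typically come from the targets themselves, for example join_input_paths([t.path for t in self.input()]).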
Step 3: Implement Spark Job Tasks
Choose between SparkSubmitTask and PySparkTask based on your needs. For SparkSubmitTask, set the app attribute to the external application (a Python script, or a Java or Scala jar, in which case entry_class names the main class) and implement app_options() to pass arguments. For PySparkTask, implement main(self, sc, *args) directly with PySpark logic; sc is the SparkContext that Luigi supplies.
Pseudocode:
SparkSubmitTask approach:
    Set app = 'my_spark_job.py'
    Set master, driver_memory, executor_memory
    Implement app_options() returning [input_path, output_path]
PySparkTask approach:
    Implement main(self, sc, *args)
    Use sc.textFile() to read, perform transformations, saveAsTextFile()
Key considerations:
- SparkSubmitTask is preferred for production: separates orchestration from processing logic
- PySparkTask is convenient for simple jobs: embeds Spark code in the Luigi task
- app_options() returns command-line arguments passed to the Spark application
- py_packages can bundle additional Python dependencies for the cluster
Step 4: Configure Job Resources
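Set resource allocation parameters on the task class: driver_memory, executor_memory, executor_cores, num_executors, and total_executor_cores. These map directly to the spark-submit flags of the same names. Additional Spark configuration properties can be set through the conf attribute as key=value pairs; in the Luigi configuration file they are written as a single pipe-separated string such as prop1=value1|prop2=value2.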
Key considerations:
- Resource settings directly affect job performance and cluster utilization
- Parameters can be made non-significant (significant=False) so resource changes do not change task identity
- Queue assignment (the queue attribute, passed to spark-submit as --queue on YARN) determines how the job is scheduled relative to other workloads on a shared cluster
- Spark configuration properties override spark-defaults.conf values
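To make the attribute-to-flag correspondence concrete, the sketch below builds the argument list by hand. It illustrates the mapping only and is not Luigi's internal implementation; resource_args and RESOURCE_FLAGS are hypothetical names.

```python
# Task attribute name -> spark-submit flag.
RESOURCE_FLAGS = {
    "driver_memory": "--driver-memory",
    "executor_memory": "--executor-memory",
    "executor_cores": "--executor-cores",
    "num_executors": "--num-executors",
    "total_executor_cores": "--total-executor-cores",
    "queue": "--queue",
}


def resource_args(settings, conf=None):
    """Build spark-submit arguments for the given resource settings.

    Unset attributes are skipped; each conf entry becomes its own --conf flag.
    """
    args = []
    for name, flag in RESOURCE_FLAGS.items():
        value = settings.get(name)
        if value is not None:
            args += [flag, str(value)]
    for key, value in sorted((conf or {}).items()):
        args += ["--conf", "%s=%s" % (key, value)]
    return args


print(resource_args(
    {"driver_memory": "2g", "executor_memory": "4g", "num_executors": 10},
    conf={"spark.sql.shuffle.partitions": 200},
))
```

Which flags take effect depends on the cluster manager; --queue, for instance, applies to YARN.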
Step 5: Chain and Execute the Pipeline
Connect Spark tasks with upstream and downstream Luigi tasks using requires(). The pipeline can mix Spark tasks (for heavy distributed processing) with regular Python tasks (for lightweight local operations such as top-N selection or report generation). Execute it from the Luigi command line or programmatically with luigi.build().
Key considerations:
- Spark tasks commonly feed into lighter local tasks for final aggregation
- Use BoolParameter to toggle between local and Spark execution paths
- The central scheduler ensures that two workers do not run the same task at the same time, which prevents duplicate Spark job submissions
- Failed Spark jobs appear in the Luigi execution summary with error details