Workflow:Spotify Luigi Spark Processing Pipeline

From Leeroopedia


Knowledge Sources
Domains: Data_Engineering, Big_Data, Spark, PySpark
Last Updated: 2026-02-10 12:00 GMT

Overview

End-to-end process for running Apache Spark and PySpark data processing jobs orchestrated by Luigi using SparkSubmitTask and PySparkTask.

Description

This workflow covers how to execute Spark jobs through Luigi's contrib.spark module. The module provides two primary task classes: SparkSubmitTask, which submits external Spark applications (Java, Scala, or Python) via spark-submit, and PySparkTask, which defines PySpark logic inline within the Luigi task class. Luigi manages the spark-submit invocation, passes application arguments, and tracks completion by checking whether the output targets exist. Input and output can reside on the local filesystem, HDFS, or S3.

Usage

Execute this workflow when you need to process large-scale datasets using Spark's distributed compute engine, orchestrated within a Luigi pipeline. This is appropriate for workloads such as large-scale aggregations, ML feature engineering, collaborative filtering (ALS), or any transformation that benefits from Spark's in-memory distributed processing. It requires a configured Spark installation, with spark-submit either on the PATH or located through Luigi's configuration.

Execution Steps

Step 1: Configure Spark Environment

Set up the Spark connection by configuring the [spark] section in Luigi's configuration file. Key settings include the path to spark-submit, the cluster master URL (local, yarn, spark://host:port, or k8s://), deploy mode (client or cluster), and default resource allocations (driver memory, executor memory, executor cores).

Key considerations:

  • The spark-submit path is set in the [spark] section; if it is not set, spark-submit must be on the PATH
  • Master URL determines where Spark runs: local[*] for development, yarn for production clusters
  • Deploy mode affects where the driver runs: client (on the submitting machine) or cluster (on a node inside the cluster)
  • Additional Spark properties can be set via the conf setting or in spark-defaults.conf
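
As a rough illustration of the settings above, the sketch below parses a sample [spark] section with Python's standard configparser module. The option names (spark-submit, master, deploy-mode, driver-memory, executor-memory, executor-cores) mirror the spark-submit flag names and are assumptions here, as are all paths and values; check them against the Luigi version in use. load_spark_section is a hypothetical helper, not part of Luigi.

```python
import configparser

# Hypothetical contents of a Luigi config file; every value is a placeholder.
SAMPLE_CFG = """
[spark]
spark-submit = /opt/spark/bin/spark-submit
master = yarn
deploy-mode = cluster
driver-memory = 2g
executor-memory = 4g
executor-cores = 2
"""


def load_spark_section(text):
    """Return the [spark] section of an INI-style config as a plain dict."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return dict(parser["spark"])


settings = load_spark_section(SAMPLE_CFG)
for key, value in settings.items():
    print(f"{key} = {value}")
```

For local development, the same section would typically use master = local[*] and drop deploy-mode.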

Step 2: Define Data Sources and Targets

Declare input and output targets for the Spark pipeline. Inputs can be HDFS paths (HdfsTarget), S3 paths (S3Target), or local files (LocalTarget). Outputs typically reside on the same storage system as inputs. When Spark runs in cluster mode, every path must be accessible from the cluster nodes, which rules out the submitting machine's local filesystem for data.

Key considerations:

  • S3Target supports s3://, s3n://, and s3a:// URI schemes
  • HdfsTarget handles HDFS paths with atomic write semantics
  • For cluster mode, both input and output paths must be cluster-accessible
  • Multiple input targets can be combined by joining their paths
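
The path rules above can be sketched with the standard library alone. The helper names (is_cluster_accessible, join_input_paths) and the set of schemes treated as cluster-accessible are assumptions made for illustration; a shared network mount, for instance, would also qualify in practice. The comma-separated result matches the form that SparkContext.textFile() accepts for multiple inputs.

```python
from urllib.parse import urlsplit

# Schemes assumed to be reachable from every cluster node (an assumption
# for this sketch; adjust to the storage systems actually in use).
CLUSTER_SCHEMES = {"hdfs", "s3", "s3n", "s3a"}


def is_cluster_accessible(path):
    """True if the path carries a scheme that cluster nodes can resolve."""
    return urlsplit(path).scheme in CLUSTER_SCHEMES


def join_input_paths(paths, cluster_mode=True):
    """Join several input paths into one comma-separated string.

    In cluster mode, reject any path that is not cluster-accessible.
    """
    if cluster_mode:
        bad = [p for p in paths if not is_cluster_accessible(p)]
        if bad:
            raise ValueError(f"not reachable from the cluster: {bad}")
    return ",".join(paths)


inputs = ["hdfs://namenode/data/day1", "s3a://bucket/data/day2"]
print(join_input_paths(inputs))
```

In a Luigi task, the list of paths would usually come from the path attribute of each input target rather than from literals.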

Step 3: Implement Spark Job Tasks

Choose between SparkSubmitTask and PySparkTask based on your needs. For SparkSubmitTask, set the app attribute to point at an external driver script (Python, Java jar, or Scala jar) and implement app_options() to pass arguments. For PySparkTask, implement the main(self, sc, *args) method directly with PySpark logic, receiving a SparkContext.

Pseudocode:

SparkSubmitTask approach:
  Set app = 'my_spark_job.py'
  Set master, driver_memory, executor_memory
  Implement app_options() returning [input_path, output_path]
PySparkTask approach:
  Implement main(self, sc, *args)
  Use sc.textFile() to read, apply RDD transformations, then write with saveAsTextFile()

Key considerations:

  • SparkSubmitTask is preferred for production: separates orchestration from processing logic
  • PySparkTask is convenient for simple jobs: embeds Spark code in the Luigi task
  • app_options() returns command-line arguments passed to the Spark application
  • py_packages can bundle additional Python dependencies for the cluster
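
To make the SparkSubmitTask side more concrete, here is a minimal sketch of the external driver script that the app attribute might point at (my_spark_job.py in the pseudocode). It assumes app_options() returns exactly two values, an input path and an output path, and uses a word count as a stand-in job. The function names and the application name are hypothetical.

```python
import argparse


def parse_args(argv):
    """Parse the two positional arguments supplied by app_options()."""
    parser = argparse.ArgumentParser(description="Word count driver (sketch)")
    parser.add_argument("input_path")
    parser.add_argument("output_path")
    return parser.parse_args(argv)


def run(input_path, output_path):
    """Count words with Spark and write the result as text files.

    pyspark is imported here so that argument parsing can be exercised
    on a machine without a Spark installation.
    """
    from pyspark import SparkContext

    sc = SparkContext(appName="word_count_sketch")
    try:
        (sc.textFile(input_path)
           .flatMap(lambda line: line.split())
           .map(lambda word: (word, 1))
           .reduceByKey(lambda a, b: a + b)
           .saveAsTextFile(output_path))
    finally:
        sc.stop()


def main(argv):
    """Entry point: parse arguments, then run the Spark job."""
    args = parse_args(argv)
    run(args.input_path, args.output_path)


# Argument parsing alone, with placeholder paths:
example = parse_args(["hdfs:///data/in", "hdfs:///data/out"])
print(example.input_path, example.output_path)
```

A real script would end by calling main(sys.argv[1:]) under the usual if __name__ == "__main__" guard. Keeping the Spark logic in a separate file like this is what lets the Luigi task stay a thin orchestration layer.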

Step 4: Configure Job Resources

Set resource allocation parameters on the task class: driver_memory, executor_memory, executor_cores, num_executors, and total_executor_cores. These map directly to the corresponding spark-submit flags (--driver-memory, --executor-memory, and so on). Additional Spark configuration properties are supplied as key=value pairs through the conf setting, and each pair reaches spark-submit as a --conf flag.

Key considerations:

  • Resource settings directly affect job performance and cluster utilization
  • Resource settings exposed as Luigi parameters can be declared with significant=False so that changing them does not alter the task's identity
  • Queue/pool assignment determines scheduling priority on shared clusters
  • Spark configuration properties override spark-defaults.conf values
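
The mapping from task attributes to spark-submit flags can be illustrated as follows. This is a simplified sketch, not Luigi's actual implementation: FLAG_FOR_ATTR and build_spark_submit are hypothetical names, the flags themselves are standard spark-submit options, and conf is assumed here to be a dict of property names to values.

```python
# Task attribute name -> spark-submit flag it corresponds to.
FLAG_FOR_ATTR = {
    "master": "--master",
    "deploy_mode": "--deploy-mode",
    "driver_memory": "--driver-memory",
    "executor_memory": "--executor-memory",
    "executor_cores": "--executor-cores",
    "num_executors": "--num-executors",
    "total_executor_cores": "--total-executor-cores",
    "queue": "--queue",
}


def build_spark_submit(app, app_args, settings, conf=None,
                       spark_submit="spark-submit"):
    """Assemble a spark-submit argument list from resource settings.

    Settings that are missing or None are skipped; each conf entry
    becomes its own --conf key=value flag.
    """
    cmd = [spark_submit]
    for attr, flag in FLAG_FOR_ATTR.items():
        value = settings.get(attr)
        if value is not None:
            cmd += [flag, str(value)]
    for key, value in (conf or {}).items():
        cmd += ["--conf", f"{key}={value}"]
    return cmd + [app] + list(app_args)


cmd = build_spark_submit(
    "my_spark_job.py",
    ["hdfs:///data/in", "hdfs:///data/out"],
    {"master": "yarn", "executor_memory": "4g", "num_executors": 10},
    conf={"spark.sql.shuffle.partitions": 200},
)
print(" ".join(cmd))
```

Because values passed with --conf take precedence over spark-defaults.conf, per-task overrides can stay small while cluster-wide defaults live in that file.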

Step 5: Chain and Execute the Pipeline

Connect Spark tasks with upstream and downstream Luigi tasks using requires(). The pipeline can mix Spark tasks (for heavy distributed processing) with regular Python tasks (for lightweight local operations like top-N selection or report generation). Run the pipeline from the Luigi command line or programmatically with luigi.build().

Key considerations:

  • Spark tasks commonly feed into lighter local tasks for final aggregation
  • Use BoolParameter to toggle between local and Spark execution paths
  • The central scheduler ensures that two workers do not run the same task at the same time, which prevents duplicate Spark job submissions
  • Failed Spark jobs appear in the Luigi execution summary with error details
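
As an example of the lighter local step that a Spark task might feed, the sketch below selects the top N entries from aggregated output. It assumes each line has the form key, tab, count, which is an assumption about the upstream job's output format; top_n and the sample data are hypothetical.

```python
import heapq


def top_n(lines, n):
    """Return the n (key, count) pairs with the largest counts.

    Each line is expected to look like "key<TAB>count"; blank lines
    are skipped.
    """
    pairs = []
    for line in lines:
        line = line.strip()
        if not line:
            continue
        key, count = line.rsplit("\t", 1)
        pairs.append((key, int(count)))
    return heapq.nlargest(n, pairs, key=lambda pair: pair[1])


sample = ["rock\t120", "jazz\t45", "", "pop\t300", "folk\t12"]
print(top_n(sample, 2))
```

In a Luigi pipeline this logic would sit in the run() method of a plain task that lists the Spark task in requires() and reads lines from its input target.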
