Implementation:Spotify Luigi SparkSubmitTask PySparkTask
Overview
A concrete tool, provided by Luigi, for defining Spark processing jobs as Luigi tasks. It supports both external application artifacts (.jar or .py files) and inline PySpark logic.
Description
Luigi offers two classes that work together to define Spark jobs:
SparkSubmitTask (lines 37-263 in spark.py) is the base class for submitting any Spark application. It inherits from ExternalProgramTask and constructs the full spark-submit command line. The key extension points are:
- app: class attribute pointing to the .jar or .py application file.
- entry_class: the JVM main class (for JAR applications).
- app_options(): returns a list of positional arguments appended after the application artifact.
- app_command(): assembles [self.app] + self.app_options().
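The following sketch models how these extension points combine into a spark-submit argument list. It is a simplified, Luigi-free illustration, not SparkSubmitTask's actual code: `build_spark_submit_args` and its keyword parameters are hypothetical, and only a few real spark-submit flags (`--master`, `--name`, `--class`, `--driver-memory`, `--executor-memory`) are covered.

```python
from typing import List, Optional


def app_command(app: Optional[str], app_options: List[str]) -> List[str]:
    """Same contract as the documented method: [app] + app_options."""
    if not app:
        raise NotImplementedError("subclass should define an app (.jar or .py file)")
    return [app] + list(app_options)


def build_spark_submit_args(
    app: Optional[str],
    app_options: List[str],
    entry_class: Optional[str] = None,
    name: Optional[str] = None,
    master: Optional[str] = None,
    driver_memory: Optional[str] = None,
    executor_memory: Optional[str] = None,
    spark_submit: str = "spark-submit",
) -> List[str]:
    """Hypothetical helper: spark-submit options first, then app and app args."""
    command = [spark_submit]
    # Each setting becomes a "--flag value" pair only when it is set.
    for flag, value in (
        ("--master", master),
        ("--name", name),
        ("--class", entry_class),  # JVM main class; only meaningful for .jar apps
        ("--driver-memory", driver_memory),
        ("--executor-memory", executor_memory),
    ):
        if value:
            command += [flag, value]
    # The artifact and its positional arguments must come last.
    return command + app_command(app, app_options)


if __name__ == "__main__":
    print(build_spark_submit_args(
        app="my-spark-assembly.jar",
        app_options=["in/", "out/"],
        entry_class="com.spotify.spark.ImplicitALS",
        driver_memory="2g",
    ))
```

The ordering is the important design point: spark-submit treats everything after the application artifact as arguments to the application, so the app_command() portion has to be appended last.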
PySparkTask (lines 266-368 in spark.py) extends SparkSubmitTask for inline PySpark jobs. Instead of requiring a separate .py script, users override the main(self, sc, *args) method with their Spark logic. Internally:
- The task's run() method pickles the task instance to a temporary file via _dump().
- The app is set to pyspark_runner.py, a generic runner shipped with Luigi.
- The runner deserialises the pickle and calls main() with a live SparkContext.
- Optional Python packages listed in py_packages are compressed into .tar.gz archives and distributed to executors via sc.addPyFile().
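The pickle handoff can be illustrated without Spark. This is a minimal sketch under stated assumptions: `WordCountJob`, `FakeSparkContext`, `dump_task`, and `run_pickled_task` are hypothetical stand-ins, a plain dict plays the role of SparkConf, and the runner only imitates the steps listed above (restore the task, call setup(), then main() with extra arguments). It is not the code of pyspark_runner.py.

```python
import os
import pickle
import tempfile
from typing import Any, Dict, Tuple


class FakeSparkContext:
    """Hypothetical stand-in for pyspark.SparkContext; it only holds the conf."""

    def __init__(self, conf: Dict[str, str]) -> None:
        self.conf = conf


class WordCountJob:
    """Hypothetical task. Defined at module level so pickle can locate the class."""

    def __init__(self, app_name: str) -> None:
        self.app_name = app_name

    def setup(self, conf: Dict[str, str]) -> None:
        # Called before the context exists, so it can adjust configuration.
        conf["spark.app.name"] = self.app_name

    def main(self, sc: FakeSparkContext, *args: str) -> Tuple[str, Tuple[str, ...]]:
        # args stands in for whatever app_options() returned.
        return sc.conf["spark.app.name"], args


def dump_task(task: Any, directory: str) -> str:
    """Driver side: serialize the task instance to a file in `directory`."""
    path = os.path.join(directory, type(task).__name__ + ".pickle")
    with open(path, "wb") as fd:
        pickle.dump(task, fd)
    return path


def run_pickled_task(pickle_path: str, *args: str) -> Any:
    """Runner side: restore the task, build a context, and call main()."""
    with open(pickle_path, "rb") as fd:
        task = pickle.load(fd)
    conf: Dict[str, str] = {}
    task.setup(conf)
    sc = FakeSparkContext(conf)
    return task.main(sc, *args)


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        path = dump_task(WordCountJob("wc"), tmp)
        print(run_pickled_task(path, "in.txt", "out/"))  # ('wc', ('in.txt', 'out/'))
```

One practical consequence of this design: because the instance itself is pickled, the task class must be importable wherever the runner unpickles it, and every attribute on the instance must be picklable.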
Usage
- Subclass SparkSubmitTask when you have an external .jar or .py application file.
- Subclass PySparkTask when you want to write PySpark logic inline inside the Luigi task class.
Code Reference
Source Location
luigi/contrib/spark.py:
- SparkSubmitTask: lines 37-263
- PySparkTask: lines 266-368
Class Signatures
import os

from luigi.contrib.external_program import ExternalProgramTask


class SparkSubmitTask(ExternalProgramTask):
    # Abridged: the spark-submit settings (master, driver_memory, ...) are omitted.
    name = None
    entry_class = None
    app = None
    always_log_stderr = False

    def app_options(self):
        """Subclass this method to map your task parameters to the app's arguments."""
        return []

    def app_command(self):
        if not self.app:
            raise NotImplementedError("subclass should define an app (.jar or .py file)")
        return [self.app] + self.app_options()


class PySparkTask(SparkSubmitTask):
    # Generic runner that spark-submit executes in place of a user script.
    app = os.path.join(os.path.dirname(__file__), 'pyspark_runner.py')

    @property
    def name(self):
        return self.__class__.__name__

    def main(self, sc, *args):
        """Called by the pyspark_runner with a SparkContext and any arguments
        returned by app_options()."""
        raise NotImplementedError("subclass should define a main method")

    def setup(self, conf):
        """Called by the pyspark_runner with a SparkConf instance."""

    def setup_remote(self, sc):
        # Distributes py_packages to executors (helper omitted from this excerpt).
        self._setup_packages(sc)

    def run(self):
        ...  # pickles self, then calls super().run()
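setup_remote() receives the live context and delegates to a private helper, which, per the description above, is where py_packages are archived and shipped with sc.addPyFile(). The sketch below imitates that idea with the standard library only. The names `package_to_targz`, `distribute_packages`, and `RecordingContext` are hypothetical, and a real run would pass a SparkContext rather than the recording stand-in.

```python
import os
import tarfile
import tempfile
from typing import List


class RecordingContext:
    """Hypothetical stand-in for SparkContext that records addPyFile() calls."""

    def __init__(self) -> None:
        self.py_files: List[str] = []

    def addPyFile(self, path: str) -> None:
        self.py_files.append(path)


def package_to_targz(package_dir: str, out_dir: str) -> str:
    """Archive package_dir as <name>.tar.gz, keeping the package folder name."""
    name = os.path.basename(os.path.normpath(package_dir))
    tar_path = os.path.join(out_dir, name + ".tar.gz")
    with tarfile.open(tar_path, "w:gz") as tar:
        # arcname keeps the top-level package directory inside the archive
        tar.add(package_dir, arcname=name)
    return tar_path


def distribute_packages(sc: RecordingContext, package_dirs: List[str], out_dir: str) -> None:
    # One archive per package, each registered with the context.
    for package_dir in package_dirs:
        sc.addPyFile(package_to_targz(package_dir, out_dir))


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        pkg = os.path.join(tmp, "mypkg")
        os.makedirs(pkg)
        with open(os.path.join(pkg, "__init__.py"), "w") as f:
            f.write("VALUE = 1\n")
        sc = RecordingContext()
        distribute_packages(sc, [pkg], tmp)
        print([os.path.basename(p) for p in sc.py_files])  # ['mypkg.tar.gz']
```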
Import
from luigi.contrib.spark import SparkSubmitTask, PySparkTask
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| app | str | Path to the .jar or .py application file. For PySparkTask, defaults to pyspark_runner.py. |
| entry_class | str or None | Fully qualified JVM main class. Only used with .jar applications. |
| app_options() | list[str] | Positional arguments passed to the application's main entry point (for PySparkTask, forwarded to main() as *args). |
| main(sc, *args) | method (PySparkTask only) | User-defined PySpark logic receiving a SparkContext (or SparkSession). |
| py_packages | list[str] or None | Python package names to compress and distribute to executors. |
Outputs
| Name | Type | Description |
|---|---|---|
| Task output | As defined by output() | The target produced by the Spark job (e.g., an S3Target or HdfsTarget). |
| Process exit code | int | Return code from the spark-submit subprocess; 0 indicates success. |
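Because the task ultimately wraps a subprocess, success is judged by its exit code. The sketch below shows that check with `subprocess.run`, using a child Python process as a stand-in for spark-submit. `SparkSubmitError` and `run_and_check` are hypothetical names; ExternalProgramTask performs its own equivalent check and raises its own exception type, which this sketch does not reproduce.

```python
import subprocess
import sys
from typing import List


class SparkSubmitError(RuntimeError):
    """Hypothetical error carrying the exit code and captured stderr."""

    def __init__(self, args: List[str], returncode: int, stderr: str) -> None:
        super().__init__(f"{args[0]} exited with code {returncode}")
        self.returncode = returncode
        self.stderr = stderr


def run_and_check(args: List[str]) -> int:
    """Run a command; return 0 on success, raise on any non-zero exit code."""
    proc = subprocess.run(args, capture_output=True, text=True)
    if proc.returncode != 0:
        raise SparkSubmitError(args, proc.returncode, proc.stderr)
    return proc.returncode


if __name__ == "__main__":
    # A child Python process stands in for spark-submit here.
    print(run_and_check([sys.executable, "-c", "print('ok')"]))  # 0
    try:
        run_and_check([sys.executable, "-c", "import sys; sys.exit(3)"])
    except SparkSubmitError as err:
        print(err.returncode)  # 3
```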
Usage Examples
Example 1: External JAR Application (SparkSubmitTask)
import luigi
import luigi.contrib.hdfs
import luigi.format
from luigi.contrib.spark import SparkSubmitTask


class SparkALS(SparkSubmitTask):
    """Submit a compiled Scala JAR to run ALS recommendation training."""
    data_size = luigi.IntParameter(default=1000)

    driver_memory = '2g'
    executor_memory = '3g'
    num_executors = luigi.IntParameter(default=100)

    app = 'my-spark-assembly.jar'
    entry_class = 'com.spotify.spark.ImplicitALS'

    def app_options(self):
        # Passed to the JAR's main method in this order.
        return [self.input().path, self.output().path]

    def requires(self):
        # UserItemMatrix is the upstream task that produces the input matrix;
        # it is not defined in this snippet.
        return UserItemMatrix(self.data_size)

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('als-output/', format=luigi.format.Gzip)
Example 2: External Python Script (SparkSubmitTask)
import luigi
from luigi.contrib.s3 import S3Target
from luigi.contrib.spark import SparkSubmitTask


class PySparkWordCount(SparkSubmitTask):
    """Submit an external Python script via spark-submit."""
    driver_memory = '2g'
    executor_memory = '3g'
    total_executor_cores = luigi.IntParameter(default=100, significant=False)

    name = "PySpark Word Count"
    app = 'wordcount.py'

    def app_options(self):
        # Received by wordcount.py as sys.argv[1] and sys.argv[2].
        return [self.input().path, self.output().path]

    def input(self):
        return S3Target("s3n://bucket.example.org/wordcount.input")

    def output(self):
        return S3Target("s3n://bucket.example.org/wordcount.output")
Example 3: Inline PySpark Logic (PySparkTask)
from luigi.contrib.s3 import S3Target
from luigi.contrib.spark import PySparkTask


class InlinePySparkWordCount(PySparkTask):
    """Write PySpark logic directly inside the Luigi task."""
    driver_memory = '2g'
    executor_memory = '3g'

    def input(self):
        return S3Target("s3n://bucket.example.org/wordcount.input")

    def output(self):
        return S3Target("s3n://bucket.example.org/wordcount.output")

    def main(self, sc, *args):
        sc.textFile(self.input().path) \
            .flatMap(lambda line: line.split()) \
            .map(lambda word: (word, 1)) \
            .reduceByKey(lambda a, b: a + b) \
            .saveAsTextFile(self.output().path)
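To check what the main() pipeline in Example 3 computes, the same flatMap, map, and reduceByKey steps can be written in plain Python over an in-memory list of lines. `local_word_count` is a hypothetical helper for illustration only: it runs in a single process, and unlike saveAsTextFile() it returns a dictionary instead of writing output files.

```python
from collections import defaultdict
from typing import Dict, Iterable


def local_word_count(lines: Iterable[str]) -> Dict[str, int]:
    """Single-process equivalent of the RDD chain in InlinePySparkWordCount.main()."""
    # flatMap: split every line on whitespace into individual words
    words = (word for line in lines for word in line.split())
    # map: pair each word with a count of 1
    pairs = ((word, 1) for word in words)
    # reduceByKey: sum the counts for each distinct word
    counts: Dict[str, int] = defaultdict(int)
    for word, one in pairs:
        counts[word] += one
    return dict(counts)


if __name__ == "__main__":
    print(local_word_count(["to be or", "not to be"]))  # {'to': 2, 'be': 2, 'or': 1, 'not': 1}
```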