Implementation:Spotify Luigi SparkSubmitTask Execution
Overview
Concrete tool for assembling the spark-submit command, launching it as a subprocess, capturing output, extracting tracking URLs, and handling the serialisation bridge for inline PySpark jobs -- provided by Luigi.
Description
Execution in Luigi's Spark integration spans three files: spark.py, pyspark_runner.py, and external_program.py.
SparkSubmitTask.program_args() and app_command() (spark.py, lines 201--233) assemble the final argument list that ExternalProgramTask.run(), inherited from the parent class, hands to subprocess.Popen:
- program_args() returns self.spark_command() + self.app_command().
- spark_command() builds the left half: the spark-submit binary and all its flags.
- app_command() builds the right half: [self.app] + self.app_options().
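The composition above can be sketched with plain functions standing in for the task methods. This is a minimal illustration, not Luigi's code: the helper name text_arg, the keyword arguments, and the two flags shown (--master, --class) are assumptions chosen for brevity, and the real spark_command() emits many more flags.

```python
def text_arg(flag, value):
    """Return [flag, value] when value is set, else an empty list (hypothetical helper)."""
    return [flag, str(value)] if value else []


def spark_command(spark_submit="spark-submit", master=None, entry_class=None):
    # Left half: the binary followed by whichever flags are configured.
    command = [spark_submit]
    command += text_arg("--master", master)
    command += text_arg("--class", entry_class)
    return command


def app_command(app, app_options=()):
    # Right half: the application artifact followed by its own arguments.
    if not app:
        raise NotImplementedError("an app (.jar or .py file) must be defined")
    return [app] + list(app_options)


def program_args(app, app_options=(), **spark_kwargs):
    # The full argument list is simply the two halves concatenated.
    return spark_command(**spark_kwargs) + app_command(app, app_options)


args = program_args("job.jar", ["in", "out"], master="yarn",
                    entry_class="com.example.Main")
print(args)
# ['spark-submit', '--master', 'yarn', '--class', 'com.example.Main', 'job.jar', 'in', 'out']
```

Keeping the two halves separate means a subclass can change the application arguments without touching the flag-building logic.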
PySparkTask.run() (spark.py, lines 325--337) adds the serialisation bridge before delegating to the parent run():
- Creates a temporary directory with a name derived from the task name.
- Pickles the task instance into that directory via _dump().
- Copies the module file that defines the task class into the temporary directory (needed for unpickling on the remote driver).
- Calls super().run(), which triggers program_args() and ultimately subprocess.Popen.
- Cleans up the temporary directory in a finally block.
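The pickle-then-clean-up pattern in these steps can be sketched with the standard library alone. The function names run_with_pickle and fake_launch are hypothetical, a plain dict stands in for the task instance, and the module-file copy is omitted; the launch callback plays the role that spark-submit plays in the real flow.

```python
import os
import pickle
import re
import shutil
import tempfile


def run_with_pickle(name, state, launch):
    """Pickle state into a fresh temp directory, call launch(pickle_path), then clean up."""
    fragment = re.sub(r"[^\w]", "_", name)  # make the name filesystem-safe
    run_path = tempfile.mkdtemp(prefix=fragment)
    pickle_path = os.path.join(run_path, fragment + ".pickle")
    try:
        with open(pickle_path, "wb") as fd:
            pickle.dump(state, fd)
        return launch(pickle_path)
    finally:
        # Runs on success and on failure, so no stale directory is left behind.
        shutil.rmtree(run_path)


seen_paths = []


def fake_launch(pickle_path):
    # Stand-in for the driver side: load the pickle back from disk.
    seen_paths.append(pickle_path)
    with open(pickle_path, "rb") as fd:
        return pickle.load(fd)


restored = run_with_pickle("my.task", {"date": "2024-01-01"}, fake_launch)
print(restored)  # {'date': '2024-01-01'}
print(os.path.exists(os.path.dirname(seen_paths[0])))  # False
```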
pyspark_runner.py (lines 95--135) is the generic driver-side script that spark-submit executes:
- AbstractPySparkRunner.__init__() appends the pickle file's directory to sys.path, then deserialises the task instance from the pickle.
- AbstractPySparkRunner.run() creates a SparkConf, calls job.setup(conf), enters a context manager that creates either a SparkContext or a SparkSession (controlled by the [pyspark_runner] use_spark_session config flag), calls job.setup_remote(sc) (which distributes py_packages), and finally calls job.main(entry_point, *args).
- Two concrete runner classes exist: PySparkRunner (uses SparkContext) and PySparkSessionRunner (uses SparkSession).
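The call order inside the runner can be traced without Spark installed. In the sketch below every class is a hypothetical stand-in: FakeEntryPoint mimics a context manager that yields an (entry_point, sc) pair, a dict replaces SparkConf, and FakeJob records which hooks were invoked.

```python
class FakeEntryPoint:
    """Hypothetical stand-in for a SparkContext-style entry point."""

    def __init__(self, conf):
        self.conf = conf
        self.stopped = False

    def __enter__(self):
        # Yield (entry_point, sc); here both are the same object.
        return self, self

    def __exit__(self, exc_type, exc, tb):
        self.stopped = True  # a real entry point would stop Spark here
        return False


class FakeJob:
    """Hypothetical job that records the order of the hooks called on it."""

    def __init__(self):
        self.calls = []

    def setup(self, conf):
        self.calls.append("setup")

    def setup_remote(self, sc):
        self.calls.append("setup_remote")

    def main(self, entry_point, *args):
        self.calls.append(("main", args))


class Runner:
    _entry_point_class = FakeEntryPoint

    def __init__(self, job, *args):
        self.job = job
        self.args = args

    def run(self):
        conf = {}  # stands in for SparkConf
        self.job.setup(conf)
        with self._entry_point_class(conf=conf) as (entry_point, sc):
            self.job.setup_remote(sc)
            self.job.main(entry_point, *self.args)


job = FakeJob()
Runner(job, "a", "b").run()
print(job.calls)  # ['setup', 'setup_remote', ('main', ('a', 'b'))]
```

Swapping the class attribute _entry_point_class is all a concrete runner needs to do to change which entry point main() receives.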
The parent class ExternalProgramTask.run() (external_program.py, lines 129--169) handles the actual subprocess lifecycle: spawning via subprocess.Popen, capturing stdout/stderr to temporary files, scanning stderr for tracking URL patterns, checking the exit code, and raising ExternalProgramRunError on failure.
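A simplified sketch of that lifecycle, using only the standard library, is shown below. The names run_program and ProgramRunError are hypothetical stand-ins, not Luigi's API, and the tracking URL scan is left out; the sketch covers only spawning, capturing both streams to temporary files, and turning a non-zero exit code into an exception.

```python
import subprocess
import sys
import tempfile


class ProgramRunError(RuntimeError):
    """Hypothetical stand-in for ExternalProgramRunError."""


def run_program(args, env=None):
    """Run args, capture both streams in temporary files, return (stdout, stderr)."""
    with tempfile.TemporaryFile() as out, tempfile.TemporaryFile() as err:
        proc = subprocess.Popen(args, env=env, stdout=out, stderr=err)
        returncode = proc.wait()
        # The child wrote through the shared file descriptors; rewind to read.
        out.seek(0)
        err.seek(0)
        stdout = out.read().decode("utf-8", "replace")
        stderr = err.read().decode("utf-8", "replace")
    if returncode != 0:
        raise ProgramRunError(
            "program failed with return code {}: {}".format(returncode, stderr.strip()))
    return stdout, stderr


ok_out, ok_err = run_program([sys.executable, "-c", "print('done')"])
print(ok_out.strip())  # done

try:
    run_program([sys.executable, "-c", "import sys; sys.exit(3)"])
except ProgramRunError as exc:
    print("failed:", exc)
```

Writing to temporary files rather than pipes avoids the deadlock that can occur when a child fills a pipe buffer that nobody is reading.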
Usage
Execution is triggered automatically when Luigi's scheduler calls task.run(). For SparkSubmitTask subclasses, no explicit invocation is needed beyond defining the task and adding it to the dependency graph. For PySparkTask subclasses, the main() method is automatically called on the Spark driver after deserialisation.
Code Reference
Source Locations
- luigi/contrib/spark.py, lines 201--233: program_args(), spark_command(), app_command()
- luigi/contrib/spark.py, lines 318--347: PySparkTask.app_command(), PySparkTask.run(), PySparkTask._dump()
- luigi/contrib/pyspark_runner.py, lines 95--135: AbstractPySparkRunner, runner classes, and __main__ entry point
- luigi/contrib/external_program.py, lines 129--169: ExternalProgramTask.run()
Key Method Signatures
# spark.py -- SparkSubmitTask
def program_args(self):
    return self.spark_command() + self.app_command()

def spark_command(self):
    command = [self.spark_submit]
    command += self._text_arg('--master', self.master)
    ...
    return command

def app_command(self):
    if not self.app:
        raise NotImplementedError("subclass should define an app (.jar or .py file)")
    return [self.app] + self.app_options()

# spark.py -- PySparkTask
def run(self):
    path_name_fragment = re.sub(r'[^\w]', '_', self.name)
    self.run_path = tempfile.mkdtemp(prefix=path_name_fragment)
    self.run_pickle = os.path.join(self.run_path, '.'.join([path_name_fragment, 'pickle']))
    with open(self.run_pickle, 'wb') as fd:
        module_path = os.path.abspath(inspect.getfile(self.__class__))
        shutil.copy(module_path, os.path.join(self.run_path, '.'))
        self._dump(fd)
    try:
        super(PySparkTask, self).run()
    finally:
        shutil.rmtree(self.run_path)

# pyspark_runner.py -- AbstractPySparkRunner
class AbstractPySparkRunner(object):
    _entry_point_class = None

    def __init__(self, job, *args):
        sys.path.append(os.path.dirname(job))
        with open(job, "rb") as fd:
            self.job = pickle.load(fd)
        self.args = args

    def run(self):
        from pyspark import SparkConf
        conf = SparkConf()
        self.job.setup(conf)
        with self._entry_point_class(conf=conf) as (entry_point, sc):
            self.job.setup_remote(sc)
            self.job.main(entry_point, *self.args)

# pyspark_runner.py -- main entry point
if __name__ == '__main__':
    logging.basicConfig(level=logging.WARN)
    _get_runner_class()(*sys.argv[1:]).run()
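The entry point above picks a runner class before instantiating it. A minimal sketch of how a flag-driven selection of this kind could work follows, using the standard configparser module instead of Luigi's configuration layer. The function get_runner_class and the two empty classes are hypothetical stand-ins; only the section and option names come from this page.

```python
import configparser


class ContextRunner:
    """Hypothetical stand-in for PySparkRunner."""


class SessionRunner:
    """Hypothetical stand-in for PySparkSessionRunner."""


def get_runner_class(cfg_text):
    """Choose a runner class from the [pyspark_runner] use_spark_session flag."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # fallback=False also covers a missing section, so the context runner is the default.
    use_session = parser.getboolean("pyspark_runner", "use_spark_session", fallback=False)
    return SessionRunner if use_session else ContextRunner


with_flag = get_runner_class("[pyspark_runner]\nuse_spark_session: true\n")
without_flag = get_runner_class("[spark]\nmaster: yarn\n")
print(with_flag.__name__, without_flag.__name__)  # SessionRunner ContextRunner
```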
Import
from luigi.contrib.spark import SparkSubmitTask, PySparkTask
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| spark_command() | list[str] | The spark-submit binary and all configuration/resource flags. |
| app_command() | list[str] | The application artifact path and positional arguments. |
| program_environment() | dict | Environment variables (includes HADOOP_CONF_DIR, HADOOP_USER_NAME if set). |
| Pickled task instance (PySparkTask) | bytes (pickle file on disk) | Serialised task object with all parameter values, passed as first argument to pyspark_runner.py. |
| [pyspark_runner] use_spark_session | bool | Config flag selecting SparkContext (default) or SparkSession entry point. |
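The program_environment() row describes a dict that carries the Hadoop variables only when they are set. One way such an environment could be built is sketched below; build_environment and its parameters are hypothetical, and the sketch assumes the values are overlaid on a copy of the parent process environment.

```python
import os


def build_environment(hadoop_conf_dir=None, hadoop_user_name=None, base=None):
    """Copy the base environment and add the Hadoop variables that have a value."""
    env = dict(os.environ if base is None else base)
    overrides = (("HADOOP_CONF_DIR", hadoop_conf_dir),
                 ("HADOOP_USER_NAME", hadoop_user_name))
    for key, value in overrides:
        if value:  # unset or empty values leave the inherited environment untouched
            env[key] = value
    return env


env = build_environment(hadoop_user_name="alice", base={"PATH": "/bin"})
print(env)  # {'PATH': '/bin', 'HADOOP_USER_NAME': 'alice'}
```

Copying rather than mutating os.environ keeps the override local to the child process.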
Outputs
| Name | Type | Description |
|---|---|---|
| Subprocess exit code | int | 0 on success; non-zero raises ExternalProgramRunError. |
| Captured stdout / stderr | str | Logged after execution; stderr is scanned for the Spark UI tracking URL. |
| Tracking URL | str or None | Extracted from stderr via regex and registered with the Luigi scheduler for display in the web UI. |
| Task output target | As defined by output() | The file or object written by the Spark application (e.g., an S3 or HDFS path). |
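The tracking URL row can be illustrated with a small regex scan. The log lines and the pattern below are invented for the example and are not the pattern Luigi ships; find_tracking_url and the on_found callback (which stands in for registering the URL with the scheduler) are likewise hypothetical.

```python
import re


def find_tracking_url(lines, pattern, on_found):
    """Call on_found(url) for the first line matching pattern; return the URL or None."""
    regex = re.compile(pattern)
    for line in lines:
        match = regex.search(line)
        if match:
            url = match.group(1)
            on_found(url)
            return url  # stop at the first hit; later lines are ignored
    return None


stderr_lines = [
    "INFO starting job",
    "INFO tracking URL: http://rm.example.org:8088/proxy/application_1/",
    "INFO finished",
]
registered = []
url = find_tracking_url(stderr_lines, r"tracking URL: (https?://\S+)", registered.append)
print(url)  # http://rm.example.org:8088/proxy/application_1/
```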
Usage Examples
Example 1: Full Execution Flow with SparkSubmitTask
import luigi
import luigi.contrib.hdfs
import luigi.format
from luigi.contrib.spark import SparkSubmitTask


class SparkALS(SparkSubmitTask):
    """
    When Luigi's scheduler calls run(), the following happens:
    1. program_args() returns spark_command() + app_command()
    2. ExternalProgramTask.run() spawns subprocess.Popen with these args
    3. stderr is scanned for the Spark UI tracking URL
    4. Exit code 0 -> success; non-zero -> ExternalProgramRunError
    """
    data_size = luigi.IntParameter(default=1000)

    driver_memory = '2g'
    executor_memory = '3g'
    num_executors = luigi.IntParameter(default=100)

    app = 'my-spark-assembly.jar'
    entry_class = 'com.spotify.spark.ImplicitALS'

    def app_options(self):
        # Positional arguments placed after the application artifact.
        return [self.input().path, self.output().path]

    def requires(self):
        # UserItemMatrix is an upstream task assumed to be defined elsewhere.
        return UserItemMatrix(self.data_size)

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('als-output/', format=luigi.format.Gzip)
Example 2: Inline PySpark Execution with Serialisation Bridge
from luigi.contrib.s3 import S3Target
from luigi.contrib.spark import PySparkTask


class InlinePySparkWordCount(PySparkTask):
    """
    Execution flow for PySparkTask:
    1. run() pickles self to a temp directory
    2. app_command() returns [pyspark_runner.py, pickle_path] + app_options()
    3. spark-submit launches pyspark_runner.py on the Spark driver
    4. The runner deserialises the pickle, creates SparkContext, calls main()
    5. Temp directory is cleaned up in a finally block
    """
    driver_memory = '2g'
    executor_memory = '3g'

    def input(self):
        return S3Target("s3n://bucket.example.org/wordcount.input")

    def output(self):
        return S3Target("s3n://bucket.example.org/wordcount.output")

    def main(self, sc, *args):
        sc.textFile(self.input().path) \
          .flatMap(lambda line: line.split()) \
          .map(lambda word: (word, 1)) \
          .reduceByKey(lambda a, b: a + b) \
          .saveAsTextFile(self.output().path)
Example 3: Using SparkSession Entry Point
Contents of luigi.cfg:
[spark]
spark-submit: /usr/local/spark/bin/spark-submit
master: yarn
deploy-mode: client
[pyspark_runner]
use_spark_session: true
Task definition:
from luigi.contrib.spark import PySparkTask
import luigi


class SparkSessionJob(PySparkTask):
    """
    With use_spark_session=true, the pyspark_runner creates a SparkSession
    instead of a raw SparkContext. The first argument to main() is then
    a SparkSession object.
    """
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget("/output/report-{}.parquet".format(self.date))

    def main(self, spark, *args):
        df = spark.read.parquet("/data/events/date={}".format(self.date))
        summary = df.groupBy("category").count()
        summary.write.parquet(self.output().path)