Implementation:Spotify Luigi SparkSubmitTask Execution

From Leeroopedia



Overview

Concrete tool for assembling the spark-submit command, launching it as a subprocess, capturing output, extracting tracking URLs, and handling the serialisation bridge for inline PySpark jobs -- provided by Luigi.

Description

Execution in Luigi's Spark integration spans three files: spark.py, pyspark_runner.py, and external_program.py.

SparkSubmitTask.program_args(), spark_command(), and app_command() (spark.py, lines 201--233) assemble the final argument list that the parent class ExternalProgramTask.run() hands to subprocess.Popen:

  • program_args() returns self.spark_command() + self.app_command().
  • spark_command() builds the left half: the spark-submit binary and all its flags.
  • app_command() builds the right half: [self.app] + self.app_options().
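The composition above can be sketched without Luigi installed. This is a minimal illustration, not the library's code: text_arg, spark_command, and app_command here are free functions written for the example, and only three flags are modelled. The point is that unset options contribute nothing to the argument list, so the two halves can simply be concatenated.

```python
def text_arg(name, value):
    """Return [name, value] when value is set, otherwise an empty list."""
    return [name, str(value)] if value else []


def spark_command(spark_submit="spark-submit", master=None,
                  deploy_mode=None, entry_class=None):
    # Left half: the binary followed only by the flags that have values.
    command = [spark_submit]
    command += text_arg("--master", master)
    command += text_arg("--deploy-mode", deploy_mode)
    command += text_arg("--class", entry_class)
    return command


def app_command(app, app_options=()):
    # Right half: the application artifact and its positional arguments.
    if not app:
        raise NotImplementedError("an app (.jar or .py file) is required")
    return [app] + list(app_options)


# Illustrative values only; deploy_mode is left unset and is skipped.
args = (spark_command(master="yarn", entry_class="com.example.Main")
        + app_command("job.jar", ["input/", "output/"]))
print(args)
```

Because the result is a plain list rather than a shell string, it can be passed to subprocess.Popen without any quoting step.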

PySparkTask.run() (spark.py, lines 325--337) adds the serialisation bridge before delegating to the parent run():

  1. Creates a temporary directory whose name is prefixed with the task name (non-word characters replaced by underscores).
  2. Copies the module file that defines the task class into the temporary directory, so the class can be imported when the driver unpickles the task.
  3. Pickles the task instance into that directory via _dump().
  4. Calls super().run(), which triggers program_args() and ultimately subprocess.Popen.
  5. Cleans up the temporary directory in a finally block.
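The temp-directory lifecycle in these steps can be sketched with the standard library alone. This is an illustration under stated assumptions: a plain dict stands in for the task instance, run_with_bridge and fake_launch are hypothetical names, and the module-copy step is omitted. Unlike the excerpt from spark.py, the sketch also places the pickle write inside the try block.

```python
import os
import pickle
import re
import shutil
import tempfile


def run_with_bridge(name, payload, launch):
    """Pickle payload into a temp dir, call launch(pickle_path), then clean up."""
    fragment = re.sub(r"[^\w]", "_", name)          # e.g. "word count" -> "word_count"
    run_path = tempfile.mkdtemp(prefix=fragment)
    pickle_path = os.path.join(run_path, fragment + ".pickle")
    try:
        with open(pickle_path, "wb") as fd:
            pickle.dump(payload, fd)
        return launch(pickle_path)
    finally:
        # Runs whether launch() succeeds or raises.
        shutil.rmtree(run_path)


seen = {}


def fake_launch(pickle_path):
    # Stand-in for the spark-submit call: read the pickle back the way
    # the driver-side runner would.
    seen["path"] = pickle_path
    with open(pickle_path, "rb") as fd:
        return pickle.load(fd)["date"]


result = run_with_bridge("word count", {"date": "2024-01-01"}, fake_launch)
print(result, os.path.exists(seen["path"]))
```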

pyspark_runner.py (lines 95--135) is the generic driver-side script that spark-submit executes:

  1. AbstractPySparkRunner.__init__() appends the pickle file's directory to sys.path, then deserialises the task instance from the pickle.
  2. AbstractPySparkRunner.run() creates a SparkConf, calls job.setup(conf), enters a context manager that creates either a SparkContext or a SparkSession (controlled by the [pyspark_runner] use_spark_session config flag), calls job.setup_remote(sc) (which distributes py_packages), and finally calls job.main(entry_point, *args).
  3. Two concrete runner classes exist: PySparkRunner (uses SparkContext) and PySparkSessionRunner (uses SparkSession).
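The call order on the driver can be made visible with a small stand-alone sketch. Nothing here touches pyspark: fake_entry_point, RecordingJob, and SketchRunner are hypothetical stand-ins, a dict replaces SparkConf, and the job object is passed in directly instead of being unpickled.

```python
import contextlib


@contextlib.contextmanager
def fake_entry_point(conf):
    # Hypothetical stand-in for the SparkContext / SparkSession wrapper:
    # yields an (entry_point, sc) pair and marks the context stopped on exit.
    state = {"conf": conf, "stopped": False}
    try:
        yield state, state
    finally:
        state["stopped"] = True


class RecordingJob:
    """Hypothetical job that records the order in which its hooks run."""

    def __init__(self):
        self.calls = []
        self.context = None

    def setup(self, conf):
        conf["spark.app.name"] = "sketch"
        self.calls.append("setup")

    def setup_remote(self, sc):
        self.context = sc
        self.calls.append("setup_remote")

    def main(self, entry_point, *args):
        self.calls.append("main:" + ",".join(args))


class SketchRunner:
    _entry_point_class = staticmethod(fake_entry_point)

    def __init__(self, job, *args):
        self.job = job
        self.args = args

    def run(self):
        conf = {}  # stands in for SparkConf()
        self.job.setup(conf)
        with self._entry_point_class(conf=conf) as (entry_point, sc):
            self.job.setup_remote(sc)
            self.job.main(entry_point, *self.args)


job = RecordingJob()
SketchRunner(job, "in.txt", "out.txt").run()
print(job.calls)
```

Swapping the value of _entry_point_class is the only difference between the two concrete runners in this model, which is why main() can receive either kind of entry point without other changes.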

The parent class ExternalProgramTask.run() (external_program.py, lines 129--169) handles the actual subprocess lifecycle: spawning via subprocess.Popen, capturing stdout/stderr to temporary files, scanning stderr for tracking URL patterns, checking the exit code, and raising ExternalProgramRunError on failure.
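The lifecycle described above can be approximated as follows. This is a simplified sketch, not Luigi's implementation: ProgramRunError, run_program, and the TRACKING_RE pattern are assumptions made for the example, and stderr is scanned only after the process exits.

```python
import re
import subprocess
import sys
import tempfile

# Hypothetical pattern; the real pattern depends on the task configuration.
TRACKING_RE = re.compile(r"tracking URL: (https?://\S+)")


class ProgramRunError(RuntimeError):
    """Raised when the child process exits with a non-zero code."""


def run_program(args):
    # Capture both streams in temporary files rather than pipes, so a
    # chatty child cannot block on a full pipe buffer.
    with tempfile.TemporaryFile() as out, tempfile.TemporaryFile() as err:
        proc = subprocess.Popen(args, stdout=out, stderr=err)
        returncode = proc.wait()
        out.seek(0)
        err.seek(0)
        stdout = out.read().decode("utf-8", "replace")
        stderr = err.read().decode("utf-8", "replace")

    match = TRACKING_RE.search(stderr)
    tracking_url = match.group(1) if match else None

    if returncode != 0:
        raise ProgramRunError("exit code %d: %s" % (returncode, stderr))
    return stdout, stderr, tracking_url


# A child Python process stands in for spark-submit.
child = [sys.executable, "-c",
         "import sys; "
         "sys.stderr.write('tracking URL: http://rm.example.org:8088/proxy/app_1/\\n'); "
         "print('done')"]
stdout, stderr, url = run_program(child)
print(url)
```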

Usage

Execution is triggered automatically when Luigi's scheduler calls task.run(). For SparkSubmitTask subclasses, no explicit invocation is needed beyond defining the task and adding it to the dependency graph. For PySparkTask subclasses, the main() method is automatically called on the Spark driver after deserialisation.

Code Reference

Source Locations

  • luigi/contrib/spark.py, lines 201--233: program_args(), spark_command(), app_command()
  • luigi/contrib/spark.py, lines 318--347: PySparkTask.app_command(), PySparkTask.run(), PySparkTask._dump()
  • luigi/contrib/pyspark_runner.py, lines 95--135: AbstractPySparkRunner, runner classes, and __main__ entry point
  • luigi/contrib/external_program.py, lines 129--169: ExternalProgramTask.run()

Key Method Signatures

# spark.py -- SparkSubmitTask
def program_args(self):
    return self.spark_command() + self.app_command()

def spark_command(self):
    command = [self.spark_submit]
    command += self._text_arg('--master', self.master)
    ...
    return command

def app_command(self):
    if not self.app:
        raise NotImplementedError("subclass should define an app (.jar or .py file)")
    return [self.app] + self.app_options()

# spark.py -- PySparkTask
def run(self):
    path_name_fragment = re.sub(r'[^\w]', '_', self.name)
    self.run_path = tempfile.mkdtemp(prefix=path_name_fragment)
    self.run_pickle = os.path.join(self.run_path, '.'.join([path_name_fragment, 'pickle']))
    with open(self.run_pickle, 'wb') as fd:
        module_path = os.path.abspath(inspect.getfile(self.__class__))
        shutil.copy(module_path, os.path.join(self.run_path, '.'))
        self._dump(fd)
    try:
        super(PySparkTask, self).run()
    finally:
        shutil.rmtree(self.run_path)

# pyspark_runner.py -- AbstractPySparkRunner
class AbstractPySparkRunner(object):
    _entry_point_class = None

    def __init__(self, job, *args):
        sys.path.append(os.path.dirname(job))
        with open(job, "rb") as fd:
            self.job = pickle.load(fd)
        self.args = args

    def run(self):
        from pyspark import SparkConf
        conf = SparkConf()
        self.job.setup(conf)
        with self._entry_point_class(conf=conf) as (entry_point, sc):
            self.job.setup_remote(sc)
            self.job.main(entry_point, *self.args)

# pyspark_runner.py -- main entry point
if __name__ == '__main__':
    logging.basicConfig(level=logging.WARN)
    _get_runner_class()(*sys.argv[1:]).run()

Import

from luigi.contrib.spark import SparkSubmitTask, PySparkTask

I/O Contract

Inputs

Name | Type | Description
spark_command() | list[str] | The spark-submit binary and all configuration/resource flags.
app_command() | list[str] | The application artifact path and positional arguments.
program_environment() | dict | Environment variables (includes HADOOP_CONF_DIR, HADOOP_USER_NAME if set).
Pickled task instance (PySparkTask) | bytes (pickle file on disk) | Serialised task object with all parameter values, passed as first argument to pyspark_runner.py.
[pyspark_runner] use_spark_session | bool | Config flag selecting SparkContext (default) or SparkSession entry point.
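The program_environment() row can be read as "the parent environment plus any Hadoop settings that are set". A minimal sketch of that behaviour follows, assuming a hypothetical helper build_environment; it is not the library's method and takes the base environment as a parameter so the result is easy to inspect.

```python
import os


def build_environment(hadoop_conf_dir=None, hadoop_user_name=None, base=None):
    """Copy the base environment and add the Hadoop variables that are set."""
    env = dict(os.environ if base is None else base)
    overrides = {
        "HADOOP_CONF_DIR": hadoop_conf_dir,
        "HADOOP_USER_NAME": hadoop_user_name,
    }
    for key, value in overrides.items():
        if value:  # unset values leave the inherited environment untouched
            env[key] = value
    return env


# Illustrative values; only HADOOP_USER_NAME is provided here.
env = build_environment(hadoop_user_name="etl", base={"PATH": "/usr/bin"})
print(env)
```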

Outputs

Name | Type | Description
Subprocess exit code | int | 0 on success; non-zero raises ExternalProgramRunError.
Captured stdout / stderr | str | Logged after execution; stderr is scanned for the Spark UI tracking URL.
Tracking URL | str or None | Extracted from stderr via regex and registered with the Luigi scheduler for display in the web UI.
Task output target | As defined by output() | The file or object written by the Spark application (e.g., an S3 or HDFS path).

Usage Examples

Example 1: Full Execution Flow with SparkSubmitTask

import luigi
import luigi.contrib.hdfs
import luigi.format
from luigi.contrib.spark import SparkSubmitTask


class SparkALS(SparkSubmitTask):
    """
    When Luigi's scheduler calls run(), the following happens:
    1. program_args() returns spark_command() + app_command()
    2. ExternalProgramTask.run() spawns subprocess.Popen with these args
    3. stderr is scanned for the Spark UI tracking URL
    4. Exit code 0 -> success; non-zero -> ExternalProgramRunError
    """
    data_size = luigi.IntParameter(default=1000)
    driver_memory = '2g'
    executor_memory = '3g'
    num_executors = luigi.IntParameter(default=100)

    app = 'my-spark-assembly.jar'
    entry_class = 'com.spotify.spark.ImplicitALS'

    def app_options(self):
        return [self.input().path, self.output().path]

    def requires(self):
        # UserItemMatrix is an upstream task assumed to be defined elsewhere.
        return UserItemMatrix(self.data_size)

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('als-output/', format=luigi.format.Gzip)

Example 2: Inline PySpark Execution with Serialisation Bridge

from luigi.contrib.s3 import S3Target
from luigi.contrib.spark import PySparkTask


class InlinePySparkWordCount(PySparkTask):
    """
    Execution flow for PySparkTask:
    1. run() pickles self to a temp directory
    2. app_command() returns [pyspark_runner.py, pickle_path] + app_options()
    3. spark-submit launches pyspark_runner.py on the Spark driver
    4. The runner deserialises the pickle, creates SparkContext, calls main()
    5. Temp directory is cleaned up in a finally block
    """
    driver_memory = '2g'
    executor_memory = '3g'

    def input(self):
        return S3Target("s3n://bucket.example.org/wordcount.input")

    def output(self):
        return S3Target("s3n://bucket.example.org/wordcount.output")

    def main(self, sc, *args):
        sc.textFile(self.input().path) \
          .flatMap(lambda line: line.split()) \
          .map(lambda word: (word, 1)) \
          .reduceByKey(lambda a, b: a + b) \
          .saveAsTextFile(self.output().path)

Example 3: Using SparkSession Entry Point

Contents of luigi.cfg:

[spark]
spark-submit: /usr/local/spark/bin/spark-submit
master: yarn
deploy-mode: client

[pyspark_runner]
use_spark_session: true

from luigi.contrib.spark import PySparkTask
import luigi


class SparkSessionJob(PySparkTask):
    """
    With use_spark_session=true, the pyspark_runner creates a SparkSession
    instead of a raw SparkContext. The first argument to main() is then
    a SparkSession object.
    """
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget("/output/report-{}.parquet".format(self.date))

    def main(self, spark, *args):
        df = spark.read.parquet("/data/events/date={}".format(self.date))
        summary = df.groupBy("category").count()
        summary.write.parquet(self.output().path)
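How the use_spark_session flag could drive the choice of runner can be sketched with the standard configparser module. This is an assumption-laden illustration: select_runner is a hypothetical helper that returns the runner's name as a string, and Luigi reads the flag through its own configuration layer, not through this function.

```python
import configparser

CONFIG_TEXT = """
[pyspark_runner]
use_spark_session: true
"""


def select_runner(config_text):
    """Return the name of the runner class implied by the config text."""
    parser = configparser.ConfigParser()
    parser.read_string(config_text)
    # A missing section or option falls back to False (SparkContext).
    use_session = parser.getboolean(
        "pyspark_runner", "use_spark_session", fallback=False)
    return "PySparkSessionRunner" if use_session else "PySparkRunner"


print(select_runner(CONFIG_TEXT))
print(select_runner(""))
```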
