Implementation:Spotify Luigi SparkSubmitTask PySparkTask

From Leeroopedia


Overview

Concrete tool provided by Luigi for defining Spark processing jobs as Luigi tasks. It supports both external application artifacts (a .jar or .py file) and inline PySpark logic.

Description

Luigi provides two related classes for defining Spark jobs:

SparkSubmitTask (lines 37-263 in spark.py) is the base class for submitting any Spark application. It inherits from ExternalProgramTask, whose run() executes the command returned by program_args(). SparkSubmitTask builds that command as spark_command() (the spark-submit executable followed by its option flags) plus app_command(). The key extension points are:

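  • app: class attribute pointing to the .jar or .py application file.
  • entry_class: the JVM main class, passed to spark-submit as --class (for JAR applications).
  • app_options(): returns a list of positional arguments appended after the application artifact.
  • app_command(): assembles [self.app] + self.app_options().
  • Submission settings such as master, deploy_mode, driver_memory, executor_memory, num_executors and total_executor_cores: properties that read their defaults from the [spark] section of the Luigi configuration and can be overridden with class attributes or Luigi parameters, as the usage examples do.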
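To make the division of labour concrete, here is a minimal, luigi-free sketch of how these extension points combine into one command line. SubmitCommandSketch, AlsJob and the _text_arg helper are hypothetical names introduced for illustration, and only three spark-submit flags are modelled; the real SparkSubmitTask emits many more and reads their defaults from configuration.

```python
class SubmitCommandSketch:
    """Hypothetical, luigi-free model of how SparkSubmitTask builds argv.

    Only a few spark-submit flags are modelled; the real class emits many more.
    """

    spark_submit = "spark-submit"
    app = None
    entry_class = None
    driver_memory = None
    executor_memory = None

    def app_options(self):
        # Extension point: positional arguments for the application itself.
        return []

    def app_command(self):
        if not self.app:
            raise NotImplementedError("subclass should define an app (.jar or .py file)")
        return [self.app] + self.app_options()

    @staticmethod
    def _text_arg(flag, value):
        # Unset (None or empty) options are left off the command line.
        return [flag, value] if value else []

    def spark_command(self):
        command = [self.spark_submit]
        command += self._text_arg("--class", self.entry_class)
        command += self._text_arg("--driver-memory", self.driver_memory)
        command += self._text_arg("--executor-memory", self.executor_memory)
        return command

    def program_args(self):
        # spark-submit options first, then the artifact and its arguments.
        return self.spark_command() + self.app_command()


class AlsJob(SubmitCommandSketch):
    app = "my-spark-assembly.jar"
    entry_class = "com.spotify.spark.ImplicitALS"
    driver_memory = "2g"

    def app_options(self):
        return ["hdfs:///in", "hdfs:///out"]  # hypothetical paths


print(AlsJob().program_args())
# ['spark-submit', '--class', 'com.spotify.spark.ImplicitALS', '--driver-memory', '2g', 'my-spark-assembly.jar', 'hdfs:///in', 'hdfs:///out']
```

Because executor_memory is unset on AlsJob, no --executor-memory flag appears; everything after the artifact path comes from app_options().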

PySparkTask (lines 266-368 in spark.py) extends SparkSubmitTask for inline PySpark jobs. Instead of requiring a separate .py script, users override the main(self, sc, *args) method with their Spark logic. Internally:

  1. The task's run() method creates a temporary directory, pickles the task instance into it via _dump(), and deletes the directory once the job finishes.
  2. app is set to pyspark_runner.py, a generic runner shipped with Luigi, and PySparkTask overrides app_command() so that the pickle path is passed to the runner ahead of the app_options() arguments.
  3. The runner deserialises the pickle, calls setup(conf) with a SparkConf, creates the SparkContext, calls setup_remote(sc), and finally calls main(sc, *args).
  4. setup_remote() compresses each Python package listed in py_packages into a .tar.gz archive and distributes it to executors via sc.addPyFile().
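The pickle-and-runner handoff in steps 1 to 3 can be sketched with the standard library alone. This is a simplified model under stated assumptions, not Luigi's code: InlineJob, FakeSparkContext, submit() and runner() are hypothetical, a plain dict stands in for SparkConf, and the runner is called in-process instead of through spark-submit and pyspark_runner.py.

```python
import os
import pickle
import shutil
import tempfile


class FakeSparkContext:
    """Hypothetical stand-in for pyspark.SparkContext (no Spark needed)."""

    def __init__(self, conf):
        self.conf = conf


class InlineJob:
    """Hypothetical task exposing the same hooks as PySparkTask."""

    def __init__(self, greeting):
        self.greeting = greeting  # task state that must survive pickling

    def setup(self, conf):
        conf["spark.app.name"] = type(self).__name__

    def setup_remote(self, sc):
        pass  # PySparkTask ships its py_packages archives at this point

    def main(self, sc, *args):
        return f"{self.greeting} from {sc.conf['spark.app.name']} with {list(args)}"


def runner(pickle_path, *args):
    # Driver side: unpickle the task, then call the hooks in runner order.
    with open(pickle_path, "rb") as fd:
        job = pickle.load(fd)
    conf = {}
    job.setup(conf)
    sc = FakeSparkContext(conf)
    job.setup_remote(sc)
    return job.main(sc, *args)


def submit(job, app_options):
    # Client side: pickle into a temp dir, hand off, always clean up.
    run_path = tempfile.mkdtemp(prefix=type(job).__name__)
    pickle_path = os.path.join(run_path, type(job).__name__ + ".pickle")
    try:
        with open(pickle_path, "wb") as fd:
            pickle.dump(job, fd)
        # Luigi would instead run: spark-submit ... pyspark_runner.py <pickle> <app_options...>
        return runner(pickle_path, *app_options)
    finally:
        shutil.rmtree(run_path)


print(submit(InlineJob("hello"), ["in.txt", "out/"]))
# hello from InlineJob with ['in.txt', 'out/']
```

Since the whole task instance travels to the driver as a pickle, every attribute a PySparkTask subclass holds must be picklable.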

Usage

  • Subclass SparkSubmitTask when you have an external .jar or .py application file.
  • Subclass PySparkTask when you want to write PySpark logic inline inside the Luigi task class.

Code Reference

Source Location

luigi/contrib/spark.py:

  • SparkSubmitTask: lines 37-263
  • PySparkTask: lines 266-368

Class Signatures

# Abridged: the spark-submit option properties (master, driver_memory and the
# rest) and internal helper methods are omitted.
import os

from luigi.contrib.external_program import ExternalProgramTask


class SparkSubmitTask(ExternalProgramTask):
    name = None
    entry_class = None
    app = None
    always_log_stderr = False

    def app_options(self):
        """Subclass this method to map your task parameters to the app's arguments."""
        return []

    def app_command(self):
        if not self.app:
            raise NotImplementedError("subclass should define an app (.jar or .py file)")
        return [self.app] + self.app_options()


class PySparkTask(SparkSubmitTask):
    app = os.path.join(os.path.dirname(__file__), 'pyspark_runner.py')

    @property
    def name(self):
        return self.__class__.__name__

    def main(self, sc, *args):
        """Called by the pyspark_runner with a SparkContext and any arguments
        returned by app_options()."""
        raise NotImplementedError("subclass should define a main method")

    def setup(self, conf):
        """Called by the pyspark_runner with a SparkConf instance."""

    def setup_remote(self, sc):
        self._setup_packages(sc)

    def app_command(self):
        ...  # returns [self.app, <pickle path>] + self.app_options()

    def run(self):
        ...  # pickles self into a temporary directory, then calls super().run()

Import

from luigi.contrib.spark import SparkSubmitTask, PySparkTask

I/O Contract

Inputs

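Name | Type | Description
app | str | Path to the .jar or .py application file. For PySparkTask, defaults to Luigi's bundled pyspark_runner.py.
entry_class | str or None | Fully qualified JVM main class, passed to spark-submit as --class. Only used with .jar applications.
app_options() | list[str] | Positional arguments appended after the application artifact and passed to the application's entry point.
main(sc, *args) | method | PySparkTask only. User-defined PySpark logic; receives a SparkContext (or a SparkSession, if the runner is configured to use one) followed by the app_options() values.
py_packages | list[str] or None | PySparkTask only. Names of importable Python packages to compress and distribute to executors; defaults to the py-packages option in the [spark] configuration section.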
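The py_packages input hides a small packaging step. The sketch below approximates it under stated assumptions: package_to_tarball() is a hypothetical helper that imports a package, locates its directory, and writes a .tar.gz whose top-level entry is the package name. The sc.addPyFile() call that PySparkTask then makes for each archive is not reproduced.

```python
import importlib
import os
import tarfile
import tempfile


def package_to_tarball(package, out_dir):
    """Compress an importable package into <out_dir>/<package>.tar.gz (hypothetical helper)."""
    mod = importlib.import_module(package)
    # Packages expose their directory via __path__; plain modules only have __file__.
    mod_path = mod.__path__[0] if hasattr(mod, "__path__") else mod.__file__
    os.makedirs(out_dir, exist_ok=True)
    tar_path = os.path.join(out_dir, package + ".tar.gz")
    with tarfile.open(tar_path, "w:gz") as tar:
        # Store the package under its own top-level name, e.g. "json/...".
        tar.add(mod_path, arcname=os.path.basename(mod_path))
    return tar_path


out_dir = tempfile.mkdtemp()
archive = package_to_tarball("json", out_dir)  # stdlib package used as a stand-in
with tarfile.open(archive) as tar:
    print("json/__init__.py" in tar.getnames())  # True
```

Keeping the package name as the archive's top-level directory is what lets executors import it by the same name once the archive is on their path.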

Outputs

Name | Type | Description
Task output | As defined by output() | The target produced by the Spark job (e.g., an S3Target or HdfsTarget).
Process exit code | int | Return code of the spark-submit subprocess. 0 indicates success; any non-zero value makes ExternalProgramTask raise ExternalProgramRunError, so the task fails.
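The exit-code contract follows the usual subprocess pattern. A minimal sketch, assuming a hypothetical SubmitFailed error in place of Luigi's ExternalProgramRunError and short Python child processes in place of spark-submit:

```python
import subprocess
import sys


class SubmitFailed(RuntimeError):
    """Hypothetical error type standing in for ExternalProgramRunError."""

    def __init__(self, returncode, args):
        super().__init__(f"Program failed with return code={returncode}: {args}")
        self.returncode = returncode


def run_program(args):
    # Run the command, capture its output, and fail on any non-zero exit code.
    proc = subprocess.run(args, capture_output=True, text=True)
    if proc.returncode != 0:
        raise SubmitFailed(proc.returncode, args)
    return proc.stdout


# Stand-ins for spark-submit: one child process succeeds, one exits with 3.
ok = run_program([sys.executable, "-c", "print('done')"])
try:
    run_program([sys.executable, "-c", "import sys; sys.exit(3)"])
except SubmitFailed as err:
    print(err.returncode)  # 3
```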

Usage Examples

Example 1: External JAR Application (SparkSubmitTask)

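import luigi
import luigi.contrib.hdfs
import luigi.format
from luigi.contrib.spark import SparkSubmitTask


class UserItemMatrix(luigi.ExternalTask):
    """Hypothetical stub for the upstream task; Luigi's examples/spark_als.py
    defines a task that actually generates this data."""

    data_size = luigi.IntParameter()

    def output(self):
        # Hypothetical HDFS path for the input matrix.
        return luigi.contrib.hdfs.HdfsTarget('data-matrix', format=luigi.format.Gzip)


class SparkALS(SparkSubmitTask):
    """Submit a compiled Scala JAR to run ALS recommendation training."""

    data_size = luigi.IntParameter(default=1000)

    # Override the [spark] configuration defaults for this job.
    driver_memory = '2g'
    executor_memory = '3g'
    num_executors = luigi.IntParameter(default=100)

    app = 'my-spark-assembly.jar'
    entry_class = 'com.spotify.spark.ImplicitALS'

    def app_options(self):
        # Passed to the JAR's main method in this order.
        return [self.input().path, self.output().path]

    def requires(self):
        return UserItemMatrix(self.data_size)

    def output(self):
        # The Spark job writes its output gzip-compressed.
        return luigi.contrib.hdfs.HdfsTarget('als-output/', format=luigi.format.Gzip)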

Example 2: External Python Script (SparkSubmitTask)

import luigi
from luigi.contrib.s3 import S3Target
from luigi.contrib.spark import SparkSubmitTask


class PySparkWordCount(SparkSubmitTask):
    """Submit an external Python script via spark-submit."""

    driver_memory = '2g'
    executor_memory = '3g'
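    # significant=False: this value does not affect the task's identity.
    total_executor_cores = luigi.IntParameter(default=100, significant=False)

    name = "PySpark Word Count"
    app = 'wordcount.py'

    def app_options(self):
        # wordcount.py receives these as sys.argv[1] and sys.argv[2].
        return [self.input().path, self.output().path]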

    def input(self):
        return S3Target("s3n://bucket.example.org/wordcount.input")

    def output(self):
        return S3Target("s3n://bucket.example.org/wordcount.output")
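Example 2 assumes a wordcount.py script that is not shown. One possible version is sketched below; everything in it (run_spark(), count_words(), the argument check) is hypothetical. It reads the two app_options() values from sys.argv and leaves the application name to spark-submit, which receives it via --name from the task's name attribute. count_words() is a pure-Python equivalent of the Spark pipeline, kept so the logic can be checked on small inputs without a cluster.

```python
import sys
from collections import Counter


def count_words(lines):
    """Pure-Python equivalent of the Spark pipeline, for small local checks."""
    counts = Counter()
    for line in lines:
        counts.update(line.split())
    return dict(counts)


def run_spark(input_path, output_path):
    # Imported lazily so count_words() works without PySpark installed.
    from pyspark import SparkContext

    sc = SparkContext()  # master and app name come from spark-submit
    try:
        (sc.textFile(input_path)
           .flatMap(lambda line: line.split())
           .map(lambda word: (word, 1))
           .reduceByKey(lambda a, b: a + b)
           .saveAsTextFile(output_path))
    finally:
        sc.stop()


# spark-submit passes the app_options() values as argv[1] and argv[2].
if __name__ == "__main__" and len(sys.argv) == 3:
    run_spark(sys.argv[1], sys.argv[2])
```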

Example 3: Inline PySpark Logic (PySparkTask)

from luigi.contrib.s3 import S3Target
from luigi.contrib.spark import PySparkTask


class InlinePySparkWordCount(PySparkTask):
    """Write PySpark logic directly inside the Luigi task."""

    driver_memory = '2g'
    executor_memory = '3g'

    def input(self):
        return S3Target("s3n://bucket.example.org/wordcount.input")

    def output(self):
        return S3Target("s3n://bucket.example.org/wordcount.output")

    def main(self, sc, *args):
        sc.textFile(self.input().path) \
          .flatMap(lambda line: line.split()) \
          .map(lambda word: (word, 1)) \
          .reduceByKey(lambda a, b: a + b) \
          .saveAsTextFile(self.output().path)
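Because main() touches sc only through a few RDD methods, its transformation chain can be exercised without Spark by passing a duck-typed stub. ListRDD, FakeContext and word_count_main() are hypothetical names; the stub records (word, count) tuples instead of writing text files, and it is no substitute for running the real task against a local-mode Spark.

```python
class ListRDD:
    """Hypothetical in-memory stand-in for the few RDD methods main() uses."""

    def __init__(self, items, sink):
        self.items = list(items)
        self.sink = sink  # shared dict that records saveAsTextFile() output

    def flatMap(self, f):
        return ListRDD((y for x in self.items for y in f(x)), self.sink)

    def map(self, f):
        return ListRDD(map(f, self.items), self.sink)

    def reduceByKey(self, f):
        acc = {}
        for key, value in self.items:
            acc[key] = f(acc[key], value) if key in acc else value
        return ListRDD(acc.items(), self.sink)

    def saveAsTextFile(self, path):
        self.sink[path] = self.items


class FakeContext:
    """Hypothetical SparkContext stub backed by a dict of path -> lines."""

    def __init__(self, files):
        self.files = files
        self.saved = {}

    def textFile(self, path):
        return ListRDD(self.files[path], self.saved)


def word_count_main(sc, input_path, output_path):
    # Same chain as InlinePySparkWordCount.main, with paths passed explicitly.
    sc.textFile(input_path) \
      .flatMap(lambda line: line.split()) \
      .map(lambda word: (word, 1)) \
      .reduceByKey(lambda a, b: a + b) \
      .saveAsTextFile(output_path)


sc = FakeContext({"in.txt": ["to be or", "not to be"]})
word_count_main(sc, "in.txt", "out/")
print(sorted(sc.saved["out/"]))
# [('be', 2), ('not', 1), ('or', 1), ('to', 2)]
```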
