Implementation:Spotify Luigi SparkSubmitTask Config

From Leeroopedia



Overview

Luigi's concrete tool for declaring the Apache Spark submission environment: cluster manager, deploy mode, application identity, and supplementary files.

Description

The SparkSubmitTask class in luigi.contrib.spark exposes a set of Python @property accessors that correspond to the command-line flags accepted by spark-submit. Each property reads its default value from the Luigi configuration file (luigi.cfg), in the section named by the spark_version property ([spark] by default). A subclass can override any of them, either by redefining the property or by shadowing it with a plain class attribute. The properties governing configuration (as opposed to resources or execution) include:

  • spark_submit -- path to the spark-submit binary (default: "spark-submit").
  • master -- cluster manager URL (e.g., yarn, local[*], spark://host:7077).
  • deploy_mode -- "client" or "cluster".
  • name -- human-readable application name.
  • entry_class -- JVM main class for .jar applications.
  • app -- path to the application artifact (.jar or .py file).
  • jars, packages, py_files, files, archives -- supplementary artifacts distributed to the cluster.
  • conf -- arbitrary Spark configuration key-value pairs.
  • properties_file -- path to a file from which extra Spark properties are loaded.
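Because each of these properties corresponds to a spark-submit option, turning them into command-line arguments is mechanical. The standalone sketch below illustrates that translation with a hypothetical FLAG_MAP table and to_flags() helper (neither is part of Luigi). The flag names are the standard spark-submit options; list and dict values are assumed to have already been parsed from luigi.cfg, and spark_submit is left out because it names the executable rather than a flag.

```python
from typing import Dict, List, Union

# Hypothetical table: SparkSubmitTask property name -> spark-submit flag
FLAG_MAP = {
    "master": "--master",
    "deploy_mode": "--deploy-mode",
    "name": "--name",
    "entry_class": "--class",
    "jars": "--jars",
    "packages": "--packages",
    "py_files": "--py-files",
    "files": "--files",
    "archives": "--archives",
    "conf": "--conf",
    "properties_file": "--properties-file",
}

Value = Union[None, str, List[str], Dict[str, str]]


def to_flags(values: Dict[str, Value]) -> List[str]:
    """Turn already-parsed property values into spark-submit arguments."""
    args: List[str] = []
    for prop, flag in FLAG_MAP.items():
        value = values.get(prop)
        if not value:
            continue  # None or empty: the flag is omitted entirely
        if isinstance(value, dict):
            # spark-submit takes one --conf PROP=VALUE per setting
            for key, val in value.items():
                args += [flag, f"{key}={val}"]
        elif isinstance(value, list):
            # List-valued flags take a single comma-joined argument
            args += [flag, ",".join(value)]
        else:
            args += [flag, value]
    return args


print(to_flags({
    "master": "yarn",
    "deploy_mode": "client",
    "jars": ["/opt/libs/custom-udf.jar", "/opt/libs/avro-tools.jar"],
    "conf": {"spark.executor.instances": "10"},
}))
# ['--master', 'yarn', '--deploy-mode', 'client', '--jars',
#  '/opt/libs/custom-udf.jar,/opt/libs/avro-tools.jar',
#  '--conf', 'spark.executor.instances=10']
```

Unset properties simply produce no arguments, so a task only passes the flags its configuration actually defines.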

List-valued properties (jars, packages, py_files, files, archives) are parsed from comma-separated strings by the private helper _list_config, while the dictionary-valued conf property is parsed by _dict_config from key=value pairs separated by pipes (|).
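A minimal standalone sketch of the two parsing rules follows. list_config and dict_config are hypothetical re-implementations written for illustration, not imports from Luigi. They assume that list items are stripped of surrounding whitespace (which the spaced jars value in Example 2 relies on) and that each conf pair is split only at its first "=".

```python
from typing import Dict, List, Optional


def list_config(raw: Optional[str]) -> Optional[List[str]]:
    """Split a comma-separated config string into whitespace-stripped items."""
    if raw and isinstance(raw, str):
        return [item.strip() for item in raw.split(",")]
    return None  # unset or empty: the property stays unset


def dict_config(raw: Optional[str]) -> Optional[Dict[str, str]]:
    """Split 'k1=v1|k2=v2' into a dict, breaking each pair at its first '='."""
    if raw and isinstance(raw, str):
        return dict(pair.split("=", 1) for pair in raw.split("|"))
    return None


print(list_config("/opt/libs/custom-udf.jar, /opt/libs/avro-tools.jar"))
# ['/opt/libs/custom-udf.jar', '/opt/libs/avro-tools.jar']
print(dict_config("spark.serializer=org.apache.spark.serializer.KryoSerializer"
                  "|spark.executor.instances=10"))
# {'spark.serializer': 'org.apache.spark.serializer.KryoSerializer',
#  'spark.executor.instances': '10'}
```

Splitting each pair only at its first "=" keeps values such as "-Dkey=value" intact; with this syntax, however, a value that itself contains "|" cannot be expressed.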

Usage

Subclass SparkSubmitTask whenever a Luigi pipeline needs to submit a Spark application. Set name, entry_class, and app as class attributes, and override any configuration property (by redefining it or by assigning a class attribute of the same name) when the luigi.cfg defaults do not fit the task.
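The sketch below imitates both override styles without needing Luigi or Spark. BaseSubmit is a hypothetical stand-in for SparkSubmitTask whose properties fall back to a [spark] section parsed with Python's configparser. ClassAttributeOverride shadows a property with a plain class attribute (the style Example 1 uses for driver_memory), and PropertyOverride redefines the property (the style Example 3 uses).

```python
import configparser

# Hypothetical config text standing in for luigi.cfg
CONFIG = configparser.ConfigParser()
CONFIG.read_string("""
[spark]
master: yarn
deploy-mode: cluster
""")


class BaseSubmit:
    """Stand-in for SparkSubmitTask: properties fall back to the [spark] section."""

    @property
    def master(self):
        return CONFIG.get("spark", "master", fallback=None)

    @property
    def deploy_mode(self):
        return CONFIG.get("spark", "deploy-mode", fallback=None)


class ClassAttributeOverride(BaseSubmit):
    # A plain class attribute is found first in the MRO, hiding the property
    master = "local[2]"


class PropertyOverride(BaseSubmit):
    # Redefining the property allows the value to be computed at access time
    @property
    def deploy_mode(self):
        return "client"


print(BaseSubmit().master, BaseSubmit().deploy_mode)              # yarn cluster
print(ClassAttributeOverride().master)                            # local[2]
print(PropertyOverride().master, PropertyOverride().deploy_mode)  # yarn client
```

A class attribute is the shortest choice for a constant; redefining the property is the better fit when the value depends on something known only at runtime, such as a task parameter.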

Code Reference

Source Location

luigi/contrib/spark.py, lines 37--263 (class SparkSubmitTask). Configuration properties span lines 49--51 (class attributes) and 59--184 (@property accessors).

Class Signature

class SparkSubmitTask(ExternalProgramTask):
    # Application (.jar or .py file)
    name = None
    entry_class = None
    app = None

Key Property Signatures

@property
def spark_submit(self):
    return configuration.get_config().get(self.spark_version, 'spark-submit', 'spark-submit')

@property
def master(self):
    return configuration.get_config().get(self.spark_version, "master", None)

@property
def deploy_mode(self):
    return configuration.get_config().get(self.spark_version, "deploy-mode", None)

@property
def jars(self):
    return self._list_config(configuration.get_config().get(self.spark_version, "jars", None))

@property
def conf(self):
    return self._dict_config(configuration.get_config().get(self.spark_version, "conf", None))
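Each accessor above is a lookup of the form get(section, option, default), with the section taken from self.spark_version. The stdlib-only sketch below mimics that behavior with configparser to show two consequences: a missing option falls back to the default, and a subclass that points spark_version at another section reads a different set of values. The get() helper, the Submit and LocalSubmit classes, and the [spark-local] section are hypothetical; a class attribute is used for spark_version only to keep the sketch short.

```python
import configparser
from typing import Optional

# Hypothetical luigi.cfg contents; [spark-local] is an invented extra section
CONFIG = configparser.ConfigParser()
CONFIG.read_string("""
[spark]
master: yarn

[spark-local]
spark-submit: /opt/spark-3/bin/spark-submit
master: local[*]
""")


def get(section: str, option: str, default: Optional[str]) -> Optional[str]:
    """Return the option's value, or default if the section or option is missing."""
    return CONFIG.get(section, option, fallback=default)


class Submit:
    # Section used for every lookup (assumed to be "spark" by default)
    spark_version = "spark"

    @property
    def spark_submit(self):
        return get(self.spark_version, "spark-submit", "spark-submit")

    @property
    def master(self):
        return get(self.spark_version, "master", None)

    @property
    def deploy_mode(self):
        return get(self.spark_version, "deploy-mode", None)


class LocalSubmit(Submit):
    spark_version = "spark-local"  # point every lookup at another section


for task in (Submit(), LocalSubmit()):
    print(type(task).__name__, task.spark_submit, task.master, task.deploy_mode)
# Submit spark-submit yarn None
# LocalSubmit /opt/spark-3/bin/spark-submit local[*] None
```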

Import

from luigi.contrib.spark import SparkSubmitTask

I/O Contract

Inputs

| Name | Type | Description |
|---|---|---|
| [spark] spark-submit | str | Path to the spark-submit executable. Default: "spark-submit". |
| [spark] master | str or None | Cluster manager URL (e.g., "yarn", "local[*]"). |
| [spark] deploy-mode | str or None | Deployment mode: "client" or "cluster". |
| [spark] jars | str (comma-separated) or None | Additional JARs to include on the driver and executor classpaths. |
| [spark] conf | str (pipe-separated key=value pairs) or None | Arbitrary Spark configuration properties. |
| [spark] properties-file | str or None | Path to a file from which to load extra Spark properties. |
| app (class attribute) | str | Path to the .jar or .py application file. |

Outputs

| Name | Type | Description |
|---|---|---|
| Command-line argument list | list[str] | Assembled list of spark-submit flags and values, produced by spark_command(). |
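spark-submit's documented usage is `spark-submit [options] <app jar | python file> [app arguments]`, so the flags from spark_command() must come before the application path, and anything after that path is handed to the application. The sketch below assumes SparkSubmitTask follows this convention by appending app and then app_options(); program_args here is a hypothetical helper, and the values are borrowed from Example 3.

```python
import shlex
from typing import List


def program_args(spark_submit: str, flags: List[str],
                 app: str, app_options: List[str]) -> List[str]:
    """Order matters: spark-submit options, then the app, then app arguments."""
    return [spark_submit, *flags, app, *app_options]


argv = program_args(
    "spark-submit",
    ["--master", "local[4]", "--deploy-mode", "client", "--name", "Debug Job"],
    "debug_pipeline.py",
    ["--verbose"],
)
# shlex.join quotes arguments that a shell would otherwise mangle
print(shlex.join(argv))
# spark-submit --master 'local[4]' --deploy-mode client --name 'Debug Job' debug_pipeline.py --verbose
```

Because "--verbose" follows the application path, spark-submit forwards it to debug_pipeline.py instead of treating it as one of its own options.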

Usage Examples

Example 1: Minimal JVM Spark Job with Explicit Configuration

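import luigi
from luigi.contrib.spark import SparkSubmitTask


class GenerateMatrix(luigi.ExternalTask):
    """Hypothetical upstream task: a matrix file produced outside this pipeline."""

    data_size = luigi.IntParameter()

    def output(self):
        return luigi.LocalTarget("matrix-%d.txt" % self.data_size)


class TrainALSModel(SparkSubmitTask):
    """Submit a Scala Spark JAR with explicit configuration."""

    name = "ALS Training Job"
    entry_class = "com.example.spark.TrainALS"
    app = "my-spark-assembly.jar"

    # Class attributes shadow the luigi.cfg-backed properties
    driver_memory = "2g"
    executor_memory = "3g"

    data_size = luigi.IntParameter(default=1000)

    def app_options(self):
        # Passed to the JAR's main method, in this order
        return [self.input().path, self.output().path]

    def requires(self):
        return GenerateMatrix(data_size=self.data_size)

    def output(self):
        return luigi.LocalTarget("als-output/")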

Example 2: Configuration Driven by luigi.cfg

Contents of luigi.cfg:

[spark]
spark-submit: /usr/local/spark/bin/spark-submit
master: yarn
deploy-mode: client
jars: /opt/libs/custom-udf.jar, /opt/libs/avro-tools.jar
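conf: spark.serializer=org.apache.spark.serializer.KryoSerializer|spark.executor.instances=10

Task definition:

import luigi
from luigi.contrib.spark import SparkSubmitTask


class RawText(luigi.ExternalTask):  # hypothetical pre-existing input
    def output(self):
        return luigi.LocalTarget("input.txt")


class WordCount(SparkSubmitTask):
    """Cluster settings come from luigi.cfg; only name and app are set here."""
    name = "PySpark Word Count"
    app = "wordcount.py"

    def requires(self):
        return RawText()

    def output(self):
        return luigi.LocalTarget("wordcount-output/")

    def app_options(self):
        return [self.input().path, self.output().path]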

Example 3: Overriding the Cluster Manager at the Class Level

from luigi.contrib.spark import SparkSubmitTask


class LocalDebugJob(SparkSubmitTask):
    """Force local mode for development testing."""

    name = "Debug Job"
    app = "debug_pipeline.py"

    @property
    def master(self):
        return "local[4]"

    @property
    def deploy_mode(self):
        return "client"

    def app_options(self):
        # Passed through to debug_pipeline.py, not interpreted by spark-submit
        return ["--verbose"]
