Implementation:Spotify Luigi SparkSubmitTask Resources

From Leeroopedia


Overview

Concrete tool for declaring and passing computational resource settings -- driver memory, executor memory, core counts, queue, and supervision -- to spark-submit, provided by Luigi.

Description

The SparkSubmitTask class in luigi.contrib.spark (lines 74--228) exposes a set of @property accessors and a spark_command() method that together translate resource declarations into spark-submit command-line flags. Each property reads its default from the configuration section named by self.spark_version (the [spark] section of luigi.cfg in the examples on this page) and can be overridden in a subclass, either with a fixed value or with a per-instance Luigi parameter.

The resource-related properties are:

  • driver_memory (line 138) -- maps to --driver-memory.
  • driver_cores (line 158) -- maps to --driver-cores.
  • driver_java_options (line 142) -- maps to --driver-java-options.
  • driver_library_path (line 146) -- maps to --driver-library-path.
  • driver_class_path (line 150) -- maps to --driver-class-path.
  • executor_memory (line 154) -- maps to --executor-memory.
  • executor_cores (line 170) -- maps to --executor-cores.
  • num_executors (line 178) -- maps to --num-executors.
  • total_executor_cores (line 166) -- maps to --total-executor-cores.
  • queue (line 174) -- maps to --queue.
  • supervise (line 162) -- maps to --supervise (boolean flag).

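The spark_command() method (lines 204--228) reads each property in turn and appends the corresponding arguments to the command list. Properties whose value is unset or otherwise falsy contribute nothing. The private helpers that map values to command-line arguments include: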

  • _text_arg(name, value) -- emits [name, value] if the value is truthy.
  • _flag_arg(name, value) -- emits [name] (no value) if the boolean is true.
  • _dict_arg(name, value) -- emits repeated [name, key=value] pairs for Spark --conf properties.
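
The helper behaviour described above can be sketched as standalone functions. This is a minimal illustration written from the descriptions in this list, not a copy of the library source; the function names (without the leading underscore) and the sample values are assumptions made for the example.

```python
def text_arg(name, value):
    """Return [name, value] when value is truthy, otherwise an empty list."""
    return [name, value] if value else []


def flag_arg(name, value):
    """Return [name] on its own when value is truthy, otherwise an empty list."""
    return [name] if value else []


def dict_arg(name, value):
    """Return one [name, "key=value"] pair per dictionary entry."""
    command = []
    if value and isinstance(value, dict):
        for key, val in value.items():
            command += [name, "{0}={1}".format(key, val)]
    return command


# Illustrative values only; "spark-submit" stands in for the configured binary.
command = ["spark-submit"]
command += text_arg("--driver-memory", "4g")
command += text_arg("--queue", None)  # unset, so nothing is appended
command += flag_arg("--supervise", True)
command += dict_arg("--conf", {"spark.ui.enabled": "false"})
print(command)
```

Because every helper returns a list, unset values simply add an empty list, which keeps the assembly code free of per-flag conditionals.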

Usage

Override resource properties as class attributes for static values (e.g., driver_memory = '4g'), as Luigi parameters for values that change per invocation, or rely on luigi.cfg defaults for shared cluster-wide settings.
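
The override mechanism relies on ordinary Python attribute lookup: a class attribute or property defined in a subclass shadows the inherited property. The sketch below shows this with a hypothetical BaseTask class and a plain dictionary standing in for luigi.cfg; neither is part of Luigi.

```python
class BaseTask:
    """Hypothetical stand-in for a task whose settings default to config values."""

    # Stand-in for the values that would come from luigi.cfg.
    _config = {"driver-memory": "1g"}

    @property
    def driver_memory(self):
        return self._config.get("driver-memory")

    @property
    def executor_memory(self):
        # Not present in the stand-in config, so this yields None.
        return self._config.get("executor-memory")


class StaticOverride(BaseTask):
    # A plain class attribute shadows the inherited property.
    driver_memory = "4g"


class PropertyOverride(BaseTask):
    # A property override can compute its value at access time.
    @property
    def executor_memory(self):
        return "8g"


default_task = BaseTask()
static_task = StaticOverride()
property_task = PropertyOverride()

print(default_task.driver_memory)     # value from the stand-in config
print(static_task.driver_memory)      # subclass attribute takes precedence
print(static_task.executor_memory)    # still falls back to the config lookup
print(property_task.executor_memory)  # overriding property takes precedence
```

Settings that a subclass does not override keep falling through to the configuration lookup, so a task only needs to declare the values that differ from the shared defaults.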

Code Reference

Source Location

luigi/contrib/spark.py, lines 74--228 (resource properties and spark_command() assembly).

Key Property Signatures

@property
def driver_memory(self):
    return configuration.get_config().get(self.spark_version, "driver-memory", None)

@property
def executor_memory(self):
    return configuration.get_config().get(self.spark_version, "executor-memory", None)

@property
def driver_cores(self):
    return configuration.get_config().get(self.spark_version, "driver-cores", None)

@property
def executor_cores(self):
    return configuration.get_config().get(self.spark_version, "executor-cores", None)

@property
def num_executors(self):
    return configuration.get_config().get(self.spark_version, "num-executors", None)

@property
def total_executor_cores(self):
    return configuration.get_config().get(self.spark_version, "total-executor-cores", None)

@property
def queue(self):
    return configuration.get_config().get(self.spark_version, "queue", None)

@property
def supervise(self):
    return bool(configuration.get_config().get(self.spark_version, "supervise", False))
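
One consequence of wrapping the looked-up value in bool() is worth checking before setting supervise in a config file. Assuming the configuration object returns option values as strings, the way Python's standard configparser does, any non-empty string is truthy, including "false". The sketch below uses configparser directly as a stand-in for Luigi's configuration layer, so it illustrates the Python behaviour rather than Luigi itself.

```python
import configparser

# Stand-in for a luigi.cfg that tries to switch supervision off explicitly.
parser = configparser.ConfigParser()
parser.read_string("""
[spark]
supervise: false
""")

raw = parser.get("spark", "supervise", fallback=False)
naive = bool(raw)  # a non-empty string is truthy, whatever it spells
parsed = parser.getboolean("spark", "supervise", fallback=False)

print(repr(raw), naive, parsed)
```

Under that assumption, leaving the supervise option out of the file entirely, or overriding the property in the task class, is the safer way to keep the flag off.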

Command Assembly (spark_command excerpt)

def spark_command(self):
    command = [self.spark_submit]
    command += self._text_arg('--master', self.master)
    command += self._text_arg('--deploy-mode', self.deploy_mode)
    ...
    command += self._text_arg('--driver-memory', self.driver_memory)
    command += self._text_arg('--driver-java-options', self.driver_java_options)
    command += self._text_arg('--driver-library-path', self.driver_library_path)
    command += self._text_arg('--driver-class-path', self.driver_class_path)
    command += self._text_arg('--executor-memory', self.executor_memory)
    command += self._text_arg('--driver-cores', self.driver_cores)
    command += self._flag_arg('--supervise', self.supervise)
    command += self._text_arg('--total-executor-cores', self.total_executor_cores)
    command += self._text_arg('--executor-cores', self.executor_cores)
    command += self._text_arg('--queue', self.queue)
    command += self._text_arg('--num-executors', self.num_executors)
    return command

Import

from luigi.contrib.spark import SparkSubmitTask

I/O Contract

Inputs

Name | Type | Description
driver_memory | str or None | Driver heap size (e.g., "2g", "4096m"). Defaults to [spark] driver-memory in luigi.cfg.
driver_cores | str or None | Number of driver cores (cluster mode only).
driver_java_options | str or None | Extra JVM flags for the driver process.
driver_library_path | str or None | Native library path for the driver.
driver_class_path | str or None | Additional classpath entries for the driver.
executor_memory | str or None | Memory per executor (e.g., "3g").
executor_cores | str or None | Cores per executor.
num_executors | str or None | Number of executor instances (YARN).
total_executor_cores | str or None | Total cores across all executors (standalone/Mesos).
queue | str or None | YARN scheduler queue.
supervise | bool | If True, adds the --supervise flag (standalone cluster mode).

Outputs

Name | Type | Description
Resource flags in command list | list[str] | The resource-related portion of the spark-submit command line, produced by spark_command().

Usage Examples

Example 1: Static Resource Allocation as Class Attributes

from luigi.contrib.spark import SparkSubmitTask


class HeavyETLJob(SparkSubmitTask):
    """A resource-intensive ETL job with fixed resource settings."""

    name = "Heavy ETL"
    app = "etl_pipeline.jar"
    entry_class = "com.example.ETLMain"

    driver_memory = "4g"
    executor_memory = "8g"
    executor_cores = "4"
    num_executors = "20"

    def app_options(self):
        return ["--date", "2026-02-10"]
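
To see which resource flags these settings would produce, the sketch below replays the flag order from the spark_command() excerpt over a plain dictionary. The resource_flags function and the RESOURCE_FLAGS table are hypothetical helpers written for this illustration, and the result assumes luigi.cfg sets none of the remaining resource options.

```python
# Flag order copied from the spark_command() excerpt; the rest is illustrative.
RESOURCE_FLAGS = [
    ("--driver-memory", "driver_memory"),
    ("--driver-java-options", "driver_java_options"),
    ("--driver-library-path", "driver_library_path"),
    ("--driver-class-path", "driver_class_path"),
    ("--executor-memory", "executor_memory"),
    ("--driver-cores", "driver_cores"),
    ("--supervise", "supervise"),
    ("--total-executor-cores", "total_executor_cores"),
    ("--executor-cores", "executor_cores"),
    ("--queue", "queue"),
    ("--num-executors", "num_executors"),
]


def resource_flags(settings):
    """Build the resource part of a command line from a settings dict."""
    flags = []
    for flag, key in RESOURCE_FLAGS:
        value = settings.get(key)
        if not value:
            continue  # unset or falsy settings are skipped
        if flag == "--supervise":
            flags.append(flag)  # boolean flag, no value follows
        else:
            flags += [flag, value]
    return flags


# The four settings declared on HeavyETLJob.
heavy_etl = {
    "driver_memory": "4g",
    "executor_memory": "8g",
    "executor_cores": "4",
    "num_executors": "20",
}
flags = resource_flags(heavy_etl)
print(" ".join(flags))
```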

Example 2: Parameterised Resource Allocation

import luigi
from luigi.contrib.spark import SparkSubmitTask


class ScalableJob(SparkSubmitTask):
    """Resources can be tuned per invocation via Luigi parameters."""

    name = "Scalable Spark Job"
    app = "process.py"

    driver_memory = "2g"
    executor_memory = "3g"
    num_executors = luigi.IntParameter(default=10)
    total_executor_cores = luigi.IntParameter(default=40, significant=False)

    def app_options(self):
        # Assumes requires() and output() are defined for this task (omitted here).
        return [self.input().path, self.output().path]

Example 3: Centralised Resources via luigi.cfg

Contents of luigi.cfg:

[spark]
spark-submit: /opt/spark/bin/spark-submit
master: yarn
deploy-mode: cluster
driver-memory: 4g
executor-memory: 8g
executor-cores: 4
num-executors: 50
queue: production

Task definition:

from luigi.contrib.spark import SparkSubmitTask


class ProductionJob(SparkSubmitTask):
    """All resource settings come from luigi.cfg; nothing is overridden here."""

    name = "Production Analytics"
    app = "analytics.jar"
    entry_class = "com.example.Analytics"

    def app_options(self):
        return ["--mode", "full"]

Example 4: YARN Queue and Supervise Mode

from luigi.contrib.spark import SparkSubmitTask


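class SupervisedJob(SparkSubmitTask):
    """Override queue and supervise with property methods.

    Per the I/O contract above, --queue targets YARN while --supervise
    targets standalone cluster mode, so a real job would normally set
    only the one that matches its master.
    """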

    name = "Supervised Job"
    app = "critical_pipeline.jar"
    entry_class = "com.example.CriticalPipeline"

    driver_memory = "2g"
    executor_memory = "4g"

    @property
    def queue(self):
        return "high-priority"

    @property
    def supervise(self):
        return True

    def app_options(self):
        return ["--input", "/data/raw", "--output", "/data/processed"]
