Implementation:Spotify Luigi SparkSubmitTask Resources
Overview
Concrete tool, provided by Luigi, for declaring computational resource settings (driver memory, executor memory, core counts, queue, and supervision) and passing them to spark-submit.
Description
The SparkSubmitTask class in luigi.contrib.spark (lines 74--228) exposes a set of @property accessors and a spark_command() method that together translate resource declarations into spark-submit command-line flags. Each property reads its default from the [spark] section of luigi.cfg and can be overridden at the class or instance level.
The resource-related properties are:
- driver_memory (line 138): maps to --driver-memory.
- driver_cores (line 158): maps to --driver-cores.
- driver_java_options (line 142): maps to --driver-java-options.
- driver_library_path (line 146): maps to --driver-library-path.
- driver_class_path (line 150): maps to --driver-class-path.
- executor_memory (line 154): maps to --executor-memory.
- executor_cores (line 170): maps to --executor-cores.
- num_executors (line 178): maps to --num-executors.
- total_executor_cores (line 166): maps to --total-executor-cores.
- queue (line 174): maps to --queue.
- supervise (line 162): maps to --supervise (boolean flag).
The spark_command() method (lines 204--228) appends the flag for each property in a fixed order, using three private helpers to map values to command-line arguments:
- _text_arg(name, value): emits [name, value] if the value is truthy.
- _flag_arg(name, value): emits [name] (no value) if the boolean is true.
- _dict_arg(name, value): emits repeated [name, key=value] pairs for Spark --conf properties.
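The helper behaviour listed above can be sketched as standalone functions. This is a minimal re-implementation for illustration, not the Luigi source: the function names drop the leading underscore and `self`, and treating non-dict input to the dict helper as "emit nothing" is an assumption.

```python
def text_arg(name, value):
    """Return [name, value] when value is truthy, otherwise an empty list."""
    return [name, value] if value else []


def flag_arg(name, value):
    """Return [name] alone when value is truthy (a boolean switch)."""
    return [name] if value else []


def dict_arg(name, value):
    """Return one [name, 'key=value'] pair per dictionary entry."""
    command = []
    # Assumption: anything that is not a non-empty dict yields no flags.
    if value and isinstance(value, dict):
        for key, val in value.items():
            command += [name, "{0}={1}".format(key, val)]
    return command


print(text_arg("--driver-memory", "4g"))
print(text_arg("--queue", None))
print(flag_arg("--supervise", True))
print(dict_arg("--conf", {"spark.speculation": "true"}))
```

Because every helper returns a list (possibly empty), the results can be concatenated with `+=` without checking whether a setting was provided.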
Usage
Override resource properties as class attributes for static values (e.g., driver_memory = '4g'), as Luigi parameters for values that change per invocation, or rely on luigi.cfg defaults for shared cluster-wide settings.
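Why a plain class attribute can replace a property may not be obvious, so here is a small sketch of the Python mechanics involved. `BaseTask`, `StaticTask`, and `DynamicTask` are hypothetical stand-ins, not Luigi classes; the base property returns None in place of the luigi.cfg lookup.

```python
class BaseTask:
    """Hypothetical stand-in for SparkSubmitTask: the default comes from a property."""

    @property
    def driver_memory(self):
        return None  # stands in for the luigi.cfg lookup


class StaticTask(BaseTask):
    # A plain class attribute in the subclass shadows the inherited property.
    driver_memory = "4g"


class DynamicTask(BaseTask):
    def __init__(self, memory):
        self._memory = memory

    # Re-declaring the property allows a value computed per instance.
    @property
    def driver_memory(self):
        return self._memory


base_value = BaseTask().driver_memory
static_value = StaticTask().driver_memory
dynamic_value = DynamicTask("6g").driver_memory
print(base_value, static_value, dynamic_value)
```

Note that assigning `task.driver_memory = "4g"` on an instance of the base class would raise AttributeError, because a property without a setter is read-only; the override has to happen on a subclass.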
Code Reference
Source Location
luigi/contrib/spark.py, lines 74--228 (resource properties and spark_command() assembly).
Key Property Signatures
@property
def driver_memory(self):
    return configuration.get_config().get(self.spark_version, "driver-memory", None)

@property
def executor_memory(self):
    return configuration.get_config().get(self.spark_version, "executor-memory", None)

@property
def driver_cores(self):
    return configuration.get_config().get(self.spark_version, "driver-cores", None)

@property
def executor_cores(self):
    return configuration.get_config().get(self.spark_version, "executor-cores", None)

@property
def num_executors(self):
    return configuration.get_config().get(self.spark_version, "num-executors", None)

@property
def total_executor_cores(self):
    return configuration.get_config().get(self.spark_version, "total-executor-cores", None)

@property
def queue(self):
    return configuration.get_config().get(self.spark_version, "queue", None)

@property
def supervise(self):
    return bool(configuration.get_config().get(self.spark_version, "supervise", False))
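The lookup-with-default pattern in these properties can be approximated with the standard library's configparser. This sketch assumes Luigi's configuration object returns raw strings the way `ConfigParser.get` does; that has not been checked against the Luigi source here. If the assumption holds, it also implies that wrapping the value in `bool()` treats any non-empty string as true, including the text "false".

```python
import configparser

CFG_TEXT = """
[spark]
driver-memory: 4g
supervise: false
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

# Present key: the raw string from the file is returned.
driver_memory = parser.get("spark", "driver-memory", fallback=None)

# Missing key: the fallback (None) is returned, so no flag would be emitted.
executor_memory = parser.get("spark", "executor-memory", fallback=None)

# bool() on a non-empty string is always True, even for the text "false".
supervise_naive = bool(parser.get("spark", "supervise", fallback=False))

# getboolean() parses the usual spellings (true/false, yes/no, on/off, 1/0).
supervise_parsed = parser.getboolean("spark", "supervise", fallback=False)

print(driver_memory, executor_memory, supervise_naive, supervise_parsed)
```

Under that assumption, the safest way to leave supervision off through configuration is to omit the supervise key entirely rather than set it to a false-looking string.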
Command Assembly (spark_command excerpt)
def spark_command(self):
    command = [self.spark_submit]
    command += self._text_arg('--master', self.master)
    command += self._text_arg('--deploy-mode', self.deploy_mode)
    ...
    command += self._text_arg('--driver-memory', self.driver_memory)
    command += self._text_arg('--driver-java-options', self.driver_java_options)
    command += self._text_arg('--driver-library-path', self.driver_library_path)
    command += self._text_arg('--driver-class-path', self.driver_class_path)
    command += self._text_arg('--executor-memory', self.executor_memory)
    command += self._text_arg('--driver-cores', self.driver_cores)
    command += self._flag_arg('--supervise', self.supervise)
    command += self._text_arg('--total-executor-cores', self.total_executor_cores)
    command += self._text_arg('--executor-cores', self.executor_cores)
    command += self._text_arg('--queue', self.queue)
    command += self._text_arg('--num-executors', self.num_executors)
    return command
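To see which flags end up on the command line for a given set of values, the resource portion of the excerpt can be imitated with a table-driven function. This is a compact alternative written for illustration, not how Luigi structures the method: `RESOURCE_FLAGS` and `resource_args` are hypothetical names, the flag order is copied from the excerpt, and converting values with `str()` is a choice made for this sketch.

```python
# (flag, setting name, is_boolean_switch), in the order used by the excerpt.
RESOURCE_FLAGS = [
    ("--driver-memory", "driver_memory", False),
    ("--driver-java-options", "driver_java_options", False),
    ("--driver-library-path", "driver_library_path", False),
    ("--driver-class-path", "driver_class_path", False),
    ("--executor-memory", "executor_memory", False),
    ("--driver-cores", "driver_cores", False),
    ("--supervise", "supervise", True),
    ("--total-executor-cores", "total_executor_cores", False),
    ("--executor-cores", "executor_cores", False),
    ("--queue", "queue", False),
    ("--num-executors", "num_executors", False),
]


def resource_args(settings):
    """Build the resource flags for a dict of settings (hypothetical helper)."""
    args = []
    for flag, key, is_switch in RESOURCE_FLAGS:
        value = settings.get(key)
        if not value:
            continue  # unset or falsy settings contribute nothing
        args += [flag] if is_switch else [flag, str(value)]
    return args


example = resource_args(
    {"driver_memory": "4g", "supervise": True, "num_executors": 20, "queue": None}
)
print(example)
# ['--driver-memory', '4g', '--supervise', '--num-executors', '20']
```

Settings left as None simply disappear from the result, which is why a task that overrides nothing and has no matching luigi.cfg entries submits with Spark's own defaults.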
Import
from luigi.contrib.spark import SparkSubmitTask
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| driver_memory | str or None | Driver heap size (e.g., "2g", "4096m"). Defaults to [spark] driver-memory in luigi.cfg. |
| driver_cores | str or None | Number of driver cores (cluster mode only). |
| driver_java_options | str or None | Extra JVM flags for the driver process. |
| driver_library_path | str or None | Native library path for the driver. |
| driver_class_path | str or None | Additional classpath entries for the driver. |
| executor_memory | str or None | Memory per executor (e.g., "3g"). |
| executor_cores | str or None | Cores per executor. |
| num_executors | str or None | Number of executor instances (YARN). |
| total_executor_cores | str or None | Total cores across all executors (standalone/Mesos). |
| queue | str or None | YARN scheduler queue. |
| supervise | bool | If True, adds --supervise flag (standalone cluster mode). |
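Several inputs only take effect on particular cluster managers or deploy modes, as noted in the table. A small pre-flight check along the following lines could flag settings that will not apply. `inapplicable_settings` is a hypothetical helper, not part of Luigi or Spark, and its rules are limited to what the table states; the master URL prefixes (`yarn`, `spark://`, `mesos://`) follow Spark's usual master URL formats.

```python
def inapplicable_settings(master, deploy_mode, settings):
    """Return the names of set options that the Inputs table says do not apply."""
    is_yarn = master.startswith("yarn")
    is_standalone = master.startswith("spark://")
    is_mesos = master.startswith("mesos://")
    is_cluster = deploy_mode == "cluster"

    # Applicability rules taken from the table's descriptions only.
    applies = {
        "driver_cores": is_cluster,
        "num_executors": is_yarn,
        "queue": is_yarn,
        "total_executor_cores": is_standalone or is_mesos,
        "supervise": is_standalone and is_cluster,
    }
    return sorted(
        name for name, ok in applies.items() if settings.get(name) and not ok
    )


ignored = inapplicable_settings(
    "yarn",
    "cluster",
    {"queue": "production", "supervise": True, "total_executor_cores": "40"},
)
print(ignored)
# ['supervise', 'total_executor_cores']
```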
Outputs
| Name | Type | Description |
|---|---|---|
| Resource flags in command list | list[str] | The resource-related portion of the spark-submit command line, produced by spark_command(). |
Usage Examples
Example 1: Static Resource Allocation as Class Attributes
from luigi.contrib.spark import SparkSubmitTask


class HeavyETLJob(SparkSubmitTask):
    """A resource-intensive ETL job with fixed resource settings."""

    name = "Heavy ETL"
    app = "etl_pipeline.jar"
    entry_class = "com.example.ETLMain"
    driver_memory = "4g"
    executor_memory = "8g"
    executor_cores = "4"
    num_executors = "20"

    def app_options(self):
        return ["--date", "2026-02-10"]
Example 2: Parameterised Resource Allocation
import luigi
from luigi.contrib.spark import SparkSubmitTask


class ScalableJob(SparkSubmitTask):
    """Resources can be tuned per invocation via Luigi parameters."""

    name = "Scalable Spark Job"
    app = "process.py"
    driver_memory = "2g"
    executor_memory = "3g"
    num_executors = luigi.IntParameter(default=10)
    total_executor_cores = luigi.IntParameter(default=40, significant=False)

    # requires() and output() are omitted here; a real task must define them
    # for self.input() and self.output() to return targets.
    def app_options(self):
        return [self.input().path, self.output().path]
Example 3: Centralised Resources via luigi.cfg
Contents of luigi.cfg:
[spark]
spark-submit: /opt/spark/bin/spark-submit
master: yarn
deploy-mode: cluster
driver-memory: 4g
executor-memory: 8g
executor-cores: 4
num-executors: 50
queue: production
from luigi.contrib.spark import SparkSubmitTask


class ProductionJob(SparkSubmitTask):
    """All resource settings come from luigi.cfg; nothing is overridden here."""

    name = "Production Analytics"
    app = "analytics.jar"
    entry_class = "com.example.Analytics"

    def app_options(self):
        return ["--mode", "full"]
Example 4: YARN Queue and Supervise Mode
from luigi.contrib.spark import SparkSubmitTask


class SupervisedJob(SparkSubmitTask):
    """Set a YARN queue and enable supervise through property overrides."""

    name = "Supervised Job"
    app = "critical_pipeline.jar"
    entry_class = "com.example.CriticalPipeline"
    driver_memory = "2g"
    executor_memory = "4g"

    # --queue is a YARN setting; it has no effect on other cluster managers.
    @property
    def queue(self):
        return "high-priority"

    # --supervise applies to standalone cluster mode, so in practice only one
    # of these two overrides takes effect for a given master.
    @property
    def supervise(self):
        return True

    def app_options(self):
        return ["--input", "/data/raw", "--output", "/data/processed"]