Implementation:Spotify Luigi SparkSubmitTask Config
Overview
Concrete tool, provided by Luigi, for declaring the Apache Spark submission environment: cluster manager, deploy mode, application identity, and supplementary files.
Description
The SparkSubmitTask class in luigi.contrib.spark exposes a set of Python @property accessors that correspond to the flags accepted by spark-submit. Most of these properties read their default value from the [spark] section of the Luigi configuration file (luigi.cfg), while name, entry_class, and app are plain class attributes that default to None. A subclass can override any of them, either with a plain class attribute or by redefining the property. The properties governing configuration (as opposed to resources or execution) include:
- spark_submit: path to the spark-submit binary (default: "spark-submit").
- master: cluster manager URL (e.g., yarn, local[*], spark://host:7077).
- deploy_mode: "client" or "cluster".
- name: human-readable application name.
- entry_class: JVM main class for .jar applications.
- app: path to the application artifact (.jar or .py file).
- jars, packages, py_files, files, archives: supplementary artifacts distributed to the cluster (packages are given as Maven coordinates).
- conf / properties_file: arbitrary Spark configuration, either as key-value pairs (conf) or as a path to a properties file (properties_file).
List-valued properties (jars, packages, py_files, files, archives) are parsed from comma-separated strings by the private helper _list_config, while the dictionary-valued conf property is parsed by _dict_config from key=value entries separated by pipes (|).
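The two parsing rules can be sketched as standalone functions. The names parse_list_config and parse_dict_config are hypothetical; they are modeled on the behavior described above (split on commas and strip whitespace; split entries on | and then each entry on its first =), and the real private helpers may differ in edge cases.

```python
from typing import Dict, List, Optional


def parse_list_config(value: Optional[str]) -> Optional[List[str]]:
    """Hypothetical helper: split a comma-separated string into stripped items."""
    if not value:
        # Missing or empty settings stay None so the flag can be omitted.
        return None
    return [item.strip() for item in value.split(",")]


def parse_dict_config(value: Optional[str]) -> Optional[Dict[str, str]]:
    """Hypothetical helper: turn 'k1=v1|k2=v2' into a dict.

    Only the first '=' in each entry separates key from value, so values
    may themselves contain '='. Whitespace is not stripped in this sketch.
    """
    if not value:
        return None
    return dict(entry.split("=", 1) for entry in value.split("|"))


jars = parse_list_config("/opt/libs/custom-udf.jar, /opt/libs/avro-tools.jar")
conf = parse_dict_config(
    "spark.serializer=org.apache.spark.serializer.KryoSerializer"
    "|spark.executor.instances=10"
)
print(jars)  # ['/opt/libs/custom-udf.jar', '/opt/libs/avro-tools.jar']
print(conf["spark.executor.instances"])  # 10
```

Because the dictionary rule in this sketch does not strip whitespace, an entry written as `a = b` would produce the key `'a '`, so it is safest to write conf entries without spaces around `=` and `|`.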
Usage
Import SparkSubmitTask and subclass it whenever you need to submit a Spark application to a cluster. Override individual properties or set class attributes to tailor the configuration for your environment.
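Both override styles rely on ordinary Python attribute lookup. The sketch below uses a hypothetical, simplified base class (BaseSubmitTask, with a dict named CONFIG standing in for the [spark] section) rather than Luigi itself, so it runs without Luigi or Spark installed.

```python
# Hypothetical stand-in for the [spark] section of luigi.cfg.
CONFIG = {"master": "yarn", "deploy-mode": "cluster"}


class BaseSubmitTask:
    """Simplified, hypothetical base class; not Luigi's implementation."""

    @property
    def master(self):
        # Default comes from configuration; None when the key is absent.
        return CONFIG.get("master")

    @property
    def deploy_mode(self):
        return CONFIG.get("deploy-mode")


class ClassAttributeOverride(BaseSubmitTask):
    # A plain class attribute found earlier in the MRO shadows the inherited property.
    master = "local[*]"


class PropertyOverride(BaseSubmitTask):
    @property
    def deploy_mode(self):
        # A redefined property can compute its value, e.g. from task parameters.
        return "client"


print(BaseSubmitTask().master)               # yarn
print(ClassAttributeOverride().master)       # local[*]
print(ClassAttributeOverride().deploy_mode)  # cluster (still read from CONFIG)
print(PropertyOverride().deploy_mode)        # client
```

The class-attribute form suits fixed values; redefining the property is the better fit when the value depends on parameters or other runtime logic.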
Code Reference
Source Location
luigi/contrib/spark.py, lines 37--263 (class SparkSubmitTask). Configuration properties span lines 49--51 (class attributes) and 59--184 (@property accessors).
Class Signature
class SparkSubmitTask(ExternalProgramTask):
    # Application (.jar or .py file)
    name = None
    entry_class = None
    app = None
Key Property Signatures
@property
def spark_submit(self):
    return configuration.get_config().get(self.spark_version, 'spark-submit', 'spark-submit')

@property
def master(self):
    return configuration.get_config().get(self.spark_version, "master", None)

@property
def deploy_mode(self):
    return configuration.get_config().get(self.spark_version, "deploy-mode", None)

@property
def jars(self):
    return self._list_config(configuration.get_config().get(self.spark_version, "jars", None))

@property
def conf(self):
    return self._dict_config(configuration.get_config().get(self.spark_version, "conf", None))
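Each accessor follows the same pattern: look up an option in a section and fall back to a default when the option is absent. The sketch below imitates that pattern with the standard-library configparser instead of Luigi's own configuration object, and assumes the section name resolves to "spark" (in the code above it comes from self.spark_version); the get function and CFG_TEXT contents are hypothetical.

```python
import configparser

# Hypothetical luigi.cfg contents; only the master key is set.
CFG_TEXT = """
[spark]
master: yarn
"""

parser = configparser.ConfigParser()
parser.read_string(CFG_TEXT)

SECTION = "spark"  # assumed value of spark_version


def get(option, default=None):
    """Return the option from SECTION, or default when it is missing."""
    return parser.get(SECTION, option, fallback=default)


print(get("spark-submit", "spark-submit"))  # spark-submit (key missing, default used)
print(get("master"))                        # yarn
print(get("deploy-mode"))                   # None
```

A None result is what lets the task leave the corresponding flag off the command line entirely.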
Import
from luigi.contrib.spark import SparkSubmitTask
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| [spark] spark-submit | str | Path to the spark-submit executable. Default: "spark-submit". |
| [spark] master | str or None | Cluster manager URL (e.g., "yarn", "local[*]"). |
| [spark] deploy-mode | str or None | Deployment mode: "client" or "cluster". |
| [spark] jars | str (comma-separated) or None | Additional JARs to include on the driver and executor classpaths. |
| [spark] conf | str (pipe-separated key=value) or None | Arbitrary Spark configuration properties. |
| [spark] properties-file | str or None | Path to a file from which to load extra Spark properties. |
| app (class attribute) | str | Path to the .jar or .py application file. |
Outputs
| Name | Type | Description |
|---|---|---|
| Command-line argument list | list[str] | The spark-submit executable followed by the assembled flags and their values, produced by spark_command(). |
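To make the output concrete, the following sketch assembles an argument list from already-parsed settings. The function build_spark_args is hypothetical and covers only a subset of flags; its rules (skip unset values, join list values with commas, emit one --conf key=value pair per entry) are assumptions modeled on the behavior described above, not a copy of spark_command().

```python
from typing import Dict, List, Optional


def build_spark_args(
    spark_submit: str = "spark-submit",
    master: Optional[str] = None,
    deploy_mode: Optional[str] = None,
    name: Optional[str] = None,
    entry_class: Optional[str] = None,
    jars: Optional[List[str]] = None,
    conf: Optional[Dict[str, str]] = None,
) -> List[str]:
    """Hypothetical helper: turn parsed settings into spark-submit arguments."""
    args = [spark_submit]
    # Scalar flags are emitted only when a value is set.
    for flag, value in (
        ("--master", master),
        ("--deploy-mode", deploy_mode),
        ("--name", name),
        ("--class", entry_class),
    ):
        if value:
            args += [flag, value]
    # A list-valued setting becomes a single comma-joined argument.
    if jars:
        args += ["--jars", ",".join(jars)]
    # Each configuration entry becomes its own --conf key=value pair.
    for key, value in (conf or {}).items():
        args += ["--conf", f"{key}={value}"]
    return args


args = build_spark_args(
    master="yarn",
    deploy_mode="client",
    name="ALS Training Job",
    entry_class="com.example.spark.TrainALS",
    jars=["/opt/libs/custom-udf.jar", "/opt/libs/avro-tools.jar"],
    conf={"spark.executor.instances": "10"},
)
print(args)
```

On the actual command line, the application path and the values returned by app_options() come after these flags.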
Usage Examples
Example 1: Minimal JVM Spark Job with Explicit Configuration
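import luigi
from luigi.contrib.spark import SparkSubmitTask

class GenerateMatrix(luigi.ExternalTask):
    """Hypothetical upstream task standing in for whatever produces the input matrix."""
    data_size = luigi.IntParameter()
    def output(self):
        # Hypothetical input location
        return luigi.LocalTarget("matrix-%d.csv" % self.data_size)

class TrainALSModel(SparkSubmitTask):
    """Submit a Scala Spark JAR with explicit configuration."""
    name = "ALS Training Job"
    entry_class = "com.example.spark.TrainALS"
    app = "my-spark-assembly.jar"
    # Plain class attributes shadow the inherited driver_memory/executor_memory properties
    driver_memory = "2g"
    executor_memory = "3g"
    data_size = luigi.IntParameter(default=1000)
    def app_options(self):
        # Positional arguments passed to the JAR's main class after the app path
        return [self.input().path, self.output().path]
    def requires(self):
        return GenerateMatrix(self.data_size)
    def output(self):
        return luigi.LocalTarget("als-output/")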
Example 2: Configuration Driven by luigi.cfg
Contents of luigi.cfg:
[spark]
spark-submit: /usr/local/spark/bin/spark-submit
master: yarn
deploy-mode: client
jars: /opt/libs/custom-udf.jar, /opt/libs/avro-tools.jar
conf: spark.serializer=org.apache.spark.serializer.KryoSerializer|spark.executor.instances=10
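import luigi
from luigi.contrib.spark import SparkSubmitTask
class WordCount(SparkSubmitTask):
    """All Spark settings come from luigi.cfg; only the application identity is set here."""
    name = "PySpark Word Count"
    app = "wordcount.py"
    # Hypothetical parameters: no requires() is defined, so self.input() would be empty
    input_path = luigi.Parameter()
    output_path = luigi.Parameter()
    def output(self):
        return luigi.LocalTarget(self.output_path)
    def app_options(self):
        # Passed to wordcount.py as positional arguments
        return [self.input_path, self.output_path]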
Example 3: Overriding the Cluster Manager at the Class Level
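from luigi.contrib.spark import SparkSubmitTask

class LocalDebugJob(SparkSubmitTask):
    """Force local mode for development testing."""
    name = "Debug Job"
    app = "debug_pipeline.py"

    @property
    def master(self):
        # Ignore the configured cluster manager; run locally with 4 worker threads
        return "local[4]"

    @property
    def deploy_mode(self):
        return "client"

    def app_options(self):
        # Appended after the app path, so debug_pipeline.py (not spark-submit) receives --verbose
        return ["--verbose"]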