Implementation:Spotify Luigi ScaldingJobTask
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Hadoop |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
ScaldingJobTask is a Luigi task that compiles, builds, and submits Twitter Scalding (Scala-based MapReduce) jobs to a Hadoop cluster, with automatic source compilation, jar packaging, and HDFS integration.
Description
The Scalding contrib module provides Luigi integration for Twitter Scalding, a Scala library for writing MapReduce jobs. The module consists of two primary classes:
- ScaldingJobTask (extends luigi.contrib.hadoop.BaseHadoopJobTask) -- An abstract task class for defining Scalding jobs within Luigi workflows. Users specify either a Scala source file (via source()) or a pre-built JAR (via jar()), along with an optional job_class() method that names the main Scalding job class. The requires() method should return a dictionary whose keys are Scalding argument names and whose values are subtasks (or lists of subtasks); the output path of each subtask is passed as the value of the corresponding argument. The args() method assembles the full argument list, including --output and any job_args(). When atomic_output() returns True (the default), the job writes to a temporary location that is moved into place only after the job finishes, so downstream tasks never see partial results.
- ScaldingJobRunner (extends luigi.contrib.hadoop.JobRunner) -- The execution engine that handles the full lifecycle of a Scalding job: compiling Scala source files using the Scala compiler, packaging compiled classes into a JAR, and submitting the job to Hadoop via the hadoop jar command with appropriate classpath and libjars configuration. The runner reads configuration from the [scalding] section of the Luigi config file, including paths for scala-home, scalding-home, scalding-provided, and scalding-libjars.
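The mapping that args() performs from requires() to the job's command line can be sketched in plain Python. build_scalding_args is a hypothetical helper, not part of Luigi: it takes paths directly instead of tasks so that it runs without Luigi or HDFS, and it assumes the order inputs, then --output, then job_args().

```python
from typing import Dict, List, Sequence, Union

# Hypothetical stand-in: each value is a path or a list of paths, instead of
# a Luigi task (or list of tasks) whose output().path would be read.
InputSpec = Union[str, Sequence[str]]


def build_scalding_args(inputs: Dict[str, InputSpec],
                        output: str,
                        job_args: Sequence[str] = ()) -> List[str]:
    """Assemble --<name> <paths...> --output <path> <extra args>."""
    arglist: List[str] = []
    for name, spec in inputs.items():
        arglist.append('--' + name)
        # Accept one path or several, mirroring one task or a list of tasks.
        paths = [spec] if isinstance(spec, str) else list(spec)
        arglist.extend(paths)
    arglist.extend(['--output', output])
    arglist.extend(job_args)
    return arglist


print(build_scalding_args({'input': ['/data/a', '/data/b']},
                          '/output/word_count', ['--num-reducers', '10']))
# → ['--input', '/data/a', '/data/b', '--output', '/output/word_count', '--num-reducers', '10']
```

A list value expands to several paths after a single flag, so a job reading --input receives every upstream output at once.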
The runner's build_job_jar() method either returns the pre-built JAR directly or compiles the Scala source on the fly and packages the result. The run_job() method constructs the full hadoop jar command line, sets HADOOP_CLASSPATH, submits the job, and reports the job's tracking URL back to Luigi. For atomic output on HDFS, the module relies on luigi.contrib.hadoop_jar.fix_paths(), which substitutes temporary paths for output arguments.
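The two phases described above, building the job JAR and submitting it, can be sketched as plain argument lists. compile_commands and submit_command are hypothetical helpers that only construct commands; nothing is executed. They assume the Scala 2 compiler entry point scala.tools.nsc.Main, that hadoop jar runs the scalding-core jar with the job class as its first argument, and that --hdfs selects Scalding's HDFS mode. Luigi's exact arguments may differ.

```python
from typing import Dict, List, Sequence, Tuple


def compile_commands(scala_jars: Sequence[str], classpath: Sequence[str],
                     source: str, build_dir: str, job_jar: str) -> List[List[str]]:
    """Build (not run) the commands that compile source and package job_jar."""
    # Run the Scala compiler from the scala-home jars (compiler included),
    # compiling against the Scalding, provided, lib and extra jars.
    compile_cmd = ['java', '-cp', ':'.join(scala_jars), 'scala.tools.nsc.Main',
                   '-classpath', ':'.join(classpath), '-d', build_dir, source]
    # Package every compiled class under build_dir into the job jar.
    package_cmd = ['jar', 'cf', job_jar, '-C', build_dir, '.']
    return [compile_cmd, package_cmd]


def submit_command(scalding_core: str, job_jar: str, libjars: Sequence[str],
                   job_class: str, job_args: Sequence[str]
                   ) -> Tuple[List[str], Dict[str, str]]:
    """Build (not run) the hadoop jar command and its HADOOP_CLASSPATH override."""
    jars = [job_jar] + list(libjars)
    # -libjars takes a comma-separated list and ships the jars with the job.
    cmd = (['hadoop', 'jar', scalding_core, '-libjars', ','.join(jars),
            job_class, '--hdfs'] + list(job_args))
    # HADOOP_CLASSPATH is colon-separated and applies to the client-side JVM.
    env = {'HADOOP_CLASSPATH': ':'.join(jars + [scalding_core])}
    return cmd, env
```

To execute them, each list could be passed to subprocess.check_call, with the environment override merged into os.environ.copy() for the submit step.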
Usage
Use ScaldingJobTask when you have existing Scalding (Scala MapReduce) jobs that you want to orchestrate within a Luigi pipeline. This is appropriate for teams that have invested in Twitter Scalding for their Hadoop data processing and want to integrate these jobs with other Luigi tasks. The task supports both source-level compilation (convenient for development) and pre-built JAR submission (suitable for production). Configure the [scalding] section in your Luigi configuration file with the paths to your Scala and Scalding installations.
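The choice between the two modes follows the rules in the I/O contract: a pre-built JAR needs an explicit job_class(), while a source file can have its job class detected. choose_build_mode is a hypothetical helper that sketches this validation; letting jar() win when both jar() and source() are set is an assumption of the sketch.

```python
from typing import Optional, Tuple


def choose_build_mode(source: Optional[str], jar: Optional[str],
                      job_class: Optional[str]) -> Tuple[str, str]:
    """Return ('jar', path) or ('source', path) after validating the inputs."""
    if jar:
        # A pre-built jar is not scanned for a job class, so one must be named.
        if not job_class:
            raise ValueError('job_class() must be set when jar() is used')
        return 'jar', jar
    if source:
        # job_class() is optional here; it can be detected from the source.
        return 'source', source
    raise ValueError('either source() or jar() must be set')
```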
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/scalding.py
- Lines: 1-309
Signature
class ScaldingJobRunner(luigi.contrib.hadoop.JobRunner):
    def __init__(self): ...
    def get_scala_jars(self, include_compiler=False): ...
    def get_scalding_jars(self): ...
    def get_scalding_core(self): ...
    def get_provided_jars(self): ...
    def get_libjars(self): ...
    def get_tmp_job_jar(self, source): ...
    def get_build_dir(self, source): ...
    def get_job_class(self, source): ...
    def build_job_jar(self, job): ...
    def run_job(self, job, tracking_url_callback=None): ...

class ScaldingJobTask(luigi.contrib.hadoop.BaseHadoopJobTask):
    def relpath(self, current_file, rel_path): ...
    def source(self): ...        # str or None: path to Scala source file
    def jar(self): ...           # str or None: path to pre-built JAR
    def extra_jars(self): ...    # list[str]: additional JAR paths
    def job_class(self): ...     # str or None: main Scalding job class name
    def job_runner(self): ...    # returns ScaldingJobRunner()
    def atomic_output(self): ... # bool: use atomic output (default: True)
    def requires(self): ...      # dict: {arg_name: subtask_or_list}
    def job_args(self): ...      # list[str]: extra arguments
    def args(self): ...          # list[str]: full argument list
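The signatures list several jar-discovery helpers (get_scalding_jars, get_provided_jars, get_libjars, get_scalding_core) without describing them. One plausible reading, and only an assumption here, is that each collects the .jar files in a configured directory and that get_scalding_core picks the jar whose name starts with scalding-core-. list_jars and find_scalding_core are hypothetical helpers sketching that reading; sorting is added only to make the order deterministic.

```python
import os
from typing import List


def list_jars(directory: str) -> List[str]:
    """Return full paths of the *.jar files directly inside directory, sorted."""
    return sorted(os.path.join(directory, name)
                  for name in os.listdir(directory)
                  if name.endswith('.jar'))


def find_scalding_core(lib_dir: str) -> str:
    """Return the first jar in lib_dir whose name starts with 'scalding-core-'."""
    for path in list_jars(lib_dir):
        if os.path.basename(path).startswith('scalding-core-'):
            return path
    raise FileNotFoundError('no scalding-core-*.jar in ' + lib_dir)
```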
Import
from luigi.contrib.scalding import ScaldingJobTask, ScaldingJobRunner
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| source() | str | Conditional | Path to the Scala source file for the Scalding job; required if jar() is not provided |
| jar() | str | Conditional | Path to a pre-built JAR file; required if source() is not provided |
| job_class() | str | Conditional | Fully qualified class name of the Scalding job; required if jar() is provided, auto-detected from source otherwise |
| extra_jars() | list[str] | No | Additional JAR files needed for building and running the job |
| requires() | dict | No | Dictionary mapping Scalding argument names to Luigi tasks (or lists of tasks) whose output paths become input arguments |
| job_args() | list[str] | No | Extra command-line arguments to pass to the Scalding job |
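job_class() is auto-detected when only source() is given. The sketch below shows one heuristic for this, as an assumption rather than Luigi's exact rule: record the package declaration, take classes whose declaration extends a type ending in Job, and prefer the class named after the source file. detect_job_class is a hypothetical helper that takes the source text and the file name without extension.

```python
import re
from typing import Optional

PACKAGE_RE = re.compile(r'^\s*package\s+([\w.]+)')
# Matches e.g. "class WordCount(args: Args) extends Job(args)" and also
# fully qualified parents such as "extends com.twitter.scalding.Job(args)".
JOB_CLASS_RE = re.compile(r'\bclass\s+(\w+).*\bextends\s+[\w.]*Job\b')


def detect_job_class(scala_source: str, file_stem: Optional[str] = None) -> str:
    """Guess the fully qualified Scalding job class declared in scala_source."""
    package = None
    job_class = None
    for line in scala_source.splitlines():
        pkg = PACKAGE_RE.search(line)
        if pkg:
            package = pkg.group(1)
        cls = JOB_CLASS_RE.search(line)
        if cls:
            job_class = cls.group(1)
            # Stop at a class named after the file; otherwise keep the last one.
            if job_class == file_stem:
                break
    if job_class is None:
        raise ValueError('no class extending a Job type was found')
    return package + '.' + job_class if package else job_class
```

A line-based regex is fragile (multi-line class headers are missed), which is one reason to set job_class() explicitly in production.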
Outputs
| Name | Type | Description |
|---|---|---|
| output | HdfsTarget | HDFS target path passed as --output to the Scalding job |
| job_jar | str | Path to the job JAR: the pre-built jar() when given, otherwise a JAR compiled and packaged from source() (intermediate artifact) |
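The atomic-output behaviour can be illustrated with a local-filesystem analogue. write_atomically is a hypothetical helper, not Luigi API: it hands the writer a temporary sibling path and renames it to the final path only after the write succeeds, which is the pattern the task applies on HDFS. The local filesystem stands in for HDFS here, and the temporary-name format is an assumption.

```python
import os
import random
import shutil
from typing import Callable


def write_atomically(final_path: str, write: Callable[[str], None]) -> None:
    """Call write(tmp_path); move tmp_path to final_path only if it succeeds."""
    base = final_path.rstrip('/')
    # Hypothetical temp-name format: a random suffix on the final path.
    tmp_path = '%s-tmp-%09d' % (base, random.randrange(10**9))
    try:
        write(tmp_path)
    except Exception:
        # Clean up partial output so nothing is left at either path.
        if os.path.isdir(tmp_path):
            shutil.rmtree(tmp_path)
        elif os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    # Readers of final_path see either nothing or the complete output.
    os.rename(tmp_path, base)
```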
Usage Examples
Basic Usage
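import luigi
import luigi.contrib.hdfs
from luigi.contrib.scalding import ScaldingJobTask


class SomeUpstreamTask(luigi.ExternalTask):
    # Placeholder upstream task; replace it with your real input producer.
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/input/text')


class WordCount(ScaldingJobTask):
    # Compile WordCount.scala (located next to this file) on the fly.
    def source(self):
        return self.relpath(__file__, 'WordCount.scala')

    def requires(self):
        # Passed to the job as --input <upstream output path>.
        return {'input': SomeUpstreamTask()}

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/output/word_count')


class WordCountFromJar(ScaldingJobTask):
    # Submit a pre-built JAR; job_class() is required in this mode.
    def jar(self):
        return '/jars/word-count-assembly.jar'

    def job_class(self):
        return 'com.example.WordCount'

    def extra_jars(self):
        return ['/jars/custom-utils.jar']

    def requires(self):
        return {'input': SomeUpstreamTask()}

    def job_args(self):
        # Appended after --output; the Scala job is expected to interpret them.
        return ['--num-reducers', '10']

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/output/word_count_jar')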
Configuration
# luigi.cfg
[scalding]
scala-home: /usr/share/scala
scalding-home: /usr/share/scalding
scalding-provided: /usr/share/scalding/provided
scalding-libjars: /usr/share/scalding/libjars
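Reading this section can be sketched with the standard library's configparser, which accepts the colon-separated key/value syntax shown above. read_scalding_config is a hypothetical helper; its fallbacks are assumptions chosen to match the example values, and deriving scalding-provided and scalding-libjars from scalding-home is a choice of this sketch, so Luigi's own fallbacks may differ.

```python
import configparser
import os
from typing import Dict

# Assumed fallbacks, matching the example configuration above.
DEFAULTS = {
    'scala-home': '/usr/share/scala',
    'scalding-home': '/usr/share/scalding',
}


def read_scalding_config(text: str) -> Dict[str, str]:
    """Return the four [scalding] paths, filling in fallbacks for missing keys."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    section = parser['scalding'] if parser.has_section('scalding') else {}
    scala_home = section.get('scala-home', DEFAULTS['scala-home'])
    scalding_home = section.get('scalding-home', DEFAULTS['scalding-home'])
    return {
        'scala-home': scala_home,
        'scalding-home': scalding_home,
        # In this sketch, provided/libjars default to subdirectories of scalding-home.
        'scalding-provided': section.get('scalding-provided',
                                         os.path.join(scalding_home, 'provided')),
        'scalding-libjars': section.get('scalding-libjars',
                                        os.path.join(scalding_home, 'libjars')),
    }
```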