Implementation:Spotify Luigi HadoopJobRunner
Domains: Pipeline_Orchestration, Big_Data
Last Updated: 2026-02-10 00:00 GMT
Overview
Concrete tooling provided by Luigi for configuring the execution resources and environment of Hadoop jobs.
Description
Luigi provides three job runner classes that handle the mechanics of submitting and executing Hadoop jobs:
- HadoopJobRunner (luigi/contrib/hadoop.py, lines 389--585) -- The primary runner for Hadoop Streaming jobs. It packages Python modules into a tarball, serializes the task instance as a pickle file, constructs the full hadoop jar command with all configuration flags (-libjars, -archives, -files, -D properties, -mapper, -reducer, -combiner, -input, -output), submits the job, and optionally performs an atomic move of the output directory.
- BaseHadoopJobTask (luigi/contrib/hadoop.py, lines 664--764) -- The abstract base task class that defines job configuration methods: jobconfs() for generating -D properties (job name, priority, scheduler pool), init_local() and init_hadoop() for initialization hooks, and job_runner() as the abstract factory method that returns a runner instance.
- HadoopJarJobTask (luigi/contrib/hadoop_jar.py, lines 128--167) -- A task class for running pre-compiled Java JAR jobs. It defines jar(), main(), args(), atomic_output(), and ssh() methods, and uses HadoopJarJobRunner to submit the job.
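For orientation, the streaming invocation that HadoopJobRunner assembles looks roughly like the following. This is a schematic sketch, not output copied from a run: the runner script name, temporary output path, and flag values all depend on the configuration described above.

hadoop jar /usr/lib/hadoop/hadoop-streaming.jar \
    -D mapred.job.name=MyStreamingJob \
    -D mapred.reduce.tasks=50 \
    -libjars /opt/libs/custom-inputformat.jar \
    -archives packages.tar#packages \
    -files job-instance.pickle \
    -mapper "python mrrunner.py map" \
    -reducer "python mrrunner.py reduce" \
    -input /data/input \
    -output /data/output-temp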
Usage
Use these classes when:
- You need fine-grained control over the streaming JAR, libjars, archives, or distributed files for a Hadoop Streaming job.
- You want to set custom Hadoop configuration properties (mapred.reduce.tasks, pool assignments, job priority); see the sketch after this list.
- You need to run a pre-compiled Java MapReduce JAR (such as TeraSort or a custom Hadoop application).
- You want atomic output semantics where the final output path appears only after the job completes successfully.
- You need to submit jobs to a remote cluster via SSH.
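A minimal sketch of the custom-properties case follows. The property name in the override is illustrative, and the n_reduce_tasks attribute shown is the knob that JobTask's default jobconfs() uses for mapred.reduce.tasks:

import luigi.contrib.hadoop

class TunedJob(luigi.contrib.hadoop.JobTask):
    n_reduce_tasks = 50  # picked up by the default jobconfs()

    def jobconfs(self):
        # Keep the defaults (job name, priority, pool) and append extras.
        jcs = super().jobconfs()
        jcs.append('mapreduce.map.memory.mb=2048')  # illustrative value
        return jcs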
Code Reference
Source Location
luigi/contrib/hadoop.py, lines 389--585 (HadoopJobRunner)
luigi/contrib/hadoop.py, lines 664--764 (BaseHadoopJobTask)
luigi/contrib/hadoop_jar.py, lines 128--167 (HadoopJarJobTask)
Key Signatures
class HadoopJobRunner(JobRunner):
    def __init__(self, streaming_jar, modules=None, streaming_args=None,
                 libjars=None, libjars_in_hdfs=None, jobconfs=None,
                 input_format=None, output_format=None,
                 end_job_with_atomic_move_dir=True, archives=None):
        ...

    def run_job(self, job, tracking_url_callback=None):
        """Package code, build arglist, submit streaming job, atomic move output."""
        ...

    def finish(self):
        """Clean up local temporary directory."""
        ...
class BaseHadoopJobTask(luigi.Task):
    pool = luigi.OptionalParameter(default=None, significant=False, positional=False)

    def jobconfs(self):
        """Return list of 'key=value' Hadoop configuration strings."""
        ...

    def init_local(self):
        """Hook for local initialization before job submission."""
        ...

    def init_hadoop(self):
        """Hook for initialization on Hadoop nodes."""
        ...

    @abc.abstractmethod
    def job_runner(self):
        """Return a JobRunner instance (factory method)."""
        ...
class HadoopJarJobTask(BaseHadoopJobTask):
    def jar(self):
        """Path to the JAR file."""
        return None

    def main(self):
        """Optional main class name."""
        return None

    def args(self):
        """Arguments after 'hadoop jar <jar> <main>'."""
        return []

    def atomic_output(self):
        """If True, use temp paths for outputs and move atomically."""
        return True

    def ssh(self):
        """SSH config dict for remote submission, or None."""
        return None

    def job_runner(self):
        return HadoopJarJobRunner()
Import
from luigi.contrib.hadoop import HadoopJobRunner, BaseHadoopJobTask
from luigi.contrib.hadoop_jar import HadoopJarJobTask, HadoopJarJobRunner
I/O Contract
Inputs (HadoopJobRunner.__init__)
| Name | Type | Description |
|---|---|---|
| streaming_jar | str | Filesystem path to the Hadoop Streaming JAR. |
| modules | list or None | Additional Python modules to include in the packages archive. |
| streaming_args | list or None | Extra arguments passed directly to the streaming JAR. |
| libjars | list or None | Local paths to Java JAR files to distribute via -libjars. |
| libjars_in_hdfs | list or None | HDFS paths to Java JAR files to download and distribute. |
| jobconfs | dict or None | Additional key: value Hadoop configuration properties. |
| input_format | str or None | Custom Hadoop InputFormat class name. |
| output_format | str or None | Custom Hadoop OutputFormat class name. |
| end_job_with_atomic_move_dir | bool | If True (default), write to a temp dir and atomically move on success. |
| archives | list or None | Paths to archive files to distribute via -archives. |
Inputs (BaseHadoopJobTask)
| Name | Type | Description |
|---|---|---|
| pool | str or None | Hadoop scheduler pool name (Fair Scheduler) or queue name (Capacity Scheduler). |
Outputs
| Name | Type | Description |
|---|---|---|
| run_job() | None | Submits the Hadoop job; on success the final output path exists in HDFS. Raises HadoopJobError on failure. |
| jobconfs() | list[str] | List of "key=value" strings for -D flags (e.g., "mapred.job.name=MyTask"). |
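As an illustration of the jobconfs() contract, a task with priority and pool configured (see Example 3 below) yields strings of this shape, among others. The task id suffix is elided, and the pool key depends on the configured scheduler: mapred.fairscheduler.pool for the Fair Scheduler, mapred.job.queue.name for the Capacity Scheduler.

PrioritizedJob().jobconfs()
# ['mapred.job.name=PrioritizedJob_...',
#  'mapred.job.priority=HIGH',
#  'mapred.fairscheduler.pool=production']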
Usage Examples
Example 1: Custom streaming job runner with libjars
import luigi
import luigi.contrib.hdfs
from luigi.contrib.hadoop import HadoopJobRunner, JobTask

class InputData(luigi.ExternalTask):
    """Wraps pre-existing HDFS data so it can be returned from requires()."""
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/input/')

class CustomRunner(HadoopJobRunner):
    def __init__(self):
        super().__init__(
            streaming_jar='/usr/lib/hadoop/hadoop-streaming-2.10.1.jar',
            libjars=['/opt/libs/custom-inputformat.jar'],
            jobconfs={'mapred.reduce.tasks': '50'},
            end_job_with_atomic_move_dir=True,
        )

class MyStreamingJob(JobTask):
    def job_runner(self):
        return CustomRunner()

    def requires(self):
        # requires() must return Task objects, not Targets, so the
        # existing input data is wrapped in an ExternalTask.
        return InputData()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/output/')

    def mapper(self, line):
        yield line.split(',')[0], 1

    def reducer(self, key, values):
        yield key, sum(values)
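Assuming the module above is saved as my_jobs.py (a hypothetical name), the task runs through the standard Luigi command-line entry point:

luigi --module my_jobs MyStreamingJob --local-scheduler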
Example 2: Hadoop JAR job (TeraSort pattern)
import luigi
import luigi.contrib.hadoop_jar
import luigi.contrib.hdfs

class TeraSortInput(luigi.ExternalTask):
    """Pre-generated input data, wrapped so requires() returns a Task."""
    path = luigi.Parameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.path)

class TeraSort(luigi.contrib.hadoop_jar.HadoopJarJobTask):
    terasort_in = luigi.Parameter(default='/tmp/terasort-in')
    terasort_out = luigi.Parameter(default='/tmp/terasort-out')

    def requires(self):
        return TeraSortInput(self.terasort_in)

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.terasort_out)

    def jar(self):
        return '/usr/lib/hadoop/hadoop-examples.jar'

    def main(self):
        return 'terasort'

    def args(self):
        # Target arguments are coerced to paths by the runner.
        return [self.input(), self.output()]
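Returning Target objects from args() is deliberate here: before submission the JAR runner coerces HdfsTarget arguments into path strings, and because atomic_output() defaults to True it substitutes a temporary output path that is moved to terasort_out only after the job succeeds.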
Example 3: Setting pool and priority via BaseHadoopJobTask
import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs

class HighPriorityInput(luigi.ExternalTask):
    """Existing HDFS input, wrapped so requires() returns a Task."""
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/high-priority-input/')

class PrioritizedJob(luigi.contrib.hadoop.JobTask):
    pool = luigi.Parameter(default='production')

    def mr_priority(self):
        return 'HIGH'

    def requires(self):
        return HighPriorityInput()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/high-priority-output/')

    def mapper(self, line):
        yield line.strip(), 1

    def reducer(self, key, values):
        yield key, sum(values)
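Example 4: Remote JAR submission via ssh()
The following sketch covers the remaining usage case, submitting over SSH. The host, username, and key path are placeholders; the dict keys shown (host, username, key_file, no_host_key_check) are the ones the JAR runner reads when deciding to wrap the hadoop jar command in an ssh invocation.

import luigi
import luigi.contrib.hadoop_jar
import luigi.contrib.hdfs

class RemoteWordCount(luigi.contrib.hadoop_jar.HadoopJarJobTask):
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/wordcount-out')

    def jar(self):
        return '/usr/lib/hadoop/hadoop-examples.jar'

    def main(self):
        return 'wordcount'

    def args(self):
        return ['/data/wordcount-in', self.output()]

    def ssh(self):
        # Connection details for the remote gateway; all values are placeholders.
        return {
            'host': 'hadoop-gateway.example.com',
            'username': 'etl',
            'key_file': '/home/etl/.ssh/id_rsa',
            'no_host_key_check': True,
        }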