Implementation: Spotify Luigi BaseHadoopJobTask Run
Domains: Pipeline_Orchestration, Big_Data
Last Updated: 2026-02-10 00:00 GMT
Overview
Concrete tool, shipped in Luigi's contrib package, for executing Hadoop MapReduce pipelines under Luigi orchestration.
Description
The execution of a Hadoop MapReduce pipeline in Luigi is driven by two key methods that work together:
BaseHadoopJobTask.run() (luigi/contrib/hadoop.py, lines 721--731) -- The entry point called by the Luigi scheduler. It configures the data serialization format (Python repr/eval or JSON), invokes init_local() for any pre-submission setup, and then delegates to self.job_runner().run_job(self).
HadoopJobRunner.run_job() (luigi/contrib/hadoop.py, lines 414--576) -- The method that performs the actual job submission. It:
- Collects all required Python packages (Luigi itself, user modules, attached packages) and creates a packages.tar archive.
- Serializes the task instance to job-instance.pickle so it can be reconstructed on Hadoop nodes.
- Constructs the full hadoop jar command with the streaming JAR and all flags: -libjars, -archives, -files, -D configuration properties, -mapper, -combiner, -reducer, -inputformat, -outputformat, -input, and -output.
- If end_job_with_atomic_move_dir is enabled, replaces the output path with a timestamped temporary path.
- Calls run_and_track_hadoop_job(), which starts the subprocess, parses stderr for tracking URLs and job IDs, captures output, and raises HadoopJobError on failure.
- On success, atomically moves the temporary output directory to the final path.
- Cleans up the local temporary directory.
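The command-assembly step above can be pictured as follows. This is a minimal sketch, not Luigi's exact internals: the function name, parameter names, and the assumption that jobconfs arrive as "key=value" strings are simplifications, and the real run_job() also handles -libjars, -archives, -combiner, input/output formats, and extra streaming arguments.

```python
def build_streaming_arglist(streaming_jar, files, jobconfs,
                            input_paths, output_path,
                            mapper_cmd, reducer_cmd):
    # Sketch of the `hadoop jar` invocation assembled by run_job().
    arglist = ['hadoop', 'jar', streaming_jar]
    for f in files:                      # e.g. packages.tar, job-instance.pickle
        arglist += ['-files', f]
    for jc in jobconfs:                  # "key=value" strings become -D flags
        arglist += ['-D', jc]
    arglist += ['-mapper', mapper_cmd, '-reducer', reducer_cmd]
    for path in input_paths:
        arglist += ['-input', path]
    arglist += ['-output', output_path]
    return arglist
```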
The run_and_track_hadoop_job() function also installs a HadoopRunContext that intercepts SIGTERM and KeyboardInterrupt to kill the running Hadoop job via yarn application -kill or mapred job -kill.
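The interception idea can be sketched as a context manager; the class name and structure here are illustrative only, while the real HadoopRunContext also records the application ID parsed from stderr and shells out to yarn application -kill (or mapred job -kill) before re-raising.

```python
import signal


class RunContextSketch:
    """Illustrative stand-in for HadoopRunContext: while the streaming
    subprocess runs, SIGTERM is intercepted so the tracked application
    can be killed before the Luigi worker dies."""

    def __init__(self):
        self.application_id = None  # filled in once stderr reveals it
        self._prev = None

    def __enter__(self):
        self._prev = signal.signal(signal.SIGTERM, self._handle)
        return self

    def _handle(self, signum, frame):
        if self.application_id:
            # Real code runs: yarn application -kill <application_id>
            pass
        raise KeyboardInterrupt()

    def __exit__(self, exc_type, exc, tb):
        signal.signal(signal.SIGTERM, self._prev)  # restore prior handler
        return False
```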
Usage
Use BaseHadoopJobTask.run() indirectly by subclassing JobTask or HadoopJarJobTask and letting the Luigi scheduler invoke run(). Direct calls are only needed for testing or manual execution outside the scheduler.
Code Reference
Source Location
luigi/contrib/hadoop.py, lines 721--731 (BaseHadoopJobTask.run)
luigi/contrib/hadoop.py, lines 414--576 (HadoopJobRunner.run_job)
Key Signatures
```python
class BaseHadoopJobTask(luigi.Task):
    # Configurable data interchange: "python" or "json"
    data_interchange_format = "python"

    def run(self):
        self.serialize = DataInterchange[self.data_interchange_format]['serialize']
        self.internal_serialize = DataInterchange[self.data_interchange_format]['internal_serialize']
        self.deserialize = DataInterchange[self.data_interchange_format]['deserialize']
        self.init_local()
        self.job_runner().run_job(self)
```
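The DataInterchange mapping consulted by run() can be pictured as a dict of serialization callables. This reconstruction is an assumption based on the snippet above (and on the "python"/"json" split described in the I/O contract), not a verbatim copy of Luigi's table:

```python
import json

# Hypothetical reconstruction of the DataInterchange lookup table.
DataInterchangeSketch = {
    'python': {
        'serialize': str,            # human-readable output values
        'internal_serialize': repr,  # round-trippable via eval
        'deserialize': eval,
    },
    'json': {
        'serialize': json.dumps,
        'internal_serialize': json.dumps,
        'deserialize': json.loads,
    },
}
```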
```python
class HadoopJobRunner(JobRunner):
    def run_job(self, job, tracking_url_callback=None):
        """
        Full job submission lifecycle:
        1. Package Python modules into packages.tar
        2. Pickle the job instance to job-instance.pickle
        3. Build hadoop jar command with all flags
        4. Submit via run_and_track_hadoop_job()
        5. Atomic move of output on success
        6. Cleanup temp directory
        """
        ...


def run_and_track_hadoop_job(arglist, tracking_url_callback=None, env=None):
    """
    Execute hadoop command, parse stderr for tracking URLs and job IDs,
    capture output, and raise HadoopJobError on failure.
    """
    ...
```
Import
```python
from luigi.contrib.hadoop import BaseHadoopJobTask, HadoopJobRunner
from luigi.contrib.hadoop import run_and_track_hadoop_job, HadoopJobError
```
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| self (BaseHadoopJobTask) | BaseHadoopJobTask | The task instance; must have input_hadoop(), output(), job_runner(), and mapper/reducer methods defined. |
| data_interchange_format | str | "python" (uses repr/eval) or "json" (uses json.dumps/json.loads) for intermediate key-value serialization. |
| job (run_job arg) | BaseHadoopJobTask | The task to execute. The runner reads its input_hadoop(), output(), jobconfs(), extra_modules(), extra_files(), extra_archives(), and extra_streaming_arguments(). |
| streaming_jar | str | Path to the Hadoop Streaming JAR (from runner configuration). |
Outputs
| Name | Type | Description |
|---|---|---|
| HDFS output path | directory on HDFS | The final output directory, atomically moved from a temporary path upon job success. |
| HadoopJobError | exception | Raised on failure, containing the error message, stdout, and stderr from the Hadoop process. |
| Tracking URL | str (via callback) | The YARN/MapReduce tracking URL, set on the task via job.set_tracking_url(). |
Usage Examples
Example 1: Complete WordCount pipeline execution
```python
import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputText(luigi.ExternalTask):
    date = luigi.DateParameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            self.date.strftime('/tmp/text/%Y-%m-%d.txt')
        )


class WordCount(luigi.contrib.hadoop.JobTask):
    """
    When Luigi's scheduler calls WordCount.run(), the following happens:
    1. Data serialization functions are configured (repr/eval by default)
    2. init_local() is called (no-op unless overridden)
    3. job_runner() returns DefaultHadoopJobRunner (reads streaming-jar from config)
    4. run_job() packages code, pickles the task, builds the hadoop command,
       submits it, and atomically moves output on success
    """
    date_interval = luigi.DateIntervalParameter()

    def requires(self):
        return [InputText(date) for date in self.date_interval.dates()]

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            '/tmp/text-count/%s' % self.date_interval
        )

    def mapper(self, line):
        for word in line.strip().split():
            yield word, 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    luigi.run()
```
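The mapper/reducer pair above can be sanity-checked locally by simulating Hadoop Streaming's shuffle between the map and reduce phases. The harness below is purely illustrative and not part of Luigi's API; the standalone functions stand in for the task's bound methods:

```python
from collections import defaultdict


def simulate_streaming(mapper, reducer, lines):
    """Group mapper output by key (the shuffle), then reduce each group."""
    grouped = defaultdict(list)
    for line in lines:
        for key, value in mapper(line):
            grouped[key].append(value)
    results = {}
    for key in sorted(grouped):
        for k, v in reducer(key, grouped[key]):
            results[k] = v
    return results


def mapper(line):
    for word in line.strip().split():
        yield word, 1


def reducer(key, values):
    yield key, sum(values)


print(simulate_streaming(mapper, reducer, ["a b a", "b a"]))
# {'a': 3, 'b': 2}
```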
Example 2: Multi-step pipeline with dependency chain
```python
import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class RawData(luigi.ExternalTask):
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/raw/events/')


class CountByUser(luigi.contrib.hadoop.JobTask):
    def requires(self):
        return RawData()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/intermediate/user-counts/')

    def mapper(self, line):
        fields = line.strip().split('\t')
        if len(fields) >= 2:
            user_id = fields[0]
            yield user_id, 1

    def reducer(self, key, values):
        yield key, sum(values)


class TopUsers(luigi.contrib.hadoop.JobTask):
    """
    Execution flow:
    1. Luigi scheduler resolves dependencies: TopUsers -> CountByUser -> RawData
    2. RawData.output().exists() is checked on HDFS
    3. CountByUser.run() is called, which submits a streaming job
    4. After CountByUser completes, TopUsers.run() is called
    5. Each run() call goes through the full BaseHadoopJobTask.run() lifecycle
    """
    threshold = luigi.IntParameter(default=100)

    def requires(self):
        return CountByUser()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/output/top-users/')

    def mapper(self, line):
        parts = line.strip().split('\t')
        if len(parts) == 2:
            user, count = parts[0], int(parts[1])
            if count >= self.threshold:
                yield user, count


if __name__ == '__main__':
    luigi.run()
```
Example 3: Handling job failures
```python
from luigi.contrib.hadoop import HadoopJobError, BaseHadoopJobTask


class MyTask(BaseHadoopJobTask):
    def on_failure(self, exception):
        if isinstance(exception, HadoopJobError):
            # HadoopJobError contains .message, .out (stdout), .err (stderr)
            return (
                "Hadoop job failed: %s\n\nstdout:\n%s\n\nstderr:\n%s"
                % (exception.message, exception.out, exception.err)
            )
        return super().on_failure(exception)
```
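The formatting logic can be exercised without a cluster. The stand-in exception below is hypothetical; it simply mirrors the .message/.out/.err attributes described in the I/O contract so the message template can be tested in isolation:

```python
class HadoopJobErrorSketch(Exception):
    """Stand-in mirroring HadoopJobError's message/out/err attributes."""

    def __init__(self, message, out=None, err=None):
        super().__init__(message)
        self.message = message
        self.out = out
        self.err = err


def format_failure(exception):
    # Same template as MyTask.on_failure above.
    return (
        "Hadoop job failed: %s\n\nstdout:\n%s\n\nstderr:\n%s"
        % (exception.message, exception.out, exception.err)
    )
```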