Implementation:Spotify Luigi JobTask MapReduce
Domains: Pipeline_Orchestration, Big_Data
Last Updated: 2026-02-10 00:00 GMT
Overview
A concrete base class for implementing Hadoop Streaming MapReduce jobs with the mapper/reducer paradigm, provided by Luigi.
Description
The JobTask class in luigi.contrib.hadoop is a Luigi task that represents a complete Hadoop Streaming MapReduce job. It extends BaseHadoopJobTask and provides the full map-shuffle-reduce lifecycle. Developers subclass JobTask and override the mapper() method (and optionally reducer() and combiner()) to define their data processing logic. The class handles:
- Parsing input records via reader() and formatting output via writer().
- Running the mapper, combiner, and reducer on Hadoop nodes via the run_mapper(), run_combiner(), and run_reducer() methods.
- Serialization of intermediate key-value pairs using configurable data interchange formats (Python repr/eval or JSON).
- Hadoop counter support with batched increments via incr_counter().
- Automatic job runner selection: if all outputs are HdfsTarget, it uses DefaultHadoopJobRunner; otherwise it falls back to LocalJobRunner for testing.
- Hook methods final_mapper, final_combiner, and final_reducer for emitting summary records after all input has been processed.
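The reader()/writer() contract described above can be sketched in plain Python. This is an illustration of the default behavior, not code copied from Luigi:

```python
import io

# Illustrative sketch: reader() yields a one-element tuple per input
# line; writer() joins each output record with tabs. Luigi's actual
# implementations add encoding and error handling omitted here.
def reader(input_stream):
    for line in input_stream:
        yield (line.rstrip("\n"),)

def writer(outputs, stdout):
    for output in outputs:
        stdout.write("\t".join(map(str, output)) + "\n")

records = list(reader(io.StringIO("hello world\n")))
buf = io.StringIO()
writer([("word", 3)], buf)
```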
Usage
Use JobTask when:
- You want to write a Hadoop Streaming MapReduce job in pure Python within a Luigi pipeline.
- Your computation follows the map-then-reduce pattern (e.g., word count, aggregation, filtering).
- You need a combiner to reduce shuffle traffic between mappers and reducers.
- You want automatic local-mode fallback for unit testing without a Hadoop cluster.
Code Reference
Source Location
luigi/contrib/hadoop.py, lines 777--1049.
Key Signatures
class JobTask(BaseHadoopJobTask):
    n_reduce_tasks = 25
    reducer = NotImplemented   # sentinel: a job without a reducer runs map-only

    def mapper(self, item):
        """Process one input record. Yields (key, value) tuples."""
        yield None, item

    combiner = NotImplemented  # sentinel: override with a method to enable

    def reducer(self, key, values):
        """Aggregate all values for a given key. Yields output records."""
        ...

    def reader(self, input_stream):
        """Parse raw input lines into records for the mapper."""
        for line in input_stream:
            yield line,

    def writer(self, outputs, stdout, stderr=sys.stderr):
        """Format output records as tab-separated lines."""
        ...

    def run_mapper(self, stdin=sys.stdin, stdout=sys.stdout):
        """Execute the map phase on a Hadoop node."""
        ...

    def run_reducer(self, stdin=sys.stdin, stdout=sys.stdout):
        """Execute the reduce phase on a Hadoop node."""
        ...

    def run_combiner(self, stdin=sys.stdin, stdout=sys.stdout):
        """Execute the combine phase on a Hadoop node."""
        ...

    def incr_counter(self, *args, **kwargs):
        """Increment a Hadoop counter (batched for performance)."""
        ...

    def job_runner(self):
        """Return DefaultHadoopJobRunner or LocalJobRunner."""
        ...

    def extra_modules(self):
        """Additional Python modules to ship to the cluster."""
        return []

    def extra_files(self):
        """Additional files to distribute via Hadoop's -files option."""
        return []
Import
import luigi.contrib.hadoop

class MyJob(luigi.contrib.hadoop.JobTask):
    ...
I/O Contract
Inputs
| Name | Type | Description |
|---|---|---|
| input (via requires()) | list[HdfsTarget] | One or more HDFS targets providing input data to the mapper. |
| item (mapper arg) | tuple | A single record parsed by reader(); by default a one-element tuple containing the raw line. |
| key (reducer arg) | any | The grouping key emitted by the mapper. |
| values (reducer arg) | iterator | All values associated with the key, across all mappers. |
| n_reduce_tasks | int | Number of reduce tasks (default 25). Set to 0 for map-only jobs. |
| data_interchange_format | str | "python" (default) or "json" for intermediate serialization. |
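The difference between the two data_interchange_format options can be sketched as follows. This illustrates only the repr/eval-versus-JSON distinction; Luigi's exact on-the-wire framing may differ:

```python
import json

key, value = "word", 1

# "python" (default): repr() when writing, eval() when reading back
enc_py = "%s\t%s" % (repr(key), repr(value))
k, v = enc_py.split("\t")
decoded_py = (eval(k), eval(v))

# "json": json.dumps when writing, json.loads when reading back
enc_js = "%s\t%s" % (json.dumps(key), json.dumps(value))
k, v = enc_js.split("\t")
decoded_js = (json.loads(k), json.loads(v))
```

JSON is the safer choice when intermediate data may be consumed outside Python, since repr/eval output is Python-specific.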
Outputs
| Name | Type | Description |
|---|---|---|
| output (via output()) | HdfsTarget | The HDFS target where final reducer output is written. |
| mapper yields | (key, value) tuples | Intermediate key-value pairs shuffled to reducers. |
| reducer yields | tuples | Final output records written as tab-separated lines. |
Usage Examples
Example 1: Word count (from examples/wordcount_hadoop.py)
import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputText(luigi.ExternalTask):
    date = luigi.DateParameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            self.date.strftime('/tmp/text/%Y-%m-%d.txt')
        )


class WordCount(luigi.contrib.hadoop.JobTask):
    date_interval = luigi.DateIntervalParameter()

    def requires(self):
        return [InputText(date) for date in self.date_interval.dates()]

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            '/tmp/text-count/%s' % self.date_interval
        )

    def mapper(self, line):
        for word in line.strip().split():
            yield word, 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    luigi.run()
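Because mapper() and reducer() are plain generator methods, the map-shuffle-reduce data flow of the word count above can be simulated locally with standalone functions. This is a sketch of the semantics, not the Luigi API:

```python
from collections import defaultdict

# Standalone versions of the WordCount mapper/reducer logic
def mapper(line):
    for word in line.strip().split():
        yield word, 1

def reducer(key, values):
    yield key, sum(values)

# Simulate the shuffle: group every (key, value) pair by key,
# as Hadoop does between the map and reduce phases
groups = defaultdict(list)
for line in ["to be or", "not to be"]:
    for k, v in mapper(line):
        groups[k].append(v)

counts = dict(out for k, vs in groups.items() for out in reducer(k, vs))
```

Writing the logic as plain generators like this also makes it straightforward to unit-test a job without a Hadoop cluster.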
Example 2: Map-only job (no reducer)
import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputFile(luigi.ExternalTask):
    """Wraps an existing HDFS file; requires() must return tasks, not targets."""
    path = luigi.Parameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.path)


class FilterLines(luigi.contrib.hadoop.JobTask):
    input_path = luigi.Parameter()
    keyword = luigi.Parameter()

    def requires(self):
        return InputFile(path=self.input_path)

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/filtered/%s' % self.keyword)

    def mapper(self, line):
        if self.keyword in line:
            yield line,
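In a map-only job, whatever the mapper yields is the final output. The filtering logic above can be exercised as a plain generator; the keyword value here is illustrative:

```python
# Standalone sketch of FilterLines.mapper: yield a one-element tuple
# to emit the line, yield nothing to drop it
def mapper(line, keyword="ERROR"):
    if keyword in line:
        yield (line,)

lines = ["all good", "ERROR: disk full", "still fine"]
kept = [out for line in lines for out in mapper(line)]
```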
Example 3: Using a combiner for optimization
import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputDir(luigi.ExternalTask):
    """Wraps existing HDFS input; requires() must return tasks, not targets."""

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/input/')


class OptimizedWordCount(luigi.contrib.hadoop.JobTask):
    date_interval = luigi.DateIntervalParameter()

    def requires(self):
        return InputDir()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/wordcount-output/')

    def mapper(self, line):
        for word in line.strip().split():
            yield word, 1

    def combiner(self, key, values):
        # Pre-aggregate on the map side to reduce shuffle traffic
        yield key, sum(values)

    def reducer(self, key, values):
        yield key, sum(values)
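A combiner is only safe when the aggregation can be applied to partial results. Summation qualifies because adding per-mapper partial sums gives the same total as adding every raw value, which is why the combiner and reducer above can share the same logic. A sketch of that property:

```python
# Raw mapper outputs for one key, split across two map tasks
map_task_values = [[1, 1, 1], [1, 1]]

# Without a combiner: the reducer sums all five raw values
without_combiner = sum(v for part in map_task_values for v in part)

# With a combiner: each map task pre-sums its values, then the
# reducer sums the partial results shipped over the shuffle
partials = [sum(part) for part in map_task_values]
with_combiner = sum(partials)
```

Aggregations that are not associative and commutative in this sense (e.g. a plain average of raw values) cannot reuse the reducer as a combiner without changing the intermediate representation.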