
Implementation:Spotify Luigi JobTask MapReduce

From Leeroopedia



Domains: Pipeline_Orchestration, Big_Data

Last Updated: 2026-02-10 00:00 GMT

Overview

A concrete tool for implementing Hadoop Streaming MapReduce jobs in pure Python, using the mapper/reducer paradigm provided by Luigi.

Description

The JobTask class in luigi.contrib.hadoop is a Luigi task that represents a complete Hadoop Streaming MapReduce job. It extends BaseHadoopJobTask and provides the full map-shuffle-reduce lifecycle. Developers subclass JobTask and override the mapper() method (and optionally reducer() and combiner()) to define their data processing logic. The class handles:

  • Parsing input records via reader() and formatting output via writer().
  • Running the mapper, combiner, and reducer on Hadoop nodes via the run_mapper(), run_combiner(), and run_reducer() methods.
  • Serialization of intermediate key-value pairs using configurable data interchange formats (Python repr/eval or JSON).
  • Hadoop counter support with batched increments via incr_counter().
  • Automatic job runner selection: if all outputs are HdfsTarget, it uses DefaultHadoopJobRunner; otherwise it falls back to LocalJobRunner for testing.
  • Hook methods final_mapper, final_combiner, and final_reducer for emitting summary records after all input has been processed.

Usage

Use JobTask when:

  • You want to write a Hadoop Streaming MapReduce job in pure Python within a Luigi pipeline.
  • Your computation follows the map-then-reduce pattern (e.g., word count, aggregation, filtering).
  • You need a combiner to reduce shuffle traffic between mappers and reducers.
  • You want automatic local-mode fallback for unit testing without a Hadoop cluster.
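The map-then-reduce pattern above can be simulated with plain Python generators. This is a hedged sketch of the lifecycle JobTask manages (map phase, shuffle, reduce phase), not Luigi's actual code; the function names are illustrative.

```python
from itertools import groupby
from operator import itemgetter

def mapper(line):
    # As in a JobTask.mapper override: emit (key, value) pairs
    for word in line.strip().split():
        yield word, 1

def reducer(key, values):
    # As in a JobTask.reducer override: aggregate all values for one key
    yield key, sum(values)

def run_job(lines):
    # Map phase (what run_mapper drives on each Hadoop node)
    pairs = [kv for line in lines for kv in mapper(line)]
    # Shuffle: sort by key so records with equal keys become adjacent
    pairs.sort(key=itemgetter(0))
    # Reduce phase (what run_reducer drives): group by key, reduce each group
    results = []
    for key, group in groupby(pairs, key=itemgetter(0)):
        results.extend(reducer(key, (v for _, v in group)))
    return results

print(run_job(["a b a", "b c"]))  # [('a', 2), ('b', 2), ('c', 1)]
```

The sort-then-groupby step mirrors what Hadoop's shuffle guarantees a streaming reducer: all values for one key arrive contiguously.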

Code Reference

Source Location

luigi/contrib/hadoop.py, lines 777--1049.

Key Signatures

class JobTask(BaseHadoopJobTask):
    n_reduce_tasks = 25
    reducer = NotImplemented

    def mapper(self, item):
        """Process one input record. Yields (key, value) tuples."""
        yield None, item

    combiner = NotImplemented

    def reducer(self, key, values):
        """Aggregate all values for a given key. Yields output records."""
        ...

    def reader(self, input_stream):
        """Parse raw input lines into records for the mapper."""
        for line in input_stream:
            yield line,

    def writer(self, outputs, stdout, stderr=sys.stderr):
        """Format output records as tab-separated lines."""
        ...

    def run_mapper(self, stdin=sys.stdin, stdout=sys.stdout):
        """Execute the map phase on a Hadoop node."""
        ...

    def run_reducer(self, stdin=sys.stdin, stdout=sys.stdout):
        """Execute the reduce phase on a Hadoop node."""
        ...

    def run_combiner(self, stdin=sys.stdin, stdout=sys.stdout):
        """Execute the combine phase on a Hadoop node."""
        ...

    def incr_counter(self, *args, **kwargs):
        """Increment a Hadoop counter (batched for performance)."""
        ...

    def job_runner(self):
        """Return DefaultHadoopJobRunner or LocalJobRunner."""
        ...

    def extra_modules(self):
        """Additional Python modules to ship to the cluster."""
        return []

    def extra_files(self):
        """Additional files to distribute via Hadoop's -files option."""
        return []
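A hedged sketch of the default reader()/writer() contract shown above: reader turns each raw input line into a one-element tuple for the mapper, and writer joins each output record with tabs. These are illustrative standalone functions, not Luigi's exact method bodies.

```python
import io

def reader(input_stream):
    # Default behavior: one record per line, as a one-element tuple,
    # which run_mapper unpacks into the mapper's arguments
    for line in input_stream:
        yield line.rstrip("\n"),

def writer(outputs, stdout):
    # Default behavior: each output record becomes a tab-separated line
    for output in outputs:
        stdout.write("\t".join(map(str, output)) + "\n")

print(list(reader(io.StringIO("hello\nworld\n"))))  # [('hello',), ('world',)]

buf = io.StringIO()
writer([("hello", 3), ("world", 1)], buf)
print(buf.getvalue())  # the string "hello\t3\nworld\t1\n"
```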

Import

import luigi.contrib.hadoop

class MyJob(luigi.contrib.hadoop.JobTask):
    ...

I/O Contract

Inputs

Name                    | Type             | Description
input (via requires())  | list[HdfsTarget] | One or more HDFS targets providing input data to the mapper.
item (mapper arg)       | tuple            | A single record parsed by reader(); by default a one-element tuple containing the raw line.
key (reducer arg)       | any              | The grouping key emitted by the mapper.
values (reducer arg)    | iterator         | All values associated with the key, across all mappers.
n_reduce_tasks          | int              | Number of reduce tasks (default 25). Set to 0 for map-only jobs.
data_interchange_format | str              | "python" (default) or "json" for intermediate serialization.

Outputs

Name                  | Type                       | Description
output (via output()) | HdfsTarget                 | The HDFS target where final reducer output is written.
mapper                | yields (key, value) tuples | Intermediate key-value pairs shuffled to reducers.
reducer               | yields tuples              | Final output records written as tab-separated lines.
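The data_interchange_format switch can be illustrated with a hedged sketch: the "python" format round-trips arbitrary Python literals via repr()/eval(), while "json" is portable but limits key/value types. These encode/decode helpers are illustrative, not Luigi's internal serializers.

```python
import json

def encode_python(key, value):
    # "python": repr() both sides; the reading side eval()s them back,
    # so Python literals such as tuples survive the shuffle intact
    return "%s\t%s" % (repr(key), repr(value))

def decode_python(line):
    k, v = line.split("\t", 1)
    return eval(k), eval(v)

def encode_json(key, value):
    # "json": language-neutral, but keys and values must be
    # JSON-representable (no tuples, sets, etc.)
    return "%s\t%s" % (json.dumps(key), json.dumps(value))

def decode_json(line):
    k, v = line.split("\t", 1)
    return json.loads(k), json.loads(v)

print(decode_python(encode_python(("word", 1), 3)))  # (('word', 1), 3)
print(decode_json(encode_json("word", [1, 2])))      # ('word', [1, 2])
```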

Usage Examples

Example 1: Word count (from examples/wordcount_hadoop.py)

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputText(luigi.ExternalTask):
    date = luigi.DateParameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            self.date.strftime('/tmp/text/%Y-%m-%d.txt')
        )


class WordCount(luigi.contrib.hadoop.JobTask):
    date_interval = luigi.DateIntervalParameter()

    def requires(self):
        return [InputText(date) for date in self.date_interval.dates()]

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            '/tmp/text-count/%s' % self.date_interval
        )

    def mapper(self, line):
        for word in line.strip().split():
            yield word, 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    luigi.run()

Example 2: Map-only job (no reducer)

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputData(luigi.ExternalTask):
    input_path = luigi.Parameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.input_path)


class FilterLines(luigi.contrib.hadoop.JobTask):
    input_path = luigi.Parameter()
    keyword = luigi.Parameter()

    n_reduce_tasks = 0  # map-only: no reduce phase

    def requires(self):
        # requires() must return Task(s), not Targets, so the HDFS
        # input is wrapped in an ExternalTask
        return InputData(self.input_path)

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/filtered/%s' % self.keyword)

    def mapper(self, line):
        # With no reducer defined, mapper output is written directly
        # via writer() as the job's final output
        if self.keyword in line:
            yield line,

Example 3: Using a combiner for optimization

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputCorpus(luigi.ExternalTask):

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/input/')


class OptimizedWordCount(luigi.contrib.hadoop.JobTask):
    date_interval = luigi.DateIntervalParameter()

    def requires(self):
        # requires() must return Task(s), not Targets
        return InputCorpus()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/wordcount-output/')

    def mapper(self, line):
        for word in line.strip().split():
            yield word, 1

    def combiner(self, key, values):
        # Pre-aggregate on the map side to reduce shuffle traffic
        yield key, sum(values)

    def reducer(self, key, values):
        yield key, sum(values)
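To see what the combiner buys, here is a hedged plain-Python simulation (not Luigi internals): pre-aggregating one mapper's output collapses repeated keys before anything crosses the network.

```python
from itertools import groupby
from operator import itemgetter

def combine(pairs):
    # What a combiner does to a single mapper's output: group by key
    # locally and pre-sum, shrinking the shuffle payload
    pairs = sorted(pairs, key=itemgetter(0))
    return [(key, sum(v for _, v in group))
            for key, group in groupby(pairs, key=itemgetter(0))]

mapper_output = [('the', 1)] * 1000 + [('cat', 1)] * 10
combined = combine(mapper_output)
# 1010 intermediate records collapse to 2 before the shuffle
print(len(mapper_output), '->', len(combined))  # 1010 -> 2
```

This only works because word-count's reduction (summing) is associative and commutative; a combiner must produce output the reducer can still consume correctly.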
