Workflow:Spotify Luigi Hadoop MapReduce Pipeline
| Knowledge Sources | |
|---|---|
| Domains | Data_Engineering, Big_Data, Hadoop, MapReduce |
| Last Updated | 2026-02-10 12:00 GMT |
Overview
End-to-end process for running Hadoop MapReduce Streaming jobs orchestrated by Luigi, reading from and writing to HDFS targets.
Description
This workflow covers how to define and execute Hadoop MapReduce jobs through Luigi's contrib.hadoop module. Instead of implementing run(), tasks subclass JobTask and define mapper() and reducer() methods. Luigi handles packaging the Python code, submitting the job to the Hadoop cluster via the streaming jar, and managing HDFS input/output targets. The workflow supports both Python-based streaming jobs and pre-built Java/Scala jar execution via HadoopJarJobTask.
Usage
Execute this workflow when you have large-scale data stored in HDFS that needs to be processed using the MapReduce paradigm. It suits batch processing jobs that benefit from Hadoop's distributed compute (e.g., log aggregation, ETL transformations, word frequency counting across terabytes of text). It requires a configured Hadoop cluster and the hadoop-streaming jar.
Execution Steps
Step 1: Configure Hadoop Environment
Set up the Hadoop connection by configuring the [hadoop] section in Luigi's configuration file (luigi.cfg or luigi.toml). This includes specifying the path to the Hadoop streaming jar and any cluster-specific settings. The HDFS client configuration determines how Luigi interacts with the distributed filesystem (CLI-based or WebHDFS).
Key considerations:
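- The streaming jar path must be configured in the [hadoop] section (the streaming-jar option)
- The HDFS client is chosen by the client option in the [hdfs] section: hadoopcli (shells out to the Hadoop command-line client) or webhdfs (talks to the WebHDFS REST API)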
- Environment variables and Hadoop configuration files must be accessible to the Luigi process
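As a rough illustration of the settings listed above, the sketch below parses a sample luigi.cfg with Python's standard configparser. The jar path is a hypothetical placeholder, and the fallback to hadoopcli when no client is set is an assumption made for this example, not a statement about every Luigi version.

```python
import configparser

# Hypothetical luigi.cfg contents. The jar path is a placeholder and must be
# replaced with the real location on your own cluster.
SAMPLE_CFG = """
[hadoop]
streaming-jar = /path/to/hadoop-streaming.jar

[hdfs]
client = hadoopcli
"""


def load_hadoop_settings(text):
    """Parse luigi.cfg-style text and return the Hadoop-related settings."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    return {
        "streaming_jar": parser.get("hadoop", "streaming-jar"),
        # Assumed default for this sketch when [hdfs] client is absent.
        "hdfs_client": parser.get("hdfs", "client", fallback="hadoopcli"),
    }


settings = load_hadoop_settings(SAMPLE_CFG)
print(settings)
```

Checking these values before scheduling anything is a cheap way to catch a missing streaming jar setting early, instead of discovering it when the first job is submitted.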
Step 2: Define HDFS Input Sources
Declare external HDFS data as ExternalTask subclasses with HdfsTarget outputs. These represent data already present in HDFS (e.g., daily log dumps, upstream pipeline outputs). The HdfsTarget wraps HDFS path operations and provides exists() checking against the distributed filesystem.
Key considerations:
- HdfsTarget supports glob patterns for multi-file inputs
- Date-parameterized paths enable daily/hourly data partitioning
- HdfsFlagTarget can check for _SUCCESS marker files instead of individual file existence
Step 3: Implement MapReduce Job Tasks
Create task subclasses of JobTask (from luigi.contrib.hadoop) and implement the mapper() and reducer() methods. The mapper receives one line of input at a time and yields key-value pairs. The reducer receives a key and an iterator of all values for that key, and yields output key-value pairs. Luigi pickles the task instance, packages it together with the required Python modules, and submits the job to Hadoop Streaming, where the methods run on the cluster nodes.
Key considerations:
- mapper(self, line) receives raw input lines; yield (key, value) tuples
- reducer(self, key, values) receives grouped data; yield (key, result) tuples
- combiner() can be implemented for local pre-aggregation before shuffle
- Luigi automatically packages itself and the task's Python module for the cluster; additional modules the job imports can be listed through extra_modules()
Step 4: Configure Job Resources and Outputs
Set HdfsTarget as the output for MapReduce tasks and configure job-level settings such as number of reducers, job queue, memory limits, and extra Hadoop streaming arguments. The output path typically includes parameter values to ensure each run produces a unique output.
Key considerations:
- n_reduce_tasks sets the number of reduce tasks requested for the job
- Extra streaming arguments can be passed via extra_streaming_arguments()
- Output paths should incorporate parameters to avoid collisions between runs
- Compressed output (e.g., gzip) is supported by passing a format class such as luigi.format.Gzip to HdfsTarget
Step 5: Chain and Execute the Pipeline
Connect MapReduce tasks in a dependency chain using requires() and run the pipeline. Luigi resolves the dependency graph, checks HDFS for existing outputs, and only submits jobs whose outputs are missing. The execution can be orchestrated via the local scheduler for development or the central scheduler for production.
Key considerations:
- Multiple MapReduce tasks can be chained, with each reading the previous task's HDFS output
- Mixed pipelines can combine MapReduce tasks with local Python tasks
- Use the central scheduler (luigid) for production to get locking and visualization
- Failed jobs can be retried with configurable retry policies