
Implementation:Spotify Luigi HadoopJobRunner




Domains: Pipeline_Orchestration, Big_Data

Last Updated: 2026-02-10 00:00 GMT

Overview

A concrete Luigi facility for configuring the execution resources and environment of Hadoop jobs.

Description

Luigi provides three classes (one job runner and two task classes) that handle the mechanics of configuring and submitting Hadoop jobs:

  • HadoopJobRunner (luigi/contrib/hadoop.py, lines 389--585) -- The primary runner for Hadoop Streaming jobs. It packages Python modules into a tarball, serializes the task instance to a pickle file, constructs the full hadoop jar command with all configuration flags (-libjars, -archives, -files, -D properties, -mapper, -reducer, -combiner, -input, -output), submits the job, and optionally performs an atomic move of the output directory (a sketch of the resulting command appears after this list).
  • BaseHadoopJobTask (luigi/contrib/hadoop.py, lines 664--764) -- The abstract base task class that defines job configuration methods: jobconfs() for generating -D properties (job name, priority, scheduler pool), init_local() and init_hadoop() for initialization hooks, and job_runner() as the abstract factory method that returns a runner instance.
  • HadoopJarJobTask (luigi/contrib/hadoop_jar.py, lines 128--167) -- A task class for running pre-compiled Java JAR jobs. It defines jar(), main(), args(), atomic_output(), and ssh() methods, and uses HadoopJarJobRunner to submit the job.
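
For orientation, the streaming command that HadoopJobRunner assembles looks roughly like the sketch below. Every path, JAR name, and property value is a placeholder assumption rather than a Luigi default, and the exact flags emitted depend on the runner's configuration.

# Illustrative shape of the arglist HadoopJobRunner builds in run_job().
# All concrete values below are placeholders, not defaults.
arglist = [
    'hadoop', 'jar', '/usr/lib/hadoop/hadoop-streaming.jar',
    '-libjars', '/opt/libs/custom-inputformat.jar',
    '-files', 'packages.tar,job-instance.pickle',
    '-D', 'mapred.job.name=MyStreamingJob',
    '-D', 'mapred.reduce.tasks=50',
    '-mapper', 'python mrrunner.py map',
    '-reducer', 'python mrrunner.py reduce',
    '-input', '/data/input',
    '-output', '/data/output-temp',  # temporary dir, moved atomically on success
]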

Usage

Use these classes when:

  • You need fine-grained control over the streaming JAR, libjars, archives, or distributed files for a Hadoop Streaming job.
  • You want to set custom Hadoop configuration properties (mapred.reduce.tasks, pool assignments, job priority); a jobconfs() override sketch follows this list.
  • You need to run a pre-compiled Java MapReduce JAR (like TeraSort or a custom Hadoop application).
  • You want atomic output semantics where the final output path appears only after the job completes successfully.
  • You need to submit jobs to a remote cluster via SSH.
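
For the common case of tuning a few properties, overriding jobconfs() on the task is often simpler than writing a custom runner. A minimal sketch, assuming a streaming word-count task; the property names are standard Hadoop keys rather than anything Luigi-specific:

import luigi.contrib.hadoop


class TunedWordCount(luigi.contrib.hadoop.JobTask):
    def jobconfs(self):
        # Start from the defaults (job name, priority, pool) and append
        # extra -D properties.
        jcs = super().jobconfs()
        jcs.append('mapred.reduce.tasks=10')
        jcs.append('mapreduce.map.memory.mb=2048')
        return jcs

    def mapper(self, line):
        yield line.strip(), 1

    def reducer(self, key, values):
        yield key, sum(values)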

Code Reference

Source Location

  • luigi/contrib/hadoop.py, lines 389--585 (HadoopJobRunner)
  • luigi/contrib/hadoop.py, lines 664--764 (BaseHadoopJobTask)
  • luigi/contrib/hadoop_jar.py, lines 128--167 (HadoopJarJobTask)

Key Signatures

class HadoopJobRunner(JobRunner):
    def __init__(self, streaming_jar, modules=None, streaming_args=None,
                 libjars=None, libjars_in_hdfs=None, jobconfs=None,
                 input_format=None, output_format=None,
                 end_job_with_atomic_move_dir=True, archives=None):
        ...

    def run_job(self, job, tracking_url_callback=None):
        """Package code, build arglist, submit streaming job, atomic move output."""
        ...

    def finish(self):
        """Clean up local temporary directory."""
        ...


class BaseHadoopJobTask(luigi.Task):
    pool = luigi.OptionalParameter(default=None, significant=False, positional=False)

    def jobconfs(self):
        """Return list of 'key=value' Hadoop configuration strings."""
        ...

    def init_local(self):
        """Hook for local initialization before job submission."""
        ...

    def init_hadoop(self):
        """Hook for initialization on Hadoop nodes."""
        ...

    @abc.abstractmethod
    def job_runner(self):
        """Return a JobRunner instance (factory method)."""
        ...


class HadoopJarJobTask(BaseHadoopJobTask):
    def jar(self):
        """Path to the JAR file."""
        return None

    def main(self):
        """Optional main class name."""
        return None

    def args(self):
        """Arguments after 'hadoop jar <jar> <main>'."""
        return []

    def atomic_output(self):
        """If True, use temp paths for outputs and move atomically."""
        return True

    def ssh(self):
        """SSH config dict for remote submission, or None."""
        return None

    def job_runner(self):
        return HadoopJarJobRunner()

Import

from luigi.contrib.hadoop import HadoopJobRunner, BaseHadoopJobTask
from luigi.contrib.hadoop_jar import HadoopJarJobTask, HadoopJarJobRunner

I/O Contract

Inputs (HadoopJobRunner.__init__)

  • streaming_jar (str) -- Filesystem path to the Hadoop Streaming JAR.
  • modules (list or None) -- Additional Python modules to include in the packages archive.
  • streaming_args (list or None) -- Extra arguments passed directly to the streaming JAR.
  • libjars (list or None) -- Local paths to Java JAR files to distribute via -libjars.
  • libjars_in_hdfs (list or None) -- HDFS paths to Java JAR files to download and distribute.
  • jobconfs (dict or None) -- Additional key/value Hadoop configuration properties.
  • input_format (str or None) -- Custom Hadoop InputFormat class name.
  • output_format (str or None) -- Custom Hadoop OutputFormat class name.
  • end_job_with_atomic_move_dir (bool) -- If True (default), write to a temp dir and atomically move on success.
  • archives (list or None) -- Paths to archive files to distribute via -archives.

Inputs (BaseHadoopJobTask)

  • pool (str or None) -- Hadoop scheduler pool name (Fair Scheduler) or queue name (Capacity Scheduler).

Outputs

  • run_job() (returns None) -- Submits the Hadoop job; on success the final output appears at the target HDFS path (after the atomic move, when enabled). Raises HadoopJobError on failure.
  • jobconfs() (returns list[str]) -- List of "key=value" strings for -D flags (e.g., "mapred.job.name=MyTask").
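
Note that run_job() is normally invoked by Luigi's worker as part of task execution rather than called directly. If you do drive a runner by hand, failures surface as HadoopJobError; a minimal sketch, where my_task stands in for an already-constructed job task:

from luigi.contrib.hadoop import HadoopJobError

# Hypothetical direct invocation; Luigi's worker normally drives this.
try:
    my_task.job_runner().run_job(my_task)
except HadoopJobError as err:
    # The exception carries a summary of the failure scraped from the
    # job's output.
    print('Hadoop job failed:', err)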

Usage Examples

Example 1: Custom streaming job runner with libjars

import luigi
import luigi.contrib.hdfs
from luigi.contrib.hadoop import HadoopJobRunner, JobTask


class InputData(luigi.ExternalTask):
    """Marks pre-existing HDFS input as an external dependency."""

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/input/')


class CustomRunner(HadoopJobRunner):
    def __init__(self):
        super().__init__(
            streaming_jar='/usr/lib/hadoop/hadoop-streaming-2.10.1.jar',
            libjars=['/opt/libs/custom-inputformat.jar'],
            jobconfs={'mapred.reduce.tasks': '50'},
            end_job_with_atomic_move_dir=True,
        )


class MyStreamingJob(JobTask):
    def job_runner(self):
        return CustomRunner()

    def requires(self):
        # requires() must return Task objects, not Targets, so the raw
        # HDFS path is wrapped in the ExternalTask above.
        return InputData()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/output/')

    def mapper(self, line):
        yield line.split(',')[0], 1

    def reducer(self, key, values):
        yield key, sum(values)

Example 2: Hadoop JAR job (TeraSort pattern)

import luigi
import luigi.contrib.hadoop_jar
import luigi.contrib.hdfs


class TeraGenOutput(luigi.ExternalTask):
    """Treats existing TeraGen output as an external input."""

    path = luigi.Parameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.path)


class TeraSort(luigi.contrib.hadoop_jar.HadoopJarJobTask):
    terasort_in = luigi.Parameter(default='/tmp/terasort-in')
    terasort_out = luigi.Parameter(default='/tmp/terasort-out')

    def requires(self):
        # Wrap the input path in an ExternalTask; requires() must return Tasks.
        return TeraGenOutput(self.terasort_in)

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(self.terasort_out)

    def jar(self):
        return '/usr/lib/hadoop/hadoop-examples.jar'

    def main(self):
        return 'terasort'

    def args(self):
        # The runner resolves Targets in args() to paths, substituting a
        # temporary path for the output when atomic_output() is True.
        return [self.input(), self.output()]
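
To submit the same JAR from a machine without a local Hadoop client, override ssh() on the task. A hedged sketch: the host, username, key_file, and no_host_key_check keys shown here are assumptions about what the remote runner reads, so verify them against your Luigi version.

class RemoteTeraSort(TeraSort):
    def ssh(self):
        # Assumed key names for remote submission over SSH; all values
        # are placeholders.
        return {
            'host': 'gateway.example.com',
            'username': 'hadoop',
            'key_file': '/home/hadoop/.ssh/id_rsa',
            'no_host_key_check': True,
        }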

Example 3: Setting pool and priority via BaseHadoopJobTask

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class HighPriorityInput(luigi.ExternalTask):
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/high-priority-input/')


class PrioritizedJob(luigi.contrib.hadoop.JobTask):
    pool = luigi.Parameter(default='production')

    def mr_priority(self):
        # Emitted as a mapred.job.priority -D property via jobconfs().
        return 'HIGH'

    def requires(self):
        return HighPriorityInput()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/high-priority-output/')

    def mapper(self, line):
        yield line.strip(), 1

    def reducer(self, key, values):
        yield key, sum(values)
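
With these settings, jobconfs() contributes -D properties along the following lines. The values are illustrative, and the pool key depends on the configured scheduler (mapred.fairscheduler.pool for the Fair Scheduler, mapred.job.queue.name for the Capacity Scheduler):

# Illustrative jobconfs() result for PrioritizedJob under the Fair Scheduler;
# the job name is derived from the task id.
['mapred.job.name=PrioritizedJob(...)',
 'mapred.job.priority=HIGH',
 'mapred.fairscheduler.pool=production']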
