Implementation:Spotify Luigi BaseHadoopJobTask Run

Domains: Pipeline_Orchestration, Big_Data

Last Updated: 2026-02-10 00:00 GMT

Overview

Concrete tool, provided by the Luigi library, for executing Hadoop MapReduce pipelines under Luigi's orchestration.

Description

The execution of a Hadoop MapReduce pipeline in Luigi is driven by two key methods that work together:

  • BaseHadoopJobTask.run() (luigi/contrib/hadoop.py, lines 721--731) -- The entry point called by the Luigi scheduler. It configures the data serialization format (Python repr/eval or JSON), invokes init_local() for any pre-submission setup, and then delegates to self.job_runner().run_job(self).
  • HadoopJobRunner.run_job() (luigi/contrib/hadoop.py, lines 414--576) -- The method that performs the actual job submission. It:
    1. Collects all required Python packages (Luigi itself, user modules, attached packages) and creates a packages.tar archive.
    2. Serializes the task instance to job-instance.pickle so it can be reconstructed on Hadoop nodes.
    3. Constructs the full hadoop jar command with the streaming JAR and all flags: -libjars, -archives, -files, -D configuration properties, -mapper, -combiner, -reducer, -inputformat, -outputformat, -input, and -output (a simplified sketch of the assembled command appears after this list).
    4. If end_job_with_atomic_move_dir is enabled, replaces the output path with a timestamped temporary path.
    5. Calls run_and_track_hadoop_job() which starts the subprocess, parses stderr for tracking URLs and job IDs, captures output, and raises HadoopJobError on failure.
    6. On success, atomically moves the temporary output directory to the final path.
    7. Cleans up the local temporary directory.
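
The hadoop jar invocation assembled in step 3 is ultimately just a list of strings handed to run_and_track_hadoop_job(). The following is an illustrative sketch only: every path, job name, and streaming command shown is a hypothetical placeholder, and the exact flag layout is decided by run_job() at submission time.

# Hypothetical sketch of the arglist built by HadoopJobRunner.run_job();
# all values below are placeholders, not what Luigi literally generates.
arglist = [
    'hadoop', 'jar', '/usr/lib/hadoop-mapreduce/hadoop-streaming.jar',  # streaming_jar from config
    '-D', 'mapreduce.job.name=WordCount',             # one -D pair per jobconfs() entry
    '-files', 'packages.tar,job-instance.pickle',     # shipped code and pickled task (flag choice illustrative)
    '-mapper', 'python mrrunner.py map',              # streaming commands (illustrative)
    '-reducer', 'python mrrunner.py reduce',
    '-input', '/tmp/text/2024-01-01.txt',
    '-output', '/tmp/text-count/2024-01-temp-2026-02-10T000000.000000',  # timestamped temp dir, moved atomically on success
]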

The run_and_track_hadoop_job() function also installs a HadoopRunContext that intercepts SIGTERM and KeyboardInterrupt to kill the running Hadoop job via yarn application -kill or mapred job -kill.
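
A minimal sketch of that interception pattern, assuming the YARN application ID has already been parsed from the job's stderr (the function name below is hypothetical; the real logic lives in HadoopRunContext and also handles KeyboardInterrupt and the mapred job -kill fallback):

import signal
import subprocess


def install_kill_handler(application_id):
    """Kill the tracked YARN application if the submitting process receives SIGTERM."""
    def handler(signum, frame):
        # Ask YARN to terminate the remote job before this process exits.
        subprocess.call(['yarn', 'application', '-kill', application_id])
        raise SystemExit(1)
    signal.signal(signal.SIGTERM, handler)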

Usage

Use BaseHadoopJobTask.run() indirectly by subclassing JobTask or HadoopJarJobTask and letting the Luigi scheduler invoke run(). Direct calls are only needed for testing or manual execution outside the scheduler.
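
A minimal sketch of the normal, scheduler-driven path, assuming the WordCount task from Example 1 below is defined in (or importable into) the current module; the date interval value is an arbitrary placeholder:

import luigi
import luigi.date_interval

# The scheduler calls WordCount.run(), which delegates to
# DefaultHadoopJobRunner.run_job() as described above.
luigi.build(
    [WordCount(date_interval=luigi.date_interval.Month(2024, 1))],
    local_scheduler=True,
)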

Code Reference

Source Location

  • luigi/contrib/hadoop.py, lines 721--731 (BaseHadoopJobTask.run)
  • luigi/contrib/hadoop.py, lines 414--576 (HadoopJobRunner.run_job)

Key Signatures

class BaseHadoopJobTask(luigi.Task):
    # Configurable data interchange: "python" or "json"
    data_interchange_format = "python"

    def run(self):
        self.serialize = DataInterchange[self.data_interchange_format]['serialize']
        self.internal_serialize = DataInterchange[self.data_interchange_format]['internal_serialize']
        self.deserialize = DataInterchange[self.data_interchange_format]['deserialize']
        self.init_local()
        self.job_runner().run_job(self)
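
The DataInterchange table indexed by run() maps each format name to its serializer functions. A conceptual sketch, assuming the repr/eval and json.dumps/json.loads pairing described below (the exact definition in luigi/contrib/hadoop.py may differ in detail):

import json

# 'python' round-trips values through repr/eval; 'json' through json.dumps/json.loads.
DataInterchange_sketch = {
    'python': {'serialize': repr, 'internal_serialize': repr, 'deserialize': eval},
    'json':   {'serialize': json.dumps, 'internal_serialize': json.dumps, 'deserialize': json.loads},
}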


class HadoopJobRunner(JobRunner):
    def run_job(self, job, tracking_url_callback=None):
        """
        Full job submission lifecycle:
        1. Package Python modules into packages.tar
        2. Pickle the job instance to job-instance.pickle
        3. Build hadoop jar command with all flags
        4. Submit via run_and_track_hadoop_job()
        5. Atomic move of output on success
        6. Cleanup temp directory
        """
        ...


def run_and_track_hadoop_job(arglist, tracking_url_callback=None, env=None):
    """
    Execute hadoop command, parse stderr for tracking URLs and job IDs,
    capture output, and raise HadoopJobError on failure.
    """
    ...

Import

from luigi.contrib.hadoop import BaseHadoopJobTask, HadoopJobRunner
from luigi.contrib.hadoop import run_and_track_hadoop_job, HadoopJobError

I/O Contract

Inputs

  • self (BaseHadoopJobTask) -- The task instance; must define input_hadoop(), output(), job_runner(), and mapper/reducer methods.
  • data_interchange_format (str) -- "python" (uses repr/eval) or "json" (uses json.dumps/json.loads) for intermediate key-value serialization.
  • job (run_job argument; BaseHadoopJobTask) -- The task to execute. The runner reads its input_hadoop(), output(), jobconfs(), extra_modules(), extra_files(), extra_archives(), and extra_streaming_arguments().
  • streaming_jar (str) -- Path to the Hadoop Streaming JAR, taken from the runner configuration.
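
Switching a task to JSON interchange only requires overriding the class attribute; a minimal hedged sketch (task body abbreviated, output() and requires() omitted):

import luigi.contrib.hadoop


class JsonWordCount(luigi.contrib.hadoop.JobTask):
    # Intermediate key/value pairs go through json.dumps/json.loads instead of repr/eval.
    data_interchange_format = 'json'

    def mapper(self, line):
        for word in line.strip().split():
            yield word, 1

    def reducer(self, key, values):
        yield key, sum(values)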

Outputs

  • HDFS output path (directory on HDFS) -- The final output directory, atomically moved from a temporary path upon job success.
  • HadoopJobError (exception) -- Raised on failure; carries the error message, stdout, and stderr from the Hadoop process.
  • Tracking URL (str, via callback) -- The YARN/MapReduce tracking URL, reported to the task via job.set_tracking_url().
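
Both the tracking URL and the failure details surface through run_and_track_hadoop_job(). A hedged sketch of invoking it directly with a callback; normally run_job() builds the arglist and wires the callback itself, so the command list here is only a placeholder:

from luigi.contrib.hadoop import HadoopJobError, run_and_track_hadoop_job


def record_url(url):
    # Called as soon as the tracking URL is parsed from the job's stderr.
    print('Tracking URL:', url)


arglist = ['hadoop', 'jar', '/path/to/hadoop-streaming.jar']  # placeholder; built by run_job()
try:
    run_and_track_hadoop_job(arglist, tracking_url_callback=record_url)
except HadoopJobError as e:
    print(e.message)  # error summary
    print(e.out)      # captured stdout
    print(e.err)      # captured stderr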

Usage Examples

Example 1: Complete WordCount pipeline execution

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class InputText(luigi.ExternalTask):
    date = luigi.DateParameter()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            self.date.strftime('/tmp/text/%Y-%m-%d.txt')
        )


class WordCount(luigi.contrib.hadoop.JobTask):
    """
    When Luigi's scheduler calls WordCount.run(), the following happens:
    1. Data serialization functions are configured (repr/eval by default)
    2. init_local() is called (no-op unless overridden)
    3. job_runner() returns DefaultHadoopJobRunner (reads streaming-jar from config)
    4. run_job() packages code, pickles the task, builds the hadoop command,
       submits it, and atomically moves output on success
    """
    date_interval = luigi.DateIntervalParameter()

    def requires(self):
        return [InputText(date) for date in self.date_interval.dates()]

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget(
            '/tmp/text-count/%s' % self.date_interval
        )

    def mapper(self, line):
        for word in line.strip().split():
            yield word, 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    luigi.run()

Example 2: Multi-step pipeline with dependency chain

import luigi
import luigi.contrib.hadoop
import luigi.contrib.hdfs


class RawData(luigi.ExternalTask):
    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/raw/events/')


class CountByUser(luigi.contrib.hadoop.JobTask):
    def requires(self):
        return RawData()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/intermediate/user-counts/')

    def mapper(self, line):
        fields = line.strip().split('\t')
        if len(fields) >= 2:
            user_id = fields[0]
            yield user_id, 1

    def reducer(self, key, values):
        yield key, sum(values)


class TopUsers(luigi.contrib.hadoop.JobTask):
    """
    Execution flow:
    1. Luigi scheduler resolves dependencies: TopUsers -> CountByUser -> RawData
    2. RawData.output().exists() is checked on HDFS
    3. CountByUser.run() is called, which submits a streaming job
    4. After CountByUser completes, TopUsers.run() is called
    5. Each run() call goes through the full BaseHadoopJobTask.run() lifecycle
    """
    threshold = luigi.IntParameter(default=100)

    def requires(self):
        return CountByUser()

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/data/output/top-users/')

    def mapper(self, line):
        parts = line.strip().split('\t')
        if len(parts) == 2:
            user, count = parts[0], int(parts[1])
            if count >= self.threshold:
                yield user, count


if __name__ == '__main__':
    luigi.run()

Example 3: Handling job failures

from luigi.contrib.hadoop import HadoopJobError, BaseHadoopJobTask


class MyTask(BaseHadoopJobTask):
    def on_failure(self, exception):
        if isinstance(exception, HadoopJobError):
            # HadoopJobError contains .message, .out (stdout), .err (stderr)
            return (
                "Hadoop job failed: %s\n\nstdout:\n%s\n\nstderr:\n%s"
                % (exception.message, exception.out, exception.err)
            )
        return super().on_failure(exception)
