Implementation:Spotify Luigi ScaldingJobTask

From Leeroopedia


Knowledge Sources
Domains: Data_Processing, Hadoop
Last Updated: 2026-02-10 08:00 GMT

Overview

ScaldingJobTask is a Luigi task that compiles Twitter Scalding (Scala-based MapReduce) jobs, packages them into a JAR, and submits them to a Hadoop cluster, with atomic output handling on HDFS.

Description

The Scalding contrib module provides Luigi integration for Twitter Scalding, a Scala library for writing MapReduce jobs. The module consists of two primary classes:

  • ScaldingJobTask (extends luigi.contrib.hadoop.BaseHadoopJobTask) -- An abstract task class for defining Scalding jobs within Luigi workflows. Users specify either a Scala source file (via source()) or a pre-built JAR (via jar()). The job_class() method names the main Scalding job class; it is required when jar() is used and auto-detected from the source otherwise. The requires() method should return a dictionary whose keys are Scalding argument names and whose values are subtasks; the subtasks' outputs become the input paths. The args() method assembles the full argument list, including --output and any job_args(). By default the task writes its output atomically, so a failed run does not leave partial results for downstream tasks.
  • ScaldingJobRunner (extends luigi.contrib.hadoop.JobRunner) -- The execution engine that handles the full lifecycle of a Scalding job: compiling Scala source files using the Scala compiler, packaging compiled classes into a JAR, and submitting the job to Hadoop via the hadoop jar command with appropriate classpath and libjars configuration. The runner reads configuration from the [scalding] section of the Luigi config file, including paths for scala-home, scalding-home, scalding-provided, and scalding-libjars.
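
The argument assembly described for args() can be sketched without Luigi. The helper below is hypothetical (assemble_args is not part of luigi.contrib.scalding) and assumes that each key of the requires() dictionary becomes a --key flag followed by the output paths of its subtasks. Only the --output flag and the trailing job_args() are taken from the description above; the paths are illustrative.

```python
from typing import Dict, List, Sequence, Union

PathOrPaths = Union[str, Sequence[str]]


def assemble_args(inputs: Dict[str, PathOrPaths],
                  output_path: str,
                  job_args: Sequence[str] = ()) -> List[str]:
    """Build a Scalding-style argument list (hypothetical helper)."""
    arglist: List[str] = []
    for name, paths in inputs.items():
        # Assumption: each requires() key maps to a "--<name>" flag.
        arglist.append('--' + name)
        # A key may map to a single path or to several paths.
        if isinstance(paths, str):
            arglist.append(paths)
        else:
            arglist.extend(paths)
    # The output path and any extra job arguments come last.
    arglist.extend(['--output', output_path])
    arglist.extend(job_args)
    return arglist


example_args = assemble_args(
    {'input': '/data/words'},      # illustrative HDFS input path
    '/output/word_count',
    ['--num-reducers', '10'],
)
print(example_args)
```

Keeping the assembly in a pure function like this makes the flag order easy to check before a job is ever submitted.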

The runner's build_job_jar() method either uses a pre-built JAR directly or compiles the Scala source on the fly. The run_job() method constructs the full Hadoop command line, sets HADOOP_CLASSPATH, and runs the job; its optional tracking_url_callback argument provides tracking URL support. The module relies on luigi.contrib.hadoop_jar.fix_paths() for atomic output handling on HDFS.
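
As a rough illustration of the command construction described above, the sketch below builds a hadoop jar invocation and a HADOOP_CLASSPATH value from a list of JARs. It is not the module's code: build_hadoop_command is a hypothetical helper, and the exact argument order, the use of the Scalding core JAR as the JAR handed to hadoop jar, and the --hdfs mode flag are assumptions.

```python
import os
from typing import Dict, List, Optional, Sequence, Tuple


def build_hadoop_command(job_jar: str,
                         scalding_core: str,
                         libjars: Sequence[str],
                         job_class: str,
                         job_args: Sequence[str],
                         base_env: Optional[Dict[str, str]] = None,
                         ) -> Tuple[List[str], Dict[str, str]]:
    """Return (argv, env) for submitting a job (hypothetical helper)."""
    jars = [job_jar, *libjars]
    # -libjars is Hadoop's generic option for shipping extra JARs with a job.
    argv = ['hadoop', 'jar', scalding_core, '-libjars', ','.join(jars)]
    # Assumption: the job class is passed to Scalding, running in HDFS mode.
    argv += [job_class, '--hdfs']
    argv += list(job_args)
    # Copy the environment so the caller's mapping is not mutated.
    env = dict(os.environ if base_env is None else base_env)
    env['HADOOP_CLASSPATH'] = ':'.join(jars + [scalding_core])
    return argv, env


argv, env = build_hadoop_command(
    '/tmp/job.jar',                 # illustrative paths throughout
    '/usr/share/scalding/scalding-core.jar',
    ['/usr/share/scalding/libjars/util.jar'],
    'com.example.WordCount',
    ['--output', '/output/word_count'],
    base_env={},
)
print(' '.join(argv))
print(env['HADOOP_CLASSPATH'])
```

Returning the argument vector and environment instead of running them keeps the sketch free of side effects; a real runner would pass both to a subprocess call.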

Usage

Use ScaldingJobTask when you have existing Scalding (Scala MapReduce) jobs that you want to orchestrate within a Luigi pipeline alongside other Luigi tasks. The task supports both source-level compilation (convenient for development) and pre-built JAR submission (suitable for production). Configure the [scalding] section of your Luigi configuration file with the paths to your Scala and Scalding installations.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/scalding.py
  • Lines: 1-309

Signature

class ScaldingJobRunner(luigi.contrib.hadoop.JobRunner):
    def __init__(self): ...
    def get_scala_jars(self, include_compiler=False): ...
    def get_scalding_jars(self): ...
    def get_scalding_core(self): ...
    def get_provided_jars(self): ...
    def get_libjars(self): ...
    def get_tmp_job_jar(self, source): ...
    def get_build_dir(self, source): ...
    def get_job_class(self, source): ...
    def build_job_jar(self, job): ...
    def run_job(self, job, tracking_url_callback=None): ...

class ScaldingJobTask(luigi.contrib.hadoop.BaseHadoopJobTask):
    def relpath(self, current_file, rel_path): ...
    def source(self): ...        # str or None: path to Scala source file
    def jar(self): ...           # str or None: path to pre-built JAR
    def extra_jars(self): ...    # list[str]: additional JAR paths
    def job_class(self): ...     # str or None: main Scalding job class name
    def job_runner(self): ...    # returns ScaldingJobRunner()
    def atomic_output(self): ... # bool: use atomic output (default: True)
    def requires(self): ...      # dict: {arg_name: subtask_or_list}
    def job_args(self): ...      # list[str]: extra arguments
    def args(self): ...          # list[str]: full argument list

Import

from luigi.contrib.scalding import ScaldingJobTask, ScaldingJobRunner

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| source() | str | Conditional | Path to the Scala source file for the Scalding job; required if jar() is not provided |
| jar() | str | Conditional | Path to a pre-built JAR file; required if source() is not provided |
| job_class() | str | Conditional | Fully qualified class name of the Scalding job; required if jar() is provided, auto-detected from source otherwise |
| extra_jars() | list[str] | No | Additional JAR files needed for building and running the job |
| requires() | dict | No | Dictionary mapping Scalding argument names to Luigi tasks whose output paths become input arguments |
| job_args() | list[str] | No | Extra command-line arguments to pass to the Scalding job |
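
The conditional rules for source(), jar(), and job_class() can be expressed as a small check. The sketch below is illustrative only: resolve_job and detect_job_class are hypothetical names, and the regular expressions are an assumed heuristic for finding a class that extends Job in Scala source text, not necessarily the module's own detection logic.

```python
import re
from typing import Optional, Tuple


def detect_job_class(scala_source: str) -> Optional[str]:
    """Guess the job class from Scala source text (assumed heuristic)."""
    package = re.search(r'^\s*package\s+([\w.]+)', scala_source, re.MULTILINE)
    job = re.search(r'class\s+(\w+).*extends\s+.*Job', scala_source)
    if not job:
        return None
    name = job.group(1)
    # Prefix the package, if one was declared, to get a qualified name.
    return f'{package.group(1)}.{name}' if package else name


def resolve_job(source_text: Optional[str],
                jar: Optional[str],
                job_class: Optional[str]) -> Tuple[str, str]:
    """Return (mode, job_class), where mode is 'jar' or 'source'."""
    if jar:
        # A pre-built JAR cannot be inspected here, so the class is required.
        if not job_class:
            raise ValueError('job_class() is required when jar() is provided')
        return 'jar', job_class
    if not source_text:
        raise ValueError('either source() or jar() must be provided')
    detected = job_class or detect_job_class(source_text)
    if not detected:
        raise ValueError('could not find a Scalding job class in the source')
    return 'source', detected


SCALA_SOURCE = """package com.example

import com.twitter.scalding._

class WordCount(args: Args) extends Job(args) {
}
"""

print(resolve_job(SCALA_SOURCE, None, None))
print(resolve_job(None, '/jars/word-count-assembly.jar', 'com.example.WordCount'))
```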

Outputs

| Name | Type | Description |
|------|------|-------------|
| output | HdfsTarget | HDFS target path passed as --output to the Scalding job |
| job_jar | str | Path to the compiled and packaged JAR file (intermediate artifact) |

Usage Examples

Basic Usage

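import luigi
import luigi.contrib.hdfs
from luigi.contrib.scalding import ScaldingJobTask


# Placeholder upstream task so the example is self-contained; the path is illustrative.
class SomeUpstreamTask(luigi.ExternalTask):

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/input/text')
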
class WordCount(ScaldingJobTask):

    def source(self):
        return self.relpath(__file__, 'WordCount.scala')

    def requires(self):
        return {'input': SomeUpstreamTask()}

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/output/word_count')


class WordCountFromJar(ScaldingJobTask):

    def jar(self):
        return '/jars/word-count-assembly.jar'

    def job_class(self):
        return 'com.example.WordCount'

    def extra_jars(self):
        return ['/jars/custom-utils.jar']

    def requires(self):
        return {'input': SomeUpstreamTask()}

    def job_args(self):
        return ['--num-reducers', '10']

    def output(self):
        return luigi.contrib.hdfs.HdfsTarget('/output/word_count_jar')

Configuration

# luigi.cfg
[scalding]
scala-home: /usr/share/scala
scalding-home: /usr/share/scalding
scalding-provided: /usr/share/scalding/provided
scalding-libjars: /usr/share/scalding/libjars
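
To see how these settings might be read with fallbacks, the sketch below parses a [scalding] section using Python's standard configparser rather than Luigi's own configuration API. The load_scalding_settings helper is hypothetical, and the default values simply mirror the example paths above; treat them as assumptions, not the runner's guaranteed defaults.

```python
import configparser
from typing import Dict

# Assumed fallbacks, mirroring the example values shown above.
DEFAULTS: Dict[str, str] = {
    'scala-home': '/usr/share/scala',
    'scalding-home': '/usr/share/scalding',
    'scalding-provided': '/usr/share/scalding/provided',
    'scalding-libjars': '/usr/share/scalding/libjars',
}


def load_scalding_settings(cfg_text: str) -> Dict[str, str]:
    """Parse the [scalding] section, filling gaps from DEFAULTS."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    settings = dict(DEFAULTS)
    if parser.has_section('scalding'):
        # Values from the file override the assumed defaults.
        settings.update(parser.items('scalding'))
    return settings


settings = load_scalding_settings("[scalding]\nscala-home: /opt/scala\n")
for key, value in sorted(settings.items()):
    print(f'{key} = {value}')
```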
