Implementation:Spotify Luigi PigJobTask

From Leeroopedia
Revision as of 16:47, 16 February 2026 by Admin (auto-imported from implementations/Spotify_Luigi_PigJobTask.md)

Overview

PigJobTask is a Luigi task base class in the luigi.contrib.pig module that provides integration with Apache Pig for running Pig Latin scripts as part of a Luigi pipeline. It handles script execution, environment configuration, progress tracking, and error handling through subprocess management. The module also provides PigRunContext for signal handling during Pig job execution and PigJobError for structured error reporting.

Source Location

Property | Value
Source File | luigi/contrib/pig.py
Lines of Code | 204
Module | luigi.contrib.pig
Domain | Data_Processing, Hadoop

Import Statement

from luigi.contrib.pig import PigJobTask

Classes

PigJobTask

PigJobTask(luigi.Task)

The primary class for executing Apache Pig scripts within a Luigi workflow. Subclasses must implement pig_script_path() to specify the Pig script to run and output() to define the task output.

Configuration Properties

Method | Return Type | Default | Description
pig_home(self) | str | /usr/share/pig | Returns the Pig installation home directory. Reads from the [pig] config section, key home.
pig_command_path(self) | str | {pig_home}/bin/pig | Returns the full path to the Pig binary executable.
pig_env_vars(self) | dict | {} | Dictionary of environment variables to set when running Pig (e.g., {'PIG_CLASSPATH': '/your/path'}).
pig_properties(self) | dict | {} | Dictionary of Pig properties passed via -propertyFile (e.g., {'pig.additional.jars': '/path/to/jar'}).
pig_parameters(self) | dict | {} | Dictionary of parameters passed via -param_file (e.g., {'YOUR_PARAM_NAME': 'Your param value'}).
pig_options(self) | list | [] | List of additional command-line options appended to the Pig command (e.g., ['-x', 'local']).

Abstract Methods

Method | Description
pig_script_path(self) | Must be implemented by subclasses. Returns the path to the Pig Latin script file to be executed.
output(self) | Must be implemented by subclasses. Returns the output target for the task.

Execution Methods

Method | Signature | Description
run | run(self) | Builds the Pig command using _build_pig_cmd() and delegates to track_and_progress().
track_and_progress | track_and_progress(self, cmd) | Executes the Pig command as a subprocess. Sets PIG_HOME and custom environment variables. Reads stdout/stderr via select() for real-time progress tracking. Raises PigJobError on non-zero return code.
_build_pig_cmd | _build_pig_cmd(self) | Context manager that builds the full Pig command list. Writes parameters and properties to temporary files and appends -param_file, -propertyFile, and -f flags.
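
The command-building behaviour described for _build_pig_cmd can be sketched outside Luigi as follows. This is a minimal illustration, not the module's code: the function name build_pig_cmd, its arguments, and the key=value file layout are assumptions made for the example. It relies on reopening a named temporary file by path, which works on POSIX systems.

```python
import os
import tempfile
from contextlib import contextmanager


@contextmanager
def build_pig_cmd(pig_bin, script_path, options=(), parameters=None, properties=None):
    """Yield a Pig command list (hypothetical helper, not part of Luigi).

    The temporary files exist only while the with-block is active, so the
    command must be executed inside it.
    """
    opts = list(options)

    def write_pairs(handle, pairs):
        # Assumed layout: one key=value pair per line, flushed to disk
        # so that a child process can read the file by name.
        for key, value in pairs.items():
            handle.write(("%s=%s%s" % (key, value, os.linesep)).encode("utf-8"))
        handle.flush()

    with tempfile.NamedTemporaryFile() as param_file, \
            tempfile.NamedTemporaryFile() as prop_file:
        if parameters:
            write_pairs(param_file, parameters)
            opts += ["-param_file", param_file.name]
        if properties:
            write_pairs(prop_file, properties)
            opts += ["-propertyFile", prop_file.name]
        yield [pig_bin] + opts + ["-f", script_path]


# Example use: the list is only valid while the temp files still exist.
with build_pig_cmd("/usr/share/pig/bin/pig", "/scripts/my_analysis.pig",
                   options=["-x", "local"],
                   parameters={"INPUT_DATE": "2026-02-16"}) as cmd:
    print(cmd)
```

Yielding the command from a context manager ties the lifetime of the temporary files to the execution of the job, which is why run() is described as wrapping the call to track_and_progress().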

PigRunContext

PigRunContext

A context manager that handles interruption during Pig job execution. On entry it installs a SIGTERM handler; on exit it restores the previous handler and reacts to a KeyboardInterrupt exception. In either case it attempts to kill the running Pig job before exiting.

Method | Signature | Description
__init__ | __init__(self) | Initializes with job_id = None.
__enter__ | __enter__(self) | Stores the existing SIGTERM handler and replaces it with kill_job.
kill_job | kill_job(self, captured_signal=None, stack_frame=None) | Kills the Pig job via subprocess.call(['pig', '-e', '"kill {job_id}"']). Exits with code 128 + signal if a signal was captured.
__exit__ | __exit__(self, exc_type, exc_val, exc_tb) | Restores the original SIGTERM handler. Calls kill_job() on KeyboardInterrupt.
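
The handler swap described in the table can be illustrated with a small stand-alone context manager. This is a sketch under assumptions: the class name TermCleanupContext and its cleanup callback are hypothetical, and the callback stands in for the Pig kill command. It must run in the main thread, since Python only allows signal handlers to be installed there.

```python
import signal
import sys


class TermCleanupContext:
    """Hypothetical stand-in that mirrors the documented PigRunContext flow."""

    def __init__(self, cleanup):
        self.cleanup = cleanup          # callable run when the job is interrupted
        self._old_handler = None

    def __enter__(self):
        # Remember the current SIGTERM handler and install our own.
        self._old_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self._on_signal)
        return self

    def _on_signal(self, captured_signal=None, stack_frame=None):
        self.cleanup()
        if captured_signal is not None:
            # 128 + signal number is the conventional exit code for a signal.
            sys.exit(128 + captured_signal)

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Always restore the previous handler, then clean up on Ctrl-C.
        signal.signal(signal.SIGTERM, self._old_handler)
        if exc_type is KeyboardInterrupt:
            self._on_signal()
        return False  # do not swallow the exception


with TermCleanupContext(lambda: print("cleanup requested")):
    pass  # long-running work would go here
```

Returning False from __exit__ lets a KeyboardInterrupt continue to propagate after the cleanup callback has run.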

PigJobError

PigJobError(RuntimeError)

Custom exception raised when a Pig script execution fails.

Attribute | Type | Description
message | str | Error message describing the failure.
out | str or None | Captured stdout content.
err | str or None | Captured stderr content.

Constructor: PigJobError.__init__(self, message, out=None, err=None)
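
How a caller might inspect these attributes can be sketched with a stand-in exception. The class ScriptJobError and the helper check_result below are hypothetical; they only mirror the documented constructor signature so that the example runs without Luigi or Pig installed, and the stderr text is made up.

```python
class ScriptJobError(RuntimeError):
    """Hypothetical stand-in with the same constructor shape as PigJobError."""

    def __init__(self, message, out=None, err=None):
        super().__init__(message, out, err)
        self.message = message
        self.out = out
        self.err = err


def check_result(returncode, err_text):
    """Raise on a non-zero return code, attaching the captured stderr."""
    if returncode != 0:
        raise ScriptJobError(
            "Pig script failed with return value: %s" % (returncode,),
            err=err_text,
        )


try:
    check_result(2, "example stderr text")
except ScriptJobError as exc:
    # The structured attributes let callers log or route each part separately.
    print(exc.message)
    print(exc.err)
```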

Configuration

The [pig] section in luigi.cfg provides default configuration:

[pig]
# pig home directory
home: /usr/share/pig
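
The fallback behaviour of the home key can be illustrated with the standard-library configparser. This is only an approximation of the lookup: Luigi uses its own configuration layer, and the function names and the /opt/pig path below are assumptions made for the example.

```python
import configparser
import os

# Sample configuration text; /opt/pig is a made-up install location.
SAMPLE_CFG = """
[pig]
# pig home directory
home: /opt/pig
"""


def resolve_pig_home(cfg_text, default="/usr/share/pig"):
    """Return [pig] home from the given text, or the documented default."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # fallback covers both a missing [pig] section and a missing home key.
    return parser.get("pig", "home", fallback=default)


def pig_command_path(pig_home):
    """Build {pig_home}/bin/pig, as described for pig_command_path()."""
    return os.path.join(pig_home, "bin", "pig")


print(pig_command_path(resolve_pig_home(SAMPLE_CFG)))
print(pig_command_path(resolve_pig_home("")))
```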

Usage Example

from luigi.contrib.pig import PigJobTask
import luigi

class MyPigJob(PigJobTask):

    date = luigi.DateParameter()

    def pig_script_path(self):
        return '/scripts/my_analysis.pig'

    def pig_parameters(self):
        return {'INPUT_DATE': self.date.isoformat()}

    def pig_properties(self):
        return {'pig.additional.jars': '/libs/custom-udfs.jar'}

    def pig_env_vars(self):
        return {'PIG_CLASSPATH': '/libs'}

    def pig_options(self):
        return ['-x', 'local']

    def output(self):
        return luigi.LocalTarget('/output/results_%s' % self.date)

Internal Execution Flow

  1. The run() method calls _build_pig_cmd() to construct the command line.
  2. Parameters and properties are written to temporary files via tempfile.NamedTemporaryFile().
  3. The command is structured as: [pig_command_path] [options] [-param_file file] [-propertyFile file] -f [pig_script_path].
  4. track_and_progress() spawns the Pig process via subprocess.Popen with shell=False.
  5. Stdout and stderr are multiplexed with select.select(), which waits until either pipe has data to read.
  6. A PigRunContext wraps the execution loop to handle graceful termination on SIGTERM.
  7. On success (return code 0), a success message is logged.
  8. On failure, stderr is collected and a PigJobError is raised with the return code and error output.
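
Steps 4 through 8 can be sketched as a generic subprocess runner. This is a simplified variant rather than the module's code: the function run_and_collect is hypothetical, it reads raw chunks with os.read instead of line by line, it raises a plain RuntimeError, and it omits the signal-handling wrapper. Using select() on pipes works on POSIX systems only.

```python
import os
import select
import subprocess
import sys


def run_and_collect(cmd, extra_env=None):
    """Run cmd, draining stdout and stderr concurrently (hypothetical helper)."""
    env = os.environ.copy()
    env.update(extra_env or {})  # e.g. {"PIG_HOME": "/usr/share/pig"}

    proc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                            stderr=subprocess.PIPE, env=env)
    out_fd, err_fd = proc.stdout.fileno(), proc.stderr.fileno()
    chunks = {out_fd: [], err_fd: []}
    open_fds = [out_fd, err_fd]

    while open_fds:
        # Block until at least one pipe is readable, then drain what is ready.
        ready, _, _ = select.select(open_fds, [], [])
        for fd in ready:
            data = os.read(fd, 4096)
            if data:
                chunks[fd].append(data)
            else:
                open_fds.remove(fd)  # empty read means the pipe was closed

    returncode = proc.wait()
    out = b"".join(chunks[out_fd]).decode("utf-8", "replace")
    err = b"".join(chunks[err_fd]).decode("utf-8", "replace")
    proc.stdout.close()
    proc.stderr.close()

    if returncode != 0:
        raise RuntimeError("command failed with return value %s: %s"
                           % (returncode, err))
    return out, err


out, err = run_and_collect([sys.executable, "-c", "print('hello')"])
print(out.strip())
```

Draining both pipes in one loop avoids the deadlock that can occur when a child fills one pipe's buffer while the parent is blocked reading the other.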

Dependencies

  • Python standard library: contextlib, logging, os, select, signal, subprocess, sys, tempfile
  • Luigi core: luigi, luigi.configuration
  • External: Apache Pig must be installed and accessible at the configured path.
