Implementation:Spotify Luigi PigJobTask
Overview
PigJobTask is a Luigi task base class in the luigi.contrib.pig module that runs Apache Pig (Pig Latin) scripts as part of a Luigi pipeline. It builds the Pig command line, runs it as a subprocess with the configured environment, tracks progress from Pig's output, and raises an error on failure. The module also provides PigRunContext, which handles SIGTERM and KeyboardInterrupt during Pig job execution, and PigJobError for structured error reporting.
Source Location
| Property | Value |
|---|---|
| Source File | luigi/contrib/pig.py |
| Lines of Code | 204 |
| Module | luigi.contrib.pig |
| Domain | Data_Processing, Hadoop |
Import Statement
from luigi.contrib.pig import PigJobTask
Classes
PigJobTask
PigJobTask(luigi.Task)
The primary class for executing Apache Pig scripts within a Luigi workflow. Subclasses must implement pig_script_path() to specify the Pig script to run and output() to define the task output.
Configuration Properties
| Method | Return Type | Default | Description |
|---|---|---|---|
| `pig_home(self)` | `str` | `/usr/share/pig` | Returns the Pig installation home directory, read from the `home` key of the `[pig]` config section. |
| `pig_command_path(self)` | `str` | `{pig_home}/bin/pig` | Returns the full path to the Pig executable. |
| `pig_env_vars(self)` | `dict` | `{}` | Environment variables to set when running Pig (e.g., `{'PIG_CLASSPATH': '/your/path'}`). |
| `pig_properties(self)` | `dict` | `{}` | Pig properties, written to a temporary file and passed via `-propertyFile` (e.g., `{'pig.additional.jars': '/path/to/jar'}`). |
| `pig_parameters(self)` | `dict` | `{}` | Script parameters, written to a temporary file and passed via `-param_file` (e.g., `{'YOUR_PARAM_NAME': 'Your param value'}`). |
| `pig_options(self)` | `list` | `[]` | Additional command-line options, placed after the Pig executable and before the `-param_file`, `-propertyFile`, and `-f` flags (e.g., `['-x', 'local']`). |
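The environment hooks combine in a simple way: the task starts from the current process environment, sets `PIG_HOME`, and then overlays `pig_env_vars()`. The following is a minimal stdlib-only sketch of that merge, assuming this ordering (so a `PIG_HOME` returned by `pig_env_vars()` would take precedence); `build_pig_env` and its `base_env` argument are hypothetical names, not part of `luigi.contrib.pig`.

```python
import os


def build_pig_env(pig_home, env_vars, base_env=None):
    """Hypothetical helper: merge the environment passed to the Pig subprocess."""
    # Work on a copy so the parent process environment is never modified.
    env = dict(os.environ if base_env is None else base_env)
    # PIG_HOME comes from pig_home() ...
    env["PIG_HOME"] = pig_home
    # ... and pig_env_vars() is applied last (assumed ordering), so it wins on conflicts.
    env.update(env_vars)
    return env


env = build_pig_env("/usr/share/pig", {"PIG_CLASSPATH": "/libs"}, base_env={"PATH": "/usr/bin"})
print(env)  # {'PATH': '/usr/bin', 'PIG_HOME': '/usr/share/pig', 'PIG_CLASSPATH': '/libs'}
```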
Abstract Methods
| Method | Description |
|---|---|
| `pig_script_path(self)` | Must be implemented by subclasses. Returns the path to the Pig Latin script file to be executed. |
| `output(self)` | Must be implemented by subclasses. Returns the output target for the task. |
Execution Methods
| Method | Signature | Description |
|---|---|---|
| `run` | `run(self)` | Builds the Pig command with `_build_pig_cmd()` and passes it to `track_and_progress()`. |
| `track_and_progress` | `track_and_progress(self, cmd)` | Executes the Pig command as a subprocess with `PIG_HOME` and the custom environment variables set. Reads stdout/stderr via `select()` for real-time progress tracking. Raises `PigJobError` on a non-zero return code. |
| `_build_pig_cmd` | `_build_pig_cmd(self)` | Context manager that yields the full Pig command as a list. Writes parameters and properties to temporary files and adds the `-param_file`, `-propertyFile`, and `-f` flags; the temporary files are deleted when the context exits. |
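The command construction in `_build_pig_cmd()` can be approximated with the standard library alone. The sketch below is an illustration, not the module's code: `build_pig_cmd` is a hypothetical free function that takes the hook values as arguments, and the one-`key=value`-per-line file format is an assumption consistent with what Pig's `-param_file` and `-propertyFile` options accept.

```python
import contextlib
import os
import tempfile


@contextlib.contextmanager
def build_pig_cmd(pig_command_path, script_path, options=(), parameters=None, properties=None):
    """Hypothetical stand-in for _build_pig_cmd(): yield the argv list for Pig."""
    opts = list(options)  # copy, so the caller's list is never mutated

    def as_lines(mapping):
        # One key=value pair per line, encoded for the binary temp file.
        return [("%s=%s%s" % (k, v, os.linesep)).encode("utf-8") for k, v in mapping.items()]

    # Both files exist only while this with-block (and the caller's) is active.
    with tempfile.NamedTemporaryFile() as param_file, tempfile.NamedTemporaryFile() as prop_file:
        if parameters:
            param_file.writelines(as_lines(parameters))
            param_file.flush()  # make the contents visible to the Pig process
            opts += ["-param_file", param_file.name]
        if properties:
            prop_file.writelines(as_lines(properties))
            prop_file.flush()
            opts += ["-propertyFile", prop_file.name]
        yield [pig_command_path] + opts + ["-f", script_path]


with build_pig_cmd("/usr/share/pig/bin/pig", "/scripts/my_analysis.pig",
                   options=["-x", "local"],
                   parameters={"INPUT_DATE": "2024-01-01"}) as cmd:
    print(cmd)  # the temporary file name in cmd[4] changes on every run
```

The command is yielded rather than returned because the temporary files must still exist while Pig reads them; once the `with` block ends, they are deleted.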
PigRunContext
PigRunContext
A context manager that intercepts SIGTERM and handles KeyboardInterrupt during Pig job execution. In either case it attempts to kill the running Pig job before exiting.

| Method | Signature | Description |
|---|---|---|
| `__init__` | `__init__(self)` | Initializes with `job_id = None`. |
| `__enter__` | `__enter__(self)` | Stores the existing `SIGTERM` handler and replaces it with `kill_job`. |
| `kill_job` | `kill_job(self, captured_signal=None, stack_frame=None)` | If `job_id` is set, kills the Pig job via `subprocess.call(['pig', '-e', '"kill {job_id}"'])`. If a signal was captured, exits with code `128 + captured_signal`. |
| `__exit__` | `__exit__(self, exc_type, exc_val, exc_tb)` | Restores the original `SIGTERM` handler and calls `kill_job()` if the block raised `KeyboardInterrupt`. |
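The signal-handling pattern described for `PigRunContext` can be reproduced in a small stand-alone class. This is a sketch under assumptions: `SigtermKillContext` is a hypothetical name, it must be used from the main thread (a requirement of `signal.signal`), and it mirrors only the behaviour listed in the method table for `PigRunContext`.

```python
import signal
import subprocess
import sys


class SigtermKillContext:
    """Hypothetical stand-in mirroring the documented PigRunContext behaviour."""

    def __init__(self):
        self.job_id = None
        self._old_handler = None

    def __enter__(self):
        # Remember the current SIGTERM handler, then route SIGTERM to kill_job.
        self._old_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self.kill_job)
        return self

    def kill_job(self, captured_signal=None, stack_frame=None):
        # Only attempted when a job ID is known; `pig` is looked up on PATH.
        if self.job_id:
            subprocess.call(["pig", "-e", '"kill %s"' % self.job_id])
        if captured_signal is not None:
            # Shell convention: a process killed by signal N exits with 128 + N.
            sys.exit(128 + captured_signal)

    def __exit__(self, exc_type, exc_val, exc_tb):
        if exc_type is KeyboardInterrupt:
            self.kill_job()
        if self._old_handler is not None:
            signal.signal(signal.SIGTERM, self._old_handler)
        # Returning None lets any exception from the block propagate.


with SigtermKillContext() as ctx:
    pass  # start and poll the Pig process here
```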
PigJobError
PigJobError(RuntimeError)
Custom exception raised when a Pig script execution fails.
| Attribute | Type | Description |
|---|---|---|
| `message` | `str` | Error message describing the failure. |
| `out` | `str` or `None` | Captured stdout content. |
| `err` | `str` or `None` | Captured stderr content. |
PigJobError.__init__(self, message, out=None, err=None)
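A caller can catch `PigJobError` and inspect the captured streams. The stand-alone sketch below re-declares an equivalent exception so it runs without Luigi; the `__str__` formatting that appends the captured output is an illustrative assumption, and `report_failure` is a hypothetical helper.

```python
class PigJobError(RuntimeError):
    """Stand-in with the documented attributes: message, out, err."""

    def __init__(self, message, out=None, err=None):
        super().__init__(message, out, err)
        self.message = message
        self.out = out
        self.err = err

    def __str__(self):
        # Assumed formatting: append whichever streams were captured.
        info = self.message
        if self.out:
            info += "\nSTDOUT: " + str(self.out)
        if self.err:
            info += "\nSTDERR: " + str(self.err)
        return info


def report_failure(exc):
    """Hypothetical helper: return the last non-empty stderr line, else the message."""
    lines = [line for line in (exc.err or "").splitlines() if line.strip()]
    return lines[-1] if lines else exc.message


try:
    raise PigJobError("Pig script failed with return value: 1", err="first line\nlast line\n")
except PigJobError as exc:
    print(report_failure(exc))  # prints "last line"
```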
Configuration
The [pig] section in luigi.cfg provides default configuration:
```ini
[pig]
# pig home directory
home: /usr/share/pig
```
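The `[pig]` lookup with its `/usr/share/pig` fallback can be sketched with `configparser` from the standard library. Luigi reads the value through its own configuration layer (`luigi.configuration`); the plain `ConfigParser` here and the helper names `pig_home_from_config` and `pig_command_path` (a free-function counterpart of the method) are assumptions for illustration.

```python
import configparser
import os

DEFAULT_PIG_HOME = "/usr/share/pig"


def pig_home_from_config(cfg_text):
    """Hypothetical helper: read [pig] home, falling back to the default."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    # fallback= also covers a missing [pig] section, not just a missing key.
    return parser.get("pig", "home", fallback=DEFAULT_PIG_HOME)


def pig_command_path(pig_home):
    """Free-function counterpart of PigJobTask.pig_command_path()."""
    return os.path.join(pig_home, "bin/pig")


home = pig_home_from_config("[pig]\n# pig home directory\nhome: /opt/pig\n")
print(pig_command_path(home))  # /opt/pig/bin/pig
```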
Usage Example
```python
import luigi
from luigi.contrib.pig import PigJobTask


class MyPigJob(PigJobTask):
    date = luigi.DateParameter()

    def pig_script_path(self):
        return '/scripts/my_analysis.pig'

    def pig_parameters(self):
        # Written to a temporary file and passed via -param_file
        return {'INPUT_DATE': self.date.isoformat()}

    def pig_properties(self):
        # Written to a temporary file and passed via -propertyFile
        return {'pig.additional.jars': '/libs/custom-udfs.jar'}

    def pig_env_vars(self):
        return {'PIG_CLASSPATH': '/libs'}

    def pig_options(self):
        # Run Pig in local mode instead of on the cluster
        return ['-x', 'local']

    def output(self):
        return luigi.LocalTarget('/output/results_%s' % self.date)
```
Internal Execution Flow
- The `run()` method calls `_build_pig_cmd()` to construct the command line.
- Parameters and properties are written to temporary files via `tempfile.NamedTemporaryFile()`.
- The command is structured as `[pig_command_path] [options] [-param_file file] [-propertyFile file] -f [pig_script_path]`; the `-param_file` and `-propertyFile` flags are added only when the corresponding dictionaries are non-empty.
- `track_and_progress()` spawns the Pig process via `subprocess.Popen` with `shell=False`.
- Stdout and stderr are multiplexed with `select.select()`, so the loop reads whichever stream has output ready, which enables real-time progress tracking.
- A `PigRunContext` wraps the execution loop to handle graceful termination on `SIGTERM`.
- On success (return code 0), a success message is logged.
- On failure, stderr is collected and a `PigJobError` is raised with the return code and error output.
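The execution loop can be sketched with `subprocess` and `select` alone. This is a simplified illustration rather than Luigi's code: `run_and_track` is a hypothetical name, it reads raw chunks with `os.read()` instead of whole lines, it omits the progress logging and `PigRunContext`, and `select.select()` on pipes works only on POSIX systems.

```python
import os
import select
import subprocess
import sys


class PigJobError(RuntimeError):
    """Minimal stand-in for luigi.contrib.pig.PigJobError."""

    def __init__(self, message, out=None, err=None):
        super().__init__(message)
        self.message, self.out, self.err = message, out, err


def run_and_track(cmd, env=None):
    """Run cmd, reading stdout and stderr as data arrives; raise on failure."""
    with subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, env=env) as proc:
        out_fd, err_fd = proc.stdout.fileno(), proc.stderr.fileno()
        chunks = {out_fd: [], err_fd: []}
        open_fds = [out_fd, err_fd]
        while open_fds:
            # Block until at least one pipe has data or has reached EOF.
            readable, _, _ = select.select(open_fds, [], [])
            for fd in readable:
                data = os.read(fd, 4096)
                if data:
                    chunks[fd].append(data)
                else:
                    open_fds.remove(fd)  # an empty read means EOF on this pipe
        returncode = proc.wait()
    out = b"".join(chunks[out_fd]).decode("utf-8", errors="replace")
    err = b"".join(chunks[err_fd]).decode("utf-8", errors="replace")
    if returncode != 0:
        raise PigJobError("Pig script failed with return value: %s" % returncode, out=out, err=err)
    return out


print(run_and_track([sys.executable, "-c", "print('hello from child')"]))
```

Reading until both pipes report EOF, rather than stopping as soon as the process exits, avoids losing output written just before the child terminates.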
Dependencies
- Python standard library: `contextlib`, `logging`, `os`, `select`, `signal`, `subprocess`, `sys`, `tempfile`
- Luigi core: `luigi`, `luigi.configuration`
- External: Apache Pig must be installed and accessible at the configured path; `PigRunContext.kill_job()` additionally invokes `pig` from the `PATH`.
Related Principles
See Also
- Spotify_Luigi_Task_Definition - Base task class that PigJobTask extends
- `luigi.contrib.pig` - Source module