Implementation:Spotify Luigi ExternalProgramTask
| Knowledge Sources | |
|---|---|
| Domains | External_Execution, Subprocess |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
ExternalProgramTask is a Luigi contrib template task for running external programs as subprocesses. It provides a structured way to wrap command-line tools, shell scripts, or any executable as a Luigi task, with support for output capture, tracking URL extraction, and graceful signal handling.
Description
The module provides several classes:
- ExternalProgramTask (extends luigi.Task): The main template class. Override program_args() to return the argument list for subprocess.Popen. The run() method handles process execution, output capture, tracking URL search, and error reporting.
- ExternalPythonProgramTask (extends ExternalProgramTask): Adds virtualenv and extra_pythonpath parameters for running Python programs in isolated environments.
- ExternalProgramRunContext: A context manager that kills the subprocess when the worker receives SIGTERM or when a KeyboardInterrupt is raised inside the block, preventing orphaned processes.
- ExternalProgramRunError (extends RuntimeError): Raised when the program exits with a non-zero return code. Its string representation includes the command, stdout, stderr, and environment.
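The kill-on-interrupt behavior of ExternalProgramRunContext can be sketched with the standard library alone. This is a simplified illustration, not Luigi's code: the class name KillOnInterrupt is hypothetical, and the sketch assumes it runs in the main thread (Python only allows signal.signal() there). SIGTERM is handled by a signal handler, while Ctrl+C surfaces as a KeyboardInterrupt exception, so it is handled in __exit__.

```python
import signal
import subprocess
import sys


class KillOnInterrupt:
    """Hypothetical stand-in for ExternalProgramRunContext (illustration only)."""

    def __init__(self, proc):
        self.proc = proc
        self._old_handler = None

    def __enter__(self):
        # Save the current SIGTERM handler, then route SIGTERM to _on_sigterm.
        self._old_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self._on_sigterm)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Ctrl+C arrives as a KeyboardInterrupt exception, not via our handler.
        if exc_type is KeyboardInterrupt:
            self.proc.kill()
        # Restore the previous handler; getsignal() returns None when the
        # handler was not installed from Python, so fall back to SIG_DFL.
        previous = self._old_handler if self._old_handler is not None else signal.SIG_DFL
        signal.signal(signal.SIGTERM, previous)
        return False  # let any exception propagate

    def _on_sigterm(self, signum, frame):
        self.proc.kill()
        sys.exit(128 + signum)  # conventional exit status for "killed by signal"


# Demo: simulate Ctrl+C while waiting on a long-running child process.
handler_before = signal.getsignal(signal.SIGTERM)
proc = subprocess.Popen([sys.executable, "-c", "import time; time.sleep(60)"])
try:
    with KillOnInterrupt(proc):
        raise KeyboardInterrupt
except KeyboardInterrupt:
    pass
proc.wait()
```

Returning False from __exit__ keeps the interrupt visible to the caller, so the task still fails instead of silently succeeding after its child was killed.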
Key parameters for ExternalProgramTask:
- capture_output (BoolParameter, default True): When enabled, stdout and stderr are captured to temporary files and logged after execution.
- stream_for_searching_tracking_url (ChoiceParameter: none, stdout, stderr; default none): Specifies which stream to scan for a tracking URL pattern.
- tracking_url_pattern (OptionalParameter, default None): Regex pattern whose first capture group is used as the tracking URL for the Luigi web UI.
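The idea behind capture_output=True, redirecting both streams to temporary files and reading them back once the process exits, looks roughly like this. The helper run_captured is hypothetical and only models the capture step, not the logging that follows it.

```python
import subprocess
import sys
import tempfile


def run_captured(args, env=None):
    """Run args with stdout/stderr sent to temporary files (simplified sketch).

    Returns (returncode, stdout_text, stderr_text).
    """
    with tempfile.TemporaryFile() as out, tempfile.TemporaryFile() as err:
        proc = subprocess.Popen([str(a) for a in args], env=env, stdout=out, stderr=err)
        proc.wait()
        # Rewind each file before reading what the child process wrote.
        out.seek(0)
        err.seek(0)
        return proc.returncode, out.read().decode("utf-8"), err.read().decode("utf-8")


code, stdout, stderr = run_captured(
    [sys.executable, "-c", "import sys; print('hello'); print('oops', file=sys.stderr)"]
)
```

Writing to files rather than subprocess.PIPE means proc.wait() cannot deadlock on a full pipe buffer, which is why this pattern suits programs with large or unpredictable output.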
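Tracking URL search is enabled only when stream_for_searching_tracking_url is not none and tracking_url_pattern is set. In that case the selected stream is redirected to a pipe, and a separate multiprocessing.Process reads it line by line, matches each line against the pattern, and calls set_tracking_url() with the first capture group when a line matches.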
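The matching step can be illustrated in isolation. The function find_tracking_url is a hypothetical helper: it stops at the first match and does not model the background process, the pipe, or the set_tracking_url() call. The sample log lines are invented for the demo.

```python
import re


def find_tracking_url(lines, pattern):
    """Return group(1) of the first line matching pattern, or None.

    lines: any iterable of bytes or str, such as a pipe read line by line.
    """
    regex = re.compile(pattern)
    for line in lines:
        if isinstance(line, bytes):
            line = line.decode("utf-8", errors="replace")
        match = regex.search(line)
        if match:
            return match.group(1)  # first capture group becomes the URL
    return None


log = [
    b"INFO Client: Submitting application\n",
    b"INFO Client: tracking URL: http://rm.example.com:8088/proxy/application_1/\n",
]
url = find_tracking_url(log, r"tracking URL: (https?://\S+)")
```

Because \S+ stops at whitespace, the trailing newline is not included in the extracted URL.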
Usage
Subclass ExternalProgramTask and override program_args(). Optionally, override program_environment() to customize the subprocess's environment variables, and override the always_log_stderr property to return False if stderr should be logged only when the program fails.
Code Reference
Source Location
luigi/contrib/external_program.py (292 lines)
Signature
```python
import luigi


# Signatures only; method bodies are abbreviated to their docstrings.
class ExternalProgramTask(luigi.Task):
    capture_output = luigi.BoolParameter(default=True)
    stream_for_searching_tracking_url = luigi.parameter.ChoiceParameter(
        choices=['none', 'stdout', 'stderr'], default='none')
    tracking_url_pattern = luigi.OptionalParameter(default=None)

    def program_args(self):
        """Abstract: return list of program arguments for subprocess.Popen."""
        raise NotImplementedError

    def program_environment(self):
        """Override to customize environment variables. Default: os.environ.copy()."""

    @property
    def always_log_stderr(self):
        """When True (default), stderr is logged even on success."""

    def run(self):
        """Executes the external program, captures output, handles errors."""


class ExternalPythonProgramTask(ExternalProgramTask):
    virtualenv = luigi.OptionalParameter(default=None)
    extra_pythonpath = luigi.OptionalParameter(default=None)

    def program_environment(self):
        """Extends parent env with virtualenv PATH/VIRTUAL_ENV and PYTHONPATH."""


class ExternalProgramRunContext:
    """Context manager for SIGTERM/KeyboardInterrupt handling of subprocesses."""


class ExternalProgramRunError(RuntimeError):
    """Error with message, args, env, stdout, and stderr attributes."""
```
Import
```python
from luigi.contrib.external_program import ExternalProgramTask, ExternalPythonProgramTask
```
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| program_args() | list | Arguments passed to subprocess.Popen (each converted to a string). The first element is the executable. |
| program_environment() | dict | Environment variables for the subprocess. Defaults to a copy of os.environ. |
| virtualenv | OptionalParameter | Path to a virtualenv directory (ExternalPythonProgramTask only). Prepends the virtualenv's bin directory to PATH and sets VIRTUAL_ENV. |
| extra_pythonpath | OptionalParameter | Additional path prepended to PYTHONPATH (ExternalPythonProgramTask only). |
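How virtualenv and extra_pythonpath reshape the environment can be sketched as a pure function. The name python_program_environment is hypothetical, the sketch assumes POSIX conventions (':' separators, a bin subdirectory), and it also unsets PYTHONHOME, mirroring what a standard bin/activate script does. In a real task the starting mapping would be whatever the parent program_environment() returns, by default a copy of os.environ.

```python
def python_program_environment(base_env, virtualenv=None, extra_pythonpath=None):
    """Return a copy of base_env adjusted for a virtualenv and extra PYTHONPATH.

    POSIX-style sketch: ':' separators and '<virtualenv>/bin' are assumed.
    """
    env = dict(base_env)  # never mutate the caller's mapping
    if extra_pythonpath:
        # Prepend the extra entry, skipping an empty existing PYTHONPATH.
        parts = [extra_pythonpath, env.get("PYTHONPATH", "")]
        env["PYTHONPATH"] = ":".join(p for p in parts if p)
    if virtualenv:
        # Roughly what bin/activate does: venv's bin first on PATH, VIRTUAL_ENV
        # set, and PYTHONHOME removed so it cannot redirect the interpreter.
        parts = [virtualenv + "/bin", env.get("PATH", "")]
        env["PATH"] = ":".join(p for p in parts if p)
        env["VIRTUAL_ENV"] = virtualenv
        env.pop("PYTHONHOME", None)
    return env


env = python_program_environment(
    {"PATH": "/usr/bin", "HOME": "/home/user", "PYTHONHOME": "/usr"},
    virtualenv="/opt/venvs/analytics",
    extra_pythonpath="/opt/libs/custom",
)
```

Putting the virtualenv's bin directory first on PATH is what lets a bare 'python' in program_args() resolve to the virtualenv's interpreter.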
Outputs
| Output | Type | Description |
|---|---|---|
| Program execution | Side effect | The external program runs to completion. A non-zero exit code raises ExternalProgramRunError. |
| Captured output | Logged | When capture_output=True, stdout is logged via the Luigi logger; stderr is logged when the program fails, and also on success when always_log_stderr is True. |
| Tracking URL | Luigi web UI | When pattern matching is configured, the extracted URL is set as the task's tracking URL. |
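The failure path (non-zero exit code, then an exception whose text bundles command, stdout, stderr, and environment) can be sketched as follows. ProgramFailed and run_or_raise are hypothetical names, and the exact message layout is this sketch's own choice, loosely modeled on the fields listed for ExternalProgramRunError rather than copied from it.

```python
import subprocess
import sys


class ProgramFailed(RuntimeError):
    """Hypothetical stand-in for ExternalProgramRunError (illustration only)."""

    def __init__(self, message, command, env=None, stdout=None, stderr=None):
        super().__init__(message)
        self.message = message
        self.command = [str(a) for a in command]
        self.env = env
        self.stdout = stdout
        self.stderr = stderr

    def __str__(self):
        # Render env as KEY='value' pairs; empty fields show as [empty].
        env_text = " ".join("{}='{}'".format(k, v) for k, v in (self.env or {}).items())
        return "\n".join([
            self.message,
            "COMMAND: " + " ".join(self.command),
            "STDOUT: " + (self.stdout or "[empty]"),
            "STDERR: " + (self.stderr or "[empty]"),
            "ENVIRONMENT: " + (env_text or "[empty]"),
        ])


def run_or_raise(command, env=None):
    """Run command; return its stdout, or raise ProgramFailed on a non-zero exit."""
    result = subprocess.run(command, env=env, capture_output=True, text=True)
    if result.returncode != 0:
        raise ProgramFailed(
            "Program failed with return code={}:".format(result.returncode),
            command, env=env, stdout=result.stdout, stderr=result.stderr,
        )
    return result.stdout


report = None
try:
    run_or_raise([sys.executable, "-c", "import sys; sys.exit(3)"])
except ProgramFailed as exc:
    report = str(exc)
```

The command is stored under a separate attribute because BaseException already uses args for the constructor arguments.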
Usage Examples
```python
import luigi
from luigi.contrib.external_program import ExternalProgramTask


class RunSparkSubmit(ExternalProgramTask):
    date = luigi.DateParameter()
    app_jar = luigi.Parameter(default='/opt/spark/apps/etl.jar')

    # Track the Spark application UI URL from stderr
    stream_for_searching_tracking_url = 'stderr'
    tracking_url_pattern = r'tracking URL: (https?://\S+)'

    def program_args(self):
        return [
            'spark-submit',
            '--master', 'yarn',
            '--deploy-mode', 'cluster',
            self.app_jar,
            '--date', str(self.date),
        ]

    def output(self):
        # ExternalProgramTask does not create this file itself; the Spark job
        # is expected to write it when it finishes successfully.
        return luigi.LocalTarget('/data/output/{}.done'.format(self.date))
```
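```python
import luigi
from luigi.contrib.external_program import ExternalPythonProgramTask


class RunPythonScript(ExternalPythonProgramTask):
    virtualenv = '/opt/venvs/analytics'
    extra_pythonpath = '/opt/libs/custom'

    def program_args(self):
        # On POSIX, 'python' resolves to the virtualenv's interpreter because its bin directory is first on PATH.
        return ['python', '/opt/scripts/analyze.py', '--verbose']

    def output(self):
        # Expected to be created by analyze.py; the task does not write it.
        return luigi.LocalTarget('/tmp/analysis_complete')
```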
Related Pages
- Spotify_Luigi_External_Process_Execution -- Principle governing external process execution patterns
- subprocess.Popen -- Python standard library class used internally for process execution