Implementation:Spotify Luigi ExternalProgramTask

From Leeroopedia


Knowledge Sources
Domains: External_Execution, Subprocess
Last Updated: 2026-02-10 08:00 GMT

Overview

ExternalProgramTask is a Luigi contrib template task for running external programs as subprocesses. It provides a structured way to wrap command-line tools, shell scripts, or any executable as a Luigi task, with support for output capture, tracking URL extraction, and graceful signal handling.

Description

The module provides several classes:

  • ExternalProgramTask (extends luigi.Task): The main template class. Override program_args() to return the argument list for subprocess.Popen. The run() method handles process execution, output capture, tracking URL search, and error reporting.
  • ExternalPythonProgramTask (extends ExternalProgramTask): Adds virtualenv and extra_pythonpath parameters for running Python programs in isolated environments.
  • ExternalProgramRunContext: A context manager that installs a SIGTERM handler and catches KeyboardInterrupt so that the subprocess is killed rather than left orphaned.
  • ExternalProgramRunError (extends RuntimeError): Raised when the program exits with a non-zero return code. Includes the command, stdout, stderr, and environment in its string representation.
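
The signal handling attributed to ExternalProgramRunContext can be illustrated with a standard-library-only sketch. The class name KillOnInterrupt is hypothetical and this is not Luigi's source. It only shows the general pattern: install a SIGTERM handler on entry, kill the child if the body is unwound by KeyboardInterrupt, and restore the previous handler on exit. The 128 + signal number exit code is an assumed convention borrowed from POSIX shells.

```python
import signal
import subprocess
import sys


class KillOnInterrupt:
    """Hypothetical helper: kill a child process on SIGTERM or Ctrl-C."""

    def __init__(self, proc):
        self.proc = proc
        self._old_handler = None

    def __enter__(self):
        # Remember the current SIGTERM handler so it can be restored on exit.
        self._old_handler = signal.getsignal(signal.SIGTERM)
        signal.signal(signal.SIGTERM, self._on_sigterm)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        # KeyboardInterrupt arrives as an exception, so it is detected here
        # while the with-body unwinds rather than in a signal handler.
        if exc_type is not None and issubclass(exc_type, KeyboardInterrupt):
            self.proc.kill()
        if self._old_handler is not None:
            signal.signal(signal.SIGTERM, self._old_handler)
        return False  # never swallow the exception

    def _on_sigterm(self, signum, frame):
        # Kill the child first, then exit this process.
        self.proc.kill()
        sys.exit(128 + signum)  # assumed convention, see lead-in
```

A caller would wrap the blocking wait, for example `with KillOnInterrupt(proc): proc.wait()`. Signal handlers can only be installed from the main thread, so this pattern does not work from worker threads.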

Key parameters for ExternalProgramTask:

  • capture_output (BoolParameter, default True): When enabled, stdout and stderr are captured to temporary files and logged after execution.
  • stream_for_searching_tracking_url (ChoiceParameter; choices none, stdout, stderr; default none): Specifies which stream to scan for a tracking URL pattern.
  • tracking_url_pattern (OptionalParameter, default None): Regex pattern whose first capture group is used as the tracking URL for the Luigi web UI.

When tracking URL search is enabled, a separate multiprocessing.Process reads the specified stream line-by-line, matching against the pattern and calling set_tracking_url() on match.
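
The matching step can be sketched in isolation with the standard re module. This is a simplified illustration, not Luigi's code: the function name find_tracking_url and the sample log lines are hypothetical, the scan runs in-process instead of in a separate multiprocessing.Process, and it stops at the first match instead of calling set_tracking_url().

```python
import re


def find_tracking_url(lines, pattern):
    """Return the first capture group of the first matching line, or None."""
    compiled = re.compile(pattern)
    for line in lines:
        match = compiled.search(line)
        if match:
            # group(1) is the first capture group, i.e. the URL itself.
            return match.group(1)
    return None


# Hypothetical log lines such as a job submitter might write to stderr.
sample_log = [
    "INFO submitting application",
    "INFO tracking URL: http://example.invalid:8088/proxy/application_1/",
    "INFO application finished",
]
url = find_tracking_url(sample_log, r"tracking URL: (https?://\S+)")
print(url)
```

Because only the first capture group is used, the pattern needs at least one parenthesized group. A pattern without one would raise an error on group(1) in this sketch.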

Usage

Subclass ExternalProgramTask and override program_args(). Optionally override program_environment() to customize the subprocess environment, and override the always_log_stderr property to return False if stderr should be logged only when the program fails.

Code Reference

Source Location

luigi/contrib/external_program.py (292 lines)

Signature

class ExternalProgramTask(luigi.Task):
    capture_output = luigi.BoolParameter(default=True)
    stream_for_searching_tracking_url = luigi.parameter.ChoiceParameter(
        choices=['none', 'stdout', 'stderr'], default='none')
    tracking_url_pattern = luigi.OptionalParameter(default=None)

    def program_args(self):
        """Abstract: return list of program arguments for subprocess.Popen."""
        raise NotImplementedError

    def program_environment(self):
        """Override to customize environment variables. Default: os.environ.copy()."""

    @property
    def always_log_stderr(self):
        """When True (default), stderr is logged even on success."""

    def run(self):
        """Executes the external program, captures output, handles errors."""

class ExternalPythonProgramTask(ExternalProgramTask):
    virtualenv = luigi.OptionalParameter(default=None)
    extra_pythonpath = luigi.OptionalParameter(default=None)

    def program_environment(self):
        """Extends parent env with virtualenv PATH/VIRTUAL_ENV and PYTHONPATH."""

class ExternalProgramRunContext:
    """Context manager for SIGTERM/KeyboardInterrupt handling of subprocesses."""

class ExternalProgramRunError(RuntimeError):
    """Error carrying the message, command args, environment, and captured stdout/stderr."""

Import

from luigi.contrib.external_program import ExternalProgramTask, ExternalPythonProgramTask

I/O Contract

Inputs

Input | Type | Description
program_args() | list | List of strings passed to subprocess.Popen. First element is the executable.
program_environment() | dict | Environment variables for the subprocess. Defaults to a copy of os.environ.
virtualenv | OptionalParameter | Path to a virtualenv directory (for ExternalPythonProgramTask). Modifies PATH and VIRTUAL_ENV.
extra_pythonpath | OptionalParameter | Additional path prepended to PYTHONPATH (for ExternalPythonProgramTask).
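
The effect of virtualenv and extra_pythonpath on the environment can be sketched as a pure function. The helper name build_python_env is hypothetical, and the bin subdirectory assumes a POSIX-style virtualenv layout. This is an illustration of the behavior described above, not the library's implementation.

```python
import os


def build_python_env(base_env, virtualenv=None, extra_pythonpath=None):
    """Hypothetical helper: return a copy of base_env adjusted as described."""
    env = dict(base_env)  # copy so the caller's mapping is left untouched
    if extra_pythonpath:
        # Prepend so the extra path takes precedence over existing entries.
        existing = env.get("PYTHONPATH", "")
        env["PYTHONPATH"] = os.pathsep.join(
            p for p in (extra_pythonpath, existing) if p
        )
    if virtualenv:
        # Similar to what an activate script does: <venv>/bin goes first.
        bin_dir = os.path.join(virtualenv, "bin")  # assumed POSIX layout
        env["PATH"] = os.pathsep.join(
            p for p in (bin_dir, env.get("PATH", "")) if p
        )
        env["VIRTUAL_ENV"] = virtualenv
    return env
```

With PATH set to /usr/bin and virtualenv set to /opt/venvs/analytics, the resulting PATH starts with /opt/venvs/analytics/bin, so a bare python in program_args() resolves to the virtualenv interpreter.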

Outputs

Output | Type | Description
Program execution | Side effect | The external program runs to completion. Raises ExternalProgramRunError on non-zero exit code.
Captured output | Logged | When capture_output=True, stdout and stderr are logged via the Luigi logger.
Tracking URL | Luigi web UI | When pattern matching is configured, the extracted URL is set as the task's tracking URL.
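
The capture-then-raise contract above can be sketched with the standard library alone. The names run_and_capture and ProgramFailed are hypothetical stand-ins, not Luigi's API, and the sketch omits logging and tracking URL handling.

```python
import subprocess
import sys
import tempfile


class ProgramFailed(RuntimeError):
    """Hypothetical error carrying the command and its captured output."""

    def __init__(self, returncode, cmd, stdout, stderr):
        super().__init__("Program failed with return code={}".format(returncode))
        self.returncode = returncode
        self.cmd = list(cmd)
        self.out = stdout
        self.err = stderr


def run_and_capture(args):
    """Run args, capture both streams to temp files, raise on non-zero exit."""
    with tempfile.TemporaryFile() as out_f, tempfile.TemporaryFile() as err_f:
        proc = subprocess.Popen([str(a) for a in args], stdout=out_f, stderr=err_f)
        proc.wait()
        # Rewind before reading: the child wrote through the same file offset.
        out_f.seek(0)
        err_f.seek(0)
        stdout = out_f.read().decode("utf-8", errors="replace")
        stderr = err_f.read().decode("utf-8", errors="replace")
    if proc.returncode != 0:
        raise ProgramFailed(proc.returncode, args, stdout, stderr)
    return stdout, stderr
```

Writing to temporary files rather than pipes avoids the deadlock that can occur when a child fills a pipe buffer that nobody is reading while the parent blocks in wait().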

Usage Examples

import luigi
from luigi.contrib.external_program import ExternalProgramTask

class RunSparkSubmit(ExternalProgramTask):
    date = luigi.DateParameter()
    app_jar = luigi.Parameter(default='/opt/spark/apps/etl.jar')

    def program_args(self):
        return [
            'spark-submit',
            '--master', 'yarn',
            '--deploy-mode', 'cluster',
            self.app_jar,
            '--date', str(self.date),
        ]

    # Track the Spark application UI URL from stderr
    stream_for_searching_tracking_url = 'stderr'
    tracking_url_pattern = r'tracking URL: (https?://\S+)'

    def output(self):
        # The external program is expected to create this marker file itself.
        return luigi.LocalTarget('/data/output/{}.done'.format(self.date))

import luigi
from luigi.contrib.external_program import ExternalPythonProgramTask

class RunPythonScript(ExternalPythonProgramTask):
    virtualenv = '/opt/venvs/analytics'
    extra_pythonpath = '/opt/libs/custom'

    def program_args(self):
        return ['python', '/opt/scripts/analyze.py', '--verbose']

    def output(self):
        # The script is expected to create this file itself.
        return luigi.LocalTarget('/tmp/analysis_complete')
