Implementation:Spotify Luigi SGEJobTask

From Leeroopedia


Knowledge Sources
Domains: HPC, Batch_Computing
Last Updated: 2026-02-10 08:00 GMT

Overview

SGEJobTask is a Luigi Task that submits and monitors jobs on Sun Grid Engine (SGE) clusters by pickling the task instance, constructing qsub commands, and polling qstat for completion.

Description

The SGE contrib module enables Luigi to distribute tasks across a Sun Grid Engine (SGE) high-performance computing cluster. The module follows a pickle-and-submit pattern: the task instance is serialized to a shared filesystem, a qsub command is constructed to run a generic runner script that deserializes the task and calls its work() method, and the master process polls qstat until the job completes or fails.
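The pickle-and-submit pattern can be illustrated without a cluster. The following is a minimal sketch, not the module's actual code: `HypotheticalJob`, `submit_side`, `runner_side`, and the file name `job-instance.pickle` are all assumptions made for illustration, and a temporary directory stands in for the shared filesystem.

```python
import os
import pickle
import tempfile


class HypotheticalJob:
    """Stand-in for a task instance (illustrative only, not a Luigi class)."""

    def __init__(self, out_path, message):
        self.out_path = out_path
        self.message = message

    def work(self):
        # The job logic that should eventually run on a compute node.
        with open(self.out_path, "w") as f:
            f.write(self.message)


def submit_side(job, shared_dir):
    """Serialize the job into the shared directory and return the pickle path."""
    pickle_path = os.path.join(shared_dir, "job-instance.pickle")
    with open(pickle_path, "wb") as f:
        pickle.dump(job, f)
    return pickle_path


def runner_side(pickle_path):
    """What a generic runner script would do: deserialize, then call work()."""
    with open(pickle_path, "rb") as f:
        job = pickle.load(f)
    job.work()


# A temporary directory plays the role of the shared filesystem here.
shared_dir = tempfile.mkdtemp()
out_path = os.path.join(shared_dir, "result.txt")

pickle_path = submit_side(HypotheticalJob(out_path, "done"), shared_dir)
runner_side(pickle_path)  # on a real cluster this step would run inside the qsub job
```

Because the runner only needs the pickle path, the same runner script can serve every task class, which is presumably why the directory has to be visible from all nodes.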

  • SGEJobTask (extends luigi.Task) -- The main abstract class users subclass. Instead of overriding run(), users override the work() method to define job logic. The run() method handles all cluster interaction: creating a temporary directory on the shared filesystem, pickling the task instance, tarballing Luigi and module dependencies (unless no_tarball is set), submitting via qsub, and tracking the job until completion. Parameters control resource allocation (n_cpu, parallel_env), job naming (job_name, job_name_format), execution mode (run_locally), monitoring (poll_time), and cleanup behavior (dont_remove_tmp_dir).
  • LocalSGEJobTask (extends SGEJobTask) -- A convenience subclass that skips all cluster submission logic and simply calls work() locally. Useful for testing and debugging SGE workflows without requiring cluster access.
  • Helper functions:
    • _parse_qstat_state(qstat_out, job_id) -- Parses the state column from qstat output for a given job ID, returning states like r (running), qw (queued/waiting), E (error), t (transferring), or u (unknown/finished).
    • _parse_qsub_job_id(qsub_out) -- Extracts the job ID from qsub submission output.
    • _build_qsub_command(cmd, job_name, outfile, errfile, pe, n_cpu) -- Constructs the qsub shell command with proper flags for output/error files, parallel environment, and CPU allocation.
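
The three helpers above can be sketched roughly as follows. These are illustrative reimplementations under stated assumptions, not the code in luigi/contrib/sge.py: the function names drop the leading underscore to make that clear, the qsub message format ("Your job <id> ...") and the qstat column layout (state in the fifth column) are assumed, and the exact set of flags the real helper passes is not confirmed by this page. The flags used here (-o, -e, -pe, -N) are standard qsub options.

```python
def parse_qstat_state(qstat_out, job_id):
    """Return the state column for job_id, or 'u' if the job is not listed."""
    for line in qstat_out.splitlines():
        fields = line.split()
        # Data rows start with a numeric job ID; header and separator rows do not.
        if len(fields) >= 5 and fields[0].isdigit() and int(fields[0]) == int(job_id):
            return fields[4]
    return "u"


def parse_qsub_job_id(qsub_out):
    """Extract the ID, assuming: Your job <id> ("<name>") has been submitted."""
    return int(qsub_out.split()[2])


def build_qsub_command(cmd, job_name, outfile, errfile, pe, n_cpu):
    """Pipe cmd into qsub with output, error, parallel-environment and name flags."""
    template = 'echo {cmd} | qsub -o "{outfile}" -e "{errfile}" -pe {pe} {n_cpu} -N {job_name}'
    return template.format(cmd=cmd, outfile=outfile, errfile=errfile,
                           pe=pe, n_cpu=n_cpu, job_name=job_name)


# Made-up sample output in the usual qstat layout (assumed, not captured from a cluster).
SAMPLE_QSTAT = """job-ID  prior   name   user      state submit/start at     queue         slots
-------------------------------------------------------------------------------
      1 0.55500 job1   sgeadmin  r     07/11/2013 20:27:22 all.q@master  1
      2 0.55500 job2   sgeadmin  qw    07/11/2013 20:27:22               1
"""

state_of_job_2 = parse_qstat_state(SAMPLE_QSTAT, 2)
submitted_id = parse_qsub_job_id('Your job 12345 ("test_job") has been submitted')
command = build_qsub_command("python runner.py", "test_job",
                             "/home/job.out", "/home/job.err", "orte", 2)
```

Returning `u` for a job that is absent from the listing matches the "unknown/finished" state described above: a job that has left the queue simply stops appearing in qstat.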

The default parameter values match a cluster provisioned with MIT StarCluster, an open-source SGE cluster manager for Amazon EC2.

Usage

Use SGEJobTask when you need to run computationally intensive tasks on an SGE cluster as part of a Luigi workflow. This is appropriate for high-performance computing environments where jobs are submitted via qsub and monitored via qstat. Run your Luigi workflow from the master node with multiple workers to distribute SGEJobTasks in parallel across cluster nodes. Use LocalSGEJobTask during development and testing to run the same task logic locally without cluster access. Configure default parameters in your Luigi configuration file under the [SGEJobTask] section.
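
To make the qstat monitoring concrete, here is a minimal polling-loop sketch. It is a simplification under assumptions rather than the module's implementation: `wait_for_job` and its `get_state` and `sleep` arguments are hypothetical names, and the state source is injected so the loop can be exercised without a cluster.

```python
import time


def wait_for_job(get_state, poll_time=5, sleep=time.sleep):
    """Poll get_state() every poll_time seconds; return 'done' or 'failed'."""
    while True:
        sleep(poll_time)
        state = get_state()
        if state in ("r", "qw"):
            continue  # still running or still queued: keep polling
        if "E" in state:
            return "failed"  # any error state, e.g. 'E' or 'Eqw'
        if state in ("t", "u"):
            return "done"  # transferring, or no longer listed by qstat
        raise ValueError("unexpected job state: %r" % state)


# Simulated sequence of states instead of real qstat calls.
states = iter(["qw", "r", "r", "u"])
result = wait_for_job(lambda: next(states), poll_time=0, sleep=lambda seconds: None)

error_states = iter(["qw", "Eqw"])
error_result = wait_for_job(lambda: next(error_states), poll_time=0,
                            sleep=lambda seconds: None)
```

A job that has vanished from qstat may have either succeeded or crashed, so a fuller version would likely also inspect the job's error file before declaring success.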

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/sge.py
  • Lines: 1-354

Signature

def _parse_qstat_state(qstat_out, job_id): ...
def _parse_qsub_job_id(qsub_out): ...
def _build_qsub_command(cmd, job_name, outfile, errfile, pe, n_cpu): ...

class SGEJobTask(luigi.Task):
    n_cpu = luigi.IntParameter(default=2, significant=False)
    shared_tmp_dir = luigi.Parameter(default='/home', significant=False)
    parallel_env = luigi.Parameter(default='orte', significant=False)
    job_name_format = luigi.Parameter(significant=False, default=None)
    job_name = luigi.Parameter(significant=False, default=None)
    run_locally = luigi.BoolParameter(significant=False)
    poll_time = luigi.IntParameter(significant=False, default=5)
    dont_remove_tmp_dir = luigi.BoolParameter(significant=False)
    no_tarball = luigi.BoolParameter(significant=False)

    def __init__(self, *args, **kwargs): ...
    def run(self): ...
    def work(self): ...   # Override this method with your job logic

class LocalSGEJobTask(SGEJobTask):
    def run(self): ...    # Simply calls self.work()

Import

from luigi.contrib.sge import SGEJobTask, LocalSGEJobTask

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| n_cpu | int | No | Number of CPUs (slots) to allocate via qsub -pe; default: 2 |
| shared_tmp_dir | str | No | Shared directory accessible from all cluster nodes for temporary files; default: /home |
| parallel_env | str | No | SGE parallel environment name (e.g., orte for StarCluster); default: orte |
| job_name_format | str | No | Format string for job name; can use class variables like {task_family} |
| job_name | str | No | Explicit job name to pass to qsub; overrides job_name_format |
| run_locally | bool | No | Run locally instead of on the cluster; default: False |
| poll_time | int | No | Seconds between qstat polls; default: 5 |
| dont_remove_tmp_dir | bool | No | Preserve temporary directory after job completes; default: False |
| no_tarball | bool | No | Skip tarballing Luigi project files; useful when Luigi is pre-installed on cluster nodes; default: False |
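
The interaction between job_name and job_name_format can be sketched as below. This is an assumption-laden illustration: `resolve_job_name` is a hypothetical helper, and falling back to the task family when neither parameter is set is assumed rather than stated on this page.

```python
def resolve_job_name(task_family, params, job_name=None, job_name_format=None):
    """Pick the name to pass to qsub -N."""
    if job_name:
        # An explicit name overrides any format string.
        return job_name
    if job_name_format:
        # The format string may reference task_family and the task's own parameters.
        return job_name_format.format(task_family=task_family, **params)
    # Assumed fallback when neither parameter is given.
    return task_family


formatted = resolve_job_name("TestJobTask", {"i": "3"},
                             job_name_format="{task_family}_{i}")
explicit = resolve_job_name("TestJobTask", {"i": "3"},
                            job_name="nightly_run",
                            job_name_format="{task_family}_{i}")
fallback = resolve_job_name("TestJobTask", {"i": "3"})
```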

Outputs

| Name | Type | Description |
|---|---|---|
| output | Target | User-defined output target (must be implemented in subclass) |
| job.out | file | Standard output from the SGE job, written to the temporary directory |
| job.err | file | Standard error from the SGE job, written to the temporary directory |

Usage Examples

Basic Usage

import logging
import luigi
import os
from luigi.contrib.sge import SGEJobTask

logger = logging.getLogger('luigi-interface')


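class TestJobTask(SGEJobTask):

    # Distinguishes the example tasks and their output files.
    i = luigi.Parameter()

    def work(self):
        # Job logic goes here; run() handles submission to the cluster.
        logger.info('Running test job...')
        with open(self.output().path, 'w') as f:
            f.write('this is a test')

    def output(self):
        # /home matches the default shared_tmp_dir.
        return luigi.LocalTarget(os.path.join('/home', 'testfile_' + str(self.i)))


if __name__ == '__main__':
    # Three tasks requesting 1, 2, and 3 slots, handled by three workers.
    tasks = [TestJobTask(i=str(i), n_cpu=i+1) for i in range(3)]
    luigi.build(tasks, local_scheduler=True, workers=3)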

Configuration File

# luigi.cfg
[SGEJobTask]
shared-tmp-dir = /home
parallel-env = orte
n-cpu = 2

Local Debugging

from luigi.contrib.sge import LocalSGEJobTask
import luigi


class DebugTask(LocalSGEJobTask):
    """Runs locally without SGE for testing."""

    def work(self):
        with open(self.output().path, 'w') as f:
            f.write('debug output')

    def output(self):
        return luigi.LocalTarget('/tmp/debug_output.txt')
