Implementation:Spotify Luigi SGEJobTask
| Knowledge Sources | |
|---|---|
| Domains | HPC, Batch_Computing |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
SGEJobTask is a Luigi Task that submits and monitors jobs on Sun Grid Engine (SGE) clusters by pickling the task instance, constructing qsub commands, and polling qstat for completion.
Description
The SGE contrib module enables Luigi to distribute tasks across a Sun Grid Engine (SGE) high-performance computing cluster. The module follows a pickle-and-submit pattern: the task instance is serialized to a shared filesystem, a qsub command is constructed to run a generic runner script that deserializes the task and calls its work() method, and the master process polls qstat until the job completes or fails.
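The pickle-and-submit pattern described above can be illustrated without a cluster. The following is a minimal sketch using only the standard library: `ExampleJob`, `dump_job`, `run_pickled_job`, and the file name `job-instance.pickle` are hypothetical names chosen for illustration, not the module's actual code, and a local temporary directory stands in for the shared filesystem.

```python
import os
import pickle
import tempfile


class ExampleJob:
    """Hypothetical stand-in for a task instance; not a Luigi class."""

    def __init__(self, out_path, message):
        self.out_path = out_path
        self.message = message

    def work(self):
        # The job logic that would run on a compute node.
        with open(self.out_path, "w") as f:
            f.write(self.message)


def dump_job(job, shared_dir):
    """Submitting side: serialize the job into the shared directory."""
    pickle_path = os.path.join(shared_dir, "job-instance.pickle")
    with open(pickle_path, "wb") as f:
        pickle.dump(job, f)
    return pickle_path


def run_pickled_job(pickle_path):
    """Runner side: load the job back and call its work() method."""
    with open(pickle_path, "rb") as f:
        job = pickle.load(f)
    job.work()


# A local temp dir stands in for a directory on the shared filesystem.
shared_dir = tempfile.mkdtemp()
out_path = os.path.join(shared_dir, "result.txt")
pickle_path = dump_job(ExampleJob(out_path, "done"), shared_dir)
run_pickled_job(pickle_path)
```

On a real cluster the two halves run in different processes on different nodes, which is why the pickle file must sit on storage that every node can read.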
- SGEJobTask (extends luigi.Task) -- The main abstract class users subclass. Instead of overriding run(), users override the work() method to define job logic. The run() method handles all cluster interaction: creating a temporary directory on the shared filesystem, pickling the task instance, tarballing Luigi and module dependencies (unless no_tarball is set), submitting via qsub, and tracking the job until completion. Parameters control resource allocation (n_cpu, parallel_env), job naming (job_name, job_name_format), execution mode (run_locally), monitoring (poll_time), and cleanup behavior (dont_remove_tmp_dir).
- LocalSGEJobTask (extends SGEJobTask) -- A convenience subclass that skips all cluster submission logic and simply calls work() locally. Useful for testing and debugging SGE workflows without requiring cluster access.
- Helper functions:
  - _parse_qstat_state(qstat_out, job_id) -- Parses the state column from qstat output for a given job ID and returns a state such as r (running), qw (queued and waiting), E (error), or t (transferring). It returns u (unknown) when the qstat output is empty or the job is not listed, which usually means the job has finished.
  - _parse_qsub_job_id(qsub_out) -- Extracts the job ID from qsub submission output.
  - _build_qsub_command(cmd, job_name, outfile, errfile, pe, n_cpu) -- Constructs the qsub shell command with flags for the output and error files, the parallel environment, and CPU allocation.
Default parameter values are chosen to match MIT StarCluster, an open-source SGE cluster manager for Amazon EC2.
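To make the helper functions described above concrete, here is a simplified standalone sketch of what such parsers might look like. The function names drop the leading underscore to signal that these are illustrative re-implementations, not the module's code. The sample qstat table, the assumed wording of the qsub confirmation message, and the exact set of qsub flags in the command template are assumptions.

```python
def parse_qstat_state(qstat_out, job_id):
    """Return the state column for job_id, or 'u' if the job is not listed."""
    if qstat_out.strip() == "":
        return "u"
    lines = qstat_out.split("\n")
    # Skip the header, which ends with a dashed separator line.
    while lines and not lines.pop(0).startswith("---"):
        pass
    for line in lines:
        fields = line.split()
        # Assumed column order: job-ID, prior, name, user, state, ...
        if len(fields) >= 5 and int(fields[0]) == int(job_id):
            return fields[4]
    return "u"


def parse_qsub_job_id(qsub_out):
    """Assumes output like: Your job 42 ("name") has been submitted."""
    return int(qsub_out.split()[2])


def build_qsub_command(cmd, job_name, outfile, errfile, pe, n_cpu):
    """Pipe cmd into qsub with output, error, parallel-env, and name flags."""
    template = 'echo {cmd} | qsub -o "{outfile}" -e "{errfile}" -pe {pe} {n_cpu} -N {job_name}'
    return template.format(cmd=cmd, outfile=outfile, errfile=errfile,
                           pe=pe, n_cpu=n_cpu, job_name=job_name)


# Made-up qstat output for illustration only.
SAMPLE_QSTAT = "\n".join([
    "job-ID  prior   name  user  state submit/start at     queue         slots",
    "-------------------------------------------------------------------------",
    "      1 0.55500 job1  root  r     02/24/2014 14:02:20 all.q@node001     2",
    "      2 0.55500 job2  root  qw    02/24/2014 14:02:25                   1",
])

running_state = parse_qstat_state(SAMPLE_QSTAT, 1)
missing_state = parse_qstat_state(SAMPLE_QSTAT, 99)
job_id = parse_qsub_job_id('Your job 42 ("test_job") has been submitted')
command = build_qsub_command("python runner.py", "test_job",
                             "/home/tmp/job.out", "/home/tmp/job.err", "orte", 2)
```

Returning u for a job that no longer appears in the table is what lets the caller treat "not listed" as "probably finished" and then check the error file to decide between success and failure.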
Usage
Use SGEJobTask when you need to run computationally intensive tasks on an SGE cluster as part of a Luigi workflow. This is appropriate for high-performance computing environments where jobs are submitted via qsub and monitored via qstat. Run your Luigi workflow from the master node with multiple workers to distribute SGEJobTasks in parallel across cluster nodes. Use LocalSGEJobTask during development and testing to run the same task logic locally without cluster access. Configure default parameters in your Luigi configuration file under the [SGEJobTask] section.
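The monitoring step can be pictured as a small polling loop. This is a sketch under stated assumptions rather than the module's implementation: `track_job` and its `get_state` and `sleep` arguments are hypothetical, the state strings follow the list in the Description, and treating both t and u as "no longer in the queue" is an assumption.

```python
import time


def track_job(get_state, poll_time=5, sleep=time.sleep):
    """Poll get_state() until the job leaves the queue; return the final state.

    get_state is a hypothetical callable returning a qstat-style state string.
    """
    while True:
        sleep(poll_time)
        state = get_state()
        if state in ("r", "qw"):
            continue  # still running or waiting in the queue
        if "E" in state:
            return state  # the scheduler reports an error state
        if state in ("t", "u"):
            return state  # no longer listed; the caller checks for errors
        raise ValueError("unexpected job state: %s" % state)


# Simulate a job that waits, runs for two polls, then disappears from qstat.
states = iter(["qw", "r", "r", "u"])
final_state = track_job(lambda: next(states), poll_time=0, sleep=lambda s: None)
```

Injecting the state source and the sleep function keeps the loop testable without a cluster; in practice `get_state` would wrap a qstat call and `poll_time` would come from the task parameter of the same name.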
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/sge.py
- Lines: 1-354
Signature
def _parse_qstat_state(qstat_out, job_id): ...
def _parse_qsub_job_id(qsub_out): ...
def _build_qsub_command(cmd, job_name, outfile, errfile, pe, n_cpu): ...
class SGEJobTask(luigi.Task):
    n_cpu = luigi.IntParameter(default=2, significant=False)
    shared_tmp_dir = luigi.Parameter(default='/home', significant=False)
    parallel_env = luigi.Parameter(default='orte', significant=False)
    job_name_format = luigi.Parameter(significant=False, default=None)
    job_name = luigi.Parameter(significant=False, default=None)
    run_locally = luigi.BoolParameter(significant=False)
    poll_time = luigi.IntParameter(significant=False, default=5)
    dont_remove_tmp_dir = luigi.BoolParameter(significant=False)
    no_tarball = luigi.BoolParameter(significant=False)
    def __init__(self, *args, **kwargs): ...
    def run(self): ...
    def work(self): ...  # Override this method with your job logic

class LocalSGEJobTask(SGEJobTask):
    def run(self): ...  # Simply calls self.work()
Import
from luigi.contrib.sge import SGEJobTask, LocalSGEJobTask
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| n_cpu | int | No | Number of CPUs (slots) to allocate via qsub -pe; default: 2 |
| shared_tmp_dir | str | No | Shared directory accessible from all cluster nodes for temporary files; default: /home |
| parallel_env | str | No | SGE parallel environment name (e.g., orte for StarCluster); default: orte |
| job_name_format | str | No | Format string for job name; can use class variables like {task_family} |
| job_name | str | No | Explicit job name to pass to qsub; overrides job_name_format |
| run_locally | bool | No | Run locally instead of on the cluster; default: False |
| poll_time | int | No | Seconds between qstat polls; default: 5 |
| dont_remove_tmp_dir | bool | No | Preserve temporary directory after job completes; default: False |
| no_tarball | bool | No | Skip tarballing Luigi project files; useful when Luigi is pre-installed on cluster nodes; default: False |
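
The precedence between job_name and job_name_format in the table above can be sketched as a small function. `resolve_job_name` is a hypothetical helper, not part of Luigi, and the final fallback to the task family when neither parameter is set is an assumption.

```python
def resolve_job_name(task_family, job_name=None, job_name_format=None, **fields):
    """Pick the name to pass to qsub: explicit name, then format, then family."""
    if job_name:
        # An explicit job_name overrides everything else.
        return job_name
    if job_name_format:
        # The format string may reference task_family and other task fields.
        return job_name_format.format(task_family=task_family, **fields)
    # Assumed default when neither parameter is given.
    return task_family


explicit = resolve_job_name("TestJobTask", job_name="nightly",
                            job_name_format="{task_family}_{i}", i=3)
formatted = resolve_job_name("TestJobTask",
                             job_name_format="{task_family}_{i}", i=3)
fallback = resolve_job_name("TestJobTask")
```

A format string is convenient when many instances of one task class are submitted at once, because each job then gets a distinguishable name in qstat output.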
Outputs
| Name | Type | Description |
|---|---|---|
| output | Target | User-defined output target (must be implemented in subclass) |
| job.out | file | Standard output from the SGE job, written to the temporary directory |
| job.err | file | Standard error from the SGE job, written to the temporary directory |
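
When a job fails, the two log files listed above are usually the first place to look, provided the temporary directory was kept with dont_remove_tmp_dir. The sketch below shows one way to collect them. `read_job_logs` is a hypothetical helper, and the temporary directory path has to be supplied by the caller because its exact name is not specified here.

```python
import os
import tempfile


def read_job_logs(tmp_dir):
    """Return (stdout_text, stderr_text); a missing file yields an empty string."""
    texts = []
    for name in ("job.out", "job.err"):
        path = os.path.join(tmp_dir, name)
        if os.path.exists(path):
            with open(path) as f:
                texts.append(f.read())
        else:
            texts.append("")
    return tuple(texts)


# Demo with a throwaway directory that contains only an error file.
demo_dir = tempfile.mkdtemp()
with open(os.path.join(demo_dir, "job.err"), "w") as f:
    f.write("Traceback (most recent call last): ...\n")

out_text, err_text = read_job_logs(demo_dir)
```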
Usage Examples
Basic Usage
import logging
import os

import luigi
from luigi.contrib.sge import SGEJobTask

logger = logging.getLogger('luigi-interface')


class TestJobTask(SGEJobTask):
    i = luigi.Parameter()

    def work(self):
        # Job logic: override work() rather than run().
        logger.info('Running test job...')
        with open(self.output().path, 'w') as f:
            f.write('this is a test')

    def output(self):
        # Written under /home, the default shared directory.
        return luigi.LocalTarget(os.path.join('/home', 'testfile_' + str(self.i)))


if __name__ == '__main__':
    # Three tasks requesting 1, 2, and 3 slots, handled by three workers.
    tasks = [TestJobTask(i=str(i), n_cpu=i + 1) for i in range(3)]
    luigi.build(tasks, local_scheduler=True, workers=3)
Configuration File
# luigi.cfg
[SGEJobTask]
shared-tmp-dir = /home
parallel-env = orte
n-cpu = 2
Local Debugging
import luigi
from luigi.contrib.sge import LocalSGEJobTask


class DebugTask(LocalSGEJobTask):
    """Runs locally without SGE for testing."""

    def work(self):
        with open(self.output().path, 'w') as f:
            f.write('debug output')

    def output(self):
        return luigi.LocalTarget('/tmp/debug_output.txt')