Implementation:Spotify Luigi LSFJobTask
| Knowledge Sources | |
|---|---|
| Domains | HPC, Batch_Computing |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
Luigi contrib module providing IBM Platform LSF (Load Sharing Facility) batch system integration through the LSFJobTask class, enabling Luigi pipelines to submit and track jobs on HPC cluster environments.
Description
The lsf module wraps the LSF batch computing system for use within Luigi pipelines. LSF is a workload management platform commonly used in high-performance computing (HPC) environments. The module follows the same architectural pattern as Luigi's Hadoop integration: pickle the task, submit it to the cluster via bsub, and track its status via bjobs.
Core Class:
- `LSFJobTask` (extends `luigi.Task`): The main task class for submitting and tracking LSF batch jobs. It manages the full job lifecycle:
  - Initialization (`_init_local()`): Creates a temporary directory under `shared_tmp_dir`, pickles the task instance to disk, and tarballs the Python dependencies (luigi and the task's module) into a `packages.tar` archive.
  - Job submission (`_run_job()`): Builds a `bsub` command from the configured parameters and submits it to the LSF scheduler. The job runs `lsf_runner.py`, which unpickles the task and calls `work()`.
  - Job tracking (`_track_job()`): Polls `bjobs` every `poll_time` seconds to monitor the job status (RUN, PEND, DONE, EXIT, SSUSP).
  - Cleanup (`_finish()`): Removes the temporary files after the job completes.
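The lifecycle above hinges on one handoff: the submitting side pickles the task instance into a directory, and a runner on the compute node unpickles it and calls `work()`. The sketch below imitates that handoff in a single process with the standard `pickle` module. `ToyJob`, `dump_job`, `run_pickled_job`, and the file name `job-instance.pickle` are hypothetical stand-ins; the dependency tarball and the `bsub` hop are left out entirely.

```python
import os
import pickle
import tempfile


class ToyJob:
    """Hypothetical stand-in for an LSFJobTask subclass (not the real class)."""

    def __init__(self, input_value):
        self.input_value = input_value
        self.result = None

    def work(self):
        # The computation that would run on the cluster node.
        self.result = self.input_value * 2
        return self.result


def dump_job(job, tmp_dir):
    """Submission side (hypothetical): pickle the task instance into tmp_dir."""
    job_file = os.path.join(tmp_dir, "job-instance.pickle")  # illustrative name
    with open(job_file, "wb") as f:
        pickle.dump(job, f)
    return job_file


def run_pickled_job(tmp_dir):
    """Runner side (hypothetical): load the pickled instance and call work()."""
    with open(os.path.join(tmp_dir, "job-instance.pickle"), "rb") as f:
        job = pickle.load(f)
    # Note: work() is called, never run()
    return job.work()


with tempfile.TemporaryDirectory() as tmp_dir:
    dump_job(ToyJob(21), tmp_dir)
    result = run_pickled_job(tmp_dir)

print(result)  # prints 42
```

Because only a directory path crosses the boundary, that directory has to be readable from the compute node, which is why the real module places it under a shared base directory.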
Luigi Parameters:
- `n_cpu_flag` (IntParameter, default=2): Number of CPUs requested via `bsub -n`.
- `shared_tmp_dir` (Parameter, default='/tmp'): Base directory for temporary files; it must be visible from every cluster node, so the default '/tmp' only works if it is shared.
- `resource_flag` (Parameter, default='mem=8192'): Resource requirement string, passed as `bsub -R rusage[...]`.
- `memory_flag` (Parameter, default='8192'): Memory limit for `bsub -M`.
- `queue_flag` (Parameter, default='queue_name'): LSF queue name for `bsub -q`; the default is a placeholder to override.
- `runtime_flag` (IntParameter, default=60): Wall-clock time limit for `bsub -W`, in minutes.
- `job_name_flag` (Parameter, default=''): Optional job name for `bsub -J`.
- `poll_time` (FloatParameter, default=5): Seconds between `bjobs` status checks.
- `save_job_info` (BoolParameter, default=False): Whether to preserve temporary files after completion.
- `extra_bsub_args` (Parameter, default=''): Additional arguments passed to `bsub`.
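The parameter-to-flag mapping above can be sketched as a function that assembles a `bsub` argument list. `build_bsub_args`, the flag order, the whitespace split of `extra_bsub_args`, and the trailing runner-path and temp-dir arguments are assumptions for illustration, not the module's actual code; the module may also pass flags not listed here.

```python
import shlex


def build_bsub_args(queue_flag="queue_name", n_cpu_flag=2, memory_flag="8192",
                    resource_flag="mem=8192", runtime_flag=60, job_name_flag="",
                    extra_bsub_args="", runner_path="lsf_runner.py",
                    tmp_dir="/tmp/job"):
    """Hypothetical helper: map LSFJobTask-style parameter values to bsub args."""
    args = ["bsub", "-q", queue_flag]
    args += ["-n", str(n_cpu_flag)]
    args += ["-M", str(memory_flag)]
    args += ["-R", "rusage[%s]" % resource_flag]
    args += ["-W", str(runtime_flag)]
    if job_name_flag:
        # -J is only added when a job name was given
        args += ["-J", job_name_flag]
    if extra_bsub_args:
        # Plain whitespace split: quoted values containing spaces are not preserved
        args += extra_bsub_args.split()
    # The job itself: runner script plus the directory holding the pickled task
    args += [runner_path, tmp_dir]
    return args


args = build_bsub_args(queue_flag="long", n_cpu_flag=4,
                       job_name_flag="daily-etl", extra_bsub_args="-P my_project")
# shlex.join is only used to display the list as a shell-safe command line
print(shlex.join(args))
```

Building a list rather than a single string lets `subprocess` pass each value through unchanged, so `rusage[mem=8192]` needs no shell quoting.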
Key Methods:
- `work()`: The method subclasses override to implement the actual computation. The runner calls it on the cluster node (instead of `run()`).
- `init_local()`: Hook for subclasses to perform setup before job submission.
- `fetch_task_failures()`: Reads the error file (`job.err`) from the temporary directory.
- `fetch_task_output()`: Reads the output file (`job.out`) from the temporary directory.
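`fetch_task_failures()` and `fetch_task_output()` both read a file from the task's temporary directory. A minimal stand-alone version might look like the following; it assumes an empty list is an acceptable "nothing there" value when the file is missing (the real methods may return something else), and `read_job_file` is a hypothetical helper.

```python
import os
import tempfile


def read_job_file(tmp_dir, name):
    """Hypothetical helper: lines of tmp_dir/name, or [] if the file is missing."""
    path = os.path.join(tmp_dir, name)
    if not os.path.isfile(path):
        return []
    with open(path, "r") as f:
        return f.readlines()


def fetch_task_failures(tmp_dir):
    # Standard error captured from the LSF job
    return read_job_file(tmp_dir, "job.err")


def fetch_task_output(tmp_dir):
    # Standard output captured from the LSF job
    return read_job_file(tmp_dir, "job.out")


with tempfile.TemporaryDirectory() as tmp_dir:
    with open(os.path.join(tmp_dir, "job.out"), "w") as f:
        f.write("step 1 ok\nstep 2 ok\n")
    out_lines = fetch_task_output(tmp_dir)
    err_lines = fetch_task_failures(tmp_dir)  # job.err was never written

print(out_lines)  # ['step 1 ok\n', 'step 2 ok\n']
print(err_lines)  # []
```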
Helper Class:
- `LocalLSFJobTask` (extends `LSFJobTask`): A local variant that calls `init_local()` and then `work()` directly, without LSF submission; useful for debugging.
Utility Functions:
- `track_job(job_id)`: Runs `bjobs -noheader -o stat` to get the status of a job.
- `kill_job(job_id)`: Runs `bkill` to terminate a running job.
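The two utility functions are thin wrappers around LSF commands. The sketch below builds the same command lines as argument lists and adds a hypothetical `parse_job_id` helper for the line `bsub` prints on submission (`Job <ID> is submitted to queue <QUEUE>.`). Only `parse_job_id` and the command builders run without a cluster; the `track_job` shown here is an illustrative `subprocess.run` wrapper, not the module's function.

```python
import re
import subprocess


def parse_job_id(bsub_output):
    """Hypothetical helper: pull the numeric ID out of bsub's submission message."""
    match = re.search(r"Job <(\d+)>", bsub_output)
    if match is None:
        raise ValueError("no job ID in bsub output: %r" % bsub_output)
    return int(match.group(1))


def bjobs_status_cmd(job_id):
    # Status column only, without the header line
    return ["bjobs", "-noheader", "-o", "stat", str(job_id)]


def bkill_cmd(job_id):
    return ["bkill", str(job_id)]


def track_job(job_id):
    """Illustrative wrapper: run bjobs and return the status, e.g. 'RUN' or 'PEND'.

    Requires an LSF installation; it is not called below.
    """
    result = subprocess.run(bjobs_status_cmd(job_id), capture_output=True,
                            text=True, check=True)
    return result.stdout.strip()


job_id = parse_job_id("Job <12345> is submitted to queue <long>.")
print(job_id)                              # 12345
print(" ".join(bjobs_status_cmd(job_id)))  # bjobs -noheader -o stat 12345
```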
Usage
Use this module when your Luigi pipeline runs in an HPC environment managed by IBM Platform LSF. Subclass LSFJobTask and implement the work() method with your computation logic. The module handles serialization, submission, tracking, and cleanup automatically.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/lsf.py
- Lines: 1-356
Signature
```python
import luigi


class LSFJobTask(luigi.Task):
    n_cpu_flag = luigi.IntParameter(default=2, significant=False)
    shared_tmp_dir = luigi.Parameter(default='/tmp', significant=False)
    resource_flag = luigi.Parameter(default='mem=8192', significant=False)
    memory_flag = luigi.Parameter(default='8192', significant=False)
    queue_flag = luigi.Parameter(default='queue_name', significant=False)
    runtime_flag = luigi.IntParameter(default=60)
    job_name_flag = luigi.Parameter(default='')
    poll_time = luigi.FloatParameter(significant=False, default=5)
    save_job_info = luigi.BoolParameter(default=False)
    output = luigi.Parameter(default='')  # subclasses normally replace this with an output() method
    extra_bsub_args = luigi.Parameter(default='')

    def run(self):
        # Pickles the task, submits it with bsub, polls bjobs, then cleans up
        ...

    def work(self):
        """Subclass this for actual computation."""
        pass


class LocalLSFJobTask(LSFJobTask):
    def run(self):
        # No LSF submission: run the setup hook and work() in-process
        self.init_local()
        self.work()
```
Import
```python
from luigi.contrib.lsf import LSFJobTask
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| n_cpu_flag | int | No | Number of CPUs to request from LSF; defaults to 2 |
| shared_tmp_dir | str | No | Shared temporary directory accessible by all cluster nodes; defaults to '/tmp' |
| resource_flag | str | No | LSF resource requirement string; defaults to 'mem=8192' |
| memory_flag | str | No | Memory limit passed to `bsub -M`; the unit is set by the cluster's `LSF_UNIT_FOR_LIMITS` configuration; defaults to '8192' |
| queue_flag | str | No | LSF queue name to submit the job to; defaults to 'queue_name' |
| runtime_flag | int | No | Wall clock time limit in minutes; defaults to 60 |
| job_name_flag | str | No | Optional human-readable job name |
| poll_time | float | No | Seconds between status polling; defaults to 5 |
| save_job_info | bool | No | Whether to preserve temporary files after completion; defaults to False |
| extra_bsub_args | str | No | Additional space-separated arguments passed to bsub |
Outputs
| Name | Type | Description |
|---|---|---|
| Job execution | LSF job | The task is pickled, submitted to LSF via bsub, and executed on a cluster node |
| job_status | str | Status recorded while tracking: PENDING or RUNNING during polling; DONE, FAILED, or UNKNOWN once tracking ends |
| job.out | file | Standard output captured from the LSF job execution |
| job.err | file | Standard error captured from the LSF job execution |
| output() | luigi.Target | The output target(s) defined by the subclass |
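The `job_status` row can be read as the result of a polling loop. This sketch takes the status source and the error reader as plain callables so it runs without LSF. Treating a terminal `DONE` or `EXIT` state as `FAILED` whenever `job.err` has content, and mapping `SSUSP` to `PENDING`, are assumptions of the sketch; `track_until_finished` and the string constants are hypothetical.

```python
import time

# Hypothetical Luigi-style status labels (plain strings for this sketch)
PENDING, RUNNING, DONE, FAILED, UNKNOWN = (
    "PENDING", "RUNNING", "DONE", "FAILED", "UNKNOWN")


def track_until_finished(get_lsf_status, get_errors, poll_time=5.0):
    """Poll until LSF reports a terminal state; return the final status.

    get_lsf_status: callable returning a bjobs STAT string such as 'RUN'.
    get_errors: callable returning the contents of job.err (empty = no errors).
    """
    while True:
        time.sleep(poll_time)
        stat = get_lsf_status()
        if stat == "RUN":
            status = RUNNING
        elif stat in ("PEND", "SSUSP"):
            # Assumption: a suspended job is treated like a pending one
            status = PENDING
        elif stat in ("DONE", "EXIT"):
            # Assumption: success vs failure is decided by job.err content
            return FAILED if get_errors() else DONE
        else:
            # Any other state stops tracking
            return UNKNOWN
        print("job is", status)


statuses = iter(["PEND", "RUN", "RUN", "DONE"])
final = track_until_finished(lambda: next(statuses), lambda: [], poll_time=0)
print(final)  # DONE
```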
Usage Examples
Basic Usage
```python
import luigi
from luigi.contrib.lsf import LSFJobTask


class MyHPCJob(LSFJobTask):
    # Override the default LSF resource requests for this job
    n_cpu_flag = luigi.IntParameter(default=4)
    memory_flag = luigi.Parameter(default='16384')
    queue_flag = luigi.Parameter(default='long')
    runtime_flag = luigi.IntParameter(default=120)
    input_file = luigi.Parameter()

    def work(self):
        """This runs on the cluster node."""
        import pandas as pd
        df = pd.read_csv(self.input_file)
        result = df.groupby('category').sum()
        # The output path must be reachable from the cluster node
        result.to_csv(self.output().path)

    def output(self):
        return luigi.LocalTarget('/shared/results/aggregated.csv')
```
With Job Name and Extra Arguments
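```python
import luigi
from luigi.contrib.lsf import LSFJobTask

class NamedHPCJob(LSFJobTask):
    date = luigi.DateParameter()
    job_name_flag = luigi.Parameter(default='daily-etl')
    # bsub -P assigns the job to a project; -u sets the notification email address
    extra_bsub_args = luigi.Parameter(default='-P my_project -u user@example.com')
    # Keep the temporary files (pickled task, packages.tar) after the job finishes
    save_job_info = luigi.BoolParameter(default=True)

    def work(self):
        # Heavy computation on the cluster goes here; write the result to output()
        with self.output().open('w') as f:
            f.write('done\n')

    def output(self):
        return luigi.LocalTarget('/shared/output/daily_{}.csv'.format(self.date))
```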
Local Debugging with LocalLSFJobTask
```python
import luigi
from luigi.contrib.lsf import LocalLSFJobTask


class DebugJob(LocalLSFJobTask):
    """Runs work() directly without LSF submission, for local testing."""

    def work(self):
        print("Running locally for debugging")
        with self.output().open('w') as f:
            f.write('debug output\n')

    def output(self):
        return luigi.LocalTarget('/tmp/debug_output.txt')


if __name__ == '__main__':
    luigi.build([DebugJob()], local_scheduler=True)
```