Implementation:Spotify Luigi LSFJobTask

Revision as of 16:47, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Spotify_Luigi_LSFJobTask.md)


Knowledge Sources
Domains: HPC, Batch_Computing
Last Updated: 2026-02-10 08:00 GMT

Overview

Luigi contrib module providing IBM Platform LSF (Load Sharing Facility) batch system integration through the LSFJobTask class, enabling Luigi pipelines to submit and track jobs on HPC cluster environments.

Description

The lsf module wraps the LSF batch computing system for use within Luigi pipelines. LSF is a workload management platform commonly used in high-performance computing (HPC) environments. The module follows the same architectural pattern as Luigi's Hadoop integration: pickle the task, submit it to the cluster via bsub, and track its status via bjobs.

Core Class:

  • LSFJobTask (extends luigi.Task): The main task class for submitting and tracking LSF batch jobs. It provides a complete lifecycle:
  1. Initialization (_init_local()): Creates a temporary directory, pickles the task instance to disk, and tarballs the Python dependencies (luigi and the task's module) into a packages.tar archive.
  2. Job submission (_run_job()): Constructs a bsub command with the configured parameters and submits it to the LSF scheduler. The job runs lsf_runner.py which unpickles the task and calls work().
  3. Job tracking (_track_job()): Polls bjobs at configurable intervals to monitor job status (RUN, PEND, DONE, EXIT, SSUSP).
  4. Cleanup (_finish()): Removes temporary files after job completion.
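
The tracking step can be pictured as a small polling loop. The sketch below is not the module's code: `poll_until_finished`, its `get_status` and `has_errors` callbacks, and the state mapping are illustrative assumptions. Luigi's status constants are written as plain strings so the example runs without Luigi or an LSF cluster.

```python
import time

# Assumed mapping from LSF states to Luigi-style status strings.
# SSUSP (suspended) is treated as still pending in this sketch.
IN_PROGRESS = {"RUN": "RUNNING", "PEND": "PENDING", "SSUSP": "PENDING"}
FINISHED = {"DONE", "EXIT"}


def poll_until_finished(get_status, has_errors, poll_time=5.0, sleep=time.sleep):
    """Poll get_status() until the job finishes or reports an unknown state.

    get_status: hypothetical callable returning an LSF state string.
    has_errors: hypothetical callable returning True if job.err has content.
    Returns the Luigi-style statuses observed; the last entry is the final one.
    """
    seen = []
    while True:
        sleep(poll_time)  # wait between status checks
        state = get_status()
        if state in IN_PROGRESS:
            seen.append(IN_PROGRESS[state])
        elif state in FINISHED:
            # A finished job counts as failed if it wrote anything to job.err.
            seen.append("FAILED" if has_errors() else "DONE")
            return seen
        else:
            # Unrecognised state: stop rather than loop forever.
            seen.append("UNKNOWN")
            return seen


# Simulated job: pending, then running, then done with an empty error file.
states = iter(["PEND", "RUN", "RUN", "DONE"])
history = poll_until_finished(lambda: next(states), lambda: False,
                              poll_time=0, sleep=lambda _: None)
print(history)  # ['PENDING', 'RUNNING', 'RUNNING', 'DONE']
```

Passing the status lookup and the sleep function as arguments is a choice made for this sketch, so the loop can be exercised without waiting or calling bjobs.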

Luigi Parameters:

  • n_cpu_flag (IntParameter, default=2): Number of CPUs requested via bsub -n.
  • shared_tmp_dir (Parameter, default='/tmp'): Base directory for temporary files shared across cluster nodes.
  • resource_flag (Parameter, default='mem=8192'): Resource requirement string for bsub -R rusage[...].
  • memory_flag (Parameter, default='8192'): Memory limit for bsub -M.
  • queue_flag (Parameter, default='queue_name'): LSF queue name for bsub -q.
  • runtime_flag (IntParameter, default=60): Wall clock time limit for bsub -W.
  • job_name_flag (Parameter, default=''): Optional job name for bsub -J.
  • poll_time (FloatParameter, default=5): Seconds between bjobs status checks.
  • save_job_info (BoolParameter, default=False): Whether to preserve temporary files after completion.
  • extra_bsub_args (Parameter, default=''): Additional arguments passed to bsub.
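
To see how these parameters might translate into a bsub invocation, here is a minimal sketch. `build_bsub_command` is a hypothetical helper, not part of Luigi, and the paths in the example call are placeholders. The flag order and the use of `shlex.split` for the extra arguments are assumptions of this sketch.

```python
import shlex


def build_bsub_command(runner_path, tmp_dir, queue="queue_name", n_cpu=2,
                       memory="8192", resource="mem=8192", runtime=60,
                       job_name="", extra_args=""):
    """Assemble an argv list for bsub from LSFJobTask-style settings."""
    args = ["bsub", "-q", queue]
    args += ["-n", str(n_cpu)]
    args += ["-M", str(memory)]
    args += ["-R", "rusage[%s]" % resource]
    args += ["-W", str(runtime)]
    if job_name:  # -J is only added when a name is set
        args += ["-J", job_name]
    if extra_args:  # split the free-form string into separate arguments
        args += shlex.split(extra_args)
    # The command the job runs: the runner script plus the pickled-task directory.
    args += [runner_path, tmp_dir]
    return args


cmd = build_bsub_command("lsf_runner.py", "/tmp/example_task",
                         job_name="daily-etl", extra_args="-P my_project")
print(cmd)
```

Building a list rather than a single string keeps each value as one argument, so a resource string such as `rusage[mem=8192]` needs no shell quoting when the list is handed to `subprocess`.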

Key Methods:

  • work(): The method subclasses override to implement the actual computation. The runner on the cluster node calls work(), not run(); run() handles submission and tracking.
  • init_local(): Hook for subclasses to perform setup before job submission.
  • fetch_task_failures(): Reads the error file (job.err) from the temporary directory.
  • fetch_task_output(): Reads the output file (job.out) from the temporary directory.
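
The two fetch methods amount to reading a log file if it exists. A minimal sketch of that pattern follows. `read_job_log` is a hypothetical helper, and returning an empty list for a missing file is an assumption of this sketch rather than documented behavior.

```python
import os
import tempfile


def read_job_log(tmp_dir, filename):
    """Return the lines of tmp_dir/filename, or an empty list if it is missing."""
    path = os.path.join(tmp_dir, filename)
    if not os.path.isfile(path):
        return []
    with open(path, "r") as handle:
        return handle.readlines()


# Simulate a job directory that contains job.err but no job.out.
with tempfile.TemporaryDirectory() as tmp_dir:
    with open(os.path.join(tmp_dir, "job.err"), "w") as handle:
        handle.write("Traceback (most recent call last):\n")
    errors = read_job_log(tmp_dir, "job.err")   # one line
    outputs = read_job_log(tmp_dir, "job.out")  # file absent, so empty

print(len(errors), len(outputs))  # prints: 1 0
```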

Helper Class:

  • LocalLSFJobTask (extends LSFJobTask): A local variant that calls work() directly without LSF submission, useful for debugging.

Utility Functions:

  • track_job(job_id): Runs bjobs -noheader -o stat to get the status of a job.
  • kill_job(job_id): Runs bkill to terminate a running job.
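
Both utilities are thin wrappers around LSF command-line tools. The sketch below shows one way such wrappers could look. `query_job_state` and `terminate_job` are hypothetical names, and the `run` parameter is an addition made here so the example can be tried without a cluster.

```python
import subprocess


def query_job_state(job_id, run=subprocess.run):
    """Return the LSF state string (e.g. 'RUN') that bjobs reports for job_id."""
    result = run(["bjobs", "-noheader", "-o", "stat", str(job_id)],
                 capture_output=True, text=True)
    return result.stdout.strip()


def terminate_job(job_id, run=subprocess.run):
    """Ask LSF to terminate job_id and return bkill's exit code."""
    return run(["bkill", str(job_id)]).returncode


# Stand-in for subprocess.run that pretends the job is running.
def fake_run(argv, **kwargs):
    stdout = "RUN\n" if argv[0] == "bjobs" else ""
    return subprocess.CompletedProcess(argv, 0, stdout=stdout)


print(query_job_state(12345, run=fake_run))  # RUN
print(terminate_job(12345, run=fake_run))    # 0
```

On a real submission host the default `run=subprocess.run` would invoke bjobs and bkill directly.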

Usage

Use this module when your Luigi pipeline runs in an HPC environment managed by IBM Platform LSF. Subclass LSFJobTask and implement the work() method with your computation logic. The module handles serialization, submission, tracking, and cleanup automatically.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/lsf.py
  • Lines: 1-356

Signature

class LSFJobTask(luigi.Task):
    n_cpu_flag = luigi.IntParameter(default=2, significant=False)
    shared_tmp_dir = luigi.Parameter(default='/tmp', significant=False)
    resource_flag = luigi.Parameter(default='mem=8192', significant=False)
    memory_flag = luigi.Parameter(default='8192', significant=False)
    queue_flag = luigi.Parameter(default='queue_name', significant=False)
    runtime_flag = luigi.IntParameter(default=60)
    job_name_flag = luigi.Parameter(default='')
    poll_time = luigi.FloatParameter(significant=False, default=5)
    save_job_info = luigi.BoolParameter(default=False)
    output = luigi.Parameter(default='')
    extra_bsub_args = luigi.Parameter(default='')

    def run(self):
        ...

    def work(self):
        """Subclass this for actual computation."""
        pass

class LocalLSFJobTask(LSFJobTask):
    def run(self):
        self.init_local()
        self.work()

Import

from luigi.contrib.lsf import LSFJobTask

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| n_cpu_flag | int | No | Number of CPUs to request from LSF; defaults to 2 |
| shared_tmp_dir | str | No | Shared temporary directory accessible by all cluster nodes; defaults to '/tmp' |
| resource_flag | str | No | LSF resource requirement string; defaults to 'mem=8192' |
| memory_flag | str | No | Memory limit passed to bsub -M; the unit is set by the cluster's LSF configuration (LSF_UNIT_FOR_LIMITS); defaults to '8192' |
| queue_flag | str | No | LSF queue name to submit the job to; defaults to 'queue_name' |
| runtime_flag | int | No | Wall clock time limit in minutes; defaults to 60 |
| job_name_flag | str | No | Optional human-readable job name; defaults to '' |
| poll_time | float | No | Seconds between status polls; defaults to 5 |
| save_job_info | bool | No | Whether to preserve temporary files after completion; defaults to False |
| extra_bsub_args | str | No | Additional space-separated arguments passed to bsub; defaults to '' |

Outputs

| Name | Type | Description |
|---|---|---|
| Job execution | LSF job | The task is pickled, submitted to LSF via bsub, and executed on a cluster node |
| job_status | str | Luigi task status recorded while tracking the job: PENDING, RUNNING, DONE, FAILED, or UNKNOWN |
| job.out | file | Standard output captured from the LSF job execution |
| job.err | file | Standard error captured from the LSF job execution |
| output() | luigi.Target | The output target(s) defined by the subclass |

Usage Examples

Basic Usage

import luigi
from luigi.contrib.lsf import LSFJobTask

class MyHPCJob(LSFJobTask):
    n_cpu_flag = luigi.IntParameter(default=4)
    memory_flag = luigi.Parameter(default='16384')
    queue_flag = luigi.Parameter(default='long')
    runtime_flag = luigi.IntParameter(default=120)

    input_file = luigi.Parameter()

    def work(self):
        """This runs on the cluster node."""
        import pandas as pd

        df = pd.read_csv(self.input_file)
        result = df.groupby('category').sum()
        result.to_csv(self.output().path)

    def output(self):
        return luigi.LocalTarget('/shared/results/aggregated.csv')

With Job Name and Extra Arguments

import luigi
from luigi.contrib.lsf import LSFJobTask

class NamedHPCJob(LSFJobTask):
    date = luigi.DateParameter()
    job_name_flag = luigi.Parameter(default='daily-etl')
    extra_bsub_args = luigi.Parameter(default='-P my_project -u user@example.com')
    save_job_info = luigi.BoolParameter(default=True)

    def work(self):
        # Heavy computation on the cluster
        pass

    def output(self):
        return luigi.LocalTarget('/shared/output/daily_{}.csv'.format(self.date))

Local Debugging with LocalLSFJobTask

import luigi
from luigi.contrib.lsf import LocalLSFJobTask

class DebugJob(LocalLSFJobTask):
    """Runs work() directly without LSF submission, for local testing."""

    def work(self):
        print("Running locally for debugging")
        with self.output().open('w') as f:
            f.write('debug output\n')

    def output(self):
        return luigi.LocalTarget('/tmp/debug_output.txt')
