
Implementation:Spotify Luigi BatchTask AWS



Knowledge Sources
Domains: Cloud_Computing, AWS
Last Updated: 2026-02-10 08:00 GMT

Overview

BatchTask is a Luigi Task that submits containerized jobs to AWS Batch and monitors them until they finish. It uses boto3 to submit a job against a pre-registered, Docker-based job definition, polls the job's status until it succeeds or fails, and on failure retrieves the job's CloudWatch logs.

Description

The AWS Batch contrib module provides a Luigi wrapper for AWS Batch, Amazon's managed batch computing service. Behind the scenes, AWS Batch auto-scales a fleet of EC2 Container Service (ECS) instances, monitors their load, and schedules the jobs submitted against registered job definitions. The module contains three components:

  • BatchTask (extends luigi.Task) -- The main task class that users subclass to submit jobs to AWS Batch. It accepts a pre-registered job_definition (a name or ARN), an optional job_name for tracking the job in the queue and logs, an optional job_queue to target a specific queue, and a configurable poll_time (in seconds) for status polling. The run() method creates a BatchClient, submits the job, and blocks until the job succeeds or fails. Users override the parameters property to return the dictionary of parameters passed to the job definition at submission time; the default implementation returns an empty dict.
  • BatchClient -- A boto3-powered client that encapsulates all AWS Batch API interactions:
    • __init__(poll_time) -- Initializes boto3 clients for Batch and CloudWatch Logs and discovers the first active job queue.
    • get_active_queue() -- Returns the name of the first job queue whose state is ENABLED and status is VALID; raises an exception if no such queue exists.
    • submit_job(job_definition, parameters, job_name, queue) -- Submits a job to AWS Batch and returns its job ID. A random job name is generated if none is provided, and the first active queue is used if no queue is given.
    • wait_on_job(job_id) -- Polls get_job_status() every poll_time seconds until the job reaches SUCCEEDED (returning True) or FAILED. On failure, it retrieves CloudWatch logs from the job's container log stream and raises a BatchJobException containing the log output.
    • get_job_status(job_id) -- Returns the current job status: SUBMITTED, PENDING, RUNNABLE, STARTING, RUNNING, SUCCEEDED, or FAILED.
    • get_job_id_from_name(job_name) -- Returns the ID of the first RUNNING job with the given name in the client's active queue, or None if no job matches.
    • get_logs(log_stream_name, get_last) -- Retrieves the last get_last (default 50) events from a CloudWatch Logs stream in the /aws/batch/job log group and returns their messages as a single string.
    • register_job_definition(json_fpath) -- Registers a new job definition from a JSON file and returns the AWS API response.
  • BatchJobException -- A custom exception raised when a Batch job fails, containing the relevant CloudWatch log output.
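The polling behavior described for wait_on_job() can be exercised without AWS access. The sketch below is a simplified re-implementation for illustration, not the module's code: FakeBatchClient is a hypothetical stub whose responses are shaped like boto3's describe_jobs and get_log_events, and the /aws/batch/job log group and 50-event tail follow the description above.

```python
import time


class BatchJobException(Exception):
    """Local stand-in for luigi.contrib.batch.BatchJobException."""


class FakeBatchClient:
    """Hypothetical stub that replays a fixed sequence of job statuses.

    It returns only the fields of the boto3 describe_jobs / get_log_events
    responses that the polling loop reads; nothing here talks to AWS.
    """

    def __init__(self, statuses, log_lines):
        self._statuses = iter(statuses)
        self._log_lines = log_lines

    def describe_jobs(self, jobs):
        job = {"jobId": jobs[0], "status": next(self._statuses),
               "attempts": [{"container": {"logStreamName": "stream-1"}}]}
        return {"jobs": [job]}

    def get_log_events(self, logGroupName, logStreamName, startFromHead):
        return {"events": [{"message": m} for m in self._log_lines]}


def wait_on_job(client, job_id, poll_time=0.0, get_last=50):
    """Poll until SUCCEEDED (return True) or FAILED (raise with the log tail)."""
    while True:
        jobs = client.describe_jobs(jobs=[job_id])["jobs"]
        status = jobs[0]["status"]
        if status == "SUCCEEDED":
            return True
        if status == "FAILED":
            # Read the container's log stream and keep only the last lines.
            stream = jobs[0]["attempts"][0]["container"]["logStreamName"]
            events = client.get_log_events(logGroupName="/aws/batch/job",
                                           logStreamName=stream,
                                           startFromHead=False)["events"]
            tail = "\n".join(e["message"] for e in events[-get_last:])
            raise BatchJobException("Job {} failed: {}".format(job_id, tail))
        # SUBMITTED, PENDING, RUNNABLE, STARTING, RUNNING: keep waiting.
        time.sleep(poll_time)


print(wait_on_job(FakeBatchClient(["RUNNABLE", "RUNNING", "SUCCEEDED"], []), "job-1"))  # True
```

Because the loop blocks the calling thread, a BatchTask worker stays occupied for the whole job; a larger poll_time reduces API calls at the cost of noticing completion later.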

The module requires boto3 and valid AWS credentials (discoverable by boto3, e.g., via aws configure).

Usage

Use BatchTask when you need to run containerized (Docker) batch workloads on AWS as part of a Luigi pipeline. It suits compute-intensive tasks that benefit from AWS Batch's auto-scaling, such as genomics processing, financial modeling, machine learning training, or any workload that can be packaged as a Docker container. Prerequisites include a registered AWS Batch job definition, an enabled job queue, and a configured compute environment. The task participates in Luigi dependency management like any other task and can be combined with local tasks, S3 targets, and other AWS integrations. Because BatchTask defines no output() of its own, subclasses should override output() (as in the examples below) so that Luigi can tell whether the job has already run.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/batch.py
  • Lines: 1-219

Signature

POLL_TIME = 10

class BatchJobException(Exception):
    pass

class BatchClient:
    def __init__(self, poll_time=POLL_TIME): ...
    def get_active_queue(self): ...
    def get_job_id_from_name(self, job_name): ...
    def get_job_status(self, job_id): ...
    def get_logs(self, log_stream_name, get_last=50): ...
    def submit_job(self, job_definition, parameters,
                   job_name=None, queue=None): ...
    def wait_on_job(self, job_id): ...
    def register_job_definition(self, json_fpath): ...

class BatchTask(luigi.Task):
    job_definition = luigi.Parameter()
    job_name = luigi.OptionalParameter(default=None)
    job_queue = luigi.OptionalParameter(default=None)
    poll_time = luigi.IntParameter(default=POLL_TIME)

    def run(self): ...

    @property
    def parameters(self): ...   # Override to return dict of job parameters
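The queue-selection rule behind get_active_queue() can be sketched as a pure function over a describe_job_queues()-shaped response. This is a simplified illustration rather than the module's code; the queue names are hypothetical and the choice of RuntimeError is an assumption of this sketch.

```python
def get_active_queue(describe_job_queues_response):
    """Return the name of the first queue that is ENABLED and VALID.

    The input is a dict shaped like the boto3 describe_job_queues() response;
    only the jobQueueName, state, and status fields are read.
    """
    active = [q["jobQueueName"]
              for q in describe_job_queues_response["jobQueues"]
              if q["state"] == "ENABLED" and q["status"] == "VALID"]
    if not active:
        raise RuntimeError("No job queues with state=ENABLED and status=VALID")
    return active[0]


# Hypothetical listing: one disabled queue, one invalid queue, one usable queue.
response = {"jobQueues": [
    {"jobQueueName": "disabled-queue", "state": "DISABLED", "status": "VALID"},
    {"jobQueueName": "broken-queue", "state": "ENABLED", "status": "INVALID"},
    {"jobQueueName": "default-queue", "state": "ENABLED", "status": "VALID"},
]}
print(get_active_queue(response))  # default-queue
```

Since "first" depends on the order in which AWS lists the queues, setting job_queue explicitly is the more predictable choice when several queues are active.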

Import

from luigi.contrib.batch import BatchTask, BatchClient, BatchJobException

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| job_definition | str | Yes | Name, name:revision, or ARN of a pre-registered AWS Batch job definition |
| job_name | str | No | Human-readable name for tracking the job in the queue and logs; auto-generated if not provided (e.g., batch-job-abcdefgh) |
| job_queue | str | No | Name of the AWS Batch job queue; defaults to the first active queue found by get_active_queue() |
| poll_time | int | No | Seconds between status polls; default: 10 |
| parameters | dict (str to str) | No | Parameters passed to the job definition at submission time; override the parameters property to provide them (default: empty dict) |
| AWS credentials | -- | Yes | Valid AWS credentials discoverable by boto3 (environment variables, AWS config files, IAM role, etc.) |
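Two conventions from this table can be shown in plain Python: AWS Batch job parameters form a string-to-string map, and a job name is generated when none is given. Both helpers below are hypothetical illustrations, not part of luigi.contrib.batch; default_job_name only mirrors the batch-job-abcdefgh pattern shown in the table, and the module's actual generator may differ in detail.

```python
import datetime
import random
import string


def default_job_name():
    """Return a name of the form 'batch-job-' plus 8 random lowercase letters."""
    return "batch-job-" + "".join(random.sample(string.ascii_lowercase, 8))


def stringify_parameters(params):
    """Coerce keys and values to str, since Batch parameters are str -> str."""
    return {str(k): str(v) for k, v in params.items()}


params = stringify_parameters({"date": datetime.date(2024, 1, 15), "epochs": 100})
print(params)  # {'date': '2024-01-15', 'epochs': '100'}
print(default_job_name())  # random, of the form batch-job-xxxxxxxx
```

Converting values explicitly inside the parameters property, as the examples below do with str(self.date) and '100', avoids relying on any implicit conversion at submission time.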

Outputs

| Name | Type | Description |
|------|------|-------------|
| job completion | bool | wait_on_job() returns True when the job status reaches SUCCEEDED |
| BatchJobException | exception | Raised when the job status reaches FAILED; contains CloudWatch log output from the job container |
| job_id | str | AWS Batch job UUID returned by submit_job(); used for status tracking |

Usage Examples

Basic Usage

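import datetime

import luigi
from luigi.contrib.batch import BatchTask
from luigi.contrib.s3 import S3Target


class RunETLContainer(BatchTask):
    """Submit a pre-registered ETL job to AWS Batch."""

    # 'name:revision' pins revision 3 of the job definition.
    job_definition = luigi.Parameter(default='etl-processor:3')
    date = luigi.DateParameter()

    @property
    def parameters(self):
        # AWS Batch parameters are a string-to-string map.
        return {
            'date': str(self.date),
            'input_bucket': 's3://my-data/raw/',
            'output_bucket': 's3://my-data/processed/'
        }

    def output(self):
        # Luigi treats the task as complete once this marker exists;
        # the container is expected to write it.
        return S3Target(
            's3://my-data/processed/%s/_SUCCESS' % self.date
        )


if __name__ == '__main__':
    luigi.build([
        # DateParameter expects a datetime.date, not a string.
        RunETLContainer(date=datetime.date(2024, 1, 15))
    ], local_scheduler=True)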

Custom Queue and Polling

import luigi
from luigi.contrib.batch import BatchTask
from luigi.contrib.s3 import S3Target


class GPUTrainingJob(BatchTask):
    """Submit a GPU training job to a specific queue."""

    # Without a ':revision' suffix, AWS Batch uses the latest active revision.
    job_definition = luigi.Parameter(default='ml-training-gpu')
    job_queue = luigi.OptionalParameter(default='gpu-queue')
    poll_time = luigi.IntParameter(default=30)  # seconds between status checks
    model_name = luigi.Parameter()

    @property
    def parameters(self):
        # Values must be strings, hence '100' rather than 100.
        return {
            'model': self.model_name,
            'epochs': '100',
            'batch_size': '64'
        }

    def output(self):
        return S3Target(
            's3://ml-models/%s/model.pt' % self.model_name
        )

Using BatchClient Directly

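from luigi.contrib.batch import BatchClient, BatchJobException

# The constructor looks up an active job queue, so at least one
# ENABLED/VALID queue must exist.
bc = BatchClient(poll_time=15)

# Register a new job definition; the JSON keys must match the
# RegisterJobDefinition API (jobDefinitionName, type, containerProperties, ...)
response = bc.register_job_definition('/config/my_job_def.json')
print('Registered:', response['jobDefinitionName'], 'revision', response['revision'])

# Submit the revision just registered, then wait for it to finish
job_id = bc.submit_job(
    job_definition=response['jobDefinitionArn'],
    parameters={'input': 's3://bucket/input', 'output': 's3://bucket/output'},
    job_name='manual-run-2024',
    queue='default-queue'
)
print('Submitted job:', job_id)
print('Status:', bc.get_job_status(job_id))
try:
    bc.wait_on_job(job_id)  # returns True on SUCCEEDED
except BatchJobException as exc:
    print('Job failed; log tail:', exc)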
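register_job_definition() reads the definition from a JSON file. The sketch below writes one such file and previews how submitted parameters reach the container command. Assumptions: the keys follow the AWS RegisterJobDefinition API and are passed through to boto3 unchanged; the definition name, image URI, and command are placeholders; and render_command is a hypothetical local helper that imitates AWS Batch's Ref:: parameter substitution, not an AWS or Luigi API.

```python
import json
import os
import tempfile

# Placeholder job definition in RegisterJobDefinition form.
job_def = {
    "jobDefinitionName": "my-job-def",
    "type": "container",
    # Default values; parameters passed to submit_job() override these.
    "parameters": {"input": "s3://bucket/input", "output": "s3://bucket/output"},
    "containerProperties": {
        "image": "123456789012.dkr.ecr.us-east-1.amazonaws.com/my-image:1.0",
        # AWS Batch replaces Ref::<name> placeholders with parameter values.
        "command": ["python", "process.py", "--input", "Ref::input",
                    "--output", "Ref::output"],
        "resourceRequirements": [
            {"type": "VCPU", "value": "1"},
            {"type": "MEMORY", "value": "2048"},
        ],
    },
}


def render_command(command, defaults, overrides):
    """Preview Ref:: substitution locally; unknown placeholders stay as-is."""
    values = {**defaults, **overrides}
    prefix = "Ref::"
    return [values.get(arg[len(prefix):], arg) if arg.startswith(prefix) else arg
            for arg in command]


path = os.path.join(tempfile.mkdtemp(), "my_job_def.json")
with open(path, "w") as f:
    json.dump(job_def, f, indent=2)

print(render_command(job_def["containerProperties"]["command"],
                     job_def["parameters"],
                     {"input": "s3://bucket/2024-01-15/"}))
# ['python', 'process.py', '--input', 's3://bucket/2024-01-15/', '--output', 's3://bucket/output']
```

The resulting path could then be passed to bc.register_job_definition(). Keeping defaults in the job definition lets a BatchTask's parameters property supply only the values that change per run.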
