Implementation:Spotify Luigi BatchTask AWS
| Knowledge Sources | |
|---|---|
| Domains | Cloud_Computing, AWS |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
BatchTask is a Luigi Task that submits containerized jobs to AWS Batch through boto3 and polls them until they finish. A job is submitted against a pre-registered, Docker-based job definition; if it fails, the task raises an exception that carries the job's CloudWatch log output.
Description
The AWS Batch contrib module provides a Luigi wrapper for AWS Batch, Amazon's managed batch computing service. AWS Batch auto-scales EC2 Container Service instances, monitors cluster load, and schedules jobs based on submitted job definitions. The module contains three components:
- BatchTask (extends luigi.Task) -- The main task class that users subclass to submit jobs to AWS Batch. It accepts a pre-registered job_definition (either a name or ARN), an optional job_name for tracking, an optional job_queue to target a specific queue, and a configurable poll_time for status polling. The run() method creates a BatchClient, submits the job, and blocks until the job succeeds or fails. Users override the parameters property to return a dictionary of parameters passed to the job definition at submission time.
- BatchClient -- A boto3-powered client that encapsulates all AWS Batch API interactions:
  - __init__(poll_time) -- Initializes boto3 clients for both Batch and CloudWatch Logs, and discovers the first active job queue.
  - get_active_queue() -- Finds the first job queue with state ENABLED and status VALID.
  - submit_job(job_definition, parameters, job_name, queue) -- Submits a job to AWS Batch and returns the job ID. Generates a random job name if none is provided.
  - wait_on_job(job_id) -- Polls get_job_status() until the job reaches SUCCEEDED or FAILED. On failure, retrieves CloudWatch logs from the job's container log stream and raises a BatchJobException with the log content.
  - get_job_status(job_id) -- Returns the current job status: SUBMITTED, PENDING, RUNNABLE, STARTING, RUNNING, SUCCEEDED, or FAILED.
  - get_job_id_from_name(job_name) -- Looks up a running job by name in the active queue.
  - get_logs(log_stream_name, get_last) -- Retrieves the last N log events from a CloudWatch Logs stream under the /aws/batch/job log group.
  - register_job_definition(json_fpath) -- Registers a new job definition from a JSON file.
- BatchJobException -- A custom exception raised when a Batch job fails, containing the relevant CloudWatch log output.
The module requires boto3 and valid AWS credentials (discoverable by boto3, e.g., via aws configure).
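The polling behaviour described for wait_on_job() can be illustrated without AWS access. The following is a minimal sketch, not the module's code: `wait_for_terminal_status`, `JobFailed`, `get_status`, and `fetch_logs` are hypothetical names introduced here, with the two callables standing in for the Batch status lookup and the CloudWatch log retrieval.

```python
import time


class JobFailed(Exception):
    """Stand-in for BatchJobException in this sketch."""


def wait_for_terminal_status(get_status, fetch_logs, poll_time=10, sleep=time.sleep):
    """Poll get_status() until it reports SUCCEEDED or FAILED.

    get_status and fetch_logs are hypothetical zero-argument callables that
    stand in for the AWS Batch and CloudWatch Logs lookups.
    """
    while True:
        status = get_status()
        if status == "SUCCEEDED":
            return True
        if status == "FAILED":
            # Attach the tail of the container log to the exception.
            raise JobFailed("Job failed: {}".format(fetch_logs()))
        # Any other status (SUBMITTED through RUNNING) means keep waiting.
        sleep(poll_time)
```

Passing `sleep` as an argument is a choice made for this sketch only, so the loop can be exercised in tests without real delays.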
Usage
Use BatchTask when you need to run containerized (Docker) batch computing workloads on AWS as part of a Luigi pipeline. This is appropriate for compute-intensive tasks that benefit from AWS Batch's auto-scaling capabilities, such as genomics processing, financial modeling, machine learning training, or any workload that can be packaged as a Docker container. Prerequisites include a registered AWS Batch job definition, an enabled job queue, and a configured compute environment. The task integrates naturally with other Luigi tasks for dependency management and can be mixed with local tasks, S3 targets, and other AWS service integrations.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/batch.py
- Lines: 1-219
Signature
POLL_TIME = 10


class BatchJobException(Exception):
    pass


class BatchClient:
    def __init__(self, poll_time=POLL_TIME): ...
    def get_active_queue(self): ...
    def get_job_id_from_name(self, job_name): ...
    def get_job_status(self, job_id): ...
    def get_logs(self, log_stream_name, get_last=50): ...
    def submit_job(self, job_definition, parameters,
                   job_name=None, queue=None): ...
    def wait_on_job(self, job_id): ...
    def register_job_definition(self, json_fpath): ...


class BatchTask(luigi.Task):
    job_definition = luigi.Parameter()
    job_name = luigi.OptionalParameter(default=None)
    job_queue = luigi.OptionalParameter(default=None)
    poll_time = luigi.IntParameter(default=POLL_TIME)

    def run(self): ...

    @property
    def parameters(self): ...  # Override to return dict of job parameters
Import
from luigi.contrib.batch import BatchTask, BatchClient, BatchJobException
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| job_definition | str | Yes | Name or ARN of a pre-registered AWS Batch job definition |
| job_name | str | No | Human-readable name for the job; auto-generated if not provided (e.g., batch-job-abcdefgh) |
| job_queue | str | No | Name of the AWS Batch job queue; defaults to the first active queue discovered by get_active_queue() |
| poll_time | int | No | Seconds between status polls; default: 10 |
| parameters | dict | No | Dictionary of parameters passed to the job definition at submission time; override the parameters property to provide values |
| AWS credentials | -- | Yes | Valid AWS credentials discoverable by boto3 (environment variables, AWS config, IAM role, etc.) |
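The two defaults in the table above (queue discovery and job-name generation) can be sketched in plain Python. This is an illustration under assumptions, not the module's implementation: `first_active_queue` and `random_job_name` are hypothetical helpers, the sample dictionaries only mimic the `jobQueueName`, `state`, and `status` fields of a Batch queue description, and the name format is inferred from the `batch-job-abcdefgh` example.

```python
import random
import string


def first_active_queue(job_queues):
    """Return the name of the first queue that is ENABLED and VALID."""
    active = [q["jobQueueName"] for q in job_queues
              if q["state"] == "ENABLED" and q["status"] == "VALID"]
    if not active:
        raise RuntimeError("No job queues with state=ENABLED and status=VALID")
    return active[0]


def random_job_name(length=8):
    """Build a name such as batch-job-abcdefgh (format assumed from the table)."""
    suffix = "".join(random.choice(string.ascii_lowercase) for _ in range(length))
    return "batch-job-" + suffix


# Hypothetical queue descriptions, shaped like entries of a jobQueues list.
sample_queues = [
    {"jobQueueName": "paused-queue", "state": "DISABLED", "status": "VALID"},
    {"jobQueueName": "default-queue", "state": "ENABLED", "status": "VALID"},
    {"jobQueueName": "gpu-queue", "state": "ENABLED", "status": "VALID"},
]
chosen_queue = first_active_queue(sample_queues)
```

Because the default is simply the first matching queue, accounts with several enabled queues may prefer to set job_queue explicitly.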
Outputs
| Name | Type | Description |
|---|---|---|
| job completion | bool | wait_on_job() returns True when the job status reaches SUCCEEDED |
| BatchJobException | exception | Raised when job status reaches FAILED; contains CloudWatch log output from the job container |
| job_id | str | AWS Batch job UUID returned by submit_job(); used for status tracking |
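The log output attached to BatchJobException is described as the last N events of the job's log stream. A minimal sketch of that trimming step follows; `tail_messages` is a hypothetical helper, and the event dictionaries are assumed to carry a `message` field as CloudWatch Logs events do.

```python
def tail_messages(events, get_last=50):
    """Join the messages of the last `get_last` log events with newlines."""
    return "\n".join(event["message"] for event in events[-get_last:])


# Hypothetical events, oldest first.
sample_events = [{"message": "line %d" % i} for i in range(5)]
last_two = tail_messages(sample_events, get_last=2)
```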
Usage Examples
Basic Usage
import datetime

import luigi
from luigi.contrib.batch import BatchTask
from luigi.contrib.s3 import S3Target


class RunETLContainer(BatchTask):
    """Submit a pre-registered ETL job to AWS Batch."""
    job_definition = luigi.Parameter(default='etl-processor:3')
    date = luigi.DateParameter()

    @property
    def parameters(self):
        # Passed to the job definition at submission time
        return {
            'date': str(self.date),
            'input_bucket': 's3://my-data/raw/',
            'output_bucket': 's3://my-data/processed/'
        }

    def output(self):
        # Luigi treats the task as complete once this target exists
        return S3Target(
            's3://my-data/processed/%s/_SUCCESS' % self.date
        )


if __name__ == '__main__':
    # DateParameter expects a datetime.date when the task is built in code
    luigi.build([
        RunETLContainer(date=datetime.date(2024, 1, 15))
    ], local_scheduler=True)
Custom Queue and Polling
import luigi
from luigi.contrib.batch import BatchTask
from luigi.contrib.s3 import S3Target


class GPUTrainingJob(BatchTask):
    """Submit a GPU training job to a specific queue."""
    # Job definitions are referenced as name or name:revision (an integer);
    # without a revision, AWS Batch uses the latest active one
    job_definition = luigi.Parameter(default='ml-training-gpu')
    job_queue = luigi.OptionalParameter(default='gpu-queue')
    poll_time = luigi.IntParameter(default=30)
    model_name = luigi.Parameter()

    @property
    def parameters(self):
        # AWS Batch job parameters are string-to-string mappings
        return {
            'model': self.model_name,
            'epochs': '100',
            'batch_size': '64'
        }

    def output(self):
        return S3Target(
            's3://ml-models/%s/model.pt' % self.model_name
        )
Using BatchClient Directly
from luigi.contrib.batch import BatchClient

# Register a new job definition
bc = BatchClient(poll_time=15)
response = bc.register_job_definition('/config/my_job_def.json')
print('Registered:', response['jobDefinitionName'])

# Submit and wait
job_id = bc.submit_job(
    job_definition='my-job-def:1',
    parameters={'input': 's3://bucket/input', 'output': 's3://bucket/output'},
    job_name='manual-run-2024',
    queue='default-queue'
)
print('Submitted job:', job_id)
print('Status:', bc.get_job_status(job_id))
bc.wait_on_job(job_id)
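When BatchClient is driven directly, a failed job surfaces as an exception from wait_on_job(), so callers may want to catch it rather than let the script abort. The sketch below shows one way to do that; `run_batch_job` and `FakeClient` are hypothetical names introduced here. The helper accepts any object exposing submit_job() and wait_on_job(), and the fake client exists only so the example runs without AWS access.

```python
def run_batch_job(client, job_definition, parameters, failure_type=Exception):
    """Submit a job, wait for it, and return (succeeded, detail).

    detail is the job ID on success or the exception text on failure.
    """
    job_id = client.submit_job(job_definition, parameters)
    try:
        client.wait_on_job(job_id)
    except failure_type as exc:
        return False, str(exc)
    return True, job_id


class FakeClient:
    """Stand-in exposing the two methods the helper relies on."""

    def __init__(self, fail=False):
        self.fail = fail

    def submit_job(self, job_definition, parameters):
        return "job-123"

    def wait_on_job(self, job_id):
        if self.fail:
            raise RuntimeError("Job {} failed: container exited".format(job_id))
        return True


ok_result = run_batch_job(FakeClient(), "my-job-def:1", {"input": "s3://bucket/input"})
failed_result = run_batch_job(FakeClient(fail=True), "my-job-def:1", {},
                              failure_type=RuntimeError)
```

With the real module, the equivalent call would pass a BatchClient instance and `failure_type=BatchJobException`.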