Implementation:Spotify Luigi ECSTask
| Knowledge Sources | |
|---|---|
| Domains | Container_Orchestration, AWS |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
ECSTask is a Luigi contrib task that runs Docker containers on Amazon Elastic Container Service (ECS, formerly EC2 Container Service). It wraps the ECS runTask API via boto3, so Luigi pipelines can orchestrate containerized workloads on managed AWS infrastructure.
Description
The ECSTask class extends luigi.Task and supports two modes of specifying the Docker task definition:
- Pre-registered task definition: Pass an Amazon Resource Name (ARN) via task_def_arn.
- On-the-fly registration: Pass a Python dict matching the ECS taskDefinition JSON format via task_def. The task will be registered automatically before running.
Exactly one of task_def_arn or task_def must be provided.
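The exactly-one rule can be sketched as a small standalone check. This is an illustrative sketch, not the code in luigi/contrib/ecs.py: the function name check_task_def_args and its return values are hypothetical.

```python
def check_task_def_args(task_def_arn=None, task_def=None):
    """Return which mode applies, or raise if the exactly-one rule is broken.

    Hypothetical helper for illustration only; not part of Luigi.
    """
    has_arn = task_def_arn is not None
    has_def = task_def is not None
    # Both set or both missing violates the rule.
    if has_arn == has_def:
        raise ValueError("Provide exactly one of task_def_arn or task_def")
    return "pre-registered" if has_arn else "register-on-the-fly"
```

Supplying both arguments, or neither, is rejected before any AWS call would be made.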
Key properties:
- command: Override to return a list of dicts with name and command keys for container command overrides.
- run_task_kwargs: Override to provide additional ECS runTask API parameters such as launchType, networkConfiguration, and overrides.
- combined_overrides: Merges run_task_kwargs['overrides'] with the command property, with command taking precedence for colliding container names.
- ecs_task_ids: Exposes the ECS task ARN(s) after submission.
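The merge semantics described for combined_overrides can be sketched as a pure function. This is a sketch of the behavior as stated above, not a copy of the library code; the name merge_overrides is hypothetical, and it assumes the overrides dict uses the containerOverrides key from the ECS runTask API.

```python
import copy


def merge_overrides(overrides, commands):
    """Merge per-container commands into a runTask 'overrides' dict.

    Hypothetical helper for illustration only; not part of Luigi.
    """
    merged = copy.deepcopy(overrides or {})  # never mutate the caller's dict
    if not commands:
        return merged
    container_overrides = merged.setdefault("containerOverrides", [])
    by_name = {entry["name"]: entry for entry in container_overrides}
    for cmd in commands:
        existing = by_name.get(cmd["name"])
        if existing is not None:
            # Colliding container name: the command entry wins,
            # other keys of the existing override are kept.
            existing["command"] = list(cmd["command"])
        else:
            new_entry = {"name": cmd["name"], "command": list(cmd["command"])}
            container_overrides.append(new_entry)
            by_name[cmd["name"]] = new_entry
    return merged
```

Only the command key is replaced on a collision, so settings such as environment variables on the same container override survive the merge.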
The module also provides two helper functions:
- _get_task_statuses(task_ids, cluster): Retrieves task statuses (RUNNING, PENDING, STOPPED) from the ECS API.
- _track_tasks(task_ids, cluster): Polls task status every 2 seconds until all tasks reach STOPPED.
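The polling behavior can be sketched without touching AWS by injecting the status source. This is a minimal sketch under stated assumptions: track_until_stopped, get_statuses, and the sleep argument are hypothetical names, and the real helper obtains statuses from the ECS API rather than from a callback.

```python
import time


def track_until_stopped(get_statuses, poll_seconds=2, sleep=time.sleep):
    """Poll get_statuses() until every reported status is STOPPED.

    Hypothetical helper for illustration only; not part of Luigi.
    get_statuses is any callable returning a list of status strings.
    """
    while True:
        statuses = get_statuses()
        if all(status == "STOPPED" for status in statuses):
            return statuses
        # At least one task is still PENDING or RUNNING; wait and retry.
        sleep(poll_seconds)
```

Passing the sleep function as an argument keeps the loop testable with a fake clock, at the cost of a slightly wider signature.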
Usage
Subclass ECSTask and set task_def or task_def_arn. Optionally override command and run_task_kwargs for container overrides and launch configuration (e.g., Fargate, VPC networking).
Code Reference
Source Location
luigi/contrib/ecs.py (271 lines)
Signature
class ECSTask(luigi.Task):
    task_def_arn = luigi.OptionalParameter(default=None)
    task_def = luigi.OptionalParameter(default=None)
    cluster = luigi.Parameter(default='default')

    @property
    def ecs_task_ids(self):
        """Returns list of ECS task ARNs after submission."""

    @property
    def command(self):
        """Override to return list of {'name': ..., 'command': [...]} dicts."""

    @property
    def combined_overrides(self):
        """Merges run_task_kwargs overrides with command property."""

    @property
    def run_task_kwargs(self):
        """Override to provide additional runTask API kwargs (launchType, networkConfiguration, etc.)."""

    def run(self):
        """Registers task def (if needed), calls runTask, and polls until STOPPED."""


def _get_task_statuses(task_ids, cluster):
    """Retrieves task statuses from ECS describeTasks API."""


def _track_tasks(task_ids, cluster):
    """Polls task statuses every 2 seconds until all STOPPED."""
Import
from luigi.contrib.ecs import ECSTask
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
| task_def_arn | OptionalParameter | ARN of a pre-registered ECS task definition. Format: arn:aws:ecs:<region>:<user_id>:task-definition/<family>:<tag>. |
| task_def | OptionalParameter | Dict describing the task in ECS taskDefinition JSON format (family, containerDefinitions, volumes, etc.). |
| cluster | Parameter | ECS cluster name (default: 'default'). |
Outputs
| Output | Type | Description |
|---|---|---|
| ECS task execution | Side effect | One or more Docker containers run on the specified ECS cluster until STOPPED. Raises an exception if runTask returns failures. |
| ecs_task_ids | list of str | The ECS task ARN(s) assigned to the running tasks, available after run() is called. |
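The failure handling in the output contract can be illustrated against the shape of a runTask response, which carries a tasks list and a failures list. This is a hedged sketch, not the library's implementation: extract_task_arns is a hypothetical name, and the exception type is an assumption.

```python
def extract_task_arns(response):
    """Return task ARNs from a runTask-style response, or raise on failures.

    Hypothetical helper for illustration only; not part of Luigi.
    """
    failures = response.get("failures") or []
    if failures:
        details = ", ".join(
            "{}: {}".format(f.get("arn", "?"), f.get("reason", "unknown"))
            for f in failures
        )
        # Exception type chosen for the sketch; the library may differ.
        raise RuntimeError("runTask reported failures: " + details)
    return [task["taskArn"] for task in response.get("tasks", [])]
```

The returned list corresponds to what ecs_task_ids would expose after a successful submission.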
Usage Examples
import luigi
import luigi.contrib.s3
from luigi.contrib.ecs import ECSTask


class ProcessDataOnECS(ECSTask):
    # Hypothetical parameter, added so that self.date used in command is defined.
    date = luigi.DateParameter()

    # Plain dict class attribute, so boto3 receives ordinary dicts and lists.
    task_def = {
        'family': 'data-processor',
        'volumes': [],
        'containerDefinitions': [
            {
                'memory': 512,
                'essential': True,
                'name': 'processor',
                'image': 'my-registry/data-processor:latest',
                'command': ['/bin/bash', '-c', 'python process.py']
            }
        ]
    }
    cluster = 'production'

    @property
    def command(self):
        # Overrides the command of the container named 'processor'.
        return [{'name': 'processor', 'command': ['python', 'process.py', '--date', str(self.date)]}]

    @property
    def run_task_kwargs(self):
        # Extra keyword arguments forwarded to the ECS runTask call.
        return {
            'launchType': 'FARGATE',
            'networkConfiguration': {
                'awsvpcConfiguration': {
                    'subnets': ['subnet-abc123'],
                    'securityGroups': ['sg-def456'],
                    'assignPublicIp': 'ENABLED'
                }
            }
        }

    def output(self):
        return luigi.contrib.s3.S3Target('s3://bucket/output/done.txt')
Related Pages
- Spotify_Luigi_Container_Job_Execution -- Principle governing container-based job execution in Luigi
- Spotify_Luigi_DockerTask -- Alternative container task for local Docker execution
- boto3 ECS client -- AWS SDK used internally for ECS API calls