
Implementation:Spotify Luigi ECSTask



Domains: Container_Orchestration, AWS
Last Updated: 2026-02-10 08:00 GMT

Overview

ECSTask is a Luigi contrib task that runs Docker containers on Amazon EC2 Container Service (ECS), since renamed Amazon Elastic Container Service. It wraps the ECS registerTaskDefinition, runTask, and describeTasks APIs via boto3, so Luigi pipelines can orchestrate containerized workloads on managed AWS infrastructure.

Description

The ECSTask class extends luigi.Task and supports two modes of specifying the Docker task definition:

  1. Pre-registered task definition: Pass an Amazon Resource Name (ARN) via task_def_arn.
  2. On-the-fly registration: Pass a Python dict matching the ECS taskDefinition JSON format via task_def. The task will be registered automatically before running.

Exactly one of task_def_arn or task_def must be provided; otherwise run() raises a ValueError.
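The choice between the two modes can be sketched in plain Python. This is a minimal sketch, not Luigi's code: resolve_task_def_arn and FakeECSClient are hypothetical names, and the fake client stands in for a boto3 ECS client so the snippet runs without AWS. The sketch assumes, as described above, that the dict is passed straight to registerTaskDefinition and that the new ARN is read from the response's taskDefinition.taskDefinitionArn field.

```python
from typing import Any, Optional


class FakeECSClient:
    """Hypothetical stand-in for boto3.client('ecs'); records registrations."""

    def __init__(self) -> None:
        self.registered: list[dict[str, Any]] = []

    def register_task_definition(self, **task_def: Any) -> dict[str, Any]:
        self.registered.append(task_def)
        revision = len(self.registered)
        arn = f"arn:aws:ecs:us-east-1:123456789012:task-definition/{task_def['family']}:{revision}"
        return {"taskDefinition": {"taskDefinitionArn": arn}}


def resolve_task_def_arn(task_def: Optional[dict], task_def_arn: Optional[str], client: Any) -> str:
    """Return the ARN to pass to runTask, registering task_def if needed."""
    # Exactly one of the two inputs may be set (both or neither is an error).
    if bool(task_def) == bool(task_def_arn):
        raise ValueError("Either (but not both) a task_def (dict) or task_def_arn (string) must be assigned")
    if task_def_arn:
        # Pre-registered mode: use the ARN as-is.
        return task_def_arn
    # On-the-fly mode: the dict's keys map directly onto the API's keyword arguments.
    response = client.register_task_definition(**task_def)
    return response["taskDefinition"]["taskDefinitionArn"]
```

Passing a pre-registered ARN never touches the client, which is why the pre-registered mode avoids creating a new task definition revision on every run.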

Key properties:

  • command: Override to return a list of dicts with name and command keys. The entries are sent to runTask as containerOverrides inside the overrides parameter.
  • run_task_kwargs: Override to provide additional ECS runTask API parameters such as launchType, networkConfiguration, and overrides (the default is an empty dict).
  • combined_overrides: Deep-copies run_task_kwargs['overrides'] and merges the command entries into its containerOverrides list; when a container name appears in both, the value from command takes precedence.
  • ecs_task_ids: Exposes the ECS task ARN(s) once runTask has returned; it is None before submission.
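The combined_overrides merge rule can be sketched as a standalone function. This mirrors the behavior described in the list under stated assumptions; merge_overrides is a hypothetical name, not part of luigi.contrib.ecs. It copies the user-supplied overrides so the caller's dict is not mutated, then lets each command entry replace the command of a same-named container override or be appended as a new one.

```python
import copy
from typing import Any, Optional


def merge_overrides(run_task_kwargs: dict[str, Any], command: Optional[list[dict[str, Any]]]) -> dict[str, Any]:
    """Combine run_task_kwargs['overrides'] with command-property entries."""
    # Work on a deep copy so the caller's kwargs are left untouched.
    overrides = copy.deepcopy(run_task_kwargs.get("overrides", {}))
    if not command:
        return overrides
    container_overrides = overrides.setdefault("containerOverrides", [])
    for entry in command:
        for existing in container_overrides:
            if existing["name"] == entry["name"]:
                # Collision: the command property wins, other keys are kept.
                existing["command"] = entry["command"]
                break
        else:
            # No override for this container yet, so add the entry.
            container_overrides.append(entry)
    return overrides
```

Only the command key is replaced on a collision, so other per-container settings supplied through run_task_kwargs (memory, environment, and so on) survive the merge.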

The module also provides two helper functions:

  • _get_task_statuses(task_ids, cluster): Returns the lastStatus (RUNNING, PENDING, or STOPPED) of each task from the ECS describeTasks API, raising an exception if the response reports failures.
  • _track_tasks(task_ids, cluster): Polls task status every 2 seconds until all tasks reach STOPPED.
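The polling loop in _track_tasks can be sketched with the AWS call factored out. In this minimal sketch, track_until_stopped is a hypothetical name; get_statuses stands in for _get_task_statuses already bound to the task IDs and cluster, and sleep is injectable so the loop can be exercised without waiting.

```python
import time
from typing import Callable, Sequence


def track_until_stopped(
    get_statuses: Callable[[], Sequence[str]],
    poll_time: float = 2.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll until every status is STOPPED; return the number of polls made."""
    polls = 0
    while True:
        statuses = get_statuses()
        polls += 1
        # Done only when all tasks have stopped, whatever their exit codes.
        if all(status == "STOPPED" for status in statuses):
            return polls
        sleep(poll_time)
```

For example, a task that reports PENDING, then RUNNING, then STOPPED is polled three times with two 2-second sleeps in between.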

Usage

Subclass ECSTask and set task_def or task_def_arn. Optionally override command and run_task_kwargs for container overrides and launch configuration (e.g., Fargate, VPC networking).

Code Reference

Source Location

luigi/contrib/ecs.py (271 lines)

Signature

class ECSTask(luigi.Task):
    task_def_arn = luigi.OptionalParameter(default=None)
    task_def = luigi.OptionalParameter(default=None)
    cluster = luigi.Parameter(default='default')

    @property
    def ecs_task_ids(self):
        """Returns list of ECS task ARNs after submission."""

    @property
    def command(self):
        """Override to return list of {'name': ..., 'command': [...]} dicts."""

    @property
    def combined_overrides(self):
        """Merges run_task_kwargs overrides with command property."""

    @property
    def run_task_kwargs(self):
        """Override to provide additional runTask API kwargs (launchType, networkConfiguration, etc.)."""

    def run(self):
        """Registers task def (if needed), calls runTask, and polls until STOPPED."""

def _get_task_statuses(task_ids, cluster):
    """Retrieves task statuses from ECS describeTasks API."""

def _track_tasks(task_ids, cluster):
    """Polls task statuses every 2 seconds until all STOPPED."""

Import

from luigi.contrib.ecs import ECSTask

I/O Contract

Inputs

  • task_def_arn (OptionalParameter): ARN of a pre-registered ECS task definition. Format: arn:aws:ecs:<region>:<user_id>:task-definition/<family>:<tag>.
  • task_def (OptionalParameter): Dict describing the task in ECS taskDefinition JSON format (family, containerDefinitions, volumes, etc.).
  • cluster (Parameter): ECS cluster name (default: 'default').
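A task_def_arn in the documented format can be sanity-checked before a task is scheduled. The pattern below is a hypothetical helper, not part of Luigi. It assumes the standard aws partition, a 12-digit account ID in the <user_id> position, a family made of letters, digits, hyphens, and underscores, and a numeric <tag> (ECS calls this segment the revision).

```python
import re
from typing import Optional

# Hypothetical validator for arn:aws:ecs:<region>:<user_id>:task-definition/<family>:<tag>
TASK_DEF_ARN = re.compile(
    r"^arn:aws:ecs:(?P<region>[a-z0-9-]+):(?P<account>\d{12})"
    r":task-definition/(?P<family>[A-Za-z0-9_-]+):(?P<revision>\d+)$"
)


def parse_task_def_arn(arn: str) -> Optional[dict[str, str]]:
    """Return the ARN's parts as a dict, or None if it does not match."""
    match = TASK_DEF_ARN.match(arn)
    return match.groupdict() if match else None
```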

Outputs

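  • ECS task execution (side effect): One or more Docker containers run on the specified ECS cluster until every task reaches STOPPED. run() raises a ValueError if neither or both of task_def and task_def_arn are set, and raises an exception if runTask or describeTasks reports failures. Container exit codes are not inspected, so run() returns normally even if a container exits with a non-zero code.
  • ecs_task_ids (list of str): The ECS task ARN(s) returned by runTask, available once run() has submitted the tasks.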
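To fail a Luigi task when a container exits non-zero, a subclass would need its own check, for example after calling super().run(). The helper below is a hypothetical sketch that works on a dict shaped like a describeTasks response (tasks[].containers[] with name and exitCode); the sample is hand-written rather than fetched. In a real subclass the dict would come from boto3's describe_tasks called with the cluster and ecs_task_ids.

```python
from typing import Any


def failed_containers(describe_tasks_response: dict[str, Any]) -> list[tuple[str, str, Any]]:
    """List (taskArn, container name, exitCode) for containers that did not exit 0."""
    failed = []
    for task in describe_tasks_response.get("tasks", []):
        for container in task.get("containers", []):
            # exitCode is absent when a container never started; treat that as a failure too.
            exit_code = container.get("exitCode")
            if exit_code != 0:
                failed.append((task["taskArn"], container["name"], exit_code))
    return failed
```

A subclass could raise an exception whenever this list is non-empty, which makes Luigi mark the task as failed instead of done.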

Usage Examples

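import luigi
import luigi.contrib.s3  # provides S3Target (requires boto3)
from luigi.contrib.ecs import ECSTask


class ProcessDataOnECS(ECSTask):
    # Processing date, forwarded to the container command below.
    date = luigi.DateParameter()

    # Plain dict in taskDefinition format; run() registers it on the fly.
    # Fargate requires awsvpc networking and task-level cpu/memory given as strings.
    # Add 'executionRoleArn' if Fargate must pull from ECR or send logs to CloudWatch.
    task_def = {
        'family': 'data-processor',
        'requiresCompatibilities': ['FARGATE'],
        'networkMode': 'awsvpc',
        'cpu': '256',
        'memory': '512',
        'volumes': [],
        'containerDefinitions': [
            {
                'essential': True,
                'name': 'processor',
                'image': 'my-registry/data-processor:latest',
                'command': ['/bin/bash', '-c', 'python process.py'],
            }
        ],
    }
    cluster = 'production'

    @property
    def command(self):
        # Replaces the container's default command for this run.
        return [{'name': 'processor', 'command': ['python', 'process.py', '--date', str(self.date)]}]

    @property
    def run_task_kwargs(self):
        # Extra runTask arguments; 'overrides' is filled in from `command`.
        # Subnet and security group IDs are placeholders.
        return {
            'launchType': 'FARGATE',
            'networkConfiguration': {
                'awsvpcConfiguration': {
                    'subnets': ['subnet-abc123'],
                    'securityGroups': ['sg-def456'],
                    'assignPublicIp': 'ENABLED',
                }
            },
        }

    def output(self):
        # One marker per date; the container must write it, ECSTask does not.
        return luigi.contrib.s3.S3Target('s3://bucket/output/{}/done.txt'.format(self.date))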
