Implementation:Spotify Luigi DockerTask
| Knowledge Sources | |
|---|---|
| Domains | Container_Orchestration, Docker |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
DockerTask is a Luigi contrib task that wraps Docker container execution, so that any Docker image can be run as a Luigi task. It uses the low-level API of the Docker Python SDK to communicate directly with the Docker daemon, and it handles errors that arise during image pulling, container creation, execution, and cleanup.
Description
The DockerTask class extends luigi.Task and exposes a rich set of overridable properties to configure container behavior:
- image: Docker image to run (default: 'alpine'). A :latest tag is appended automatically if no tag is specified.
- command: Command to execute inside the container (default: "echo hello world").
- name: Optional container name.
- environment: Dict of environment variables. LUIGI_TMP_DIR is added automatically.
- mount_tmp: When True (default), creates a temporary directory on the host and mounts it at container_tmp_dir (default: /tmp/luigi).
- binds: Additional volume bind mounts, as a list of strings (a single string is also accepted).
- network_mode: Docker network mode.
- docker_url: Docker daemon URL.
- auto_remove: Remove the container after execution (default: True).
- force_pull: Always pull the image before running, even if it is already present locally (default: False).
- host_config_options: Dict of additional host config options (e.g., GPU requests, shared memory size).
- container_options: Dict of additional container creation options (e.g., user, ports).
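The tag defaulting and mount handling listed above can be illustrated in plain Python. This is a minimal sketch, not Luigi's code: `with_default_tag` and `build_mounts` are hypothetical helpers, and the choice to inspect only the last path component of the image name (so a registry port such as localhost:5000 is not read as a tag) is an assumption of this sketch. The split into bind specs and container-side paths mirrors how the low-level Docker SDK takes `binds` in the host config and `volumes` in `create_container`.

```python
def with_default_tag(image: str) -> str:
    """Append ':latest' when the image reference has no tag or digest (hypothetical helper)."""
    last_component = image.rsplit("/", 1)[-1]
    # Only the final path component can carry a tag (':v2.1') or digest ('@sha256:...');
    # a ':' earlier in the string may be a registry port such as 'localhost:5000'.
    if ":" in last_component or "@" in last_component:
        return image
    return image + ":latest"


def build_mounts(binds, mount_tmp, host_tmp_dir, container_tmp_dir="/tmp/luigi"):
    """Assemble bind specs, container-side paths, and the injected env var (hypothetical helper)."""
    bind_specs = []
    if mount_tmp:
        # The host temp directory is mounted at container_tmp_dir.
        bind_specs.append(f"{host_tmp_dir}:{container_tmp_dir}")
    # User binds may be one 'host:container[:mode]' string or a list of them.
    if isinstance(binds, str):
        bind_specs.append(binds)
    elif isinstance(binds, list):
        bind_specs.extend(binds)
    # The container-side path is the second ':'-separated field of each spec.
    container_paths = [spec.split(":")[1] for spec in bind_specs]
    # The container is always told where the shared temp directory lives.
    env = {"LUIGI_TMP_DIR": container_tmp_dir}
    return bind_specs, container_paths, env


print(with_default_tag("alpine"))                          # alpine:latest
print(with_default_tag("my-registry/etl-processor:v2.1"))  # my-registry/etl-processor:v2.1
print(with_default_tag("localhost:5000/app"))              # localhost:5000/app:latest
```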
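The run() method orchestrates the full lifecycle: pull the image (if it is missing locally or force_pull is True), remove any existing container with the same name (when name is set and auto_remove is True), create the container with its volumes and environment, start it, wait for it to exit, and check the exit status. On a non-zero exit it retrieves the container's stderr logs, removes the container if auto_remove is True, and raises ContainerError. After a successful run it also deletes the host temp directory.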
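The lifecycle above can be traced with a stand-in client. This is a minimal sketch under stated assumptions: `FakeClient` is a hypothetical in-memory stub whose method names (images, pull, remove_container, create_host_config, create_container, start, wait, logs) mirror docker.APIClient, `run_container` is a hypothetical function rather than DockerTask.run itself, and `ContainerFailed` stands in for docker.errors.ContainerError. It shows the order of calls only and never contacts a Docker daemon.

```python
class ContainerFailed(Exception):
    """Hypothetical stand-in for docker.errors.ContainerError."""

    def __init__(self, exit_status, stderr):
        super().__init__(f"exit status {exit_status}: {stderr!r}")
        self.exit_status = exit_status
        self.stderr = stderr


def run_container(client, image, command, *, name=None, environment=None,
                  binds=(), volumes=(), network_mode="", force_pull=False,
                  auto_remove=True, host_config_options=None, container_options=None):
    # 1. Pull the image if forced or if it is not present locally.
    if force_pull or not client.images(name=image):
        for _progress in client.pull(image, stream=True):
            pass  # consume the streamed progress messages
    # 2. Remove a leftover container with the same name. A real client raises
    #    if no such container exists, so real code must tolerate that error.
    if auto_remove and name:
        client.remove_container(name, force=True)
    # 3. Create and start the container; the extra option dicts are splatted in.
    host_config = client.create_host_config(
        binds=list(binds), network_mode=network_mode, **(host_config_options or {}))
    container = client.create_container(
        image, command=command, name=name, environment=environment or {},
        volumes=list(volumes), host_config=host_config, **(container_options or {}))
    client.start(container["Id"])
    # 4. Wait for exit; docker-py >= 3 returns {'StatusCode': n} instead of an int.
    status = client.wait(container["Id"])
    if isinstance(status, dict):
        status = status["StatusCode"]
    # 5. Fetch stderr before removal (logs are gone once the container is removed).
    stderr = client.logs(container["Id"], stdout=False, stderr=True) if status != 0 else b""
    if auto_remove:
        client.remove_container(container["Id"])
    if status != 0:
        raise ContainerFailed(status, stderr)
    return status


class FakeClient:
    """In-memory stub that records calls instead of talking to Docker."""

    def __init__(self, exit_code=0, local_images=()):
        self.exit_code = exit_code
        self.local_images = set(local_images)
        self.last_host_config = None
        self.calls = []

    def images(self, name=None):
        self.calls.append("images")
        return [name] if name in self.local_images else []

    def pull(self, image, stream=False):
        self.calls.append("pull")
        self.local_images.add(image)
        return iter([b'{"status": "Pulled"}'])

    def remove_container(self, container, force=False):
        self.calls.append("remove_container")

    def create_host_config(self, **kwargs):
        self.calls.append("create_host_config")
        self.last_host_config = kwargs
        return kwargs

    def create_container(self, image, **kwargs):
        self.calls.append("create_container")
        return {"Id": "c0ffee"}

    def start(self, container_id):
        self.calls.append("start")

    def wait(self, container_id):
        self.calls.append("wait")
        return {"StatusCode": self.exit_code}

    def logs(self, container_id, stdout=True, stderr=True):
        self.calls.append("logs")
        return b"boom\n"
```

With a named container and no local image, the recorded call order is images, pull, remove_container, create_host_config, create_container, start, wait, remove_container. Fetching stderr before the final removal is the important ordering choice, since a removed container's logs can no longer be read.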
Usage
Subclass DockerTask and override properties such as image, command, environment, and binds to define the containerized workload. Override requires() and output() as with any Luigi task.
Code Reference
Source Location
luigi/contrib/docker_runner.py (265 lines)
Signature
```python
import luigi


class DockerTask(luigi.Task):
    @property
    def image(self): return 'alpine'
    @property
    def command(self): return "echo hello world"
    @property
    def name(self): return None
    @property
    def host_config_options(self): return {}
    @property
    def container_options(self): return {}
    @property
    def environment(self): return {}
    @property
    def container_tmp_dir(self): return '/tmp/luigi'
    @property
    def binds(self): return None
    @property
    def network_mode(self): return ''
    @property
    def docker_url(self): return None
    @property
    def auto_remove(self): return True
    @property
    def force_pull(self): return False
    @property
    def mount_tmp(self): return True

    def run(self):
        """Pulls image, creates/starts container, waits, handles errors, cleans up."""
```
Import
```python
from luigi.contrib.docker_runner import DockerTask
```
I/O Contract
Inputs
| Input | Type | Description |
|---|---|---|
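| image | str (property) | Docker image name (with optional tag) to run. |
| command | str (property) | Command to run in the container. A string is split into arguments rather than passed to a shell, so shell syntax such as pipes or && needs an explicit sh -c wrapper. |
| environment | dict (property) | Environment variables passed to the container. LUIGI_TMP_DIR is set automatically. |
| binds | list or str (property) | Additional host-to-container volume bind mounts, each in host_path:container_path[:mode] form. |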
Outputs
| Output | Type | Description |
|---|---|---|
| Container execution | Side effect | The Docker container runs to completion. Raises ContainerError on a non-zero exit, ImageNotFound if the image is missing, or APIError on other Docker API failures (all from docker.errors). |
| Temp directory | Host filesystem | If mount_tmp is True, files written to container_tmp_dir inside the container appear on the host in the auto-created temp directory while the task runs. The directory is deleted after a successful run. |
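Inside the container, a process finds the shared directory through the LUIGI_TMP_DIR environment variable. The function below is a hypothetical example of code that might run inside the image; the name `write_scratch_result`, the file name result.json, and the JSON payload are illustrative assumptions, and the fallback to /tmp/luigi simply mirrors the documented default of container_tmp_dir.

```python
import json
import os
from pathlib import Path


def write_scratch_result(payload: dict, filename: str = "result.json") -> Path:
    """Write payload as JSON into the directory named by LUIGI_TMP_DIR (hypothetical helper)."""
    # DockerTask sets LUIGI_TMP_DIR to container_tmp_dir ('/tmp/luigi' by default).
    tmp_dir = Path(os.environ.get("LUIGI_TMP_DIR", "/tmp/luigi"))
    tmp_dir.mkdir(parents=True, exist_ok=True)
    target = tmp_dir / filename
    target.write_text(json.dumps(payload))
    return target
```

Because the host side of this directory is deleted once the task succeeds, it suits scratch data; results that must outlive the run belong under a path mounted through binds, typically the location that output() points to.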
Usage Examples
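```python
import luigi
from luigi.contrib.docker_runner import DockerTask


class RunETLContainer(DockerTask):
    date = luigi.DateParameter()

    @property
    def image(self):
        return 'my-registry/etl-processor:v2.1'

    @property
    def command(self):
        return 'python /app/etl.py --date {}'.format(self.date)

    @property
    def environment(self):
        return {
            'DATABASE_URL': 'postgres://db.internal:5432/warehouse',
            'DATE': str(self.date),
        }

    @property
    def binds(self):
        # Read-only shared input, plus a writable mount for the completion marker.
        # Assumes etl.py writes /mnt/output/<date>.done inside the container.
        return ['/data/shared:/mnt/shared:ro', '/data/output:/mnt/output']

    @property
    def auto_remove(self):
        return True  # already the default; overridden here only for explicitness

    def output(self):
        # Host-side path of the marker written through the /data/output bind mount.
        return luigi.LocalTarget('/data/output/{}.done'.format(self.date))
```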
Related Pages
- Spotify_Luigi_Container_Job_Execution -- Principle governing container-based job execution in Luigi
- docker.APIClient -- The Docker Python SDK low-level API client used internally
- Spotify_Luigi_ECSTask -- Alternative container task for Amazon ECS