Implementation:Spotify Luigi KubernetesJobTask
| Knowledge Sources | |
|---|---|
| Domains | Container_Orchestration, Kubernetes |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
Luigi contrib module providing Kubernetes Job integration through the KubernetesJobTask class, enabling Luigi pipelines to submit, track, and manage containerized workloads on Kubernetes clusters.
Description
The kubernetes module wraps the Kubernetes Job API for use within Luigi pipelines. It uses the pykube-ng library for cluster interaction and supports both kubeconfig-based and service account authentication.
Core Classes:
- kubernetes (extends luigi.Config): Configuration class that reads Kubernetes settings from Luigi's config file. Parameters include:
  - auth_method: Authorization method, either "kubeconfig" (default) or "service-account".
  - kubeconfig_path: Path to the kubeconfig file; defaults to ~/.kube/config.
  - max_retrials: Maximum number of job retries on failure; defaults to 0.
  - kubernetes_namespace: Kubernetes namespace for job execution; defaults to None (uses default namespace).
- KubernetesJobTask (extends luigi.Task): The main task class for running Kubernetes Jobs. Subclasses must implement:
  - name (property): A name for the job; a UUID is automatically appended.
  - spec_schema (property): The Kubernetes Pod spec in JSON/dict format, defining containers, images, commands, and restart policy.
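To make the configuration defaults concrete, the following sketch parses a sample `[kubernetes]` section with the standard-library `configparser` and falls back to the defaults listed above. It does not use Luigi; the `load_kubernetes_settings` helper, the sample values, and the assumption that the section is named after the config class are illustrative only.

```python
import configparser

# Defaults as documented for the `kubernetes` config class.
DEFAULTS = {
    "auth_method": "kubeconfig",
    "kubeconfig_path": "~/.kube/config",
    "max_retrials": "0",
}

# Hypothetical config file contents (assumed section name: kubernetes).
SAMPLE_CFG = """
[kubernetes]
auth_method = service-account
max_retrials = 3
kubernetes_namespace = data-pipeline
"""


def load_kubernetes_settings(cfg_text):
    """Hypothetical helper: read the [kubernetes] section, applying defaults."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    section = parser["kubernetes"] if parser.has_section("kubernetes") else {}
    auth_method = section.get("auth_method", DEFAULTS["auth_method"])
    if auth_method not in ("kubeconfig", "service-account"):
        raise ValueError("unsupported auth_method: %r" % auth_method)
    return {
        "auth_method": auth_method,
        "kubeconfig_path": section.get("kubeconfig_path", DEFAULTS["kubeconfig_path"]),
        "max_retrials": int(section.get("max_retrials", DEFAULTS["max_retrials"])),
        # None stands for "use the cluster's default namespace".
        "kubernetes_namespace": section.get("kubernetes_namespace", None),
    }


settings = load_kubernetes_settings(SAMPLE_CFG)
```

Values missing from the section (here, `kubeconfig_path`) keep their documented defaults, which mirrors how the task falls back to the config class when a property is not overridden.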
The task lifecycle:
- Initialization: _init_kubernetes() creates the Kubernetes API client and generates a unique job name ({name}-{timestamp}-{uuid}).
- Job creation: run() constructs a complete Kubernetes Job JSON spec with batch/v1 API version, labels (including spawned_by: luigi and luigi_task_id), backoff limit, and optional active deadline.
- Job tracking: __track_job() polls the job status at configurable intervals, waiting for the job to start and then monitoring until it succeeds or fails.
- Pod verification: __verify_job_has_started() checks pod status including container states (waiting, terminated) and conditions.
- Completion: On success, calls signal_complete() and optionally deletes the job (cascade delete). On failure beyond max_retrials, the job is scaled to zero replicas.
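The naming and job-creation steps can be sketched with the standard library alone. This is an approximation of the shape described above, not the module's actual code: the helper names `make_job_name` and `build_job_json`, the timestamp format, and the 16-character UUID fragment are assumptions made for illustration.

```python
import uuid
from datetime import datetime


def make_job_name(name):
    """Build a unique name of the form {name}-{timestamp}-{uuid}."""
    # Timestamp format and UUID length are assumptions for this sketch.
    timestamp = datetime.now().strftime("%Y%m%d%H%M%S")
    return "%s-%s-%s" % (name, timestamp, uuid.uuid4().hex[:16])


def build_job_json(job_name, spec_schema, task_id, labels=None,
                   backoff_limit=6, active_deadline_seconds=None,
                   namespace=None):
    """Wrap a user-supplied Pod spec in a batch/v1 Job manifest."""
    # Luigi-specific labels first, then any custom labels.
    all_labels = {"spawned_by": "luigi", "luigi_task_id": task_id}
    all_labels.update(labels or {})
    metadata = {"name": job_name, "labels": all_labels}
    if namespace is not None:
        metadata["namespace"] = namespace
    job = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": metadata,
        "spec": {
            "backoffLimit": backoff_limit,
            "template": {
                "metadata": {"name": job_name, "labels": all_labels},
                "spec": spec_schema,
            },
        },
    }
    # The deadline is only set when one was requested.
    if active_deadline_seconds is not None:
        job["spec"]["activeDeadlineSeconds"] = active_deadline_seconds
    return job


pod_spec = {
    "containers": [{"name": "processor", "image": "myregistry/data-processor:latest"}],
    "restartPolicy": "Never",
}
job = build_job_json(make_job_name("my-data-processor"), pod_spec,
                     task_id="MyK8sJob__example", labels={"team": "data-engineering"})
```

The Pod spec returned by `spec_schema` is passed through untouched under `spec.template.spec`, which is why subclasses only need to describe containers and the restart policy.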
Key Properties:
- labels: Custom labels for the Kubernetes job (dict).
- max_retrials: Maximum failure retries before marking as failed.
- backoff_limit: Kubernetes backoff limit (default: 6).
- delete_on_success: Whether to delete the job after success (default: True).
- print_pod_logs_on_exit: Whether to fetch and log pod output (default: False).
- active_deadline_seconds: Optional time limit for pod scheduling.
- poll_interval: Status polling interval in seconds (default: 5).
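How poll_interval and max_retrials interact during tracking can be illustrated with a small polling loop. This is a simplified sketch, not the module's `__track_job()` implementation: `track_job` and the `get_status` callable are hypothetical, and the status dict only loosely resembles a Job's status block.

```python
import time


def track_job(get_status, poll_interval=5, max_retrials=0, sleep=time.sleep):
    """Poll until the job succeeds; raise once failures exceed max_retrials.

    get_status is a hypothetical callable returning a dict such as
    {"succeeded": 0, "failed": 1}.
    """
    while True:
        status = get_status()
        if status.get("succeeded", 0) > 0:
            return status
        if status.get("failed", 0) > max_retrials:
            raise RuntimeError("job failed %d time(s)" % status["failed"])
        # Neither finished nor over the retry budget: wait and poll again.
        sleep(poll_interval)


# Simulated status sequence: not started, one failure, then success.
statuses = iter([{}, {"failed": 1}, {"succeeded": 1, "failed": 1}])
naps = []
result = track_job(lambda: next(statuses), poll_interval=2,
                   max_retrials=1, sleep=naps.append)
```

Injecting `sleep` keeps the loop testable without waiting; with max_retrials=1 a single failure is tolerated, while a second would raise.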
Usage
Use this module when your Luigi pipeline needs to execute containerized workloads on a Kubernetes cluster. Common use cases include running data processing jobs in Docker containers, leveraging cluster resources for heavy computation, and integrating with cloud-native Kubernetes deployments.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/kubernetes.py
- Lines: 1-403
Signature
class kubernetes(luigi.Config):
    auth_method = luigi.Parameter(default="kubeconfig")
    kubeconfig_path = luigi.Parameter(default="~/.kube/config")
    max_retrials = luigi.IntParameter(default=0)
    kubernetes_namespace = luigi.OptionalParameter(default=None)


class KubernetesJobTask(luigi.Task):
    @property
    def name(self):
        raise NotImplementedError("subclass must define name")

    @property
    def spec_schema(self):
        raise NotImplementedError("subclass must define spec_schema")

    @property
    def labels(self):
        return {}

    @property
    def max_retrials(self):
        return self.kubernetes_config.max_retrials

    @property
    def backoff_limit(self):
        return 6

    @property
    def delete_on_success(self):
        return True

    @property
    def print_pod_logs_on_exit(self):
        return False

    def run(self):
        ...

    def signal_complete(self):
        ...
Import
from luigi.contrib.kubernetes import KubernetesJobTask
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | str (property) | Yes | Base name for the Kubernetes job; a UUID is appended automatically |
| spec_schema | dict (property) | Yes | Kubernetes Pod spec defining containers, images, commands, and restart policy |
| auth_method | str | No | Authentication method: "kubeconfig" (default) or "service-account" |
| kubeconfig_path | str | No | Path to kubeconfig file; defaults to ~/.kube/config |
| kubernetes_namespace | str | No | Kubernetes namespace; defaults to the cluster's default namespace |
| labels | dict | No | Custom labels to attach to the Job and Pod metadata |
| max_retrials | int | No | Maximum retries on failure; defaults to 0 (from config) |
| backoff_limit | int | No | Kubernetes backoff limit for pod failures; defaults to 6 |
| delete_on_success | bool | No | Delete job resources after success; defaults to True |
| print_pod_logs_on_exit | bool | No | Fetch and print pod logs after completion; defaults to False |
| active_deadline_seconds | int | No | Time limit for pod scheduling; defaults to None (unlimited) |
Outputs
| Name | Type | Description |
|---|---|---|
| Job execution | Kubernetes Job | The containerized workload is submitted and tracked on the Kubernetes cluster |
| signal_complete() | callback | Called on successful job completion; subclasses can override to write output markers |
| output() | luigi.Target | Optional output target for completion checking; returns None by default |
Usage Examples
Basic Usage
import luigi
from luigi.contrib.kubernetes import KubernetesJobTask


class MyK8sJob(KubernetesJobTask):
    @property
    def name(self):
        return "my-data-processor"

    @property
    def spec_schema(self):
        return {
            "containers": [{
                "name": "processor",
                "image": "myregistry/data-processor:latest",
                "command": ["python", "process.py", "--date", "2024-01-01"]
            }],
            "restartPolicy": "Never"
        }

    def signal_complete(self):
        # Write an empty marker file so Luigi sees the task as complete.
        with self.output().open('w') as f:
            f.write('')

    def output(self):
        return luigi.LocalTarget('/tmp/k8s_job_complete')
With Custom Labels and Namespace
from luigi.contrib.kubernetes import KubernetesJobTask


class LabeledK8sJob(KubernetesJobTask):
    @property
    def name(self):
        return "labeled-job"

    @property
    def kubernetes_namespace(self):
        # Overrides the namespace from the kubernetes config section.
        return "data-pipeline"

    @property
    def labels(self):
        return {
            "team": "data-engineering",
            "environment": "production"
        }

    @property
    def spec_schema(self):
        return {
            "containers": [{
                "name": "etl",
                "image": "myregistry/etl-runner:v2",
                "command": ["./run_etl.sh"],
                "resources": {
                    "requests": {"memory": "2Gi", "cpu": "1"},
                    "limits": {"memory": "4Gi", "cpu": "2"}
                }
            }],
            "restartPolicy": "Never"
        }

    @property
    def print_pod_logs_on_exit(self):
        return True

    @property
    def max_retrials(self):
        # Tolerate up to two failures before the task is marked as failed.
        return 2