Implementation:Spotify Luigi KubernetesJobTask

From Leeroopedia
Revision as of 16:47, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Spotify_Luigi_KubernetesJobTask.md)


Knowledge Sources
Domains Container_Orchestration, Kubernetes
Last Updated 2026-02-10 08:00 GMT

Overview

The luigi.contrib.kubernetes module integrates Kubernetes Jobs into Luigi through the KubernetesJobTask class, which lets Luigi pipelines submit, track, and clean up containerized workloads on a Kubernetes cluster.

Description

The kubernetes module wraps the Kubernetes Job API for use within Luigi pipelines. It uses the pykube-ng library for cluster interaction and supports both kubeconfig-based and service account authentication.

Core Classes:

  • kubernetes (extends luigi.Config): Configuration class that reads Kubernetes settings from Luigi's config file. Parameters include:
    • auth_method: Authorization method, either "kubeconfig" (default) or "service-account".
    • kubeconfig_path: Path to the kubeconfig file; defaults to ~/.kube/config.
    • max_retrials: Maximum number of job retries on failure; defaults to 0.
    • kubernetes_namespace: Kubernetes namespace for job execution; defaults to None (uses default namespace).
  • KubernetesJobTask (extends luigi.Task): The main task class for running Kubernetes Jobs. Subclasses must implement:
    • name (property): A base name for the job; a timestamp and a UUID are appended automatically to make it unique.
    • spec_schema (property): The Kubernetes Pod spec in JSON/dict format, defining containers, images, commands, and restart policy.
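
The settings above would typically live in a section of Luigi's config file named after the Config class. The following is a minimal sketch, assuming a [kubernetes] section and illustrative values; it parses the text with the standard-library configparser rather than Luigi itself, purely to show which keys exist and which defaults apply when a key is omitted.

```python
import configparser

# Hypothetical luigi.cfg content; the section name mirrors the
# `kubernetes` Config class and the values are illustrative only.
LUIGI_CFG = """
[kubernetes]
auth_method = service-account
kubernetes_namespace = data-pipeline
max_retrials = 2
"""

parser = configparser.ConfigParser()
parser.read_string(LUIGI_CFG)
section = parser["kubernetes"]

# Fall back to the documented defaults when a key is absent.
auth_method = section.get("auth_method", fallback="kubeconfig")
kubeconfig_path = section.get("kubeconfig_path", fallback="~/.kube/config")
max_retrials = section.getint("max_retrials", fallback=0)
kubernetes_namespace = section.get("kubernetes_namespace", fallback=None)

print(auth_method, kubeconfig_path, max_retrials, kubernetes_namespace)
```

Here kubeconfig_path is not set in the file, so the documented default is used; with auth_method set to "service-account" the path is presumably irrelevant anyway.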

The task lifecycle:

  1. Initialization: _init_kubernetes() creates the Kubernetes API client and generates a unique job name ({name}-{timestamp}-{uuid}).
  2. Job creation: run() constructs a complete Kubernetes Job JSON spec with batch/v1 API version, labels (including spawned_by: luigi and luigi_task_id), backoff limit, and optional active deadline.
  3. Job tracking: __track_job() polls the job status at configurable intervals, waiting for the job to start and then monitoring until it succeeds or fails.
  4. Pod verification: __verify_job_has_started() checks pod status including container states (waiting, terminated) and conditions.
  5. Completion: On success, calls signal_complete() and optionally deletes the job (cascade delete). On failure beyond max_retrials, the job is scaled to zero replicas.
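
Steps 1 and 2 can be sketched as a plain function that builds the Job manifest as a dict. This is an illustration reconstructed from the description above, not the module's code: build_job_manifest is a hypothetical helper, and truncating the UUID to 16 characters and using the UUID as the luigi_task_id label value are assumptions.

```python
import uuid
from datetime import datetime, timezone


def build_job_manifest(name, spec_schema, labels=None, backoff_limit=6,
                       namespace=None, active_deadline_seconds=None):
    """Hypothetical helper: assemble a batch/v1 Job manifest as a dict."""
    job_uuid = uuid.uuid4().hex
    timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
    # {name}-{timestamp}-{uuid}; the 16-character cut is an assumption.
    unique_name = f"{name}-{timestamp}-{job_uuid[:16]}"

    manifest = {
        "apiVersion": "batch/v1",
        "kind": "Job",
        "metadata": {
            "name": unique_name,
            # Default labels; the luigi_task_id value is an assumption.
            "labels": {"spawned_by": "luigi", "luigi_task_id": job_uuid},
        },
        "spec": {
            "backoffLimit": backoff_limit,
            "template": {
                "metadata": {"name": unique_name, "labels": {}},
                "spec": dict(spec_schema),  # the Pod spec from spec_schema
            },
        },
    }
    # Optional fields are only set when a value was provided.
    if namespace is not None:
        manifest["metadata"]["namespace"] = namespace
    if active_deadline_seconds is not None:
        manifest["spec"]["activeDeadlineSeconds"] = active_deadline_seconds

    # Custom labels go on both the Job and the Pod template.
    user_labels = labels or {}
    manifest["metadata"]["labels"].update(user_labels)
    manifest["spec"]["template"]["metadata"]["labels"].update(user_labels)
    return manifest


manifest = build_job_manifest(
    "my-data-processor",
    {
        "containers": [{"name": "processor",
                        "image": "myregistry/data-processor:latest"}],
        "restartPolicy": "Never",
    },
    labels={"team": "data-engineering"},
    namespace="data-pipeline",
)
print(manifest["metadata"]["name"])
```

Keeping the manifest a plain dict makes it easy to inspect or log before it is handed to a Kubernetes client.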

Key Properties:

  • labels: Custom labels for the Kubernetes job (dict).
  • max_retrials: Maximum failure retries before marking as failed.
  • backoff_limit: Kubernetes backoff limit (default: 6).
  • delete_on_success: Whether to delete the job after success (default: True).
  • print_pod_logs_on_exit: Whether to fetch and log pod output (default: False).
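  • active_deadline_seconds: Optional value for the Job's activeDeadlineSeconds field, which caps how long the Job may remain active before Kubernetes terminates it (default: None, no limit).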
  • poll_interval: Status polling interval in seconds (default: 5).
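
To make the interaction between max_retrials and poll_interval concrete, here is a minimal sketch of a polling loop over a Job's status counters. The helpers classify_job_status and track_job are hypothetical names, the state strings are illustrative, and the status dicts are fake data standing in for what a Kubernetes client would return; the real module may differ in detail.

```python
import time


def classify_job_status(status, max_retrials=0):
    """Hypothetical helper: map a Job status dict to a coarse state."""
    if status.get("succeeded", 0) > 0:
        return "SUCCEEDED"
    # Only give up once failures exceed the allowed retrials.
    if status.get("failed", 0) > max_retrials:
        return "FAILED"
    return "RUNNING"


def track_job(fetch_status, max_retrials=0, poll_interval=5, sleep=time.sleep):
    """Poll fetch_status() until the job leaves the RUNNING state."""
    while True:
        state = classify_job_status(fetch_status(), max_retrials)
        if state != "RUNNING":
            return state
        sleep(poll_interval)


# Fake status sequence: one failed pod, then a success on retry.
fake_statuses = iter([
    {"active": 1},
    {"active": 1, "failed": 1},
    {"succeeded": 1},
])
result = track_job(
    lambda: next(fake_statuses),
    max_retrials=2,
    sleep=lambda seconds: None,  # skip real waiting in this demo
)
print(result)
```

Injecting the sleep function keeps the loop testable without waiting for real poll intervals.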

Usage

Use this module when your Luigi pipeline needs to execute containerized workloads on a Kubernetes cluster. Common use cases include running data processing jobs in Docker containers, leveraging cluster resources for heavy computation, and integrating with cloud-native Kubernetes deployments.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/kubernetes.py
  • Lines: 1-403

Signature

import luigi


class kubernetes(luigi.Config):
    auth_method = luigi.Parameter(default="kubeconfig")
    kubeconfig_path = luigi.Parameter(default="~/.kube/config")
    max_retrials = luigi.IntParameter(default=0)
    kubernetes_namespace = luigi.OptionalParameter(default=None)

class KubernetesJobTask(luigi.Task):
    @property
    def name(self):
        raise NotImplementedError("subclass must define name")

    @property
    def spec_schema(self):
        raise NotImplementedError("subclass must define spec_schema")

    @property
    def labels(self):
        return {}

    @property
    def max_retrials(self):
        return self.kubernetes_config.max_retrials

    @property
    def backoff_limit(self):
        return 6

    @property
    def delete_on_success(self):
        return True

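    @property
    def print_pod_logs_on_exit(self):
        return False

    @property
    def active_deadline_seconds(self):
        return None  # no deadline unless overridden

    @property
    def poll_interval(self):
        return 5  # seconds between status polls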

    def run(self):
        ...

    def signal_complete(self):
        ...

Import

from luigi.contrib.kubernetes import KubernetesJobTask

I/O Contract

Inputs

Name | Type | Required | Description
name | str (property) | Yes | Base name for the Kubernetes job; a timestamp and a UUID are appended automatically
spec_schema | dict (property) | Yes | Kubernetes Pod spec defining containers, images, commands, and restart policy
auth_method | str | No | Authentication method: "kubeconfig" (default) or "service-account"
kubeconfig_path | str | No | Path to the kubeconfig file; defaults to ~/.kube/config
kubernetes_namespace | str | No | Kubernetes namespace; defaults to None (the cluster's default namespace)
labels | dict | No | Custom labels to attach to the Job and Pod metadata
max_retrials | int | No | Maximum retries on failure; defaults to 0 (from config)
backoff_limit | int | No | Kubernetes backoff limit for pod failures; defaults to 6
delete_on_success | bool | No | Delete job resources after success; defaults to True
print_pod_logs_on_exit | bool | No | Fetch and print pod logs after completion; defaults to False
active_deadline_seconds | int | No | Value for the Job's activeDeadlineSeconds field; defaults to None (no limit)

Outputs

Name | Type | Description
Job execution | Kubernetes Job | The containerized workload is submitted and tracked on the Kubernetes cluster
signal_complete() | callback | Called on successful job completion; subclasses can override to write output markers
output() | luigi.Target | Optional output target for completion checking; returns None by default

Usage Examples

Basic Usage

import luigi
from luigi.contrib.kubernetes import KubernetesJobTask

class MyK8sJob(KubernetesJobTask):

    @property
    def name(self):
        return "my-data-processor"

    @property
    def spec_schema(self):
        return {
            "containers": [{
                "name": "processor",
                "image": "myregistry/data-processor:latest",
                "command": ["python", "process.py", "--date", "2024-01-01"]
            }],
            "restartPolicy": "Never"
        }

    def signal_complete(self):
        with self.output().open('w') as f:
            f.write('')

    def output(self):
        return luigi.LocalTarget('/tmp/k8s_job_complete')

With Custom Labels and Namespace

from luigi.contrib.kubernetes import KubernetesJobTask

class LabeledK8sJob(KubernetesJobTask):

    @property
    def name(self):
        return "labeled-job"

    @property
    def kubernetes_namespace(self):
        return "data-pipeline"

    @property
    def labels(self):
        return {
            "team": "data-engineering",
            "environment": "production"
        }

    @property
    def spec_schema(self):
        return {
            "containers": [{
                "name": "etl",
                "image": "myregistry/etl-runner:v2",
                "command": ["./run_etl.sh"],
                "resources": {
                    "requests": {"memory": "2Gi", "cpu": "1"},
                    "limits": {"memory": "4Gi", "cpu": "2"}
                }
            }],
            "restartPolicy": "Never"
        }

    @property
    def print_pod_logs_on_exit(self):
        return True

    @property
    def max_retrials(self):
        return 2
