
Implementation:Spotify Luigi PaiTask

From Leeroopedia


Knowledge Sources
Domains Cloud_Computing, Microsoft_Azure
Last Updated 2026-02-10 08:00 GMT

Overview

PaiTask is a Luigi Task subclass that submits jobs to Microsoft OpenPAI clusters through the OpenPAI REST API and monitors them until they finish, enabling distributed AI model training and resource management on Azure-based infrastructure.

Description

The PaiTask module integrates Luigi workflows with Microsoft OpenPAI, an open-source platform for AI model training and resource management. It defines several classes that map directly to the OpenPAI job specification:

  • PaiTask (extends luigi.Task) -- The main abstract task class that users subclass to define jobs. It acquires an authentication token, submits the job over the REST API, polls the job status, and detects completion. The run() method builds a PaiJob from the task properties, serializes it to JSON, and submits it to the OpenPAI REST endpoint /api/v1/jobs. It then polls the job status endpoint until the job reaches a terminal state (SUCCEED or a failure state).
  • PaiJob -- A data class representing the OpenPAI job definition. It holds the job name, Docker image URL, task roles, and optional configuration for virtual clusters, GPU types, authentication, data/code/output directories, and retry counts.
  • TaskRole -- Defines a single task role within a PaiJob, specifying the command to run, resource requirements (CPU, memory, shared memory, GPU), task count, and optional port mappings.
  • Port -- Defines a port mapping for a TaskRole, with a label, starting port number, and count.
  • OpenPai (extends luigi.Config) -- Configuration class that reads OpenPAI connection settings (pai_url, username, password, expiration) from the Luigi configuration file.
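
The JSON serialization step described for run() can be illustrated with a minimal sketch. The classes below are reduced stand-ins that only mirror some of the slot names of Port and TaskRole, and slots_to_dict is a hypothetical helper introduced for illustration; none of this is the module's own code.

```python
import json


class Port:
    # Stand-in mirroring the slot names listed for luigi.contrib.pai.Port.
    __slots__ = ('label', 'beginAt', 'portNumber')

    def __init__(self, label, begin_at=0, port_number=1):
        self.label = label
        self.beginAt = begin_at
        self.portNumber = port_number


class TaskRole:
    # Reduced stand-in: only a subset of the real slots is modelled here.
    __slots__ = ('name', 'command', 'taskNumber', 'gpuNumber', 'portList')

    def __init__(self, name, command, taskNumber=1, gpuNumber=0, portList=None):
        self.name = name
        self.command = command
        self.taskNumber = taskNumber
        self.gpuNumber = gpuNumber
        self.portList = list(portList or [])


def slots_to_dict(obj):
    """Hypothetical helper: expose an object's populated slots as a dict."""
    return {slot: getattr(obj, slot) for slot in obj.__slots__ if hasattr(obj, slot)}


role = TaskRole('worker', 'python worker.py', taskNumber=4, gpuNumber=2,
                portList=[Port('http', begin_at=8080)])

# json.dumps calls `default` for every object it cannot encode natively,
# so the nested Port instance is converted as well.
payload = json.dumps(role, default=slots_to_dict, sort_keys=True)
print(payload)
```

Because the slot names already use the camelCase keys of the job specification, a slot-to-dict conversion of this kind needs no separate field mapping.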

The module requires the requests library for HTTP communication with the OpenPAI REST server and was tested against OpenPAI version 0.7.1.
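
The submit-then-poll behaviour can be sketched without a live cluster by injecting the status lookup as a callable. In this sketch, wait_for_terminal_state and the set of in-progress state names are assumptions made for illustration (only SUCCEED is named on this page), and a simulated sequence of states stands in for HTTP calls made with requests.

```python
import time

# Assumption: states treated as "still in progress". Only SUCCEED is named
# in the text above; these three names are illustrative.
IN_PROGRESS_STATES = {'UNKNOWN', 'WAITING', 'RUNNING'}


def wait_for_terminal_state(fetch_state, poll_seconds=5.0, sleep=time.sleep):
    """Call fetch_state() until it returns a state outside IN_PROGRESS_STATES."""
    while True:
        state = fetch_state()
        if state not in IN_PROGRESS_STATES:
            return state
        sleep(poll_seconds)


# Simulated status responses stand in for calls to the job status endpoint.
responses = iter(['WAITING', 'RUNNING', 'RUNNING', 'SUCCEED'])
naps = []  # records each requested sleep instead of actually sleeping
final_state = wait_for_terminal_state(lambda: next(responses), sleep=naps.append)
print(final_state, len(naps))
```

Passing the sleep function as a parameter keeps the loop testable; a real caller would leave the default time.sleep in place and supply a fetch_state that queries the cluster.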

Usage

Use PaiTask to submit containerized AI training or batch-processing jobs to a Microsoft OpenPAI cluster from a Luigi pipeline. It suits distributed machine learning workloads on Azure-hosted or on-premises OpenPAI deployments, where each job is defined as a Docker container with explicit resource requirements (CPU, GPU, memory). Subclass PaiTask and override the abstract properties name, image, and tasks to define the job specification.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/contrib/pai.py
  • Lines: 1-317

Signature

class PaiJob:
    __slots__ = (
        'jobName', 'image', 'authFile', 'dataDir', 'outputDir', 'codeDir',
        'virtualCluster', 'taskRoles', 'gpuType', 'retryCount'
    )
    def __init__(self, jobName, image, tasks): ...

class Port:
    __slots__ = ('label', 'beginAt', 'portNumber')
    def __init__(self, label, begin_at=0, port_number=1): ...

class TaskRole:
    __slots__ = (
        'name', 'taskNumber', 'cpuNumber', 'memoryMB', 'shmMB', 'gpuNumber',
        'portList', 'command', 'minFailedTaskCount', 'minSucceededTaskCount'
    )
    def __init__(self, name, command, taskNumber=1, cpuNumber=1,
                 memoryMB=2048, shmMB=64, gpuNumber=0, portList=[]): ...

class OpenPai(luigi.Config):
    pai_url = luigi.Parameter(default='http://127.0.0.1:9186')
    username = luigi.Parameter(default='admin')
    password = luigi.Parameter(default=None)
    expiration = luigi.IntParameter(default=3600)

class PaiTask(luigi.Task):
    # Abstract properties (must override):
    @property
    def name(self): ...          # str: unique job name
    @property
    def image(self): ...         # str: Docker image URL
    @property
    def tasks(self): ...         # list[TaskRole]: task roles

    # Optional properties:
    @property
    def auth_file_path(self): ...    # str or None: HDFS auth file
    @property
    def data_dir(self): ...          # str or None: HDFS data directory
    @property
    def code_dir(self): ...          # str or None: HDFS code directory
    @property
    def output_dir(self): ...        # str: HDFS output directory
    @property
    def virtual_cluster(self): ...   # str: virtual cluster name (default: 'default')
    @property
    def gpu_type(self): ...          # str or None: GPU type constraint
    @property
    def retry_count(self): ...       # int: retry count (default: 0)

    def run(self): ...
    def output(self): ...
    def complete(self): ...

Import

from luigi.contrib.pai import PaiTask, PaiJob, TaskRole, Port, OpenPai
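
The OpenPai settings are read from the Luigi configuration file. The sketch below shows what such a section might look like and parses it with the standard-library configparser purely to illustrate its shape; Luigi does its own parsing. The section name [OpenPai] is an assumption based on Luigi's convention of naming a Config section after its class, and the host and credentials are placeholders.

```python
import configparser

# Example values only; the host and credentials are placeholders.
EXAMPLE_CFG = """
[OpenPai]
pai_url = http://pai.example.com:9186
username = admin
password = change-me
expiration = 3600
"""

parser = configparser.ConfigParser()
parser.read_string(EXAMPLE_CFG)
section = parser['OpenPai']

# Fallbacks mirror the parameter defaults listed in the Signature section.
settings = {
    'pai_url': section.get('pai_url', fallback='http://127.0.0.1:9186'),
    'username': section.get('username', fallback='admin'),
    'password': section.get('password', fallback=None),
    'expiration': section.getint('expiration', fallback=3600),
}
print(settings)
```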

I/O Contract

Inputs

Name | Type | Required | Description
name | str | Yes | Unique name for the OpenPAI job; must be unique across the cluster
image | str | Yes | URL pointing to the Docker image for all tasks in the job
tasks | list[TaskRole] | Yes | List of TaskRole objects defining the work to be performed; at least one required
auth_file_path | str | No | Path to Docker registry authentication file on HDFS
data_dir | str | No | Data directory existing on HDFS
code_dir | str | No | Code directory on HDFS (should be less than 200 MB)
output_dir | str | No | Output directory on HDFS; defaults to $PAI_DEFAULT_FS_URI/{name}/output
virtual_cluster | str | No | Virtual cluster to run on; defaults to default
gpu_type | str | No | GPU type constraint for task scheduling
retry_count | int | No | Number of job retries; defaults to 0
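
The defaults for the optional inputs can be summarized in a small sketch. resolve_job_options is a hypothetical helper, not part of luigi.contrib.pai; it only restates the defaults listed above and shows how an override such as virtual_cluster replaces one of them.

```python
def resolve_job_options(name, **overrides):
    """Hypothetical helper: apply the documented defaults, then overrides."""
    options = {
        'auth_file_path': None,
        'data_dir': None,
        'code_dir': None,
        'output_dir': '$PAI_DEFAULT_FS_URI/{0}/output'.format(name),
        'virtual_cluster': 'default',
        'gpu_type': None,
        'retry_count': 0,
    }
    # Reject names that are not among the documented optional inputs.
    unknown = set(overrides) - set(options)
    if unknown:
        raise KeyError('unknown option(s): {0}'.format(sorted(unknown)))
    options.update(overrides)
    return options


opts = resolve_job_options('sklearn-training-job', virtual_cluster='gpu-cluster')
print(opts['output_dir'])
print(opts['virtual_cluster'], opts['retry_count'])
```

In a real PaiTask subclass the same effect comes from overriding the corresponding property, as in the usage examples.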

Outputs

Name | Type | Description
output | HdfsTarget | HDFS target at the configured output directory path
job_status | str | Terminal job state: SUCCEED on success; raises RuntimeError on failure

Usage Examples

Basic Usage

from luigi.contrib.pai import PaiTask, TaskRole

class MyTrainingJob(PaiTask):

    @property
    def name(self):
        return 'sklearn-training-job'

    @property
    def image(self):
        return 'openpai/pai.example.sklearn'

    @property
    def tasks(self):
        return [
            TaskRole(
                name='worker',
                command='python train.py --epochs 100',
                taskNumber=1,
                cpuNumber=4,
                memoryMB=8192,
                gpuNumber=1
            )
        ]

    @property
    def data_dir(self):
        return '/data/training'

    @property
    def code_dir(self):
        return '/code/sklearn-example'

    @property
    def virtual_cluster(self):
        return 'gpu-cluster'

    @property
    def gpu_type(self):
        return 'K80'

Multi-Role Job

from luigi.contrib.pai import PaiTask, TaskRole, Port

class DistributedTrainingJob(PaiTask):

    @property
    def name(self):
        return 'distributed-tf-job'

    @property
    def image(self):
        return 'myregistry/tensorflow:latest'

    @property
    def tasks(self):
        return [
            TaskRole(
                name='ps',
                command='python ps_server.py',
                taskNumber=2,
                cpuNumber=4,
                memoryMB=4096,
                gpuNumber=0
            ),
            TaskRole(
                name='worker',
                command='python worker.py',
                taskNumber=4,
                cpuNumber=8,
                memoryMB=16384,
                gpuNumber=2,
                portList=[Port(label='http', begin_at=8080, port_number=1)]
            ),
        ]

    @property
    def retry_count(self):
        return 3
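
When sizing a multi-role job it can help to total what the job asks of the cluster. The sketch below does this for the two roles in the example above, assuming that cpuNumber, memoryMB, and gpuNumber are per-task requirements, so each is multiplied by the role's taskNumber. total_resources is a hypothetical helper, not part of luigi.contrib.pai.

```python
# (name, taskNumber, cpuNumber, memoryMB, gpuNumber) copied from the example above.
ROLES = [
    ('ps', 2, 4, 4096, 0),
    ('worker', 4, 8, 16384, 2),
]


def total_resources(roles):
    """Sum per-task requirements multiplied by each role's task count."""
    totals = {'tasks': 0, 'cpus': 0, 'memory_mb': 0, 'gpus': 0}
    for _name, count, cpus, memory_mb, gpus in roles:
        totals['tasks'] += count
        totals['cpus'] += count * cpus
        totals['memory_mb'] += count * memory_mb
        totals['gpus'] += count * gpus
    return totals


print(total_resources(ROLES))
```

Under that assumption the example requests 6 tasks, 40 CPUs, 73728 MB of memory, and 8 GPUs in total, which is worth checking against the capacity of the target virtual cluster.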
