Implementation:Spotify Luigi PaiTask
| Knowledge Sources | |
|---|---|
| Domains | Cloud_Computing, Microsoft_Azure |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
PaiTask is a Luigi Task subclass that submits jobs to Microsoft OpenPAI clusters through the OpenPAI REST API and monitors them until they finish. It supports distributed AI model training and resource management on Azure-based infrastructure.
Description
The luigi.contrib.pai module integrates Luigi workflows with Microsoft OpenPAI, an open-source platform for AI model training and resource management. It defines several classes that map directly onto the OpenPAI job specification:
- PaiTask (extends luigi.Task) -- The main abstract task class that users subclass to define jobs. It acquires an authentication token, submits the job through the REST API, polls the job status, and reports completion. The run() method constructs a PaiJob from the task properties, serializes it to JSON, and submits it to the OpenPAI REST endpoint at /api/v1/jobs. It then polls the job status endpoint until the job reaches a terminal state: SUCCEED completes the task, and any other terminal state raises a RuntimeError.
- PaiJob -- A plain class (declared with __slots__) representing the OpenPAI job definition. It holds the job name, Docker image URL, and task roles, plus optional settings for the virtual cluster, GPU type, Docker registry authentication file, data/code/output directories, and retry count.
- TaskRole -- Defines a single task role within a PaiJob, specifying the command to run, resource requirements (CPU, memory, shared memory, GPU), task count, and optional port mappings.
- Port -- Defines a port mapping for a TaskRole, with a label, starting port number, and count.
- OpenPai (extends luigi.Config) -- Configuration class that reads OpenPAI connection settings (pai_url, username, password, expiration) from the Luigi configuration file.
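Luigi Config classes normally read their parameters from a configuration section named after the class, so these settings would be expected under an [OpenPai] section of luigi.cfg. The sketch below assumes that convention and uses only the standard-library configparser (not Luigi itself) to show which keys are read and which defaults apply when a key is missing. The host name and credentials are placeholders, and read_openpai_settings is a hypothetical helper.

```python
import configparser

# Hypothetical luigi.cfg contents; host and credentials are placeholders.
LUIGI_CFG = """
[OpenPai]
pai_url = http://pai-master.example.com:9186
username = alice
password = s3cret
"""

# Defaults mirror the OpenPai parameter declarations in the Signature section.
DEFAULTS = {
    "pai_url": "http://127.0.0.1:9186",
    "username": "admin",
    "password": None,
    "expiration": 3600,
}


def read_openpai_settings(text):
    """Return OpenPAI connection settings, falling back to the defaults."""
    parser = configparser.ConfigParser()
    parser.read_string(text)
    # Assumption: the section is named after the Config class.
    section = parser["OpenPai"] if parser.has_section("OpenPai") else {}
    return {
        "pai_url": section.get("pai_url", DEFAULTS["pai_url"]),
        "username": section.get("username", DEFAULTS["username"]),
        "password": section.get("password", DEFAULTS["password"]),
        # expiration is an IntParameter, so convert the raw string
        "expiration": int(section.get("expiration", DEFAULTS["expiration"])),
    }


settings = read_openpai_settings(LUIGI_CFG)
print(settings["pai_url"], settings["expiration"])
```

Because expiration is omitted from the sample file, it falls back to 3600 seconds, while the other keys come from the section.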
The module requires the requests library for HTTP communication with the OpenPAI REST server and was tested against OpenPAI version 0.7.1.
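The submission step amounts to a JSON POST against /api/v1/jobs on the configured pai_url. The sketch below builds that request without sending it. It uses the standard-library urllib instead of requests so that it has no third-party dependency. Passing the token as a "Bearer" Authorization header is an assumption about OpenPAI's authentication scheme, and build_submit_request plus the job body are hypothetical placeholders.

```python
import json
from urllib.parse import urljoin
from urllib.request import Request


def build_submit_request(pai_url, token, job_dict):
    """Build (but do not send) a POST request that submits a job to OpenPAI."""
    body = json.dumps(job_dict).encode("utf-8")
    return Request(
        urljoin(pai_url, "/api/v1/jobs"),
        data=body,
        headers={
            "Content-Type": "application/json",
            # Assumption: the token is sent as a bearer credential.
            "Authorization": "Bearer {}".format(token),
        },
        method="POST",
    )


req = build_submit_request(
    "http://127.0.0.1:9186",
    "example-token",
    {"jobName": "demo", "image": "openpai/pai.example.sklearn"},
)
print(req.get_method(), req.full_url)
```

Sending it would be a matter of urllib.request.urlopen(req), or the equivalent requests.post call, against a reachable OpenPAI REST server.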
Usage
Use PaiTask when you need to submit containerized AI training or batch processing jobs to a Microsoft OpenPAI cluster as part of a Luigi pipeline. It fits distributed machine learning workloads on Azure or on-premises OpenPAI deployments, where each job is defined as Docker containers with specific resource requirements (CPU, GPU, memory). Subclass PaiTask and override the abstract properties name, image, and tasks to define your job specification.
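The "override the abstract properties" contract can be illustrated without Luigi. The sketch below mimics it with the standard abc module: a subclass that leaves any required property undefined cannot be instantiated, while optional properties keep their defaults. JobSpec, Incomplete, and Complete are hypothetical names. They stand in for PaiTask and its subclasses and are not part of luigi.contrib.pai.

```python
import abc


class JobSpec(abc.ABC):
    """Stand-in for PaiTask's contract: three required properties."""

    @property
    @abc.abstractmethod
    def name(self):
        """Unique job name."""

    @property
    @abc.abstractmethod
    def image(self):
        """Docker image URL."""

    @property
    @abc.abstractmethod
    def tasks(self):
        """Non-empty list of task roles."""

    @property
    def retry_count(self):
        # Optional property with a default, like PaiTask.retry_count
        return 0


class Incomplete(JobSpec):
    # Only name is overridden; image and tasks are still abstract.
    @property
    def name(self):
        return "demo-job"


class Complete(Incomplete):
    @property
    def image(self):
        return "openpai/pai.example.sklearn"

    @property
    def tasks(self):
        return ["worker"]


try:
    Incomplete()
except TypeError as exc:
    print("cannot instantiate:", exc)

job = Complete()
print(job.name, job.image, job.retry_count)
```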
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/contrib/pai.py
- Lines: 1-317
Signature
```python
class PaiJob:
    __slots__ = (
        'jobName', 'image', 'authFile', 'dataDir', 'outputDir', 'codeDir',
        'virtualCluster', 'taskRoles', 'gpuType', 'retryCount'
    )

    def __init__(self, jobName, image, tasks): ...


class Port:
    __slots__ = ('label', 'beginAt', 'portNumber')

    def __init__(self, label, begin_at=0, port_number=1): ...


class TaskRole:
    __slots__ = (
        'name', 'taskNumber', 'cpuNumber', 'memoryMB', 'shmMB', 'gpuNumber',
        'portList', 'command', 'minFailedTaskCount', 'minSucceededTaskCount'
    )

    def __init__(self, name, command, taskNumber=1, cpuNumber=1,
                 memoryMB=2048, shmMB=64, gpuNumber=0, portList=[]): ...


class OpenPai(luigi.Config):
    pai_url = luigi.Parameter(default='http://127.0.0.1:9186')
    username = luigi.Parameter(default='admin')
    password = luigi.Parameter(default=None)
    expiration = luigi.IntParameter(default=3600)


class PaiTask(luigi.Task):
    # Abstract properties (must override):
    @property
    def name(self): ...  # str: unique job name

    @property
    def image(self): ...  # str: Docker image URL

    @property
    def tasks(self): ...  # list[TaskRole]: task roles

    # Optional properties:
    @property
    def auth_file_path(self): ...  # str or None: HDFS auth file

    @property
    def data_dir(self): ...  # str or None: HDFS data directory

    @property
    def code_dir(self): ...  # str or None: HDFS code directory

    @property
    def output_dir(self): ...  # str: HDFS output directory

    @property
    def virtual_cluster(self): ...  # str: virtual cluster name (default: 'default')

    @property
    def gpu_type(self): ...  # str or None: GPU type constraint

    @property
    def retry_count(self): ...  # int: retry count (default: 0)

    def run(self): ...

    def output(self): ...

    def complete(self): ...
```
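json.dumps cannot serialize arbitrary objects on its own, and these classes declare __slots__ rather than relying on an instance __dict__. One way to turn them into the nested JSON that the OpenPAI job specification expects is a default= hook that walks __slots__ and skips attributes that were never assigned. This is a sketch of that approach. Port and TaskRole below are trimmed, hypothetical stand-ins, not the module's own classes. In this trimmed TaskRole, minFailedTaskCount is never assigned, so it is left out of the output.

```python
import json


class Port:
    __slots__ = ("label", "beginAt", "portNumber")

    def __init__(self, label, begin_at=0, port_number=1):
        self.label = label
        self.beginAt = begin_at
        self.portNumber = port_number


class TaskRole:
    # Trimmed stand-in; minFailedTaskCount is declared but never assigned.
    __slots__ = ("name", "taskNumber", "command", "portList", "minFailedTaskCount")

    def __init__(self, name, command, taskNumber=1, portList=None):
        self.name = name
        self.command = command
        self.taskNumber = taskNumber
        # None instead of a shared mutable default list
        self.portList = portList if portList is not None else []


def slots_to_dict(obj):
    """json.dumps fallback: emit only the slots that have been assigned."""
    return {key: getattr(obj, key) for key in obj.__slots__ if hasattr(obj, key)}


role = TaskRole("worker", "python worker.py", portList=[Port("http", 8080)])
print(json.dumps(role, default=slots_to_dict, sort_keys=True))
```

hasattr returns False for a declared slot that has no value yet, which is what lets optional fields drop out of the request body instead of being sent as null.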
Import
```python
from luigi.contrib.pai import PaiTask, PaiJob, TaskRole, Port, OpenPai
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| name | str | Yes | Unique name for the OpenPAI job; must be unique across the cluster |
| image | str | Yes | URL pointing to the Docker image for all tasks in the job |
| tasks | list[TaskRole] | Yes | List of TaskRole objects defining the work to be performed; at least one required |
| auth_file_path | str | No | Path to Docker registry authentication file on HDFS |
| data_dir | str | No | Data directory existing on HDFS |
| code_dir | str | No | Code directory on HDFS (should be less than 200MB) |
| output_dir | str | No | Output directory on HDFS; defaults to $PAI_DEFAULT_FS_URI/{name}/output |
| virtual_cluster | str | No | Virtual cluster to run on; defaults to 'default' |
| gpu_type | str | No | GPU type constraint for task scheduling |
| retry_count | int | No | Number of job retries; defaults to 0 |
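The defaults in the Inputs table can be read as a small resolution step that runs before submission. The hypothetical helper below applies them. It rejects an empty task list (raising TypeError is this sketch's choice), fills in the output directory from the job name, and keeps $PAI_DEFAULT_FS_URI as a literal, unexpanded placeholder exactly as the table shows it.

```python
def resolve_job_fields(name, image, tasks, output_dir=None,
                       virtual_cluster="default", retry_count=0):
    """Apply the Inputs-table defaults and the 'at least one task' rule."""
    if not isinstance(tasks, list) or not tasks:
        raise TypeError("you must specify at least one task role")
    if output_dir is None:
        # Left unexpanded: the placeholder is passed through as text.
        output_dir = "$PAI_DEFAULT_FS_URI/{0}/output".format(name)
    return {
        "jobName": name,
        "image": image,
        "taskRoles": tasks,
        "outputDir": output_dir,
        "virtualCluster": virtual_cluster,
        "retryCount": retry_count,
    }


fields = resolve_job_fields("sklearn-training-job", "openpai/pai.example.sklearn", ["worker"])
print(fields["outputDir"])
```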
Outputs
| Name | Type | Description |
|---|---|---|
| output | HdfsTarget | HDFS target at the configured output directory path |
| job_status | str | Terminal job state reported by OpenPAI; the task completes on SUCCEED and raises RuntimeError on any other terminal state |
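The completion rule in the Outputs table can be sketched as a polling loop. This is a minimal version under stated assumptions. fetch_state is a hypothetical callable standing in for a GET on the job status endpoint. The set of non-terminal state names and the 5-second interval are assumptions, not values given in this page. The sleep function is injectable so that the loop can be exercised without waiting.

```python
import time

# Assumption: states in which the job is still in progress.
RUNNING_STATES = {"UNKNOWN", "WAITING", "RUNNING"}


def is_finished(state):
    """True on SUCCEED, False while still running, RuntimeError otherwise."""
    if state in RUNNING_STATES:
        return False
    if state == "SUCCEED":
        return True
    raise RuntimeError("job finished in state {0}".format(state))


def wait_for_job(fetch_state, poll_seconds=5.0, sleep=time.sleep):
    """Poll fetch_state() until the job reaches a terminal state."""
    while not is_finished(fetch_state()):
        sleep(poll_seconds)
    return "SUCCEED"


states = iter(["WAITING", "RUNNING", "SUCCEED"])
print(wait_for_job(lambda: next(states), sleep=lambda s: None))
```

A job that ends in any terminal state other than SUCCEED surfaces as a RuntimeError. That is what causes Luigi to mark the task as failed rather than complete.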
Usage Examples
Basic Usage
```python
from luigi.contrib.pai import PaiTask, TaskRole


class MyTrainingJob(PaiTask):
    @property
    def name(self):
        return 'sklearn-training-job'

    @property
    def image(self):
        return 'openpai/pai.example.sklearn'

    @property
    def tasks(self):
        return [
            TaskRole(
                name='worker',
                command='python train.py --epochs 100',
                taskNumber=1,
                cpuNumber=4,
                memoryMB=8192,
                gpuNumber=1
            )
        ]

    @property
    def data_dir(self):
        return '/data/training'

    @property
    def code_dir(self):
        return '/code/sklearn-example'

    @property
    def virtual_cluster(self):
        return 'gpu-cluster'

    @property
    def gpu_type(self):
        return 'K80'
```
Multi-Role Job
```python
from luigi.contrib.pai import PaiTask, TaskRole, Port


class DistributedTrainingJob(PaiTask):
    @property
    def name(self):
        return 'distributed-tf-job'

    @property
    def image(self):
        return 'myregistry/tensorflow:latest'

    @property
    def tasks(self):
        return [
            TaskRole(
                name='ps',
                command='python ps_server.py',
                taskNumber=2,
                cpuNumber=4,
                memoryMB=4096,
                gpuNumber=0
            ),
            TaskRole(
                name='worker',
                command='python worker.py',
                taskNumber=4,
                cpuNumber=8,
                memoryMB=16384,
                gpuNumber=2,
                portList=[Port(label='http', begin_at=8080, port_number=1)]
            ),
        ]

    @property
    def retry_count(self):
        return 3
```
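Each TaskRole resource field describes a single task instance, so a role's total request is taskNumber times each figure. The arithmetic below applies that reading to the two roles above to estimate the job's overall footprint, which is useful when checking that a virtual cluster has enough capacity. The tuples are copied by hand from DistributedTrainingJob, and totals is a hypothetical helper.

```python
# (role, taskNumber, cpuNumber, memoryMB, gpuNumber) from DistributedTrainingJob
ROLES = [
    ("ps", 2, 4, 4096, 0),
    ("worker", 4, 8, 16384, 2),
]


def totals(roles):
    """Sum per-task requests multiplied by the number of task instances."""
    cpu = sum(n * c for _, n, c, _, _ in roles)
    mem = sum(n * m for _, n, _, m, _ in roles)
    gpu = sum(n * g for _, n, _, _, g in roles)
    return cpu, mem, gpu


print(totals(ROLES))  # → (40, 73728, 8)
```

That is 2 × 4 + 4 × 8 = 40 CPUs, 2 × 4096 + 4 × 16384 = 73728 MB of memory, and 4 × 2 = 8 GPUs.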