Implementation:Spotify Luigi RemoteScheduler Worker
Overview
Concrete tool, provided by Luigi, for communication between distributed workers and the central scheduler.
Description
Luigi's worker-scheduler communication is implemented by two cooperating classes:
RemoteScheduler (luigi.rpc) is a client-side proxy that mirrors the Scheduler interface over HTTP. Every method decorated with @rpc_method() on the Scheduler class is dynamically attached to RemoteScheduler at module load time. When called, these methods serialize their arguments to JSON and POST them to /api/{method_name} on the remote scheduler server. The class handles:
- Automatic retry: Failed HTTP requests are retried using the tenacity library with configurable attempt count and wait interval.
- Multiple transport backends: Uses requests with connection pooling when available, falling back to urllib. Supports Unix domain sockets via requests-unixsocket.
- Basic authentication: Credentials embedded in the URL (e.g., http://user:pass@host:port) are extracted and sent via HTTP Basic Auth headers.
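The proxy pattern described above can be sketched in a few lines. This is a simplified illustration, not Luigi's actual code: ToyScheduler, ToyRemoteScheduler, the rpc_method marker, and the injectable transport callable are hypothetical stand-ins. The retry loop is hand-rolled rather than built on tenacity, and the plain JSON request body is an assumption about the wire format.

```python
import json
import time
from urllib.parse import urlsplit


def rpc_method(fn):
    """Mark a method as remotely callable (stand-in for Luigi's decorator)."""
    fn.is_rpc = True
    return fn


class ToyScheduler:
    """Hypothetical scheduler interface; only marked methods get proxied."""

    @rpc_method
    def ping(self, worker=None):
        pass

    @rpc_method
    def get_work(self, worker=None, host=None):
        pass

    def _prune_internal(self):
        pass


class ToyRemoteScheduler:
    """Client-side proxy: one HTTP-style call per scheduler method."""

    def __init__(self, url, transport, attempts=3, wait=0.0):
        parts = urlsplit(url)
        # Credentials embedded in the URL are split off and kept separately.
        self.auth = (parts.username, parts.password) if parts.username else None
        netloc = parts.hostname + (":%d" % parts.port if parts.port else "")
        self.base_url = "%s://%s" % (parts.scheme, netloc)
        self.transport = transport  # callable(url, body, auth) -> JSON string
        self.attempts = attempts    # assumed to be at least 1
        self.wait = wait

    def _request(self, method_name, params):
        url = "%s/api/%s" % (self.base_url, method_name)
        body = json.dumps(params)
        last_error = None
        for _ in range(self.attempts):
            try:
                return json.loads(self.transport(url, body, self.auth))
            except ConnectionError as exc:
                # Remember the failure, wait, then try again.
                last_error = exc
                time.sleep(self.wait)
        raise last_error


def _make_proxy(name):
    # A factory function avoids the late-binding closure pitfall in the loop.
    def proxy(self, **params):
        return self._request(name, params)
    proxy.__name__ = name
    return proxy


# Attach every marked scheduler method to the proxy class at import time.
for name, member in vars(ToyScheduler).items():
    if getattr(member, "is_rpc", False):
        setattr(ToyRemoteScheduler, name, _make_proxy(name))
```

Passing the transport in as a callable keeps the sketch independent of any HTTP library. A real client would plug in a function that performs the POST.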
Worker (luigi.worker) is the task execution engine that communicates with any scheduler (local or remote) through a uniform interface. Key behaviors include:
- Dependency traversal: The add() method performs a breadth-first traversal of the task dependency graph, checking each task for completion and registering unfinished tasks with the scheduler via add_task().
- Work loop: The run() method repeatedly calls get_work() on the scheduler, executes assigned tasks, and reports results. It manages multiple concurrent task processes up to the configured worker_processes limit.
- Multiprocessing support: Tasks can be executed in separate processes using Python's multiprocessing module, with results communicated back through a multiprocessing.Queue.
- Graceful shutdown: Signal handlers (SIGUSR1) allow workers to stop requesting new work and drain running tasks before exiting.
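The traversal and the work loop can be illustrated with a small single-process model. This is only a sketch under simplifying assumptions: ToyTask, InMemoryScheduler, add, and run are hypothetical names, there is no multiprocessing or failure handling, and the scheduler API is reduced to the calls needed to show the control flow.

```python
from collections import deque


class ToyTask:
    """Minimal task: a name, dependencies, and a completion flag."""

    def __init__(self, name, deps=(), done=False):
        self.name = name
        self.deps = list(deps)
        self.done = done

    def complete(self):
        return self.done

    def run(self):
        self.done = True


class InMemoryScheduler:
    """Hypothetical scheduler that hands out tasks whose deps are complete."""

    def __init__(self):
        self.pending = {}  # task name -> task, in registration order

    def add_task(self, task):
        self.pending[task.name] = task

    def get_work(self):
        for task in self.pending.values():
            if all(dep.complete() for dep in task.deps):
                return task
        return None  # nothing runnable right now

    def mark_done(self, task):
        del self.pending[task.name]


def add(scheduler, root):
    """Breadth-first walk: register unfinished tasks, skip complete ones."""
    queue = deque([root])
    seen = {root.name}
    while queue:
        task = queue.popleft()
        if task.complete():
            continue  # a complete task needs no work and is not expanded
        scheduler.add_task(task)
        for dep in task.deps:
            if dep.name not in seen:
                seen.add(dep.name)
                queue.append(dep)


def run(scheduler):
    """Pull work until the scheduler has nothing runnable left."""
    executed = []
    while True:
        task = scheduler.get_work()
        if task is None:
            break
        task.run()
        scheduler.mark_done(task)
        executed.append(task.name)
    return executed
```

In this model a task with an already complete dependency becomes runnable immediately, while a task whose dependencies are still pending stays queued until they finish.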
Usage
Use RemoteScheduler and Worker when:
- You are running Luigi tasks against a central scheduler deployed on a remote server.
- You need multiprocess task execution on a single machine.
- You are building custom Luigi integrations that need programmatic access to the scheduler.
Code Reference
Source Location
| File | Lines | Role |
|---|---|---|
| luigi/rpc.py | L139-206 | RemoteScheduler: HTTP proxy for the central scheduler |
| luigi/worker.py | L553-616 | Worker.__init__(): worker initialization |
| luigi/worker.py | L783-827 | Worker.add(): dependency traversal and task registration |
| luigi/worker.py | L1225-1268 | Worker.run(): main execution loop |
Signature
# luigi/rpc.py
class RemoteScheduler:
    def __init__(self, url='http://localhost:8082/', connect_timeout=None):
        """
        Scheduler proxy object. Talks to a RemoteSchedulerResponder.
        """

    def close(self): ...

    # All @rpc_method() methods from Scheduler are dynamically attached:
    # add_task, get_work, ping, prune, etc.

# luigi/worker.py
class Worker:
    def __init__(self, scheduler=None, worker_id=None, worker_processes=1,
                 assistant=False, **kwargs):
        """
        Worker object communicates with a scheduler.
        Tells the scheduler what it has to do + its dependencies.
        Asks for stuff to do (pulls it in a loop and runs it).
        """

    def add(self, task, multiprocess=False, processes=0):
        """
        Add a Task for the worker to check and possibly schedule and run.
        Returns True if task and its dependencies were successfully scheduled.
        """

    def run(self):
        """
        Returns True if all scheduled tasks were executed successfully.
        """
Import
from luigi.rpc import RemoteScheduler
from luigi.worker import Worker
I/O Contract
Inputs (RemoteScheduler)
| Parameter | Type | Description |
|---|---|---|
| url | str | Base URL of the remote scheduler (default: http://localhost:8082/). |
| connect_timeout | float | HTTP connection timeout in seconds (default from config: 10.0). |
Inputs (Worker)
| Parameter | Type | Description |
|---|---|---|
| scheduler | Scheduler or RemoteScheduler | Scheduler instance to communicate with. Defaults to a local Scheduler(). |
| worker_id | str | Unique identifier for this worker. Auto-generated if not provided. |
| worker_processes | int | Number of concurrent task execution processes (default: 1). |
| assistant | bool | If True, worker runs as an assistant that does not own tasks. |
Outputs
| Method | Return Type | Description |
|---|---|---|
| Worker.add() | bool | True if task and all dependencies were successfully registered. |
| Worker.run() | bool | True if all scheduled tasks executed successfully. |
| RemoteScheduler._request() | dict | Parsed JSON response from the scheduler API. |
Usage Examples
Running Tasks Against a Remote Scheduler
import datetime

import luigi
from luigi.rpc import RemoteScheduler
from luigi.worker import Worker

class MyTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('/data/output/%s.csv' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write('data for %s' % self.date)

# Connect to a remote scheduler
scheduler = RemoteScheduler(url='http://scheduler.example.com:8082/')
# Create a worker with 4 parallel processes
worker = Worker(scheduler=scheduler, worker_processes=4)
# Register and run a task; DateParameter takes a datetime.date when set in code
task = MyTask(date=datetime.date(2026, 2, 10))
worker.add(task)
success = worker.run()
print("All tasks succeeded:", success)
Using the Worker as a Context Manager
import datetime

from luigi.rpc import RemoteScheduler
from luigi.worker import Worker

# MyTask is the task class defined in the previous example
scheduler = RemoteScheduler(url='http://localhost:8082/')
# The context manager starts a KeepAliveThread for heartbeats
with Worker(scheduler=scheduler, worker_processes=2) as worker:
    worker.add(MyTask(date=datetime.date(2026, 2, 10)))
    success = worker.run()
Programmatic Scheduler Interaction
from luigi.rpc import RemoteScheduler
scheduler = RemoteScheduler(url='http://localhost:8082/')
# Directly call scheduler RPC methods
scheduler.add_task(
    task_id='MyTask(date=2026-02-10)',
    worker='manual-worker-01',
    status='PENDING',
    runnable=True,
)
# Request work
response = scheduler.get_work(worker='manual-worker-01')
print("Assigned task:", response)