Implementation:Spotify Luigi RemoteScheduler Worker

From Leeroopedia


Overview

A concrete tool, provided by Luigi, for communication between distributed workers and the central scheduler.

Description

Luigi's worker-scheduler communication is implemented by two cooperating classes:

RemoteScheduler (luigi.rpc) is a client-side proxy that mirrors the Scheduler interface over HTTP. Every method decorated with @rpc_method() on the Scheduler class is dynamically attached to RemoteScheduler at module load time. When called, these methods serialize their arguments to JSON and POST them to /api/{method_name} on the remote scheduler server. The class handles:

  • Automatic retry: Failed HTTP requests are retried using the tenacity library with configurable attempt count and wait interval.
  • Multiple transport backends: Uses requests with connection pooling when available, falling back to urllib. Supports Unix domain sockets via requests-unixsocket.
  • Basic authentication: Credentials embedded in the URL (e.g., http://user:pass@host:port) are extracted and sent via HTTP Basic Auth headers.
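
The proxy mechanism described above can be illustrated with a minimal, self-contained sketch. This is not Luigi's code: `ToyScheduler`, `ToyRemoteScheduler`, `RPC_METHODS`, and the injectable `transport` callable are hypothetical names, the `data` form field and the `{"response": ...}` envelope are assumptions about the wire format, and a plain retry loop stands in for tenacity so that the example needs no third-party packages.

```python
import json
import time

RPC_METHODS = {}  # hypothetical registry: method name -> function


def rpc_method():
    """Mark a scheduler method as callable over RPC (toy version)."""
    def decorator(fn):
        RPC_METHODS[fn.__name__] = fn
        return fn
    return decorator


class ToyScheduler:
    """Stand-in for the server-side scheduler interface."""

    @rpc_method()
    def ping(self, worker=None):
        return {"pong": worker}


class ToyRemoteScheduler:
    """Client-side proxy: each call becomes a POST to /api/<method_name>."""

    def __init__(self, url, transport, attempts=3, wait=0.0):
        self._url = url.rstrip("/")
        self._transport = transport  # callable(full_url, body) -> JSON text
        self._attempts = attempts
        self._wait = wait

    def _request(self, path, data):
        body = {"data": json.dumps(data)}  # assumed form-field name
        last_error = None
        for attempt in range(1, self._attempts + 1):
            try:
                reply = self._transport(self._url + path, body)
                return json.loads(reply)["response"]
            except OSError as exc:  # treat I/O errors as retryable
                last_error = exc
                if attempt < self._attempts:
                    time.sleep(self._wait)
        raise ConnectionError("all %d attempts failed" % self._attempts) from last_error


def _attach(name):
    """Add a proxy method called `name` to ToyRemoteScheduler."""
    def proxy(self, **kwargs):
        return self._request("/api/" + name, kwargs)
    proxy.__name__ = name
    setattr(ToyRemoteScheduler, name, proxy)


# Mirror every registered RPC method onto the proxy at import time.
for _name in RPC_METHODS:
    _attach(_name)

calls = []


def flaky_transport(full_url, body):
    """Fake HTTP layer: fails once, then answers like a scheduler would."""
    calls.append(full_url)
    if len(calls) == 1:
        raise OSError("connection refused")
    args = json.loads(body["data"])
    result = RPC_METHODS["ping"](ToyScheduler(), **args)
    return json.dumps({"response": result})


scheduler = ToyRemoteScheduler("http://localhost:8082/", flaky_transport)
reply = scheduler.ping(worker="w1")
print(reply)
```

Injecting the transport keeps the sketch runnable without a server. In a real deployment that role is played by the requests or urllib backend described above.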

Worker (luigi.worker) is the task execution engine that communicates with any scheduler (local or remote) through a uniform interface. Key behaviors include:

  • Dependency traversal: The add() method performs a breadth-first traversal of the task dependency graph, checking each task for completion and registering unfinished tasks with the scheduler via add_task().
  • Work loop: The run() method repeatedly calls get_work() on the scheduler, executes assigned tasks, and reports results. It manages multiple concurrent task processes up to the configured worker_processes limit.
  • Multiprocessing support: Tasks can be executed in separate processes using Python's multiprocessing module, with results communicated back through a multiprocessing.Queue.
  • Graceful shutdown: Signal handlers (SIGUSR1) allow workers to stop requesting new work and drain running tasks before exiting.
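
The first two behaviors can be sketched with a toy, single-process model. Everything here is hypothetical (`ToyTask`, `ToyScheduler`, and the free functions `add` and `run`) and only mirrors the shape of the interaction: the traversal follows the breadth-first description above, and Luigi's actual ordering, multiprocessing, and status reporting are deliberately left out.

```python
from collections import deque


class ToyTask:
    """Hypothetical task: a name, dependencies, and a completion flag."""

    def __init__(self, name, deps=()):
        self.name = name
        self.deps = list(deps)
        self.done = False

    def complete(self):
        return self.done

    def run(self):
        self.done = True


class ToyScheduler:
    """Hypothetical in-memory scheduler exposing add_task() and get_work()."""

    def __init__(self):
        self.pending = {}  # task name -> ToyTask

    def add_task(self, task):
        self.pending[task.name] = task

    def get_work(self):
        # Hand out any pending task whose dependencies are all complete.
        for name, task in self.pending.items():
            if all(dep.complete() for dep in task.deps):
                return self.pending.pop(name)
        return None


def add(scheduler, root):
    """Walk the dependency graph breadth-first, registering unfinished tasks."""
    queue, seen = deque([root]), {root.name}
    while queue:
        task = queue.popleft()
        if task.complete():
            continue  # finished tasks need no scheduling
        scheduler.add_task(task)
        for dep in task.deps:
            if dep.name not in seen:
                seen.add(dep.name)
                queue.append(dep)


def run(scheduler):
    """Pull work until the scheduler has nothing runnable left."""
    order = []
    while True:
        task = scheduler.get_work()
        if task is None:
            break
        task.run()
        order.append(task.name)
    return order


extract = ToyTask("extract")
clean = ToyTask("clean", deps=[extract])
report = ToyTask("report", deps=[clean])

scheduler = ToyScheduler()
add(scheduler, report)
order = run(scheduler)
print(order)
```

Registration happens from the requested task downwards, while execution proceeds from the leaves upwards, because the scheduler only hands out tasks whose dependencies are already complete.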

Usage

Use RemoteScheduler and Worker when:

  • You are running Luigi tasks against a central scheduler deployed on a remote server.
  • You need multiprocess task execution on a single machine.
  • You are building custom Luigi integrations that need programmatic access to the scheduler.

Code Reference

Source Location

File | Lines | Role
luigi/rpc.py | L139-206 | RemoteScheduler: HTTP proxy for the central scheduler
luigi/worker.py | L553-616 | Worker.__init__(): worker initialization
luigi/worker.py | L783-827 | Worker.add(): dependency traversal and task registration
luigi/worker.py | L1225-1268 | Worker.run(): main execution loop

Signature

# luigi/rpc.py
class RemoteScheduler:
    def __init__(self, url='http://localhost:8082/', connect_timeout=None):
        """
        Scheduler proxy object. Talks to a RemoteSchedulerResponder.
        """

    def close(self): ...

    # All @rpc_method() methods from Scheduler are dynamically attached:
    # add_task, get_work, ping, prune, etc.

# luigi/worker.py
class Worker:
    def __init__(self, scheduler=None, worker_id=None, worker_processes=1,
                 assistant=False, **kwargs):
        """
        Worker object communicates with a scheduler.
        Tells the scheduler what it has to do + its dependencies.
        Asks for stuff to do (pulls it in a loop and runs it).
        """

    def add(self, task, multiprocess=False, processes=0):
        """
        Add a Task for the worker to check and possibly schedule and run.
        Returns True if task and its dependencies were successfully scheduled.
        """

    def run(self):
        """
        Returns True if all scheduled tasks were executed successfully.
        """

Import

from luigi.rpc import RemoteScheduler
from luigi.worker import Worker

I/O Contract

Inputs (RemoteScheduler)

Parameter | Type | Description
url | str | Base URL of the remote scheduler (default: http://localhost:8082/).
connect_timeout | float | HTTP connection timeout in seconds (default from config: 10.0).
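
As the Description notes, the url value may carry credentials in the form http://user:pass@host:port. The sketch below shows one way such a URL could be split into a credential-free URL and an HTTP Basic Auth header value using only the standard library. The helper name `split_credentials` is hypothetical and this is not Luigi's own implementation; it also ignores edge cases such as IPv6 literals and percent-encoded credentials.

```python
import base64
from urllib.parse import urlsplit, urlunsplit


def split_credentials(url):
    """Return (url_without_credentials, basic_auth_header_value_or_None)."""
    parts = urlsplit(url)
    if parts.username is None:
        return url, None  # nothing to strip

    # Rebuild the network location without the "user:pass@" prefix.
    host = parts.hostname
    if parts.port is not None:
        host = "%s:%d" % (host, parts.port)
    clean = urlunsplit((parts.scheme, host, parts.path, parts.query, parts.fragment))

    # Basic Auth is "Basic " followed by base64("user:password").
    token = "%s:%s" % (parts.username, parts.password or "")
    header = "Basic " + base64.b64encode(token.encode("utf-8")).decode("ascii")
    return clean, header


clean_url, auth_header = split_credentials("http://user:pass@localhost:8082/")
print(clean_url)    # http://localhost:8082/
print(auth_header)  # Basic dXNlcjpwYXNz
```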

Inputs (Worker)

Parameter | Type | Description
scheduler | Scheduler or RemoteScheduler | Scheduler instance to communicate with. Defaults to a local Scheduler().
worker_id | str | Unique identifier for this worker. Auto-generated if not provided.
worker_processes | int | Number of concurrent task execution processes (default: 1).
assistant | bool | If True, worker runs as an assistant that does not own tasks.

Outputs

Method | Return Type | Description
Worker.add() | bool | True if task and all dependencies were successfully registered.
Worker.run() | bool | True if all scheduled tasks executed successfully.
RemoteScheduler._request() | dict | Parsed JSON response from the scheduler API.

Usage Examples

Running Tasks Against a Remote Scheduler

import datetime

import luigi
from luigi.rpc import RemoteScheduler
from luigi.worker import Worker

class MyTask(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return []

    def output(self):
        return luigi.LocalTarget('/data/output/%s.csv' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write('data for %s' % self.date)

# Connect to a remote scheduler
scheduler = RemoteScheduler(url='http://scheduler.example.com:8082/')

# Create a worker with 4 parallel processes
worker = Worker(scheduler=scheduler, worker_processes=4)

# Register and run a task. DateParameter expects a datetime.date, not a string.
task = MyTask(date=datetime.date(2026, 2, 10))
worker.add(task)
success = worker.run()
print("All tasks succeeded:", success)

Using the Worker as a Context Manager

import datetime

from luigi.rpc import RemoteScheduler
from luigi.worker import Worker

# MyTask is the task class defined in the previous example.
scheduler = RemoteScheduler(url='http://localhost:8082/')

# The context manager starts a KeepAliveThread for heartbeats
with Worker(scheduler=scheduler, worker_processes=2) as worker:
    worker.add(MyTask(date=datetime.date(2026, 2, 10)))
    success = worker.run()

Programmatic Scheduler Interaction

from luigi.rpc import RemoteScheduler

scheduler = RemoteScheduler(url='http://localhost:8082/')

# Directly call scheduler RPC methods
scheduler.add_task(
    task_id='MyTask(date=2026-02-10)',
    worker='manual-worker-01',
    status='PENDING',
    runnable=True,
)

# Request work
response = scheduler.get_work(worker='manual-worker-01')
print("Assigned task:", response)
