Implementation:Spotify Luigi Luigi Build Run

From Leeroopedia


Overview

A concrete tool, provided by Luigi, for triggering pipeline execution either programmatically or from the command line.

Description

Luigi provides three main entry points for pipeline execution:

  • luigi.build() -- The programmatic API. Accepts a list of instantiated task objects and runs them with a scheduler and worker, bypassing command-line parsing. This is the recommended way to run Luigi pipelines from Python code, tests, or notebooks.
  • luigi.run() -- The command-line-oriented API. Parses sys.argv (or the provided arguments) to determine which task to run, then schedules and executes it. Its own docstring discourages calling it directly and recommends the luigi binary instead.
  • luigi_run() -- The actual CLI entry point, registered as the luigi console script. Delegates to run_with_retcodes(), which adds return code handling on top of the same argument-parsing path.

Behind the scenes, all three entry points converge on _schedule_and_run(), which creates a scheduler (local or remote), instantiates a Worker, adds all tasks to the worker (triggering recursive dependency resolution), and then starts the worker's run loop.

The Worker class is the execution engine. It communicates with the scheduler to discover tasks, checks completeness, dispatches task execution (in-process or via multiprocessing), handles results, and manages the keep-alive heartbeat.

Usage

Use these entry points when:

  • luigi.build() -- Running from Python scripts, unit tests, Jupyter notebooks, or embedded within larger applications.
  • luigi CLI -- Running from cron jobs, CI/CD pipelines, or shell scripts.

Code Reference

luigi.build()

| Attribute | Value |
|---|---|
| Source Location | luigi/interface.py, lines 220-244 |
| Signature | def build(tasks, worker_scheduler_factory=None, detailed_summary=False, **env_params) |
| Import | import luigi; luigi.build(...) |

def build(tasks, worker_scheduler_factory=None, detailed_summary=False, **env_params):
    """
    Run internally, bypassing the cmdline parsing.

    Useful if you have some luigi code that you want to run internally.
    Example:

        luigi.build([MyTask1(), MyTask2()], local_scheduler=True)

    One notable difference is that `build` defaults to not using
    the identical process lock. Otherwise, `build` would only be
    callable once from each process.

    :param tasks:
    :param worker_scheduler_factory:
    :param env_params:
    :return: True if there were no scheduling errors, even if tasks may fail.
    """
    if "no_lock" not in env_params:
        env_params["no_lock"] = True

    luigi_run_result = _schedule_and_run(
        tasks, worker_scheduler_factory, override_defaults=env_params
    )
    return luigi_run_result if detailed_summary else luigi_run_result.scheduling_succeeded
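
The no_lock handling and the final return in the listing above amount to two small patterns: a default the caller can override, and a return type selected by a flag. A minimal stand-alone sketch of both follows. build_like and FakeResult are hypothetical stand-ins that only mimic the shape of build(); nothing is scheduled or run.

```python
from dataclasses import dataclass, field


@dataclass
class FakeResult:
    """Hypothetical stand-in for the object returned by _schedule_and_run()."""
    scheduling_succeeded: bool
    overrides: dict = field(default_factory=dict)


def build_like(tasks, detailed_summary=False, **env_params):
    """Mimic the control flow of build() without running any tasks."""
    # Same effect as the `if "no_lock" not in env_params` check:
    # fill in the default only when the caller did not pass a value.
    env_params.setdefault("no_lock", True)

    # Stand-in for _schedule_and_run(); it just records the overrides.
    result = FakeResult(scheduling_succeeded=True, overrides=dict(env_params))

    # Hand back the full object only when explicitly requested.
    return result if detailed_summary else result.scheduling_succeeded


default_run = build_like([], detailed_summary=True)
explicit_run = build_like([], detailed_summary=True, no_lock=False)
plain_run = build_like([])
```

Here default_run records no_lock as True, explicit_run keeps the caller's False, and plain_run is a plain boolean because detailed_summary was left at its default.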

luigi.run()

| Attribute | Value |
|---|---|
| Source Location | luigi/interface.py, lines 192-201 |
| Signature | def run(*args, **kwargs) |
| Import | import luigi; luigi.run(...) |

def run(*args, **kwargs):
    """
    Please dont use. Instead use `luigi` binary.

    Run from cmdline using argparse.
    """
    luigi_run_result = _run(*args, **kwargs)
    return luigi_run_result if kwargs.get('detailed_summary') else luigi_run_result.scheduling_succeeded

The internal _run() function (lines 204-217) handles argument parsing:

def _run(cmdline_args=None, main_task_cls=None,
         worker_scheduler_factory=None, use_dynamic_argparse=None,
         local_scheduler=False, detailed_summary=False):
    if cmdline_args is None:
        cmdline_args = sys.argv[1:]
    if main_task_cls:
        cmdline_args.insert(0, main_task_cls.task_family)
    if local_scheduler:
        cmdline_args.append('--local-scheduler')
    with CmdlineParser.global_instance(cmdline_args) as cp:
        return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
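
In the listing above, insert() and append() act directly on cmdline_args, so a list supplied by the caller is modified in place. The sketch below reproduces the same ordering (task family first, --local-scheduler last) on a copy. assemble_args is a hypothetical helper written for illustration and is not part of Luigi.

```python
def assemble_args(cmdline_args, task_family=None, local_scheduler=False):
    """Return a new argument list ordered the way _run() orders it."""
    args = list(cmdline_args)  # copy, so the caller's list is left untouched
    if task_family:
        args.insert(0, task_family)  # the task name goes first
    if local_scheduler:
        args.append("--local-scheduler")  # the flag goes last
    return args


original = ["--date", "2026-01-15"]
assembled = assemble_args(original, task_family="MyTask", local_scheduler=True)
```

After this runs, assembled holds the task name, the original arguments, and the scheduler flag, while original is unchanged.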

luigi_run() (CLI entry point)

| Attribute | Value |
|---|---|
| Source Location | luigi/cmdline.py, lines 8-9 |
| Signature | def luigi_run(argv=sys.argv[1:]) |
| Import | Registered as console script luigi; calls run_with_retcodes(argv) |

def luigi_run(argv=sys.argv[1:]):
    run_with_retcodes(argv)

Worker class

| Attribute | Value |
|---|---|
| Source Location | luigi/worker.py, lines 553-616 |
| Signature | class Worker |
| Constructor | def __init__(self, scheduler=None, worker_id=None, worker_processes=1, assistant=False, **kwargs) |
| Import | from luigi.worker import Worker (internal; typically not imported directly) |

class Worker:
    """
    Worker object communicates with a scheduler.

    Simple class that talks to a scheduler and:
    * tells the scheduler what it has to do + its dependencies
    * asks for stuff to do (pulls it in a loop and runs it)
    """

    def __init__(self, scheduler=None, worker_id=None,
                 worker_processes=1, assistant=False, **kwargs):
        if scheduler is None:
            scheduler = Scheduler()
        self.worker_processes = int(worker_processes)
        self._config = worker(**kwargs)
        ...

Key Worker methods:

  • add(task) -- Recursively resolves dependencies, checks completeness, and registers tasks with the scheduler.
  • run() -- The main execution loop. Repeatedly asks the scheduler for work, executes tasks, and reports results. Returns True if all tasks succeeded.
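
The split between add() and run() can be illustrated with a toy model. This is a simplified sketch, not Luigi's Worker: ToyTask and ToyWorker are hypothetical classes, there is no separate scheduler, and the worker picks runnable tasks itself rather than asking a scheduler for them.

```python
class ToyTask:
    """Hypothetical task: a name, its dependencies, and a 'done' flag."""

    def __init__(self, name, requires=()):
        self.name = name
        self.requires = list(requires)
        self.done = False

    def complete(self):
        return self.done

    def run(self):
        self.done = True


class ToyWorker:
    """Simplified model of the add()/run() split; not Luigi's Worker."""

    def __init__(self):
        self.pending = {}   # name -> task, registered but not yet complete
        self.executed = []  # task names in the order they were run

    def add(self, task):
        # Skip tasks that are already complete or already registered.
        if task.complete() or task.name in self.pending:
            return
        self.pending[task.name] = task
        for dep in task.requires:
            self.add(dep)  # recursive dependency resolution

    def run(self):
        # Keep pulling runnable tasks until nothing is left.
        while self.pending:
            ready = [t for t in self.pending.values()
                     if all(d.complete() for d in t.requires)]
            if not ready:
                return False  # nothing runnable: unmet dependencies remain
            for task in ready:
                task.run()
                self.executed.append(task.name)
                del self.pending[task.name]
        return True


extract = ToyTask("extract")
transform = ToyTask("transform", requires=[extract])
load = ToyTask("load", requires=[transform])

worker = ToyWorker()
worker.add(load)       # registers load, transform, and extract
all_ok = worker.run()  # runs them in dependency order
```

Only the root task is added explicitly; its upstream tasks are discovered through requires, which mirrors the recursive resolution described for add().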

I/O Contract

luigi.build()

| Direction | Name | Type | Description |
|---|---|---|---|
| Input | tasks | list[Task] | Instantiated task objects to schedule and run. |
| Input | local_scheduler | bool (via **env_params) | If True, use an in-memory scheduler instead of connecting to a remote one. |
| Input | workers | int (via **env_params) | Number of parallel worker processes. Default: 1. |
| Input | detailed_summary | bool | If True, return a LuigiRunResult object instead of a boolean. |
| Output | return value | bool or LuigiRunResult | True if scheduling succeeded; or a detailed result object. |

luigi.run()

| Direction | Name | Type | Description |
|---|---|---|---|
| Input | cmdline_args | list[str] or None | Command-line arguments. Defaults to sys.argv[1:]. |
| Input | main_task_cls | Task class or None | If provided, prepends the task family to cmdline_args. |
| Input | local_scheduler | bool | If True, appends --local-scheduler. |
| Output | return value | bool or LuigiRunResult | True if scheduling succeeded. |

Usage Examples

Programmatic execution with luigi.build()

import luigi
import datetime


class MyTask(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('/tmp/output/%s.txt' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write("Hello from %s\n" % self.date)


# Run the pipeline programmatically with a local scheduler
success = luigi.build(
    [MyTask(date=datetime.date(2026, 1, 15))],
    local_scheduler=True,
    workers=2,
)
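# build() returns the scheduling status, which per its docstring
# can be True even if individual tasks fail.
print("Scheduling succeeded:", success)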

Detailed summary for programmatic execution

import datetime

import luigi

# MyTask is the task class defined in the previous example.

result = luigi.build(
    [MyTask(date=datetime.date(2026, 1, 15))],
    local_scheduler=True,
    detailed_summary=True,
)
print(result.summary_text)
print("Scheduling succeeded:", result.scheduling_succeeded)
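
A script that calls build() may want to report failure through its exit status, much as the CLI does. The sketch below shows one way to map either return shape (a boolean or a result object) to an exit code. exit_code_from and its failure_code default are hypothetical and do not reproduce Luigi's own return code configuration; SimpleNamespace stands in for a real result object.

```python
from types import SimpleNamespace


def exit_code_from(result, failure_code=1):
    """Map a build() return value (bool or result object) to an exit status."""
    # A result object exposes scheduling_succeeded; a plain bool is used as-is.
    succeeded = getattr(result, "scheduling_succeeded", result)
    return 0 if succeeded else failure_code


# Stand-ins for the two return shapes described in the I/O contract.
detailed = SimpleNamespace(scheduling_succeeded=False, summary_text="...")
code_from_bool = exit_code_from(True)
code_from_object = exit_code_from(detailed)
```

In a real script the value would typically be passed to sys.exit() after the call to luigi.build().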

Command-line execution

# Run a task via the luigi CLI
luigi --module my_tasks MyTask --date 2026-01-15 --local-scheduler

# Run with multiple workers
luigi --module my_tasks MyTask --date 2026-01-15 --workers 4 --local-scheduler
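
When the CLI has to be launched from Python (for example from a wrapper script), building the argument list explicitly avoids shell quoting problems. The helper below is a hypothetical sketch that uses only the flags shown above. It handles only parameters that take a value, and it assumes parameter names are written with dashes in place of underscores on the command line.

```python
import shlex


def luigi_command(module, task, workers=1, local_scheduler=True, **params):
    """Build an argv list for the luigi CLI (hypothetical helper)."""
    argv = ["luigi", "--module", module, task]
    for name, value in params.items():
        # Task parameters become --flags; underscores are written as dashes.
        argv += ["--" + name.replace("_", "-"), str(value)]
    if workers != 1:
        argv += ["--workers", str(workers)]
    if local_scheduler:
        argv.append("--local-scheduler")
    return argv


cmd = luigi_command("my_tasks", "MyTask", workers=4, date="2026-01-15")
printable = shlex.join(cmd)  # single string form, useful for logging
# To execute: subprocess.run(cmd, check=True), with luigi installed
# and the my_tasks module importable.
```

The resulting list corresponds to the second command above and can be passed to subprocess.run() without going through a shell.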

Using luigi.run() from a script (as in the hello_world example)

import luigi


class HelloWorldTask(luigi.Task):
    task_namespace = 'examples'

    def run(self):
        print("{task} says: Hello world!".format(task=self.__class__.__name__))


if __name__ == '__main__':
    luigi.run(
        ['examples.HelloWorldTask', '--workers', '1', '--local-scheduler']
    )

Running multiple root tasks

import luigi
import datetime


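# DailyReport is assumed to be a luigi.Task subclass, defined elsewhere,
# that takes a DateParameter named `date`.
# Build multiple independent tasks in a single pipeline run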
success = luigi.build(
    [
        DailyReport(date=datetime.date(2026, 1, 13)),
        DailyReport(date=datetime.date(2026, 1, 14)),
        DailyReport(date=datetime.date(2026, 1, 15)),
    ],
    local_scheduler=True,
    workers=3,
)
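
For longer ranges, the list of root tasks is easier to generate than to write out by hand. A minimal sketch of the date generation follows; date_range is a hypothetical helper, and the luigi.build() call is shown only as a comment because it relies on the assumed DailyReport task.

```python
import datetime


def date_range(start, days):
    """Return `days` consecutive dates beginning at `start`."""
    return [start + datetime.timedelta(days=offset) for offset in range(days)]


dates = date_range(datetime.date(2026, 1, 13), 3)
# With a DailyReport task available:
#   luigi.build([DailyReport(date=d) for d in dates],
#               local_scheduler=True, workers=3)
```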
