Implementation:Spotify Luigi Luigi Build Run
Overview
Concrete tool for triggering pipeline execution programmatically or via the command line, provided by Luigi.
Description
Luigi provides three main entry points for pipeline execution:
luigi.build() -- The programmatic API. Accepts a list of instantiated task objects and runs them with a scheduler and worker, bypassing command-line parsing. This is the recommended way to run Luigi pipelines from Python code, tests, or notebooks.
luigi.run() -- The command-line-oriented API. Parses sys.argv (or provided arguments) to determine which task to run, then schedules and executes it. It is a thin wrapper around the internal _run() function.
luigi_run() -- The actual CLI entry point registered as the luigi console script. Delegates to run_with_retcodes(), which invokes the same internal _run() function behind luigi.run() and adds return code handling.
Behind the scenes, all three entry points converge on _schedule_and_run(), which creates a scheduler (local or remote), instantiates a Worker, adds all tasks to the worker (triggering recursive dependency resolution), and then starts the worker's run loop.
The Worker class is the execution engine. It communicates with the scheduler to discover tasks, checks completeness, dispatches task execution (in-process or via multiprocessing), handles results, and manages the keep-alive heartbeat.
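As a rough mental model of the add-then-run flow described above, the following sketch uses plain Python only. ToyTask and ToyWorker are hypothetical names invented for illustration; they are not Luigi classes, and the sketch leaves out the scheduler, multiprocessing, and the keep-alive heartbeat entirely.

```python
class ToyTask:
    """Hypothetical stand-in for a task: a name, dependencies, and a done flag."""

    def __init__(self, name, requires=()):
        self.name = name
        self._requires = list(requires)
        self.done = False

    def requires(self):
        return self._requires

    def complete(self):
        return self.done

    def run(self):
        self.done = True


class ToyWorker:
    """Simplified model of the add-then-run flow; not Luigi's Worker."""

    def __init__(self):
        self.pending = {}   # name -> task, in registration order
        self.executed = []  # task names in the order they ran

    def add(self, task):
        # Skip tasks that are already registered or already complete,
        # otherwise register the task and walk its dependencies recursively.
        if task.name in self.pending or task.complete():
            return
        self.pending[task.name] = task
        for dep in task.requires():
            self.add(dep)

    def run(self):
        # Repeatedly run every pending task whose dependencies are complete.
        while self.pending:
            runnable = [t for t in self.pending.values()
                        if all(d.complete() for d in t.requires())]
            if not runnable:
                return False  # nothing can make progress
            for task in runnable:
                task.run()
                self.executed.append(task.name)
                del self.pending[task.name]
        return True


extract = ToyTask("extract")
transform = ToyTask("transform", requires=[extract])
load = ToyTask("load", requires=[transform])

worker = ToyWorker()
worker.add(load)          # only the root task is added explicitly
succeeded = worker.run()
print(worker.executed, succeeded)  # prints ['extract', 'transform', 'load'] True
```

Only the root task is handed to the worker; its upstream tasks are discovered through requires(), which is the same idea as passing just the final task to luigi.build().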
Usage
Choose an entry point based on where the pipeline is launched:
luigi.build() -- Python scripts, unit tests, Jupyter notebooks, or code embedded within larger applications.
luigi CLI -- Cron jobs, CI/CD pipelines, or shell scripts.
Code Reference
luigi.build()
| Attribute | Value |
|---|---|
| Source Location | luigi/interface.py, lines 220-244 |
| Signature | def build(tasks, worker_scheduler_factory=None, detailed_summary=False, **env_params) |
| Import | import luigi; luigi.build(...) |
def build(tasks, worker_scheduler_factory=None, detailed_summary=False, **env_params):
    """
    Run internally, bypassing the cmdline parsing.

    Useful if you have some luigi code that you want to run internally.
    Example:

        luigi.build([MyTask1(), MyTask2()], local_scheduler=True)

    One notable difference is that `build` defaults to not using
    the identical process lock. Otherwise, `build` would only be
    callable once from each process.

    :param tasks:
    :param worker_scheduler_factory:
    :param env_params:
    :return: True if there were no scheduling errors, even if tasks may fail.
    """
    if "no_lock" not in env_params:
        env_params["no_lock"] = True

    luigi_run_result = _schedule_and_run(
        tasks, worker_scheduler_factory, override_defaults=env_params
    )
    return luigi_run_result if detailed_summary else luigi_run_result.scheduling_succeeded
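The two behaviours visible in build() above, the no_lock default and the choice of return value, can be exercised without a scheduler. The sketch below is a simplified mirror under stated assumptions: build_like, StubRunResult, and stub_schedule_and_run are hypothetical names that do not exist in Luigi, and the stub only records what it was passed.

```python
from dataclasses import dataclass


@dataclass
class StubRunResult:
    """Hypothetical stand-in for the object returned by _schedule_and_run()."""
    scheduling_succeeded: bool
    summary_text: str = ""


def build_like(tasks, schedule_and_run, detailed_summary=False, **env_params):
    # Default to no_lock=True unless the caller set it explicitly.
    env_params.setdefault("no_lock", True)
    result = schedule_and_run(tasks, override_defaults=env_params)
    # Return the full result only when asked; otherwise just the boolean.
    return result if detailed_summary else result.scheduling_succeeded


calls = []  # one dict of overrides per invocation of the stub


def stub_schedule_and_run(tasks, override_defaults):
    # Record the overrides, then pretend that scheduling worked.
    calls.append(dict(override_defaults))
    return StubRunResult(scheduling_succeeded=True, summary_text="stub run")


flag = build_like(["task-a"], stub_schedule_and_run, local_scheduler=True)
full = build_like(["task-a"], stub_schedule_and_run,
                  detailed_summary=True, no_lock=False)

print(flag)               # True
print(full.summary_text)  # stub run
print(calls)
```

The first call receives no_lock=True implicitly, while the second keeps the caller's explicit no_lock=False, which matches the "only set if absent" check in the source.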
luigi.run()
| Attribute | Value |
|---|---|
| Source Location | luigi/interface.py, lines 192-201 |
| Signature | def run(*args, **kwargs) |
| Import | import luigi; luigi.run(...) |
def run(*args, **kwargs):
    """
    Please dont use. Instead use `luigi` binary.

    Run from cmdline using argparse.
    """
    luigi_run_result = _run(*args, **kwargs)
    return luigi_run_result if kwargs.get('detailed_summary') else luigi_run_result.scheduling_succeeded
The internal _run() function (lines 204-217) handles argument parsing:
def _run(cmdline_args=None, main_task_cls=None,
         worker_scheduler_factory=None, use_dynamic_argparse=None,
         local_scheduler=False, detailed_summary=False):
    if cmdline_args is None:
        cmdline_args = sys.argv[1:]

    if main_task_cls:
        cmdline_args.insert(0, main_task_cls.task_family)
    if local_scheduler:
        cmdline_args.append('--local-scheduler')
    with CmdlineParser.global_instance(cmdline_args) as cp:
        return _schedule_and_run([cp.get_task_obj()], worker_scheduler_factory)
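The argument handling in _run() amounts to prepending the task family and appending the --local-scheduler flag. A minimal standalone sketch of that step follows; assemble_args is a hypothetical helper, not a Luigi function. Unlike the excerpt above, which modifies the list it is given, this sketch copies the list first so the caller's arguments stay untouched.

```python
def assemble_args(cmdline_args, task_family=None, local_scheduler=False):
    # Work on a copy; this is a deliberate difference from the excerpt above.
    args = list(cmdline_args)
    if task_family:
        # The task family must come first so the parser knows which task to run.
        args.insert(0, task_family)
    if local_scheduler:
        args.append("--local-scheduler")
    return args


user_args = ["--date", "2026-01-15"]
final_args = assemble_args(user_args, task_family="MyTask", local_scheduler=True)
print(final_args)  # prints ['MyTask', '--date', '2026-01-15', '--local-scheduler']
```

The resulting list has the same shape as what follows the --module option in the luigi command-line invocations shown under Usage Examples.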
luigi_run() (CLI entry point)
| Attribute | Value |
|---|---|
| Source Location | luigi/cmdline.py, lines 8-9 |
| Signature | def luigi_run(argv=sys.argv[1:]) |
| Import | Registered as console script luigi; calls run_with_retcodes(argv) |
def luigi_run(argv=sys.argv[1:]):
    run_with_retcodes(argv)
Worker class
| Attribute | Value |
|---|---|
| Source Location | luigi/worker.py, lines 553-616 |
| Signature | class Worker |
| Constructor | def __init__(self, scheduler=None, worker_id=None, worker_processes=1, assistant=False, **kwargs) |
| Import | from luigi.worker import Worker (internal; typically not imported directly) |
class Worker:
    """
    Worker object communicates with a scheduler.

    Simple class that talks to a scheduler and:

    * tells the scheduler what it has to do + its dependencies
    * asks for stuff to do (pulls it in a loop and runs it)
    """
    def __init__(self, scheduler=None, worker_id=None,
                 worker_processes=1, assistant=False, **kwargs):
        if scheduler is None:
            scheduler = Scheduler()
        self.worker_processes = int(worker_processes)
        self._config = worker(**kwargs)
        ...
Key Worker methods:
add(task) -- Recursively resolves dependencies, checks completeness, and registers tasks with the scheduler.
run() -- The main execution loop. Repeatedly asks the scheduler for work, executes tasks, and reports results. Returns True if all tasks succeeded.
I/O Contract
luigi.build()
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | tasks | list[Task] | Instantiated task objects to schedule and run. |
| Input | local_scheduler | bool (via **env_params) | If True, use an in-memory scheduler instead of connecting to a remote one. |
| Input | workers | int (via **env_params) | Number of parallel worker processes. Default: 1. |
| Input | detailed_summary | bool | If True, return a LuigiRunResult object instead of a boolean. |
| Output | return value | bool or LuigiRunResult | True if scheduling succeeded; or a detailed result object. |
luigi.run()
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | cmdline_args | list[str] or None | Command-line arguments. Defaults to sys.argv[1:]. |
| Input | main_task_cls | Task class or None | If provided, prepends the task family to cmdline_args. |
| Input | local_scheduler | bool | If True, appends --local-scheduler. |
| Output | return value | bool or LuigiRunResult | True if scheduling succeeded. |
Usage Examples
Programmatic execution with luigi.build()
import luigi
import datetime


class MyTask(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('/tmp/output/%s.txt' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write("Hello from %s\n" % self.date)


# Run the pipeline programmatically with a local scheduler
success = luigi.build(
    [MyTask(date=datetime.date(2026, 1, 15))],
    local_scheduler=True,
    workers=2,
)
print("Pipeline succeeded:", success)
Detailed summary for programmatic execution
import datetime

import luigi

# MyTask is the task class defined in the previous example.
result = luigi.build(
    [MyTask(date=datetime.date(2026, 1, 15))],
    local_scheduler=True,
    detailed_summary=True,
)
print(result.summary_text)
print("Scheduling succeeded:", result.scheduling_succeeded)
Command-line execution
# Run a task via the luigi CLI
luigi --module my_tasks MyTask --date 2026-01-15 --local-scheduler
# Run with multiple workers
luigi --module my_tasks MyTask --date 2026-01-15 --workers 4 --local-scheduler
Using luigi.run() from a script (as in the hello_world example)
import luigi


class HelloWorldTask(luigi.Task):
    task_namespace = 'examples'

    def run(self):
        print("{task} says: Hello world!".format(task=self.__class__.__name__))


if __name__ == '__main__':
    luigi.run(
        ['examples.HelloWorldTask', '--workers', '1', '--local-scheduler']
    )
Running multiple root tasks
import luigi
import datetime

# DailyReport is assumed to be a luigi.Task subclass with a date
# parameter, defined elsewhere; it is not shown in this example.

# Build multiple independent tasks in a single pipeline run
success = luigi.build(
    [
        DailyReport(date=datetime.date(2026, 1, 13)),
        DailyReport(date=datetime.date(2026, 1, 14)),
        DailyReport(date=datetime.date(2026, 1, 15)),
    ],
    local_scheduler=True,
    workers=3,
)