Template:Knowledge Source
Overview
Luigi's concrete mechanisms for building directed acyclic graphs (DAGs) of tasks by declaring the dependencies between them.
Description
Luigi provides several mechanisms for declaring and managing task dependencies:
Task.requires() -- The primary method for declaring upstream dependencies. Returns a single task, a list, or a dict of tasks.
Task.clone(cls=None, **kwargs) -- Creates a new instance of cls (defaulting to the current task's class), copying the value of every parameter of cls that the current task also has; keyword arguments override the copied values.
WrapperTask -- A special task subclass that is complete when all of its requirements are complete, useful for grouping multiple tasks under a single entry point.
@inherits decorator -- Copies parameters from one or more task classes onto the decorated class without Python inheritance.
@requires decorator -- Combines @inherits with an auto-generated requires() method that clones the upstream task(s).
@copies decorator -- Combines @requires with an auto-generated run() method that copies input to output line by line.
Usage
Use these dependency mechanisms when:
- You need to chain tasks into a multi-step pipeline and want Luigi to resolve execution order automatically.
- You want to avoid repeating upstream parameter declarations in every downstream task (parameter explosion problem).
- You need a single "umbrella" task that triggers an entire sub-pipeline.
Code Reference
Task.requires() and Task.clone()
| Attribute | Value |
|---|---|
| Source Location | luigi/task.py, lines 636-648 (requires), lines 542-565 (clone) |
| Import | `from luigi import Task` |
def requires(self):
    """
    The Tasks that this Task depends on.

    A Task will only run if all of the Tasks that it requires are completed.
    If your Task does not require any other Tasks, then you don't need to
    override this method. Otherwise, a subclass can override this method
    to return a single Task, a list of Task instances, or a dict whose
    values are Task instances.
    """
    # Base-class default: an empty list means "no upstream dependencies".
    # A new list literal is returned on every call, so callers may mutate it.
    return []  # default impl
def clone(self, cls=None, **kwargs):
    """
    Creates a new instance from an existing instance where some of the
    args have changed.
    """
    # cls: target class to instantiate; None means "same class as self".
    # kwargs: explicit parameter values that take precedence over copied ones.
    # Returns: a new ``cls`` instance built only from cls's own parameters.
    if cls is None:
        cls = self.__class__

    # Iterate the *target* class's parameters (name, parameter-object pairs).
    # An explicit override wins; otherwise the value is copied from self when
    # self has an attribute of that name. Parameters self lacks are simply not
    # passed to cls(...), and kwargs that are not parameters of cls are
    # silently ignored.
    new_k = {}
    for param_name, param_class in cls.get_params():
        if param_name in kwargs:
            new_k[param_name] = kwargs[param_name]
        elif hasattr(self, param_name):
            new_k[param_name] = getattr(self, param_name)

    return cls(**new_k)
WrapperTask
| Attribute | Value |
|---|---|
| Source Location | luigi/task.py, lines 932-938 |
| Signature | `class WrapperTask(Task)` |
| Import | `from luigi import WrapperTask` |
class WrapperTask(Task):
    """
    Use for tasks that only wrap other tasks and that by definition are done
    if all their requirements exist.
    """

    def complete(self):
        # flatten() is expected to normalize whatever requires() returns
        # (single Task, list, or dict) into a flat iterable of tasks.
        # all() short-circuits on the first incomplete requirement and is
        # True for an empty iterable, so a wrapper with no requirements is
        # considered complete.
        return all(r.complete() for r in flatten(self.requires()))
@inherits decorator
| Attribute | Value |
|---|---|
| Source Location | luigi/util.py, lines 247-324 |
| Signature | `class inherits(*tasks_to_inherit, **kw_tasks_to_inherit)` |
| Import | `from luigi.util import inherits` |
The decorator copies all Parameter objects from the specified task classes onto the decorated class (unless a parameter of the same name already exists). It also adds clone_parent() and clone_parents() helper methods.
@requires decorator
| Attribute | Value |
|---|---|
| Source Location | luigi/util.py, lines 327-351 |
| Signature | `class requires(*tasks_to_require, **kw_tasks_to_require)` |
| Import | `from luigi.util import requires` |
Applies @inherits and also attaches a requires() method that returns self.clone_parent() (for a single dependency) or self.clone_parents() (for multiple).
@copies decorator
| Attribute | Value |
|---|---|
| Source Location | luigi/util.py, lines 354-386 |
| Signature | `class copies(task_to_copy)` |
| Import | `from luigi.util import copies` |
Applies @requires and adds a run() method that copies every line from self.input() to self.output().
I/O Contract
Task.requires()
| Direction | Name | Type | Description |
|---|---|---|---|
| Output | return value | Task, list[Task], or dict whose values are Tasks | The upstream tasks this task depends on. The base implementation returns `[]` (no dependencies). |
Task.clone()
| Direction | Name | Type | Description |
|---|---|---|---|
| Input | cls | type (Task subclass) or None | The target class to instantiate. Defaults to the current task's class. |
| Input | **kwargs | parameter overrides | Explicit parameter values that override the cloned values. Keys that are not parameters of `cls` are ignored. |
| Output | return value | Task | A new instance of `cls`. Each parameter of `cls` takes its value from kwargs if given, otherwise from the current task when it has that attribute. |
WrapperTask.complete()
| Direction | Name | Type | Description |
|---|---|---|---|
| Output | return value | bool | True if every required task is complete (also True when there are no requirements). |
Usage Examples
Basic dependency chain with requires()
import luigi


class ExtractData(luigi.Task):
    """First step: write a small CSV file for ``date`` (stand-in for a real extract)."""

    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('/data/raw/%s.csv' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write("id,value\n1,100\n2,200\n")


class TransformData(luigi.Task):
    """Second step: upper-case every line produced by ExtractData for the same date."""

    date = luigi.DateParameter()

    def requires(self):
        # Declaring the dependency is enough: Luigi schedules ExtractData first
        # and exposes its output() to this task as self.input().
        return ExtractData(date=self.date)

    def output(self):
        return luigi.LocalTarget('/data/clean/%s.csv' % self.date)

    def run(self):
        # Stream line by line rather than reading the whole input into memory.
        with self.input().open('r') as infile, self.output().open('w') as outfile:
            for line in infile:
                outfile.write(line.upper())


class LoadData(luigi.Task):
    """Final step: write a marker file once TransformData has completed."""

    date = luigi.DateParameter()

    def requires(self):
        return TransformData(date=self.date)

    def output(self):
        return luigi.LocalTarget('/data/final/%s.done' % self.date)

    def run(self):
        # Placeholder "load": only a marker is written; self.input() is unused here.
        with self.output().open('w') as f:
            f.write("loaded\n")
Eliminating parameter explosion with @inherits and clone
import luigi
from luigi.util import inherits


class TaskA(luigi.ExternalTask):
    """Upstream input identified by ``param_a``; no run() is defined for it here."""

    param_a = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('/tmp/log-%s' % self.param_a)


@inherits(TaskA)
class TaskB(luigi.Task):
    """Copy TaskA's output; ``param_a`` comes from @inherits instead of being redeclared."""

    param_b = luigi.Parameter()

    def requires(self):
        # clone automatically passes param_a (the shared parameter); param_b
        # is not a parameter of TaskA, so it is not passed along.
        return self.clone(TaskA)

    def output(self):
        return luigi.LocalTarget('/tmp/result-%s-%s' % (self.param_a, self.param_b))

    def run(self):
        with self.input().open('r') as infile:
            data = infile.read()
        with self.output().open('w') as outfile:
            outfile.write(data)
Using @requires for zero-boilerplate chaining
import luigi
from luigi.util import requires


class Upstream(luigi.Task):
    """Produce a one-line text file for ``date``."""

    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('/data/upstream/%s.txt' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write("upstream data\n")


@requires(Upstream)
class Downstream(luigi.Task):
    """Append ``extra_param`` to Upstream's output for the same date."""

    extra_param = luigi.Parameter(default='default')
    # requires() is auto-generated: returns self.clone(Upstream)
    # The 'date' parameter is inherited automatically from Upstream.

    def output(self):
        return luigi.LocalTarget('/data/downstream/%s.txt' % self.date)

    def run(self):
        with self.input().open('r') as infile:
            data = infile.read()
        with self.output().open('w') as outfile:
            # extra_param is appended right after the upstream text's trailing newline.
            outfile.write(data + self.extra_param)
WrapperTask as a pipeline entry point
import luigi


class AllReports(luigi.WrapperTask):
    """Umbrella entry point: complete once every report for ``date`` is complete."""

    date = luigi.DateParameter()

    def requires(self):
        # NOTE: SalesReport, InventoryReport and UserReport are not defined in
        # this snippet; they are assumed to be task classes defined elsewhere
        # that accept a ``date`` parameter.
        return [
            SalesReport(date=self.date),
            InventoryReport(date=self.date),
            UserReport(date=self.date),
        ]
Related Pages