
Implementation:Spotify Luigi Task Dependencies

From Leeroopedia



Overview

Concrete tool, provided by Luigi, for building directed acyclic graphs (DAGs) of tasks by declaring the dependencies between them.

Description

Luigi provides several mechanisms for declaring and managing task dependencies:

  • Task.requires() -- The primary method for declaring upstream dependencies. Returns a single task, a list, or a dict of tasks.
  • Task.clone(cls=None, **kwargs) -- Creates a new task instance of the given class (the current task's class by default), copying the value of every parameter that class shares with the current task; explicit keyword arguments override the copied values.
  • WrapperTask -- A special task subclass that is complete when all of its requirements are complete, useful for grouping multiple tasks under a single entry point.
  • @inherits decorator -- Copies parameters from one or more task classes onto the decorated class without Python inheritance.
  • @requires decorator -- Combines @inherits with an auto-generated requires() method that clones the upstream task (or tasks).
  • @copies decorator -- Combines @requires with an auto-generated run() method that copies input to output line by line.

Usage

Use these dependency mechanisms when:

  • You need to chain tasks into a multi-step pipeline and want Luigi to resolve execution order automatically.
  • You want to avoid repeating upstream parameter declarations in every downstream task (parameter explosion problem).
  • You need a single "umbrella" task that triggers an entire sub-pipeline.
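Luigi's actual scheduler (workers, a local or central scheduler, retries, concurrency) is far more involved, but the core idea behind automatic ordering can be sketched in plain Python. The Node class and resolve_order function below are hypothetical illustrations, not Luigi APIs: a depth-first walk over each task's requirements lists upstream tasks before their dependents and skips anything already complete, roughly mirroring how Luigi does not re-run work whose output already exists.

```python
from dataclasses import dataclass, field


@dataclass(eq=False)
class Node:
    """Hypothetical stand-in for a task: a name, a completion flag, and its requirements."""
    name: str
    done: bool = False
    requires: list = field(default_factory=list)


def resolve_order(target):
    """Return the names of incomplete nodes, each listed after everything it requires."""
    order = []
    seen = set()

    def visit(node):
        # Skip nodes already scheduled, and complete nodes (their inputs are not revisited).
        if node.done or id(node) in seen:
            return
        seen.add(id(node))
        for dep in node.requires:
            visit(dep)  # upstream work is scheduled first
        order.append(node.name)

    visit(target)
    return order


extract = Node("ExtractData")
transform = Node("TransformData", requires=[extract])
load = Node("LoadData", requires=[transform])

print(resolve_order(load))  # ['ExtractData', 'TransformData', 'LoadData']

extract.done = True  # pretend ExtractData's output already exists
print(resolve_order(load))  # ['TransformData', 'LoadData']
```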

Code Reference

Task.requires() and Task.clone()

Source location: luigi/task.py, lines 636-648 (requires), lines 542-565 (clone)
Import: from luigi import Task

def requires(self):
    """
    The Tasks that this Task depends on.

    A Task will only run if all of the Tasks that it requires are completed.
    If your Task does not require any other Tasks, then you don't need to
    override this method. Otherwise, a subclass can override this method
    to return a single Task, a list of Task instances, or a dict whose
    values are Task instances.
    """
    return []  # default impl

def clone(self, cls=None, **kwargs):
    """
    Creates a new instance from an existing instance where some of the
    args have changed.
    """
    if cls is None:
        cls = self.__class__

    new_k = {}
    for param_name, param_class in cls.get_params():
        if param_name in kwargs:
            new_k[param_name] = kwargs[param_name]
        elif hasattr(self, param_name):
            new_k[param_name] = getattr(self, param_name)

    return cls(**new_k)

WrapperTask

Source location: luigi/task.py, lines 932-938
Signature: class WrapperTask(Task)
Import: from luigi import WrapperTask

class WrapperTask(Task):
    """
    Use for tasks that only wrap other tasks and that by definition are done
    if all their requirements exist.
    """
    def complete(self):
        return all(r.complete() for r in flatten(self.requires()))

@inherits decorator

Source location: luigi/util.py, lines 247-324
Signature: class inherits(*tasks_to_inherit, **kw_tasks_to_inherit)
Import: from luigi.util import inherits

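The decorator copies every Parameter object from the specified task classes onto the decorated class, skipping any name the class already defines. It also attaches two helper methods: clone_parent(), which clones the first positionally passed class, and clone_parents(), which clones every inherited class and returns a list (positional form) or a dict keyed by argument name (keyword form).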

@requires decorator

Source location: luigi/util.py, lines 327-351
Signature: class requires(*tasks_to_require, **kw_tasks_to_require)
Import: from luigi.util import requires

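Applies @inherits and also attaches a requires() method. When exactly one task class is passed positionally, that method returns self.clone_parent() (a single task); otherwise it returns self.clone_parents(), which is a list for positional arguments and a dict for keyword arguments.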

@copies decorator

Source location: luigi/util.py, lines 354-386
Signature: class copies(task_to_copy)
Import: from luigi.util import copies

Applies @requires to a single upstream task and adds a run() method that copies self.input() to self.output() line by line, so the decorated class usually only needs to define output().

I/O Contract

Task.requires()

  • Output -- return value (Task, list[Task], or dict[str, Task]): the upstream tasks this task depends on.

Task.clone()

  • Input -- cls (a Task subclass, or None): the class to instantiate; defaults to the current task's class.
  • Input -- **kwargs (parameter overrides): explicit parameter values that take precedence over the copied ones; names that are not parameters of cls are ignored.
  • Output -- return value (Task): a new instance of cls whose shared parameters are copied from the current task.

WrapperTask.complete()

  • Output -- return value (bool): True if every task returned by requires() is complete.

Usage Examples

Basic dependency chain with requires()

import luigi


class ExtractData(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('/data/raw/%s.csv' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write("id,value\n1,100\n2,200\n")


class TransformData(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return ExtractData(date=self.date)

    def output(self):
        return luigi.LocalTarget('/data/clean/%s.csv' % self.date)

    def run(self):
        with self.input().open('r') as infile:
            lines = infile.readlines()
        with self.output().open('w') as outfile:
            for line in lines:
                outfile.write(line.upper())


class LoadData(luigi.Task):
    date = luigi.DateParameter()

    def requires(self):
        return TransformData(date=self.date)

    def output(self):
        return luigi.LocalTarget('/data/final/%s.done' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write("loaded\n")

Eliminating parameter explosion with @inherits and clone

import luigi
from luigi.util import inherits


class TaskA(luigi.ExternalTask):
    param_a = luigi.Parameter()

    def output(self):
        return luigi.LocalTarget('/tmp/log-%s' % self.param_a)


@inherits(TaskA)
class TaskB(luigi.Task):
    param_b = luigi.Parameter()

    def requires(self):
        # clone automatically passes param_a (the shared parameter)
        return self.clone(TaskA)

    def output(self):
        return luigi.LocalTarget('/tmp/result-%s-%s' % (self.param_a, self.param_b))

    def run(self):
        with self.input().open('r') as infile:
            data = infile.read()
        with self.output().open('w') as outfile:
            outfile.write(data)

Using @requires for zero-boilerplate chaining

import luigi
from luigi.util import requires


class Upstream(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('/data/upstream/%s.txt' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write("upstream data\n")


@requires(Upstream)
class Downstream(luigi.Task):
    extra_param = luigi.Parameter(default='default')

    # requires() is auto-generated: returns self.clone(Upstream)
    # The 'date' parameter is inherited automatically from Upstream.

    def output(self):
        return luigi.LocalTarget('/data/downstream/%s.txt' % self.date)

    def run(self):
        with self.input().open('r') as infile:
            data = infile.read()
        with self.output().open('w') as outfile:
            outfile.write(data + self.extra_param)

WrapperTask as a pipeline entry point

import luigi


class AllReports(luigi.WrapperTask):
    date = luigi.DateParameter()

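    # SalesReport, InventoryReport and UserReport are assumed to be defined
    # elsewhere as tasks that take a `date` parameter.
    def requires(self):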
        return [
            SalesReport(date=self.date),
            InventoryReport(date=self.date),
            UserReport(date=self.date),
        ]
