Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Heuristic:Spotify Luigi Dynamic Requirements Generator

From Leeroopedia



Knowledge Sources
Domains Pipeline_Framework, Optimization
Last Updated 2026-02-10 07:00 GMT

Overview

Pattern for discovering task dependencies at runtime using Python generators (`yield`) in the `run()` method.

Description

Luigi's standard `requires()` method declares static dependencies known before task execution. However, some pipelines need dynamic dependencies where the set of required tasks depends on the output of an upstream task. Luigi supports this by allowing `run()` to be a generator: yielding a task or list of tasks pauses execution until those tasks complete, then resumes the generator. For large numbers of dynamic dependencies, the `DynamicRequirements` wrapper with a `custom_complete` function avoids filesystem thrashing.

Usage

Use dynamic requirements when the dependency graph cannot be known at declaration time. Common scenarios include:

  • A configuration task produces a list of items that each need separate processing
  • A discovery task finds files/records that each need individual handling
  • Fan-out patterns where the number of parallel tasks depends on data

The Insight (Rule of Thumb)

  • Action: Make `run()` a generator and `yield` task instances (or lists of tasks) to create dynamic dependencies. For large sets (100+ tasks), wrap in `DynamicRequirements` with `custom_complete`.
  • Value: Enables data-driven pipeline structures that cannot be expressed with static `requires()`.
  • Trade-off: Dynamic requirements are resolved at runtime, so the full dependency graph is not visible upfront in the scheduler. Each `yield` creates a synchronization point.

Reasoning

Static `requires()` is evaluated before `run()` executes, so it cannot depend on the output of other tasks. The generator-based pattern solves this by interleaving execution with dependency resolution: the worker runs the task until it yields, schedules the yielded dependencies, waits for their completion, then resumes the generator.

For large dynamic dependency sets, the default completeness check calls `task.complete()` for every yielded task individually. The `DynamicRequirements` class with `custom_complete` allows batching these checks (e.g., listing a directory once instead of checking each file individually):

def custom_complete(complete_fn):
    if not complete_fn(data_dependent_deps[0]):
        return False
    paths = [task.output().path for task in data_dependent_deps]
    basenames = os.listdir(os.path.dirname(paths[0]))  # single fs call
    return all(os.path.basename(path) in basenames for path in paths)

Code Evidence

Dynamic requirements example from `examples/dynamic_requirements.py:67-107`:

class Dynamic(luigi.Task):
    seed = luigi.IntParameter(default=1)

    def run(self):
        # Static-style dependency via yield
        config = self.clone(Configuration)
        yield config

        with config.output().open() as f:
            data = [int(x) for x in f.read().split(',')]

        # Data-dependent dependencies
        data_dependent_deps = [Data(magic_number=x) for x in data]
        yield data_dependent_deps

        with self.output().open('w') as f:
            f.write('Tada!')

        # Optimized completeness check for large sets
        yield luigi.DynamicRequirements(data_dependent_deps, custom_complete)

DynamicRequirements class from `luigi/task.py:780-858`:

class DynamicRequirements(object):
    """
    Wraps dynamic requirements yielded in tasks's run methods to control
    how completeness checks of large chunks of tasks are performed.
    """

    def __init__(self, requirements, custom_complete=None):
        self.requirements = requirements
        self.custom_complete = custom_complete

    def complete(self, complete_fn=None):
        if complete_fn is None:
            def complete_fn(task):
                return task.complete()

        if self.custom_complete:
            return self.custom_complete(complete_fn)

        return all(complete_fn(t) for t in self.flat_requirements)

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment