Heuristic:Spotify Luigi Dynamic Requirements Generator
| Knowledge Sources | |
|---|---|
| Domains | Pipeline_Framework, Optimization |
| Last Updated | 2026-02-10 07:00 GMT |
Overview
A pattern for discovering task dependencies at runtime by writing `run()` as a Python generator that yields the tasks it needs.
Description
Luigi's standard `requires()` method declares static dependencies that are known before the task executes. Some pipelines, however, need dynamic dependencies, where the set of required tasks depends on the output of an upstream task. Luigi supports this by allowing `run()` to be a generator: yielding a task or a list of tasks suspends the task until those tasks are complete, after which execution continues past the `yield`. For large numbers of dynamic dependencies, wrapping them in `DynamicRequirements` with a `custom_complete` function lets the completeness check be batched, which avoids issuing one filesystem call per yielded task.
Usage
Use dynamic requirements when the dependency graph cannot be known at declaration time. Common scenarios include:
- A configuration task produces a list of items that each need separate processing
- A discovery task finds files/records that each need individual handling
- Fan-out patterns where the number of parallel tasks depends on data
The Insight (Rule of Thumb)
- Action: Make `run()` a generator and `yield` task instances (or lists of tasks) to create dynamic dependencies. For large sets (100+ tasks), wrap in `DynamicRequirements` with `custom_complete`.
- Value: Enables data-driven pipeline structures that cannot be expressed with static `requires()`.
- Trade-off: Dynamic requirements are resolved at runtime, so the scheduler cannot show the full dependency graph upfront. Each `yield` is a synchronization point: the task makes no further progress until everything it yielded is complete.
Reasoning
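Static `requires()` is evaluated before `run()` executes, so it cannot depend on the output of other tasks. The generator-based pattern solves this by interleaving execution with dependency resolution: the worker runs the task until it yields, checks whether the yielded dependencies are complete, and schedules any that are not. Once they are complete, execution continues past the `yield`. Luigi's documentation notes that `run()` resumes from scratch each time a new task is yielded, so the body of `run()` should be idempotent.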
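The interleaving described above can be modelled with plain Python generators. The following is a toy sketch, not Luigi's worker code: `done`, `log`, `first_incomplete`, and `drive` are hypothetical names, tasks are represented by strings, and "running" a dependency is reduced to adding its name to a set. It assumes the restart-from-scratch behaviour described in Luigi's documentation.

```python
from typing import Callable, Generator, List, Optional, Set

done: Set[str] = set()   # names of "tasks" that have completed
log: List[str] = []      # records side effects of run(), in order


def run() -> Generator:
    """Stand-in for a task's run(): yields the names of the tasks it needs."""
    log.append("run started")
    yield "config"                    # first dependency
    items = ["data-1", "data-2"]      # pretend this was read from config's output
    yield items                       # data-dependent dependencies
    log.append("output written")


def first_incomplete(gen: Generator) -> Optional[List[str]]:
    """Advance gen past every yield whose dependencies are done.

    Returns the first batch with missing dependencies, or None if the
    generator ran to the end.
    """
    try:
        deps = next(gen)
        while True:
            batch = deps if isinstance(deps, list) else [deps]
            missing = [d for d in batch if d not in done]
            if missing:
                return missing
            deps = next(gen)          # everything complete: continue in place
    except StopIteration:
        return None


def drive(run_fn: Callable[[], Generator]) -> int:
    """Restart run_fn until it finishes; return the number of attempts."""
    attempts = 0
    while True:
        attempts += 1
        missing = first_incomplete(run_fn())   # a fresh generator per attempt
        if missing is None:
            return attempts
        done.update(missing)          # "run" the missing dependencies


attempts = drive(run)
print(attempts, log)
```

In this model the code before the first `yield` executes on every attempt, which illustrates why side effects in `run()` should be safe to repeat.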
For large dynamic dependency sets, the default completeness check calls `task.complete()` for every yielded task individually. The `DynamicRequirements` class with `custom_complete` allows batching these checks (e.g., listing a directory once instead of checking each file individually):
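import os

def custom_complete(complete_fn):
    # Defined inside run(), so data_dependent_deps comes from the enclosing scope.
    # Check the first task the normal way; stop early if it is not complete.
    if not complete_fn(data_dependent_deps[0]):
        return False
    # Assumes all outputs share one directory, so it is listed only once.
    paths = [task.output().path for task in data_dependent_deps]
    basenames = os.listdir(os.path.dirname(paths[0]))  # single fs call
    return all(os.path.basename(path) in basenames for path in paths)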
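The batching idea can be tried without Luigi. The sketch below uses only the standard library and compares a per-path existence check with a single directory listing. The helper names `all_exist_individually` and `all_exist_batched` are hypothetical, and the batched version assumes every path lives in the same directory. It also stores the listing in a set, a small variation on the snippet above that keeps membership tests cheap for long lists.

```python
import os
import tempfile
from typing import List


def all_exist_individually(paths: List[str]) -> bool:
    """One filesystem call per path."""
    return all(os.path.exists(p) for p in paths)


def all_exist_batched(paths: List[str]) -> bool:
    """One directory listing, assuming all paths share a directory."""
    if not paths:
        return True
    basenames = set(os.listdir(os.path.dirname(paths[0])))  # single fs call
    return all(os.path.basename(p) in basenames for p in paths)


with tempfile.TemporaryDirectory() as tmp:
    paths = [os.path.join(tmp, f"data_{i}.txt") for i in range(5)]

    # Create only the first four outputs: both checks should report False.
    for p in paths[:4]:
        open(p, "w").close()
    partial = (all_exist_individually(paths), all_exist_batched(paths))

    # Create the last output: both checks should now report True.
    open(paths[4], "w").close()
    full = (all_exist_individually(paths), all_exist_batched(paths))

print(partial, full)
```

Both functions give the same answer here; the difference is the number of filesystem calls, which matters most on slow or remote filesystems.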
Code Evidence
Dynamic requirements example from `examples/dynamic_requirements.py:67-107`:
class Dynamic(luigi.Task):
    seed = luigi.IntParameter(default=1)

    def run(self):
        # Static-style dependency via yield
        config = self.clone(Configuration)
        yield config

        with config.output().open() as f:
            data = [int(x) for x in f.read().split(',')]

        # Data-dependent dependencies
        data_dependent_deps = [Data(magic_number=x) for x in data]
        yield data_dependent_deps

        with self.output().open('w') as f:
            f.write('Tada!')

        # Optimized completeness check for large sets
        # (custom_complete is the nested function shown under Reasoning)
        yield luigi.DynamicRequirements(data_dependent_deps, custom_complete)
DynamicRequirements class from `luigi/task.py:780-858`:
class DynamicRequirements(object):
    """
    Wraps dynamic requirements yielded in tasks's run methods to control
    how completeness checks of large chunks of tasks are performed.
    """

    def __init__(self, requirements, custom_complete=None):
        self.requirements = requirements
        self.custom_complete = custom_complete

    def complete(self, complete_fn=None):
        # Default completeness check: ask each task directly
        if complete_fn is None:
            def complete_fn(task):
                return task.complete()

        # Delegate to the custom function when one was provided
        if self.custom_complete:
            return self.custom_complete(complete_fn)

        # flat_requirements is a property of the full class, omitted from this excerpt
        return all(complete_fn(t) for t in self.flat_requirements)
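To see what the dispatch in `complete()` saves, the following sketch counts `complete()` calls with and without a custom function. It does not import Luigi: `FakeTask` and `Requirements` are hypothetical stand-ins, and `Requirements` copies only the dispatch logic of the excerpt above, iterating over a flat list instead of `flat_requirements`. The set `finished_names` stands in for the result of one directory listing.

```python
from typing import Callable, List, Optional


class FakeTask:
    """Hypothetical stand-in for a task; counts complete() calls."""
    calls = 0

    def __init__(self, name: str, finished: bool) -> None:
        self.name = name
        self.finished = finished

    def complete(self) -> bool:
        FakeTask.calls += 1
        return self.finished


class Requirements:
    """Stand-in mirroring the dispatch logic of the excerpt above."""

    def __init__(self, requirements: List[FakeTask],
                 custom_complete: Optional[Callable] = None) -> None:
        self.requirements = requirements
        self.custom_complete = custom_complete

    def complete(self, complete_fn: Optional[Callable] = None) -> bool:
        if complete_fn is None:
            def complete_fn(task):
                return task.complete()
        if self.custom_complete:
            return self.custom_complete(complete_fn)
        return all(complete_fn(t) for t in self.requirements)


tasks = [FakeTask(f"data-{i}", finished=True) for i in range(100)]
finished_names = {t.name for t in tasks}  # pretend: one directory listing


def custom_complete(complete_fn: Callable) -> bool:
    # Check the first task normally, then compare names for the rest.
    if not complete_fn(tasks[0]):
        return False
    return all(t.name in finished_names for t in tasks)


FakeTask.calls = 0
default_ok = Requirements(tasks).complete()
default_calls = FakeTask.calls  # one complete() call per task

FakeTask.calls = 0
custom_ok = Requirements(tasks, custom_complete).complete()
custom_calls = FakeTask.calls  # only the first task is asked directly

print(default_ok, default_calls, custom_ok, custom_calls)
```

With 100 finished tasks, the default path calls `complete()` once per task, while the custom path calls it once in total and answers the rest from the precomputed set.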