Implementation:Spotify Luigi RangeTask
| Knowledge Sources | |
|---|---|
| Domains | Scheduling, Time_Series |
| Last Updated | 2026-02-10 08:00 GMT |
Overview
RangeDaily, RangeHourly, RangeByMinutes, and RangeMonthly are Luigi WrapperTask subclasses that produce contiguous completed ranges of recurring time-parameterized tasks, automatically detecting and filling gaps using filesystem-based bulk completeness inference.
Description
The Range tools module provides a family of classes for ensuring contiguous completion of recurring, time-parameterized Luigi tasks. This is essential for time-series data pipelines where gaps in processing (due to failures, downtime, or late-arriving data) must be detected and backfilled automatically.
The class hierarchy is organized as follows:
- RangeBase (extends luigi.WrapperTask) -- The abstract base class that implements the core range logic. Its requires() method computes the set of missing datetime instances within the specified range and returns task instances for those gaps (limited by task_limit). Parameters include:
  - of (TaskParameter) -- The recurring task class to be completed.
  - of_params (DictParameter) -- Additional parameters to pass to the of task.
  - start / stop -- Bounds for the range (type depends on subclass).
  - reverse (BoolParameter) -- If True, processes from newest to oldest; default: False.
  - task_limit (IntParameter) -- Maximum number of missing instances to require per invocation; default: 50.
  - now (IntParameter) -- Override current time (seconds since epoch) for testing.
  - param_name (Parameter) -- Name of the datetime parameter on the target task; defaults to first positional parameter.
- RangeDailyBase / RangeHourlyBase / RangeByMinutesBase -- Intermediate abstract classes that implement datetime arithmetic for their respective granularities. They define moving_start(), moving_stop(), and finite_datetimes() methods, along with lookback/lookahead parameters (days_back/days_forward, hours_back/hours_forward, minutes_back/minutes_forward).
- RangeDaily / RangeHourly / RangeByMinutes -- Concrete classes that add efficient bulk completion detection. They first attempt to use the target task's bulk_complete() class method. If that raises NotImplementedError, they fall back to infer_bulk_complete_from_fs(), which reverse-engineers datetime representations from output paths, constructs filesystem globs, and uses directory listings to determine which outputs exist -- cutting filesystem requests by orders of magnitude compared to checking each instance individually.
- RangeMonthly -- A concrete class for monthly recurrence that does not use bulk filesystem optimization (the number of months is assumed to be small enough to not require it).
- RangeEvent (extends luigi.Event) -- Defines events for monitoring range progress: COMPLETE_COUNT, COMPLETE_FRACTION, and DELAY.
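The range logic described above can be sketched without Luigi. This is a minimal illustration for the daily case, not the library's code: the function name `missing_days_to_schedule` and the `is_complete` callback are hypothetical, and the windowing is an assumption modeled on the lookback/lookahead parameters listed above. It assumes task_limit is at least 1.

```python
from datetime import datetime, timedelta


def missing_days_to_schedule(now, start, stop, days_back, days_forward,
                             is_complete, task_limit=50, reverse=False):
    """Return the incomplete days a daily range would require next.

    is_complete(day) is a hypothetical stand-in for checking a task's output.
    """
    # The moving window keeps the range finite even if start or stop is None.
    moving_start = now - timedelta(days=days_back)
    moving_stop = now + timedelta(days=days_forward)
    finite_start = moving_start if start is None else max(start, moving_start)
    finite_stop = moving_stop if stop is None else min(stop, moving_stop)

    # Enumerate midnights t with finite_start <= t < finite_stop.
    day = datetime(finite_start.year, finite_start.month, finite_start.day)
    days = []
    while day < finite_stop:
        if day >= finite_start:
            days.append(day)
        day += timedelta(days=1)

    missing = [d for d in days if not is_complete(d)]
    # reverse=True keeps the newest gaps; otherwise the oldest come first.
    return missing[-task_limit:] if reverse else missing[:task_limit]
```

With a five-day lookback and a limit of 2, a forward range picks the two oldest gaps in the window, while reverse=True picks the two newest.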
Key helper functions include:
- infer_bulk_complete_from_fs(datetimes, datetime_to_task, datetime_to_re) -- The core filesystem-based gap detection algorithm.
- _get_per_location_glob(tasks, outputs, regexes) -- Reverse-engineers glob patterns from sample output paths.
- _constrain_glob(glob, paths, limit) -- Specializes glob patterns to reduce listing scope.
- _list_existing(filesystem, glob, paths) -- Lists existing paths on a filesystem using constrained globs.
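The idea behind listing-based gap detection can be illustrated with a simplified sketch on the local filesystem. It is not the Luigi implementation: the function `missing_by_listing` and the caller-supplied `path_for` and `glob_for` callables are hypothetical, whereas Luigi derives its globs from sample output paths. The point shown is that one listing per shared glob replaces one existence check per datetime.

```python
import glob
import os
from datetime import datetime


def missing_by_listing(datetimes, path_for, glob_for):
    """Return the datetimes whose expected output path was not listed.

    path_for(dt) -> expected output path for dt (hypothetical helper)
    glob_for(dt) -> glob covering dt's path and its neighbours (hypothetical)
    """
    # Many datetimes share one directory-level pattern, so deduplicate.
    patterns = {glob_for(dt) for dt in datetimes}
    existing = set()
    for pattern in patterns:
        existing.update(glob.glob(pattern))  # one listing per pattern
    # Anything not seen in a listing is treated as missing.
    return [dt for dt in datetimes if path_for(dt) not in existing]
```

For a month of daily outputs stored in one directory, this performs a single listing instead of roughly thirty existence checks.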
Usage
Use RangeDaily, RangeHourly, RangeByMinutes, or RangeMonthly when you have a recurring task parameterized by a date or datetime and need to ensure contiguous completion over a time range. This is the standard approach for backfilling gaps in time-series data pipelines with Luigi. These tasks are convenient to invoke from the command line (e.g., luigi --module your.module RangeDaily --of YourTask --start 2024-01-01) and integrate naturally with Luigi's scheduler for automated gap detection and filling. Set reverse=True to prioritize recent data, and adjust task_limit to control the batch size of gap filling per scheduler invocation.
Code Reference
Source Location
- Repository: Spotify_Luigi
- File: luigi/tools/range.py
- Lines: 1-797
Signature
class RangeEvent(luigi.Event):
    COMPLETE_COUNT = "event.tools.range.complete.count"
    COMPLETE_FRACTION = "event.tools.range.complete.fraction"
    DELAY = "event.tools.range.delay"

class RangeBase(luigi.WrapperTask):
    of = luigi.TaskParameter(description="task name to be completed")
    of_params = luigi.DictParameter(default=dict())
    start = luigi.Parameter()
    stop = luigi.Parameter()
    reverse = luigi.BoolParameter(default=False)
    task_limit = luigi.IntParameter(default=50)
    now = luigi.IntParameter(default=None)
    param_name = luigi.Parameter(default=None, positional=False)

    # Abstract methods (subclasses must implement):
    def datetime_to_parameter(self, dt): ...
    def parameter_to_datetime(self, p): ...
    def datetime_to_parameters(self, dt): ...
    def parameters_to_datetime(self, p): ...
    def moving_start(self, now): ...
    def moving_stop(self, now): ...
    def finite_datetimes(self, finite_start, finite_stop): ...

    # Implemented in RangeBase (missing_datetimes is overridden for bulk checks):
    def requires(self): ...
    def missing_datetimes(self, finite_datetimes): ...

class RangeDailyBase(RangeBase):
    start = luigi.DateParameter(default=None)
    stop = luigi.DateParameter(default=None)
    days_back = luigi.IntParameter(default=100)
    days_forward = luigi.IntParameter(default=0)

class RangeHourlyBase(RangeBase):
    start = luigi.DateHourParameter(default=None)
    stop = luigi.DateHourParameter(default=None)
    hours_back = luigi.IntParameter(default=2400)
    hours_forward = luigi.IntParameter(default=0)

class RangeByMinutesBase(RangeBase):
    start = luigi.DateMinuteParameter(default=None)
    stop = luigi.DateMinuteParameter(default=None)
    minutes_back = luigi.IntParameter(default=1440)
    minutes_forward = luigi.IntParameter(default=0)
    minutes_interval = luigi.IntParameter(default=1)

class RangeMonthly(RangeBase):
    start = luigi.MonthParameter(default=None)
    stop = luigi.MonthParameter(default=None)
    months_back = luigi.IntParameter(default=13)
    months_forward = luigi.IntParameter(default=0)

class RangeDaily(RangeDailyBase):
    def missing_datetimes(self, finite_datetimes): ...

class RangeHourly(RangeHourlyBase):
    def missing_datetimes(self, finite_datetimes): ...

class RangeByMinutes(RangeByMinutesBase):
    def missing_datetimes(self, finite_datetimes): ...

def infer_bulk_complete_from_fs(datetimes, datetime_to_task,
                                datetime_to_re): ...
Import
from luigi.tools.range import RangeDaily, RangeHourly, RangeByMinutes, RangeMonthly
from luigi.tools.range import RangeBase, RangeDailyBase, RangeHourlyBase, RangeByMinutesBase
from luigi.tools.range import RangeEvent, infer_bulk_complete_from_fs
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| of | TaskParameter | Yes | The recurring task class to produce a contiguous range of; must accept a datetime-like parameter |
| of_params | DictParameter | No | Additional keyword arguments to pass when instantiating the of task; default: empty dict |
| start | DateParameter / DateHourParameter / DateMinuteParameter / MonthParameter | Conditional | Beginning of the range (inclusive); may be omitted only when reverse=True and stop is set |
| stop | DateParameter / DateHourParameter / DateMinuteParameter / MonthParameter | Conditional | End of the range (exclusive); if omitted, the range extends to the current time plus the forward offset. At least one of start and stop must be set |
| reverse | BoolParameter | No | Process from newest to oldest; default: False. Must be True if start is not set |
| task_limit | IntParameter | No | Maximum number of missing instances to schedule per invocation; default: 50 |
| now | IntParameter | No | Override current time in seconds since epoch; useful for testing |
| param_name | Parameter | No | Name of the datetime parameter on the target task; defaults to first positional parameter |
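| days_back / hours_back / minutes_back / months_back | IntParameter | No | Lookback window counted back from the current time; bounds the range when start is None and also limits a start that lies further back. Defaults: 100 days, 2400 hours, 1440 minutes, 13 months |
| days_forward / hours_forward / minutes_forward / months_forward | IntParameter | No | Lookahead window counted forward from the current time; bounds the range when stop is None and also limits a stop that lies further ahead; default: 0 |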
| minutes_interval | IntParameter | No | Interval between time points for RangeByMinutes; must evenly divide 60; default: 1 |
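
The minutes_interval constraint can be made concrete with a short sketch of how interval-aligned time points might be enumerated. The function `aligned_minutes` is hypothetical and only assumes the rule stated in the table (the interval must evenly divide 60); it is not taken from the Luigi source.

```python
from datetime import datetime, timedelta


def aligned_minutes(finite_start, finite_stop, minutes_interval):
    """List interval-aligned datetimes t with finite_start <= t < finite_stop."""
    if minutes_interval <= 0 or 60 % minutes_interval != 0:
        raise ValueError("minutes_interval must be positive and evenly divide 60")
    # Snap down to the start of the interval containing finite_start,
    # e.g. 20:13 with a 5-minute interval becomes 20:10.
    t = finite_start.replace(
        minute=finite_start.minute // minutes_interval * minutes_interval,
        second=0, microsecond=0)
    result = []
    while t < finite_stop:
        if t >= finite_start:  # drop the snapped point if it precedes the start
            result.append(t)
        t += timedelta(minutes=minutes_interval)
    return result
```

Because the interval divides 60, the aligned points fall on the same minutes in every hour, which keeps output paths predictable across hour boundaries.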
Outputs
| Name | Type | Description |
|---|---|---|
| requires() | list[Task] | List of task instances for missing datetime points within the range, limited by task_limit |
| COMPLETE_COUNT event | int | Number of completed instances within the computed finite range |
| COMPLETE_FRACTION event | float | Fraction of completed instances (0.0 to 1.0) |
| DELAY event | int | Number of instances between the first missing point and the end of the range |
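
The three event values in the table can be related to each other with a small sketch. The function `range_metrics` is a hypothetical helper following the descriptions above; treating an empty range as fully complete is an assumption of this sketch.

```python
def range_metrics(datetimes, missing):
    """Compute (complete_count, complete_fraction, delay) for a finite range.

    datetimes: sorted list of every expected point in the range
    missing:   sorted list of the points that are not yet complete
    """
    expected = len(datetimes)
    complete_count = expected - len(missing)
    # Assumption: an empty range counts as fully complete.
    complete_fraction = complete_count / expected if expected else 1.0
    # Delay counts points from the first gap through the end of the range.
    if datetimes and missing:
        delay = expected - datetimes.index(missing[0])
    else:
        delay = 0
    return complete_count, complete_fraction, delay
```

For a ten-point range with gaps at positions 6 and 8, this gives a complete count of 8, a fraction of 0.8, and a delay of 4.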
Usage Examples
Basic Usage
import datetime

import luigi
from luigi.tools.range import RangeDaily


class DailyReport(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        return luigi.LocalTarget('/data/reports/%s.csv' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write('report data for %s' % self.date)


# From command line:
# luigi --module my_module RangeDaily --of DailyReport --start 2024-01-01

# Programmatically (pass a date object; string parsing only happens on the CLI):
if __name__ == '__main__':
    luigi.build([
        RangeDaily(of=DailyReport, start=datetime.date(2024, 1, 1))
    ], local_scheduler=True)
Hourly Range with Reverse
import datetime

import luigi
from luigi.tools.range import RangeHourly


class HourlyETL(luigi.Task):
    date_hour = luigi.DateHourParameter()

    def output(self):
        return luigi.LocalTarget(
            self.date_hour.strftime('/data/etl/%Y/%m/%d/%H/output.json')
        )

    def run(self):
        # ETL processing logic
        pass


# Process most recent gaps first, limit to 10 at a time:
if __name__ == '__main__':
    luigi.build([
        RangeHourly(
            of=HourlyETL,
            # Pass a datetime when constructing in Python, not a string
            start=datetime.datetime(2024, 1, 1, 0),
            reverse=True,
            task_limit=10,
            hours_back=48  # look back at most 48 hours from now
        )
    ], local_scheduler=True)
Minutes Interval
import datetime

import luigi
from luigi.tools.range import RangeByMinutes


class FiveMinuteAggregation(luigi.Task):
    date_minute = luigi.DateMinuteParameter()

    def output(self):
        return luigi.LocalTarget(
            self.date_minute.strftime('/data/agg/%Y/%m/%d/%H%M.parquet')
        )

    def run(self):
        # Aggregate 5-minute window
        pass


# Ensure contiguous 5-minute aggregations:
if __name__ == '__main__':
    luigi.build([
        RangeByMinutes(
            of=FiveMinuteAggregation,
            # Pass a datetime when constructing in Python, not a string
            start=datetime.datetime(2024, 1, 15, 0, 0),
            minutes_interval=5,
            minutes_back=1440  # one day
        )
    ], local_scheduler=True)
Monthly Range
import datetime

import luigi
from luigi.tools.range import RangeMonthly


class MonthlyAggregate(luigi.Task):
    month = luigi.MonthParameter()

    def output(self):
        return luigi.LocalTarget(
            '/data/monthly/%s.csv' % self.month.strftime('%Y-%m')
        )

    def run(self):
        # Monthly aggregation logic
        pass


if __name__ == '__main__':
    luigi.build([
        RangeMonthly(
            of=MonthlyAggregate,
            # A month is represented by a date on the first day of that month
            start=datetime.date(2023, 1, 1),
            months_back=12
        )
    ], local_scheduler=True)