

Implementation:Spotify Luigi RangeTask

From Leeroopedia


Knowledge Sources
Domains Scheduling, Time_Series
Last Updated 2026-02-10 08:00 GMT

Overview

RangeDaily, RangeHourly, RangeByMinutes, and RangeMonthly are Luigi WrapperTask subclasses that produce contiguous completed ranges of recurring, time-parameterized tasks by detecting gaps and requiring the missing instances. The daily, hourly, and by-minutes variants detect gaps in bulk, using the target task's bulk_complete() or, failing that, inference from filesystem listings.

Description

The Range tools module provides a family of classes for ensuring contiguous completion of recurring, time-parameterized Luigi tasks. This is essential for time-series data pipelines where gaps in processing (due to failures, downtime, or late-arriving data) must be detected and backfilled automatically.

The class hierarchy is organized as follows:

  • RangeBase (extends luigi.WrapperTask) -- The abstract base class that implements the core range logic. Its requires() method clamps the requested range to the lookback/lookahead window around the current time, computes the missing datetime instances within it, and returns task instances for those gaps (at most task_limit of them). At least one of start and stop must be set. Parameters include:
    • of (TaskParameter) -- The recurring task class to be completed.
    • of_params (DictParameter) -- Additional parameters to pass to the of task.
    • start / stop -- Bounds for the range (type depends on subclass).
    • reverse (BoolParameter) -- If True, processes from newest to oldest; default: False.
    • task_limit (IntParameter) -- Maximum number of missing instances to require per invocation; default: 50.
    • now (IntParameter) -- Override current time (seconds since epoch) for testing.
    • param_name (Parameter) -- Name of the datetime parameter on the target task; defaults to first positional parameter.
  • RangeDailyBase / RangeHourlyBase / RangeByMinutesBase -- Intermediate abstract classes that implement datetime arithmetic for their respective granularities. They define moving_start(), moving_stop(), and finite_datetimes() methods, along with lookback/lookahead parameters (days_back/days_forward, hours_back/hours_forward, minutes_back/minutes_forward).
  • RangeDaily / RangeHourly / RangeByMinutes -- Concrete classes that add efficient bulk completion detection. They first attempt to use the target task's bulk_complete() class method. If that raises NotImplementedError, they fall back to infer_bulk_complete_from_fs(), which reverse-engineers datetime representations from output paths, constructs filesystem globs, and uses directory listings to determine which outputs exist -- cutting filesystem requests by orders of magnitude compared to checking each instance individually.
  • RangeMonthly -- A concrete class for monthly recurrence that does not use bulk completeness detection; it checks each month's complete() individually, on the assumption that the number of months is small enough not to need the optimization.
  • RangeEvent (extends luigi.Event) -- Defines events for monitoring range progress: COMPLETE_COUNT, COMPLETE_FRACTION, and DELAY.
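To make the requires() flow concrete, here is a plain-Python sketch for daily granularity. It is not Luigi's code: sketch_requires, daily_points, and the is_complete callable are hypothetical names, the checks raise ValueError where Luigi raises ParameterException, and completeness is tested point by point instead of through missing_datetimes(). It does follow the clamping, task_limit, and reverse behavior described above.

```python
from datetime import datetime, timedelta


def daily_points(finite_start, finite_stop):
    """Midnights t with finite_start <= t < finite_stop."""
    t = datetime(finite_start.year, finite_start.month, finite_start.day)
    points = []
    while t < finite_stop:
        if t >= finite_start:  # skip a midnight that precedes a mid-day start
            points.append(t)
        t += timedelta(days=1)
    return points


def sketch_requires(now, is_complete, start=None, stop=None,
                    days_back=100, days_forward=0, task_limit=50, reverse=False):
    """Hypothetical stand-in for RangeBase.requires() on a daily range.

    Returns the datetimes whose task instances would be required.
    """
    if start is None and stop is None:
        raise ValueError("at least one of start and stop must be specified")
    if start is None and not reverse:
        raise ValueError("start must be specified unless reverse=True")
    if start is not None and stop is not None and start > stop:
        raise ValueError("start must not be after stop")

    # The lookback/lookahead window bounds the range even when start/stop are set.
    moving_start = now - timedelta(days=days_back)
    moving_stop = now + timedelta(days=days_forward)
    finite_start = moving_start if start is None else max(start, moving_start)
    finite_stop = moving_stop if stop is None else min(stop, moving_stop)

    points = daily_points(finite_start, finite_stop) if finite_start <= finite_stop else []
    missing = [p for p in points if not is_complete(p)]  # already in ascending order

    if reverse:
        chosen = missing[-task_limit:]  # the newest gaps...
        chosen.reverse()                # ...ordered newest first
    else:
        chosen = missing[:task_limit]   # the oldest gaps first
    return chosen
```

With the defaults, an explicit start older than days_back days is moved forward to the window edge, which is why backfilling deep history needs a larger days_back.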

Key helper functions include:

  • infer_bulk_complete_from_fs(datetimes, datetime_to_task, datetime_to_re) -- The core filesystem-based gap detection algorithm.
  • _get_per_location_glob(tasks, outputs, regexes) -- Reverse-engineers glob patterns from sample output paths.
  • _constrain_glob(glob, paths, limit) -- Specializes glob patterns to reduce listing scope.
  • _list_existing(filesystem, glob, paths) -- Lists existing paths on a filesystem using constrained globs.
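The listing idea behind infer_bulk_complete_from_fs and _get_per_location_glob can be illustrated with a simplified sketch. Assumptions: each task has a single output whose path is built with strftime, a set of path strings stands in for the filesystem (the real helpers list directories through the output target's filesystem), and the glob specialization done by _constrain_glob is left out. derive_glob and infer_missing are hypothetical names.

```python
import re
from collections import Counter
from datetime import date
from fnmatch import fnmatchcase


def derive_glob(paths, regexes):
    """Turn sample output paths into a glob over their parent directories.

    Each regex captures the date fields of its own sample path; the most common
    (start, end) span of each group is taken as the location of that field.
    """
    matches = [r.search(p) for r, p in zip(regexes, paths)]
    if any(m is None for m in matches):
        raise ValueError("could not find the date in every sample path")
    n_groups = len(matches[0].groups())
    spans = [Counter((m.start(i), m.end(i)) for m in matches).most_common(1)[0][0]
             for i in range(1, n_groups + 1)]
    chars = list(paths[0])
    for start, end in spans:
        # One list element per replaced digit keeps later spans' indices valid.
        chars[start:end] = ["[0-9]"] * (end - start)
    # Drop the last path component so one listing per directory suffices.
    return "".join(chars).rsplit("/", 1)[0]


def infer_missing(dates, date_to_path, existing_paths):
    """Hypothetical listing-based gap check; existing_paths simulates the filesystem."""
    # Sample dates with distinct digits so year, month, and day are easy to tell apart.
    samples = [date(y, m, d) for y in range(2000, 2050, 10) for m in (1, 2, 3) for d in (5, 6, 7)]
    regexes = [re.compile(s.strftime("(%Y).*(%m).*(%d)")) for s in samples]
    glob = derive_glob([date_to_path(s) for s in samples], regexes)
    # One "listing": every existing file whose directory matches the glob.
    listing = {p for p in existing_paths if fnmatchcase(p.rsplit("/", 1)[0], glob)}
    return glob, [d for d in dates if date_to_path(d) not in listing]
```

The payoff is that the number of listing calls depends on the number of matching directories rather than on the number of dates, instead of one existence check per date.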

Usage

Use RangeDaily, RangeHourly, RangeByMinutes, or RangeMonthly when you have a recurring task parameterized by a date or datetime and need to ensure contiguous completion over a time range. This is the standard approach for backfilling gaps in time-series data pipelines with Luigi. These tasks are convenient to invoke from the command line (e.g., luigi --module your.module RangeDaily --of YourTask --start 2024-01-01) and integrate naturally with Luigi's scheduler for automated gap detection and filling. Set reverse=True to prioritize recent data, and adjust task_limit to control how many gaps are scheduled per invocation. Note that the lookback parameters (days_back, hours_back, minutes_back, months_back) bound the range even when start is set, so backfilling further into the past requires raising them.

Code Reference

Source Location

  • Repository: Spotify_Luigi
  • File: luigi/tools/range.py
  • Lines: 1-797

Signature

class RangeEvent(luigi.Event):
    COMPLETE_COUNT = "event.tools.range.complete.count"
    COMPLETE_FRACTION = "event.tools.range.complete.fraction"
    DELAY = "event.tools.range.delay"

class RangeBase(luigi.WrapperTask):
    of = luigi.TaskParameter(description="task name to be completed")
    of_params = luigi.DictParameter(default=dict())
    start = luigi.Parameter()
    stop = luigi.Parameter()
    reverse = luigi.BoolParameter(default=False)
    task_limit = luigi.IntParameter(default=50)
    now = luigi.IntParameter(default=None)
    param_name = luigi.Parameter(default=None, positional=False)

    # Abstract methods:
    def datetime_to_parameter(self, dt): ...
    def parameter_to_datetime(self, p): ...
    def datetime_to_parameters(self, dt): ...
    def parameters_to_datetime(self, p): ...
    def moving_start(self, now): ...
    def moving_stop(self, now): ...
    def finite_datetimes(self, finite_start, finite_stop): ...

    def requires(self): ...
    def missing_datetimes(self, finite_datetimes): ...

class RangeDailyBase(RangeBase):
    start = luigi.DateParameter(default=None)
    stop = luigi.DateParameter(default=None)
    days_back = luigi.IntParameter(default=100)
    days_forward = luigi.IntParameter(default=0)

class RangeHourlyBase(RangeBase):
    start = luigi.DateHourParameter(default=None)
    stop = luigi.DateHourParameter(default=None)
    hours_back = luigi.IntParameter(default=2400)
    hours_forward = luigi.IntParameter(default=0)

class RangeByMinutesBase(RangeBase):
    start = luigi.DateMinuteParameter(default=None)
    stop = luigi.DateMinuteParameter(default=None)
    minutes_back = luigi.IntParameter(default=1440)
    minutes_forward = luigi.IntParameter(default=0)
    minutes_interval = luigi.IntParameter(default=1)

class RangeMonthly(RangeBase):
    start = luigi.MonthParameter(default=None)
    stop = luigi.MonthParameter(default=None)
    months_back = luigi.IntParameter(default=13)
    months_forward = luigi.IntParameter(default=0)

class RangeDaily(RangeDailyBase):
    def missing_datetimes(self, finite_datetimes): ...

class RangeHourly(RangeHourlyBase):
    def missing_datetimes(self, finite_datetimes): ...

class RangeByMinutes(RangeByMinutesBase):
    def missing_datetimes(self, finite_datetimes): ...

def infer_bulk_complete_from_fs(datetimes, datetime_to_task,
                                 datetime_to_re): ...
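The missing_datetimes() override in RangeDaily, RangeHourly, and RangeByMinutes tries the task's bulk_complete() first and falls back to filesystem inference on NotImplementedError. A minimal sketch of that control flow, using hypothetical task classes WithBulk and WithoutBulk and a caller-supplied fallback; Luigi itself passes parameter values (not datetimes) to bulk_complete() and converts the result back.

```python
from datetime import date


class WithBulk:
    """Hypothetical task class that can answer completeness in bulk."""
    done = {date(2024, 1, 1), date(2024, 1, 3)}

    @classmethod
    def bulk_complete(cls, parameters):
        # Return the subset of parameters whose instances are complete.
        return [p for p in parameters if p in cls.done]


class WithoutBulk:
    """Hypothetical task class without a bulk check, like the default behavior."""

    @classmethod
    def bulk_complete(cls, parameters):
        raise NotImplementedError


def missing_datetimes(task_cls, points, fallback):
    """Prefer the task's bulk check; otherwise delegate to a fallback such as a filesystem listing."""
    try:
        complete = set(task_cls.bulk_complete(points))
    except NotImplementedError:
        return fallback(points)
    return [p for p in points if p not in complete]
```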

Import

from luigi.tools.range import RangeDaily, RangeHourly, RangeByMinutes, RangeMonthly
from luigi.tools.range import RangeBase, RangeDailyBase, RangeHourlyBase, RangeByMinutesBase
from luigi.tools.range import RangeEvent, infer_bulk_complete_from_fs

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| of | TaskParameter | Yes | The recurring task class to produce a contiguous range of; must accept a datetime-like parameter |
| of_params | DictParameter | No | Additional keyword arguments to pass when instantiating the of task; default: empty dict |
| start | DateParameter / DateHourParameter / DateMinuteParameter / MonthParameter | Conditional | Beginning of the range (inclusive); required unless reverse=True. Values older than the lookback window are clamped to it |
| stop | DateParameter / DateHourParameter / DateMinuteParameter / MonthParameter | No | End of the range (exclusive); defaults to the current time plus the forward offset, which also caps an explicit stop. At least one of start and stop must be set |
| reverse | BoolParameter | No | Process from newest to oldest; default: False. Must be True if start is not set |
| task_limit | IntParameter | No | Maximum number of missing instances to schedule per invocation; default: 50 |
| now | IntParameter | No | Override the current time, in seconds since epoch; useful for testing |
| param_name | Parameter | No | Name of the datetime parameter on the target task; defaults to the first positional parameter |
| days_back / hours_back / minutes_back / months_back | IntParameter | No | Lookback window from the current time; prevents an infinite loop when start is None and bounds the range when it is set. Defaults: 100 days, 2400 hours, 1440 minutes, 13 months |
| days_forward / hours_forward / minutes_forward / months_forward | IntParameter | No | Lookahead window from the current time; prevents an infinite loop when stop is None. Default: 0 |
| minutes_interval | IntParameter | No | Interval in minutes between time points for RangeByMinutes; must evenly divide 60; default: 1 |
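For RangeByMinutes, the candidate points are multiples of minutes_interval within the range. The sketch below assumes the start is rounded down to the beginning of its interval and that intervals outside 1..59 are rejected in addition to the divide-60 rule; minute_points is a hypothetical name, and ValueError stands in for Luigi's own exception.

```python
from datetime import datetime, timedelta


def minute_points(finite_start, finite_stop, minutes_interval=1):
    """Points every minutes_interval minutes in [finite_start, finite_stop)."""
    if not 0 < minutes_interval < 60:
        raise ValueError("minutes_interval must be between 1 and 59")
    if 60 % minutes_interval != 0:
        raise ValueError("minutes_interval must evenly divide 60")
    # Round down to the start of the enclosing interval, e.g. 20:13 -> 20:10 for 5.
    t = finite_start.replace(minute=finite_start.minute // minutes_interval * minutes_interval,
                             second=0, microsecond=0)
    points = []
    while t < finite_stop:
        if t >= finite_start:  # the rounded-down point may precede the start
            points.append(t)
        t += timedelta(minutes=minutes_interval)
    return points
```

Because the interval divides 60, every point lands on the same minute offsets each hour (for 5: :00, :05, ..., :55), which keeps output paths predictable.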

Outputs

| Name | Type | Description |
|------|------|-------------|
| requires() | list[Task] | Task instances for the missing datetime points within the range, at most task_limit of them (newest first when reverse=True) |
| COMPLETE_COUNT event | int | Number of completed instances within the computed finite range |
| COMPLETE_FRACTION event | float | Fraction of completed instances, from 0.0 to 1.0 (1.0 for an empty range) |
| DELAY event | int | Number of instances from the first missing point to the end of the range (0 if none are missing) |
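The three event values can be derived from the list of expected points and the sorted missing points. A sketch under that assumption; range_metrics is a hypothetical helper, and exactly which range Luigi counts as expected is not modeled here.

```python
def range_metrics(expected, missing):
    """Return (complete_count, complete_fraction, delay) for one range check.

    expected: sorted points in the range; missing: sorted subset of expected.
    """
    complete_count = len(expected) - len(missing)
    # An empty range counts as fully complete.
    complete_fraction = complete_count / len(expected) if expected else 1.0
    # DELAY: points from the first missing one to the end of the range, inclusive.
    delay = len(expected) - expected.index(missing[0]) if expected and missing else 0
    return complete_count, complete_fraction, delay
```

For ten expected points with gaps at positions 7 and 9, this gives a count of 8, a fraction of 0.8, and a delay of 3.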

Usage Examples

Basic Usage

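import datetime

import luigi
from luigi.tools.range import RangeDaily


class DailyReport(luigi.Task):
    date = luigi.DateParameter()

    def output(self):
        # One file per day, e.g. /data/reports/2024-01-01.csv
        return luigi.LocalTarget('/data/reports/%s.csv' % self.date)

    def run(self):
        with self.output().open('w') as f:
            f.write('report data for %s' % self.date)


# From command line (string values are parsed by the parameter classes):
# luigi --module my_module RangeDaily --of DailyReport --start 2024-01-01

# Programmatically, pass date objects rather than strings:
if __name__ == '__main__':
    luigi.build([
        # Only the last days_back (default: 100) days are checked,
        # so an older start is effectively clamped.
        RangeDaily(of=DailyReport, start=datetime.date(2024, 1, 1))
    ], local_scheduler=True)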

Hourly Range with Reverse

import datetime

import luigi
from luigi.tools.range import RangeHourly


class HourlyETL(luigi.Task):
    date_hour = luigi.DateHourParameter()

    def output(self):
        return luigi.LocalTarget(
            self.date_hour.strftime('/data/etl/%Y/%m/%d/%H/output.json')
        )

    def run(self):
        # ETL processing logic goes here; it must write self.output()
        pass


# Process the most recent gaps first, scheduling at most 10 per run.
# hours_back=48 limits the check to the last 48 hours, although start is earlier.
if __name__ == '__main__':
    luigi.build([
        RangeHourly(
            of=HourlyETL,
            start=datetime.datetime(2024, 1, 1, 0),
            reverse=True,
            task_limit=10,
            hours_back=48
        )
    ], local_scheduler=True)

Minutes Interval

import datetime

import luigi
from luigi.tools.range import RangeByMinutes


class FiveMinuteAggregation(luigi.Task):
    date_minute = luigi.DateMinuteParameter()

    def output(self):
        return luigi.LocalTarget(
            self.date_minute.strftime('/data/agg/%Y/%m/%d/%H%M.parquet')
        )

    def run(self):
        # Aggregate the 5-minute window starting at self.date_minute
        pass


# Ensure contiguous 5-minute aggregations:
if __name__ == '__main__':
    luigi.build([
        RangeByMinutes(
            of=FiveMinuteAggregation,
            start=datetime.datetime(2024, 1, 15, 0, 0),
            minutes_interval=5,
            minutes_back=1440  # only the last day is checked
        )
    ], local_scheduler=True)

Monthly Range

import datetime

import luigi
from luigi.tools.range import RangeMonthly


class MonthlyAggregate(luigi.Task):
    month = luigi.MonthParameter()

    def output(self):
        return luigi.LocalTarget(
            '/data/monthly/%s.csv' % self.month.strftime('%Y-%m')
        )

    def run(self):
        # Monthly aggregation logic
        pass


if __name__ == '__main__':
    luigi.build([
        RangeMonthly(
            of=MonthlyAggregate,
            start=datetime.date(2023, 1, 1),  # MonthParameter values are dates
            months_back=12
        )
    ], local_scheduler=True)
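MonthParameter values are first-of-month dates, so a monthly range reduces to month arithmetic. A hedged sketch of enumerating the half-open range [start, stop); add_months and month_points are hypothetical helpers, and RangeMonthly's exact anchoring of the months_back window is not reproduced.

```python
from datetime import date


def add_months(d, n):
    """Shift a first-of-month date by n months (n may be negative)."""
    total = d.year * 12 + (d.month - 1) + n
    return date(total // 12, total % 12 + 1, 1)


def month_points(start, stop):
    """First-of-month dates t with start <= t < stop (both first-of-month dates)."""
    points = []
    t = start
    while t < stop:
        points.append(t)
        t = add_months(t, 1)
    return points
```

In the example above, months_back=12 means only roughly the most recent year is checked, so the 2023-01 start is likely clamped when run today.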
