Heuristic:Spotify Luigi Batch Parameter Aggregation

From Leeroopedia



Knowledge Sources
Domains: Optimization, Pipeline_Framework
Last Updated: 2026-02-10 07:00 GMT

Overview

Optimization pattern for combining multiple task instances with different parameter values into a single batched execution.

Description

Luigi's `batch_method` parameter option lets the scheduler combine multiple pending instances of the same task class, differing only in the value of a batchable parameter, into a single batched execution. When a parameter is declared with a `batch_method`, the pending values are aggregated into one combined value and the task's `run()` method is called once with it. This is useful for tasks where the fixed cost of starting a job (e.g., establishing database connections or launching Spark jobs) dominates the per-item processing cost.

Usage

Use batch parameters when you have many instances of the same task class differing by a single parameter, and processing them individually would be inefficient. Common examples include date-partitioned tasks that can process a range of dates in one pass.

The Insight (Rule of Thumb)

  • Action: Add `batch_method=<combining_function>` to the parameter that varies across instances. The combining function receives an iterable of parsed values and returns a single combined value.
  • Value: Reduces job startup overhead by processing multiple parameter values in a single task execution.
  • Trade-off: The task's `run()` method must handle the combined value (e.g., a tuple of dates instead of a single date), and checking output completeness becomes more complex because a single run now stands in for several task instances.
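
As an illustration of the combining function described in the list above, here is a minimal sketch in plain Python that does not import Luigi. The helper names `combine_as_tuple` and `combine_as_latest` are hypothetical; only the contract (an iterable of parsed values in, one value out) comes from the `batch_method` docstring. Because the values arrive as a lazy `map` object in Python 3, a combining function should consume the iterable only once.

```python
from datetime import date


def combine_as_tuple(values):
    """Hypothetical batch method: keep every distinct value, in sorted order."""
    # Materialise once; `values` may be a one-shot iterator such as `map`.
    return tuple(sorted(set(values)))


def combine_as_latest(values):
    """Hypothetical batch method: collapse the batch to its largest value."""
    return max(values)


# Assumed input: raw ISO date strings, with one duplicate.
raw = ["2024-01-03", "2024-01-01", "2024-01-02", "2024-01-01"]

# Mirrors the shape of the call in `_parse_list`: parse lazily, then combine.
all_days = combine_as_tuple(map(date.fromisoformat, raw))
latest_day = combine_as_latest(map(date.fromisoformat, raw))

print(all_days)
print(latest_day)
```

Which combining function fits depends on the task: a tuple keeps every value for `run()` to loop over, while `max` only makes sense when processing the latest value also covers the earlier ones.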

Reasoning

Consider a daily ETL task that loads data into a warehouse. If you need to backfill 30 days, Luigi would normally schedule 30 separate task instances. With `batch_method`, the scheduler can combine these into a single task that processes all 30 days at once, avoiding 29 redundant connection setups.
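
The backfill arithmetic above can be sketched without Luigi. `FakeWarehouse`, `run_per_day`, and `run_batched` are hypothetical names invented for this illustration, and the sketch assumes each task run opens exactly one connection.

```python
from datetime import date, timedelta


class FakeWarehouse:
    """Hypothetical stand-in for a warehouse client that counts connections."""

    connections_opened = 0

    def __init__(self):
        # Each new client represents one costly connection setup.
        FakeWarehouse.connections_opened += 1
        self.loaded = []

    def load(self, day):
        self.loaded.append(day)


def run_per_day(days):
    """Unbatched: one run, and therefore one connection, per date."""
    for day in days:
        FakeWarehouse().load(day)


def run_batched(days):
    """Batched: one run receives the combined value and loops over it."""
    warehouse = FakeWarehouse()
    for day in days:
        warehouse.load(day)
    return warehouse


start = date(2024, 1, 1)
backfill = tuple(start + timedelta(days=i) for i in range(30))

FakeWarehouse.connections_opened = 0
run_per_day(backfill)
unbatched_connections = FakeWarehouse.connections_opened

FakeWarehouse.connections_opened = 0
batched_warehouse = run_batched(backfill)
batched_connections = FakeWarehouse.connections_opened

print(unbatched_connections, batched_connections)
```

Both paths load the same 30 dates; only the number of connection setups differs.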

The `_parse_list` method on `Parameter` handles the aggregation:

def _parse_list(self, xs):
    if not self._is_batchable():
        raise NotImplementedError('No batch method found')
    elif not xs:
        raise ValueError('Empty parameter list passed to parse_list')
    else:
        return self._batch_method(map(self.parse, xs))
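
To make the three branches of `_parse_list` concrete, the following sketch reproduces the method on a simplified stand-in class. `SketchDateParameter` and `outcome` are hypothetical and are not part of Luigi; the `parse` implementation assumes ISO-formatted date strings.

```python
from datetime import date


class SketchDateParameter:
    """Simplified stand-in for a Luigi parameter; not the real class."""

    def __init__(self, batch_method=None):
        self._batch_method = batch_method

    def parse(self, x):
        # Assumption: values arrive as ISO-formatted date strings.
        return date.fromisoformat(x)

    def _is_batchable(self):
        return self._batch_method is not None

    def _parse_list(self, xs):
        # Same branch order as the excerpt: batchable check first.
        if not self._is_batchable():
            raise NotImplementedError('No batch method found')
        elif not xs:
            raise ValueError('Empty parameter list passed to parse_list')
        else:
            return self._batch_method(map(self.parse, xs))


def outcome(param, xs):
    """Return the combined value, or the name of the exception raised."""
    try:
        return param._parse_list(xs)
    except (NotImplementedError, ValueError) as exc:
        return type(exc).__name__


batched = outcome(SketchDateParameter(batch_method=max), ["2024-01-01", "2024-01-05"])
not_batchable = outcome(SketchDateParameter(), ["2024-01-01"])
empty = outcome(SketchDateParameter(batch_method=max), [])

print(batched, not_batchable, empty)
```

Note that the batchable check runs before the emptiness check, so a parameter without a `batch_method` raises `NotImplementedError` even when the list is empty.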

Code Evidence

`batch_method` parameter from `luigi/parameter.py:150-166`:

def __init__(self, default=_no_value, is_global=False, significant=True,
             description=None, config_path=None, positional=True,
             always_in_help=False, batch_method=None,
             visibility=ParameterVisibility.PUBLIC):
    """
    :param function(iterable[A])->A batch_method: Method to combine
        an iterable of parsed parameter values into a single value.
        Used when receiving batched parameter lists from the scheduler.
    """
    self._batch_method = batch_method

Batchable check from `luigi/parameter.py:258-260`:

def _is_batchable(self):
    return self._batch_method is not None

Parse list aggregation from `luigi/parameter.py:262-275`:

def _parse_list(self, xs):
    """
    Parse a list of values from the scheduler.
    Only possible if _is_batchable() is True. This will combine the list
    into a single parameter value using batch method.
    """
    if not self._is_batchable():
        raise NotImplementedError('No batch method found')
    elif not xs:
        raise ValueError('Empty parameter list passed to parse_list')
    else:
        return self._batch_method(map(self.parse, xs))
