Heuristic:Spotify Luigi Batch Parameter Aggregation
| Knowledge Sources | |
|---|---|
| Domains | Optimization, Pipeline_Framework |
| Last Updated | 2026-02-10 07:00 GMT |
Overview
Optimization pattern for combining multiple task instances with different parameter values into a single batched execution.
Description
Luigi's `batch_method` parameter option lets the scheduler combine multiple pending instances of the same task class, differing only in the value of a batchable parameter, into a single execution. When a parameter defines a `batch_method`, the pending values are aggregated into one combined value and the task's `run()` method is called once with it. This is useful for tasks where the fixed cost of starting a job (e.g., establishing database connections, launching Spark jobs) dominates the per-item processing cost.
Usage
Use batch parameters when you have many instances of the same task class differing by a single parameter, and processing them individually would be inefficient. Common examples include date-partitioned tasks that can process a range of dates in one pass.
The Insight (Rule of Thumb)
- Action: Add `batch_method=<combining_function>` to the parameter that varies across instances. The combining function receives an iterable of parsed values and returns a single combined value.
- Value: Reduces job startup overhead by processing multiple parameter values in a single task execution.
- Trade-off: The task's `run()` method must handle the combined value (e.g., a tuple of dates instead of a single date). Output completeness checking becomes more complex, because one run now has to account for the outputs of every value in the batch.
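The choice of combining function determines what `run()` receives. The following is a minimal sketch using only the standard library, not Luigi itself: `parse` and `date_span` are hypothetical names introduced for illustration, and the ISO date format is an assumption. It applies three combiners to the same iterable of parsed values.

```python
from datetime import date


def parse(raw: str) -> date:
    """Hypothetical stand-in for a date parameter's parse step (ISO format assumed)."""
    return date.fromisoformat(raw)


def date_span(values):
    """Hypothetical combiner: collapse the values to a (first, last) pair."""
    ordered = sorted(values)
    return ordered[0], ordered[-1]


raw_values = ["2024-01-03", "2024-01-01", "2024-01-02"]

# max keeps only the latest date; this fits tasks where handling the
# latest value also covers every earlier one.
latest = max(map(parse, raw_values))

# tuple keeps every value, so run() must loop over them itself.
all_dates = tuple(map(parse, raw_values))

# A custom combiner can return any shape run() is written to handle.
span = date_span(map(parse, raw_values))
```

Each combiner is handed a fresh `map` object here. In Python 3 a `map` object is a one-shot iterator, so a combining function should consume its argument only once.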
Reasoning
Consider a daily ETL task that loads data into a warehouse. If you need to backfill 30 days, Luigi would normally schedule 30 separate task instances. With `batch_method`, the scheduler can combine these into a single task that processes all 30 days at once, avoiding 29 redundant connection setups.
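The saving in the backfill scenario can be made concrete with a small simulation. This is a sketch under stated assumptions, not Luigi code: `FakeWarehouse`, `run_single`, and `run_batched` are hypothetical names, and the "connection" is just a counter.

```python
from datetime import date, timedelta


class FakeWarehouse:
    """Hypothetical warehouse client that counts connection setups."""

    def __init__(self):
        self.connections = 0
        self.loaded = []

    def connect(self):
        self.connections += 1

    def load(self, day):
        self.loaded.append(day)


def run_single(warehouse, day):
    """One task instance per day: pays the setup cost every time."""
    warehouse.connect()
    warehouse.load(day)


def run_batched(warehouse, days):
    """One batched run: pays the setup cost once, then loops over the days."""
    warehouse.connect()
    for day in days:
        warehouse.load(day)


days = [date(2024, 1, 1) + timedelta(days=i) for i in range(30)]

unbatched = FakeWarehouse()
for day in days:
    run_single(unbatched, day)

batched = FakeWarehouse()
run_batched(batched, tuple(days))

saved_setups = unbatched.connections - batched.connections
```

Both variants load the same 30 days; only the number of setups differs, which is where the 29 avoided connections in the example above come from.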
The `_parse_list` method on `Parameter` handles the aggregation:
def _parse_list(self, xs):
    if not self._is_batchable():
        raise NotImplementedError('No batch method found')
    elif not xs:
        raise ValueError('Empty parameter list passed to parse_list')
    else:
        return self._batch_method(map(self.parse, xs))
Code Evidence
`batch_method` parameter from `luigi/parameter.py:150-166`:
def __init__(self, default=_no_value, is_global=False, significant=True,
             description=None, config_path=None, positional=True,
             always_in_help=False, batch_method=None,
             visibility=ParameterVisibility.PUBLIC):
    """
    :param function(iterable[A])->A batch_method: Method to combine
        an iterable of parsed parameter values into a single value.
        Used when receiving batched parameter lists from the scheduler.
    """
    self._batch_method = batch_method
Batchable check from `luigi/parameter.py:258-260`:
def _is_batchable(self):
    return self._batch_method is not None
Parse list aggregation from `luigi/parameter.py:262-275`:
def _parse_list(self, xs):
    """
    Parse a list of values from the scheduler.
    Only possible if _is_batchable() is True. This will combine the list
    into a single parameter value using batch method.
    """
    if not self._is_batchable():
        raise NotImplementedError('No batch method found')
    elif not xs:
        raise ValueError('Empty parameter list passed to parse_list')
    else:
        return self._batch_method(map(self.parse, xs))
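To see the three branches of `_parse_list` in isolation, the excerpts above can be mirrored in a small stand-in class. This is an illustrative sketch, not `luigi.Parameter`: `SketchParameter`, its integer `parse`, and the `error_name` helper are assumptions made for the example.

```python
class SketchParameter:
    """Hypothetical stand-in that mirrors the quoted methods; not luigi.Parameter."""

    def __init__(self, batch_method=None):
        self._batch_method = batch_method

    def parse(self, x):
        # Assumed parse step for this sketch: values arrive as strings.
        return int(x)

    def _is_batchable(self):
        return self._batch_method is not None

    def _parse_list(self, xs):
        if not self._is_batchable():
            raise NotImplementedError('No batch method found')
        elif not xs:
            raise ValueError('Empty parameter list passed to parse_list')
        else:
            # Each raw value is parsed first, then the batch method combines them.
            return self._batch_method(map(self.parse, xs))


def error_name(param, xs):
    """Return the name of the exception _parse_list raises, or None."""
    try:
        param._parse_list(xs)
    except (NotImplementedError, ValueError) as exc:
        return type(exc).__name__
    return None


# Batchable parameter with values: parsed to ints, then combined with max.
combined = SketchParameter(batch_method=max)._parse_list(["3", "10", "7"])

# No batch method configured: the first branch raises.
no_batch = error_name(SketchParameter(), ["1"])

# Batchable but given an empty list: the second branch raises.
empty = error_name(SketchParameter(batch_method=max), [])
```

Because the values are parsed before they are combined, `max` compares integers here rather than strings, so `"10"` wins over `"7"`.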