Implementation:Ucbepic Docetl ReduceOptimizer
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool, provided by DocETL, for optimizing reduce operations through batch sizing, fold prompts, gleaning, decomposition, and merge strategies.
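Gleaning, one of the strategies listed above, refines an output iteratively. A validator inspects the current output, and if it reports a problem the model is asked to revise the output. This repeats for at most a fixed number of rounds. The sketch below is a minimal, hypothetical illustration of that loop and is not DocETL's implementation. `glean`, `generate_summary`, `find_missing_topic`, and `add_topic` are illustrative names, and the toy functions stand in for LLM and validator calls.

```python
from typing import Callable, Optional, TypeVar

T = TypeVar("T")  # type of the output being refined
F = TypeVar("F")  # type of the validator's feedback


def glean(
    generate: Callable[[], T],
    validate: Callable[[T], Optional[F]],
    refine: Callable[[T, F], T],
    num_rounds: int,
) -> T:
    """Produce an output, then refine it until the validator is satisfied
    or num_rounds refinement rounds have been used (hypothetical helper)."""
    output = generate()
    for _ in range(num_rounds):
        feedback = validate(output)
        if feedback is None:  # validator found nothing to fix: stop early
            break
        output = refine(output, feedback)
    return output


# Toy stand-ins (hypothetical): the "summary" must mention every required topic.
REQUIRED_TOPICS = {"bug", "perf", "ui"}


def generate_summary() -> set[str]:
    # Pretend first-pass output that misses some topics.
    return {"bug"}


def find_missing_topic(summary: set[str]) -> Optional[str]:
    # Pretend validator: report one missing topic, or None if complete.
    missing = sorted(REQUIRED_TOPICS - summary)
    return missing[0] if missing else None


def add_topic(summary: set[str], topic: str) -> set[str]:
    # Pretend refinement step: fix exactly the reported problem.
    return summary | {topic}
```

With `num_rounds=1`, only one missing topic gets added. With enough rounds, the loop stops as soon as the validator returns `None`. The number of rounds therefore trades extra calls for output quality.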
Description
The ReduceOptimizer class optimizes reduce operations in data processing pipelines. It analyzes operation output quality using synthesized validator prompts, evaluates whether decomposition into sub-reduce operations would be beneficial, determines associativity for parallelization, and generates multiple candidate plans with varying batch sizes and fold prompts. The optimizer supports hierarchical decomposition (splitting a reduce into a sub-group reduce followed by a final reduce, with optional resolve operations for deduplication), gleaning plans for iterative refinement, merge prompts for combining parallel fold outputs, and value sampling configuration. Plan evaluation is done via multi-threaded execution with validation-based scoring.
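The fold and merge idea can be illustrated with a small sketch. Folding processes a group's items `batch_size` at a time and carries an accumulator forward between batches. When the reduction is associative, contiguous partitions can be folded in parallel and their partial results combined with a merge step. In DocETL the fold and merge steps are LLM prompts. Here they are replaced by deterministic toy functions that count tags, and all function names are hypothetical.

```python
from collections import Counter
from concurrent.futures import ThreadPoolExecutor
from functools import reduce
from typing import Callable, Sequence

Fold = Callable[[Counter, Sequence[str]], Counter]
Merge = Callable[[Counter, Counter], Counter]


def fold_in_batches(items: Sequence[str], fold: Fold, batch_size: int) -> Counter:
    """Fold items into a running accumulator, batch_size items per fold call."""
    acc: Counter = Counter()
    for start in range(0, len(items), batch_size):
        acc = fold(acc, items[start:start + batch_size])
    return acc


def parallel_fold_then_merge(
    items: Sequence[str], fold: Fold, merge: Merge, batch_size: int, num_workers: int
) -> Counter:
    """Split items into contiguous partitions, fold each one in a worker thread,
    then merge the partial results in order. This is only correct when the
    reduction is associative, so that partial results can be combined
    regardless of where the partition boundaries fall."""
    size = max(1, -(-len(items) // num_workers))  # ceiling division
    parts = [items[i:i + size] for i in range(0, len(items), size)]
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        partials = list(pool.map(lambda part: fold_in_batches(part, fold, batch_size), parts))
    return reduce(merge, partials, Counter())


# Toy stand-ins (hypothetical) for the LLM-driven fold and merge prompts.
def count_fold(acc: Counter, batch: Sequence[str]) -> Counter:
    return acc + Counter(batch)


def count_merge(left: Counter, right: Counter) -> Counter:
    return left + right
```

Counting is associative, so the sequential fold and the parallel fold-then-merge agree. An order- or context-sensitive reduction (for example, one that summarizes a narrative) would not carry that guarantee. This is why associativity matters before parallelizing.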
Usage
Use ReduceOptimizer when a reduce operation produces suboptimal output quality, or when the input for a reduce key is too large for the model's context window. It is the primary optimizer for reduce operations in the DocETL pipeline. Typical scenarios include summarizing grouped data, aggregating records by key, and any other operation that combines multiple input items into a single output per reduce key. It is particularly useful for large datasets, where folding (processing items in batches) and hierarchical decomposition can improve throughput and accuracy.
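Hierarchical decomposition splits one reduce into two stages. The first stage reduces each (reduce key, sub-key) sub-group independently. The second stage reduces those partial outputs once per reduce key. A minimal sketch of the data flow follows, assuming a single field for each key. The optional resolve step is omitted; it would deduplicate variant sub-key values such as "SF" and "San Francisco" before the first stage. `group_by`, `hierarchical_reduce`, and the toy reducers are hypothetical names. In DocETL both stages would be LLM-backed reduce operations rather than sums.

```python
from collections import defaultdict
from typing import Any, Callable

Record = dict[str, Any]


def group_by(records: list[Record], keys: list[str]) -> dict[tuple, list[Record]]:
    """Group records by the tuple of their values at `keys`, in first-seen order."""
    groups: dict[tuple, list[Record]] = defaultdict(list)
    for record in records:
        groups[tuple(record[k] for k in keys)].append(record)
    return dict(groups)


def hierarchical_reduce(
    records: list[Record],
    reduce_key: str,
    sub_key: str,
    sub_reduce: Callable[[list[Record]], Record],
    final_reduce: Callable[[list[Record]], Record],
) -> dict[Any, Record]:
    # Stage 1: reduce each (reduce_key, sub_key) sub-group independently.
    partials: list[Record] = []
    for (key_value, sub_value), group in group_by(records, [reduce_key, sub_key]).items():
        partial = sub_reduce(group)
        partial[reduce_key] = key_value
        partial[sub_key] = sub_value
        partials.append(partial)
    # Stage 2: reduce the partial outputs once per reduce_key.
    return {
        key_value: final_reduce(group)
        for (key_value,), group in group_by(partials, [reduce_key]).items()
    }


# Toy reducers (hypothetical): total sales per city, then per region.
def total_per_city(group: list[Record]) -> Record:
    return {"total": sum(r["amount"] for r in group)}


def total_per_region(partials: list[Record]) -> Record:
    return {"total": sum(p["total"] for p in partials), "num_cities": len(partials)}
```

Each first-stage call sees only one sub-group, so no single call has to fit an entire large reduce-key group into the context window.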
Code Reference
Source Location
- Repository: Ucbepic_Docetl
- File: docetl/optimizers/reduce_optimizer.py
- Lines: 1-1917
Signature
```python
class ReduceOptimizer:
    def __init__(
        self,
        runner,
        run_operation: Callable,
        num_fold_prompts: int = 1,
        num_samples_in_validation: int = 10,
    ) -> None: ...

    def should_optimize(
        self,
        op_config: dict[str, Any],
        input_data: list[dict[str, Any]],
    ) -> tuple[str, list[dict[str, Any]], list[dict[str, Any]]]: ...

    def optimize(
        self,
        op_config: dict[str, Any],
        input_data: list[dict[str, Any]],
        level: int = 1,
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], float]: ...
```
Import
```python
from docetl.optimizers.reduce_optimizer import ReduceOptimizer
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| runner | Runner | Yes | The pipeline runner object providing config, console, optimizer, and threading settings |
| run_operation | Callable | Yes | Function to execute operations on input data |
| num_fold_prompts | int | No | Number of fold prompts to generate for plan variation (default: 1) |
| num_samples_in_validation | int | No | Number of reduce-key groups to use in validation (default: 10) |
| op_config | dict[str, Any] | Yes | The reduce operation configuration including prompt, reduce_key, output schema |
| input_data | list[dict[str, Any]] | Yes | Input data for the reduce operation |
| level | int | No | Current decomposition recursion level (default: 1) |
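`num_samples_in_validation` counts reduce-key groups, not individual records. The following minimal sketch shows sampling at that granularity. It assumes that `reduce_key` is either one field name or a list of field names. `sample_reduce_groups` is a hypothetical helper, and ReduceOptimizer's actual sampling strategy may differ.

```python
import random
from collections import defaultdict
from typing import Any, Optional, Union

Record = dict[str, Any]


def sample_reduce_groups(
    input_data: list[Record],
    reduce_key: Union[str, list[str]],
    num_samples: int,
    seed: Optional[int] = None,
) -> list[Record]:
    """Return every record of up to `num_samples` randomly chosen reduce-key groups."""
    keys = [reduce_key] if isinstance(reduce_key, str) else list(reduce_key)
    groups: dict[tuple, list[Record]] = defaultdict(list)
    for record in input_data:
        groups[tuple(record[k] for k in keys)].append(record)
    rng = random.Random(seed)  # seeded for reproducible sampling
    chosen = rng.sample(list(groups), min(num_samples, len(groups)))
    # Keep each chosen group whole so the reduce sees all of that key's records.
    return [record for key in chosen for record in groups[key]]
```

Sampling whole groups keeps each validation run faithful to what the reduce would see in production. Sampling individual records could split a group and make its output look worse or better than it really is.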
Outputs
| Name | Type | Description |
|---|---|---|
| optimized_configs | list[dict[str, Any]] | List of optimized operation configurations (may include resolve + multiple reduce operations if decomposed) |
| output_data | list[dict[str, Any]] | Output data from running the optimized operation(s) |
| cost | float | Cost incurred from synthesizing resolve operations during decomposition |
Usage Examples
```python
from docetl.optimizers.reduce_optimizer import ReduceOptimizer

# Assumes `pipeline_runner` (a DocETL pipeline runner), `reduce_op_config`
# (a reduce operation config dict) and `sample_data` (a list of input
# records) are already defined.
reduce_optimizer = ReduceOptimizer(
    runner=pipeline_runner,
    run_operation=pipeline_runner.run_operation,
    num_fold_prompts=1,
    num_samples_in_validation=10,
)

# Check whether optimization is needed; an empty assessment means it is not.
assessment, input_data, original_output = reduce_optimizer.should_optimize(
    op_config=reduce_op_config,
    input_data=sample_data,
)

if assessment:
    print(f"Optimization recommended: {assessment}")
    # Run optimization
    optimized_configs, output_data, cost = reduce_optimizer.optimize(
        op_config=reduce_op_config,
        input_data=sample_data,
    )
    print(f"Optimized into {len(optimized_configs)} operation(s)")
    print(f"Resolve synthesis cost: ${cost:.4f}")
    for config in optimized_configs:
        print(f"  - {config['name']} (type: {config['type']})")
else:
    print("No optimization needed.")
```
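ReduceOptimizer evaluates candidate plans through multi-threaded execution with validation-based scoring. The following minimal, hypothetical sketch shows that selection loop. All candidate plans run concurrently in a thread pool, each output is scored, and the highest-scoring plan is selected, with ties going to the earlier plan. `select_best_plan`, the plan dicts, and the toy scorer are assumptions. In ReduceOptimizer the score comes from LLM-based validation of the outputs, not from a fixed formula like the one here.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

Plan = dict[str, Any]


def select_best_plan(
    plans: list[Plan],
    run_plan: Callable[[Plan], Any],
    score: Callable[[Any], float],
    max_workers: int = 4,
) -> tuple[Plan, Any, float]:
    """Run all candidate plans concurrently, score each output, return the best."""
    if not plans:
        raise ValueError("no candidate plans to evaluate")
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        outputs = list(pool.map(run_plan, plans))
        scores = list(pool.map(score, outputs))
    best = max(range(len(plans)), key=lambda i: scores[i])  # first index wins ties
    return plans[best], outputs[best], scores[best]


# Toy setup (hypothetical): candidate plans differ only in fold batch size.
ITEMS = list(range(10))
MAX_ITEMS_PER_CALL = 8  # pretend context-window limit


def run_fold_plan(plan: Plan) -> dict[str, int]:
    size = plan["fold_batch_size"]
    batches = [ITEMS[i:i + size] for i in range(0, len(ITEMS), size)]
    return {"num_calls": len(batches), "largest_batch": max(len(b) for b in batches)}


def toy_score(output: dict[str, int]) -> float:
    if output["largest_batch"] > MAX_ITEMS_PER_CALL:
        return 0.0  # would overflow the pretend context window
    return 1.0 / output["num_calls"]  # otherwise, fewer calls is better
```

With batch sizes 2, 5, and 10, the toy scorer rejects 10 for overflowing the pretend limit and prefers 5 over 2 because it needs fewer calls. Candidate plans are independent of one another, so running them in threads shortens wall-clock time when each run is dominated by waiting on model calls.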