

Implementation:Ucbepic Docetl ReduceOptimizer



Knowledge Sources
Domains Data_Processing, Optimization
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete DocETL tool for optimizing reduce operations through batch sizing, fold prompts, gleaning, decomposition, and merge strategies.

Description

The ReduceOptimizer class optimizes reduce operations in data processing pipelines. It analyzes operation output quality using synthesized validator prompts, evaluates whether decomposition into sub-reduce operations would be beneficial, determines associativity for parallelization, and generates multiple candidate plans with varying batch sizes and fold prompts. The optimizer supports hierarchical decomposition (splitting a reduce into a sub-group reduce followed by a final reduce, with optional resolve operations for deduplication), gleaning plans for iterative refinement, merge prompts for combining parallel fold outputs, and value sampling configuration. Plan evaluation is done via multi-threaded execution with validation-based scoring.
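The last step, picking a plan by validation score, can be pictured with the minimal sketch below. It is not DocETL's code: `pick_best_plan` and the `evaluate` callable are hypothetical stand-ins for "run a candidate plan on sample data and score the output with a validator prompt". The sketch only shows the general pattern of scoring candidates concurrently on a thread pool and keeping the highest-scoring one.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

Plan = list[dict[str, Any]]  # a candidate plan: a list of operation configs


def pick_best_plan(
    plans: dict[str, Plan],
    evaluate: Callable[[Plan], float],
    max_workers: int = 4,
) -> tuple[str, float]:
    """Score every candidate plan concurrently and return (name, score) of the best.

    `evaluate` is a hypothetical stand-in for "execute the plan on sample
    data, then score the output with a validator prompt"; higher is better.
    """
    if not plans:
        raise ValueError("no candidate plans to evaluate")
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # One evaluation job per plan; the pool runs them on worker threads.
        futures = {name: pool.submit(evaluate, plan) for name, plan in plans.items()}
        scores = {name: future.result() for name, future in futures.items()}
    best = max(scores, key=lambda name: scores[name])
    return best, scores[best]


# Toy usage: plans differ only in a made-up "batch_size" field,
# and the toy scorer simply favors smaller batches.
candidate_plans = {
    "batch_10": [{"type": "reduce", "batch_size": 10}],
    "batch_50": [{"type": "reduce", "batch_size": 50}],
}
best, score = pick_best_plan(candidate_plans, lambda plan: 1.0 / plan[0]["batch_size"])
print(best, score)  # batch_10 0.1
```

Threads suit this pattern because plan evaluation is dominated by waiting on model calls rather than by local computation.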

Usage

Use ReduceOptimizer when a reduce operation produces suboptimal output or when its input data exceeds the model's context window. It is the primary optimizer for reduce operations in the DocETL pipeline and handles scenarios such as summarizing grouped data, aggregating records by key, or any operation that combines multiple input items into a single output per reduce key. It is particularly useful for large datasets, where folding (processing in batches) and hierarchical decomposition can improve both throughput and accuracy.
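Folding can be sketched in plain Python, assuming a hypothetical `fold` callable in place of an LLM call driven by a fold prompt. The group is consumed `batch_size` items at a time, and each step combines the running accumulator with the next batch, so no single call has to see the whole group. In this sketch the first call simply starts from an empty accumulator; `fold_reduce` and `toy_fold` are illustrative names, not DocETL functions.

```python
from typing import Any, Callable, Optional

Record = dict[str, Any]


def fold_reduce(
    items: list[Record],
    fold: Callable[[Optional[Record], list[Record]], Record],
    batch_size: int,
) -> Record:
    """Reduce one group by folding fixed-size batches into a running accumulator."""
    if batch_size < 1:
        raise ValueError("batch_size must be at least 1")
    acc: Optional[Record] = None
    for start in range(0, len(items), batch_size):
        # Each fold call sees only the accumulator plus at most batch_size new items.
        acc = fold(acc, items[start:start + batch_size])
    if acc is None:
        raise ValueError("cannot reduce an empty group")
    return acc


def toy_fold(acc: Optional[Record], batch: list[Record]) -> Record:
    # Stand-in for an LLM fold prompt: keep a running count and list of titles.
    acc = acc or {"count": 0, "titles": []}
    return {
        "count": acc["count"] + len(batch),
        "titles": acc["titles"] + [item["title"] for item in batch],
    }


docs = [{"title": f"doc{i}"} for i in range(5)]
result = fold_reduce(docs, toy_fold, batch_size=2)
print(result["count"], len(result["titles"]))  # 5 5
```

Larger batches mean fewer calls, while smaller batches keep each call further inside the context window; varying the batch size across candidate plans lets the optimizer compare that trade-off empirically.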

Code Reference

Source Location

Signature

from typing import Any, Callable

class ReduceOptimizer:
    def __init__(
        self,
        runner,
        run_operation: Callable,
        num_fold_prompts: int = 1,
        num_samples_in_validation: int = 10,
    ) -> None: ...

    def should_optimize(
        self,
        op_config: dict[str, Any],
        input_data: list[dict[str, Any]],
    ) -> tuple[str, list[dict[str, Any]], list[dict[str, Any]]]: ...

    def optimize(
        self,
        op_config: dict[str, Any],
        input_data: list[dict[str, Any]],
        level: int = 1,
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], float]: ...

Import

from docetl.optimizers.reduce_optimizer import ReduceOptimizer

I/O Contract

Inputs

Name | Type | Required | Used by | Description
--- | --- | --- | --- | ---
runner | Runner | Yes | __init__ | The pipeline runner object providing config, console, optimizer, and threading settings
run_operation | Callable | Yes | __init__ | Function that executes an operation on input data
num_fold_prompts | int | No | __init__ | Number of fold prompts to generate for plan variation (default: 1)
num_samples_in_validation | int | No | __init__ | Number of reduce-key groups to use in validation (default: 10)
op_config | dict[str, Any] | Yes | should_optimize, optimize | The reduce operation configuration, including prompt, reduce_key, and output schema
input_data | list[dict[str, Any]] | Yes | should_optimize, optimize | Input data for the reduce operation
level | int | No | optimize | Current decomposition recursion level (default: 1)
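For concreteness, an `op_config` and its input might look like the example below. The keys `name`, `type`, `reduce_key`, `prompt`, and `output` with a `schema` are the fields this page mentions; the Jinja-style prompt that loops over `inputs` is an assumption modeled on DocETL's reduce prompt syntax, and every value (operation name, fields, records) is made up for illustration. The grouping loop shows what a reduce-key group, the unit counted by `num_samples_in_validation`, means.

```python
from collections import defaultdict
from typing import Any

# Illustrative reduce operation config; all names and values are hypothetical.
reduce_op_config: dict[str, Any] = {
    "name": "summarize_tickets",
    "type": "reduce",
    "reduce_key": "customer_id",  # records sharing this value are reduced together
    "prompt": (
        "Summarize these support tickets:\n"
        "{% for item in inputs %}- {{ item.text }}\n{% endfor %}"
    ),
    "output": {"schema": {"summary": "string"}},  # one output record per reduce key
}

sample_data: list[dict[str, Any]] = [
    {"customer_id": "c1", "text": "Login fails on mobile."},
    {"customer_id": "c1", "text": "Password reset email never arrives."},
    {"customer_id": "c2", "text": "Invoice shows the wrong currency."},
]

# Each distinct reduce_key value forms one reduce-key group.
groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
for record in sample_data:
    groups[record[reduce_op_config["reduce_key"]]].append(record)

print({key: len(items) for key, items in groups.items()})  # {'c1': 2, 'c2': 1}
```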

Outputs

optimize() returns these three values as a tuple:

Name | Type | Description
--- | --- | ---
optimized_configs | list[dict[str, Any]] | Optimized operation configurations; if the reduce was decomposed, this may include a resolve operation and multiple reduce operations
output_data | list[dict[str, Any]] | Output data from running the optimized operation(s)
cost | float | Cost incurred from synthesizing resolve operations during decomposition
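To see what a decomposed plan computes, here is a pure-Python sketch of a two-stage reduce: first per (reduce key, sub-key) group, then per reduce key over the partial results. `decomposed_reduce`, the `month` sub-key, and the counting `count_reduce` are hypothetical; real DocETL stages run LLM prompts, and the optional resolve step that deduplicates sub-key values is omitted here. With an associative reduction such as counting, the two stages together give the same answer as a single flat reduce.

```python
from collections import defaultdict
from typing import Any, Callable

Record = dict[str, Any]


def group_by(items: list[Record], keys: list[str]) -> dict[tuple, list[Record]]:
    """Group records by the tuple of values at `keys`, preserving first-seen order."""
    groups: dict[tuple, list[Record]] = defaultdict(list)
    for item in items:
        groups[tuple(item[k] for k in keys)].append(item)
    return groups


def decomposed_reduce(
    items: list[Record],
    reduce_key: str,
    sub_key: str,
    reduce_fn: Callable[[list[Record]], Record],
) -> dict[Any, Record]:
    # Stage 1 (sub-group reduce): one partial result per (reduce_key, sub_key) pair.
    partials: list[Record] = []
    for (key, _sub), members in group_by(items, [reduce_key, sub_key]).items():
        partial = reduce_fn(members)
        partial[reduce_key] = key  # carry the outer key forward to stage 2
        partials.append(partial)
    # Stage 2 (final reduce): combine the partial results for each reduce_key.
    return {
        key: reduce_fn(members)
        for (key,), members in group_by(partials, [reduce_key]).items()
    }


def count_reduce(members: list[Record]) -> Record:
    # Toy associative reduction: sum the "count" field.
    return {"count": sum(m["count"] for m in members)}


tickets = [
    {"customer_id": "c1", "month": "jan", "count": 1},
    {"customer_id": "c1", "month": "jan", "count": 1},
    {"customer_id": "c1", "month": "feb", "count": 1},
    {"customer_id": "c2", "month": "jan", "count": 1},
]
print(decomposed_reduce(tickets, "customer_id", "month", count_reduce))
# {'c1': {'count': 3}, 'c2': {'count': 1}}
```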

Usage Examples

from docetl.optimizers.reduce_optimizer import ReduceOptimizer

# Assumed to be defined elsewhere: `runner` (the pipeline runner),
# `reduce_op_config` (the reduce operation config), and `sample_data`
# (a list of input records). `runner.run_operation` stands in for
# whatever callable executes an operation in your setup.
reduce_optimizer = ReduceOptimizer(
    runner=runner,
    run_operation=runner.run_operation,
    num_fold_prompts=1,
    num_samples_in_validation=10,
)

# Check whether optimization is needed; an empty assessment string
# means no optimization is recommended.
assessment, input_data, original_output = reduce_optimizer.should_optimize(
    op_config=reduce_op_config,
    input_data=sample_data,
)

if assessment:
    print(f"Optimization recommended: {assessment}")

    # Run optimization; cost covers resolve-operation synthesis only
    optimized_configs, output_data, cost = reduce_optimizer.optimize(
        op_config=reduce_op_config,
        input_data=sample_data,
    )
    print(f"Optimized into {len(optimized_configs)} operation(s)")
    print(f"Resolve synthesis cost: ${cost:.4f}")
    for config in optimized_configs:
        print(f"  - {config['name']} (type: {config['type']})")
else:
    print("No optimization needed.")
