Implementation:Ucbepic Docetl MapOptimizer Optimizer
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
A concrete DocETL tool for optimizing map operations by evaluating chunking, gleaning, and projection-synthesis plans in stages.
Description
The MapOptimizer class is the top-level orchestrator for optimizing map operations in data processing pipelines. It coordinates plan generation (via PlanGenerator), output evaluation (via Evaluator), and prompt generation (via PromptGenerator) to find the best optimization strategy. It supports three plan types: chunk-based splitting for long documents, gleaning for iterative refinement, and projection synthesis for parallel or chained decomposition. Evaluation is staged. When the input fits within the model's context window, the optimizer first tries gleaning and projection-synthesis plans and then evaluates chunking plans selectively. When the input exceeds the context window, it evaluates all plan types at once. To choose a winner, it keeps the top-k plans by score and then compares them pairwise using an LLM.
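The staging and selection logic described above can be sketched in plain Python. This is a simplified illustration under stated assumptions, not DocETL's implementation: plans are modeled as hypothetical `(name, plan_type)` tuples, scores are precomputed floats, and the hypothetical `prefer(a, b)` callable stands in for the pairwise LLM comparison. How the optimizer decides which chunking plans to evaluate selectively is not modeled; here they simply form a second stage.

```python
from typing import Callable, Sequence

# Hypothetical plan representation: (plan_name, plan_type), where plan_type is
# one of "chunk", "glean", or "proj_synthesis".
Plan = tuple[str, str]


def staged_plan_groups(plans: Sequence[Plan], fits_in_context: bool) -> list[list[Plan]]:
    """Split candidate plans into evaluation stages (simplified sketch)."""
    if not fits_in_context:
        # Input exceeds the context window: evaluate every plan type together.
        return [list(plans)]
    # Input fits: try gleaning and projection synthesis first, chunking later.
    first = [p for p in plans if p[1] in ("glean", "proj_synthesis")]
    later = [p for p in plans if p[1] == "chunk"]
    return [first, later]


def select_best_plan(
    scores: dict[str, float],
    prefer: Callable[[str, str], str],
    k: int = 3,
) -> str:
    """Keep the top-k plans by score, then pick a winner by pairwise comparison.

    `prefer(a, b)` stands in for the LLM judge and must return either a or b.
    """
    top_k = sorted(scores, key=lambda name: scores[name], reverse=True)[:k]
    wins = {name: 0 for name in top_k}
    for i, a in enumerate(top_k):
        for b in top_k[i + 1:]:
            wins[prefer(a, b)] += 1
    # The plan with the most pairwise wins is chosen; ties fall back to the score.
    return max(top_k, key=lambda name: (wins[name], scores[name]))


if __name__ == "__main__":
    plans = [("glean_1", "glean"), ("proj_1", "proj_synthesis"), ("chunk_1", "chunk")]
    print(staged_plan_groups(plans, fits_in_context=True))

    scores = {"glean_1": 0.8, "proj_1": 0.9, "chunk_1": 0.7, "baseline": 0.5}
    # Stand-in judge with a fixed preference order, for demonstration only.
    judge_order = ["glean_1", "proj_1", "chunk_1"]
    best = select_best_plan(
        scores,
        prefer=lambda a, b: a if judge_order.index(a) < judge_order.index(b) else b,
    )
    print(best)  # glean_1: the judge can overrule the highest raw score
```

Restricting the pairwise round to the top k candidates keeps the number of judge calls at k(k-1)/2, independent of how many plans were generated.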
Usage
Use MapOptimizer when a map operation needs better output quality or must handle inputs that exceed the model's context window. It is the primary entry point for map-operation optimization in the DocETL pipeline optimizer and is invoked automatically when the should_optimize check detects quality issues or inputs that exceed the context length. It generates and evaluates multiple candidate plans in parallel and returns the configuration of the best-performing, possibly decomposed, operation.
Code Reference
Source Location
- Repository: Ucbepic_Docetl
- File: docetl/optimizers/map_optimizer/optimizer.py
- Lines: 1-694
Signature
```python
from typing import Any, Callable


class MapOptimizer:
    def __init__(
        self,
        runner,
        run_operation: Callable,
        timeout: int = 10,
        is_filter: bool = False,
        depth: int = 1,
    ) -> None: ...

    def should_optimize(
        self,
        op_config: dict[str, Any],
        input_data: list[dict[str, Any]],
    ) -> tuple[str, list[dict[str, Any]], list[dict[str, Any]]]: ...

    def optimize(
        self,
        op_config: dict[str, Any],
        input_data: list[dict[str, Any]],
        plan_types: list[str] | None = ["chunk", "proj_synthesis", "glean"],
    ) -> tuple[list[dict[str, Any]], list[dict[str, Any]], float]: ...
```
Import
```python
from docetl.optimizers.map_optimizer.optimizer import MapOptimizer
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| runner | Runner | Yes | The pipeline runner object providing config, console, optimizer, and threading settings |
| run_operation | Callable | Yes | Function to execute operations on input data |
| timeout | int | No | Timeout in seconds for operation execution (default: 10) |
| is_filter | bool | No | Whether the operation is a filter operation (default: False) |
| depth | int | No | Current recursion depth for nested optimization (default: 1) |
| op_config | dict[str, Any] | Yes | The map operation configuration to optimize |
| input_data | list[dict[str, Any]] | Yes | Input data for the operation |
| plan_types | list[str] or None | No | Plan types to evaluate: "chunk", "proj_synthesis", "glean" (default: all three) |
Outputs
| Name | Type | Description |
|---|---|---|
| optimized_ops | list[dict[str, Any]] | List of optimized operation configurations (may be multiple if decomposed) |
| output_data | list[dict[str, Any]] | Output data from running the best plan |
| cost | float | Cost incurred by any sub-plan optimization (recursive optimization cost) |
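Because `optimize` can return several operations in place of one, a caller has to splice them into the pipeline's operation list at the position of the original operation. The sketch below shows one way to do that. The helper `splice_optimized_ops`, the operation names, and the assumption that each operation config carries a `name` key are all hypothetical and are not part of MapOptimizer's API.

```python
from typing import Any


def splice_optimized_ops(
    pipeline_ops: list[dict[str, Any]],
    original_name: str,
    optimized_ops: list[dict[str, Any]],
) -> list[dict[str, Any]]:
    """Replace the first op named `original_name` with `optimized_ops`, preserving order.

    Hypothetical helper: assumes every op config has a "name" key.
    """
    result: list[dict[str, Any]] = []
    replaced = False
    for op in pipeline_ops:
        if not replaced and op.get("name") == original_name:
            # One original op may expand into several decomposed ops.
            result.extend(optimized_ops)
            replaced = True
        else:
            result.append(op)
    if not replaced:
        raise ValueError(f"operation {original_name!r} not found")
    return result


if __name__ == "__main__":
    ops = [{"name": "extract", "type": "map"}, {"name": "summarize", "type": "reduce"}]
    # Hypothetical result of decomposing "extract" into two map operations.
    new_ops = [{"name": "extract_part_1", "type": "map"}, {"name": "extract_part_2", "type": "map"}]
    print([op["name"] for op in splice_optimized_ops(ops, "extract", new_ops)])
```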
Usage Examples
```python
from docetl.optimizers.map_optimizer.optimizer import MapOptimizer

# Assumes `runner` (the pipeline runner), `map_op_config`, and `sample_data`
# are already defined by the surrounding pipeline code.
map_optimizer = MapOptimizer(
    runner=runner,
    run_operation=runner.run_operation,
    timeout=120,
    is_filter=False,
)

# Check whether optimization is needed
assessment, input_data, output_data = map_optimizer.should_optimize(
    op_config=map_op_config,
    input_data=sample_data,
)

# Only optimize when should_optimize returns a non-empty assessment
if assessment:
    # Evaluate all three plan types and keep the best-performing plan
    optimized_ops, best_output, cost = map_optimizer.optimize(
        op_config=map_op_config,
        input_data=sample_data,
        plan_types=["chunk", "proj_synthesis", "glean"],
    )
    print(f"Optimized into {len(optimized_ops)} operation(s), cost: ${cost:.4f}")
```