Implementation:Ucbepic Docetl FastDecomposer
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool, provided by DocETL, for fast directive-based decomposition of map operations.
Description
The FastDecomposer class provides a lightweight alternative to the full optimizer flow for decomposing map operations. It generates candidate decompositions using directives (such as chaining, gleaning, instruction clarification, document chunking, and deterministic compression), runs each candidate on sample documents in parallel, and uses LLM-based pairwise comparison to select the winning decomposition. Directives are conditionally enabled based on document characteristics such as average character length and token count relative to the model context window.
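The conditional enabling of directives described above can be illustrated with a minimal sketch. Everything here is an assumption made for illustration: the helper names `avg_doc_size` and `applicable_directives`, the directive name strings, the 4-characters-per-token estimate, and both thresholds are hypothetical and are not taken from the DocETL source.

```python
from typing import Any


def avg_doc_size(docs: list[dict[str, Any]], text_keys: list[str]) -> tuple[float, float]:
    """Return (average characters, rough average tokens) per sampled document."""
    if not docs:
        return 0.0, 0.0
    total_chars = sum(len(str(doc.get(key, ""))) for doc in docs for key in text_keys)
    avg_chars = total_chars / len(docs)
    # Assumption: roughly 4 characters per token, a common rule of thumb.
    return avg_chars, avg_chars / 4


def applicable_directives(
    docs: list[dict[str, Any]],
    text_keys: list[str],
    context_limit_tokens: int,
) -> list[str]:
    """Pick directive names to try; names and thresholds are illustrative only."""
    avg_chars, avg_tokens = avg_doc_size(docs, text_keys)
    # Directives that make sense regardless of document size.
    directives = ["chaining", "gleaning", "clarify_instructions"]
    # Hypothetical rule: chunk only when documents use a large share of the context window.
    if avg_tokens > 0.25 * context_limit_tokens:
        directives.append("doc_chunking")
    # Hypothetical rule: compress only when documents are long enough to benefit.
    if avg_chars > 2_000:
        directives.append("deterministic_compression")
    return directives


short_docs = [{"text": "a" * 400}, {"text": "b" * 600}]
long_docs = [{"text": "c" * 200_000}]
print(applicable_directives(short_docs, ["text"], context_limit_tokens=128_000))
print(applicable_directives(long_docs, ["text"], context_limit_tokens=128_000))
```

With these assumed thresholds, short documents receive only the size-independent directives, while long documents additionally receive chunking and compression.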
Usage
Use FastDecomposer when you need to quickly decompose a complex map operation into simpler sub-operations without running the full MapOptimizer pipeline. It suits cases where a pipeline YAML config is available and you want to evaluate several directive-based decompositions against the original operation on a small data sample, with the best one selected by automated LLM judging.
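The evaluate-then-judge flow can be sketched as a parallel run followed by a sequential pairwise tournament. This is a sketch under stated assumptions, not the class's actual implementation: `run_all`, `pick_winner`, `toy_runner`, and `toy_judge` are hypothetical names, the judge is a deterministic stand-in for an LLM call, and the tournament order (current champion against each challenger) is one plausible way to apply pairwise comparison.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import Any, Callable

Candidate = dict[str, Any]
Runner = Callable[[Candidate, list[dict[str, Any]]], list[Any]]
Judge = Callable[[Candidate, Candidate], str]  # returns "a" or "b"


def run_all(candidates: list[Candidate], sample: list[dict[str, Any]], runner: Runner) -> list[Candidate]:
    """Run every candidate on the sample in parallel and attach its outputs."""
    with ThreadPoolExecutor(max_workers=4) as pool:
        # pool.map preserves input order, so outputs line up with candidates.
        outputs = list(pool.map(lambda c: runner(c, sample), candidates))
    return [{**c, "outputs": out} for c, out in zip(candidates, outputs)]


def pick_winner(candidates: list[Candidate], judge: Judge) -> Candidate:
    """Sequential tournament: the current champion faces each challenger once."""
    champion = candidates[0]
    for challenger in candidates[1:]:
        if judge(champion, challenger) == "b":
            champion = challenger
    return champion


def toy_runner(candidate: Candidate, sample: list[dict[str, Any]]) -> list[Any]:
    # Stand-in for executing the candidate's operations on each document.
    return [doc["text"].split()[: candidate["max_words"]] for doc in sample]


def toy_judge(a: Candidate, b: Candidate) -> str:
    # Stand-in for an LLM judge: prefer the candidate that kept more words.
    def score(c: Candidate) -> int:
        return sum(len(out) for out in c["outputs"])
    return "b" if score(b) > score(a) else "a"


sample = [{"text": "alpha beta gamma delta"}, {"text": "one two three"}]
candidates = [
    {"directive": "original", "max_words": 1},
    {"directive": "chaining", "max_words": 3},
    {"directive": "gleaning", "max_words": 2},
]
evaluated = run_all(candidates, sample, toy_runner)
winner = pick_winner(evaluated, toy_judge)
print(winner["directive"])
```

Placing the original operation first in the list means a decomposition only wins if the judge prefers it over the baseline at some point in the tournament.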
Code Reference
Source Location
- Repository: Ucbepic_Docetl
- File: docetl/optimizers/fast_decomposer.py
- Lines: 1-926
Signature
class FastDecomposer:
    def __init__(
        self,
        yaml_config_path: str,
        optimizer_model: str = "gpt-5.1",
        sample_size: int = 5,
        litellm_kwargs: dict[str, Any] | None = None,
        console: Console | None = None,
    ) -> None: ...
    def get_model_context_limit(self, model: str) -> int: ...
    def get_avg_doc_size(self, sample_data: list[dict[str, Any]], op_config: dict[str, Any]) -> tuple[float, float]: ...
    def get_applicable_directives(self, sample_data: list[dict[str, Any]], op_config: dict[str, Any]) -> list: ...
    def load_sample_data(self, step_name: str, op_name: str) -> list[dict[str, Any]]: ...
    def generate_candidates(self, op_name: str, sample_data: list[dict[str, Any]], target_op: dict[str, Any]) -> list[dict[str, Any]]: ...
    def run_candidate_on_samples(self, candidate: dict[str, Any], sample_data: list[dict[str, Any]], original_op_name: str) -> list[dict[str, Any]]: ...
    def pairwise_compare(self, candidate_a: dict[str, Any], candidate_b: dict[str, Any], original_prompt: str, output_schema: dict[str, Any]) -> dict[str, Any]: ...
    # Returns a dict keyed by the names listed under Outputs, as the usage example indexes it.
    def decompose(self, step_name: str, op_name: str) -> dict[str, Any]: ...
Import
from docetl.optimizers.fast_decomposer import FastDecomposer
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| yaml_config_path | str | Yes | Path to the pipeline YAML configuration file |
| optimizer_model | str | No | LLM model to use for directive instantiation and judging (default: "gpt-5.1") |
| sample_size | int | No | Number of sample documents to run candidates on (default: 5) |
| litellm_kwargs | dict[str, Any] or None | No | Additional kwargs to pass to litellm.completion |
| console | Console or None | No | Rich console for output (uses default if not provided) |
| step_name | str | Yes | Name of the pipeline step (for decompose method) |
| op_name | str | Yes | Name of the operation to decompose (for decompose method) |
Outputs
| Name | Type | Description |
|---|---|---|
| decomposed_ops | list[dict[str, Any]] | List of operations that replace the original operation |
| winning_directive | str | Name of the directive that won the comparison |
| candidates_evaluated | int | Number of candidates that were compared |
| cost | float | Total LLM API cost in USD |
| original_outputs | list[dict[str, Any]] | Sample outputs from the original operation |
| decomposed_outputs | list[dict[str, Any]] | Sample outputs from the winning decomposition |
| comparison_rationale | str | LLM explanation of why the winner was chosen |
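Since `decomposed_ops` is described as the list of operations that replace the original one, a caller will typically splice it back into the pipeline config. The sketch below shows one way to do that on an in-memory config. It assumes a layout with a top-level `operations` list of dicts (each with a `name`) and `pipeline.steps` entries that list operation names; `replace_operation` is a hypothetical helper, not part of DocETL.

```python
from typing import Any


def replace_operation(
    config: dict[str, Any],
    step_name: str,
    op_name: str,
    new_ops: list[dict[str, Any]],
) -> dict[str, Any]:
    """Return a copy of config in which op_name is replaced by new_ops, in order."""
    new_names = [op["name"] for op in new_ops]

    # Swap the operation definition for the decomposed definitions.
    operations: list[dict[str, Any]] = []
    for op in config["operations"]:
        if op["name"] == op_name:
            operations.extend(new_ops)
        else:
            operations.append(op)

    # Swap the reference inside the named step, leaving other steps untouched.
    steps: list[dict[str, Any]] = []
    for step in config["pipeline"]["steps"]:
        if step["name"] == step_name:
            names: list[str] = []
            for name in step["operations"]:
                names.extend(new_names if name == op_name else [name])
            step = {**step, "operations": names}
        steps.append(step)

    return {**config, "operations": operations, "pipeline": {**config["pipeline"], "steps": steps}}


config = {
    "operations": [{"name": "extract_entities", "type": "map"}, {"name": "summarize", "type": "reduce"}],
    "pipeline": {"steps": [{"name": "extraction_step", "operations": ["extract_entities", "summarize"]}]},
}
# Stand-in for result["decomposed_ops"] from a decompose() call.
decomposed_ops = [
    {"name": "extract_entities_draft", "type": "map"},
    {"name": "extract_entities_refine", "type": "map"},
]
updated = replace_operation(config, "extraction_step", "extract_entities", decomposed_ops)
print(updated["pipeline"]["steps"][0]["operations"])
```

The helper builds new lists and dicts rather than mutating its input, so the original config remains available for comparison against the decomposed version.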
Usage Examples
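from docetl.optimizers.fast_decomposer import FastDecomposer

# Configure the decomposer against an existing pipeline config.
decomposer = FastDecomposer(
    yaml_config_path="pipeline.yaml",
    optimizer_model="gpt-5.1",
    sample_size=5,
)

# Generate candidates for one map operation, run them on the sample, and judge them.
result = decomposer.decompose(
    step_name="extraction_step",
    op_name="extract_entities",
)

# Inspect the winning directive, the total cost, and the replacement operations.
print(f"Winner: {result['winning_directive']}")
print(f"Cost: ${result['cost']:.4f}")
print(f"New operations: {len(result['decomposed_ops'])}")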