Implementation:Ucbepic Docetl JoinOptimizer
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Optimization |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
A concrete tool, provided by DocETL, for optimizing resolve and equijoin operations through embedding-based blocking and LLM-assisted entity resolution.
Description
The JoinOptimizer class optimizes join operations (resolve and equijoin) in data processing pipelines. For resolve operations, it computes embeddings on blocking keys, calculates cosine similarities, samples pairs for LLM comparison, finds optimal similarity thresholds targeting a configurable recall level, and generates blocking rules to reduce the number of expensive LLM comparisons. For equijoin operations, it additionally supports cross-dataset similarity computation, containment-based blocking rules, and optional pre-join map transformations synthesized by an LLM agent. It can also synthesize comparison and resolution prompts when the operation is marked as empty.
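The threshold-selection step described above can be sketched as follows. This is a minimal illustration under stated assumptions, not DocETL's implementation. It assumes each sampled pair already has a cosine similarity between the embeddings of its blocking keys and a boolean LLM verdict. It then picks the highest threshold whose recall on that labeled sample meets target_recall. The helper names cosine_similarity and find_threshold are hypothetical.

```python
import math
from typing import Sequence


def cosine_similarity(a: Sequence[float], b: Sequence[float]) -> float:
    """Cosine similarity of two embedding vectors (0.0 if either has zero norm)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0.0 or norm_b == 0.0:
        return 0.0
    return dot / (norm_a * norm_b)


def find_threshold(
    similarities: Sequence[float],
    is_match: Sequence[bool],
    target_recall: float = 0.95,
) -> float:
    """Hypothetical helper: highest threshold t such that pairs with
    similarity >= t retain at least target_recall of the sampled pairs
    that the LLM labeled as matches."""
    match_sims = sorted((s for s, m in zip(similarities, is_match) if m), reverse=True)
    if not match_sims:
        raise ValueError("the labeled sample contains no matching pairs")
    # Number of matches that must survive blocking; the small epsilon guards
    # against float error such as 0.7 * 10 == 7.000000000000001.
    needed = math.ceil(target_recall * len(match_sims) - 1e-9)
    needed = min(max(needed, 1), len(match_sims))
    # Every match whose similarity is at or above the needed-th largest
    # matching similarity clears this threshold.
    return match_sims[needed - 1]
```

For example, with six sampled pairs whose four LLM-confirmed matches have similarities 0.9, 0.8, 0.7 and 0.6, a target recall of 0.75 yields a threshold of 0.7, because three of the four matches clear it. The sketch optimizes only for recall; it does not model the cost of the non-matching pairs that also clear the threshold.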
Usage
Use JoinOptimizer when you have a resolve or equijoin operation that needs blocking keys, similarity thresholds, and blocking conditions configured automatically. It is particularly useful when dealing with entity resolution tasks where you need to deduplicate records or join datasets with potentially matching entities, and you want to minimize the number of expensive LLM pair comparisons while maintaining a target recall.
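The arithmetic behind "expensive LLM pair comparisons" is simple to sketch. Without blocking, a resolve over n records compares every unordered pair, and an equijoin compares the full cross product. The functions below are hypothetical helpers that only count pairs; the pass_rate argument stands for an assumed fraction of pairs surviving blocking, not a figure reported by DocETL.

```python
def resolve_pair_count(n: int) -> int:
    """Unordered pairs an all-pairs resolve would compare: n choose 2."""
    return n * (n - 1) // 2


def equijoin_pair_count(n_left: int, n_right: int) -> int:
    """Cross-product pairs an unblocked equijoin would compare."""
    return n_left * n_right


def comparisons_after_blocking(total_pairs: int, pass_rate: float) -> int:
    """Expected LLM calls if only an (assumed) fraction pass_rate of pairs survives blocking."""
    return round(total_pairs * pass_rate)
```

For 1,000 records, an unblocked resolve needs 499,500 comparisons; if blocking kept an assumed 2% of pairs, that would drop to 9,990 LLM calls.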
Code Reference
Source Location
- Repository: Ucbepic_Docetl
- File: docetl/optimizers/join_optimizer.py
- Lines: 1-1779
Signature
class JoinOptimizer:
    def __init__(
        self,
        runner,
        op_config: dict[str, Any],
        target_recall: float = 0.95,
        sample_size: int = 500,
        sampling_weight: float = 20,
        agent_max_retries: int = 5,
        estimated_selectivity: float | None = None,
    ) -> None: ...
    def should_optimize(self, input_data: list[dict[str, Any]]) -> tuple[bool, str]: ...
    def optimize_resolve(self, input_data: list[dict[str, Any]]) -> tuple[dict[str, Any], float]: ...
    def optimize_equijoin(
        self,
        left_data: list[dict[str, Any]],
        right_data: list[dict[str, Any]],
        skip_map_gen: bool = False,
        skip_containment_gen: bool = False,
    ) -> tuple[dict[str, Any], float, dict[str, Any]]: ...
    def synthesize_compare_prompt(self, map_prompt: str | None, reduce_key: list[str]) -> str: ...
    def synthesize_resolution_prompt(
        self, map_prompt: str | None, reduce_key: list[str], output_schema: dict[str, str]
    ) -> str: ...
Import
from docetl.optimizers.join_optimizer import JoinOptimizer
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| runner | Runner | Yes | The pipeline runner object providing config, API client, and threading settings |
| op_config | dict[str, Any] | Yes | The join/resolve operation configuration |
| target_recall | float | No | Target recall level for threshold optimization (default: 0.95) |
| sample_size | int | No | Number of pairs to sample for comparison (default: 500) |
| sampling_weight | float | No | Exponential weighting factor for similarity-based sampling (default: 20) |
| agent_max_retries | int | No | Maximum retries for blocking rule generation (default: 5) |
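| estimated_selectivity | float or None | No | Pre-estimated selectivity for the join (default: None) |
| input_data | list[dict[str, Any]] | Yes (resolve) | Input records passed to should_optimize and optimize_resolve |
| left_data | list[dict[str, Any]] | Yes (equijoin) | Left dataset passed to optimize_equijoin |
| right_data | list[dict[str, Any]] | Yes (equijoin) | Right dataset passed to optimize_equijoin |
| skip_map_gen | bool | No | Skip synthesizing a pre-join map transformation in optimize_equijoin (default: False) |
| skip_containment_gen | bool | No | Skip generating containment-based blocking rules in optimize_equijoin (default: False) |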
Outputs
| Name | Type | Description |
|---|---|---|
| optimized_config | dict[str, Any] | Updated operation config with blocking keys, threshold, blocking conditions, and embedding model |
| cost | float | Total cost of embeddings and LLM comparisons |
| map_info | dict[str, Any] | (Equijoin only) Optional map transformation metadata if a pre-join map was synthesized |
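To see how the fields of optimized_config interact, the sketch below decides whether a single pair would be sent to the LLM. It assumes that a pair is kept if its embedding similarity clears the blocking threshold or if any blocking condition (such as a containment rule for equijoins) holds. Blocking conditions are represented here as plain Python callables for illustration. The function names and the "name"/"text" record fields are hypothetical.

```python
from typing import Any, Callable, Sequence

Record = dict[str, Any]
Condition = Callable[[Record, Record], bool]


def is_candidate(
    left: Record,
    right: Record,
    similarity: float,
    blocking_threshold: float,
    blocking_conditions: Sequence[Condition] = (),
) -> bool:
    """Keep a pair for LLM comparison if its similarity clears the threshold
    or any blocking condition fires (assumed OR semantics)."""
    if similarity >= blocking_threshold:
        return True
    return any(cond(left, right) for cond in blocking_conditions)


def name_contained(left: Record, right: Record) -> bool:
    """Hypothetical containment rule: the left record's name appears in the right record's text."""
    return left["name"].lower() in right["text"].lower()
```

With a threshold of 0.8, a pair scoring only 0.2 is still kept when "Acme" appears inside "ACME Corp filed a claim", which is how containment rules can recover matches that embeddings miss.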
Usage Examples
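from docetl.optimizers.join_optimizer import JoinOptimizer
# pipeline_runner, resolve_op_config, equijoin_op_config, input_data,
# left_dataset and right_dataset are placeholders supplied by your pipeline.
# Resolve optimization
optimizer = JoinOptimizer(
    runner=pipeline_runner,
    op_config=resolve_op_config,
    target_recall=0.95,
    sample_size=500,
)
should_opt, explanation = optimizer.should_optimize(input_data)
if should_opt:
    optimized_config, cost = optimizer.optimize_resolve(input_data)
    print(f"Blocking threshold: {optimized_config['blocking_threshold']}")
    print(f"Cost: ${cost:.4f}")
else:
    # should_optimize also returns a human-readable reason
    print(f"Skipping optimization: {explanation}")
# Equijoin optimization
equijoin_optimizer = JoinOptimizer(
    runner=pipeline_runner,
    op_config=equijoin_op_config,
    target_recall=0.95,
)
# map_info describes a synthesized pre-join map, if one was generated
optimized_config, cost, map_info = equijoin_optimizer.optimize_equijoin(
    left_data=left_dataset,
    right_data=right_dataset,
)
print(f"Cost: ${cost:.4f}")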