Implementation:Ucbepic Docetl JoinOptimizer



Knowledge Sources
Domains Data_Processing, Optimization
Last Updated 2026-02-08 00:00 GMT

Overview

A concrete tool, provided by DocETL, for optimizing resolve and equijoin operations through embedding-based blocking and LLM-assisted entity resolution.

Description

The JoinOptimizer class optimizes join operations (resolve and equijoin) in data processing pipelines. For resolve operations, it computes embeddings on the blocking keys, calculates cosine similarities, samples pairs for LLM comparison, finds a similarity threshold that meets a configurable target recall, and generates blocking rules that reduce the number of expensive LLM comparisons. For equijoin operations, it additionally supports cross-dataset similarity computation, containment-based blocking rules, and optional pre-join map transformations synthesized by an LLM agent. It can also synthesize comparison and resolution prompts when the operation is marked as empty.
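The threshold-selection step described above can be illustrated with a minimal sketch. It is not DocETL's implementation: the helper names `cosine_similarity` and `threshold_for_recall` are hypothetical, and the sketch assumes that each sampled pair already carries a similarity score and an LLM match label.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0
    return dot / (norm_a * norm_b)


def threshold_for_recall(similarities, is_match, target_recall=0.95):
    """Hypothetical helper: highest threshold that keeps at least
    `target_recall` of the pairs labeled as matches.

    similarities: similarity score of each sampled pair
    is_match:     match label for each sampled pair (e.g. from an LLM comparison)
    Returns None when no sampled pair was labeled a match.
    """
    match_sims = sorted(
        (s for s, m in zip(similarities, is_match) if m), reverse=True
    )
    if not match_sims:
        return None
    # Number of matches that must score at or above the threshold.
    needed = max(1, math.ceil(target_recall * len(match_sims)))
    return match_sims[needed - 1]


if __name__ == "__main__":
    sims = [0.9, 0.8, 0.7, 0.6, 0.95, 0.2]
    labels = [True, True, True, True, False, False]
    print(threshold_for_recall(sims, labels, target_recall=0.75))
```

A lower target recall permits a higher threshold, which blocks more pairs before they reach the LLM; a target of 1.0 forces the threshold down to the lowest-scoring labeled match.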

Usage

Use JoinOptimizer when you have a resolve or equijoin operation that needs blocking keys, similarity thresholds, and blocking conditions configured automatically. It is particularly useful for entity resolution tasks, such as deduplicating records or joining datasets whose entities may match, where you want to minimize the number of expensive LLM pair comparisons while maintaining a target recall.
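To make the savings concrete, the sketch below counts how many pairwise comparisons survive a similarity threshold. It is an illustration only: `pairs_above_threshold` and `comparison_savings` are hypothetical helpers, and the similarity matrix is made-up data rather than output from DocETL.

```python
from itertools import combinations


def pairs_above_threshold(sim_matrix, threshold):
    """Return index pairs (i, j), i < j, whose similarity meets the threshold."""
    n = len(sim_matrix)
    return [
        (i, j)
        for i, j in combinations(range(n), 2)
        if sim_matrix[i][j] >= threshold
    ]


def comparison_savings(sim_matrix, threshold):
    """Return (all_pairs, kept_pairs): comparisons without and with blocking."""
    n = len(sim_matrix)
    all_pairs = n * (n - 1) // 2
    kept_pairs = len(pairs_above_threshold(sim_matrix, threshold))
    return all_pairs, kept_pairs


if __name__ == "__main__":
    # Symmetric toy similarity matrix for four records (illustrative values).
    sim = [
        [1.0, 0.9, 0.2, 0.1],
        [0.9, 1.0, 0.3, 0.2],
        [0.2, 0.3, 1.0, 0.8],
        [0.1, 0.2, 0.8, 1.0],
    ]
    total, kept = comparison_savings(sim, threshold=0.75)
    print(f"{kept} of {total} pairs would be sent to the LLM")
```

Without blocking, the number of comparisons grows quadratically with the number of records, which is why a threshold that discards most non-matching pairs matters for cost.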

Code Reference

Source Location

Signature

class JoinOptimizer:
    def __init__(
        self,
        runner,
        op_config: dict[str, Any],
        target_recall: float = 0.95,
        sample_size: int = 500,
        sampling_weight: float = 20,
        agent_max_retries: int = 5,
        estimated_selectivity: float | None = None,
    ) -> None: ...

    def should_optimize(self, input_data: list[dict[str, Any]]) -> tuple[bool, str]: ...
    def optimize_resolve(self, input_data: list[dict[str, Any]]) -> tuple[dict[str, Any], float]: ...
    def optimize_equijoin(
        self,
        left_data: list[dict[str, Any]],
        right_data: list[dict[str, Any]],
        skip_map_gen: bool = False,
        skip_containment_gen: bool = False,
    ) -> tuple[dict[str, Any], float, dict[str, Any]]: ...
    def synthesize_compare_prompt(self, map_prompt: str | None, reduce_key: list[str]) -> str: ...
    def synthesize_resolution_prompt(self, map_prompt: str | None, reduce_key: list[str], output_schema: dict[str, str]) -> str: ...

Import

from docetl.optimizers.join_optimizer import JoinOptimizer

I/O Contract

Inputs

Name | Type | Required | Description
runner | Runner | Yes | The pipeline runner object providing config, API client, and threading settings
op_config | dict[str, Any] | Yes | The join/resolve operation configuration
target_recall | float | No | Target recall level for threshold optimization (default: 0.95)
sample_size | int | No | Number of pairs to sample for comparison (default: 500)
sampling_weight | float | No | Exponential weighting factor for similarity-based sampling (default: 20)
agent_max_retries | int | No | Maximum retries for blocking rule generation (default: 5)
estimated_selectivity | float or None | No | Pre-estimated selectivity for the join
input_data | list[dict[str, Any]] | Yes | Input data for resolve optimization
left_data | list[dict[str, Any]] | Yes | Left dataset for equijoin optimization
right_data | list[dict[str, Any]] | Yes | Right dataset for equijoin optimization
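The sampling_weight parameter is described only as an exponential weighting factor, so the following sketch shows one plausible reading rather than the formula DocETL uses: each candidate pair gets a weight proportional to exp(sampling_weight * similarity), so high-similarity pairs are sampled more often. The function names are hypothetical, and sampling is done with replacement for simplicity.

```python
import math
import random


def exponential_weights(similarities, sampling_weight=20.0):
    """Assumed weighting: weight_i proportional to exp(sampling_weight * sim_i).

    The maximum similarity is subtracted before exponentiating for numerical
    stability; this does not change the normalized weights.
    """
    if not similarities:
        return []
    top = max(similarities)
    raw = [math.exp(sampling_weight * (s - top)) for s in similarities]
    total = sum(raw)
    return [r / total for r in raw]


def sample_pair_indices(similarities, k, sampling_weight=20.0, seed=0):
    """Draw k pair indices (with replacement), favoring high-similarity pairs."""
    rng = random.Random(seed)
    weights = exponential_weights(similarities, sampling_weight)
    return rng.choices(range(len(similarities)), weights=weights, k=k)


if __name__ == "__main__":
    sims = [0.1, 0.5, 0.9]
    print(exponential_weights(sims))
    print(sample_pair_indices(sims, k=5))
```

Under this assumption, a larger sampling_weight concentrates the sample on the most similar pairs, while a value of 0 reduces to uniform sampling.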

Outputs

Name | Type | Description
optimized_config | dict[str, Any] | Updated operation config with blocking keys, threshold, blocking conditions, and embedding model
cost | float | Total cost of embeddings and LLM comparisons
map_info | dict[str, Any] | (Equijoin only) Optional map transformation metadata if a pre-join map was synthesized

Usage Examples

from docetl.optimizers.join_optimizer import JoinOptimizer

# Resolve optimization
# (pipeline_runner, resolve_op_config, and input_data are assumed to be defined elsewhere)
optimizer = JoinOptimizer(
    runner=pipeline_runner,
    op_config=resolve_op_config,
    target_recall=0.95,
    sample_size=500,
)

should_opt, explanation = optimizer.should_optimize(input_data)
if should_opt:
    optimized_config, cost = optimizer.optimize_resolve(input_data)
    print(f"Blocking threshold: {optimized_config['blocking_threshold']}")
    print(f"Cost: ${cost:.4f}")

# Equijoin optimization
equijoin_optimizer = JoinOptimizer(
    runner=pipeline_runner,
    op_config=equijoin_op_config,
    target_recall=0.95,
)

optimized_config, cost, map_info = equijoin_optimizer.optimize_equijoin(
    left_data=left_dataset,
    right_data=right_dataset,
)
