Implementation:Ucbepic Docetl Blocking Utils

From Leeroopedia


Knowledge Sources
Domains Data_Processing, Blocking_Optimization
Last Updated 2026-02-08 00:00 GMT

Overview

Concrete tool for automatically computing optimal embedding-based blocking thresholds for resolve and equijoin operations, provided by DocETL.

Description

The RuntimeBlockingOptimizer class computes the cosine similarity cutoff that achieves a target recall rate for candidate pair generation. It embeds the documents, computes pairwise cosine similarities, and samples pairs using a hybrid of stratified and exponential-weighted sampling. It then runs LLM comparisons on the sample to establish ground-truth match labels and searches for the highest similarity threshold that still achieves the configured target recall (e.g., 0.95). The class provides separate optimization methods for self-join (resolve) and cross-join (equijoin) scenarios.
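
The threshold search described above can be illustrated with a minimal sketch. It is not the DocETL implementation: the function name `highest_threshold_for_recall`, the input shape (a list of `(similarity, is_match)` pairs), and the choice to raise an error when the sample contains no matches are all assumptions made for illustration.

```python
import math


def highest_threshold_for_recall(labeled, target_recall=0.95):
    """Return (threshold, achieved_recall) for labeled sample pairs.

    labeled: list of (similarity, is_match) tuples from the LLM-checked sample.
    Hypothetical helper, not part of DocETL.
    """
    # Only true matches determine recall, so look at their similarities alone.
    match_sims = sorted((s for s, m in labeled if m), reverse=True)
    if not match_sims:
        # Assumption: with no positive labels, recall is undefined.
        raise ValueError("no matching pairs in sample")

    # Smallest number of matches that must survive the cutoff.
    needed = max(1, math.ceil(target_recall * len(match_sims)))

    # The highest cutoff that still keeps `needed` matches is the
    # similarity of the needed-th best match.
    threshold = match_sims[needed - 1]
    kept = sum(1 for s in match_sims if s >= threshold)
    return threshold, kept / len(match_sims)
```

Raising the cutoff any further would drop at least one of the matches required to stay at or above the target recall, which is why the needed-th highest match similarity is the answer.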

Usage

This utility is used internally by EquijoinOperation and resolve operations when no blocking configuration is provided. It eliminates the need for users to manually tune blocking thresholds. It can also be used directly when building custom operations that require blocking threshold optimization for pairwise comparison tasks.
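
To show what a blocking threshold does once it has been computed, here is a small sketch for the self-join case. It keeps only the pairs whose embedding cosine similarity meets the threshold, so that only those pairs need an LLM comparison. The helpers `cosine` and `candidate_pairs` are hypothetical and written in plain Python; they do not reflect how DocETL applies the threshold internally.

```python
import math
from itertools import combinations


def cosine(u, v):
    """Cosine similarity of two equal-length vectors (0.0 if either is all zeros)."""
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    return dot / (norm_u * norm_v) if norm_u and norm_v else 0.0


def candidate_pairs(embeddings, threshold):
    """Return index pairs (i, j), i < j, whose similarity is >= threshold."""
    return [
        (i, j)
        for (i, u), (j, v) in combinations(enumerate(embeddings), 2)
        if cosine(u, v) >= threshold
    ]
```

For a cross-join, the same filter would run over every left/right combination instead of over unordered pairs from a single list.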

Code Reference

Source Location

Signature

class RuntimeBlockingOptimizer:
    def __init__(
        self,
        runner,
        config: dict[str, Any],
        default_model: str,
        max_threads: int,
        console: Console,
        target_recall: float = 0.95,
        sample_size: int = 100,
        sampling_weight: float = 20.0,
    ): ...

    def compute_embeddings(self, input_data, keys, embedding_model=None,
                           batch_size=1000) -> tuple[list[list[float]], float]: ...

    def calculate_cosine_similarities_self(self, embeddings) -> list[tuple[int, int, float]]: ...

    def calculate_cosine_similarities_cross(self, left_embeddings,
                                            right_embeddings) -> list[tuple[int, int, float]]: ...

    def sample_pairs(self, similarities, num_bins=10,
                     stratified_fraction=0.5) -> list[tuple[int, int]]: ...

    def find_optimal_threshold(self, comparisons, similarities) -> tuple[float, float]: ...

    def optimize_resolve(self, input_data, compare_fn,
                         blocking_keys=None) -> tuple[float, list[list[float]], float]: ...

    def optimize_equijoin(self, left_data, right_data, compare_fn,
                          left_keys=None, right_keys=None) -> tuple[float, list[list[float]], list[list[float]], float]: ...
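
The `num_bins`, `stratified_fraction`, and `sampling_weight` parameters suggest a sampling budget split between similarity bins and a draw biased toward high-similarity pairs. The sketch below shows one way such a hybrid sampler could work. The function `hybrid_sample`, its `seed` argument, the equal-width binning, and the weight formula `exp(sampling_weight * (sim - max_sim))` are assumptions for illustration, not the behavior of `sample_pairs`.

```python
import math
import random


def hybrid_sample(similarities, sample_size=100, num_bins=10,
                  stratified_fraction=0.5, sampling_weight=20.0, seed=0):
    """Pick up to sample_size (i, j) pairs from a list of (i, j, sim) tuples."""
    rng = random.Random(seed)
    if len(similarities) <= sample_size:
        return [(i, j) for i, j, _ in similarities]

    sims = [s for _, _, s in similarities]
    lo, hi = min(sims), max(sims)
    width = (hi - lo) / num_bins or 1.0  # avoid zero width when all sims are equal

    # Stratified part: spread part of the budget over equal-width bins.
    bins = [[] for _ in range(num_bins)]
    for idx, s in enumerate(sims):
        bins[min(int((s - lo) / width), num_bins - 1)].append(idx)
    budget = int(sample_size * stratified_fraction)
    per_bin = max(1, budget // num_bins)
    chosen = set()
    for members in bins:
        take = min(per_bin, len(members), budget - len(chosen))
        if take > 0:
            chosen.update(rng.sample(members, take))

    # Weighted part: fill the rest, favoring high-similarity pairs.
    remaining = [idx for idx in range(len(sims)) if idx not in chosen]
    while len(chosen) < sample_size and remaining:
        weights = [math.exp(sampling_weight * (sims[idx] - hi)) for idx in remaining]
        pick = rng.choices(range(len(remaining)), weights=weights, k=1)[0]
        chosen.add(remaining.pop(pick))

    return [similarities[idx][:2] for idx in sorted(chosen)]
```

The stratified part guarantees some coverage of low-similarity pairs, while the weighted part concentrates labels near the top of the similarity range, where true matches are more likely to appear.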

Import

from docetl.operations.utils.blocking import RuntimeBlockingOptimizer

I/O Contract

Inputs (optimize_resolve)

Name | Type | Required | Description
input_data | List[Dict] | Yes | Documents to compute blocking threshold for (self-join scenario)
compare_fn | Callable | Yes | Function taking two items, returning (is_match, cost, prompt)
blocking_keys | List[str] | No | Keys to use for embedding text (auto-extracted from prompt if None)

Inputs (optimize_equijoin)

Name | Type | Required | Description
left_data | List[Dict] | Yes | Left dataset for cross-join blocking
right_data | List[Dict] | Yes | Right dataset for cross-join blocking
compare_fn | Callable | Yes | Function taking two items, returning (is_match, cost)
left_keys | List[str] | No | Keys for left dataset embeddings
right_keys | List[str] | No | Keys for right dataset embeddings

Outputs (optimize_resolve)

Name | Type | Description
output | Tuple[float, List[List[float]], float] | Optimal threshold, precomputed embeddings, and total cost

Outputs (optimize_equijoin)

Name | Type | Description
output | Tuple[float, List[List[float]], List[List[float]], float] | Optimal threshold, left embeddings, right embeddings, and total cost

Usage Examples

from docetl.operations.utils.blocking import RuntimeBlockingOptimizer
from rich.console import Console

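# pipeline_runner, operation_config, documents, left_documents, and
# right_documents are assumed to be defined by the calling code.
optimizer = RuntimeBlockingOptimizer(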
    runner=pipeline_runner,
    config=operation_config,
    default_model="gpt-4o-mini",
    max_threads=64,
    console=Console(),
    target_recall=0.95,
    sample_size=100,
)

# For self-join (resolve) operations
def compare_fn(item1, item2):
    # Returns (is_match: bool, cost: float, prompt: str)
    return is_match, cost, prompt

threshold, embeddings, cost = optimizer.optimize_resolve(
    input_data=documents,
    compare_fn=compare_fn,
    blocking_keys=["name", "description"],
)

# For cross-join (equijoin) operations
# compare_pair_fn is assumed to be defined elsewhere and to return (is_match, cost)
threshold, left_emb, right_emb, cost = optimizer.optimize_equijoin(
    left_data=left_documents,
    right_data=right_documents,
    compare_fn=compare_pair_fn,
    left_keys=["title"],
    right_keys=["name"],
)
