Implementation:Ucbepic Docetl EquijoinOperation Execute
| Knowledge Sources | |
|---|---|
| Domains | Data_Processing, Record_Linkage |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
Concrete tool, provided by DocETL, for joining two datasets by using LLM-based pairwise comparison to identify matching record pairs.
Description
The EquijoinOperation class extends BaseOperation to perform semantic equijoins between two datasets. It combines blocking techniques (code-based conditions, embedding cosine similarity thresholds, and auto-computed thresholds via RuntimeBlockingOptimizer) with LLM-based comparisons: blocking narrows the set of candidate pairs so that the LLM compares only plausible matches between the left and right datasets. Matched pairs are merged into output documents with configurable field name mappings to avoid key collisions.
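The embedding-threshold blocking mentioned above can be illustrated with a minimal sketch. This is not DocETL's implementation: the helper names `cosine_similarity` and `block_by_threshold` and the toy two-dimensional vectors are assumptions made for illustration, and real embeddings would come from the configured embedding model.

```python
import math


def cosine_similarity(a, b):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(y * y for y in b))
    if norm_a == 0 or norm_b == 0:
        return 0.0  # treat a zero vector as dissimilar to everything
    return dot / (norm_a * norm_b)


def block_by_threshold(left_vecs, right_vecs, threshold):
    """Return (left_index, right_index) pairs whose similarity meets the threshold."""
    return [
        (i, j)
        for i, lv in enumerate(left_vecs)
        for j, rv in enumerate(right_vecs)
        if cosine_similarity(lv, rv) >= threshold
    ]


# Hypothetical embeddings standing in for real model output.
left_vecs = [[1.0, 0.0], [0.0, 1.0]]
right_vecs = [[0.9, 0.1], [0.1, 0.9]]

# Only pairs at or above the threshold would be sent on for LLM comparison.
candidates = block_by_threshold(left_vecs, right_vecs, threshold=0.7)
```

Here two of the four possible pairs survive blocking, so only those two would need an LLM call.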
Usage
Use this operation when you need to join two datasets where traditional key-based joins are insufficient and semantic matching is required. Typical scenarios include matching customer records across systems, linking research papers to their authors from separate databases, or joining product catalogs with user reviews based on textual similarity.
Code Reference
Source Location
- Repository: Ucbepic_Docetl
- File: docetl/operations/equijoin.py
- Lines: 1-672
Signature
class EquijoinOperation(BaseOperation):
    class schema(BaseOperation.schema):
        type: str = "equijoin"
        comparison_prompt: str
        output: dict[str, Any] | None = None
        blocking_threshold: float | None = None
        blocking_target_recall: float | None = None
        blocking_conditions: list[str] | None = None
        limits: dict[str, int] | None = None
        comparison_model: str | None = None
        optimize: bool | None = None
        embedding_model: str | None = None
        embedding_batch_size: int | None = None
        compare_batch_size: int | None = None
        limit_comparisons: int | None = None
        blocking_keys: dict[str, list[str]] | None = None
        timeout: int | None = None
        litellm_completion_kwargs: dict[str, Any] = {}

    def compare_pair(self, comparison_prompt, model, item1, item2,
                     timeout_seconds=120, max_retries_per_timeout=2) -> tuple[bool, float]: ...

    def execute(self, left_data: list[dict], right_data: list[dict]) -> tuple[list[dict], float]: ...
Import
from docetl.operations.equijoin import EquijoinOperation
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| left_data | List[Dict] | Yes | Left dataset of documents to join |
| right_data | List[Dict] | Yes | Right dataset of documents to join |
| comparison_prompt | str | Yes | Jinja2 template prompt for LLM-based comparison of pairs |
| blocking_keys | Dict[str, List[str]] | No | Keys to use for embedding-based blocking (must include "left" and "right") |
| blocking_threshold | float | No | Cosine similarity threshold for embedding-based blocking |
| blocking_conditions | List[str] | No | Python expressions evaluated to determine candidate pairs |
| limits | Dict[str, int] | No | Maximum matches per item from each side (must include "left" and "right") |
| limit_comparisons | int | No | Maximum number of pairwise comparisons to perform |
| comparison_model | str | No | LLM model to use for comparison (defaults to pipeline default) |
| embedding_model | str | No | Model for generating embeddings used in blocking |
| blocking_target_recall | float | No | Target recall for auto-computed blocking threshold (default 0.95) |
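To make `blocking_conditions` and `limits` more concrete, here is a rough sketch of how such settings could behave. It rests on several assumptions that the table above does not confirm: that each expression sees the two records as `left` and `right`, that a pair is kept when any expression is truthy, and that limits are applied greedily in encounter order. The helpers `passes_conditions` and `apply_limits` are hypothetical and are not part of DocETL.

```python
def passes_conditions(left, right, conditions):
    """True if any condition expression is truthy for this pair (assumed semantics)."""
    scope = {"left": left, "right": right}
    # Builtins are emptied to keep this toy evaluation narrow; this is not a real sandbox.
    return any(eval(cond, {"__builtins__": {}}, scope) for cond in conditions)


def apply_limits(pairs, limits):
    """Greedily keep pairs until an item reaches its per-side match limit."""
    left_counts, right_counts = {}, {}
    kept = []
    for i, j in pairs:
        if left_counts.get(i, 0) >= limits["left"]:
            continue
        if right_counts.get(j, 0) >= limits["right"]:
            continue
        kept.append((i, j))
        left_counts[i] = left_counts.get(i, 0) + 1
        right_counts[j] = right_counts.get(j, 0) + 1
    return kept


left_data = [{"zip": "94704", "name": "Alice"}, {"zip": "10001", "name": "Bob"}]
right_data = [
    {"zip": "94704", "full_name": "Alice A."},
    {"zip": "94704", "full_name": "Alicia"},
    {"zip": "10001", "full_name": "Robert"},
]
conditions = ["left['zip'] == right['zip']"]

# Pairs that pass the code-based blocking condition.
candidate_pairs = [
    (i, j)
    for i, l in enumerate(left_data)
    for j, r in enumerate(right_data)
    if passes_conditions(l, r, conditions)
]

# Cap each record at one match per side, as with limits {"left": 1, "right": 1}.
limited_pairs = apply_limits(candidate_pairs, {"left": 1, "right": 1})
```

In this toy run the first left record has two candidates on the right, and the limit of one match per side keeps only the first of them.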
Outputs
| Name | Type | Description |
|---|---|---|
| output | Tuple[List[Dict], float] | Joined documents (with left/right field prefixes for collisions) and total cost |
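The collision handling described for the joined documents might look roughly like the sketch below. The `left_`/`right_` prefix scheme and the helper `merge_pair` are assumptions for illustration; the exact field names DocETL produces should be checked against the source file.

```python
def merge_pair(left, right):
    """Merge two records, prefixing only the keys that appear on both sides."""
    collisions = left.keys() & right.keys()
    merged = {}
    for key, value in left.items():
        merged[f"left_{key}" if key in collisions else key] = value
    for key, value in right.items():
        merged[f"right_{key}" if key in collisions else key] = value
    return merged


# "id" exists on both sides, so it is split into two prefixed fields.
joined = merge_pair({"id": 1, "name": "Alice"}, {"id": 7, "age": 30})
```

Keys that appear on only one side keep their original names, so downstream operations see renamed fields only where a collision actually occurred.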
Usage Examples
# YAML pipeline configuration for equijoin
operations:
  - name: match_customers
    type: equijoin
    comparison_prompt: |
      Compare these two customer records and determine if they refer to the same person:
      Left: {{ left }}
      Right: {{ right }}
    blocking_keys:
      left: ["name", "email"]
      right: ["full_name", "contact_email"]
    blocking_threshold: 0.7
    limits:
      left: 1
      right: 1
    comparison_model: "gpt-4o-mini"
# Python API usage
from docetl.operations.equijoin import EquijoinOperation

config = {
    "name": "match_records",
    "type": "equijoin",
    "comparison_prompt": "Compare {{left}} and {{right}} and determine if they match.",
    "blocking_keys": {"left": ["id"], "right": ["user_id"]},
    "blocking_threshold": 0.8,
    "limits": {"left": 1, "right": 1},
}

# runner, default_model, and max_threads are not defined in this snippet;
# they must be supplied by the surrounding pipeline code.
equijoin_op = EquijoinOperation(runner, config, default_model, max_threads)

left_data = [{"id": 1, "name": "Alice"}, {"id": 2, "name": "Bob"}]
right_data = [{"user_id": 1, "age": 30}, {"user_id": 2, "age": 25}]

# execute returns the joined documents and the total cost
results, cost = equijoin_op.execute(left_data, right_data)