Implementation:Neuml Txtai LateEncoder
| Knowledge Sources | |
|---|---|
| Domains | Text_Similarity, Late_Interaction |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Late interaction scoring pipeline using token-level embeddings and einsum-based maximum similarity computation (ColBERT-style).
Description
The LateEncoder class implements a late interaction retrieval model inspired by ColBERT. Unlike single-vector approaches that compress each text into one embedding, late interaction preserves per-token embeddings for both queries and documents, then computes fine-grained token-level similarity scores using Einstein summation notation (torch.einsum).
The pipeline loads a model through PoolingFactory.create, defaulting to colbert-ir/colbertv2.0 when no path is specified. The model is configured with muvera=None in the model arguments to disable MuVERA compression, ensuring full late interaction vectors are produced.
The scoring process works as follows:
- Both query and document texts are encoded into 3D tensors of shape (batch, tokens, hidden).
- A bulk dot product is computed using torch.einsum("ash,bth->abst", queries, data), producing a 4D similarity tensor of shape (queries, documents, query tokens, document tokens).
- The maximum similarity along the document token axis is taken (.max(axis=-1)), then averaged across query tokens (.mean(axis=-1)) to produce a single relevance score per query-document pair.
- The top-k results are selected using np.argpartition, which avoids a full sort.
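The scoring steps can be sketched with small random arrays. This is a minimal illustration rather than the library's code: NumPy's einsum stands in for torch.einsum (the subscript string is the same), and the `maxsim` name and array sizes are assumptions chosen for the example.

```python
import numpy as np

def maxsim(queries, data):
    """
    Late interaction score for every query-document pair.

    queries: array of shape (a, s, h) = (queries, query tokens, hidden)
    data:    array of shape (b, t, h) = (documents, document tokens, hidden)
    Returns an array of shape (a, b).
    """
    # Dot product between every query token and every document token
    scores = np.einsum("ash,bth->abst", queries, data)

    # Best-matching document token per query token, then average over query tokens
    return scores.max(axis=-1).mean(axis=-1)

rng = np.random.default_rng(0)
queries = rng.normal(size=(2, 4, 8))  # 2 queries, 4 tokens each, hidden size 8
data = rng.normal(size=(3, 6, 8))     # 3 documents, 6 tokens each

scores = maxsim(queries, data)
print(scores.shape)  # (2, 3)
```

Note that NumPy's max returns the array directly, whereas the PyTorch call returns a (values, indices) pair, which is why the torch version reads .max(axis=-1).values.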
Usage
Use this pipeline when token-level matching granularity is needed beyond what single-vector similarity provides. It is particularly effective for tasks where specific terms or phrases in the query should match specific parts of documents. The LateEncoder is used internally by the Similarity pipeline when lateencode=True.
Code Reference
Source Location
- Repository: txtai
- File: src/python/txtai/pipeline/text/lateencoder.py
- Lines: L1-103
Class Definition
class LateEncoder(Pipeline):
    """
    Computes similarity between query and list of text using a late interaction model.
    """
Constructor Signature
def __init__(self, path=None, **kwargs):
The constructor resolves the device via Models.device() and loads the model through PoolingFactory.create. Supported kwargs include gpu, method, tokenizer, maxlength, and vectors. The default model path is "colbert-ir/colbertv2.0".
Call Signature
def __call__(self, query, texts, limit=None):
Import
from txtai.pipeline.text import LateEncoder
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| query | str or list | Yes | Query text or list of query texts. A single string is internally wrapped in a list for batch processing. |
| texts | list of str or list of tensors | Yes | List of candidate texts to score against the query. If elements are strings, they are encoded via self.encode(). If elements are pre-encoded tensors, they are used directly. |
| limit | int or None | No | Maximum number of results to return per query. Defaults to None, which returns all scored results. |
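The input rules in the table can be pictured as follows. This is a sketch of the described behavior only: `normalize_inputs` and the `encode` callable passed to it are hypothetical names, not part of txtai, and the stand-in encoder simply tags each text with its category.

```python
def normalize_inputs(query, texts, encode):
    """
    Hypothetical helper mirroring the input rules in the table above.

    encode: callable taking (list of str, category) and returning encoded vectors
    """
    # A single query string is wrapped in a list so it can be processed as a batch
    queries = [query] if isinstance(query, str) else query

    # Strings are encoded; anything else is assumed to be pre-encoded and passed through
    data = encode(texts, "data") if isinstance(texts[0], str) else texts

    return encode(queries, "query"), data

# Stand-in encoder for illustration: tags each text with its category
fake_encode = lambda batch, category: [(category, text) for text in batch]

queries, data = normalize_inputs("a query", ["doc one", "doc two"], fake_encode)
print(queries)  # [('query', 'a query')]
print(data)     # [('data', 'doc one'), ('data', 'doc two')]
```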
Outputs
| Name | Type | Description |
|---|---|---|
| results | list of tuple | If query is a string, returns a 1D list of (id, score) tuples where id is the index in texts. If query is a list, returns a 2D list with one row of results per query. Results are selected by top-k partitioning and are not guaranteed to be sorted. |
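Because the returned order is not guaranteed, callers that need a ranked list can sort the tuples themselves. A minimal sketch, assuming results in the 1D and 2D shapes described above; `rank` is a hypothetical helper name and the scores are made-up values.

```python
def rank(results):
    """
    Sorts (id, score) tuples by descending score.
    Accepts a 1D list (single query) or a 2D list (one row per query).
    """
    # A 2D result is a list whose elements are themselves lists of tuples
    if results and isinstance(results[0], list):
        return [rank(row) for row in results]

    return sorted(results, key=lambda x: x[1], reverse=True)

single = [(2, 0.41), (0, 0.87), (1, 0.12)]
batch = [[(1, 0.3), (0, 0.55)], [(0, 0.2), (1, 0.75)]]

print(rank(single))  # [(0, 0.87), (2, 0.41), (1, 0.12)]
print(rank(batch))   # [[(0, 0.55), (1, 0.3)], [(1, 0.75), (0, 0.2)]]
```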
Key Methods
encode(data, category)
def encode(self, data, category):
Encodes a batch of data using the underlying model's encode method. The category parameter distinguishes between "query" and "data" encoding modes, which may apply different tokenization or padding strategies. Returns a PyTorch tensor on the configured device, converted from the model's NumPy output.
score(queries, data, limit)
def score(self, queries, data, limit):
Computes the maximum similarity score between query vectors and data vectors using Einstein summation notation. The computation proceeds as:
# Bulk dot product: (a=queries, s=query_tokens, h=hidden) x (b=docs, t=doc_tokens, h=hidden)
scores = torch.einsum("ash,bth->abst", queries, data)
# MaxSim: max over document tokens, then mean over query tokens
scores = scores.max(axis=-1).values.mean(axis=-1)
Top-k results are selected using np.argpartition for efficient O(n) selection rather than O(n log n) full sort. Returns a list of (index, score) tuples.
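To illustrate the selection step, the sketch below picks the top-k entries per row with np.argpartition and then orders only those k entries. It is not the library's code: the `topk` name, the final sort of the selected entries, and the sample scores are assumptions for the example.

```python
import numpy as np

def topk(scores, limit=None):
    """
    Returns the top `limit` (index, score) tuples per row of a 2D score matrix.
    """
    n = scores.shape[1]
    k = n if limit is None else min(limit, n)

    # Partition so the k largest scores per row occupy the first k slots (unordered)
    indices = np.argpartition(-scores, k - 1, axis=1)[:, :k]
    top = np.take_along_axis(scores, indices, axis=1)

    # Order only the k selected entries, highest score first
    order = np.argsort(-top, axis=1)
    indices = np.take_along_axis(indices, order, axis=1)
    top = np.take_along_axis(top, order, axis=1)

    return [list(zip(i.tolist(), s.tolist())) for i, s in zip(indices, top)]

scores = np.array([[0.2, 0.9, 0.5, 0.7], [0.8, 0.1, 0.6, 0.3]])
print(topk(scores, limit=2))  # [[(1, 0.9), (3, 0.7)], [(0, 0.8), (2, 0.6)]]
```

Partitioning touches every score once, while the sort afterwards only handles k items per row, so the cost stays low when k is much smaller than the number of documents.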
Inheritance Chain
LateEncoder -> Pipeline
The Pipeline base class defines the __call__ interface contract and a batch() helper method.
Usage Examples
Basic Late Interaction Scoring
from txtai.pipeline.text import LateEncoder
# Load the default ColBERTv2 model
encoder = LateEncoder()
# Score a query against candidate texts
results = encoder("What is machine learning?", [
    "Machine learning is a subset of artificial intelligence",
    "The weather forecast predicts rain tomorrow",
    "Deep learning models use neural networks"
])
# Results: [(id, score), ...]; order is not guaranteed, so sort by score for a ranked list
for idx, score in sorted(results, key=lambda x: x[1], reverse=True):
    print(f"Text {idx}: {score:.4f}")
Batch Queries with Limit
from txtai.pipeline.text import LateEncoder
encoder = LateEncoder()
texts = [
    "Python is a programming language",
    "Java is used for enterprise software",
    "Rust provides memory safety guarantees",
    "Go is designed for concurrent programming"
]
# Score multiple queries, return top 2 per query
results = encoder(
    ["best language for beginners", "systems programming"],
    texts,
    limit=2
)
# One row of (id, score) tuples per query
for i, row in enumerate(results):
    print(f"Query {i}: {row}")
Pre-encoded Document Vectors
from txtai.pipeline.text import LateEncoder
encoder = LateEncoder()
# Pre-encode documents once
texts = ["Document one content", "Document two content", "Document three content"]
encoded_docs = encoder.encode(texts, "data")
# Reuse encoded documents for multiple queries
result1 = encoder("first query", encoded_docs, limit=2)
result2 = encoder("second query", encoded_docs, limit=2)