Implementation:Neuml Txtai Document Batch Storage
| Knowledge Sources | |
|---|---|
| Domains | Embeddings, Index Management, Data Streaming |
| Last Updated | 2026-02-10 01:00 GMT |
Overview
Concrete tool for streaming documents to temporary storage for deferred batch indexing provided by txtai.
Description
The Documents class provides a streaming buffer that queues large volumes of documents to temporary disk storage before they are indexed. This allows the embeddings index to accumulate documents across multiple add calls and then process them all at once during the index phase.
Key features:
- Temporary file storage: Documents are serialized to a temporary file (with ".docs" suffix) using Python's tempfile.NamedTemporaryFile. This prevents excessive memory usage when indexing large document collections.
- Pickle serialization: Uses txtai's SerializeFactory with the "pickle" format (with
allowpickle=True) for efficient streaming serialization and deserialization. - Batch tracking: Maintains a batch counter (number of add calls) and size counter (total documents added) for iteration management.
- Iterator protocol: Implements
__iter__to stream all queued documents by reading each serialized batch from the temporary file and yielding individual documents. - Reusable: After close is called, the temporary file is deleted and counters are reset, allowing new document batches to be added.
Usage
Use Documents as the internal buffering mechanism during embeddings indexing. It is created and managed by the Indexes class and the main embeddings indexing pipeline. It is particularly useful when indexing large datasets that cannot fit in memory, as it streams documents to disk and reads them back during the index build phase.
Code Reference
Source Location
- Repository: Neuml_Txtai
- File:
src/python/txtai/embeddings/index/documents.py
Signature
class Documents:
def __init__(self)
def __len__(self) -> int
def __iter__(self) -> generator
def add(self, documents) -> list
def close(self)
Import
from txtai.embeddings.index.documents import Documents
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| documents | list[tuple] | Yes (add) | List of (id, data, tag) tuples to queue for indexing. Each tuple contains a document identifier, document data (text or dict), and optional tag metadata. |
Outputs
| Name | Type | Description |
|---|---|---|
| len | int | Total number of queued documents across all batches. |
| iter | generator | Yields individual (id, data, tag) tuples from all queued batches in order. |
| documents (add return) | list[tuple] | Returns the same documents list that was passed in. |
Usage Examples
from txtai.embeddings.index.documents import Documents
# Create a documents stream
docs = Documents()
# Add batches of documents
batch1 = [
(0, "first document text", None),
(1, "second document text", None),
]
docs.add(batch1)
batch2 = [
(2, "third document text", None),
(3, "fourth document text", None),
]
docs.add(batch2)
# Check total document count
print(len(docs)) # 4
# Iterate over all queued documents
for uid, text, tags in docs:
print(f"Document {uid}: {text}")
# Clean up
docs.close()
# Documents can be reused after close
docs.add([(5, "new document", None)])
print(len(docs)) # 1