Implementation:Huggingface Datatrove BaseMediaReader
| Knowledge Sources | |
|---|---|
| Domains | Media Processing, Concurrent I/O |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
BinaryReaderThreaded is an abstract base class for reading binary media content using a multi-threaded, heap-based approach that optionally preserves document ordering.
Description
The BinaryReaderThreaded class extends PipelineStep to provide a concurrent media reading framework. It accepts documents from an upstream pipeline stage, dispatches one media reading task per document to a ThreadPoolExecutor, and yields each document once its task completes. Subclasses must implement the abstract read_media_record method, which defines how a single media record is read from storage.
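The dispatch-and-yield loop in unordered mode can be sketched with only the standard library. This is an illustrative sketch, not datatrove's implementation. `map_unordered` is a hypothetical helper name, and it reproduces only the streaming behavior, with in-flight futures capped at 2 * workers as described for the class's threading model.

```python
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_unordered(
    fn: Callable[[T], R], items: Iterable[T], workers: int = 4
) -> Iterator[R]:
    """Yield fn(item) for every item, in completion order (hypothetical helper).

    At most 2 * workers futures are in flight, so the input iterator is
    consumed lazily instead of being submitted all at once.
    """
    max_in_flight = 2 * workers
    pending: set[Future] = set()
    with ThreadPoolExecutor(max_workers=workers) as pool:
        for item in items:
            # Back-pressure: wait for at least one task to finish before submitting more.
            while len(pending) >= max_in_flight:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                for fut in done:
                    yield fut.result()
            pending.add(pool.submit(fn, item))
        # Input exhausted: drain the tasks that are still running.
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            for fut in done:
                yield fut.result()
```

Because the function is a generator, a downstream consumer that stops early never forces the whole input to be read.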
The threading model trades throughput against memory usage. The executor keeps at most 2 * workers futures in flight, so a large input stream is consumed incrementally rather than submitted all at once. Completed tasks are buffered in a min-heap (heapq) keyed by task index; when preserve_order is enabled, a document is yielded only after every document that preceded it in the input has been yielded. When ordering is not required, documents are yielded as soon as their futures complete, maximizing throughput.
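The order-preserving mode can be illustrated with a min-heap of `(task_index, result)` pairs: results that finish early wait in the heap until every lower index has been emitted. This is a sketch under stated assumptions, not datatrove's code. `map_ordered`, `run_indexed`, and `collect` are hypothetical names, and counting heap entries toward the cap is this sketch's own choice.

```python
import heapq
from concurrent.futures import FIRST_COMPLETED, Future, ThreadPoolExecutor, wait
from typing import Callable, Iterable, Iterator, TypeVar

T = TypeVar("T")
R = TypeVar("R")


def map_ordered(
    fn: Callable[[T], R], items: Iterable[T], workers: int = 4
) -> Iterator[R]:
    """Yield fn(item) in input order; early finishers wait in a min-heap."""
    max_buffered = 2 * workers  # assumption: running futures + heap entries share one cap
    pending: set[Future] = set()
    heap: list[tuple[int, R]] = []  # (task_index, result); smallest index on top
    next_index = 0  # the only index that may be yielded next

    def run_indexed(index: int, item: T) -> tuple[int, R]:
        # Tag each result with its input position so it can be re-ordered later.
        return index, fn(item)

    def collect(done: set[Future]) -> Iterator[R]:
        nonlocal next_index
        for fut in done:
            heapq.heappush(heap, fut.result())
        # Release the contiguous run of results that starts at next_index.
        while heap and heap[0][0] == next_index:
            yield heapq.heappop(heap)[1]
            next_index += 1

    with ThreadPoolExecutor(max_workers=workers) as pool:
        for index, item in enumerate(items):
            while len(pending) + len(heap) >= max_buffered:
                done, pending = wait(pending, return_when=FIRST_COMPLETED)
                yield from collect(done)
            pending.add(pool.submit(run_indexed, index, item))
        while pending:
            done, pending = wait(pending, return_when=FIRST_COMPLETED)
            yield from collect(done)
```

Counting buffered heap entries against the cap keeps memory bounded even when an early task stalls, at the cost of leaving workers idle until that task finishes. Task indices are unique, so the heap never has to compare two results directly.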
Each document's media items are processed within a single task via _read_media_record_wrapper, which iterates over all media attached to a document, calls the abstract read_media_record for each, and updates media statistics. Thread-local storage (threading.local) is made available to subclasses for maintaining per-thread state such as file handles.
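A per-document wrapper combined with thread-local file handles might look like the following sketch. It is not datatrove code: `FakeMedia`, `RangeReader`, `read_document`, the offset/length byte-range layout, and the two counters are all hypothetical, chosen to show why `threading.local` suits per-thread state such as open files.

```python
from __future__ import annotations

import threading
from dataclasses import dataclass
from typing import BinaryIO


@dataclass
class FakeMedia:
    """Hypothetical stand-in for a media item: a byte range inside a local file."""

    path: str
    offset: int
    length: int
    media_bytes: bytes | None = None


class RangeReader:
    """Reads a document's media on a worker thread, reusing per-thread handles."""

    def __init__(self) -> None:
        self._local = threading.local()  # attributes set here are private to each thread
        self._lock = threading.Lock()  # guards the shared handle list and stats
        self._all_handles: list[BinaryIO] = []
        self.stats = {"media_read": 0, "media_missing": 0}

    def _handle_for(self, path: str) -> BinaryIO:
        # Each thread lazily builds its own {path: handle} cache, so no two
        # threads ever seek and read on the same file object concurrently.
        cache = getattr(self._local, "handles", None)
        if cache is None:
            cache = self._local.handles = {}
        if path not in cache:
            cache[path] = open(path, "rb")
            with self._lock:
                self._all_handles.append(cache[path])
        return cache[path]

    def read_media_record(self, media: FakeMedia) -> bytes | None:
        f = self._handle_for(media.path)
        f.seek(media.offset)
        data = f.read(media.length)
        return data if len(data) == media.length else None  # short read -> missing

    def read_document(self, media_items: list[FakeMedia]) -> list[FakeMedia]:
        # One task per document: read every attached media item, then update stats.
        for media in media_items:
            media.media_bytes = self.read_media_record(media)
            key = "media_read" if media.media_bytes is not None else "media_missing"
            with self._lock:
                self.stats[key] += 1
        return media_items

    def close(self) -> None:
        # Handles were opened by worker threads; close them all once work is done.
        with self._lock:
            for f in self._all_handles:
                f.close()
            self._all_handles.clear()
```

Caching one handle per thread avoids reopening a file for every media item, and because a file object is never shared between threads, each seek-then-read pair is free of races.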
Usage
Use BinaryReaderThreaded as the base class when building media readers that need to fetch binary content from storage backends (WARC archives, compressed files, object stores). Configure workers based on available I/O parallelism and set preserve_order to True when downstream processing depends on document ordering.
Code Reference
Source Location
- Repository: Huggingface_Datatrove
- File: src/datatrove/pipeline/media/media_readers/base.py
- Lines: 1-82
Signature
```python
class BinaryReaderThreaded(PipelineStep):
    name = "📒⚡ - Binary Media Reader (Fast/Threaded)"
    type = "Media Reader"

    def __init__(
        self,
        data_folder: DataFolderLike,
        workers: int = 1,
        preserve_order: bool = False,
    ):
        ...

    def _read_media_record_wrapper(self, document: Document, task_index: int):
        ...

    @abstractmethod
    def read_media_record(self, media: Media) -> bytes | None:
        ...

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1):
        ...
```
Import
```python
from datatrove.pipeline.media.media_readers.base import BinaryReaderThreaded
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| data_folder | DataFolderLike | Yes | Path, tuple, or DataFolder object pointing to media storage |
| workers | int | No | Number of worker threads for concurrent reading (default: 1) |
| preserve_order | bool | No | Whether to yield documents in original input order (default: False) |
| data | DocumentsPipeline | Yes (via run) | Generator of Document objects whose media items need reading |
Outputs
| Name | Type | Description |
|---|---|---|
| documents | DocumentsPipeline | Generator of Document objects with media_bytes populated from storage |
Usage Examples
Basic Usage
```python
from datatrove.data import Media
from datatrove.pipeline.media.media_readers.base import BinaryReaderThreaded


class MyMediaReader(BinaryReaderThreaded):
    """Custom media reader that reads raw files."""

    def read_media_record(self, media: Media) -> bytes | None:
        # A media item without a storage path cannot be fetched.
        if media.path is None:
            return None
        # The path is opened through the reader's data_folder.
        with self.data_folder.open(media.path, "rb") as f:
            return f.read()


# Use with 4 worker threads, preserving document order
reader = MyMediaReader(
    data_folder="s3://my-bucket/media/",
    workers=4,
    preserve_order=True,
)
```