Implementation:Huggingface Datatrove BaseMediaReader

From Leeroopedia
Knowledge Sources
Domains: Media Processing, Concurrent I/O
Last Updated: 2026-02-14 17:00 GMT

Overview

BinaryReaderThreaded is an abstract base class for reading binary media content using a multi-threaded, heap-based approach that optionally preserves document ordering.

Description

The BinaryReaderThreaded class extends PipelineStep to provide a concurrent media reading framework. It accepts documents from an upstream pipeline stage, dispatches media reading tasks to a ThreadPoolExecutor, and yields processed documents as they complete. The abstract read_media_record method must be implemented by subclasses to define how individual media records are read from storage.

The threading model balances throughput against memory usage. The executor keeps a bounded number of in-flight futures (capped at 2 * workers), so a fast upstream stage cannot queue an unbounded number of pending reads. Completed tasks are buffered in a min-heap (heapq) keyed by their input position. When preserve_order is enabled, a document is yielded only once every earlier document has been yielded, which restores the original input sequence. When ordering is not required, documents are yielded as soon as their futures complete, maximizing throughput.
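The scheduling pattern described above can be illustrated with a small standalone sketch. This is not the datatrove implementation: the function name bounded_map and its parameters are hypothetical, and only the ideas taken from the description (a cap of 2 * workers in-flight futures and a heapq buffer keyed by input position) are meant to carry over.

```python
import heapq
from concurrent.futures import FIRST_COMPLETED, ThreadPoolExecutor, wait


def bounded_map(items, fn, workers=4, preserve_order=False):
    """Apply fn to items on a thread pool with at most 2 * workers pending tasks.

    Hypothetical helper for illustration only; not part of datatrove.
    """
    max_in_flight = 2 * workers
    in_flight = {}      # future -> index of the item in the input sequence
    ready = []          # min-heap of (index, result) for completed tasks
    next_to_yield = 0   # next input index to emit when preserve_order is True

    def collect(done):
        # Move finished futures from the in-flight map into the heap.
        for future in done:
            heapq.heappush(ready, (in_flight.pop(future), future.result()))

    def emit():
        # Without ordering, drain everything that is ready.
        # With ordering, drain only while the heap head is the next expected index.
        nonlocal next_to_yield
        while ready and (not preserve_order or ready[0][0] == next_to_yield):
            index, result = heapq.heappop(ready)
            next_to_yield = index + 1
            yield result

    with ThreadPoolExecutor(max_workers=workers) as pool:
        for index, item in enumerate(items):
            # Block on submission once the cap is reached.
            while len(in_flight) >= max_in_flight:
                done, _ = wait(list(in_flight), return_when=FIRST_COMPLETED)
                collect(done)
                yield from emit()
            in_flight[pool.submit(fn, item)] = index
        # Input exhausted: wait for the remaining tasks.
        while in_flight:
            done, _ = wait(list(in_flight), return_when=FIRST_COMPLETED)
            collect(done)
            yield from emit()
```

In this sketch the cap limits only unfinished futures. With preserve_order enabled, results that complete behind a slow earlier task still accumulate in the heap until that task finishes, so ordering trades some memory for determinism.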

Each document's media items are processed within a single task via _read_media_record_wrapper, which iterates over all media attached to a document, calls the abstract read_media_record for each, and updates media statistics. Thread-local storage (threading.local) is made available to subclasses for maintaining per-thread state such as file handles.
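As a rough illustration of per-thread state, the sketch below caches open handles in a threading.local object so that each worker thread opens a given file at most once and never shares a handle with another thread. The class name PerThreadHandles, its get method, and the opener callback are hypothetical. The source does not show the name of the attribute that BinaryReaderThreaded exposes, so none is assumed here.

```python
import threading


class PerThreadHandles:
    """Cache of open handles, private to each thread (illustrative only)."""

    def __init__(self):
        # Attributes set on a threading.local are visible only to the thread that set them.
        self._local = threading.local()

    def get(self, path, opener):
        # Lazily create this thread's cache on first use.
        cache = getattr(self._local, "handles", None)
        if cache is None:
            cache = {}
            self._local.handles = cache
        # Open the file once per thread, then reuse the handle.
        if path not in cache:
            cache[path] = opener(path)
        return cache[path]
```

A subclass could follow the same pattern inside read_media_record, for example to keep one open archive handle per worker instead of reopening the archive for every record.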

Usage

Use BinaryReaderThreaded as the base class when building media readers that need to fetch binary content from storage backends (WARC archives, compressed files, object stores). Configure workers based on available I/O parallelism and set preserve_order to True when downstream processing depends on document ordering.

Code Reference

Source Location

Signature

class BinaryReaderThreaded(PipelineStep):
    name = "📒⚡ - Binary Media Reader (Fast/Threaded)"
    type = "Media Reader"

    def __init__(
        self,
        data_folder: DataFolderLike,
        workers: int = 1,
        preserve_order: bool = False,
    ):
        ...

    def _read_media_record_wrapper(self, document: Document, task_index: int):
        ...

    @abstractmethod
    def read_media_record(self, media: Media) -> bytes | None:
        ...

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1):
        ...

Import

from datatrove.pipeline.media.media_readers.base import BinaryReaderThreaded

I/O Contract

Inputs

Name | Type | Required | Description
data_folder | DataFolderLike | Yes | Path, tuple, or DataFolder object pointing to media storage
workers | int | No | Number of worker threads for concurrent reading (default: 1)
preserve_order | bool | No | Whether to yield documents in original input order (default: False)
data | DocumentsPipeline | Yes (via run) | Generator of Document objects whose media items need reading

Outputs

Name | Type | Description
documents | DocumentsPipeline | Generator of Document objects with media_bytes populated from storage
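The contract above (documents in, the same documents out with media_bytes filled) can be sketched without datatrove installed. StubMedia, StubDocument, and populate_media are hypothetical stand-ins, not datatrove classes. The real _read_media_record_wrapper also updates media statistics, and its exact handling of missing or already-populated media is not shown in the source.

```python
from dataclasses import dataclass, field
from typing import Callable, List, Optional


@dataclass
class StubMedia:
    # Stand-in for datatrove's Media; only the fields this sketch needs.
    path: Optional[str]
    media_bytes: Optional[bytes] = None


@dataclass
class StubDocument:
    # Stand-in for datatrove's Document.
    id: str
    media: List[StubMedia] = field(default_factory=list)


def populate_media(
    document: StubDocument,
    read_media_record: Callable[[StubMedia], Optional[bytes]],
) -> StubDocument:
    """Read every media item attached to one document within a single task."""
    for media in document.media:
        # The reader returns bytes, or None when the record cannot be read.
        media.media_bytes = read_media_record(media)
    return document
```

Handling all media of a document in one task keeps the unit of scheduling equal to the unit of output, which is what allows a whole document to be yielded as soon as its future completes.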

Usage Examples

Basic Usage

from datatrove.pipeline.media.media_readers.base import BinaryReaderThreaded
from datatrove.data import Media

class MyMediaReader(BinaryReaderThreaded):
    """Custom media reader that reads raw files."""

    def read_media_record(self, media: Media) -> bytes | None:
        if media.path is None:
            return None
        with self.data_folder.open(media.path, "rb") as f:
            return f.read()

# Use with 4 worker threads, preserving document order
reader = MyMediaReader(
    data_folder="s3://my-bucket/media/",
    workers=4,
    preserve_order=True,
)
