

Implementation:Huggingface Datatrove BaseReader

From Leeroopedia
Revision as of 13:01, 16 February 2026 by Admin (auto-imported from implementations/Huggingface_Datatrove_BaseReader.md)
Knowledge Sources
Domains: Data Ingestion, Pipeline Architecture
Last Updated: 2026-02-14 17:00 GMT

Overview

BaseReader and BaseDiskReader are abstract base classes that define the framework for reading data from various sources and creating Document objects as the entry point of Datatrove processing pipelines.

Description

The BaseReader class extends PipelineStep to provide the foundational interface for all data readers in Datatrove. It supports a limit parameter that caps how many documents are read, a skip parameter that discards a number of initial documents, a customizable adapter function that transforms raw data dictionaries into Document fields, and default_metadata, which is merged into every document's metadata. The default adapter extracts the text, id, media, and metadata fields from the raw data (text and id are looked up under text_key and id_key), and places all remaining fields into metadata.
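The default adapter behavior described above can be sketched as a standalone function. This is not Datatrove's code: the name default_adapter is hypothetical, the fallback id of the form "<path>/<id_in_file>" is an assumption, and the sketch copies the input dict so the caller's record is not mutated (whether the real adapter does the same is not stated here).

```python
def default_adapter(
    data: dict,
    path: str,
    id_in_file,
    text_key: str = "text",
    id_key: str = "id",
) -> dict:
    """Map one raw record onto Document fields (sketch of the default-adapter behavior)."""
    data = dict(data)  # work on a copy so pop() does not mutate the caller's dict
    return {
        "text": data.pop(text_key, ""),
        # Assumption: a record without an id gets "<path>/<id_in_file>"
        "id": data.pop(id_key, f"{path}/{id_in_file}"),
        "media": data.pop("media", []),
        # Explicit metadata first, then every leftover raw field on top of it
        "metadata": {**data.pop("metadata", {}), **data},
    }


raw = {"content": "Hello world", "url": "https://example.com", "lang": "en"}
doc_fields = default_adapter(raw, "shard-0.jsonl", 3, text_key="content")
```

Here text_key="content" redirects the text lookup, and url and lang, which match no known field, end up in metadata.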

The get_document_from_dict method is the core document creation utility. It applies the adapter to the raw data and checks that the resulting text is non-empty. If the text is empty, the record is skipped and the method returns None; a warning listing the available raw keys is logged the first time this happens. Otherwise, it instantiates Media objects from the media data, creates a Document, and merges in default_metadata. This method encapsulates the complete raw-to-Document transformation.
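The flow of get_document_from_dict can be illustrated with a self-contained sketch. SimpleDocument, SimpleMedia, simple_adapter, and DocumentFactory are hypothetical stand-ins so the code runs without Datatrove; the real Media class has different fields, and the rule that a document's own metadata overrides default_metadata on key conflicts is an assumption.

```python
import logging
from dataclasses import dataclass, field

logger = logging.getLogger(__name__)


@dataclass
class SimpleMedia:
    # Hypothetical stand-in; datatrove's Media class has its own fields
    url: str = ""


@dataclass
class SimpleDocument:
    # Hypothetical stand-in for datatrove's Document
    text: str
    id: str
    media: list = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


def simple_adapter(data: dict, path: str, id_in_file) -> dict:
    # Minimal adapter: known keys become fields, everything else becomes metadata
    rest = {k: v for k, v in data.items() if k not in ("text", "id", "media")}
    return {
        "text": data.get("text", ""),
        "id": data.get("id", f"{path}/{id_in_file}"),
        "media": data.get("media", []),
        "metadata": rest,
    }


class DocumentFactory:
    def __init__(self, adapter, text_key: str = "text", default_metadata=None):
        self.adapter = adapter
        self.text_key = text_key
        self.default_metadata = default_metadata or {}
        self._warned_empty = False

    def get_document_from_dict(self, data: dict, source_file: str, id_in_file):
        parsed = self.adapter(data, source_file, id_in_file)
        if not parsed.get("text"):
            if not self._warned_empty:  # warn only on the first empty record
                self._warned_empty = True
                logger.warning(
                    "Found document without text, skipping. Is text_key=%r correct? Available keys: %s",
                    self.text_key,
                    list(data.keys()),
                )
            return None
        # Turn raw media dicts into media objects before building the document
        parsed["media"] = [SimpleMedia(**m) for m in parsed.get("media", [])]
        doc = SimpleDocument(**parsed)
        # Assumption: the document's own metadata wins over default_metadata on conflicts
        doc.metadata = {**self.default_metadata, **doc.metadata}
        return doc
```

With default_metadata={"pipeline_version": "2.0", "source": "default"}, a record carrying "source": "web" keeps "web", while a record with no "text" key returns None and triggers the single warning.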

BaseDiskReader extends BaseReader with filesystem-specific functionality through fsspec integration. It adds support for data_folder (local or remote paths), paths_file (an explicit listing of the files to read), recursive directory scanning, glob_pattern filtering, shuffle_files for randomizing the read order within a shard, and progress bars for both files and documents. Its run method computes the shard, that is, the subset of the available files assigned to the current worker, and delegates per-file reading to the abstract read_file method. The read_files_shard method reads the files of a shard sequentially, counting documents and applying skip and limit.
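The sharding and skip/limit logic described above can be sketched without Datatrove. The round-robin assignment (sorted file list sliced with [rank::world_size]) is an assumption about how files are divided, the function names only mirror the methods named above, and read_file is passed in as a plain callable; skip and limit are applied per shard, i.e. per worker.

```python
import random
from typing import Callable, Iterable, Iterator


def get_shard(files: list, rank: int, world_size: int) -> list:
    # Assumption: round-robin (strided) assignment over a sorted file listing,
    # so every file lands in exactly one worker's shard
    return sorted(files)[rank::world_size]


def read_files_shard(
    shard: Iterable[str],
    read_file: Callable[[str], Iterable],
    skip: int = 0,
    limit: int = -1,
) -> Iterator:
    """Read files one after another, dropping the first `skip` docs and stopping at `limit`."""
    if limit == 0:
        return
    skipped = yielded = 0
    for filepath in shard:
        for doc in read_file(filepath):
            if skipped < skip:
                skipped += 1
                continue
            yield doc
            yielded += 1
            if limit != -1 and yielded >= limit:
                return  # stop early; remaining files in the shard are never opened


def run(all_files, read_file, rank=0, world_size=1, skip=0, limit=-1, shuffle_files=False, seed=None):
    shard = get_shard(all_files, rank, world_size)
    if shuffle_files:
        random.Random(seed).shuffle(shard)  # only reorders this worker's own files
    yield from read_files_shard(shard, read_file, skip=skip, limit=limit)


# Usage with an in-memory "filesystem" standing in for real files
FILES = {"a.jsonl": ["a0", "a1"], "b.jsonl": ["b0"], "c.jsonl": ["c0", "c1", "c2"], "d.jsonl": ["d0"]}
opened = []


def fake_read_file(path):
    opened.append(path)  # record which files were actually read
    yield from FILES[path]


docs = list(run(list(FILES), fake_read_file, rank=0, world_size=2, limit=2))
```

With two workers, rank 0 gets a.jsonl and c.jsonl; because the limit of 2 is reached inside a.jsonl, c.jsonl is never opened.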

Usage

Use BaseReader when implementing readers for non-filesystem data sources (APIs, databases, custom formats). Use BaseDiskReader when implementing readers for file-based data stored locally or on remote storage systems such as S3, GCS, or HDFS. A BaseReader subclass must implement the abstract run method; a BaseDiskReader subclass only needs to implement the abstract read_file method, since BaseDiskReader already provides run.
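For a non-filesystem source, the work a BaseReader subclass's run method has to do can be sketched with the standard-library sqlite3 module. The function below deliberately does not subclass BaseReader so that it runs without Datatrove; the docs table, its content column, and the rowid-modulo partitioning across workers are all hypothetical choices for illustration.

```python
import sqlite3
from typing import Iterator


def run_db_reader(
    conn: sqlite3.Connection, rank: int = 0, world_size: int = 1, limit: int = -1
) -> Iterator[dict]:
    """Yield the raw records of a hypothetical `docs` table that belong to this rank."""
    # Assumption: rows are split across workers by rowid modulo world_size
    query = "SELECT rowid, content FROM docs WHERE rowid % ? = ? ORDER BY rowid"
    params = [world_size, rank]
    if limit != -1:
        query += " LIMIT ?"
        params.append(limit)
    for rowid, content in conn.execute(query, params):
        # In a BaseReader subclass, this dict would be passed to
        # self.get_document_from_dict(raw, "docs", rowid) and the Document yielded
        yield {"id": f"docs/{rowid}", "text": content}


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE docs (content TEXT)")
conn.executemany("INSERT INTO docs (content) VALUES (?)", [(t,) for t in "abcde"])
worker_1 = list(run_db_reader(conn, rank=1, world_size=2))
```

Pushing the partitioning and the limit into the query keeps each worker from fetching rows it will not use.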

Code Reference

Source Location

Signature

class BaseReader(PipelineStep):
    type = "📖 - READER"

    def __init__(
        self,
        limit: int = -1,
        skip: int = 0,
        adapter: Callable = None,
        text_key: str = "text",
        id_key: str = "id",
        default_metadata: dict = None,
    ):
        ...

    def _default_adapter(self, data: dict, path: str, id_in_file: int | str):
        ...

    def get_document_from_dict(self, data: dict, source_file: str, id_in_file: int | str):
        ...

    @abstractmethod
    def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        ...


class BaseDiskReader(BaseReader):
    type = "📖 - READER"

    def __init__(
        self,
        data_folder: DataFolderLike,
        paths_file: DataFileLike | None = None,
        limit: int = -1,
        skip: int = 0,
        file_progress: bool = False,
        doc_progress: bool = False,
        adapter: Callable = None,
        text_key: str = "text",
        id_key: str = "id",
        default_metadata: dict = None,
        recursive: bool = True,
        glob_pattern: str | None = None,
        shuffle_files: bool = False,
        add_file_path: bool = True,
    ):
        ...

    @abstractmethod
    def read_file(self, filepath: str) -> DocumentsPipeline:
        ...

    def read_files_shard(self, shard: list[str]) -> DocumentsPipeline:
        ...

    def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        ...

Import

from datatrove.pipeline.readers.base import BaseReader, BaseDiskReader

I/O Contract

Inputs

Name | Type | Required | Description
limit | int | No | Maximum number of documents to read; -1 for unlimited (default: -1)
skip | int | No | Number of initial documents to skip (default: 0)
adapter | Callable | No | Custom function to transform raw data dicts to Document format
text_key | str | No | Key for the text field in raw data (default: "text")
id_key | str | No | Key for the id field in raw data (default: "id")
default_metadata | dict | No | Metadata to merge into every document
data_folder | DataFolderLike | Yes (BaseDiskReader) | Path, tuple, or DataFolder for the data source
paths_file | DataFileLike | No (BaseDiskReader) | File containing one path per line to read
recursive | bool | No (BaseDiskReader) | Search directories recursively (default: True)
glob_pattern | str | No (BaseDiskReader) | Pattern for filtering files (default: None)
shuffle_files | bool | No (BaseDiskReader) | Randomize file order within the shard (default: False)
file_progress | bool | No (BaseDiskReader) | Show file-level progress bar (default: False)
doc_progress | bool | No (BaseDiskReader) | Show document-level progress bar (default: False)
add_file_path | bool | No (BaseDiskReader) | Add source file path to metadata (default: True)

Outputs

Name | Type | Description
documents | DocumentsPipeline | Generator of Document objects created from the data source

Usage Examples

Basic Usage

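from datatrove.data import DocumentsPipeline
from datatrove.pipeline.readers.base import BaseDiskReader


def parse_custom_format(line: str) -> dict:
    # Hypothetical parser for illustration: assumes each line is "<id>\t<text>"
    doc_id, _, text = line.rstrip("\n").partition("\t")
    return {"id": doc_id, "text": text}


class MyFormatReader(BaseDiskReader):
    """Reader for a custom file format."""

    name = "Custom Format Reader"

    def read_file(self, filepath: str) -> DocumentsPipeline:
        # data_folder is fsspec-backed, so open() works for local and remote paths
        with self.data_folder.open(filepath, "r") as f:
            for i, line in enumerate(f):
                data = parse_custom_format(line)
                # Returns None for records whose text is empty
                document = self.get_document_from_dict(data, filepath, i)
                if document:
                    yield document


# Read at most 1000 documents from *.custom files in an S3 bucket
reader = MyFormatReader(
    data_folder="s3://my-bucket/data/",
    limit=1000,
    glob_pattern="*.custom",
    file_progress=True,
)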

Custom Adapter

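# MyFormatReader is the BaseDiskReader subclass defined in the Basic Usage example.
# Assumes the raw dicts passed to get_document_from_dict contain "content" and "uid"
# keys (and optionally "source").

def my_adapter(self, data: dict, path: str, id_in_file: int | str) -> dict:
    # `self` receives the reader instance; map raw fields onto Document fields
    return {
        "text": data["content"],
        "id": f"{path}:{data['uid']}",
        "media": [],
        "metadata": {"source": data.get("source", "unknown")},
    }


reader = MyFormatReader(
    data_folder="/data/raw/",
    adapter=my_adapter,
    # Merged into every document's metadata
    default_metadata={"pipeline_version": "2.0"},
)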
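Assuming the reader calls the adapter with itself as the first argument, as the self parameter in the example suggests, my_adapter is an ordinary function and can be checked without constructing a reader. The snippet below repeats the adapter so it runs on its own, passes None in place of the reader, and applies default_metadata by hand to preview the resulting metadata; the precedence rule (document metadata overriding defaults) is an assumption.

```python
def my_adapter(self, data: dict, path: str, id_in_file: int) -> dict:
    # Same mapping as the adapter above; `self` would normally be the reader
    return {
        "text": data["content"],
        "id": f"{path}:{data['uid']}",
        "media": [],
        "metadata": {"source": data.get("source", "unknown")},
    }


raw = {"content": "Hello", "uid": 42}
fields = my_adapter(None, raw, "/data/raw/part-0.custom", 0)  # None stands in for the reader

# Assumption: default_metadata sits underneath the document's own metadata
default_metadata = {"pipeline_version": "2.0"}
final_metadata = {**default_metadata, **fields["metadata"]}
```

Testing the adapter in isolation like this catches key mismatches (for example a missing "uid") before a full pipeline run.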
