Implementation:Huggingface Datatrove BaseReader
| Knowledge Sources | |
|---|---|
| Domains | Data Ingestion, Pipeline Architecture |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
BaseReader and BaseDiskReader are abstract base classes that define the framework for reading data from various sources and creating Document objects as the entry point of Datatrove processing pipelines.
Description
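The BaseReader class extends PipelineStep and provides the foundational interface for all data readers in Datatrove. It accepts several options:

- limit and skip, which control how many documents are read.
- An optional adapter function, which transforms raw data dictionaries into the Document format. A custom adapter is bound to the reader instance, so it receives self as its first argument and can read attributes such as text_key and id_key.
- A default_metadata dictionary, which is merged into every document's metadata.

The default adapter takes the text from text_key and the id from id_key, falling back to an id built from the source path and the record's position in the file. It also extracts the media and metadata fields, and places all remaining fields into metadata.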
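The default adapter's field mapping can be made concrete with a standalone sketch in plain Python. This is not Datatrove's code. The function name `default_adapter`, the choice to copy the input dict, and the fallback id format `{path}/{id_in_file}` are assumptions made for illustration.

```python
from __future__ import annotations


def default_adapter(data: dict, path: str, id_in_file: int | str,
                    text_key: str = "text", id_key: str = "id") -> dict:
    # hypothetical re-creation of the default mapping described above
    data = dict(data)  # work on a copy so the caller's dict is left untouched
    return {
        "text": data.pop(text_key, ""),
        # assumption: fall back to "<path>/<position in file>" when no id is present
        "id": data.pop(id_key, f"{path}/{id_in_file}"),
        "media": data.pop("media", []),
        # explicit "metadata" dict first, then every leftover field merged on top
        "metadata": data.pop("metadata", {}) | data,
    }


raw = {"content": "hello", "lang": "en", "metadata": {"source": "web"}}
print(default_adapter(raw, "shard-0.jsonl", 3, text_key="content"))
```

Note how `lang`, which is neither the text, the id, nor media, ends up in metadata next to the explicit `source` entry.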
The get_document_from_dict method is the core document-creation utility and encapsulates the complete raw-to-Document transformation. It applies the adapter to the raw data and checks that the resulting text is non-empty. If the text is empty, the method returns None so the caller can skip the record. The first time this happens, it also logs a warning that lists the available keys. Otherwise, it instantiates Media objects from the media data, creates a Document, and merges in default_metadata.
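The following sketch walks through the same steps as get_document_from_dict using minimal stand-in classes. `Media`, `Document`, `ReaderSketch` and `simple_adapter` are hypothetical stand-ins, not Datatrove's classes. The sketch also assumes that metadata keys already on the document take precedence over default_metadata.

```python
from __future__ import annotations

import logging
from dataclasses import dataclass, field
from typing import Callable

logger = logging.getLogger("reader_sketch")


@dataclass
class Media:  # hypothetical stand-in for Datatrove's Media
    id: str
    url: str = ""


@dataclass
class Document:  # hypothetical stand-in for Datatrove's Document
    text: str
    id: str
    media: list[Media] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)


class ReaderSketch:
    """Hypothetical reader that mirrors the get_document_from_dict steps."""

    def __init__(self, adapter: Callable[[dict, str, int | str], dict], default_metadata: dict | None = None):
        self.adapter = adapter
        self.default_metadata = default_metadata
        self._warned_empty = False

    def get_document_from_dict(self, data: dict, source_file: str, id_in_file: int | str) -> Document | None:
        parsed = self.adapter(data, source_file, id_in_file)
        if not parsed.get("text"):
            if not self._warned_empty:  # warn only for the first empty document
                self._warned_empty = True
                logger.warning("Document without text, skipping. Available keys: %s", list(data.keys()))
            return None  # the caller skips this record
        parsed["media"] = [Media(**m) for m in parsed.get("media", [])]
        doc = Document(**parsed)
        if self.default_metadata:
            # assumption: keys already on the document win over default_metadata
            doc.metadata = self.default_metadata | doc.metadata
        return doc


def simple_adapter(data: dict, path: str, id_in_file: int | str) -> dict:
    return {
        "text": data.get("text", ""),
        "id": f"{path}/{id_in_file}",
        "media": data.get("media", []),
        "metadata": {"lang": data.get("lang")},
    }


reader = ReaderSketch(simple_adapter, default_metadata={"pipeline_version": "2.0", "lang": "unknown"})
doc = reader.get_document_from_dict({"text": "hi", "lang": "en"}, "f.jsonl", 0)
print(doc.metadata)  # {'pipeline_version': '2.0', 'lang': 'en'}
```

Returning None instead of raising keeps a single malformed record from aborting a long-running reading task. The one-time warning still surfaces a likely misconfigured text_key.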
BaseDiskReader extends BaseReader with filesystem support through fsspec. It adds the following options:

- data_folder, a local or remote path.
- paths_file, an explicit list of files to read.
- Recursive directory scanning.
- glob_pattern filtering.
- shuffle_files, which randomizes the read order.
- Optional progress bars for files and for documents.

Its run method computes the shard of files assigned to the current rank, so that the available files are divided across workers. The file list comes either from listing data_folder or from paths_file. Per-file reading is then delegated to the abstract read_file method. The read_files_shard method reads the files of a shard sequentially, counting documents and applying skip and limit across the whole shard rather than per file.
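A rough sketch of the sharding and skip/limit logic follows, with fsspec left out. Files are split round-robin over a sorted listing, which is an assumption about how shards are formed. shuffle_files reorders only the current rank's files, and skip and limit are counted across every file in the shard. The helper names echo the methods described above, but they are simplified, hypothetical versions, and `read_file` is any callable that returns the documents in a file.

```python
from __future__ import annotations

import random
from typing import Callable, Iterable, Iterator


def get_shard(files: list[str], rank: int, world_size: int) -> list[str]:
    # assumption: round-robin over a sorted listing, so each file goes to exactly one rank
    return sorted(files)[rank::world_size]


def read_files_shard(shard: list[str], read_file: Callable[[str], Iterable],
                     skip: int = 0, limit: int = -1) -> Iterator:
    yielded = 0
    skipped = 0
    for filepath in shard:
        for doc in read_file(filepath):
            if skipped < skip:  # skip counts documents across the whole shard, not per file
                skipped += 1
                continue
            if limit != -1 and yielded >= limit:
                break
            yield doc
            yielded += 1
        if limit != -1 and yielded >= limit:
            return  # stop opening new files once the limit is reached


def run(files: list[str], read_file: Callable[[str], Iterable], rank: int = 0, world_size: int = 1,
        skip: int = 0, limit: int = -1, shuffle_files: bool = False) -> Iterator:
    shard = get_shard(files, rank, world_size)
    if shuffle_files:
        random.shuffle(shard)  # only reorders files inside this rank's shard
    yield from read_files_shard(shard, read_file, skip=skip, limit=limit)


# four fake files with three documents each, read by rank 1 of 2
fake_files = {f"part-{i}.txt": [f"part-{i}:doc{j}" for j in range(3)] for i in range(4)}
docs = list(run(list(fake_files), fake_files.__getitem__, rank=1, world_size=2, skip=1, limit=4))
print(docs)  # ['part-1:doc1', 'part-1:doc2', 'part-3:doc0', 'part-3:doc1']
```

Because the counters live outside the per-file loop, a limit set on a multi-worker job bounds each worker's output rather than the job's total.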
Usage
Subclass BaseReader to read from non-filesystem sources such as APIs, databases, or custom formats. Subclass BaseDiskReader to read file-based data stored locally or on remote storage such as S3, GCS, or HDFS. A BaseReader subclass implements run. A BaseDiskReader subclass only needs to implement read_file, because it inherits run, sharding, and skip/limit handling.
Code Reference
Source Location
- Repository: Huggingface_Datatrove
- File: src/datatrove/pipeline/readers/base.py
- Lines: 1-252
Signature
```python
class BaseReader(PipelineStep):
    type = "📖 - READER"

    def __init__(
        self,
        limit: int = -1,
        skip: int = 0,
        adapter: Callable = None,
        text_key: str = "text",
        id_key: str = "id",
        default_metadata: dict = None,
    ):
        ...

    def _default_adapter(self, data: dict, path: str, id_in_file: int | str):
        ...

    def get_document_from_dict(self, data: dict, source_file: str, id_in_file: int | str):
        ...

    @abstractmethod
    def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        ...


class BaseDiskReader(BaseReader):
    type = "📖 - READER"

    def __init__(
        self,
        data_folder: DataFolderLike,
        paths_file: DataFileLike | None = None,
        limit: int = -1,
        skip: int = 0,
        file_progress: bool = False,
        doc_progress: bool = False,
        adapter: Callable = None,
        text_key: str = "text",
        id_key: str = "id",
        default_metadata: dict = None,
        recursive: bool = True,
        glob_pattern: str | None = None,
        shuffle_files: bool = False,
        add_file_path: bool = True,
    ):
        ...

    @abstractmethod
    def read_file(self, filepath: str) -> DocumentsPipeline:
        ...

    def read_files_shard(self, shard: list[str]) -> DocumentsPipeline:
        ...

    def run(self, data: DocumentsPipeline = None, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        ...
```
Import
```python
from datatrove.pipeline.readers.base import BaseReader, BaseDiskReader
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
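| limit | int | No | Maximum number of documents to read; -1 for unlimited (default: -1). BaseDiskReader applies it per rank, within that rank's shard |
| skip | int | No | Number of initial documents to skip (default: 0); also applied per rank in BaseDiskReader |
| adapter | Callable | No | Custom function that turns a raw data dict into Document fields; it is bound to the reader and called as (self, data, path, id_in_file) |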
| text_key | str | No | Key for the text field in raw data (default: "text") |
| id_key | str | No | Key for the id field in raw data (default: "id") |
| default_metadata | dict | No | Metadata to merge into every document |
| data_folder | DataFolderLike | Yes (BaseDiskReader) | Path, tuple, or DataFolder for the data source |
| paths_file | DataFileLike | No (BaseDiskReader) | File listing the paths to read, one per line and relative to data_folder, used instead of listing data_folder |
| recursive | bool | No (BaseDiskReader) | Search directories recursively (default: True) |
| glob_pattern | str | No (BaseDiskReader) | Pattern, relative to data_folder, that files must match to be read (default: None) |
| shuffle_files | bool | No (BaseDiskReader) | Randomize file order within the shard (default: False) |
| file_progress | bool | No (BaseDiskReader) | Show file-level progress bar (default: False) |
| doc_progress | bool | No (BaseDiskReader) | Show document-level progress bar (default: False) |
| add_file_path | bool | No (BaseDiskReader) | Add source file path to metadata (default: True) |
Outputs
| Name | Type | Description |
|---|---|---|
| documents | DocumentsPipeline | Generator of Document objects created from the data source |
Usage Examples
Basic Usage
```python
from datatrove.data import DocumentsPipeline
from datatrove.pipeline.readers.base import BaseDiskReader


def parse_custom_format(line: str) -> dict:
    # hypothetical parser: each line is "<id>\t<text>"
    doc_id, _, text = line.rstrip("\n").partition("\t")
    return {"id": doc_id, "text": text}


class MyFormatReader(BaseDiskReader):
    """Reader for a custom file format."""

    name = "Custom Format Reader"

    def read_file(self, filepath: str) -> DocumentsPipeline:
        with self.data_folder.open(filepath, "r") as f:
            for i, line in enumerate(f):
                data = parse_custom_format(line)
                # returns None (and warns once) when the parsed text is empty
                document = self.get_document_from_dict(data, filepath, i)
                if document:
                    yield document


# Read at most 1000 documents per task from an S3 bucket (requires s3fs)
reader = MyFormatReader(
    data_folder="s3://my-bucket/data/",
    limit=1000,
    glob_pattern="*.custom",
    file_progress=True,
)
```
Custom Adapter
```python
# MyFormatReader is the BaseDiskReader subclass defined in the Basic Usage example


def my_adapter(self, data: dict, path: str, id_in_file: int):
    # bound to the reader, so `self` gives access to e.g. self.text_key (unused here)
    return {
        "text": data["content"],
        "id": f"{path}:{data['uid']}",
        "media": [],
        "metadata": {"source": data.get("source", "unknown")},
    }


reader = MyFormatReader(
    data_folder="/data/raw/",
    adapter=my_adapter,
    default_metadata={"pipeline_version": "2.0"},
)
```
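Because my_adapter from the Custom Adapter example never uses self, you can check it on its own before passing it to a reader. The sample record and path below are made up for illustration, and passing None for self only works because this particular adapter ignores it.

```python
def my_adapter(self, data: dict, path: str, id_in_file: int):
    # same adapter as in the Custom Adapter example
    return {
        "text": data["content"],
        "id": f"{path}:{data['uid']}",
        "media": [],
        "metadata": {"source": data.get("source", "unknown")},
    }


sample = {"content": "Some text", "uid": "42", "source": "forum", "lang": "en"}
# self is unused by this adapter, so None is enough for a quick check
result = my_adapter(None, sample, "/data/raw/part-0.custom", 0)
print(result)
```

Unlike the default adapter, this one drops every field it does not map explicitly (here `lang`). Copy any extra fields into metadata by hand if they should survive. default_metadata is merged in afterwards by get_document_from_dict, not by the adapter.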