Implementation: Huggingface Datatrove BaseMediaWriter
| Knowledge Sources | |
|---|---|
| Domains | Media Processing, Data Persistence |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
BaseMediaWriter is an abstract base class that provides the framework for writing binary media content to disk with support for templated filenames, automatic file splitting at size thresholds, and pipeline statistics tracking.
Description
The BaseMediaWriter class extends both PipelineStep and ABC to define the contract for media writing operations. It manages output files through a configurable output_folder and a Template-based output_filename that supports variable substitution (e.g., ${rank} for worker rank). The class uses a file_id_counter to track file indices when splitting output across multiple files.
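The `${rank}` substitution described above can be illustrated with Python's standard-library `string.Template`, which is what a `Template`-based `output_filename` implies. The zero-padded width below is an assumption for the example, not necessarily datatrove's exact format:

```python
from string import Template

# Illustrative only: expand a templated output filename for worker rank 3.
# The placeholder name "rank" matches the ${rank} mentioned above; the
# zero-padded width is an assumption for this sketch.
output_filename = Template("${rank}.bin")
name = output_filename.substitute(rank=f"{3:05d}")
print(name)  # 00003.bin
```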
A key feature is automatic file splitting controlled by the max_file_size parameter. When writing in binary mode, the writer monitors the current file size and switches to a new file (with an incremented numeric prefix like 001_, 002_) once the threshold is exceeded. The _on_file_switch callback properly closes the old file when switching occurs. This mechanism prevents individual output files from growing too large, which is important for storage systems with size limits or for downstream parallel processing.
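The splitting behavior can be sketched as follows. This is a minimal stand-in, not the datatrove implementation: it uses in-memory `BytesIO` buffers instead of real files, checks the size threshold before each write rather than after, and omits the `_on_file_switch` cleanup callback.

```python
import io

# Minimal sketch (not the datatrove implementation) of size-based file
# splitting: once the current file would exceed max_file_size, switch to
# a new file with an incremented numeric prefix like 001_, 002_.
class SplittingWriter:
    def __init__(self, base_name: str, max_file_size: int):
        self.base_name = base_name
        self.max_file_size = max_file_size
        self.file_id = 0
        self.current_name = base_name
        self.files = {base_name: io.BytesIO()}  # stand-in for real file handles

    def write(self, payload: bytes) -> tuple[str, int, int]:
        fh = self.files[self.current_name]
        if self.max_file_size > 0 and fh.tell() + len(payload) > self.max_file_size:
            # Threshold exceeded: start a new file with a numeric prefix.
            self.file_id += 1
            self.current_name = f"{self.file_id:03d}_{self.base_name}"
            self.files[self.current_name] = io.BytesIO()
            fh = self.files[self.current_name]
        offset = fh.tell()
        fh.write(payload)
        return self.current_name, offset, len(payload)

writer = SplittingWriter("0.bin", max_file_size=8)
print(writer.write(b"12345"))  # ('0.bin', 0, 5)
print(writer.write(b"67890"))  # ('001_0.bin', 0, 5) -- split triggered
```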
The write method orchestrates the full write lifecycle: it computes the output filename, handles file splitting logic, delegates actual writing to the abstract _write method, and updates statistics. The run method iterates over all documents in the pipeline, writes each media item that has non-None bytes, and updates the media's path, offset, and length attributes with the storage location of the written data. The class also implements the context manager protocol for proper resource cleanup.
Usage
Use BaseMediaWriter as the base class when implementing writers for specific storage formats (e.g., zstd-compressed, raw binary). Subclasses only need to implement _write to define format-specific serialization logic. The base class handles file management, splitting, statistics, and pipeline integration.
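In the spirit of the compressed writers mentioned above, a format-specific `_write` might look like the following sketch. zlib stands in for zstd purely so the example runs on the standard library, and the function is free-standing rather than a real `BaseMediaWriter` subclass:

```python
import io
import zlib
from typing import IO

# Hedged sketch of a compressed _write. zlib replaces zstd here only so
# the example is stdlib-runnable; the real writers may differ.
def compressed_write(media_bytes: bytes, file_handler: IO, filename: str) -> tuple[str, int, int]:
    offset = file_handler.tell()
    payload = zlib.compress(media_bytes)
    file_handler.write(payload)
    # Return the on-disk location: (filename, offset, compressed size).
    return filename, offset, len(payload)

buf = io.BytesIO()
name, offset, size = compressed_write(b"\x00" * 1024, buf, "000_00000.bin")
```

Returning the compressed size (rather than the input size) matters because `run` stores it in `media.length`, which downstream readers need to slice the correct byte range back out of the file.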
Code Reference
Source Location
- Repository: Huggingface_Datatrove
- File: src/datatrove/pipeline/media/media_writers/base.py
- Lines: 1-160
Signature
class BaseMediaWriter(PipelineStep, ABC):
    default_output_filename: str = None
    type = "💽 - MEDIA WRITER"
    def __init__(
        self,
        output_folder: DataFolderLike,
        output_filename: str = None,
        mode: str = "wb",
        max_file_size: int = -1,
    ):
        ...

    def _get_output_filename(self, media: Media, rank: int | str = 0, **kwargs) -> str:
        ...

    @abstractmethod
    def _write(self, media: Media, file_handler: IO, filename: str) -> tuple[str, int, int]:
        ...

    def _on_file_switch(self, _original_name, old_filename, _new_filename):
        ...

    def write(self, media: Media, rank: int = 0, **kwargs):
        ...

    def run(self, data: DocumentsPipeline, rank: int = 0, world_size: int = 1) -> DocumentsPipeline:
        ...
Import
from datatrove.pipeline.media.media_writers.base import BaseMediaWriter
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| output_folder | DataFolderLike | Yes | Destination path, tuple, or DataFolder for saving media |
| output_filename | str | No | Template string for output filenames with placeholders like ${rank} (defaults to the subclass's default_output_filename) |
| mode | str | No | File open mode (default: "wb") |
| max_file_size | int | No | Maximum file size in bytes before splitting; -1 for unlimited (default: -1) |
| data | DocumentsPipeline | Yes (via run) | Generator of Document objects whose media items will be written |
Outputs
| Name | Type | Description |
|---|---|---|
| documents | DocumentsPipeline | Generator of Document objects with updated media.path, media.offset, and media.length attributes |
| write result | tuple[str, int, int] | Tuple of (filename, offset, compressed_size) returned by the write method |
Usage Examples
Basic Usage
from datatrove.pipeline.media.media_writers.base import BaseMediaWriter
from datatrove.data import Media
from typing import IO
class RawMediaWriter(BaseMediaWriter):
    """Simple writer that stores raw bytes without compression."""

    default_output_filename = "${rank}.bin"

    def _write(self, media: Media, file_handler: IO, filename: str) -> tuple[str, int, int]:
        offset = file_handler.tell()
        file_handler.write(media.media_bytes)
        size = len(media.media_bytes)
        return filename, offset, size


# Write media to the output folder, splitting files at 5 GB
writer = RawMediaWriter(
    output_folder="/output/media/",
    max_file_size=5 * 2**30,
)