Implementation:Huggingface Datatrove ZstdMediaReader
| Knowledge Sources | |
|---|---|
| Domains | Media Processing, Data Compression |
| Last Updated | 2026-02-14 17:00 GMT |
Overview
ZstdReader is a threaded media reader that extracts binary media content from Zstandard-compressed files using offset-based random access with support for both length-prefixed and non-length-prefixed storage formats.
Description
The ZstdReader class extends BinaryReaderThreaded to implement zstd-specific decompression and reading. It supports two storage format versions. In v1, the media object carries no length, so the reader first reads a length prefix of offset_byte_size bytes at the offset position to determine the compressed content size, then decompresses the data that follows. In v2, the reader uses the media's length attribute directly to read exactly that many compressed bytes, with no prefix lookup. The version is selected automatically based on whether the media object has a length value.
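The version selection can be sketched as follows. This is a minimal illustration rather than the datatrove implementation: `read_record` is a hypothetical helper, `zlib` stands in for zstd so the sketch runs on the standard library alone, and the little-endian byte order of the prefix is an assumption.

```python
import io
import zlib


def read_record(fp, offset, length=None, offset_byte_size=4):
    """Read one compressed record; hypothetical helper, not datatrove's API."""
    fp.seek(offset)
    if length is None:
        # v1: the compressed size is stored in the file as a fixed-width prefix.
        # Little-endian byte order is an assumption made for this sketch.
        length = int.from_bytes(fp.read(offset_byte_size), "little")
    # v2 skips the prefix because the caller already knows the compressed size.
    return zlib.decompress(fp.read(length))  # zlib stands in for zstd here


# Build a tiny in-memory "file": one v1 record followed by one v2 record.
first = zlib.compress(b"first image bytes")
second = zlib.compress(b"second image bytes")
buf = io.BytesIO()
buf.write(len(first).to_bytes(4, "little") + first)  # v1: prefix + payload
v2_offset = buf.tell()
buf.write(second)  # v2: payload only, length kept by the caller

v1_content = read_record(buf, offset=0)
v2_content = read_record(buf, offset=v2_offset, length=len(second))
```

Passing `length=None` mirrors a media object without a length value, so the same call site handles both formats.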
The module also includes a LimitedReader helper class that wraps a file-like object and enforces a maximum byte read limit. This is essential for the v2 format because the zstd stream reader needs to be constrained to read only the bytes belonging to the current media record, preventing it from reading into adjacent records in the same file.
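The idea behind such a wrapper can be sketched in a few lines. `BoundedReader` below is a hypothetical stand-in written for illustration; the real LimitedReader also exposes close, seek, and tell, and its internals may differ.

```python
import io


class BoundedReader:
    """Hypothetical stand-in for LimitedReader: caps the total bytes read."""

    def __init__(self, fp, max_bytes):
        self.fp = fp
        self.remaining = max_bytes

    def read(self, size=-1):
        if self.remaining <= 0:
            return b""  # behave like end-of-file once the budget is spent
        if size < 0 or size > self.remaining:
            size = self.remaining  # never hand out more than the budget
        data = self.fp.read(size)
        self.remaining -= len(data)
        return data


# Two adjacent 5-byte records stored back to back in one "file".
fp = io.BytesIO(b"AAAAABBBBB")
bounded = BoundedReader(fp, max_bytes=5)
first_chunk = bounded.read(8)  # asks for 8 bytes, receives only the 5 allowed
second_chunk = bounded.read()  # budget exhausted, so this looks like EOF
```

A streaming decompressor that pulls data in large chunks sees an early end-of-file here instead of consuming bytes from the next record.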
Thread-local storage manages per-thread state including the current file pointer, current filename, and a ZstdDecompressor instance configured for magicless format. The decompressor is reused across reads within the same thread because each compressed record is terminated with a frame flush, which resets the decompressor state naturally. File pointers are also reused when consecutive reads target the same file.
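The file-pointer reuse can be illustrated with `threading.local`. This is a sketch under assumptions: `get_file` is a hypothetical helper, and only the file handle is cached here, although a decompressor object could be stored on the same thread-local in the same way.

```python
import os
import tempfile
import threading

_local = threading.local()  # each thread sees its own attributes


def get_file(path):
    """Return this thread's open handle, reopening only when the path changes."""
    if getattr(_local, "path", None) != path:
        old = getattr(_local, "fp", None)
        if old is not None:
            old.close()
        _local.fp = open(path, "rb")
        _local.path = path
    return _local.fp


with tempfile.NamedTemporaryFile(delete=False) as tmp:
    tmp.write(b"record-bytes")

# Consecutive reads of the same file in one thread share a handle.
same_handle = get_file(tmp.name) is get_file(tmp.name)

# Another thread gets its own handle for the same path.
handles = []
worker = threading.Thread(target=lambda: handles.append(get_file(tmp.name)))
worker.start()
worker.join()
separate_handle = handles[0] is not get_file(tmp.name)

# Clean up the handles and the temporary file.
handles[0].close()
get_file(tmp.name).close()
os.unlink(tmp.name)
```

Keeping the state per thread avoids locking around seek and read calls, since no two workers ever share a file position.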
The reader is configured with a block_size parameter (default 20 MiB, that is, 20 * 1024 * 1024 bytes) that controls the file read buffer size, and an offset_byte_size parameter (default 4 bytes) that sets the width of the length prefix in v1 format.
Usage
Use ZstdReader when your media data has been stored using the complementary ZstdWriter format. It is designed for high-throughput reading of individually compressed media records stored sequentially in large binary files, typical of media processing pipelines that need to handle millions of images or other binary assets.
Code Reference
Source Location
- Repository: Huggingface_Datatrove
- File: src/datatrove/pipeline/media/media_readers/zstd.py
- Lines: 1-168
Signature
class LimitedReader(IO):
    def __init__(self, fp: IO, max_bytes: int):
        ...

    def read(self, size: int = -1) -> bytes:
        ...

    def close(self):
        ...

    def seek(self, offset: int, whence: int = 0):
        ...

    def tell(self):
        ...


class ZstdReader(BinaryReaderThreaded):
    name = "📒⚡ Zstd Media Reader (Streaming/Threaded)"
    type = "Media Reader"

    def __init__(
        self,
        data_folder: DataFolderLike,
        block_size: int = 20 * 1024 * 1024,
        workers: int = 1,
        offset_byte_size: int = 4,
        preserve_order: bool = False,
    ):
        ...

    def read_media_record(self, media: Media) -> bytes | None:
        ...
Import
from datatrove.pipeline.media.media_readers.zstd import ZstdReader, LimitedReader
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| data_folder | DataFolderLike | Yes | Path to the folder containing zstd-compressed media files |
| block_size | int | No | File read buffer size in bytes (default: 20MB) |
| workers | int | No | Number of worker threads (default: 1) |
| offset_byte_size | int | No | Width in bytes of the length prefix in v1 format (default: 4) |
| preserve_order | bool | No | Whether to preserve document order (default: False) |
| media.path | str | Yes | Path to the zstd file containing this media record |
| media.offset | int | Yes | Byte offset of the compressed record within the file |
| media.length | int | No | Compressed size in bytes (if present, uses v2 read mode; if None, uses v1 with length prefix) |
Outputs
| Name | Type | Description |
|---|---|---|
| content | bytes or None | The decompressed media bytes, or None if offset/path is missing |
Usage Examples
Basic Usage
from datatrove.pipeline.media.media_readers.zstd import ZstdReader

# Read zstd-compressed media with 4 worker threads
zstd_reader = ZstdReader(
    data_folder="/data/compressed-media/",
    block_size=20 * 1024 * 1024,  # 20 MiB buffer
    workers=4,
    preserve_order=True,
)

# Use in a pipeline
pipeline = [
    # ... document reader that populates media.path, media.offset, and optionally media.length ...
    zstd_reader,
    # ... downstream processing (filtering, writing, etc.) ...
]