Implementation: Huggingface Datatrove DocumentTokenizerMerger
| Sources | Domains | Last Updated |
|---|---|---|
| Huggingface Datatrove | Tokenization, Training_Data | 2026-02-14 |
Overview
Pipeline step that merges and shuffles distributed tokenized .ds files into consolidated training-ready binary files with cross-file document interleaving.
Description
DocumentTokenizerMerger extends PipelineStep and operates as a single-worker consolidation stage. It discovers all .ds, .ds.index, and optionally .ds.loss files in the input folder, loads their document boundary arrays, generates a global shuffled ordering using numpy.random.default_rng, and then copies token bytes from input files into sequentially numbered output files (e.g., 000_filename.ds, 001_filename.ds).
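The bookkeeping behind this can be pictured with a minimal sketch (illustrative only, not the library's actual code; doc_ends and the helper name are hypothetical stand-ins):

import numpy as np

# Sketch of the shuffle bookkeeping, assuming doc_ends[i] holds the uint64 document
# end offsets loaded from the i-th input .ds.index file.
def shuffled_document_order(doc_ends: list, seed=42):
    # one (file_index, document_index) entry per document, across every input file
    all_docs = [(f, d) for f, ends in enumerate(doc_ends) for d in range(len(ends))]
    rng = np.random.default_rng(seed)
    order = rng.permutation(len(all_docs))  # global shuffled ordering
    return [all_docs[int(i)] for i in order]

# The merger walks such an ordering, copying each document's token bytes into
# 000_<save_filename>.ds and rolling over to 001_<save_filename>.ds, ... once
# max_tokens_per_file is reached.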
The merger uses lazy file reading via get_data_reader generators that yield document bytes on demand, avoiding loading all token data into memory. Document boundaries from .ds.index files are loaded as numpy.uint64 arrays. When shuffle_chunk_size is set, the merger operates on fixed-size token chunks instead of individual documents, and writes the internal document boundaries within each chunk to the output index.
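The lazy-reading pattern is roughly equivalent to the sketch below (assumptions: the index stores cumulative uint64 document end offsets in tokens and token IDs are 2-byte uint16; the helper names are illustrative, not the library's get_data_reader):

import numpy as np

TOKEN_BYTES = 2  # assumption: uint16 token IDs (larger vocabularies may need more bytes)

def load_doc_ends(index_path: str):
    # read the whole .ds.index file as a flat array of uint64 offsets
    with open(index_path, "rb") as f:
        return np.frombuffer(f.read(), dtype=np.uint64)

def read_documents(ds_path: str, doc_ends):
    # yield one document's raw token bytes at a time, so the full .ds file
    # never has to be resident in memory
    start = 0
    with open(ds_path, "rb") as f:
        for end in doc_ends:
            yield f.read((int(end) - start) * TOKEN_BYTES)
            start = int(end)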
The step asserts world_size == 1 since merging is inherently a sequential global operation.
Usage
Run after distributed DocumentTokenizer to produce final training files. Use a local filesystem for best performance, as the random-access read pattern is slow on remote filesystems.
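A typical launch looks like the sketch below; the paths are placeholders, and tasks=1 matches the merger's world_size == 1 assertion:

from datatrove.executor import LocalPipelineExecutor
from datatrove.pipeline.tokens import DocumentTokenizerMerger

# Run the merger as its own single-task pipeline after the distributed
# DocumentTokenizer stage has written its .ds/.ds.index shards to /data/tokens/.
executor = LocalPipelineExecutor(
    pipeline=[
        DocumentTokenizerMerger(
            input_folder="/data/tokens/",
            output_folder="/data/merged_tokens/",
            save_filename="train",
        ),
    ],
    tasks=1,  # merging is a single-worker, global operation
)
executor.run()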
Code Reference
Source Location: Repository: huggingface/datatrove, File: src/datatrove/pipeline/tokens/merger.py (L15-205)
Signature:
class DocumentTokenizerMerger(PipelineStep):
    def __init__(
        self,
        input_folder: DataFolderLike,
        output_folder: DataFolderLike,
        save_filename: str,
        max_tokens_per_file: int = 100e9,
        max_tokens: int = -1,
        shuffle: bool = True,
        shuffle_chunk_size: int = None,
        upload_block_size: int = 20 * 2**20,
        seed: int = None,
        save_loss_metadata: bool = False,
        save_final_metadata: bool = True,
        progress: bool = True,
    ):
Import:
from datatrove.pipeline.tokens import DocumentTokenizerMerger
I/O Contract
Inputs:
| Parameter | Type | Required | Description |
|---|---|---|---|
| input_folder | DataFolderLike | Yes | Folder containing .ds and .ds.index files from DocumentTokenizer |
| output_folder | DataFolderLike | Yes | Folder where merged output files are written |
| save_filename | str | Yes | Base filename for output files (e.g., "merged" produces 000_merged.ds) |
| max_tokens_per_file | int | No | Max tokens per output file before splitting (default 100 billion) |
| max_tokens | int | No | Total token limit; -1 for unlimited (default -1) |
| shuffle | bool | No | Shuffle document order across files (default True) |
| shuffle_chunk_size | int or None | No | Shuffle at chunk level instead of document level (default None) |
| upload_block_size | int | No | fsspec upload block size for remote storage (default 20 MiB, i.e. 20 * 2**20 bytes) |
| seed | int or None | No | Random seed for shuffling (default None) |
| save_loss_metadata | bool | No | Merge and save .ds.loss files (default False) |
| save_final_metadata | bool | No | Save metadata file with tokenizer name and token count (default True) |
| progress | bool | No | Show progress bar during merging (default True) |
Outputs:
- Numbered .ds files -- NNN_filename.ds with consolidated token data
- .ds.index files -- document boundary positions for each output file
- .ds.loss files (optional) -- merged per-token loss masks
- .ds.metadata files (optional) -- tokenizer name and total token count
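A quick sanity check on the merged output is to read an output index back and report document and token counts (a sketch under the same assumption as above that the index holds cumulative uint64 document end offsets in tokens):

import numpy as np

# Illustrative check; the path and filename are placeholders.
with open("/data/merged_tokens/000_train.ds.index", "rb") as f:
    doc_ends = np.frombuffer(f.read(), dtype=np.uint64)
print(f"documents: {len(doc_ends)}, total tokens: {int(doc_ends[-1])}")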
Usage Examples
Example 1 -- Basic merging with shuffling:
from datatrove.pipeline.tokens import DocumentTokenizerMerger

# Merge the distributed .ds shards into shuffled 000_train.ds, 001_train.ds, ... files
merger = DocumentTokenizerMerger(
    input_folder="/data/tokens/",
    output_folder="/data/merged_tokens/",
    save_filename="train",
    shuffle=True,
    seed=42,
)
Example 2 -- Merging with token limit and chunk shuffling:
from datatrove.pipeline.tokens import DocumentTokenizerMerger

# Cap output at 100B tokens total, split into ~10B-token files, and shuffle at the
# chunk level: each chunk covers 2048 * 8192 = 16,777,216 tokens.
merger = DocumentTokenizerMerger(
    input_folder="s3://my-bucket/tokens/",
    output_folder="/data/merged/",
    save_filename="train",
    max_tokens_per_file=10_000_000_000,
    max_tokens=100_000_000_000,
    shuffle_chunk_size=2048 * 8192,
    seed=42,
)