Implementation:Huggingface Datatrove StatsMerger
Overview
StatsMerger is a pipeline step that merges partial per-rank statistics files into consolidated metric files. It extends PipelineStep directly (not BaseStats) and operates on the output directory structure produced by stats computation steps such as WordStats, LineStats, and DocStats.
Signature
class StatsMerger(PipelineStep):
    def __init__(
        self,
        input_folder: DataFolderLike,
        output_folder: DataFolderLike,
        remove_input: bool = False,
        top_k_config: TopKConfig = DEFAULT_TOP_K_CONFIG,
    ) -> None:
Import
from datatrove.pipeline.stats import StatsMerger
Parameters
| Parameter | Type | Default | Description |
|---|---|---|---|
| input_folder | DataFolderLike | (required) | Path or data folder containing the per-rank statistics files produced by a stats computation step. Expected structure: {group}/{stat_name}/{rank:05d}.json. |
| output_folder | DataFolderLike | (required) | Path or data folder where merged metric files will be written. Output structure: {group}/{stat_name}/metric.json. |
| remove_input | bool | False | Whether to delete the per-rank input files after merging. Useful for saving storage when intermediate files are no longer needed. |
| top_k_config | TopKConfig | DEFAULT_TOP_K_CONFIG | Configuration for top-k truncation during merging. Groups listed in top_k_config.top_k_groups (default: ["fqdn", "suffix"]) will have their keys truncated to the top top_k_config.top_k (default: 100,000) by document count. |
I/O
Input: Per-rank JSON statistics files at {group}/{stat_name}/{rank:05d}.json. These files are produced by stats computation steps (WordStats, LineStats, DocStats, etc.) and each contain a serialized MetricStatsDict.
Output: Merged {group}/{stat_name}/metric.json files. Each merged file contains a single MetricStatsDict representing the combined statistics across all ranks.
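As a rough illustration of the input and output layout described above, the following sketch builds the per-rank paths and the merged path for one group/stat pair. It does not call datatrove; the helper names, the group name "fqdn", and the stat name "n_words" are hypothetical and chosen only for illustration.

```python
from fnmatch import fnmatch

MERGED_NAME = "metric.json"  # merged file name as stated on this page


def rank_file(group: str, stat_name: str, rank: int) -> str:
    """Per-rank input path: {group}/{stat_name}/{rank:05d}.json."""
    return f"{group}/{stat_name}/{rank:05d}.json"


def merged_file(group: str, stat_name: str) -> str:
    """Merged output path: {group}/{stat_name}/metric.json."""
    return f"{group}/{stat_name}/{MERGED_NAME}"


# Hypothetical group and stat names, three ranks.
inputs = [rank_file("fqdn", "n_words", r) for r in range(3)]
output = merged_file("fqdn", "n_words")

# Five digit character classes select per-rank files but not the merged file.
pattern = "fqdn/n_words/[0-9][0-9][0-9][0-9][0-9].json"
matches = [p for p in inputs + [output] if fnmatch(p, pattern)]
```

In this sketch, matches contains only the per-rank files, so a merged file that happens to sit in the same folder would not be picked up as input.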
Pipeline passthrough: If data is provided to the run method, documents are yielded downstream unchanged after merging completes. This allows StatsMerger to be placed in a pipeline without disrupting document flow.
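The passthrough behaviour can be pictured with a minimal generator sketch. This is not the datatrove source; run_like and the log list are hypothetical stand-ins, and the merge work is reduced to a single log entry.

```python
from typing import Iterable, Iterator, List, Optional


def run_like(data: Optional[Iterable[str]], log: List[str]) -> Iterator[str]:
    """Do the merge work first, then forward any incoming documents unchanged."""
    log.append("merged")  # stands in for the actual merging work
    if data:
        yield from data  # documents pass through untouched


log: List[str] = []
forwarded = list(run_like(["doc-1", "doc-2"], log))
nothing = list(run_like(None, log))  # no upstream data: merge still runs
```

Because the sketch is a generator, its body only executes once the result is iterated, which is why both calls are wrapped in list().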
Key Implementation Details
- Folder discovery: The merger uses get_leaf_non_empty_folders() to discover all leaf directories containing statistics files. A leaf directory is one that has files but no subdirectories. This avoids hardcoding stat names and automatically adapts to whatever stats were computed.
- Distributed merging: The list of stat folders is sharded across ranks using folders_shard = folders[rank::world_size], enabling parallel merging when many stat/group combinations exist.
- File pattern matching: Input files are discovered using the glob pattern {folder}/[0-9][0-9][0-9][0-9][0-9].json, matching the five-digit zero-padded rank format.
- Incremental accumulation: Each partial file is loaded and accumulated into a single MetricStatsDict using in-place addition (stat[key] += MetricStats.from_dict(item)). This avoids creating intermediate dictionaries and keeps memory usage proportional to the number of unique keys, not the number of input files.
- Top-k truncation: After accumulating all partial files for a stat folder, if the group is in top_k_config.top_k_groups, the keys are truncated to the top-k by document count using heapq.nlargest. The group name is extracted from the parent directory name via Path(folder).parent.name.
- Input cleanup: When remove_input=True, each per-rank JSON file is deleted after its contents have been merged, reclaiming storage.
- Output naming: The merged output file is always named metric.json (defined by the module-level constant STATS_MERGED_NAME).
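The sharding, accumulation, and top-k steps listed above can be sketched in plain Python. This is a simplified illustration under stated assumptions, not the datatrove implementation: ToyStats is a hypothetical stand-in for MetricStats that tracks only a document count n and a running total, and the helpers shard, merge, and truncate are invented names.

```python
import heapq
from collections import defaultdict
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List


@dataclass
class ToyStats:
    """Hypothetical stand-in for a per-key metric: document count and total."""
    n: int = 0
    total: float = 0.0

    def __iadd__(self, other: "ToyStats") -> "ToyStats":
        # In-place addition: update this object instead of building a new one.
        self.n += other.n
        self.total += other.total
        return self


def shard(folders: List[str], rank: int, world_size: int) -> List[str]:
    """Strided split so each rank handles a disjoint subset of stat folders."""
    return sorted(folders)[rank::world_size]


def merge(partials: List[Dict[str, dict]]) -> Dict[str, ToyStats]:
    """Accumulate per-rank dictionaries into a single dictionary."""
    merged: Dict[str, ToyStats] = defaultdict(ToyStats)
    for partial in partials:
        for key, item in partial.items():
            merged[key] += ToyStats(**item)
    return dict(merged)


def truncate(folder: str, merged: Dict[str, ToyStats],
             top_k_groups: List[str], top_k: int) -> Dict[str, ToyStats]:
    """Keep only the top_k keys by document count for configured groups."""
    group = Path(folder).parent.name  # "fqdn/n_words" -> "fqdn"
    if group not in top_k_groups:
        return merged
    keep = heapq.nlargest(top_k, merged, key=lambda k: merged[k].n)
    return {k: merged[k] for k in keep}


folders = ["fqdn/n_words", "summary/n_words", "suffix/n_words"]
rank0 = shard(folders, rank=0, world_size=2)

partials = [
    {"a.com": {"n": 3, "total": 30.0}, "b.org": {"n": 1, "total": 5.0}},
    {"a.com": {"n": 2, "total": 10.0}, "c.net": {"n": 4, "total": 8.0}},
]
merged = merge(partials)
top = truncate("fqdn/n_words", merged, ["fqdn", "suffix"], top_k=2)
untouched = truncate("summary/n_words", merged, ["fqdn", "suffix"], top_k=2)
```

Here only the "fqdn" folder is truncated, because "summary" is not among the configured top-k groups; the merged dictionary holds one entry per unique key regardless of how many partial files were read.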
Example Usage
from datatrove.pipeline.stats import StatsMerger, TopKConfig

# Basic merging of stats output
merger = StatsMerger(
    input_folder="s3://my-bucket/stats/word_stats",
    output_folder="s3://my-bucket/stats/word_stats_merged",
)

# Merge with input cleanup and custom top-k
merger = StatsMerger(
    input_folder="/data/stats/all_stats",
    output_folder="/data/stats/all_stats_merged",
    remove_input=True,
    top_k_config=TopKConfig(
        top_k_groups=["fqdn", "suffix"],
        top_k=50_000,
    ),
)