Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:Huggingface Datatrove StatsMerger

From Leeroopedia

Template:Metadata

Overview

StatsMerger is a pipeline step that merges partial per-rank statistics files into consolidated metric files. It extends PipelineStep directly (not BaseStats) and operates on the output directory structure produced by stats computation steps such as WordStats, LineStats, and DocStats.

Signature

class StatsMerger(PipelineStep):
    def __init__(
        self,
        input_folder: DataFolderLike,
        output_folder: DataFolderLike,
        remove_input: bool = False,
        top_k_config: TopKConfig = DEFAULT_TOP_K_CONFIG,
    ) -> None:

Import

from datatrove.pipeline.stats import StatsMerger

Parameters

Parameter Type Default Description
input_folder DataFolderLike (required) Path or data folder containing the per-rank statistics files produced by a stats computation step. Expected structure: {group}/{stat_name}/{rank:05d}.json.
output_folder DataFolderLike (required) Path or data folder where merged metric files will be written. Output structure: {group}/{stat_name}/metric.json.
remove_input bool False Whether to delete the per-rank input files after merging. Useful for saving storage when intermediate files are no longer needed.
top_k_config TopKConfig DEFAULT_TOP_K_CONFIG Configuration for top-k truncation during merging. Groups listed in top_k_config.top_k_groups (default: ["fqdn", "suffix"]) will have their keys truncated to the top top_k_config.top_k (default: 100,000) by document count.

I/O

Input: Per-rank JSON statistics files at {group}/{stat_name}/{rank:05d}.json. These files are produced by stats computation steps (WordStats, LineStats, DocStats, etc.) and each contain a serialized MetricStatsDict.

Output: Merged {group}/{stat_name}/metric.json files. Each merged file contains a single MetricStatsDict representing the combined statistics across all ranks.

Pipeline passthrough: If data is provided to the run method, documents are yielded downstream unchanged after merging completes. This allows StatsMerger to be placed in a pipeline without disrupting document flow.

Key Implementation Details

  • Folder discovery: The merger uses get_leaf_non_empty_folders() to discover all leaf directories containing statistics files. A leaf directory is one that has files but no subdirectories. This avoids hardcoding stat names and automatically adapts to whatever stats were computed.
  • Distributed merging: The list of stat folders is sharded across ranks using folders_shard = folders[rank::world_size], enabling parallel merging when many stat/group combinations exist.
  • File pattern matching: Input files are discovered using the glob pattern {folder}/[0-9][0-9][0-9][0-9][0-9].json, matching the five-digit zero-padded rank format.
  • Incremental accumulation: Each partial file is loaded and accumulated into a single MetricStatsDict using in-place addition (stat[key] += MetricStats.from_dict(item)). This avoids creating intermediate dictionaries and keeps memory usage proportional to the number of unique keys, not the number of input files.
  • Top-k truncation: After accumulating all partial files for a stat folder, if the group is in top_k_config.top_k_groups, the keys are truncated to the top-k by document count using heapq.nlargest. The group name is extracted from the parent directory name via Path(folder).parent.name.
  • Input cleanup: When remove_input=True, each per-rank JSON file is deleted after its contents have been merged, reclaiming storage.
  • Output naming: The merged output file is always named metric.json (defined by the module-level constant STATS_MERGED_NAME).

Example Usage

from datatrove.pipeline.stats import StatsMerger

# Basic merging of stats output
merger = StatsMerger(
    input_folder="s3://my-bucket/stats/word_stats",
    output_folder="s3://my-bucket/stats/word_stats_merged",
)

# Merge with input cleanup and custom top-k
from datatrove.pipeline.stats import TopKConfig

merger = StatsMerger(
    input_folder="/data/stats/all_stats",
    output_folder="/data/stats/all_stats_merged",
    remove_input=True,
    top_k_config=TopKConfig(
        top_k_groups=["fqdn", "suffix"],
        top_k=50_000,
    ),
)

Related

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment