Principle:Huggingface Datatrove Statistics Merging
Overview
Merging distributed per-shard statistics into consolidated metric files using online aggregation.
Description
Statistics merging is the final stage of the distributed statistics pipeline. When computing dataset statistics across multiple parallel workers (ranks), each worker produces partial statistics files for its shard of the data. The merging step combines these partial results into a single consolidated result per metric per group, without needing access to the original raw data.
The merging process handles the following:
- Combining partial statistics: Each partial file contains a MetricStatsDict, a dictionary mapping keys (e.g., domain names for the fqdn group, or "summary" for the summary group) to MetricStats objects. Each MetricStats tracks the count (n), mean, running variance, min, and max for a metric. The merger iterates over all partial files and accumulates them using the MetricStats.__add__ operator.
- Parallel variance computation: The key mathematical operation is combining two (n, mean, variance) tuples into one. This uses the parallel algorithm for calculating variance, which computes the combined mean as a weighted average and the combined variance using the formula that accounts for the difference between the group means.
- Top-k truncation: For high-cardinality groups such as fqdn (fully qualified domain name) and suffix (top-level domain suffix), the number of unique keys can be very large. The merger supports truncating to the top-k keys by document count, reducing memory usage and output file size. This truncation is configured via TopKConfig.
- One-file-per-metric output: The merger produces one consolidated metric.json file per stat per group, making downstream analysis straightforward.
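The key-wise accumulation described above can be sketched as follows. This is a minimal illustration under stated assumptions, not datatrove's implementation: SimpleStats is a hypothetical, simplified stand-in for MetricStats that tracks only count, total, min, and max (variance is left out for brevity), and merge_partials is a hypothetical helper.

```python
from dataclasses import dataclass


@dataclass
class SimpleStats:
    """Hypothetical, simplified stand-in for MetricStats (no variance)."""

    n: int
    total: float
    min: float
    max: float

    def __add__(self, other: "SimpleStats") -> "SimpleStats":
        # Counts and totals add up; min and max are taken across both sides.
        return SimpleStats(
            n=self.n + other.n,
            total=self.total + other.total,
            min=min(self.min, other.min),
            max=max(self.max, other.max),
        )


def merge_partials(partials: list[dict[str, SimpleStats]]) -> dict[str, SimpleStats]:
    """Accumulate per-key stats from every partial dictionary."""
    merged: dict[str, SimpleStats] = {}
    for partial in partials:
        for key, stats in partial.items():
            # A key seen for the first time is copied; otherwise it is added.
            merged[key] = merged[key] + stats if key in merged else stats
    return merged


# Two partial results, as two ranks might produce for the fqdn group.
rank0 = {"example.com": SimpleStats(2, 30.0, 10.0, 20.0)}
rank1 = {
    "example.com": SimpleStats(1, 5.0, 5.0, 5.0),
    "example.org": SimpleStats(3, 9.0, 1.0, 6.0),
}
merged = merge_partials([rank0, rank1])
```

Keys that appear in only one partial file pass through unchanged, so the merged dictionary holds the union of all keys.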
Usage
Statistics merging is run as the final stage of the statistics pipeline, after all distributed computation workers have completed. It is typically configured as a separate pipeline step that reads the output directory of the stats computation step.
The typical two-stage pipeline is:
- Stage 1 (distributed): Run a stats computation step (e.g., WordStats, LineStats, DocStats) across N parallel workers. Each worker writes {group}/{stat_name}/{rank:05d}.json.
- Stage 2 (merging): Run StatsMerger to combine the N partial files per stat into a single {group}/{stat_name}/metric.json.
The merging step can itself be distributed: each rank processes a subset of stat folders, enabling parallel merging across many metrics.
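One way to picture the distributed merge is to derive the list of stat folders from the partial file paths and hand each merging rank a slice of that list. The sketch below is only an illustration: the helper names, the round-robin assignment, and the example stat names are assumptions, not datatrove's actual scheduling logic.

```python
from pathlib import PurePosixPath


def stat_folders(partial_paths: list[str]) -> list[str]:
    """Return the sorted, unique '{group}/{stat_name}' folders."""
    return sorted({str(PurePosixPath(p).parent) for p in partial_paths})


def folders_for_rank(folders: list[str], rank: int, world_size: int) -> list[str]:
    """Round-robin assignment of folders to ranks (an assumption of this sketch)."""
    return folders[rank::world_size]


# Partial files laid out as {group}/{stat_name}/{rank:05d}.json.
# The stat names here are illustrative only.
paths = [
    f"{group}/{stat}/{rank:05d}.json"
    for group, stat in [
        ("summary", "length"),
        ("fqdn", "length"),
        ("fqdn", "white_space_ratio"),
    ]
    for rank in range(2)
]

folders = stat_folders(paths)
rank0_work = folders_for_rank(folders, rank=0, world_size=2)
rank1_work = folders_for_rank(folders, rank=1, world_size=2)
```

Because each stat folder is merged independently, no coordination between merging ranks is needed beyond agreeing on the folder list.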
Theoretical Basis
The mathematical foundation for merging partial statistics is the parallel algorithm for calculating variance: the pairwise "combine" step that extends Welford's online algorithm to parallel computation, commonly attributed to Chan, Golub, and LeVeque.
Given two groups of observations with statistics (n_1, mean_1, M2_1) and (n_2, mean_2, M2_2), where M2 is the sum of squared deviations from the mean (the accumulator behind the running variance; dividing M2 by n gives the population variance), the combined statistics are:
n = n_1 + n_2
mean = (n_1 * mean_1 + n_2 * mean_2) / n
delta = mean_1 - mean_2
M2 = M2_1 + M2_2 + delta^2 * n_1 * n_2 / n
This formula is algebraically exact rather than an approximation, and it avoids the catastrophic cancellation that affects the naive sum-of-squares approach. Any number of partial statistics can therefore be combined in any order; in floating-point arithmetic, different orders can differ only by rounding error. The MetricStats.__add__ method implements this formula directly.
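The combine step can be checked numerically with a short, self-contained sketch. Partial, summarize, and combine are hypothetical names chosen for illustration; they mirror the four equations above and are not the MetricStats code itself.

```python
import statistics
from typing import NamedTuple


class Partial(NamedTuple):
    n: int
    mean: float
    m2: float  # sum of squared deviations from the mean


def summarize(values: list[float]) -> Partial:
    """Compute (n, mean, M2) for one shard directly from its values."""
    mean = statistics.fmean(values)
    return Partial(len(values), mean, sum((v - mean) ** 2 for v in values))


def combine(a: Partial, b: Partial) -> Partial:
    """Merge two partial summaries without access to the raw values."""
    n = a.n + b.n
    mean = (a.n * a.mean + b.n * b.mean) / n
    delta = a.mean - b.mean
    m2 = a.m2 + b.m2 + delta**2 * a.n * b.n / n
    return Partial(n, mean, m2)


shard_1 = [2.0, 4.0, 4.0]
shard_2 = [5.0, 7.0, 9.0, 11.0]

merged = combine(summarize(shard_1), summarize(shard_2))
population_variance = merged.m2 / merged.n

# Reference values computed from the concatenated raw data.
all_values = shard_1 + shard_2
reference_mean = statistics.fmean(all_values)
reference_variance = statistics.pvariance(all_values)
```

The merged mean and variance agree with the values computed from the concatenated data up to floating-point rounding, which is the property that lets the merger work from partial files alone.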
The top-k truncation operates by selecting the k keys with the largest document count (MetricStats.n) using a heap-based selection algorithm (heapq.nlargest), which runs in O(m log k) time where m is the total number of keys. In distributed settings, each worker may produce slightly different top-k sets due to data distribution, so it is recommended to use a slightly larger k during the distributed computation phase (e.g., 0.8x margin) to account for this.
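The selection step can be sketched with the standard library's heapq.nlargest. In this illustration, keep_top_k is a hypothetical helper and plain integer counts stand in for MetricStats.n; it is not the library's own truncation routine.

```python
import heapq


def keep_top_k(doc_counts: dict[str, int], k: int) -> dict[str, int]:
    """Keep the k keys with the largest document count."""
    top = heapq.nlargest(k, doc_counts.items(), key=lambda item: item[1])
    return dict(top)


# Document counts per key, e.g. per fully qualified domain name.
doc_counts = {"a.com": 120, "b.org": 45, "c.net": 300, "d.io": 7}
top_2 = keep_top_k(doc_counts, 2)
```

heapq.nlargest returns the selected items in descending order of count, so the truncated dictionary is also ordered from most to least frequent key.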