
Implementation:Huggingface Datatrove PipelineStats

From Leeroopedia
Knowledge Sources
Domains: Monitoring, Data Processing
Last Updated: 2026-02-14 17:00 GMT

Overview

The pipeline statistics module provides five classes (MetricStats, TimingStats, MetricStatsDict, Stats, and PipelineStats) for monitoring pipeline execution. Together they cover metric aggregation, timing measurement, and human-readable reporting.

Description

MetricStats is a dataclass that tracks running statistics for a single metric using Welford's online algorithm, so no individual samples need to be stored. It maintains the total, the sample count (n), the mean, the min, the max, and a running variance accumulator. Stats collected by separate workers can be merged with the parallel formula for combining means and variances. Each metric has a configurable unit (default: "doc").
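The update and merge steps described above can be sketched in a few lines. This is a minimal illustration, not the datatrove source: the class name RunningStats and the field m2 are hypothetical, and the real MetricStats also tracks total, min, max, and unit.

```python
from dataclasses import dataclass


@dataclass
class RunningStats:
    """Hypothetical stand-in for MetricStats (not the datatrove class)."""

    n: int = 0
    mean: float = 0.0
    m2: float = 0.0  # sum of squared deviations from the current mean

    def update(self, x: float) -> None:
        # Welford's online update: a single pass, no stored samples.
        self.n += 1
        delta = x - self.mean
        self.mean += delta / self.n
        self.m2 += delta * (x - self.mean)

    @property
    def variance(self) -> float:
        # Sample variance; defined here as 0.0 for fewer than two samples.
        return self.m2 / (self.n - 1) if self.n > 1 else 0.0

    def __add__(self, other: "RunningStats") -> "RunningStats":
        # Parallel combination of two partial results.
        n = self.n + other.n
        if n == 0:
            return RunningStats()
        delta = other.mean - self.mean
        mean = self.mean + delta * other.n / n
        m2 = self.m2 + other.m2 + delta * delta * self.n * other.n / n
        return RunningStats(n=n, mean=mean, m2=m2)


# Two workers see disjoint slices of the same stream.
a, b = RunningStats(), RunningStats()
for x in (1.0, 2.0, 3.0):
    a.update(x)
for x in (4.0, 5.0):
    b.update(x)
merged = a + b
print(merged.n, merged.mean, merged.variance)
```

Merging the two partial results gives the same mean and variance as feeding all five values to a single instance, which is what makes the approach suitable for aggregating across workers.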

TimingStats extends MetricStats as a context manager for measuring execution time. It adds global tracking across tasks (global mean, min, max, standard deviation, and task count), enabling reporting of both per-item timing and cross-task timing statistics. The context manager interface (with stats.time_stats:) automatically records elapsed wall-clock time using time.perf_counter().
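The context-manager pattern can be illustrated with a small standalone timer. This is a sketch under assumptions: BlockTimer and its attributes are hypothetical names, and the real TimingStats records each elapsed interval through the inherited MetricStats machinery rather than a bare accumulator.

```python
import time


class BlockTimer:
    """Hypothetical timer that mirrors the idea, not the datatrove code."""

    def __init__(self) -> None:
        self.total = 0.0  # accumulated seconds across all timed sections
        self.n = 0  # number of timed sections
        self._start = None

    def __enter__(self) -> "BlockTimer":
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
        # Record the elapsed wall-clock time even if the body raised.
        self.total += time.perf_counter() - self._start
        self.n += 1
        return False  # never swallow exceptions


timer = BlockTimer()
for _ in range(3):
    with timer:
        sum(range(10_000))  # placeholder workload

print(f"{timer.n} sections, {timer.total:.6f}s in total")
```

time.perf_counter() is a monotonic, high-resolution clock, which makes it a better fit for interval measurement than time.time().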

MetricStatsDict is a defaultdict-based container that automatically creates MetricStats instances for new keys. It supports addition (merging stats from multiple workers), top-k filtering by total value, and JSON serialization/deserialization.
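A simplified sketch of the container behavior follows. To keep it short, the values are plain float totals instead of MetricStats instances, and the names CounterDict and topk are hypothetical choices for this illustration.

```python
from collections import defaultdict


class CounterDict(defaultdict):
    """Hypothetical simplification: values are plain float totals."""

    def __init__(self, init=None):
        # Missing keys are created on first access with a value of 0.0.
        super().__init__(float, init or {})

    def __add__(self, other: "CounterDict") -> "CounterDict":
        # Merge key by key; keys seen by only one side are kept as-is.
        merged = CounterDict(self)
        for key, value in other.items():
            merged[key] += value
        return merged

    def topk(self, k: int) -> "CounterDict":
        # Keep the k entries with the largest totals.
        top = sorted(self.items(), key=lambda kv: kv[1], reverse=True)[:k]
        return CounterDict(dict(top))


worker1 = CounterDict()
worker1["documents"] += 100
worker1["dropped"] += 7

worker2 = CounterDict()
worker2["documents"] += 50
worker2["tokens"] += 1200  # key seen by only one worker

merged = worker1 + worker2
print(dict(merged.topk(2)))
```

Because new keys are created on demand, a pipeline step can record an arbitrary metric name without registering it first.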

Stats groups a TimingStats instance and a MetricStatsDict for a single pipeline block (step). It provides per-block runtime reporting and stat display, along with JSON serialization.

PipelineStats aggregates Stats objects across all pipeline blocks. It computes total pipeline runtime, propagated standard deviation across tasks, and generates a comprehensive human-readable report using the humanize library for formatting time durations. It supports JSON serialization for saving to disk and loading from saved stats files.
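The aggregation arithmetic can be sketched as follows. The helper names are hypothetical, and the quadrature formula assumes that per-block durations are independent; this page does not confirm the exact expression PipelineStats uses.

```python
import math


def total_runtime(block_means: list[float]) -> float:
    # Total pipeline time as the sum of each block's mean time per task.
    return sum(block_means)


def propagated_std_dev(block_std_devs: list[float]) -> float:
    # Assumption: blocks are independent, so variances add and the
    # combined standard deviation is the square root of their sum.
    return math.sqrt(sum(s * s for s in block_std_devs))


# Illustrative numbers only: three blocks, times in seconds.
means = [12.0, 30.0, 8.0]
std_devs = [3.0, 4.0, 0.0]

print(total_runtime(means), propagated_std_dev(std_devs))
```

Adding the standard deviations directly (7.0 here) would overstate the spread; combining them in quadrature gives 5.0.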

Usage

These classes are used internally by every pipeline step to track performance metrics, document counts, and execution timing. They are also used by executors to aggregate and report statistics across all ranks and pipeline stages.

Code Reference

Source Location

Signature

from dataclasses import dataclass
from typing import IO, Callable

# Signatures only; method bodies are elided with `...`.
@dataclass
class MetricStats:
    total: float = 0
    n: int = 0
    mean: float = 0.0
    min: float = float("inf")
    max: float = float("-inf")
    _running_variance: float = 0.0
    unit: str = "doc"

    def update(self, x: float, unit: str = None): ...
    def __add__(self, other): ...

@dataclass
class TimingStats(MetricStats):
    global_mean: float = 0
    n_tasks: int = 1
    global_min: float = float("inf")
    global_max: float = float("-inf")
    global_std_dev: float = 0.0

    def __enter__(self): ...
    def __exit__(self, exc_type, exc_val, exc_tb): ...

class Stats:
    def __init__(self, name: str): ...

class PipelineStats:
    def __init__(self, stats: list[Stats | Callable] = None): ...
    def get_repr(self, text=None): ...
    def save_to_disk(self, file: IO): ...
    @classmethod
    def from_json(cls, data): ...

Import

from datatrove.utils.stats import MetricStats, TimingStats, Stats, PipelineStats, MetricStatsDict

I/O Contract

Inputs

Name | Type | Required | Description
x | float | Yes (update) | Value to record for the metric
unit | str | No | Unit label for the metric (default: "doc")
name | str | Yes (Stats) | Name of the pipeline block
stats | list[Stats] | No (PipelineStats) | List of per-block Stats objects
file | IO | Yes (save_to_disk) | File-like object to write JSON stats to

Outputs

Name | Type | Description
variance | float | Computed variance using Welford's algorithm
standard_deviation | float | Square root of variance
total_time | float | Sum of global_mean across all pipeline blocks
get_repr | str | Human-readable report string with formatted timing and metrics
to_json | str | JSON string representation of all stats

Usage Examples

Basic Usage

from datatrove.utils.stats import MetricStats, TimingStats

# Track a metric
metric = MetricStats()
metric.update(42.0)
metric.update(58.0)
print(f"Mean: {metric.mean}, Total: {metric.total}")

# Time a block of code
timing = TimingStats()
with timing:
    # ... expensive operation ...
    pass
print(f"Elapsed: {timing.total}s")

# Merge stats from multiple workers
stats_worker1 = MetricStats(total=100, n=10, mean=10.0, min=5.0, max=15.0)
stats_worker2 = MetricStats(total=200, n=20, mean=10.0, min=3.0, max=18.0)
merged = stats_worker1 + stats_worker2
print(f"Merged total: {merged.total}, n: {merged.n}")
