Implementation:NVIDIA NeMo Curator TaskPerfUtils

From Leeroopedia
Knowledge Sources
Domains: Data Curation, Pipeline Framework, Observability
Last Updated: 2026-02-14 00:00 GMT

Overview

The TaskPerfUtils class provides static utility methods for aggregating and analyzing per-stage performance metrics collected from pipeline tasks, enabling post-run performance analysis and optimization.

Description

TaskPerfUtils is a utility class containing four static methods that work together to normalize, collect, and aggregate performance data from pipeline tasks.

_normalize_pipeline_tasks: Accepts several input formats (a plain list of Task objects, a WorkflowRunResult instance, or a Mapping of pipeline names to task lists) and normalizes them into a consistent dict[str, list[Task]] structure. It also unwraps nested mappings such as {"pipeline_tasks": {...}} and accepts None as input without raising an error.
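The normalization step can be illustrated with a minimal plain-Python sketch. This is not the NeMo Curator source: FakeWorkflowRunResult is a hypothetical stand-in that assumes the real result object exposes a pipeline_tasks mapping, and the "default" key used for a bare list of tasks is an assumption made for illustration only.

```python
from collections.abc import Mapping
from dataclasses import dataclass, field
from typing import Any


@dataclass
class FakeWorkflowRunResult:
    """Hypothetical stand-in for WorkflowRunResult (assumes a `pipeline_tasks` attribute)."""
    pipeline_tasks: dict[str, list[Any]] = field(default_factory=dict)


# Hypothetical pipeline name used when the caller passes a flat list of tasks.
DEFAULT_PIPELINE = "default"


def normalize_pipeline_tasks(tasks: Any) -> dict[str, list[Any]]:
    """Sketch: normalize the accepted input shapes into {pipeline_name: [tasks]}."""
    if tasks is None:
        # No tasks: return an empty mapping instead of raising.
        return {}
    if isinstance(tasks, FakeWorkflowRunResult):
        return {name: list(ts) for name, ts in tasks.pipeline_tasks.items()}
    if isinstance(tasks, Mapping):
        # Unwrap the nested {"pipeline_tasks": {...}} form when it is present.
        inner = tasks.get("pipeline_tasks", tasks)
        return {str(name): list(ts) for name, ts in inner.items()}
    # Anything else is treated as a flat iterable of tasks from a single pipeline.
    return {DEFAULT_PIPELINE: list(tasks)}
```

Collapsing every input into one dict shape lets the downstream collection and aggregation code loop over pipelines uniformly.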

collect_stage_metrics: Iterates over every task in every pipeline and reads the per-stage performance data from each task's _stage_perf list. For each StagePerfStats entry, it collects both built-in metrics and custom stats (flattened via perf.items()) and groups the values by stage name and metric name. Before returning, it converts each collected list to a numpy array for efficient numerical computation. The return type is dict[str, dict[str, np.ndarray[float]]] (stage_name -> metric_name -> values array).
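The grouping logic can be sketched with stand-in types. FakeStagePerfStats and FakeTask are hypothetical; the sketch assumes each stats entry has a stage_name attribute and an items() method that yields (metric_name, value) pairs covering both built-in metrics and custom stats, mirroring the description above rather than the actual StagePerfStats class.

```python
from collections import defaultdict
from dataclasses import dataclass, field

import numpy as np


@dataclass
class FakeStagePerfStats:
    """Hypothetical stand-in for StagePerfStats: built-in metrics plus custom stats."""
    stage_name: str
    process_time: float
    num_items_processed: int
    custom_stats: dict[str, float] = field(default_factory=dict)

    def items(self) -> list[tuple[str, float]]:
        # Flatten built-in metrics and custom stats into (name, value) pairs.
        pairs = [
            ("process_time", self.process_time),
            ("num_items_processed", float(self.num_items_processed)),
        ]
        pairs.extend(self.custom_stats.items())
        return pairs


@dataclass
class FakeTask:
    """Hypothetical task carrying one `_stage_perf` entry per stage it passed through."""
    _stage_perf: list[FakeStagePerfStats] = field(default_factory=list)


def collect_stage_metrics(pipeline_tasks: dict[str, list[FakeTask]]) -> dict[str, dict[str, np.ndarray]]:
    """Group every metric value by stage name and metric name, then convert to arrays."""
    grouped: dict[str, dict[str, list[float]]] = defaultdict(lambda: defaultdict(list))
    for tasks in pipeline_tasks.values():
        for task in tasks:
            for perf in task._stage_perf:
                for metric_name, value in perf.items():
                    grouped[perf.stage_name][metric_name].append(float(value))
    # Convert the nested defaultdicts into plain dicts of float arrays.
    return {
        stage: {metric: np.asarray(values, dtype=float) for metric, values in metrics.items()}
        for stage, metrics in grouped.items()
    }
```

Values for the same stage are pooled across all tasks and pipelines, so each array holds one entry per task that passed through that stage.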

aggregate_task_metrics: Builds on collect_stage_metrics to compute three summary statistics (sum, mean, std) for each metric at each stage. It accepts an optional prefix parameter, and when tasks come from multiple pipelines it incorporates the pipeline names into the metric keys. It returns a flat dictionary that maps "{stage}_{metric}_{agg}" keys to float values.
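A minimal sketch of the flattening step, starting from data shaped like the collect_stage_metrics output. Two details are assumptions, not confirmed by the source: that the prefix is prepended with an underscore separator, and that std is the population standard deviation (numpy's default ddof=0). Pipeline-name handling for multi-pipeline input is omitted.

```python
from typing import Optional

import numpy as np


def aggregate_metrics(
    stage_metrics: dict[str, dict[str, np.ndarray]],
    prefix: Optional[str] = None,
) -> dict[str, float]:
    """Flatten stage -> metric -> values into "{stage}_{metric}_{agg}" summary keys."""
    out: dict[str, float] = {}
    for stage, metrics in stage_metrics.items():
        for metric, values in metrics.items():
            arr = np.asarray(values, dtype=float)
            base = f"{stage}_{metric}"
            if prefix:
                # Assumption: the prefix is joined to the key with an underscore.
                base = f"{prefix}_{base}"
            out[f"{base}_sum"] = float(arr.sum())
            # Guard against empty arrays, where mean/std would be NaN.
            out[f"{base}_mean"] = float(arr.mean()) if arr.size else 0.0
            out[f"{base}_std"] = float(arr.std()) if arr.size else 0.0
    return out
```

For a "reader" stage whose process_time values are [1.0, 3.0], this yields reader_process_time_sum = 4.0, reader_process_time_mean = 2.0, and reader_process_time_std = 1.0.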

get_aggregated_stage_stat: Returns the sum of a single statistic across all stages whose names start with a given prefix. This is useful for querying aggregate metrics such as total "process_time" or "num_items_processed" across a group of related stages.
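The prefix-match-and-sum behavior can be sketched over a stage -> metric -> values mapping. The sketch assumes that stages lacking the requested stat are skipped and that the result is 0.0 when nothing matches; the real method may behave differently in those edge cases.

```python
import numpy as np


def get_stage_stat_total(
    stage_metrics: dict[str, dict[str, np.ndarray]],
    stage_prefix: str,
    stat: str,
) -> float:
    """Sum one metric over every stage whose name starts with `stage_prefix`."""
    total = 0.0
    for stage, metrics in stage_metrics.items():
        # Assumption: stages that never recorded `stat` contribute nothing.
        if stage.startswith(stage_prefix) and stat in metrics:
            total += float(np.sum(metrics[stat]))
    return total
```

Because matching uses str.startswith, a prefix such as "filter" picks up every stage named "filter_..." and nothing else.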

Usage

Use TaskPerfUtils after pipeline execution to analyze performance characteristics such as per-stage processing times, throughput, and custom metrics. It is the primary observability tool for understanding bottlenecks and optimizing data processing workflows.
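One way to look for bottlenecks is to rank stages by total processing time and compute their throughput. The helper below, rank_stages_by_time, is hypothetical and not part of TaskPerfUtils. It operates on a dict shaped like the collect_stage_metrics return value, using the "process_time" and "num_items_processed" metric names mentioned on this page.

```python
import numpy as np


def rank_stages_by_time(stage_metrics: dict[str, dict[str, np.ndarray]]) -> list[tuple[str, float, float]]:
    """Return (stage, total_time, items_per_second) tuples, slowest stage first."""
    rows = []
    for stage, metrics in stage_metrics.items():
        total_time = float(np.sum(metrics.get("process_time", [])))
        items = float(np.sum(metrics.get("num_items_processed", [])))
        # Throughput is undefined when a stage recorded no processing time.
        throughput = items / total_time if total_time > 0 else float("nan")
        rows.append((stage, total_time, throughput))
    return sorted(rows, key=lambda row: row[1], reverse=True)
```

With real data, the input would be the value returned by TaskPerfUtils.collect_stage_metrics(completed_tasks); the first row then names the stage that consumed the most processing time.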

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/tasks/utils.py
  • Lines: 1-165

Signature

class TaskPerfUtils:
    @staticmethod
    def _normalize_pipeline_tasks(
        tasks: list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None,
    ) -> dict[str, list[Task]]: ...

    @staticmethod
    def collect_stage_metrics(
        tasks: list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None,
    ) -> dict[str, dict[str, np.ndarray[float]]]: ...

    @staticmethod
    def aggregate_task_metrics(
        tasks: list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None,
        prefix: str | None = None,
    ) -> dict[str, Any]: ...

    @staticmethod
    def get_aggregated_stage_stat(
        tasks: list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None,
        stage_prefix: str,
        stat: str,
    ) -> float: ...

Import

from nemo_curator.tasks.utils import TaskPerfUtils

I/O Contract

collect_stage_metrics

  • tasks (list[Task], WorkflowRunResult, Mapping, or None; required): Tasks from which to collect metrics.

Returns dict[str, dict[str, np.ndarray[float]]] mapping stage names to metric names to arrays of float values.

aggregate_task_metrics

  • tasks (list[Task], WorkflowRunResult, Mapping, or None; required): Tasks from which to aggregate metrics.
  • prefix (str or None; optional): Prefix for metric keys.

Returns dict[str, Any] with keys in the format "{stage}_{metric}_{sum|mean|std}".

get_aggregated_stage_stat

  • tasks (list[Task], WorkflowRunResult, Mapping, or None; required): Tasks from which to retrieve the stat.
  • stage_prefix (str; required): Prefix that stage names are matched against.
  • stat (str; required): Name of the metric to sum (e.g., "process_time", "num_items_processed").

Returns a float value representing the summed stat across matching stages.

Usage Examples

Collecting Raw Stage Metrics

from nemo_curator.tasks.utils import TaskPerfUtils

# After pipeline execution, collect per-stage metrics
# (completed_tasks is the list of Task objects produced by your pipeline run)
stage_metrics = TaskPerfUtils.collect_stage_metrics(completed_tasks)

for stage_name, metrics in stage_metrics.items():
    for metric_name, values in metrics.items():
        print(f"{stage_name}/{metric_name}: mean={values.mean():.3f}, count={len(values)}")

Aggregating Task Metrics

from nemo_curator.tasks.utils import TaskPerfUtils

# Compute sum/mean/std for all metrics
aggregated = TaskPerfUtils.aggregate_task_metrics(completed_tasks)

# Print every aggregated metric, sorted by key
for key, value in sorted(aggregated.items()):
    print(f"{key}: {value:.4f}")

Querying a Specific Stage Stat

from nemo_curator.tasks.utils import TaskPerfUtils

# Get total processing time for all stages starting with "filter"
total_filter_time = TaskPerfUtils.get_aggregated_stage_stat(
    completed_tasks,
    stage_prefix="filter",
    stat="process_time",
)
print(f"Total filter processing time: {total_filter_time:.2f}s")

Working with WorkflowRunResult

from nemo_curator.tasks.utils import TaskPerfUtils

# TaskPerfUtils accepts a WorkflowRunResult directly
# (workflow and input_data are placeholders for your own workflow object and its input)
result = workflow.run(input_data)
metrics = TaskPerfUtils.aggregate_task_metrics(result, prefix="my_workflow")
