Implementation:NVIDIA NeMo Curator TaskPerfUtils
| Knowledge Sources | |
|---|---|
| Domains | Data Curation, Pipeline Framework, Observability |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
The TaskPerfUtils class provides static utility methods for aggregating and analyzing per-stage performance metrics collected from pipeline tasks, enabling post-run performance analysis and optimization.
Description
TaskPerfUtils is a utility class containing four static methods that work together to normalize, collect, and aggregate performance data from pipeline tasks.
_normalize_pipeline_tasks: Accepts various input formats -- a plain list of Task objects, a WorkflowRunResult instance, or a Mapping of pipeline names to task lists -- and normalizes them into a consistent dict[str, list[Task]] structure. This handles nested formats like {"pipeline_tasks": {...}} and gracefully handles None inputs.
collect_stage_metrics: Iterates through all tasks across all pipelines, extracting per-stage performance data from each task's _stage_perf list. For each StagePerfStats entry, it collects both built-in metrics and custom stats (flattened via perf.items()), grouping values by stage name and metric name. The final output converts all collected lists to numpy arrays for efficient numerical computation. The return type is dict[str, dict[str, np.ndarray[float]]] (stage_name -> metric_name -> values array).
aggregate_task_metrics: Builds on collect_stage_metrics to compute three summary statistics (sum, mean, std) for each metric at each stage. It supports an optional prefix parameter and handles multi-pipeline scenarios by incorporating pipeline names into metric keys. Returns a flat dictionary of "{stage}_{metric}_{agg}" keys mapped to float values.
get_aggregated_stage_stat: Retrieves a specific summed statistic for all stages whose names start with a given prefix. This is useful for querying aggregate metrics like total "process_time" or "num_items_processed" across related stages.
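The normalization step described above can be pictured with a minimal sketch. It is not the library's implementation: the function name `normalize_pipeline_tasks`, the placeholder pipeline name `"default"`, and the choice to return an empty dict for `None` are assumptions made for illustration, and the `WorkflowRunResult` branch is left out because its attributes are not described here.

```python
from collections.abc import Mapping


def normalize_pipeline_tasks(tasks):
    """Illustrative stand-in for the normalization idea, not the real method."""
    if tasks is None:
        # Assumption: None normalizes to an empty mapping.
        return {}
    if isinstance(tasks, Mapping):
        # Unwrap the nested {"pipeline_tasks": {...}} form when it is present.
        inner = tasks.get("pipeline_tasks", tasks)
        return {name: list(items) for name, items in inner.items()}
    # Plain list: file it under a placeholder pipeline name (hypothetical).
    return {"default": list(tasks)}


# Strings stand in for Task objects to keep the sketch self-contained.
flat = normalize_pipeline_tasks(["t1", "t2"])
nested = normalize_pipeline_tasks({"pipeline_tasks": {"dedup": ["t3"]}})
empty = normalize_pipeline_tasks(None)
```

Whatever the input shape, downstream code only has to deal with one structure: pipeline name mapped to a list of tasks.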
Usage
Use TaskPerfUtils after pipeline execution to analyze performance characteristics such as per-stage processing times, throughput, and custom metrics. It is the primary observability tool for understanding bottlenecks and optimizing data processing workflows.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/tasks/utils.py
- Lines: 1-165
Signature
class TaskPerfUtils:
    @staticmethod
    def _normalize_pipeline_tasks(
        tasks: list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None,
    ) -> dict[str, list[Task]]: ...

    @staticmethod
    def collect_stage_metrics(
        tasks: list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None,
    ) -> dict[str, dict[str, np.ndarray[float]]]: ...

    @staticmethod
    def aggregate_task_metrics(
        tasks: list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None,
        prefix: str | None = None,
    ) -> dict[str, Any]: ...

    @staticmethod
    def get_aggregated_stage_stat(
        tasks: list[Task] | WorkflowRunResult | Mapping[str, list[Task]] | None,
        stage_prefix: str,
        stat: str,
    ) -> float: ...
Import
from nemo_curator.tasks.utils import TaskPerfUtils
I/O Contract
collect_stage_metrics
| Name | Type | Required | Description |
|---|---|---|---|
| tasks | list[Task], WorkflowRunResult, Mapping, or None | Yes | Tasks from which to collect metrics |
Returns dict[str, dict[str, np.ndarray[float]]] mapping stage names to metric names to arrays of float values.
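To make the stage_name -> metric_name -> values shape concrete, here is a rough sketch of the grouping logic on stand-in objects. `FakePerf`, `FakeTask`, `collect`, and the `stage_name` attribute are hypothetical names chosen for this example; only `_stage_perf` and `items()` come from the description above. Plain Python lists stand in for the numpy arrays the real method returns, so the sketch needs nothing beyond the standard library.

```python
from collections import defaultdict
from dataclasses import dataclass, field


@dataclass
class FakePerf:
    """Hypothetical stand-in for a StagePerfStats entry."""
    stage_name: str
    process_time: float
    custom: dict = field(default_factory=dict)

    def items(self):
        # Flatten built-in and custom metrics into (name, value) pairs.
        return [("process_time", self.process_time), *self.custom.items()]


@dataclass
class FakeTask:
    """Hypothetical stand-in for a Task carrying per-stage stats."""
    _stage_perf: list


def collect(pipeline_tasks):
    """Group metric values by stage name, then by metric name."""
    grouped = defaultdict(lambda: defaultdict(list))
    for task_list in pipeline_tasks.values():
        for task in task_list:
            for perf in task._stage_perf:
                for metric, value in perf.items():
                    grouped[perf.stage_name][metric].append(float(value))
    # Convert nested defaultdicts to plain dicts for a predictable result.
    return {stage: dict(metrics) for stage, metrics in grouped.items()}


tasks = {
    "default": [
        FakeTask([FakePerf("reader", 0.5), FakePerf("filter", 1.0, {"kept": 8})]),
        FakeTask([FakePerf("reader", 1.5), FakePerf("filter", 3.0, {"kept": 6})]),
    ]
}
stage_metrics = collect(tasks)
```

Each stage ends up with one value per task for every metric it reported, which is what makes per-stage means and standard deviations straightforward to compute afterwards.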
aggregate_task_metrics
| Name | Type | Required | Description |
|---|---|---|---|
| tasks | list[Task], WorkflowRunResult, Mapping, or None | Yes | Tasks from which to aggregate metrics |
| prefix | str or None | No | Optional prefix for metric keys |
Returns dict[str, Any] with keys in the format "{stage}_{metric}_{sum|mean|std}".
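The key format can be illustrated with a small sketch that flattens an already collected stage -> metric -> values mapping into sum, mean, and std entries. The function name `aggregate` is hypothetical, the standard library's `statistics` module is swapped in for numpy, and the use of the population standard deviation is an assumption. Prefix handling and multi-pipeline key naming are omitted because their exact format is not specified here.

```python
from statistics import fmean, pstdev


def aggregate(stage_metrics):
    """Flatten stage -> metric -> values into '{stage}_{metric}_{agg}' keys."""
    out = {}
    for stage, metrics in stage_metrics.items():
        for metric, values in metrics.items():
            out[f"{stage}_{metric}_sum"] = sum(values)
            out[f"{stage}_{metric}_mean"] = fmean(values)
            # Assumption: population standard deviation (divides by N).
            out[f"{stage}_{metric}_std"] = pstdev(values)
    return out


summary = aggregate({"filter": {"process_time": [1.0, 3.0]}})
# summary holds filter_process_time_sum, _mean, and _std entries.
```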
get_aggregated_stage_stat
| Name | Type | Required | Description |
|---|---|---|---|
| tasks | list[Task], WorkflowRunResult, Mapping, or None | Yes | Tasks from which to retrieve the stat |
| stage_prefix | str | Yes | Prefix to match stage names against |
| stat | str | Yes | The metric name to aggregate (e.g., "process_time", "num_items_processed") |
Returns a float value representing the summed stat across matching stages.
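The prefix-matching behavior amounts to a filtered sum, sketched below on hand-written data. The function name `summed_stat` and the stage names are hypothetical, and returning 0.0 when no stage reports the requested stat is an assumption rather than documented behavior.

```python
def summed_stat(stage_metrics, stage_prefix, stat):
    """Sum one metric over every stage whose name starts with stage_prefix."""
    total = 0.0
    for stage, metrics in stage_metrics.items():
        if stage.startswith(stage_prefix):
            # Stages that never reported this stat contribute nothing.
            total += sum(metrics.get(stat, []))
    return total


stage_metrics = {
    "filter_language": {"process_time": [1.0, 2.0]},
    "filter_length": {"process_time": [0.5]},
    "writer": {"process_time": [4.0]},
}
total = summed_stat(stage_metrics, "filter", "process_time")
```

Here only the two `filter_*` stages contribute, so the `writer` stage's time is excluded from the total.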
Usage Examples
Collecting Raw Stage Metrics
from nemo_curator.tasks.utils import TaskPerfUtils
# After pipeline execution, collect per-stage metrics
stage_metrics = TaskPerfUtils.collect_stage_metrics(completed_tasks)
for stage_name, metrics in stage_metrics.items():
    for metric_name, values in metrics.items():
        print(f"{stage_name}/{metric_name}: mean={values.mean():.3f}, count={len(values)}")
Aggregating Task Metrics
from nemo_curator.tasks.utils import TaskPerfUtils
# Compute sum/mean/std for all metrics
aggregated = TaskPerfUtils.aggregate_task_metrics(completed_tasks)
# Access specific aggregated metrics
for key, value in sorted(aggregated.items()):
    print(f"{key}: {value:.4f}")
Querying a Specific Stage Stat
from nemo_curator.tasks.utils import TaskPerfUtils
# Get total processing time for all stages starting with "filter"
total_filter_time = TaskPerfUtils.get_aggregated_stage_stat(
    completed_tasks,
    stage_prefix="filter",
    stat="process_time",
)
print(f"Total filter processing time: {total_filter_time:.2f}s")
Working with WorkflowRunResult
from nemo_curator.tasks.utils import TaskPerfUtils
# TaskPerfUtils accepts WorkflowRunResult directly
result = workflow.run(input_data)
metrics = TaskPerfUtils.aggregate_task_metrics(result, prefix="my_workflow")
Related Pages
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base
- NVIDIA_NeMo_Curator_Task_Base - The Task ABC whose _stage_perf field is analyzed by TaskPerfUtils
- NVIDIA_NeMo_Curator_Benchmark_Runner - Uses TaskPerfUtils for benchmark metric collection