Implementation:NVIDIA NeMo Curator ModelStage

From Leeroopedia
Knowledge Sources
Domains: GPU Inference, NLP, HuggingFace, Data Pipeline
Last Updated: 2026-02-14 00:00 GMT

Overview

Base class for GPU-based HuggingFace model inference stages that process tokenized DocumentBatch inputs and produce enriched DocumentBatch outputs.

Description

ModelStage extends ProcessingStage[DocumentBatch, DocumentBatch] and provides the core abstraction for all GPU model inference in the NeMo Curator text pipeline. It manages model downloading, GPU batching, forward pass execution, output collection, and memory cleanup.

The stage requires one GPU (configured via the Resources dataclass). During node setup, it downloads the model with huggingface_hub.snapshot_download and, if the subclass defines one, calls its _setup method. The process method implements the full inference loop:

  1. Converts the DocumentBatch to a Pandas DataFrame
  2. Iterates over mini-batches via yield_next_batch, which moves tokenized data to GPU as torch.Tensor dictionaries
  3. Runs forward passes with torch.no_grad() and optional torch.autocast for mixed-precision inference
  4. Delegates output processing to the subclass via process_model_output
  5. Collects all batch outputs via collect_outputs and creates the output DataFrame via create_output_dataframe
  6. Restores original sequence ordering if the data was sorted by token length

The yield_next_batch generator limits GPU memory use by moving only one batch to the GPU at a time and by clipping tokens to remove unnecessary padding.
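
The control flow of the inference loop can be sketched without a GPU. The following is a minimal plain-Python illustration, not the NeMo Curator implementation: iter_batches, run_inference_loop, and the toy "model" are hypothetical stand-ins, and only the hook names process_model_output and collect_outputs mirror the ones described on this page. The real stage works on torch tensors and a Pandas DataFrame.

```python
from typing import Any, Callable, Iterator

Row = dict[str, Any]


def iter_batches(rows: list[Row], batch_size: int) -> Iterator[list[Row]]:
    """Yield consecutive slices of at most batch_size rows.

    Hypothetical stand-in for yield_next_batch, which would also move
    each batch to the GPU.
    """
    for start in range(0, len(rows), batch_size):
        yield rows[start:start + batch_size]


def run_inference_loop(
    rows: list[Row],
    batch_size: int,
    model: Callable[[list[Row]], Any],
    process_model_output: Callable[[Any], dict[str, list]],
    collect_outputs: Callable[[list[dict[str, list]]], dict[str, list]],
) -> dict[str, list]:
    """Run the model batch by batch and merge the per-batch results."""
    processed = []
    for batch in iter_batches(rows, batch_size):
        raw = model(batch)  # forward pass on one mini-batch
        processed.append(process_model_output(raw))  # subclass hook
        del batch, raw  # drop references so only one batch stays alive
    return collect_outputs(processed)


# Toy usage: the "model" just returns the token count of each row.
rows = [{"input_ids": [1, 2, 3]}, {"input_ids": [4]}, {"input_ids": [5, 6]}]
result = run_inference_loop(
    rows,
    batch_size=2,
    model=lambda batch: [len(r["input_ids"]) for r in batch],
    process_model_output=lambda raw: {"lengths": raw},
    collect_outputs=lambda outs: {"lengths": [n for o in outs for n in o["lengths"]]},
)
print(result)
```

Keeping the per-batch hook separate from the collection step means each batch's raw output can be reduced to a small result before the next batch is loaded.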

Usage

ModelStage is a base class and is not used directly. Subclasses must implement outputs(), setup() (or _setup()), process_model_output(), and create_output_dataframe(). The primary subclass is EmbeddingModelStage, which generates text embeddings.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/stages/text/models/model.py
  • Lines: 1-197

Signature

class ModelStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    def __init__(
        self,
        model_identifier: str,
        cache_dir: str | None = None,
        hf_token: str | None = None,
        model_inference_batch_size: int = 256,
        has_seq_order: bool = True,
        padding_side: Literal["left", "right"] = "right",
        unpack_inference_batch: bool = False,
        autocast: bool = True,
    ): ...

Import

from nemo_curator.stages.text.models.model import ModelStage

I/O Contract

Inputs

| Name | Type | Required | Description |
|---|---|---|---|
| model_identifier | str | Yes | HuggingFace model identifier or local path for the model |
| cache_dir | str or None | No | Directory for caching downloaded model files (default: None) |
| hf_token | str or None | No | HuggingFace authentication token for gated models (default: None) |
| model_inference_batch_size | int | No | Number of samples per GPU inference batch (default: 256) |
| has_seq_order | bool | No | Whether input data includes sequence ordering for restoring original order after length-sorted batching (default: True) |
| padding_side | Literal["left", "right"] | No | Side on which input tokens are padded (default: "right") |
| unpack_inference_batch | bool | No | Whether to unpack the batch dict with **kwargs when calling the model (default: False) |
| autocast | bool | No | Whether to use torch.autocast for mixed-precision inference (default: True) |

Stage I/O Specification

| Method | Returns |
|---|---|
| inputs() | (["data"], [INPUT_ID_FIELD, ATTENTION_MASK_FIELD] + optionally [SEQ_ORDER_FIELD]) |
| outputs() | Must be implemented by subclasses |

Outputs

| Name | Type | Description |
|---|---|---|
| DocumentBatch | DocumentBatch | Enriched data with model inference results added as new columns (format depends on subclass) |

Usage Examples

Implementing a Custom Model Stage

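import numpy as np
import pandas as pd
import torch
from transformers import AutoModelForSequenceClassification

from nemo_curator.backends.base import WorkerMetadata
from nemo_curator.stages.text.models.model import ModelStage


class ClassifierStage(ModelStage):
    """Example subclass that adds one predicted class index per document."""

    def __init__(self, model_identifier: str, label_field: str = "label", **kwargs):
        # HuggingFace models take input_ids/attention_mask as keyword arguments,
        # so the batch dict is unpacked when the model is called.
        super().__init__(model_identifier=model_identifier, unpack_inference_batch=True, **kwargs)
        self.label_field = label_field

    def outputs(self) -> tuple[list[str], list[str]]:
        return ["data"], [self.label_field]

    def setup(self, _: WorkerMetadata | None = None) -> None:
        # The model files are expected to be in the local cache already.
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_identifier, cache_dir=self.cache_dir, local_files_only=True
        )
        self.model.eval().to("cuda")

    def process_model_output(
        self, outputs, model_input_batch: dict[str, torch.Tensor] | None = None
    ) -> dict[str, np.ndarray]:
        # Cast to float32 first: NumPy cannot convert bfloat16 tensors.
        logits = outputs.logits.float().cpu().numpy()
        return {"predictions": np.argmax(logits, axis=1)}

    def collect_outputs(self, processed_outputs: list[dict[str, np.ndarray]]) -> dict[str, np.ndarray]:
        # Concatenate the per-batch predictions into one array for the whole DataFrame.
        return {"predictions": np.concatenate([o["predictions"] for o in processed_outputs])}

    def create_output_dataframe(
        self, df_cpu: pd.DataFrame, collected_output: dict[str, np.ndarray]
    ) -> pd.DataFrame:
        return df_cpu.assign(**{self.label_field: collected_output["predictions"]})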

Implementation Details

GPU Batch Yielding

The yield_next_batch generator converts tokenized data from the Pandas DataFrame to PyTorch tensors in batches of model_inference_batch_size. Each batch is a dictionary with input_ids and attention_mask keys, moved to the model's GPU device. The clip_tokens utility is applied to remove unnecessary padding columns for improved memory efficiency.
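
The idea behind token clipping can be illustrated with nested lists. This is a sketch under stated assumptions, not the clip_tokens utility itself: the function name clip_padding and its signature are hypothetical, the real utility operates on torch tensors, and the sketch assumes each attention mask is a contiguous run of 1s on the non-padded side.

```python
def clip_padding(
    input_ids: list[list[int]],
    attention_mask: list[list[int]],
    padding_side: str = "right",
) -> tuple[list[list[int]], list[list[int]]]:
    """Drop the columns that are padding in every row of the batch."""
    # Longest real sequence in the batch (number of 1s in its mask).
    longest = max((sum(mask) for mask in attention_mask), default=0)
    if padding_side == "right":
        window = slice(0, longest)  # keep the leading columns
    else:
        width = len(attention_mask[0]) if attention_mask else 0
        window = slice(width - longest, width)  # keep the trailing columns
    return (
        [row[window] for row in input_ids],
        [row[window] for row in attention_mask],
    )


# Right-padded batch: the last three columns are padding in every row.
ids = [[5, 6, 7, 0, 0, 0], [8, 9, 0, 0, 0, 0]]
mask = [[1, 1, 1, 0, 0, 0], [1, 1, 0, 0, 0, 0]]
clipped_ids, clipped_mask = clip_padding(ids, mask, padding_side="right")

# Left-padded batch: the first three columns are padding in every row.
left_ids = [[0, 0, 0, 5, 6, 7], [0, 0, 0, 0, 8, 9]]
left_mask = [[0, 0, 0, 1, 1, 1], [0, 0, 0, 0, 1, 1]]
left_clipped_ids, left_clipped_mask = clip_padding(left_ids, left_mask, padding_side="left")
```

Rows shorter than the longest one in the batch keep some padding, so the batch stays rectangular while the shared all-padding columns are removed.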

Forward Pass Execution

The _model_forward method supports two modes:

  • When unpack_inference_batch is True: calls self.model(**model_input_batch) (standard HuggingFace interface)
  • When unpack_inference_batch is False: calls self.model(model_input_batch) (SentenceTransformer-style interface)
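
The difference between the two calling conventions can be shown with plain callables. This is an illustrative sketch only: model_forward, hf_style_model, and st_style_model are hypothetical names, and the toy models stand in for real HuggingFace or SentenceTransformer models.

```python
from typing import Any, Callable


def model_forward(
    model: Callable[..., Any],
    model_input_batch: dict[str, Any],
    unpack_inference_batch: bool,
) -> Any:
    """Call the model with either keyword arguments or a single dict."""
    if unpack_inference_batch:
        # Keys of the batch become keyword arguments: model(input_ids=..., attention_mask=...)
        return model(**model_input_batch)
    # The whole batch dict is passed as one positional argument.
    return model(model_input_batch)


def hf_style_model(input_ids, attention_mask):
    """Toy model that expects keyword arguments."""
    return {"style": "kwargs", "rows": len(input_ids)}


def st_style_model(features):
    """Toy model that expects one dict of features."""
    return {"style": "dict", "rows": len(features["input_ids"])}


batch = {"input_ids": [[1, 2], [3, 4]], "attention_mask": [[1, 1], [1, 1]]}
hf_out = model_forward(hf_style_model, batch, unpack_inference_batch=True)
st_out = model_forward(st_style_model, batch, unpack_inference_batch=False)
```

Choosing the wrong mode typically surfaces as a TypeError about unexpected or missing arguments, so the flag should match the signature of the model's forward call.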

Sequence Order Restoration

When has_seq_order is True, the process method sorts the output DataFrame by the _curator_seq_order column and drops it, restoring the original row order from before length-based sorting was applied by the TokenizerStage.
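
The restoration step amounts to sorting by the stored position and then dropping that column. The sketch below uses a list of dicts in place of the DataFrame; restore_order and the sample rows are hypothetical, and only the column name _curator_seq_order comes from this page.

```python
SEQ_ORDER = "_curator_seq_order"


def restore_order(rows: list[dict]) -> list[dict]:
    """Sort rows by their original position and drop the helper column."""
    ordered = sorted(rows, key=lambda row: row[SEQ_ORDER])
    return [{k: v for k, v in row.items() if k != SEQ_ORDER} for row in ordered]


# Rows as they might arrive after length-based sorting (order is illustrative).
sorted_rows = [
    {"text": "a much longer document", "label": 1, SEQ_ORDER: 1},
    {"text": "medium text", "label": 1, SEQ_ORDER: 2},
    {"text": "short", "label": 0, SEQ_ORDER: 0},
]
restored = restore_order(sorted_rows)
print([row["text"] for row in restored])
```

On a Pandas DataFrame, the same effect is commonly achieved with sort_values on the order column followed by drop(columns=...).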

Memory Management

The teardown method explicitly calls gc.collect() and torch.cuda.empty_cache() to free GPU memory after the stage completes. Additionally, model_input_batch references are deleted after each forward pass to allow garbage collection of GPU tensors.
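
A cleanup routine of this kind might look like the following. It is a sketch rather than the stage's actual teardown code: release_gpu_memory is a hypothetical helper, and the torch import is guarded only so that the snippet also runs on machines without PyTorch or without a GPU.

```python
import gc

try:
    import torch
except ImportError:  # allow the sketch to run without PyTorch installed
    torch = None


def release_gpu_memory() -> int:
    """Collect unreachable Python objects, then release cached CUDA memory.

    Returns the number of unreachable objects found by the garbage collector.
    """
    # Collect first so tensors that are no longer referenced are actually freed.
    unreachable = gc.collect()
    if torch is not None and torch.cuda.is_available():
        # Return cached, unused blocks from PyTorch's allocator to the driver.
        torch.cuda.empty_cache()
    return unreachable


freed = release_gpu_memory()
```

The order matters: empty_cache can only release memory that no live tensor is using, which is why references such as model_input_batch are dropped and collected beforehand.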

Model Download

The setup_on_node method handles model downloading via snapshot_download and is called once per node in distributed execution. The setup method is called per worker and expects the model files to already be available locally (local_files_only=True).
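
The node-level download step can be sketched as follows. This is an assumption-laden illustration: prefetch_model and its download parameter are hypothetical, and the exact arguments the stage passes to snapshot_download are not shown on this page. The repo_id, cache_dir, and token keywords are real parameters of huggingface_hub.snapshot_download.

```python
from typing import Callable, Optional


def prefetch_model(
    model_identifier: str,
    cache_dir: Optional[str] = None,
    hf_token: Optional[str] = None,
    download: Optional[Callable[..., str]] = None,
) -> str:
    """Download the model files once so that workers can load them locally."""
    if download is None:
        # Imported lazily so the sketch can be exercised without huggingface_hub.
        from huggingface_hub import snapshot_download

        download = snapshot_download
    return download(repo_id=model_identifier, cache_dir=cache_dir, token=hf_token)


# Exercise the function with a fake downloader instead of hitting the network.
calls = []


def fake_download(**kwargs) -> str:
    calls.append(kwargs)
    return "/tmp/fake-snapshot"


path = prefetch_model("org/model", cache_dir="/tmp/hf", download=fake_download)
```

Once the files are in the cache, each worker can load the model with local_files_only=True, which avoids repeated downloads and concurrent writes to the same cache directory.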
