Implementation:NVIDIA NeMo Curator ModelStage
| Knowledge Sources | |
|---|---|
| Domains | GPU Inference, NLP, HuggingFace, Data Pipeline |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Base class for GPU-based HuggingFace model inference stages that process tokenized DocumentBatch inputs and produce enriched DocumentBatch outputs.
Description
ModelStage extends ProcessingStage[DocumentBatch, DocumentBatch] and provides the core abstraction for all GPU model inference in the NeMo Curator text pipeline. It manages model downloading, GPU batching, forward pass execution, output collection, and memory cleanup.
The stage requires 1 GPU (configured via the Resources dataclass). On node setup, it downloads the model via huggingface_hub.snapshot_download and calls an optional _setup method on the subclass. The process method implements the full inference loop:
- Converts the DocumentBatch to a Pandas DataFrame
- Iterates over mini-batches via yield_next_batch, which moves tokenized data to GPU as torch.Tensor dictionaries
- Runs forward passes with torch.no_grad() and optional torch.autocast for mixed-precision inference
- Delegates output processing to the subclass via process_model_output
- Collects all batch outputs via collect_outputs and creates the output DataFrame via create_output_dataframe
- Restores original sequence ordering if the data was sorted by token length
The yield_next_batch generator keeps GPU memory usage bounded by moving only one batch to the GPU at a time. It also clips each batch so that padding columns no sequence in the batch actually uses are dropped before the forward pass.
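The control flow of the process loop described above can be sketched in plain Python. This is a minimal, hypothetical illustration, not the ModelStage source: `run_inference_loop` is an invented name, the subclass hooks are passed in as callables, and `contextlib.nullcontext()` stands in for the `torch.no_grad()` / `torch.autocast` contexts so the sketch runs without a GPU. The `_curator_seq_order` column name is taken from this page.

```python
from contextlib import nullcontext
from typing import Any, Callable, Iterator

import pandas as pd

SEQ_ORDER_FIELD = "_curator_seq_order"  # column name used on this page


def run_inference_loop(
    df: pd.DataFrame,
    batches: Callable[[pd.DataFrame], Iterator[dict[str, Any]]],
    forward: Callable[[dict[str, Any]], Any],
    process_model_output: Callable[[Any, dict[str, Any]], dict[str, Any]],
    collect_outputs: Callable[[list[dict[str, Any]]], dict[str, Any]],
    create_output_dataframe: Callable[[pd.DataFrame, dict[str, Any]], pd.DataFrame],
    has_seq_order: bool = True,
) -> pd.DataFrame:
    processed = []
    for model_input_batch in batches(df):
        # ModelStage runs the forward pass under torch.no_grad() and, optionally,
        # torch.autocast; nullcontext() is a placeholder for both in this sketch.
        with nullcontext():
            outputs = forward(model_input_batch)
        # Per-batch post-processing is delegated to the subclass hook
        processed.append(process_model_output(outputs, model_input_batch))
        # Drop the reference so (in the real stage) GPU tensors can be freed
        del model_input_batch
    collected = collect_outputs(processed)
    out = create_output_dataframe(df, collected)
    if has_seq_order:
        # Undo the length-based sort applied upstream, then drop the helper column
        out = out.sort_values(SEQ_ORDER_FIELD).drop(columns=[SEQ_ORDER_FIELD])
    return out
```

Passing the hooks as callables keeps the sketch testable; in ModelStage they are methods overridden by the subclass.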
Usage
ModelStage is not used directly. Subclasses must implement outputs(), setup() (or _setup()), process_model_output(), and create_output_dataframe(). The primary subclass is EmbeddingModelStage for generating text embeddings.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/stages/text/models/model.py
- Lines: 1-197
Signature
```python
class ModelStage(ProcessingStage[DocumentBatch, DocumentBatch]):
    def __init__(
        self,
        model_identifier: str,
        cache_dir: str | None = None,
        hf_token: str | None = None,
        model_inference_batch_size: int = 256,
        has_seq_order: bool = True,
        padding_side: Literal["left", "right"] = "right",
        unpack_inference_batch: bool = False,
        autocast: bool = True,
    ): ...
```
Import
```python
from nemo_curator.stages.text.models.model import ModelStage
```
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| model_identifier | str | Yes | HuggingFace model identifier or local path for the model |
| cache_dir | str or None | No | Directory for caching downloaded model files (default: None) |
| hf_token | str or None | No | HuggingFace authentication token for gated models (default: None) |
| model_inference_batch_size | int | No | Number of samples per GPU inference batch (default: 256) |
| has_seq_order | bool | No | Whether input data includes sequence ordering for restoring original order after length-sorted batching (default: True) |
| padding_side | Literal["left", "right"] | No | Side on which input tokens are padded (default: "right") |
| unpack_inference_batch | bool | No | Whether to unpack the batch dict with **kwargs when calling the model (default: False) |
| autocast | bool | No | Whether to use torch.autocast for mixed-precision inference (default: True) |
Stage I/O Specification
| Method | Returns |
|---|---|
| inputs() | (["data"], [INPUT_ID_FIELD, ATTENTION_MASK_FIELD]), with SEQ_ORDER_FIELD appended to the column list when has_seq_order is True |
| outputs() | Must be implemented by subclasses |
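The conditional column list in inputs() can be illustrated with a small stand-alone function. This is a hypothetical sketch: `model_stage_inputs` is an invented name, and the string values of `INPUT_ID_FIELD` and `ATTENTION_MASK_FIELD` are assumed to match the `input_ids` / `attention_mask` batch keys mentioned on this page; only `_curator_seq_order` is named explicitly here.

```python
# Assumed constant values; only SEQ_ORDER_FIELD's value is stated on this page
INPUT_ID_FIELD = "input_ids"
ATTENTION_MASK_FIELD = "attention_mask"
SEQ_ORDER_FIELD = "_curator_seq_order"


def model_stage_inputs(has_seq_order: bool) -> tuple[list[str], list[str]]:
    # Tokenized columns are always required
    columns = [INPUT_ID_FIELD, ATTENTION_MASK_FIELD]
    # The ordering column is only needed if original order must be restored
    if has_seq_order:
        columns.append(SEQ_ORDER_FIELD)
    return ["data"], columns
```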
Outputs
| Name | Type | Description |
|---|---|---|
| DocumentBatch | DocumentBatch | Enriched data with model inference results added as new columns (format depends on subclass) |
Usage Examples
Implementing a Custom Model Stage
```python
import numpy as np
import pandas as pd
import torch
from transformers import AutoModelForSequenceClassification

from nemo_curator.backends.base import WorkerMetadata
from nemo_curator.stages.text.models.model import ModelStage


class ClassifierStage(ModelStage):
    def __init__(self, model_identifier: str, label_field: str = "label", **kwargs):
        # HuggingFace classification models take input_ids/attention_mask as keyword args
        super().__init__(model_identifier=model_identifier, unpack_inference_batch=True, **kwargs)
        self.label_field = label_field

    def outputs(self) -> tuple[list[str], list[str]]:
        return ["data"], [self.label_field]

    def setup(self, _: WorkerMetadata | None = None) -> None:
        # Files were already downloaded by setup_on_node, so load from the local cache only
        self.model = AutoModelForSequenceClassification.from_pretrained(
            self.model_identifier, cache_dir=self.cache_dir, local_files_only=True
        )
        self.model.eval().to("cuda")

    def process_model_output(
        self, outputs, model_input_batch: dict[str, torch.Tensor] | None = None
    ) -> dict[str, np.ndarray]:
        # Cast to float32 first: under autocast the logits may be bfloat16,
        # which Tensor.numpy() cannot convert
        logits = outputs.logits.float().cpu().numpy()
        return {"predictions": np.argmax(logits, axis=1)}

    def collect_outputs(self, processed_outputs: list[dict[str, np.ndarray]]) -> dict[str, np.ndarray]:
        # Concatenate per-batch predictions in batch order
        return {"predictions": np.concatenate([o["predictions"] for o in processed_outputs])}

    def create_output_dataframe(
        self, df_cpu: pd.DataFrame, collected_output: dict[str, np.ndarray]
    ) -> pd.DataFrame:
        return df_cpu.assign(**{self.label_field: collected_output["predictions"]})
```
Implementation Details
GPU Batch Yielding
The yield_next_batch generator converts tokenized data from the Pandas DataFrame to PyTorch tensors in batches of model_inference_batch_size. Each batch is a dictionary with input_ids and attention_mask keys, moved to the model's GPU device. The clip_tokens utility is applied to remove unnecessary padding columns for improved memory efficiency.
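The batching and clipping behaviour can be sketched with NumPy arrays in place of GPU tensors. This is an illustrative assumption of how clipping works (trim every column beyond the longest real sequence in the batch, from the right for right padding and from the left for left padding), not the clip_tokens source; `clip_tokens_sketch` and `yield_batches_sketch` are hypothetical names, and rows are assumed to be pre-padded to a common width.

```python
from typing import Iterator, Literal, Sequence

import numpy as np


def clip_tokens_sketch(
    batch: dict[str, np.ndarray], padding_side: Literal["left", "right"] = "right"
) -> dict[str, np.ndarray]:
    # Length of the longest real (non-padding) sequence in this batch
    max_len = int(batch["attention_mask"].sum(axis=1).max())
    width = batch["attention_mask"].shape[1]
    if padding_side == "right":
        # Padding is at the end of each row: keep the leftmost max_len columns
        return {k: v[:, :max_len] for k, v in batch.items()}
    # Padding is at the start of each row: keep the rightmost max_len columns
    return {k: v[:, width - max_len:] for k, v in batch.items()}


def yield_batches_sketch(
    input_ids: Sequence[Sequence[int]],
    attention_mask: Sequence[Sequence[int]],
    batch_size: int,
    padding_side: Literal["left", "right"] = "right",
) -> Iterator[dict[str, np.ndarray]]:
    # Only one batch is materialized at a time; ModelStage would also convert it
    # to torch.Tensor and move it to the GPU at this point.
    for start in range(0, len(input_ids), batch_size):
        batch = {
            "input_ids": np.asarray(input_ids[start : start + batch_size]),
            "attention_mask": np.asarray(attention_mask[start : start + batch_size]),
        }
        yield clip_tokens_sketch(batch, padding_side)
```

Clipping pays off most when rows of similar length share a batch, which is what the TokenizerStage's length-based sorting provides.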
Forward Pass Execution
The _model_forward method supports two modes:
- When unpack_inference_batch is True: calls self.model(**model_input_batch) (standard HuggingFace interface)
- When unpack_inference_batch is False: calls self.model(model_input_batch) (SentenceTransformer-style interface)
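The two calling conventions can be shown with a small dispatch function. `model_forward_sketch` is a hypothetical stand-in for `_model_forward`, and the model is any callable, so the sketch runs without PyTorch.

```python
from typing import Any, Callable, Mapping


def model_forward_sketch(
    model: Callable[..., Any],
    model_input_batch: Mapping[str, Any],
    unpack_inference_batch: bool,
) -> Any:
    if unpack_inference_batch:
        # HuggingFace style: model(input_ids=..., attention_mask=...)
        return model(**model_input_batch)
    # SentenceTransformer style: the whole feature dict is a single argument
    return model(model_input_batch)
```

A model whose forward signature names `input_ids` and `attention_mask` needs `unpack_inference_batch=True`; a module that expects one features dict needs the default `False`.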
Sequence Order Restoration
When has_seq_order is True, the process method sorts the output DataFrame by the _curator_seq_order column and drops it, restoring the original row order from before length-based sorting was applied by the TokenizerStage.
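The sort-then-restore round trip can be sketched with pandas. Both helpers are hypothetical: `sort_by_length_sketch` only approximates what the TokenizerStage does upstream (record each row's position, then sort by token count), and the `n_tokens` length column is an assumption for illustration.

```python
import pandas as pd

SEQ_ORDER_FIELD = "_curator_seq_order"  # column name used on this page


def sort_by_length_sketch(df: pd.DataFrame, length_col: str) -> pd.DataFrame:
    # Record each row's original position, then sort by token length
    tagged = df.assign(**{SEQ_ORDER_FIELD: list(range(len(df)))})
    return tagged.sort_values(length_col, kind="stable").reset_index(drop=True)


def restore_order_sketch(df: pd.DataFrame) -> pd.DataFrame:
    # Sort back by the recorded position and drop the helper column
    return df.sort_values(SEQ_ORDER_FIELD).drop(columns=[SEQ_ORDER_FIELD]).reset_index(drop=True)
```

Any columns added between the two steps (model outputs, for instance) travel with their rows, so they line up with the original document order after restoration.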
Memory Management
The teardown method explicitly calls gc.collect() and torch.cuda.empty_cache() to free GPU memory after the stage completes. Additionally, model_input_batch references are deleted after each forward pass to allow garbage collection of GPU tensors.
Model Download
The setup_on_node method handles model downloading via snapshot_download and is called once per node in distributed execution. The setup method is called per worker and expects the model files to already be available locally (local_files_only=True).
Related Pages
- NVIDIA_NeMo_Curator_EmbedderBase - Embedding model stages that extend ModelStage
- NVIDIA_NeMo_Curator_TokenizerStage - Tokenizer stage that prepares inputs for ModelStage
- NVIDIA_NeMo_Curator_VLLMEmbedder - Alternative embedding approach using vLLM instead of ModelStage
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base