Implementation:NVIDIA NeMo Curator XennaStageAdapter
| Knowledge Sources | |
|---|---|
| Domains | Backend Architecture, Cosmos Xenna, Pipeline Execution |
| Last Updated | 2026-02-14 00:00 GMT |
Overview
Adapts NeMo Curator ProcessingStage instances to the Cosmos-Xenna pipeline framework by implementing both BaseStageAdapter and Xenna's pipelines_v1.Stage interface.
Description
XennaStageAdapter is a dual-inheritance class that bridges NeMo Curator's stage abstraction to the Cosmos-Xenna distributed execution engine. It inherits from both BaseStageAdapter (for performance tracking and lifecycle management) and pipelines_v1.Stage (the Xenna stage interface).
The adapter provides the following translations:
- required_resources property -- Converts the stage's resources (cpus, gpus, nvdecs, nvencs, entire_gpu) to an XennaResources object.
- stage_batch_size property -- Returns the stage's batch size, defaulting to 1 if not set.
- env_info property -- Returns None by default (can be customized per stage for runtime environment needs).
- process_data() -- Delegates to the base adapter's process_batch(), which adds automatic performance tracking.
- setup_on_node() -- Accepts Xenna's XennaNodeInfo and XennaWorkerMetadata types, converts them to NeMo Curator's generic NodeInfo and WorkerMetadata types, and delegates to the base adapter.
- setup() -- Similarly converts Xenna's XennaWorkerMetadata to the generic WorkerMetadata and delegates to the base adapter.
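The translations listed above can be illustrated with a minimal sketch built from stand-in classes. All names here (`StageResources`, `EngineResources`, `UppercaseStage`, `SketchAdapter`) are hypothetical and are not part of NeMo Curator or Cosmos-Xenna. The `or 1` fallback is an assumption about how the default batch size might be applied, and the performance tracking added by the base adapter is omitted.

```python
from dataclasses import dataclass


@dataclass
class StageResources:
    """Stand-in for a stage's resource declaration (hypothetical)."""
    cpus: float = 1.0
    gpus: float = 0.0


@dataclass
class EngineResources:
    """Stand-in for the execution engine's resource type (hypothetical)."""
    cpus: float
    gpus: float


class UppercaseStage:
    """Stand-in for a processing stage; batch_size is deliberately unset."""
    resources = StageResources(cpus=2.0, gpus=0.0)
    batch_size = None

    def process_batch(self, tasks):
        return [t.upper() for t in tasks]


class SketchAdapter:
    """Exposes a stage through the property and method names the engine expects."""

    def __init__(self, stage):
        self.stage = stage

    @property
    def required_resources(self):
        # Copy each resource field into the engine's own resource type.
        r = self.stage.resources
        return EngineResources(cpus=r.cpus, gpus=r.gpus)

    @property
    def stage_batch_size(self):
        # Assumption: fall back to 1 when the stage sets no batch size.
        return self.stage.batch_size or 1

    def process_data(self, tasks):
        # Delegate the actual work to the wrapped stage.
        return self.stage.process_batch(tasks)


adapter = SketchAdapter(UppercaseStage())
print(adapter.required_resources)
print(adapter.stage_batch_size)
print(adapter.process_data(["a", "b"]))
```

The engine only ever sees the adapter's interface, so the wrapped stage does not need to know which backend runs it.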
The module also provides create_named_xenna_stage_adapter(), a factory function that dynamically creates a subclass of XennaStageAdapter named after the original stage class. This ensures that Xenna dashboards and logs display the meaningful stage name (e.g., "ImageReaderStage") rather than "XennaStageAdapter".
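One way to produce such a named subclass is Python's three-argument `type()` call. The sketch below uses hypothetical stand-ins (`AdapterBase`, `create_named_adapter`, and a dummy `ImageReaderStage`) and is not necessarily how the factory is written in NeMo Curator.

```python
class AdapterBase:
    """Stand-in for the adapter class (hypothetical)."""

    def __init__(self, stage):
        self.stage = stage


class ImageReaderStage:
    """Stand-in for a user-defined stage (hypothetical)."""


def create_named_adapter(stage):
    # Build a subclass of AdapterBase whose __name__ is the stage's class name.
    stage_name = type(stage).__name__
    named_cls = type(stage_name, (AdapterBase,), {"__module__": AdapterBase.__module__})
    return named_cls(stage)


named = create_named_adapter(ImageReaderStage())
print(type(named).__name__)            # ImageReaderStage
print(isinstance(named, AdapterBase))  # True
```

The resulting object is still an adapter. Only its class name is borrowed from the stage, which is all a dashboard or log line needs.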
Usage
This adapter is used internally by XennaExecutor. When building the Xenna pipeline specification, the executor calls create_named_xenna_stage_adapter() for each stage. Users do not typically interact with this adapter directly.
Code Reference
Source Location
- Repository: NeMo-Curator
- File: nemo_curator/backends/xenna/adapter.py
- Lines: 1-131
Signature
class XennaStageAdapter(BaseStageAdapter, pipelines_v1.Stage):
    def __init__(self, processing_stage: ProcessingStage): ...

    @property
    def required_resources(self) -> XennaResources: ...

    @property
    def stage_batch_size(self) -> int: ...

    @property
    def env_info(self) -> pipelines_v1.RuntimeEnv | None: ...

    def process_data(self, tasks: list[Task]) -> list[Task] | None: ...

    def setup_on_node(self, node_info: XennaNodeInfo, worker_metadata: XennaWorkerMetadata) -> None: ...

    def setup(self, worker_metadata: XennaWorkerMetadata) -> None: ...


def create_named_xenna_stage_adapter(stage: ProcessingStage) -> XennaStageAdapter: ...
Import
from nemo_curator.backends.xenna.adapter import XennaStageAdapter, create_named_xenna_stage_adapter
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| processing_stage | ProcessingStage | Yes | The NeMo Curator processing stage to adapt for Xenna |
process_data() Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| tasks | list[Task] | Yes | List of tasks to process through the wrapped stage |
Outputs
| Name | Type | Description |
|---|---|---|
| required_resources | XennaResources | Xenna resource requirements (cpus, gpus, nvdecs, nvencs, entire_gpu) |
| stage_batch_size | int | Batch size for Xenna scheduling (defaults to 1 when the stage does not set one) |
| process_data() | list[Task] or None | Processed tasks with performance metrics attached |
Type Conversion
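The adapter converts between Xenna and NeMo Curator types. Node and worker metadata are converted from Xenna types to NeMo Curator types during setup; resources are converted in the opposite direction, from the stage's resources to XennaResources: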
| Xenna Type | NeMo Curator Type | Fields Mapped |
|---|---|---|
| XennaNodeInfo | NodeInfo | node_id |
| XennaWorkerMetadata | WorkerMetadata | worker_id, allocation |
| XennaResources | ProcessingStage.resources | cpus, gpus, nvdecs, nvencs, entire_gpu |
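The node and worker rows of the table amount to field-by-field copies. The sketch below shows that shape with stand-in dataclasses. `EngineNodeInfo`, `EngineWorkerMetadata`, `to_generic_node`, and `to_generic_worker` are hypothetical names, and the `NodeInfo` and `WorkerMetadata` classes defined here are simplified stand-ins that carry only the fields listed in the table, not the real NeMo Curator types.

```python
from dataclasses import dataclass


@dataclass
class EngineNodeInfo:
    """Stand-in for the engine-side node type (hypothetical)."""
    node_id: str


@dataclass
class EngineWorkerMetadata:
    """Stand-in for the engine-side worker type (hypothetical)."""
    worker_id: str
    allocation: object


@dataclass
class NodeInfo:
    """Simplified stand-in for the generic node type."""
    node_id: str


@dataclass
class WorkerMetadata:
    """Simplified stand-in for the generic worker type."""
    worker_id: str
    allocation: object


def to_generic_node(info: EngineNodeInfo) -> NodeInfo:
    # Only node_id is carried across.
    return NodeInfo(node_id=info.node_id)


def to_generic_worker(meta: EngineWorkerMetadata) -> WorkerMetadata:
    # worker_id and allocation are carried across unchanged.
    return WorkerMetadata(worker_id=meta.worker_id, allocation=meta.allocation)


node = to_generic_node(EngineNodeInfo(node_id="node-0"))
worker = to_generic_worker(EngineWorkerMetadata(worker_id="w-1", allocation={"gpus": [0]}))
print(node)
print(worker)
```

Keeping the conversion in the adapter means stage code can depend on the generic types alone.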
Usage Examples
Creating a Named Adapter (Used by XennaExecutor)
from nemo_curator.backends.xenna.adapter import create_named_xenna_stage_adapter

# my_image_reader_stage is an existing ProcessingStage instance (here, an ImageReaderStage)
adapter = create_named_xenna_stage_adapter(my_image_reader_stage)

# The adapter's class name matches the original stage's class name
print(type(adapter).__name__)  # "ImageReaderStage"

# Access Xenna-compatible resource requirements
resources = adapter.required_resources
print(resources.gpus)  # e.g., 1.0
Related Pages
- Environment:NVIDIA_NeMo_Curator_Python_Linux_Base
- NVIDIA_NeMo_Curator_Backend_Base_Classes -- Parent base classes
- NVIDIA_NeMo_Curator_XennaExecutor -- Executor that uses this adapter