Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:NVIDIA NeMo Curator XennaStageAdapter

From Leeroopedia
Knowledge Sources
Domains Backend Architecture, Cosmos Xenna, Pipeline Execution
Last Updated 2026-02-14 00:00 GMT

Overview

Adapts NeMo Curator ProcessingStage instances to the Cosmos-Xenna pipeline framework by implementing both BaseStageAdapter and Xenna's pipelines_v1.Stage interface.

Description

XennaStageAdapter is a dual-inheritance class that bridges NeMo Curator's stage abstraction to the Cosmos-Xenna distributed execution engine. It inherits from both BaseStageAdapter (for performance tracking and lifecycle management) and pipelines_v1.Stage (the Xenna stage interface).

The adapter provides the following translations:

  • required_resources property -- Converts the stage's resources (cpus, gpus, nvdecs, nvencs, entire_gpu) to a Xenna XennaResources object.
  • stage_batch_size property -- Returns the stage's batch size, defaulting to 1 if not set.
  • env_info property -- Returns None by default (can be customized per stage for runtime environment needs).
  • process_data() -- Delegates to the base adapter's process_batch(), which adds automatic performance tracking.
  • setup_on_node() -- Accepts Xenna's XennaNodeInfo and XennaWorkerMetadata types, converts them to NeMo Curator's generic NodeInfo and WorkerMetadata types, and delegates to the base adapter.
  • setup() -- Similarly converts Xenna's XennaWorkerMetadata to the generic WorkerMetadata and delegates to the base adapter.

The module also provides create_named_xenna_stage_adapter(), a factory function that dynamically creates a subclass of XennaStageAdapter named after the original stage class. This ensures that Xenna dashboards and logs display the meaningful stage name (e.g., "ImageReaderStage") rather than "XennaStageAdapter".

Usage

This adapter is used internally by XennaExecutor. When building the Xenna pipeline specification, the executor calls create_named_xenna_stage_adapter() for each stage. Users do not typically interact with this adapter directly.

Code Reference

Source Location

  • Repository: NeMo-Curator
  • File: nemo_curator/backends/xenna/adapter.py
  • Lines: 1-131

Signature

class XennaStageAdapter(BaseStageAdapter, pipelines_v1.Stage):
    def __init__(self, processing_stage: ProcessingStage): ...

    @property
    def required_resources(self) -> XennaResources: ...
    @property
    def stage_batch_size(self) -> int: ...
    @property
    def env_info(self) -> pipelines_v1.RuntimeEnv | None: ...

    def process_data(self, tasks: list[Task]) -> list[Task] | None: ...
    def setup_on_node(self, node_info: XennaNodeInfo, worker_metadata: XennaWorkerMetadata) -> None: ...
    def setup(self, worker_metadata: XennaWorkerMetadata) -> None: ...

def create_named_xenna_stage_adapter(stage: ProcessingStage) -> XennaStageAdapter: ...

Import

from nemo_curator.backends.xenna.adapter import XennaStageAdapter, create_named_xenna_stage_adapter

I/O Contract

Inputs

Name Type Required Description
processing_stage ProcessingStage Yes The NeMo Curator processing stage to adapt for Xenna

process_data() Inputs

Name Type Required Description
tasks list[Task] Yes List of tasks to process through the wrapped stage

Outputs

Name Type Description
required_resources XennaResources Xenna resource requirements (cpus, gpus, nvdecs, nvencs, entire_gpu)
stage_batch_size int Batch size for Xenna scheduling (minimum 1)
process_data() list[Task] or None Processed tasks with performance metrics attached

Type Conversion

The adapter performs type conversions between Xenna and NeMo Curator types:

Xenna Type NeMo Curator Type Fields Mapped
XennaNodeInfo NodeInfo node_id
XennaWorkerMetadata WorkerMetadata worker_id, allocation
XennaResources ProcessingStage.resources cpus, gpus, nvdecs, nvencs, entire_gpu

Usage Examples

Creating a Named Adapter (Used by XennaExecutor)

from nemo_curator.backends.xenna.adapter import create_named_xenna_stage_adapter

# Create an adapter with the original stage's class name
adapter = create_named_xenna_stage_adapter(my_image_reader_stage)

# The adapter class name matches the original stage
print(type(adapter).__name__)  # "ImageReaderStage"

# Access Xenna-compatible resource requirements
resources = adapter.required_resources
print(resources.gpus)  # e.g., 1.0

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment