Implementation:Deepspeedai DeepSpeed NPU Accelerator
| Knowledge Sources | |
|---|---|
| Domains | Accelerator, Huawei Ascend Backend |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
Huawei Ascend NPU (Neural Processing Unit) accelerator backend enabling DeepSpeed training on Ascend hardware.
Description
The NPU_Accelerator class implements the DeepSpeedAccelerator interface for Huawei Ascend NPUs. It wraps the torch.npu APIs provided by the torch_npu extension, uses hccl (Huawei Collective Communication Library) as the communication backend, and sets inductor as the compile backend. All standard device, memory, RNG, and stream/event operations delegate to their torch.npu equivalents. FP16 is always reported as supported, while BF16 support is checked at runtime via torch.npu.is_bf16_supported(). Triton is reported as unsupported. Graph operations are not supported: create_graph() returns None and capture_to_graph() returns a no-op context. Device visibility is controlled through ASCEND_RT_VISIBLE_DEVICES, and export_envs() returns ASCEND, HCCL, LD_LIBRARY, and PATH as the environment variable names to export. Op builders are loaded lazily from op_builder.npu, using inspect.getmembers to scan for builder classes. The tensor type properties return the corresponding torch.npu tensor constructors directly.
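The lazy op-builder lookup described above can be illustrated without Ascend hardware. The following is a minimal sketch, assuming builder classes are recognized by a `Builder` name suffix. The module `fake_op_builder`, the classes inside it, and the `LazyBuilderRegistry` helper are hypothetical stand-ins, not DeepSpeed code.

```python
import inspect
import types


def make_fake_builder_module():
    """Build a stand-in module holding two hypothetical builder classes."""
    module = types.ModuleType("fake_op_builder")

    class FusedAdamBuilder:
        NAME = "fused_adam"

    class CPUAdamBuilder:
        NAME = "cpu_adam"

    class Helper:  # not a builder; the scan should ignore it
        pass

    module.FusedAdamBuilder = FusedAdamBuilder
    module.CPUAdamBuilder = CPUAdamBuilder
    module.Helper = Helper
    return module


class LazyBuilderRegistry:
    """Scan a module for builder classes on first lookup, then cache them."""

    def __init__(self, module_factory):
        self._module_factory = module_factory
        self.class_dict = None  # stays None until the first lookup

    def _load(self):
        if self.class_dict is None:
            module = self._module_factory()
            # Assumption: a class counts as a builder if its name ends in "Builder".
            self.class_dict = {
                name: cls
                for name, cls in inspect.getmembers(module, inspect.isclass)
                if name.endswith("Builder")
            }

    def get_op_builder(self, class_name):
        self._load()
        return self.class_dict.get(class_name)


registry = LazyBuilderRegistry(make_fake_builder_module)
print(registry.class_dict)  # None: nothing has been scanned yet
print(registry.get_op_builder("FusedAdamBuilder").NAME)
print(sorted(registry.class_dict))
```

Deferring the scan means the builder module is only imported when an op is actually requested, which keeps accelerator construction cheap.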
Usage
Use this backend when training on Huawei Ascend AI processors (for example Ascend 910 or 910B). It requires torch_npu to be installed. Set the DS_ACCELERATOR environment variable to npu to select this backend explicitly.
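Because the backend depends on torch_npu, a launch script may want to confirm that the package can be found before forcing DS_ACCELERATOR=npu. The following is a minimal sketch, assuming that locating the package is a sufficient check. `torch_npu_installed` and `select_npu_backend` are hypothetical helpers, not part of DeepSpeed.

```python
import importlib.util
import os


def torch_npu_installed():
    """Return True if the torch_npu package can be located (it is not imported)."""
    return importlib.util.find_spec("torch_npu") is not None


def select_npu_backend(env, installed):
    """Set DS_ACCELERATOR=npu in `env` when torch_npu is installed.

    An existing DS_ACCELERATOR value is never overwritten.
    Returns True when the NPU backend ends up selected.
    """
    if not installed:
        return False
    if "DS_ACCELERATOR" in env:
        return env["DS_ACCELERATOR"] == "npu"
    env["DS_ACCELERATOR"] = "npu"
    return True


if select_npu_backend(os.environ, torch_npu_installed()):
    print("NPU backend selected")
else:
    print("torch_npu not found, or another accelerator is already selected")
```

The variable has to be set before `deepspeed.accelerator.get_accelerator()` is first called, so a check like this belongs at the very top of the entry script.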
Code Reference
Source Location
- Repository: DeepSpeed
- File: accelerator/npu_accelerator.py
Signature
class NPU_Accelerator(DeepSpeedAccelerator):

    def __init__(self):
        super().__init__()
        self._name = 'npu'
        self._communication_backend_name = 'hccl'
        self._compile_backend = "inductor"
        self.class_dict = None

    def is_synchronized_device(self):
        return False

    def device_name(self, device_index=None):
        if device_index is None:
            return 'npu'
        return f'npu:{device_index}'

    def device(self, device_index=None):
        return torch.device('npu', device_index)

    def synchronize(self, device_index=None):
        return torch.npu.synchronize(device_index)

    def is_bf16_supported(self):
        return torch.npu.is_bf16_supported()

    def is_fp16_supported(self):
        return True

    def supported_dtypes(self):
        return [torch.float, torch.half, torch.bfloat16]

    def is_triton_supported(self):
        return False

    def create_graph(self):
        return None

    def capture_to_graph(self, graph, pool=None, stream=None):
        from deepspeed.runtime.utils import noop_context
        return noop_context()

    @property
    def BFloat16Tensor(self):
        return torch.npu.BFloat16Tensor

    @property
    def FloatTensor(self):
        return torch.npu.FloatTensor

    def visible_devices_envs(self):
        return ['ASCEND_RT_VISIBLE_DEVICES']

    def export_envs(self):
        return ['ASCEND', 'HCCL', 'LD_LIBRARY', 'PATH']
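Since create_graph() returns None and capture_to_graph() returns a no-op context on this backend, calling code can branch on the graph object instead of on the accelerator name. The sketch below shows that pattern with a stub. `StubNPUAccelerator` and `run_step` are hypothetical, and `contextlib.nullcontext` is used here as a stand-in for DeepSpeed's noop_context.

```python
import contextlib


class StubNPUAccelerator:
    """Hypothetical stand-in mirroring the two graph-related methods."""

    def create_graph(self):
        return None  # graphs are not supported on this backend

    def capture_to_graph(self, graph, pool=None, stream=None):
        return contextlib.nullcontext()  # stand-in for noop_context()


def run_step(accelerator, step_fn):
    """Run step_fn and report whether a graph object was available."""
    graph = accelerator.create_graph()
    with accelerator.capture_to_graph(graph):
        result = step_fn()  # runs eagerly under the no-op context
    return result, graph is not None


result, used_graph = run_step(StubNPUAccelerator(), lambda: 2 + 3)
print(result, used_graph)  # 5 False
```

Written this way, the same calling code could run unchanged on a backend that does return a real graph object.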
Import
from deepspeed.accelerator.npu_accelerator import NPU_Accelerator
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| device_index | int | Optional | NPU device index |
| seed | int | Required | Random seed for NPU RNG |
Outputs
| Name | Type | Description |
|---|---|---|
| device | torch.device | NPU device object |
| device_count | int | Number of NPU devices |
| memory_bytes | int | NPU memory in bytes |
| communication_backend | str | Always 'hccl' |
Usage Examples
# Select the NPU accelerator (must be set before the accelerator is first queried)
import os
os.environ['DS_ACCELERATOR'] = 'npu'

from deepspeed.accelerator import get_accelerator
accelerator = get_accelerator()
print(f"Device: {accelerator.device_name()}")  # 'npu'
print(f"Backend: {accelerator.communication_backend_name()}")  # 'hccl'

# Device management
print(f"Device count: {accelerator.device_count()}")
accelerator.set_device(0)
print(f"Current device: {accelerator.current_device_name()}")

# Precision support
print(f"FP16: {accelerator.is_fp16_supported()}")  # True
print(f"BF16: {accelerator.is_bf16_supported()}")  # Depends on NPU model
print(f"Supported dtypes: {accelerator.supported_dtypes()}")

# Memory operations
total = accelerator.total_memory(0)
allocated = accelerator.memory_allocated(0)
available = accelerator.available_memory(0)
print(f"Memory: {allocated}/{total} bytes allocated, {available} bytes available")

# Tensor creation
float_tensor = accelerator.FloatTensor([1.0, 2.0, 3.0])
half_tensor = accelerator.HalfTensor([1.0, 2.0, 3.0])
bfloat16_tensor = accelerator.BFloat16Tensor([1.0, 2.0, 3.0])

# Streams and events
# `model` and `input_tensor` are placeholders for a module and a tensor
# that the caller has already created on the NPU.
stream = accelerator.Stream()
with accelerator.stream(stream):
    output = model(input_tensor)
event = accelerator.Event()
event.record()
event.synchronize()

# Note: Graph operations are not supported
graph = accelerator.create_graph()  # Returns None

# Synchronization
accelerator.synchronize()
accelerator.empty_cache()
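Because BF16 availability depends on the NPU model while FP16 is always reported as supported, a training script can pick its mixed-precision mode from the two queries. This is a minimal sketch under that assumption. `StubAccelerator` and `pick_mixed_precision` are hypothetical, and the returned strings are illustrative labels rather than DeepSpeed config keys.

```python
class StubAccelerator:
    """Hypothetical stand-in exposing the two precision queries."""

    def __init__(self, bf16):
        self._bf16 = bf16

    def is_bf16_supported(self):
        return self._bf16

    def is_fp16_supported(self):
        return True


def pick_mixed_precision(accelerator):
    """Prefer bf16, fall back to fp16, and use fp32 as the last resort."""
    if accelerator.is_bf16_supported():
        return "bf16"
    if accelerator.is_fp16_supported():
        return "fp16"
    return "fp32"


print(pick_mixed_precision(StubAccelerator(bf16=True)))   # bf16
print(pick_mixed_precision(StubAccelerator(bf16=False)))  # fp16
```

With a real accelerator object from get_accelerator(), the same function would take the place of a hard-coded precision setting.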
Related Pages
- Abstract Accelerator - Base interface
- Real Accelerator - Accelerator selection
- HPU Accelerator - Habana alternative