Implementation:Hiyouga LLaMA Factory V1 Accelerator Helper
| Knowledge Sources | |
|---|---|
| Domains | Distributed Training, Hardware Abstraction, Accelerator |
| Last Updated | 2026-02-06 19:00 GMT |
Overview
This module provides utility functions for querying distributed environment state, detecting accelerator hardware, and performing collective communication operations.
Description
The helper module implements the low-level infrastructure for distributed training in LLaMA-Factory V1. It provides functions for reading rank and world size from environment variables (get_rank, get_world_size, get_local_rank, get_local_world_size), detecting device types (CPU, CUDA, NPU, XPU, MPS) via torch.accelerator, and performing collective communication operations (all_gather, all_reduce, broadcast) using torch.distributed. The requires_accelerator decorator ensures that functions depending on torch.accelerator fail gracefully on older PyTorch versions. The main_process_first context manager orders execution in distributed environments so that the main process runs the enclosed block before the other processes do.
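To make the decorator's role concrete, here is a minimal sketch of a guard in the spirit of requires_accelerator. It is not the module's actual code: the names requires_accelerator_sketch and device_count_sketch are hypothetical, and the check (looking for the torch.accelerator namespace and raising a RuntimeError) is an assumption about what "fail gracefully" means here.

```python
import functools

try:
    import torch  # optional, so the sketch also runs without PyTorch installed
except ImportError:
    torch = None


def requires_accelerator_sketch(fn):
    """Hypothetical stand-in for requires_accelerator (assumed behavior)."""

    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        # Assumption: the guard checks for the torch.accelerator namespace and
        # raises a descriptive error instead of a bare AttributeError.
        if torch is None or not hasattr(torch, "accelerator"):
            raise RuntimeError(
                "torch.accelerator is unavailable; PyTorch 2.7.0 or higher is required."
            )
        return fn(*args, **kwargs)

    return wrapper


@requires_accelerator_sketch
def device_count_sketch() -> int:
    """Hypothetical helper: number of visible accelerator devices."""
    return torch.accelerator.device_count()
```

Because the check runs at call time rather than at import time, a module decorated this way can still be imported on an older PyTorch build; only the guarded calls raise.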
Usage
Use these helper functions when building distributed training infrastructure or when you need to query the current device, rank, or world size. They are consumed primarily by DistributedInterface but can also be called directly for low-level distributed operations. PyTorch 2.7.0 or higher is required for torch.accelerator support.
Code Reference
Source Location
- Repository: Hiyouga_LLaMA_Factory
- File: src/llamafactory/v1/accelerator/helper.py
- Lines: 1-235
Signature
class DeviceType(StrEnum):
    CPU = "cpu"
    CUDA = "cuda"
    META = "meta"
    MPS = "mps"
    NPU = "npu"
    XPU = "xpu"
class ReduceOp(StrEnum):
    SUM = "sum"
    MEAN = "mean"
    MAX = "max"
    MIN = "min"
def requires_accelerator(fn) -> Callable
def is_distributed() -> bool
def get_rank() -> int
def get_world_size() -> int
def get_local_rank() -> int
def get_local_world_size() -> int
def get_current_accelerator(check_available: bool = True) -> torch.device
def get_device_count() -> int
def synchronize() -> None
def set_device_index() -> None
def get_current_device() -> torch.device
def is_torch_cuda_available() -> bool
def is_torch_mps_available() -> bool
def is_torch_npu_available() -> bool
def is_torch_xpu_available() -> bool
def operate_tensorlike(fn: Callable, data: TensorLike, **kwargs) -> TensorLike
def get_process_group_backend() -> str
def all_gather(tensor: Tensor, group: Optional[ProcessGroup] = None) -> Tensor
def all_reduce(tensor: Tensor, op: ReduceOp = ReduceOp.MEAN, group: Optional[ProcessGroup] = None) -> Tensor
def broadcast(tensor: Tensor, src: int = 0, group: Optional[ProcessGroup] = None) -> Tensor
def main_process_first(local_only: bool = True) -> ContextManager
Import
from llamafactory.v1.accelerator.helper import (
    DeviceType, ReduceOp, is_distributed, get_rank, get_world_size,
    get_current_device, all_gather, all_reduce, broadcast, main_process_first,
)
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| RANK (env var) | str | No | Global rank of the current process (default "0") |
| WORLD_SIZE (env var) | str | No | Total number of processes (default "1") |
| LOCAL_RANK (env var) | str | No | Local rank within the node (default "0") |
| LOCAL_WORLD_SIZE (env var) | str | No | Number of processes on the local node (default "1") |
| tensor (all_gather/all_reduce/broadcast) | Tensor | Yes | Input tensor for collective operations |
| op (all_reduce) | ReduceOp | No | Reduction operation: SUM, MEAN, MAX, or MIN (default MEAN) |
| data (operate_tensorlike) | TensorLike | Yes | Input data as torch.Tensor, numpy.ndarray, or scalar |
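The environment-variable defaults in the table above can be illustrated with a small sketch. The function names ending in _sketch and the _env_int helper are hypothetical; only the variable names and default values come from the table, and the real functions may read them differently.

```python
import os


def _env_int(name: str, default: str) -> int:
    """Read an integer environment variable, falling back to a string default."""
    return int(os.environ.get(name, default))


def get_rank_sketch() -> int:
    # Global rank of this process; "0" when RANK is unset.
    return _env_int("RANK", "0")


def get_world_size_sketch() -> int:
    # Total number of processes; "1" when WORLD_SIZE is unset.
    return _env_int("WORLD_SIZE", "1")


def get_local_rank_sketch() -> int:
    # Rank within the current node; "0" when LOCAL_RANK is unset.
    return _env_int("LOCAL_RANK", "0")


def get_local_world_size_sketch() -> int:
    # Number of processes on the current node; "1" when LOCAL_WORLD_SIZE is unset.
    return _env_int("LOCAL_WORLD_SIZE", "1")
```

With these defaults, a plain single-process run without any launcher behaves as rank 0 of a world of size 1.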
Outputs
| Name | Type | Description |
|---|---|---|
| get_rank result | int | Current process global rank |
| get_world_size result | int | Total number of processes |
| get_current_device result | torch.device | Current accelerator device with index |
| all_gather result | Tensor | Gathered tensors stacked along first dimension, shape (world_size, *tensor.shape) |
| all_reduce result | Tensor | Reduced tensor (in-place modification) |
| operate_tensorlike result | TensorLike | Result in same type as input (Tensor, ndarray, scalar, or list) |
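The semantics of the collective results can be shown with a pure-Python simulation that needs no process group. It is only an illustration: ReduceOpSketch, simulate_all_gather, and simulate_all_reduce are hypothetical names, each list element stands for the value held by one rank, and nothing here calls torch.distributed.

```python
from enum import Enum
from typing import List


class ReduceOpSketch(str, Enum):
    """Hypothetical mirror of the ReduceOp values listed in the signature."""

    SUM = "sum"
    MEAN = "mean"
    MAX = "max"
    MIN = "min"


def simulate_all_gather(per_rank: List[float]) -> List[float]:
    """Every rank ends up with the values of all ranks, ordered by rank."""
    return list(per_rank)


def simulate_all_reduce(per_rank: List[float], op: ReduceOpSketch = ReduceOpSketch.MEAN) -> float:
    """Every rank ends up with the same reduced value."""
    if op is ReduceOpSketch.SUM:
        return sum(per_rank)
    if op is ReduceOpSketch.MEAN:
        return sum(per_rank) / len(per_rank)
    if op is ReduceOpSketch.MAX:
        return max(per_rank)
    if op is ReduceOpSketch.MIN:
        return min(per_rank)
    raise ValueError(f"unsupported op: {op}")


# Four ranks, each holding one scalar loss value.
per_rank_loss = [2.0, 3.0, 4.0, 3.0]
gathered = simulate_all_gather(per_rank_loss)  # length equals the world size
mean_loss = simulate_all_reduce(per_rank_loss)  # default op is MEAN
```

The gathered result has one leading entry per rank, which corresponds to the (world_size, *tensor.shape) shape in the table, while a reduction collapses that leading dimension into a single value shared by all ranks.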
Usage Examples
import torch
from llamafactory.v1.accelerator.helper import (
    get_rank, get_world_size, get_current_device,
    all_reduce, ReduceOp, main_process_first,
)
# Query distributed environment
rank = get_rank()  # e.g., 0
world_size = get_world_size()  # e.g., 8
device = get_current_device()  # e.g., torch.device("cuda:0")
# Perform all-reduce (averages the loss across all processes)
loss = torch.tensor(2.5, device=device)
avg_loss = all_reduce(loss, op=ReduceOp.MEAN)
# Execute on main process first
with main_process_first():
    # Download data or prepare cache
    dataset = load_dataset(...)  # placeholder call; load_dataset is not imported in this snippet
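The ordering that a "main process first" context manager provides can be sketched with a barrier passed in as a callable. This is an assumed design, not the module's code: main_process_first_sketch and its barrier parameter are hypothetical, and in a real job the barrier would typically be torch.distributed.barrier.

```python
import contextlib
import os
from typing import Callable


@contextlib.contextmanager
def main_process_first_sketch(barrier: Callable[[], None], local_only: bool = True):
    """Hypothetical sketch: the main process runs the block before the others."""
    # Assumption: local_only selects the per-node main process (LOCAL_RANK == 0)
    # instead of the global one (RANK == 0).
    rank_var = "LOCAL_RANK" if local_only else "RANK"
    is_main = int(os.environ.get(rank_var, "0")) == 0
    if not is_main:
        barrier()  # non-main processes wait here until the main process is done
    try:
        yield
    finally:
        if is_main:
            barrier()  # main process reaches the barrier last, releasing the others


# Single-process demonstration with a recording stand-in for the barrier.
events = []
with main_process_first_sketch(lambda: events.append("barrier")):
    events.append("body")
```

The pattern suits work that should happen once and then be reused, such as downloading a dataset or building a cache: the main process does the work, and the other processes enter the block afterwards and find the result already in place.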
Related Pages
- Hiyouga_LLaMA_Factory_V1_Accelerator_Interface - DistributedInterface that consumes these helper functions
- Hiyouga_LLaMA_Factory_V1_Utils_Types - TensorLike, ProcessGroup, and Tensor type definitions
- Hiyouga_LLaMA_Factory_V1_Utils_Env - Environment utilities in the V1 utils module