Implementation: deepspeedai/DeepSpeed I/O Handle
| Knowledge Sources | |
|---|---|
| Domains | Async_IO, NVMe_Offload |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
High-level handle interface for asynchronous I/O operations on optimizer tensors with support for parallel and asynchronous execution.
Description
The deepspeed_io_handle_t class provides a high-level C++ interface for managing asynchronous I/O operations on PyTorch tensors. It orchestrates multiple worker threads to perform parallel I/O operations, manages pinned memory allocation for CPU tensors, and provides both synchronous and asynchronous API variants. The implementation supports intra-operation parallelism by splitting large I/O operations across multiple threads, each with its own AIO context. It manages a queue of pending operations and coordinates thread execution through condition variables and mutexes.
Key features include:
- Multi-threaded parallel I/O with configurable parallelism level
- Asynchronous operation scheduling with wait semantics
- Automatic bounce buffer management for non-pinned tensors
- Support for both file-based and file descriptor-based operations
- Built-in validation capabilities
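The intra-operation parallelism mentioned above can be sketched with a small helper. `split_io` below is invented for illustration (it is not part of DeepSpeed): it shows how a single request of `num_bytes` might be partitioned into contiguous per-thread slices, each handed to a worker with its own AIO context.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// One worker's share of a larger I/O request.
struct io_slice {
    int64_t offset;  // byte offset of this worker's slice within the buffer
    int64_t length;  // number of bytes this worker transfers
};

// Hypothetical sketch of intra-op parallelism: divide num_bytes across
// num_workers contiguous slices, spreading any remainder over the first
// workers so every byte is covered exactly once.
std::vector<io_slice> split_io(int64_t num_bytes, int num_workers)
{
    std::vector<io_slice> slices;
    const int64_t base = num_bytes / num_workers;
    const int64_t rem = num_bytes % num_workers;
    int64_t offset = 0;
    for (int i = 0; i < num_workers; ++i) {
        const int64_t len = base + (i < rem ? 1 : 0);
        slices.push_back({offset, len});
        offset += len;
    }
    return slices;
}
```

The real handle additionally respects block-size and alignment constraints when carving up a request; this sketch only captures the even-split idea.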
Usage
Use this handle when performing large-scale tensor swapping operations to NVMe storage, particularly during optimizer state offloading in ZeRO-Offload training. The handle allows overlapping computation with I/O by using async operations and calling wait() when the data is needed.
Code Reference
Source Location
- Repository: DeepSpeed
- File: csrc/aio/py_lib/deepspeed_py_io_handle.cpp
Signature
class deepspeed_io_handle_t {
    deepspeed_io_handle_t(const int block_size,
                          const int queue_depth,
                          const bool single_submit,
                          const bool overlap_events,
                          const int intra_op_parallelism);
    ~deepspeed_io_handle_t();

    int read(torch::Tensor& buffer, const char* filename, const bool validate, const int64_t file_offset);
    int write(const torch::Tensor& buffer, const char* filename, const bool validate, const int64_t file_offset);
    int sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int sync_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int async_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int async_pwrite(const torch::Tensor& buffer, const int fd, const int64_t file_offset);
    int wait();

    at::Tensor new_cpu_locked_tensor(const int64_t num_elem, const torch::Tensor& example_tensor);
    bool free_cpu_locked_tensor(torch::Tensor& locked_tensor);
};
Import
#include "deepspeed_py_io_handle.h"
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| block_size | int | Yes | Size of each I/O block in bytes (e.g., 128K, 1M) |
| queue_depth | int | Yes | Maximum number of concurrent I/O operations |
| single_submit | bool | Yes | Whether to submit I/O control blocks (iocbs) individually instead of as a batch |
| overlap_events | bool | Yes | Whether to overlap submission and completion |
| intra_op_parallelism | int | Yes | Number of parallel threads for a single operation |
| buffer | torch::Tensor | Yes | PyTorch tensor for data transfer |
| filename | const char* | Yes | Path to file for I/O |
| validate | bool | Yes | Whether to validate I/O correctness |
| async | bool | Yes | Whether the operation runs asynchronously (selected by calling the async_* rather than sync_* method variants, not by a parameter) |
| file_offset | int64_t | No | Starting offset in file (default: 0) |
| fd | int | No | File descriptor for fd-based operations |
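Several of these parameters interact with direct-I/O alignment rules: libaio-based paths typically open files with O_DIRECT, which requires sector-aligned (usually 512-byte) transfer sizes. The checks below are a hedged sketch only; `valid_aio_config` is invented for illustration, and the exact constraints DeepSpeed enforces may differ.

```cpp
#include <cassert>

// Plausible sanity checks on handle construction parameters (illustrative,
// not the actual DeepSpeed validation logic).
bool valid_aio_config(int block_size, int queue_depth, int intra_op_parallelism)
{
    // O_DIRECT transfers are conventionally sector-aligned (512 bytes).
    if (block_size <= 0 || block_size % 512 != 0) return false;
    // At least one in-flight operation must be allowed.
    if (queue_depth <= 0) return false;
    // At least one worker thread per operation.
    if (intra_op_parallelism <= 0) return false;
    return true;
}
```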
Outputs
| Name | Type | Description |
|---|---|---|
| return_code | int | 0 on success, -1 on error for sync operations |
| num_completed | int | Number of completed operations from wait() |
| locked_tensor | torch::Tensor | Pinned CPU tensor from new_cpu_locked_tensor() |
| free_success | bool | True if tensor was successfully freed |
Usage Examples
// Create I/O handle with 1MB blocks, depth 128, 8-way parallelism
auto handle = new deepspeed_io_handle_t(1024*1024, 128, false, true, 8);
// Synchronous parallel read
torch::Tensor buffer = torch::empty({1024*1024}, torch::kFloat32);
handle->sync_pread(buffer, "/nvme/optimizer_state.pt", 0);
// Asynchronous parallel writes (state1..state3 are previously allocated CPU tensors)
handle->async_pwrite(state1, "/nvme/state1.pt", 0);
handle->async_pwrite(state2, "/nvme/state2.pt", 0);
handle->async_pwrite(state3, "/nvme/state3.pt", 0);
// Continue computation while I/O happens in background
do_forward_pass();
// Wait for all I/O to complete
int completed = handle->wait();
// Allocate pinned memory for efficient transfers
auto pinned_buffer = handle->new_cpu_locked_tensor(1024*1024, example_tensor);
handle->async_pread(pinned_buffer, "/nvme/state.pt", 0);
handle->wait();
handle->free_cpu_locked_tensor(pinned_buffer);
delete handle;