Implementation: deepspeedai/DeepSpeed I/O Handle Interface
| Knowledge Sources | |
|---|---|
| Domains | Async_IO, NVMe_Offload |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
C++ header defining the abstract interface for DeepSpeed's I/O handle class that manages asynchronous tensor I/O operations.
Description
This header file defines the deepspeed_io_handle_t class structure, which serves as the main interface for performing asynchronous I/O operations on PyTorch tensors. The class encapsulates AIO contexts, manages worker threads for parallel I/O, coordinates operation scheduling and completion, and provides methods for both synchronous and asynchronous read/write operations. The pure virtual destructor makes this an abstract base class that must be extended by concrete implementations (like the CPU-based implementation in deepspeed_py_io_handle.cpp).
The interface supports multiple operation modes:
- read/write: Simple synchronous single-threaded operations
- sync_pread/sync_pwrite: Synchronous multi-threaded parallel operations
- async_pread/async_pwrite: Asynchronous multi-threaded operations that return immediately
- wait: Blocks until all pending async operations complete
Usage
Include this header when implementing or using DeepSpeed's I/O handle. The interface defines the contract that all I/O handle implementations must follow, allowing for different backend implementations (CPU, GDS) while maintaining a consistent API.
Code Reference
Source Location
- Repository: DeepSpeed
- File: csrc/aio/py_lib/deepspeed_py_io_handle.h
Signature
struct deepspeed_io_handle_t {
    // Configuration members
    std::unique_ptr<struct aio_context> _aio_ctxt;
    const bool _single_submit;
    const bool _overlap_events;
    const int _intra_op_parallelism;
    deepspeed_aio_config_t _aio_config;

    // Thread management
    std::vector<std::shared_ptr<struct deepspeed_aio_thread_t>> _thread_contexts;
    std::vector<std::thread> _threads;
    int _num_pending_ops;
    std::unique_ptr<struct deepspeed_pin_tensor_t> _pinned_tensor_mgr;

    // Constructor and destructor
    deepspeed_io_handle_t(const int block_size,
                          const int queue_depth,
                          const bool single_submit,
                          const bool overlap_events,
                          const int intra_op_parallelism);
    virtual ~deepspeed_io_handle_t() = 0;

    // Configuration getters
    const int get_block_size() const;
    const int get_queue_depth() const;
    const bool get_single_submit() const;
    const bool get_overlap_events() const;
    const int get_intra_op_parallelism() const;
    const int get_alignment() const;

    // Synchronous operations
    int read(torch::Tensor& buffer, const char* filename, const bool validate, const int64_t file_offset);
    int write(const torch::Tensor& buffer, const char* filename, const bool validate, const int64_t file_offset);

    // Parallel operations (sync/async variants)
    int sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int sync_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int async_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int async_pwrite(const torch::Tensor& buffer, const int fd, const int64_t file_offset);

    // Async coordination
    int wait();

    // Memory management
    torch::Tensor new_cpu_locked_tensor(const int64_t num_elem, const torch::Tensor& example_tensor);
    bool free_cpu_locked_tensor(torch::Tensor&);

    // Internal methods
    void _stop_threads();
    void _schedule_aio_work(std::shared_ptr<struct io_op_desc_t> scheduled_op);
    std::shared_ptr<struct io_op_desc_t> _wait_for_aio_work();
    bool _is_valid_parallel_aio_op(const bool read_op, const int64_t num_bytes);
    virtual std::shared_ptr<struct io_op_desc_t> _create_io_op_desc(
        const bool read_op,
        const torch::Tensor& buffer,
        const int fd,
        const char* filename,
        const bool validate,
        const int64_t file_offset);
};
Import
#include "deepspeed_py_io_handle.h"
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| block_size | int | Yes | I/O block size in bytes |
| queue_depth | int | Yes | AIO queue depth |
| single_submit | bool | Yes | Submit mode flag |
| overlap_events | bool | Yes | Event overlap flag |
| intra_op_parallelism | int | Yes | Thread count for parallel ops |
| buffer | torch::Tensor | Yes | Tensor for I/O operations |
| filename | const char* | Yes | File path |
| validate | bool | Yes | Validation flag |
| file_offset | int64_t | No | File offset (default 0) |
| fd | int | Yes | File descriptor (for fd-based operations) |
Outputs
| Name | Type | Description |
|---|---|---|
| return_code | int | Operation status (0 = success, -1 = error) |
| num_completed | int | Number of completed operations from wait() |
| config_values | int/bool | Configuration values from getter methods |
| locked_tensor | torch::Tensor | Allocated pinned tensor |
| free_status | bool | Success status of tensor deallocation |
Usage Examples
// Typical implementation pattern
class deepspeed_aio_handle_t : public deepspeed_io_handle_t {
public:
    deepspeed_aio_handle_t(int block_size, int queue_depth,
                           bool single_submit, bool overlap_events,
                           int parallelism)
        : deepspeed_io_handle_t(block_size, queue_depth, single_submit,
                                overlap_events, parallelism) {}

    ~deepspeed_aio_handle_t() override {}

    std::shared_ptr<io_op_desc_t> _create_io_op_desc(
        const bool read_op, const torch::Tensor& buffer, const int fd,
        const char* filename, const bool validate,
        const int64_t file_offset) override {
        return std::make_shared<cpu_op_desc_t>(
            _pinned_tensor_mgr, read_op, buffer, fd, filename,
            _intra_op_parallelism, validate, file_offset);
    }
};

// Usage example
auto handle = new deepspeed_aio_handle_t(1024 * 1024, 128, false, true, 8);

// Get configuration
std::cout << "Block size: " << handle->get_block_size() << std::endl;
std::cout << "Parallelism: " << handle->get_intra_op_parallelism() << std::endl;

// Perform operations
auto tensor = torch::randn({1024, 1024});
handle->async_pwrite(tensor, "/nvme/state.pt", 0);
handle->wait();  // blocks until the pending write completes

delete handle;