# Implementation: deepspeedai/DeepSpeed py_ds_aio Module
| Knowledge Sources | |
|---|---|
| Domains | Async_IO, NVMe_Offload |
| Last Updated | 2026-02-09 00:00 GMT |
## Overview
Python module binding that exports DeepSpeed's asynchronous I/O functionality to Python using PyBind11.
## Description
This module uses PyBind11 to create Python bindings for DeepSpeed's C++ asynchronous I/O implementation. It exports three main components: the `aio_read` and `aio_write` functions for simple one-off operations, the `deepspeed_memcpy` function for optimized tensor copying, and the `aio_handle` class for advanced multi-operation scenarios. The bindings preserve the full C++ API, including default arguments, method overloads, and the Python/C++ type conversions for PyTorch tensors.
The module is compiled as a PyTorch extension (the `TORCH_EXTENSION_NAME` macro expands to the extension's actual name at build time) and is imported in Python as part of the DeepSpeed library. The `wait()` method is bound with `py::call_guard<py::gil_scoped_release>`, which releases Python's Global Interpreter Lock during blocking I/O so that other Python threads can run concurrently.
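Because `wait()` releases the GIL, the usual overlap pattern (queue I/O, keep computing, then wait) works across Python threads. The sketch below reproduces that pattern with only the standard library — a background thread and a plain blocking file write stand in for `aio_handle`; the `blocking_write` helper is illustrative, not part of DeepSpeed:

```python
import os
import tempfile
import threading

def blocking_write(path, payload):
    """Stand-in for async_pwrite + wait(): a blocking file write whose
    OS-level I/O, like the GIL-releasing wait() binding, does not block
    other Python threads from running."""
    with open(path, "wb") as f:
        f.write(payload)
        os.fsync(f.fileno())

path = os.path.join(tempfile.mkdtemp(), "state.bin")
payload = bytes(1024 * 1024)  # 1 MiB of zeros

# Queue the "I/O" on a background thread ...
writer = threading.Thread(target=blocking_write, args=(path, payload))
writer.start()

# ... keep computing on the main thread while the write is in flight ...
acc = sum(range(100_000))

# ... then wait for completion, as handle.wait() would.
writer.join()
print(acc, os.path.getsize(path))
```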
## Usage
Import this module in Python scripts to access DeepSpeed's high-performance I/O operations for checkpoint loading/saving and optimizer state swapping. The bindings make all C++ functionality accessible through idiomatic Python interfaces.
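The offset-based reads and writes (`sync_pread`/`sync_pwrite` with a `file_offset`) follow POSIX `pread`/`pwrite` semantics: an explicit offset, no seek, so multiple tensors can share one file. A minimal, hedged sketch of those offset semantics using only the standard library (not the DeepSpeed bindings themselves):

```python
import os
import tempfile

path = os.path.join(tempfile.mkdtemp(), "opt_state.bin")

# Write two payloads at fixed offsets, as sync_pwrite with a file_offset
# would place optimizer-state shards side by side in a single file.
fd = os.open(path, os.O_CREAT | os.O_RDWR, 0o600)
shard0 = b"A" * 4096
shard1 = b"B" * 4096
os.pwrite(fd, shard0, 0)      # file_offset = 0
os.pwrite(fd, shard1, 4096)   # file_offset = 4096

# Read the second shard back without seeking, as sync_pread would.
out = os.pread(fd, 4096, 4096)
os.close(fd)
print(out == shard1)
```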
## Code Reference

### Source Location

- Repository: DeepSpeed
- File: `csrc/aio/py_lib/py_ds_aio.cpp`
### Signature

```cpp
PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
{
    m.def("aio_read", &deepspeed_py_aio_read, "DeepSpeed Asynchronous I/O Read");
    m.def("aio_write", &deepspeed_py_aio_write, "DeepSpeed Asynchronous I/O Write");
    m.def("deepspeed_memcpy", &deepspeed_py_memcpy, "DeepSpeed Memory Copy");

    py::class_<deepspeed_aio_handle_t>(m, "aio_handle")
        .def(py::init<const int, const int, const bool, const bool, const int>(), ...)
        .def("read", &deepspeed_aio_handle_t::read, ...)
        .def("write", &deepspeed_aio_handle_t::write, ...)
        .def("sync_pread", &deepspeed_aio_handle_t::sync_pread, ...)
        .def("sync_pwrite", &deepspeed_aio_handle_t::sync_pwrite, ...)
        .def("async_pread", &deepspeed_aio_handle_t::async_pread, ...)
        .def("async_pwrite", &deepspeed_aio_handle_t::async_pwrite, ...)
        .def("wait", &deepspeed_aio_handle_t::wait, ...)
        .def("new_cpu_locked_tensor", &deepspeed_aio_handle_t::new_cpu_locked_tensor, ...)
        .def("free_cpu_locked_tensor", &deepspeed_aio_handle_t::free_cpu_locked_tensor, ...);
}
```
### Import

```python
from deepspeed.ops.aio import aio_read, aio_write, deepspeed_memcpy, aio_handle
```
## I/O Contract

### Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| buffer | torch.Tensor | Yes | PyTorch tensor for I/O operations |
| filename | str | Yes | File path for I/O |
| block_size | int | No | Block size in bytes (default: 1MB) |
| queue_depth | int | No | Queue depth (default: 128) |
| single_submit | bool | No | Single submission mode (default: False) |
| overlap_events | bool | No | Overlap events mode (default: False) |
| intra_op_parallelism | int | No | Thread count (default: 1) |
| validate | bool | Yes | Validation flag |
| async | bool | Yes | Async mode flag |
| file_offset | int | No | File offset (default: 0) |
| fd | int | Yes | File descriptor (for fd-based writes) |
| num_elem | int | Yes | Number of elements for tensor allocation |
| example_tensor | torch.Tensor | Yes | Template tensor for allocation |
### Outputs
| Name | Type | Description |
|---|---|---|
| return_code | int | 0 on success, -1 on error |
| num_completed | int | Number of completed operations |
| locked_tensor | torch.Tensor | Newly allocated pinned tensor |
| free_status | bool | Success status of free operation |
## Usage Examples
```python
import torch
from deepspeed.ops.aio import aio_read, aio_write, aio_handle, deepspeed_memcpy

# Simple one-off read/write. Positional args after the filename are:
# block_size, queue_depth, single_submit, overlap_events, validate.
tensor = torch.randn(1024, 1024)
aio_write(tensor, "/nvme/checkpoint.pt", 1024 * 1024, 128, False, True, False)
buffer = torch.empty_like(tensor)
aio_read(buffer, "/nvme/checkpoint.pt", 1024 * 1024, 128, False, True, True)

# Advanced usage with a handle
handle = aio_handle(
    block_size=1024 * 1024,
    queue_depth=128,
    single_submit=False,
    overlap_events=True,
    intra_op_parallelism=8,
)

# Asynchronous operations (each tensor must stay alive until wait() returns)
tensor1, tensor2, tensor3 = (torch.randn(1024, 1024) for _ in range(3))
handle.async_pwrite(tensor1, "/nvme/state1.pt")
handle.async_pwrite(tensor2, "/nvme/state2.pt")
handle.async_pwrite(tensor3, "/nvme/state3.pt")

# Continue computation while I/O happens
train_step()  # placeholder for application work

# Wait for completion (releases the GIL while blocked)
num_completed = handle.wait()
print(f"Completed {num_completed} operations")

# Pinned (page-locked) memory allocation
pinned_tensor = handle.new_cpu_locked_tensor(1024 * 1024, tensor)
handle.async_pread(pinned_tensor, "/nvme/state1.pt")
handle.wait()

# Optimized copy
dest = torch.empty_like(pinned_tensor)
deepspeed_memcpy(dest, pinned_tensor)

# Cleanup
handle.free_cpu_locked_tensor(pinned_tensor)
```
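Because pinned buffers from `new_cpu_locked_tensor` must be released explicitly with `free_cpu_locked_tensor`, a small context-manager wrapper can guarantee cleanup even when an exception interrupts the I/O. The `pinned_tensor` helper below is hypothetical (not part of DeepSpeed), and a stub handle stands in for `aio_handle` so the sketch runs anywhere:

```python
from contextlib import contextmanager

@contextmanager
def pinned_tensor(handle, num_elem, example):
    """Hypothetical helper: allocate a pinned tensor from an aio_handle
    and guarantee free_cpu_locked_tensor runs, even on error."""
    t = handle.new_cpu_locked_tensor(num_elem, example)
    try:
        yield t
    finally:
        handle.free_cpu_locked_tensor(t)

# Minimal stub standing in for aio_handle so the pattern can run anywhere.
class _StubHandle:
    def __init__(self):
        self.freed = []
    def new_cpu_locked_tensor(self, num_elem, example):
        return bytearray(num_elem)  # stand-in for a pinned torch.Tensor
    def free_cpu_locked_tensor(self, t):
        self.freed.append(len(t))

h = _StubHandle()
with pinned_tensor(h, 8, None) as buf:
    buf[:3] = b"abc"  # use the "pinned" buffer inside the block
print(h.freed)  # [8] -- the buffer was freed on exit
```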