
Implementation:Deepspeedai DeepSpeed Py DS AIO Module

From Leeroopedia


Knowledge Sources
Domains Async_IO, NVMe_Offload
Last Updated 2026-02-09 00:00 GMT

Overview

Python module binding that exports DeepSpeed's asynchronous I/O functionality to Python using PyBind11.

Description

This module uses PyBind11 to create Python bindings for DeepSpeed's C++ asynchronous I/O implementation. It exports three main components: the aio_read and aio_write functions for simple one-off operations, the deepspeed_memcpy function for optimized tensor copying, and the aio_handle class for advanced multi-operation scenarios. The bindings preserve the full C++ API including default arguments, method overloads, and proper Python/C++ type conversions for PyTorch tensors.

The module is compiled as a PyTorch extension; TORCH_EXTENSION_NAME is a macro that PyTorch's build system replaces with the extension's actual name at compile time. Once built, the module can be imported in Python as part of the DeepSpeed library. The wait() method is bound with py::call_guard<py::gil_scoped_release>, which releases Python's Global Interpreter Lock (GIL) during blocking I/O operations so that other Python threads can run concurrently.
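The value of releasing the GIL can be illustrated without the compiled extension. In this minimal sketch, time.sleep() stands in for a GIL-releasing blocking call such as handle.wait() (blocking_wait is a hypothetical stand-in, not part of the module):

```python
import threading
import time

def blocking_wait(duration: float = 0.2) -> None:
    """Stand-in for a GIL-releasing blocking call such as handle.wait().
    time.sleep() releases the GIL, just as py::call_guard<py::gil_scoped_release>
    does for the bound wait() method."""
    time.sleep(duration)

start = time.perf_counter()
threads = [threading.Thread(target=blocking_wait) for _ in range(2)]
for t in threads:
    t.start()
for t in threads:
    t.join()
elapsed = time.perf_counter() - start

# Because the GIL is released inside the blocking call, the two waits
# overlap: total time is ~0.2s rather than ~0.4s.
print(f"elapsed: {elapsed:.2f}s")
```

If wait() held the GIL instead, the two threads would serialize and the elapsed time would approach the sum of the individual waits.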

Usage

Import this module in Python scripts to access DeepSpeed's high-performance I/O operations for checkpoint loading/saving and optimizer state swapping. The bindings make all C++ functionality accessible through idiomatic Python interfaces.

Code Reference

Source Location

Signature

PYBIND11_MODULE(TORCH_EXTENSION_NAME, m) {
    m.def("aio_read", &deepspeed_py_aio_read, "DeepSpeed Asynchronous I/O Read");
    m.def("aio_write", &deepspeed_py_aio_write, "DeepSpeed Asynchronous I/O Write");
    m.def("deepspeed_memcpy", &deepspeed_py_memcpy, "DeepSpeed Memory Copy");

    py::class_<deepspeed_aio_handle_t>(m, "aio_handle")
        .def(py::init<const int, const int, const bool, const bool, const int>(), ...)
        .def("read", &deepspeed_aio_handle_t::read, ...)
        .def("write", &deepspeed_aio_handle_t::write, ...)
        .def("sync_pread", &deepspeed_aio_handle_t::sync_pread, ...)
        .def("sync_pwrite", &deepspeed_aio_handle_t::sync_pwrite, ...)
        .def("async_pread", &deepspeed_aio_handle_t::async_pread, ...)
        .def("async_pwrite", &deepspeed_aio_handle_t::async_pwrite, ...)
        .def("wait", &deepspeed_aio_handle_t::wait, ...)
        .def("new_cpu_locked_tensor", &deepspeed_aio_handle_t::new_cpu_locked_tensor, ...)
        .def("free_cpu_locked_tensor", &deepspeed_aio_handle_t::free_cpu_locked_tensor, ...);
}

Import

from deepspeed.ops.aio import aio_read, aio_write, deepspeed_memcpy, aio_handle

I/O Contract

Inputs

Name Type Required Description
buffer torch.Tensor Yes PyTorch tensor for I/O operations
filename str Yes File path for I/O
block_size int No Block size in bytes (default: 1MB)
queue_depth int No Queue depth (default: 128)
single_submit bool No Single submission mode (default: False)
overlap_events bool No Overlap events mode (default: False)
intra_op_parallelism int No Thread count (default: 1)
validate bool Yes Whether to validate data after the operation completes
async bool Yes Whether to submit asynchronously and complete later via wait()
file_offset int No File offset (default: 0)
fd int Yes File descriptor (for fd-based writes)
num_elem int Yes Number of elements for tensor allocation
example_tensor torch.Tensor Yes Template tensor for allocation
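As a rough illustration of how block_size and queue_depth interact, the sketch below (io_plan is a hypothetical helper, not part of the module) computes how many block-sized requests a buffer decomposes into and the bytes in flight at full queue depth:

```python
import math

def io_plan(buffer_bytes: int,
            block_size: int = 1024 * 1024,
            queue_depth: int = 128) -> dict:
    """Hypothetical helper: describe how a buffer maps onto AIO requests."""
    num_blocks = math.ceil(buffer_bytes / block_size)
    # At most queue_depth requests are in flight at once, so the peak
    # amount of data submitted to the kernel is bounded by this product.
    max_in_flight_bytes = min(num_blocks, queue_depth) * block_size
    return {"num_blocks": num_blocks,
            "max_in_flight_bytes": max_in_flight_bytes}

plan = io_plan(buffer_bytes=4 * 1024 * 1024 * 1024)  # a 4 GiB buffer
print(plan)  # 4096 blocks of 1 MiB, at most 128 MiB in flight
```

Larger queue depths keep more requests outstanding at the device, which generally helps NVMe throughput at the cost of memory for in-flight buffers.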

Outputs

Name Type Description
return_code int 0 on success, -1 on error
num_completed int Number of completed operations
locked_tensor torch.Tensor Newly allocated pinned tensor
free_status bool Success status of free operation
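The read/write calls signal failure through their integer return value (0 on success, -1 on error) rather than by raising. A minimal sketch of a wrapper that converts this convention into a Python exception (check_rc and the stubbed operations are hypothetical, not part of the module):

```python
def check_rc(return_code: int, op_name: str) -> None:
    """Raise if a DeepSpeed AIO call reported failure (-1)."""
    if return_code != 0:
        raise OSError(f"{op_name} failed with return code {return_code}")

# Stubs standing in for e.g. handle.sync_pwrite(buffer, filename)
def fake_sync_pwrite_ok() -> int:
    return 0

def fake_sync_pwrite_fail() -> int:
    return -1

check_rc(fake_sync_pwrite_ok(), "sync_pwrite")       # no exception
try:
    check_rc(fake_sync_pwrite_fail(), "sync_pwrite")
except OSError as e:
    print(e)  # sync_pwrite failed with return code -1
```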

Usage Examples

import torch
from deepspeed.ops.aio import aio_read, aio_write, aio_handle, deepspeed_memcpy

# Simple read/write operations
# Positional args after the path: block_size, queue_depth,
# single_submit, overlap_events, validate
tensor = torch.randn(1024, 1024)
aio_write(tensor, "/nvme/checkpoint.pt", 1024*1024, 128, False, True, False)

buffer = torch.empty_like(tensor)
aio_read(buffer, "/nvme/checkpoint.pt", 1024*1024, 128, False, True, True)

# Advanced usage with handle
handle = aio_handle(
    block_size=1024*1024,
    queue_depth=128,
    single_submit=False,
    overlap_events=True,
    intra_op_parallelism=8
)

# Asynchronous operations
tensor1 = torch.randn(1024, 1024)
tensor2 = torch.randn(1024, 1024)
tensor3 = torch.randn(1024, 1024)
handle.async_pwrite(tensor1, "/nvme/state1.pt")
handle.async_pwrite(tensor2, "/nvme/state2.pt")
handle.async_pwrite(tensor3, "/nvme/state3.pt")

# Continue computation while the I/O happens (train_step is user code)
train_step()

# Wait for completion (releases GIL during wait)
num_completed = handle.wait()
print(f"Completed {num_completed} operations")

# Pinned memory allocation
pinned_tensor = handle.new_cpu_locked_tensor(1024*1024, tensor)
handle.async_pread(pinned_tensor, "/nvme/state.pt")
handle.wait()

# Optimized copy
dest = torch.empty_like(pinned_tensor)
deepspeed_memcpy(dest, pinned_tensor)

# Cleanup
handle.free_cpu_locked_tensor(pinned_tensor)
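free_cpu_locked_tensor must run even if an exception interrupts the work between allocation and cleanup. A hedged sketch of a context-manager pattern for that pairing (FakeHandle is a stub standing in for aio_handle, since the real class requires the compiled extension):

```python
from contextlib import contextmanager

class FakeHandle:
    """Stub with the same allocate/free pairing as aio_handle."""
    def __init__(self) -> None:
        self.live = 0
    def new_cpu_locked_tensor(self, num_elem, example_tensor=None):
        self.live += 1
        return [0] * num_elem  # stand-in for a pinned torch.Tensor
    def free_cpu_locked_tensor(self, tensor) -> bool:
        self.live -= 1
        return True

@contextmanager
def pinned_tensor(handle, num_elem, example_tensor=None):
    """Allocate a pinned tensor and guarantee it is freed on exit."""
    t = handle.new_cpu_locked_tensor(num_elem, example_tensor)
    try:
        yield t
    finally:
        handle.free_cpu_locked_tensor(t)

h = FakeHandle()
with pinned_tensor(h, 8) as t:
    assert h.live == 1  # tensor allocated inside the block
assert h.live == 0      # freed on exit, even after an exception
```

The same pattern applies to the real aio_handle: substitute the compiled class for FakeHandle and a template torch.Tensor for example_tensor.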
