Implementation:Deepspeedai DeepSpeed IO Handle Interface

From Leeroopedia


Knowledge Sources
Domains: Async_IO, NVMe_Offload
Last Updated: 2026-02-09 00:00 GMT

Overview

A C++ header that defines the abstract interface for DeepSpeed's I/O handle class, which manages asynchronous tensor I/O operations.

Description

This header file defines the deepspeed_io_handle_t class structure, which serves as the main interface for performing asynchronous I/O operations on PyTorch tensors. The class encapsulates AIO contexts, manages worker threads for parallel I/O, coordinates operation scheduling and completion, and provides methods for both synchronous and asynchronous read/write operations. The pure virtual destructor makes this an abstract base class that must be extended by concrete implementations (like the CPU-based implementation in deepspeed_py_io_handle.cpp).

The interface supports multiple operation modes:

  • read/write: Simple synchronous single-threaded operations
  • sync_pread/sync_pwrite: Synchronous multi-threaded parallel operations
  • async_pread/async_pwrite: Asynchronous multi-threaded operations that return immediately
  • wait: Blocks until all pending async operations complete
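
The async-schedule/wait coordination described above can be sketched as a small, self-contained thread pool. Everything here (`toy_handle_t`, the queue of `std::function` ops) is illustrative and not DeepSpeed's actual implementation; the real handle schedules `io_op_desc_t` objects onto AIO worker threads, but the pending-counter discipline is the same idea.

```cpp
#include <condition_variable>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical sketch: workers drain a queue of operations, and wait()
// blocks until the pending-operation counter reaches zero, returning
// how many operations completed since the last wait().
class toy_handle_t {
public:
    explicit toy_handle_t(int num_threads)
    {
        for (int i = 0; i < num_threads; ++i) {
            _threads.emplace_back([this] { _worker_loop(); });
        }
    }

    ~toy_handle_t()
    {
        {
            std::lock_guard<std::mutex> lk(_mtx);
            _stop = true;
        }
        _cv.notify_all();
        for (auto& t : _threads) t.join();
    }

    // Analogue of async_pread/async_pwrite: enqueue and return immediately.
    void schedule(std::function<void()> op)
    {
        {
            std::lock_guard<std::mutex> lk(_mtx);
            _ops.push(std::move(op));
            ++_num_pending_ops;
        }
        _cv.notify_one();
    }

    // Analogue of wait(): block until all pending ops finish and
    // return the number completed since the previous wait().
    int wait()
    {
        std::unique_lock<std::mutex> lk(_mtx);
        _done_cv.wait(lk, [this] { return _num_pending_ops == 0; });
        const int completed = _num_completed;
        _num_completed = 0;
        return completed;
    }

private:
    void _worker_loop()
    {
        for (;;) {
            std::function<void()> op;
            {
                std::unique_lock<std::mutex> lk(_mtx);
                _cv.wait(lk, [this] { return _stop || !_ops.empty(); });
                if (_stop && _ops.empty()) return;
                op = std::move(_ops.front());
                _ops.pop();
            }
            op();  // run the I/O work outside the lock
            {
                std::lock_guard<std::mutex> lk(_mtx);
                --_num_pending_ops;
                ++_num_completed;
            }
            _done_cv.notify_all();
        }
    }

    std::queue<std::function<void()>> _ops;
    std::vector<std::thread> _threads;
    std::mutex _mtx;
    std::condition_variable _cv, _done_cv;
    int _num_pending_ops = 0;
    int _num_completed = 0;
    bool _stop = false;
};
```

Like `async_pwrite`, `schedule()` returns immediately; like the interface's `wait()`, the sketch's `wait()` drains the pending counter and reports the number of completed operations.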

Usage

Include this header when implementing or using DeepSpeed's I/O handle. The interface defines the contract that all I/O handle implementations must follow, allowing for different backend implementations (CPU, GDS) while maintaining a consistent API.

Code Reference

Source Location

Signature

struct deepspeed_io_handle_t {
    // Configuration members
    std::unique_ptr<struct aio_context> _aio_ctxt;
    const bool _single_submit;
    const bool _overlap_events;
    const int _intra_op_parallelism;
    deepspeed_aio_config_t _aio_config;

    // Thread management
    std::vector<std::shared_ptr<struct deepspeed_aio_thread_t>> _thread_contexts;
    std::vector<std::thread> _threads;
    int _num_pending_ops;
    std::unique_ptr<struct deepspeed_pin_tensor_t> _pinned_tensor_mgr;

    // Constructor and destructor
    deepspeed_io_handle_t(const int block_size,
                          const int queue_depth,
                          const bool single_submit,
                          const bool overlap_events,
                          const int intra_op_parallelism);
    virtual ~deepspeed_io_handle_t() = 0;

    // Configuration getters
    const int get_block_size() const;
    const int get_queue_depth() const;
    const bool get_single_submit() const;
    const bool get_overlap_events() const;
    const int get_intra_op_parallelism() const;
    const int get_alignment() const;

    // Synchronous operations
    int read(torch::Tensor& buffer, const char* filename, const bool validate, const int64_t file_offset);
    int write(const torch::Tensor& buffer, const char* filename, const bool validate, const int64_t file_offset);

    // Parallel operations (sync/async variants)
    int sync_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int sync_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int async_pread(torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int async_pwrite(const torch::Tensor& buffer, const char* filename, const int64_t file_offset);
    int async_pwrite(const torch::Tensor& buffer, const int fd, const int64_t file_offset);

    // Async coordination
    int wait();

    // Memory management
    torch::Tensor new_cpu_locked_tensor(const int64_t num_elem, const torch::Tensor& example_tensor);
    bool free_cpu_locked_tensor(torch::Tensor&);

    // Internal methods
    void _stop_threads();
    void _schedule_aio_work(std::shared_ptr<struct io_op_desc_t> scheduled_op);
    std::shared_ptr<struct io_op_desc_t> _wait_for_aio_work();
    bool _is_valid_parallel_aio_op(const bool read_op, const int64_t num_bytes);

    virtual std::shared_ptr<struct io_op_desc_t> _create_io_op_desc(
        const bool read_op,
        const torch::Tensor& buffer,
        const int fd,
        const char* filename,
        const bool validate,
        const int64_t file_offset);
};

Import

#include "deepspeed_py_io_handle.h"

I/O Contract

Inputs

Name Type Required Description
block_size int Yes I/O block size in bytes
queue_depth int Yes AIO queue depth
single_submit bool Yes Submit mode flag
overlap_events bool Yes Event overlap flag
intra_op_parallelism int Yes Thread count for parallel ops
buffer torch::Tensor Yes Tensor for I/O operations
filename const char* Yes File path
validate bool Yes Validation flag
file_offset int64_t No File offset (default 0)
fd int Yes File descriptor (for fd-based operations)

Outputs

Name Type Description
return_code int Operation status (0 = success, -1 = error)
num_completed int Number of completed operations from wait()
config_values int/bool Configuration values from getter methods
locked_tensor torch::Tensor Allocated pinned tensor
free_status bool Success status of tensor deallocation
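
Before dispatching a parallel operation, the handle validates that the transfer can actually be split across its worker threads (`_is_valid_parallel_aio_op`). A minimal, hypothetical version of such a check is sketched below; the real DeepSpeed rule may also account for read/write direction and device alignment, so treat this only as the shape of the idea.

```cpp
#include <cstdint>

// Hypothetical sketch of a parallel-op validity check, modeled on the
// interface's _is_valid_parallel_aio_op. Assumption: each of the
// intra_op_parallelism workers handles an equal share of the buffer,
// so the byte count must divide evenly.
bool is_valid_parallel_op(std::int64_t num_bytes, int intra_op_parallelism)
{
    if (num_bytes <= 0 || intra_op_parallelism <= 0) return false;
    // Each worker handles num_bytes / intra_op_parallelism bytes,
    // so the division must be exact.
    return num_bytes % intra_op_parallelism == 0;
}
```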

Usage Examples

// Typical implementation pattern
class deepspeed_aio_handle_t : public deepspeed_io_handle_t {
public:
    deepspeed_aio_handle_t(int block_size, int queue_depth,
                           bool single_submit, bool overlap_events,
                           int parallelism)
        : deepspeed_io_handle_t(block_size, queue_depth, single_submit,
                                overlap_events, parallelism) {}

    ~deepspeed_aio_handle_t() override {}

    std::shared_ptr<io_op_desc_t> _create_io_op_desc(
        const bool read_op, const torch::Tensor& buffer, const int fd,
        const char* filename, const bool validate,
        const int64_t file_offset) override {
        return std::make_shared<cpu_op_desc_t>(
            _pinned_tensor_mgr, read_op, buffer, fd, filename,
            _intra_op_parallelism, validate, file_offset);
    }
};

// Usage example: 1 MB blocks, queue depth 128, single_submit off,
// overlap_events on, 8 intra-op threads
auto handle = new deepspeed_aio_handle_t(1024 * 1024, 128, false, true, 8);

// Get configuration
std::cout << "Block size: " << handle->get_block_size() << std::endl;
std::cout << "Parallelism: " << handle->get_intra_op_parallelism() << std::endl;

// Perform operations
auto tensor = torch::randn({1024, 1024});
handle->async_pwrite(tensor, "/nvme/state.pt", 0);
const int num_completed = handle->wait();  // blocks until the write finishes

delete handle;
