
Implementation:Microsoft Onnxruntime DataLoader Init

From Leeroopedia


Implementation Name: DataLoader_Init
Overview: Initialization and operation of the distributed data loader for partitioning training data across processes with asynchronous pre-loading.
Type: API Doc
Language: C++
Domains: Distributed_Training, Training_Infrastructure
Source Repository: microsoft/onnxruntime
Last Updated: 2026-02-10

Overview

The DataLoader class partitions training data across processes and pre-loads it asynchronously. It reads protobuf-serialized training data, assigns data files to MPI ranks, and pre-fetches upcoming files in the background via a thread pool.

API

DataLoader(const MapStringToString& input_name_map,
           const PathString& dir_path,
           size_t max_num_files_preload = 2,
           size_t world_rank = 0,
           size_t world_size = 1);

Status InitializeDataSetIndex(size_t initial_data_set_index);

std::shared_ptr<DataSet> CurrentDataSet();

std::shared_ptr<DataSet> MoveToNextDataSet();

const VectorString& DataSetTensorNames() const;

Source Code Reference

Key Parameters

Parameter | Type | Default | Description
input_name_map | MapStringToString | (required) | Maps tensor names in data files to graph input names
dir_path | PathString | (required) | Directory containing .pb training data files
max_num_files_preload | size_t | 2 | Maximum number of data files to keep pre-loaded in memory
world_rank | size_t | 0 | MPI rank of the current process
world_size | size_t | 1 | Total number of MPI processes
initial_data_set_index | size_t | 0 | Starting dataset index for resuming from a checkpoint (argument to InitializeDataSetIndex, not the constructor)

I/O Contract

Direction | Name | Type | Description
Input | data_dir | PathString | Directory containing .pb protobuf data shards
Input | world_rank | size_t | MPI rank for data partitioning (0-based)
Input | world_size | size_t | Total number of MPI processes used for partitioning
Output | DataSet | shared_ptr<DataSet> | Batched OrtValue tensors for training consumption
Output | tensor_names | VectorString | Input tensor names matching the training graph

Usage Examples

Basic DataLoader Creation

#include "orttraining/models/runner/data_loader.h"

using namespace onnxruntime::training;

// Map data file tensor names to model graph input names
MapStringToString input_name_map;
input_name_map["input_ids"] = "input_ids";
input_name_map["attention_mask"] = "attention_mask";
input_name_map["labels"] = "labels";

// Create data loader with distributed partitioning
int world_rank = MPIContext::GetInstance().GetWorldRank();
int world_size = MPIContext::GetInstance().GetWorldSize();

auto training_data_loader = std::make_unique<DataLoader>(
    input_name_map,
    ORT_TSTR("/data/train/"),
    2,           // max_num_files_preload
    world_rank,
    world_size);

Resuming from a Checkpoint Index

// Set the starting data index when resuming from a checkpoint
size_t checkpoint_data_index = 5;
ORT_THROW_IF_ERROR(training_data_loader->InitializeDataSetIndex(checkpoint_data_index));

Iterating Through Data

// Get current dataset
auto data_set = training_data_loader->CurrentDataSet();

// Process batches from the current dataset
// ...

// Advance to the next dataset (wraps around cyclically)
auto next_data_set = training_data_loader->MoveToNextDataSet();

Data Partitioning Logic

The constructor partitions data files using round-robin assignment:

// Files are sorted alphabetically for deterministic assignment
std::sort(data_files.begin(), data_files.end());

// Round-robin assignment: file i goes to rank (i % world_size)
if (world_size > 1) {
    std::vector<PathString> partial_training_files;
    int count = 0;
    for (const auto& file : data_files_) {
        if ((count++ % world_size) == world_rank) {
            partial_training_files.push_back(file);
        }
    }
    data_files_ = std::move(partial_training_files);
}

Key Details

  • Data files must have the .pb (protobuf) extension to be discovered.
  • Files are sorted alphabetically before partitioning to ensure all ranks agree on assignment.
  • The DataSetBuffer provides thread-safe access to pre-loaded datasets using mutex and condition variable synchronization.
  • The thread pool currently uses a single thread (thread_pool_size_ = 1) for correctness, as concurrent requests could cause race conditions.
  • MoveToNextDataSet() wraps around cyclically: active_file_index_ = (active_file_index_ + 1) % NumShards().
  • When pre-loading, the loader keeps max_num_files_preload datasets in memory and evicts old ones as new ones are loaded.
  • The IDataLoader interface also has a SingleDataLoader implementation for evaluation data that holds a single dataset.
