Principle:Microsoft Onnxruntime Distributed Data Loading
| Field | Value |
|---|---|
| Principle Name | Distributed_Data_Loading |
| Overview | Parallel data loading and partitioning across distributed training processes. |
| Category | API Doc |
| Domains | Distributed_Training, Training_Infrastructure |
| Source Repository | microsoft/onnxruntime |
| Last Updated | 2026-02-10 |
Overview
Parallel data loading and partitioning across distributed training processes. The DataLoader reads protobuf-serialized training data shards and partitions them across distributed processes based on world rank, ensuring each process trains on a unique data subset.
Description
The DataLoader reads protobuf-serialized training data shards and partitions them across distributed processes based on world rank. It supports asynchronous pre-loading and batch construction, ensuring each process trains on a unique data subset.
The data loading system consists of several components:
- IDataLoader interface: Abstract base class defining the data loading contract: InitializeDataSetIndex(), CurrentDataSet(), MoveToNextDataSet(), and DataSetTensorNames().
- DataLoader: Full implementation that reads .pb (protobuf) files from a directory, partitions them across workers, and supports asynchronous pre-loading.
- DataSetBuffer: Thread-safe buffer for storing pre-loaded datasets with condition-variable-based synchronization.
- SingleDataLoader: Simplified loader for a single dataset (used for evaluation).
Data File Format
Training files are organized in protobuf format:
[Sample ByteSize] [Feature0 ByteSize] [Feature0 TensorProto] ... [FeatureN ByteSize] [FeatureN TensorProto]
next sample ...
All byte size fields are stored as 4-byte uint32_t values.
Data Partitioning
When world_size > 1, the DataLoader partitions files across processes using round-robin assignment based on world rank:
- Files are sorted alphabetically to ensure consistent ordering across all workers.
- File at index i is assigned to rank i % world_size.
- Each rank loads only its assigned subset of files.
Asynchronous Pre-loading
The DataLoader maintains a thread pool (currently single-threaded for correctness) that pre-loads the next max_num_files_preload datasets asynchronously. When MoveToNextDataSet() is called, the loader advances the active index, starts loading the next dataset in the background, and removes the previously active dataset from the buffer.
Theoretical Basis
Data parallelism requires each process to train on different data partitions. The data loader assigns shard indices based on MPI rank, preventing data overlap and ensuring full dataset coverage across epochs.
Key theoretical considerations:
- No data overlap: Each rank processes a unique subset of the data, so the effective batch size scales linearly with the number of ranks.
- Deterministic partitioning: Files are sorted before partitioning to ensure all ranks agree on the assignment, which is essential for reproducibility.
- Async pre-loading: Overlapping data loading with computation hides I/O latency, which is critical when training data exceeds available memory and must be streamed from disk.
- Cyclic access pattern: After exhausting all assigned files, the loader wraps around to the beginning, enabling multi-epoch training.
Usage
Data loading is set up after runtime initialization and before the training loop:
- Create a DataLoader with the input name map, data directory, world rank, and world size.
- Call InitializeDataSetIndex() to set the starting position (e.g., from a checkpoint).
- The training loop calls CurrentDataSet() and MoveToNextDataSet() to iterate through batches.
- Each rank automatically receives its partitioned subset of the data.