Heuristic: NVIDIA DALI Distributed Sharding Strategy
| Knowledge Sources | |
|---|---|
| Domains | Distributed_Training, Optimization |
| Last Updated | 2026-02-08 16:00 GMT |
Overview
Data sharding and distributed training configuration patterns for DALI readers that ensure correct partitioning, load balancing, and augmentation diversity across GPU ranks.
Description
DALI's file readers (`fn.readers.file`, `fn.readers.tfrecord`, `fn.readers.video`) support built-in data sharding via `shard_id` and `num_shards` parameters. Each reader instance processes a non-overlapping partition of the dataset, eliminating the need for external distributed samplers. Correct distributed configuration requires coordinating the reader's sharding parameters with the DALI pipeline's `device_id` and `seed`, and the iterator's `last_batch_policy` and `pad_last_batch` settings.
Usage
Use this heuristic when setting up multi-GPU or multi-node training with DALI data pipelines. Apply whenever using PyTorch DDP, TensorFlow MirroredStrategy, or PaddlePaddle distributed training with DALI data loading.
The Insight (Rule of Thumb)
- Action: Configure each rank with unique `shard_id`, matching `device_id`, and distinct `seed`. Enable `pad_last_batch=True` in the reader and use `LastBatchPolicy.PARTIAL` in the iterator.
- Value:
  - `shard_id = local_rank` (ranges from 0 to world_size - 1)
  - `num_shards = world_size`
  - `device_id = local_rank`
  - `seed = base_seed + local_rank` (e.g., `12 + local_rank`)
  - `pad_last_batch = True`
  - `reader_name = "Reader"` (enables automatic shard-size calculation in the iterator)
- Trade-off: DALI handles sharding internally, so do not use PyTorch `DistributedSampler` or TensorFlow `shard` alongside DALI readers. Mixing external and internal sharding causes data overlap or gaps.
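The per-rank settings above can be collected in one place. A minimal sketch, assuming a hypothetical helper named `rank_config` (the function and dict layout are illustrative, not part of the DALI API):

```python
def rank_config(local_rank: int, world_size: int, base_seed: int = 12) -> dict:
    """Return the DALI reader/pipeline kwargs for one training rank."""
    assert 0 <= local_rank < world_size
    return {
        "shard_id": local_rank,          # unique data partition per rank
        "num_shards": world_size,        # total number of ranks
        "device_id": local_rank,         # pin the pipeline to the local GPU
        "seed": base_seed + local_rank,  # distinct augmentation stream per rank
        "pad_last_batch": True,          # equalize shard lengths
    }

cfg = rank_config(local_rank=1, world_size=4)
print(cfg["shard_id"], cfg["seed"])  # 1 13
```

Passing the same dict to both the pipeline constructor and the reader keeps `shard_id`, `device_id`, and `seed` from drifting apart across ranks.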
Reasoning
DALI readers compute each shard's size as `floor((shard_id + 1) * dataset_size / num_shards) - floor(shard_id * dataset_size / num_shards)`, with shard boundaries at `floor(shard_id * dataset_size / num_shards)`. This guarantees non-overlapping partitions that together cover the dataset exactly once. Different seeds per rank ensure each GPU sees a different random augmentation of its data partition, maximizing the effective augmentation diversity across the training run.
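The formula can be checked with plain arithmetic; this sketch evaluates it for a small dataset and verifies the partitions cover every sample exactly once:

```python
from math import floor

def shard_size(shard_id: int, num_shards: int, dataset_size: int) -> int:
    # Shard-size formula quoted above
    return (floor((shard_id + 1) * dataset_size / num_shards)
            - floor(shard_id * dataset_size / num_shards))

sizes = [shard_size(i, 3, 10) for i in range(3)]
print(sizes)       # [3, 3, 4]
print(sum(sizes))  # 10 -> the shards partition the dataset exactly
```

Note that shard sizes differ by at most one sample, which is what makes the padding discussed next necessary.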
Setting `pad_last_batch=True` replicates the last sample in each shard so all shards have the same length. Without this, shards with fewer samples finish earlier, causing NCCL synchronization deadlocks in the gradient all-reduce step. The `reader_name` parameter in the iterator enables automatic epoch length calculation based on the shard size.
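A small arithmetic sketch of the padding effect, assuming `pad_last_batch` extends every shard to the size of the largest one (`ceil(dataset_size / num_shards)`):

```python
from math import ceil, floor

def shard_size(shard_id, num_shards, dataset_size):
    return (floor((shard_id + 1) * dataset_size / num_shards)
            - floor(shard_id * dataset_size / num_shards))

dataset_size, num_shards, batch_size = 10, 3, 2
raw = [shard_size(i, num_shards, dataset_size) for i in range(num_shards)]
padded = ceil(dataset_size / num_shards)  # pad_last_batch: all shards match the largest
iters = ceil(padded / batch_size)         # per-epoch iterations derived via reader_name
print(raw, padded, iters)                 # [3, 3, 4] 4 2
```

With padding, every rank runs the same number of iterations per epoch, so no rank exits the training loop while its peers are still waiting in the all-reduce.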
Code Evidence
Distributed pipeline creation from `docs/examples/use_cases/pytorch/resnet50/main.py:312-331`:
```python
pipe = create_dali_pipeline(
    batch_size=batch_size,
    num_threads=args.workers,
    device_id=args.local_rank,    # Pin pipeline to local GPU
    seed=12 + args.local_rank,    # Different augmentation per rank
    data_dir=traindir,
    crop=crop_size,
    size=val_size,
    dali_cpu=False,
    shard_id=args.local_rank,     # Unique data partition per rank
    num_shards=args.world_size,   # Total number of ranks
    is_training=True,
    pad_last_batch=True,          # Ensure all shards same length
)
pipe.build()
train_loader = DALIClassificationIterator(
    pipe,
    reader_name="Reader",         # Auto epoch length from shard size
    last_batch_policy=LastBatchPolicy.PARTIAL,
    auto_reset=True,
)
```
PaddlePaddle distributed pattern from `docs/examples/use_cases/paddle/resnet50/dali.py:142-178`:
```python
pipe = create_dali_pipeline(
    batch_size=batch_size,
    num_threads=args.workers,
    device_id=args.local_rank,
    seed=12 + args.local_rank,
    shard_id=args.local_rank,
    num_shards=args.world_size,
)
```