Implementation: NVIDIA DALI Pipeline Def Decorator
| Knowledge Sources | |
|---|---|
| Domains | Data_Pipeline, GPU_Computing, Deep_Learning |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
The @pipeline_def decorator, provided by NVIDIA DALI, transforms a plain Python function that defines data processing operators into a callable pipeline factory, enabling declarative pipeline construction with configurable execution parameters.
Description
The @pipeline_def decorator is the primary API for defining DALI data preprocessing pipelines. It wraps a user-defined Python function that describes a graph of DALI operators (readers, decoders, augmentations, etc.) and returns a factory callable. When invoked with pipeline-level parameters (batch_size, num_threads, device_id, seed), the factory instantiates a fully configured DALI Pipeline object.
The decorated function's parameters represent the logical configuration of the preprocessing graph (e.g., data directory, crop size, training vs. validation mode), while the pipeline-level parameters passed at call time control the execution environment. This separation allows the same logical pipeline to be instantiated with different hardware configurations for multi-GPU training.
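The split described above can be illustrated with a minimal plain-Python sketch of the factory pattern. This is a conceptual reimplementation, not DALI's actual code; `make_pipeline_def` and `SimplePipeline` are hypothetical names introduced only for illustration:

```python
import functools

# Hypothetical stand-in for nvidia.dali.Pipeline: just records its configuration.
class SimplePipeline:
    def __init__(self, graph_fn, fn_args, fn_kwargs,
                 batch_size, num_threads, device_id, seed=-1):
        self.batch_size = batch_size
        self.num_threads = num_threads
        self.device_id = device_id
        self.seed = seed
        # The wrapped function describes the logical operator graph.
        self.outputs = graph_fn(*fn_args, **fn_kwargs)

def make_pipeline_def(graph_fn):
    """Conceptual sketch of @pipeline_def: split call-time kwargs into
    pipeline-level execution parameters and logical function arguments."""
    PIPELINE_KWARGS = {"batch_size", "num_threads", "device_id", "seed"}

    @functools.wraps(graph_fn)
    def factory(*args, **kwargs):
        pipe_kwargs = {k: v for k, v in kwargs.items() if k in PIPELINE_KWARGS}
        fn_kwargs = {k: v for k, v in kwargs.items() if k not in PIPELINE_KWARGS}
        return SimplePipeline(graph_fn, args, fn_kwargs, **pipe_kwargs)
    return factory

@make_pipeline_def
def my_pipeline(data_dir, is_training=True):
    return ("reader", data_dir, is_training)

# The same logical pipeline, instantiated for two different devices:
pipe0 = my_pipeline(batch_size=64, num_threads=4, device_id=0,
                    data_dir="/data", is_training=True)
pipe1 = my_pipeline(batch_size=64, num_threads=4, device_id=1,
                    data_dir="/data", is_training=True)
```

The real decorator does considerably more (graph tracing, argument validation, serialization), but the call-time separation of execution parameters from logical parameters follows this shape.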
The exec_dynamic=True flag enables DALI's dynamic executor, which supports variable batch sizes and more flexible operator scheduling compared to the default static executor. The enable_conditionals=True flag (used in the EfficientNet example) enables Python-style if/else control flow within the pipeline definition that is resolved at build time.
Usage
Import the decorator from nvidia.dali.pipeline and apply it to a function that constructs a DALI operator graph using nvidia.dali.fn operators. Call the resulting factory with pipeline-level parameters to create a Pipeline object, then call .build() to finalize the graph before iteration.
Code Reference
Source Location
- Repository: NVIDIA DALI
- File: docs/examples/use_cases/pytorch/resnet50/main.py (lines 165-181)
- File: docs/examples/use_cases/pytorch/efficientnet/image_classification/dali.py (lines 84-111)
Signature
@pipeline_def(exec_dynamic=True)
def create_dali_pipeline(
    data_dir, crop, size, shard_id, num_shards, dali_cpu=False, is_training=True
):
    images, labels = fn.readers.file(
        file_root=data_dir,
        shard_id=shard_id,
        num_shards=num_shards,
        random_shuffle=is_training,
        pad_last_batch=True,
        name="Reader",
    )
    decoder_device = "cpu" if dali_cpu else "mixed"
    images = image_processing_func(
        images, crop, size, is_training, decoder_device
    )
    return images, labels.gpu()
EfficientNet Variant
@pipeline_def(enable_conditionals=True)
def training_pipe(
    data_dir, interpolation, image_size, output_layout,
    automatic_augmentation, dali_device="gpu", rank=0, world_size=1,
):
    jpegs, labels = fn.readers.file(
        name="Reader",
        file_root=data_dir,
        shard_id=rank,
        num_shards=world_size,
        random_shuffle=True,
        pad_last_batch=True,
    )
    outputs = efficientnet_processing_training(
        jpegs, interpolation, image_size, output_layout,
        automatic_augmentation, dali_device,
    )
    return outputs, labels
Import
from nvidia.dali.pipeline import pipeline_def
# or, for the experimental variant:
from nvidia.dali.pipeline.experimental import pipeline_def
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| batch_size | int | Yes | Number of samples per batch (passed at call time) |
| num_threads | int | Yes | Number of CPU threads for parallel operator execution (passed at call time) |
| device_id | int | Yes | CUDA device index for GPU operators (passed at call time) |
| seed | int | No | Random seed for reproducible augmentation (passed at call time) |
| exec_dynamic | bool | No | Enable dynamic executor for variable batch sizes (decorator arg, default False) |
| enable_conditionals | bool | No | Enable Python if/else in pipeline definition (decorator arg, default False) |
| data_dir | str | Yes | Root directory of the dataset (function arg) |
| crop | int | Yes | Target crop size in pixels (function arg) |
| size | int | Yes | Resize target for validation (function arg) |
| shard_id | int | Yes | This worker's shard index for distributed data partitioning (function arg) |
| num_shards | int | Yes | Total number of data shards / workers (function arg) |
| dali_cpu | bool | No | Use CPU decoding instead of mixed CPU/GPU (function arg, default False) |
| is_training | bool | No | Training mode enables augmentation; validation mode uses deterministic preprocessing (function arg, default True) |
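The shard_id / num_shards contract in the table above can be sketched in plain Python: each worker reads a contiguous, near-equal slice of the dataset, and pad_last_batch=True repeats the final sample so every shard yields the same number of full batches. This is illustrative only; DALI's reader implements the partitioning internally, and the helper names here are hypothetical:

```python
def shard_indices(num_samples, shard_id, num_shards):
    """Contiguous, near-equal split of sample indices into shards,
    mimicking how a sharded reader partitions a dataset."""
    begin = num_samples * shard_id // num_shards
    end = num_samples * (shard_id + 1) // num_shards
    return list(range(begin, end))

def padded_batches(indices, batch_size):
    """Pad the last batch by repeating the final sample, so every shard
    produces the same number of full batches (pad_last_batch=True)."""
    batches = []
    for i in range(0, len(indices), batch_size):
        batch = indices[i:i + batch_size]
        while len(batch) < batch_size:
            batch.append(batch[-1])  # repeat last sample as padding
        batches.append(batch)
    return batches

# 10 samples, 2 shards, batch size 3: shard 0 gets indices [0..4]
shard0 = shard_indices(10, shard_id=0, num_shards=2)
batches0 = padded_batches(shard0, batch_size=3)
```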
Outputs
| Name | Type | Description |
|---|---|---|
| pipeline | nvidia.dali.Pipeline | A configured but unbuilt Pipeline object; call .build() to finalize the operator graph |
Usage Examples
Creating and Building a Training Pipeline
from nvidia.dali.pipeline import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
@pipeline_def(exec_dynamic=True)
def create_dali_pipeline(data_dir, crop, size, shard_id, num_shards,
                         dali_cpu=False, is_training=True):
    images, labels = fn.readers.file(
        file_root=data_dir, shard_id=shard_id, num_shards=num_shards,
        random_shuffle=is_training, pad_last_batch=True, name="Reader",
    )
    images = fn.decoders.image_random_crop(
        images, device="mixed", output_type=types.RGB,
        random_aspect_ratio=[0.8, 1.25], random_area=[0.1, 1.0],
    )
    images = fn.resize(images, resize_x=crop, resize_y=crop,
                       interp_type=types.INTERP_TRIANGULAR)
    mirror = fn.random.coin_flip(probability=0.5)
    images = fn.crop_mirror_normalize(
        images.gpu(), dtype=types.FLOAT, output_layout="CHW",
        crop=(crop, crop),
        mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
        std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
        mirror=mirror,
    )
    return images, labels.gpu()

# Instantiate and build
train_pipe = create_dali_pipeline(
    batch_size=256, num_threads=4, device_id=0, seed=12,
    data_dir="/data/imagenet/train", crop=224, size=256,
    shard_id=0, num_shards=1, is_training=True,
)
train_pipe.build()
Multi-GPU Distributed Setup
import torch.distributed

# Assumes the default process group has already been initialized; on a
# single node the global rank doubles as the local CUDA device index.
local_rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()
train_pipe = create_dali_pipeline(
    batch_size=args.batch_size,
    num_threads=args.workers,
    device_id=local_rank,
    seed=12 + local_rank,
    data_dir=traindir,
    crop=224,
    size=256,
    shard_id=local_rank,
    num_shards=world_size,
    is_training=True,
)
train_pipe.build()
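A practical consequence of the setup above: every rank must run the same number of iterations per epoch, which is what sharding plus pad_last_batch guarantees. The arithmetic can be sketched as follows (illustrative only; `iters_per_epoch` is a hypothetical helper, not a DALI API):

```python
import math

def iters_per_epoch(num_samples, world_size, batch_size):
    """Number of batches each rank processes per epoch when the dataset
    is sharded across world_size workers and the last (partial) batch
    on each shard is padded up to full batch size."""
    shard_size = math.ceil(num_samples / world_size)
    return math.ceil(shard_size / batch_size)

# e.g. ImageNet-scale training: 1,281,167 samples, 8 GPUs, batch size 256
iters = iters_per_epoch(1_281_167, world_size=8, batch_size=256)
```

Because padding repeats real samples, a small fraction of each epoch is duplicated data; for validation, the DALI iterator's last-batch policy should be configured to avoid counting padded samples in metrics.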