Jump to content

Connect Leeroopedia MCP: Equip your AI agents to search best practices, build plans, verify code, diagnose failures, and look up hyperparameter defaults.

Implementation:NVIDIA DALI Pipeline Def Decorator

From Leeroopedia


Knowledge Sources
Domains Data_Pipeline, GPU_Computing, Deep_Learning
Last Updated 2026-02-08 00:00 GMT

Overview

The @pipeline_def decorator provided by NVIDIA DALI that transforms a plain Python function defining data processing operators into a callable pipeline factory, enabling declarative pipeline construction with configurable execution parameters.

Description

The @pipeline_def decorator is the primary API for defining DALI data preprocessing pipelines. It wraps a user-defined Python function that describes a graph of DALI operators (readers, decoders, augmentations, etc.) and returns a factory callable. When invoked with pipeline-level parameters (batch_size, num_threads, device_id, seed), the factory instantiates a fully configured DALI Pipeline object.

The decorated function's parameters represent the logical configuration of the preprocessing graph (e.g., data directory, crop size, training vs. validation mode), while the pipeline-level parameters passed at call time control the execution environment. This separation allows the same logical pipeline to be instantiated with different hardware configurations for multi-GPU training.

The exec_dynamic=True flag enables DALI's dynamic executor, which supports variable batch sizes and more flexible operator scheduling compared to the default static executor. The enable_conditionals=True flag (used in the EfficientNet example) enables Python-style if/else control flow within the pipeline definition that is resolved at build time.

Usage

Import the decorator from nvidia.dali.pipeline and apply it to a function that constructs a DALI operator graph using nvidia.dali.fn operators. Call the resulting factory with pipeline-level parameters to create a Pipeline object, then call .build() to finalize the graph before iteration.

Code Reference

Source Location

  • Repository: NVIDIA DALI
  • File: docs/examples/use_cases/pytorch/resnet50/main.py (lines 165-181)
  • File: docs/examples/use_cases/pytorch/efficientnet/image_classification/dali.py (lines 84-111)

Signature

@pipeline_def(exec_dynamic=True)
def create_dali_pipeline(
    data_dir, crop, size, shard_id, num_shards, dali_cpu=False, is_training=True
):
    images, labels = fn.readers.file(
        file_root=data_dir,
        shard_id=shard_id,
        num_shards=num_shards,
        random_shuffle=is_training,
        pad_last_batch=True,
        name="Reader",
    )
    decoder_device = "cpu" if dali_cpu else "mixed"
    images = image_processing_func(
        images, crop, size, is_training, decoder_device
    )
    return images, labels.gpu()

EfficientNet Variant

@pipeline_def(enable_conditionals=True)
def training_pipe(
    data_dir, interpolation, image_size, output_layout,
    automatic_augmentation, dali_device="gpu", rank=0, world_size=1,
):
    jpegs, labels = fn.readers.file(
        name="Reader",
        file_root=data_dir,
        shard_id=rank,
        num_shards=world_size,
        random_shuffle=True,
        pad_last_batch=True,
    )
    outputs = efficientnet_processing_training(
        jpegs, interpolation, image_size, output_layout,
        automatic_augmentation, dali_device,
    )
    return outputs, labels

Import

from nvidia.dali.pipeline import pipeline_def
# or, for the experimental variant:
from nvidia.dali.pipeline.experimental import pipeline_def

I/O Contract

Inputs

Name Type Required Description
batch_size int Yes Number of samples per batch (passed at call time)
num_threads int Yes Number of CPU threads for parallel operator execution (passed at call time)
device_id int Yes CUDA device index for GPU operators (passed at call time)
seed int No Random seed for reproducible augmentation (passed at call time)
exec_dynamic bool No Enable dynamic executor for variable batch sizes (decorator arg, default False)
enable_conditionals bool No Enable Python if/else in pipeline definition (decorator arg, default False)
data_dir str Yes Root directory of the dataset (function arg)
crop int Yes Target crop size in pixels (function arg)
size int Yes Resize target for validation (function arg)
shard_id int Yes This worker's shard index for distributed data partitioning (function arg)
num_shards int Yes Total number of data shards / workers (function arg)
dali_cpu bool No Use CPU decoding instead of mixed CPU/GPU (function arg, default False)
is_training bool No Training mode enables augmentation; validation mode uses deterministic preprocessing (function arg, default True)

Outputs

Name Type Description
pipeline nvidia.dali.Pipeline A configured but unbuilt Pipeline object; call .build() to finalize the operator graph

Usage Examples

Creating and Building a Training Pipeline

from nvidia.dali.pipeline import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types

@pipeline_def(exec_dynamic=True)
def create_dali_pipeline(data_dir, crop, size, shard_id, num_shards,
                         dali_cpu=False, is_training=True):
    images, labels = fn.readers.file(
        file_root=data_dir, shard_id=shard_id, num_shards=num_shards,
        random_shuffle=is_training, pad_last_batch=True, name="Reader",
    )
    images = fn.decoders.image_random_crop(
        images, device="mixed", output_type=types.RGB,
        random_aspect_ratio=[0.8, 1.25], random_area=[0.1, 1.0],
    )
    images = fn.resize(images, resize_x=crop, resize_y=crop,
                       interp_type=types.INTERP_TRIANGULAR)
    mirror = fn.random.coin_flip(probability=0.5)
    images = fn.crop_mirror_normalize(
        images.gpu(), dtype=types.FLOAT, output_layout="CHW",
        crop=(crop, crop),
        mean=[0.485 * 255, 0.456 * 255, 0.406 * 255],
        std=[0.229 * 255, 0.224 * 255, 0.225 * 255],
        mirror=mirror,
    )
    return images, labels.gpu()

# Instantiate and build
train_pipe = create_dali_pipeline(
    batch_size=256, num_threads=4, device_id=0, seed=12,
    data_dir="/data/imagenet/train", crop=224, size=256,
    shard_id=0, num_shards=1, is_training=True,
)
train_pipe.build()

Multi-GPU Distributed Setup

local_rank = torch.distributed.get_rank()
world_size = torch.distributed.get_world_size()

train_pipe = create_dali_pipeline(
    batch_size=args.batch_size,
    num_threads=args.workers,
    device_id=local_rank,
    seed=12 + local_rank,
    data_dir=traindir,
    crop=224,
    size=256,
    shard_id=local_rank,
    num_shards=world_size,
    is_training=True,
)
train_pipe.build()

Related Pages

Implements Principle

Requires Environment

Uses Heuristic

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment