
Implementation:NVIDIA DALI Pipeline Def Dynamic

From Leeroopedia


Knowledge Sources
  • Domains: Image_Processing, GPU_Computing, Pipeline_Architecture
  • Last Updated: 2026-02-08 00:00 GMT

Overview

Concrete decorator for defining DALI pipeline graph functions with configurable execution modes, provided by the nvidia.dali library.

Description

The @pipeline_def decorator converts a plain Python function that constructs a DALI operator graph into a pipeline factory. When invoked, the factory instantiates an nvidia.dali.pipeline.Pipeline object with the specified execution parameters and wires the graph outputs automatically.

In dynamic mode (exec_dynamic=True), the resulting pipeline uses DALI's internal task scheduler to overlap CPU preprocessing and GPU compute without explicit buffer management. In standard pipelined mode, the decorator produces a pipeline that double-buffers between stages according to the prefetch_queue_depth parameter.

Key characteristics:

  • Decorator arguments set default Pipeline constructor values.
  • Any Pipeline parameter passed at call time overrides the decorator default.
  • The decorated function's return values become the pipeline's outputs.
  • enable_conditionals=True permits if/else branching inside the graph definition.
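The conditional mode in the last bullet is conceptually a split/merge scheme: the batch is partitioned by a per-sample predicate, each branch processes only its own partition, and the branch outputs are merged back in the original sample order. A toy pure-Python sketch of that idea (illustrative only, not DALI internals):

```python
def split(pred, batch):
    """Partition a batch by a per-sample boolean predicate."""
    true_part = [s for p, s in zip(pred, batch) if p]
    false_part = [s for p, s in zip(pred, batch) if not p]
    return true_part, false_part

def merge(pred, true_out, false_out):
    """Reassemble branch outputs in the original sample order."""
    t, f = iter(true_out), iter(false_out)
    return [next(t) if p else next(f) for p in pred]

pred = [True, False, True]
batch = ["abc", "def", "ghi"]
if_part, else_part = split(pred, batch)
# "if" branch reverses, "else" branch upper-cases; each branch only
# sees the samples routed to it.
merged = merge(pred, [s[::-1] for s in if_part], [s.upper() for s in else_part])
print(merged)  # ['cba', 'DEF', 'ihg']
```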

Usage

Apply @pipeline_def to any function that returns one or more DALI DataNode objects. Call the resulting factory with pipeline parameters (batch_size, num_threads, device_id) to obtain a Pipeline instance, then call .build() to construct the executable graph and .run() to execute one iteration.
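The default-and-override behavior can be pictured with a small pure-Python sketch of a pipeline_def-like decorator. This is illustrative only: the parameter set is a hypothetical subset, and the real decorator constructs a Pipeline and wires its outputs rather than returning the merged kwargs.

```python
import functools

# Hypothetical subset of Pipeline constructor parameter names.
PIPELINE_PARAMS = {"batch_size", "num_threads", "device_id", "exec_dynamic"}

def pipeline_def_sketch(**defaults):
    """Toy stand-in for @pipeline_def: decorator kwargs become defaults."""
    def decorator(graph_fn):
        @functools.wraps(graph_fn)
        def factory(*args, **kwargs):
            ctor_kwargs = dict(defaults)
            for name in list(kwargs):
                if name in PIPELINE_PARAMS:
                    # Call-time Pipeline kwargs override decorator defaults.
                    ctor_kwargs[name] = kwargs.pop(name)
            # The real decorator would build Pipeline(**ctor_kwargs) and
            # wire graph_fn(*args, **kwargs) outputs into it.
            return ctor_kwargs
        return factory
    return decorator

@pipeline_def_sketch(batch_size=4, num_threads=4, device_id=0)
def my_pipe():
    ...

print(my_pipe(batch_size=16))
# {'batch_size': 16, 'num_threads': 4, 'device_id': 0}
```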

Code Reference

Source Location

  • Repository: NVIDIA DALI
  • File: dali/python/nvidia/dali/pipeline.py (lines 2217-2350)
  • Example (dynamic): docs/examples/zoo/images/decode.py (line 29)
  • Example (standard): docs/examples/zoo/images/decode_and_transform_pytorch.py (lines 73-74)

Signature

# Dynamic execution mode (decorator with arguments)
@pipeline_def(
    batch_size=4,
    num_threads=4,
    device_id=0,
    exec_dynamic=True,
)
def decode_pipeline(source_name):
    ...

# Standard pipelined mode (bare decorator, params at call time)
@pipeline_def
def image_pipe(img_hw=(320, 200)):
    ...

Import

from nvidia.dali.pipeline import pipeline_def
# or
from nvidia.dali import pipeline_def

I/O Contract

Inputs

| Name | Type | Required | Description |
|------|------|----------|-------------|
| batch_size | int | Yes | Number of samples per batch |
| num_threads | int | Yes | Number of CPU worker threads for the pipeline |
| device_id | int | Yes | GPU device index (0-based) |
| exec_dynamic | bool | No | Enable dynamic execution scheduler (default: None, auto-resolves to True) |
| exec_pipelined | bool | No | Enable pipelined (double-buffered) execution (default: True) |
| prefetch_queue_depth | int or tuple(int, int) | No | Number of prefetch buffers between stages (default: 2) |
| enable_conditionals | bool | No | Allow if/else branching in graph definition (default: False) |
| seed | int | No | Random seed for reproducibility (default: -1) |

Outputs

| Name | Type | Description |
|------|------|-------------|
| pipeline | nvidia.dali.pipeline.Pipeline | Fully configured Pipeline object with graph outputs wired from the decorated function's return values |

Usage Examples

Example: Dynamic Execution Decode Pipeline

import numpy as np
from nvidia.dali.pipeline import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch.torch_utils import to_torch_tensor

@pipeline_def(batch_size=4, num_threads=4, device_id=0, exec_dynamic=True)
def decode_pipeline(source_name):
    inputs = fn.external_source(
        device="cpu",
        name=source_name,
        no_copy=False,
        blocking=True,
        dtype=types.UINT8,
    )
    decoded = fn.decoders.image(
        inputs,
        device="mixed",
        output_type=types.RGB,
        jpeg_fancy_upsampling=True,
    )
    return decoded

pipe = decode_pipeline("encoded_img", prefetch_queue_depth=1)
pipe.build()

encoded_data = np.fromfile("image.jpg", dtype=np.uint8)
decoded = pipe.run(encoded_img=np.expand_dims(encoded_data, axis=0))
img_gpu = to_torch_tensor(decoded[0][0], copy=False)

Example: Standard Pipelined Mode with PyTorch Proxy

from nvidia.dali import pipeline_def, fn, types

@pipeline_def
def image_pipe(img_hw=(320, 200)):
    encoded_images = fn.external_source(name="images", no_copy=True)
    decoded = fn.decoders.image(
        encoded_images,
        device="mixed",
        output_type=types.RGB,
    )
    images = fn.resize(decoded, size=img_hw, interp_type=types.INTERP_LINEAR)
    return images

pipe = image_pipe(batch_size=8, num_threads=4, device_id=0)
pipe.build()

Related Pages

Implements Principle

Requires Environment

Uses Heuristic
