Implementation: NVIDIA DALI pipeline_def (Dynamic Execution)
| Knowledge Sources | Details |
|---|---|
| Domains | Image_Processing, GPU_Computing, Pipeline_Architecture |
| Last Updated | 2026-02-08 00:00 GMT |
Overview
A decorator from the nvidia.dali library that turns a DALI graph-definition function into a pipeline factory with configurable execution modes.
Description
The @pipeline_def decorator converts a plain Python function that constructs a DALI operator graph into a pipeline factory. When invoked, the factory instantiates an nvidia.dali.pipeline.Pipeline object with the specified execution parameters and wires the graph outputs automatically.
In dynamic mode (exec_dynamic=True), the resulting pipeline uses DALI's internal task scheduler to overlap CPU preprocessing and GPU compute without explicit buffer management. In standard pipelined mode, the decorator produces a pipeline that double-buffers between stages according to the prefetch_queue_depth parameter.
Key characteristics:
- Decorator arguments set default Pipeline constructor values.
- Any Pipeline parameter passed at call time overrides the decorator default.
- The decorated function's return values become the pipeline's outputs.
- enable_conditionals=True permits if/else branching inside the graph definition.
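The defaults-and-overrides behavior listed above can be sketched without DALI as a small mock of the decorator-factory pattern. This is purely illustrative: mock_pipeline_def and MockPipeline are hypothetical names, not DALI APIs, and real pipeline_def does considerably more (graph tracing, output wiring).

```python
import functools

class MockPipeline:
    """Stand-in for nvidia.dali.pipeline.Pipeline (illustrative only)."""
    def __init__(self, graph_fn, **params):
        self.graph_fn = graph_fn   # the decorated graph-definition function
        self.params = params       # resolved execution parameters

def mock_pipeline_def(fn=None, **defaults):
    """Mock of the pipeline_def pattern: decorator arguments become
    factory defaults; call-time keyword arguments override them."""
    def decorator(graph_fn):
        @functools.wraps(graph_fn)
        def factory(*args, **overrides):
            # Call-time kwargs take precedence over decorator defaults.
            params = {**defaults, **overrides}
            return MockPipeline(graph_fn, **params)
        return factory
    # Support both bare @mock_pipeline_def and @mock_pipeline_def(...).
    return decorator if fn is None else decorator(fn)

@mock_pipeline_def(batch_size=4, device_id=0)
def my_pipe():
    return "outputs"

pipe = my_pipe(batch_size=16)  # call-time value overrides the default
print(pipe.params)             # {'batch_size': 16, 'device_id': 0}
```

The same precedence applies in real DALI code: a decorator default such as batch_size=4 is replaced by any batch_size passed when the factory is invoked.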
Usage
Apply @pipeline_def to any function that returns one or more DALI DataNode objects. Call the resulting factory with pipeline parameters (batch_size, num_threads, device_id) to obtain a Pipeline instance. Then call .build() to compile and .run() to execute.
Code Reference
Source Location
- Repository: NVIDIA DALI
- File: dali/python/nvidia/dali/pipeline.py (lines 2217-2350)
- Example (dynamic): docs/examples/zoo/images/decode.py (line 29)
- Example (standard): docs/examples/zoo/images/decode_and_transform_pytorch.py (lines 73-74)
Signature
# Dynamic execution mode (decorator with arguments)
@pipeline_def(
    batch_size=4,
    num_threads=4,
    device_id=0,
    exec_dynamic=True,
)
def decode_pipeline(source_name):
    ...

# Standard pipelined mode (bare decorator, params at call time)
@pipeline_def
def image_pipe(img_hw=(320, 200)):
    ...
Import
from nvidia.dali.pipeline import pipeline_def
# or
from nvidia.dali import pipeline_def
I/O Contract
Inputs
| Name | Type | Required | Description |
|---|---|---|---|
| batch_size | int | Yes | Number of samples per batch |
| num_threads | int | Yes | Number of CPU worker threads for the pipeline |
| device_id | int | Yes | GPU device index (0-based) |
| exec_dynamic | bool | No | Enable dynamic execution scheduler (default: None, auto-resolves to True) |
| exec_pipelined | bool | No | Enable pipelined (double-buffered) execution (default: True) |
| prefetch_queue_depth | int or tuple(int, int) | No | Number of prefetch buffers between stages (default: 2) |
| enable_conditionals | bool | No | Allow if/else branching in graph definition (default: False) |
| seed | int | No | Random seed for reproducibility (default: -1) |
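Per the table above, prefetch_queue_depth accepts either a single int (shared by both stages) or a pair of ints giving separate depths. A minimal sketch of normalizing the two accepted forms, assuming the pair is ordered (cpu_depth, gpu_depth); normalize_prefetch_depth is a hypothetical helper, not a DALI API:

```python
def normalize_prefetch_depth(depth):
    """Hypothetical helper (not a DALI API): expand the two accepted
    forms of prefetch_queue_depth into an explicit (cpu, gpu) pair."""
    if isinstance(depth, int):
        # A single int applies to both the CPU and GPU stages.
        return (depth, depth)
    cpu, gpu = depth  # assumed (cpu_depth, gpu_depth) ordering
    return (int(cpu), int(gpu))

print(normalize_prefetch_depth(2))       # (2, 2)
print(normalize_prefetch_depth((3, 1)))  # (3, 1)
```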
Outputs
| Name | Type | Description |
|---|---|---|
| pipeline | nvidia.dali.pipeline.Pipeline | Fully configured Pipeline object with graph outputs wired from the decorated function's return values |
Usage Examples
Example: Dynamic Execution Decode Pipeline
import numpy as np
from nvidia.dali.pipeline import pipeline_def
import nvidia.dali.fn as fn
import nvidia.dali.types as types
from nvidia.dali.plugin.pytorch.torch_utils import to_torch_tensor

@pipeline_def(batch_size=4, num_threads=4, device_id=0, exec_dynamic=True)
def decode_pipeline(source_name):
    # CPU-side input node; data is supplied at run() time by name.
    inputs = fn.external_source(
        device="cpu",
        name=source_name,
        no_copy=False,
        blocking=True,
        dtype=types.UINT8,
    )
    # Hybrid CPU/GPU image decoding.
    decoded = fn.decoders.image(
        inputs,
        device="mixed",
        output_type=types.RGB,
        jpeg_fancy_upsampling=True,
    )
    return decoded

# Call-time prefetch_queue_depth overrides the decorator default.
pipe = decode_pipeline("encoded_img", prefetch_queue_depth=1)
pipe.build()

encoded_data = np.fromfile("image.jpg", dtype=np.uint8)
decoded = pipe.run(encoded_img=np.expand_dims(encoded_data, axis=0))
img_gpu = to_torch_tensor(decoded[0][0], copy=False)
Example: Standard Pipelined Mode with PyTorch Proxy
from nvidia.dali import pipeline_def, fn, types

@pipeline_def
def image_pipe(img_hw=(320, 200)):
    encoded_images = fn.external_source(name="images", no_copy=True)
    decoded = fn.decoders.image(
        encoded_images,
        device="mixed",
        output_type=types.RGB,
    )
    images = fn.resize(decoded, size=img_hw, interp_type=types.INTERP_LINEAR)
    return images

# All pipeline parameters supplied at call time (bare decorator).
pipe = image_pipe(batch_size=8, num_threads=4, device_id=0)
pipe.build()