Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:NVIDIA DALI UserStream

From Leeroopedia


Knowledge Sources
Domains Utilities, GPU_Management, CUDA_Streams
Last Updated 2026-02-08 16:00 GMT

Overview

Implements a singleton manager for per-device CUDA streams that enables inspection and interaction with DALI GPU buffers without forcing global pipeline synchronization.

Description

The UserStream class in dali/util/user_stream.h provides a thread-safe singleton for managing CUDA streams on a per-device basis. It is designed to support inspection and interaction with DALI GPU buffers (tensors and tensor lists) from user code without requiring full synchronization of all pipelines. The class maintains an internal map from device IDs to cudaStream_t handles, creating new non-blocking streams on demand with default priority.

The singleton is accessed via the static Get() method, which returns a pointer to the single UserStream instance. Stream acquisition is performed through overloaded GetStream methods that accept either a Tensor<GPUBackend> or a TensorList<GPUBackend>, extracting the device ID from the buffer and returning or creating the associated stream. A device ID-based private overload handles the actual map lookup with mutex protection.

Synchronization is provided at three levels: Wait() methods synchronize on the stream associated with a specific buffer or the current device, WaitForDevice() methods perform full device synchronization for the device hosting a given buffer, and WaitAll() synchronizes all tracked streams across all devices. The class explicitly notes that this functionality is not intended for performance-critical code paths.

Usage

Use UserStream when you need to inspect or modify DALI GPU buffers from user code outside the pipeline execution context. Obtain the singleton via UserStream::Get(), get a stream for your buffer, perform GPU operations on that stream, and then synchronize before accessing results on the host.

Code Reference

Source Location

Signature

namespace dali {

class DLL_PUBLIC UserStream {
 public:
  DLL_PUBLIC static UserStream *Get();

  DLL_PUBLIC cudaStream_t GetStream(const dali::Tensor<GPUBackend> &t);
  DLL_PUBLIC cudaStream_t GetStream(const dali::TensorList<GPUBackend> &tl);

  DLL_PUBLIC void WaitForDevice(const dali::Tensor<GPUBackend> &t);
  DLL_PUBLIC void WaitForDevice(const dali::TensorList<GPUBackend> &tl);

  DLL_PUBLIC void Wait(const dali::Tensor<GPUBackend> &t);
  DLL_PUBLIC void Wait(const dali::TensorList<GPUBackend> &tl);

  DLL_PUBLIC void Wait();
  DLL_PUBLIC void WaitAll();

 private:
  UserStream() = default;
  DLL_PUBLIC cudaStream_t GetStream(size_t dev);
  DLL_PUBLIC void WaitForDevice(size_t dev);
  void Wait(size_t dev);

  size_t GetDeviceForBuffer(const dali::Tensor<GPUBackend> &t);
  size_t GetDeviceForBuffer(const dali::TensorList<GPUBackend> &tl);

  static std::mutex m_;
  std::unordered_map<int, cudaStream_t> streams_;
};

}  // namespace dali

Import

#include "dali/util/user_stream.h"

I/O Contract

Inputs

Name Type Required Description
t const Tensor<GPUBackend>& Yes (GetStream/Wait/WaitForDevice overloads) GPU tensor to obtain a stream for or synchronize on
tl const TensorList<GPUBackend>& Yes (GetStream/Wait/WaitForDevice overloads) GPU tensor list to obtain a stream for or synchronize on

Outputs

Name Type Description
return value (Get) UserStream* Pointer to the singleton UserStream instance
return value (GetStream) cudaStream_t CUDA stream handle for the device hosting the given buffer

Usage Examples

Obtaining a Stream for a GPU Tensor

#include "dali/util/user_stream.h"

// Get the singleton
auto *us = dali::UserStream::Get();

// Obtain a CUDA stream for a GPU tensor
dali::Tensor<dali::GPUBackend> gpu_tensor;
cudaStream_t stream = us->GetStream(gpu_tensor);

// Perform GPU operations on the stream
// ...

// Synchronize only the relevant stream
us->Wait(gpu_tensor);

Synchronizing All Tracked Streams

#include "dali/util/user_stream.h"

auto *us = dali::UserStream::Get();

// After submitting work to multiple GPU buffers across devices
us->WaitAll();
// All tracked streams are now synchronized

Device-Level Synchronization

#include "dali/util/user_stream.h"

auto *us = dali::UserStream::Get();

dali::TensorList<dali::GPUBackend> gpu_batch;

// Full device synchronization for the device hosting this tensor list
us->WaitForDevice(gpu_batch);

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment