Workflow:Rapidsai Cuml Multi GPU Distributed ML

From Leeroopedia
Revision as of 11:00, 16 February 2026 by Admin (Auto-imported from workflows/Rapidsai_Cuml_Multi_GPU_Distributed_ML.md)


Knowledge Sources
Domains Machine_Learning, Distributed_Computing, GPU_Computing, MLOps
Last Updated 2026-02-08 12:00 GMT

Overview

End-to-end process for distributed multi-GPU and multi-node machine learning using cuML's Dask integration for scaling algorithms across multiple NVIDIA GPUs.

Description

This workflow covers the standard procedure for running cuML machine learning algorithms across multiple GPUs using the Dask distributed computing framework. cuML's Dask module (`cuml.dask`) provides multi-node multi-GPU (MNMG) implementations of KMeans, DBSCAN, PCA, TruncatedSVD, Nearest Neighbors, Random Forest, Linear Regression, Ridge, Lasso, ElasticNet, Logistic Regression, and Naive Bayes. The architecture uses one Dask worker per GPU (created by a LocalCUDACluster on a single node), UCXX for fast GPU-to-GPU communication, and RAFT for the communication layer and distributed primitives the algorithms are built on. Data is partitioned across workers and processed in parallel with little communication: during KMeans iterations, for example, workers exchange only centroid updates rather than data rows.
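The communication pattern described above can be illustrated with a single-process NumPy sketch of one data-parallel KMeans (Lloyd) iteration. This is not cuML's implementation: the list of partitions stands in for per-GPU data, and the Python `sum` over per-partition statistics stands in for an allreduce across workers. It assumes, as is standard for data-parallel Lloyd iterations, that workers exchange per-cluster sums and counts, from which every worker derives the same new centroids. `local_stats` and `kmeans_step` are hypothetical names.

```python
import numpy as np

def local_stats(X_part, centroids):
    """Per-worker work: assign local rows, return per-cluster sums and counts."""
    d2 = ((X_part[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
    labels = d2.argmin(axis=1)
    k, dim = centroids.shape
    sums = np.zeros((k, dim))
    np.add.at(sums, labels, X_part)  # accumulate each row into its cluster's sum
    counts = np.bincount(labels, minlength=k)
    return sums, counts

def kmeans_step(partitions, centroids):
    """One Lloyd iteration where only per-cluster statistics cross workers."""
    stats = [local_stats(p, centroids) for p in partitions]
    total_sums = sum(s for s, _ in stats)    # stands in for an allreduce
    total_counts = sum(c for _, c in stats)  # stands in for an allreduce
    new_centroids = centroids.astype(float).copy()
    nonempty = total_counts > 0  # keep the old centroid if a cluster lost all points
    new_centroids[nonempty] = total_sums[nonempty] / total_counts[nonempty][:, None]
    return new_centroids
```

Because each worker contributes only k x d sums and k counts per iteration, the traffic does not grow with the number of rows held on each GPU.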

Usage

Execute this workflow when datasets are too large for a single GPU's memory, when you want to use multiple GPUs for faster training, or when operating in a multi-node cluster environment. Typical uses include production ML pipelines over terabyte-scale data, large-scale nearest neighbor search, and distributed model training. The Dask estimators largely mirror the API of their single-GPU counterparts (they additionally accept a Dask `client`), so existing cuML code can usually be scaled with few changes.

Execution Steps

Step 1: Cluster Initialization

Create a Dask CUDA cluster that allocates one worker per GPU. Configure the communication protocol (UCXX recommended for high-speed CUDA array transfers), enable NVLink for intra-node GPU communication, and optionally enable InfiniBand for multi-node setups. Create a Dask client connected to the cluster.

Key considerations:

  • `LocalCUDACluster` creates one worker per GPU on a single node
  • UCXX protocol provides direct GPU memory transfers without host staging
  • NVLink enables high-bandwidth direct GPU-to-GPU communication
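  • For multi-node setups, start a Dask scheduler and run `dask-cuda-worker` (or `dask cuda worker` in newer releases) on each node, which starts one worker per GPU, with the UCXX protocol as transport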
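A minimal sketch of the single-node setup, assuming `dask-cuda` and `distributed` are installed. The helper name `start_cluster` is hypothetical; `LocalCUDACluster` with its `protocol`, `enable_nvlink`, and `enable_infiniband` arguments, and `Client`, are real `dask-cuda`/`distributed` APIs. Which protocol names are accepted (`"ucx"`, or `"ucxx"` in newer releases with UCXX installed) depends on the installed versions, and the NVLink/InfiniBand flags only apply with a UCX-based protocol.

```python
def start_cluster(protocol="ucx", enable_nvlink=True, enable_infiniband=False):
    """Hypothetical helper: one Dask worker per visible GPU on this node.

    Returns (client, cluster). Imports are deferred so this module can be
    imported on machines without GPUs.
    """
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    cluster = LocalCUDACluster(
        protocol=protocol,                    # UCX-based transport for CUDA arrays
        enable_nvlink=enable_nvlink,          # direct GPU-to-GPU transfers within the node
        enable_infiniband=enable_infiniband,  # only useful if InfiniBand hardware is present
    )
    client = Client(cluster)
    return client, cluster
```

After `client, cluster = start_cluster()`, `len(client.scheduler_info()["workers"])` should match the number of GPUs visible to the process.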

Step 2: Distributed Data Loading

Load data into distributed GPU memory using dask-cuDF for DataFrames or Dask arrays for array data. Data is automatically partitioned across workers. Each partition resides in a single GPU's memory. For CSV files, `dask_cudf.read_csv()` reads and distributes data in parallel.

Key considerations:

  • `dask_cudf.read_csv()` reads data in parallel across workers
  • `dask.array` partitions can wrap CuPy arrays for GPU-native data
  • Each partition should fit in a single GPU's memory
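  • Estimators generally train on the partitions where they already reside rather than redistributing rows, so persist the inputs (e.g., `client.persist()` followed by `wait()`) and check that partitions are spread across the workers before fitting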
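A sketch of both loading paths, assuming a running `client` and an existing set of CSV files; `load_inputs` and its arguments are hypothetical. `dask_cudf.read_csv`, `dask.array.random.random`, `map_blocks`, `Client.persist`, and `distributed.wait` are real APIs. Persisting makes the partitions resident in GPU worker memory before an estimator sees them.

```python
def load_inputs(client, csv_glob, n_rows=1_000_000, n_cols=20, rows_per_chunk=100_000):
    """Hypothetical helper returning (dask-cuDF DataFrame, CuPy-backed Dask array)."""
    import cupy as cp
    import dask.array as da
    import dask_cudf
    from dask.distributed import wait

    # CSV path: files are split into partitions that are parsed on the GPU workers.
    ddf = dask_cudf.read_csv(csv_glob)

    # Array path: generate NumPy chunks lazily, then convert each chunk to CuPy.
    # Choose rows_per_chunk so that every chunk fits comfortably in one GPU's memory.
    X = da.random.random((n_rows, n_cols), chunks=(rows_per_chunk, n_cols)).map_blocks(cp.asarray)

    # Materialize both collections on the workers and block until they are resident.
    ddf, X = client.persist([ddf, X])
    wait([ddf, X])
    return ddf, X
```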

Step 3: Distributed Algorithm Selection

Import the distributed version of the desired algorithm from `cuml.dask`. Available algorithms include clustering (KMeans, DBSCAN), dimensionality reduction (PCA, TruncatedSVD), neighbors (NearestNeighbors, KNeighborsClassifier, KNeighborsRegressor), linear models (LinearRegression, Ridge, Lasso, ElasticNet, LogisticRegression), ensemble methods (RandomForestClassifier, RandomForestRegressor), preprocessing (LabelEncoder, OneHotEncoder), and Naive Bayes (MultinomialNB).

Key considerations:

  • Import from `cuml.dask.cluster`, `cuml.dask.neighbors`, etc.
  • APIs mirror single-GPU cuML estimators
  • Some parameters have constraints in multi-GPU mode (e.g., KMeans requires `random_state` to be set)
  • KMeans requires `oversampling_factor > 0` in multi-GPU mode
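A sketch of constructing several distributed estimators, assuming a running Dask `client`; the helper `make_estimators` and the parameter values are illustrative only. The import paths follow the `cuml.dask.<family>` layout described above, and the KMeans call sets `random_state` and a positive `oversampling_factor` in line with the constraints listed.

```python
def make_estimators(client):
    """Hypothetical helper: build unfitted cuml.dask estimators bound to `client`."""
    from cuml.dask.cluster import KMeans
    from cuml.dask.decomposition import PCA
    from cuml.dask.ensemble import RandomForestClassifier
    from cuml.dask.linear_model import LinearRegression
    from cuml.dask.neighbors import NearestNeighbors

    return {
        # Same keyword arguments as the single-GPU estimators, plus `client`.
        "kmeans": KMeans(client=client, n_clusters=8, random_state=42, oversampling_factor=2.0),
        "pca": PCA(client=client, n_components=10),
        "knn": NearestNeighbors(client=client, n_neighbors=5),
        "ols": LinearRegression(client=client, fit_intercept=True),
        "rf": RandomForestClassifier(client=client, n_estimators=100, random_state=42),
    }
```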

Step 4: Model Training

Fit the distributed model on the partitioned data. The Dask scheduler coordinates work across all GPU workers. Each worker runs a local copy of the algorithm on its data partition, with inter-worker communication handled by RAFT's distributed communication layer. Results are aggregated across workers after convergence.
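The local-compute-then-reduce pattern can be sketched for ordinary least squares with NumPy. The normal-equations formulation is an assumption chosen for illustration; cuML's own solvers may organize the computation differently. Each simulated worker reduces its rows to `X.T @ X` and `X.T @ y`, and summing those small matrices stands in for the allreduce over RAFT's communication layer. `local_normal_terms` and `distributed_ols` are hypothetical names.

```python
import numpy as np

def local_normal_terms(X_part, y_part):
    """Per-worker work: shrink n_rows x d data to a d x d matrix and a d-vector."""
    return X_part.T @ X_part, X_part.T @ y_part

def distributed_ols(partitions):
    """Solve least squares from per-partition statistics only."""
    terms = [local_normal_terms(X, y) for X, y in partitions]
    XtX = sum(a for a, _ in terms)  # stands in for an allreduce across workers
    Xty = sum(b for _, b in terms)
    return np.linalg.solve(XtX, Xty)
```

The reduced quantities have size d x d and d regardless of how many rows each worker holds, which is why only they need to cross the network.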

What happens:

  • Each worker receives its data partitions and a communication handle
  • Workers execute the algorithm locally with inter-worker synchronization
  • For KMeans: centroids are shared between workers each iteration; labels are computed locally
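  • For Nearest Neighbors: the index is partitioned across workers; each query batch is searched against every index partition, and the per-partition candidates are merged into the final top-k
  • For Linear Models: each worker computes partial statistics on its rows (such as gradients or Gram-matrix terms, depending on the solver), which are reduced across workers to update the shared parameters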
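A single-process NumPy sketch of searching a partitioned index, under the illustrative assumption that each worker returns its local top-k candidates with global row ids and that the candidates are then merged into a final top-k. Distances are squared Euclidean; `local_knn` and `distributed_knn` are hypothetical names, not cuML functions.

```python
import numpy as np

def local_knn(index_part, offset, queries, k):
    """Per-worker search over one index partition; returns (sq. distances, global ids)."""
    d2 = ((queries[:, None, :] - index_part[None, :, :]) ** 2).sum(axis=2)
    k_local = min(k, index_part.shape[0])
    order = np.argsort(d2, axis=1, kind="stable")[:, :k_local]
    # Shift local row numbers by the partition's starting row to get global ids.
    return np.take_along_axis(d2, order, axis=1), order + offset

def distributed_knn(index_parts, queries, k):
    """Search every partition, then merge the candidate lists into a global top-k."""
    cand_d, cand_i = [], []
    offset = 0
    for part in index_parts:
        d, i = local_knn(part, offset, queries, k)
        cand_d.append(d)
        cand_i.append(i)
        offset += part.shape[0]
    all_d = np.concatenate(cand_d, axis=1)
    all_i = np.concatenate(cand_i, axis=1)
    best = np.argsort(all_d, axis=1, kind="stable")[:, :k]
    return np.take_along_axis(all_d, best, axis=1), np.take_along_axis(all_i, best, axis=1)
```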

Step 5: Distributed Prediction

Generate predictions on distributed data using the trained model. Predictions are computed in parallel across workers, with each worker handling its local data partitions. Results can be gathered into the client process or kept distributed for further processing. The `delayed` parameter controls whether prediction is lazy or eager.

Key considerations:

  • `predict()` returns distributed results by default
  • Use `delayed=True` (the default) for lazy evaluation, or `delayed=False` to start the computation on the workers immediately
  • Results can be gathered into the client process with `.compute()`
  • Distributed predictions are embarrassingly parallel for most algorithms
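A sketch contrasting lazy and eager prediction with a Dask KMeans model, assuming a running `client` and a CuPy-backed Dask array `X` (see Step 2); the helper name `predict_lazy_and_eager` is hypothetical, while `predict(X, delayed=...)` is the cuML Dask parameter described above.

```python
def predict_lazy_and_eager(client, X, n_clusters=8):
    """Hypothetical helper contrasting delayed=True and delayed=False."""
    from cuml.dask.cluster import KMeans

    model = KMeans(client=client, n_clusters=n_clusters, random_state=42)
    model.fit(X)

    # delayed=True (the default): returns a Dask collection backed by a task graph;
    # nothing runs until it is computed or persisted.
    lazy_labels = model.predict(X, delayed=True)

    # delayed=False: the prediction starts on the workers right away,
    # but the result is still a distributed Dask collection.
    eager_labels = model.predict(X, delayed=False)

    # Either one stays partitioned across GPUs until .compute() gathers it.
    return lazy_labels, eager_labels
```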

Step 6: Result Collection

Collect distributed results to a single location for final analysis, evaluation, or export. Use Dask's `.compute()` method to materialize lazy results. Aggregate metrics across workers only where the estimator does not already do so (cuML's Dask KMeans, for example, sums inertia across workers itself). Shut down the cluster when processing is complete to free GPU resources.
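A sketch of an end-to-end run that collects results and then releases the GPUs, assuming `dask-cuda`, CuPy, and cuML are installed; `run_and_collect` is a hypothetical helper. Using `LocalCUDACluster` and `Client` as context managers closes the client and then the cluster when the block exits, which has the same effect as calling `client.close()` and `cluster.close()`.

```python
def run_and_collect(X_host, n_clusters=4, n_chunks=4):
    """Hypothetical helper: fit KMeans on a 2-D host array, return host-side labels."""
    import cupy as cp
    import dask.array as da
    from cuml.dask.cluster import KMeans
    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    with LocalCUDACluster() as cluster, Client(cluster) as client:
        # Split rows into roughly n_chunks pieces and move each chunk to a GPU.
        rows_per_chunk = max(1, X_host.shape[0] // n_chunks)
        X = da.from_array(X_host, chunks=(rows_per_chunk, X_host.shape[1])).map_blocks(cp.asarray)

        model = KMeans(client=client, n_clusters=n_clusters, random_state=42)
        model.fit(X)

        # .compute() gathers the distributed labels into this (client) process.
        labels = model.predict(X).compute()

        # Copy to host memory so the result outlives the cluster.
        return cp.asnumpy(labels)
```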

Key considerations:

  • `.compute()` collects distributed data to the client process
  • Metrics like inertia are automatically summed across workers
  • Labels are concatenated in partition order
  • Call `client.close()` and `cluster.close()` to release GPU resources
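The label-ordering and inertia rules above can be checked with a NumPy sketch: per-partition labels concatenated in partition order line up with the original row order, and per-partition inertia values add up to the global inertia. The partitions stand in for worker data, and `assign_partition`/`collect_results` are hypothetical names.

```python
import numpy as np

def assign_partition(X_part, centroids):
    """Per-worker step: nearest-centroid labels and this partition's inertia."""
    d2 = ((X_part[:, None, :] - centroids[None, :, :]) ** 2).sum(axis=2)
    labels = d2.argmin(axis=1)
    inertia = d2[np.arange(X_part.shape[0]), labels].sum()
    return labels, inertia

def collect_results(partitions, centroids):
    """Client-side step: concatenate labels in partition order, sum inertia."""
    results = [assign_partition(p, centroids) for p in partitions]
    labels = np.concatenate([lab for lab, _ in results])
    inertia = float(sum(i for _, i in results))
    return labels, inertia
```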
