Implementation:Deepspeedai DeepSpeed XPU Packbits

From Leeroopedia


Knowledge Sources
Domains: Intel_XPU, SYCL, Bit_Packing, Communication_Compression
Last Updated: 2026-02-09 00:00 GMT

Overview

SYCL-based bit-packing operations for Intel XPU devices that compress FP32 tensors 32× by encoding each element's sign as a single bit, for use in distributed-training communication.
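The 32× figure follows from storage arithmetic: an FP32 element takes 4 bytes, while its sign takes one bit, so eight elements share one packed byte. A minimal sketch of that arithmetic (the helper names are illustrative, not part of DeepSpeed), including the rounding up to whole bytes that lowers the ratio for very small tensors:

```python
def packed_nbytes(num_elements: int) -> int:
    """Bytes needed to hold one sign bit per element, rounded up to whole bytes."""
    return (num_elements + 7) // 8


def compression_ratio(num_elements: int, bytes_per_element: int = 4) -> float:
    """Size of the FP32 payload divided by the size of the packed payload."""
    return (num_elements * bytes_per_element) / packed_nbytes(num_elements)


n = 1024 * 4096                      # the tensor size used in the usage examples
print(n * 4, packed_nbytes(n))       # 16777216 524288
print(compression_ratio(n))          # 32.0
print(compression_ratio(10))         # 20.0 (padding bits lower the ratio for tiny tensors)
```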

Description

This module implements bit-level compression for Intel XPUs (GPUs) using the SYCL programming model and Intel Extension for PyTorch (IPEX). The packbits function extracts the sign bit of each element of an FP32 tensor and packs eight signs into each uint8 byte, so the packed tensor is 32× smaller than the FP32 input (1 bit instead of 32 per element). The unpackbits function reverses this, reconstructing each element as +1.0 or -1.0 from its bit; magnitudes are discarded and cannot be recovered. This suits distributed-training schemes such as signSGD, in which workers communicate only gradient signs. Both operations are implemented as SYCL parallel_for launches of the device kernels packbitskernel and unpackbitskernel. Queue management goes through get_current_queue, which maps a PyTorch XPU device to a SYCL queue taken from PyTorch's stream system.
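A pure-Python model of what the two kernels compute can help check results on a machine without an XPU. The bit order (element 8i+j stored in bit 7-j of byte i, most significant bit first) and the mapping of a clear sign bit (x >= +0.0) to bit 1 and then to +1.0 are assumptions drawn from one reading of the kernel source; pack_signs and unpack_signs are hypothetical names, not DeepSpeed APIs.

```python
import math
from typing import List


def pack_signs(values: List[float]) -> bytearray:
    """Model of packbits: one bit per value, 1 if the sign bit is clear, else 0.

    Assumed MSB-first layout: bit (7 - j) of byte i holds element i * 8 + j.
    Trailing bits past the end of the input stay 0.
    """
    packed = bytearray((len(values) + 7) // 8)
    for i in range(len(packed)):            # one iteration per output byte
        for j in range(8):
            k = i * 8 + j
            # copysign exposes the sign bit, so -0.0 packs as 0 and +0.0 as 1
            if k < len(values) and math.copysign(1.0, values[k]) > 0:
                packed[i] |= 1 << (7 - j)
    return packed


def unpack_signs(packed: bytes) -> List[float]:
    """Model of unpackbits: bit 1 -> +1.0, bit 0 -> -1.0; yields 8 floats per byte."""
    out = []
    for k in range(len(packed) * 8):        # one iteration per output float
        bit = (packed[k // 8] >> (7 - k % 8)) & 1
        out.append((bit - 0.5) * 2.0)
    return out


vals = [0.3, -1.2, 0.0, -0.0, 5.0, -2.0, 1.0, -0.1, 7.0]
packed = pack_signs(vals)
print(list(packed))                     # [170, 128]
print(unpack_signs(packed)[:len(vals)]) # [1.0, -1.0, 1.0, -1.0, 1.0, -1.0, 1.0, -1.0, 1.0]
```

Note that the nine inputs need two bytes, so unpacking yields sixteen floats; the last seven come from padding bits and must be sliced off.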

Usage

Use these functions when running DeepSpeed on Intel XPU hardware with sign-based gradient compression. They fit signSGD and related algorithms that exchange only gradient signs, reducing communicated gradient volume by up to 32× relative to FP32 in multi-GPU or multi-node training.
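In signSGD with majority vote, each worker's sign bit acts as a +1 or -1 vote per element, and the aggregate update takes the sign of the vote total. A minimal pure-Python sketch over packed bytes, assuming the MSB-first bit order described for the kernels; majority_vote is a hypothetical helper, and resolving ties to +1.0 is an arbitrary choice for this sketch.

```python
from typing import List, Sequence


def majority_vote(packed_per_worker: Sequence[bytes], num_elements: int) -> List[float]:
    """Combine packed sign bits from several workers by per-element majority.

    A set bit counts as a +1 vote and a clear bit as a -1 vote.
    Ties resolve to +1.0 (arbitrary choice for this sketch).
    """
    votes = [0] * num_elements
    for packed in packed_per_worker:
        for k in range(num_elements):
            bit = (packed[k // 8] >> (7 - k % 8)) & 1   # assumed MSB-first order
            votes[k] += 1 if bit else -1
    return [1.0 if v >= 0 else -1.0 for v in votes]


# Three workers, four elements each (stored in the upper four bits of one byte)
workers = [bytes([0b1100_0000]), bytes([0b1010_0000]), bytes([0b1001_0000])]
print(majority_vote(workers, 4))   # [1.0, -1.0, -1.0, -1.0]
```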

Code Reference

Source Location

Signature

// SYCL kernel for packing float signs into bits
void packbitskernel(const float* input, uint8_t* output,
                   const int input_size, sycl::id<1> item_ct1);

// SYCL kernel for unpacking bits back to float signs
void unpackbitskernel(const uint8_t* input, float* output,
                     sycl::id<1> item_ct1);

// Pack float32 tensor to uint8: eight signs per byte (32× smaller than FP32)
at::Tensor packbits(at::Tensor tensor, int input_size, int rank);

// Unpack uint8 back to float32 signs (±1.0); input_size is the packed tensor's numel
at::Tensor unpackbits(at::Tensor tensor, int input_size, int rank);

// Helper: get SYCL queue for device
sycl::queue get_current_queue(at::Device device);

Import

import deepspeed.ops.xpu_packbits as xpu_pack

I/O Contract

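Input        Type                      Description
tensor       torch.Tensor              packbits: contiguous FP32 tensor on an Intel XPU device; unpackbits: uint8 tensor returned by packbits
input_size   int                       Number of elements in tensor (for unpackbits, the number of packed bytes)
rank         int                       XPU device rank/ID, used to pick the device the kernels run on

Output       Type                      Description
packed       torch.Tensor (uint8)      ceil(input_size / 8) bytes, 32× smaller than the FP32 input
unpacked     torch.Tensor (float32)    8 × input_size values of ±1.0; slice to the original length to drop padding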
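A sketch of the size bookkeeping the contract implies, assuming (as "Number of elements in tensor" suggests) that unpackbits receives the element count of the packed tensor it is given and returns eight floats per byte. io_sizes is a hypothetical helper for illustration only.

```python
def io_sizes(num_elements: int) -> dict:
    """Sizes flowing through a packbits -> unpackbits round trip (assumed contract)."""
    packed_numel = (num_elements + 7) // 8
    return {
        "packbits_input_size": num_elements,   # element count of the FP32 tensor
        "packed_numel": packed_numel,          # bytes produced by packbits
        "unpackbits_input_size": packed_numel, # element count of the uint8 tensor
        "unpacked_numel": 8 * packed_numel,    # floats produced by unpackbits
        "padding": 8 * packed_numel - num_elements,  # trailing floats to slice off
    }


print(io_sizes(10))
# {'packbits_input_size': 10, 'packed_numel': 2, 'unpackbits_input_size': 2, 'unpacked_numel': 16, 'padding': 6}
```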

Usage Examples

Basic Gradient Compression:

import torch
import deepspeed.ops.xpu_packbits as xpu_pack

# Simulate gradients on Intel XPU
device = "xpu:0"
gradients = torch.randn(1024, 4096, device=device)

# Pack gradient signs (32× compression relative to FP32)
input_size = gradients.numel()
packed = xpu_pack.packbits(gradients, input_size, 0)

original_bytes = gradients.element_size() * input_size
packed_bytes = packed.element_size() * packed.numel()
print(f"Original size: {original_bytes} bytes")
print(f"Packed size: {packed_bytes} bytes")
print(f"Compression: {original_bytes / packed_bytes:.1f}×")

# Unpack back to signs: pass the packed length, then drop any padding floats
unpacked = xpu_pack.unpackbits(packed, packed.numel(), 0)[:input_size]

# Verify signs match (flatten to the 1-D layout of unpacked; exact zeros,
# which randn practically never produces, have sign 0 and would not match)
original_signs = torch.sign(gradients).view(-1)
print(f"Sign accuracy: {(original_signs == unpacked).float().mean():.2%}")

SignSGD Communication:

class SignSGDCompressor:
    """
    Compress gradients to signs for communication
    """
    def __init__(self, rank):
        self.rank = rank

    def compress(self, gradient):
        """
        Compress gradient to sign bits
        Returns: packed tensor, original shape
        """
        original_shape = gradient.shape
        # packbits reads raw memory, so make the data contiguous FP32 first
        flat_grad = gradient.float().contiguous().view(-1)

        # Pack signs
        packed = xpu_pack.packbits(flat_grad, flat_grad.numel(), self.rank)

        return packed, original_shape

    def decompress(self, packed, original_shape):
        """
        Decompress packed signs back to ±1 gradients
        """
        total_elems = original_shape.numel()
        # unpackbits takes the packed length and returns 8 floats per byte;
        # slice off the padding past the original element count
        flat_signs = xpu_pack.unpackbits(packed, packed.numel(), self.rank)[:total_elems]

        return flat_signs.view(original_shape)

# Usage in training loop (model and learning_rate defined elsewhere)
compressor = SignSGDCompressor(rank=0)

for param in model.parameters():
    if param.grad is not None:
        # Compress
        packed, shape = compressor.compress(param.grad)

        # Exchange packed bits here (32× less data than FP32), e.g. with
        # dist.all_gather; a SUM all-reduce over packed bytes is not a sign vote

        # Decompress
        sign_grad = compressor.decompress(packed, shape)

        # Apply sign-based update
        param.data -= learning_rate * sign_grad

Distributed Training Integration:

import torch
import torch.distributed as dist

class XPUDistributedTrainer:
    def __init__(self, model, optimizer, rank, world_size):
        self.model = model
        self.optimizer = optimizer
        self.rank = rank
        self.world_size = world_size
        self.device = f"xpu:{rank}"

    @torch.no_grad()
    def reduce_gradients(self):
        """
        Aggregate gradient signs across ranks by majority vote
        """
        for param in self.model.parameters():
            if param.grad is None:
                continue

            # packbits reads raw memory, so the input must be contiguous FP32
            grad = param.grad.float().contiguous()
            size = grad.numel()

            # Pack gradient signs (32× less communication than FP32)
            packed = xpu_pack.packbits(grad, size, self.rank)

            # Gather every rank's packed bits. Summing packed bytes with
            # all_reduce would not count votes: carries cross bit positions
            # and uint8 values wrap around.
            gathered = [torch.empty_like(packed) for _ in range(self.world_size)]
            dist.all_gather(gathered, packed)

            # Each rank's unpacked signs are ±1; their sum is the vote margin
            votes = torch.zeros(size, device=grad.device)
            for rank_packed in gathered:
                votes += xpu_pack.unpackbits(
                    rank_packed, rank_packed.numel(), self.rank)[:size]

            # Majority vote (a tie gives 0, i.e. no update for that element)
            param.grad = torch.sign(votes).view_as(param.grad).to(param.grad.dtype)

    def train_step(self, batch):
        self.optimizer.zero_grad()

        # Forward pass (assumes the model returns a scalar loss)
        loss = self.model(batch)

        # Backward pass
        loss.backward()

        # Compress and communicate gradients
        self.reduce_gradients()

        # Update parameters
        self.optimizer.step()
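Why a SUM all-reduce over packed bytes cannot implement a majority vote: adding two bytes lets set bits carry into neighboring positions, and unsigned 8-bit arithmetic wraps around at 256, whereas a vote needs a separate count for each bit. A small pure-Python illustration, assuming the reduction behaves like ordinary uint8 addition modulo 256 (uint8_sum and bit_counts are hypothetical helpers):

```python
def uint8_sum(*values: int) -> int:
    """What a SUM reduction over uint8 values yields: the sum modulo 256."""
    return sum(values) % 256


def bit_counts(*values: int) -> list:
    """What a majority vote needs: how many inputs set each bit (MSB first)."""
    return [sum((v >> (7 - j)) & 1 for v in values) for j in range(8)]


a, b = 0b1000_0001, 0b1000_0001
print(bin(uint8_sum(a, b)))   # 0b10: the MSB votes overflowed, the LSB votes carried
print(bit_counts(a, b))       # [2, 0, 0, 0, 0, 0, 0, 2]
```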

Memory-Efficient Gradient Accumulation:

import torch

class CompressedGradientAccumulator:
    """
    Keep each micro-batch's gradient signs in packed form and combine them
    by majority vote at the end. Each packed buffer is 1/32 the size of an
    FP32 gradient, so this saves memory while there are fewer than 32
    micro-batches.
    """
    def __init__(self, model, rank, num_micro_batches):
        self.rank = rank
        self.step = 0
        self.accumulated = {}

        # Pre-allocate one packed row per micro-batch
        for name, param in model.named_parameters():
            if param.requires_grad:
                packed_size = (param.numel() + 7) // 8
                self.accumulated[name] = torch.zeros(
                    num_micro_batches, packed_size, dtype=torch.uint8,
                    device=f"xpu:{rank}")

    @torch.no_grad()
    def accumulate(self, model):
        """
        Store the current gradient signs in compressed form
        """
        for name, param in model.named_parameters():
            if param.grad is None:
                continue

            # Compress current gradient (packbits needs contiguous FP32)
            size = param.grad.numel()
            packed = xpu_pack.packbits(param.grad.float().contiguous(), size, self.rank)

            # Record this micro-batch's signs. XOR-ing packed bytes would only
            # track the parity of positive signs, not an accumulated gradient.
            self.accumulated[name][self.step].copy_(packed)
        self.step += 1

    @torch.no_grad()
    def get_accumulated(self, model):
        """
        Majority vote over the stored micro-batch signs
        """
        for name, param in model.named_parameters():
            if not param.requires_grad:
                continue

            size = param.numel()
            votes = torch.zeros(size, device=param.device)
            for packed in self.accumulated[name][:self.step]:
                votes += xpu_pack.unpackbits(packed, packed.numel(), self.rank)[:size]

            param.grad = torch.sign(votes).view_as(param).to(param.dtype)

    def zero(self):
        """
        Reset accumulation
        """
        for packed in self.accumulated.values():
            packed.zero_()
        self.step = 0

# Usage (model, optimizer, micro_batches and gradient_accumulation_steps defined elsewhere)
accumulator = CompressedGradientAccumulator(
    model, rank=0, num_micro_batches=gradient_accumulation_steps)

for micro_batch in micro_batches:
    loss = model(micro_batch)
    loss.backward()
    accumulator.accumulate(model)
    model.zero_grad()

# Majority-vote signs become the gradient for the optimizer step
accumulator.get_accumulated(model)
optimizer.step()
accumulator.zero()
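Whether sign-compressed accumulation saves memory depends on how many packed buffers are kept alive. A packed sign buffer for n elements takes ceil(n/8) bytes against 4n for an FP32 gradient, so keeping one per micro-batch stays below a single FP32 accumulator only while there are fewer than 32 micro-batches. An arithmetic sketch with hypothetical helper names:

```python
def fp32_accumulator_bytes(n: int) -> int:
    """One FP32 accumulation buffer: 4 bytes per element."""
    return 4 * n


def packed_history_bytes(n: int, micro_batches: int) -> int:
    """One packed sign buffer (ceil(n / 8) bytes) per micro-batch."""
    return micro_batches * ((n + 7) // 8)


n = 4096 * 4096
for k in (4, 16, 32, 64):
    ratio = packed_history_bytes(n, k) / fp32_accumulator_bytes(n)
    print(k, ratio)   # 4 0.125, then 16 0.5, 32 1.0, 64 2.0
```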

Benchmarking Compression Speed:

import time
import torch

def benchmark_packbits(size, num_iterations=100):
    """
    Measure packing/unpacking latency and FP32-equivalent throughput
    """
    device = "xpu:0"
    data = torch.randn(size, device=device)

    # Warmup
    for _ in range(10):
        packed = xpu_pack.packbits(data, size, 0)
        unpacked = xpu_pack.unpackbits(packed, packed.numel(), 0)
    torch.xpu.synchronize()

    # Benchmark packing
    start = time.perf_counter()
    for _ in range(num_iterations):
        packed = xpu_pack.packbits(data, size, 0)
    torch.xpu.synchronize()
    pack_time = (time.perf_counter() - start) / num_iterations

    # Benchmark unpacking (unpackbits takes the packed length)
    start = time.perf_counter()
    for _ in range(num_iterations):
        unpacked = xpu_pack.unpackbits(packed, packed.numel(), 0)
    torch.xpu.synchronize()
    unpack_time = (time.perf_counter() - start) / num_iterations

    # Throughput in FP32-equivalent bytes (4 per element) per second
    pack_bw = (size * 4) / pack_time / 1e9  # GB/s
    unpack_bw = (size * 4) / unpack_time / 1e9
    ratio = data.element_size() * size / packed.numel()

    print(f"Size: {size:,} elements")
    print(f"Pack time: {pack_time*1000:.3f} ms ({pack_bw:.2f} GB/s)")
    print(f"Unpack time: {unpack_time*1000:.3f} ms ({unpack_bw:.2f} GB/s)")
    print(f"Compression ratio: {ratio:.1f}×")

# Test various sizes
for size in [1024*1024, 10*1024*1024, 100*1024*1024]:
    benchmark_packbits(size)
    print()
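The GB/s figures above divide only the FP32 side (4 bytes per element) by the elapsed time. For a memory-traffic figure, count both sides of each kernel. A sketch with hypothetical helper names; it is a simplified model that ignores any extra traffic, such as zero-initializing the packed output:

```python
def bytes_moved(num_elements: int) -> dict:
    """Bytes read plus bytes written by one call of each kernel (simplified model)."""
    packed = (num_elements + 7) // 8
    return {
        "pack": 4 * num_elements + packed,   # read FP32 input, write packed bytes
        "unpack": packed + 4 * 8 * packed,   # read packed bytes, write 8 floats per byte
    }


def gb_per_s(nbytes: int, seconds: float) -> float:
    """Convert a byte count and a duration into GB/s (1 GB = 1e9 bytes)."""
    return nbytes / seconds / 1e9


traffic = bytes_moved(1024 * 1024)
print(traffic)   # {'pack': 4325376, 'unpack': 4325376}
```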

Error Feedback for Improved Accuracy:

class ErrorFeedbackSignSGD:
    """
    SignSGD with error feedback: the part of each gradient lost to
    compression is carried into the next step instead of being discarded
    """
    def __init__(self, model, rank):
        self.rank = rank
        self.error_buffer = {}

        for name, param in model.named_parameters():
            if param.requires_grad:
                self.error_buffer[name] = torch.zeros_like(param.data)

    @torch.no_grad()
    def compress_and_communicate(self, model):
        for name, param in model.named_parameters():
            if param.grad is None:
                continue

            # Add the residual carried over from the previous step
            compensated = (param.grad + self.error_buffer[name]).float().contiguous()
            size = compensated.numel()

            # Mean magnitude restores the scale that the sign alone discards
            scale = compensated.abs().mean()

            # Compress to sign bits
            packed = xpu_pack.packbits(compensated, size, self.rank)

            # Communicate packed bits and scale here (e.g. all_gather);
            # a SUM all-reduce over packed bytes would corrupt the bits

            # Decompress: pass the packed length, drop padding, rescale
            signs = xpu_pack.unpackbits(packed, packed.numel(), self.rank)[:size]
            compressed = scale * signs.view_as(param.grad)

            # Error = compensated gradient - what was actually applied
            self.error_buffer[name] = compensated - compressed

            # Use compressed gradient
            param.grad = compressed.to(param.grad.dtype)

# Because no residual is permanently dropped, error feedback generally converges better than plain signSGD
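The benefit comes from the residual bookkeeping: whatever compression drops is added back on the next step, so nothing is lost for good. A pure-Python sketch of the scaled-sign error-feedback rule from EF-SignSGD (Karimireddy et al., 2019), with the learning rate omitted; it checks the telescoping identity that the communicated updates plus the final residual add up exactly to the summed raw gradients. Function names are hypothetical, and the mean-magnitude scale is that paper's choice, not necessarily DeepSpeed's.

```python
from typing import List, Tuple


def scaled_sign(v: List[float]) -> List[float]:
    """Sign compression scaled by the mean absolute value of the input."""
    scale = sum(abs(x) for x in v) / len(v)
    return [scale if x >= 0 else -scale for x in v]


def ef_signsgd_steps(grads: List[List[float]]) -> Tuple[List[List[float]], List[float]]:
    """Run error feedback over a sequence of gradients; return updates and final residual."""
    error = [0.0] * len(grads[0])
    sent = []
    for g in grads:
        p = [gi + ei for gi, ei in zip(g, error)]    # compensate with the carried residual
        c = scaled_sign(p)                            # what would be communicated
        error = [pi - ci for pi, ci in zip(p, c)]     # residual carried to the next step
        sent.append(c)
    return sent, error


grads = [[0.5, -0.25], [0.25, 0.75], [-1.0, 0.5]]
sent, err = ef_signsgd_steps(grads)
totals = [sum(col) for col in zip(*sent)]
# Telescoping: communicated updates + final residual == summed raw gradients
print([t + e for t, e in zip(totals, err)])   # [-0.25, 1.0]
```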
