Implementation:Deepspeedai DeepSpeed XPU Packbits
| Knowledge Sources | |
|---|---|
| Domains | Intel_XPU, SYCL, Bit_Packing, Communication_Compression |
| Last Updated | 2026-02-09 00:00 GMT |
Overview
SYCL-based bit-packing operations for Intel XPU devices providing 8× compression of FP32 tensors by encoding signs as single bits for distributed training communication.
Description
This module implements bit-level compression for Intel XPUs (GPUs) using the SYCL programming model and Intel Extension for PyTorch (IPEX). The packbits function compresses FP32 tensors by extracting the sign bit of each element and packing eight signs into a single uint8 byte, achieving 8× compression ratio. The unpackbits function reverses this operation, reconstructing FP32 values as +1.0 or -1.0 based on the packed bits. This compression is particularly valuable for distributed training where gradient signs (rather than precise values) are sufficient for certain communication schemes like signSGD. The implementation uses SYCL's parallel_for construct with optimized device-side kernels (packbitskernel and unpackbitskernel) that operate efficiently on Intel GPU hardware. Queue management is integrated with PyTorch's stream system for proper synchronization with other operations.
Usage
Use these functions when deploying DeepSpeed on Intel XPU hardware for distributed training with gradient compression. The sign-based compression is particularly effective for signSGD and related algorithms where only gradient directions matter, enabling significant communication speedup in multi-GPU or multi-node training.
Code Reference
Source Location
- Repository: DeepSpeed
- File: csrc/xpu/packbits/packing.cpp
Signature
// SYCL kernel for packing float signs into bits
void packbitskernel(const float* input, uint8_t* output,
const int input_size, sycl::id<1> item_ct1);
// SYCL kernel for unpacking bits back to float signs
void unpackbitskernel(const uint8_t* input, float* output,
sycl::id<1> item_ct1);
// Pack float32 tensor to uint8 (8× compression)
at::Tensor packbits(at::Tensor tensor, int input_size, int rank);
// Unpack uint8 back to float32 signs
at::Tensor unpackbits(at::Tensor tensor, int input_size, int rank);
// Helper: get SYCL queue for device
sycl::queue get_current_queue(at::Device device);
Import
import deepspeed.ops.xpu_packbits as xpu_pack
I/O Contract
| Input | Type | Description |
|---|---|---|
| tensor | torch.Tensor | FP32 tensor on Intel XPU device |
| input_size | int | Number of elements in tensor |
| rank | int | XPU device rank/ID |
| Output | Type | Description |
|---|---|---|
| packed | torch.Tensor (uint8) | Packed bits (8× smaller) |
| unpacked | torch.Tensor (float32) | Reconstructed signs (±1.0) |
Usage Examples
Basic Gradient Compression:
import torch
import deepspeed.ops.xpu_packbits as xpu_pack
# Simulate gradients on Intel XPU
device = "xpu:0"
gradients = torch.randn(1024, 4096, device=device)
# Pack gradient signs (8× compression)
input_size = gradients.numel()
packed = xpu_pack.packbits(gradients, input_size, rank=0)
print(f"Original size: {gradients.element_size() * input_size} bytes")
print(f"Packed size: {packed.element_size() * packed.numel()} bytes")
print(f"Compression: {gradients.element_size() * input_size / (packed.element_size() * packed.numel())}×")
# Unpack back to signs
unpacked = xpu_pack.unpackbits(packed, input_size, rank=0)
# Verify signs match
original_signs = torch.sign(gradients)
print(f"Sign accuracy: {(original_signs == unpacked).float().mean():.2%}")
SignSGD Communication:
class SignSGDCompressor:
"""
Compress gradients to signs for communication
"""
def __init__(self, rank):
self.rank = rank
def compress(self, gradient):
"""
Compress gradient to sign bits
Returns: packed tensor, original shape
"""
original_shape = gradient.shape
flat_grad = gradient.view(-1)
# Pack signs
packed = xpu_pack.packbits(flat_grad, flat_grad.numel(), self.rank)
return packed, original_shape
def decompress(self, packed, original_shape):
"""
Decompress packed signs back to ±1 gradients
"""
total_elems = torch.prod(torch.tensor(original_shape)).item()
flat_signs = xpu_pack.unpackbits(packed, total_elems, self.rank)
return flat_signs.view(original_shape)
# Usage in training loop
compressor = SignSGDCompressor(rank=0)
for param in model.parameters():
if param.grad is not None:
# Compress
packed, shape = compressor.compress(param.grad)
# All-reduce compressed gradients (8× less data)
# dist.all_reduce(packed)
# Decompress
sign_grad = compressor.decompress(packed, shape)
# Apply sign-based update
param.data -= learning_rate * sign_grad
Distributed Training Integration:
import torch.distributed as dist
class XPUDistributedTrainer:
def __init__(self, model, rank, world_size):
self.model = model
self.rank = rank
self.world_size = world_size
self.device = f"xpu:{rank}"
def reduce_gradients(self):
"""
All-reduce gradients with sign compression
"""
for param in self.model.parameters():
if param.grad is None:
continue
# Move to contiguous memory
grad = param.grad.contiguous()
size = grad.numel()
# Pack gradient signs
packed = xpu_pack.packbits(grad, size, self.rank)
# All-reduce packed data (8× less communication)
dist.all_reduce(packed, op=dist.ReduceOp.SUM)
# Unpack and apply majority vote
# Each bit was summed: >world_size/2 means majority positive
unpacked = xpu_pack.unpackbits(packed, size, self.rank)
# Majority vote: sign based on sum of packed values
# (This is simplified; actual impl would threshold the sums)
param.grad = torch.sign(unpacked).view_as(param.grad)
def train_step(self, batch):
# Forward pass
loss = self.model(batch)
# Backward pass
loss.backward()
# Compress and communicate gradients
self.reduce_gradients()
# Update parameters
# optimizer.step()
Memory-Efficient Gradient Accumulation:
class CompressedGradientAccumulator:
"""
Accumulate gradients in compressed form to save memory
"""
def __init__(self, model, rank):
self.rank = rank
self.accumulated = {}
# Pre-allocate compressed buffers
for name, param in model.named_parameters():
if param.requires_grad:
size = param.numel()
packed_size = (size + 7) // 8
self.accumulated[name] = torch.zeros(
packed_size, dtype=torch.uint8, device=f"xpu:{rank}")
def accumulate(self, model):
"""
Add current gradients to accumulation (in compressed form)
"""
for name, param in model.named_parameters():
if param.grad is None:
continue
# Compress current gradient
size = param.grad.numel()
packed = xpu_pack.packbits(param.grad.contiguous(), size, self.rank)
# XOR to accumulate signs (bit-level addition mod 2)
self.accumulated[name] ^= packed
def get_accumulated(self, model):
"""
Decompress accumulated gradients
"""
for name, param in model.named_parameters():
if not param.requires_grad:
continue
size = param.numel()
unpacked = xpu_pack.unpackbits(
self.accumulated[name], size, self.rank)
param.grad = unpacked.view_as(param)
def zero(self):
"""
Reset accumulation
"""
for packed in self.accumulated.values():
packed.zero_()
# Usage
accumulator = CompressedGradientAccumulator(model, rank=0)
for micro_batch in range(gradient_accumulation_steps):
loss = model(micro_batch)
loss.backward()
accumulator.accumulate(model)
model.zero_grad()
# Get accumulated gradients for optimizer step
accumulator.get_accumulated(model)
optimizer.step()
accumulator.zero()
Benchmarking Compression Speed:
import time
def benchmark_packbits(size, num_iterations=100):
"""
Measure packing/unpacking throughput
"""
device = "xpu:0"
data = torch.randn(size, device=device)
# Warmup
for _ in range(10):
packed = xpu_pack.packbits(data, size, 0)
unpacked = xpu_pack.unpackbits(packed, size, 0)
# Benchmark packing
torch.xpu.synchronize()
start = time.time()
for _ in range(num_iterations):
packed = xpu_pack.packbits(data, size, 0)
torch.xpu.synchronize()
pack_time = (time.time() - start) / num_iterations
# Benchmark unpacking
torch.xpu.synchronize()
start = time.time()
for _ in range(num_iterations):
unpacked = xpu_pack.unpackbits(packed, size, 0)
torch.xpu.synchronize()
unpack_time = (time.time() - start) / num_iterations
pack_bw = (size * 4) / pack_time / 1e9 # GB/s
unpack_bw = (size * 4) / unpack_time / 1e9
print(f"Size: {size:,} elements")
print(f"Pack time: {pack_time*1000:.3f} ms ({pack_bw:.2f} GB/s)")
print(f"Unpack time: {unpack_time*1000:.3f} ms ({unpack_bw:.2f} GB/s)")
print(f"Compression ratio: 8.0×")
# Test various sizes
for size in [1024*1024, 10*1024*1024, 100*1024*1024]:
benchmark_packbits(size)
print()
Error Feedback for Improved Accuracy:
class ErrorFeedbackSignSGD:
"""
SignSGD with error feedback to maintain accuracy
"""
def __init__(self, model, rank):
self.rank = rank
self.error_buffer = {}
for name, param in model.named_parameters():
if param.requires_grad:
self.error_buffer[name] = torch.zeros_like(param.data)
def compress_and_communicate(self, model):
for name, param in model.named_parameters():
if param.grad is None:
continue
# Add error feedback
compensated_grad = param.grad + self.error_buffer[name]
# Compress
size = compensated_grad.numel()
packed = xpu_pack.packbits(compensated_grad.contiguous(),
size, self.rank)
# Communicate
# dist.all_reduce(packed)
# Decompress
sign_grad = xpu_pack.unpackbits(packed, size, self.rank)
sign_grad = sign_grad.view_as(param.grad)
# Update error buffer: error = true_grad - compressed_grad
self.error_buffer[name] = param.grad - sign_grad
# Use compressed gradient
param.grad = sign_grad
# Usage maintains better convergence than plain SignSGD
Related Pages
- Quantization Binding - Alternative gradient compression
- FP Quantize - Floating-point quantization methods
- Random LTD Binding - Another compression technique