Jump to content

Connect SuperML | Leeroopedia MCP: Equip your AI agents with best practices, code verification, and debugging knowledge. Powered by Leeroo — building Organizational Superintelligence. Contact us at founders@leeroo.com.

Implementation:Microsoft DeepSpeedExamples Main Training Loop SuperOffload

From Leeroopedia
Revision as of 15:41, 16 February 2026 by Admin (talk | contribs) (Auto-imported from implementations/Microsoft_DeepSpeedExamples_Main_Training_Loop_SuperOffload.md)
(diff) ← Older revision | Latest revision (diff) | Newer revision → (diff)


Metadata

Field Value
Page Type Implementation
Title Main_Training_Loop_SuperOffload
Repository Microsoft/DeepSpeedExamples
Type Direct Function
Code Reference File: training/DeepSpeed-SuperOffload/finetune_zero3.py, Lines 289-360
Import Direct code in main() function of finetune_zero3.py
Related Principle Principle:Microsoft_DeepSpeedExamples_DeepSpeed_Training_Loop

Overview

Concrete tool for executing the SuperOffload training loop with TFLOPS estimation and WandB logging. Implements the DeepSpeed managed training pattern with per-step performance metrics, MoE-aware TFLOPS calculation, and configurable benchmarking.

Function: estimate_transformer_tflops

Signature

def estimate_transformer_tflops(
    seq_len: int,
    model_size: int,
    num_layers: int,
    hidden_size: int,
    use_activation_checkpointing: bool = False
) -> float:

Code Reference: File: training/DeepSpeed-SuperOffload/finetune_zero3.py, Lines 63-78

Description

Estimates the TFLOPS for a single training sample on a dense (non-MoE) decoder-only transformer model. The estimate accounts for the additional compute introduced by activation checkpointing.

Implementation

def estimate_transformer_tflops(
    seq_len: int,
    model_size: int,
    num_layers: int,
    hidden_size: int,
    use_activation_checkpointing: bool = False
) -> float:
    """
    Estimate TFLOPS for decoder-only dense models.
    """
    coefficient = 4 if use_activation_checkpointing else 3
    tflops = (
        2 * coefficient * model_size * seq_len
        + 2 * 2 * coefficient * num_layers * hidden_size * seq_len**2
    ) / TFLOPS_DENOMINATOR
    return tflops

I/O Contract

Parameter Type Description Default
seq_len int Sequence length (e.g., 2048, 4096) (required)
model_size int Total number of model parameters (required)
num_layers int Number of transformer layers (model.config.num_hidden_layers) (required)
hidden_size int Hidden dimension size (model.config.hidden_size) (required)
use_activation_checkpointing bool Whether activation checkpointing is enabled False

Returns: float -- Estimated TFLOPS per sample (divide by step time and multiply by batch size for actual TFLOPS).

Helper Function: get_parameter_count

Code Reference: File: training/DeepSpeed-SuperOffload/finetune_zero3.py, Lines 59-60

def get_parameter_count(parameter: torch.nn.Parameter) -> int:
    return parameter.ds_numel if hasattr(parameter, "ds_tensor") else parameter.numel()

This helper handles ZeRO-3 partitioned parameters, where parameter.numel() returns the local partition size. The ds_numel attribute holds the original (full) parameter count.

Training Loop Implementation

Code Reference: File: training/DeepSpeed-SuperOffload/finetune_zero3.py, Lines 289-360

stop = False
for epoch in range(args.num_train_epochs):
    logger.debug(f"Starting epoch {epoch + 1}/{args.num_train_epochs}")

    for step, batch in enumerate(train_dataloader):
        step_start_time = time.time()
        batch = {k: v.to(model_engine.device) for k, v in batch.items()}

        actual_batch_size = batch['input_ids'].shape[0]
        tokens_in_batch = actual_batch_size * sequence_length

        outputs = model_engine(**batch)
        loss = outputs.loss

        model_engine.backward(loss)

        model_engine.step()

        step_time = time.time() - step_start_time
        global_step += 1

        if global_step > args.warmup_steps:
            iter_times.append(step_time)

        losses.append(loss.item())

        total_tokens_processed += tokens_in_batch
        total_train_time += step_time

        tokens_per_second = tokens_in_batch / step_time
        step_tflops = None

        if not is_moe_model and total_tflops is not None:
            step_tflops = args.batch_size * total_tflops / step_time

        if global_step % args.log_interval == 0:
            avg_loss = sum(losses[-args.log_interval:]) / len(losses[-args.log_interval:])

            if is_moe_model:
                log_msg = (f"Step {global_step:4d} | "
                          f"Loss: {avg_loss:.4f} | "
                          f"Time: {step_time * MS_PER_SECOND:5.0f}ms")
            else:
                log_msg = (f"Step {global_step:4d} | "
                          f"Loss: {avg_loss:.4f} | "
                          f"Time: {step_time * MS_PER_SECOND:5.0f}ms | "
                          f"TFLOPS: {step_tflops:5.2f} | "
                          f"Tokens/s: {tokens_per_second:6.0f}")

            logger.info(log_msg)

            if args.use_wandb and dist.get_rank() == 0:
                log_dict = {
                    "train/loss": avg_loss,
                    "train/epoch": epoch + 1,
                    "train/global_step": global_step,
                    "train/learning_rate": args.lr,
                    "perf/step_time_ms": step_time * MS_PER_SECOND,
                    "perf/tokens_per_second": tokens_per_second,
                }

                if not is_moe_model and step_tflops is not None:
                    log_dict["perf/tflops"] = step_tflops

                wandb.log(log_dict, step=global_step)

        stop = global_step >= args.bench_steps
        if stop:
            break

    if stop:
        break

Pre-Loop Setup

Before the training loop begins (Lines 265-287):

model_engine.train()

sequence_length = args.max_length
model_size = sum(get_parameter_count(p) for p in model.parameters())
is_moe_model = detect_moe_model(model, args.model_name)

logger.debug(f"Model type: {'MoE' if is_moe_model else 'Dense'}")
logger.debug(f"Model size: {model_size:,} parameters")

# Calculate TFLOPS only for non-MoE models
total_tflops = None
if not is_moe_model:
    total_tflops = estimate_transformer_tflops(
        sequence_length, model_size, model.config.num_hidden_layers,
        model.config.hidden_size, args.activation_checkpointing
    )

global_step = 0
total_tokens_processed = 0
total_train_time = 0
iter_times = []
losses = []

Training Step Breakdown

Each training step performs the following operations:

Step Code Description
1. Move to device batch = {k: v.to(model_engine.device) ...} Transfer batch tensors to the GPU managed by the DeepSpeed engine
2. Forward pass outputs = model_engine(**batch) Run forward pass; DeepSpeed gathers partitioned parameters as needed
3. Extract loss loss = outputs.loss Get the causal LM loss from model output
4. Backward pass model_engine.backward(loss) Compute gradients with automatic mixed precision scaling and distributed reduce-scatter
5. Optimizer step model_engine.step() Execute CPU Adam update, transfer parameters, zero gradients

Performance Tracking

Warmup Exclusion

Steps within the warmup period (global_step <= args.warmup_steps) are excluded from the iter_times list. This ensures that JIT compilation, memory allocation, and CUDA cache warming do not skew performance measurements.

MoE-Aware Logging

The logging differentiates between dense and MoE models:

  • Dense models: Log loss, step time, TFLOPS, and tokens/second.
  • MoE models: Log loss and step time only (TFLOPS estimation is not applicable because the formula assumes dense computation).

Benchmarking Mode

When bench_steps is set, the training loop terminates after the specified number of global steps, regardless of epoch boundaries. This is controlled by the stop flag.

WandB Integration

WandB logging occurs only on rank 0 to avoid duplicate entries. The initialization happens before the training loop:

# In initialize_wandb() (Lines 208-222)
if args.use_wandb and dist.get_rank() == 0:
    wandb.init(
        project=args.wandb_project,
        name=wandb_run_name,
        tags=args.wandb_tags,
        config=vars(args)
    )

Usage Example

# After deepspeed.initialize()
model_engine.train()

model_size = sum(get_parameter_count(p) for p in model.parameters())
total_tflops = estimate_transformer_tflops(
    seq_len=4096, model_size=model_size,
    num_layers=model.config.num_hidden_layers,
    hidden_size=model.config.hidden_size,
    use_activation_checkpointing=True
)

for epoch in range(num_epochs):
    for step, batch in enumerate(train_dataloader):
        batch = {k: v.to(model_engine.device) for k, v in batch.items()}

        outputs = model_engine(**batch)
        loss = outputs.loss

        model_engine.backward(loss)
        model_engine.step()

        step_tflops = batch_size * total_tflops / step_time
        print(f"Step {step}: loss={loss.item():.4f}, TFLOPS={step_tflops:.2f}")

Related Pages

Page Connections

Double-click a node to navigate. Hold to expand connections.
Principle
Implementation
Heuristic
Environment