Lab 05: Distributed Training Validation¶

NCCL/RCCL · Multi-GPU Communication · Scaling · Hang Detection¶

Role alignment: AMD Principal Staff – AI/ML Performance Validation
Reference: 06_distributed_training_validation.ipynb

What you will do:¶

  1. Understand AllReduce and collective communication primitives

  2. Simulate multi-GPU communication patterns on single GPU

  3. Validate AllReduce correctness (all GPUs get same result)

  4. Measure communication bandwidth and scaling efficiency

  5. Detect and simulate distributed training failure modes

Many exercises fall back to simulation mode when only a single GPU (or just a CPU) is available.
The concepts and validation patterns carry over unchanged to real multi-node setups.

Setup¶

The setup detects all available GPUs and determines whether the lab runs in real multi-GPU mode (using actual NCCL/RCCL collectives) or simulation mode (emulating distributed operations on a single device). The simulation mode faithfully reproduces the mathematical operations of each collective (AllReduce = sum + normalize, AllGather = concatenate, Broadcast = copy) so the validation concepts transfer directly to real multi-GPU and multi-node environments. When multiple GPUs are detected, the lab uses torch.multiprocessing.spawn to launch actual distributed workers with NCCL as the communication backend.

import torch
import torch.distributed as dist
import torch.nn as nn
import time
import os
import math
from datetime import datetime

if torch.cuda.is_available():
    DEVICE = 'cuda'
    GPU_COUNT = torch.cuda.device_count()
elif torch.backends.mps.is_available():
    DEVICE = 'mps'
    GPU_COUNT = 1
else:
    DEVICE = 'cpu'
    GPU_COUNT = 0

print(f'Device    : {DEVICE}')
print(f'GPU count : {GPU_COUNT}')
if DEVICE == 'cuda':
    for i in range(GPU_COUNT):
        print(f'  GPU {i}: {torch.cuda.get_device_name(i)}')

MULTI_GPU = GPU_COUNT > 1
print(f'Multi-GPU : {MULTI_GPU}')
print(f'Mode      : {"Real multi-GPU" if MULTI_GPU else "Simulation (single GPU/CPU)"}')

Concept: Collective Communication Operations¶

These are the building blocks of distributed training — implemented in NCCL (NVIDIA) and RCCL (AMD):

AllReduce   : Each GPU gets the SUM of tensors from all GPUs
              Used for: gradient synchronization (the most common op)

Broadcast   : One GPU sends its tensor to all others
              Used for: model weight distribution at startup

AllGather   : Each GPU gets ALL tensors from all GPUs (concatenated)
              Used for: Tensor Parallelism in LLMs

ReduceScatter: Reduce across GPUs, then leave each GPU one shard of the result
              (a ring AllReduce is ReduceScatter followed by AllGather)
              Used for: ZeRO sharded optimizers in DeepSpeed

Validation goal: Verify these operations produce correct results and meet bandwidth expectations.
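Exercise 5.1 below simulates AllReduce; the remaining collectives can be simulated in the same single-process style. A minimal sketch (the helper names and sizes are ours, not part of any API):

```python
import torch

def simulate_broadcast(tensors, src=0):
    # Broadcast: every worker ends up with the source worker's tensor
    return [tensors[src].clone() for _ in tensors]

def simulate_allgather(tensors):
    # AllGather: every worker ends up with the concatenation of all tensors
    gathered = torch.cat(tensors)
    return [gathered.clone() for _ in tensors]

def simulate_reducescatter(tensors):
    # ReduceScatter: sum across workers, then hand each worker one equal shard
    total = torch.stack(tensors).sum(dim=0)
    return [shard.clone() for shard in total.chunk(len(tensors))]

workers = [torch.full((8,), float(r + 1)) for r in range(4)]  # rank r holds r+1
assert all(torch.equal(t, workers[0]) for t in simulate_broadcast(workers))
assert simulate_allgather(workers)[0].numel() == 8 * 4
shards = simulate_reducescatter(workers)
assert torch.equal(shards[0], torch.full((2,), 10.0))  # 1+2+3+4 = 10
print('Broadcast / AllGather / ReduceScatter simulations verified')
```

Note how AllReduce decomposes into these pieces: ReduceScatter followed by AllGather, which is exactly how the ring algorithm is implemented.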

Exercise 5.1 – AllReduce Correctness (Simulation)¶

Simulate what AllReduce does: each "GPU" holds its own tensor, and after the operation all of them should hold the same reduced (here: averaged) result.

def simulate_allreduce(num_workers=4, tensor_size=1024):
    """
    Simulate AllReduce correctness.
    Each worker has a different gradient tensor.
    After AllReduce, all should have the mean gradient.
    """
    # Each worker has its own gradient (simulating different mini-batches)
    worker_grads = [torch.randn(tensor_size) for _ in range(num_workers)]

    # Reference: mean of all gradients
    expected = torch.stack(worker_grads).mean(dim=0)

    # Simulated AllReduce (SUM then normalize)
    allreduced = torch.zeros(tensor_size)
    for g in worker_grads:
        allreduced += g
    allreduced /= num_workers

    # Verify all workers get the same result
    max_err = (allreduced - expected).abs().max().item()
    passed  = max_err < 1e-5

    print(f'AllReduce simulation:')
    print(f'  Workers       : {num_workers}')
    print(f'  Tensor size   : {tensor_size} elements ({tensor_size * 4 / 1e6:.2f} MB FP32)')
    print(f'  Max error     : {max_err:.2e}')
    print(f'  Status        : {"PASS" if passed else "FAIL"}')
    return passed


for n_workers in [2, 4, 8, 64]:
    simulate_allreduce(num_workers=n_workers, tensor_size=1024 * 1024)  # 4MB tensor
    print()
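A note on the 1e-5 tolerance: floating-point addition is not associative, and real NCCL/RCCL rings do not guarantee a fixed reduction order, so the last bits of the result can differ between runs. A quick demonstration of why exact equality is the wrong PASS criterion:

```python
import torch

torch.manual_seed(0)
grads = [torch.randn(1_000_000) for _ in range(8)]

# Sum the same 8 tensors in two different orders
forward_sum  = sum(grads[i] for i in range(8))
backward_sum = sum(grads[i] for i in reversed(range(8)))

diff = (forward_sum - backward_sum).abs().max().item()
print(f'max element difference from reordering: {diff:.2e}')
# Tiny but typically nonzero, hence tolerance-based PASS criteria
assert diff < 1e-4
```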

Exercise 5.2 – Real AllReduce (if Multi-GPU available)¶

Use torch.distributed to do actual NCCL/RCCL AllReduce if multiple GPUs are available.

# This cell runs real distributed AllReduce if multi-GPU is available
# Otherwise, shows the code pattern

ALLREDUCE_TEST_CODE = '''
# Pattern for real multi-GPU AllReduce validation
# (Requires torchrun or torch.multiprocessing)

import torch
import torch.distributed as dist
import os

def allreduce_worker(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    
    # RCCL for AMD, NCCL for NVIDIA
    backend = 'nccl'  # AMD ROCm uses 'nccl' via RCCL compatibility layer
    dist.init_process_group(backend, rank=rank, world_size=world_size)
    
    torch.cuda.set_device(rank)
    
    # Each rank has different tensor
    tensor = torch.ones(1024, device=f'cuda:{rank}') * (rank + 1)
    
    # Expected after AllReduce: sum = 1+2+...+world_size
    expected_sum = world_size * (world_size + 1) / 2
    
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    
    # Validate
    actual = tensor[0].item()
    passed = abs(actual - expected_sum) < 1e-3
    print(f"Rank {rank}: expected={expected_sum}, got={actual}, PASS={passed}")
    
    dist.destroy_process_group()

# Run with: torchrun --nproc_per_node=4 script.py
# Or:       torch.multiprocessing.spawn(allreduce_worker, args=(4,), nprocs=4)
'''

if MULTI_GPU:
    # Real AllReduce with multiple GPUs
    import torch.multiprocessing as mp

    def allreduce_worker(rank, world_size, results_queue):
        os.environ['MASTER_ADDR'] = 'localhost'
        os.environ['MASTER_PORT'] = '29500'
        dist.init_process_group('nccl', rank=rank, world_size=world_size)
        torch.cuda.set_device(rank)

        tensor = torch.ones(1024, device=f'cuda:{rank}') * (rank + 1)
        expected = world_size * (world_size + 1) / 2

        dist.all_reduce(tensor, op=dist.ReduceOp.SUM)

        actual = tensor[0].item()
        passed = abs(actual - expected) < 1e-3
        results_queue.put({'rank': rank, 'expected': expected, 'actual': actual, 'passed': passed})
        dist.destroy_process_group()

    ctx = mp.get_context('spawn')
    q   = ctx.Queue()
    procs = [ctx.Process(target=allreduce_worker, args=(r, GPU_COUNT, q))
             for r in range(GPU_COUNT)]
    for p in procs: p.start()
    for p in procs: p.join()

    results = [q.get() for _ in range(GPU_COUNT)]
    for r in sorted(results, key=lambda x: x['rank']):
        print(f"Rank {r['rank']}: expected={r['expected']}, got={r['actual']}, "
              f"{'PASS' if r['passed'] else 'FAIL'}")
else:
    print('Single GPU/CPU mode — showing code pattern for multi-GPU AllReduce:')
    print(ALLREDUCE_TEST_CODE)

Exercise 5.3 – Communication Bandwidth Measurement¶

Measure AllReduce bandwidth for different message sizes.
In a real cluster, large-message bandwidth should approach the interconnect limit, e.g. NVLink (~600 GB/s aggregate per GPU on A100-class parts) or InfiniBand (400 Gb/s per NDR link).

def simulate_allreduce_bandwidth(message_size_bytes, num_gpus=8, iterations=100):
    """
    Simulate AllReduce bandwidth.
    Ring AllReduce formula: 2 * (N-1)/N * message_size / time
    """
    numel = message_size_bytes // 4  # FP32

    # Simulate transfer time based on NVLink bandwidth (600 GB/s)
    # On a real cluster this comes from actual timing
    nvlink_bw_gbs = 600  # NVLink theoretical
    transfer_time = (message_size_bytes / 1e9) / nvlink_bw_gbs
    # Add overhead (latency + scheduling)
    overhead = 50e-6  # 50 microseconds

    simulated_time = transfer_time * 2 * (num_gpus - 1) / num_gpus + overhead

    # Ring AllReduce effective bandwidth
    algbw = (message_size_bytes / 1e9) / simulated_time  # GB/s
    busbw = algbw * 2 * (num_gpus - 1) / num_gpus        # bus bandwidth

    return algbw, busbw, simulated_time * 1000


message_sizes_bytes = [
    1 * 1024,            # 1 KB  — tiny (attention head)
    1 * 1024 * 1024,     # 1 MB
    10 * 1024 * 1024,    # 10 MB — typical gradient chunk
    100 * 1024 * 1024,   # 100 MB
    1024 * 1024 * 1024,  # 1 GB — large model all-reduce
]

print('AllReduce Bandwidth vs Message Size (8-GPU simulation):')
print(f"{'Size':>12}  {'AlgBW (GB/s)':>14}  {'BusBW (GB/s)':>14}  {'Time (ms)':>12}  Status")
print('-' * 70)

bw_results = []
for sz in message_sizes_bytes:
    algbw, busbw, time_ms = simulate_allreduce_bandwidth(sz)
    # Pass if algbw > 50 GB/s for large messages (datacenter standard)
    ok = algbw > 50 or sz < 1e6  # small messages inherently low BW due to latency
    size_str = f'{sz // 1024}KB' if sz < 1e6 else f'{sz // 1024 // 1024}MB' if sz < 1e9 else '1GB'
    status = 'PASS' if ok else 'WARN'
    print(f'{size_str:>12}  {algbw:>14.1f}  {busbw:>14.1f}  {time_ms:>12.3f}  {status}')
    bw_results.append({'size_bytes': sz, 'algbw_gbs': round(algbw, 1), 'busbw_gbs': round(busbw, 1)})

print('\nNote: real numbers require nccl-tests or rccl-tests on an actual cluster')
print('AMD RCCL benchmark: github.com/ROCm/rccl-tests')
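On real hardware, the only change is that the elapsed time comes from measurement rather than a model. The algbw/busbw accounting used by nccl-tests and rccl-tests can be applied to any measured AllReduce time (the function name is ours):

```python
def allreduce_bandwidths(message_bytes, n_gpus, elapsed_s):
    """nccl-tests style bandwidth accounting for AllReduce."""
    algbw = message_bytes / elapsed_s / 1e9      # GB/s seen by the application
    busbw = algbw * 2 * (n_gpus - 1) / n_gpus    # GB/s actually on the wire
    return algbw, busbw

# Example: a 1 GB AllReduce across 8 GPUs that completes in 10 ms
algbw, busbw = allreduce_bandwidths(1e9, 8, 0.010)
print(f'algbw={algbw:.1f} GB/s  busbw={busbw:.1f} GB/s')  # algbw=100.0, busbw=175.0
```

busbw is the number to compare against the link's peak, since it normalizes out the 2*(N-1)/N traffic factor of the ring algorithm.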

Exercise 5.4 – Linear Scaling Validation¶

When you add more GPUs, throughput should scale linearly (ideal = 100% efficiency).
Communication overhead causes sub-linear scaling. Validate and measure efficiency.

def simulate_scaling_efficiency(model_flops_per_step, comm_overhead_fraction):
    """
    Compute/communicate overlap model.
    
    Real distributed training uses compute/comm overlap to hide latency.
    comm_overhead_fraction: fraction of step time spent in communication.
    """
    gpu_counts = [1, 2, 4, 8, 16, 32, 64, 128]
    results    = []

    for n_gpu in gpu_counts:
        # Compute time: scales perfectly (linear)
        compute_time = model_flops_per_step / n_gpu

        # Comm time: a ring AllReduce moves 2*(N-1)/N of the data per GPU,
        # so the bandwidth term is nearly flat in N while latency terms grow.
        # Simplified model: per-GPU comm time grows slowly with GPU count
        if n_gpu == 1:
            comm_time = 0
        else:
            comm_time = comm_overhead_fraction * model_flops_per_step * math.log2(n_gpu) / n_gpu

        # With compute/comm overlap, effective time ≈ max(compute, comm)
        effective_time = max(compute_time, comm_time)
        throughput     = model_flops_per_step / effective_time  # normalized

        ideal_throughput = n_gpu  # perfect linear scaling
        efficiency_pct   = (throughput / ideal_throughput) * 100

        results.append({
            'gpus': n_gpu,
            'throughput': round(throughput, 2),
            'ideal': ideal_throughput,
            'efficiency_pct': round(efficiency_pct, 1),
            'comm_pct': round(comm_time / effective_time * 100 if effective_time > 0 else 0, 1),
        })

    return results


# Simulate two scenarios: low vs high communication overhead
for scenario, comm_frac in [('Low comm overhead (good overlap)', 0.05),
                              ('High comm overhead (poor overlap)', 0.20)]:
    print(f'\nScenario: {scenario}')
    print(f"{'GPUs':>8}  {'Throughput':>12}  {'Ideal':>8}  {'Efficiency':>12}  {'Comm %':>8}  Status")
    print('-' * 65)

    scaling = simulate_scaling_efficiency(1000.0, comm_frac)
    for r in scaling:
        ok = r['efficiency_pct'] >= 80
        status = 'PASS' if ok else 'WARN'
        print(f'{r["gpus"]:>8}  {r["throughput"]:>12.1f}  {r["ideal"]:>8}  '
              f'{r["efficiency_pct"]:>11.1f}%  {r["comm_pct"]:>7.1f}%  {status}')

print('\nTarget: ≥85% scaling efficiency to 64 GPUs (AMD MI300X with RCCL)')
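The simulation above takes a strong-scaling view (fixed total work split across GPUs), while the 64-GPU target is usually quoted for weak scaling, where per-GPU work is held constant. Both efficiency formulas are simple enough to keep on hand (the function names are ours):

```python
def strong_scaling_efficiency(t_1gpu, t_ngpu, n):
    # Fixed total problem: ideal step time on n GPUs is t_1gpu / n
    return (t_1gpu / t_ngpu) / n * 100.0

def weak_scaling_efficiency(t_1gpu, t_ngpu):
    # Problem size grows with n: ideal is constant step time
    return t_1gpu / t_ngpu * 100.0

# A step that takes 100 ms on 1 GPU and 2 ms on 64 GPUs:
print(f'{strong_scaling_efficiency(100.0, 2.0, 64):.1f}%')  # 78.1%
# Per-GPU batch held constant, step time grew from 100 ms to 110 ms:
print(f'{weak_scaling_efficiency(100.0, 110.0):.1f}%')      # 90.9%
```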

Exercise 5.5 – Distributed Training Failure Modes¶

The AMD job description calls out debugging hangs and ensuring recovery. Here are the key failure modes and how to detect them.

class DistributedHealthChecker:
    """
    Validates distributed training health.
    In production: runs alongside training to catch issues early.
    """

    def __init__(self):
        self.checks = []

    def check_gradient_consistency(self, gradients: list, tol=1e-4):
        """
        After AllReduce, all ranks should have identical gradients.
        If they don't, AllReduce is broken or there's a data race.
        """
        if len(gradients) < 2:
            self.checks.append(('gradient_consistency', 'SKIP', 'Need ≥2 ranks'))
            return

        ref = gradients[0]
        max_diff = 0.0
        for i, g in enumerate(gradients[1:], 1):
            diff = (g - ref).abs().max().item()
            max_diff = max(max_diff, diff)

        passed = max_diff < tol
        self.checks.append((
            'gradient_consistency',
            'PASS' if passed else 'FAIL',
            f'max_diff={max_diff:.2e} across {len(gradients)} ranks'
        ))

    def check_loss_convergence(self, losses: list, window=10, threshold=0.01):
        """
        Loss should decrease over time.
        If it's flat or NaN → divergence or dead learning rate.
        """
        if len(losses) < window * 2:
            self.checks.append(('loss_convergence', 'SKIP', 'Not enough steps'))
            return

        has_nan = any(math.isnan(l) or math.isinf(l) for l in losses)
        if has_nan:
            self.checks.append(('loss_convergence', 'FAIL', 'NaN/Inf loss detected'))
            return

        early_avg = sum(losses[:window]) / window
        late_avg  = sum(losses[-window:]) / window
        delta_pct = (early_avg - late_avg) / (abs(early_avg) + 1e-8) * 100

        passed = delta_pct > threshold
        self.checks.append((
            'loss_convergence',
            'PASS' if passed else 'WARN',
            f'loss dropped {delta_pct:.1f}% (early={early_avg:.4f}, late={late_avg:.4f})'
        ))

    def check_no_deadlock(self, timeout_seconds=30):
        """
        Check that distributed ops complete within timeout.
        In production: use torch.distributed timeout parameter.
        """
        # Simulation: verify timing of a mock collective
        t0 = time.time()
        time.sleep(0.001)  # simulate collective
        elapsed = time.time() - t0

        passed = elapsed < timeout_seconds
        self.checks.append((
            'deadlock_check',
            'PASS' if passed else 'FAIL',
            f'collective completed in {elapsed:.3f}s (timeout={timeout_seconds}s)'
        ))

    def check_memory_per_rank(self, expected_mb_per_rank=None):
        """Verify GPU memory is balanced across ranks (no stragglers)."""
        if DEVICE != 'cuda':
            self.checks.append(('memory_balance', 'SKIP', 'No GPU'))
            return

        mem = torch.cuda.memory_allocated() / 1e6
        if expected_mb_per_rank and mem > expected_mb_per_rank * 1.2:
            self.checks.append(('memory_balance', 'WARN',
                                 f'GPU using {mem:.0f}MB > expected {expected_mb_per_rank}MB'))
        else:
            self.checks.append(('memory_balance', 'PASS', f'GPU using {mem:.0f}MB'))

    def report(self):
        print('Distributed Health Check Report:')
        print('-' * 60)
        for name, status, detail in self.checks:
            print(f'  [{status:4s}] {name:30s}  {detail}')
        passed = sum(1 for _, s, _ in self.checks if s == 'PASS')
        print(f'\n{passed}/{len(self.checks)} checks passed')


# Demo
checker = DistributedHealthChecker()

# Simulate 4 ranks that all got the same gradient after AllReduce
shared_grad = torch.randn(1024)
checker.check_gradient_consistency([shared_grad.clone() for _ in range(4)])

# Simulate converging loss
import random
random.seed(42)
converging_loss = [3.0 * math.exp(-0.01 * i) + random.gauss(0, 0.01) for i in range(100)]
checker.check_loss_convergence(converging_loss)

# Deadlock check
checker.check_no_deadlock()

# Memory check
checker.check_memory_per_rank()

checker.report()
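A validation check is only trustworthy if it also fails when it should. Below is a standalone negative test of the same gradient-consistency logic (the helper mirrors `check_gradient_consistency` but is self-contained), injecting one diverged rank:

```python
import torch

def ranks_consistent(gradients, tol=1e-4):
    # After a correct AllReduce, every rank must hold the same tensor
    ref = gradients[0]
    max_diff = max((g - ref).abs().max().item() for g in gradients[1:])
    return max_diff < tol

shared = torch.randn(1024)
good = [shared.clone() for _ in range(4)]
bad  = [shared.clone() for _ in range(4)]
bad[2] += 1.0   # simulate rank 2 skipping the AllReduce

assert ranks_consistent(good) is True
assert ranks_consistent(bad) is False
print('Negative test passed: the corrupted rank is detected')
```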

Exercise 5.6 – Distributed Training Debugging Playbook¶

When a multi-GPU training job hangs or produces wrong results, use this diagnostic flow.

DEBUGGING_PLAYBOOK = """
=== AMD/NVIDIA Distributed Training Debugging Playbook ===

SYMPTOM: Training hangs (no progress)
─────────────────────────────────────
1. Check if ALL ranks are stuck or just some:
   $ nvidia-smi  (or rocm-smi)
   All GPUs should show ~100% GPU utilization if compute-bound
   If some show 0% → straggler or rank failure

2. Check for NCCL/RCCL timeout:
   Set NCCL_DEBUG=INFO to see which op is blocking (RCCL honors the NCCL_* variables)
   $ export NCCL_DEBUG=INFO
   Look for: the last collective logged before output stops

3. Enable torch.distributed timeout:
   dist.init_process_group(..., timeout=timedelta(minutes=30))
   This will raise an exception instead of hanging forever

4. Check for asymmetric code paths:
   If rank 0 does `if rank == 0: dist.all_reduce(x)` but others don't
   → Collective mismatch → deadlock

5. Profile with nsys / omnitrace:
   $ nsys profile --trace=cuda,nccl torchrun ...
   Look for: gaps between collectives, long kernel queuing

SYMPTOM: Loss diverges (NaN/Inf)
─────────────────────────────────
1. Enable anomaly detection:
   torch.autograd.set_detect_anomaly(True)
   This shows which operation produced NaN

2. Check gradient norms before AllReduce:
   for p in model.parameters():
       if p.grad is not None:
           print(p.grad.norm())  # should be finite and reasonable

3. Check for exploding gradients:
   Use gradient clipping: torch.nn.utils.clip_grad_norm_(params, max_norm=1.0)

4. Reduce learning rate or increase warmup steps

SYMPTOM: Scaling efficiency < 70%
──────────────────────────────────
1. Run nccl-tests / rccl-tests to isolate communication bandwidth:
   $ ./all_reduce_perf -b 8 -e 8G -f 2 -g 8
   Compare busbw against the link's theoretical peak; a large shortfall
   points to interconnect, topology, or configuration problems

2. Enable bucket optimization in DDP:
   torch.nn.parallel.DistributedDataParallel(..., bucket_cap_mb=25)

3. Check NVLink / InfiniBand health:
   $ nvidia-smi nvlink --status -i 0
   $ ibstat  (for InfiniBand)

4. Check NUMA affinity — CPU ↔ GPU assignments matter
"""

print(DEBUGGING_PLAYBOOK)
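Symptom 4 in the hang section (asymmetric code paths) is worth internalizing: every collective is a rendezvous, so all ranks must issue the same sequence of collectives. A toy schedule checker (pure Python; the names are ours, not a real API) that reproduces the classic bug:

```python
def collective_schedule(rank):
    """Toy reproduction of the asymmetric-collective bug."""
    ops = ['broadcast']           # all ranks broadcast weights at startup
    if rank == 0:
        ops.append('all_reduce')  # BUG: only rank 0 reduces -> real job deadlocks
    ops.append('barrier')
    return ops

schedules = [collective_schedule(r) for r in range(4)]
symmetric = len({tuple(s) for s in schedules}) == 1
print('Schedules symmetric' if symmetric
      else 'Collective mismatch detected: this would deadlock')
```

In a real job the same idea can be applied by logging each collective per rank and diffing the logs after a hang.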

Exercise 5.7 – DataParallel vs DistributedDataParallel¶

Understanding the distinction between nn.DataParallel (DP) and nn.parallel.DistributedDataParallel (DDP) is critical for AMD validation engineers. DP runs in a single process with multiple threads, is limited by Python's GIL, and gathers gradients on GPU 0, creating a memory bottleneck that caps practical scaling at around 4 GPUs. DDP spawns one process per GPU, uses NCCL/RCCL AllReduce for gradient synchronization, and scales to thousands of GPUs across multiple nodes. AMD validation must test both paths: legacy customer code may still use DP, but all new training workloads should use DDP. The validation criteria differ as well: DP tests verify that output matches the single-GPU baseline, while DDP tests verify gradient consistency across ranks and loss-convergence parity.

# Demonstrate the key difference
import torch.nn as nn

simple_model = nn.Linear(128, 64)

COMPARISON = """
DataParallel (DP)                    DistributedDataParallel (DDP)
────────────────────────────────────────────────────────────────────
Single process, multiple threads     Multiple processes (one per GPU)
GIL limits parallelism               No GIL — true parallelism
Gradient averaged on GPU 0           Gradients averaged via AllReduce
Simple: model = nn.DataParallel(m)   Requires init_process_group()
Poor scaling beyond 4 GPUs           Scales to 1000+ GPUs
Legacy — use for prototyping only    Production standard
Works single-node only               Multi-node capable

AMD validation tests BOTH:
  - DP: verify output matches single-GPU baseline
  - DDP: verify loss convergence and gradient consistency
"""

print(COMPARISON)

# Show DDP wrapping pattern. Note: constructing DistributedDataParallel
# requires an initialized process group, even within a single process.
if GPU_COUNT >= 2:
    os.environ.setdefault('MASTER_ADDR', 'localhost')
    os.environ.setdefault('MASTER_PORT', '29501')
    dist.init_process_group('nccl', rank=0, world_size=1)
    ddp_model = nn.parallel.DistributedDataParallel(
        simple_model.cuda(),
        device_ids=[0]
    )
    print(f'DDP model created: {type(ddp_model).__name__}')
    dist.destroy_process_group()
elif GPU_COUNT == 1:
    print('1 GPU available — DDP wrapping pattern (requires ≥2 GPUs to be useful):')
    print('  ddp = nn.parallel.DistributedDataParallel(model.cuda(), device_ids=[rank])')
else:
    print('CPU only — DDP with the NCCL/RCCL backend requires a GPU')
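The DP validation criterion ("output matches the single-GPU baseline") can be written as a parity test. On a CPU-only machine nn.DataParallel simply falls through to the wrapped module, so the sketch below degenerates to an identity check; on a multi-GPU node the same assertion exercises the scatter/replicate/gather path:

```python
import torch
import torch.nn as nn

torch.manual_seed(0)
model = nn.Linear(128, 64)
x = torch.randn(32, 128)
if torch.cuda.is_available():
    model, x = model.cuda(), x.cuda()  # DP requires the module on device 0

with torch.no_grad():
    ref = model(x)                     # single-device baseline
    out = nn.DataParallel(model)(x)    # DP-wrapped forward pass

assert torch.allclose(out, ref, atol=1e-5)
print('DataParallel output matches the single-device baseline')
```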

Summary & AMD Interview Prep¶

Question                               Answer
------------------------------------------------------------------------------
How do you validate RCCL AllReduce?    Record baseline sum, compare after reduce; max_err < 1e-5
How do you detect a comm bottleneck?   busbw well below link peak in rccl-tests; low GPU util during backward
How do you debug a multi-node hang?    NCCL_DEBUG=INFO, dist timeout, nsys trace, check asymmetric collectives
What metrics define "good" scaling?    ≥85% weak-scaling efficiency at 64 GPUs
DDP vs DP?                             DDP: multi-process, AllReduce, production; DP: legacy, single-node only

Previous: lab_04_regression_suite.ipynb
Next: lab_06_framework_validation.ipynb — PyTorch operator coverage, ONNX Runtime, torch.compile
Back to Overview: README.md