Lab 05: Distributed Training Validation
NCCL/RCCL · Multi-GPU Communication · Scaling · Hang Detection
Role alignment: AMD Principal Staff – AI/ML Performance Validation
Reference: 06_distributed_training_validation.ipynb
What you will do:
Understand AllReduce and collective communication primitives
Simulate multi-GPU communication patterns on single GPU
Validate AllReduce correctness (all GPUs get same result)
Measure communication bandwidth and scaling efficiency
Detect and simulate distributed training failure modes
Many exercises run in simulation mode if only 1 GPU (or CPU) is available.
The concepts and validation patterns are identical to real multi-node setups.
Setup
The setup detects all available GPUs and determines whether the lab runs in real multi-GPU mode (using actual NCCL/RCCL collectives) or simulation mode (emulating distributed operations on a single device). The simulation mode faithfully reproduces the mathematical operations of each collective (AllReduce = sum + normalize, AllGather = concatenate, Broadcast = copy) so the validation concepts transfer directly to real multi-GPU and multi-node environments. When multiple GPUs are detected, the lab uses torch.multiprocessing.spawn to launch actual distributed workers with NCCL as the communication backend.
import torch
import torch.distributed as dist
import torch.nn as nn
import time
import os
import math
from datetime import datetime
if torch.cuda.is_available():
DEVICE = 'cuda'
GPU_COUNT = torch.cuda.device_count()
elif torch.backends.mps.is_available():
DEVICE = 'mps'
GPU_COUNT = 1
else:
DEVICE = 'cpu'
GPU_COUNT = 0
print(f'Device : {DEVICE}')
print(f'GPU count : {GPU_COUNT}')
if DEVICE == 'cuda':
for i in range(GPU_COUNT):
print(f' GPU {i}: {torch.cuda.get_device_name(i)}')
MULTI_GPU = GPU_COUNT > 1
print(f'Multi-GPU : {MULTI_GPU}')
print(f'Mode : {"Real multi-GPU" if MULTI_GPU else "Simulation (single GPU/CPU)"}')
Concept: Collective Communication Operations
These are the building blocks of distributed training, implemented in NCCL (NVIDIA) and RCCL (AMD):
AllReduce : Each GPU gets the SUM of tensors from all GPUs
Used for: gradient synchronization (the most common op)
Broadcast : One GPU sends its tensor to all others
Used for: model weight distribution at startup
AllGather : Each GPU gets ALL tensors from all GPUs (concatenated)
Used for: Tensor Parallelism in LLMs
ReduceScatter: Opposite of AllGather – reduce, then scatter
Used for: ZeRO optimizer in DeepSpeed
Validation goal: Verify these operations produce correct results and meet bandwidth expectations.
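Before validating anything, it helps to pin down the math each collective performs. A minimal single-process sketch, with plain tensors standing in for per-GPU buffers (simulation only; no NCCL/RCCL involved):

```python
import torch

world_size = 4
# "Rank r" holds a buffer filled with r+1
bufs = [torch.full((8,), float(rank + 1)) for rank in range(world_size)]

# AllReduce(SUM): every rank ends up with the elementwise sum (1+2+3+4 = 10)
allreduced = torch.stack(bufs).sum(dim=0)

# Broadcast(src=0): every rank ends up with rank 0's buffer
broadcast = [bufs[0].clone() for _ in range(world_size)]

# AllGather: every rank ends up with all buffers concatenated
allgathered = torch.cat(bufs)                 # world_size * 8 = 32 elements

# ReduceScatter: reduce (sum), then rank r keeps only shard r
shards = allreduced.chunk(world_size)         # rank r keeps shards[r]

assert torch.all(allreduced == 10.0)
assert all(torch.equal(b, bufs[0]) for b in broadcast)
assert torch.cat(shards).equal(allreduced)
print('collective semantics verified')
```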
Exercise 5.1 – AllReduce Correctness (Simulation)
Simulate what AllReduce does: each "GPU" has a tensor, and all should end up with the sum.
def simulate_allreduce(num_workers=4, tensor_size=1024):
"""
Simulate AllReduce correctness.
Each worker has a different gradient tensor.
After AllReduce, all should have the mean gradient.
"""
# Each worker has its own gradient (simulating different mini-batches)
worker_grads = [torch.randn(tensor_size) for _ in range(num_workers)]
# Reference: mean of all gradients
expected = torch.stack(worker_grads).mean(dim=0)
# Simulated AllReduce (SUM then normalize)
allreduced = torch.zeros(tensor_size)
for g in worker_grads:
allreduced += g
allreduced /= num_workers
# Verify all workers get the same result
max_err = (allreduced - expected).abs().max().item()
passed = max_err < 1e-5
print(f'AllReduce simulation:')
print(f' Workers : {num_workers}')
print(f' Tensor size : {tensor_size} elements ({tensor_size * 4 / 1e6:.2f} MB FP32)')
print(f' Max error : {max_err:.2e}')
print(f' Status : {"PASS" if passed else "FAIL"}')
return passed
for n_workers in [2, 4, 8, 64]:
simulate_allreduce(num_workers=n_workers, tensor_size=1024 * 1024) # 4MB tensor
print()
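NCCL and RCCL typically implement large-message AllReduce as a reduce-scatter phase followed by an all-gather phase around a ring. A single-process sketch of those two phases, checked against the naive sum (the shard indexing is illustrative; it is not the actual ring step schedule):

```python
import torch

def ring_allreduce_sim(worker_tensors):
    """Two-phase AllReduce simulation: reduce-scatter, then all-gather."""
    n = len(worker_tensors)
    # Phase 1 - reduce-scatter: shard i accumulates the sum of every
    # worker's shard i (the real ring does this in n-1 steps per phase)
    shards = [torch.stack([t.chunk(n)[i] for t in worker_tensors]).sum(dim=0)
              for i in range(n)]
    # Phase 2 - all-gather: every worker receives all reduced shards
    return torch.cat(shards)

torch.manual_seed(0)
workers = [torch.randn(1024) for _ in range(4)]
ring = ring_allreduce_sim(workers)
naive = torch.stack(workers).sum(dim=0)
max_diff = (ring - naive).abs().max().item()
print(f'ring vs naive max diff: {max_diff:.2e}')
```

The shard decomposition is why ring AllReduce moves 2 * (N-1)/N * message_size per GPU, the factor used in the bandwidth formulas below.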
Exercise 5.2 – Real AllReduce (if Multi-GPU available)
Use torch.distributed to do actual NCCL/RCCL AllReduce if multiple GPUs are available.
# This cell runs real distributed AllReduce if multi-GPU is available
# Otherwise, shows the code pattern
ALLREDUCE_TEST_CODE = '''
# Pattern for real multi-GPU AllReduce validation
# (Requires torchrun or torch.multiprocessing)
import torch
import torch.distributed as dist
import os
def allreduce_worker(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
# RCCL for AMD, NCCL for NVIDIA
backend = 'nccl' # AMD ROCm uses 'nccl' via RCCL compatibility layer
dist.init_process_group(backend, rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
# Each rank has different tensor
tensor = torch.ones(1024, device=f'cuda:{rank}') * (rank + 1)
# Expected after AllReduce: sum = 1+2+...+world_size
expected_sum = world_size * (world_size + 1) / 2
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
# Validate
actual = tensor[0].item()
passed = abs(actual - expected_sum) < 1e-3
print(f"Rank {rank}: expected={expected_sum}, got={actual}, PASS={passed}")
dist.destroy_process_group()
# Run with: torchrun --nproc_per_node=4 script.py
# Or: torch.multiprocessing.spawn(allreduce_worker, args=(4,), nprocs=4)
'''
if MULTI_GPU:
# Real AllReduce with multiple GPUs
import torch.multiprocessing as mp
def allreduce_worker(rank, world_size, results_queue):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '29500'
dist.init_process_group('nccl', rank=rank, world_size=world_size)
torch.cuda.set_device(rank)
tensor = torch.ones(1024, device=f'cuda:{rank}') * (rank + 1)
expected = world_size * (world_size + 1) / 2
dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
actual = tensor[0].item()
passed = abs(actual - expected) < 1e-3
results_queue.put({'rank': rank, 'expected': expected, 'actual': actual, 'passed': passed})
dist.destroy_process_group()
ctx = mp.get_context('spawn')
q = ctx.Queue()
procs = [ctx.Process(target=allreduce_worker, args=(r, GPU_COUNT, q))
for r in range(GPU_COUNT)]
for p in procs: p.start()
for p in procs: p.join()
results = [q.get() for _ in range(GPU_COUNT)]
for r in sorted(results, key=lambda x: x['rank']):
print(f"Rank {r['rank']}: expected={r['expected']}, got={r['actual']}, "
f"{'PASS' if r['passed'] else 'FAIL'}")
else:
print('Single GPU/CPU mode β showing code pattern for multi-GPU AllReduce:')
print(ALLREDUCE_TEST_CODE)
Exercise 5.3 – Communication Bandwidth Measurement
Measure AllReduce bandwidth for different message sizes.
In a real cluster: bandwidth should approach NVLink (600 GB/s) or InfiniBand (400 Gb/s).
def simulate_allreduce_bandwidth(message_size_bytes, num_gpus=8, iterations=100):
"""
Simulate AllReduce bandwidth.
Ring AllReduce formula: 2 * (N-1)/N * message_size / time
"""
numel = message_size_bytes // 4 # FP32
# Simulate transfer time based on NVLink bandwidth (600 GB/s)
# On a real cluster this comes from actual timing
nvlink_bw_gbs = 600 # NVLink theoretical
transfer_time = (message_size_bytes / 1e9) / nvlink_bw_gbs
# Add overhead (latency + scheduling)
overhead = 50e-6 # 50 microseconds
simulated_time = transfer_time * 2 * (num_gpus - 1) / num_gpus + overhead
# Ring AllReduce effective bandwidth
algbw = (message_size_bytes / 1e9) / simulated_time # GB/s
busbw = algbw * 2 * (num_gpus - 1) / num_gpus # bus bandwidth
return algbw, busbw, simulated_time * 1000
message_sizes_bytes = [
1 * 1024, # 1 KB β tiny (attention head)
1 * 1024 * 1024, # 1 MB
10 * 1024 * 1024, # 10 MB β typical gradient chunk
100 * 1024 * 1024, # 100 MB
1024 * 1024 * 1024, # 1 GB β large model all-reduce
]
print('AllReduce Bandwidth vs Message Size (8-GPU simulation):')
print(f"{'Size':>12} {'AlgBW (GB/s)':>14} {'BusBW (GB/s)':>14} {'Time (ms)':>12} Status")
print('-' * 70)
bw_results = []
for sz in message_sizes_bytes:
algbw, busbw, time_ms = simulate_allreduce_bandwidth(sz)
# Pass if algbw > 50 GB/s for large messages (datacenter standard)
ok = algbw > 50 or sz < 1e6 # small messages inherently low BW due to latency
size_str = f'{sz // 1024}KB' if sz < 1e6 else f'{sz // 1024 // 1024}MB' if sz < 1e9 else '1GB'
status = 'PASS' if ok else 'WARN'
print(f'{size_str:>12} {algbw:>14.1f} {busbw:>14.1f} {time_ms:>12.3f} {status}')
bw_results.append({'size_bytes': sz, 'algbw_gbs': round(algbw, 1), 'busbw_gbs': round(busbw, 1)})
print('\nNote: Real results require nccl-tests or rccl-tests on actual cluster')
print('AMD RCCL benchmark: github.com/ROCm/rccl-tests')
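nccl-tests/rccl-tests remain the authoritative benchmark, but the same timing pattern can be sketched at the PyTorch level. A minimal harness using a world_size=1 gloo group on CPU so it runs anywhere (with a single rank the collective is trivial, so the printed number is not a real bandwidth; on a cluster you would launch one rank per GPU via torchrun with the nccl backend; the port below is arbitrary):

```python
import time
import torch
import torch.distributed as dist

# Single-rank gloo group: same API as a real cluster, runnable on any machine
dist.init_process_group(backend='gloo',
                        init_method='tcp://127.0.0.1:29512',
                        rank=0, world_size=1)

tensor = torch.ones(1024 * 1024)          # 4 MB FP32 payload
iters = 20
t0 = time.perf_counter()
for _ in range(iters):
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
elapsed = (time.perf_counter() - t0) / iters
size_gb = tensor.numel() * 4 / 1e9
print(f'AllReduce 4 MB: {elapsed * 1e3:.3f} ms/iter, algbw ~ {size_gb / elapsed:.1f} GB/s')
dist.destroy_process_group()
```

On real hardware, always average over many iterations after a warm-up pass, exactly as nccl-tests does.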
Exercise 5.4 – Linear Scaling Validation
When you add more GPUs, throughput should scale linearly (ideal = 100% efficiency).
Communication overhead causes sub-linear scaling. Validate and measure efficiency.
def simulate_scaling_efficiency(model_flops_per_step, comm_overhead_fraction):
"""
Compute/communicate overlap model.
Real distributed training uses compute/comm overlap to hide latency.
comm_overhead_fraction: fraction of step time spent in communication.
"""
gpu_counts = [1, 2, 4, 8, 16, 32, 64, 128]
results = []
for n_gpu in gpu_counts:
# Compute time: scales perfectly (linear)
compute_time = model_flops_per_step / n_gpu
        # Comm time: simplified model in which total comm work grows ~log2(N)
        # (tree/hierarchical reductions) while each GPU's share shrinks ~1/N.
        # (A pure ring's latency term actually grows linearly in N.)
if n_gpu == 1:
comm_time = 0
else:
comm_time = comm_overhead_fraction * model_flops_per_step * math.log2(n_gpu) / n_gpu
# With compute/comm overlap, effective time β max(compute, comm)
effective_time = max(compute_time, comm_time)
throughput = model_flops_per_step / effective_time # normalized
ideal_throughput = n_gpu # perfect linear scaling
efficiency_pct = (throughput / ideal_throughput) * 100
results.append({
'gpus': n_gpu,
'throughput': round(throughput, 2),
'ideal': ideal_throughput,
'efficiency_pct': round(efficiency_pct, 1),
'comm_pct': round(comm_time / effective_time * 100 if effective_time > 0 else 0, 1),
})
return results
# Simulate two scenarios: low vs high communication overhead
for scenario, comm_frac in [('Low comm overhead (good overlap)', 0.05),
('High comm overhead (poor overlap)', 0.20)]:
print(f'\nScenario: {scenario}')
print(f"{'GPUs':>8} {'Throughput':>12} {'Ideal':>8} {'Efficiency':>12} {'Comm %':>8} Status")
print('-' * 65)
scaling = simulate_scaling_efficiency(1000.0, comm_frac)
for r in scaling:
ok = r['efficiency_pct'] >= 80
status = 'PASS' if ok else 'WARN'
print(f'{r["gpus"]:>8} {r["throughput"]:>12.1f} {r["ideal"]:>8} '
f'{r["efficiency_pct"]:>11.1f}% {r["comm_pct"]:>7.1f}% {status}')
print('\nTarget: ≥85% scaling efficiency at 64 GPUs (AMD MI300X with RCCL)')
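The table above comes from an analytic model; on real hardware the same efficiency number falls out of measured step times. A small helper, with hypothetical timings for illustration:

```python
def scaling_efficiency(step_time_1gpu, step_time_ngpu, n_gpu, samples_per_gpu):
    """Weak-scaling efficiency: global throughput at N GPUs vs N x 1-GPU throughput."""
    tput_1 = samples_per_gpu / step_time_1gpu            # samples/s on 1 GPU
    tput_n = n_gpu * samples_per_gpu / step_time_ngpu    # global samples/s on N GPUs
    return tput_n / (n_gpu * tput_1) * 100

# Hypothetical: 1 GPU steps in 100 ms; 8 GPUs (same per-GPU batch) step in 110 ms
eff = scaling_efficiency(0.100, 0.110, n_gpu=8, samples_per_gpu=32)
print(f'Weak-scaling efficiency at 8 GPUs: {eff:.1f}%')   # 90.9% -> PASS vs 85% target
```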
Exercise 5.5 – Distributed Training Failure Modes
The AMD job description calls for debugging hangs and ensuring recovery. Here are the key failure modes and how to detect them.
class DistributedHealthChecker:
"""
Validates distributed training health.
In production: runs alongside training to catch issues early.
"""
def __init__(self):
self.checks = []
def check_gradient_consistency(self, gradients: list, tol=1e-4):
"""
After AllReduce, all ranks should have identical gradients.
If they don't, AllReduce is broken or there's a data race.
"""
if len(gradients) < 2:
            self.checks.append(('gradient_consistency', 'SKIP', 'Need ≥2 ranks'))
return
ref = gradients[0]
max_diff = 0.0
for i, g in enumerate(gradients[1:], 1):
diff = (g - ref).abs().max().item()
max_diff = max(max_diff, diff)
passed = max_diff < tol
self.checks.append((
'gradient_consistency',
'PASS' if passed else 'FAIL',
f'max_diff={max_diff:.2e} across {len(gradients)} ranks'
))
def check_loss_convergence(self, losses: list, window=10, threshold=0.01):
"""
Loss should decrease over time.
        If it's flat or NaN → divergence or dead learning rate.
"""
if len(losses) < window * 2:
self.checks.append(('loss_convergence', 'SKIP', 'Not enough steps'))
return
has_nan = any(math.isnan(l) or math.isinf(l) for l in losses)
if has_nan:
self.checks.append(('loss_convergence', 'FAIL', 'NaN/Inf loss detected'))
return
early_avg = sum(losses[:window]) / window
late_avg = sum(losses[-window:]) / window
delta_pct = (early_avg - late_avg) / (abs(early_avg) + 1e-8) * 100
passed = delta_pct > threshold
self.checks.append((
'loss_convergence',
'PASS' if passed else 'WARN',
f'loss dropped {delta_pct:.1f}% (early={early_avg:.4f}, late={late_avg:.4f})'
))
def check_no_deadlock(self, timeout_seconds=30):
"""
Check that distributed ops complete within timeout.
In production: use torch.distributed timeout parameter.
"""
# Simulation: verify timing of a mock collective
t0 = time.time()
time.sleep(0.001) # simulate collective
elapsed = time.time() - t0
passed = elapsed < timeout_seconds
self.checks.append((
'deadlock_check',
'PASS' if passed else 'FAIL',
f'collective completed in {elapsed:.3f}s (timeout={timeout_seconds}s)'
))
def check_memory_per_rank(self, expected_mb_per_rank=None):
"""Verify GPU memory is balanced across ranks (no stragglers)."""
if DEVICE != 'cuda':
self.checks.append(('memory_balance', 'SKIP', 'No GPU'))
return
mem = torch.cuda.memory_allocated() / 1e6
if expected_mb_per_rank and mem > expected_mb_per_rank * 1.2:
self.checks.append(('memory_balance', 'WARN',
f'GPU using {mem:.0f}MB > expected {expected_mb_per_rank}MB'))
else:
self.checks.append(('memory_balance', 'PASS', f'GPU using {mem:.0f}MB'))
def report(self):
print('Distributed Health Check Report:')
print('-' * 60)
for name, status, detail in self.checks:
print(f' [{status:4s}] {name:30s} {detail}')
passed = sum(1 for _, s, _ in self.checks if s == 'PASS')
print(f'\n{passed}/{len(self.checks)} checks passed')
# Demo
checker = DistributedHealthChecker()
# Simulate 4 ranks that all got the same gradient after AllReduce
shared_grad = torch.randn(1024)
checker.check_gradient_consistency([shared_grad.clone() for _ in range(4)])
# Simulate converging loss
import random
random.seed(42)
converging_loss = [3.0 * math.exp(-0.01 * i) + random.gauss(0, 0.01) for i in range(100)]
checker.check_loss_convergence(converging_loss)
# Deadlock check
checker.check_no_deadlock()
# Memory check
checker.check_memory_per_rank()
checker.report()
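The demo above exercises only the happy path. A standalone sketch of the failure path the consistency check exists to catch: one rank's post-AllReduce gradient silently differs (as from a broken collective or a data race):

```python
import torch

# Four ranks should hold identical gradients after AllReduce; corrupt one.
grads = [torch.ones(1024) for _ in range(4)]
grads[2][100] += 0.5              # rank 2 silently diverged

ref = grads[0]
max_diff = max((g - ref).abs().max().item() for g in grads[1:])
print(f'max cross-rank diff: {max_diff}')
print('gradient_consistency:', 'PASS' if max_diff < 1e-4 else 'FAIL')
```

Silent divergence like this is why the check compares every rank against a reference instead of trusting that AllReduce succeeded.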
Exercise 5.6 – Distributed Training Debugging Playbook
When a multi-GPU training job hangs or produces wrong results, use this diagnostic flow.
DEBUGGING_PLAYBOOK = """
=== AMD/NVIDIA Distributed Training Debugging Playbook ===
SYMPTOM: Training hangs (no progress)
-------------------------------------
1. Check if ALL ranks are stuck or just some:
$ nvidia-smi (or rocm-smi)
All GPUs should show ~100% GPU utilization if compute-bound
   If some show 0% → straggler or rank failure
2. Check for NCCL/RCCL timeout:
Set NCCL_DEBUG=INFO or RCCL_DEBUG=INFO to see what op is blocking
$ export NCCL_DEBUG=INFO
Look for: "Timeout during AllReduce" or "Group call" stuck
3. Enable torch.distributed timeout:
dist.init_process_group(..., timeout=timedelta(minutes=30))
This will raise an exception instead of hanging forever
4. Check for asymmetric code paths:
If rank 0 does `if rank == 0: dist.all_reduce(x)` but others don't
   → collective mismatch → deadlock
5. Profile with nsys / omnitrace:
$ nsys profile --trace=cuda,nccl torchrun ...
Look for: gaps between collectives, long kernel queuing
SYMPTOM: Loss diverges (NaN/Inf)
---------------------------------
1. Enable anomaly detection:
torch.autograd.set_detect_anomaly(True)
This shows which operation produced NaN
2. Check gradient norms before AllReduce:
for p in model.parameters():
if p.grad is not None:
print(p.grad.norm()) # should be finite and reasonable
3. Check for exploding gradients:
Use gradient clipping: torch.nn.utils.clip_grad_norm_(params, max_norm=1.0)
4. Reduce learning rate or increase warmup steps
SYMPTOM: Scaling efficiency < 70%
----------------------------------
1. Run nccl-tests / rccl-tests to isolate communication bandwidth:
$ ./all_reduce_perf -b 8 -e 8G -f 2 -g 8
   Compare measured busbw against the link's peak bandwidth; a large
   shortfall points to topology or transport problems. Use a profiler
   to confirm compute/comm overlap.
2. Enable bucket optimization in DDP:
torch.nn.parallel.DistributedDataParallel(..., bucket_cap_mb=25)
3. Check NVLink / InfiniBand health:
$ nvidia-smi nvlink --status -i 0
$ ibstat (for InfiniBand)
4. Check NUMA affinity: CPU-to-GPU assignments matter
"""
print(DEBUGGING_PLAYBOOK)
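Step 3 of the playbook, in runnable form: configure the process-group timeout so a mismatched collective raises instead of hanging forever, and use gloo's monitored_barrier, which reports which rank failed to arrive. Shown as a world_size=1 gloo group on CPU so it runs anywhere (the port is arbitrary); the same timeout argument applies to the nccl backend.

```python
from datetime import timedelta
import torch
import torch.distributed as dist

# timeout=... makes a stuck collective raise instead of hanging forever
dist.init_process_group(backend='gloo',
                        init_method='tcp://127.0.0.1:29513',
                        rank=0, world_size=1,
                        timeout=timedelta(minutes=5))

# gloo-only host-side barrier that names the rank that failed to arrive
dist.monitored_barrier(timeout=timedelta(seconds=10))

t = torch.ones(8)
dist.all_reduce(t)                # completes well inside the timeout
print('collective completed - no deadlock')
dist.destroy_process_group()
```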
Exercise 5.7 – DataParallel vs DistributedDataParallel
Understanding the distinction between nn.DataParallel (DP) and nn.parallel.DistributedDataParallel (DDP) is critical for AMD validation engineers. DP runs in a single process with multiple threads, limited by Python's GIL, and gathers gradients on GPU 0, creating a memory bottleneck that caps scaling at ~4 GPUs. DDP spawns one process per GPU, uses NCCL/RCCL AllReduce for gradient synchronization, and scales to thousands of GPUs across multiple nodes. AMD validation must test both paths because legacy customer code may still use DP, but all new training workloads should use DDP. The validation criteria differ: DP tests verify output matches single-GPU, while DDP tests verify gradient consistency across ranks and loss convergence parity.
# Demonstrate the key difference
import torch.nn as nn
simple_model = nn.Linear(128, 64)
COMPARISON = """
DataParallel (DP)                     DistributedDataParallel (DDP)
------------------------------------------------------------------
Single process, multiple threads      Multiple processes (one per GPU)
GIL limits parallelism                No GIL → true parallelism
Gradients averaged on GPU 0           Gradients averaged via AllReduce
Simple: model = nn.DataParallel(m)    Requires init_process_group()
Poor scaling beyond 4 GPUs            Scales to 1000+ GPUs
Legacy: prototyping only              Production standard
Works single-node only                Multi-node capable
AMD validation tests BOTH:
- DP: verify output matches single-GPU baseline
- DDP: verify loss convergence and gradient consistency
"""
print(COMPARISON)
# Show the DDP wrapping pattern. Note: constructing DDP requires an already
# initialized process group, so we only print the pattern here (wrapping
# simple_model directly would raise); Exercise 5.2 shows the full spawn flow.
if GPU_COUNT >= 2:
    print('Multi-GPU available - DDP pattern (run inside each spawned rank):')
    print('  dist.init_process_group("nccl", rank=rank, world_size=world_size)')
    print('  ddp = nn.parallel.DistributedDataParallel(model.cuda(rank), device_ids=[rank])')
elif GPU_COUNT == 1:
    print('1 GPU available - DDP pattern (requires an initialized process group and ≥2 GPUs to be useful):')
    print('  ddp = nn.parallel.DistributedDataParallel(model.cuda(), device_ids=[rank])')
else:
    print('CPU only - DDP needs a GPU backend (use backend="gloo" for CPU-only testing)')
Summary & AMD Interview Prep

| Question | Answer |
|---|---|
| How do you validate RCCL AllReduce? | Record the expected sum, compare after the reduce, require max_err < 1e-5 |
| How do you detect a comm bottleneck? | Measured busbw far below link peak in rccl-tests; low GPU utilization during backward |
| How do you debug a multi-node hang? | RCCL_DEBUG=INFO, dist timeout, nsys trace, check for asymmetric collectives |
| What metrics define "good" scaling? | ≥85% weak-scaling efficiency at 64 GPUs |
| DDP vs DP? | DDP: multi-process, AllReduce, production; DP: legacy, single-node only |
Previous: lab_04_regression_suite.ipynb
Next: lab_06_framework_validation.ipynb β PyTorch operator coverage, ONNX Runtime, torch.compile
Back to Overview: README.md