Section 6: Distributed Training Validation
NCCL, RCCL, Multi-GPU & Multi-Node
Duration: 5 hours
Difficulty: Advanced
6.1 Why Distributed Training Validation Matters
Training large models requires hundreds to thousands of GPUs working together. The communication layer must be correct, fast, and reliable:
| Scale | GPUs | Communication | Example Model |
|---|---|---|---|
| Single-node | 1–8 | NVLink / PCIe | Llama 7B fine-tuning |
| Multi-node | 16–64 | NVLink + InfiniBand | Llama 70B training |
| Cluster-scale | 256–4096+ | InfiniBand / RoCE | GPT-4 class training |
A single faulty collective (all-reduce, all-gather) can cause silent training divergence: the loss looks fine but the model is corrupted.
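To see why such a failure is "silent", consider a CPU-only thought experiment (purely illustrative, not part of the validation suite): a broken reduction that drops one rank's contribution still produces a finite, plausible-looking averaged gradient — no NaNs, no crash, just a wrong model.

```python
def correct_allreduce_avg(grads):
    """Average gradients from all ranks (what AllReduce + divide should do)."""
    return [sum(vals) / len(grads) for vals in zip(*grads)]

def faulty_allreduce_avg(grads):
    """Buggy variant: rank 0's contribution is silently lost."""
    return [sum(vals) / len(grads) for vals in zip(*grads[1:])]

# 4 ranks, 2 gradient values each
ranks = [[0.1, 0.2], [0.3, 0.4], [0.5, 0.6], [0.7, 0.8]]
good = correct_allreduce_avg(ranks)   # [0.4, 0.5]
bad = faulty_allreduce_avg(ranks)     # [0.375, 0.45] -- finite, same sign, easy to miss
```

The wrong result is numerically unremarkable, which is exactly why correctness must be checked against known expected values rather than by watching the loss.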
6.2 Communication Libraries
NCCL (NVIDIA)
NVIDIA Collective Communications Library
Optimized for NVLink + InfiniBand
De facto standard for NVIDIA GPU clusters
RCCL (AMD)
ROCm Communication Collectives Library
NCCL-compatible API for AMD GPUs
Optimized for Infinity Fabric + RoCE
Gloo (CPU/RDMA)
Cross-platform, used as fallback
Supports TCP and RDMA
Key Collective Operations

| Collective | Description | Use Case |
|---|---|---|
| AllReduce | Sum across all ranks, result on all | Gradient synchronization |
| AllGather | Gather data from all ranks to all | ZeRO Stage 3 parameter gather |
| ReduceScatter | Reduce + scatter result | ZeRO Stage 2 gradients |
| Broadcast | One rank sends to all | Weight initialization |
| AllToAll | Each rank sends chunks to all others | Expert parallelism (MoE) |
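The table's semantics can be made concrete with a CPU-only sketch of ring AllReduce, the algorithm NCCL typically uses for large messages: a reduce-scatter phase (after \(N-1\) steps each rank owns one fully summed chunk) followed by an all-gather phase (after \(N-1\) more steps every rank holds every chunk). This sketch computes the end state of each phase directly; it is an illustration of the semantics, not NCCL's actual implementation.

```python
def ring_allreduce(rank_data):
    """Simulate ring AllReduce over a list of per-rank vectors.

    Each vector is split into N chunks. Reduce-scatter leaves chunk i fully
    summed on rank i; all-gather then distributes every summed chunk to
    every rank. We compute each phase's end state chunk by chunk.
    """
    n = len(rank_data)
    chunk = len(rank_data[0]) // n
    # Reduce-scatter end state: chunk i is fully summed (owned by rank i)
    summed_chunks = []
    for i in range(n):
        lo, hi = i * chunk, (i + 1) * chunk
        summed_chunks.append([sum(r[k] for r in rank_data) for k in range(lo, hi)])
    # All-gather end state: every rank receives every summed chunk
    result = [c for ch in summed_chunks for c in ch]
    return [list(result) for _ in range(n)]  # identical copy on every rank

out = ring_allreduce([[1, 1, 2, 2], [3, 3, 4, 4]])
print(out[0])  # [4, 4, 6, 6] -- and the same on every rank
```

Note that ring AllReduce is literally ReduceScatter followed by AllGather, which is why those two collectives from the table dominate sharded-training traffic as well.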
6.3 Single-Node Multi-GPU Validation
Basic All-Reduce Correctness
import torch
import torch.distributed as dist
import os
def validate_allreduce(rank, world_size):
    """Validate all-reduce correctness across GPUs."""
    # Setup
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    # Test 1: Sum of known values
    tensor = torch.full((1024,), float(rank + 1), device=f'cuda:{rank}')
    dist.all_reduce(tensor, op=dist.ReduceOp.SUM)
    expected_sum = sum(range(1, world_size + 1))
    assert torch.allclose(
        tensor,
        torch.full_like(tensor, expected_sum),
        atol=1e-5
    ), f"Rank {rank}: AllReduce SUM failed! Expected {expected_sum}, got {tensor[0].item()}"
    print(f"Rank {rank}: AllReduce SUM PASS")

    # Test 2: Large tensor (realistic gradient size)
    torch.manual_seed(42)
    gradient = torch.randn(50_000_000, device=f'cuda:{rank}')  # ~200MB FP32
    # Each rank has the same initial gradient (for validation)
    dist.all_reduce(gradient, op=dist.ReduceOp.SUM)
    gradient /= world_size  # Average

    # Verify all ranks have the same result
    reference = gradient.clone()
    dist.broadcast(reference, src=0)
    assert torch.allclose(gradient, reference, atol=1e-5), \
        f"Rank {rank}: AllReduce result inconsistent across ranks!"
    print(f"Rank {rank}: Large AllReduce PASS")

    dist.destroy_process_group()

def run_multiprocess_test(fn, world_size):
    """Launch a test across multiple GPUs."""
    import torch.multiprocessing as mp
    mp.spawn(fn, args=(world_size,), nprocs=world_size, join=True)

# Run: validates all-reduce on all available GPUs
# run_multiprocess_test(validate_allreduce, torch.cuda.device_count())
All-Gather Validation
AllGather is the collective operation where each GPU contributes its local tensor and every GPU receives the concatenation of all tensors. It is the core operation in ZeRO Stage 3 (DeepSpeed) and FSDP (PyTorch), where model parameters are sharded across GPUs and gathered on-demand for each forward/backward step. Validation verifies that gathered[i] on every rank contains exactly the tensor originally held by rank i, with no data corruption or ordering errors. AllGather is bandwidth-intensive because the output is \(N\times\) larger than the input (where \(N\) is the world size), so it is often the bottleneck in FSDP training and a key target for NCCL/RCCL optimization.
def validate_allgather(rank, world_size):
    """Validate all-gather operation."""
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    # Each rank has a unique chunk
    local_tensor = torch.full((1024,), float(rank), device=f'cuda:{rank}')
    gathered = [torch.zeros(1024, device=f'cuda:{rank}') for _ in range(world_size)]
    dist.all_gather(gathered, local_tensor)

    # Verify: gathered[i] should be all i's
    for i, t in enumerate(gathered):
        expected = torch.full((1024,), float(i), device=f'cuda:{rank}')
        assert torch.equal(t, expected), \
            f"Rank {rank}: AllGather failed for source rank {i}"
    print(f"Rank {rank}: AllGather PASS")

    dist.destroy_process_group()
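The bandwidth-intensity claim above (output \(N\times\) larger than the input) can be quantified with a small helper. Assuming a ring AllGather, each rank receives the other \(N-1\) shards and sends its own shard \(N-1\) times, so per-rank traffic scales with \((N-1)\times\) the shard size (the function name is ours, for illustration):

```python
def allgather_volume_bytes(local_bytes, world_size):
    """Per-rank traffic for a ring AllGather: each rank receives the other
    N-1 shards (and forwards its own N-1 times), i.e. (N-1) * local_bytes."""
    return (world_size - 1) * local_bytes

# Example: FSDP gathering a 100 MB parameter shard across 8 GPUs
local = 100 * 1024 * 1024
print(allgather_volume_bytes(local, 8) / 1024**2, "MB per rank")  # 700.0 MB per rank
```

This is why FSDP step time is often dominated by AllGather at large world sizes: traffic grows with the full (unsharded) parameter size, not the per-GPU shard.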
Bandwidth Benchmarking
Communication bandwidth benchmarking measures how efficiently the GPU interconnect (NVLink, InfiniBand, RoCE) handles AllReduce at different message sizes. The key metric is bus bandwidth (busBW), computed as \(\text{busBW} = \text{algBW} \times \frac{2(N-1)}{N}\) for ring AllReduce, which represents the actual data rate on the physical links. Small messages (< 1 MB) are latency-dominated and show low bandwidth utilization; large messages (100 MB+) should approach the theoretical peak of the interconnect. NVLink on H100 provides ~900 GB/s bidirectional; InfiniBand NDR provides ~400 Gb/s per port. A measured bandwidth below 70% of theoretical indicates a configuration issue (wrong NCCL algorithm, suboptimal topology mapping, or PCIe fallback instead of NVLink).
def benchmark_allreduce_bandwidth(rank, world_size):
    """Measure all-reduce bandwidth across GPUs."""
    import time
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '29500'
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    sizes_mb = [1, 10, 100, 500, 1000]  # MB
    for size_mb in sizes_mb:
        numel = size_mb * 1024 * 1024 // 4  # FP32 elements
        tensor = torch.randn(numel, device=f'cuda:{rank}')

        # Warmup
        for _ in range(5):
            dist.all_reduce(tensor)
        torch.cuda.synchronize()

        # Benchmark
        iterations = 20
        start = time.perf_counter()
        for _ in range(iterations):
            dist.all_reduce(tensor)
        torch.cuda.synchronize()
        elapsed = time.perf_counter() - start

        # Bus bandwidth = data_size * 2 * (n-1) / n / time (for ring allreduce)
        data_bytes = numel * 4
        algbw = data_bytes * iterations / elapsed / 1e9  # Algorithm BW (GB/s)
        busbw = algbw * 2 * (world_size - 1) / world_size  # Bus BW
        if rank == 0:
            print(f"AllReduce {size_mb:4d}MB: "
                  f"algBW={algbw:.1f} GB/s, busBW={busbw:.1f} GB/s")

    dist.destroy_process_group()
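The busBW conversion inlined in the benchmark can be factored into a reusable helper, together with a check against the ~70%-of-peak threshold mentioned above (function names and the 0.7 default are illustrative choices, not NCCL API):

```python
def ring_allreduce_busbw(data_bytes, elapsed_s, world_size):
    """Convert one measured AllReduce to bus bandwidth (GB/s): the ring
    factor 2*(N-1)/N applied to the algorithm bandwidth."""
    algbw = data_bytes / elapsed_s / 1e9
    return algbw * 2 * (world_size - 1) / world_size

def meets_efficiency_target(busbw_gbs, peak_gbs, threshold=0.7):
    """Flag measurements below ~70% of the link's theoretical peak."""
    return busbw_gbs >= threshold * peak_gbs

# Example: 1 GB reduced in 2.5 ms across 8 GPUs
bw = ring_allreduce_busbw(1e9, 2.5e-3, 8)
print(f"busBW = {bw:.0f} GB/s")          # busBW = 700 GB/s
print(meets_efficiency_target(bw, 900))  # True (700 >= 0.7 * 900)
```

Large messages that fail this check usually point to a PCIe fallback, a suboptimal topology mapping, or the wrong NCCL algorithm choice, as discussed above.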
6.4 Multi-Node Validation
Multi-Node Setup Validation
Multi-node validation verifies that GPUs on different physical servers can communicate correctly through the network fabric (InfiniBand or RoCE). The test uses AllGather to collect each rank's ID on every other rank, then verifies that all expected ranks are present; a missing rank indicates a network connectivity failure, a firewall rule, or an NCCL/RCCL initialization error. Multi-node communication is significantly more complex than single-node because it involves RDMA (Remote Direct Memory Access) through the network adapter, and errors like an InfiniBand link flap, DCQCN congestion, or a GPUDirect RDMA driver misconfiguration can cause intermittent failures that only appear under load. This connectivity test is the first gate before running any multi-node training validation.
def validate_multinode_connectivity(rank, world_size):
    """Validate basic multi-node communication works."""
    # Environment must be set by the launcher (torchrun, SLURM, etc.)
    dist.init_process_group('nccl')
    local_rank = int(os.environ.get('LOCAL_RANK', 0))
    torch.cuda.set_device(local_rank)

    # Each rank sends its rank number; everyone should receive all ranks
    tensor = torch.tensor([rank], device=f'cuda:{local_rank}', dtype=torch.int64)
    gathered = [torch.zeros(1, device=f'cuda:{local_rank}', dtype=torch.int64)
                for _ in range(world_size)]
    dist.all_gather(gathered, tensor)

    received_ranks = sorted(t.item() for t in gathered)
    expected_ranks = list(range(world_size))
    assert received_ranks == expected_ranks, \
        f"Rank {rank}: Missing ranks! Got {received_ranks}, expected {expected_ranks}"

    if rank == 0:
        print(f"Multi-node connectivity: PASS ({world_size} ranks)")
    dist.destroy_process_group()
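The rank-presence check at the heart of this validator is pure list logic and can be unit-tested without any GPUs or network. A possible extraction (the function name is ours):

```python
def missing_ranks(received, world_size):
    """Return the sorted list of rank IDs absent from a gathered result."""
    return sorted(set(range(world_size)) - set(received))

print(missing_ranks([0, 1, 2, 3], 4))  # [] -- all ranks present
print(missing_ranks([0, 2, 3], 4))     # [1] -- rank 1 unreachable
```

In practice, knowing *which* rank is missing narrows the diagnosis immediately: all ranks from one host missing suggests a node-level network or firewall problem; a single rank missing suggests a per-process launch or GPU failure.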
Launch Multi-Node Job
# Using torchrun (single node, 8 GPUs)
torchrun --nproc_per_node=8 validate_distributed.py
# Using torchrun (2 nodes, 8 GPUs each = 16 total)
# On node 0:
torchrun --nproc_per_node=8 --nnodes=2 --node_rank=0 \
--master_addr=node0-ip --master_port=29500 validate_distributed.py
# On node 1:
torchrun --nproc_per_node=8 --nnodes=2 --node_rank=1 \
--master_addr=node0-ip --master_port=29500 validate_distributed.py
# Using SLURM
srun --nodes=2 --ntasks-per-node=8 --gpus-per-node=8 \
torchrun --nproc_per_node=8 validate_distributed.py
6.5 Training Convergence Validation
Distributed vs Single-GPU Training Parity
def validate_training_parity(rank, world_size, model_factory, dataset,
                             num_steps=100):
    """Verify distributed training converges to same loss as single-GPU."""
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    # Seed BEFORE model creation so every rank initializes identical weights
    torch.manual_seed(42)
    model = model_factory().to(rank)
    ddp_model = torch.nn.parallel.DistributedDataParallel(
        model, device_ids=[rank]
    )
    optimizer = torch.optim.AdamW(ddp_model.parameters(), lr=1e-4)

    sampler = torch.utils.data.distributed.DistributedSampler(
        dataset, num_replicas=world_size, rank=rank, shuffle=False
    )
    loader = torch.utils.data.DataLoader(
        dataset, batch_size=32, sampler=sampler
    )

    losses = []
    for step, (x, y) in enumerate(loader):
        if step >= num_steps:
            break
        x, y = x.to(rank), y.to(rank)
        output = ddp_model(x)
        loss = torch.nn.functional.cross_entropy(output, y)
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()
        losses.append(loss.item())

    # Verify loss is decreasing
    if rank == 0:
        early_loss = sum(losses[:10]) / 10
        late_loss = sum(losses[-10:]) / 10
        print(f"Loss: {early_loss:.4f} -> {late_loss:.4f} "
              f"({'CONVERGING' if late_loss < early_loss else 'NOT CONVERGING'})")

    # Verify all ranks have the same model parameters
    for name, param in ddp_model.module.named_parameters():
        param_clone = param.data.clone()
        dist.broadcast(param_clone, src=0)
        if not torch.allclose(param.data, param_clone, atol=1e-6):
            print(f"Rank {rank}: Parameter {name} DIVERGED!")
            break
    else:
        print(f"Rank {rank}: All parameters consistent across ranks")

    dist.destroy_process_group()
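Comparing the DDP loss trajectory against a single-GPU baseline needs a tolerance, not bitwise equality, because floating-point reduction order differs between the two runs. A small hedged helper (name and 2% default tolerance are our illustrative choices):

```python
def loss_curves_match(losses_a, losses_b, rel_tol=0.02):
    """Compare two loss trajectories point-wise within a relative tolerance.

    Distributed and single-GPU runs rarely match bitwise (reduction order
    differs), so each step only needs to stay within rel_tol of the other.
    """
    if len(losses_a) != len(losses_b):
        return False
    return all(
        abs(a - b) <= rel_tol * max(abs(a), abs(b), 1e-12)
        for a, b in zip(losses_a, losses_b)
    )

print(loss_curves_match([2.30, 1.95, 1.60], [2.31, 1.94, 1.61]))  # True
print(loss_curves_match([2.30, 1.95, 1.60], [2.30, 1.95, 3.20]))  # False
```

A curve that tracks the baseline early and then drifts past the tolerance is the classic signature of a gradient-synchronization bug rather than a data or initialization mismatch.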
6.6 NCCL/RCCL Specific Tests
NCCL Environment Tuning & Validation
# Key NCCL environment variables
export NCCL_DEBUG=INFO # Enable debug logging
export NCCL_IB_DISABLE=0 # Enable InfiniBand
export NCCL_SOCKET_IFNAME=eth0 # Network interface
export NCCL_P2P_LEVEL=NVL # NVLink peer-to-peer
export NCCL_CROSS_NIC=1 # Cross-NIC communication
# RCCL (NCCL-compatible API) reads the same NCCL_-prefixed variables
export NCCL_DEBUG=INFO
export NCCL_SOCKET_IFNAME=eth0
NCCL Test Suite
# Build and run NCCL tests (official NVIDIA tests)
git clone https://github.com/NVIDIA/nccl-tests.git
cd nccl-tests
make
./build/all_reduce_perf -b 1M -e 1G -f 2 -g 8
# Expected output:
# size time algbw busbw
# 1048576 0.XX XX.XX XX.XX
# ...
Detecting Communication Hangs
import platform as _platform

def detect_communication_hang(rank, world_size, timeout_seconds=60):
    """Detect and diagnose communication hangs.

    Note: Uses signal.SIGALRM for timeout detection, which is only available
    on Unix/Linux/macOS. On Windows, a threading-based fallback is used instead.
    """
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    operations = [
        ("all_reduce", lambda t: dist.all_reduce(t)),
        ("all_gather", lambda t: dist.all_gather(
            [torch.zeros_like(t) for _ in range(world_size)], t)),
        ("broadcast", lambda t: dist.broadcast(t, src=0)),
        ("barrier", lambda t: dist.barrier()),
    ]

    if _platform.system() != "Windows":
        # Unix/Linux/macOS: use signal.SIGALRM for reliable timeout
        import signal

        def timeout_handler(signum, frame):
            raise TimeoutError(f"Rank {rank}: Communication timed out after "
                               f"{timeout_seconds}s")

        signal.signal(signal.SIGALRM, timeout_handler)
        for name, op in operations:
            tensor = torch.randn(1024, device=f'cuda:{rank}')
            signal.alarm(timeout_seconds)
            try:
                op(tensor)
                torch.cuda.synchronize()
                signal.alarm(0)  # Cancel alarm
                print(f"Rank {rank}: {name} PASS")
            except TimeoutError:
                print(f"Rank {rank}: {name} HUNG (>{timeout_seconds}s)")
            except Exception as e:
                print(f"Rank {rank}: {name} ERROR: {e}")
    else:
        # Windows: signal.SIGALRM is not available; use threading-based timeout
        import threading
        for name, op in operations:
            tensor = torch.randn(1024, device=f'cuda:{rank}')
            result = {"done": False, "error": None}

            def _run(op=op, tensor=tensor, result=result):
                try:
                    op(tensor)
                    torch.cuda.synchronize()
                    result["done"] = True
                except Exception as e:
                    result["error"] = e

            # daemon=True so a hung collective doesn't block process exit
            t = threading.Thread(target=_run, daemon=True)
            t.start()
            t.join(timeout=timeout_seconds)
            if not t.is_alive() and result["done"]:
                print(f"Rank {rank}: {name} PASS")
            elif not t.is_alive() and result["error"]:
                print(f"Rank {rank}: {name} ERROR: {result['error']}")
            else:
                print(f"Rank {rank}: {name} HUNG (>{timeout_seconds}s)")

    dist.destroy_process_group()
6.7 Parallelism Strategy Validation
Data Parallel vs Model Parallel
Data Parallelism (DDP) replicates the full model on every GPU and synchronizes gradients via AllReduce; it is the simplest and most common distributed strategy. FSDP (Fully Sharded Data Parallel) shards model parameters, gradients, and optimizer states across GPUs, reducing per-GPU memory by up to \(N\times\) at the cost of additional AllGather communication. Validation of DDP checks that all ranks have identical gradients after AllReduce and identical model parameters after the optimizer step. FSDP validation is more nuanced: it must verify that the sharded forward pass produces correct output, that gradients are correctly reduced via ReduceScatter, and that no NaN/Inf values appear in the sharded optimizer state. Both strategies are validated against a single-GPU baseline to ensure distributed training converges to the same loss trajectory.
# Data Parallel: validate gradient equivalence
def validate_data_parallel(rank, world_size):
    """Verify DP gives same gradients as single-GPU with scaled batch."""
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    torch.manual_seed(42)
    model = torch.nn.Linear(768, 768).to(rank)
    ddp_model = torch.nn.parallel.DistributedDataParallel(
        model, device_ids=[rank]
    )

    # Same data across ranks (for validation)
    x = torch.randn(32, 768, device=rank)
    y = torch.randn(32, 768, device=rank)

    output = ddp_model(x)
    loss = torch.nn.functional.mse_loss(output, y)
    loss.backward()

    # All ranks should have identical gradients
    grad = model.weight.grad.clone()
    dist.broadcast(grad, src=0)
    assert torch.allclose(model.weight.grad, grad, atol=1e-6), \
        f"Rank {rank}: Gradient mismatch!"
    print(f"Rank {rank}: Data Parallel gradients PASS")
    dist.destroy_process_group()

# FSDP (Fully Sharded Data Parallel) validation
def validate_fsdp(rank, world_size):
    """Validate FSDP produces correct results."""
    from torch.distributed.fsdp import FullyShardedDataParallel as FSDP
    dist.init_process_group('nccl', rank=rank, world_size=world_size)
    torch.cuda.set_device(rank)

    torch.manual_seed(42)
    model = torch.nn.Sequential(
        torch.nn.Linear(768, 3072),
        torch.nn.GELU(),
        torch.nn.Linear(3072, 768),
    ).to(rank)
    fsdp_model = FSDP(model)

    x = torch.randn(8, 512, 768, device=rank)
    output = fsdp_model(x)
    loss = output.sum()
    loss.backward()

    assert not torch.isnan(loss), "FSDP forward produced NaN"
    print(f"Rank {rank}: FSDP forward+backward PASS")
    dist.destroy_process_group()
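The "up to \(N\times\) memory reduction" claim above can be made concrete with a back-of-envelope estimate following the ZeRO paper's mixed-precision accounting: roughly 16 bytes of model state per parameter with Adam (FP16 params + FP16 grads + FP32 master params, momentum, and variance). These are rough planning numbers, not measured values:

```python
def model_state_bytes_per_gpu(num_params, world_size, sharded=True):
    """Estimate per-GPU model-state memory for mixed-precision Adam.

    16 bytes/param = 2 (fp16 params) + 2 (fp16 grads)
                   + 4 (fp32 master) + 4 (momentum) + 4 (variance).
    Fully sharded (ZeRO-3 / FSDP): divided evenly across all ranks.
    """
    total = 16 * num_params
    return total / world_size if sharded else total

params = 7e9  # a 7B-parameter model
ddp = model_state_bytes_per_gpu(params, 8, sharded=False) / 1e9
fsdp = model_state_bytes_per_gpu(params, 8, sharded=True) / 1e9
print(f"DDP:  {ddp:.0f} GB/GPU")   # DDP:  112 GB/GPU
print(f"FSDP: {fsdp:.0f} GB/GPU")  # FSDP: 14 GB/GPU
```

This estimate covers model state only; activations and temporary buffers come on top, which is why even sharded 7B training still wants high-memory GPUs or activation checkpointing.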
6.8 Topology-Aware Validation
NVLink / Infinity Fabric Topology
# NVIDIA: Check GPU topology
nvidia-smi topo -m
# Expected output shows NVLink (NV12), PCIe (PIX, PHB), etc.

# AMD: Check topology
rocm-smi --showtopo

# Verify peer-to-peer access
python -c "
import torch
n = torch.cuda.device_count()
for i in range(n):
    for j in range(n):
        if i != j:
            can_access = torch.cuda.can_device_access_peer(i, j)
            print(f'GPU {i} -> GPU {j}: {\"P2P\" if can_access else \"NO P2P\"}')
"
P2P Bandwidth Test
def measure_p2p_bandwidth(src_gpu, dst_gpu, size_mb=100):
    """Measure GPU-to-GPU transfer bandwidth."""
    import time
    numel = size_mb * 1024 * 1024 // 4  # FP32 elements
    src_tensor = torch.randn(numel, device=f'cuda:{src_gpu}')
    dst_tensor = torch.empty(numel, device=f'cuda:{dst_gpu}')

    # Warmup
    for _ in range(5):
        dst_tensor.copy_(src_tensor)
    torch.cuda.synchronize()

    # Benchmark
    iterations = 50
    start = time.perf_counter()
    for _ in range(iterations):
        dst_tensor.copy_(src_tensor)
    torch.cuda.synchronize()
    elapsed = time.perf_counter() - start

    bandwidth_gbs = (numel * 4 * iterations) / elapsed / 1e9
    print(f"GPU {src_gpu} -> GPU {dst_gpu}: {bandwidth_gbs:.1f} GB/s")
    return bandwidth_gbs
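The all-pairs bandwidth matrix from the exercises can be built on top of measure_p2p_bandwidth. Taking the measurement function as a parameter keeps the matrix logic testable without GPUs; this wrapper and its name are our addition, not a PyTorch API:

```python
def build_bandwidth_matrix(num_gpus, measure_fn):
    """Build an all-pairs bandwidth matrix in GB/s (diagonal left at 0.0).

    measure_fn(src, dst) must return GB/s, e.g. measure_p2p_bandwidth.
    """
    matrix = [[0.0] * num_gpus for _ in range(num_gpus)]
    for src in range(num_gpus):
        for dst in range(num_gpus):
            if src != dst:
                matrix[src][dst] = measure_fn(src, dst)
    return matrix

# On real hardware:
#   matrix = build_bandwidth_matrix(torch.cuda.device_count(), measure_p2p_bandwidth)
# NVLink-connected pairs should stand out (hundreds of GB/s) vs PCIe pairs.
print(build_bandwidth_matrix(2, lambda s, d: 42.0))  # [[0.0, 42.0], [42.0, 0.0]]
```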
6.9 Exercises
All-Reduce Correctness: Implement and run the all-reduce validation test on your multi-GPU system. Verify with 2, 4, and 8 GPUs.
Bandwidth Matrix: Measure P2P bandwidth between all GPU pairs. Create a bandwidth matrix. Do NVLink-connected GPUs show higher bandwidth than PCIe?
Training Parity: Train a small model (e.g., 2-layer MLP on MNIST) with DDP on 2 GPUs and compare loss curves to single-GPU training. Are they identical?
NCCL Profiling: Run with NCCL_DEBUG=INFO and analyze the log output. What algorithm does NCCL select (ring, tree, etc.)? What topology does it detect?
Hang Detection: Intentionally create a hang (e.g., one rank does an all-reduce while another doesn't). How long before the timeout detects it? What diagnostics are available?
Key Takeaways
Distributed validation ensures communication correctness: wrong all-reduce = wrong model
Bandwidth benchmarks verify NVLink/InfiniBand performance matches specs
Training convergence parity proves DDP/FSDP implementation is correct
Multi-node validation is harder: network timeouts, NIC config, firewall rules all matter
NCCL/RCCL environment tuning significantly impacts performance
Previous: 05_e2e_pipeline_validation.ipynb
Next: 07_datacenter_validation.ipynb
Back to Overview: README.md