Section 7: Datacenter Validation
Kubernetes, Scheduling, & Monitoring for AI Workloads
Duration: 5 hours
Difficulty: Intermediate–Advanced
7.1 Why Datacenter Validation Matters
AI accelerators don't run in isolation — they run in datacenters with:
Kubernetes orchestrating jobs across hundreds of GPU nodes
Schedulers allocating GPUs to training/inference workloads
Monitoring tracking GPU health, utilization, and errors in real time
Networking providing high-bandwidth, low-latency GPU-to-GPU communication
Datacenter validation ensures the infrastructure layer doesn't degrade AI workload performance or correctness.
7.2 Kubernetes GPU Scheduling Validation
GPU Resource Discovery
# Verify GPU resources are visible in Kubernetes
# kubectl describe node gpu-node-01
# Resources:
# nvidia.com/gpu: 8
# amd.com/gpu: 8 (AMD)
from kubernetes import client, config
def validate_gpu_resources():
    """Verify that Kubernetes nodes report correct GPU resources.

    Scans every node's allocatable resources for NVIDIA
    ('nvidia.com/gpu') or AMD ('amd.com/gpu') device-plugin entries and
    prints a per-node summary plus a cluster total.

    Returns:
        list[dict]: one entry per GPU node, with keys 'name' (node name),
        'gpus' (allocatable GPU count), and 'status' ('Ready'/'NotReady').
    """
    config.load_kube_config()
    v1 = client.CoreV1Api()
    nodes = v1.list_node()
    gpu_nodes = []
    for node in nodes.items:
        # allocatable can be None on nodes that have not reported status yet.
        allocatable = node.status.allocatable or {}
        gpu_count = 0
        # NVIDIA and AMD device plugins advertise GPUs under different keys.
        if 'nvidia.com/gpu' in allocatable:
            gpu_count = int(allocatable['nvidia.com/gpu'])
        elif 'amd.com/gpu' in allocatable:
            gpu_count = int(allocatable['amd.com/gpu'])
        if gpu_count > 0:
            # FIX: node.status.conditions can be None (freshly registered or
            # unreachable nodes); iterating None would raise TypeError.
            conditions = node.status.conditions or []
            status = 'Ready' if any(
                c.type == 'Ready' and c.status == 'True'
                for c in conditions
            ) else 'NotReady'
            gpu_nodes.append({
                'name': node.metadata.name,
                'gpus': gpu_count,
                'status': status,
            })
            print(f" Node {node.metadata.name}: {gpu_count} GPUs "
                  f"({status})")
    total_gpus = sum(n['gpus'] for n in gpu_nodes)
    print(f"\nTotal: {len(gpu_nodes)} GPU nodes, {total_gpus} GPUs")
    return gpu_nodes
GPU Pod Scheduling Test
# test-gpu-pod.yaml — Validate a GPU pod can be scheduled and run
apiVersion: v1
kind: Pod
metadata:
  name: gpu-validation-test
spec:
  restartPolicy: Never          # one-shot validation pod; never restart on exit
  containers:
  - name: gpu-test
    image: nvidia/cuda:12.4.0-runtime-ubuntu22.04
    command: ["nvidia-smi"]     # succeeds only if the driver and GPU are visible
    resources:
      limits:
        nvidia.com/gpu: 1       # request one GPU from the NVIDIA device plugin
import subprocess
import json
import time
def validate_gpu_scheduling(gpu_count=1, timeout_seconds=120):
    """Validate that GPU pods are scheduled correctly.

    Creates a one-shot pod requesting ``gpu_count`` GPUs that runs
    ``nvidia-smi -L``, waits for a terminal phase, reports the outcome,
    and always deletes the pod afterwards.

    Args:
        gpu_count: Number of GPUs to request via resource limits.
        timeout_seconds: Maximum time to wait for the pod to finish.

    Returns:
        bool: True if the pod completed with phase "Succeeded".
    """
    pod_manifest = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": "gpu-sched-test", "namespace": "default"},
        "spec": {
            "restartPolicy": "Never",
            "containers": [{
                "name": "test",
                "image": "nvidia/cuda:12.4.0-runtime-ubuntu22.04",
                "command": ["nvidia-smi", "-L"],
                "resources": {
                    # Resource quantities are strings in the Kubernetes API.
                    "limits": {"nvidia.com/gpu": str(gpu_count)}
                },
            }],
        },
    }
    config.load_kube_config()
    v1 = client.CoreV1Api()
    # Remove any leftover pod from a previous run (a 404 here is fine).
    try:
        v1.delete_namespaced_pod("gpu-sched-test", "default")
        time.sleep(5)
    except client.exceptions.ApiException:
        pass
    v1.create_namespaced_pod("default", pod_manifest)
    try:
        # Poll until the pod reaches a terminal phase or we time out.
        start = time.time()
        while time.time() - start < timeout_seconds:
            pod = v1.read_namespaced_pod("gpu-sched-test", "default")
            if pod.status.phase in ("Succeeded", "Failed"):
                break
            time.sleep(2)
        # Check result
        pod = v1.read_namespaced_pod("gpu-sched-test", "default")
        # FIX: reading logs from a pod that is still Pending (e.g.
        # unschedulable) raises ApiException — degrade gracefully.
        try:
            logs = v1.read_namespaced_pod_log("gpu-sched-test", "default")
        except client.exceptions.ApiException as exc:
            logs = f"<logs unavailable: {exc.reason}>"
        passed = pod.status.phase == "Succeeded"
        print(f"GPU Scheduling Test: {'PASS' if passed else 'FAIL'}")
        print(f" Phase: {pod.status.phase}")
        print(f" Logs: {logs[:500]}")
        return passed
    finally:
        # FIX: always clean up, even on timeout or an API error above —
        # previously a failure path could leak the test pod.
        v1.delete_namespaced_pod("gpu-sched-test", "default")
Multi-GPU Scheduling (Topology-Aware)
# Validate that multi-GPU jobs get GPUs on the same node
# and ideally on the same NVLink domain
apiVersion: batch/v1
kind: Job
metadata:
  name: multi-gpu-topo-test
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: topo-test
        image: nvcr.io/nvidia/pytorch:24.03-py3
        command:
        - python
        - -c
        - |
          import torch
          n = torch.cuda.device_count()
          print(f"GPUs visible: {n}")
          for i in range(n):
              props = torch.cuda.get_device_properties(i)
              # FIX: the attribute is total_memory; total_mem does not exist
              # and would raise AttributeError at runtime.
              print(f" GPU {i}: {props.name} ({props.total_memory // 1e9:.0f}GB)")
          # Check P2P (NVLink/PCIe peer access) between every GPU pair
          for i in range(n):
              for j in range(n):
                  if i != j:
                      can = torch.cuda.can_device_access_peer(i, j)
                      print(f" P2P {i}->{j}: {can}")
        resources:
          limits:
            nvidia.com/gpu: "8"
7.3 GPU Monitoring & Observability
Prometheus + DCGM Exporter (NVIDIA)
# DCGM exporter for Prometheus metrics
# Helm installation:
# helm repo add gpu-helm-charts https://nvidia.github.io/dcgm-exporter/helm-charts
# helm install dcgm-exporter gpu-helm-charts/dcgm-exporter
# Key metrics exported:
# DCGM_FI_DEV_GPU_TEMP - GPU temperature
# DCGM_FI_DEV_POWER_USAGE - Power draw (watts)
# DCGM_FI_DEV_GPU_UTIL - GPU utilization %
# DCGM_FI_DEV_MEM_COPY_UTIL - Memory utilization %
# DCGM_FI_DEV_ENC_UTIL - Encoder utilization
# DCGM_FI_DEV_XID_ERRORS - XID error count
# DCGM_FI_DEV_PCIE_REPLAY_COUNTER - PCIe replay errors
Custom GPU Health Monitor
from prometheus_client import Gauge, start_http_server
import subprocess
import time
# Define Prometheus metrics — one gauge per measurement, each labeled by GPU
# index ('gpu_id') so a single exporter covers every device on the node.
gpu_temp = Gauge('gpu_temperature_celsius', 'GPU Temperature', ['gpu_id'])
gpu_power = Gauge('gpu_power_watts', 'GPU Power Draw', ['gpu_id'])
gpu_util = Gauge('gpu_utilization_percent', 'GPU Utilization', ['gpu_id'])
gpu_mem_used = Gauge('gpu_memory_used_bytes', 'GPU Memory Used', ['gpu_id'])
# ECC errors carry an extra 'type' label: 'corrected' or 'uncorrected'.
gpu_ecc_errors = Gauge('gpu_ecc_errors_total', 'GPU ECC Errors', ['gpu_id', 'type'])
def _parse_smi_field(raw):
    """Parse one nvidia-smi CSV field; return a float, or None if unavailable.

    Depending on GPU model and driver version, fields such as the ECC
    counters or power draw may be reported as 'N/A' (or '[N/A]').
    """
    cleaned = raw.strip()
    if cleaned in ("", "N/A", "[N/A]"):
        return None
    try:
        return float(cleaned)
    except ValueError:
        return None


def collect_gpu_metrics():
    """Collect GPU metrics via nvidia-smi and update the Prometheus gauges.

    Queries one CSV line per GPU and sets the module-level gauges
    (gpu_temp, gpu_power, gpu_util, gpu_mem_used, gpu_ecc_errors).
    Fields reported as 'N/A' are skipped instead of crashing the exporter.
    """
    output = subprocess.check_output(
        ["nvidia-smi",
         "--query-gpu=index,temperature.gpu,power.draw,"
         "utilization.gpu,memory.used,"
         "ecc.errors.corrected.aggregate.total,"
         "ecc.errors.uncorrected.aggregate.total",
         "--format=csv,noheader,nounits"],
        text=True
    )
    for line in output.strip().split('\n'):
        values = [v.strip() for v in line.split(',')]
        gpu_id = values[0]
        # FIX: every field (not only ECC) can be 'N/A' on some boards —
        # previously float('N/A') would raise ValueError and kill the loop.
        temp = _parse_smi_field(values[1])
        if temp is not None:
            gpu_temp.labels(gpu_id=gpu_id).set(temp)
        power = _parse_smi_field(values[2])
        if power is not None:
            gpu_power.labels(gpu_id=gpu_id).set(power)
        util = _parse_smi_field(values[3])
        if util is not None:
            gpu_util.labels(gpu_id=gpu_id).set(util)
        mem = _parse_smi_field(values[4])
        if mem is not None:
            # FIX: nvidia-smi reports memory.used in MiB, so the byte
            # conversion factor is 1024**2, not 1e6.
            gpu_mem_used.labels(gpu_id=gpu_id).set(mem * 1024 ** 2)
        corrected = _parse_smi_field(values[5])
        if corrected is not None:
            gpu_ecc_errors.labels(gpu_id=gpu_id, type='corrected').set(corrected)
        uncorrected = _parse_smi_field(values[6])
        if uncorrected is not None:
            gpu_ecc_errors.labels(gpu_id=gpu_id, type='uncorrected').set(uncorrected)
def run_gpu_exporter(port=9400, interval=5):
    """Run GPU metrics exporter.

    Starts a Prometheus HTTP scrape endpoint on ``port`` serving the
    module-level gauges, then loops forever refreshing them via
    collect_gpu_metrics() every ``interval`` seconds. Blocks the calling
    thread indefinitely.

    Args:
        port: TCP port for the Prometheus scrape endpoint.
        interval: Seconds between metric collections.
    """
    start_http_server(port)
    print(f"GPU exporter running on :{port}")
    while True:
        collect_gpu_metrics()
        time.sleep(interval)
Grafana Dashboard Alerts
# Example Prometheus alert rules for GPU health
groups:
- name: gpu-alerts
  rules:
  - alert: GPUTemperatureHigh
    expr: gpu_temperature_celsius > 85
    for: 5m
    labels:
      severity: warning
    annotations:
      # FIX: repaired mojibake ("85Β°C") in the alert message.
      summary: "GPU {{ $labels.gpu_id }} temperature above 85°C"
  # NOTE: gpu_memory_total_bytes is NOT exported by the custom exporter in
  # this section (it only exports gpu_memory_used_bytes) — supply it from
  # DCGM or extend the exporter before enabling this rule.
  - alert: GPUMemoryExhausted
    expr: gpu_memory_used_bytes / gpu_memory_total_bytes > 0.95
    for: 1m
    labels:
      severity: critical
    annotations:
      summary: "GPU {{ $labels.gpu_id }} memory >95% used"
  - alert: GPUECCUncorrectable
    # No "for:" clause — a single uncorrectable ECC error should fire immediately.
    expr: increase(gpu_ecc_errors_total{type="uncorrected"}[1h]) > 0
    labels:
      severity: critical
    annotations:
      summary: "GPU {{ $labels.gpu_id }} has uncorrectable ECC errors"
  - alert: GPUUtilizationLow
    expr: gpu_utilization_percent < 10
    for: 30m
    labels:
      severity: info
    annotations:
      summary: "GPU {{ $labels.gpu_id }} underutilized (<10%)"
7.4 Network Validation for AI Workloads
InfiniBand Validation
# Check InfiniBand status (port state, physical state, link rate)
ibstat
ibstatus
# Verify link speed (should be HDR 200Gbps or NDR 400Gbps)
ibstat | grep -i rate
# Run InfiniBand bandwidth test (perftest suite: start the same tool in
# server mode on <remote_host> first, then run here as the client)
ib_write_bw --size=65536 --duration=10 <remote_host>
ib_read_bw --size=65536 --duration=10 <remote_host>
# Run InfiniBand latency test (tiny 4-byte messages isolate pure latency)
ib_write_lat --size=4 --duration=10 <remote_host>
RoCE (RDMA over Converged Ethernet) Validation
# Check RoCE interfaces (lists RDMA-capable devices, ports, and GIDs)
ibv_devinfo
# Verify RDMA connectivity (rping: RDMA ping between two hosts)
rdma link show
rping -s -v # Server
rping -c -a <server_ip> -v -C 10 # Client
# Test bandwidth (-d selects the RDMA device, e.g. mlx5_0)
ib_write_bw -d mlx5_0 --size=65536 <remote_host>
GPUDirect RDMA Validation
def validate_gpudirect_rdma():
    """Check whether GPUDirect RDMA (GPU memory -> NIC -> remote GPU) is enabled.

    Inspects the loaded kernel modules for ``nvidia_peermem`` and prints
    the NCCL environment variables recommended to exercise GPUDirect RDMA.

    Returns:
        bool: True if the ``nvidia_peermem`` kernel module is loaded.
    """
    import subprocess
    # nvidia_peermem is the kernel module that lets the NIC DMA directly
    # into GPU memory; its presence in lsmod is the key indicator.
    lsmod_output = subprocess.run(
        ["lsmod"], capture_output=True, text=True
    ).stdout
    peermem_loaded = "nvidia_peermem" in lsmod_output
    # NCCL settings that let the transport layer use GPUDirect RDMA.
    nccl_env = {
        "NCCL_NET_GDR_LEVEL": "5",  # Enable GPUDirect RDMA
        "NCCL_NET_GDR_READ": "1",  # Enable GDR for reads
    }
    print(f"nvidia_peermem module loaded: {peermem_loaded}")
    print(f"Recommended NCCL env: {nccl_env}")
    return peermem_loaded
7.5 Job Scheduler Validation
SLURM GPU Job Validation
# Verify SLURM sees GPU resources (%N = node list, %G = generic resources/GPUs)
sinfo -o "%N %G" | head
# Submit a single-node GPU validation job: confirms the scheduler allocates
# GPUs and that PyTorch inside the job sees the same devices as nvidia-smi
sbatch <<'EOF'
#!/bin/bash
#SBATCH --job-name=gpu-validate
#SBATCH --nodes=1
#SBATCH --gpus-per-node=8
#SBATCH --time=00:10:00
#SBATCH --output=gpu-validate-%j.out
echo "Node: $(hostname)"
echo "GPUs allocated: $SLURM_GPUS_ON_NODE"
nvidia-smi -L
python -c "
import torch
print(f'PyTorch sees {torch.cuda.device_count()} GPUs')
for i in range(torch.cuda.device_count()):
print(f' GPU {i}: {torch.cuda.get_device_name(i)}')
"
EOF
# Multi-node GPU job: torchrun rendezvous is hosted on the first
# allocated node (port 29500), 8 ranks per node across 2 nodes
sbatch <<'EOF'
#!/bin/bash
#SBATCH --job-name=multi-node-validate
#SBATCH --nodes=2
#SBATCH --gpus-per-node=8
#SBATCH --ntasks-per-node=8
#SBATCH --time=00:10:00
srun torchrun --nproc_per_node=8 --nnodes=2 \
--rdzv_id=$SLURM_JOB_ID --rdzv_backend=c10d \
--rdzv_endpoint=$(scontrol show hostname $SLURM_NODELIST | head -1):29500 \
validate_distributed.py
EOF
Kubernetes Job Operator (Kubeflow Training Operator)
# PyTorchJob for distributed training validation
# NOTE(review): torchrun is launched without --nnodes/--rdzv_* arguments here;
# confirm the Kubeflow Training Operator injects the rendezvous environment
# (MASTER_ADDR/MASTER_PORT etc.) for this setup, otherwise each replica would
# run as an independent single-node job — verify before relying on this.
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: distributed-validation
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
          - name: pytorch
            image: nvcr.io/nvidia/pytorch:24.03-py3
            command: ["torchrun"]
            args:
            - "--nproc_per_node=8"
            - "validate_distributed.py"
            resources:
              limits:
                nvidia.com/gpu: "8"
    Worker:
      replicas: 1
      template:
        spec:
          containers:
          - name: pytorch
            image: nvcr.io/nvidia/pytorch:24.03-py3
            command: ["torchrun"]
            args:
            - "--nproc_per_node=8"
            - "validate_distributed.py"
            resources:
              limits:
                nvidia.com/gpu: "8"
7.6 Storage Validation for AI Workloads
Checkpoint I/O Performance
import torch
import time
import os
# Where checkpoint I/O tests write their scratch files. The default,
# '/mnt/shared/checkpoints', is a placeholder for the shared/networked
# storage (e.g. NFS, Lustre, GPFS) typical of datacenter deployments.
# Override it via the CHECKPOINT_DIR environment variable, or pass
# checkpoint_dir explicitly to the validation functions.
CHECKPOINT_DIR = os.getenv("CHECKPOINT_DIR", "/mnt/shared/checkpoints")
def validate_checkpoint_io(model_size_gb=1, checkpoint_dir=None):
    """Validate checkpoint save/load performance on shared storage.

    Writes a synthetic FP32 state dict of roughly ``model_size_gb`` GB,
    reads it back, verifies a bit-exact round trip, and reports observed
    save/load bandwidth.

    Args:
        model_size_gb: Size of the simulated checkpoint in GB.
        checkpoint_dir: Directory for checkpoint files. Defaults to the
            module-level CHECKPOINT_DIR (configurable via the CHECKPOINT_DIR
            environment variable). The default path '/mnt/shared/checkpoints'
            is a placeholder — update it to match your storage mount.

    Returns:
        dict: {'save_gbs': ..., 'load_gbs': ...} observed bandwidth in GB/s.

    Raises:
        AssertionError: if the loaded tensor differs from the saved one.
    """
    if checkpoint_dir is None:
        checkpoint_dir = CHECKPOINT_DIR
    os.makedirs(checkpoint_dir, exist_ok=True)
    # Create a model-sized tensor (simulating a checkpoint), 4 bytes/element.
    numel = int(model_size_gb * 1e9 / 4)  # FP32
    state = {"model": torch.randn(numel)}
    path = os.path.join(checkpoint_dir, "test_checkpoint.pt")
    try:
        # Save
        start = time.perf_counter()
        torch.save(state, path)
        save_time = time.perf_counter() - start
        save_bandwidth = model_size_gb / save_time
        # Load (weights_only avoids executing arbitrary pickled code)
        start = time.perf_counter()
        loaded = torch.load(path, weights_only=True)
        load_time = time.perf_counter() - start
        load_bandwidth = model_size_gb / load_time
        # Verify integrity
        assert torch.equal(state["model"], loaded["model"]), "Checkpoint corruption!"
        print(f"Checkpoint I/O ({model_size_gb}GB):")
        print(f" Directory: {checkpoint_dir}")
        print(f" Save: {save_time:.1f}s ({save_bandwidth:.2f} GB/s)")
        print(f" Load: {load_time:.1f}s ({load_bandwidth:.2f} GB/s)")
        return {"save_gbs": save_bandwidth, "load_gbs": load_bandwidth}
    finally:
        # FIX: clean up the scratch file even when save/load/integrity fails,
        # so repeated runs don't accumulate multi-GB files on shared storage.
        if os.path.exists(path):
            os.remove(path)
Dataset Loading Performance
For large-scale training, dataset loading from shared storage (NFS, Lustre, GPFS) must deliver data fast enough to keep all GPUs busy — a single 8xH100 node processing images can consume over 10 GB/s of read bandwidth, and a 256-GPU cluster proportionally more. This benchmark reads sample files sequentially and measures sustained read throughput in GB/s. If measured bandwidth falls below the expected minimum (typically 1+ GB/s per GPU for image training, 0.5+ GB/s for text pretraining), the storage subsystem becomes the training bottleneck regardless of GPU performance. Common remedies include increasing the number of storage servers, switching to a faster parallel filesystem, or caching datasets in local NVMe SSDs.
def validate_dataset_io(data_dir, expected_bandwidth_gbs=1.0):
    """Validate that dataset read speed meets training requirements.

    Reads up to 100 ``.pt`` sample files from ``data_dir`` and measures
    the sustained read bandwidth.

    Args:
        data_dir: Directory containing ``.pt`` sample files.
        expected_bandwidth_gbs: Minimum acceptable bandwidth in GB/s.

    Returns:
        float | None: measured bandwidth in GB/s, or None if no ``.pt``
        files were found in ``data_dir``.
    """
    import glob
    files = glob.glob(os.path.join(data_dir, "*.pt"))[:100]
    if not files:
        print("No .pt files found for testing")
        # FIX: explicit None instead of an implicit bare return, so the
        # "no data" outcome is visible at the call site.
        return None
    total_bytes = 0
    start = time.perf_counter()
    for f in files:
        # Deserialize to force a full read of each file; weights_only
        # avoids executing arbitrary pickled code from sample files.
        # (The loaded object itself is not needed — we only time the read.)
        torch.load(f, weights_only=True)
        total_bytes += os.path.getsize(f)
    elapsed = time.perf_counter() - start
    bandwidth = total_bytes / elapsed / 1e9
    passed = bandwidth >= expected_bandwidth_gbs
    print(f"Dataset I/O: {bandwidth:.2f} GB/s "
          f"({'PASS' if passed else 'FAIL'}: need >{expected_bandwidth_gbs} GB/s)")
    return bandwidth
7.7 Exercises
GPU Discovery: Write a script that queries your Kubernetes cluster (or local system) and reports all GPU resources, their types, and health status.
Monitoring Setup: Set up GPU metric collection (using nvidia-smi or DCGM) and write 3 alert rules: high temperature, ECC errors, and low utilization.
Network Bandwidth: If you have access to multi-node infrastructure, measure InfiniBand or Ethernet bandwidth between GPU nodes. Is it sufficient for distributed training?
Checkpoint Benchmark: Measure checkpoint save/load speed for 1 GB, 5 GB, and 20 GB files on your storage system. Is it fast enough for hourly checkpointing during LLM training?
Scheduling Test: Submit a multi-GPU job (via SLURM or Kubernetes) and verify that all GPUs are on the same node and have NVLink connectivity.
Key Takeaways
Datacenter validation ensures the infrastructure supports AI workloads correctly
Kubernetes GPU scheduling must be topology-aware (NVLink, same-node)
Monitoring must catch GPU health issues (ECC errors, thermal throttling) proactively
Network bandwidth (InfiniBand/RoCE) is critical for distributed training performance
Storage I/O must keep up with checkpoint frequency and dataset loading demands
Previous: 06_distributed_training_validation.ipynb
Next: 08_regression_release_validation.ipynb
Back to Overview: README.md