Section 7: Datacenter Validation

Kubernetes, Scheduling, & Monitoring for AI Workloads

Duration: 5 hours
Difficulty: Intermediate–Advanced

7.1 Why Datacenter Validation Matters

AI accelerators don't run in isolation — they run in datacenters with:

  • Kubernetes orchestrating jobs across hundreds of GPU nodes

  • Schedulers allocating GPUs to training/inference workloads

  • Monitoring tracking GPU health, utilization, and errors in real-time

  • Networking providing high-bandwidth, low-latency GPU-to-GPU communication

Datacenter validation ensures the infrastructure layer doesn't degrade AI workload performance or correctness.

7.2 Kubernetes GPU Scheduling Validation

GPU Resource Discovery

# Verify GPU resources are visible in Kubernetes
# kubectl describe node gpu-node-01
# Resources:
#   nvidia.com/gpu: 8
#   amd.com/gpu: 8 (AMD)
from kubernetes import client, config

def validate_gpu_resources():
    """Verify Kubernetes nodes report correct GPU resources."""
    config.load_kube_config()
    v1 = client.CoreV1Api()

    nodes = v1.list_node()
    gpu_nodes = []

    for node in nodes.items:
        allocatable = node.status.allocatable or {}
        gpu_count = 0

        # Check for NVIDIA GPUs
        if 'nvidia.com/gpu' in allocatable:
            gpu_count = int(allocatable['nvidia.com/gpu'])
        # Check for AMD GPUs
        elif 'amd.com/gpu' in allocatable:
            gpu_count = int(allocatable['amd.com/gpu'])

        if gpu_count > 0:
            gpu_nodes.append({
                'name': node.metadata.name,
                'gpus': gpu_count,
                'status': 'Ready' if any(
                    c.type == 'Ready' and c.status == 'True'
                    for c in node.status.conditions
                ) else 'NotReady',
            })
            print(f"  Node {node.metadata.name}: {gpu_count} GPUs "
                  f"({gpu_nodes[-1]['status']})")

    total_gpus = sum(n['gpus'] for n in gpu_nodes)
    print(f"\nTotal: {len(gpu_nodes)} GPU nodes, {total_gpus} GPUs")
    return gpu_nodes

GPU Pod Scheduling Test

# test-gpu-pod.yaml — Validate a GPU pod can be scheduled and run
apiVersion: v1
kind: Pod
metadata:
  name: gpu-validation-test
spec:
  restartPolicy: Never
  containers:
  - name: gpu-test
    image: nvidia/cuda:12.4.0-runtime-ubuntu22.04
    command: ["nvidia-smi"]
    resources:
      limits:
        nvidia.com/gpu: 1
import time

from kubernetes import client, config

def validate_gpu_scheduling(gpu_count=1, timeout_seconds=120):
    """Validate that GPU pods are scheduled correctly."""
    pod_manifest = {
        "apiVersion": "v1",
        "kind": "Pod",
        "metadata": {"name": "gpu-sched-test", "namespace": "default"},
        "spec": {
            "restartPolicy": "Never",
            "containers": [{
                "name": "test",
                "image": "nvidia/cuda:12.4.0-runtime-ubuntu22.04",
                "command": ["nvidia-smi", "-L"],
                "resources": {
                    "limits": {"nvidia.com/gpu": str(gpu_count)}
                },
            }],
        },
    }

    # Create pod
    config.load_kube_config()
    v1 = client.CoreV1Api()

    # Clean up if exists
    try:
        v1.delete_namespaced_pod("gpu-sched-test", "default")
        time.sleep(5)
    except client.exceptions.ApiException:
        pass

    v1.create_namespaced_pod("default", pod_manifest)

    # Wait for completion
    start = time.time()
    while time.time() - start < timeout_seconds:
        pod = v1.read_namespaced_pod("gpu-sched-test", "default")
        if pod.status.phase in ("Succeeded", "Failed"):
            break
        time.sleep(2)

    # Check result
    pod = v1.read_namespaced_pod("gpu-sched-test", "default")
    logs = v1.read_namespaced_pod_log("gpu-sched-test", "default")

    passed = pod.status.phase == "Succeeded"
    print(f"GPU Scheduling Test: {'PASS' if passed else 'FAIL'}")
    print(f"  Phase: {pod.status.phase}")
    print(f"  Logs: {logs[:500]}")

    # Cleanup
    v1.delete_namespaced_pod("gpu-sched-test", "default")
    return passed

Multi-GPU Scheduling (Topology-Aware)

# Validate that multi-GPU jobs get GPUs on the same node
# and ideally on the same NVLink domain
apiVersion: batch/v1
kind: Job
metadata:
  name: multi-gpu-topo-test
spec:
  template:
    spec:
      restartPolicy: Never
      containers:
      - name: topo-test
        image: nvcr.io/nvidia/pytorch:24.03-py3
        command:
          - python
          - -c
          - |
            import torch
            n = torch.cuda.device_count()
            print(f"GPUs visible: {n}")
            for i in range(n):
                props = torch.cuda.get_device_properties(i)
                print(f"  GPU {i}: {props.name} ({props.total_memory / 1e9:.0f}GB)")
            # Check P2P
            for i in range(n):
                for j in range(n):
                    if i != j:
                        can = torch.cuda.can_device_access_peer(i, j)
                        print(f"  P2P {i}->{j}: {can}")
        resources:
          limits:
            nvidia.com/gpu: "8"
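
Beyond the in-job P2P check above, you can verify NVLink connectivity directly on a node by parsing `nvidia-smi topo -m`. The helper names below are illustrative, and the matrix layout is an assumption based on common `nvidia-smi` output (link types like `NV12` for NVLink, `PIX`/`SYS` for PCIe paths) — treat this as a sketch and adjust the parsing for your driver version:

```python
import subprocess

def parse_topo_matrix(topo_output):
    """Parse the GPU-to-GPU portion of `nvidia-smi topo -m` output.

    Returns a dict mapping (row_gpu, col_gpu) -> link type string,
    e.g. 'NV12' for a 12-link NVLink connection, 'PIX'/'SYS' for PCIe paths.
    """
    lines = [l for l in topo_output.strip().splitlines() if l.strip()]
    # Header row lists the column labels; keep only the GPU columns.
    gpu_cols = [c for c in lines[0].split() if c.startswith("GPU")]
    links = {}
    for line in lines[1:]:
        parts = line.split()
        if not parts or not parts[0].startswith("GPU"):
            continue  # skip NIC rows and the legend
        for col, entry in zip(gpu_cols, parts[1:1 + len(gpu_cols)]):
            links[(parts[0], col)] = entry
    return links

def all_pairs_nvlink(links):
    """True if every GPU pair is NVLink-connected ('NV*') or the diagonal ('X')."""
    return all(v == "X" or v.startswith("NV") for v in links.values())

def check_node_topology():
    """Run `nvidia-smi topo -m` on this node and report NVLink connectivity."""
    topo = subprocess.check_output(["nvidia-smi", "topo", "-m"], text=True)
    links = parse_topo_matrix(topo)
    print("All pairs NVLink-connected:", all_pairs_nvlink(links))
    return links
```

If any pair falls back to `SYS` (traffic crossing the inter-socket link), multi-GPU collectives on that node will be noticeably slower than on a fully NVLink-connected domain.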

7.3 GPU Monitoring & Observability

Prometheus + DCGM Exporter (NVIDIA)

# DCGM exporter for Prometheus metrics
# Helm installation:
# helm repo add gpu-helm-charts https://nvidia.github.io/dcgm-exporter/helm-charts
# helm install dcgm-exporter gpu-helm-charts/dcgm-exporter

# Key metrics exported:
# DCGM_FI_DEV_GPU_TEMP          - GPU temperature
# DCGM_FI_DEV_POWER_USAGE       - Power draw (watts)
# DCGM_FI_DEV_GPU_UTIL          - GPU utilization %
# DCGM_FI_DEV_MEM_COPY_UTIL     - Memory utilization %
# DCGM_FI_DEV_ENC_UTIL          - Encoder utilization
# DCGM_FI_DEV_XID_ERRORS        - XID error count
# DCGM_FI_DEV_PCIE_REPLAY_COUNTER - PCIe replay errors
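
Once the exporter is scraped by Prometheus, these metrics can be checked fleet-wide through the Prometheus HTTP API. A minimal sketch using only the standard library follows; the server URL is a placeholder for your deployment, and the `gpu` label name matches the dcgm-exporter default but may differ in custom setups:

```python
import json
import urllib.request

# Assumed Prometheus endpoint -- point this at your deployment.
PROMETHEUS_URL = "http://localhost:9090"

def parse_prometheus_result(payload):
    """Extract {gpu_label: value} from a Prometheus instant-query response."""
    results = payload.get("data", {}).get("result", [])
    return {
        r["metric"].get("gpu", "unknown"): float(r["value"][1])
        for r in results
    }

def query_gpu_metric(metric, prom_url=PROMETHEUS_URL):
    """Fetch an instant vector for a metric, e.g. DCGM_FI_DEV_GPU_TEMP."""
    url = f"{prom_url}/api/v1/query?query={metric}"
    with urllib.request.urlopen(url) as resp:
        return parse_prometheus_result(json.load(resp))

def check_fleet_temperature(max_temp_c=85.0):
    """Flag any GPU in the fleet above a temperature threshold."""
    temps = query_gpu_metric("DCGM_FI_DEV_GPU_TEMP")
    hot = {gpu: t for gpu, t in temps.items() if t > max_temp_c}
    for gpu, t in hot.items():
        print(f"  GPU {gpu}: {t:.0f}C exceeds {max_temp_c:.0f}C")
    return hot
```

The same pattern works for XID errors or PCIe replay counters — only the metric name changes.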

Custom GPU Health Monitor

from prometheus_client import Gauge, start_http_server
import subprocess
import time

# Define Prometheus metrics
gpu_temp = Gauge('gpu_temperature_celsius', 'GPU Temperature', ['gpu_id'])
gpu_power = Gauge('gpu_power_watts', 'GPU Power Draw', ['gpu_id'])
gpu_util = Gauge('gpu_utilization_percent', 'GPU Utilization', ['gpu_id'])
gpu_mem_used = Gauge('gpu_memory_used_bytes', 'GPU Memory Used', ['gpu_id'])
gpu_mem_total = Gauge('gpu_memory_total_bytes', 'GPU Memory Total', ['gpu_id'])
gpu_ecc_errors = Gauge('gpu_ecc_errors_total', 'GPU ECC Errors', ['gpu_id', 'type'])

def collect_gpu_metrics():
    """Collect GPU metrics and expose via Prometheus."""
    output = subprocess.check_output(
        ["nvidia-smi",
         "--query-gpu=index,temperature.gpu,power.draw,"
         "utilization.gpu,memory.used,memory.total,"
         "ecc.errors.corrected.aggregate.total,"
         "ecc.errors.uncorrected.aggregate.total",
         "--format=csv,noheader,nounits"],
        text=True
    )

    for line in output.strip().split('\n'):
        values = [v.strip() for v in line.split(',')]
        gpu_id = values[0]
        gpu_temp.labels(gpu_id=gpu_id).set(float(values[1]))
        gpu_power.labels(gpu_id=gpu_id).set(float(values[2]))
        gpu_util.labels(gpu_id=gpu_id).set(float(values[3]))
        gpu_mem_used.labels(gpu_id=gpu_id).set(float(values[4]) * 1024**2)   # MiB to bytes
        gpu_mem_total.labels(gpu_id=gpu_id).set(float(values[5]) * 1024**2)  # MiB to bytes

        if values[6] != 'N/A':
            gpu_ecc_errors.labels(gpu_id=gpu_id, type='corrected').set(float(values[6]))
        if values[7] != 'N/A':
            gpu_ecc_errors.labels(gpu_id=gpu_id, type='uncorrected').set(float(values[7]))


def run_gpu_exporter(port=9400, interval=5):
    """Run GPU metrics exporter."""
    start_http_server(port)
    print(f"GPU exporter running on :{port}")
    while True:
        collect_gpu_metrics()
        time.sleep(interval)

Grafana Dashboard Alerts

# Example Prometheus alert rules for GPU health
groups:
  - name: gpu-alerts
    rules:
      - alert: GPUTemperatureHigh
        expr: gpu_temperature_celsius > 85
        for: 5m
        labels:
          severity: warning
        annotations:
          summary: "GPU {{ $labels.gpu_id }} temperature above 85°C"

      - alert: GPUMemoryExhausted
        expr: gpu_memory_used_bytes / gpu_memory_total_bytes > 0.95
        for: 1m
        labels:
          severity: critical
        annotations:
          summary: "GPU {{ $labels.gpu_id }} memory >95% used"

      - alert: GPUECCUncorrectable
        expr: increase(gpu_ecc_errors_total{type="uncorrected"}[1h]) > 0
        labels:
          severity: critical
        annotations:
          summary: "GPU {{ $labels.gpu_id }} has uncorrectable ECC errors"

      - alert: GPUUtilizationLow
        expr: gpu_utilization_percent < 10
        for: 30m
        labels:
          severity: info
        annotations:
          summary: "GPU {{ $labels.gpu_id }} underutilized (<10%)"

7.4 Network Validation for AI Workloads

InfiniBand Validation

# Check InfiniBand status
ibstat
ibstatus

# Verify link speed (should be HDR 200Gbps or NDR 400Gbps)
ibstat | grep -i rate

# Run InfiniBand bandwidth test
ib_write_bw --size=65536 --duration=10 <remote_host>
ib_read_bw --size=65536 --duration=10 <remote_host>

# Run InfiniBand latency test
ib_write_lat --size=4 --duration=10 <remote_host>
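
For automated validation, the `ibstat` check above can be wrapped in Python. The exact `ibstat` layout varies across OFED versions, so the regex-based parse below is a best-effort sketch with illustrative helper names:

```python
import re
import subprocess

def parse_ibstat_rates(ibstat_output):
    """Extract per-port link rates (in Gb/s) from `ibstat` output.

    ibstat prints one 'Rate: <n>' line per port; layout details vary
    across OFED versions, so this regex parse is a best-effort sketch.
    """
    return [float(m) for m in re.findall(r"Rate:\s*([\d.]+)", ibstat_output)]

def check_ib_link_rates(min_rate_gbps=200.0):
    """Verify every IB port reports at least HDR-class (200 Gb/s) speed."""
    output = subprocess.check_output(["ibstat"], text=True)
    rates = parse_ibstat_rates(output)
    passed = bool(rates) and all(r >= min_rate_gbps for r in rates)
    print(f"IB port rates: {rates} Gb/s -> {'PASS' if passed else 'FAIL'}")
    return passed
```

A port negotiated down to 100 Gb/s (e.g. a bad cable or transceiver) will silently halve all-reduce bandwidth, so this check is worth running on every node before large jobs.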

RoCE (RDMA over Converged Ethernet) Validation

# Check RoCE interfaces
ibv_devinfo

# Verify RDMA connectivity
rdma link show
rping -s -v  # Server
rping -c -a <server_ip> -v -C 10  # Client

# Test bandwidth
ib_write_bw -d mlx5_0 --size=65536 <remote_host>

GPUDirect RDMA Validation

def validate_gpudirect_rdma():
    """Validate GPUDirect RDMA is active (GPU memory → network → remote GPU)."""
    import subprocess

    # Check if nvidia_peermem module is loaded
    result = subprocess.run(
        ["lsmod"], capture_output=True, text=True
    )
    peermem_loaded = "nvidia_peermem" in result.stdout

    # Check NCCL GPUDirect usage
    nccl_env = {
        "NCCL_NET_GDR_LEVEL": "5",     # Enable GPUDirect RDMA
        "NCCL_NET_GDR_READ": "1",      # Enable GDR for reads
    }

    print(f"nvidia_peermem module loaded: {peermem_loaded}")
    print(f"Recommended NCCL env: {nccl_env}")
    return peermem_loaded
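
A loaded `nvidia_peermem` module is necessary but not sufficient — to confirm NCCL actually used GPUDirect RDMA, inspect a run's debug log (`NCCL_DEBUG=INFO`). NCCL logs the chosen transport per channel, e.g. `... via NET/IB/0/GDRDMA` when GPUDirect RDMA is in use; the exact format varies by NCCL version, so treat the pattern below as an assumption and adjust it for your logs:

```python
import re

def gdr_paths_in_nccl_log(log_text):
    """Count GDRDMA vs plain network transport selections in an NCCL log.

    Looks for NCCL's per-channel transport lines; the 'GDRDMA' suffix on
    the network path indicates GPUDirect RDMA was engaged.
    """
    gdr = len(re.findall(r"via NET/\S*GDRDMA", log_text))
    non_gdr = len(re.findall(r"via NET/(?!\S*GDRDMA)", log_text))
    return {"gdrdma": gdr, "net_no_gdr": non_gdr}
```

If `net_no_gdr` is nonzero on hardware that should support GDR, check the `NCCL_NET_GDR_LEVEL` setting and the GPU/NIC PCIe topology.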

7.5 Job Scheduler Validation

SLURM GPU Job Validation

# Verify SLURM sees GPU resources
sinfo -o "%N %G" | head

# Submit a GPU validation job
sbatch <<'EOF'
#!/bin/bash
#SBATCH --job-name=gpu-validate
#SBATCH --nodes=1
#SBATCH --gpus-per-node=8
#SBATCH --time=00:10:00
#SBATCH --output=gpu-validate-%j.out

echo "Node: $(hostname)"
echo "GPUs allocated: $SLURM_GPUS_ON_NODE"
nvidia-smi -L

python -c "
import torch
print(f'PyTorch sees {torch.cuda.device_count()} GPUs')
for i in range(torch.cuda.device_count()):
    print(f'  GPU {i}: {torch.cuda.get_device_name(i)}')
"
EOF

# Multi-node GPU job
sbatch <<'EOF'
#!/bin/bash
#SBATCH --job-name=multi-node-validate
#SBATCH --nodes=2
#SBATCH --gpus-per-node=8
#SBATCH --ntasks-per-node=8
#SBATCH --time=00:10:00

srun torchrun --nproc_per_node=8 --nnodes=2 \
    --rdzv_id=$SLURM_JOB_ID --rdzv_backend=c10d \
    --rdzv_endpoint=$(scontrol show hostnames $SLURM_JOB_NODELIST | head -n 1):29500 \
    validate_distributed.py
EOF

Kubernetes Job Operator (Kubeflow Training Operator)

# PyTorchJob for distributed training validation
apiVersion: kubeflow.org/v1
kind: PyTorchJob
metadata:
  name: distributed-validation
spec:
  pytorchReplicaSpecs:
    Master:
      replicas: 1
      template:
        spec:
          containers:
          - name: pytorch
            image: nvcr.io/nvidia/pytorch:24.03-py3
            command: ["torchrun"]
            args:
              - "--nproc_per_node=8"
              - "validate_distributed.py"
            resources:
              limits:
                nvidia.com/gpu: "8"
    Worker:
      replicas: 1
      template:
        spec:
          containers:
          - name: pytorch
            image: nvcr.io/nvidia/pytorch:24.03-py3
            command: ["torchrun"]
            args:
              - "--nproc_per_node=8"
              - "validate_distributed.py"
            resources:
              limits:
                nvidia.com/gpu: "8"

7.6 Storage Validation for AI Workloads

Checkpoint I/O Performance

import torch
import time
import os

# Checkpoint directory β€” configure this for your environment.
# The default below is a placeholder for shared/networked storage typical in
# datacenter setups (e.g., NFS, Lustre, GPFS).  Override via the
# CHECKPOINT_DIR environment variable or pass checkpoint_dir explicitly.
CHECKPOINT_DIR = os.environ.get("CHECKPOINT_DIR", "/mnt/shared/checkpoints")

def validate_checkpoint_io(model_size_gb=1, checkpoint_dir=None):
    """Validate checkpoint save/load performance on shared storage.

    Args:
        model_size_gb: Size of the simulated checkpoint in GB.
        checkpoint_dir: Directory for checkpoint files.  Defaults to the
            module-level CHECKPOINT_DIR (configurable via the CHECKPOINT_DIR
            environment variable).  The default path '/mnt/shared/checkpoints'
            is a placeholder β€” update it to match your storage mount.
    """
    if checkpoint_dir is None:
        checkpoint_dir = CHECKPOINT_DIR

    os.makedirs(checkpoint_dir, exist_ok=True)

    # Create a model-sized tensor (simulating a checkpoint)
    numel = int(model_size_gb * 1e9 / 4)  # FP32
    state = {"model": torch.randn(numel)}

    # Save
    path = os.path.join(checkpoint_dir, "test_checkpoint.pt")
    start = time.perf_counter()
    torch.save(state, path)
    save_time = time.perf_counter() - start
    save_bandwidth = model_size_gb / save_time

    # Load
    start = time.perf_counter()
    loaded = torch.load(path, weights_only=True)
    load_time = time.perf_counter() - start
    load_bandwidth = model_size_gb / load_time

    # Verify integrity
    assert torch.equal(state["model"], loaded["model"]), "Checkpoint corruption!"

    print(f"Checkpoint I/O ({model_size_gb}GB):")
    print(f"  Directory: {checkpoint_dir}")
    print(f"  Save: {save_time:.1f}s ({save_bandwidth:.2f} GB/s)")
    print(f"  Load: {load_time:.1f}s ({load_bandwidth:.2f} GB/s)")

    # Cleanup
    os.remove(path)
    return {"save_gbs": save_bandwidth, "load_gbs": load_bandwidth}

Dataset Loading Performance

For large-scale training, dataset loading from shared storage (NFS, Lustre, GPFS) must deliver data fast enough to keep all GPUs busy – a single 8xH100 node processing images can consume over 10 GB/s of read bandwidth, and a 256-GPU cluster proportionally more. This benchmark reads sample files sequentially and measures sustained read throughput in GB/s. If measured bandwidth falls below the expected minimum (typically 1+ GB/s per GPU for image training, 0.5+ GB/s for text pretraining), the storage subsystem becomes the training bottleneck regardless of GPU performance. Common remedies include increasing the number of storage servers, switching to a faster parallel filesystem, or caching datasets in local NVMe SSDs.

def validate_dataset_io(data_dir, expected_bandwidth_gbs=1.0):
    """Validate that dataset read speed meets training requirements."""
    import glob

    files = glob.glob(os.path.join(data_dir, "*.pt"))[:100]
    if not files:
        print("No .pt files found for testing")
        return

    total_bytes = 0
    start = time.perf_counter()
    for f in files:
        data = torch.load(f, weights_only=True)
        total_bytes += os.path.getsize(f)
    elapsed = time.perf_counter() - start

    bandwidth = total_bytes / elapsed / 1e9
    passed = bandwidth >= expected_bandwidth_gbs
    print(f"Dataset I/O: {bandwidth:.2f} GB/s "
          f"({'PASS' if passed else 'FAIL'}: need >={expected_bandwidth_gbs} GB/s)")
    return bandwidth

7.7 Exercises

  1. GPU Discovery: Write a script that queries your Kubernetes cluster (or local system) and reports all GPU resources, their types, and health status.

  2. Monitoring Setup: Set up GPU metric collection (using nvidia-smi or DCGM) and write 3 alert rules: high temperature, ECC errors, and low utilization.

  3. Network Bandwidth: If you have access to multi-node infrastructure, measure InfiniBand or Ethernet bandwidth between GPU nodes. Is it sufficient for distributed training?

  4. Checkpoint Benchmark: Measure checkpoint save/load speed for 1 GB, 5 GB, and 20 GB files on your storage system. Is it fast enough for hourly checkpointing during LLM training?

  5. Scheduling Test: Submit a multi-GPU job (via SLURM or Kubernetes) and verify that all GPUs are on the same node and have NVLink connectivity.

Key Takeaways

  • Datacenter validation ensures the infrastructure supports AI workloads correctly

  • Kubernetes GPU scheduling must be topology-aware (NVLink, same-node)

  • Monitoring must catch GPU health issues (ECC errors, thermal throttling) proactively

  • Network bandwidth (InfiniBand/RoCE) is critical for distributed training performance

  • Storage I/O must keep up with checkpoint frequency and dataset loading demands

Previous: 06_distributed_training_validation.ipynb
Next: 08_regression_release_validation.ipynb
Back to Overview: README.md