Lab 01: Hardware Validation

Power · Thermals · Memory Bandwidth · Stability

Role alignment: AMD Principal Staff – AI/ML Performance Validation
Reference: 01_hardware_validation.ipynb

What you will do:

  1. Detect available GPU and query hardware specs

  2. Run a sustained GEMM workload and monitor power + temperature

  3. Measure HBM/VRAM memory bandwidth across tensor sizes

  4. Run a stability soak with NaN/Inf detection

  5. Generate a hardware validation report

Works on NVIDIA GPUs, AMD ROCm GPUs, Apple MPS, or CPU (graceful fallback)

Setup

The setup cell detects the available compute device (NVIDIA CUDA, AMD ROCm, Apple MPS, or CPU fallback) and queries hardware specifications including GPU name, VRAM capacity, and streaming multiprocessor count. All subsequent exercises adapt their workload sizes based on the detected device – GPU exercises use large 4096x4096 matrices in FP16, while CPU mode falls back to smaller sizes in FP32 to keep execution times reasonable. This pattern of graceful device fallback mirrors real validation frameworks that must run across heterogeneous hardware environments.

import torch
import time
import json
import subprocess
import threading
from pathlib import Path
from datetime import datetime

# Detect device.
# ROCm builds of PyTorch expose AMD GPUs through the torch.cuda API, so
# DEVICE == 'cuda' means "CUDA or ROCm/HIP"; VENDOR tells the two apart.
if torch.cuda.is_available():
    DEVICE = 'cuda'
    _gpu_name = torch.cuda.get_device_name(0)
    # torch.version.hip is non-None only on ROCm builds. The device-name test
    # is kept as a secondary signal because it was the original heuristic.
    _is_rocm_build = getattr(torch.version, 'hip', None) is not None
    if _is_rocm_build or 'AMD' in _gpu_name or 'Radeon' in _gpu_name:
        VENDOR = 'AMD'
    else:
        VENDOR = 'NVIDIA'
# Older PyTorch builds have no torch.backends.mps attribute at all, so probe
# for it before asking whether MPS is available.
elif getattr(torch.backends, 'mps', None) is not None and torch.backends.mps.is_available():
    DEVICE = 'mps'
    VENDOR = 'Apple'
else:
    DEVICE = 'cpu'
    VENDOR = 'CPU'

print(f'Device : {DEVICE}')
print(f'Vendor : {VENDOR}')
if DEVICE == 'cuda':
    props = torch.cuda.get_device_properties(0)
    print(f'GPU    : {props.name}')
    print(f'VRAM   : {props.total_memory / 1e9:.1f} GB')
    print(f'SMs    : {props.multi_processor_count}')
    print(f'PyTorch: {torch.__version__}')

Exercise 1.1 – GPU Monitoring Utility

Before running any workload, set up a monitoring thread that collects GPU metrics every second.
This is how validation engineers capture power + thermal data during tests.

Tools used: nvidia-smi (NVIDIA) or rocm-smi (AMD)

class GPUMonitor:
    """Background thread that samples GPU metrics every `interval` seconds.

    The sampler is picked from the module-level ``VENDOR``: ``nvidia-smi``
    for 'NVIDIA', ``rocm-smi`` for 'AMD', and a torch-only fallback for
    anything else. Every sample is a dict appended to ``self.records``.
    A sample that could not be collected at all is recorded as
    ``{'error': <message>}``. A metric the tool reports as unavailable is
    left out of the record rather than stored as a misleading zero.

    Usage: ``start()``, run the workload, then ``stop()`` (returns the
    records). The monitor may be started again after ``stop()``; records
    accumulate across runs.
    """

    # Upper bound (seconds) on one SMI call so a hung tool cannot block stop().
    _CMD_TIMEOUT_S = 5.0

    def __init__(self, interval=1.0):
        self.interval = interval
        self.records = []
        self._stop = threading.Event()
        self._thread = None  # created by start()
        self.vendor = VENDOR

    @staticmethod
    def _num(text, cast=float):
        """Parse one SMI field; return None for '[N/A]', missing or junk values."""
        try:
            return cast(float(str(text).strip()))
        except (TypeError, ValueError, OverflowError):
            return None

    @classmethod
    def _parse_nvidia_csv(cls, out):
        """Turn `nvidia-smi --format=csv,noheader,nounits` output into a record.

        Only the first line is used: with several GPUs installed nvidia-smi
        prints one line per GPU, and the rest of this lab works on device 0.
        Raises ValueError if the line does not have the 7 requested fields.
        """
        lines = out.strip().splitlines()
        if not lines:
            raise ValueError('empty nvidia-smi output')
        fields = [field.strip() for field in lines[0].split(',')]
        if len(fields) != 7:
            raise ValueError(f'unexpected nvidia-smi output: {lines[0]!r}')

        record = {'timestamp': fields[0]}
        layout = (
            ('temp_c', float),
            ('power_w', float),
            ('clock_mhz', int),
            ('util_pct', int),
            ('mem_used_mb', int),
            ('mem_total_mb', int),
        )
        for (key, cast), text in zip(layout, fields[1:]):
            value = cls._num(text, cast)
            if value is not None:
                record[key] = value
        return record

    @classmethod
    def _parse_rocm_json(cls, out):
        """Extract temperature / power / utilization from `rocm-smi --json` output.

        Uses the first top-level key that starts with 'card' (falling back to
        the first key). Raises ValueError if the payload is not a non-empty
        JSON object.
        """
        data = json.loads(out)
        if not isinstance(data, dict) or not data:
            raise ValueError('unexpected rocm-smi output')
        card = next((key for key in data if str(key).startswith('card')), None)
        if card is None:
            card = next(iter(data))
        metrics = data[card]

        record = {}
        layout = (
            ('temp_c', 'Temperature (Sensor edge) (C)', float),
            ('power_w', 'Average Graphics Package Power (W)', float),
            ('util_pct', 'GPU use (%)', int),
        )
        for key, label, cast in layout:
            value = cls._num(metrics.get(label), cast)
            if value is not None:
                record[key] = value
        return record

    def _sample_nvidia(self):
        """Take one sample via nvidia-smi; return an error record on any failure."""
        try:
            out = subprocess.check_output(
                ['nvidia-smi',
                 '--query-gpu=timestamp,temperature.gpu,power.draw,'
                 'clocks.sm,utilization.gpu,memory.used,memory.total',
                 '--format=csv,noheader,nounits'],
                text=True, stderr=subprocess.DEVNULL,
                timeout=self._CMD_TIMEOUT_S,
            )
            return self._parse_nvidia_csv(out)
        except Exception as e:
            # Best-effort sampler: a missing tool, a timeout or odd output must
            # not kill the background thread, so record the failure instead.
            return {'error': str(e)}

    def _sample_amd(self):
        """Take one sample via rocm-smi; return an error record on any failure."""
        try:
            out = subprocess.check_output(
                ['rocm-smi', '--showtemp', '--showpower', '--showuse',
                 '--showmemuse', '--json'],
                text=True, stderr=subprocess.DEVNULL,
                timeout=self._CMD_TIMEOUT_S,
            )
            return {'timestamp': datetime.now().isoformat(),
                    **self._parse_rocm_json(out)}
        except Exception as e:
            # Same best-effort policy as the NVIDIA sampler.
            return {'error': str(e)}

    def _sample_fallback(self):
        """CPU / MPS fallback — torch memory stats only."""
        if DEVICE == 'cuda':
            mem_used = torch.cuda.memory_allocated() / 1e6
        else:
            mem_used = 0.0
        return {
            'timestamp': datetime.now().isoformat(),
            'mem_used_mb': mem_used,
            'note': 'no hw monitor (CPU/MPS)',
        }

    def _run(self):
        """Thread body: sample, then sleep `interval` seconds, until stopped."""
        while not self._stop.is_set():
            if self.vendor == 'NVIDIA':
                rec = self._sample_nvidia()
            elif self.vendor == 'AMD':
                rec = self._sample_amd()
            else:
                rec = self._sample_fallback()
            self.records.append(rec)
            # Event.wait doubles as an interruptible sleep, so stop() is prompt.
            self._stop.wait(self.interval)

    def start(self):
        """Start the sampling thread (no-op if it is already running)."""
        if self._thread is not None and self._thread.is_alive():
            print('Monitor already running')
            return
        # Clear the flag left set by a previous stop(); otherwise a restarted
        # thread would exit before taking a single sample.
        self._stop.clear()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        print('Monitor started')

    def stop(self):
        """Stop sampling, wait for the thread to finish and return the records."""
        self._stop.set()
        if self._thread is not None:  # tolerate stop() without a prior start()
            self._thread.join()
            self._thread = None
        print(f'Monitor stopped — {len(self.records)} samples collected')
        return self.records


# Smoke test: let the monitor sample for about three seconds, then show the
# most recent record (or a hint when nothing was collected).
m = GPUMonitor()
m.start()
time.sleep(3)
records = m.stop()
if records:
    print('Sample record:', records[-1])
else:
    print('Sample record:', 'No records (no GPU monitor tool found)')

Exercise 1.2 – Sustained Power & Thermal Test

Run a large FP16 GEMM workload while monitoring power draw and temperature (60 seconds by default; the cell below runs 30 seconds on GPU and 10 seconds on CPU).

Pass criteria:

  • Power stays within ±10% of steady-state (no large swings)

  • Temperature stabilizes (stops rising) — if it keeps climbing, there is a risk of thermal throttling

  • No exceptions or NaN outputs

def run_sustained_gemm(duration_seconds=60, matrix_size=4096, dtype=torch.float16):
    """
    Run back-to-back square GEMMs on DEVICE for `duration_seconds`.

    Args:
        duration_seconds: wall-clock budget for the timed loop.
        matrix_size: side length of the two square operand matrices.
        dtype: operand dtype (forced to FP32 on CPU).

    On CPU the workload is reduced (matrix side capped at 512, duration
    capped at 10 s, FP32) so the cell finishes quickly.

    Returns: list of per-iteration times (ms). The list is empty when the
    time budget allows no iteration (e.g. duration_seconds <= 0).
    """
    # Reduce the workload on CPU for speed. min() keeps a smaller request
    # from the caller instead of silently enlarging it.
    if DEVICE == 'cpu':
        matrix_size = min(matrix_size, 512)
        duration_seconds = min(duration_seconds, 10)
        dtype = torch.float32
        print(f'CPU mode: reduced to {matrix_size}x{matrix_size} FP32, {duration_seconds}s')

    def _sync():
        # GPU kernels are launched asynchronously; without a sync the timer
        # would only measure how long it takes to enqueue the work.
        if DEVICE == 'cuda':
            torch.cuda.synchronize()
        elif DEVICE == 'mps' and hasattr(torch, 'mps'):
            torch.mps.synchronize()

    device = torch.device(DEVICE)
    a = torch.randn(matrix_size, matrix_size, device=device, dtype=dtype)
    b = torch.randn(matrix_size, matrix_size, device=device, dtype=dtype)

    # Warmup
    for _ in range(5):
        torch.matmul(a, b)
    _sync()

    iteration_times = []
    start_wall = time.perf_counter()

    while time.perf_counter() - start_wall < duration_seconds:
        t0 = time.perf_counter()
        torch.matmul(a, b)
        _sync()
        iteration_times.append((time.perf_counter() - t0) * 1000)

    elapsed_total = time.perf_counter() - start_wall
    print(f'Ran {len(iteration_times)} iterations in {elapsed_total:.1f}s')
    if iteration_times:  # avoid dividing by zero when nothing ran
        print(f'Iteration time — mean: {sum(iteration_times)/len(iteration_times):.2f}ms  '
              f'min: {min(iteration_times):.2f}ms  max: {max(iteration_times):.2f}ms')
    return iteration_times


# Run the sustained GEMM with the hardware monitor sampling in the background.
monitor = GPUMonitor(interval=1.0)
monitor.start()
try:
    iter_times = run_sustained_gemm(duration_seconds=30 if DEVICE != 'cpu' else 10)
finally:
    # Always stop the sampler, even if the workload raises (e.g. out of
    # memory); otherwise the background thread keeps polling for the rest
    # of the session.
    hw_records = monitor.stop()
# ---- Analysis: detect thermal throttling ----

def detect_throttle(times_ms, warmup=20, threshold_pct=15):
    """
    Flag iterations that ran more than `threshold_pct`% slower than baseline.

    Baseline is the median of iterations 5..`warmup` (the first five are
    skipped as startup noise). The verdict is printed and also returned.

    Args:
        times_ms: per-iteration times in milliseconds.
        warmup: end index of the baseline window; must be greater than 5.
        threshold_pct: allowed slowdown over baseline, in percent.

    Returns:
        None when there are fewer than `warmup + 10` iterations,
        True when at least one iteration exceeds the threshold, else False.

    Raises:
        ValueError: if `warmup` is 5 or less (the baseline window is empty).
    """
    if warmup <= 5:
        raise ValueError(f'warmup must be greater than 5 (got {warmup})')

    if len(times_ms) < warmup + 10:
        print('Not enough iterations to analyse throttling')
        return None

    baseline_window = sorted(times_ms[5:warmup])
    baseline = baseline_window[len(baseline_window) // 2]  # (upper) median
    throttle_threshold = baseline * (1 + threshold_pct / 100)
    throttled = [i for i, t in enumerate(times_ms) if t > throttle_threshold]

    print(f'Baseline iteration time : {baseline:.2f} ms')
    print(f'Throttle threshold      : {throttle_threshold:.2f} ms (baseline + {threshold_pct}%)')

    if throttled:
        print(f'WARN  Throttling detected at iterations: {throttled[:10]}...')
    else:
        print('PASS  No thermal throttling detected')
    return bool(throttled)


detect_throttle(iter_times)
# ---- Plot: iteration time over time ----
# Only the imports sit inside the try, so a real plotting error is not
# mistaken for a missing matplotlib install.
try:
    import matplotlib.pyplot as plt
    import matplotlib.ticker as mticker
except ImportError:
    print('matplotlib not installed — pip install matplotlib')
else:
    fig, axes = plt.subplots(1, 2, figsize=(14, 4))

    # Left: iteration time
    axes[0].plot(iter_times, lw=0.8, color='steelblue')
    # Same baseline window as detect_throttle's defaults (iterations 5..20).
    # Guarded so a very short run does not raise IndexError.
    _baseline_window = sorted(iter_times[5:20])
    if _baseline_window:
        axes[0].axhline(_baseline_window[len(_baseline_window) // 2],
                        color='green', linestyle='--', label='baseline')
        axes[0].legend()
    axes[0].set_title('GEMM Iteration Time (ms)')
    axes[0].set_xlabel('Iteration')
    axes[0].set_ylabel('ms')

    # Right: temperature over time (if available)
    temps = [r.get('temp_c') for r in hw_records if r.get('temp_c') is not None]
    if temps:
        axes[1].plot(temps, lw=1.5, color='tomato')
        axes[1].set_title('GPU Temperature (°C) During Test')
        axes[1].set_xlabel('Sample (1s interval)')
        axes[1].set_ylabel('°C')
        axes[1].axhline(83, color='orange', linestyle='--', label='throttle ~83°C')
        axes[1].legend()
    else:
        axes[1].text(0.5, 0.5, 'No temperature data\n(nvidia-smi/rocm-smi not available)',
                     ha='center', va='center', transform=axes[1].transAxes)
        axes[1].set_title('GPU Temperature (°C)')

    plt.tight_layout()
    plt.savefig('lab01_power_thermal.png', dpi=120)
    plt.show()
    print('Plot saved: lab01_power_thermal.png')

Exercise 1.3 – Memory Bandwidth Sweep

Measure actual memory bandwidth for tensor sizes from 1 MB up to 8 GB (up to 256 MB on CPU); sizes that do not fit in memory are skipped.
Compare measured vs datasheet bandwidth.

Pass criteria: achieve ≥70% of theoretical peak bandwidth

| GPU | Theoretical BW |
|---|---|
| NVIDIA H100 | 3,350 GB/s |
| NVIDIA A100 | 2,000 GB/s |
| AMD MI300X | 5,300 GB/s |
| NVIDIA RTX 4090 | 1,008 GB/s |

def measure_bandwidth_gbs(size_bytes, dtype=torch.float16, iterations=50):
    """
    Measure device memory copy bandwidth on DEVICE.

    Allocates a source tensor of about `size_bytes`, copies it into a
    destination tensor `iterations` times and times the copies.

    Args:
        size_bytes: size of the source buffer in bytes.
        dtype: floating-point element type of the buffers.
        iterations: number of timed copies.

    Returns:
        Bandwidth in GB/s (each copy counts as one read plus one write),
        0.0 if `size_bytes` is smaller than one element, or None if the
        buffers could not be allocated or copied (typically out of memory).
    """
    # Ask torch for the element size instead of assuming 2 or 4 bytes, so
    # bfloat16 and float64 are accounted for correctly.
    elem_size = torch.empty(0, dtype=dtype).element_size()
    numel = size_bytes // elem_size
    if numel == 0:
        return 0.0

    def _sync():
        # Copies are asynchronous on GPU backends; wait for completion so the
        # timer covers the actual transfers.
        if DEVICE == 'cuda':
            torch.cuda.synchronize()
        elif DEVICE == 'mps' and hasattr(torch, 'mps'):
            torch.mps.synchronize()

    src = dst = None
    try:
        src = torch.randn(numel, device=DEVICE, dtype=dtype)
        dst = torch.empty_like(src)

        # Warmup
        for _ in range(5):
            dst.copy_(src)
        _sync()

        t0 = time.perf_counter()
        for _ in range(iterations):
            dst.copy_(src)
        _sync()
        elapsed = time.perf_counter() - t0
        if elapsed <= 0:
            return 0.0

        # read + write = 2× bytes
        bytes_total = numel * elem_size * 2 * iterations
        return bytes_total / elapsed / 1e9
    except RuntimeError:
        # Treated as "does not fit": the caller reports None as OOM/SKIP.
        # Kept broad on purpose, since not every backend words an oversized
        # allocation as "out of memory".
        return None
    finally:
        # Drop the buffers and return cached blocks on every path so the next
        # (larger) sweep point starts from a clean allocator.
        src = dst = None
        if DEVICE == 'cuda':
            torch.cuda.empty_cache()


# Bandwidth sweep from 1 MB up to 8 GB; the CPU path stops at 256 MB.
sizes_mb = (
    [1, 4, 16, 64, 128, 256]
    if DEVICE == 'cpu'
    else [1, 4, 16, 64, 256, 512, 1024, 2048, 4096, 8192]
)

# Collected (size_mb, bandwidth_gbs) pairs for every size that could be measured.
results = []
print(f"{'Size':>10}  {'BW (GB/s)':>12}  Status")
print('-' * 40)

for mb in sizes_mb:
    bw = measure_bandwidth_gbs(mb * 1024 * 1024)
    if bw is None:
        # None means the buffers for this size could not be allocated.
        status, result_label = 'OOM', 'SKIP'
    else:
        status, result_label = f'{bw:.1f}', 'OK'
        results.append((mb, bw))
    print(f'{mb:>8} MB  {status:>12}  {result_label}')

if results:
    peak_bw = max(measured for _, measured in results)
    print(f'\nPeak measured bandwidth: {peak_bw:.1f} GB/s')
# Validation check against theoretical peak
# Update THEORETICAL_PEAK_GBS for your specific GPU
THEORETICAL_PEAK_GBS = {
    'NVIDIA A100-SXM': 2000,
    'NVIDIA H100':     3350,
    'NVIDIA RTX 4090': 1008,
    'NVIDIA RTX 3090': 936,
    'AMD MI300X':      5300,
    'AMD MI250X':      3200,
    'default':         500,   # conservative default
}


def _lookup_theoretical_peak(gpu_name):
    """Return (peak_gbs, matched_key) for `gpu_name`; matched_key is None on fallback.

    A table key matches when every whitespace-separated word of the key
    occurs in the device name (case-insensitive). Plain substring matching
    is too strict: reported names carry extra words, e.g.
    'NVIDIA GeForce RTX 4090' or 'AMD Instinct MI300X'.
    """
    name = gpu_name.upper()
    for key, peak in THEORETICAL_PEAK_GBS.items():
        if key == 'default':
            continue
        if all(word in name for word in key.upper().split()):
            return peak, key
    return THEORETICAL_PEAK_GBS['default'], None


if DEVICE == 'cuda':
    gpu_name = torch.cuda.get_device_name(0)
    theoretical, _matched_key = _lookup_theoretical_peak(gpu_name)
    measured_peak = max(r[1] for r in results) if results else 0
    efficiency = measured_peak / theoretical * 100

    print(f'GPU                  : {gpu_name}')
    print(f'Theoretical peak     : {theoretical} GB/s')
    print(f'Measured peak        : {measured_peak:.1f} GB/s')
    print(f'Efficiency           : {efficiency:.1f}%')
    if _matched_key is None:
        # Without a table entry the efficiency is relative to a placeholder,
        # so make that visible next to the verdict.
        print('NOTE  GPU not found in THEORETICAL_PEAK_GBS — using the default peak; '
              'add your GPU for a meaningful verdict')

    if efficiency >= 70:
        print('PASS  Memory bandwidth within expected range (≥70% of peak)')
    elif efficiency >= 50:
        print('WARN  Memory bandwidth below 70% — check ECC mode, NUMA affinity, or clock state')
    else:
        print('FAIL  Memory bandwidth significantly below spec — investigate driver/hardware issue')
else:
    print('(Bandwidth validation only meaningful on GPU — no theoretical peak for CPU/MPS)')

Exercise 1.4 – Memory Integrity Test

Write known patterns to GPU memory, read them back, verify no corruption.
This can catch HBM defects as well as bit errors that ECC misses.

Pass criteria: all patterns match exactly (zero differences)

def memory_integrity_test(size_gb=0.5):
    """
    Write 4 patterns to DEVICE memory, copy each one and compare the copy
    with the original. Any mismatch = memory error.

    Args:
        size_gb: size of each pattern buffer in GB (forced to 0.1 on CPU).

    Returns:
        True only if at least one pattern ran and none of them mismatched.
        A run in which every pattern was skipped (e.g. out of memory) is
        not reported as a pass.
    """
    if DEVICE == 'cpu':
        size_gb = 0.1

    numel = int(size_gb * 1e9 / 4)  # every pattern uses a 4-byte dtype (int32 / float32)
    passed = 0
    failed = 0
    skipped = 0

    patterns = [
        ('Sequential int', lambda n: torch.arange(n % (2**15), n % (2**15) + n, device=DEVICE, dtype=torch.int32) % (2**15)),
        ('All zeros',      lambda n: torch.zeros(n, device=DEVICE, dtype=torch.float32)),
        ('All ones',       lambda n: torch.ones(n, device=DEVICE, dtype=torch.float32)),
        ('Random FP32',    lambda n: torch.randn(n, device=DEVICE, dtype=torch.float32)),
    ]

    for name, fn in patterns:
        expected = actual = None
        try:
            expected = fn(numel)
            actual = expected.clone()
            if DEVICE == 'cuda':
                torch.cuda.synchronize()
            mismatches = (expected != actual).sum().item()
            if mismatches == 0:
                print(f'  PASS  {name}')
                passed += 1
            else:
                print(f'  FAIL  {name}: {mismatches} mismatches detected!')
                failed += 1
        except RuntimeError as e:
            print(f'  SKIP  {name}: {e}')
            skipped += 1
        finally:
            # Release the buffers on every path so a failed allocation does
            # not starve the next pattern.
            expected = actual = None
            if DEVICE == 'cuda':
                torch.cuda.empty_cache()

    print(f'\nMemory Integrity: {passed} passed, {failed} failed, {skipped} skipped '
          f'({size_gb:.1f} GB tested)')
    if passed == 0 and failed == 0:
        print('WARN  No pattern could be run; the result is not a pass')
    return failed == 0 and passed > 0


# Run the integrity patterns once and print the overall verdict.
print('Running memory integrity test...')
ok = memory_integrity_test()
if ok:
    print('\nOverall:', 'PASS')
else:
    print('\nOverall:', 'FAIL')

Exercise 1.5 – Stability Soak (Short)

Run a mixed-precision soak workload (5 minutes by default; the cell below runs 2 minutes, or 30 seconds on CPU) and check:

  • No NaN/Inf outputs

  • No memory leaks (memory usage stable)

  • No exceptions

In production, this runs 24–72 hours. Here we run only a few minutes for practice.

def stability_soak(duration_minutes=5, log_interval_iters=200):
    """
    Mixed-precision stability soak.

    Repeats GEMMs on DEVICE, cycling through matrix sizes and (on CUDA)
    FP16/BF16/FP32, and checks every result for NaN/Inf. On CPU the run is
    cut to 30 seconds with small FP32 matrices.

    Args:
        duration_minutes: length of the soak in minutes.
        log_interval_iters: take a memory snapshot and print a progress line
            every this many iterations (values below 1 are treated as 1).

    Returns:
        (errors, mem_snapshots): `errors` is a list of dicts describing
        NaN/Inf hits or exceptions; `mem_snapshots` is a list of
        {'iter', 'elapsed_s', 'mem_mb'} dicts. The last snapshot is taken
        after the loop and its 'iter' is the total number of iterations.
    """
    if DEVICE == 'cpu':
        duration_minutes = 0.5
        print('CPU mode: reduced to 30 seconds')

    # A non-positive interval would make the modulo below raise on every
    # iteration and flood the error list.
    log_interval_iters = max(1, int(log_interval_iters))

    sizes = [512, 1024, 2048, 4096] if DEVICE != 'cpu' else [128, 256]
    dtypes = [torch.float16, torch.bfloat16, torch.float32] if DEVICE == 'cuda' else [torch.float32]

    def _allocated_mb():
        return torch.cuda.memory_allocated() / 1e6 if DEVICE == 'cuda' else 0

    # perf_counter is monotonic, so a system clock adjustment during a long
    # soak cannot shorten or extend the run.
    start = time.perf_counter()
    deadline = start + duration_minutes * 60
    iteration = 0
    errors = []
    mem_snapshots = []
    a = b = c = None

    print(f'Starting {duration_minutes}min soak on {DEVICE}...')

    while time.perf_counter() < deadline:
        size = sizes[iteration % len(sizes)]
        dtype = dtypes[iteration % len(dtypes)]

        try:
            a = torch.randn(size, size, device=DEVICE, dtype=dtype)
            b = torch.randn(size, size, device=DEVICE, dtype=dtype)
            c = torch.matmul(a, b)

            # Numerical health check
            has_nan = torch.isnan(c).any().item()
            has_inf = torch.isinf(c).any().item()
            if has_nan or has_inf:
                errors.append({
                    'iteration': iteration,
                    'dtype': str(dtype),
                    'size': size,
                    'nan': has_nan,
                    'inf': has_inf,
                })

            if DEVICE == 'cuda':
                torch.cuda.synchronize()

            # Release the operands before sampling memory so the snapshot
            # reflects what persists between iterations.
            a = b = c = None

            # Memory snapshot
            if iteration % log_interval_iters == 0:
                elapsed = time.perf_counter() - start
                mem = _allocated_mb()
                mem_snapshots.append({'iter': iteration, 'elapsed_s': elapsed, 'mem_mb': mem})
                print(f'  [{elapsed:6.1f}s] iter={iteration:6d}  dtype={str(dtype):20s}  '
                      f'size={size}  mem={mem:.1f}MB  errors={len(errors)}')

        except Exception as e:
            # The soak must keep running; record the failure and continue.
            errors.append({'iteration': iteration, 'exception': str(e)})
            print(f'  ERROR at iteration {iteration}: {e}')

        iteration += 1

    # Closing snapshot: guarantees the leak check below has an end-of-run
    # data point even when the loop ended between two logging intervals.
    a = b = c = None
    mem_snapshots.append({
        'iter': iteration,
        'elapsed_s': time.perf_counter() - start,
        'mem_mb': _allocated_mb(),
    })

    print(f'\nCompleted {iteration} iterations over {duration_minutes:.1f} min')
    print(f'Total errors: {len(errors)}')

    # Memory leak check: last snapshot vs first
    if len(mem_snapshots) >= 2:
        mem_delta = mem_snapshots[-1]['mem_mb'] - mem_snapshots[0]['mem_mb']
        if abs(mem_delta) < 10:
            print(f'PASS  No memory leak (delta: {mem_delta:.1f} MB)')
        else:
            print(f'WARN  Memory delta: {mem_delta:.1f} MB — possible leak')

    if not errors:
        print('PASS  No NaN/Inf/exceptions during soak')
    else:
        print(f'FAIL  {len(errors)} errors detected:')
        for e in errors[:5]:
            print('  ', e)

    return errors, mem_snapshots


# Run a shortened 2-minute soak (the function default is 5 minutes; on CPU
# stability_soak itself reduces the run to 30 seconds).
soak_errors, mem_trace = stability_soak(duration_minutes=2)

Exercise 1.6 – Validation Report

Generate a structured JSON report — exactly what you'd produce for an AMD/NVIDIA validation handoff.

# Collect the monitor readings that actually carry a metric. Error records and
# the CPU/MPS fallback have no temperature or power, and must not be reported
# as a peak of 0.
_temp_readings = [r['temp_c'] for r in hw_records if r.get('temp_c') is not None]
_power_readings = [r['power_w'] for r in hw_records if r.get('power_w') is not None]

# Assemble the validation report from the results of the exercises above.
report = {
    'report_type': 'Hardware Validation',
    'generated_at': datetime.now().isoformat(),
    'device': DEVICE,
    'vendor': VENDOR,
    'gpu_name': torch.cuda.get_device_name(0) if DEVICE == 'cuda' else DEVICE,
    'pytorch_version': torch.__version__,
    'tests': {
        'sustained_gemm': {
            # No iterations means nothing was validated, so do not claim PASS.
            'status': 'PASS' if iter_times else 'SKIP',
            'iterations': len(iter_times),
            'mean_time_ms': round(sum(iter_times) / len(iter_times), 3) if iter_times else None,
            'max_time_ms': round(max(iter_times), 3) if iter_times else None,
        },
        'memory_bandwidth': {
            'status': 'PASS' if results else 'SKIP',
            'peak_gbs': round(max(r[1] for r in results), 1) if results else None,
            'sweep_points': len(results),
        },
        'memory_integrity': {
            # Use the actual outcome of memory_integrity_test() instead of a
            # hard-coded PASS.
            'status': 'PASS' if ok else 'FAIL',
            'patterns_tested': 4,
        },
        'stability_soak': {
            'status': 'PASS' if not soak_errors else 'FAIL',
            'errors': len(soak_errors),
            # 'iter' of the last memory snapshot recorded by the soak.
            'iterations': mem_trace[-1]['iter'] if mem_trace else 0,
        },
    },
    'hw_monitor_samples': len(hw_records),
    'peak_temp_c': max(_temp_readings, default=None),
    'peak_power_w': max(_power_readings, default=None),
}

report_path = Path('lab01_validation_report.json')
report_path.write_text(json.dumps(report, indent=2), encoding='utf-8')

print(json.dumps(report, indent=2))
print(f'\nReport saved: {report_path}')

Summary

| Test | What it validates | AMD JD mapping |
|---|---|---|
| GPU Monitor | Power draw, temperature, utilization | rocm-smi, nvidia-smi proficiency |
| Sustained GEMM | Thermal stability under peak compute | Hardware soak validation |
| Bandwidth sweep | HBM/VRAM bandwidth vs spec | Memory subsystem validation |
| Memory integrity | No bit errors under load | Reliability/ECC validation |
| Stability soak | No NaN/Inf/leaks over time | Long-duration training stability |

Next: lab_02_kernel_validation.ipynb — GEMM correctness, attention numerical accuracy, tolerance thresholds
Back to Overview: README.md