Lab 01: Hardware Validation
Power · Thermals · Memory Bandwidth · Stability
Role alignment: AMD Principal Staff – AI/ML Performance Validation
Reference: 01_hardware_validation.ipynb
What you will do:
Detect available GPU and query hardware specs
Run a sustained GEMM workload and monitor power + temperature
Measure HBM/VRAM memory bandwidth across tensor sizes
Run a stability soak with NaN/Inf detection
Generate a hardware validation report
Works on NVIDIA GPUs, AMD ROCm GPUs, Apple MPS, or CPU (graceful fallback)
Setup
The setup cell detects the available compute device (NVIDIA CUDA, AMD ROCm, Apple MPS, or CPU fallback) and queries hardware specifications including GPU name, VRAM capacity, and streaming multiprocessor count. All subsequent exercises adapt their workload sizes to the detected device: GPU exercises use large 4096×4096 matrices in FP16, while CPU mode falls back to smaller sizes in FP32 to keep execution times reasonable. This pattern of graceful device fallback mirrors real validation frameworks that must run across heterogeneous hardware environments.
import torch
import time
import json
import subprocess
import threading
from pathlib import Path
from datetime import datetime

# Detect device
if torch.cuda.is_available():
    DEVICE = 'cuda'
    name = torch.cuda.get_device_name(0)
    VENDOR = 'AMD' if ('AMD' in name or 'Radeon' in name) else 'NVIDIA'
elif torch.backends.mps.is_available():
    DEVICE = 'mps'
    VENDOR = 'Apple'
else:
    DEVICE = 'cpu'
    VENDOR = 'CPU'

print(f'Device : {DEVICE}')
print(f'Vendor : {VENDOR}')
if DEVICE == 'cuda':
    props = torch.cuda.get_device_properties(0)
    print(f'GPU    : {props.name}')
    print(f'VRAM   : {props.total_memory / 1e9:.1f} GB')
    print(f'SMs    : {props.multi_processor_count}')
print(f'PyTorch: {torch.__version__}')
Exercise 1.1 – GPU Monitoring Utility
Before running any workload, set up a monitoring thread that collects GPU metrics every second.
This is how validation engineers capture power + thermal data during tests.
Tools used: nvidia-smi (NVIDIA) or rocm-smi (AMD)
class GPUMonitor:
    """Background thread that samples GPU metrics every `interval` seconds."""

    def __init__(self, interval=1.0):
        self.interval = interval
        self.records = []
        self._stop = threading.Event()
        self.vendor = VENDOR

    def _sample_nvidia(self):
        try:
            out = subprocess.check_output(
                ['nvidia-smi',
                 '--query-gpu=timestamp,temperature.gpu,power.draw,'
                 'clocks.sm,utilization.gpu,memory.used,memory.total',
                 '--format=csv,noheader,nounits'],
                text=True, stderr=subprocess.DEVNULL
            ).strip()
            ts, temp, pwr, clk, util, mem_used, mem_total = out.split(', ')
            return {
                'timestamp': ts.strip(),
                'temp_c': float(temp),
                'power_w': float(pwr),
                'clock_mhz': int(clk),
                'util_pct': int(util),
                'mem_used_mb': int(mem_used),
                'mem_total_mb': int(mem_total),
            }
        except Exception as e:
            return {'error': str(e)}

    def _sample_amd(self):
        try:
            out = subprocess.check_output(
                ['rocm-smi', '--showtemp', '--showpower', '--showuse',
                 '--showmemuse', '--json'],
                text=True, stderr=subprocess.DEVNULL
            )
            data = json.loads(out)
            card = list(data.keys())[0]
            return {
                'timestamp': datetime.now().isoformat(),
                'temp_c': float(data[card].get('Temperature (Sensor edge) (C)', 0)),
                'power_w': float(data[card].get('Average Graphics Package Power (W)', 0)),
                'util_pct': int(data[card].get('GPU use (%)', 0)),
            }
        except Exception as e:
            return {'error': str(e)}

    def _sample_fallback(self):
        """CPU / MPS fallback - torch memory stats only."""
        if DEVICE == 'cuda':
            mem_used = torch.cuda.memory_allocated() / 1e6
        else:
            mem_used = 0.0
        return {
            'timestamp': datetime.now().isoformat(),
            'mem_used_mb': mem_used,
            'note': 'no hw monitor (CPU/MPS)',
        }

    def _run(self):
        while not self._stop.is_set():
            if self.vendor == 'NVIDIA':
                rec = self._sample_nvidia()
            elif self.vendor == 'AMD':
                rec = self._sample_amd()
            else:
                rec = self._sample_fallback()
            self.records.append(rec)
            self._stop.wait(self.interval)

    def start(self):
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()
        print('Monitor started')

    def stop(self):
        self._stop.set()
        self._thread.join()
        print(f'Monitor stopped - {len(self.records)} samples collected')
        return self.records
# Quick sanity check
m = GPUMonitor()
m.start()
time.sleep(3)
records = m.stop()
print('Sample record:', records[-1] if records else 'No records (no GPU monitor tool found)')
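In a real validation run you would reduce the raw samples to summary statistics before logging them. A minimal sketch (the `summarize_records` helper is illustrative and assumes the `temp_c`/`power_w` keys emitted by the samplers above):

def summarize_records(records):
    """Reduce monitor samples to peak/mean power and peak temperature."""
    temps = [r['temp_c'] for r in records if 'temp_c' in r]
    power = [r['power_w'] for r in records if 'power_w' in r]
    return {
        'samples': len(records),
        'peak_temp_c': max(temps) if temps else None,
        'mean_power_w': sum(power) / len(power) if power else None,
        'peak_power_w': max(power) if power else None,
    }

print(summarize_records(records))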
Exercise 1.2 – Sustained Power & Thermal Test
Run a large FP16 GEMM workload for 60 seconds while monitoring power draw and temperature.
Pass criteria:
Power stays within ±10% of steady-state (no large swings); a programmatic check appears after the throttle analysis below
Temperature stabilizes (stops rising); sustained climbing indicates thermal-throttle risk
No exceptions or NaN outputs
def run_sustained_gemm(duration_seconds=60, matrix_size=4096, dtype=torch.float16):
    """
    Run sustained GEMM for `duration_seconds`.
    Returns: list of per-iteration times (ms)
    """
    # Reduce matrix size on CPU for speed
    if DEVICE == 'cpu':
        matrix_size = 512
        duration_seconds = 10
        dtype = torch.float32
        print(f'CPU mode: reduced to {matrix_size}x{matrix_size} FP32, {duration_seconds}s')
    device = torch.device(DEVICE)
    a = torch.randn(matrix_size, matrix_size, device=device, dtype=dtype)
    b = torch.randn(matrix_size, matrix_size, device=device, dtype=dtype)
    # Warmup
    for _ in range(5):
        _ = torch.matmul(a, b)
    if DEVICE == 'cuda':
        torch.cuda.synchronize()
    iteration_times = []
    start_wall = time.perf_counter()
    while time.perf_counter() - start_wall < duration_seconds:
        t0 = time.perf_counter()
        c = torch.matmul(a, b)
        if DEVICE == 'cuda':
            torch.cuda.synchronize()
        elapsed_ms = (time.perf_counter() - t0) * 1000
        iteration_times.append(elapsed_ms)
    elapsed_total = time.perf_counter() - start_wall
    print(f'Ran {len(iteration_times)} iterations in {elapsed_total:.1f}s')
    print(f'Iteration time - mean: {sum(iteration_times)/len(iteration_times):.2f}ms '
          f'min: {min(iteration_times):.2f}ms max: {max(iteration_times):.2f}ms')
    return iteration_times
# Run with monitoring
monitor = GPUMonitor(interval=1.0)
monitor.start()
iter_times = run_sustained_gemm(duration_seconds=30 if DEVICE != 'cpu' else 10)
hw_records = monitor.stop()
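Sustained-GEMM results are conventionally quoted as achieved TFLOP/s as well as raw iteration time: an N×N×N matmul performs 2·N³ floating-point operations. A minimal conversion sketch, assuming the default matrix sizes used by run_sustained_gemm above:

# Convert iteration times to achieved TFLOP/s (2*N^3 FLOPs per N x N GEMM).
# NOTE: assumes run_sustained_gemm's defaults (4096 on GPU, 512 on CPU).
N = 4096 if DEVICE != 'cpu' else 512
flops_per_iter = 2 * N ** 3
tflops = [flops_per_iter / (t / 1000) / 1e12 for t in iter_times]
print(f'Achieved throughput - mean: {sum(tflops)/len(tflops):.2f} TFLOP/s '
      f'peak: {max(tflops):.2f} TFLOP/s')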
# ---- Analysis: detect thermal throttling ----
def detect_throttle(times_ms, warmup=20, threshold_pct=15):
    """
    If iteration time increases >threshold_pct% above baseline, flag throttling.
    Baseline = median of first `warmup` iterations after startup.
    """
    if len(times_ms) < warmup + 10:
        print('Not enough iterations to analyse throttling')
        return
    window = sorted(times_ms[5:warmup])
    baseline = window[len(window) // 2]  # median
    throttle_threshold = baseline * (1 + threshold_pct / 100)
    throttled = [i for i, t in enumerate(times_ms) if t > throttle_threshold]
    print(f'Baseline iteration time : {baseline:.2f} ms')
    print(f'Throttle threshold      : {throttle_threshold:.2f} ms (baseline + {threshold_pct}%)')
    if throttled:
        print(f'WARN Throttling detected at iterations: {throttled[:10]}...')
    else:
        print('PASS No thermal throttling detected')

detect_throttle(iter_times)
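The first pass criterion (power within ±10% of steady state) can be checked directly from the monitor samples. A minimal sketch, assuming the `power_w` key emitted by the samplers above and treating the median sample as the steady-state estimate:

# Check power stability: flag samples deviating >10% from the median.
power = [r['power_w'] for r in hw_records if 'power_w' in r]
if len(power) >= 5:
    steady = sorted(power)[len(power) // 2]  # median as steady-state estimate
    swings = [p for p in power if abs(p - steady) / steady > 0.10]
    if swings:
        print(f'WARN {len(swings)}/{len(power)} power samples deviate >10% '
              f'from steady-state {steady:.0f} W')
    else:
        print(f'PASS Power stable within ±10% of {steady:.0f} W')
else:
    print('SKIP Power stability check (no power samples collected)')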
# ---- Plot: iteration time over time ----
try:
    import matplotlib.pyplot as plt

    fig, axes = plt.subplots(1, 2, figsize=(14, 4))
    # Left: iteration time
    axes[0].plot(iter_times, lw=0.8, color='steelblue')
    window = sorted(iter_times[5:25])  # same warmup window as detect_throttle
    if window:
        axes[0].axhline(window[len(window) // 2], color='green', linestyle='--', label='baseline')
    axes[0].set_title('GEMM Iteration Time (ms)')
    axes[0].set_xlabel('Iteration')
    axes[0].set_ylabel('ms')
    axes[0].legend()
    # Right: temperature over time (if available)
    temps = [r.get('temp_c') for r in hw_records if r.get('temp_c') is not None]
    if temps:
        axes[1].plot(temps, lw=1.5, color='tomato')
        axes[1].set_title('GPU Temperature (°C) During Test')
        axes[1].set_xlabel('Sample (1s interval)')
        axes[1].set_ylabel('°C')
        axes[1].axhline(83, color='orange', linestyle='--', label='throttle ~83°C')
        axes[1].legend()
    else:
        axes[1].text(0.5, 0.5, 'No temperature data\n(nvidia-smi/rocm-smi not available)',
                     ha='center', va='center', transform=axes[1].transAxes)
        axes[1].set_title('GPU Temperature (°C)')
    plt.tight_layout()
    plt.savefig('lab01_power_thermal.png', dpi=120)
    plt.show()
    print('Plot saved: lab01_power_thermal.png')
except ImportError:
    print('matplotlib not installed - pip install matplotlib')
Exercise 1.3 – Memory Bandwidth Sweep
Measure actual memory bandwidth for tensor sizes from 1 MB to max available VRAM.
Compare measured vs datasheet bandwidth.
Pass criteria: achieve ≥70% of theoretical peak bandwidth
| GPU | Theoretical BW |
|---|---|
| NVIDIA H100 | 3,350 GB/s |
| NVIDIA A100 | 2,000 GB/s |
| AMD MI300X | 5,300 GB/s |
| NVIDIA RTX 4090 | 1,008 GB/s |
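These datasheet numbers follow from a simple formula: peak bandwidth = effective per-pin data rate × bus width ÷ 8. As a sanity check, a quick calculation for the RTX 4090 (21 Gbps effective GDDR6X on a 384-bit bus, both public datasheet figures):

# Theoretical peak BW = data rate (Gbit/s per pin) * bus width (bits) / 8.
data_rate_gbps = 21    # effective per-pin rate, GDDR6X
bus_width_bits = 384
peak_gbs = data_rate_gbps * bus_width_bits / 8
print(f'RTX 4090 theoretical peak: {peak_gbs:.0f} GB/s')  # -> 1008 GB/s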
def measure_bandwidth_gbs(size_bytes, dtype=torch.float16, iterations=50):
    """
    Measure memory copy bandwidth: allocate src, copy to dst, time it.
    Returns bandwidth in GB/s.
    """
    elem_size = 2 if dtype == torch.float16 else 4
    numel = size_bytes // elem_size
    if numel == 0:
        return 0.0
    try:
        src = torch.randn(numel, device=DEVICE, dtype=dtype)
        dst = torch.empty_like(src)
        # Warmup
        for _ in range(5):
            dst.copy_(src)
        if DEVICE == 'cuda':
            torch.cuda.synchronize()
        t0 = time.perf_counter()
        for _ in range(iterations):
            dst.copy_(src)
        if DEVICE == 'cuda':
            torch.cuda.synchronize()
        elapsed = time.perf_counter() - t0
        # read + write = 2x bytes
        bytes_total = numel * elem_size * 2 * iterations
        bw = bytes_total / elapsed / 1e9
        del src, dst
        if DEVICE == 'cuda':
            torch.cuda.empty_cache()
        return bw
    except RuntimeError:
        # OOM - caller treats None as a skipped size
        return None
# Sweep sizes: 1 MB -> 8 GB (skip large on CPU)
if DEVICE == 'cpu':
    sizes_mb = [1, 4, 16, 64, 128, 256]
else:
    sizes_mb = [1, 4, 16, 64, 256, 512, 1024, 2048, 4096, 8192]

results = []
print(f"{'Size':>10} {'BW (GB/s)':>12} Status")
print('-' * 40)
for mb in sizes_mb:
    bw = measure_bandwidth_gbs(mb * 1024 * 1024)
    status = f'{bw:.1f}' if bw is not None else 'OOM'
    result_label = 'OK' if bw is not None else 'SKIP'
    print(f'{mb:>8} MB {status:>12} {result_label}')
    if bw is not None:
        results.append((mb, bw))

if results:
    peak_bw = max(r[1] for r in results)
    print(f'\nPeak measured bandwidth: {peak_bw:.1f} GB/s')
# Validation check against theoretical peak
# Update THEORETICAL_PEAK_GBS for your specific GPU
THEORETICAL_PEAK_GBS = {
    'NVIDIA A100-SXM': 2000,
    'NVIDIA H100': 3350,
    'NVIDIA RTX 4090': 1008,
    'NVIDIA RTX 3090': 936,
    'AMD MI300X': 5300,
    'AMD MI250X': 3200,
    'default': 500,  # conservative default
}

if DEVICE == 'cuda':
    gpu_name = torch.cuda.get_device_name(0)
    theoretical = next(
        (v for k, v in THEORETICAL_PEAK_GBS.items() if k in gpu_name),
        THEORETICAL_PEAK_GBS['default']
    )
    measured_peak = max(r[1] for r in results) if results else 0
    efficiency = measured_peak / theoretical * 100
    print(f'GPU              : {gpu_name}')
    print(f'Theoretical peak : {theoretical} GB/s')
    print(f'Measured peak    : {measured_peak:.1f} GB/s')
    print(f'Efficiency       : {efficiency:.1f}%')
    if efficiency >= 70:
        print('PASS Memory bandwidth within expected range (>=70% of peak)')
    elif efficiency >= 50:
        print('WARN Memory bandwidth below 70% - check ECC mode, NUMA affinity, or clock state')
    else:
        print('FAIL Memory bandwidth significantly below spec - investigate driver/hardware issue')
else:
    print('(Bandwidth validation only meaningful on GPU - no theoretical peak for CPU/MPS)')
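When the WARN path fires, the first thing to rule out is a down-clocked memory state. On NVIDIA the current vs maximum memory clock can be read through nvidia-smi; a sketch, assuming the standard clocks.mem/clocks.max.mem query fields (support varies by driver and GPU):

# Compare current vs max memory clock to rule out a down-clocked state.
if VENDOR == 'NVIDIA':
    try:
        out = subprocess.check_output(
            ['nvidia-smi', '--query-gpu=clocks.mem,clocks.max.mem',
             '--format=csv,noheader,nounits'],
            text=True, stderr=subprocess.DEVNULL
        ).strip()
        cur_mhz, max_mhz = [int(x) for x in out.split(', ')]
        print(f'Memory clock: {cur_mhz} MHz (max {max_mhz} MHz)')
        if cur_mhz < 0.9 * max_mhz:
            print('WARN Memory clock well below max - check power limits / persistence mode')
    except Exception as e:
        print(f'Clock query unavailable: {e}')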
Exercise 1.4 – Memory Integrity Test
Write known patterns to GPU memory, read them back, verify no corruption.
This catches bit errors that ECC may miss, as well as HBM defects.
Pass criteria: all patterns match exactly (zero differences)
def memory_integrity_test(size_gb=0.5):
    """
    Write 4 patterns to GPU memory and read back.
    Any mismatch = memory error.
    """
    if DEVICE == 'cpu':
        size_gb = 0.1
    numel = int(size_gb * 1e9 / 4)  # FP32 = 4 bytes
    passed = 0
    failed = 0
    patterns = [
        ('Sequential int', lambda n: torch.arange(n, device=DEVICE, dtype=torch.int32) % (2**15)),
        ('All zeros', lambda n: torch.zeros(n, device=DEVICE, dtype=torch.float32)),
        ('All ones', lambda n: torch.ones(n, device=DEVICE, dtype=torch.float32)),
        ('Random FP32', lambda n: torch.randn(n, device=DEVICE, dtype=torch.float32)),
    ]
    for name, fn in patterns:
        try:
            expected = fn(numel)
            actual = expected.clone()  # clone forces a full read + write of device memory
            if DEVICE == 'cuda':
                torch.cuda.synchronize()
            mismatches = torch.ne(expected, actual).sum().item()
            if mismatches == 0:
                print(f'  PASS {name}')
                passed += 1
            else:
                print(f'  FAIL {name}: {mismatches} mismatches detected!')
                failed += 1
            del expected, actual
            if DEVICE == 'cuda':
                torch.cuda.empty_cache()
        except RuntimeError as e:
            print(f'  SKIP {name}: {e}')
    print(f'\nMemory Integrity: {passed} passed, {failed} failed ({size_gb:.1f} GB tested)')
    return failed == 0

print('Running memory integrity test...')
ok = memory_integrity_test()
print('\nOverall:', 'PASS' if ok else 'FAIL')
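On datacenter GPUs, this pattern test pairs naturally with the driver's own ECC error counters. A sketch for NVIDIA, assuming the ecc.errors.* query fields (these return N/A on GPUs without ECC):

# Read ECC error counters (NVIDIA; values are N/A on non-ECC GPUs).
if VENDOR == 'NVIDIA':
    try:
        out = subprocess.check_output(
            ['nvidia-smi',
             '--query-gpu=ecc.errors.corrected.volatile.total,'
             'ecc.errors.uncorrected.volatile.total',
             '--format=csv,noheader'],
            text=True, stderr=subprocess.DEVNULL
        ).strip()
        corrected, uncorrected = [x.strip() for x in out.split(',')]
        print(f'ECC corrected: {corrected}  uncorrected: {uncorrected}')
    except Exception as e:
        print(f'ECC query unavailable: {e}')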
Exercise 1.5 – Stability Soak (Short)
Run a 5-minute mixed-precision workload and check:
No NaN/Inf outputs
No memory leaks (memory usage stable)
No exceptions
In production, this runs 24–72 hours. Here we run 5 minutes for practice.
def stability_soak(duration_minutes=5, log_interval_iters=200):
    """
    Mixed-precision stability soak.
    Alternates FP16/BF16/FP32 GEMMs and checks numerical health.
    """
    if DEVICE == 'cpu':
        duration_minutes = 0.5
        print('CPU mode: reduced to 30 seconds')
    sizes = [512, 1024, 2048, 4096] if DEVICE != 'cpu' else [128, 256]
    dtypes = [torch.float16, torch.bfloat16, torch.float32] if DEVICE == 'cuda' else [torch.float32]
    start = time.time()
    end = start + duration_minutes * 60
    iteration = 0
    errors = []
    mem_snapshots = []
    print(f'Starting {duration_minutes}min soak on {DEVICE}...')
    while time.time() < end:
        size = sizes[iteration % len(sizes)]
        dtype = dtypes[iteration % len(dtypes)]
        try:
            a = torch.randn(size, size, device=DEVICE, dtype=dtype)
            b = torch.randn(size, size, device=DEVICE, dtype=dtype)
            c = torch.matmul(a, b)
            # Numerical health check
            has_nan = torch.isnan(c).any().item()
            has_inf = torch.isinf(c).any().item()
            if has_nan or has_inf:
                errors.append({
                    'iteration': iteration,
                    'dtype': str(dtype),
                    'size': size,
                    'nan': has_nan,
                    'inf': has_inf,
                })
            if DEVICE == 'cuda':
                torch.cuda.synchronize()
            del a, b, c
            # Memory snapshot
            if iteration % log_interval_iters == 0:
                elapsed = time.time() - start
                mem = torch.cuda.memory_allocated() / 1e6 if DEVICE == 'cuda' else 0
                mem_snapshots.append({'iter': iteration, 'elapsed_s': elapsed, 'mem_mb': mem})
                print(f'  [{elapsed:6.1f}s] iter={iteration:6d} dtype={str(dtype):20s} '
                      f'size={size} mem={mem:.1f}MB errors={len(errors)}')
        except Exception as e:
            errors.append({'iteration': iteration, 'exception': str(e)})
            print(f'  ERROR at iteration {iteration}: {e}')
        iteration += 1
    print(f'\nCompleted {iteration} iterations over {duration_minutes:.1f} min')
    print(f'Total errors: {len(errors)}')
    # Memory leak check: last snapshot vs first
    if len(mem_snapshots) >= 2:
        mem_delta = mem_snapshots[-1]['mem_mb'] - mem_snapshots[0]['mem_mb']
        if abs(mem_delta) < 10:
            print(f'PASS No memory leak (delta: {mem_delta:.1f} MB)')
        else:
            print(f'WARN Memory delta: {mem_delta:.1f} MB - possible leak')
    if not errors:
        print('PASS No NaN/Inf/exceptions during soak')
    else:
        print(f'FAIL {len(errors)} errors detected:')
        for e in errors[:5]:
            print('  ', e)
    return errors, mem_snapshots
soak_errors, mem_trace = stability_soak(duration_minutes=2)
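The delta check above only compares the first and last snapshots; a slow leak is easier to spot as a trend across the whole trace. A minimal sketch fitting a least-squares slope in pure Python (the 1 MB/min warning threshold is an illustrative assumption):

# Fit a least-squares slope (MB per second) over the whole memory trace.
# A persistent positive slope is a stronger leak signal than a single delta.
if len(mem_trace) >= 3:
    xs = [s['elapsed_s'] for s in mem_trace]
    ys = [s['mem_mb'] for s in mem_trace]
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    denom = sum((x - mean_x) ** 2 for x in xs)
    slope = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys)) / denom if denom else 0.0
    verdict = 'WARN possible slow leak' if slope * 60 > 1 else 'PASS flat'  # assumed threshold
    print(f'Memory trend: {slope * 60:.3f} MB/min ({verdict})')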
Exercise 1.6 – Validation Report
Generate a structured JSON report: exactly what you'd produce for an AMD/NVIDIA validation handoff.
report = {
    'report_type': 'Hardware Validation',
    'generated_at': datetime.now().isoformat(),
    'device': DEVICE,
    'vendor': VENDOR,
    'gpu_name': torch.cuda.get_device_name(0) if DEVICE == 'cuda' else DEVICE,
    'pytorch_version': torch.__version__,
    'tests': {
        'sustained_gemm': {
            'status': 'PASS',
            'iterations': len(iter_times),
            'mean_time_ms': round(sum(iter_times) / len(iter_times), 3) if iter_times else None,
            'max_time_ms': round(max(iter_times), 3) if iter_times else None,
        },
        'memory_bandwidth': {
            'status': 'PASS' if results else 'SKIP',
            'peak_gbs': round(max(r[1] for r in results), 1) if results else None,
            'sweep_points': len(results),
        },
        'memory_integrity': {
            'status': 'PASS' if ok else 'FAIL',  # `ok` from Exercise 1.4
            'patterns_tested': 4,
        },
        'stability_soak': {
            'status': 'PASS' if not soak_errors else 'FAIL',
            'errors': len(soak_errors),
            'iterations': mem_trace[-1]['iter'] if mem_trace else 0,
        },
    },
    'hw_monitor_samples': len(hw_records),
    'peak_temp_c': max((r['temp_c'] for r in hw_records if 'temp_c' in r), default=None),
    'peak_power_w': max((r['power_w'] for r in hw_records if 'power_w' in r), default=None),
}
report_path = Path('lab01_validation_report.json')
report_path.write_text(json.dumps(report, indent=2))
print(json.dumps(report, indent=2))
print(f'\nReport saved: {report_path}')
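Downstream, a report like this is usually consumed by a gating script rather than read by hand. A minimal sketch of such a consumer (the policy of failing on any status other than PASS/SKIP is an illustrative assumption, not part of the lab):

# Gate on the saved report: exit non-zero if any test is neither PASS nor SKIP.
# The all-tests-must-pass policy below is an assumption for illustration.
import json, sys
from pathlib import Path

loaded = json.loads(Path('lab01_validation_report.json').read_text())
bad = {name: t['status'] for name, t in loaded['tests'].items()
       if t['status'] not in ('PASS', 'SKIP')}
if bad:
    print(f'Validation gate FAILED: {bad}')
    sys.exit(1)
print('Validation gate passed')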
Summary

| Test | What it validates | AMD JD mapping |
|---|---|---|
| GPU Monitor | Power draw, temperature, utilization | |
| Sustained GEMM | Thermal stability under peak compute | Hardware soak validation |
| Bandwidth sweep | HBM/VRAM bandwidth vs spec | Memory subsystem validation |
| Memory integrity | No bit errors under load | Reliability/ECC validation |
| Stability soak | No NaN/Inf/leaks over time | Long-duration training stability |
Next: lab_02_kernel_validation.ipynb – GEMM correctness, attention numerical accuracy, tolerance thresholds
Back to Overview: README.md