Lab 08 β€” Industry Benchmarking: Hands-OnΒΆ

Duration: 2-3 hours | Difficulty: Intermediate-Advanced

What You’ll BuildΒΆ

  1. A TTFT & Output Speed measurement tool with OpenAI token normalization

  2. A Mini AA-SLT (System Load Test) with phased concurrency ramp

  3. An SLO-based capacity planner using binary search (AA-AgentPerf style)

  4. A hardware comparison dashboard with per-watt normalization

  5. A mini intelligence eval runner with confidence intervals

All exercises run on CPU/MPS/CUDA with automatic device detection.

# ── Setup ─────────────────────────────────────────────────────────────────────
import dataclasses
import json
import platform
import statistics
import sys
import time
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path

import numpy as np
import torch
import torch.nn as nn

# Device selection
if torch.cuda.is_available():
    DEVICE = torch.device("cuda")
    DEVICE_NAME = torch.cuda.get_device_name(0)
elif torch.backends.mps.is_available():
    DEVICE = torch.device("mps")
    DEVICE_NAME = "Apple MPS"
else:
    DEVICE = torch.device("cpu")
    DEVICE_NAME = f"CPU ({platform.processor() or 'unknown'})"

print(f"Python  : {sys.version.split()[0]}")
print(f"PyTorch : {torch.__version__}")
print(f"Device  : {DEVICE_NAME} ({DEVICE})")

Part 1 β€” TTFT & Output Speed MeasurementΒΆ

We’ll simulate a streaming LLM inference and measure the exact metrics that Artificial Analysis tracks.

# ── 1.1 A simple autoregressive model (simulates LLM token generation) ──────
class TinyLM(nn.Module):
    """Minimal autoregressive model for benchmarking.
    Not a real language model β€” just generates tokens one at a time
    with realistic compute per step.
    """
    def __init__(self, vocab_size: int = 32000, d_model: int = 512,
                 n_heads: int = 8, n_layers: int = 4):
        super().__init__()
        self.embed = nn.Embedding(vocab_size, d_model)
        layer = nn.TransformerEncoderLayer(
            d_model=d_model, nhead=n_heads, dim_feedforward=d_model * 4,
            batch_first=True, dropout=0.0,
        )
        self.transformer = nn.TransformerEncoder(layer, num_layers=n_layers)
        self.lm_head = nn.Linear(d_model, vocab_size, bias=False)
        self.vocab_size = vocab_size

    def forward(self, input_ids: torch.Tensor) -> torch.Tensor:
        x = self.embed(input_ids)
        x = self.transformer(x)
        return self.lm_head(x[:, -1:, :])  # predict next token only

    @torch.no_grad()
    def generate(self, input_ids: torch.Tensor, max_new_tokens: int = 100):
        """Autoregressive generation, yielding (token_id, timestamp) pairs."""
        for _ in range(max_new_tokens):
            logits = self.forward(input_ids)
            next_token = logits.argmax(dim=-1)  # greedy
            input_ids = torch.cat([input_ids, next_token], dim=1)
            yield next_token.item(), time.perf_counter()


model = TinyLM().to(DEVICE).eval()
n_params = sum(p.numel() for p in model.parameters())
print(f"TinyLM: {n_params/1e6:.1f}M parameters on {DEVICE}")
# ── 1.2 Measure TTFT & Output Speed ──────────────────────────────────────────
@dataclasses.dataclass
class InferenceMetrics:
    ttft_ms: float
    output_speed_tps: float
    e2e_latency_ms: float
    total_tokens: int
    total_response_time_100t_ms: float


def measure_inference(model: TinyLM, input_length: int = 64,
                       output_tokens: int = 100) -> InferenceMetrics:
    """Run one inference and measure AA-style metrics."""
    input_ids = torch.randint(0, model.vocab_size, (1, input_length), device=DEVICE)

    # Warmup
    for _, _ in model.generate(input_ids[:, :16], max_new_tokens=5):
        pass

    # Actual measurement
    request_sent = time.perf_counter()
    timestamps = []

    for token_id, ts in model.generate(input_ids, max_new_tokens=output_tokens):
        timestamps.append(ts)

    ttft = (timestamps[0] - request_sent) * 1000  # ms
    if len(timestamps) > 1:
        output_speed = (len(timestamps) - 1) / (timestamps[-1] - timestamps[0])
    else:
        output_speed = 0.0
    e2e = (timestamps[-1] - request_sent) * 1000  # ms

    return InferenceMetrics(
        ttft_ms=ttft,
        output_speed_tps=output_speed,
        e2e_latency_ms=e2e,
        total_tokens=len(timestamps),
        total_response_time_100t_ms=ttft + (100 / max(output_speed, 1e-9)) * 1000,
    )


# Run measurement
print("Running single-query inference measurement...")
m = measure_inference(model, input_length=64, output_tokens=100)
print(f"\n=== Inference Metrics ===")
print(f"  TTFT                    : {m.ttft_ms:>8.1f} ms")
print(f"  Output Speed            : {m.output_speed_tps:>8.1f} tokens/sec")
print(f"  E2E Latency             : {m.e2e_latency_ms:>8.1f} ms")
print(f"  Total tokens generated  : {m.total_tokens:>8}")
print(f"  Response time (100 tok) : {m.total_response_time_100t_ms:>8.1f} ms")
# ── 1.3 OpenAI Token Normalization ───────────────────────────────────────────
openai_enc = None
TOKEN_NORMALIZATION_BACKEND = None

try:
    import tiktoken
except ImportError:
    print("tiktoken not installed; using a rough fallback of ~4 chars/token for this demo")
else:
    last_tiktoken_error = None
    for enc_name in ("o200k_base", "cl100k_base"):
        try:
            openai_enc = tiktoken.get_encoding(enc_name)
            TOKEN_NORMALIZATION_BACKEND = enc_name
            print(f"tiktoken loaded ({enc_name} encoder)")
            break
        except Exception as exc:
            last_tiktoken_error = exc

    if openai_enc is None:
        print("tiktoken is installed, but encoder data could not be downloaded.")
        print(f"Reason: {type(last_tiktoken_error).__name__}: {last_tiktoken_error}")
        print("Falling back to ~4 chars/token so the benchmarking demo can continue.")
        print("Local fix: `python -m pip install -U certifi` and set `SSL_CERT_FILE=$(python -m certifi)` before launching Jupyter.")

if TOKEN_NORMALIZATION_BACKEND is None:
    TOKEN_NORMALIZATION_BACKEND = "char_estimate"

def count_openai_tokens(text):
    if openai_enc is not None:
        return len(openai_enc.encode(text))
    return max(1, (len(text) + 3) // 4)

# Example: compare native tokens vs OpenAI tokens
test_text = (
    "The transformer architecture uses self-attention mechanisms to process "
    "input sequences in parallel, enabling efficient training on large datasets. "
    "Key innovations include multi-head attention, positional encoding, and "
    "layer normalization."
)

openai_tokens = count_openai_tokens(test_text)

# Simulate a model that produces 20% more native tokens for the same text
native_tokens = int(openai_tokens * 1.20)

native_speed = 200  # native tokens/sec (what the model reports)
normalized_speed = native_speed * (openai_tokens / native_tokens)

print(f"\n=== Token Normalization Example ===")
print(f"  Backend             : {TOKEN_NORMALIZATION_BACKEND}")
print(f"  Text length         : {len(test_text)} chars")
print(f"  OpenAI tokens       : {openai_tokens}")
print(f"  Native tokens (sim) : {native_tokens}")
print(f"  Ratio               : {native_tokens/openai_tokens:.2f}x")
print(f"  Native speed        : {native_speed} native t/s")
print(f"  Normalized speed    : {normalized_speed:.0f} OpenAI t/s")
print(f"  Overstated by       : {(native_speed - normalized_speed)/normalized_speed*100:.0f}%")

Part 2 β€” Mini AA-SLT (System Load Test)ΒΆ

We’ll implement the AA-SLT methodology: phased concurrency ramp, immediate query replacement, throughput plateau detection.

# ── 2.1 System Load Test Implementation ──────────────────────────────────────

@dataclasses.dataclass
class SLTResult:
    concurrent_users: int
    system_throughput_tps: float   # total tokens/sec across all users
    median_speed_tps: float        # median per-query speed
    median_e2e_ms: float           # median per-query E2E
    response_rate: float           # fraction of queries completed
    queries_completed: int


def run_single_query(model: TinyLM, input_length: int, output_tokens: int) -> dict:
    """Run one inference query, return timing info."""
    input_ids = torch.randint(0, model.vocab_size, (1, input_length), device=DEVICE)
    t_start = time.perf_counter()
    timestamps = []

    for _, ts in model.generate(input_ids, max_new_tokens=output_tokens):
        timestamps.append(ts)

    if not timestamps:
        return {"success": False}

    ttft = timestamps[0] - t_start
    speed = (len(timestamps) - 1) / (timestamps[-1] - timestamps[0]) if len(timestamps) > 1 else 0
    e2e = timestamps[-1] - t_start

    return {
        "success": True,
        "ttft": ttft,
        "speed": speed,
        "e2e": e2e,
        "tokens": len(timestamps),
    }


def run_slt_phase(model: TinyLM, concurrent_users: int,
                   phase_duration_sec: float = 10.0,
                   input_length: int = 32,
                   output_tokens: int = 50) -> SLTResult:
    """Run one phase of the AA-SLT benchmark.

    Maintains `concurrent_users` queries in-flight. When one finishes,
    immediately start another (like AA's methodology).
    """
    results = []
    total_tokens = 0
    phase_start = time.perf_counter()

    with ThreadPoolExecutor(max_workers=concurrent_users) as executor:
        futures = set()

        # Submit initial batch
        for _ in range(concurrent_users):
            f = executor.submit(run_single_query, model, input_length, output_tokens)
            futures.add(f)

        while time.perf_counter() - phase_start < phase_duration_sec:
            done = set()
            for f in futures:
                if f.done():
                    done.add(f)

            for f in done:
                futures.discard(f)
                r = f.result()
                if r["success"]:
                    results.append(r)
                    total_tokens += r["tokens"]

                # Immediately replace completed query
                if time.perf_counter() - phase_start < phase_duration_sec:
                    new_f = executor.submit(run_single_query, model, input_length, output_tokens)
                    futures.add(new_f)

            time.sleep(0.01)  # avoid busy-wait

    phase_elapsed = time.perf_counter() - phase_start

    if not results:
        return SLTResult(concurrent_users, 0, 0, 0, 0.0, 0)

    speeds = [r["speed"] for r in results]
    e2es = [r["e2e"] * 1000 for r in results]

    return SLTResult(
        concurrent_users=concurrent_users,
        system_throughput_tps=total_tokens / phase_elapsed,
        median_speed_tps=statistics.median(speeds),
        median_e2e_ms=statistics.median(e2es),
        response_rate=len(results) / max(len(results), 1),
        queries_completed=len(results),
    )


print("SLT implementation ready.")
# ── 2.2 Run Mini AA-SLT ──────────────────────────────────────────────────────
# Using shorter phases (10s instead of 3min) and fewer tokens for fast iteration

CONCURRENCY_LEVELS = [1, 2, 4]
PHASE_DURATION = 10  # seconds per phase (AA uses 180s)

print(f"Running Mini AA-SLT on {DEVICE_NAME}")
print(f"Phase duration: {PHASE_DURATION}s | Input: 32 tokens | Output: 50 tokens")
print(f"\n{'Users':>6} {'Sys Throughput':>16} {'Med Speed':>12} {'Med E2E':>10} {'Queries':>9} {'Plateau?':>10}")
print("-" * 67)

slt_results = []
prev_throughput = 0

for n_users in CONCURRENCY_LEVELS:
    result = run_slt_phase(model, n_users, phase_duration_sec=PHASE_DURATION,
                            input_length=32, output_tokens=50)
    slt_results.append(result)

    # Plateau detection: <5% throughput increase
    plateau = ""
    if prev_throughput > 0:
        gain = (result.system_throughput_tps - prev_throughput) / prev_throughput
        if gain < 0.05:
            plateau = "<< plateau"
    prev_throughput = result.system_throughput_tps

    print(f"{result.concurrent_users:>6} "
          f"{result.system_throughput_tps:>13.1f} t/s "
          f"{result.median_speed_tps:>9.1f} t/s "
          f"{result.median_e2e_ms:>7.0f} ms "
          f"{result.queries_completed:>9} "
          f"{plateau:>10}")

print(f"\nPeak system throughput: {max(r.system_throughput_tps for r in slt_results):.1f} t/s")

Part 3 β€” SLO-Based Capacity Planning (AA-AgentPerf Style)ΒΆ

Given an SLO target, use binary search to find the maximum concurrent users your system can support.

# ── 3.1 SLO Definition ───────────────────────────────────────────────────────

@dataclasses.dataclass
class SLO:
    name: str
    min_p25_speed_tps: float   # P25 output speed must be >= this
    max_p95_ttft_s: float      # P95 TTFT must be <= this


# Define tiers based on AA-AgentPerf
SLO_TIERS = [
    SLO("Economy",  min_p25_speed_tps=10,  max_p95_ttft_s=5.0),
    SLO("Standard", min_p25_speed_tps=30,  max_p95_ttft_s=3.0),
    SLO("Premium",  min_p25_speed_tps=80,  max_p95_ttft_s=1.0),
]

print("SLO Tiers:")
for slo in SLO_TIERS:
    print(f"  {slo.name:<12} P25 speed >= {slo.min_p25_speed_tps:>5} t/s, P95 TTFT <= {slo.max_p95_ttft_s} s")
# ── 3.2 Binary Search for Max Users ──────────────────────────────────────────

def check_slo(model: TinyLM, n_users: int, slo: SLO,
               phase_duration: float = 8.0) -> tuple[bool, dict]:
    """Check if system meets SLO at given concurrency."""
    result = run_slt_phase(model, n_users, phase_duration_sec=phase_duration,
                            input_length=32, output_tokens=50)

    # Approximate P25 speed as median * 0.8 (in real bench, compute from distribution)
    p25_speed = result.median_speed_tps * 0.8
    # Approximate P95 TTFT (we use first query's TTFT * scaling)
    approx_p95_ttft = result.median_e2e_ms / 1000 * 0.3  # rough estimate

    passed = (p25_speed >= slo.min_p25_speed_tps and
              approx_p95_ttft <= slo.max_p95_ttft_s)

    return passed, {
        "users": n_users,
        "p25_speed": p25_speed,
        "p95_ttft": approx_p95_ttft,
        "throughput": result.system_throughput_tps,
        "queries": result.queries_completed,
    }


def find_max_users(model: TinyLM, slo: SLO, max_search: int = 8) -> dict:
    """Binary search for max concurrent users meeting SLO."""
    print(f"\n  Searching for max users at {slo.name} SLO...")
    phases = []

    # Exponential ramp
    n = 1
    last_pass = 0
    while n <= max_search:
        passed, info = check_slo(model, n, slo)
        status = "PASS" if passed else "FAIL"
        print(f"    Users={n:>3} | P25 speed={info['p25_speed']:>6.1f} t/s | "
              f"P95 TTFT={info['p95_ttft']:>5.2f}s | {status}")
        phases.append(info)
        if passed:
            last_pass = n
        else:
            break
        n *= 2

    # Binary search between last_pass and n
    lo, hi = last_pass, min(n, max_search)
    while hi - lo > 1:
        mid = (lo + hi) // 2
        passed, info = check_slo(model, mid, slo)
        status = "PASS" if passed else "FAIL"
        print(f"    Users={mid:>3} | P25 speed={info['p25_speed']:>6.1f} t/s | "
              f"P95 TTFT={info['p95_ttft']:>5.2f}s | {status}")
        phases.append(info)
        if passed:
            lo = mid
        else:
            hi = mid

    return {"slo": slo.name, "max_users": lo, "phases": phases}


# Run capacity search for each SLO tier
print(f"=== SLO Capacity Search on {DEVICE_NAME} ===")
capacity_results = []
for slo in SLO_TIERS:
    result = find_max_users(model, slo, max_search=4)
    capacity_results.append(result)
    print(f"  >> {slo.name}: Max {result['max_users']} concurrent users")

print(f"\n{'SLO Tier':<12} {'Max Users':>10}")
print("-" * 24)
for r in capacity_results:
    print(f"{r['slo']:<12} {r['max_users']:>10}")

Part 4 – Hardware Comparison DashboardΒΆ

The hardware comparison dashboard normalizes performance metrics across different GPU backends, enabling apples-to-apples comparison using performance per watt as the efficiency metric. Raw TFLOPS or tokens/sec comparisons are misleading because different GPUs have different TDP (thermal design power) – a GPU that delivers 2x throughput at 3x power is actually less efficient. The dashboard collects throughput benchmarks from each available backend (CUDA, ROCm, MPS, CPU), normalizes by measured power consumption, and presents a unified view that datacenter operators use for TCO (Total Cost of Ownership) analysis. This is the type of competitive analysis that hardware vendors (AMD, NVIDIA, Intel) use internally to identify performance gaps and guide optimization priorities.

# ── 4.1 Hardware Comparison ──────────────────────────────────────────────────

# TDP estimates for normalization
TDP_ESTIMATES = {
    "cpu": 65,       # typical desktop CPU watts
    "mps": 30,       # Apple M-series GPU portion
    "cuda": 350,     # typical datacenter GPU
}

RENTAL_ESTIMATES = {
    "cpu": 0.10,     # $/hr for a CPU instance
    "mps": 0.00,     # local hardware (no rental)
    "cuda": 2.50,    # $/GPU/hr
}


def benchmark_device(model: TinyLM, device: torch.device,
                      n_runs: int = 10) -> dict:
    """Benchmark model on a specific device."""
    model = model.to(device)
    input_ids = torch.randint(0, model.vocab_size, (1, 64), device=device)

    # Warmup
    for _, _ in model.generate(input_ids[:, :16], max_new_tokens=5):
        pass

    metrics_list = []
    for _ in range(n_runs):
        m = measure_inference(model, input_length=64, output_tokens=50)
        metrics_list.append(m)

    speeds = [m.output_speed_tps for m in metrics_list]
    ttfts = [m.ttft_ms for m in metrics_list]

    return {
        "device": str(device),
        "median_speed_tps": statistics.median(speeds),
        "p25_speed_tps": np.percentile(speeds, 25),
        "median_ttft_ms": statistics.median(ttfts),
        "p95_ttft_ms": np.percentile(ttfts, 95),
        "n_runs": n_runs,
    }


# Benchmark on current device
print(f"Benchmarking on {DEVICE_NAME}...")
hw_result = benchmark_device(model, DEVICE, n_runs=10)

device_type = DEVICE.type  # 'cpu', 'mps', or 'cuda'
tdp = TDP_ESTIMATES.get(device_type, 100)
rental = RENTAL_ESTIMATES.get(device_type, 1.0)

print(f"\n=== Hardware Benchmark: {DEVICE_NAME} ===")
print(f"  Median output speed : {hw_result['median_speed_tps']:.1f} t/s")
print(f"  P25 output speed    : {hw_result['p25_speed_tps']:.1f} t/s")
print(f"  Median TTFT         : {hw_result['median_ttft_ms']:.1f} ms")
print(f"  P95 TTFT            : {hw_result['p95_ttft_ms']:.1f} ms")

print(f"\n=== Normalized Metrics ===")
print(f"  TDP estimate        : {tdp} W")
print(f"  Speed per watt      : {hw_result['median_speed_tps'] / tdp:.2f} t/s/W")
if rental > 0:
    print(f"  Rental cost         : ${rental:.2f}/hr")
    print(f"  Tokens per dollar   : {hw_result['median_speed_tps'] * 3600 / rental:,.0f} tokens/$")
# ── 4.2 Generate JSON Report (AA-style) ──────────────────────────────────────

report = {
    "benchmark": "Mini AA-SLT + Capacity Test",
    "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
    "system": {
        "device": DEVICE_NAME,
        "device_type": device_type,
        "pytorch_version": torch.__version__,
        "python_version": sys.version.split()[0],
        "platform": platform.platform(),
    },
    "model": {
        "name": "TinyLM",
        "parameters": n_params,
    },
    "single_query": {
        "median_speed_tps": round(hw_result["median_speed_tps"], 1),
        "p25_speed_tps": round(hw_result["p25_speed_tps"], 1),
        "median_ttft_ms": round(hw_result["median_ttft_ms"], 1),
        "p95_ttft_ms": round(hw_result["p95_ttft_ms"], 1),
    },
    "slt_results": [
        {
            "concurrent_users": r.concurrent_users,
            "system_throughput_tps": round(r.system_throughput_tps, 1),
            "median_speed_tps": round(r.median_speed_tps, 1),
        }
        for r in slt_results
    ],
    "capacity": [
        {"slo_tier": r["slo"], "max_users": r["max_users"]}
        for r in capacity_results
    ],
    "normalized": {
        "tdp_watts": tdp,
        "speed_per_watt": round(hw_result["median_speed_tps"] / tdp, 3),
    },
}

report_path = Path("benchmark_report.json")
report_path.write_text(json.dumps(report, indent=2))
print(f"Report saved to: {report_path}")
print(json.dumps(report, indent=2))

Part 5 β€” Mini Intelligence Eval RunnerΒΆ

Run a simple accuracy-based eval and compute confidence intervals (like AA’s Intelligence Index methodology).

# ── 5.1 Mini Eval Dataset ────────────────────────────────────────────────────
# In practice, you'd use MMLU, GPQA, etc. Here we use a simple Q&A dataset
# to demonstrate the methodology.

MINI_EVAL = [
    {"question": "What is the capital of France?", "answer": "paris",
     "category": "geography"},
    {"question": "What is 2^10?", "answer": "1024",
     "category": "math"},
    {"question": "What element has atomic number 1?", "answer": "hydrogen",
     "category": "science"},
    {"question": "Who wrote 'Hamlet'?", "answer": "shakespeare",
     "category": "literature"},
    {"question": "What is the derivative of x^2?", "answer": "2x",
     "category": "math"},
    {"question": "What planet is closest to the Sun?", "answer": "mercury",
     "category": "science"},
    {"question": "What is the binary representation of 10?", "answer": "1010",
     "category": "cs"},
    {"question": "What sorting algorithm has O(n log n) average case?", "answer": "quicksort",
     "category": "cs"},
    {"question": "What is the integral of 1/x?", "answer": "ln(x)",
     "category": "math"},
    {"question": "What year was the first transistor invented?", "answer": "1947",
     "category": "history"},
]

print(f"Mini eval dataset: {len(MINI_EVAL)} questions")
categories = {}
for q in MINI_EVAL:
    categories[q['category']] = categories.get(q['category'], 0) + 1
for cat, count in sorted(categories.items()):
    print(f"  {cat}: {count} questions")
# ── 5.2 Eval Runner with Confidence Intervals ────────────────────────────────
try:
    from scipy import stats as scipy_stats
except ImportError:
    print("⚠️  scipy is not installed. Install it with:  pip install scipy")
    print("   Falling back to basic numpy approximations (results will be approximate).")

    class _FallbackStats:
        """Minimal stand-in for scipy.stats using numpy only."""

        @staticmethod
        def sem(a):
            """Standard error of the mean."""
            a = np.asarray(a)
            return np.std(a, ddof=1) / np.sqrt(len(a))

        class t:
            @staticmethod
            def interval(confidence, df, loc=0, scale=1):
                """Approximate CI using z=1.96 for 95% (ignores df)."""
                # 1.96 is the z-score for 95% CI; good approximation when df >= 10
                z = 1.96 if confidence == 0.95 else 2.576  # 99% fallback
                return (loc - z * scale, loc + z * scale)

        @staticmethod
        def ttest_ind(a, b):
            """Welch's t-test (approximate)."""
            a, b = np.asarray(a), np.asarray(b)
            na, nb = len(a), len(b)
            mean_a, mean_b = np.mean(a), np.mean(b)
            var_a, var_b = np.var(a, ddof=1), np.var(b, ddof=1)
            se = np.sqrt(var_a / na + var_b / nb)
            t_stat = (mean_a - mean_b) / se if se > 0 else 0.0
            # Approximate two-tailed p-value using large-sample normal
            # (rough but prevents the notebook from crashing)
            p_value = 2 * np.exp(-0.717 * t_stat**2 - 0.416 * t_stat**2 * np.sign(abs(t_stat)))
            p_value = float(np.clip(p_value, 0, 1))
            return t_stat, p_value

    scipy_stats = _FallbackStats()


def simulate_model_eval(dataset: list[dict], accuracy: float = 0.7,
                         noise: float = 0.05) -> list[bool]:
    """Simulate a model answering the eval.

    In practice, you'd send each question to an LLM API and check the answer.
    Here we simulate with a target accuracy + noise.
    """
    results = []
    for q in dataset:
        # Probability of correct answer varies by question
        p_correct = accuracy + np.random.normal(0, noise)
        p_correct = np.clip(p_correct, 0.0, 1.0)
        correct = np.random.random() < p_correct
        results.append(correct)
    return results


def compute_eval_metrics(all_runs: list[list[bool]]) -> dict:
    """Compute accuracy with confidence interval across multiple runs."""
    run_accuracies = [sum(run) / len(run) * 100 for run in all_runs]
    n = len(run_accuracies)
    mean = np.mean(run_accuracies)
    se = scipy_stats.sem(run_accuracies)
    ci = scipy_stats.t.interval(0.95, df=n-1, loc=mean, scale=se)

    return {
        "mean_accuracy": mean,
        "std": np.std(run_accuracies, ddof=1),
        "n_runs": n,
        "ci_95_low": ci[0],
        "ci_95_high": ci[1],
        "ci_width_pct": (ci[1] - ci[0]) / mean * 100 if mean > 0 else float('inf'),
    }


# Simulate two models
np.random.seed(42)
N_RUNS = 15  # AA uses 10+ repeats

models_config = {
    "Model A (frontier)": {"accuracy": 0.85, "noise": 0.05},
    "Model B (challenger)": {"accuracy": 0.78, "noise": 0.07},
}

all_metrics = {}
for model_name, config in models_config.items():
    runs = [simulate_model_eval(MINI_EVAL, **config) for _ in range(N_RUNS)]
    metrics = compute_eval_metrics(runs)
    all_metrics[model_name] = metrics

    print(f"\n{model_name}:")
    print(f"  Mean accuracy : {metrics['mean_accuracy']:.1f}%")
    print(f"  Std dev       : {metrics['std']:.1f}%")
    print(f"  95% CI        : [{metrics['ci_95_low']:.1f}%, {metrics['ci_95_high']:.1f}%]")
    print(f"  CI width      : {metrics['ci_width_pct']:.1f}% of mean")
    print(f"  Meets <1% CI? : {'Yes' if metrics['ci_width_pct'] < 1.0 else f'No (need more runs or larger dataset)'}")
# ── 5.3 Statistical Comparison Between Models ────────────────────────────────

def compare_models(metrics_a: dict, metrics_b: dict,
                    name_a: str, name_b: str) -> dict:
    """Determine if two models are statistically different."""
    # We need the raw run accuracies for the t-test
    # Reconstruct from metrics (in practice, keep the raw data)
    np.random.seed(42)
    runs_a = [simulate_model_eval(MINI_EVAL, accuracy=0.85, noise=0.05) for _ in range(15)]
    runs_b = [simulate_model_eval(MINI_EVAL, accuracy=0.78, noise=0.07) for _ in range(15)]

    accs_a = [sum(r)/len(r)*100 for r in runs_a]
    accs_b = [sum(r)/len(r)*100 for r in runs_b]

    t_stat, p_value = scipy_stats.ttest_ind(accs_a, accs_b)
    # Effect size (Cohen's d)
    pooled_std = np.sqrt((np.var(accs_a, ddof=1) + np.var(accs_b, ddof=1)) / 2)
    cohens_d = (np.mean(accs_a) - np.mean(accs_b)) / pooled_std if pooled_std > 0 else 0

    print(f"\n=== Model Comparison: {name_a} vs {name_b} ===")
    print(f"  {name_a} mean : {np.mean(accs_a):.1f}%")
    print(f"  {name_b} mean : {np.mean(accs_b):.1f}%")
    print(f"  Difference     : {np.mean(accs_a) - np.mean(accs_b):+.1f}%")
    print(f"  t-statistic    : {t_stat:.3f}")
    print(f"  p-value        : {p_value:.4f}")
    print(f"  Cohen's d      : {cohens_d:.2f}")

    effect = "large" if abs(cohens_d) >= 0.8 else "medium" if abs(cohens_d) >= 0.5 else "small"
    sig = "Yes" if p_value < 0.05 else "No"
    print(f"  Significant?   : {sig} (p {'<' if p_value < 0.05 else '>='} 0.05)")
    print(f"  Effect size    : {effect}")

    return {"t_stat": t_stat, "p_value": p_value, "cohens_d": cohens_d}


compare_models(all_metrics["Model A (frontier)"], all_metrics["Model B (challenger)"],
               "Model A", "Model B")

ExercisesΒΆ

  1. Extend the SLT: Add concurrency levels 8 and 16. Does the throughput plateau on your device? Plot system throughput vs concurrent users.

  2. Real TTFT measurement: If you have access to an LLM API (OpenAI, Anthropic, Together), write a function that measures real TTFT using SSE streaming. Compare 3 providers.

  3. Token normalization impact: Generate a 1000-word essay with two different models. Tokenize with both their native tokenizer and tiktoken o200k_base. How much does normalization change the effective speed?

  4. SLO capacity planning: Your company needs to serve 100 concurrent agent users at the β€œStandard” SLO tier. Using your benchmark results, estimate how many of your devices you’d need. Calculate the monthly cost.

  5. Confidence interval convergence: Run the mini eval with 5, 10, 20, and 50 repeats. Plot CI width vs number of runs. How many runs does it take to get <1% CI width?

  6. Multi-device comparison: If you have access to both CPU and GPU, benchmark on both. Compute the speedup and cost efficiency (tokens per dollar per hour).

Key TakeawaysΒΆ

  • TTFT and output speed are measured the same way everywhere β€” understand the formulas

  • System Load Testing reveals how hardware scales under concurrent load β€” throughput plateaus expose hardware limits

  • SLO-based capacity planning via binary search is how real deployments are sized

  • Token normalization is essential for fair comparison β€” always normalize to a common tokenizer

  • Confidence intervals tell you whether benchmark differences are real or noise β€” run evals multiple times

  • Your benchmark report JSON should be machine-readable for regression tracking

Previous: lab_07_gpgpu_backends.ipynb
Back to Overview: README.md