Building an A/B Testing PlatformΒΆ

Running one A/B test is easy. Running 50 tests simultaneously without mistakes is a system design problem. This notebook builds a production-grade experimentation platform from scratch β€” covering assignment, analysis, guardrails, and the subtle pitfalls that invalidate results.

1. SetupΒΆ

import numpy as np
import pandas as pd
from scipy import stats
import matplotlib.pyplot as plt
import seaborn as sns
import hashlib
from datetime import date, datetime, timedelta
from dataclasses import dataclass, field
from typing import List, Dict, Optional

np.random.seed(42)
sns.set_theme(style='whitegrid', palette='muted')
plt.rcParams['figure.dpi'] = 100

print("Experimentation platform: initializing.")

2. The Problems with Naive A/B TestingΒΆ

Most A/B testing failures are not statistical β€” they are procedural. Four common mistakes that silently invalidate results:

Mistake

Root cause

Effect

Peeking

Stopping when p < 0.05

Type I error >> Ξ±

Multiple testing

20 metrics, no correction

64% false positive rate

SUTVA violation

Network effects between users

Biased effect estimates

SRM

Bug in assignment code

All results untrustworthy

# ── Mistake 1: Peeking Problem ───────────────────────────────────────────────
# Simulate 1000 A/A tests. Each day, check if p < 0.05 and stop if so.
# Under H0 (no real difference), the false positive rate should be 5%.

def simulate_peeking(n_simulations=1000, max_days=30, daily_users=100, alpha=0.05):
    """Run A/A tests with daily peeking. Count how often we falsely 'win'."""
    false_positives = 0
    for _ in range(n_simulations):
        control_data = []
        treatment_data = []
        for day in range(max_days):
            # Both groups drawn from same distribution β€” A/A test
            control_data.extend(np.random.binomial(1, 0.10, daily_users))
            treatment_data.extend(np.random.binomial(1, 0.10, daily_users))
            if len(control_data) >= 20:  # need minimum data
                _, p = stats.ttest_ind(control_data, treatment_data)
                if p < alpha:
                    false_positives += 1
                    break
    return false_positives / n_simulations

# Compare: fixed sample vs peeking
print("Simulating 1000 A/A tests...")
peeking_fpr = simulate_peeking(n_simulations=1000)

# Fixed sample test (no peeking) β€” just test once at day 30
fixed_fp = sum(
    stats.ttest_ind(
        np.random.binomial(1, 0.10, 3000),
        np.random.binomial(1, 0.10, 3000)
    ).pvalue < 0.05
    for _ in range(1000)
) / 1000

print(f"\n{'Method':<30} {'False Positive Rate':>20} {'Expected':>10}")
print("-" * 62)
print(f"{'Fixed sample (no peeking)':<30} {fixed_fp:>19.1%} {'~5%':>10}")
print(f"{'Daily peeking, stop at p<0.05':<30} {peeking_fpr:>19.1%} {'~5%':>10}")
print(f"\n⚠ Peeking inflates Type I error by {peeking_fpr/0.05:.1f}x")
# ── Mistake 2: Multiple Testing ──────────────────────────────────────────────
# With 20 independent metrics at Ξ±=0.05, P(at least one false positive)
# = 1 - (1-0.05)^20 = 64%

n_metrics_range = range(1, 51)
fwer = [1 - (1 - 0.05)**k for k in n_metrics_range]

# Simulate: run an A/A test, test 20 metrics, count false positives
n_sims = 2000
n_metrics = 20
any_false_positive = 0

for _ in range(n_sims):
    control = np.random.randn(500, n_metrics)
    treatment = np.random.randn(500, n_metrics)  # same distribution!
    p_values = [stats.ttest_ind(control[:, i], treatment[:, i]).pvalue
                for i in range(n_metrics)]
    if any(p < 0.05 for p in p_values):
        any_false_positive += 1

simulated_fwer = any_false_positive / n_sims
theoretical_fwer = 1 - (1 - 0.05)**n_metrics

fig, ax = plt.subplots(figsize=(8, 4))
ax.plot(list(n_metrics_range), fwer, linewidth=2, color='crimson')
ax.axhline(0.05, linestyle='--', color='green', label='Nominal Ξ±=0.05')
ax.axhline(0.64, linestyle=':', color='orange', label='64% at 20 metrics')
ax.axvline(20, linestyle=':', color='gray', alpha=0.5)
ax.set_xlabel('Number of Metrics Tested')
ax.set_ylabel('P(β‰₯1 False Positive)')
ax.set_title('Family-Wise Error Rate vs Number of Metrics')
ax.legend()
ax.set_ylim(0, 1)
plt.tight_layout()
plt.show()

print(f"A/A test, 20 metrics, no correction:")
print(f"  Theoretical FWER: {theoretical_fwer:.1%}")
print(f"  Simulated  FWER: {simulated_fwer:.1%}")
print(f"  β†’ Fix: Bonferroni (strict) or Benjamini-Hochberg FDR (lenient)")
# ── Mistake 3: SUTVA Violation (Network Effects) ─────────────────────────────
# Stable Unit Treatment Value Assumption: each user's outcome depends only on
# their own assignment, not others'. Violated in social/marketplace settings.

print("SUTVA Violation β€” Illustrative Simulation")
print("=" * 50)

# Simulate a referral feature: treatment users refer friends.
# If friends end up in control, they still get partial benefit β†’ biased estimate.

np.random.seed(0)
n_users = 2000
n_per_group = n_users // 2

# True individual effect of treatment: +0.05 conversion rate
base_rate = 0.10
true_effect = 0.05
spillover = 0.02  # control users who were "referred" by treatment users

control_outcomes_no_spillover = np.random.binomial(1, base_rate, n_per_group)
control_outcomes_with_spillover = np.random.binomial(
    1, base_rate + spillover, n_per_group  # contaminated by referrals
)
treatment_outcomes = np.random.binomial(1, base_rate + true_effect, n_per_group)

observed_effect = treatment_outcomes.mean() - control_outcomes_with_spillover.mean()
true_meas_effect = treatment_outcomes.mean() - control_outcomes_no_spillover.mean()

print(f"True treatment effect:      {true_effect:.1%}")
print(f"Measured (no spillover):    {true_meas_effect:.1%}")
print(f"Measured (with spillover):  {observed_effect:.1%}  ← underestimates true effect")
print()
print("Fixes:")
print("  1. Cluster randomization: randomize by social network clusters")
print("  2. Ego-network experiments: only include users with no treated neighbors")
print("  3. Switchback experiments: alternate treatment by time period")
# ── Mistake 4: Sample Ratio Mismatch (SRM) ───────────────────────────────────
# Expected 50/50 split but got 48/52 β†’ assignment bug. All results invalid.

def check_srm(n_control: int, n_treatment: int,
              expected_ratio: float = 0.5,
              alpha: float = 0.01) -> dict:
    """Chi-square test for Sample Ratio Mismatch."""
    total = n_control + n_treatment
    expected_control = total * expected_ratio
    expected_treatment = total * (1 - expected_ratio)
    chi2, p_value = stats.chisquare(
        [n_control, n_treatment],
        f_exp=[expected_control, expected_treatment]
    )
    srm_detected = p_value < alpha
    actual_ratio = n_control / total
    return {
        'n_control': n_control,
        'n_treatment': n_treatment,
        'expected_ratio': expected_ratio,
        'actual_ratio': actual_ratio,
        'chi2': chi2,
        'p_value': p_value,
        'srm_detected': srm_detected,
        'relative_imbalance': abs(actual_ratio - expected_ratio) / expected_ratio
    }

scenarios = [
    ('Perfect 50/50',       5000,  5000),
    ('Slight imbalance 49/51', 4900, 5100),
    ('SRM: 48/52',          4800,  5200),
    ('Severe SRM: 45/55',   4500,  5500),
]

print(f"{'Scenario':<30} {'Control':>8} {'Treatment':>10} {'Actual%':>8} {'p-value':>10} {'SRM?':>6}")
print("-" * 76)
for name, nc, nt in scenarios:
    r = check_srm(nc, nt)
    flag = 'YES ⚠' if r['srm_detected'] else 'No'
    print(f"{name:<30} {nc:>8,} {nt:>10,} {r['actual_ratio']:>7.1%} {r['p_value']:>10.4f} {flag:>6}")

print()
print("Rule: if SRM detected, STOP analysis. Investigate assignment pipeline first.")

3. Experiment Assignment EngineΒΆ

Requirements for a production assigner:

  • Deterministic: same user always gets same variant (no DB needed)

  • Uniform: hash distributes evenly across buckets

  • Isolated: different experiments use different salts to avoid correlation

  • Flexible: supports traffic ramp-up, holdouts, exclusion lists

class ExperimentAssignment:
    """
    Deterministic assignment: same user always gets same variant.
    Uses hash(experiment_id + user_id) % 100 for bucketing.
    Supports holdout groups, ramp-up percentages, exclusions.
    """

    def __init__(
        self,
        experiment_id: str,
        variants: Dict[str, float],  # {'control': 0.5, 'treatment': 0.5}
        traffic_pct: float = 1.0,    # fraction of users eligible
        salt: str = "",
        excluded_users: Optional[set] = None,
    ):
        assert abs(sum(variants.values()) - 1.0) < 1e-9, "Variant weights must sum to 1"
        assert 0 < traffic_pct <= 1.0, "traffic_pct must be in (0, 1]"
        self.experiment_id = experiment_id
        self.variants = variants
        self.traffic_pct = traffic_pct
        self.salt = salt
        self.excluded_users = excluded_users or set()

        # Build cumulative bucket boundaries [0, 100)
        self._boundaries = []
        cumulative = 0
        for variant, weight in variants.items():
            cumulative += weight * 100
            self._boundaries.append((cumulative, variant))

    def _hash_user(self, user_id: str) -> int:
        """Map user_id to a stable integer in [0, 100)."""
        key = f"{self.salt}{self.experiment_id}:{user_id}"
        digest = hashlib.md5(key.encode()).hexdigest()
        return int(digest[:8], 16) % 100

    def assign(self, user_id: str) -> Optional[str]:
        """
        Hash-based: stable across calls, no DB needed.
        Returns None if user is excluded or outside traffic allocation.
        """
        if user_id in self.excluded_users:
            return None

        bucket = self._hash_user(user_id)

        # Traffic ramp-up: only include users in first traffic_pct of buckets
        if bucket >= self.traffic_pct * 100:
            return None

        # Rescale bucket to [0, 100) within the eligible range
        scaled_bucket = bucket / self.traffic_pct

        for boundary, variant in self._boundaries:
            if scaled_bucket < boundary:
                return variant

        return self._boundaries[-1][1]  # fallback to last variant

    def get_assignment_stats(self, user_ids: List[str]) -> dict:
        """Verify actual split matches expected. Includes SRM check."""
        assignments = {v: 0 for v in self.variants}
        assignments['holdout'] = 0
        for uid in user_ids:
            v = self.assign(uid)
            if v is None:
                assignments['holdout'] += 1
            else:
                assignments[v] += 1

        eligible = len(user_ids) - assignments['holdout']
        fractions = {
            v: assignments[v] / eligible if eligible > 0 else 0
            for v in self.variants
        }

        # SRM check among assigned users
        observed = [assignments[v] for v in self.variants]
        expected_weights = list(self.variants.values())
        expected = [w * eligible for w in expected_weights]
        chi2, p_srm = stats.chisquare(observed, f_exp=expected)

        return {
            'counts': assignments,
            'fractions': fractions,
            'expected_fractions': dict(self.variants),
            'total_users': len(user_ids),
            'eligible_users': eligible,
            'srm_chi2': chi2,
            'srm_p_value': p_srm,
            'srm_detected': p_srm < 0.01,
        }


# ── Verify the assigner ───────────────────────────────────────────────────────
assigner = ExperimentAssignment(
    experiment_id='exp_001',
    variants={'control': 0.5, 'treatment': 0.5},
    traffic_pct=1.0,
    salt='v1',
)

user_ids = [f"user_{i}" for i in range(10_000)]
stats_result = assigner.get_assignment_stats(user_ids)

print("Assignment stats for 10,000 users:")
print(f"  Control:   {stats_result['counts']['control']:,} ({stats_result['fractions']['control']:.2%})")
print(f"  Treatment: {stats_result['counts']['treatment']:,} ({stats_result['fractions']['treatment']:.2%})")
print(f"  SRM p-value: {stats_result['srm_p_value']:.4f}  |  SRM detected: {stats_result['srm_detected']}")

# Verify consistency: same user always gets same assignment
sample_user = 'user_42'
assignments_repeated = [assigner.assign(sample_user) for _ in range(100)]
print(f"\nConsistency check β€” user_42 assigned 100 times: {set(assignments_repeated)} (should be one value)")

# Ramp-up test
ramp_assigner = ExperimentAssignment(
    experiment_id='exp_002',
    variants={'control': 0.5, 'treatment': 0.5},
    traffic_pct=0.10,  # only 10% of users
)
ramp_stats = ramp_assigner.get_assignment_stats(user_ids)
print(f"\nRamp-up (10% traffic): {ramp_stats['eligible_users']:,} eligible out of 10,000 (expected ~1,000)")

4. Multi-Armed Bandit β€” Thompson SamplingΒΆ

Fixed A/B testing wastes traffic on the loser for the duration of the experiment. Thompson Sampling adaptively routes more traffic to the better-performing variant β€” minimizing regret (lost conversions due to showing a suboptimal variant).

Beta-Bernoulli model: for binary outcomes (conversion / no conversion), maintain Beta(Ξ±, Ξ²) posterior for each arm, where Ξ± = successes + 1, Ξ² = failures + 1.

class ThompsonSamplingBandit:
    """Adaptively allocate more traffic to better-performing variants."""

    def __init__(self, variants: List[str]):
        self.variants = variants
        self.alpha = {v: 1 for v in variants}  # Prior: Beta(1,1) = Uniform
        self.beta  = {v: 1 for v in variants}

    def select_arm(self) -> str:
        """Sample from Beta(alpha, beta) for each variant, pick max."""
        samples = {
            v: np.random.beta(self.alpha[v], self.beta[v])
            for v in self.variants
        }
        return max(samples, key=samples.get)

    def update(self, variant: str, reward: int):
        """reward = 1 (conversion) or 0 (no conversion)."""
        if reward == 1:
            self.alpha[variant] += 1
        else:
            self.beta[variant] += 1

    def get_stats(self) -> dict:
        n = {v: self.alpha[v] + self.beta[v] - 2 for v in self.variants}
        rate = {v: (self.alpha[v] - 1) / max(n[v], 1) for v in self.variants}
        return {'n_shown': n, 'conversion_rate': rate}


# ── Simulation: Fixed A/B vs Thompson Sampling ────────────────────────────────
np.random.seed(7)

TRUE_RATES = {'control': 0.10, 'treatment': 0.15}  # treatment is 50% better
N_USERS = 10_000
BEST_RATE = max(TRUE_RATES.values())

# Fixed A/B: strict 50/50 split
fixed_regret = []
fixed_assignments = {'control': 0, 'treatment': 0}
cumulative_fixed_regret = 0
for i in range(N_USERS):
    variant = 'control' if i % 2 == 0 else 'treatment'
    fixed_assignments[variant] += 1
    reward = np.random.binomial(1, TRUE_RATES[variant])
    cumulative_fixed_regret += BEST_RATE - TRUE_RATES[variant]
    fixed_regret.append(cumulative_fixed_regret)

# Thompson Sampling
bandit = ThompsonSamplingBandit(['control', 'treatment'])
ts_regret = []
ts_assignments = {'control': 0, 'treatment': 0}
cumulative_ts_regret = 0
for i in range(N_USERS):
    variant = bandit.select_arm()
    ts_assignments[variant] += 1
    reward = np.random.binomial(1, TRUE_RATES[variant])
    bandit.update(variant, reward)
    cumulative_ts_regret += BEST_RATE - TRUE_RATES[variant]
    ts_regret.append(cumulative_ts_regret)

# Plot
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(14, 5))

ax1.plot(fixed_regret, label='Fixed A/B (50/50)', color='steelblue', linewidth=1.5)
ax1.plot(ts_regret, label='Thompson Sampling', color='darkorange', linewidth=1.5)
ax1.set_xlabel('Users Served')
ax1.set_ylabel('Cumulative Regret (missed conversions)')
ax1.set_title('Cumulative Regret: Fixed A/B vs Thompson Sampling')
ax1.legend()

# Traffic allocation
methods = ['Fixed A/B', 'Thompson Sampling']
control_pct = [fixed_assignments['control']/N_USERS,
               ts_assignments['control']/N_USERS]
treatment_pct = [fixed_assignments['treatment']/N_USERS,
                 ts_assignments['treatment']/N_USERS]
x = np.arange(len(methods))
ax2.bar(x - 0.2, control_pct, 0.35, label='Control (worse)', color='steelblue')
ax2.bar(x + 0.2, treatment_pct, 0.35, label='Treatment (better)', color='darkorange')
ax2.set_xticks(x)
ax2.set_xticklabels(methods)
ax2.set_ylabel('Fraction of Traffic')
ax2.set_title('Traffic Allocation')
ax2.legend()
ax2.set_ylim(0, 1)
plt.tight_layout()
plt.show()

print(f"\nFixed A/B    β€” Control: {fixed_assignments['control']:,}  Treatment: {fixed_assignments['treatment']:,}")
print(f"Thompson     β€” Control: {ts_assignments['control']:,}  Treatment: {ts_assignments['treatment']:,}")
print(f"\nTotal regret β€” Fixed A/B: {fixed_regret[-1]:.0f}  |  Thompson: {ts_regret[-1]:.0f}")
print(f"Regret reduction: {(fixed_regret[-1] - ts_regret[-1]) / fixed_regret[-1]:.1%}")

5. Sequential Testing / Always-Valid P-ValuesΒΆ

The peeking problem exists because classical t-test p-values are only valid at a pre-specified sample size. The mixture Sequential Probability Ratio Test (mSPRT) provides p-values that are valid at any stopping time β€” you can peek without inflation.

Key idea: instead of testing a point null, mix over a prior on the effect size. The resulting β€œe-value” (evidence value) maintains its Type I error guarantee regardless of when you stop.

def always_valid_pvalue(
    control_conversions: int,
    treat_conversions: int,
    n_control: int,
    n_treat: int,
    tau_sq: float = 1.0,  # variance of the mixing prior
) -> float:
    """
    Always-valid p-value using the mixture Sequential Probability Ratio Test.
    Based on Johari et al. (2017) 'Peeking at A/B Tests'.

    Valid at any stopping time: P(ever reject H0 | H0 true) ≀ Ξ±.
    """
    if n_control == 0 or n_treat == 0:
        return 1.0

    p_c = control_conversions / n_control
    p_t = treat_conversions / n_treat
    n = n_control + n_treat

    # Pooled proportion under H0
    p_pool = (control_conversions + treat_conversions) / n

    if p_pool <= 0 or p_pool >= 1:
        return 1.0

    # Effect estimate (difference in proportions)
    delta = p_t - p_c

    # Variance of delta under H0
    var_h0 = p_pool * (1 - p_pool) * (1/n_control + 1/n_treat)
    if var_h0 <= 0:
        return 1.0

    # mSPRT statistic: log Bayes Factor under Normal-Normal conjugate model
    # tau_sq is the prior variance on delta
    sigma_sq = var_h0
    log_bf = 0.5 * np.log(sigma_sq / (sigma_sq + tau_sq)) + \
             0.5 * (delta**2 / sigma_sq) * (tau_sq / (sigma_sq + tau_sq))

    # Convert Bayes Factor to always-valid p-value
    bf = np.exp(log_bf)
    # e-value: BF; p-value ≀ 1/BF by Ville's inequality
    av_pvalue = min(1.0, 1.0 / max(bf, 1e-10))
    return av_pvalue


# ── Demonstration: classical vs always-valid p-values over time ───────────────
np.random.seed(42)

N_STEPS = 200
DAILY = 50  # users per day per group
TRUE_RATE = 0.10  # A/A test β€” no difference

classical_pvals = []
av_pvals = []
ctrl_conv = treat_conv = n_c = n_t = 0

for _ in range(N_STEPS):
    ctrl_today = np.random.binomial(DAILY, TRUE_RATE)
    treat_today = np.random.binomial(DAILY, TRUE_RATE)
    ctrl_conv += ctrl_today
    treat_conv += treat_today
    n_c += DAILY
    n_t += DAILY

    # Classical t-test
    ctrl_arr = np.array([1]*ctrl_conv + [0]*(n_c - ctrl_conv))
    treat_arr = np.array([1]*treat_conv + [0]*(n_t - treat_conv))
    _, p_classic = stats.ttest_ind(ctrl_arr, treat_arr)
    classical_pvals.append(p_classic)

    # Always-valid p-value
    p_av = always_valid_pvalue(ctrl_conv, treat_conv, n_c, n_t)
    av_pvals.append(p_av)

steps = np.arange(1, N_STEPS + 1) * DAILY

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 8), sharex=True)

ax1.plot(steps, classical_pvals, color='crimson', linewidth=1.2, label='Classical t-test p-value')
ax1.axhline(0.05, color='black', linestyle='--', linewidth=1, label='Ξ± = 0.05')
crossings = sum(p < 0.05 for p in classical_pvals)
ax1.fill_between(steps, 0, 0.05, alpha=0.08, color='crimson')
ax1.set_ylabel('p-value')
ax1.set_title(f'Classical p-value (A/A test) β€” crosses Ξ±={0.05} {crossings} times out of {N_STEPS}')
ax1.legend()
ax1.set_ylim(0, 1)

ax2.plot(steps, av_pvals, color='steelblue', linewidth=1.2, label='Always-valid p-value (mSPRT)')
ax2.axhline(0.05, color='black', linestyle='--', linewidth=1, label='Ξ± = 0.05')
av_crossings = sum(p < 0.05 for p in av_pvals)
ax2.fill_between(steps, 0, 0.05, alpha=0.08, color='steelblue')
ax2.set_ylabel('p-value')
ax2.set_xlabel('Total users observed')
ax2.set_title(f'Always-valid p-value (A/A test) β€” crosses Ξ±={0.05} {av_crossings} times out of {N_STEPS}')
ax2.legend()
ax2.set_ylim(0, 1)

plt.tight_layout()
plt.show()

print(f"Classical t-test: falsely 'significant' on {crossings}/{N_STEPS} = {crossings/N_STEPS:.1%} of days")
print(f"Always-valid:    falsely 'significant' on {av_crossings}/{N_STEPS} = {av_crossings/N_STEPS:.1%} of days")

6. Experiment Analysis PipelineΒΆ

class ExperimentAnalyzer:
    """Full analysis pipeline: SRM check β†’ metric analysis β†’ multiple testing correction β†’ report."""

    def __init__(
        self,
        data: pd.DataFrame,
        experiment_id: str,
        control: str = 'control',
        alpha: float = 0.05,
    ):
        assert 'variant' in data.columns, "DataFrame must have a 'variant' column"
        self.data = data.copy()
        self.experiment_id = experiment_id
        self.control = control
        self.alpha = alpha
        self.variants = [v for v in data['variant'].unique() if v != control]
        self._srm_result = None

    def check_srm(self) -> dict:
        """Chi-square test on assignment counts."""
        counts = self.data['variant'].value_counts()
        total = counts.sum()
        n_groups = len(counts)
        expected_count = total / n_groups
        chi2, p_value = stats.chisquare(counts.values)
        self._srm_result = {
            'counts': counts.to_dict(),
            'chi2': chi2,
            'p_value': p_value,
            'srm_detected': p_value < 0.01,
            'expected_per_group': expected_count,
        }
        return self._srm_result

    def analyze_metric(
        self,
        metric_col: str,
        variant: str = None,
        method: str = 'frequentist',
        is_binary: bool = False,
    ) -> dict:
        """Returns: effect_size, ci_lower, ci_upper, p_value, significant."""
        if variant is None:
            variant = self.variants[0] if self.variants else None

        ctrl = self.data.loc[self.data['variant'] == self.control, metric_col].dropna()
        trt  = self.data.loc[self.data['variant'] == variant,  metric_col].dropna()

        effect_size = trt.mean() - ctrl.mean()
        relative_effect = effect_size / ctrl.mean() if ctrl.mean() != 0 else np.nan

        if is_binary:
            n_c, x_c = len(ctrl), ctrl.sum()
            n_t, x_t = len(trt), trt.sum()
            p_c, p_t = x_c/n_c, x_t/n_t
            se = np.sqrt(p_c*(1-p_c)/n_c + p_t*(1-p_t)/n_t)
            z = stats.norm.ppf(1 - self.alpha/2)
            ci_lower = effect_size - z * se
            ci_upper = effect_size + z * se
            _, p_value = stats.proportions_ztest([x_t, x_c], [n_t, n_c])
        else:
            t_stat, p_value = stats.ttest_ind(trt, ctrl)
            se = np.sqrt(trt.var()/len(trt) + ctrl.var()/len(ctrl))
            df = len(trt) + len(ctrl) - 2
            t_crit = stats.t.ppf(1 - self.alpha/2, df)
            ci_lower = effect_size - t_crit * se
            ci_upper = effect_size + t_crit * se

        return {
            'metric': metric_col,
            'variant': variant,
            'control_mean': ctrl.mean(),
            'treatment_mean': trt.mean(),
            'effect_size': effect_size,
            'relative_effect': relative_effect,
            'ci_lower': ci_lower,
            'ci_upper': ci_upper,
            'p_value': p_value,
            'significant': p_value < self.alpha,
            'n_control': len(ctrl),
            'n_treatment': len(trt),
        }

    def analyze_all_metrics(
        self,
        metric_cols: List[str],
        is_binary_map: Dict[str, bool] = None,
    ) -> pd.DataFrame:
        """With multiple testing correction (Benjamini-Hochberg FDR)."""
        is_binary_map = is_binary_map or {}
        results = []
        for metric in metric_cols:
            for variant in self.variants:
                r = self.analyze_metric(
                    metric, variant,
                    is_binary=is_binary_map.get(metric, False)
                )
                results.append(r)

        df = pd.DataFrame(results)

        # Benjamini-Hochberg FDR correction
        n = len(df)
        df_sorted = df.sort_values('p_value').reset_index(drop=True)
        df_sorted['bh_rank'] = np.arange(1, n+1)
        df_sorted['bh_threshold'] = df_sorted['bh_rank'] / n * self.alpha
        df_sorted['bh_significant'] = df_sorted['p_value'] <= df_sorted['bh_threshold']

        # Propagate: all ranks up to max significant rank are significant
        if df_sorted['bh_significant'].any():
            max_sig_rank = df_sorted.loc[df_sorted['bh_significant'], 'bh_rank'].max()
            df_sorted['bh_significant'] = df_sorted['bh_rank'] <= max_sig_rank

        return df_sorted.drop(columns=['bh_rank', 'bh_threshold'])

    def segment_analysis(
        self,
        segment_col: str,
        metric_col: str,
    ) -> pd.DataFrame:
        """Break down effect by user segment."""
        segments = self.data[segment_col].unique()
        rows = []
        for seg in segments:
            seg_data = self.data[self.data[segment_col] == seg]
            analyzer = ExperimentAnalyzer(
                seg_data, self.experiment_id,
                control=self.control, alpha=self.alpha
            )
            for variant in self.variants:
                r = analyzer.analyze_metric(metric_col, variant)
                r['segment'] = seg
                rows.append(r)
        return pd.DataFrame(rows)[[
            'segment', 'variant', 'control_mean', 'treatment_mean',
            'effect_size', 'relative_effect', 'p_value', 'significant',
            'n_control', 'n_treatment'
        ]]

    def generate_report(self, metric_cols: List[str] = None,
                        guardrail_metrics: List[str] = None,
                        primary_metric: str = None) -> str:
        """Human-readable summary."""
        lines = []
        lines.append(f"{'=' * 60}")
        lines.append(f"  Experiment Report: {self.experiment_id}")
        lines.append(f"{'=' * 60}")
        lines.append(f"  Total users: {len(self.data):,}")
        for v in [self.control] + self.variants:
            n = (self.data['variant'] == v).sum()
            lines.append(f"    {v}: {n:,}")

        srm = self.check_srm()
        lines.append(f"\n  SRM Check: {'FAIL β€” results unreliable' if srm['srm_detected'] else 'PASS'}")
        lines.append(f"    chi2={srm['chi2']:.2f}, p={srm['p_value']:.4f}")

        if metric_cols:
            guardrail_metrics = guardrail_metrics or []
            lines.append(f"\n  Metric Results (Ξ±={self.alpha}):")
            lines.append(f"  {'Metric':<25} {'Control':>10} {'Treatment':>10} {'Lift':>8} {'p-value':>9} {'Sig':>5} {'Type':>10}")
            lines.append("  " + "-" * 82)
            for metric in metric_cols:
                r = self.analyze_metric(metric, self.variants[0])
                sig_str = 'YES' if r['significant'] else 'no'
                metric_type = 'GUARDRAIL' if metric in guardrail_metrics else \
                              ('PRIMARY' if metric == primary_metric else 'secondary')
                flag = ' *** FAIL' if (metric in guardrail_metrics and r['significant'] and r['effect_size'] > 0) else ''
                lines.append(
                    f"  {metric:<25} {r['control_mean']:>10.4f} {r['treatment_mean']:>10.4f} "
                    f"{r['relative_effect']:>7.1%} {r['p_value']:>9.4f} {sig_str:>5} {metric_type:>10}{flag}"
                )

        lines.append(f"\n  Recommendation: {'HALT β€” investigate SRM first' if srm['srm_detected'] else 'See metric results above.'}")
        lines.append(f"{'=' * 60}")
        return "\n".join(lines)


print("ExperimentAnalyzer defined.")

7. Simulate a Realistic ExperimentΒΆ

Mock e-commerce experiment: does the new ML-based recommendation algorithm improve purchase funnel metrics? One metric must fail its guardrail (page load time increased too much).

np.random.seed(2024)

N_PER_VARIANT = 5000
EXPERIMENT_DAYS = 14

# Experiment parameters
BASE_CLICK_RATE     = 0.25   # control click-through rate
BASE_CART_RATE      = 0.10   # add-to-cart rate
BASE_REVENUE        = 8.50   # revenue per user ($)
BASE_PAGE_LOAD_MS   = 320    # page load time (ms)

# True effects of new ML algorithm
CLICK_LIFT          = 0.08   # +8% relative
CART_LIFT           = 0.04   # +4% relative
REVENUE_LIFT        = 0.05   # +5% relative
PAGE_LOAD_INCREASE  = 20     # +20ms (guardrail threshold: <10ms)

def simulate_user(variant: str, user_id: int) -> dict:
    is_treatment = variant == 'treatment'
    # Device segments
    device = np.random.choice(['mobile', 'desktop', 'tablet'], p=[0.55, 0.35, 0.10])
    # New users vs returning
    user_type = np.random.choice(['new', 'returning'], p=[0.40, 0.60])

    click_rate   = BASE_CLICK_RATE * (1 + CLICK_LIFT * is_treatment)
    cart_rate    = BASE_CART_RATE  * (1 + CART_LIFT  * is_treatment)
    revenue_mu   = BASE_REVENUE    * (1 + REVENUE_LIFT * is_treatment)
    page_load_mu = BASE_PAGE_LOAD_MS + (PAGE_LOAD_INCREASE * is_treatment)

    return {
        'user_id': f'u{user_id}',
        'variant': variant,
        'device': device,
        'user_type': user_type,
        'click_rate': int(np.random.binomial(1, click_rate)),
        'add_to_cart_rate': int(np.random.binomial(1, cart_rate)),
        'revenue_per_user': max(0, np.random.normal(revenue_mu, 15.0)),
        'page_load_ms': max(50, np.random.normal(page_load_mu, 40)),
        'days_since_signup': np.random.exponential(90),
    }

control_users   = [simulate_user('control', i) for i in range(N_PER_VARIANT)]
treatment_users = [simulate_user('treatment', i + N_PER_VARIANT) for i in range(N_PER_VARIANT)]
exp_data = pd.DataFrame(control_users + treatment_users)

print(f"Simulated experiment dataset: {len(exp_data):,} users")
print(exp_data.groupby('variant')[['click_rate', 'add_to_cart_rate', 'revenue_per_user', 'page_load_ms']].mean().round(4))
# ── Run full analysis ─────────────────────────────────────────────────────────
analyzer = ExperimentAnalyzer(
    exp_data,
    experiment_id='exp_ml_reco_v2',
    control='control',
    alpha=0.05,
)

METRIC_COLS    = ['click_rate', 'add_to_cart_rate', 'revenue_per_user', 'page_load_ms']
GUARDRAILS     = ['page_load_ms']
PRIMARY_METRIC = 'click_rate'
IS_BINARY      = {'click_rate': True, 'add_to_cart_rate': True}

all_results = analyzer.analyze_all_metrics(METRIC_COLS, is_binary_map=IS_BINARY)

print("\nAll metrics with BH FDR correction:")
display_cols = ['metric', 'control_mean', 'treatment_mean', 'relative_effect', 'p_value', 'significant', 'bh_significant']
print(all_results[display_cols].to_string(index=False, float_format='{:.4f}'.format))

# Segment analysis
print("\nSegment analysis β€” click_rate by device:")
seg_result = analyzer.segment_analysis('device', 'click_rate')
print(seg_result[['segment', 'control_mean', 'treatment_mean', 'relative_effect', 'p_value', 'significant']].to_string(index=False))

# Full report
report = analyzer.generate_report(
    metric_cols=METRIC_COLS,
    guardrail_metrics=GUARDRAILS,
    primary_metric=PRIMARY_METRIC,
)
print("\n" + report)

print("\n[ACTION REQUIRED] page_load_ms guardrail violated (+20ms > 10ms threshold).")
print("Experiment should NOT ship without engineering fix to page load regression.")
# ── Visualise results ─────────────────────────────────────────────────────────
fig, axes = plt.subplots(2, 2, figsize=(13, 9))

for ax, metric in zip(axes.flat, METRIC_COLS):
    r = analyzer.analyze_metric(metric, is_binary=IS_BINARY.get(metric, False))
    means   = [r['control_mean'], r['treatment_mean']]
    ci_half = [(r['treatment_mean'] - r['ci_lower'])/2,
               (r['treatment_mean'] - r['ci_lower'])/2]

    colors = ['steelblue', 'darkorange']
    bars = ax.bar(['Control', 'Treatment'], means, color=colors, width=0.5)
    ax.errorbar(
        ['Treatment'], [r['treatment_mean']],
        yerr=[[r['treatment_mean'] - r['ci_lower']], [r['ci_upper'] - r['treatment_mean']]],
        fmt='none', color='black', capsize=5
    )
    sig_str = f"p={r['p_value']:.4f} {'*' if r['significant'] else ''}"
    guardrail_note = " [GUARDRAIL FAIL]" if metric in GUARDRAILS and r['significant'] and r['effect_size'] > 0 else ""
    title_color = 'red' if guardrail_note else 'black'
    ax.set_title(f"{metric}\n{sig_str} | lift={r['relative_effect']:.1%}{guardrail_note}",
                 color=title_color, fontsize=10)
    ax.set_ylabel(metric)

plt.suptitle('Experiment Results: ML Recommendations vs Control', fontsize=13, fontweight='bold')
plt.tight_layout()
plt.show()

8. CUPED β€” Controlled-experiment Using Pre-Experiment DataΒΆ

Idea: users who converted at high rates before the experiment will likely convert at high rates during it β€” regardless of treatment. This pre-experiment signal is a covariate correlated with the outcome but independent of treatment assignment. Removing it reduces variance, which shrinks confidence intervals and reduces required sample size.

\[Y_{\text{CUPED}} = Y - \theta \cdot (Y_{\text{pre}} - \bar{Y}_{\text{pre}})\]
\[\theta = \frac{\text{Cov}(Y, Y_{\text{pre}})}{\text{Var}(Y_{\text{pre}})}\]
def cuped_adjust(
    Y_treat: np.ndarray,
    Y_control: np.ndarray,
    Y_pre_treat: np.ndarray,
    Y_pre_control: np.ndarray,
) -> tuple:
    """
    Adjust post-experiment metric using pre-experiment covariate.
    Reduces variance β†’ smaller required sample size β†’ shorter experiments.
    theta = cov(Y, Y_pre) / var(Y_pre)
    Y_cuped = Y - theta * (Y_pre - mean(Y_pre))

    Returns: (Y_treat_cuped, Y_control_cuped, theta, variance_reduction_pct)
    """
    Y_all = np.concatenate([Y_treat, Y_control])
    Y_pre_all = np.concatenate([Y_pre_treat, Y_pre_control])

    # Estimate theta on pooled data (independent of treatment)
    theta = np.cov(Y_all, Y_pre_all)[0, 1] / np.var(Y_pre_all, ddof=1)

    mean_Y_pre = Y_pre_all.mean()

    Y_treat_cuped   = Y_treat   - theta * (Y_pre_treat   - mean_Y_pre)
    Y_control_cuped = Y_control - theta * (Y_pre_control - mean_Y_pre)

    # Variance reduction
    var_before = np.var(Y_all, ddof=1)
    var_after  = np.var(np.concatenate([Y_treat_cuped, Y_control_cuped]), ddof=1)
    variance_reduction = 1 - var_after / var_before

    return Y_treat_cuped, Y_control_cuped, theta, variance_reduction


# ── Simulate pre/post data with correlated pre-experiment metric ──────────────
np.random.seed(99)
N = 2000  # per group
RHO = 0.60  # correlation between pre and post metric (realistic)

# Generate correlated pre/post: [pre, post] ~ MVN
sigma = np.array([[1.0, RHO], [RHO, 1.0]])
samples_ctrl = np.random.multivariate_normal([0, 0], sigma, N)
samples_trt  = np.random.multivariate_normal([0, 0.10], sigma, N)  # 0.10 treatment effect

Y_pre_ctrl,  Y_ctrl  = samples_ctrl[:, 0], samples_ctrl[:, 1]
Y_pre_trt,   Y_trt   = samples_trt[:, 0],  samples_trt[:, 1]

Y_trt_cuped, Y_ctrl_cuped, theta, var_reduction = cuped_adjust(
    Y_trt, Y_ctrl, Y_pre_trt, Y_pre_ctrl
)

# Compare CIs before and after CUPED
def ci_width(a, b, alpha=0.05):
    se = np.sqrt(a.var(ddof=1)/len(a) + b.var(ddof=1)/len(b))
    t_crit = stats.t.ppf(1 - alpha/2, df=len(a)+len(b)-2)
    return 2 * t_crit * se

ci_before = ci_width(Y_trt, Y_ctrl)
ci_after  = ci_width(Y_trt_cuped, Y_ctrl_cuped)

# Sample size reduction: proportional to variance
sample_reduction = 1 - (1 - var_reduction)

print(f"Pre/post correlation (rho): {RHO}")
print(f"CUPED theta (coefficient):  {theta:.4f}")
print(f"Variance reduction:         {var_reduction:.1%}")
print(f"CI width before CUPED:      {ci_before:.4f}")
print(f"CI width after  CUPED:      {ci_after:.4f}")
print(f"CI width reduction:         {1 - ci_after/ci_before:.1%}")
print(f"\nSample size to achieve same CI: {int(N * (1 - var_reduction)):,} instead of {N:,}")
print(f"β†’ With rho={RHO}, CUPED requires {var_reduction:.0%} fewer users for the same precision.")

# Visualise
fig, axes = plt.subplots(1, 2, figsize=(12, 4))

for ax, (y_t, y_c, label) in zip(axes, [
    (Y_trt, Y_ctrl, 'Before CUPED'),
    (Y_trt_cuped, Y_ctrl_cuped, 'After CUPED'),
]):
    ax.hist(y_c, bins=50, alpha=0.5, label='Control', color='steelblue', density=True)
    ax.hist(y_t, bins=50, alpha=0.5, label='Treatment', color='darkorange', density=True)
    ax.set_title(f"{label}\nVar(control) = {y_c.var():.3f}")
    ax.legend()
    ax.set_xlabel('Metric value')

plt.suptitle('CUPED: Variance Reduction by Removing Pre-Experiment Noise', fontweight='bold')
plt.tight_layout()
plt.show()

9. Experiment RegistryΒΆ

@dataclass
class Experiment:
    experiment_id: str
    name: str
    hypothesis: str
    owner: str
    start_date: date
    end_date: date
    variants: List[str]
    primary_metric: str
    guardrail_metrics: List[str]
    min_detectable_effect: float
    required_sample_size: int
    status: str  # planned / running / stopped / completed
    user_segment: Optional[str] = None  # e.g., 'all', 'mobile_only', 'new_users'
    assigned_users: Optional[set] = field(default_factory=set)


class ExperimentRegistry:
    """Tracks all experiments, checks for conflicts, manages lifecycle."""

    def __init__(self):
        self._registry: Dict[str, Experiment] = {}

    def register(self, exp: Experiment) -> None:
        if exp.experiment_id in self._registry:
            raise ValueError(f"Experiment {exp.experiment_id} already registered")
        self._registry[exp.experiment_id] = exp
        print(f"  Registered: {exp.experiment_id} ({exp.name})")

    def check_conflicts(self, exp: Experiment) -> List[str]:
        """
        Mutual exclusion check: flag any running experiment that:
        1. Overlaps in time, AND
        2. Could expose the same users (same or overlapping segment)
        """
        conflicts = []
        for eid, existing in self._registry.items():
            if existing.status not in ('running', 'planned'):
                continue
            # Date overlap
            date_overlap = (exp.start_date <= existing.end_date and
                            exp.end_date   >= existing.start_date)
            if not date_overlap:
                continue
            # Segment overlap (None = 'all users')
            seg_overlap = (
                exp.user_segment is None or
                existing.user_segment is None or
                exp.user_segment == existing.user_segment
            )
            if seg_overlap:
                conflicts.append(eid)
        return conflicts

    def update_status(self, experiment_id: str, new_status: str) -> None:
        self._registry[experiment_id].status = new_status
        print(f"  {experiment_id} β†’ {new_status}")

    def list_active(self) -> pd.DataFrame:
        active = [
            {
                'id': e.experiment_id,
                'name': e.name,
                'owner': e.owner,
                'start': e.start_date,
                'end': e.end_date,
                'status': e.status,
                'primary_metric': e.primary_metric,
                'segment': e.user_segment or 'all',
                'required_n': e.required_sample_size,
            }
            for e in self._registry.values()
            if e.status in ('running', 'planned')
        ]
        return pd.DataFrame(active)


# ── Demo ──────────────────────────────────────────────────────────────────────
registry = ExperimentRegistry()

exp1 = Experiment(
    experiment_id='exp_ml_reco_v2',
    name='ML Recommendations V2',
    hypothesis='New ML model increases click-through rate by β‰₯5%',
    owner='team_ml',
    start_date=date(2024, 3, 1),
    end_date=date(2024, 3, 14),
    variants=['control', 'treatment'],
    primary_metric='click_rate',
    guardrail_metrics=['page_load_ms'],
    min_detectable_effect=0.05,
    required_sample_size=5000,
    status='running',
    user_segment='all',
)

exp2 = Experiment(
    experiment_id='exp_cta_button',
    name='CTA Button Color Test',
    hypothesis='Orange CTA button increases add-to-cart rate',
    owner='team_ux',
    start_date=date(2024, 3, 5),
    end_date=date(2024, 3, 19),
    variants=['blue_cta', 'orange_cta'],
    primary_metric='add_to_cart_rate',
    guardrail_metrics=[],
    min_detectable_effect=0.03,
    required_sample_size=8000,
    status='planned',
    user_segment='all',  # overlaps with exp1!
)

exp3 = Experiment(
    experiment_id='exp_mobile_nav',
    name='Mobile Navigation Redesign',
    hypothesis='Simplified nav improves mobile conversion',
    owner='team_mobile',
    start_date=date(2024, 3, 10),
    end_date=date(2024, 3, 24),
    variants=['old_nav', 'new_nav'],
    primary_metric='revenue_per_user',
    guardrail_metrics=['page_load_ms'],
    min_detectable_effect=0.08,
    required_sample_size=3000,
    status='running',
    user_segment='mobile_only',  # non-overlapping segment
)

print("Registering experiments:")
registry.register(exp1)
registry.register(exp2)
registry.register(exp3)

print("\nConflict checks for new experiment (exp_cta_button):")
conflicts = registry.check_conflicts(exp2)
if conflicts:
    print(f"  WARNING: exp_cta_button conflicts with: {conflicts}")
    print("  Recommendation: use mutual exclusion layer or reduce traffic allocation.")
else:
    print("  No conflicts found.")

print("\nActive/Planned experiments:")
print(registry.list_active().to_string(index=False))

10. Sample Size CalculatorΒΆ

def required_sample_size(
    baseline_rate: float,
    min_detectable_effect: float,  # relative lift, e.g., 0.05 = 5%
    alpha: float = 0.05,
    power: float = 0.80,
    num_variants: int = 2,
) -> int:
    """
    Sample size per variant for a two-proportion z-test.
    Applies Bonferroni correction for num_variants > 2.
    """
    # Bonferroni correction for multiple variants
    alpha_adj = alpha / (num_variants - 1) if num_variants > 2 else alpha

    p1 = baseline_rate
    p2 = baseline_rate * (1 + min_detectable_effect)
    p2 = min(p2, 0.9999)

    # z-scores
    z_alpha = stats.norm.ppf(1 - alpha_adj / 2)  # two-tailed
    z_beta  = stats.norm.ppf(power)

    # Pooled proportion
    p_bar = (p1 + p2) / 2

    # Two-proportion z-test formula
    numerator   = (z_alpha * np.sqrt(2 * p_bar * (1 - p_bar)) +
                   z_beta  * np.sqrt(p1 * (1 - p1) + p2 * (1 - p2)))**2
    denominator = (p2 - p1)**2

    n_per_variant = int(np.ceil(numerator / denominator))
    return n_per_variant


# ── Verification ─────────────────────────────────────────────────────────────
n = required_sample_size(baseline_rate=0.10, min_detectable_effect=0.10)
print(f"Baseline=10%, MDE=10% relative, Ξ±=0.05, power=80%: {n:,} users per variant")
print(f"Total users: {2*n:,}")

# ── Three curves ──────────────────────────────────────────────────────────────
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# Curve 1: Sample size vs MDE
mde_range = np.linspace(0.02, 0.30, 100)
n_vs_mde = [required_sample_size(0.10, mde) for mde in mde_range]
axes[0].plot(mde_range * 100, n_vs_mde, color='steelblue', linewidth=2)
axes[0].set_xlabel('Minimum Detectable Effect (%)')
axes[0].set_ylabel('Sample Size per Variant')
axes[0].set_title('Sample Size vs MDE\n(baseline=10%, power=80%)')
axes[0].set_yscale('log')
axes[0].axvline(10, linestyle='--', color='gray', label='10% MDE')
axes[0].legend()

# Curve 2: Sample size vs Power
power_range = np.linspace(0.50, 0.99, 100)
n_vs_power = [required_sample_size(0.10, 0.10, power=pw) for pw in power_range]
axes[1].plot(power_range * 100, n_vs_power, color='darkorange', linewidth=2)
axes[1].axvline(80, linestyle='--', color='gray', label='80% power')
axes[1].axvline(90, linestyle=':', color='red', label='90% power')
axes[1].set_xlabel('Statistical Power (%)')
axes[1].set_ylabel('Sample Size per Variant')
axes[1].set_title('Sample Size vs Power\n(baseline=10%, MDE=10%)')
axes[1].legend()

# Curve 3: Sample size vs Baseline Rate
base_range = np.linspace(0.01, 0.50, 100)
n_vs_base = [required_sample_size(b, 0.10) for b in base_range]
axes[2].plot(base_range * 100, n_vs_base, color='seagreen', linewidth=2)
axes[2].set_xlabel('Baseline Conversion Rate (%)')
axes[2].set_ylabel('Sample Size per Variant')
axes[2].set_title('Sample Size vs Baseline Rate\n(MDE=10% relative, power=80%)')
axes[2].axvline(10, linestyle='--', color='gray', label='Baseline=10%')
axes[2].legend()

plt.tight_layout()
plt.show()

# Quick lookup table
print("\nSample size lookup (Ξ±=0.05, power=80%, 2 variants):")
print(f"{'Baseline':>10} {'MDE 5%':>10} {'MDE 10%':>10} {'MDE 20%':>10}")
print("-" * 44)
for base in [0.05, 0.10, 0.20, 0.30, 0.50]:
    n5  = required_sample_size(base, 0.05)
    n10 = required_sample_size(base, 0.10)
    n20 = required_sample_size(base, 0.20)
    print(f"{base:>9.0%} {n5:>10,} {n10:>10,} {n20:>10,}")

11. Cheat SheetΒΆ

Concept                        Description
──────────────────────────────────────────────────────────────────────────────
SRM (Sample Ratio Mismatch)    Assignment bug: use chi-square test on variant counts
Peeking problem                Checking p<0.05 daily inflates Type I error
FWER                           Family-wise error rate: probability of β‰₯1 false positive
FDR                            False discovery rate: % of "significant" results that are wrong
CUPED                          Pre-experiment covariate reduces variance, shortens experiments
Novelty effect                 Users behave differently because the change is new (not better)
Interference/SUTVA             One unit's outcome depends on another's treatment
Holdout group                  Small % held out of all experiments β€” measures cumulative effect
mSPRT                          Mixture SPRT: always-valid p-values; safe to peek anytime
Thompson Sampling              Bayesian bandit: adaptively routes traffic to winning arm
Bonferroni correction          Adjust Ξ± β†’ Ξ±/m for m tests; conservative FWER control
Benjamini-Hochberg (BH)        FDR control: less conservative than Bonferroni
Mutual exclusion               Users in exp A excluded from exp B; prevents interaction bias

Decision rules:

  1. SRM detected β†’ stop, investigate assignment pipeline; do not interpret metric results

  2. Guardrail metric significant and in wrong direction β†’ do not ship

  3. Primary metric significant (after FDR correction) + no guardrail failures β†’ ship

  4. Novelty effect likely (new UI) β†’ extend experiment β‰₯2 weeks before deciding

  5. Underpowered β†’ calculate required sample size before starting, not after

12. ExercisesΒΆ

Exercise 1 β€” Bayesian A/B test

Given Beta priors on conversion rates, implement:

  • P(treatment > control) via Monte Carlo sampling from posterior Beta distributions

  • Expected loss: E[max(p_control - p_treatment, 0)] (expected regret of choosing treatment)

  • Decision rule: ship if P(treatment > control) > 0.95 and expected loss < 0.001

def bayesian_ab_test(alpha_c, beta_c, alpha_t, beta_t, n_samples=100_000):
    """
    alpha_c, beta_c: posterior parameters for control (prior + observed data)
    Returns: prob_treatment_wins, expected_loss
    """
    samples_c = np.random.beta(alpha_c, beta_c, n_samples)
    samples_t = np.random.beta(alpha_t, beta_t, n_samples)
    prob_wins = (samples_t > samples_c).mean()
    expected_loss = np.maximum(samples_c - samples_t, 0).mean()
    return prob_wins, expected_loss

Exercise 2 β€” Simulate the peeking problem

Run 1000 A/A tests. Each test: add 10 users/day, check p < 0.05, stop if significant or n = 10,000. Record:

  • What fraction falsely β€œwin”?

  • At what sample size do most false positives occur?

  • Compare to always-valid p-values from Section 5

Exercise 3 β€” Network effect check

Implement network_effect_check(user_graph, treatment_users) that:

  • Takes a user-user graph (e.g., friend network)

  • Returns the subset of treatment users with zero control-group neighbors

  • Runs the experiment analysis only on these β€œisolated” users

  • Compare effect estimate on isolated vs contaminated users

def network_effect_check(user_graph: dict, treatment_users: set, control_users: set) -> set:
    """Return treatment users with no control-group neighbors."""
    isolated = set()
    for user in treatment_users:
        neighbors = user_graph.get(user, set())
        if not neighbors.intersection(control_users):
            isolated.add(user)
    return isolated

Exercise 4 β€” Interleaving evaluation (Netflix/Spotify style)

Traditional A/B tests: user sees one recommendation list (A or B). Interleaving: merge both lists, tracking which items came from which algorithm, then measure click preference.

def interleave(list_a: list, list_b: list, method='team_draft') -> tuple:
    """
    Team-draft interleaving: alternately let each algorithm pick first.
    Returns (interleaved_list, attribution_map)
    where attribution_map[item] = 'A' or 'B'
    """
    ...

Simulate: A has 20% CTR, B has 25% CTR. With 1000 interleaved sessions, what is the statistical power to detect B’s superiority?

Exercise 5 β€” Experiment conflict detector

Given a list of running experiments and their assigned user sets, flag any user assigned to more than one experiment.

def detect_conflicts(experiments: list) -> dict:
    """
    experiments: list of Experiment objects with assigned_users sets
    Returns dict: {user_id: [exp_id_1, exp_id_2, ...]} for conflicted users
    """
    user_to_exps = defaultdict(list)
    for exp in experiments:
        for user in exp.assigned_users:
            user_to_exps[user].append(exp.experiment_id)
    return {u: exps for u, exps in user_to_exps.items() if len(exps) > 1}

Test: create 3 experiments with overlapping user sets, verify all conflicts are detected.