Solutions: Statistics & MLOps Track

Worked solutions to all exercises from the statistics-mlops/ notebooks.

01 — Hypothesis Testing

import numpy as np
import matplotlib.pyplot as plt
from scipy import stats

np.random.seed(42)  # fix the global RNG so every run reproduces the same numbers

# Exercise 1: Permutation test for difference in medians; compare to Mann-Whitney

# Synthetic data: two groups with different medians, plus large outliers to induce skew
group_a = np.concatenate([np.random.normal(10, 3, 50), [25, 28]])  # right-skewed
group_b = np.concatenate([np.random.normal(12, 3, 50), [27, 30]])  # slightly higher

def permutation_test_median(a, b, n_permutations=10000):
    """Two-sided permutation test for the difference in medians (b - a).

    Repeatedly shuffles the pooled sample, re-splits it into two groups of the
    original sizes, and counts how often the permuted median difference is at
    least as extreme as the observed one.  Returns (observed_diff, p_value).
    """
    observed = np.median(b) - np.median(a)
    pooled = np.concatenate([a, b])
    split = len(a)
    exceed = 0
    for _ in range(n_permutations):
        # In-place shuffle of the pooled copy == one random relabeling
        np.random.shuffle(pooled)
        permuted = np.median(pooled[split:]) - np.median(pooled[:split])
        if abs(permuted) >= abs(observed):  # two-sided comparison
            exceed += 1
    return observed, exceed / n_permutations

# Run both nonparametric tests on the same data and compare p-values
obs_diff, perm_pval = permutation_test_median(group_a, group_b)
mw_stat, mw_pval = stats.mannwhitneyu(group_a, group_b, alternative='two-sided')

print(f'Observed median difference: {obs_diff:.3f}')
print(f'Permutation test p-value:   {perm_pval:.4f}')
print(f'Mann-Whitney p-value:       {mw_pval:.4f}')
print('Key insight: Both test rank/distributional differences without normality assumption.')
print('Mann-Whitney tests stochastic dominance; permutation test directly tests the median statistic.')
# Exercise 2: Statistical Power as a Function of Sample Size
# Find n for 80% power at Cohen's d = 0.2 (small effect)

from scipy.stats import norm

def compute_power(n, effect_size=0.2, alpha=0.05):
    """Power for two-sample t-test (equal groups, two-sided).

    Normal-approximation power: with n per group the standardized effect is
    d * sqrt(n/2); power is the probability the test statistic falls outside
    +/- z_{alpha/2} under that shift.
    """
    critical = norm.ppf(1 - alpha / 2)
    # Non-centrality parameter: effect_size / sqrt(2/n) == effect_size * sqrt(n/2)
    noncentrality = effect_size * np.sqrt(n / 2)
    upper_tail = 1 - norm.cdf(critical - noncentrality)
    lower_tail = norm.cdf(-critical - noncentrality)
    return upper_tail + lower_tail

# Sweep per-group sample sizes and locate the smallest n reaching 80% power
sample_sizes = np.arange(10, 800, 10)
powers = [compute_power(n) for n in sample_sizes]

# Find minimum n for 80% power (first sweep value whose power crosses the target)
n_80 = next(n for n, p in zip(sample_sizes, powers) if p >= 0.80)

plt.figure(figsize=(8, 4))
plt.plot(sample_sizes, powers, label='Power (d=0.2, alpha=0.05)')
plt.axhline(0.80, color='r', linestyle='--', label='80% power')
plt.axvline(n_80, color='g', linestyle='--', label=f'n={n_80} per group')
plt.xlabel('Sample size per group'); plt.ylabel('Statistical Power')
plt.title('Power Curve: Effect Size d=0.2')
plt.legend(); plt.tight_layout(); plt.show()

print(f'Minimum n per group for 80% power (d=0.2): {n_80}')
# Fixed mis-encoded em dash in the printed insight (was UTF-8 mojibake)
print('Key insight: small effects require ~394 per group — power analysis before experimentation is critical')
# Exercise 3: Two-Sided vs One-Sided Tests — When Each is Appropriate
# (mojibake em dashes/arrows in comments and printed strings repaired)

np.random.seed(0)
baseline_ctr = 0.05  # reference click-through rate for context
n = 1000             # visitors per arm

# Business scenario A: Testing a NEW checkout button — could help OR hurt
# -> Two-sided test: we care about both positive and negative effects
control_a = np.random.binomial(1, 0.050, n)
variant_a  = np.random.binomial(1, 0.055, n)  # slight increase
_, p_two = stats.ttest_ind(control_a, variant_a, alternative='two-sided')

# Business scenario B: Testing a new recommendation engine — we only care if it IMPROVES revenue
# We would NOT roll back unless it causes serious harm (separate quality check)
# -> One-sided test: H1: variant > control
control_b = np.random.binomial(1, 0.050, n)
variant_b  = np.random.binomial(1, 0.060, n)  # clearer increase
_, p_one = stats.ttest_ind(control_b, variant_b, alternative='less')  # H1: control < variant

print('Scenario A: Checkout button (two-sided — could hurt UX)')
print(f'  p-value (two-sided): {p_two:.4f}')
print(f'  Significant at 0.05: {p_two < 0.05}')
print()
print('Scenario B: Recommendation engine (one-sided — we only care about improvement)')
print(f'  p-value (one-sided): {p_one:.4f}')
print(f'  Significant at 0.05: {p_one < 0.05}')
print()
print('Key insight: one-sided test has more power BUT is only valid when you CANNOT care about')
print('the other direction. Using it to "cheat" significance inflates Type I error.')
# Exercise 4: Multiple Testing Correction — Bonferroni, Holm, BH-FDR

from statsmodels.stats.multitest import multipletests

np.random.seed(7)
# Simulate 20 A/B tests: 18 null (H0 true), 2 real effects
n_tests = 20
n_null = 18
null_pvals = np.random.uniform(0, 1, n_null)  # p-values under H0 are Uniform(0,1)
effect_pvals = np.array([0.008, 0.031])  # true effects
all_pvals = np.concatenate([null_pvals, effect_pvals])
true_positives = set([n_null, n_null + 1])  # last two indices are the real effects

methods = [
    ('Bonferroni', 'bonferroni'),
    ('Holm',       'holm'),
    ('BH-FDR',     'fdr_bh'),
]

# Outer f-string switched to double quotes: same-quote nesting inside f-strings
# is only valid on Python >= 3.12 (PEP 701) and was a SyntaxError before.
print(f"{'Method':<12} {'Reject count':>13} {'True positive':>14} {'False positive':>15}")
print('-' * 57)
for name, method in methods:
    reject, _, _, _ = multipletests(all_pvals, alpha=0.05, method=method)
    rejected_idx = set(np.where(reject)[0])
    tp = len(rejected_idx & true_positives)  # real effects correctly flagged
    fp = len(rejected_idx - true_positives)  # nulls incorrectly flagged
    print(f'{name:<12} {sum(reject):>13} {tp:>14} {fp:>15}')

print(f'\nNo correction:  {sum(all_pvals < 0.05)} rejections (expected ~{n_null*0.05:.1f} false positives from nulls)')
print('Key insight: Bonferroni most conservative, BH-FDR best balances power vs false discovery rate')
# Exercise 5: A/A Test Simulation — Daily Peeking False Positive Rate

# Simulate 1000 A/A tests with daily peeking over 14 days.
# Each day: test if cumulative data shows p<0.05 — this is "peeking" (optional stopping).

n_simulations = 1000
n_days = 14
daily_visitors = 100  # per arm
true_ctr = 0.10       # identical in both arms (A/A), so H0 is true by construction
alpha = 0.05

false_positives_peeking = 0  # experiments flagged significant on ANY day
false_positives_fixed = 0    # experiments flagged significant only at the horizon

np.random.seed(99)
for _ in range(n_simulations):
    ctrl_data, var_data = [], []
    peek_significant = False

    for day in range(n_days):
        ctrl_data.extend(np.random.binomial(1, true_ctr, daily_visitors))
        var_data.extend(np.random.binomial(1, true_ctr, daily_visitors))  # A/A: same rate

        # Peek at each day's cumulative data (skip degenerate tiny samples)
        if len(ctrl_data) >= 10:
            _, p = stats.ttest_ind(ctrl_data, var_data)
            if p < alpha:
                peek_significant = True  # any significant day counts as a "discovery"

    # Fixed horizon: only look at end of experiment
    _, p_final = stats.ttest_ind(ctrl_data, var_data)
    if peek_significant: false_positives_peeking += 1
    if p_final < alpha:  false_positives_fixed   += 1

print(f'A/A test simulation: {n_simulations} experiments, {n_days} days, daily peeking')
print(f'False positive rate with daily peeking: {false_positives_peeking/n_simulations:.1%}')
print(f'False positive rate with fixed horizon: {false_positives_fixed/n_simulations:.1%}')
print(f'Expected at alpha=0.05: ~5%')
# Fixed mis-encoded em dash in the printed insight (was UTF-8 mojibake)
print('Key insight: peeking dramatically inflates Type I error — always pre-register sample size')

02 — Bayesian Thinking

from scipy.stats import beta as beta_dist

# Exercise 1: Beta-Binomial Sequential Update: Prior -> Posterior as n grows 1 -> 100

# True conversion rate: 0.3; Prior: Beta(2, 5) — slightly conservative
true_p = 0.30
prior_a, prior_b = 2, 5
np.random.seed(0)
observations = np.random.binomial(1, true_p, 100)  # full Bernoulli stream, replayed in prefixes

checkpoints = [1, 5, 10, 20, 50, 100]  # prefix sizes at which to plot the posterior
x = np.linspace(0, 1, 300)

plt.figure(figsize=(12, 4))
for i, n in enumerate(checkpoints):
    k = observations[:n].sum()  # successes among the first n observations
    # Conjugate update: Beta(a, b) + k successes in n trials -> Beta(a+k, b+n-k)
    post_a = prior_a + k
    post_b = prior_b + (n - k)
    pdf = beta_dist.pdf(x, post_a, post_b)
    mean = post_a / (post_a + post_b)
    plt.plot(x, pdf, label=f'n={n} (mean={mean:.2f})')

plt.axvline(true_p, color='black', linestyle='--', label=f'True p={true_p}')
plt.xlabel('Conversion Rate'); plt.ylabel('Density')
# Fixed mis-encoded arrow in the title (was UTF-8 mojibake)
plt.title('Beta-Binomial Sequential Update: Posterior Evolution n=1→100')
plt.legend(fontsize=8); plt.tight_layout(); plt.show()
print('Key insight: posterior concentrates around truth as n grows; prior influence shrinks')
# Exercise 2: Bayesian Credible Interval vs Frequentist CI for Conversion Rate

np.random.seed(5)
true_rate = 0.12
scenarios = [(20, 3), (50, 7), (100, 13), (500, 62), (1000, 121)]  # (n trials, k successes)

# Outer f-string switched to double quotes: same-quote nesting inside f-strings
# is only valid on Python >= 3.12 (PEP 701) and was a SyntaxError before.
print(f"{'n':>5} {'k':>5} {'Freq CI':>24} {'Bayes CrI (Beta(1,1))':>25} {'Bayes Mean':>12}")
print('-' * 75)
for n, k in scenarios:
    p_hat = k / n
    # Frequentist: Wilson score interval (better coverage than Wald at small n / extreme p)
    z = 1.96
    denom = 1 + z**2/n
    center = (p_hat + z**2/(2*n)) / denom
    margin = (z * np.sqrt(p_hat*(1-p_hat)/n + z**2/(4*n**2))) / denom
    freq_lo, freq_hi = center - margin, center + margin

    # Bayesian: Beta(1,1) prior (uniform) -> Beta(1+k, 1+n-k) posterior
    pa, pb = 1 + k, 1 + n - k
    bay_lo, bay_hi = beta_dist.ppf(0.025, pa, pb), beta_dist.ppf(0.975, pa, pb)
    bay_mean = pa / (pa + pb)

    print(f'{n:>5} {k:>5}  [{freq_lo:.3f}, {freq_hi:.3f}]    [{bay_lo:.3f}, {bay_hi:.3f}]    {bay_mean:.4f}')

print('\nKey insight: Bayesian CrI has direct probability interpretation (P(theta in CrI)=0.95);')
print('Frequentist CI means 95% of such intervals contain the true parameter (over repeated samples).')
# Exercise 3: Metropolis-Hastings for 2D Bivariate Normal Posterior

def log_target(x, rho=0.7):
    """Unnormalized log-density of a bivariate normal with unit variances and correlation rho.

    Uses the closed-form inverse of the 2x2 correlation matrix instead of
    calling np.linalg.inv on every evaluation — this function runs twice per
    MH step, so the repeated matrix inversion was pure overhead.
    """
    x1, x2 = x.reshape(-1)
    # inv([[1, rho], [rho, 1]]) == [[1, -rho], [-rho, 1]] / (1 - rho^2)
    return -0.5 * (x1 * x1 - 2 * rho * x1 * x2 + x2 * x2) / (1 - rho * rho)

def metropolis_hastings(n_samples=10000, proposal_std=0.5, rho=0.7):
    """Random-walk Metropolis-Hastings targeting log_target.

    Returns (samples, acceptance_rate) where samples has shape (n_samples, 2).
    """
    samples = np.zeros((n_samples, 2))
    current = np.array([0.0, 0.0])
    accepted = 0
    for i in range(n_samples):
        proposal = current + np.random.normal(0, proposal_std, 2)
        # Symmetric proposal: accept with probability min(1, target(prop)/target(cur))
        log_accept = log_target(proposal, rho) - log_target(current, rho)
        if np.log(np.random.uniform()) < log_accept:
            current = proposal
            accepted += 1
        samples[i] = current  # rejected steps repeat the current state
    return samples, accepted / n_samples

np.random.seed(42)
samples, accept_rate = metropolis_hastings(n_samples=10000, rho=0.7)
burnin = 1000            # discard early samples taken before the chain mixes
samples = samples[burnin:]

# Three standard MCMC diagnostics: joint scatter, trace plot, marginal vs truth
fig, axes = plt.subplots(1, 3, figsize=(14, 4))
axes[0].scatter(samples[:, 0], samples[:, 1], alpha=0.05, s=5)
axes[0].set_title('MH Samples (2D correlated posterior)'); axes[0].set_xlabel('x1'); axes[0].set_ylabel('x2')
axes[1].plot(samples[:500, 0]); axes[1].set_title('Trace plot x1'); axes[1].set_xlabel('iteration')
axes[2].hist(samples[:, 0], bins=50, density=True)
xx = np.linspace(-4, 4, 200)
axes[2].plot(xx, stats.norm.pdf(xx), 'r-', label='True marginal N(0,1)')
axes[2].set_title('Marginal of x1'); axes[2].legend()
plt.tight_layout(); plt.show()

print(f'Acceptance rate: {accept_rate:.2%} (target: 20-50% for MCMC efficiency)')
print(f'Sample mean: x1={samples[:,0].mean():.3f}, x2={samples[:,1].mean():.3f} (expect ~0,0)')
print(f'Sample correlation: {np.corrcoef(samples[:,0], samples[:,1])[0,1]:.3f} (expect ~0.7)')
# Exercise 4: Bayesian A/B Test — P(B>A) and Expected Loss for 5 Scenarios

def bayesian_ab_test(n_a, k_a, n_b, k_b, n_samples=50000):
    """Beta-Binomial Bayesian A/B test with uniform priors Beta(1,1).

    Monte-Carlo estimate over posterior draws; returns
    (P(B > A), expected loss of shipping A, expected loss of shipping B).
    """
    post_a = np.random.beta(1 + k_a, 1 + n_a - k_a, n_samples)
    post_b = np.random.beta(1 + k_b, 1 + n_b - k_b, n_samples)
    prob_b_wins = np.mean(post_b > post_a)
    # Opportunity cost: conversion forfeited if we pick the worse arm
    expected_loss_a = np.clip(post_b - post_a, 0, None).mean()  # loss if we choose A
    expected_loss_b = np.clip(post_a - post_b, 0, None).mean()  # loss if we choose B
    return prob_b_wins, expected_loss_a, expected_loss_b

# Evaluate five (n, k) scenarios and turn posterior evidence into a decision
scenarios = [
    (100, 5,  100, 7),    # small n, small effect
    (500, 50, 500, 65),   # medium n, moderate effect
    (1000, 100, 1000, 130),  # large n, clear effect
    (200, 20, 200, 20),   # null result
    (300, 15, 300, 30),   # strong relative lift
]

# Outer f-string switched to double quotes: same-quote nesting inside f-strings
# is only valid on Python >= 3.12 (PEP 701) and was a SyntaxError before.
print(f"{'Scenario':<30} {'P(B>A)':>8} {'Loss(A)':>10} {'Loss(B)':>10} {'Decision'}")
print('-' * 70)
np.random.seed(0)
for n_a, k_a, n_b, k_b in scenarios:
    p_b, l_a, l_b = bayesian_ab_test(n_a, k_a, n_b, k_b)
    # Decision rule: ship B at 95% confidence, keep A below 5%, otherwise keep testing
    decision = 'Launch B' if p_b > 0.95 else ('Keep A' if p_b < 0.05 else 'Need more data')
    label = f'(n_a={n_a},k_a={k_a},n_b={n_b},k_b={k_b})'
    print(f'{label:<30} {p_b:>8.3f} {l_a:>10.4f} {l_b:>10.4f}  {decision}')

print('\nKey insight: Expected loss gives a business-interpretable threshold vs arbitrary p-values')
# Exercise 5: Sensitivity Analysis — Posterior Under Different Beta Priors

# Data: n=30 trials, k=9 successes (p_hat = 0.30)
n_obs, k_obs = 30, 9

priors = [
    ('Uniform Beta(1,1)',    1,  1),
    ('Weak prior Beta(2,2)', 2,  2),
    ('Strong low Beta(2,8)', 2,  8),
    ('Strong high Beta(8,2)',8,  2),
    ('Jeffreys Beta(0.5,0.5)', 0.5, 0.5),
]

x = np.linspace(0.01, 0.99, 300)
plt.figure(figsize=(10, 4))

# Outer f-string switched to double quotes: same-quote nesting inside f-strings
# is only valid on Python >= 3.12 (PEP 701) and was a SyntaxError before.
print(f"{'Prior':<26} {'Post Mean':>10} {'95% CrI':>24} {'Agrees truth (0.3)?':>20}")
print('-' * 82)
for name, pa, pb in priors:
    # Conjugate update: prior Beta(pa, pb) + data -> posterior Beta(pa+k, pb+n-k)
    post_a = pa + k_obs
    post_b = pb + (n_obs - k_obs)
    pdf = beta_dist.pdf(x, post_a, post_b)
    plt.plot(x, pdf, label=name)
    mean = post_a / (post_a + post_b)
    lo, hi = beta_dist.ppf(0.025, post_a, post_b), beta_dist.ppf(0.975, post_a, post_b)
    print(f'{name:<26} {mean:>10.3f}  [{lo:.3f}, {hi:.3f}]   {lo < 0.3 < hi}')

plt.axvline(0.3, color='k', linestyle='--', label='True rate')
plt.xlabel('p'); plt.ylabel('Posterior Density')
plt.title('Sensitivity to Prior: n=30, k=9')
plt.legend(fontsize=8); plt.tight_layout(); plt.show()
print('Key insight: with n=30, strong priors still influence posterior; as n grows, posteriors converge')

03 — Model Deployment with FastAPI

# Exercise 1: /metrics endpoint with in-memory counters
# Simulated β€” shows how to implement in a FastAPI app without running a server.

import time
from collections import deque

class MetricsCollector:
    """Rolling request metrics: lifetime counters plus a time-windowed latency sample."""

    def __init__(self, window_seconds=60):
        self.total_requests = 0
        self.total_errors = 0
        self.latencies = deque()  # (timestamp, latency_ms) pairs, oldest first
        self.window = window_seconds

    def record(self, latency_ms: float, is_error: bool = False):
        """Record one request and evict latency samples older than the window."""
        timestamp = time.time()
        self.total_requests += 1
        if is_error:
            self.total_errors += 1
        self.latencies.append((timestamp, latency_ms))
        # Drop entries that have aged out of the sliding window
        horizon = timestamp - self.window
        while self.latencies and self.latencies[0][0] < horizon:
            self.latencies.popleft()

    def get_metrics(self):
        """Return a snapshot dict: counters, error rate, and windowed mean latency."""
        window_values = [latency for _, latency in self.latencies]
        mean_latency = np.mean(window_values) if window_values else 0
        return {
            'total_requests': self.total_requests,
            'total_errors': self.total_errors,
            'error_rate': self.total_errors / max(self.total_requests, 1),
            'avg_latency_ms': round(mean_latency, 2),
            'requests_in_window': len(window_values),
        }

# FastAPI pseudocode illustrating the implementation pattern (kept as a string
# so this cell runs without a server or the fastapi dependency):
fastapi_metrics_code = '''
from fastapi import FastAPI, Request
import time

app = FastAPI()
metrics = MetricsCollector()

@app.middleware("http")
async def track_metrics(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    latency = (time.perf_counter() - start) * 1000
    metrics.record(latency, is_error=(response.status_code >= 400))
    return response

@app.get("/metrics")
def get_metrics():
    return metrics.get_metrics()
'''

# Simulate a stream of requests to exercise the collector
metrics = MetricsCollector()
np.random.seed(0)
for _ in range(100):
    latency = np.random.lognormal(mean=3.5, sigma=0.5)  # ~33ms median
    is_error = np.random.random() < 0.03  # 3% error rate
    metrics.record(latency, is_error)

print('/metrics endpoint response:')
import json
print(json.dumps(metrics.get_metrics(), indent=2))
# Exercise 2: Token Bucket Rate Limiter (100 req/min per IP)

import time
from dataclasses import dataclass, field
from typing import Dict

@dataclass
class TokenBucket:
    """Token-bucket limiter: allows bursts up to `capacity`, refills continuously."""
    capacity: float = 100.0
    refill_rate: float = 100.0 / 60.0  # tokens added per second (100/min)
    tokens: float = field(default=None)
    last_refill: float = field(default_factory=time.time)

    def __post_init__(self):
        # Start with a full bucket unless an explicit token count was given
        if self.tokens is None:
            self.tokens = self.capacity

    def consume(self, cost: float = 1.0) -> bool:
        """Refill according to elapsed time, then spend `cost` tokens if available."""
        now = time.time()
        refill = (now - self.last_refill) * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + refill)
        self.last_refill = now
        if self.tokens < cost:
            return False  # rate limited
        self.tokens -= cost
        return True  # allowed

class RateLimiter:
    """Per-IP rate limiting: one TokenBucket per client, created lazily."""

    def __init__(self):
        self.buckets: Dict[str, TokenBucket] = {}

    def is_allowed(self, ip: str) -> bool:
        """True while `ip` still has tokens; first call creates a fresh full bucket."""
        bucket = self.buckets.get(ip)
        if bucket is None:
            bucket = self.buckets[ip] = TokenBucket()
        return bucket.consume()

limiter = RateLimiter()

# Test: a rapid burst from one IP is capped at roughly the bucket capacity
ip = '192.168.1.1'
allowed_burst = sum(1 for _ in range(120) if limiter.is_allowed(ip))
print(f'Allowed in rapid burst of 120 requests: {allowed_burst} (bucket capacity=100)')

# Simulate a different IP: buckets are independent per client
ip2 = '10.0.0.2'
allowed_ip2 = sum(1 for _ in range(50) if limiter.is_allowed(ip2))
print(f'Different IP allowed 50 requests: {allowed_ip2} (separate bucket, unaffected)')
print('Key insight: token bucket allows bursts up to capacity while enforcing long-term rate')
# Exercise 3: Input Validation — Reject requests with features >10 std devs from training

from sklearn.datasets import make_classification
from sklearn.preprocessing import StandardScaler

# Capture training-data statistics that serving-time inputs are checked against
np.random.seed(0)
X_train, _ = make_classification(n_samples=1000, n_features=5, random_state=0)
scaler = StandardScaler().fit(X_train)
train_mean = scaler.mean_
train_std = scaler.scale_

class InputValidator:
    """Reject inputs whose z-score against training statistics exceeds a threshold."""

    def __init__(self, mean, std, threshold=10.0):
        self.mean = mean
        self.std = std
        self.threshold = threshold

    def validate(self, features: np.ndarray):
        """Return (ok, info): ok is False when any feature's |z| exceeds the threshold."""
        z = np.abs((features - self.mean) / self.std)
        out_of_range = np.where(z > self.threshold)[0]
        if out_of_range.size:
            detail = {
                'error': 'Feature out of expected range',
                'violating_features': out_of_range.tolist(),
                'z_scores': z[out_of_range].tolist(),
            }
            return False, detail
        return True, {'z_scores_max': float(z.max())}

validator = InputValidator(train_mean, train_std)

# Valid request: all features near the training mean
valid_input = np.array([0.1, -0.2, 0.5, -0.1, 0.3])
ok, info = validator.validate(valid_input)
print(f'Valid input:   allowed={ok}, max_z={info.get("z_scores_max", "N/A"):.2f}')

# Adversarial/corrupted input: feature 1 is wildly outside the training range
bad_input = np.array([0.1, 150.0, 0.5, -0.1, 0.3])  # feature 1 is extreme
ok2, info2 = validator.validate(bad_input)
print(f'Bad input:     allowed={ok2}, detail={info2}')
print('Key insight: z-score validation catches data drift, sensor faults, and adversarial inputs')
# Exercise 4: Model Versioning — /predict/v1 and /predict/v2

from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
import pickle, io

np.random.seed(42)
X, y = make_classification(n_samples=500, n_features=10, random_state=42)

# Train two model versions on the same data (v2 is a richer model family)
model_v1 = LogisticRegression(max_iter=1000).fit(X, y)
model_v2 = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)

# Serialize to bytes (simulating model file storage)
def serialize_model(model):
    """Pickle `model` and return the raw bytes (stand-in for artifact storage)."""
    return pickle.dumps(model)

# In-memory "registry" mapping version tags to serialized model artifacts
MODEL_REGISTRY = {
    'v1': serialize_model(model_v1),
    'v2': serialize_model(model_v2),
}

def predict_versioned(version: str, features: np.ndarray):
    """Deserialize the requested model version and score `features`.

    Raises ValueError for unknown versions; returns a JSON-friendly dict with
    hard predictions (0.5 threshold) and rounded class-1 probabilities.
    """
    if version not in MODEL_REGISTRY:
        raise ValueError(f'Unknown model version: {version}. Available: {list(MODEL_REGISTRY)}')
    # NOTE: pickle is only safe here because registry bytes are produced locally;
    # never unpickle untrusted data.
    model = pickle.loads(MODEL_REGISTRY[version])
    probabilities = model.predict_proba(features)[:, 1]
    labels = (probabilities > 0.5).astype(int)
    return {
        'version': version,
        'predictions': labels.tolist(),
        'probabilities': probabilities.round(3).tolist(),
    }

# Simulate API calls against both versioned endpoints
test_batch = X[:3]
resp_v1 = predict_versioned('v1', test_batch)
resp_v2 = predict_versioned('v2', test_batch)

print('GET /predict/v1')
print(f'  predictions:   {resp_v1["predictions"]}')
print(f'  probabilities: {resp_v1["probabilities"]}')
print('GET /predict/v2')
print(f'  predictions:   {resp_v2["predictions"]}')
print(f'  probabilities: {resp_v2["probabilities"]}')
print('Key insight: versioned endpoints allow shadow deployments and gradual traffic shifting')
# Exercise 5: Load Test Simulation β€” p50/p95/p99 Latency

import concurrent.futures

def simulate_request(request_id: int) -> float:
    """Simulate one inference call's latency in ms, seeded by the request id."""
    np.random.seed(request_id)  # deterministic per request id for reproducibility
    latency_ms = np.random.lognormal(mean=3.2, sigma=0.6)  # ~24ms median
    # 10% of requests suffer a 5x tail slowdown (GC pauses, cold caches, etc.)
    is_slow = np.random.random() < 0.10
    return latency_ms * 5 if is_slow else latency_ms

# Simulate a concurrent load test with a thread pool standing in for users.
# NOTE(review): simulate_request reseeds the process-global NumPy RNG from
# worker threads — fine for this demo, but not thread-safe in general.
n_requests = 500
max_workers = 20  # concurrent "users"

start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    latencies = list(executor.map(simulate_request, range(n_requests)))
total_time = time.perf_counter() - start

latencies = np.array(latencies)
print(f'Load test: {n_requests} requests, {max_workers} concurrent workers')
print(f'Total time: {total_time:.2f}s | Throughput: {n_requests/total_time:.0f} req/s')
print(f'p50  latency: {np.percentile(latencies, 50):>8.1f} ms')
print(f'p90  latency: {np.percentile(latencies, 90):>8.1f} ms')
print(f'p95  latency: {np.percentile(latencies, 95):>8.1f} ms')
print(f'p99  latency: {np.percentile(latencies, 99):>8.1f} ms')
print(f'p999 latency: {np.percentile(latencies, 99.9):>8.1f} ms')

plt.figure(figsize=(8, 3))
plt.hist(latencies, bins=50, density=True)
for pct, color in [(50,'g'),(95,'orange'),(99,'r')]:
    v = np.percentile(latencies, pct)
    plt.axvline(v, color=color, linestyle='--', label=f'p{pct}={v:.0f}ms')
plt.xlabel('Latency (ms)'); plt.ylabel('Density')
plt.title('Request Latency Distribution')
plt.legend(); plt.tight_layout(); plt.show()
# Fixed mis-encoded em dash in the printed insight (was UTF-8 mojibake)
print('Key insight: tail latencies (p99) can be 5-10x median — optimize for tail, not mean')

04 — ML Monitoring & Drift Detection

# Exercise 1: PSI (Population Stability Index) vs KS Test

def compute_psi(reference, current, n_bins=10, epsilon=1e-7):
    """Population Stability Index between two samples.

    Both samples are histogrammed over the same equal-width bins spanning
    their combined range; epsilon keeps empty bins out of the log.
    """
    lo = min(reference.min(), current.min())
    hi = max(reference.max(), current.max())
    edges = np.linspace(lo, hi, n_bins + 1)
    ref_frac = np.histogram(reference, bins=edges)[0] / len(reference) + epsilon
    cur_frac = np.histogram(current, bins=edges)[0] / len(current) + epsilon
    return np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac))

# Build a reference sample and several drifted candidates, then score both metrics
np.random.seed(42)
reference = np.random.normal(0, 1, 1000)

drift_scenarios = [
    ('No drift',      np.random.normal(0, 1, 1000)),
    ('Slight drift',  np.random.normal(0.3, 1, 1000)),
    ('Moderate drift',np.random.normal(0.7, 1, 1000)),
    ('Severe drift',  np.random.normal(1.5, 1.5, 1000)),
]

# Outer f-string switched to double quotes: same-quote nesting inside f-strings
# is only valid on Python >= 3.12 (PEP 701) and was a SyntaxError before.
print(f"{'Scenario':<20} {'PSI':>8} {'PSI Interp':>20} {'KS stat':>10} {'KS p-value':>12}")
print('-' * 74)
for name, current in drift_scenarios:
    psi = compute_psi(reference, current)
    ks_stat, ks_p = stats.ks_2samp(reference, current)
    # Industry-standard PSI bands: <0.1 stable, 0.1-0.2 minor, >0.2 significant
    interp = 'Stable' if psi < 0.1 else ('Minor shift' if psi < 0.2 else 'Significant drift')
    print(f'{name:<20} {psi:>8.4f} {interp:>20} {ks_stat:>10.4f} {ks_p:>12.4f}')

print('\nPSI thresholds: <0.1 stable, 0.1-0.2 minor, >0.2 significant')
print('Key insight: PSI gives interpretable magnitude of drift; KS gives hypothesis test p-value')
# Exercise 2: Sliding Window Monitor — Alert if Drift in 3+ Consecutive Windows

class SlidingWindowDriftMonitor:
    """Flags drift when PSI exceeds a threshold for N consecutive windows."""

    def __init__(self, window_size=500, drift_threshold=0.2, consecutive=3):
        self.window_size = window_size
        self.drift_threshold = drift_threshold
        self.consecutive = consecutive
        self.reference = None   # baseline sample PSI is computed against
        self.drift_streak = 0   # run length of back-to-back drifting windows
        self.alerts = []        # window ids at which the alert fired

    def set_reference(self, data):
        """Store the baseline distribution used by every subsequent check."""
        self.reference = data

    def check_window(self, window_data, window_id):
        """Score one window; return PSI, drift flag, current streak, and alert status."""
        if self.reference is None:
            raise ValueError('Set reference first')
        psi = compute_psi(self.reference, window_data)
        drifting = psi > self.drift_threshold
        if drifting:
            self.drift_streak += 1
        else:
            self.drift_streak = 0  # any clean window resets the streak
        alert = self.drift_streak >= self.consecutive
        if alert:
            self.alerts.append(window_id)
        return {'window': window_id, 'psi': psi, 'drift': drifting, 'streak': self.drift_streak, 'alert': alert}

# Simulate: stable windows, then gradual drift starting at window 5
np.random.seed(0)
reference = np.random.normal(0, 1, 2000)
monitor = SlidingWindowDriftMonitor(window_size=500, drift_threshold=0.2, consecutive=3)
monitor.set_reference(reference)

# Outer f-string switched to double quotes: same-quote nesting inside f-strings
# is only valid on Python >= 3.12 (PEP 701) and was a SyntaxError before.
print(f"{'Window':>8} {'PSI':>8} {'Drift':>8} {'Streak':>8} {'ALERT':>8}")
print('-' * 45)
for i in range(10):
    shift = 0.8 * max(0, i - 4)  # mean shift is 0 through window 4, grows from window 5
    window = np.random.normal(shift, 1, 500)
    result = monitor.check_window(window, window_id=i)
    flag = '*** ALERT ***' if result['alert'] else ''
    print(f'{i:>8} {result["psi"]:>8.4f} {str(result["drift"]):>8} {result["streak"]:>8} {flag}')
# Exercise 3: Concept Drift Detection using DDM (Drift Detection Method)

class DDM:
    """Drift Detection Method (Gama et al., 2004).

    Tracks the running error rate p and its binomial std s; a warning/drift
    is declared when p + s rises a multiple of s_min above the best
    (minimum) p + s operating point observed so far.
    """

    def __init__(self, warning_level=2.0, drift_level=3.0):
        self.warning_level = warning_level
        self.drift_level = drift_level
        self.reset()

    def reset(self):
        """Clear all running statistics."""
        self.n = 0
        self.sum_errors = 0
        self.min_pn_std = float('inf')  # smallest p + s seen so far
        self.p_min = float('inf')       # p at that best point
        self.s_min = float('inf')       # s at that best point

    def add_element(self, prediction_correct: bool):
        """Consume one prediction outcome; returns 'normal', 'warning', or 'drift'."""
        self.n += 1
        if not prediction_correct:
            self.sum_errors += 1
        p = self.sum_errors / self.n
        s = np.sqrt(p * (1 - p) / self.n)
        # Track the best-so-far operating point (minimum p + s)
        if p + s < self.min_pn_std:
            self.p_min, self.s_min, self.min_pn_std = p, s, p + s
        if self.n < 30:
            return 'normal'  # warm-up: too few samples to judge
        if p + s >= self.p_min + self.drift_level * self.s_min:
            return 'drift'
        if p + s >= self.p_min + self.warning_level * self.s_min:
            return 'warning'
        return 'normal'

# Simulate: accurate model until step 500, then concept drift degrades accuracy
np.random.seed(7)
n_steps = 1000
drift_point = 500

ddm = DDM()
statuses = []
for t in range(n_steps):
    if t < drift_point:
        correct = np.random.random() < 0.85  # 15% error rate
    else:
        correct = np.random.random() < 0.60  # drift: 40% error rate
    statuses.append(ddm.add_element(correct))

# Locate the first warning and the first confirmed drift signal
first_warning = next((i for i, s in enumerate(statuses) if s == 'warning'), None)
first_drift   = next((i for i, s in enumerate(statuses) if s == 'drift'),   None)

print(f'True drift point:       {drift_point}')
print(f'DDM first warning:      {first_warning}')
print(f'DDM first drift alert:  {first_drift}')
if first_drift:
    print(f'Detection lag:          {first_drift - drift_point} steps')
print('Key insight: DDM detects drift by tracking error rate bounds from the best-so-far performance')
# Exercise 4: Gradual vs Sudden Drift — Detector Response Comparison

def simulate_stream(n, drift_type, drift_start=300, drift_end=600):
    """Generate a binary error stream whose error rate jumps or ramps at drift_start.

    'sudden': rate steps 0.10 -> 0.40 at drift_start.
    'gradual': rate ramps linearly 0.10 -> 0.40 between drift_start and drift_end.
    """
    stream = []
    for t in range(n):
        if t < drift_start:
            rate = 0.10
        elif drift_type == 'sudden':
            rate = 0.40
        elif drift_type == 'gradual':
            ramp = min(1.0, (t - drift_start) / (drift_end - drift_start))
            rate = 0.10 + 0.30 * ramp
        stream.append(np.random.random() < rate)
    return stream

def run_ddm_on_stream(errors):
    """Feed an error stream through a fresh DDM.

    Returns (per-step statuses, index of first 'warning', index of first 'drift');
    indices are None when that state never occurs.
    """
    detector = DDM()
    statuses = [detector.add_element(not err) for err in errors]

    def _first_index(state):
        # Index of the first occurrence of `state` in the status sequence
        return next((i for i, s in enumerate(statuses) if s == state), None)

    return statuses, _first_index('warning'), _first_index('drift')

np.random.seed(42)
n_steps = 800

# Run the same detector against both drift profiles and compare detection lag
for dtype in ['sudden', 'gradual']:
    errors = simulate_stream(n_steps, dtype)
    statuses, fw, fd = run_ddm_on_stream(errors)
    print(f'\n{dtype.capitalize()} drift (starts at 300):')
    print(f'  First warning: step {fw}')
    print(f'  First drift:   step {fd}')
    if fd:
        print(f'  Detection lag: {fd - 300} steps')

print('\nKey insight: DDM detects sudden drift faster than gradual drift.')
print('For gradual drift, ADWIN (adaptive windowing) performs better as it tracks recent vs historical.')
# Exercise 5: Retraining Trigger — If PSI > 0.2 for any feature, retrain and compare AUC

from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split

np.random.seed(0)
n_train, n_serve, n_features = 1000, 500, 5

# Training distribution: the label depends only on features 0 and 1
X_train = np.random.randn(n_train, n_features)
y_train = (X_train[:, 0] + X_train[:, 1] > 0).astype(int)

# Serving data with distribution shift on features 0,1
X_serve = np.random.randn(n_serve, n_features)
X_serve[:, 0] += 1.5   # shift feature 0
X_serve[:, 1] -= 0.8   # shift feature 1
y_serve = (X_serve[:, 0] + X_serve[:, 1] > 0).astype(int)

# Compute PSI per feature (training vs serving marginal distributions)
feature_psi = {f'feature_{i}': compute_psi(X_train[:, i], X_serve[:, i])
               for i in range(n_features)}

print('PSI per feature:')
for feat, psi in feature_psi.items():
    flag = ' <-- TRIGGER RETRAIN' if psi > 0.2 else ''
    print(f'  {feat}: {psi:.4f}{flag}')

trigger_retrain = any(p > 0.2 for p in feature_psi.values())
print(f'\nRetraining triggered: {trigger_retrain}')

# Train original model (on training distribution), evaluate on serving data
model_original = RandomForestClassifier(n_estimators=50, random_state=0).fit(X_train, y_train)
auc_original = roc_auc_score(y_serve, model_original.predict_proba(X_serve)[:, 1])

# Retrain on new serving data (simulate labeled retraining set; last 100 rows held out)
# NOTE(review): the two AUCs use different eval sets (full serve vs the 100-row
# holdout) — directionally fine for a demo, not a strict like-for-like comparison.
model_retrained = RandomForestClassifier(n_estimators=50, random_state=0).fit(X_serve[:400], y_serve[:400])
auc_retrained = roc_auc_score(y_serve[400:], model_retrained.predict_proba(X_serve[400:])[:, 1])

print(f'\nAUC before retraining: {auc_original:.4f}')
print(f'AUC after retraining:  {auc_retrained:.4f}')
print(f'AUC improvement: {auc_retrained - auc_original:+.4f}')
print('Key insight: PSI-triggered retraining recalibrates the model to the new data distribution')

05 — Feature Stores

import pandas as pd
from datetime import datetime, timedelta

# Exercise 1: Feature Freshness Check β€” Flag Features Not Updated in >24h

class FeatureStore:
    """Minimal in-memory feature store that tracks per-feature update timestamps."""

    def __init__(self):
        # {feature_name: {'value': ..., 'updated_at': datetime}}
        self.features = {}

    def set_feature(self, name: str, value, updated_at: datetime = None):
        """Store a feature value; the timestamp defaults to now (naive UTC)."""
        timestamp = updated_at or datetime.utcnow()
        self.features[name] = {'value': value, 'updated_at': timestamp}

    def check_freshness(self, max_age_hours: float = 24.0) -> dict:
        """Report each feature's age in hours and whether it exceeds the staleness cutoff."""
        now = datetime.utcnow()
        report = {}
        for name, meta in self.features.items():
            hours_old = (now - meta['updated_at']).total_seconds() / 3600
            report[name] = {
                'age_hours': round(hours_old, 1),
                'stale': hours_old > max_age_hours,
                'last_updated': meta['updated_at'].isoformat(),
            }
        return report

# Populate the store with features of varying ages; two are past the 24h cutoff
now = datetime.utcnow()
store = FeatureStore()
store.set_feature('age_bucket',        42, now - timedelta(hours=2))
store.set_feature('purchase_count_7d', 15, now - timedelta(hours=12))
store.set_feature('last_login_days',    3, now - timedelta(hours=30))  # stale!
store.set_feature('account_tenure_d', 365, now - timedelta(hours=72)) # stale!

freshness = store.check_freshness(max_age_hours=24)
print('Feature Freshness Report:')
for feat, info in freshness.items():
    status = 'STALE' if info['stale'] else 'OK'
    print(f'  {feat:<25} age={info["age_hours"]:5.1f}h  [{status}]')
# Exercise 2: Lag Features with Point-in-Time Semantics (lag-7d, lag-30d, lag-90d)

np.random.seed(42)

# Simulate one purchase event per day, attributed to a random customer
date_range = pd.date_range('2024-01-01', '2024-04-30', freq='D')
events = pd.DataFrame({
    'customer_id': np.random.choice(['C001', 'C002', 'C003'], len(date_range)),
    'event_date': date_range,
    'purchase_amount': np.random.exponential(50, len(date_range)),
})

def compute_lag_features(events: pd.DataFrame, as_of_date: datetime, customer_id: str):
    """Compute point-in-time lag aggregates for one customer.

    Only events strictly BEFORE *as_of_date* are considered, which prevents
    label leakage when building historical training sets. For each trailing
    window (7/30/90 days) the sum and count of purchase amounts are returned.
    """
    as_of = pd.Timestamp(as_of_date)
    mask = (events['customer_id'] == customer_id) & (events['event_date'] < as_of)
    history = events.loc[mask]

    features = {}
    for window_days, suffix in ((7, 'lag7d'), (30, 'lag30d'), (90, 'lag90d')):
        start = as_of - timedelta(days=window_days)
        recent = history.loc[history['event_date'] >= start]
        features[f'purchase_sum_{suffix}'] = round(recent['purchase_amount'].sum(), 2)
        features[f'purchase_count_{suffix}'] = len(recent)
    return features

# Compute features as of two different dates to verify point-in-time correctness
for as_of in ['2024-02-15', '2024-04-01']:
    snapshot = datetime.strptime(as_of, '%Y-%m-%d')
    feats = compute_lag_features(events, snapshot, 'C001')
    print(f'\nFeatures as of {as_of} for C001:')
    for feature_name, feature_value in feats.items():
        print(f'  {feature_name}: {feature_value}')

print('\nKey insight: point-in-time correctness prevents data leakage in historical training datasets')
# Exercise 3: Training-Serving Skew Detection using PSI

np.random.seed(0)

# Simulate training features and serving features with some skew.
# Entries are inserted in the same order as the RNG draws so the seeded
# values are reproducible: (train, serve) per feature.
n_train, n_serve = 5000, 1000
feature_specs = {}
feature_specs['age'] = (np.random.normal(35, 10, n_train),
                        np.random.normal(35, 10, n_serve))        # no skew
feature_specs['income'] = (np.random.lognormal(10, 1, n_train),
                           np.random.lognormal(10.3, 1, n_serve)) # slight skew
feature_specs['session_duration'] = (np.random.exponential(5, n_train),
                                     np.random.exponential(8, n_serve))  # moderate skew
feature_specs['page_views'] = (np.random.poisson(10, n_train),
                               np.random.poisson(15, n_serve))    # significant skew
feature_specs['purchase_history'] = (np.random.binomial(5, 0.3, n_train),
                                     np.random.binomial(5, 0.3, n_serve))  # no skew

# Per-feature PSI report comparing training vs serving distributions.
# Features with PSI > 0.1 are flagged for investigation.
print('Training-Serving Skew Report (PSI per feature):')
# Inner literals use double quotes: reusing the outer quote character inside
# an f-string is a SyntaxError before Python 3.12 (PEP 701).
print(f"{'Feature':<20} {'PSI':>8} {'Status':>20}")
print('-' * 50)
skewed_features = []
for feat, (train_vals, serve_vals) in feature_specs.items():
    # compute_psi is defined in an earlier exercise of this notebook.
    psi = compute_psi(train_vals, serve_vals)
    if psi > 0.1:
        status = 'SKEW DETECTED'
        skewed_features.append(feat)
    else:
        status = 'OK'
    print(f'{feat:<20} {psi:>8.4f} {status:>20}')

print(f'\nFeatures with PSI > 0.1 (flag for investigation): {skewed_features}')
print('Key insight: skew in high-PSI features indicates preprocessing differences or data pipeline bugs')
# Exercise 4: Real-Time Session Feature β€” Sum of Purchases in Last 30 Minutes

import time
from collections import defaultdict

class SessionFeatureStore:
    """In-memory rolling-window feature store for real-time session features.

    Keeps, per customer, a list of (unix_timestamp, amount) purchase events
    and answers sum/count queries over a trailing window, lazily evicting
    expired events on each read.
    """

    def __init__(self, window_minutes: int = 30):
        self.window_seconds = window_minutes * 60
        # {customer_id: list of (unix_timestamp, amount) tuples}
        self.events = defaultdict(list)

    def record_purchase(self, customer_id: str, amount: float, timestamp: float = None):
        """Append a purchase event; *timestamp* defaults to the current time.

        Uses an explicit `is None` check so a legitimate timestamp of 0.0
        (the epoch) is not silently replaced with `time.time()`.
        """
        ts = time.time() if timestamp is None else timestamp
        self.events[customer_id].append((ts, amount))

    def _evict_old(self, customer_id: str, now: float):
        """Drop events at or before the window boundary (strict > keeps only
        events newer than now - window)."""
        cutoff = now - self.window_seconds
        self.events[customer_id] = [
            (ts, amt) for ts, amt in self.events[customer_id] if ts > cutoff
        ]

    def get_session_sum(self, customer_id: str, now: float = None) -> float:
        """Sum of purchase amounts inside the trailing window (0 if none)."""
        t = time.time() if now is None else now
        self._evict_old(customer_id, t)
        return sum(amt for _, amt in self.events[customer_id])

    def get_session_count(self, customer_id: str, now: float = None) -> int:
        """Number of purchases inside the trailing window."""
        t = time.time() if now is None else now
        self._evict_old(customer_id, t)
        return len(self.events[customer_id])

# Simulate purchase events
now_ts = time.time()
session_store = SessionFeatureStore(window_minutes=30)

# C001 had purchases 10m, 25m, 35m ago; C002 one purchase 5m ago.
minute = 60
simulated_purchases = [
    ('C001', 29.99, 10 * minute),
    ('C001', 49.50, 25 * minute),
    ('C001', 15.00, 35 * minute),  # outside 30min window
    ('C002', 99.00,  5 * minute),
]
for cust, amount, seconds_ago in simulated_purchases:
    session_store.record_purchase(cust, amount, now_ts - seconds_ago)

print('Real-time session features (30-minute window):')
for customer in ('C001', 'C002', 'C003'):
    total = session_store.get_session_sum(customer, now_ts)
    count = session_store.get_session_count(customer, now_ts)
    print(f'  {customer}: purchases_30m_count={count}, purchases_30m_sum=${total:.2f}')
print('Key insight: C001\'s $15 purchase (35m ago) is correctly excluded from the 30-min window')
# Exercise 5: Feature Versioning β€” v1 and v2 Coexist in Registry

from typing import Callable, Any
from dataclasses import dataclass, field

@dataclass
class FeatureDefinition:
    """Metadata plus compute logic for one versioned feature in the registry."""
    # Feature name; combined with `version` it forms the registry key.
    name: str
    # Version string (e.g. '1', '2'); multiple versions of a name may coexist.
    version: str
    # Human-readable summary shown in registry listings.
    description: str
    # Callable mapping a raw-data dict to the computed feature value.
    compute_fn: Callable
    # Upstream feature/source names this feature depends on (empty by default).
    dependencies: list = field(default_factory=list)

class FeatureRegistry:
    """Registry where multiple versions of a feature coexist side by side.

    Definitions are keyed by (name, version); 'latest' resolves to the
    highest version using numeric-aware ordering.
    """

    def __init__(self):
        self._registry = {}  # {(name, version): FeatureDefinition}

    @staticmethod
    def _version_key(version: str):
        """Sort key: compare dotted versions numerically per segment.

        Fixes the lexicographic bug where '9' would rank above '10'.
        Non-numeric segments fall back to string comparison; a leading tag
        keeps numeric and non-numeric segments mutually comparable.
        """
        return [
            (0, int(seg), '') if seg.isdigit() else (1, 0, seg)
            for seg in version.split('.')
        ]

    def register(self, feat_def: 'FeatureDefinition'):
        """Add (or overwrite) a definition keyed by (name, version)."""
        key = (feat_def.name, feat_def.version)
        self._registry[key] = feat_def
        # Em-dash restored here: the source had a mojibake'd separator.
        print(f'Registered: {feat_def.name}@{feat_def.version} — {feat_def.description}')

    def get(self, name: str, version: str = 'latest') -> 'FeatureDefinition':
        """Fetch a definition; version='latest' picks the highest version.

        Raises KeyError when the name (or the exact version) is absent.
        """
        if version == 'latest':
            versions = [ver for (feat, ver) in self._registry if feat == name]
            if not versions:
                raise KeyError(f'Feature {name} not found')
            version = max(versions, key=self._version_key)
        return self._registry[(name, version)]

    def compute(self, name: str, version: str, raw_data: dict) -> Any:
        """Resolve the definition and evaluate its compute_fn on raw_data."""
        feat_def = self.get(name, version)
        return feat_def.compute_fn(raw_data)

    def list_features(self):
        """Print every registered (name, version) pair, sorted."""
        for (name, ver), fd in sorted(self._registry.items()):
            print(f'  {name:<25} v{ver}  {fd.description}')

registry = FeatureRegistry()


# customer_age_days: v1 counts from account creation, v2 from verified registration.
def _customer_age_days_v1(d):
    """Days elapsed since account creation."""
    return (datetime.utcnow() - datetime.fromisoformat(d['created_at'])).days


def _customer_age_days_v2(d):
    """Days elapsed since verified registration; falls back to creation date."""
    anchor = d.get('verified_at', d['created_at'])
    return (datetime.utcnow() - datetime.fromisoformat(anchor)).days


# purchase_rate_30d: v1 is the raw count, v2 applies Laplace smoothing.
def _purchase_rate_v1(d):
    """Raw 30-day purchase count."""
    return d['purchases_30d']


def _purchase_rate_v2(d):
    """Laplace-smoothed 30-day purchase rate (robust for new customers)."""
    return (d['purchases_30d'] + 1) / (d.get('days_active', 30) + 2)


registry.register(FeatureDefinition(
    name='customer_age_days', version='1',
    description='Days since account creation',
    compute_fn=_customer_age_days_v1,
))
registry.register(FeatureDefinition(
    name='customer_age_days', version='2',
    description='Days since verified registration (more accurate)',
    compute_fn=_customer_age_days_v2,
))
registry.register(FeatureDefinition(
    name='purchase_rate_30d', version='1',
    description='Raw purchase count per 30d',
    compute_fn=_purchase_rate_v1,
))
registry.register(FeatureDefinition(
    name='purchase_rate_30d', version='2',
    description='Laplace-smoothed purchase rate (handles cold start)',
    compute_fn=_purchase_rate_v2,
))

print('\nFeature Registry:')
registry.list_features()

# Test compute on sample data
sample = {
    'created_at': '2023-01-15',
    'verified_at': '2023-01-20',
    'purchases_30d': 5,
    'days_active': 30,
}

print('\nFeature values for sample customer:')
for feat in ('customer_age_days', 'purchase_rate_30d'):
    for ver in ('1', '2'):
        val = registry.compute(feat, ver, sample)
        print(f'  {feat} v{ver}: {val}')

print('\nKey insight: coexisting versions enable A/B testing of feature logic and safe migration')