Solutions: Statistics & MLOps Track
Worked solutions to all exercises from the statistics-mlops/ notebooks.
01 — Hypothesis Testing
import numpy as np
import matplotlib.pyplot as plt
from scipy import stats
np.random.seed(42)  # fixed seed so the synthetic data below is reproducible
# Exercise 1: Permutation test for difference in medians; compare to Mann-Whitney
# Synthetic data: two groups with different medians.
# The appended outliers (25/28 and 27/30) right-skew both samples, which is why
# median- and rank-based tests are preferred over a t-test here.
group_a = np.concatenate([np.random.normal(10, 3, 50), [25, 28]]) # right-skewed
group_b = np.concatenate([np.random.normal(12, 3, 50), [27, 30]]) # slightly higher
def permutation_test_median(a, b, n_permutations=10000):
observed = np.median(b) - np.median(a)
combined = np.concatenate([a, b])
n_a = len(a)
count = 0
for _ in range(n_permutations):
np.random.shuffle(combined)
perm_diff = np.median(combined[n_a:]) - np.median(combined[:n_a])
if abs(perm_diff) >= abs(observed): # two-sided
count += 1
return observed, count / n_permutations
# Run both nonparametric tests on the same skewed data and compare p-values.
obs_diff, perm_pval = permutation_test_median(group_a, group_b)
mw_stat, mw_pval = stats.mannwhitneyu(group_a, group_b, alternative='two-sided')
print(f'Observed median difference: {obs_diff:.3f}')
print(f'Permutation test p-value: {perm_pval:.4f}')
print(f'Mann-Whitney p-value: {mw_pval:.4f}')
print('Key insight: Both test rank/distributional differences without normality assumption.')
print('Mann-Whitney tests stochastic dominance; permutation test directly tests the median statistic.')
# Exercise 2: Statistical Power as a Function of Sample Size
# Find n for 80% power at Cohen's d = 0.2 (small effect)
from scipy.stats import norm
def compute_power(n, effect_size=0.2, alpha=0.05):
    """Power for two-sample t-test (equal groups, two-sided).

    Normal approximation: with n per group the standardized difference has
    standard error sqrt(2/n), giving non-centrality effect_size/sqrt(2/n).
    Power is the probability of landing beyond either critical value.
    """
    critical = norm.ppf(1 - alpha / 2)
    # Non-centrality parameter of the test statistic under the alternative.
    noncentrality = effect_size / np.sqrt(2 / n)
    return (1 - norm.cdf(critical - noncentrality)) + norm.cdf(-critical - noncentrality)
# Sweep per-group sample sizes and locate the smallest n reaching 80% power.
sample_sizes = np.arange(10, 800, 10)
powers = [compute_power(n) for n in sample_sizes]
# Find minimum n for 80% power
n_80 = next(n for n, p in zip(sample_sizes, powers) if p >= 0.80)
plt.figure(figsize=(8, 4))
plt.plot(sample_sizes, powers, label='Power (d=0.2, alpha=0.05)')
plt.axhline(0.80, color='r', linestyle='--', label='80% power')
plt.axvline(n_80, color='g', linestyle='--', label=f'n={n_80} per group')
plt.xlabel('Sample size per group'); plt.ylabel('Statistical Power')
plt.title('Power Curve: Effect Size d=0.2')
plt.legend(); plt.tight_layout(); plt.show()
print(f'Minimum n per group for 80% power (d=0.2): {n_80}')
print('Key insight: small effects require ~394 per group β power analysis before experimentation is critical')
# Exercise 3: Two-Sided vs One-Sided Tests — When Each is Appropriate
np.random.seed(0)
baseline_ctr = 0.05
n = 1000
# Business scenario A: Testing a NEW checkout button — could help OR hurt
# -> Two-sided test: we care about both positive and negative effects
control_a = np.random.binomial(1, 0.050, n)
variant_a = np.random.binomial(1, 0.055, n) # slight increase
_, p_two = stats.ttest_ind(control_a, variant_a, alternative='two-sided')
# Business scenario B: Testing a new recommendation engine — we only care if it IMPROVES revenue
# We would NOT roll back unless it causes serious harm (separate quality check)
# -> One-sided test: H1: variant > control
control_b = np.random.binomial(1, 0.050, n)
variant_b = np.random.binomial(1, 0.060, n) # clearer increase
# alternative='less' with (control, variant) order tests H1: mean(control) < mean(variant)
_, p_one = stats.ttest_ind(control_b, variant_b, alternative='less') # H1: control < variant
print('Scenario A: Checkout button (two-sided β could hurt UX)')
print(f' p-value (two-sided): {p_two:.4f}')
print(f' Significant at 0.05: {p_two < 0.05}')
print()
print('Scenario B: Recommendation engine (one-sided β we only care about improvement)')
print(f' p-value (one-sided): {p_one:.4f}')
print(f' Significant at 0.05: {p_one < 0.05}')
print()
print('Key insight: one-sided test has more power BUT is only valid when you CANNOT care about')
print('the other direction. Using it to "cheat" significance inflates Type I error.')
# Exercise 4: Multiple Testing Correction — Bonferroni, Holm, BH-FDR
from statsmodels.stats.multitest import multipletests
np.random.seed(7)
# Simulate 20 A/B tests: 18 null (H0 true), 2 real effects
n_tests = 20
n_null = 18
# Under H0, p-values are Uniform(0, 1); the two real effects get small p-values.
null_pvals = np.random.uniform(0, 1, n_null)
effect_pvals = np.array([0.008, 0.031]) # true effects
all_pvals = np.concatenate([null_pvals, effect_pvals])
true_positives = set([n_null, n_null + 1]) # last two are real
methods = [
    ('Bonferroni', 'bonferroni'),
    ('Holm', 'holm'),
    ('BH-FDR', 'fdr_bh'),
]
# Fix: double quotes inside the single-quoted f-string — same-quote nesting
# is a SyntaxError on Python < 3.12 (PEP 701 only relaxed this in 3.12).
print(f'{"Method":<12} {"Reject count":>13} {"True positive":>14} {"False positive":>15}')
print('-' * 57)
for name, method in methods:
    reject, _, _, _ = multipletests(all_pvals, alpha=0.05, method=method)
    rejected_idx = set(np.where(reject)[0])
    tp = len(rejected_idx & true_positives)
    fp = len(rejected_idx - true_positives)
    print(f'{name:<12} {sum(reject):>13} {tp:>14} {fp:>15}')
print(f'\nNo correction: {sum(all_pvals < 0.05)} rejections (expected ~{n_null*0.05:.1f} false positives from nulls)')
print('Key insight: Bonferroni most conservative, BH-FDR best balances power vs false discovery rate')
# Exercise 5: A/A Test Simulation — Daily Peeking False Positive Rate
# Simulate 1000 A/A tests with daily peeking over 14 days.
# Each day: test if cumulative data shows p<0.05 — this is "peeking" (optional stopping).
n_simulations = 1000
n_days = 14
daily_visitors = 100 # per arm
true_ctr = 0.10
alpha = 0.05
false_positives_peeking = 0
false_positives_fixed = 0
np.random.seed(99)
for _ in range(n_simulations):
    ctrl_data, var_data = [], []
    peek_significant = False
    for day in range(n_days):
        # Both arms draw from the SAME rate (A/A), so any "significant"
        # result is a false positive by construction.
        ctrl_data.extend(np.random.binomial(1, true_ctr, daily_visitors))
        var_data.extend(np.random.binomial(1, true_ctr, daily_visitors)) # A/A: same rate
        # Peek at each day
        if len(ctrl_data) >= 10:
            _, p = stats.ttest_ind(ctrl_data, var_data)
            if p < alpha:
                peek_significant = True # any day shows significance
    # Fixed horizon: only look at end of experiment
    _, p_final = stats.ttest_ind(ctrl_data, var_data)
    if peek_significant: false_positives_peeking += 1
    if p_final < alpha: false_positives_fixed += 1
print(f'A/A test simulation: {n_simulations} experiments, {n_days} days, daily peeking')
print(f'False positive rate with daily peeking: {false_positives_peeking/n_simulations:.1%}')
print(f'False positive rate with fixed horizon: {false_positives_fixed/n_simulations:.1%}')
print(f'Expected at alpha=0.05: ~5%')
print('Key insight: peeking dramatically inflates Type I error β always pre-register sample size')
02 — Bayesian Thinking
from scipy.stats import beta as beta_dist
# Exercise 1: Beta-Binomial Sequential Update: Prior -> Posterior as n grows 1..100
# True conversion rate: 0.3; Prior: Beta(2, 5) -> slightly conservative
true_p = 0.30
prior_a, prior_b = 2, 5
np.random.seed(0)
observations = np.random.binomial(1, true_p, 100)
checkpoints = [1, 5, 10, 20, 50, 100]
x = np.linspace(0, 1, 300)
plt.figure(figsize=(12, 4))
for i, n in enumerate(checkpoints):
    # Conjugate update: posterior is Beta(prior_a + successes, prior_b + failures).
    k = observations[:n].sum()
    post_a = prior_a + k
    post_b = prior_b + (n - k)
    pdf = beta_dist.pdf(x, post_a, post_b)
    mean = post_a / (post_a + post_b)
    plt.plot(x, pdf, label=f'n={n} (mean={mean:.2f})')
plt.axvline(true_p, color='black', linestyle='--', label=f'True p={true_p}')
plt.xlabel('Conversion Rate'); plt.ylabel('Density')
plt.title('Beta-Binomial Sequential Update: Posterior Evolution n=1β100')
plt.legend(fontsize=8); plt.tight_layout(); plt.show()
print('Key insight: posterior concentrates around truth as n grows; prior influence shrinks')
# Exercise 2: Bayesian Credible Interval vs Frequentist CI for Conversion Rate
np.random.seed(5)
true_rate = 0.12
# (n trials, k successes) scenarios at increasing sample size, all near 12%.
scenarios = [(20, 3), (50, 7), (100, 13), (500, 62), (1000, 121)]
# Fix: double quotes inside the single-quoted f-string — same-quote nesting
# is a SyntaxError on Python < 3.12 (PEP 701).
print(f'{"n":>5} {"k":>5} {"Freq CI":>24} {"Bayes CrI (Beta(1,1))":>25} {"Bayes Mean":>12}')
print('-' * 75)
for n, k in scenarios:
    p_hat = k / n
    # Frequentist: Wilson score interval (better small-n coverage than Wald).
    z = 1.96
    denom = 1 + z**2/n
    center = (p_hat + z**2/(2*n)) / denom
    margin = (z * np.sqrt(p_hat*(1-p_hat)/n + z**2/(4*n**2))) / denom
    freq_lo, freq_hi = center - margin, center + margin
    # Bayesian: Beta(1,1) prior (uniform) -> Beta(1+k, 1+n-k) posterior.
    pa, pb = 1 + k, 1 + n - k
    bay_lo, bay_hi = beta_dist.ppf(0.025, pa, pb), beta_dist.ppf(0.975, pa, pb)
    bay_mean = pa / (pa + pb)
    print(f'{n:>5} {k:>5} [{freq_lo:.3f}, {freq_hi:.3f}] [{bay_lo:.3f}, {bay_hi:.3f}] {bay_mean:.4f}')
print('\nKey insight: Bayesian CrI has direct probability interpretation (P(theta in CrI)=0.95);')
print('Frequentist CI means 95% of such intervals contain the true parameter (over repeated samples).')
# Exercise 3: Metropolis-Hastings for 2D Bivariate Normal Posterior
def log_target(x, rho=0.7):
    """Unnormalized log-density of a zero-mean bivariate normal with correlation rho.

    The normalizing constant is dropped: MH only needs the log-density up to
    an additive constant.
    """
    cov = np.array([[1, rho], [rho, 1]])
    v = x.reshape(-1)
    return -0.5 * v @ np.linalg.inv(cov) @ v
def metropolis_hastings(n_samples=10000, proposal_std=0.5, rho=0.7):
    """Random-walk Metropolis sampler targeting log_target.

    Returns (samples, acceptance_rate). Rejected proposals repeat the current
    state in the chain, as a correct MH implementation requires.
    """
    chain = np.zeros((n_samples, 2))
    state = np.array([0.0, 0.0])
    n_accepted = 0
    for step in range(n_samples):
        candidate = state + np.random.normal(0, proposal_std, 2)
        log_ratio = log_target(candidate, rho) - log_target(state, rho)
        # Accept with probability min(1, exp(log_ratio)), done in log space.
        if np.log(np.random.uniform()) < log_ratio:
            state = candidate
            n_accepted += 1
        chain[step] = state
    return chain, n_accepted / n_samples
np.random.seed(42)
samples, accept_rate = metropolis_hastings(n_samples=10000, rho=0.7)
# Discard the initial transient before the chain reaches its stationary distribution.
burnin = 1000
samples = samples[burnin:]
fig, axes = plt.subplots(1, 3, figsize=(14, 4))
axes[0].scatter(samples[:, 0], samples[:, 1], alpha=0.05, s=5)
axes[0].set_title('MH Samples (2D correlated posterior)'); axes[0].set_xlabel('x1'); axes[0].set_ylabel('x2')
axes[1].plot(samples[:500, 0]); axes[1].set_title('Trace plot x1'); axes[1].set_xlabel('iteration')
axes[2].hist(samples[:, 0], bins=50, density=True)
xx = np.linspace(-4, 4, 200)
# The marginal of each coordinate of this bivariate normal is standard normal.
axes[2].plot(xx, stats.norm.pdf(xx), 'r-', label='True marginal N(0,1)')
axes[2].set_title('Marginal of x1'); axes[2].legend()
plt.tight_layout(); plt.show()
print(f'Acceptance rate: {accept_rate:.2%} (target: 20-50% for MCMC efficiency)')
print(f'Sample mean: x1={samples[:,0].mean():.3f}, x2={samples[:,1].mean():.3f} (expect ~0,0)')
print(f'Sample correlation: {np.corrcoef(samples[:,0], samples[:,1])[0,1]:.3f} (expect ~0.7)')
# Exercise 4: Bayesian A/B Test — P(B>A) and Expected Loss for 5 Scenarios
def bayesian_ab_test(n_a, k_a, n_b, k_b, n_samples=50000):
    """Beta-Binomial Bayesian A/B test with uniform priors Beta(1,1).

    Draws Monte Carlo samples from each arm's posterior and returns
    (P(B > A), expected loss of shipping A, expected loss of shipping B).
    """
    theta_a = np.random.beta(1 + k_a, 1 + n_a - k_a, n_samples)
    theta_b = np.random.beta(1 + k_b, 1 + n_b - k_b, n_samples)
    prob_b_beats_a = np.mean(theta_b > theta_a)
    # Opportunity cost: expected conversion forfeited if the chosen arm loses.
    expected_loss_a = np.clip(theta_b - theta_a, 0, None).mean()  # loss if we choose A
    expected_loss_b = np.clip(theta_a - theta_b, 0, None).mean()  # loss if we choose B
    return prob_b_beats_a, expected_loss_a, expected_loss_b
# Five (n_a, k_a, n_b, k_b) scenarios spanning weak to strong evidence.
scenarios = [
    (100, 5, 100, 7),        # small n, small effect
    (500, 50, 500, 65),      # medium n, moderate effect
    (1000, 100, 1000, 130),  # large n, clear effect
    (200, 20, 200, 20),      # null result
    (300, 15, 300, 30),      # strong relative lift
]
# Fix: double quotes inside the single-quoted f-string — same-quote nesting
# is a SyntaxError on Python < 3.12 (PEP 701).
print(f'{"Scenario":<30} {"P(B>A)":>8} {"Loss(A)":>10} {"Loss(B)":>10} {"Decision"}')
print('-' * 70)
np.random.seed(0)
for n_a, k_a, n_b, k_b in scenarios:
    p_b, l_a, l_b = bayesian_ab_test(n_a, k_a, n_b, k_b)
    # Decision rule: ship B only with >95% posterior probability; mirror for A.
    decision = 'Launch B' if p_b > 0.95 else ('Keep A' if p_b < 0.05 else 'Need more data')
    label = f'(n_a={n_a},k_a={k_a},n_b={n_b},k_b={k_b})'
    print(f'{label:<30} {p_b:>8.3f} {l_a:>10.4f} {l_b:>10.4f} {decision}')
print('\nKey insight: Expected loss gives a business-interpretable threshold vs arbitrary p-values')
# Exercise 5: Sensitivity Analysis — Posterior Under Different Beta Priors
# Data: n=30 trials, k=9 successes (p_hat = 0.30)
n_obs, k_obs = 30, 9
priors = [
    ('Uniform Beta(1,1)', 1, 1),
    ('Weak prior Beta(2,2)', 2, 2),
    ('Strong low Beta(2,8)', 2, 8),
    ('Strong high Beta(8,2)', 8, 2),
    ('Jeffreys Beta(0.5,0.5)', 0.5, 0.5),
]
x = np.linspace(0.01, 0.99, 300)
plt.figure(figsize=(10, 4))
# Fix: double quotes inside the single-quoted f-string — same-quote nesting
# is a SyntaxError on Python < 3.12 (PEP 701).
print(f'{"Prior":<26} {"Post Mean":>10} {"95% CrI":>24} {"Agrees truth (0.3)?":>20}')
print('-' * 82)
for name, pa, pb in priors:
    # Conjugate update: Beta(a, b) prior + k successes of n trials.
    post_a = pa + k_obs
    post_b = pb + (n_obs - k_obs)
    pdf = beta_dist.pdf(x, post_a, post_b)
    plt.plot(x, pdf, label=name)
    mean = post_a / (post_a + post_b)
    lo, hi = beta_dist.ppf(0.025, post_a, post_b), beta_dist.ppf(0.975, post_a, post_b)
    print(f'{name:<26} {mean:>10.3f} [{lo:.3f}, {hi:.3f}] {lo < 0.3 < hi}')
plt.axvline(0.3, color='k', linestyle='--', label='True rate')
plt.xlabel('p'); plt.ylabel('Posterior Density')
plt.title('Sensitivity to Prior: n=30, k=9')
plt.legend(fontsize=8); plt.tight_layout(); plt.show()
print('Key insight: with n=30, strong priors still influence posterior; as n grows, posteriors converge')
03 — Model Deployment with FastAPI
# Exercise 1: /metrics endpoint with in-memory counters
# Simulated β shows how to implement in a FastAPI app without running a server.
import time
from collections import deque
class MetricsCollector:
    """In-memory request metrics with a rolling latency window.

    Tracks lifetime request/error counts plus (timestamp, latency) pairs
    inside a sliding time window for windowed aggregates.
    """
    def __init__(self, window_seconds=60):
        self.total_requests = 0
        self.total_errors = 0
        self.latencies = deque() # (timestamp, latency_ms), oldest first
        self.window = window_seconds
    def _evict_expired(self, now: float):
        """Drop window entries older than `window` seconds (deque is time-ordered)."""
        cutoff = now - self.window
        while self.latencies and self.latencies[0][0] < cutoff:
            self.latencies.popleft()
    def record(self, latency_ms: float, is_error: bool = False):
        """Record one request's latency and whether it was an error."""
        now = time.time()
        self.total_requests += 1
        if is_error:
            self.total_errors += 1
        self.latencies.append((now, latency_ms))
        self._evict_expired(now)
    def get_metrics(self):
        """Return lifetime counters plus windowed latency aggregates.

        Fix: evict stale entries here as well as in record(), so windowed
        values are correct even when no request has arrived since the
        window elapsed (previously stale entries lingered until the next
        record() call).
        """
        self._evict_expired(time.time())
        recent_latencies = [latency for _, latency in self.latencies]
        avg_latency = np.mean(recent_latencies) if recent_latencies else 0
        return {
            'total_requests': self.total_requests,
            'total_errors': self.total_errors,
            'error_rate': self.total_errors / max(self.total_requests, 1),
            'avg_latency_ms': round(avg_latency, 2),
            'requests_in_window': len(recent_latencies),
        }
# FastAPI pseudocode illustrating implementation pattern:
# (kept as a string so this cell runs without a FastAPI dependency or server)
fastapi_metrics_code = '''
from fastapi import FastAPI, Request
import time
app = FastAPI()
metrics = MetricsCollector()
@app.middleware("http")
async def track_metrics(request: Request, call_next):
    start = time.perf_counter()
    response = await call_next(request)
    latency = (time.perf_counter() - start) * 1000
    metrics.record(latency, is_error=(response.status_code >= 400))
    return response
@app.get("/metrics")
def get_metrics():
    return metrics.get_metrics()
'''
# Simulate a stream of requests
metrics = MetricsCollector()
np.random.seed(0)
for _ in range(100):
    latency = np.random.lognormal(mean=3.5, sigma=0.5) # ~33ms median
    is_error = np.random.random() < 0.03 # 3% error rate
    metrics.record(latency, is_error)
print('/metrics endpoint response:')
import json
print(json.dumps(metrics.get_metrics(), indent=2))
# Exercise 2: Token Bucket Rate Limiter (100 req/min per IP)
import time
from dataclasses import dataclass, field
from typing import Dict
@dataclass
class TokenBucket:
    """Token-bucket limiter state: bursts up to `capacity`, refilled at `refill_rate`/sec."""
    capacity: float = 100.0
    refill_rate: float = 100.0 / 60.0 # tokens per second
    tokens: float = field(default=None)  # None -> start full (resolved in __post_init__)
    last_refill: float = field(default_factory=time.time)
    def __post_init__(self):
        # A fresh bucket starts completely full unless a level was given explicitly.
        if self.tokens is None:
            self.tokens = self.capacity
    def consume(self, cost: float = 1.0) -> bool:
        """Refill lazily based on elapsed time, then try to spend `cost` tokens."""
        now = time.time()
        refill = (now - self.last_refill) * self.refill_rate
        self.tokens = min(self.capacity, self.tokens + refill)
        self.last_refill = now
        if self.tokens < cost:
            return False # rate limited
        self.tokens -= cost
        return True # allowed
class RateLimiter:
    """Per-IP rate limiting: one independent TokenBucket per client address."""
    def __init__(self):
        self.buckets: Dict[str, TokenBucket] = {}
    def is_allowed(self, ip: str) -> bool:
        """Lazily create this IP's bucket, then attempt to take one token."""
        bucket = self.buckets.get(ip)
        if bucket is None:
            bucket = self.buckets[ip] = TokenBucket()
        return bucket.consume()
limiter = RateLimiter()
# Test: rapid burst from one IP — only the bucket's capacity should pass.
ip = '192.168.1.1'
allowed_burst = sum(1 for _ in range(120) if limiter.is_allowed(ip))
print(f'Allowed in rapid burst of 120 requests: {allowed_burst} (bucket capacity=100)')
# Simulate different IP — buckets are independent per client.
ip2 = '10.0.0.2'
allowed_ip2 = sum(1 for _ in range(50) if limiter.is_allowed(ip2))
print(f'Different IP allowed 50 requests: {allowed_ip2} (separate bucket, unaffected)')
print('Key insight: token bucket allows bursts up to capacity while enforcing long-term rate')
# Exercise 3: Input Validation — Reject requests with features >10 std devs from training
from sklearn.datasets import make_classification
from sklearn.preprocessing import StandardScaler
# Simulate training data statistics
np.random.seed(0)
X_train, _ = make_classification(n_samples=1000, n_features=5, random_state=0)
# Persist per-feature mean/std from training; serving inputs are z-scored against these.
scaler = StandardScaler().fit(X_train)
train_mean = scaler.mean_
train_std = scaler.scale_
class InputValidator:
    """Reject serving inputs whose features lie implausibly far from training data.

    Flags any feature more than `threshold` standard deviations from the
    training mean (z-score check).
    """
    def __init__(self, mean, std, threshold=10.0):
        self.mean = mean
        # Fix: guard zero-variance features. With std=0 the z-score is NaN,
        # and NaN > threshold is False, so extreme values slipped through
        # silently. Substituting 1.0 makes the z-score 0 for values equal to
        # the constant mean (and finite otherwise).
        std = np.asarray(std, dtype=float)
        self.std = np.where(std == 0, 1.0, std)
        self.threshold = threshold
    def validate(self, features: np.ndarray):
        """Return (ok, info); ok=False includes which features violated and their z-scores."""
        z_scores = np.abs((features - self.mean) / self.std)
        violating = np.where(z_scores > self.threshold)[0]
        if len(violating) > 0:
            return False, {
                'error': 'Feature out of expected range',
                'violating_features': violating.tolist(),
                'z_scores': z_scores[violating].tolist(),
            }
        return True, {'z_scores_max': float(z_scores.max())}
validator = InputValidator(train_mean, train_std)
# Valid request: values well inside the training distribution.
valid_input = np.array([0.1, -0.2, 0.5, -0.1, 0.3])
ok, info = validator.validate(valid_input)
print(f'Valid input: allowed={ok}, max_z={info.get("z_scores_max", "N/A"):.2f}')
# Adversarial/corrupted input
bad_input = np.array([0.1, 150.0, 0.5, -0.1, 0.3]) # feature 1 is extreme
ok2, info2 = validator.validate(bad_input)
print(f'Bad input: allowed={ok2}, detail={info2}')
print('Key insight: z-score validation catches data drift, sensor faults, and adversarial inputs')
# Exercise 4: Model Versioning — /predict/v1 and /predict/v2
from sklearn.linear_model import LogisticRegression
from sklearn.ensemble import RandomForestClassifier
import pickle, io
np.random.seed(42)
X, y = make_classification(n_samples=500, n_features=10, random_state=42)
# Train two model versions: v1 is a simple linear baseline, v2 a stronger ensemble.
model_v1 = LogisticRegression(max_iter=1000).fit(X, y)
model_v2 = RandomForestClassifier(n_estimators=50, random_state=42).fit(X, y)
def serialize_model(model):
    """Serialize a fitted model to raw bytes (simulating model file storage)."""
    return pickle.dumps(model)
# Registry maps version string -> serialized model bytes (stand-in for a model store).
MODEL_REGISTRY = {
    'v1': serialize_model(model_v1),
    'v2': serialize_model(model_v2),
}
def predict_versioned(version: str, features: np.ndarray):
    """Look up, deserialize, and run the requested model version on `features`.

    Raises ValueError for an unknown version. NOTE(security): pickle.loads is
    acceptable only because the registry bytes are produced in-process; never
    unpickle untrusted model artifacts.
    """
    if version not in MODEL_REGISTRY:
        raise ValueError(f'Unknown model version: {version}. Available: {list(MODEL_REGISTRY)}')
    model = pickle.loads(MODEL_REGISTRY[version])
    probabilities = model.predict_proba(features)[:, 1]
    labels = (probabilities > 0.5).astype(int)
    return {'version': version, 'predictions': labels.tolist(), 'probabilities': probabilities.round(3).tolist()}
# Simulate API calls against both endpoints with the same 3-row batch.
test_batch = X[:3]
resp_v1 = predict_versioned('v1', test_batch)
resp_v2 = predict_versioned('v2', test_batch)
print('GET /predict/v1')
print(f' predictions: {resp_v1["predictions"]}')
print(f' probabilities: {resp_v1["probabilities"]}')
print('GET /predict/v2')
print(f' predictions: {resp_v2["predictions"]}')
print(f' probabilities: {resp_v2["probabilities"]}')
print('Key insight: versioned endpoints allow shadow deployments and gradual traffic shifting')
# Exercise 5: Load Test Simulation β p50/p95/p99 Latency
import concurrent.futures
def simulate_request(request_id: int) -> float:
    """Simulate model inference latency in ms (log-normal distribution).

    Fix: use a per-request Generator seeded by request_id instead of
    np.random.seed() — reseeding the shared global RNG from ThreadPoolExecutor
    workers is a race (seed + draw is not atomic across threads) and clobbers
    global RNG state for the rest of the notebook. Results stay deterministic
    per request_id.
    """
    rng = np.random.default_rng(request_id)
    base_latency = rng.lognormal(mean=3.2, sigma=0.6) # ~24ms median
    # Occasional slow requests (10% chance of 5x slowdown β GC pauses, etc.)
    if rng.random() < 0.10:
        base_latency *= 5
    return base_latency
# Simulate concurrent load test
n_requests = 500
max_workers = 20 # concurrent "users"
start = time.perf_counter()
with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
    latencies = list(executor.map(simulate_request, range(n_requests)))
total_time = time.perf_counter() - start
latencies = np.array(latencies)
print(f'Load test: {n_requests} requests, {max_workers} concurrent workers')
print(f'Total time: {total_time:.2f}s | Throughput: {n_requests/total_time:.0f} req/s')
# Report the full tail, not just the mean: SLOs are usually set on p95/p99.
print(f'p50 latency: {np.percentile(latencies, 50):>8.1f} ms')
print(f'p90 latency: {np.percentile(latencies, 90):>8.1f} ms')
print(f'p95 latency: {np.percentile(latencies, 95):>8.1f} ms')
print(f'p99 latency: {np.percentile(latencies, 99):>8.1f} ms')
print(f'p999 latency: {np.percentile(latencies, 99.9):>8.1f} ms')
plt.figure(figsize=(8, 3))
plt.hist(latencies, bins=50, density=True)
for pct, color in [(50,'g'),(95,'orange'),(99,'r')]:
    v = np.percentile(latencies, pct)
    plt.axvline(v, color=color, linestyle='--', label=f'p{pct}={v:.0f}ms')
plt.xlabel('Latency (ms)'); plt.ylabel('Density')
plt.title('Request Latency Distribution')
plt.legend(); plt.tight_layout(); plt.show()
print('Key insight: tail latencies (p99) can be 5-10x median β optimize for tail, not mean')
04 — ML Monitoring & Drift Detection
# Exercise 1: PSI (Population Stability Index) vs KS Test
def compute_psi(reference, current, n_bins=10, epsilon=1e-7):
    """Compute PSI between reference and current distributions.

    Bins span the combined range of both samples; epsilon keeps log() finite
    for empty bins. Rule of thumb: <0.1 stable, 0.1-0.2 minor, >0.2 major.
    """
    lo = min(reference.min(), current.min())
    hi = max(reference.max(), current.max())
    edges = np.linspace(lo, hi, n_bins + 1)
    ref_frac = np.histogram(reference, bins=edges)[0] / len(reference) + epsilon
    cur_frac = np.histogram(current, bins=edges)[0] / len(current) + epsilon
    return np.sum((cur_frac - ref_frac) * np.log(cur_frac / ref_frac))
np.random.seed(42)
reference = np.random.normal(0, 1, 1000)
# Increasingly shifted/rescaled serving distributions vs the reference.
drift_scenarios = [
    ('No drift', np.random.normal(0, 1, 1000)),
    ('Slight drift', np.random.normal(0.3, 1, 1000)),
    ('Moderate drift', np.random.normal(0.7, 1, 1000)),
    ('Severe drift', np.random.normal(1.5, 1.5, 1000)),
]
# Fix: double quotes inside the single-quoted f-string — same-quote nesting
# is a SyntaxError on Python < 3.12 (PEP 701).
print(f'{"Scenario":<20} {"PSI":>8} {"PSI Interp":>20} {"KS stat":>10} {"KS p-value":>12}')
print('-' * 74)
for name, current in drift_scenarios:
    psi = compute_psi(reference, current)
    ks_stat, ks_p = stats.ks_2samp(reference, current)
    interp = 'Stable' if psi < 0.1 else ('Minor shift' if psi < 0.2 else 'Significant drift')
    print(f'{name:<20} {psi:>8.4f} {interp:>20} {ks_stat:>10.4f} {ks_p:>12.4f}')
print('\nPSI thresholds: <0.1 stable, 0.1-0.2 minor, >0.2 significant')
print('Key insight: PSI gives interpretable magnitude of drift; KS gives hypothesis test p-value')
# Exercise 2: Sliding Window Monitor — Alert if Drift in 3+ Consecutive Windows
class SlidingWindowDriftMonitor:
    """Flags sustained drift: alerts only after `consecutive` drifting windows in a row."""
    def __init__(self, window_size=500, drift_threshold=0.2, consecutive=3):
        self.window_size = window_size  # informational; windows arrive pre-sliced
        self.drift_threshold = drift_threshold
        self.consecutive = consecutive
        self.reference = None
        self.drift_streak = 0
        self.alerts = []
    def set_reference(self, data):
        """Store the baseline distribution future windows are compared against."""
        self.reference = data
    def check_window(self, window_data, window_id):
        """Score one window; return dict with psi / drift / streak / alert fields."""
        if self.reference is None:
            raise ValueError('Set reference first')
        psi = compute_psi(self.reference, window_data)
        drifting = psi > self.drift_threshold
        if drifting:
            self.drift_streak += 1
        else:
            self.drift_streak = 0  # a clean window resets the streak
        fire_alert = self.drift_streak >= self.consecutive
        if fire_alert:
            self.alerts.append(window_id)
        return {'window': window_id, 'psi': psi, 'drift': drifting, 'streak': self.drift_streak, 'alert': fire_alert}
# Simulate: stable, then gradual drift (shift grows from window 5 onward).
np.random.seed(0)
reference = np.random.normal(0, 1, 2000)
monitor = SlidingWindowDriftMonitor(window_size=500, drift_threshold=0.2, consecutive=3)
monitor.set_reference(reference)
# Fix: double quotes inside the single-quoted f-string — same-quote nesting
# is a SyntaxError on Python < 3.12 (PEP 701).
print(f'{"Window":>8} {"PSI":>8} {"Drift":>8} {"Streak":>8} {"ALERT":>8}')
print('-' * 45)
for i in range(10):
    shift = 0.8 * max(0, i - 4)  # first nonzero shift at window 5
    window = np.random.normal(shift, 1, 500)
    result = monitor.check_window(window, window_id=i)
    flag = '*** ALERT ***' if result['alert'] else ''
    print(f'{i:>8} {result["psi"]:>8.4f} {str(result["drift"]):>8} {result["streak"]:>8} {flag}')
# Exercise 3: Concept Drift Detection using DDM (Drift Detection Method)
class DDM:
"""Drift Detection Method (Gama et al., 2004)."""
def __init__(self, warning_level=2.0, drift_level=3.0):
self.warning_level = warning_level
self.drift_level = drift_level
self.reset()
def reset(self):
self.n = 0
self.sum_errors = 0
self.min_pn_std = float('inf')
self.p_min = float('inf')
self.s_min = float('inf')
def add_element(self, prediction_correct: bool):
"""Returns: 'normal', 'warning', or 'drift'"""
self.n += 1
error = 0 if prediction_correct else 1
self.sum_errors += error
p = self.sum_errors / self.n
s = np.sqrt(p * (1 - p) / self.n)
if p + s < self.min_pn_std:
self.p_min = p
self.s_min = s
self.min_pn_std = p + s
if self.n < 30:
return 'normal'
if p + s >= self.p_min + self.drift_level * self.s_min:
return 'drift'
if p + s >= self.p_min + self.warning_level * self.s_min:
return 'warning'
return 'normal'
# Simulate: accurate model until step 500, then concept drift
np.random.seed(7)
n_steps = 1000
drift_point = 500
ddm = DDM()
statuses = []
for t in range(n_steps):
    if t < drift_point:
        correct = np.random.random() < 0.85 # 15% error rate
    else:
        correct = np.random.random() < 0.60 # drift: 40% error rate
    statuses.append(ddm.add_element(correct))
# Find first warning and drift signals emitted by the detector.
first_warning = next((i for i, s in enumerate(statuses) if s == 'warning'), None)
first_drift = next((i for i, s in enumerate(statuses) if s == 'drift'), None)
print(f'True drift point: {drift_point}')
print(f'DDM first warning: {first_warning}')
print(f'DDM first drift alert: {first_drift}')
if first_drift:
    print(f'Detection lag: {first_drift - drift_point} steps')
print('Key insight: DDM detects drift by tracking error rate bounds from the best-so-far performance')
# Exercise 4: Gradual vs Sudden Drift — Detector Response Comparison
def simulate_stream(n, drift_type, drift_start=300, drift_end=600):
    """Generate a binary error stream with gradual or sudden concept drift.

    Pre-drift error rate is 10%; 'sudden' jumps to 40% at drift_start, while
    'gradual' ramps linearly from 10% to 40% between drift_start and drift_end.

    Fix: raise ValueError for an unknown drift_type — previously an unknown
    type silently reused the stale pre-drift rate (or hit UnboundLocalError
    when drift_start == 0).
    """
    errors = []
    for t in range(n):
        if t < drift_start:
            error_rate = 0.10
        elif drift_type == 'sudden':
            error_rate = 0.40
        elif drift_type == 'gradual':
            # Linear ramp, capped at 1.0 once drift_end is reached.
            progress = min(1.0, (t - drift_start) / (drift_end - drift_start))
            error_rate = 0.10 + 0.30 * progress
        else:
            raise ValueError(f"unknown drift_type: {drift_type!r}")
        errors.append(np.random.random() < error_rate)
    return errors
def run_ddm_on_stream(errors):
    """Feed an error stream through a fresh DDM; return (statuses, first_warning, first_drift)."""
    detector = DDM()
    statuses = [detector.add_element(not err) for err in errors]
    warn_at = next((idx for idx, st in enumerate(statuses) if st == 'warning'), None)
    drift_at = next((idx for idx, st in enumerate(statuses) if st == 'drift'), None)
    return statuses, warn_at, drift_at
np.random.seed(42)
n_steps = 800
# Run the same detector against both drift shapes and compare detection lag.
for dtype in ['sudden', 'gradual']:
    errors = simulate_stream(n_steps, dtype)
    statuses, fw, fd = run_ddm_on_stream(errors)
    print(f'\n{dtype.capitalize()} drift (starts at 300):')
    print(f' First warning: step {fw}')
    print(f' First drift: step {fd}')
    if fd:
        print(f' Detection lag: {fd - 300} steps')
print('\nKey insight: DDM detects sudden drift faster than gradual drift.')
print('For gradual drift, ADWIN (adaptive windowing) performs better as it tracks recent vs historical.')
# Exercise 5: Retraining Trigger — If PSI > 0.2 for any feature, retrain and compare AUC
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
np.random.seed(0)
n_train, n_serve, n_features = 1000, 500, 5
# Training distribution: label depends only on features 0 and 1.
X_train = np.random.randn(n_train, n_features)
y_train = (X_train[:, 0] + X_train[:, 1] > 0).astype(int)
# Serving data with distribution shift on features 0,1
X_serve = np.random.randn(n_serve, n_features)
X_serve[:, 0] += 1.5 # shift feature 0
X_serve[:, 1] -= 0.8 # shift feature 1
y_serve = (X_serve[:, 0] + X_serve[:, 1] > 0).astype(int)
# Compute PSI per feature
feature_psi = {f'feature_{i}': compute_psi(X_train[:, i], X_serve[:, i])
               for i in range(n_features)}
print('PSI per feature:')
for feat, psi in feature_psi.items():
    flag = ' <-- TRIGGER RETRAIN' if psi > 0.2 else ''
    print(f' {feat}: {psi:.4f}{flag}')
trigger_retrain = any(p > 0.2 for p in feature_psi.values())
print(f'\nRetraining triggered: {trigger_retrain}')
# Train original model (on training distribution)
model_original = RandomForestClassifier(n_estimators=50, random_state=0).fit(X_train, y_train)
auc_original = roc_auc_score(y_serve, model_original.predict_proba(X_serve)[:, 1])
# Retrain on new serving data (simulate labeled retraining set)
model_retrained = RandomForestClassifier(n_estimators=50, random_state=0).fit(X_serve[:400], y_serve[:400])
auc_retrained = roc_auc_score(y_serve[400:], model_retrained.predict_proba(X_serve[400:])[:, 1])
# NOTE(review): the two AUCs are computed on different slices (all 500 vs the
# 100-row holdout), so the comparison is indicative rather than exact.
print(f'\nAUC before retraining: {auc_original:.4f}')
print(f'AUC after retraining: {auc_retrained:.4f}')
print(f'AUC improvement: {auc_retrained - auc_original:+.4f}')
print('Key insight: PSI-triggered retraining recalibrates the model to the new data distribution')
05 — Feature Stores
import pandas as pd
from datetime import datetime, timedelta
# Exercise 1: Feature Freshness Check — Flag Features Not Updated in >24h
class FeatureStore:
    """Minimal feature store tracking each feature's value and last update time."""
    def __init__(self):
        # {feature_name: {'value': ..., 'updated_at': datetime}}
        self.features = {}
    def set_feature(self, name: str, value, updated_at: datetime = None):
        """Upsert a feature; updated_at defaults to now (naive UTC via utcnow)."""
        stamp = updated_at or datetime.utcnow()
        self.features[name] = {'value': value, 'updated_at': stamp}
    def check_freshness(self, max_age_hours: float = 24.0) -> dict:
        """Report each feature's age in hours and whether it exceeds max_age_hours."""
        now = datetime.utcnow()
        report = {}
        for name, meta in self.features.items():
            age = (now - meta['updated_at']).total_seconds() / 3600
            report[name] = {
                'age_hours': round(age, 1),
                'stale': age > max_age_hours,
                'last_updated': meta['updated_at'].isoformat(),
            }
        return report
now = datetime.utcnow()
store = FeatureStore()
# Two fresh features and two updated beyond the 24h threshold.
store.set_feature('age_bucket', 42, now - timedelta(hours=2))
store.set_feature('purchase_count_7d', 15, now - timedelta(hours=12))
store.set_feature('last_login_days', 3, now - timedelta(hours=30)) # stale!
store.set_feature('account_tenure_d', 365, now - timedelta(hours=72)) # stale!
freshness = store.check_freshness(max_age_hours=24)
print('Feature Freshness Report:')
for feat, info in freshness.items():
    status = 'STALE' if info['stale'] else 'OK'
    print(f' {feat:<25} age={info["age_hours"]:5.1f}h [{status}]')
# Exercise 2: Lag Features with Point-in-Time Semantics (lag-7d, lag-30d, lag-90d)
np.random.seed(42)
# Simulate daily purchase events: one event per day, random customer and amount.
date_range = pd.date_range('2024-01-01', '2024-04-30', freq='D')
events = pd.DataFrame({
    'customer_id': np.random.choice(['C001', 'C002', 'C003'], len(date_range)),
    'event_date': date_range,
    'purchase_amount': np.random.exponential(50, len(date_range)),
})
def compute_lag_features(events: pd.DataFrame, as_of_date: datetime, customer_id: str):
    """
    Point-in-time lag features: only use data BEFORE as_of_date.
    Prevents label leakage by excluding future data.

    Returns purchase sum/count over trailing 7/30/90-day windows.
    """
    as_of = pd.Timestamp(as_of_date)
    history = events[
        (events['customer_id'] == customer_id) &
        (events['event_date'] < as_of)
    ]
    features = {}
    for days_back, suffix in ((7, 'lag7d'), (30, 'lag30d'), (90, 'lag90d')):
        window = history[history['event_date'] >= as_of - timedelta(days=days_back)]
        features[f'purchase_sum_{suffix}'] = round(window['purchase_amount'].sum(), 2)
        features[f'purchase_count_{suffix}'] = len(window)
    return features
# Compute features as of two different dates to verify point-in-time correctness:
# the same customer yields different windows depending on the as-of date.
for as_of in ['2024-02-15', '2024-04-01']:
    feats = compute_lag_features(events, datetime.strptime(as_of, '%Y-%m-%d'), 'C001')
    print(f'\nFeatures as of {as_of} for C001:')
    for k, v in feats.items():
        print(f' {k}: {v}')
print('\nKey insight: point-in-time correctness prevents data leakage in historical training datasets')
# Exercise 3: Training-Serving Skew Detection using PSI
np.random.seed(0)
# Simulate training features and serving features with some skew
n_train, n_serve = 5000, 1000
feature_specs = {
    'age': (np.random.normal(35, 10, n_train), np.random.normal(35, 10, n_serve)),  # no skew
    'income': (np.random.lognormal(10, 1, n_train), np.random.lognormal(10.3, 1, n_serve)),  # slight skew
    'session_duration': (np.random.exponential(5, n_train), np.random.exponential(8, n_serve)),  # moderate skew
    'page_views': (np.random.poisson(10, n_train), np.random.poisson(15, n_serve)),  # significant skew
    'purchase_history': (np.random.binomial(5, 0.3, n_train), np.random.binomial(5, 0.3, n_serve)),  # no skew
}
print('Training-Serving Skew Report (PSI per feature):')
# Fix: double quotes inside the single-quoted f-string — same-quote nesting
# is a SyntaxError on Python < 3.12 (PEP 701).
print(f'{"Feature":<20} {"PSI":>8} {"Status":>20}')
print('-' * 50)
skewed_features = []
for feat, (train_vals, serve_vals) in feature_specs.items():
    psi = compute_psi(train_vals, serve_vals)
    if psi > 0.1:
        status = 'SKEW DETECTED'
        skewed_features.append(feat)
    else:
        status = 'OK'
    print(f'{feat:<20} {psi:>8.4f} {status:>20}')
print(f'\nFeatures with PSI > 0.1 (flag for investigation): {skewed_features}')
print('Key insight: skew in high-PSI features indicates preprocessing differences or data pipeline bugs')
# Exercise 4: Real-Time Session Feature β Sum of Purchases in Last 30 Minutes
import time
from collections import defaultdict
class SessionFeatureStore:
    """In-memory rolling-window feature store for real-time session features.

    Stores raw (timestamp, amount) purchase events per customer and answers
    sum/count queries over a trailing time window, evicting expired events
    lazily at query time.
    """

    def __init__(self, window_minutes: int = 30):
        self.window_seconds = window_minutes * 60
        # {customer_id: list of (unix_timestamp, amount)}; events may be
        # recorded out of order, so a plain list is filtered on read rather
        # than popped from a deque.
        self.events = defaultdict(list)

    def record_purchase(self, customer_id: str, amount: float, timestamp: float = None):
        """Append one purchase event; defaults to the current wall-clock time.

        BUGFIX: the original used `timestamp or time.time()`, which silently
        replaces a legitimate timestamp of 0.0 (the Unix epoch) with "now".
        An explicit `is None` check honors any caller-supplied float.
        """
        ts = time.time() if timestamp is None else timestamp
        self.events[customer_id].append((ts, amount))

    def _evict_old(self, customer_id: str, now: float):
        """Drop events that fall outside the window ending at `now`."""
        cutoff = now - self.window_seconds
        self.events[customer_id] = [
            (ts, amt) for ts, amt in self.events[customer_id] if ts > cutoff
        ]

    def get_session_sum(self, customer_id: str, now: float = None) -> float:
        """Total purchase amount inside the window ending at `now`."""
        t = time.time() if now is None else now  # same `is None` fix as above
        self._evict_old(customer_id, t)
        return sum(amt for _, amt in self.events[customer_id])

    def get_session_count(self, customer_id: str, now: float = None) -> int:
        """Number of purchases inside the window ending at `now`."""
        t = time.time() if now is None else now
        self._evict_old(customer_id, t)
        return len(self.events[customer_id])
# Simulate purchase events at known offsets from "now", then read features back.
now_ts = time.time()
session_store = SessionFeatureStore(window_minutes=30)
# (customer, amount, minutes ago); C001's 35-minute-old purchase is outside
# the 30-minute window and should be excluded.
purchase_log = [
    ('C001', 29.99, 10),
    ('C001', 49.50, 25),
    ('C001', 15.00, 35),  # outside 30min window
    ('C002', 99.00, 5),
]
for cust, amount, minutes_ago in purchase_log:
    session_store.record_purchase(cust, amount, now_ts - minutes_ago * 60)
print('Real-time session features (30-minute window):')
for customer in ['C001', 'C002', 'C003']:
    total = session_store.get_session_sum(customer, now_ts)
    count = session_store.get_session_count(customer, now_ts)
    print(f' {customer}: purchases_30m_count={count}, purchases_30m_sum=${total:.2f}')
print('Key insight: C001\'s $15 purchase (35m ago) is correctly excluded from the 30-min window')
# Exercise 5: Feature Versioning β v1 and v2 Coexist in Registry
from typing import Callable, Any
from dataclasses import dataclass, field
@dataclass
class FeatureDefinition:
    """One versioned feature definition: the same name may be registered
    under several versions, each with its own compute function."""
    name: str
    version: str
    description: str
    compute_fn: Callable  # maps a raw-data dict to the computed feature value
    # presumably names of upstream features this one depends on — unused in
    # the visible code; TODO confirm against the registry's consumers
    dependencies: list = field(default_factory=list)
class FeatureRegistry:
    """Registry keyed by (feature name, version); multiple versions coexist."""

    def __init__(self):
        # {(name, version): FeatureDefinition}
        self._registry = {}

    @staticmethod
    def _version_key(version: str):
        """Sort key that orders dotted numeric versions correctly.

        Plain string comparison ranks '10' before '2'; compare each
        dot-separated component as an int, falling back to the raw string
        for non-numeric version labels. The leading 0/1 tag keeps numeric
        and non-numeric keys mutually comparable.
        """
        try:
            return (0, tuple(int(part) for part in version.split('.')))
        except ValueError:
            return (1, version)

    def register(self, feat_def: FeatureDefinition):
        """Add (or overwrite) a feature definition under (name, version)."""
        key = (feat_def.name, feat_def.version)
        self._registry[key] = feat_def
        print(f'Registered: {feat_def.name}@{feat_def.version} β {feat_def.description}')

    def get(self, name: str, version: str = 'latest') -> FeatureDefinition:
        """Fetch a definition; 'latest' resolves to the highest version.

        Raises KeyError when the feature name is unknown.
        """
        if version == 'latest':
            versions = [ver for feat_name, ver in self._registry if feat_name == name]
            if not versions:
                raise KeyError(f'Feature {name} not found')
            # BUGFIX: the original used sorted(versions)[-1], which compares
            # version strings lexicographically and would pick '2' over '10'.
            version = max(versions, key=self._version_key)
        return self._registry[(name, version)]

    def compute(self, name: str, version: str, raw_data: dict) -> Any:
        """Compute the feature value for raw_data with the given version."""
        feat_def = self.get(name, version)
        return feat_def.compute_fn(raw_data)

    def list_features(self):
        """Print all registered features sorted by (name, version)."""
        for (name, ver), fd in sorted(self._registry.items()):
            print(f' {name:<25} v{ver} {fd.description}')
registry = FeatureRegistry()

# customer_age_days: v1 measures from account creation, v2 from the verified
# registration date (falling back to creation when unverified).
def _age_days_v1(record):
    return (datetime.utcnow() - datetime.fromisoformat(record['created_at'])).days

def _age_days_v2(record):
    anchor = record.get('verified_at', record['created_at'])
    return (datetime.utcnow() - datetime.fromisoformat(anchor)).days

registry.register(FeatureDefinition(
    name='customer_age_days', version='1',
    description='Days since account creation',
    compute_fn=_age_days_v1
))
registry.register(FeatureDefinition(
    name='customer_age_days', version='2',
    description='Days since verified registration (more accurate)',
    compute_fn=_age_days_v2
))

# purchase_rate_30d: v1 is the raw count, v2 applies Laplace smoothing so
# brand-new customers do not get extreme rates.
def _purchase_rate_v1(record):
    return record['purchases_30d']

def _purchase_rate_v2(record):
    return (record['purchases_30d'] + 1) / (record.get('days_active', 30) + 2)

registry.register(FeatureDefinition(
    name='purchase_rate_30d', version='1',
    description='Raw purchase count per 30d',
    compute_fn=_purchase_rate_v1
))
registry.register(FeatureDefinition(
    name='purchase_rate_30d', version='2',
    description='Laplace-smoothed purchase rate (handles cold start)',
    compute_fn=_purchase_rate_v2
))

print('\nFeature Registry:')
registry.list_features()

# Evaluate every (feature, version) pair on one sample customer record.
sample = {
    'created_at': '2023-01-15',
    'verified_at': '2023-01-20',
    'purchases_30d': 5,
    'days_active': 30,
}
print('\nFeature values for sample customer:')
for feat in ('customer_age_days', 'purchase_rate_30d'):
    for ver in ('1', '2'):
        val = registry.compute(feat, ver, sample)
        print(f' {feat} v{ver}: {val}')
print('\nKey insight: coexisting versions enable A/B testing of feature logic and safe migration')