Solutions: Recommender Systems & Causal Inference Track
Worked solutions to all exercises from the recommender-causal/ notebooks.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.metrics import mean_squared_error
import warnings; warnings.filterwarnings('ignore')
# Synthetic ratings matrix: 200 users x 100 items, sparse (90% missing)
np.random.seed(42)
N_USERS, N_ITEMS = 200, 100
# Ground-truth low-rank structure: ratings come from a rank-10 factor model.
true_user_factors = np.random.randn(N_USERS, 10)
true_item_factors = np.random.randn(N_ITEMS, 10)
true_ratings_full = true_user_factors @ true_item_factors.T
# Normalize to 1-5 scale (min-max rescale; +1e-8 guards a zero range)
true_ratings_full = 1 + 4 * (true_ratings_full - true_ratings_full.min()) / (true_ratings_full.ptp() + 1e-8)
# Mask 90% of entries: `> 0.9` keeps ~10% of cells as observed ratings.
mask = np.random.rand(N_USERS, N_ITEMS) > 0.9
ratings_matrix = np.where(mask, true_ratings_full, np.nan)
# Build user-item pairs for known ratings
rows, cols = np.where(mask)
known_ratings = pd.DataFrame({'user': rows, 'item': cols, 'rating': ratings_matrix[rows, cols]})
# NOTE(review): rows are ordered by user id, so this 80/20 split is positional,
# not random -- the last ~20% of users form the test set.
train_df, test_df = known_ratings.iloc[:int(0.8*len(known_ratings))], known_ratings.iloc[int(0.8*len(known_ratings)):]
print(f'Known ratings: {len(known_ratings)}, Train: {len(train_df)}, Test: {len(test_df)}')
01 - Collaborative Filtering
from sklearn.metrics.pairwise import cosine_similarity
def user_cf_predict(train, user, item, user_sim, k=5, train_mat=None):
    """Predict a rating for (user, item) from the top-k most similar users.

    Parameters
    ----------
    train : DataFrame with columns 'user', 'item', 'rating'.
    user, item : int indices of the target user and item.
    user_sim : (n_users, n_users) user-user similarity matrix.
    k : number of neighbours whose ratings are aggregated.
    train_mat : optional precomputed (N_USERS, N_ITEMS) rating matrix with
        NaN for unobserved entries. If None it is rebuilt from `train`
        (backward compatible) -- but rebuilding is O(len(train)) per call,
        so pass it in when predicting many pairs.

    Returns the similarity-weighted mean of the neighbours' ratings on
    `item`, or 3.0 (the scale midpoint) when no neighbour rated the item.
    """
    if train_mat is None:
        # FIX: vectorized construction replaces the original per-row
        # iterrows() loop that ran on EVERY prediction call.
        train_mat = np.full((N_USERS, N_ITEMS), np.nan)
        train_mat[train['user'].astype(int).values,
                  train['item'].astype(int).values] = train['rating'].values
    # Take k+1 candidates so dropping the user themself still leaves k.
    neighbors = np.argsort(-user_sim[user])[:k+1]
    neighbors = [n for n in neighbors if n != user]
    numer, denom = 0.0, 0.0
    for n in neighbors[:k]:
        if not np.isnan(train_mat[n, item]):
            numer += user_sim[user, n] * train_mat[n, item]
            denom += abs(user_sim[user, n])
    return numer / denom if denom > 0 else 3.0  # fallback: scale midpoint
def build_user_matrix(df):
    """Return an (N_USERS, N_ITEMS) dense matrix with NaN for missing ratings.

    FIX: uses a single vectorized fancy-index assignment instead of the
    original iterrows() loop (pandas overhead on every row).
    """
    mat = np.full((N_USERS, N_ITEMS), np.nan)
    mat[df['user'].astype(int).values,
        df['item'].astype(int).values] = df['rating'].values
    return mat
# Precompute the dense training matrix once; reused by all similarity code below.
train_mat = build_user_matrix(train_df)
# Exercise 1: Mean-centering before computing similarity
# Key insight: Mean-centering removes user bias (some users rate all items high/low);
# similarity computed on residuals reflects true taste alignment.
def fill_nan_mean(mat):
    """Return a copy of `mat` with each NaN replaced by its row's nan-mean.

    Rows that are entirely NaN keep NaN (their nan-mean is undefined).
    The input array is not modified.
    """
    row_avg = np.nanmean(mat, axis=1, keepdims=True)
    return np.where(np.isnan(mat), row_avg, mat)
# Without mean centering
filled_raw = fill_nan_mean(train_mat)
sim_raw = cosine_similarity(filled_raw)
# With mean centering: subtract each user's mean so similarity reflects
# relative preferences rather than overall rating level.
user_means = np.nanmean(train_mat, axis=1, keepdims=True)
centered = train_mat - user_means
filled_center = fill_nan_mean(centered)
sim_centered = cosine_similarity(filled_center)
def rmse_cf(sim_matrix, sample_size=100):
    """RMSE of user-CF predictions on a fixed random sample of the test set."""
    eval_rows = test_df.sample(min(sample_size, len(test_df)), random_state=1)
    predictions = [
        user_cf_predict(train_df, int(r['user']), int(r['item']), sim_matrix)
        for _, r in eval_rows.iterrows()
    ]
    return np.sqrt(mean_squared_error(eval_rows['rating'].tolist(), predictions))
# Compare prediction quality with and without user mean-centering.
rmse_raw = rmse_cf(sim_raw)
rmse_center = rmse_cf(sim_centered)
print(f'RMSE without mean centering: {rmse_raw:.4f}')
print(f'RMSE with mean centering: {rmse_center:.4f}')
print('Lower RMSE => mean-centering helps remove systematic bias.')
# Exercise 2: Pearson correlation similarity vs cosine
# Key insight: Pearson is equivalent to cosine on mean-centered vectors;
# it naturally handles rating scale differences across users.
def pearson_similarity(mat):
    """Row-wise Pearson correlation, computed as cosine on mean-centered rows.

    NaNs (unobserved ratings) are imputed with the centered row's nan-mean
    (numerically ~0) before normalization; +1e-8 guards zero-norm rows.
    """
    resid = mat - np.nanmean(mat, axis=1, keepdims=True)
    # Inlined NaN imputation: fill each missing entry with its row's nan-mean.
    resid = np.where(np.isnan(resid), np.nanmean(resid, axis=1, keepdims=True), resid)
    unit = resid / (np.linalg.norm(resid, axis=1, keepdims=True) + 1e-8)
    return unit @ unit.T
sim_pearson = pearson_similarity(train_mat)
# Cold-start test: users with few ratings
ratings_per_user = (~np.isnan(train_mat)).sum(axis=1)
cold_users = np.where(ratings_per_user <= 3)[0]
print(f'Cold-start users (<=3 ratings): {len(cold_users)}')
cold_test = test_df[test_df['user'].isin(cold_users)]

def _rmse_on(df_rows, sim_matrix, sample_size=50):
    """RMSE of user-CF predictions on (a sample of) the given test rows."""
    sample = df_rows.sample(min(sample_size, len(df_rows)), random_state=1)
    preds = [user_cf_predict(train_df, int(r['user']), int(r['item']), sim_matrix)
             for _, r in sample.iterrows()]
    return np.sqrt(mean_squared_error(sample['rating'].tolist(), preds))

if len(cold_test) > 0:
    # FIX: the original called rmse_cf(), which samples from the FULL test
    # set, so the printed "cold-start RMSE" was never restricted to cold
    # users. Evaluate on cold_test rows directly.
    cold_rmse_cosine = _rmse_on(cold_test, sim_raw)
    cold_rmse_pearson = _rmse_on(cold_test, sim_pearson)
    print(f'Cold-start RMSE β cosine: {cold_rmse_cosine:.4f}')
    print(f'Cold-start RMSE β Pearson: {cold_rmse_pearson:.4f}')
else:
    print('No cold-start users in test set; Pearson RMSE (all):', rmse_cf(sim_pearson))
# Exercise 3: ALS with bias terms (user_bias + item_bias)
# Key insight: Bias terms capture user leniency and item popularity offsets;
# without them, the latent factors must absorb this signal, reducing their quality.
def als_with_bias(train_df, n_users, n_items, K=10, n_iter=20, reg=0.01, lr=0.01, seed=None):
    """Biased matrix factorization: r_ui ~ mu + b_u + b_i + p_u . q_i.

    NOTE: despite the name this is SGD (per-rating gradient steps), not
    alternating least squares.

    Parameters: K latent dims, n_iter epochs over the training rows,
    reg L2 penalty, lr step size, seed optional RNG seed for reproducible
    factor initialization (None keeps the previous behavior).
    Returns (mu, user_bias, item_bias, P, Q).
    """
    if seed is not None:
        np.random.seed(seed)
    mu = train_df['rating'].mean()  # global mean absorbs the overall level
    user_bias = np.zeros(n_users)
    item_bias = np.zeros(n_items)
    P = np.random.randn(n_users, K) * 0.1
    Q = np.random.randn(n_items, K) * 0.1
    for _ in range(n_iter):
        for _, row in train_df.iterrows():
            u, i, r = int(row['user']), int(row['item']), row['rating']
            pred = mu + user_bias[u] + item_bias[i] + P[u] @ Q[i]
            err = r - pred
            user_bias[u] += lr * (err - reg * user_bias[u])
            item_bias[i] += lr * (err - reg * item_bias[i])
            # FIX: cache p_u before updating it so the q_i step uses the same
            # factors that produced `err`; the original updated P[u] first and
            # then fed the already-updated value into Q[i]'s gradient.
            p_u_old = P[u].copy()
            P[u] += lr * (err * Q[i] - reg * P[u])
            Q[i] += lr * (err * p_u_old - reg * Q[i])
    return mu, user_bias, item_bias, P, Q
# Fit the biased-MF model and evaluate on the held-out ratings.
mu, ub, ib, P, Q = als_with_bias(train_df, N_USERS, N_ITEMS, K=10, n_iter=10)
preds_als, trues_als = [], []
for _, row in test_df.iterrows():
    u, i = int(row['user']), int(row['item'])
    pred = mu + ub[u] + ib[i] + P[u] @ Q[i]
    preds_als.append(np.clip(pred, 1, 5))  # clamp to the valid 1-5 scale
    trues_als.append(row['rating'])
print(f'ALS with bias RMSE: {np.sqrt(mean_squared_error(trues_als, preds_als)):.4f}')
# Exercise 4: Implicit ALS - positive-only feedback weighted by frequency
# Key insight: Implicit data (clicks, views) has no explicit negatives;
# treating all observed events as positive and weighting by frequency
# (c_ui = 1 + alpha * r_ui) lets the model learn confidence.
def implicit_als(interaction_df, n_users, n_items, K=10, alpha=40, n_iter=15, reg=0.1, seed=None):
    """Hu et al. (2008) implicit-feedback ALS.

    Preference R[u,i] = 1 for observed interactions, 0 otherwise; confidence
    c_ui = 1 + alpha * r_ui. FIX: the paper assigns confidence 1 to
    UNOBSERVED cells (they act as weak negatives toward preference 0); the
    original initialized C with zeros, so unobserved cells carried no signal
    and only the L2 penalty constrained the factors.

    seed: optional RNG seed for reproducible factor initialization.
    Returns (P, Q): user and item factor matrices.
    """
    if seed is not None:
        np.random.seed(seed)
    P = np.random.randn(n_users, K) * 0.01
    Q = np.random.randn(n_items, K) * 0.01
    I = np.eye(K)
    # Dense confidence/preference matrices -- fine at this toy scale; a real
    # implementation would use scipy.sparse and the (C - 1) trick.
    C = np.ones((n_users, n_items))  # baseline confidence 1 everywhere
    R = np.zeros((n_users, n_items))
    for _, row in interaction_df.iterrows():
        u, i, r = int(row['user']), int(row['item']), row['rating']
        C[u, i] = 1 + alpha * r
        R[u, i] = 1
    for _ in range(n_iter):
        # Fix Q, solve the regularized weighted least-squares for each user.
        for u in range(n_users):
            Cu = np.diag(C[u])
            P[u] = np.linalg.solve(Q.T @ Cu @ Q + reg * I, Q.T @ Cu @ R[u])
        # Fix P, solve for each item.
        for i in range(n_items):
            Ci = np.diag(C[:, i])
            Q[i] = np.linalg.solve(P.T @ Ci @ P + reg * I, P.T @ Ci @ R[:, i])
    return P, Q
# Use training data with smaller subset for speed
small_train = train_df.sample(min(500, len(train_df)), random_state=0)
P_impl, Q_impl = implicit_als(small_train, N_USERS, N_ITEMS, K=8, n_iter=5)
scores = P_impl @ Q_impl.T  # predicted preference score for every (user, item)
print(f'Implicit ALS: learned {P_impl.shape[1]}-dim user embeddings for {N_USERS} users')
print(f'Sample top-5 item recommendations for user 0: {np.argsort(-scores[0])[:5].tolist()}')
# Exercise 5: Coverage and serendipity metrics
# Key insight: RMSE measures accuracy but not diversity; coverage = % of catalog recommended,
# serendipity = unexpected + relevant items (relevance outside user's past interactions).
def catalog_coverage(recommendations, n_items):
    """Fraction of the catalog appearing in at least one user's rec list.

    recommendations: dict mapping user -> iterable of recommended item ids.
    """
    unique_items = set()
    for rec_list in recommendations.values():
        unique_items.update(rec_list)
    return len(unique_items) / n_items
def novelty(recommendations, item_popularity):
    """Mean self-information -log2(popularity) over all recommended items.

    Items missing from `item_popularity` fall back to popularity 1e-8;
    higher score means less popular (more novel) recommendations.
    """
    info = [
        -np.log2(item_popularity.get(item, 1e-8) + 1e-8)
        for rec_list in recommendations.values()
        for item in rec_list
    ]
    return np.mean(info)
# Generate top-10 recommendations for all users using ALS scores
recs = {u: np.argsort(-scores[u])[:10].tolist() for u in range(N_USERS)}
# Empirical item popularity = share of training interactions per item.
item_pop = train_df.groupby('item').size() / len(train_df)
item_pop_dict = item_pop.to_dict()
cov = catalog_coverage(recs, N_ITEMS)
novelty_score = novelty(recs, item_pop_dict)
print(f'Catalog coverage: {cov:.3f} ({cov*N_ITEMS:.0f}/{N_ITEMS} items recommended)')
print(f'Novelty score: {novelty_score:.3f} (higher = less popular items recommended)')
02 - Content-Based Filtering
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity as cos_sim
# Synthetic item catalog: random descriptions, genres, directors, numeric rating.
np.random.seed(10)
genres = ['Action','Comedy','Drama','Sci-Fi','Horror']
directors = ['Dir_A','Dir_B','Dir_C','Dir_D']
items_df = pd.DataFrame({
    'item_id': range(N_ITEMS),
    'desc': [f'movie about {np.random.choice(["love","war","space","crime","family"])} and {np.random.choice(["heroes","villains","robots","magic"])}' for _ in range(N_ITEMS)],
    'genre': np.random.choice(genres, N_ITEMS),
    'director': np.random.choice(directors, N_ITEMS),
    'rating_num': np.random.uniform(5, 9, N_ITEMS).round(1)
})
# TF-IDF features for description
tfidf = TfidfVectorizer(max_features=50)
desc_feats = tfidf.fit_transform(items_df['desc']).toarray()
# One-hot encode genre and director
genre_feats = pd.get_dummies(items_df['genre']).values.astype(float)
dir_feats = pd.get_dummies(items_df['director']).values.astype(float)
num_feats = items_df['rating_num'].values.reshape(-1, 1)  # raw (unscaled) numeric column
def build_features(W_DESC=1.0, W_GENRE=1.0, W_DIR=1.0, W_NUM=0.5):
    """Concatenate the weighted feature groups (text, genre, director, numeric)
    into a single (N_ITEMS, d) matrix."""
    weighted_groups = [
        W_DESC * desc_feats,
        W_GENRE * genre_feats,
        W_DIR * dir_feats,
        W_NUM * num_feats,
    ]
    return np.concatenate(weighted_groups, axis=1)
def precision_at_k(sim_matrix, train_df, k=10, n_users=50):
    """Approximate Precision@k from item-item similarities.

    For each user, query with one of their training items and count how many
    of their remaining training items land in the query's top-k neighbours.
    """
    per_user_prec = []
    for u in range(min(n_users, N_USERS)):
        user_items = set(train_df[train_df['user'] == u]['item'].values)
        if len(user_items) < 2:
            continue  # need one query item plus at least one positive
        items_list = list(user_items)
        query_item, positives = items_list[0], items_list[1:]
        ranked = np.argsort(-sim_matrix[query_item])
        top_k_items = set(ranked[1:k+1])  # skip rank 0 (the query item itself)
        per_user_prec.append(len(top_k_items & set(positives)) / k)
    return np.mean(per_user_prec) if per_user_prec else 0.0
# Exercise 1: Grid search over (W_DESC, W_GENRE, W_DIR, W_NUM) to maximize Precision@10
# Key insight: Feature weights determine how much each modality contributes;
# grid search finds the combination that best separates user-liked items.
# FIX: the original looped only over W_DESC and W_GENRE, leaving W_DIR and
# W_NUM fixed even though the exercise asks for a grid over all four.
best_p10, best_weights = 0, None
for wd in [0.5, 1.0, 2.0]:
    for wg in [0.5, 1.0, 2.0]:
        for wdir in [0.5, 1.0, 2.0]:
            for wn in [0.25, 0.5, 1.0]:
                feats = build_features(W_DESC=wd, W_GENRE=wg, W_DIR=wdir, W_NUM=wn)
                sim = cos_sim(feats)
                p10 = precision_at_k(sim, train_df, k=10)
                if p10 > best_p10:
                    best_p10, best_weights = p10, (wd, wg, wdir, wn)
print(f'Best Precision@10: {best_p10:.4f}')
print(f'Best weights (W_DESC, W_GENRE, W_DIR, W_NUM): {best_weights}')
# Exercise 2: Word2Vec embeddings for item descriptions vs TF-IDF
# Key insight: Word2Vec captures semantic similarity ("space" ~ "galaxy");
# TF-IDF matches exact tokens - Word2Vec generalizes better on paraphrase.
try:
    from gensim.models import Word2Vec
    corpus = [desc.lower().split() for desc in items_df['desc']]
    w2v = Word2Vec(corpus, vector_size=50, window=3, min_count=1, workers=1, epochs=20, seed=42)
    def mean_w2v(desc, model):
        # Average the vectors of all in-vocabulary tokens; zero vector when
        # nothing is in vocabulary.
        tokens = desc.lower().split()
        vecs = [model.wv[t] for t in tokens if t in model.wv]
        return np.mean(vecs, axis=0) if vecs else np.zeros(model.vector_size)
    w2v_feats = np.array([mean_w2v(d, w2v) for d in items_df['desc']])
    sim_w2v = cos_sim(w2v_feats)
    sim_tfidf = cos_sim(desc_feats)
    p10_w2v = precision_at_k(sim_w2v, train_df, k=10)
    p10_tfidf = precision_at_k(sim_tfidf, train_df, k=10)
    print(f'Precision@10 β TF-IDF: {p10_tfidf:.4f}')
    print(f'Precision@10 β Word2Vec: {p10_w2v:.4f}')
except ImportError:
    print('gensim not installed. Install with: pip install gensim')
    print('Word2Vec would provide semantic similarity beyond exact token matching.')
# Exercise 3: Temporal decay - weight recent interactions with exponential decay
# Key insight: Recent interactions are more predictive of current preferences;
# decay w_t = exp(-lambda * (T - t)) assigns lower weights to older events.
# Add fake timestamps to interactions
interactions = known_ratings.copy()
T_max = 100
interactions['timestamp'] = np.random.randint(0, T_max, len(interactions))
def temporal_user_profile(user_id, interactions, item_feats, decay_lambda=0.05, T=T_max):
    """Recency-weighted mean of the item feature vectors a user interacted with.

    Each interaction is weighted exp(-decay_lambda * age) where age = T - t;
    users with no interactions get the zero vector.
    """
    hist = interactions[interactions['user'] == user_id]
    if hist.empty:
        return np.zeros(item_feats.shape[1])
    age_weights = np.exp(-decay_lambda * (T - hist['timestamp'].values))
    vectors = item_feats[hist['item'].values]
    return (age_weights[:, None] * vectors).sum(axis=0) / (age_weights.sum() + 1e-8)
default_feats = build_features()
# Decayed profiles vs. simple unweighted mean profiles, for every user.
profiles_decay = np.array([temporal_user_profile(u, interactions, default_feats)
                           for u in range(N_USERS)])
profiles_flat = np.array([default_feats[interactions[interactions['user']==u]['item'].values].mean(axis=0)
                          if len(interactions[interactions['user']==u]) > 0
                          else np.zeros(default_feats.shape[1])
                          for u in range(N_USERS)])
print(f'Profile L2 norm diff (decay vs flat) user 0: {np.linalg.norm(profiles_decay[0] - profiles_flat[0]):.4f}')
print('Temporal decay shifts the profile toward recently interacted items.')
# Exercise 4: MMR diversity re-ranking
# Key insight: MMR balances relevance and diversity by iteratively selecting the
# candidate that maximizes relevance - lambda * max_similarity_to_already_selected.
def mmr_rerank(query_vec, candidate_vecs, candidate_ids, top_n=10, lam=0.5):
    """Maximal Marginal Relevance re-ranking.

    Greedily picks up to `top_n` candidates maximizing
    lam * cos(query, c) - (1 - lam) * max cos(c, already-selected);
    the first pick is by pure relevance. Returns the chosen `candidate_ids`
    in selection order.

    FIX: removed the original's dead code (an unused `selected_vecs`
    expression guarded by a constant-False conditional) and the redundant
    `remaining.pop(best if not selected else list(range(len(remaining)))[best])`
    -- both branches reduce to `remaining.pop(best)`.
    """
    cand = np.asarray(candidate_vecs, dtype=float)
    q = np.asarray(query_vec, dtype=float)
    # Cosine similarity via normalized dot products (zero vectors -> sim 0).
    cand_norms = np.linalg.norm(cand, axis=1)
    cand_unit = cand / np.where(cand_norms == 0, 1.0, cand_norms)[:, None]
    q_norm = np.linalg.norm(q)
    q_unit = q / (q_norm if q_norm != 0 else 1.0)
    relevance = cand_unit @ q_unit
    selected = []
    remaining = list(range(len(candidate_ids)))
    while len(selected) < top_n and remaining:
        if not selected:
            scores = relevance[remaining]
        else:
            sim_to_selected = (cand_unit[remaining] @ cand_unit[selected].T).max(axis=1)
            scores = lam * relevance[remaining] - (1 - lam) * sim_to_selected
        selected.append(remaining.pop(int(np.argmax(scores))))
    return [candidate_ids[s] for s in selected]
# Re-rank top-20 candidates for user 0 using their profile
query = profiles_flat[0]
top20_idx = np.argsort(-cos_sim([query], default_feats)[0])[:20]
top20_vecs = default_feats[top20_idx]
# NOTE: the ids passed here are positions 0..19 within the shortlist, not item ids.
mmr_ids = mmr_rerank(query, top20_vecs, list(range(20)), top_n=10, lam=0.6)
print(f'MMR top-10 (indices into top-20 candidates): {mmr_ids}')
print('MMR selects diverse items rather than the 10 most similar.')
# Exercise 5: Hybrid alpha - learn optimal CF/CB blend via held-out validation
# Key insight: A simple linear blend alpha*CF + (1-alpha)*CB can outperform
# either component alone; grid search on a held-out set finds the best alpha.
# CF scores: use ALS P @ Q
cf_scores = P_impl @ Q_impl.T  # shape (N_USERS, N_ITEMS)
# CB scores: user profile cosine similarity with items
cb_scores = cos_sim(profiles_flat, default_feats)  # (N_USERS, N_ITEMS)
# Normalize both to [0,1]
def min_max_norm(mat):
    """Rescale all entries of `mat` into [0, 1] with a global min-max
    transform (+1e-8 guards a constant matrix)."""
    lo, hi = mat.min(), mat.max()
    span = hi - lo + 1e-8
    return (mat - lo) / span
cf_norm = min_max_norm(cf_scores)
cb_norm = min_max_norm(cb_scores)
def hybrid_rmse(alpha, val_df):
    """RMSE of the blend alpha*CF + (1-alpha)*CB on the rows of `val_df`."""
    preds, trues = [], []
    for _, row in val_df.iterrows():
        u, i = int(row['user']), int(row['item'])
        pred = alpha * cf_norm[u, i] + (1 - alpha) * cb_norm[u, i]
        # Scale back to 1-5 (blended scores live in [0, 1])
        pred = 1 + 4 * pred
        preds.append(pred); trues.append(row['rating'])
    return np.sqrt(mean_squared_error(trues, preds))
# Grid-search alpha on a held-out sample and plot the RMSE curve.
val_df = test_df.sample(min(200, len(test_df)), random_state=42)
alphas = np.arange(0, 1.1, 0.1)
rmses = [hybrid_rmse(a, val_df) for a in alphas]
best_alpha = alphas[np.argmin(rmses)]
plt.figure(figsize=(8, 4))
plt.plot(alphas, rmses, marker='o')
plt.axvline(best_alpha, color='red', linestyle='--', label=f'Best alpha={best_alpha:.1f}')
plt.xlabel('Alpha (CF weight)'); plt.ylabel('RMSE'); plt.title('Hybrid CF/CB Alpha Search')
plt.legend(); plt.show()
print(f'Best alpha: {best_alpha:.1f}, RMSE: {min(rmses):.4f}')
03 - Neural Collaborative Filtering
# Optional dependency: every NCF exercise below is gated on TORCH.
try:
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, TensorDataset
    TORCH = True
except ImportError:
    TORCH = False; print('PyTorch not installed.')
if TORCH:
    # Train/validation tensors plus a shuffled mini-batch loader.
    users_t = torch.tensor(train_df['user'].values, dtype=torch.long)
    items_t = torch.tensor(train_df['item'].values, dtype=torch.long)
    ratings_t = torch.tensor(train_df['rating'].values, dtype=torch.float32)
    users_v = torch.tensor(test_df['user'].values, dtype=torch.long)
    items_v = torch.tensor(test_df['item'].values, dtype=torch.long)
    ratings_v = torch.tensor(test_df['rating'].values, dtype=torch.float32)
    tr_loader = DataLoader(TensorDataset(users_t, items_t, ratings_t), batch_size=64, shuffle=True)
class GMF(nn.Module):
    """Generalized Matrix Factorization: a linear head over the elementwise
    product of user and item embeddings."""
    def __init__(self, n_users, n_items, dim):
        super().__init__()
        self.user_emb = nn.Embedding(n_users, dim)
        self.item_emb = nn.Embedding(n_items, dim)
        self.out = nn.Linear(dim, 1)
    def forward(self, u, i):
        interaction = self.user_emb(u) * self.item_emb(i)
        score = self.out(interaction)
        return score.squeeze(-1)
def train_ncf(model, n_epochs=20):
    """Train `model` with Adam + MSE over tr_loader; return the final
    validation RMSE on the held-out tensors."""
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    mse = nn.MSELoss()
    for _ in range(n_epochs):
        model.train()
        for batch_users, batch_items, batch_ratings in tr_loader:
            optimizer.zero_grad()
            loss = mse(model(batch_users, batch_items), batch_ratings)
            loss.backward()
            optimizer.step()
    model.eval()
    with torch.no_grad():
        val_preds = model(users_v, items_v)
        val_rmse = mse(val_preds, ratings_v).sqrt().item()
    return val_rmse
if TORCH:
    # Exercise 1: Embedding dimension ablation
    # Key insight: Small d underfits (not enough capacity); large d risks overfitting
    # and increases compute; the elbow in the RMSE vs d curve identifies the sweet spot.
    dim_rmses = {}
    for d in [8, 16, 32, 64]:
        model_d = GMF(N_USERS, N_ITEMS, d)
        rmse_d = train_ncf(model_d, n_epochs=15)
        dim_rmses[d] = rmse_d
        print(f'd={d:3d}: val RMSE={rmse_d:.4f}')
    plt.figure(figsize=(7, 4))
    plt.plot(list(dim_rmses.keys()), list(dim_rmses.values()), marker='o')
    plt.xlabel('Embedding dim d'); plt.ylabel('Val RMSE')
    plt.title('Embedding Dimension Ablation'); plt.show()
if TORCH:
    # Exercise 2: L2 regularization on embeddings - compare train vs val RMSE gap
    # Key insight: Without regularization, embeddings overfit to training users/items;
    # L2 penalty (weight_decay) shrinks embedding norms, reducing the train-val gap.
    def train_with_reg(reg_weight, n_epochs=20):
        # Train a fresh 32-dim GMF with Adam weight_decay=reg_weight and
        # record per-epoch (train MSE, val MSE) curves.
        model = GMF(N_USERS, N_ITEMS, 32)
        opt = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=reg_weight)
        crit = nn.MSELoss()
        tr_losses, va_losses = [], []
        for ep in range(n_epochs):
            model.train(); ep_loss = []
            for ub, ib, rb in tr_loader:
                opt.zero_grad(); loss = crit(model(ub,ib), rb); loss.backward(); opt.step()
                ep_loss.append(loss.item())
            tr_losses.append(np.mean(ep_loss))
            model.eval()
            with torch.no_grad():
                va_losses.append(crit(model(users_v, items_v), ratings_v).item())
        return tr_losses, va_losses
    tr0, va0 = train_with_reg(0.0, n_epochs=25)
    tr1, va1 = train_with_reg(1e-4, n_epochs=25)
    plt.figure(figsize=(10, 4))
    plt.plot(tr0, 'b--', alpha=0.7, label='Train (no reg)')
    plt.plot(va0, 'b', alpha=0.7, label='Val (no reg)')
    plt.plot(tr1, 'r--', alpha=0.7, label='Train (L2 1e-4)')
    plt.plot(va1, 'r', alpha=0.7, label='Val (L2 1e-4)')
    plt.xlabel('Epoch'); plt.ylabel('MSE Loss')
    plt.title('L2 Regularization: Train vs Val Gap'); plt.legend(); plt.show()
if TORCH:
    # Exercise 3: BPR (Bayesian Personalized Ranking) loss with negative sampling
    # Key insight: BPR optimizes the ranking of positive over negative items directly;
    # it's more suitable than MSE for implicit feedback where only positives are observed.
    class BPRModel(nn.Module):
        """Pairwise ranking model: score(u, i) = p_u . q_i; forward() returns
        the mean BPR loss for (user, positive item, negative item) triples."""
        def __init__(self, n_users, n_items, dim=32):
            super().__init__()
            self.user_emb = nn.Embedding(n_users, dim)
            self.item_emb = nn.Embedding(n_items, dim)
        def forward(self, u, i_pos, i_neg):
            pu = self.user_emb(u)
            qi = self.item_emb(i_pos)
            qj = self.item_emb(i_neg)
            diff = (pu * (qi - qj)).sum(dim=-1)  # score margin s(u,i) - s(u,j)
            return -torch.log(torch.sigmoid(diff) + 1e-8).mean()
    bpr_model = BPRModel(N_USERS, N_ITEMS, 32)
    bpr_opt = torch.optim.Adam(bpr_model.parameters(), lr=1e-3)
    neg_ratio = 4  # 1 positive : 4 sampled negatives
    bpr_losses = []
    # FIX: dropped the original's unused `item_set` variable.
    for ep in range(20):
        bpr_model.train(); ep_loss = []
        for ub, ib, _ in tr_loader:
            # Uniform random negatives; a small fraction may collide with
            # true positives, which is standard and tolerated in BPR.
            neg_items = torch.randint(0, N_ITEMS, (len(ub) * neg_ratio,))
            u_rep = ub.repeat_interleave(neg_ratio)
            i_rep = ib.repeat_interleave(neg_ratio)
            bpr_opt.zero_grad()
            loss = bpr_model(u_rep, i_rep, neg_items)
            loss.backward(); bpr_opt.step()
            ep_loss.append(loss.item())
        bpr_losses.append(np.mean(ep_loss))
    plt.figure(figsize=(7, 3))
    plt.plot(bpr_losses); plt.title('BPR Training Loss'); plt.xlabel('Epoch'); plt.show()
    print(f'Final BPR loss: {bpr_losses[-1]:.4f}')
if TORCH:
    # Exercise 4: ANN with FAISS for approximate nearest-neighbor item retrieval
    # Key insight: exact kNN over millions of item embeddings is O(N*d) per query;
    # FAISS IndexFlatL2 gives exact search, IndexIVFFlat trades accuracy for speed.
    try:
        import faiss
        bpr_model.eval()
        with torch.no_grad():
            # FIX: .numpy() raises ("Can't call numpy() on Tensor that
            # requires grad") on embedding weights even inside no_grad;
            # detach from the autograd graph first.
            item_embs = bpr_model.item_emb.weight.detach().cpu().numpy().astype(np.float32)
        dim_faiss = item_embs.shape[1]
        index = faiss.IndexFlatL2(dim_faiss)
        faiss.normalize_L2(item_embs)  # in-place; L2 distance on unit vectors ranks like cosine
        index.add(item_embs)
        query_emb = item_embs[[0]]
        D, I = index.search(query_emb, 10)
        print('Top-10 similar items to item 0 (FAISS IndexFlatL2):', I[0].tolist())
        print('L2 distances:', D[0].tolist())
    except ImportError:
        print('faiss-cpu not installed. Install with: pip install faiss-cpu')
        # Manual brute-force ANN
        bpr_model.eval()
        with torch.no_grad():
            # Same detach fix as above.
            item_embs = bpr_model.item_emb.weight.detach().cpu().numpy()
        query = item_embs[0]
        dists = np.linalg.norm(item_embs - query, axis=1)
        top10 = np.argsort(dists)[1:11]  # skip rank 0 (the item itself, distance 0)
        print('Top-10 similar items (brute-force):', top10.tolist())
if TORCH:
    # Exercise 5: NeuMF ablation - GMF only vs MLP only vs full NeuMF
    # Key insight: GMF captures multiplicative interactions; MLP captures complex
    # nonlinear patterns; NeuMF concatenates both for best-of-both-worlds.
    class MLPRec(nn.Module):
        # MLP-only recommender: concatenated user/item embeddings -> 3-layer MLP.
        def __init__(self, n_users, n_items, dim=32):
            super().__init__()
            self.user_emb = nn.Embedding(n_users, dim)
            self.item_emb = nn.Embedding(n_items, dim)
            self.mlp = nn.Sequential(
                nn.Linear(2*dim, 64), nn.ReLU(),
                nn.Linear(64, 32), nn.ReLU(),
                nn.Linear(32, 1))
        def forward(self, u, i):
            x = torch.cat([self.user_emb(u), self.item_emb(i)], dim=-1)
            return self.mlp(x).squeeze(-1)
    class NeuMF(nn.Module):
        # NeuMF: separate embedding tables for the GMF and MLP branches,
        # fused by a final linear layer over their concatenated outputs.
        def __init__(self, n_users, n_items, dim=16):
            super().__init__()
            self.gmf_user = nn.Embedding(n_users, dim)
            self.gmf_item = nn.Embedding(n_items, dim)
            self.mlp_user = nn.Embedding(n_users, dim)
            self.mlp_item = nn.Embedding(n_items, dim)
            self.mlp = nn.Sequential(nn.Linear(2*dim, 32), nn.ReLU(), nn.Linear(32, 16), nn.ReLU())
            self.out = nn.Linear(dim + 16, 1)
        def forward(self, u, i):
            gmf_out = self.gmf_user(u) * self.gmf_item(i)
            mlp_out = self.mlp(torch.cat([self.mlp_user(u), self.mlp_item(i)], dim=-1))
            return self.out(torch.cat([gmf_out, mlp_out], dim=-1)).squeeze(-1)
    print('Training NeuMF ablation...')
    gmf_rmse = train_ncf(GMF(N_USERS, N_ITEMS, 32), n_epochs=20)
    mlp_rmse = train_ncf(MLPRec(N_USERS, N_ITEMS, 32), n_epochs=20)
    neumf_rmse = train_ncf(NeuMF(N_USERS, N_ITEMS, 16), n_epochs=20)
    print(f'GMF only: {gmf_rmse:.4f}')
    print(f'MLP only: {mlp_rmse:.4f}')
    print(f'NeuMF (both): {neumf_rmse:.4f}')
04 - Causal Inference Basics
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor
# Synthetic observational study: email campaign effect on purchase
np.random.seed(99)
N_causal = 1000
age = np.random.normal(40, 10, N_causal)
income = np.random.normal(60000, 15000, N_causal)
prev_pur = np.random.binomial(1, 0.4, N_causal)
# Treatment assignment (confounded by income and prev_purchase)
propensity_true = 1 / (1 + np.exp(-(0.00003*income + 0.8*prev_pur - 3)))
treatment = np.random.binomial(1, propensity_true)
# Outcome: true ATE = 5
noise = np.random.normal(0, 5, N_causal)
outcome = 20 + 5*treatment + 0.0002*income + 3*prev_pur + noise
causal_df = pd.DataFrame({'age': age, 'income': income, 'prev_pur': prev_pur,
                          'treatment': treatment, 'outcome': outcome})
X_cov = causal_df[['age','income','prev_pur']]
print(f'Treatment rate: {treatment.mean():.3f}, True ATE: 5.0')
# The naive difference is biased upward: income and prev_pur raise both the
# treatment probability and the outcome.
print(f'Naive diff-in-means: {outcome[treatment==1].mean() - outcome[treatment==0].mean():.3f}')
# Exercise 1: PSM at different calipers - sensitivity analysis
# Key insight: Caliper controls the maximum propensity score distance for a match;
# smaller caliper = better balance but fewer matches; larger = more matches but more bias.
ps_model = LogisticRegression().fit(X_cov, treatment)
ps_scores = ps_model.predict_proba(X_cov)[:, 1]  # estimated P(treated | X)
causal_df['ps'] = ps_scores
def psm_ate(df, caliper):
    """1-NN propensity-score matching (with replacement) ATT estimate.

    Each treated row is paired with the control whose propensity score is
    closest; pairs farther apart than `caliper` are discarded.
    Returns (mean matched outcome difference or NaN, number of matched pairs).
    """
    treated_rows = df[df['treatment'] == 1]
    controls = df[df['treatment'] == 0]
    diffs = []
    for _, row in treated_rows.iterrows():
        gaps = (controls['ps'] - row['ps']).abs()
        match = gaps.idxmin()
        if gaps[match] <= caliper:
            diffs.append(row['outcome'] - controls.loc[match, 'outcome'])
    return (np.mean(diffs) if diffs else np.nan), len(diffs)
# Report ATT and match count for each caliper width.
print(f'{"Caliper":>10} {"ATE":>8} {"Matched pairs":>15}')
for cal in [0.01, 0.05, 0.1]:
    ate, n_matched = psm_ate(causal_df, cal)
    print(f'{cal:>10.2f} {ate:>8.3f} {n_matched:>15}')
# Exercise 2: IPW with stabilized vs unstabilized weights
# Key insight: Unstabilized weights (1/e(X)) can be extreme; stabilized weights
# (P(T)/e(X)) have better finite-sample properties and reduce effective sample size variance.
def ipw_ate(ps, treatment, outcome, stabilized=True):
    """Inverse-propensity-weighted ATE (Hajek/ratio form).

    ps: propensity scores e(X); treatment: 0/1 array; outcome: numeric array.
    Returns (ATE estimate, per-unit weights).
    """
    if stabilized:
        marginal = treatment.mean()
        weights = np.where(treatment == 1, marginal / ps, (1 - marginal) / (1 - ps))
    else:
        weights = np.where(treatment == 1, 1 / ps, 1 / (1 - ps))
    treated_mean = (treatment * outcome * weights).sum() / (treatment * weights).sum()
    control_mean = ((1 - treatment) * outcome * weights).sum() / ((1 - treatment) * weights).sum()
    return treated_mean - control_mean, weights
ate_unstab, w_unstab = ipw_ate(ps_scores, treatment, outcome, stabilized=False)
ate_stab, w_stab = ipw_ate(ps_scores, treatment, outcome, stabilized=True)
print(f'IPW ATE (unstabilized): {ate_unstab:.3f}')
print(f'IPW ATE (stabilized): {ate_stab:.3f}')
print(f'Max unstabilized weight: {w_unstab.max():.1f}')
print(f'Max stabilized weight: {w_stab.max():.1f}')
# Kish effective sample size: (sum w)^2 / sum(w^2); larger = less weight skew.
ess_unstab = w_unstab.sum()**2 / (w_unstab**2).sum()
ess_stab = w_stab.sum()**2 / (w_stab**2).sum()
print(f'Effective sample size β unstabilized: {ess_unstab:.0f}, stabilized: {ess_stab:.0f}')
# Exercise 3: Double Machine Learning (DML) with different base learners
# Key insight: DML partialling out removes nuisance functions (outcome ~ X and
# treatment ~ X) using flexible ML; the ATE is estimated on residuals, making it
# robust to misspecification of either nuisance model.
from sklearn.model_selection import cross_val_predict
def dml_ate(X, T, Y, outcome_model, treatment_model, cv=5):
    """Partially-linear DML (Robinson 1988 style).

    Cross-fitted nuisance predictions are subtracted from Y and T; the ATE
    is the OLS slope of the outcome residuals on the treatment residuals.
    Returns (ate, standard error).

    NOTE(review): with a classifier as `treatment_model`, cross_val_predict
    returns hard 0/1 labels; method='predict_proba' would give a proper
    propensity residual -- confirm intended behavior before changing.
    """
    Y_hat = cross_val_predict(outcome_model, X, Y, cv=cv)
    T_hat = cross_val_predict(treatment_model, X, T, cv=cv)
    Y_res = Y - Y_hat
    T_res = T - T_hat
    # ATE = regression coefficient of Y_res on T_res
    ate = (T_res * Y_res).sum() / (T_res**2).sum()
    # FIX: homoskedastic OLS slope SE is sqrt(sigma^2 / sum(T_res^2)) with
    # sigma^2 = mean squared residual; the original divided by len(Y) again,
    # understating the SE by a factor of sqrt(n).
    se = np.sqrt(((Y_res - ate * T_res)**2).mean() / (T_res**2).sum())
    return ate, se
X_arr = X_cov.values
# (label, outcome nuisance learner, treatment nuisance learner)
configs = [
    ('LR + LR', LinearRegression(), LogisticRegression(max_iter=500)),
    ('RF + LR', RandomForestRegressor(n_estimators=50, random_state=0), LogisticRegression(max_iter=500)),
    ('GBM + LR', GradientBoostingRegressor(n_estimators=50, random_state=0), LogisticRegression(max_iter=500)),
]
print(f'{"Learner":<15} {"ATE":>8} {"SE":>8}')
for name, out_m, treat_m in configs:
    ate, se = dml_ate(X_arr, treatment.astype(float), outcome, out_m, treat_m)
    print(f'{name:<15} {ate:>8.3f} {se:>8.4f}')
# Exercise 4: E-value sensitivity analysis
# Key insight: The E-value is the minimum unmeasured confounder strength
# (on both treatment and outcome) needed to fully explain away an observed effect.
# E = RR + sqrt(RR*(RR-1)) where RR is the risk/rate ratio.
def evalue_from_rr(rr):
    """E-value for a risk ratio RR (VanderWeele & Ding 2017).

    Protective effects (RR < 1) are flipped to 1/RR first so the formula
    applies. FIX: non-positive RR is rejected explicitly -- the original
    would divide by zero (rr == 0) or take sqrt of a negative (rr < 0).
    """
    if rr <= 0:
        raise ValueError('risk ratio must be positive')
    if rr < 1:  # protective effect: work with the inverted ratio
        rr = 1 / rr
    return rr + np.sqrt(rr * (rr - 1))
# Estimate RR from observational data using IPW
# For continuous outcomes, use approximate RR via mean ratio
y1_mean = outcome[treatment == 1].mean()
y0_mean = outcome[treatment == 0].mean()
rr_naive = y1_mean / y0_mean
# Use DML estimate to compute ATE-based approximate RR
ate_dml, _ = dml_ate(X_arr, treatment.astype(float), outcome,
                     LinearRegression(), LogisticRegression(max_iter=500))
rr_dml = (y0_mean + ate_dml) / y0_mean  # counterfactual ratio
for label, rr in [('Naive RR', rr_naive), ('DML-adjusted RR', rr_dml)]:
    ev = evalue_from_rr(rr)
    print(f'{label}: RR={rr:.3f}, E-value={ev:.3f}')
    print(f' => Unmeasured confounder must have RR >= {ev:.2f} with both treatment and outcome to explain away the effect.')
# Exercise 5: Standardized Mean Differences (SMD) before/after PSM
# Key insight: SMD < 0.1 (or 10%) indicates adequate balance after matching;
# plotting SMD before/after is the standard love plot for balance assessment.
def smd(group1, group0):
    """Standardized mean difference: (mu1 - mu0) / pooled std (Cohen's d form).

    +1e-8 inside the sqrt guards two constant groups.
    """
    spread = np.sqrt((group1.std()**2 + group0.std()**2) / 2 + 1e-8)
    mean_gap = group1.mean() - group0.mean()
    return mean_gap / spread
# Before matching
smd_before = {col: smd(X_cov[col][treatment==1], X_cov[col][treatment==0]) for col in X_cov.columns}
# After PSM (caliper=0.05): greedy 1-NN matching WITHOUT replacement --
# each matched control is removed from the candidate pool.
treated_idx, matched_ctrl_idx = [], []
caliper = 0.05
control_df = causal_df[causal_df['treatment'] == 0].copy()
for idx, t_row in causal_df[causal_df['treatment'] == 1].iterrows():
    dists = abs(control_df['ps'] - t_row['ps'])
    best = dists.idxmin()
    if dists[best] <= caliper:
        treated_idx.append(idx)
        matched_ctrl_idx.append(best)
        control_df = control_df.drop(best)  # no replacement
if treated_idx:
    matched_df = pd.concat([causal_df.loc[treated_idx], causal_df.loc[matched_ctrl_idx]])
    smd_after = {col: smd(matched_df[matched_df['treatment']==1][col],
                          matched_df[matched_df['treatment']==0][col])
                 for col in X_cov.columns}
    # Love plot: |SMD| before (right, salmon) mirrored against after (left, blue).
    fig, ax = plt.subplots(figsize=(8, 4))
    covs = list(smd_before.keys())
    ax.barh(covs, [abs(smd_before[c]) for c in covs], color='salmon', label='Before PSM')
    ax.barh(covs, [-abs(smd_after[c]) for c in covs], color='steelblue', label='After PSM')
    ax.axvline(0.1, color='red', linestyle='--', label='SMD=0.1 threshold')
    ax.axvline(-0.1, color='red', linestyle='--')
    ax.set_xlabel('|SMD|'); ax.legend(); ax.set_title('Balance: SMD Before/After PSM')
    plt.tight_layout(); plt.show()
else:
    print('No matched pairs found; try larger caliper.')
05 - Difference-in-Differences
# Synthetic panel: 2 groups x 24 periods (12 pre, 12 post)
np.random.seed(7)
N_units = 100
T_total = 24
T_treat = 12 # treatment starts after period 12
unit_fe = np.random.normal(0, 5, N_units)
time_fe = np.linspace(0, 10, T_total)
treated = np.random.binomial(1, 0.5, N_units)
true_ATT = 8.0
panel_rows = []
for u in range(N_units):
for t in range(T_total):
post = int(t >= T_treat)
treatment_effect = true_ATT * treated[u] * post
y = 50 + unit_fe[u] + time_fe[t] + treatment_effect + np.random.normal(0, 3)
panel_rows.append({'unit': u, 'period': t, 'treated': treated[u],
'post': post, 'y': y})
panel_df = pd.DataFrame(panel_rows)
print(f'Panel: {N_units} units x {T_total} periods, ATT = {true_ATT}')
from statsmodels.formula.api import ols
# Baseline DiD via two-way fixed effects: with unit and period fixed
# effects included, the coefficient on the treated:post interaction is
# the ATT estimate.
model_did = ols('y ~ treated * post + C(unit) + C(period)', data=panel_df).fit()
att_hat = model_did.params['treated:post']
print(f'DiD ATT estimate: {att_hat:.3f} (true = {true_ATT})')
# Exercise 1: Placebo test — randomly assign treatment within the pre-period.
# Key insight: if the parallel-trends assumption holds, a DiD on randomly
# assigned "treatment" inside the pre-period should yield an estimate near 0.
pre_df = panel_df[panel_df['post'] == 0].copy()
# Split the pre-period in half: fake "pre" = first 6 periods, fake "post" = last 6.
pre_df['fake_post'] = (pre_df['period'] >= T_treat // 2).astype(int)
fake_treat = np.random.binomial(1, 0.5, N_units)
pre_df['fake_treated'] = pre_df['unit'].map(dict(enumerate(fake_treat)))
model_placebo = ols('y ~ fake_treated * fake_post + C(unit) + C(period)',
                    data=pre_df).fit()
placebo_att = model_placebo.params.get('fake_treated:fake_post', np.nan)
# Fix: the original f-string contained mojibake ("β") where "≈" was intended.
print(f'Placebo DiD estimate (should ≈ 0): {placebo_att:.3f}')
# Exercise 2: Robustness check — shift the assumed treatment start by ±3 periods.
# Key insight: a genuine effect should persist under small shifts of the
# assumed start date, while spurious effects typically vanish when shifted.
robust_estimates = {}
for shift in range(-3, 4):
    t_start = T_treat + shift
    if not (0 < t_start < T_total):
        continue  # shifted start must stay strictly inside the panel
    df_rob = panel_df.copy()
    df_rob['post'] = (df_rob['period'] >= t_start).astype(int)
    try:
        fit = ols('y ~ treated * post + C(unit) + C(period)', data=df_rob).fit()
        robust_estimates[shift] = fit.params.get('treated:post', np.nan)
    except Exception:
        pass
plt.figure(figsize=(8, 4))
plt.plot(list(robust_estimates.keys()), list(robust_estimates.values()), marker='o')
plt.axvline(0, color='red', linestyle='--', label='True treatment start')
plt.axhline(true_ATT, color='grey', linestyle=':', label=f'True ATT={true_ATT}')
plt.xlabel('Shift in treatment start (periods)'); plt.ylabel('ATT Estimate')
plt.title('DiD Robustness: Varying Treatment Start'); plt.legend(); plt.show()
# Exercise 3: Permutation test for DiD.
# Key insight: permuting treatment labels many times yields an empirical null
# distribution for the estimator; the p-value is the share of permuted
# estimates at least as extreme as the observed ATT.
def did_estimate(df):
    """Return the DiD interaction coefficient for `df`, or NaN on failure.

    Uses period fixed effects only (no unit FE), keeping each of the many
    permutation refits cheap.
    """
    try:
        fit = ols('y ~ treated * post + C(period)', data=df).fit()
        return fit.params.get('treated:post', np.nan)
    except Exception:
        return np.nan
# Monte Carlo permutation test: refit the DiD under random relabelings of
# treatment to build an empirical null distribution for the estimator.
# Fix: removed the dead `perm_treat = np.random.permutation(...)` line — its
# result was never used (the next line draws fresh binomial labels anyway).
observed_ate = did_estimate(panel_df)
n_perms = 200  # reduced for demo speed
null_dist = []
for _ in range(n_perms):
    # Re-randomize treatment at the unit level, keeping the panel intact.
    perm_map = dict(enumerate(np.random.binomial(1, 0.5, N_units)))
    df_perm = panel_df.copy()
    df_perm['treated'] = df_perm['unit'].map(perm_map)
    null_dist.append(did_estimate(df_perm))
null_dist = np.array([x for x in null_dist if not np.isnan(x)])
# Two-sided p-value: fraction of |null| draws at least as large as |observed|.
p_value = (np.abs(null_dist) >= np.abs(observed_ate)).mean()
plt.figure(figsize=(8, 4))
plt.hist(null_dist, bins=30, alpha=0.7, label='Permutation null dist')
plt.axvline(observed_ate, color='red', linewidth=2, label=f'Observed ATT={observed_ate:.2f}')
plt.title(f'DiD Permutation Test (p={p_value:.3f})'); plt.legend(); plt.show()
print(f'Observed ATT: {observed_ate:.3f}, Empirical p-value: {p_value:.3f}')
# Exercise 4: Goodman-Bacon decomposition (simplified 2x2 decomposition).
# Key insight: with heterogeneous treatment timing, TWFE is a weighted average
# of all 2x2 DiD comparisons; some weights can be negative ("bad comparisons"),
# biasing the estimate. Here we use two treated cohorts plus a never-treated
# group: early treated at period 8 (effect +5), late at period 16 (effect +10).
np.random.seed(42)
cohorts = {'never': 0, 'early': 8, 'late': 16}
bacon_rows = []
for u in range(90):
    cohort = np.random.choice(['never','early','late'], p=[0.33,0.33,0.34])
    t_start = cohorts[cohort]
    for t in range(24):
        post = int(t >= t_start) if t_start > 0 else 0
        if cohort == 'early':
            te = 5 * post
        elif cohort == 'late':
            te = 10 * post
        else:
            te = 0
        y = 50 + 0.5 * t + te + np.random.normal(0, 3)
        bacon_rows.append({'unit': u, 'period': t, 'cohort': cohort,
                           'treated': int(cohort != 'never'), 'y': y})
bacon_df = pd.DataFrame(bacon_rows)
def bacon_2x2(df, g1, g2, T_g1, T_g2):
    """Simple 2x2 DiD comparing cohort `g1` (treated at period `T_g1`) to `g2`.

    `T_g2` is unused (kept for call-site symmetry); the comparison cohort `g2`
    is implicitly assumed untreated over the window supplied in `df`.
    Returns the interaction coefficient, or NaN if the fit fails.
    """
    pair = df[df['cohort'].isin([g1, g2])].copy()
    pair['post'] = (pair['period'] >= T_g1).astype(int)
    pair['treated_grp'] = (pair['cohort'] == g1).astype(int)
    try:
        fit = ols('y ~ treated_grp * post', data=pair).fit()
        return fit.params.get('treated_grp:post', np.nan)
    except Exception:
        return np.nan
# Key 2x2 DiDs: each call compares one cohort against a clean control.
# Early (treated at 8) vs never-treated — identifies the early-cohort effect (5).
early_v_never = bacon_2x2(bacon_df, 'early', 'never', 8, None)
# Late (treated at 16) vs never-treated — identifies the late-cohort effect (10).
late_v_never = bacon_2x2(bacon_df, 'late', 'never', 16, None)
# Early vs late, restricted to periods < 16 when the late cohort is still
# untreated and therefore a valid ("clean") control group.
early_v_late_pre = bacon_2x2(bacon_df[bacon_df['period'] < 16], 'early', 'late', 8, 16)
print('Goodman-Bacon 2x2 components:')
print(f' Early vs Never: {early_v_never:.3f} (true = 5)')
print(f' Late vs Never: {late_v_never:.3f} (true = 10)')
print(f' Early vs Late (pre-16): {early_v_late_pre:.3f}')
# Exercise 5: Interrupted Time Series (ITS) vs DiD
# Key insight: ITS fits a regression discontinuity at the treatment date for
# the treatment group only; DiD uses the control group as a counterfactual.
# ITS can be biased if pre-trends were changing; DiD corrects for common trends.
# Average outcome of the treated group per period (a single time series).
treated_series = panel_df[panel_df['treated'] == 1].groupby('period')['y'].mean()
t_arr = np.arange(T_total)
post = (t_arr >= T_treat).astype(int)
# ITS (segmented regression): y = b0 + b1*t + b2*post + b3*t*post + e,
# where b2 is the level shift at the treatment date and b3 the slope change.
its_df = pd.DataFrame({
'y': treated_series.values, 't': t_arr,
'post': post, 't_post': t_arr * post
})
its_model = ols('y ~ t + post + t_post', data=its_df).fit()
its_jump = its_model.params['post'] # level shift at treatment
print(f'ITS level shift (ATT estimate): {its_jump:.3f}')
print(f'DiD ATT estimate: {att_hat:.3f}')
print(f'True ATT: {true_ATT}')
print('\nDiD uses control group to remove common time trends;')
print('ITS only uses the treated series (vulnerable to coincident events).')
06 — A/B Testing Platform
from scipy import stats
# Exercise 1: Bayesian A/B test — P(B > A) and expected loss using Beta priors.
# Key insight: with Beta posteriors on each arm's conversion rate,
# P(B > A) = P(theta_B > theta_A), and the expected loss of shipping B is
# E[max(theta_A - theta_B, 0)] — both guide stopping decisions.
def bayesian_ab(alpha_A, beta_A, alpha_B, beta_B, n_samples=100_000, rng=None):
    """Monte Carlo estimate of P(B>A) and expected loss.

    Parameters
    ----------
    alpha_A, beta_A : Beta posterior parameters for arm A.
    alpha_B, beta_B : Beta posterior parameters for arm B.
    n_samples : number of Monte Carlo draws.
    rng : optional ``numpy.random.Generator`` for reproducible draws;
        defaults to the global numpy RNG (original behavior).

    Returns
    -------
    (prob_B_wins, expected_loss) : tuple of floats.
    """
    sampler = np.random if rng is None else rng
    theta_A = sampler.beta(alpha_A, beta_A, n_samples)
    theta_B = sampler.beta(alpha_B, beta_B, n_samples)
    prob_B_wins = (theta_B > theta_A).mean()
    # Loss from choosing B when A was actually better, averaged over posterior.
    expected_loss = np.maximum(theta_A - theta_B, 0).mean()
    return prob_B_wins, expected_loss
# Five posterior scenarios: (alpha_A, beta_A, alpha_B, beta_B, description).
scenarios = [
    (50, 950, 60, 940, 'Weak B advantage'),
    (50, 950, 80, 920, 'Strong B advantage'),
    (100, 900, 99, 901, 'Very close'),
    (50, 950, 40, 960, 'A wins'),
    (200, 800, 210, 790, 'High traffic B advantage'),
]
header = f'{"Scenario":<28} {"P(B>A)":>8} {"E[Loss]":>10}'
print(header)
print('-' * 50)
for aA, bA, aB, bB, label in scenarios:
    p_win, exp_loss = bayesian_ab(aA, bA, aB, bB)
    print(f'{label:<28} {p_win:>8.4f} {exp_loss:>10.6f}')
# Exercise 2: Peeking simulation — A/A tests with daily significance checks.
# Key insight: repeated significance testing inflates Type I error; checking
# daily and stopping at the first p<0.05 yields far more false positives than
# the nominal 5% — sequential methods (e.g. SPRT, alpha spending) correct this.
# Fix: removed the dead `stopped` flag — it was assigned but never read.
np.random.seed(123)
n_sims = 500  # reduced for demo speed (use 1000 in production)
n_days = 30
daily_n = 50  # new users per day
false_pos = 0
peeking_p = 0.05
for sim in range(n_sims):
    # A/A test: both arms share the same true conversion rate (0.1).
    ctrl_data, treat_data = [], []
    for day in range(1, n_days + 1):
        ctrl_data.extend(np.random.binomial(1, 0.1, daily_n))
        treat_data.extend(np.random.binomial(1, 0.1, daily_n))
        if len(ctrl_data) >= 10:
            # chi2_contingency returns (stat, pvalue, dof, expected); keep pvalue.
            _, p = stats.chi2_contingency([
                [sum(ctrl_data), len(ctrl_data) - sum(ctrl_data)],
                [sum(treat_data), len(treat_data) - sum(treat_data)]
            ])[:2]
            if p < peeking_p:
                # First "significant" peek stops the test: a false positive.
                false_pos += 1
                break
false_pos_rate = false_pos / n_sims
print(f'A/A tests: {n_sims}, False positives: {false_pos}')
print(f'False positive rate with peeking: {false_pos_rate:.3f}')
print(f'Expected nominal rate: 0.050')
print(f'Peeking inflates Type I error by ~{false_pos_rate/0.05:.1f}x')
# Exercise 3: Network-effect testing — experiment on independent user clusters.
# Key insight: individual-level randomization violates SUTVA when users
# interact; assigning whole clusters (network components) prevents spillover.
def create_user_clusters(n_users=500, n_clusters=20):
    """Assign users to independent clusters (e.g., social graph communities)."""
    memberships = np.random.randint(0, n_clusters, n_users)
    return pd.DataFrame({'user_id': range(n_users), 'cluster_id': memberships})
def cluster_randomize(cluster_df, treatment_frac=0.5, seed=42):
    """Assign treatment at cluster level to prevent spillover.

    Adds a 'treatment' column to `cluster_df` in place (and returns it):
    every user in a selected cluster receives treatment = 1.
    NOTE: reseeds the global numpy RNG via `np.random.seed(seed)`.
    """
    np.random.seed(seed)
    all_clusters = cluster_df['cluster_id'].unique()
    n_treated = int(len(all_clusters) * treatment_frac)
    chosen = set(np.random.choice(all_clusters, n_treated, replace=False))
    cluster_df['treatment'] = cluster_df['cluster_id'].isin(chosen).astype(int)
    return cluster_df
users_df = create_user_clusters()
users_df = cluster_randomize(users_df)
# Simulate outcomes: each treated cluster receives a shared cluster-level
# lift, mimicking a network effect spreading within a community.
cluster_effects = {c: np.random.normal(5, 2) for c in users_df['cluster_id'].unique()
                   if users_df[users_df['cluster_id'] == c]['treatment'].iloc[0] == 1}
users_df['outcome'] = (users_df['cluster_id'].map(cluster_effects).fillna(0) +
                       np.random.normal(50, 5, len(users_df)))
# Estimate the ATE from cluster-level means (the unit of randomization).
cluster_avg = users_df.groupby(['cluster_id','treatment'])['outcome'].mean().reset_index()
treated_mean = cluster_avg.loc[cluster_avg['treatment'] == 1, 'outcome'].mean()
control_mean = cluster_avg.loc[cluster_avg['treatment'] == 0, 'outcome'].mean()
ate_cluster = treated_mean - control_mean
print(f'Cluster-level ATE estimate: {ate_cluster:.3f}')
print(f'Treatment clusters: {users_df.groupby("cluster_id")["treatment"].first().sum()}/{users_df["cluster_id"].nunique()}')
# Exercise 4: Interleaving evaluation — measure user preference between rankers.
# Key insight: interleaving merges items from two rankers into a single list;
# clicks attributed to each ranker's picks reveal which algorithm users prefer,
# with more sensitivity than an A/B traffic split for ranking systems.
def team_draft_interleave(list_A, list_B, n=10):
    """Team-draft interleaving of two ranked lists.

    The two teams alternate picks (team A always drafts first here), each
    taking its highest-ranked item not already in the merged list. Returns
    (interleaved list, dict mapping item -> 'A' or 'B').
    """
    merged, credit = [], {}
    used = set()
    _missing = object()  # sentinel so a literal None item would still be valid
    turn = 'A'
    for _ in range(n):
        source = list_A if turn == 'A' else list_B
        pick = next((it for it in source if it not in used), _missing)
        if pick is not _missing:
            merged.append(pick)
            credit[pick] = turn
            used.add(pick)
        turn = 'B' if turn == 'A' else 'A'
    return merged, credit
# Simulate two rankers over the same 10-item catalogue and user clicks on
# the interleaved result.
items = list(range(20))
np.random.shuffle(items)
ranker_A = items[:10]
# Ranker B serves the same items in a random order (the key draws one
# random sort value per item).
ranker_B = sorted(items[:10], key=lambda x: -np.random.rand()) # different order
interleaved_list, assignments = team_draft_interleave(ranker_A, ranker_B, n=10)
# Simulate clicks with B slightly better: 0.3 click probability for items
# credited to B vs 0.2 for A, over 100 impressions per item.
click_probs = {item: 0.3 if assignments.get(item) == 'B' else 0.2 for item in interleaved_list}
clicks = {item: np.random.binomial(100, click_probs[item]) for item in interleaved_list}
# Attribute total clicks back to the ranker that contributed each item.
clicks_A = sum(v for k, v in clicks.items() if assignments.get(k) == 'A')
clicks_B = sum(v for k, v in clicks.items() if assignments.get(k) == 'B')
winner = 'B' if clicks_B > clicks_A else 'A'
print(f'Interleaved list: {interleaved_list}')
print(f'Clicks on A items: {clicks_A}, Clicks on B items: {clicks_B}')
print(f'Preferred ranker: {winner}')
# Exercise 5: Conflict detector β flag users in >1 experiment simultaneously
# Key insight: Users in multiple experiments simultaneously violate the mutual
# exclusivity assumption; their outcome reflects combined treatment effects,
# making per-experiment analysis confounded.
from datetime import datetime, timedelta
def create_experiment_assignments(n_users=300, n_experiments=5):
    """Simulate experiment enrollment logs.

    Each experiment runs for 30 days with staggered starts (10 days apart)
    and enrolls ~40% of users. Returns one row per (user, experiment).
    """
    records = []
    launch = datetime(2024, 1, 1)
    for exp_id in range(n_experiments):
        window_start = launch + timedelta(days=exp_id * 10)
        window_end = window_start + timedelta(days=30)
        # Sample a distinct ~40% cohort for this experiment.
        cohort = np.random.choice(n_users, int(n_users * 0.4), replace=False)
        for u in cohort:
            records.append({'user_id': u, 'experiment_id': exp_id,
                            'start_date': window_start, 'end_date': window_end})
    return pd.DataFrame(records)
def detect_conflicts(assignments_df):
    """Find users enrolled in two or more experiments with overlapping dates.

    Returns a DataFrame with one row per conflicting experiment pair
    (user_id, exp1, exp2, overlap_start, overlap_end); empty if none.
    """
    found = []
    for user_id, group in assignments_df.groupby('user_id'):
        if len(group) < 2:
            continue  # a single enrollment cannot conflict
        ordered = group.sort_values('start_date').reset_index(drop=True)
        for i in range(len(ordered)):
            row_i = ordered.loc[i]
            for j in range(i + 1, len(ordered)):
                row_j = ordered.loc[j]
                # Two windows overlap iff each starts strictly before the other ends.
                if row_i['end_date'] > row_j['start_date'] and \
                   row_j['end_date'] > row_i['start_date']:
                    found.append({
                        'user_id': user_id,
                        'exp1': row_i['experiment_id'],
                        'exp2': row_j['experiment_id'],
                        'overlap_start': max(row_i['start_date'], row_j['start_date']),
                        'overlap_end': min(row_i['end_date'], row_j['end_date'])
                    })
    return pd.DataFrame(found).drop_duplicates()
# Run the detector on a simulated assignment log and summarize conflicts.
assignments = create_experiment_assignments(n_users=200, n_experiments=5)
conflicts = detect_conflicts(assignments)
print(f'Total assignments: {len(assignments)}')
print(f'Conflicting user-experiment pairs: {len(conflicts)}')
n_conflicted_users = conflicts['user_id'].nunique() if len(conflicts) > 0 else 0
print(f'Users with conflicts: {n_conflicted_users}')
if len(conflicts) > 0:
    print('\nSample conflicts:')
    print(conflicts.head(5).to_string(index=False))