Solutions: Machine Learning Track
Worked solutions to all exercises from the machine-learning/ notebooks.
01 — sklearn Pipelines
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score, cross_val_predict, GridSearchCV
from sklearn.metrics import ConfusionMatrixDisplay
# ββ Exercise 1: Pipeline with custom ratio-feature transformer + RF ββββββββ
# Key insight: adding derived features inside a Pipeline transformer ensures
# ratios are computed consistently on both train and test splits.
class RatioFeatureAdder(BaseEstimator, TransformerMixin):
    """Append every ordered pairwise ratio column to the input matrix.

    For an input with p columns, the output has p + p*(p-1) columns: the
    original features followed by X[:, i] / X[:, j] for every ordered pair
    with i != j, in row-major (i outer, j inner) order.
    """

    def fit(self, X, y=None):
        # Nothing is learned; recording the input width keeps the
        # transformer compliant with the sklearn fit/transform contract.
        self.n_features_in_ = X.shape[1]
        return self

    def transform(self, X):
        X = np.array(X)
        n_cols = X.shape[1]
        derived = [
            # Small epsilon in the denominator avoids division by zero.
            (X[:, a] / (X[:, b] + 1e-8)).reshape(-1, 1)
            for a in range(n_cols)
            for b in range(n_cols)
            if a != b
        ]
        return np.hstack([X] + derived)
# Synthetic binary-classification problem for the pipeline demo.
X, y = make_classification(n_samples=400, n_features=6, n_informative=4, random_state=1)
# Pipeline: derive ratio features -> standardise -> random forest.
pipe = Pipeline([
    ('ratios', RatioFeatureAdder()),
    ('scaler', StandardScaler()),
    ('clf', RandomForestClassifier(n_estimators=100, random_state=42)),
])
# 5-fold cross-validated accuracy of the complete pipeline (transforms are
# re-fit inside each fold, so there is no train/test leakage).
scores = cross_val_score(pipe, X, y, cv=5)
print(f'CV accuracy with ratio features: {scores.mean():.4f} Β± {scores.std():.4f}')
# ββ Exercise 2: set_output(transform='pandas') to preserve column names ββββ
# Key insight: sklearn 1.2+ supports set_output, making pipelines return
# DataFrames with meaningful column names β essential for debugging
# and downstream feature selection.
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
rng = np.random.default_rng(0)
df_named = pd.DataFrame(
rng.normal(0, 1, (50, 4)),
columns=['age', 'income', 'score', 'tenure']
)
# Inject some NaN
df_named.loc[0, 'age'] = np.nan
pipe_named = Pipeline([
('imputer', SimpleImputer(strategy='mean')),
('scaler', StandardScaler()),
]).set_output(transform='pandas')
result_df = pipe_named.fit_transform(df_named)
print('Output type:', type(result_df))
print('Columns preserved:', result_df.columns.tolist())
print(result_df.head(3))
# ββ Exercise 3: cross_val_predict β confusion matrix βββββββββββββββββββββ
# Key insight: cross_val_predict generates out-of-fold predictions so the
# confusion matrix reflects true generalisation β not training fit.
from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix
X, y = make_classification(n_samples=500, n_features=10, random_state=7)
pipe_cv = make_pipeline(
SimpleImputer(),
StandardScaler(),
RandomForestClassifier(n_estimators=100, random_state=42)
)
y_pred = cross_val_predict(pipe_cv, X, y, cv=5)
cm = confusion_matrix(y, y_pred)
disp = ConfusionMatrixDisplay(confusion_matrix=cm)
fig, ax = plt.subplots(figsize=(5, 4))
disp.plot(ax=ax, colorbar=False)
ax.set_title('OOF Confusion Matrix (cross_val_predict)')
plt.tight_layout()
plt.show()
accuracy = (cm.diagonal().sum()) / cm.sum()
print(f'OOF Accuracy: {accuracy:.4f}')
# ββ Exercise 4: Cache expensive transformers with joblib Memory ββββββββββββ
# Key insight: caching a slow transformer (e.g., PCA on large data) avoids
# recomputation when re-fitting the pipeline with only model changes.
import time
import tempfile
from joblib import Memory
from sklearn.decomposition import PCA
X, y = make_classification(n_samples=1000, n_features=50, random_state=0)
with tempfile.TemporaryDirectory() as cachedir:
memory = Memory(location=cachedir, verbose=0)
pipe_cached = Pipeline([
('scaler', StandardScaler()),
('pca', PCA(n_components=10)),
('clf', RandomForestClassifier(n_estimators=50, random_state=1)),
], memory=memory)
# First fit β runs all steps
t0 = time.perf_counter()
pipe_cached.fit(X, y)
t_first = time.perf_counter() - t0
# Change only the classifier hyperparameter; scaler+PCA should be cache-hit
pipe_cached.set_params(clf__n_estimators=100)
t1 = time.perf_counter()
pipe_cached.fit(X, y)
t_second = time.perf_counter() - t1
print(f'First fit: {t_first:.3f}s')
print(f'Second fit (cached transforms): {t_second:.3f}s')
print(f'Speedup factor: {t_first / max(t_second, 1e-9):.1f}x')
# ββ Exercise 5: GridSearchCV over preprocessing + model hyperparams ββββββββ
# Key insight: Pipeline param names follow <step>__<param> syntax, letting
# GridSearchCV tune both preprocessing and model in one search.
from sklearn.preprocessing import MinMaxScaler
X, y = make_classification(n_samples=300, n_features=10, random_state=5)
pipe_gs = Pipeline([
('scaler', StandardScaler()),
('clf', RandomForestClassifier(random_state=42)),
])
param_grid = {
'scaler': [StandardScaler(), MinMaxScaler()],
'clf__n_estimators': [50, 100],
'clf__max_depth': [None, 5, 10],
'clf__min_samples_leaf': [1, 5],
}
gs = GridSearchCV(pipe_gs, param_grid, cv=3, scoring='accuracy', n_jobs=-1, verbose=0)
gs.fit(X, y)
print(f'Best CV accuracy: {gs.best_score_:.4f}')
print('Best params:')
for k, v in gs.best_params_.items():
print(f' {k}: {v}')
02 — Model Selection & Validation
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import (learning_curve, cross_val_score,
GridSearchCV, StratifiedKFold,
train_test_split)
from sklearn.datasets import make_classification
X, y = make_classification(n_samples=800, n_features=20, n_informative=8, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# ββ Exercise 1: Learning curves to diagnose bias vs variance ββββββββββββββ
# Key insight: large gap between train and val curve = overfitting (high variance);
# both curves low and converged = underfitting (high bias).
clf = RandomForestClassifier(n_estimators=100, random_state=42)
train_sizes, train_scores, val_scores = learning_curve(
clf, X, y, cv=5, train_sizes=np.linspace(0.1, 1.0, 10),
scoring='accuracy', n_jobs=-1
)
fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(train_sizes, train_scores.mean(axis=1), 'o-', label='Train score', color='steelblue')
ax.fill_between(train_sizes,
train_scores.mean(axis=1) - train_scores.std(axis=1),
train_scores.mean(axis=1) + train_scores.std(axis=1), alpha=0.15, color='steelblue')
ax.plot(train_sizes, val_scores.mean(axis=1), 'o-', label='Val score', color='darkorange')
ax.fill_between(train_sizes,
val_scores.mean(axis=1) - val_scores.std(axis=1),
val_scores.mean(axis=1) + val_scores.std(axis=1), alpha=0.15, color='darkorange')
ax.set_xlabel('Training set size')
ax.set_ylabel('Accuracy')
ax.set_title('Learning Curves β Random Forest')
ax.legend()
plt.tight_layout()
plt.show()
# ββ Exercise 2: Nested cross-validation βββββββββββββββββββββββββββββββββββ
# Key insight: standard CV re-uses the validation set for model selection AND
# evaluation β this is optimistic. Nested CV has an outer loop
# for unbiased evaluation and an inner loop for hyperparameter tuning.
from sklearn.model_selection import cross_val_score, GridSearchCV, StratifiedKFold
outer_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=1)
inner_cv = StratifiedKFold(n_splits=3, shuffle=True, random_state=2)
clf_base = RandomForestClassifier(random_state=42)
param_grid = {'n_estimators': [50, 100], 'max_depth': [None, 5]}
inner_search = GridSearchCV(clf_base, param_grid, cv=inner_cv, scoring='accuracy', n_jobs=-1)
# outer_cv evaluates inner_search (which itself tunes on inner_cv)
nested_scores = cross_val_score(inner_search, X, y, cv=outer_cv, scoring='accuracy')
# Compare to non-nested CV (optimistic baseline)
non_nested_scores = cross_val_score(clf_base, X, y, cv=outer_cv, scoring='accuracy')
print(f'Nested CV accuracy: {nested_scores.mean():.4f} Β± {nested_scores.std():.4f}')
print(f'Non-nested CV accuracy: {non_nested_scores.mean():.4f} Β± {non_nested_scores.std():.4f}')
print(f'Optimism bias: {non_nested_scores.mean() - nested_scores.mean():.4f}')
# ββ Exercise 3: Train / Val / Test score analysis for over/underfitting ββββ
# Key insight: Train >> Val β Test β overfit. Train β Val β Test, all low β underfit.
# A well-fit model has small train-val gap and decent absolute score.
from sklearn.tree import DecisionTreeClassifier
X_tr, X_tmp, y_tr, y_tmp = train_test_split(X, y, test_size=0.3, random_state=0)
X_val, X_te, y_val, y_te = train_test_split(X_tmp, y_tmp, test_size=0.5, random_state=0)
configs = [
('Overfit (max_depth=None)', DecisionTreeClassifier(max_depth=None, random_state=0)),
('Underfit (max_depth=1)', DecisionTreeClassifier(max_depth=1, random_state=0)),
('Good fit (max_depth=5)', DecisionTreeClassifier(max_depth=5, random_state=0)),
]
rows = []
for name, clf in configs:
clf.fit(X_tr, y_tr)
rows.append({
'model': name,
'train': round(clf.score(X_tr, y_tr), 4),
'val': round(clf.score(X_val, y_val), 4),
'test': round(clf.score(X_te, y_te), 4),
})
print(pd.DataFrame(rows).to_string(index=False))
# ββ Exercise 4: Bootstrap confidence interval for accuracy ββββββββββββββββ
# Key insight: the bootstrap CI is distribution-free β ideal when you can't
# assume normality of the accuracy estimator across folds.
from sklearn.metrics import accuracy_score
rng = np.random.default_rng(99)
clf_boot = RandomForestClassifier(n_estimators=100, random_state=0)
clf_boot.fit(X_train, y_train)
y_pred_boot = clf_boot.predict(X_test)
n_bootstrap = 1000
boot_scores = []
n_test = len(y_test)
for _ in range(n_bootstrap):
idx = rng.integers(0, n_test, n_test)
boot_scores.append(accuracy_score(y_test[idx], y_pred_boot[idx]))
boot_scores = np.array(boot_scores)
ci_lower = np.percentile(boot_scores, 2.5)
ci_upper = np.percentile(boot_scores, 97.5)
print(f'Point estimate: {accuracy_score(y_test, y_pred_boot):.4f}')
print(f'95% Bootstrap CI: [{ci_lower:.4f}, {ci_upper:.4f}]')
plt.figure(figsize=(7, 3))
plt.hist(boot_scores, bins=40, color='steelblue', edgecolor='white')
plt.axvline(ci_lower, color='red', linestyle='--', label=f'2.5% = {ci_lower:.3f}')
plt.axvline(ci_upper, color='red', linestyle='-', label=f'97.5% = {ci_upper:.3f}')
plt.xlabel('Bootstrap Accuracy'); plt.legend()
plt.title('Bootstrap Distribution of Accuracy')
plt.tight_layout(); plt.show()
# ββ Exercise 5: Early stopping for gradient descent via validation loss ββββ
# Key insight: monitor validation loss each epoch; stop when it stops improving
# for `patience` epochs. This prevents wasting compute on overfitting.
def early_stopping_gd(X_tr, y_tr, X_val, y_val, lr=0.05, max_epochs=500, patience=10):
    """Fit logistic regression by batch gradient descent with early stopping.

    After every weight update the validation binary cross-entropy is
    evaluated; the weights achieving the lowest loss so far are retained,
    and training halts once the loss has failed to improve by more than
    1e-5 for `patience` consecutive epochs.

    Returns
    -------
    (best_w, history) : the best weight vector and the per-epoch list of
    validation losses.
    """
    n_samples, n_features = X_tr.shape

    def _sigmoid(z):
        # Clipping the logits keeps exp() from overflowing.
        return 1 / (1 + np.exp(-np.clip(z, -500, 500)))

    def _bce(y_true, y_prob):
        # 1e-12 guards against log(0).
        return -np.mean(y_true * np.log(y_prob + 1e-12)
                        + (1 - y_true) * np.log(1 - y_prob + 1e-12))

    weights = np.zeros(n_features)
    best_w = weights.copy()
    best_val_loss = np.inf
    stale_epochs = 0
    history = []
    for epoch in range(max_epochs):
        residual = _sigmoid(X_tr @ weights) - y_tr
        weights = weights - lr * (X_tr.T @ residual) / n_samples
        val_loss = _bce(y_val, _sigmoid(X_val @ weights))
        history.append(val_loss)
        if val_loss < best_val_loss - 1e-5:
            best_val_loss = val_loss
            best_w = weights.copy()
            stale_epochs = 0
        else:
            stale_epochs += 1
            if stale_epochs >= patience:
                print(f'Early stop at epoch {epoch} (best val loss {best_val_loss:.4f})')
                break
    return best_w, history
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
Xtr_s = scaler.fit_transform(X_train)
Xte_s = scaler.transform(X_test)
X_val_s, X_te_s, y_val_s, y_te_s = train_test_split(Xte_s, y_test, test_size=0.5, random_state=0)
best_w, hist = early_stopping_gd(Xtr_s, y_train, X_val_s, y_val_s)
plt.plot(hist, color='steelblue')
plt.xlabel('Epoch'); plt.ylabel('Validation BCE Loss')
plt.title('Gradient Descent with Early Stopping')
plt.tight_layout(); plt.show()
03 — Ensemble Methods
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score, StratifiedKFold, train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from scipy.optimize import minimize
X, y = make_classification(n_samples=600, n_features=15, n_informative=8, random_state=7)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=0)
# ββ Exercise 1: Soft-voting ensemble with scipy-tuned weights βββββββββββββ
# Key insight: soft voting averages class probabilities; optimal weights
# maximise validation accuracy and are found by Nelder-Mead minimisation
# (accuracy is non-smooth, so a derivative-free method is required).
lr = LogisticRegression(max_iter=1000, random_state=0)
rf = RandomForestClassifier(n_estimators=100, random_state=0)
gbm = GradientBoostingClassifier(n_estimators=100, random_state=0)
X_trn, X_val, y_trn, y_val = train_test_split(X_tr, y_tr, test_size=0.25, random_state=1)
for clf in [lr, rf, gbm]: clf.fit(X_trn, y_trn)
probs_val = np.stack([
lr.predict_proba(X_val)[:,1],
rf.predict_proba(X_val)[:,1],
gbm.predict_proba(X_val)[:,1],
], axis=1)
def neg_accuracy(weights):
    """Negative validation accuracy of the weighted soft-vote (minimised by scipy)."""
    w = np.array(weights)
    # L1-normalise |w| so the ensemble weights are non-negative and sum to 1.
    w = np.abs(w) / np.abs(w).sum()
    # Weighted average of the three models' P(class 1) on the validation set
    # (`probs_val`, `y_val` come from the enclosing cell), thresholded at 0.5.
    preds = (probs_val @ w >= 0.5).astype(int)
    return -accuracy_score(y_val, preds)
res = minimize(neg_accuracy, x0=[1/3, 1/3, 1/3], method='Nelder-Mead')
best_w = np.abs(res.x) / np.abs(res.x).sum()
print(f'Optimal weights: LR={best_w[0]:.3f}, RF={best_w[1]:.3f}, GBM={best_w[2]:.3f}')
probs_te = np.stack([
lr.predict_proba(X_te)[:,1],
rf.predict_proba(X_te)[:,1],
gbm.predict_proba(X_te)[:,1],
], axis=1)
y_pred_ens = (probs_te @ best_w >= 0.5).astype(int)
print(f'Test accuracy (optimised soft voting): {accuracy_score(y_te, y_pred_ens):.4f}')
# ββ Exercise 2: Stacking from scratch with OOF base-learner predictions βββ
# Key insight: out-of-fold predictions as meta-features prevent the meta-learner
# from seeing predictions made on data it was trained on.
def stacking_oof(X_train, y_train, X_test, base_learners, meta_learner, n_splits=5):
    """Stacked generalisation using out-of-fold (OOF) meta-features.

    Each base learner contributes one meta-feature column: its OOF class-1
    probability on the training rows (so the meta-learner never sees
    predictions made on data a learner was trained on) and, for the test
    set, the mean of its per-fold probabilities.

    Returns the meta-learner's class predictions for X_test.
    """
    kf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    # One meta-feature column per base learner.
    oof_train = np.zeros((len(y_train), len(base_learners)))
    test_preds = np.zeros((len(X_test), len(base_learners)))
    for i, clf in enumerate(base_learners):
        # Test-set predictions from each fold's refit; averaged below.
        fold_test_preds = np.zeros((len(X_test), n_splits))
        for fold, (trn_idx, val_idx) in enumerate(kf.split(X_train, y_train)):
            clf.fit(X_train[trn_idx], y_train[trn_idx])
            # Fill only the held-out rows, so every training row ends up OOF.
            oof_train[val_idx, i] = clf.predict_proba(X_train[val_idx])[:, 1]
            fold_test_preds[:, fold] = clf.predict_proba(X_test)[:, 1]
        test_preds[:, i] = fold_test_preds.mean(axis=1)
    # The meta-learner trains purely on OOF meta-features.
    meta_learner.fit(oof_train, y_train)
    return meta_learner.predict(test_preds)
base = [
LogisticRegression(max_iter=500, random_state=0),
RandomForestClassifier(n_estimators=50, random_state=0),
]
meta = LogisticRegression(max_iter=500, random_state=0)
y_stack = stacking_oof(X_tr, y_tr, X_te, base, meta)
print(f'Stacking test accuracy: {accuracy_score(y_te, y_stack):.4f}')
# ββ Exercise 3: OOB score vs CV score for RandomForest ββββββββββββββββββββ
# Key insight: OOB uses the ~37% of samples not drawn for each tree as
# validation, giving a free cross-validation estimate; it should
# be close to k-fold CV but can be slightly optimistic.
rf_oob = RandomForestClassifier(n_estimators=200, oob_score=True, random_state=42)
rf_oob.fit(X_tr, y_tr)
cv_scores = cross_val_score(rf_oob, X_tr, y_tr, cv=5)
print(f'OOB score: {rf_oob.oob_score_:.4f}')
print(f'5-fold CV score: {cv_scores.mean():.4f} Β± {cv_scores.std():.4f}')
print('Both estimates use ~out-of-sample data, so they should be similar.')
# ββ Exercise 4: AdaBoost from scratch βββββββββββββββββββββββββββββββββββββ
# Key insight: AdaBoost reweights misclassified samples exponentially so
# subsequent weak learners focus on hard examples; alpha controls
# how much weight each learner gets in the final vote.
from sklearn.tree import DecisionTreeClassifier
def adaboost_scratch(X_train, y_train, X_test, n_estimators=50):
    """Binary AdaBoost (discrete variant) built from depth-1 decision stumps.

    Labels are mapped to {-1, +1}; each round fits a stump under the current
    sample weights, then exponentially up-weights the examples that stump
    misclassified. Returns hard {0, 1} predictions for X_test.
    """
    n_samples = len(y_train)
    sample_w = np.ones(n_samples) / n_samples
    y_signed = np.where(y_train == 1, 1, -1)  # {0,1} -> {-1,+1}
    stumps = []
    alphas = []
    for _ in range(n_estimators):
        stump = DecisionTreeClassifier(max_depth=1)
        stump.fit(X_train, y_signed, sample_weight=sample_w)
        round_pred = stump.predict(X_train)
        # Weighted misclassification rate, clipped away from 0 and 1 so the
        # log-odds below stays finite.
        err = np.clip(np.dot(sample_w, round_pred != y_signed) / sample_w.sum(),
                      1e-10, 1 - 1e-10)
        alpha = 0.5 * np.log((1 - err) / err)
        # Exponential reweighting: misclassified samples gain weight.
        sample_w *= np.exp(-alpha * y_signed * round_pred)
        sample_w /= sample_w.sum()
        stumps.append(stump)
        alphas.append(alpha)
    # Final vote: sign of the alpha-weighted sum of stump outputs.
    margin = sum(a * np.where(s.predict(X_test) == 1, 1, -1)
                 for a, s in zip(alphas, stumps))
    return (margin >= 0).astype(int)
y_ada = adaboost_scratch(X_tr, y_tr, X_te)
print(f'Scratch AdaBoost test accuracy: {accuracy_score(y_te, y_ada):.4f}')
# Sanity check vs sklearn
from sklearn.ensemble import AdaBoostClassifier
sk_ada = AdaBoostClassifier(n_estimators=50, random_state=0, algorithm='SAMME')
sk_ada.fit(X_tr, y_tr)
print(f'sklearn AdaBoost test accuracy: {sk_ada.score(X_te, y_te):.4f}')
# ββ Exercise 5: Feature importance stability across RF seeds ββββββββββββββ
# Key insight: if top-5 features change across seeds, the importances are
# unreliable (correlated features share importance randomly);
# SHAP or permutation importance is more stable in that case.
n_runs = 10
importances_all = []
for seed in range(n_runs):
clf_i = RandomForestClassifier(n_estimators=100, random_state=seed)
clf_i.fit(X_tr, y_tr)
importances_all.append(clf_i.feature_importances_)
imp_df = pd.DataFrame(importances_all, columns=[f'f{i}' for i in range(X_tr.shape[1])])
mean_imp = imp_df.mean().sort_values(ascending=False)
std_imp = imp_df.std().reindex(mean_imp.index)
top5 = mean_imp.head(5)
fig, ax = plt.subplots(figsize=(7, 4))
ax.bar(top5.index, top5.values, yerr=std_imp[top5.index].values,
color='steelblue', capsize=5, error_kw={'elinewidth': 2})
ax.set_title('Top-5 Feature Importances across 10 RF seeds (mean Β± std)')
ax.set_ylabel('Mean Importance')
plt.tight_layout(); plt.show()
print('Top-5 stable features:', top5.index.tolist())
04 — Imbalanced Datasets
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import average_precision_score, precision_recall_curve, f1_score
from imblearn.over_sampling import SMOTE, ADASYN
# Highly imbalanced dataset (5% minority)
X, y = make_classification(n_samples=2000, n_features=10, weights=[0.95, 0.05],
n_informative=5, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25,
stratify=y, random_state=0)
# ββ Exercise 1: Compare 5 resampling strategies β PR-AUC table ββββββββββββ
# Key insight: PR-AUC is the right metric for imbalanced data β ROC-AUC can
# look great even when the minority class is mostly ignored.
def fit_predict_proba(X_train, y_train, X_test, clf_kwargs=None):
    """Fit a fresh 100-tree random forest and return (test P(class 1), model).

    `clf_kwargs` (optional dict) is forwarded to the RandomForestClassifier
    constructor, e.g. {'class_weight': 'balanced'}.
    """
    extra = clf_kwargs or {}
    model = RandomForestClassifier(n_estimators=100, random_state=0, **extra)
    model.fit(X_train, y_train)
    positive_proba = model.predict_proba(X_test)[:, 1]
    return positive_proba, model
strategies = {}
# No fix
proba, _ = fit_predict_proba(X_tr, y_tr, X_te)
strategies['No fix'] = proba
# SMOTE
X_sm, y_sm = SMOTE(random_state=0).fit_resample(X_tr, y_tr)
strategies['SMOTE'], _ = fit_predict_proba(X_sm, y_sm, X_te)
# ADASYN
X_ad, y_ad = ADASYN(random_state=0).fit_resample(X_tr, y_tr)
strategies['ADASYN'], _ = fit_predict_proba(X_ad, y_ad, X_te)
# class_weight='balanced'
strategies['class_weight'], _ = fit_predict_proba(X_tr, y_tr, X_te,
clf_kwargs={'class_weight': 'balanced'})
# Threshold tuning (train without fix, tune threshold on val)
X_trn, X_val, y_trn, y_val = train_test_split(X_tr, y_tr, test_size=0.2, stratify=y_tr, random_state=1)
proba_val, clf_tt = fit_predict_proba(X_trn, y_trn, X_val)
thresholds = np.linspace(0.01, 0.99, 200)
best_t = max(thresholds, key=lambda t: f1_score(y_val, (proba_val >= t).astype(int), zero_division=0))
strategies['Threshold tuning'] = clf_tt.predict_proba(X_te)[:, 1] # store raw proba
rows = []
for name, proba_arr in strategies.items():
pr_auc = average_precision_score(y_te, proba_arr)
rows.append({'strategy': name, 'PR-AUC': round(pr_auc, 4)})
print(pd.DataFrame(rows).sort_values('PR-AUC', ascending=False).to_string(index=False))
# ββ Exercise 2: Custom k-means difficulty-based oversampler βββββββββββββββ
# Key insight: standard SMOTE oversamples uniformly; focusing on minority samples
# near cluster boundaries (high difficulty) targets the decision
# boundary more effectively.
from sklearn.cluster import KMeans
from sklearn.base import BaseEstimator
class KMeansDifficultyOversampler(BaseEstimator):
    """Difficulty-aware minority oversampler.

    Clusters the minority class (label 1) with k-means, scores each cluster's
    'difficulty' as the fraction of majority-class points among the 10
    nearest neighbours (from the full dataset) of its centroid, then draws
    synthetic minority samples — SMOTE-style interpolation between two random
    cluster members — preferentially from the hardest clusters.

    Parameters
    ----------
    k : int
        Maximum number of minority clusters.
    n_synthetic : int
        Number of synthetic minority samples to generate.
    random_state : int
        Seed for both k-means and the sampling RNG.
    """

    def __init__(self, k=5, n_synthetic=200, random_state=0):
        self.k = k
        self.n_synthetic = n_synthetic
        self.random_state = random_state

    def fit_resample(self, X, y):
        """Return (X_resampled, y_resampled) with synthetic minority rows appended."""
        rng = np.random.default_rng(self.random_state)
        minority_mask = y == 1
        X_min = X[minority_mask]
        if len(X_min) == 0:
            # Robustness fix: no minority samples to oversample — previously
            # this crashed inside KMeans(n_clusters=0). Return data untouched.
            return X, y
        km = KMeans(n_clusters=min(self.k, len(X_min)),
                    random_state=self.random_state, n_init=5)
        km.fit(X_min)
        labels = km.labels_
        # Difficulty = fraction of majority class among ALL samples near each centroid
        from sklearn.neighbors import NearestNeighbors
        nn = NearestNeighbors(n_neighbors=10).fit(X)
        difficulties = np.zeros(km.n_clusters)
        for c in range(km.n_clusters):
            c_center = km.cluster_centers_[c].reshape(1, -1)
            _, nbr_idx = nn.kneighbors(c_center)
            difficulties[c] = (y[nbr_idx[0]] == 0).mean()
        # Sampling probability proportional to difficulty. Bug fix: if every
        # centroid neighbourhood is pure minority, difficulties.sum() == 0 and
        # the original code produced NaN probabilities (rng.choice then raises);
        # fall back to uniform cluster sampling in that case.
        total = difficulties.sum()
        if total > 0:
            probs = difficulties / total
        else:
            probs = np.full(km.n_clusters, 1.0 / km.n_clusters)
        chosen_clusters = rng.choice(km.n_clusters, size=self.n_synthetic, p=probs)
        synthetic = []
        for c in chosen_clusters:
            members = X_min[labels == c]
            if len(members) < 2:
                # Singleton cluster: reuse its lone member verbatim.
                synthetic.append(members[0])
            else:
                # Interpolate between two randomly chosen cluster members.
                a, b = members[rng.integers(0, len(members), 2)]
                alpha = rng.random()
                synthetic.append(alpha * a + (1 - alpha) * b)
        X_synthetic = np.vstack(synthetic)
        y_synthetic = np.ones(self.n_synthetic, dtype=y.dtype)
        return np.vstack([X, X_synthetic]), np.concatenate([y, y_synthetic])
sampler = KMeansDifficultyOversampler(k=5, n_synthetic=200, random_state=0)
X_km, y_km = sampler.fit_resample(X_tr, y_tr)
print(f'Before: {np.bincount(y_tr)} After: {np.bincount(y_km)}')
# ββ Exercise 3: Precision-Recall curves for all strategies ββββββββββββββββ
# Key insight: plotting all PR curves together lets you compare the precision-
# recall trade-off across strategies at every threshold at once.
fig, ax = plt.subplots(figsize=(7, 5))
colors = ['grey', 'steelblue', 'darkorange', 'green', 'purple']
for (name, proba_arr), color in zip(strategies.items(), colors):
prec, rec, _ = precision_recall_curve(y_te, proba_arr)
ap = average_precision_score(y_te, proba_arr)
ax.step(rec, prec, where='post', label=f'{name} (AP={ap:.3f})', color=color)
# Random baseline
baseline = y_te.mean()
ax.axhline(baseline, linestyle='--', color='black', label=f'Baseline ({baseline:.3f})')
ax.set_xlabel('Recall')
ax.set_ylabel('Precision')
ax.set_title('Precision-Recall Curves β Imbalanced Data Strategies')
ax.legend(loc='upper right', fontsize=8)
plt.tight_layout()
plt.show()
# ββ Exercise 4: Optimal threshold that maximises F2 score βββββββββββββββββ
# Key insight: F2 weighs recall twice as much as precision β appropriate when
# missing a positive (FN) is more costly than a false alarm (FP).
from sklearn.metrics import fbeta_score
proba_smote = strategies['SMOTE']
thresholds = np.linspace(0.01, 0.99, 300)
f2_scores = [fbeta_score(y_te, (proba_smote >= t).astype(int), beta=2, zero_division=0)
for t in thresholds]
best_idx = np.argmax(f2_scores)
best_threshold = thresholds[best_idx]
best_f2 = f2_scores[best_idx]
plt.figure(figsize=(7, 4))
plt.plot(thresholds, f2_scores, color='steelblue')
plt.axvline(best_threshold, color='crimson', linestyle='--',
label=f'Best t={best_threshold:.2f}, F2={best_f2:.3f}')
plt.xlabel('Threshold'); plt.ylabel('F2 Score')
plt.title('F2 Score vs Classification Threshold (SMOTE strategy)')
plt.legend(); plt.tight_layout(); plt.show()
print(f'Optimal threshold: {best_threshold:.3f} | F2: {best_f2:.4f}')
print('F2 vs F1: F2 up-weights recall, penalising missed positives more than false alarms.')
# ββ Exercise 5: ROC-AUC misleading on imbalanced data β numerical example β
# Key insight: a classifier that always predicts the majority class achieves
# high ROC-AUC but zero recall on the minority class β PR-AUC
# unmasks this because it is anchored to the class prevalence.
from sklearn.metrics import roc_auc_score, average_precision_score
# 95/5 imbalanced test set
n_test = 1000
y_imb = np.array([0]*950 + [1]*50)
# Trivial classifier: always predict majority (0)
# We add tiny random noise to avoid degenerate probability arrays
rng = np.random.default_rng(0)
proba_majority = rng.uniform(0, 0.1, n_test) # all low probabilities
# Good classifier with near-perfect minority recall
proba_good = np.where(y_imb == 1,
rng.uniform(0.7, 1.0, n_test),
rng.uniform(0.0, 0.3, n_test))
print('Classifier | ROC-AUC | PR-AUC')
print('-'*47)
for name, proba_arr in [('Majority-class trivial', proba_majority),
('Good minority detector', proba_good)]:
roc = roc_auc_score(y_imb, proba_arr)
prc = average_precision_score(y_imb, proba_arr)
print(f'{name:24s} | {roc:.4f} | {prc:.4f}')
print('\nThe trivial classifier has ROC-AUC β 0.5 but near-zero PR-AUC,')
print('showing ROC-AUC alone is not sufficient for imbalanced tasks.')
05 — Model Interpretability
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import shap
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.inspection import permutation_importance
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score
X, y = make_classification(n_samples=800, n_features=12, n_informative=6,
n_redundant=2, random_state=42)
feature_names = [f'feat_{i}' for i in range(X.shape[1])]
X_df = pd.DataFrame(X, columns=feature_names)
X_tr, X_te, y_tr, y_te = train_test_split(X_df, y, test_size=0.25, random_state=0)
clf = RandomForestClassifier(n_estimators=200, random_state=42)
clf.fit(X_tr, y_tr)
# ββ Exercise 1: SHAP vs permutation importance vs gain importance ββββββββββ
# Key insight: gain importance is biased toward high-cardinality features;
# permutation importance is model-agnostic and measures actual
# performance drop; SHAP is theoretically grounded and additive.
# Gain (Gini) importance
gain_imp = pd.Series(clf.feature_importances_, index=feature_names).sort_values(ascending=False)
# Permutation importance
perm_res = permutation_importance(clf, X_te, y_te, n_repeats=10, random_state=0)
perm_imp = pd.Series(perm_res.importances_mean, index=feature_names).sort_values(ascending=False)
# SHAP importance
explainer = shap.TreeExplainer(clf)
shap_values = explainer.shap_values(X_te)
# For binary classification, shap_values is a list [class0, class1]
shap_mean = np.abs(shap_values[1] if isinstance(shap_values, list) else shap_values).mean(axis=0)
shap_imp = pd.Series(shap_mean, index=feature_names).sort_values(ascending=False)
fig, axes = plt.subplots(1, 3, figsize=(14, 4))
for ax, (name, imp) in zip(axes, [('Gain', gain_imp), ('Permutation', perm_imp), ('SHAP', shap_imp)]):
imp.head(8).sort_values().plot(kind='barh', ax=ax, color='steelblue')
ax.set_title(f'{name} Importance')
plt.suptitle('Three Importance Methods β Top 8 Features', y=1.01)
plt.tight_layout(); plt.show()
# ββ Exercise 2: 2D Partial Dependence Plot for two interacting features ββββ
# Key insight: a 2D PDP shows how two features jointly affect model output;
# diagonal or non-additive patterns reveal interaction effects.
from sklearn.inspection import PartialDependenceDisplay
fig, ax = plt.subplots(figsize=(7, 5))
PartialDependenceDisplay.from_estimator(
clf, X_tr,
features=[('feat_0', 'feat_1')], # 2D PDP for the two top features
kind='average',
ax=ax,
)
ax.set_title('2D Partial Dependence: feat_0 Γ feat_1')
plt.tight_layout(); plt.show()
# ββ Exercise 3: Detect spurious feature via SHAP ββββββββββββββββββββββββββ
# Key insight: a purely random column carries zero mutual information with
# the target; SHAP values for it should cluster near zero.
rng = np.random.default_rng(0)
X_spurious = X_tr.copy()
X_spurious['random_noise'] = rng.standard_normal(len(X_spurious))
all_features = feature_names + ['random_noise']
clf_sp = RandomForestClassifier(n_estimators=200, random_state=42)
clf_sp.fit(X_spurious, y_tr)
explainer_sp = shap.TreeExplainer(clf_sp)
X_te_sp = X_te.copy()
X_te_sp['random_noise'] = rng.standard_normal(len(X_te_sp))
shap_vals_sp = explainer_sp.shap_values(X_te_sp)
shap_sp = np.abs(shap_vals_sp[1] if isinstance(shap_vals_sp, list) else shap_vals_sp).mean(axis=0)
shap_sp_series = pd.Series(shap_sp, index=all_features).sort_values()
fig, ax = plt.subplots(figsize=(7, 5))
colors = ['crimson' if f == 'random_noise' else 'steelblue' for f in shap_sp_series.index]
shap_sp_series.plot(kind='barh', ax=ax, color=colors)
ax.set_title('SHAP Importance (red = spurious random column)')
ax.set_xlabel('Mean |SHAP|')
plt.tight_layout(); plt.show()
print(f'random_noise SHAP importance: {shap_sp_series["random_noise"]:.5f} (should be near 0)')
# ββ Exercise 4: Counterfactual explanation β minimal feature change ββββββββ
# Key insight: a counterfactual shows the smallest perturbation that changes
# a prediction β actionable for end-users (e.g., loan denials).
from scipy.optimize import minimize as sp_min
def counterfactual(clf, x0: np.ndarray, target_class: int = 1,
                   lambda_reg: float = 0.1, n_restarts: int = 5) -> np.ndarray:
    """
    Find x_cf close to x0 such that clf predicts target_class.
    Minimises: CrossEntropy(target | x_cf) + lambda * L2(x_cf - x0)

    Multiple random restarts are used because the model's probability
    surface is non-convex; the restart with the lowest objective wins.
    """
    def loss(candidate):
        # Cross-entropy pulls the point toward the target class; the L2
        # penalty keeps it close to the original sample.
        p_target = clf.predict_proba(candidate.reshape(1, -1))[0, target_class]
        ce = -np.log(p_target + 1e-12)
        l2 = lambda_reg * np.sum((candidate - x0) ** 2)
        return ce + l2

    rng_cf = np.random.default_rng(7)
    best_cf = None
    best_loss = np.inf
    for _ in range(n_restarts):
        start = x0 + rng_cf.normal(0, 0.5, x0.shape)
        result = sp_min(loss, start, method='Nelder-Mead',
                        options={'maxiter': 2000, 'xatol': 1e-5})
        if result.fun < best_loss:
            best_loss = result.fun
            best_cf = result.x
    return best_cf
# Pick a sample predicted as class 0 and find CF that flips to class 1
x_sample = X_te.values[clf.predict(X_te) == 0][0]
x_cf = counterfactual(clf, x_sample, target_class=1)
print(f'Original prediction: {clf.predict(x_sample.reshape(1,-1))[0]}')
print(f'CF prediction: {clf.predict(x_cf.reshape(1,-1))[0]}')
diff = x_cf - x_sample
top3_changes = np.argsort(np.abs(diff))[::-1][:3]
print('Top-3 feature changes:')
for idx in top3_changes:
print(f' {feature_names[idx]}: {x_sample[idx]:.3f} β {x_cf[idx]:.3f} (Ξ={diff[idx]:+.3f})')
# ββ Exercise 5: Fairness check β SHAP by demographic group ββββββββββββββββ
# Key insight: features with disparate SHAP impact across demographic groups
# may proxy protected attributes and cause discriminatory outputs.
rng = np.random.default_rng(1)
group = rng.choice(['GroupA', 'GroupB'], len(X_te))
# Reuse shap_values from Exercise 1
shap_class1 = shap_values[1] if isinstance(shap_values, list) else shap_values
shap_te_df = pd.DataFrame(shap_class1, columns=feature_names)
shap_te_df['group'] = group
mean_shap_by_group = shap_te_df.groupby('group')[feature_names].apply(lambda g: g.abs().mean())
# Compute ratio of Group A / Group B for each feature
ratio = mean_shap_by_group.loc['GroupA'] / (mean_shap_by_group.loc['GroupB'] + 1e-12)
flagged = ratio[ratio > 1.5].sort_values(ascending=False)
fig, axes = plt.subplots(1, 2, figsize=(12, 5))
mean_shap_by_group.T.plot(kind='bar', ax=axes[0])
axes[0].set_title('Mean |SHAP| per group')
axes[0].set_ylabel('Mean |SHAP|')
ratio.sort_values(ascending=False).plot(kind='bar', ax=axes[1], color='darkorange')
axes[1].axhline(1.5, color='red', linestyle='--', label='Disparate impact threshold (1.5x)')
axes[1].set_title('GroupA / GroupB SHAP ratio')
axes[1].legend()
plt.tight_layout(); plt.show()
print('Flagged features (ratio > 1.5x):', flagged.index.tolist() if len(flagged) else 'None')
06 — End-to-End Project
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_predict
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
from sklearn.metrics import average_precision_score, brier_score_loss
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline
# Synthetic churn-like dataset: 15% positive class mimics churn imbalance.
X_raw, y = make_classification(n_samples=3000, n_features=10,
                               n_informative=7, weights=[0.85, 0.15],
                               random_state=0)
feat_names = ['feat_%d' % i for i in range(X_raw.shape[1])]
X_df = pd.DataFrame(X_raw, columns=feat_names)
# Stratified split keeps the 85/15 class ratio in both partitions.
X_tr, X_te, y_tr, y_te = train_test_split(
    X_df, y, test_size=0.2, stratify=y, random_state=42)
# -- Exercise 1: Platt scaling calibration + reliability diagram -----------
# Key insight: Platt scaling (sigmoid fitting) is well-calibrated for RF;
#              isotonic regression works for larger datasets. A perfectly
#              calibrated model has the reliability curve on the diagonal.
# FIX: with cv='prefit', CalibratedClassifierCV must be fitted on data that
# is DISJOINT from the data used to train the base model (see sklearn docs).
# Calibrating on the training set re-uses the RF's near-perfect training
# predictions and yields a biased sigmoid, so we hold out a calibration
# split from the training data.
X_fit, X_cal, y_fit, y_cal = train_test_split(
    X_tr, y_tr, test_size=0.25, stratify=y_tr, random_state=0)
base_clf = RandomForestClassifier(n_estimators=200, random_state=0)
base_clf.fit(X_fit, y_fit)
# Sigmoid (Platt) calibration fitted on the held-out calibration split only.
calibrated_clf = CalibratedClassifierCV(base_clf, method='sigmoid', cv='prefit')
calibrated_clf.fit(X_cal, y_cal)
proba_raw = base_clf.predict_proba(X_te)[:, 1]
proba_cal = calibrated_clf.predict_proba(X_te)[:, 1]
# Reliability curves: fraction of positives vs mean predicted probability.
frac_pos_raw, mean_pred_raw = calibration_curve(y_te, proba_raw, n_bins=10)
frac_pos_cal, mean_pred_cal = calibration_curve(y_te, proba_cal, n_bins=10)
fig, ax = plt.subplots(figsize=(6, 5))
ax.plot([0, 1], [0, 1], 'k--', label='Perfect calibration')
ax.plot(mean_pred_raw, frac_pos_raw, 'o-', color='steelblue', label=f'RF (Brier={brier_score_loss(y_te, proba_raw):.4f})')
ax.plot(mean_pred_cal, frac_pos_cal, 's-', color='darkorange', label=f'Platt (Brier={brier_score_loss(y_te, proba_cal):.4f})')
ax.set_xlabel('Mean Predicted Probability'); ax.set_ylabel('Fraction of Positives')
ax.set_title('Reliability Diagram: Before & After Platt Scaling')
ax.legend(); plt.tight_layout(); plt.show()
# -- Exercise 2: Profit-driven threshold (FP=$50, FN=$200) -----------------
# Key insight: optimising accuracy or F1 ignores business costs; the optimal
#              threshold maximises total expected profit.
#              Profit = TP*gain - FP*50 - FN*200 (TP gain treated as 0 here
#              since we only care about cost minimisation).
from sklearn.metrics import confusion_matrix
fp_cost = 50   # cost of calling a non-churner (unnecessary retention)
fn_cost = 200  # cost of missing a churner (lost customer)
thresholds = np.linspace(0.01, 0.99, 300)

def _cost_at_threshold(t):
    """Total dollar cost of classifying the test set at threshold t."""
    preds = (proba_cal >= t).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_te, preds).ravel()
    return fp * fp_cost + fn * fn_cost

costs = [_cost_at_threshold(t) for t in thresholds]
best_idx = np.argmin(costs)
best_t = thresholds[best_idx]
plt.figure(figsize=(7, 4))
plt.plot(thresholds, costs, color='steelblue')
plt.axvline(best_t, color='crimson', linestyle='--',
            label=f'Optimal t={best_t:.2f} (cost=${costs[best_idx]:,.0f})')
plt.xlabel('Threshold'); plt.ylabel('Total Cost ($)')
plt.title('Profit-Driven Threshold Optimisation')
plt.legend(); plt.tight_layout(); plt.show()
print(f'Default t=0.5 cost: ${costs[np.argmin(np.abs(thresholds - 0.5))]:,.0f}')
print(f'Optimal threshold: t={best_t:.3f}, cost=${costs[best_idx]:,.0f}')
# -- Exercise 3: SMOTE in pipeline - PR-AUC comparison ---------------------
# Key insight: SMOTE must be applied INSIDE the CV loop via an imblearn
#              Pipeline so synthetic samples never leak into validation folds.
from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.model_selection import cross_val_score
pipe_no_smote = make_pipeline(
    StandardScaler(),
    RandomForestClassifier(n_estimators=100, random_state=0),
)
pipe_smote = ImbPipeline([
    ('scaler', StandardScaler()),
    ('smote', SMOTE(random_state=0)),
    ('clf', RandomForestClassifier(n_estimators=100, random_state=0)),
])
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
# Score both pipelines under the identical CV split with PR-AUC.
scores_no, scores_smote = (
    cross_val_score(p, X_tr, y_tr, cv=cv, scoring='average_precision')
    for p in (pipe_no_smote, pipe_smote)
)
print(f'Without SMOTE β CV PR-AUC: {scores_no.mean():.4f} Β± {scores_no.std():.4f}')
print(f'With SMOTE β CV PR-AUC: {scores_smote.mean():.4f} Β± {scores_smote.std():.4f}')
# -- Exercise 4: Feature importance stability across CV folds --------------
# Key insight: variance of importance across folds reveals which features
#              are unstable (often due to correlations) - avoid over-relying
#              on them.
cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)

def _fold_importance(train_idx):
    """Fit an RF on one fold's training rows; return its importances."""
    target = y_tr.iloc[train_idx] if hasattr(y_tr, 'iloc') else y_tr[train_idx]
    model = RandomForestClassifier(n_estimators=100, random_state=0)
    model.fit(X_tr.iloc[train_idx], target)
    return model.feature_importances_

fold_importances = [_fold_importance(trn) for trn, _ in cv.split(X_tr, y_tr)]
imp_df = pd.DataFrame(fold_importances, columns=feat_names)
mean_imp = imp_df.mean().sort_values(ascending=False)
std_imp = imp_df.std().reindex(mean_imp.index)
# Bar chart: mean importance with per-fold std as error bars.
xs = np.arange(len(mean_imp))
fig, ax = plt.subplots(figsize=(9, 4))
ax.bar(xs, mean_imp.values,
       yerr=std_imp.values, color='steelblue', capsize=5,
       error_kw={'elinewidth': 2, 'ecolor': 'crimson'})
ax.set_xticks(xs)
ax.set_xticklabels(mean_imp.index, rotation=45, ha='right')
ax.set_title('Feature Importance Stability (5-fold CV mean Β± std)')
ax.set_ylabel('Importance')
plt.tight_layout()
plt.show()
# -- Exercise 5: FastAPI /predict endpoint with top-3 risk factors ---------
# Key insight: wrapping the model in a FastAPI app needs only pydantic
#              schemas for input validation; returning SHAP-derived risk
#              factors makes each prediction interpretable for end-users.
# NOTE: run this as a standalone server (uvicorn main:app).
#       Here we define the full app and demonstrate a mock call.
from pydantic import BaseModel
from typing import List
import shap
# Re-train a simple pipeline for the endpoint
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
final_pipe = make_pipeline(
    StandardScaler(),
    RandomForestClassifier(n_estimators=200, random_state=0),
)
final_pipe.fit(X_tr, y_tr)
# Pull out the fitted steps so the handler can scale + explain directly;
# the SHAP explainer operates on the RF that saw scaled inputs.
scaler_step, rf_step = (final_pipe.named_steps[key]
                        for key in ('standardscaler', 'randomforestclassifier'))
explainer_api = shap.TreeExplainer(rf_step)
class CustomerFeatures(BaseModel):
    """Request schema for /predict: the 10 numeric model features."""
    feat_0: float
    feat_1: float
    feat_2: float
    feat_3: float
    feat_4: float
    feat_5: float
    feat_6: float
    feat_7: float
    feat_8: float
    feat_9: float
def predict_churn(customer: CustomerFeatures):
    """Simulates POST /predict handler.

    Returns a dict with the churn probability, a hard 'churn'/'stay'
    decision at the 0.5 threshold, and the top-3 SHAP risk factors
    for this single customer.
    """
    # Assemble the single-row feature matrix in training column order.
    x = np.array([[getattr(customer, f'feat_{i}') for i in range(10)]])
    x_scaled = scaler_step.transform(x)
    probability = rf_step.predict_proba(x_scaled)[0, 1]
    decision = 'churn' if probability >= 0.5 else 'stay'
    # SHAP explanation for this instance.
    # FIX: older shap returns a list of per-class arrays, but modern shap
    # returns one (n_samples, n_features, n_classes) array; the original
    # `sv[0]` then yields a 2-D slice and the argsort/feat_names indexing
    # below breaks. Normalise both layouts to a flat per-feature vector
    # of class-1 attributions.
    sv = explainer_api.shap_values(x_scaled)
    if isinstance(sv, list):
        sv_class1 = np.asarray(sv[1])[0]
    else:
        sv_arr = np.asarray(sv)
        sv_class1 = sv_arr[0, :, 1] if sv_arr.ndim == 3 else sv_arr[0]
    top3_idx = np.argsort(np.abs(sv_class1))[::-1][:3]
    risk_factors = [
        {'feature': feat_names[i], 'shap_value': round(float(sv_class1[i]), 4)}
        for i in top3_idx
    ]
    return {
        'probability': round(float(probability), 4),
        'decision': decision,
        'top_risk_factors': risk_factors,
    }
# FastAPI app definition (uncomment to deploy)
# from fastapi import FastAPI
# app = FastAPI()
# @app.post('/predict')
# def predict(customer: CustomerFeatures):
#     return predict_churn(customer)
# Demo mock call: feed one held-out row through the handler.
sample = X_te.iloc[0]
payload = {name: float(sample[name]) for name in (f'feat_{i}' for i in range(10))}
mock_input = CustomerFeatures(**payload)
result = predict_churn(mock_input)
print(f"Probability: {result['probability']}")
print(f"Decision: {result['decision']}")
print('Top-3 Risk Factors:')
for rf_item in result['top_risk_factors']:
    print(f" {rf_item['feature']}: SHAP = {rf_item['shap_value']}")