Solutions: Machine Learning TrackΒΆ

Worked solutions to all exercises from the machine-learning/ notebooks.

01 β€” sklearn PipelinesΒΆ

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.ensemble import RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score, cross_val_predict, GridSearchCV
from sklearn.metrics import ConfusionMatrixDisplay

# ── Exercise 1: Pipeline with custom ratio-feature transformer + RF ────────
# Key insight: adding derived features inside a Pipeline transformer ensures
#              ratios are computed consistently on both train and test splits.

class RatioFeatureAdder(BaseEstimator, TransformerMixin):
    """Adds pairwise ratio features for all pairs of input columns."""
    def fit(self, X, y=None):
        self.n_features_in_ = X.shape[1]
        return self

    def transform(self, X):
        X = np.array(X)
        ratios = []
        for i in range(X.shape[1]):
            for j in range(X.shape[1]):
                if i != j:
                    # Avoid division-by-zero with a small epsilon
                    ratios.append((X[:, i] / (X[:, j] + 1e-8)).reshape(-1, 1))
        return np.hstack([X] + ratios)

X, y = make_classification(n_samples=400, n_features=6, n_informative=4, random_state=1)

pipe = Pipeline([
    ('ratios', RatioFeatureAdder()),
    ('scaler', StandardScaler()),
    ('clf',    RandomForestClassifier(n_estimators=100, random_state=42)),
])

scores = cross_val_score(pipe, X, y, cv=5)
print(f'CV accuracy with ratio features: {scores.mean():.4f} Β± {scores.std():.4f}')
# ── Exercise 2: set_output(transform='pandas') to preserve column names ────
# Key insight: sklearn 1.2+ supports set_output, making pipelines return
#              DataFrames with meaningful column names β€” essential for debugging
#              and downstream feature selection.

from sklearn.preprocessing import StandardScaler
from sklearn.impute import SimpleImputer

rng = np.random.default_rng(0)
df_named = pd.DataFrame(
    rng.normal(0, 1, (50, 4)),
    columns=['age', 'income', 'score', 'tenure']
)
# Inject some NaN
df_named.loc[0, 'age'] = np.nan

pipe_named = Pipeline([
    ('imputer', SimpleImputer(strategy='mean')),
    ('scaler',  StandardScaler()),
]).set_output(transform='pandas')

result_df = pipe_named.fit_transform(df_named)
print('Output type:', type(result_df))
print('Columns preserved:', result_df.columns.tolist())
print(result_df.head(3))
# ── Exercise 3: cross_val_predict β†’ confusion matrix ─────────────────────
# Key insight: cross_val_predict generates out-of-fold predictions so the
#              confusion matrix reflects true generalisation β€” not training fit.

from sklearn.metrics import ConfusionMatrixDisplay, confusion_matrix

X, y = make_classification(n_samples=500, n_features=10, random_state=7)

pipe_cv = make_pipeline(
    SimpleImputer(),
    StandardScaler(),
    RandomForestClassifier(n_estimators=100, random_state=42)
)

y_pred = cross_val_predict(pipe_cv, X, y, cv=5)
cm = confusion_matrix(y, y_pred)

disp = ConfusionMatrixDisplay(confusion_matrix=cm)
fig, ax = plt.subplots(figsize=(5, 4))
disp.plot(ax=ax, colorbar=False)
ax.set_title('OOF Confusion Matrix (cross_val_predict)')
plt.tight_layout()
plt.show()

accuracy = (cm.diagonal().sum()) / cm.sum()
print(f'OOF Accuracy: {accuracy:.4f}')
# ── Exercise 4: Cache expensive transformers with joblib Memory ────────────
# Key insight: caching a slow transformer (e.g., PCA on large data) avoids
#              recomputation when re-fitting the pipeline with only model changes.

import time
import tempfile
from joblib import Memory
from sklearn.decomposition import PCA

X, y = make_classification(n_samples=1000, n_features=50, random_state=0)

with tempfile.TemporaryDirectory() as cachedir:
    memory = Memory(location=cachedir, verbose=0)

    pipe_cached = Pipeline([
        ('scaler', StandardScaler()),
        ('pca',    PCA(n_components=10)),
        ('clf',    RandomForestClassifier(n_estimators=50, random_state=1)),
    ], memory=memory)

    # First fit β€” runs all steps
    t0 = time.perf_counter()
    pipe_cached.fit(X, y)
    t_first = time.perf_counter() - t0

    # Change only the classifier hyperparameter; scaler+PCA should be cache-hit
    pipe_cached.set_params(clf__n_estimators=100)

    t1 = time.perf_counter()
    pipe_cached.fit(X, y)
    t_second = time.perf_counter() - t1

    print(f'First fit:  {t_first:.3f}s')
    print(f'Second fit (cached transforms): {t_second:.3f}s')
    print(f'Speedup factor: {t_first / max(t_second, 1e-9):.1f}x')
# ── Exercise 5: GridSearchCV over preprocessing + model hyperparams ────────
# Key insight: Pipeline param names follow <step>__<param> syntax, letting
#              GridSearchCV tune both preprocessing and model in one search.

from sklearn.preprocessing import MinMaxScaler

X, y = make_classification(n_samples=300, n_features=10, random_state=5)

pipe_gs = Pipeline([
    ('scaler', StandardScaler()),
    ('clf',    RandomForestClassifier(random_state=42)),
])

param_grid = {
    'scaler':                [StandardScaler(), MinMaxScaler()],
    'clf__n_estimators':     [50, 100],
    'clf__max_depth':        [None, 5, 10],
    'clf__min_samples_leaf': [1, 5],
}

gs = GridSearchCV(pipe_gs, param_grid, cv=3, scoring='accuracy', n_jobs=-1, verbose=0)
gs.fit(X, y)

print(f'Best CV accuracy: {gs.best_score_:.4f}')
print('Best params:')
for k, v in gs.best_params_.items():
    print(f'  {k}: {v}')

02 β€” Model Selection & ValidationΒΆ

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.ensemble import RandomForestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import (learning_curve, cross_val_score,
                                      GridSearchCV, StratifiedKFold,
                                      train_test_split)
from sklearn.datasets import make_classification

X, y = make_classification(n_samples=800, n_features=20, n_informative=8, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# ── Exercise 1: Learning curves to diagnose bias vs variance ──────────────
# Key insight: large gap between train and val curve = overfitting (high variance);
#              both curves low and converged = underfitting (high bias).

clf = RandomForestClassifier(n_estimators=100, random_state=42)
train_sizes, train_scores, val_scores = learning_curve(
    clf, X, y, cv=5, train_sizes=np.linspace(0.1, 1.0, 10),
    scoring='accuracy', n_jobs=-1
)

fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(train_sizes, train_scores.mean(axis=1), 'o-', label='Train score', color='steelblue')
ax.fill_between(train_sizes,
                train_scores.mean(axis=1) - train_scores.std(axis=1),
                train_scores.mean(axis=1) + train_scores.std(axis=1), alpha=0.15, color='steelblue')
ax.plot(train_sizes, val_scores.mean(axis=1),   'o-', label='Val score',   color='darkorange')
ax.fill_between(train_sizes,
                val_scores.mean(axis=1) - val_scores.std(axis=1),
                val_scores.mean(axis=1) + val_scores.std(axis=1), alpha=0.15, color='darkorange')
ax.set_xlabel('Training set size')
ax.set_ylabel('Accuracy')
ax.set_title('Learning Curves β€” Random Forest')
ax.legend()
plt.tight_layout()
plt.show()
# ── Exercise 2: Nested cross-validation ───────────────────────────────────
# Key insight: standard CV re-uses the validation set for model selection AND
#              evaluation β€” this is optimistic. Nested CV has an outer loop
#              for unbiased evaluation and an inner loop for hyperparameter tuning.

from sklearn.model_selection import cross_val_score, GridSearchCV, StratifiedKFold

outer_cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=1)
inner_cv  = StratifiedKFold(n_splits=3, shuffle=True, random_state=2)

clf_base = RandomForestClassifier(random_state=42)
param_grid = {'n_estimators': [50, 100], 'max_depth': [None, 5]}

inner_search = GridSearchCV(clf_base, param_grid, cv=inner_cv, scoring='accuracy', n_jobs=-1)

# outer_cv evaluates inner_search (which itself tunes on inner_cv)
nested_scores = cross_val_score(inner_search, X, y, cv=outer_cv, scoring='accuracy')

# Compare to non-nested CV (optimistic baseline)
non_nested_scores = cross_val_score(clf_base, X, y, cv=outer_cv, scoring='accuracy')

print(f'Nested CV accuracy:     {nested_scores.mean():.4f} Β± {nested_scores.std():.4f}')
print(f'Non-nested CV accuracy: {non_nested_scores.mean():.4f} Β± {non_nested_scores.std():.4f}')
print(f'Optimism bias: {non_nested_scores.mean() - nested_scores.mean():.4f}')
# ── Exercise 3: Train / Val / Test score analysis for over/underfitting ────
# Key insight: Train >> Val β‰ˆ Test β†’ overfit. Train β‰ˆ Val β‰ˆ Test, all low β†’ underfit.
#              A well-fit model has small train-val gap and decent absolute score.

from sklearn.tree import DecisionTreeClassifier

X_tr, X_tmp, y_tr, y_tmp = train_test_split(X, y, test_size=0.3, random_state=0)
X_val, X_te, y_val, y_te = train_test_split(X_tmp, y_tmp, test_size=0.5, random_state=0)

configs = [
    ('Overfit (max_depth=None)', DecisionTreeClassifier(max_depth=None, random_state=0)),
    ('Underfit (max_depth=1)',   DecisionTreeClassifier(max_depth=1,    random_state=0)),
    ('Good fit (max_depth=5)',   DecisionTreeClassifier(max_depth=5,    random_state=0)),
]

rows = []
for name, clf in configs:
    clf.fit(X_tr, y_tr)
    rows.append({
        'model':   name,
        'train':   round(clf.score(X_tr,  y_tr),  4),
        'val':     round(clf.score(X_val, y_val), 4),
        'test':    round(clf.score(X_te,  y_te),  4),
    })

print(pd.DataFrame(rows).to_string(index=False))
# ── Exercise 4: Bootstrap confidence interval for accuracy ────────────────
# Key insight: the bootstrap CI is distribution-free β€” ideal when you can't
#              assume normality of the accuracy estimator across folds.

from sklearn.metrics import accuracy_score

rng = np.random.default_rng(99)
clf_boot = RandomForestClassifier(n_estimators=100, random_state=0)
clf_boot.fit(X_train, y_train)
y_pred_boot = clf_boot.predict(X_test)

n_bootstrap = 1000
boot_scores = []
n_test = len(y_test)
for _ in range(n_bootstrap):
    idx = rng.integers(0, n_test, n_test)
    boot_scores.append(accuracy_score(y_test[idx], y_pred_boot[idx]))

boot_scores = np.array(boot_scores)
ci_lower = np.percentile(boot_scores, 2.5)
ci_upper = np.percentile(boot_scores, 97.5)

print(f'Point estimate: {accuracy_score(y_test, y_pred_boot):.4f}')
print(f'95% Bootstrap CI: [{ci_lower:.4f}, {ci_upper:.4f}]')

plt.figure(figsize=(7, 3))
plt.hist(boot_scores, bins=40, color='steelblue', edgecolor='white')
plt.axvline(ci_lower, color='red', linestyle='--', label=f'2.5% = {ci_lower:.3f}')
plt.axvline(ci_upper, color='red', linestyle='-',  label=f'97.5% = {ci_upper:.3f}')
plt.xlabel('Bootstrap Accuracy'); plt.legend()
plt.title('Bootstrap Distribution of Accuracy')
plt.tight_layout(); plt.show()
# ── Exercise 5: Early stopping for gradient descent via validation loss ────
# Key insight: monitor validation loss each epoch; stop when it stops improving
#              for `patience` epochs. This prevents wasting compute on overfitting.

def early_stopping_gd(X_tr, y_tr, X_val, y_val, lr=0.05, max_epochs=500, patience=10):
    """Logistic regression via gradient descent with early stopping."""
    n, p = X_tr.shape
    w = np.zeros(p)
    best_val_loss = np.inf
    best_w = w.copy()
    no_improve = 0
    history = []

    def sigmoid(z): return 1 / (1 + np.exp(-np.clip(z, -500, 500)))
    def bce(y_true, y_prob): return -np.mean(y_true * np.log(y_prob + 1e-12) +
                                              (1 - y_true) * np.log(1 - y_prob + 1e-12))

    for epoch in range(max_epochs):
        y_hat = sigmoid(X_tr @ w)
        grad  = X_tr.T @ (y_hat - y_tr) / n
        w     = w - lr * grad

        val_loss = bce(y_val, sigmoid(X_val @ w))
        history.append(val_loss)

        if val_loss < best_val_loss - 1e-5:
            best_val_loss = val_loss
            best_w = w.copy()
            no_improve = 0
        else:
            no_improve += 1
            if no_improve >= patience:
                print(f'Early stop at epoch {epoch} (best val loss {best_val_loss:.4f})')
                break

    return best_w, history

from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
Xtr_s = scaler.fit_transform(X_train)
Xte_s = scaler.transform(X_test)
X_val_s, X_te_s, y_val_s, y_te_s = train_test_split(Xte_s, y_test, test_size=0.5, random_state=0)

best_w, hist = early_stopping_gd(Xtr_s, y_train, X_val_s, y_val_s)

plt.plot(hist, color='steelblue')
plt.xlabel('Epoch'); plt.ylabel('Validation BCE Loss')
plt.title('Gradient Descent with Early Stopping')
plt.tight_layout(); plt.show()

03 β€” Ensemble MethodsΒΆ

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import cross_val_score, StratifiedKFold, train_test_split
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier, VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score
from scipy.optimize import minimize

X, y = make_classification(n_samples=600, n_features=15, n_informative=8, random_state=7)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.2, random_state=0)

# ── Exercise 1: Soft-voting ensemble with scipy-tuned weights ─────────────
# Key insight: soft voting averages class probabilities; optimal weights
#              maximise validation accuracy and are found by L-BFGS minimisation.

lr  = LogisticRegression(max_iter=1000, random_state=0)
rf  = RandomForestClassifier(n_estimators=100, random_state=0)
gbm = GradientBoostingClassifier(n_estimators=100, random_state=0)

X_trn, X_val, y_trn, y_val = train_test_split(X_tr, y_tr, test_size=0.25, random_state=1)
for clf in [lr, rf, gbm]: clf.fit(X_trn, y_trn)

probs_val = np.stack([
    lr.predict_proba(X_val)[:,1],
    rf.predict_proba(X_val)[:,1],
    gbm.predict_proba(X_val)[:,1],
], axis=1)

def neg_accuracy(weights):
    w = np.array(weights)
    w = np.abs(w) / np.abs(w).sum()   # softmax-like normalisation
    preds = (probs_val @ w >= 0.5).astype(int)
    return -accuracy_score(y_val, preds)

res = minimize(neg_accuracy, x0=[1/3, 1/3, 1/3], method='Nelder-Mead')
best_w = np.abs(res.x) / np.abs(res.x).sum()
print(f'Optimal weights: LR={best_w[0]:.3f}, RF={best_w[1]:.3f}, GBM={best_w[2]:.3f}')

probs_te = np.stack([
    lr.predict_proba(X_te)[:,1],
    rf.predict_proba(X_te)[:,1],
    gbm.predict_proba(X_te)[:,1],
], axis=1)
y_pred_ens = (probs_te @ best_w >= 0.5).astype(int)
print(f'Test accuracy (optimised soft voting): {accuracy_score(y_te, y_pred_ens):.4f}')
# ── Exercise 2: Stacking from scratch with OOF base-learner predictions ───
# Key insight: out-of-fold predictions as meta-features prevent the meta-learner
#              from seeing predictions made on data it was trained on.

def stacking_oof(X_train, y_train, X_test, base_learners, meta_learner, n_splits=5):
    kf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=42)
    oof_train = np.zeros((len(y_train), len(base_learners)))
    test_preds = np.zeros((len(X_test), len(base_learners)))

    for i, clf in enumerate(base_learners):
        fold_test_preds = np.zeros((len(X_test), n_splits))
        for fold, (trn_idx, val_idx) in enumerate(kf.split(X_train, y_train)):
            clf.fit(X_train[trn_idx], y_train[trn_idx])
            oof_train[val_idx, i]     = clf.predict_proba(X_train[val_idx])[:, 1]
            fold_test_preds[:, fold]  = clf.predict_proba(X_test)[:, 1]
        test_preds[:, i] = fold_test_preds.mean(axis=1)

    meta_learner.fit(oof_train, y_train)
    return meta_learner.predict(test_preds)

base = [
    LogisticRegression(max_iter=500, random_state=0),
    RandomForestClassifier(n_estimators=50, random_state=0),
]
meta = LogisticRegression(max_iter=500, random_state=0)

y_stack = stacking_oof(X_tr, y_tr, X_te, base, meta)
print(f'Stacking test accuracy: {accuracy_score(y_te, y_stack):.4f}')
# ── Exercise 3: OOB score vs CV score for RandomForest ────────────────────
# Key insight: OOB uses the ~37% of samples not drawn for each tree as
#              validation, giving a free cross-validation estimate; it should
#              be close to k-fold CV but can be slightly optimistic.

rf_oob = RandomForestClassifier(n_estimators=200, oob_score=True, random_state=42)
rf_oob.fit(X_tr, y_tr)

cv_scores = cross_val_score(rf_oob, X_tr, y_tr, cv=5)

print(f'OOB score:       {rf_oob.oob_score_:.4f}')
print(f'5-fold CV score: {cv_scores.mean():.4f} Β± {cv_scores.std():.4f}')
print('Both estimates use ~out-of-sample data, so they should be similar.')
# ── Exercise 4: AdaBoost from scratch ─────────────────────────────────────
# Key insight: AdaBoost reweights misclassified samples exponentially so
#              subsequent weak learners focus on hard examples; alpha controls
#              how much weight each learner gets in the final vote.

from sklearn.tree import DecisionTreeClassifier

def adaboost_scratch(X_train, y_train, X_test, n_estimators=50):
    n = len(y_train)
    w = np.ones(n) / n
    # Convert labels to {-1, +1}
    y_pm = np.where(y_train == 1, 1, -1)

    estimators = []
    alphas     = []

    for _ in range(n_estimators):
        stump = DecisionTreeClassifier(max_depth=1)
        stump.fit(X_train, y_pm, sample_weight=w)
        y_pred_pm = stump.predict(X_train)

        # Weighted error
        err = np.dot(w, y_pred_pm != y_pm) / w.sum()
        err = np.clip(err, 1e-10, 1 - 1e-10)
        alpha = 0.5 * np.log((1 - err) / err)

        # Update weights
        w *= np.exp(-alpha * y_pm * y_pred_pm)
        w /= w.sum()

        estimators.append(stump)
        alphas.append(alpha)

    # Predict
    scores = sum(a * np.where(e.predict(X_test) == 1, 1, -1)
                 for a, e in zip(alphas, estimators))
    return (scores >= 0).astype(int)

y_ada = adaboost_scratch(X_tr, y_tr, X_te)
print(f'Scratch AdaBoost test accuracy: {accuracy_score(y_te, y_ada):.4f}')

# Sanity check vs sklearn
from sklearn.ensemble import AdaBoostClassifier
sk_ada = AdaBoostClassifier(n_estimators=50, random_state=0, algorithm='SAMME')
sk_ada.fit(X_tr, y_tr)
print(f'sklearn AdaBoost test accuracy: {sk_ada.score(X_te, y_te):.4f}')
# ── Exercise 5: Feature importance stability across RF seeds ──────────────
# Key insight: if top-5 features change across seeds, the importances are
#              unreliable (correlated features share importance randomly);
#              SHAP or permutation importance is more stable in that case.

n_runs = 10
importances_all = []

for seed in range(n_runs):
    clf_i = RandomForestClassifier(n_estimators=100, random_state=seed)
    clf_i.fit(X_tr, y_tr)
    importances_all.append(clf_i.feature_importances_)

imp_df = pd.DataFrame(importances_all, columns=[f'f{i}' for i in range(X_tr.shape[1])])
mean_imp = imp_df.mean().sort_values(ascending=False)
std_imp  = imp_df.std().reindex(mean_imp.index)

top5 = mean_imp.head(5)
fig, ax = plt.subplots(figsize=(7, 4))
ax.bar(top5.index, top5.values, yerr=std_imp[top5.index].values,
       color='steelblue', capsize=5, error_kw={'elinewidth': 2})
ax.set_title('Top-5 Feature Importances across 10 RF seeds (mean Β± std)')
ax.set_ylabel('Mean Importance')
plt.tight_layout(); plt.show()

print('Top-5 stable features:', top5.index.tolist())

04 β€” Imbalanced DatasetsΒΆ

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, StratifiedKFold
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import average_precision_score, precision_recall_curve, f1_score
from imblearn.over_sampling import SMOTE, ADASYN

# Highly imbalanced dataset (5% minority)
X, y = make_classification(n_samples=2000, n_features=10, weights=[0.95, 0.05],
                            n_informative=5, random_state=42)
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.25,
                                            stratify=y, random_state=0)

# ── Exercise 1: Compare 5 resampling strategies β€” PR-AUC table ────────────
# Key insight: PR-AUC is the right metric for imbalanced data β€” ROC-AUC can
#              look great even when the minority class is mostly ignored.

def fit_predict_proba(X_train, y_train, X_test, clf_kwargs=None):
    clf_kwargs = clf_kwargs or {}
    clf = RandomForestClassifier(n_estimators=100, random_state=0, **clf_kwargs)
    clf.fit(X_train, y_train)
    return clf.predict_proba(X_test)[:, 1], clf

strategies = {}

# No fix
proba, _ = fit_predict_proba(X_tr, y_tr, X_te)
strategies['No fix'] = proba

# SMOTE
X_sm, y_sm = SMOTE(random_state=0).fit_resample(X_tr, y_tr)
strategies['SMOTE'], _ = fit_predict_proba(X_sm, y_sm, X_te)

# ADASYN
X_ad, y_ad = ADASYN(random_state=0).fit_resample(X_tr, y_tr)
strategies['ADASYN'], _ = fit_predict_proba(X_ad, y_ad, X_te)

# class_weight='balanced'
strategies['class_weight'], _ = fit_predict_proba(X_tr, y_tr, X_te,
                                                   clf_kwargs={'class_weight': 'balanced'})

# Threshold tuning (train without fix, tune threshold on val)
X_trn, X_val, y_trn, y_val = train_test_split(X_tr, y_tr, test_size=0.2, stratify=y_tr, random_state=1)
proba_val, clf_tt = fit_predict_proba(X_trn, y_trn, X_val)
thresholds = np.linspace(0.01, 0.99, 200)
best_t = max(thresholds, key=lambda t: f1_score(y_val, (proba_val >= t).astype(int), zero_division=0))
strategies['Threshold tuning'] = clf_tt.predict_proba(X_te)[:, 1]  # store raw proba

rows = []
for name, proba_arr in strategies.items():
    pr_auc = average_precision_score(y_te, proba_arr)
    rows.append({'strategy': name, 'PR-AUC': round(pr_auc, 4)})

print(pd.DataFrame(rows).sort_values('PR-AUC', ascending=False).to_string(index=False))
# ── Exercise 2: Custom k-means difficulty-based oversampler ───────────────
# Key insight: standard SMOTE oversamples uniformly; focusing on minority samples
#              near cluster boundaries (high difficulty) targets the decision
#              boundary more effectively.

from sklearn.cluster import KMeans
from sklearn.base import BaseEstimator

class KMeansDifficultyOversampler(BaseEstimator):
    """
    Oversample minority class by generating synthetic samples near cluster
    centroids that have the highest ratio of majority neighbours.
    """
    def __init__(self, k=5, n_synthetic=200, random_state=0):
        self.k = k
        self.n_synthetic = n_synthetic
        self.random_state = random_state

    def fit_resample(self, X, y):
        rng = np.random.default_rng(self.random_state)
        minority_mask = y == 1
        X_min = X[minority_mask]

        km = KMeans(n_clusters=min(self.k, len(X_min)), random_state=self.random_state, n_init=5)
        km.fit(X_min)
        labels = km.labels_

        # Difficulty = fraction of majority class among ALL samples near each centroid
        from sklearn.neighbors import NearestNeighbors
        nn = NearestNeighbors(n_neighbors=10).fit(X)
        difficulties = np.zeros(km.n_clusters)
        for c in range(km.n_clusters):
            c_center = km.cluster_centers_[c].reshape(1, -1)
            _, nbr_idx = nn.kneighbors(c_center)
            difficulties[c] = (y[nbr_idx[0]] == 0).mean()

        # Sampling probability proportional to difficulty
        probs = difficulties / difficulties.sum()
        chosen_clusters = rng.choice(km.n_clusters, size=self.n_synthetic, p=probs)

        synthetic = []
        for c in chosen_clusters:
            members = X_min[labels == c]
            if len(members) < 2:
                synthetic.append(members[0])
            else:
                a, b = members[rng.integers(0, len(members), 2)]
                alpha = rng.random()
                synthetic.append(alpha * a + (1 - alpha) * b)

        X_synthetic = np.vstack(synthetic)
        y_synthetic = np.ones(self.n_synthetic, dtype=y.dtype)
        return np.vstack([X, X_synthetic]), np.concatenate([y, y_synthetic])

sampler = KMeansDifficultyOversampler(k=5, n_synthetic=200, random_state=0)
X_km, y_km = sampler.fit_resample(X_tr, y_tr)
print(f'Before: {np.bincount(y_tr)}  After: {np.bincount(y_km)}')
# ── Exercise 3: Precision-Recall curves for all strategies ────────────────
# Key insight: plotting all PR curves together lets you compare the precision-
#              recall trade-off across strategies at every threshold at once.

fig, ax = plt.subplots(figsize=(7, 5))

colors = ['grey', 'steelblue', 'darkorange', 'green', 'purple']
for (name, proba_arr), color in zip(strategies.items(), colors):
    prec, rec, _ = precision_recall_curve(y_te, proba_arr)
    ap = average_precision_score(y_te, proba_arr)
    ax.step(rec, prec, where='post', label=f'{name} (AP={ap:.3f})', color=color)

# Random baseline
baseline = y_te.mean()
ax.axhline(baseline, linestyle='--', color='black', label=f'Baseline ({baseline:.3f})')

ax.set_xlabel('Recall')
ax.set_ylabel('Precision')
ax.set_title('Precision-Recall Curves β€” Imbalanced Data Strategies')
ax.legend(loc='upper right', fontsize=8)
plt.tight_layout()
plt.show()
# ── Exercise 4: Optimal threshold that maximises F2 score ─────────────────
# Key insight: F2 weighs recall twice as much as precision β€” appropriate when
#              missing a positive (FN) is more costly than a false alarm (FP).

from sklearn.metrics import fbeta_score

proba_smote = strategies['SMOTE']
thresholds  = np.linspace(0.01, 0.99, 300)
f2_scores   = [fbeta_score(y_te, (proba_smote >= t).astype(int), beta=2, zero_division=0)
               for t in thresholds]

best_idx = np.argmax(f2_scores)
best_threshold = thresholds[best_idx]
best_f2 = f2_scores[best_idx]

plt.figure(figsize=(7, 4))
plt.plot(thresholds, f2_scores, color='steelblue')
plt.axvline(best_threshold, color='crimson', linestyle='--',
            label=f'Best t={best_threshold:.2f}, F2={best_f2:.3f}')
plt.xlabel('Threshold'); plt.ylabel('F2 Score')
plt.title('F2 Score vs Classification Threshold (SMOTE strategy)')
plt.legend(); plt.tight_layout(); plt.show()

print(f'Optimal threshold: {best_threshold:.3f}  |  F2: {best_f2:.4f}')
print('F2 vs F1: F2 up-weights recall, penalising missed positives more than false alarms.')
# ── Exercise 5: ROC-AUC misleading on imbalanced data β€” numerical example ─
# Key insight: a classifier that always predicts the majority class achieves
#              high ROC-AUC but zero recall on the minority class β€” PR-AUC
#              unmasks this because it is anchored to the class prevalence.

from sklearn.metrics import roc_auc_score, average_precision_score

# 95/5 imbalanced test set
n_test = 1000
y_imb = np.array([0]*950 + [1]*50)

# Trivial classifier: always predict majority (0)
# We add tiny random noise to avoid degenerate probability arrays
rng = np.random.default_rng(0)
proba_majority = rng.uniform(0, 0.1, n_test)  # all low probabilities

# Good classifier with near-perfect minority recall
proba_good = np.where(y_imb == 1,
                       rng.uniform(0.7, 1.0, n_test),
                       rng.uniform(0.0, 0.3, n_test))

print('Classifier             | ROC-AUC | PR-AUC')
print('-'*47)
for name, proba_arr in [('Majority-class trivial', proba_majority),
                         ('Good minority detector', proba_good)]:
    roc = roc_auc_score(y_imb, proba_arr)
    prc = average_precision_score(y_imb, proba_arr)
    print(f'{name:24s} | {roc:.4f}  | {prc:.4f}')

print('\nThe trivial classifier has ROC-AUC β‰ˆ 0.5 but near-zero PR-AUC,')
print('showing ROC-AUC alone is not sufficient for imbalanced tasks.')

05 β€” Model InterpretabilityΒΆ

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import shap
from sklearn.datasets import make_classification
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.inspection import permutation_importance
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

X, y = make_classification(n_samples=800, n_features=12, n_informative=6,
                            n_redundant=2, random_state=42)
feature_names = [f'feat_{i}' for i in range(X.shape[1])]
X_df = pd.DataFrame(X, columns=feature_names)
X_tr, X_te, y_tr, y_te = train_test_split(X_df, y, test_size=0.25, random_state=0)

clf = RandomForestClassifier(n_estimators=200, random_state=42)
clf.fit(X_tr, y_tr)

# ── Exercise 1: SHAP vs permutation importance vs gain importance ──────────
# Key insight: gain importance is biased toward high-cardinality features;
#              permutation importance is model-agnostic and measures actual
#              performance drop; SHAP is theoretically grounded and additive.

# Gain (Gini) importance
gain_imp = pd.Series(clf.feature_importances_, index=feature_names).sort_values(ascending=False)

# Permutation importance
perm_res = permutation_importance(clf, X_te, y_te, n_repeats=10, random_state=0)
perm_imp = pd.Series(perm_res.importances_mean, index=feature_names).sort_values(ascending=False)

# SHAP importance
explainer = shap.TreeExplainer(clf)
shap_values = explainer.shap_values(X_te)
# For binary classification, shap_values is a list [class0, class1]
shap_mean = np.abs(shap_values[1] if isinstance(shap_values, list) else shap_values).mean(axis=0)
shap_imp = pd.Series(shap_mean, index=feature_names).sort_values(ascending=False)

fig, axes = plt.subplots(1, 3, figsize=(14, 4))
for ax, (name, imp) in zip(axes, [('Gain', gain_imp), ('Permutation', perm_imp), ('SHAP', shap_imp)]):
    imp.head(8).sort_values().plot(kind='barh', ax=ax, color='steelblue')
    ax.set_title(f'{name} Importance')
plt.suptitle('Three Importance Methods β€” Top 8 Features', y=1.01)
plt.tight_layout(); plt.show()
# ── Exercise 2: 2D Partial Dependence Plot for two interacting features ────
# Key insight: a 2D PDP shows how two features jointly affect model output;
#              diagonal or non-additive patterns reveal interaction effects.

from sklearn.inspection import PartialDependenceDisplay

fig, ax = plt.subplots(figsize=(7, 5))
PartialDependenceDisplay.from_estimator(
    clf, X_tr,
    features=[('feat_0', 'feat_1')],  # 2D PDP for the two top features
    kind='average',
    ax=ax,
)
ax.set_title('2D Partial Dependence: feat_0 Γ— feat_1')
plt.tight_layout(); plt.show()
# ── Exercise 3: Detect spurious feature via SHAP ──────────────────────────
# Key insight: a purely random column carries zero mutual information with
#              the target; SHAP values for it should cluster near zero.

rng = np.random.default_rng(0)
X_spurious = X_tr.copy()
X_spurious['random_noise'] = rng.standard_normal(len(X_spurious))
all_features = feature_names + ['random_noise']

clf_sp = RandomForestClassifier(n_estimators=200, random_state=42)
clf_sp.fit(X_spurious, y_tr)

explainer_sp = shap.TreeExplainer(clf_sp)
X_te_sp = X_te.copy()
X_te_sp['random_noise'] = rng.standard_normal(len(X_te_sp))

shap_vals_sp = explainer_sp.shap_values(X_te_sp)
shap_sp = np.abs(shap_vals_sp[1] if isinstance(shap_vals_sp, list) else shap_vals_sp).mean(axis=0)
shap_sp_series = pd.Series(shap_sp, index=all_features).sort_values()

fig, ax = plt.subplots(figsize=(7, 5))
colors = ['crimson' if f == 'random_noise' else 'steelblue' for f in shap_sp_series.index]
shap_sp_series.plot(kind='barh', ax=ax, color=colors)
ax.set_title('SHAP Importance (red = spurious random column)')
ax.set_xlabel('Mean |SHAP|')
plt.tight_layout(); plt.show()

print(f'random_noise SHAP importance: {shap_sp_series["random_noise"]:.5f} (should be near 0)')
# ── Exercise 4: Counterfactual explanation β€” minimal feature change ────────
# Key insight: a counterfactual shows the smallest perturbation that changes
#              a prediction β€” actionable for end-users (e.g., loan denials).

from scipy.optimize import minimize as sp_min

def counterfactual(clf, x0: np.ndarray, target_class: int = 1,
                   lambda_reg: float = 0.1, n_restarts: int = 5) -> np.ndarray:
    """
    Find x_cf close to x0 such that clf predicts target_class.
    Minimises: CrossEntropy(target | x_cf) + lambda * L2(x_cf - x0)
    """
    best_cf, best_loss = None, np.inf

    def objective(x):
        proba = clf.predict_proba(x.reshape(1, -1))[0, target_class]
        ce    = -np.log(proba + 1e-12)
        l2    = lambda_reg * np.sum((x - x0) ** 2)
        return ce + l2

    rng_cf = np.random.default_rng(7)
    for _ in range(n_restarts):
        x_init = x0 + rng_cf.normal(0, 0.5, x0.shape)
        res = sp_min(objective, x_init, method='Nelder-Mead',
                     options={'maxiter': 2000, 'xatol': 1e-5})
        if res.fun < best_loss:
            best_loss = res.fun
            best_cf   = res.x

    return best_cf


# Pick a sample predicted as class 0 and find CF that flips to class 1
x_sample = X_te.values[clf.predict(X_te) == 0][0]
x_cf = counterfactual(clf, x_sample, target_class=1)

print(f'Original prediction: {clf.predict(x_sample.reshape(1,-1))[0]}')
print(f'CF prediction:       {clf.predict(x_cf.reshape(1,-1))[0]}')
diff = x_cf - x_sample
top3_changes = np.argsort(np.abs(diff))[::-1][:3]
print('Top-3 feature changes:')
for idx in top3_changes:
    print(f'  {feature_names[idx]}: {x_sample[idx]:.3f} β†’ {x_cf[idx]:.3f} (Ξ”={diff[idx]:+.3f})')
# ── Exercise 5: Fairness check β€” SHAP by demographic group ────────────────
# Key insight: features with disparate SHAP impact across demographic groups
#              may proxy protected attributes and cause discriminatory outputs.

rng = np.random.default_rng(1)
group = rng.choice(['GroupA', 'GroupB'], len(X_te))

# Reuse shap_values from Exercise 1
shap_class1 = shap_values[1] if isinstance(shap_values, list) else shap_values
shap_te_df = pd.DataFrame(shap_class1, columns=feature_names)
shap_te_df['group'] = group

mean_shap_by_group = shap_te_df.groupby('group')[feature_names].apply(lambda g: g.abs().mean())

# Compute ratio of Group A / Group B for each feature
ratio = mean_shap_by_group.loc['GroupA'] / (mean_shap_by_group.loc['GroupB'] + 1e-12)
flagged = ratio[ratio > 1.5].sort_values(ascending=False)

fig, axes = plt.subplots(1, 2, figsize=(12, 5))
mean_shap_by_group.T.plot(kind='bar', ax=axes[0])
axes[0].set_title('Mean |SHAP| per group')
axes[0].set_ylabel('Mean |SHAP|')

ratio.sort_values(ascending=False).plot(kind='bar', ax=axes[1], color='darkorange')
axes[1].axhline(1.5, color='red', linestyle='--', label='Disparate impact threshold (1.5x)')
axes[1].set_title('GroupA / GroupB SHAP ratio')
axes[1].legend()

plt.tight_layout(); plt.show()
print('Flagged features (ratio > 1.5x):', flagged.index.tolist() if len(flagged) else 'None')

06 β€” End-to-End ProjectΒΆ

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split, StratifiedKFold, cross_val_predict
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline
from sklearn.calibration import CalibratedClassifierCV, calibration_curve
from sklearn.metrics import average_precision_score, brier_score_loss
from imblearn.over_sampling import SMOTE
from imblearn.pipeline import Pipeline as ImbPipeline

# Synthetic churn-like dataset
X_raw, y = make_classification(
    n_samples=3000, n_features=10, n_informative=7,
    weights=[0.85, 0.15], random_state=0
)
feat_names = [f'feat_{i}' for i in range(X_raw.shape[1])]
X_df = pd.DataFrame(X_raw, columns=feat_names)
X_tr, X_te, y_tr, y_te = train_test_split(X_df, y, test_size=0.2,
                                            stratify=y, random_state=42)

# ── Exercise 1: Platt scaling calibration + reliability diagram ───────────
# Key insight: Platt scaling (sigmoid fitting) is well-calibrated for RF;
#              isotonic regression works for larger datasets. A perfectly
#              calibrated model has the reliability curve on the diagonal.

base_clf = RandomForestClassifier(n_estimators=200, random_state=0)
base_clf.fit(X_tr, y_tr)

calibrated_clf = CalibratedClassifierCV(base_clf, method='sigmoid', cv='prefit')
calibrated_clf.fit(X_tr, y_tr)

proba_raw  = base_clf.predict_proba(X_te)[:, 1]
proba_cal  = calibrated_clf.predict_proba(X_te)[:, 1]

frac_pos_raw, mean_pred_raw = calibration_curve(y_te, proba_raw, n_bins=10)
frac_pos_cal, mean_pred_cal = calibration_curve(y_te, proba_cal, n_bins=10)

fig, ax = plt.subplots(figsize=(6, 5))
ax.plot([0, 1], [0, 1], 'k--', label='Perfect calibration')
ax.plot(mean_pred_raw, frac_pos_raw, 'o-', color='steelblue',   label=f'RF (Brier={brier_score_loss(y_te, proba_raw):.4f})')
ax.plot(mean_pred_cal, frac_pos_cal, 's-', color='darkorange',  label=f'Platt (Brier={brier_score_loss(y_te, proba_cal):.4f})')
ax.set_xlabel('Mean Predicted Probability'); ax.set_ylabel('Fraction of Positives')
ax.set_title('Reliability Diagram: Before & After Platt Scaling')
ax.legend(); plt.tight_layout(); plt.show()
# ── Exercise 2: Profit-driven threshold (FP=$50, FN=$200) ─────────────────
# Key insight: optimising accuracy or F1 ignores business costs; the optimal
#              threshold maximises total expected profit.
#              Profit = TP*gain - FP*50 - FN*200  (TP gain treated as 0 here
#              since we only care about cost minimisation).

from sklearn.metrics import confusion_matrix

fp_cost = 50   # cost of calling a non-churner (unnecessary retention)
fn_cost = 200  # cost of missing a churner (lost customer)

thresholds = np.linspace(0.01, 0.99, 300)
costs = []
for t in thresholds:
    y_pred_t = (proba_cal >= t).astype(int)
    tn, fp, fn, tp = confusion_matrix(y_te, y_pred_t).ravel()
    total_cost = fp * fp_cost + fn * fn_cost
    costs.append(total_cost)

best_idx = np.argmin(costs)
best_t   = thresholds[best_idx]

plt.figure(figsize=(7, 4))
plt.plot(thresholds, costs, color='steelblue')
plt.axvline(best_t, color='crimson', linestyle='--',
            label=f'Optimal t={best_t:.2f} (cost=${costs[best_idx]:,.0f})')
plt.xlabel('Threshold'); plt.ylabel('Total Cost ($)')
plt.title('Profit-Driven Threshold Optimisation')
plt.legend(); plt.tight_layout(); plt.show()

print(f'Default t=0.5 cost: ${costs[np.argmin(np.abs(thresholds - 0.5))]:,.0f}')
print(f'Optimal threshold:  t={best_t:.3f},  cost=${costs[best_idx]:,.0f}')
# ── Exercise 3: SMOTE in pipeline β€” PR-AUC comparison ─────────────────────
# Key insight: SMOTE must be applied INSIDE the CV loop via an imblearn Pipeline
#              to avoid leaking synthetic samples into validation folds.

from imblearn.pipeline import Pipeline as ImbPipeline
from sklearn.model_selection import cross_val_score

pipe_no_smote = make_pipeline(
    StandardScaler(),
    RandomForestClassifier(n_estimators=100, random_state=0)
)

pipe_smote = ImbPipeline([
    ('scaler', StandardScaler()),
    ('smote',  SMOTE(random_state=0)),
    ('clf',    RandomForestClassifier(n_estimators=100, random_state=0)),
])

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
scores_no   = cross_val_score(pipe_no_smote, X_tr, y_tr, cv=cv, scoring='average_precision')
scores_smote = cross_val_score(pipe_smote,   X_tr, y_tr, cv=cv, scoring='average_precision')

print(f'Without SMOTE β€” CV PR-AUC: {scores_no.mean():.4f} Β± {scores_no.std():.4f}')
print(f'With    SMOTE β€” CV PR-AUC: {scores_smote.mean():.4f} Β± {scores_smote.std():.4f}')
# ── Exercise 4: Feature importance stability across CV folds ──────────────
# Key insight: variance of importance across folds reveals which features are
#              unstable (often due to correlations) β€” avoid over-relying on them.

cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=0)
fold_importances = []

for trn_idx, val_idx in cv.split(X_tr, y_tr):
    X_fold = X_tr.iloc[trn_idx]
    y_fold = y_tr.iloc[trn_idx] if hasattr(y_tr, 'iloc') else y_tr[trn_idx]
    clf_fold = RandomForestClassifier(n_estimators=100, random_state=0)
    clf_fold.fit(X_fold, y_fold)
    fold_importances.append(clf_fold.feature_importances_)

imp_df = pd.DataFrame(fold_importances, columns=feat_names)
mean_imp = imp_df.mean().sort_values(ascending=False)
std_imp  = imp_df.std().reindex(mean_imp.index)

fig, ax = plt.subplots(figsize=(9, 4))
ax.bar(range(len(mean_imp)), mean_imp.values,
       yerr=std_imp.values, color='steelblue', capsize=5,
       error_kw={'elinewidth': 2, 'ecolor': 'crimson'})
ax.set_xticks(range(len(mean_imp)))
ax.set_xticklabels(mean_imp.index, rotation=45, ha='right')
ax.set_title('Feature Importance Stability (5-fold CV mean Β± std)')
ax.set_ylabel('Importance')
plt.tight_layout(); plt.show()
# ── Exercise 5: FastAPI /predict endpoint with top-3 risk factors ─────────
# Key insight: wrapping the model in a FastAPI app requires only pydantic
#              schemas for input validation; returning SHAP-derived risk factors
#              makes the prediction interpretable for end-users.

# NOTE: run this as a standalone server (uvicorn main:app).
#       Here we define the full app and demonstrate a mock call.

from pydantic import BaseModel
from typing import List
import shap

# Re-train a simple pipeline for the endpoint
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import make_pipeline

final_pipe = make_pipeline(
    StandardScaler(),
    RandomForestClassifier(n_estimators=200, random_state=0)
)
final_pipe.fit(X_tr, y_tr)

# Pre-compute SHAP explainer on the inner RF (after scaling)
scaler_step = final_pipe.named_steps['standardscaler']
rf_step     = final_pipe.named_steps['randomforestclassifier']
explainer_api = shap.TreeExplainer(rf_step)


class CustomerFeatures(BaseModel):
    feat_0: float; feat_1: float; feat_2: float; feat_3: float; feat_4: float
    feat_5: float; feat_6: float; feat_7: float; feat_8: float; feat_9: float


def predict_churn(customer: CustomerFeatures):
    """Simulates POST /predict handler."""
    x = np.array([[customer.feat_0, customer.feat_1, customer.feat_2,
                   customer.feat_3, customer.feat_4, customer.feat_5,
                   customer.feat_6, customer.feat_7, customer.feat_8,
                   customer.feat_9]])
    x_scaled = scaler_step.transform(x)
    probability = rf_step.predict_proba(x_scaled)[0, 1]
    decision    = 'churn' if probability >= 0.5 else 'stay'

    # SHAP explanation for this instance
    sv = explainer_api.shap_values(x_scaled)
    sv_class1 = sv[1][0] if isinstance(sv, list) else sv[0]
    top3_idx  = np.argsort(np.abs(sv_class1))[::-1][:3]
    risk_factors = [
        {'feature': feat_names[i], 'shap_value': round(float(sv_class1[i]), 4)}
        for i in top3_idx
    ]

    return {
        'probability': round(float(probability), 4),
        'decision':    decision,
        'top_risk_factors': risk_factors
    }


# FastAPI app definition (uncomment to deploy)
# from fastapi import FastAPI
# app = FastAPI()
# @app.post('/predict')
# def predict(customer: CustomerFeatures):
#     return predict_churn(customer)


# Demo mock call
sample = X_te.iloc[0]
mock_input = CustomerFeatures(**{f'feat_{i}': float(sample[f'feat_{i}']) for i in range(10)})
result = predict_churn(mock_input)
print(f"Probability: {result['probability']}")
print(f"Decision:    {result['decision']}")
print('Top-3 Risk Factors:')
for rf_item in result['top_risk_factors']:
    print(f"  {rf_item['feature']}: SHAP = {rf_item['shap_value']}")