ML Monitoring & Drift Detection: Keeping Models Healthy in ProductionΒΆ

Models degrade silently. Without monitoring, you only find out when the business suffers. This notebook covers data drift detection, concept drift, performance monitoring, and automated alerting β€” the practices that keep production ML systems reliable.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from scipy import stats
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import roc_auc_score, accuracy_score
import warnings
warnings.filterwarnings('ignore')

np.random.seed(42)

# Try Evidently for professional drift monitoring
try:
    from evidently.report import Report
    from evidently.metric_preset import DataDriftPreset, ClassificationPreset
    HAS_EVIDENTLY = True
    print('Evidently available for drift reports')
except ImportError:
    HAS_EVIDENTLY = False
    print('Evidently not installed β€” implementing drift detection from scratch')

# Simulate production: model trained in Jan, deployed, data drifts over time
def generate_data(n, drift_factor=0.0, seed=None):
    """Generate customer churn data with controllable drift."""
    rng = np.random.default_rng(seed)
    df = pd.DataFrame({
        'session_count':    rng.poisson(20 * (1 + drift_factor * 0.5), n),
        'days_since_login': rng.exponential(30 * (1 + drift_factor), n),
        'total_spent':      rng.lognormal(4 + drift_factor * 0.3, 1.5, n),
        'support_tickets':  rng.poisson(1 + drift_factor * 2, n),
        'plan_encoded':     rng.choice([0,1,2,3], n, p=[0.5-drift_factor*0.1, 0.25, 0.15, 0.1+drift_factor*0.1]),
    })
    churn_prob = 1 / (1 + np.exp(
        0.5 * np.log(df['session_count'] + 1)
        - 0.2 * df['support_tickets']
        - drift_factor * 0.5
        + rng.normal(0, 1, n)
    ))
    df['churned'] = (churn_prob > 0.5).astype(int)
    return df

# Training data: Jan 2024
train_data = generate_data(2000, drift_factor=0.0, seed=42)

# Production data over 6 months (increasing drift)
monthly_data = {
    'Feb': generate_data(500, drift_factor=0.0, seed=100),
    'Mar': generate_data(500, drift_factor=0.1, seed=101),
    'Apr': generate_data(500, drift_factor=0.3, seed=102),
    'May': generate_data(500, drift_factor=0.6, seed=103),
    'Jun': generate_data(500, drift_factor=1.0, seed=104),
    'Jul': generate_data(500, drift_factor=1.5, seed=105),
}

features = ['session_count', 'days_since_login', 'total_spent', 'support_tickets', 'plan_encoded']

# Train model
model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(train_data[features], train_data['churned'])

print(f'Training data: {len(train_data)} rows, churn rate: {train_data["churned"].mean():.1%}')

1. Data Drift β€” When Your Input Distribution ChangesΒΆ

# Data drift = production inputs differ from training inputs
# Can happen due to: seasonality, user behavior changes, upstream data issues

def detect_drift_ks(reference: pd.DataFrame, current: pd.DataFrame,
                    features: list, alpha: float = 0.05) -> pd.DataFrame:
    """
    Kolmogorov-Smirnov test for each feature.
    KS test: compares full distributions, not just means.
    Returns: DataFrame with KS statistic, p-value, and drift flag per feature.
    """
    results = []
    for feat in features:
        ks_stat, p_val = stats.ks_2samp(
            reference[feat].values,
            current[feat].values
        )
        results.append({
            'feature': feat,
            'ks_statistic': round(ks_stat, 4),
            'p_value': round(p_val, 6),
            'drift_detected': p_val < alpha,
            'severity': 'HIGH' if ks_stat > 0.2 else 'MEDIUM' if ks_stat > 0.1 else 'LOW'
        })
    return pd.DataFrame(results).sort_values('ks_statistic', ascending=False)

print('Drift Detection Results by Month (KS test, Ξ±=0.05):')
print()

drift_over_time = {}
for month, data in monthly_data.items():
    drift_df = detect_drift_ks(train_data, data, features)
    n_drifted = drift_df['drift_detected'].sum()
    drift_over_time[month] = drift_df
    print(f'{month}: {n_drifted}/{len(features)} features drifted')
    if n_drifted > 0:
        drifted = drift_df[drift_df['drift_detected']][['feature', 'ks_statistic', 'severity']]
        for _, row in drifted.iterrows():
            print(f'  ⚠️  {row["feature"]:20s} KS={row["ks_statistic"]:.3f} [{row["severity"]}]')

# Visualize drift over time for one feature
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
axes = axes.flatten()

for i, (month, data) in enumerate(monthly_data.items()):
    ax = axes[i]
    ax.hist(train_data['days_since_login'], bins=30, alpha=0.5, density=True, label='Train (Jan)', color='blue')
    ax.hist(data['days_since_login'], bins=30, alpha=0.5, density=True, label=month, color='red')
    ks_stat = drift_over_time[month][drift_over_time[month]['feature']=='days_since_login']['ks_statistic'].values[0]
    ax.set_title(f'{month}: KS={ks_stat:.3f}')
    ax.legend(fontsize=8)

plt.suptitle('Feature Distribution Drift: days_since_login', fontsize=13)
plt.tight_layout()
plt.show()

2. Concept Drift β€” When Your Model’s Predictions Become StaleΒΆ

# Concept drift = the relationship between features and target changes
# Even if features look normal, the model can still be wrong

def monitor_predictions(model, data: pd.DataFrame, features: list,
                        reference_score: float) -> dict:
    """Monitor model prediction quality over time."""
    X = data[features]
    y = data['churned']
    y_pred = model.predict(X)
    y_prob = model.predict_proba(X)[:, 1]
    
    auc = roc_auc_score(y, y_prob)
    acc = accuracy_score(y, y_pred)
    pred_churn_rate = y_pred.mean()
    actual_churn_rate = y.mean()
    
    return {
        'auc': auc,
        'accuracy': acc,
        'auc_drop': reference_score - auc,
        'pred_churn_rate': pred_churn_rate,
        'actual_churn_rate': actual_churn_rate,
        'prediction_bias': pred_churn_rate - actual_churn_rate,
    }

# Baseline performance on training data
ref_auc = roc_auc_score(train_data['churned'], model.predict_proba(train_data[features])[:, 1])
print(f'Baseline AUC (train): {ref_auc:.4f}')
print()

metrics_over_time = {}
for month, data in monthly_data.items():
    m = monitor_predictions(model, data, features, ref_auc)
    metrics_over_time[month] = m
    alert = '🚨' if m['auc_drop'] > 0.05 else '⚠️' if m['auc_drop'] > 0.02 else 'βœ…'
    print(f'{month} {alert}  AUC={m["auc"]:.4f} (Ξ”={m["auc_drop"]:+.4f}) | bias={m["prediction_bias"]:+.3f}')

# Performance degradation chart
months = list(metrics_over_time.keys())
aucs = [metrics_over_time[m]['auc'] for m in months]
biases = [metrics_over_time[m]['prediction_bias'] for m in months]

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(13, 5))

ax1.plot(months, aucs, 'bo-', linewidth=2, markersize=8)
ax1.axhline(ref_auc, color='green', linestyle='--', label=f'Baseline AUC: {ref_auc:.3f}')
ax1.axhline(ref_auc - 0.05, color='orange', linestyle=':', alpha=0.7, label='Warning threshold')
ax1.axhline(ref_auc - 0.10, color='red', linestyle=':', alpha=0.7, label='Critical threshold')
ax1.set_xlabel('Month')
ax1.set_ylabel('ROC-AUC')
ax1.set_title('Model Performance Degradation Over Time')
ax1.legend()
ax1.set_ylim(0.5, 1.0)

colors = ['red' if abs(b) > 0.05 else 'orange' if abs(b) > 0.02 else 'green' for b in biases]
ax2.bar(months, biases, color=colors, alpha=0.8)
ax2.axhline(0, color='black', linewidth=0.5)
ax2.set_xlabel('Month')
ax2.set_ylabel('Prediction Bias (pred - actual churn rate)')
ax2.set_title('Prediction Bias Over Time')

plt.tight_layout()
plt.show()

3. Population Stability Index (PSI)ΒΆ

# PSI measures distribution shift β€” widely used in financial ML
# PSI < 0.1:  stable (no action needed)
# PSI 0.1-0.2: moderate shift (investigate)
# PSI > 0.2:  significant shift (retrain or adjust)

def calculate_psi(reference: np.ndarray, current: np.ndarray,
                  n_bins: int = 10, epsilon: float = 1e-4) -> float:
    """Population Stability Index."""
    # Use reference percentiles as bins
    bins = np.percentile(reference, np.linspace(0, 100, n_bins + 1))
    bins[0] -= 1e-10  # Include minimum
    bins[-1] += 1e-10  # Include maximum
    
    ref_counts, _ = np.histogram(reference, bins=bins)
    cur_counts, _ = np.histogram(current, bins=bins)
    
    # Convert to proportions with epsilon to avoid log(0)
    ref_pct = (ref_counts + epsilon) / len(reference)
    cur_pct = (cur_counts + epsilon) / len(current)
    
    psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
    return psi

print('PSI Analysis (score distribution drift):')
print(f'{"Month":<8} ', end='')
for feat in features:
    print(f'{feat[:12]:<14}', end='')
print()
print('-' * 80)

for month, data in monthly_data.items():
    print(f'{month:<8} ', end='')
    for feat in features:
        psi = calculate_psi(train_data[feat].values, data[feat].values)
        flag = '🚨' if psi > 0.2 else '⚠️' if psi > 0.1 else '  '
        print(f'{psi:.3f}{flag}        '[:(14)], end='')
    print()

print()
print('PSI Legend: < 0.10 = stable | 0.10-0.20 = moderate | > 0.20 = significant shift')

4. Automated Alerting & Retraining TriggersΒΆ

from dataclasses import dataclass, field
from typing import List, Callable, Optional
from enum import Enum

class AlertSeverity(Enum):
    INFO    = 'info'
    WARNING = 'warning'
    CRITICAL = 'critical'

@dataclass
class Alert:
    severity: AlertSeverity
    metric: str
    value: float
    threshold: float
    message: str

@dataclass
class MonitoringConfig:
    auc_warning_threshold: float = 0.02   # AUC drop that triggers warning
    auc_critical_threshold: float = 0.05  # AUC drop that triggers critical
    psi_warning_threshold: float = 0.10
    psi_critical_threshold: float = 0.20
    bias_threshold: float = 0.05          # Absolute prediction bias

def run_monitoring_check(model, reference_data, current_data, features,
                         config: MonitoringConfig, month: str) -> List[Alert]:
    """Full monitoring check returning list of alerts."""
    alerts = []
    ref_auc = roc_auc_score(reference_data['churned'], model.predict_proba(reference_data[features])[:,1])
    
    # 1. Model performance
    metrics = monitor_predictions(model, current_data, features, ref_auc)
    
    if metrics['auc_drop'] > config.auc_critical_threshold:
        alerts.append(Alert(
            AlertSeverity.CRITICAL, 'AUC', metrics['auc'], ref_auc - config.auc_critical_threshold,
            f'AUC dropped {metrics["auc_drop"]:.3f} from baseline β†’ RETRAIN IMMEDIATELY'
        ))
    elif metrics['auc_drop'] > config.auc_warning_threshold:
        alerts.append(Alert(
            AlertSeverity.WARNING, 'AUC', metrics['auc'], ref_auc - config.auc_warning_threshold,
            f'AUC dropped {metrics["auc_drop"]:.3f} β†’ Monitor closely'
        ))
    
    if abs(metrics['prediction_bias']) > config.bias_threshold:
        alerts.append(Alert(
            AlertSeverity.WARNING, 'Prediction Bias', metrics['prediction_bias'], config.bias_threshold,
            f'Model is predicting {metrics["prediction_bias"]:+.1%} vs actual β†’ Calibration needed'
        ))
    
    # 2. Data drift
    for feat in features:
        psi = calculate_psi(reference_data[feat].values, current_data[feat].values)
        if psi > config.psi_critical_threshold:
            alerts.append(Alert(
                AlertSeverity.CRITICAL, f'PSI:{feat}', psi, config.psi_critical_threshold,
                f'{feat} distribution has significant shift (PSI={psi:.3f})'
            ))
        elif psi > config.psi_warning_threshold:
            alerts.append(Alert(
                AlertSeverity.WARNING, f'PSI:{feat}', psi, config.psi_warning_threshold,
                f'{feat} distribution is shifting (PSI={psi:.3f})'
            ))
    
    return alerts

config = MonitoringConfig()
print('=== Monthly Monitoring Dashboard ===')
print()

all_alerts = {}
for month, data in monthly_data.items():
    alerts = run_monitoring_check(model, train_data, data, features, config, month)
    all_alerts[month] = alerts
    
    critical = [a for a in alerts if a.severity == AlertSeverity.CRITICAL]
    warnings = [a for a in alerts if a.severity == AlertSeverity.WARNING]
    status = '🚨 CRITICAL' if critical else '⚠️  WARNING ' if warnings else 'βœ… HEALTHY '
    
    print(f'{month}: {status} | {len(critical)} critical, {len(warnings)} warnings')
    for alert in critical:
        print(f'  🚨 {alert.message}')
    if month in ('Jun', 'Jul'):
        for alert in warnings[:2]:  # Show first 2 warnings
            print(f'  ⚠️  {alert.message}')

5. Retraining StrategyΒΆ

# When to retrain and how

def retrain_model(new_data: pd.DataFrame, features: list, target: str,
                  strategy: str = 'full') -> tuple:
    """
    Retrain strategies:
    - 'full': train on all available data
    - 'sliding_window': only recent N months
    - 'online': incremental update (warm_start)
    """
    model_new = GradientBoostingClassifier(n_estimators=100, random_state=42)
    
    if strategy == 'sliding_window':
        # Use only most recent 3 months
        window_data = new_data.tail(1500)
        model_new.fit(window_data[features], window_data[target])
    else:
        model_new.fit(new_data[features], new_data[target])
    
    return model_new

# Simulate retraining at month 5 (May)
# Combine training + first 4 months of production
extended_data = pd.concat([
    train_data,
    monthly_data['Feb'],
    monthly_data['Mar'],
    monthly_data['Apr'],
], ignore_index=True)

retrained_model = retrain_model(extended_data, features, 'churned')

# Compare original vs retrained on Jun and Jul
print('Performance Comparison: Original vs Retrained Model')
print(f'{"Month":<8} {"Original AUC":<16} {"Retrained AUC":<16} {"Improvement"}')
print('-' * 55)

for month in ['May', 'Jun', 'Jul']:
    data = monthly_data[month]
    X, y = data[features], data['churned']
    
    auc_orig    = roc_auc_score(y, model.predict_proba(X)[:, 1])
    auc_retrain = roc_auc_score(y, retrained_model.predict_proba(X)[:, 1])
    improvement = auc_retrain - auc_orig
    flag = 'βœ…' if improvement > 0 else '❌'
    
    print(f'{month:<8} {auc_orig:.4f}           {auc_retrain:.4f}           {improvement:+.4f} {flag}')

print()
print('Retraining best practices:')
print('  1. Shadow mode: new model runs in parallel, compare before switching')
print('  2. Champion/challenger: new model handles 10% of traffic first')
print('  3. Rollback plan: keep N-1 model ready to restore in < 5 minutes')
print('  4. Scheduled retraining: weekly/monthly even without drift signals')
print('  5. Track model lineage: which data version trained which model version')

ML Monitoring Cheat SheetΒΆ

Signal Type         Metric              Threshold    Tool
────────────────────────────────────────────────────────────────
Model performance   AUC/Accuracy drop   > 2-5%       Custom, Evidently
Prediction quality  Calibration / bias  > 5pp        scipy, scikit-learn
Data drift          KS test             p < 0.05     Evidently, NannyML
Data drift          PSI                 > 0.10       Custom
Feature health      Null rate spike     > 2x normal  Pandas / Great Expectations
Prediction drift    Output distribution shift         Evidently

Monitoring Stack (open source):
  Evidently   β†’ Drift reports + dashboards (easiest to start)
  NannyML     β†’ Performance estimation without labels
  Grafana     β†’ Time-series dashboards from any metric source
  Prometheus  β†’ Metrics collection from your FastAPI endpoint
  MLflow      β†’ Model tracking + experiment comparison

Retraining Triggers:
  Scheduled:    Weekly / monthly regardless of drift
  Event-based:  PSI > 0.2, AUC drop > 5%, bias > 5pp
  Performance:  Business KPI deviation (revenue, conversion)

ExercisesΒΆ

  1. Implement the Chi-square test as an alternative to KS test for categorical feature drift.

  2. Use NannyML to estimate model performance WITHOUT ground truth labels (useful when labels arrive with delay).

  3. Build an Evidently HTML drift report for the train vs Jul production data.

  4. Implement a sliding window retraining pipeline that automatically retrains when PSI > 0.2 on any feature.

  5. Create a Grafana dashboard JSON definition that tracks AUC and PSI over time using Prometheus metrics.