ML Monitoring & Drift Detection: Keeping Models Healthy in Production
Models degrade silently. Without monitoring, you only find out when the business suffers. This notebook covers data drift detection, concept drift, performance monitoring, and automated alerting — the practices that keep production ML systems reliable.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
from scipy import stats
from sklearn.ensemble import GradientBoostingClassifier, RandomForestClassifier
from sklearn.preprocessing import StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.metrics import roc_auc_score, accuracy_score
import warnings
warnings.filterwarnings('ignore')
np.random.seed(42)
# Try Evidently for professional drift monitoring.  The notebook works
# either way: if the package is missing we fall back to the hand-rolled
# KS-test / PSI implementations below.
try:
    from evidently.report import Report
    from evidently.metric_preset import DataDriftPreset, ClassificationPreset
    HAS_EVIDENTLY = True
    print('Evidently available for drift reports')
except ImportError:
    HAS_EVIDENTLY = False
    print('Evidently not installed — implementing drift detection from scratch')
# Simulate production: model trained in Jan, deployed, data drifts over time
def generate_data(n, drift_factor=0.0, seed=None):
    """Generate a synthetic customer-churn dataset with controllable drift.

    Parameters
    ----------
    n : int
        Number of rows to generate.
    drift_factor : float, default 0.0
        0.0 reproduces the training distribution; larger values shift
        every feature (and the label relationship) further away from it.
    seed : int or None
        Seed for the random generator; fixing it makes output reproducible.

    Returns
    -------
    pd.DataFrame with five feature columns plus a binary 'churned' label.
    """
    rng = np.random.default_rng(seed)
    # NOTE: the draw order below must stay fixed — with a given seed the
    # generator stream determines every column, so reordering draws would
    # change the data.
    session_count = rng.poisson(20 * (1 + drift_factor * 0.5), n)
    days_since_login = rng.exponential(30 * (1 + drift_factor), n)
    total_spent = rng.lognormal(4 + drift_factor * 0.3, 1.5, n)
    support_tickets = rng.poisson(1 + drift_factor * 2, n)
    # Drift shifts plan mix from the cheapest plan toward the priciest one.
    plan_probs = [0.5 - drift_factor * 0.1, 0.25, 0.15, 0.1 + drift_factor * 0.1]
    plan_encoded = rng.choice([0, 1, 2, 3], n, p=plan_probs)
    frame = pd.DataFrame({
        'session_count': session_count,
        'days_since_login': days_since_login,
        'total_spent': total_spent,
        'support_tickets': support_tickets,
        'plan_encoded': plan_encoded,
    })
    # Logistic link with additive noise; thresholding at 0.5 yields labels.
    logit = (
        0.5 * np.log(frame['session_count'] + 1)
        - 0.2 * frame['support_tickets']
        - drift_factor * 0.5
        + rng.normal(0, 1, n)
    )
    churn_prob = 1 / (1 + np.exp(logit))
    frame['churned'] = (churn_prob > 0.5).astype(int)
    return frame
# Training data: Jan 2024
train_data = generate_data(2000, drift_factor=0.0, seed=42)
# Production data over 6 months (increasing drift).
# drift_factor ramps from 0.0 (Feb) to 1.5 (Jul) so later months are
# progressively further from the training distribution.
monthly_data = {
    'Feb': generate_data(500, drift_factor=0.0, seed=100),
    'Mar': generate_data(500, drift_factor=0.1, seed=101),
    'Apr': generate_data(500, drift_factor=0.3, seed=102),
    'May': generate_data(500, drift_factor=0.6, seed=103),
    'Jun': generate_data(500, drift_factor=1.0, seed=104),
    'Jul': generate_data(500, drift_factor=1.5, seed=105),
}
# Feature columns used everywhere below (order matters for the PSI table).
features = ['session_count', 'days_since_login', 'total_spent', 'support_tickets', 'plan_encoded']
# Train the "champion" model on January data only — this is the model
# whose degradation we monitor for the rest of the notebook.
model = GradientBoostingClassifier(n_estimators=100, random_state=42)
model.fit(train_data[features], train_data['churned'])
print(f'Training data: {len(train_data)} rows, churn rate: {train_data["churned"].mean():.1%}')
1. Data Drift — When Your Input Distribution Changes
# Data drift = production inputs differ from training inputs
# Can happen due to: seasonality, user behavior changes, upstream data issues
def detect_drift_ks(reference: pd.DataFrame, current: pd.DataFrame,
                    features: list, alpha: float = 0.05) -> pd.DataFrame:
    """
    Two-sample Kolmogorov-Smirnov test for every feature.

    The KS test compares the full empirical distributions (not just means),
    so it also catches variance and shape changes.

    Parameters
    ----------
    reference : baseline (training-time) data
    current : production batch to compare against the baseline
    features : column names to test
    alpha : significance level for flagging drift (default 0.05)

    Returns
    -------
    DataFrame with one row per feature (KS statistic, p-value, drift flag,
    severity bucket), sorted by KS statistic descending.
    """
    rows = []
    for name in features:
        stat, pvalue = stats.ks_2samp(reference[name].values,
                                      current[name].values)
        # Severity buckets on the KS statistic itself (effect size),
        # independent of the p-value (which mostly reflects sample size).
        if stat > 0.2:
            severity = 'HIGH'
        elif stat > 0.1:
            severity = 'MEDIUM'
        else:
            severity = 'LOW'
        rows.append({
            'feature': name,
            'ks_statistic': round(stat, 4),
            'p_value': round(pvalue, 6),
            'drift_detected': pvalue < alpha,
            'severity': severity,
        })
    return pd.DataFrame(rows).sort_values('ks_statistic', ascending=False)
# Run the KS drift check for each production month against the Jan
# training set and print which features drifted.
print('Drift Detection Results by Month (KS test, α=0.05):')
print()
drift_over_time = {}
for month, data in monthly_data.items():
    drift_df = detect_drift_ks(train_data, data, features)
    n_drifted = drift_df['drift_detected'].sum()
    drift_over_time[month] = drift_df  # keep per-month results for plotting below
    print(f'{month}: {n_drifted}/{len(features)} features drifted')
    if n_drifted > 0:
        drifted = drift_df[drift_df['drift_detected']][['feature', 'ks_statistic', 'severity']]
        for _, row in drifted.iterrows():
            print(f'  ⚠️ {row["feature"]:20s} KS={row["ks_statistic"]:.3f} [{row["severity"]}]')
# Visualize drift over time for one feature.
# One panel per production month: training distribution (blue) overlaid
# with that month's distribution (red); the title shows the KS statistic
# already computed in drift_over_time above.
fig, axes = plt.subplots(2, 3, figsize=(15, 8))
axes = axes.flatten()  # 2x3 grid -> flat array so months index it directly
for i, (month, data) in enumerate(monthly_data.items()):
    ax = axes[i]
    ax.hist(train_data['days_since_login'], bins=30, alpha=0.5, density=True, label='Train (Jan)', color='blue')
    ax.hist(data['days_since_login'], bins=30, alpha=0.5, density=True, label=month, color='red')
    # Look up the precomputed KS statistic for this feature/month pair
    ks_stat = drift_over_time[month][drift_over_time[month]['feature']=='days_since_login']['ks_statistic'].values[0]
    ax.set_title(f'{month}: KS={ks_stat:.3f}')
    ax.legend(fontsize=8)
plt.suptitle('Feature Distribution Drift: days_since_login', fontsize=13)
plt.tight_layout()
plt.show()
2. Concept Drift — When Your Model's Predictions Become Stale
# Concept drift = the relationship between features and target changes
# Even if features look normal, the model can still be wrong
def monitor_predictions(model, data: pd.DataFrame, features: list,
                        reference_score: float) -> dict:
    """Score one batch of labelled data and compare against a baseline AUC.

    Parameters
    ----------
    model : fitted classifier exposing predict / predict_proba
    data : batch containing the feature columns plus a 'churned' label
    features : feature column names to feed the model
    reference_score : baseline AUC to measure degradation against

    Returns
    -------
    dict with current AUC/accuracy, the AUC drop vs the baseline, the
    predicted and actual churn rates, and their difference (bias).
    """
    X = data[features]
    y_true = data['churned']
    labels = model.predict(X)
    scores = model.predict_proba(X)[:, 1]  # probability of the positive class
    current_auc = roc_auc_score(y_true, scores)
    predicted_rate = labels.mean()
    observed_rate = y_true.mean()
    return {
        'auc': current_auc,
        'accuracy': accuracy_score(y_true, labels),
        'auc_drop': reference_score - current_auc,
        'pred_churn_rate': predicted_rate,
        'actual_churn_rate': observed_rate,
        'prediction_bias': predicted_rate - observed_rate,
    }
# Baseline performance on training data
ref_auc = roc_auc_score(train_data['churned'], model.predict_proba(train_data[features])[:, 1])
print(f'Baseline AUC (train): {ref_auc:.4f}')
print()
metrics_over_time = {}
for month, data in monthly_data.items():
    m = monitor_predictions(model, data, features, ref_auc)
    metrics_over_time[month] = m
    # Status icon escalates as the AUC drop crosses the warning (0.02)
    # and critical (0.05) levels.  (Fixed: the original literal was a
    # string broken across two lines — a syntax error.)
    alert = '🚨' if m['auc_drop'] > 0.05 else '⚠️' if m['auc_drop'] > 0.02 else '✅'
    print(f'{month} {alert} AUC={m["auc"]:.4f} (Δ={m["auc_drop"]:+.4f}) | bias={m["prediction_bias"]:+.3f}')
# Performance degradation chart:
# left panel = AUC trajectory vs baseline/warning/critical lines,
# right panel = prediction bias per month, colored by magnitude.
months = list(metrics_over_time.keys())
aucs = [metrics_over_time[m]['auc'] for m in months]
biases = [metrics_over_time[m]['prediction_bias'] for m in months]
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(13, 5))
ax1.plot(months, aucs, 'bo-', linewidth=2, markersize=8)
# Reference lines: baseline plus warning (-0.05) and critical (-0.10) levels
ax1.axhline(ref_auc, color='green', linestyle='--', label=f'Baseline AUC: {ref_auc:.3f}')
ax1.axhline(ref_auc - 0.05, color='orange', linestyle=':', alpha=0.7, label='Warning threshold')
ax1.axhline(ref_auc - 0.10, color='red', linestyle=':', alpha=0.7, label='Critical threshold')
ax1.set_xlabel('Month')
ax1.set_ylabel('ROC-AUC')
ax1.set_title('Model Performance Degradation Over Time')
ax1.legend()
ax1.set_ylim(0.5, 1.0)
# Color bars by |bias|: >0.05 red, >0.02 orange, otherwise green
colors = ['red' if abs(b) > 0.05 else 'orange' if abs(b) > 0.02 else 'green' for b in biases]
ax2.bar(months, biases, color=colors, alpha=0.8)
ax2.axhline(0, color='black', linewidth=0.5)
ax2.set_xlabel('Month')
ax2.set_ylabel('Prediction Bias (pred - actual churn rate)')
ax2.set_title('Prediction Bias Over Time')
plt.tight_layout()
plt.show()
3. Population Stability Index (PSI)
# PSI measures distribution shift — widely used in financial ML
# PSI < 0.1: stable (no action needed)
# PSI 0.1-0.2: moderate shift (investigate)
# PSI > 0.2: significant shift (retrain or adjust)
def calculate_psi(reference: np.ndarray, current: np.ndarray,
                  n_bins: int = 10, epsilon: float = 1e-4) -> float:
    """Population Stability Index between a reference and a current sample.

    Bins are the reference percentiles, so each bin holds roughly equal
    reference mass.  The outer edges are opened to +/-inf: previously the
    edges were the reference min/max (nudged by 1e-10), so current values
    falling OUTSIDE the reference range — exactly the most drifted points —
    were silently dropped by np.histogram and PSI was underestimated.

    Parameters
    ----------
    reference : baseline sample (e.g. training-time feature values)
    current : production sample to compare
    n_bins : number of percentile bins (default 10 = deciles)
    epsilon : small additive constant so no bin proportion is zero
        (keeps log() finite; also covers duplicate percentile edges on
        discrete features, which produce empty zero-width bins)

    Returns
    -------
    float PSI; < 0.10 stable, 0.10-0.20 moderate, > 0.20 significant.
    """
    bins = np.percentile(reference, np.linspace(0, 100, n_bins + 1))
    # Open the outer bins so out-of-range current values are still counted.
    bins[0] = -np.inf
    bins[-1] = np.inf
    ref_counts, _ = np.histogram(reference, bins=bins)
    cur_counts, _ = np.histogram(current, bins=bins)
    # Convert to proportions with epsilon to avoid log(0)
    ref_pct = (ref_counts + epsilon) / len(reference)
    cur_pct = (cur_counts + epsilon) / len(current)
    psi = np.sum((cur_pct - ref_pct) * np.log(cur_pct / ref_pct))
    return float(psi)
# Print a month x feature table of PSI values with alert flags.
print('PSI Analysis (score distribution drift):')
print(f'{"Month":<8} ', end='')
for feat in features:
    print(f'{feat[:12]:<14}', end='')
print()
print('-' * 80)
for month, data in monthly_data.items():
    print(f'{month:<8} ', end='')
    for feat in features:
        psi = calculate_psi(train_data[feat].values, data[feat].values)
        flag = '🚨' if psi > 0.2 else '⚠️' if psi > 0.1 else '  '
        # Pad each cell then clip to a fixed 14-character column width.
        print((f'{psi:.3f}{flag}' + ' ' * 14)[:14], end='')
    print()
print()
print('PSI Legend: < 0.10 = stable | 0.10-0.20 = moderate | > 0.20 = significant shift')
4. Automated Alerting & Retraining Triggers
from dataclasses import dataclass, field
from typing import List, Callable, Optional
from enum import Enum


class AlertSeverity(Enum):
    # Ordered severity levels used to rank monitoring alerts.
    INFO = 'info'
    WARNING = 'warning'
    CRITICAL = 'critical'


@dataclass
class Alert:
    """One monitoring finding: which metric breached which threshold."""
    severity: AlertSeverity
    metric: str       # metric name, e.g. 'AUC' or 'PSI:<feature>'
    value: float      # observed value of the metric
    threshold: float  # threshold that was breached
    message: str      # human-readable description / recommended action


@dataclass
class MonitoringConfig:
    """Alert thresholds for run_monitoring_check (all tunable per model)."""
    auc_warning_threshold: float = 0.02  # AUC drop that triggers warning
    auc_critical_threshold: float = 0.05  # AUC drop that triggers critical
    psi_warning_threshold: float = 0.10
    psi_critical_threshold: float = 0.20
    bias_threshold: float = 0.05  # Absolute prediction bias
def run_monitoring_check(model, reference_data, current_data, features,
                         config: MonitoringConfig, month: str) -> List[Alert]:
    """Run the full monitoring suite for one batch of production data.

    Checks (1) model performance — AUC drop vs a baseline recomputed on the
    reference data, plus prediction bias — and (2) per-feature data drift
    via PSI.  Mojibake em-dashes in the alert messages are fixed here.

    Parameters
    ----------
    model : fitted classifier with predict / predict_proba
    reference_data : training-time data, must contain the 'churned' label
    current_data : production batch to evaluate
    features : feature column names
    config : thresholds controlling warning/critical escalation
    month : batch label (currently unused; kept for caller context)

    Returns
    -------
    List of Alert objects; empty when everything is healthy.
    """
    alerts: List[Alert] = []
    # Baseline recomputed here so the check is self-contained per call.
    ref_auc = roc_auc_score(reference_data['churned'], model.predict_proba(reference_data[features])[:,1])
    # 1. Model performance
    metrics = monitor_predictions(model, current_data, features, ref_auc)
    if metrics['auc_drop'] > config.auc_critical_threshold:
        alerts.append(Alert(
            AlertSeverity.CRITICAL, 'AUC', metrics['auc'], ref_auc - config.auc_critical_threshold,
            f'AUC dropped {metrics["auc_drop"]:.3f} from baseline — RETRAIN IMMEDIATELY'
        ))
    elif metrics['auc_drop'] > config.auc_warning_threshold:
        alerts.append(Alert(
            AlertSeverity.WARNING, 'AUC', metrics['auc'], ref_auc - config.auc_warning_threshold,
            f'AUC dropped {metrics["auc_drop"]:.3f} — Monitor closely'
        ))
    if abs(metrics['prediction_bias']) > config.bias_threshold:
        alerts.append(Alert(
            AlertSeverity.WARNING, 'Prediction Bias', metrics['prediction_bias'], config.bias_threshold,
            f'Model is predicting {metrics["prediction_bias"]:+.1%} vs actual — Calibration needed'
        ))
    # 2. Data drift (PSI per feature, warning at 0.10 / critical at 0.20)
    for feat in features:
        psi = calculate_psi(reference_data[feat].values, current_data[feat].values)
        if psi > config.psi_critical_threshold:
            alerts.append(Alert(
                AlertSeverity.CRITICAL, f'PSI:{feat}', psi, config.psi_critical_threshold,
                f'{feat} distribution has significant shift (PSI={psi:.3f})'
            ))
        elif psi > config.psi_warning_threshold:
            alerts.append(Alert(
                AlertSeverity.WARNING, f'PSI:{feat}', psi, config.psi_warning_threshold,
                f'{feat} distribution is shifting (PSI={psi:.3f})'
            ))
    return alerts
config = MonitoringConfig()
print('=== Monthly Monitoring Dashboard ===')
print()
all_alerts = {}
for month, data in monthly_data.items():
    alerts = run_monitoring_check(model, train_data, data, features, config, month)
    all_alerts[month] = alerts
    critical = [a for a in alerts if a.severity == AlertSeverity.CRITICAL]
    # Renamed from `warnings` — the old name shadowed the stdlib `warnings`
    # module imported at the top of the file.
    warning_alerts = [a for a in alerts if a.severity == AlertSeverity.WARNING]
    # Fixed: the original status literal was a string broken across two
    # lines (a syntax error) with mojibake'd emoji.
    status = '🚨 CRITICAL' if critical else '⚠️ WARNING ' if warning_alerts else '✅ HEALTHY '
    print(f'{month}: {status} | {len(critical)} critical, {len(warning_alerts)} warnings')
    for alert in critical:
        print(f'   🚨 {alert.message}')
    if month in ('Jun', 'Jul'):
        for alert in warning_alerts[:2]:  # Show first 2 warnings
            print(f'   ⚠️ {alert.message}')
5. Retraining Strategy
# When to retrain and how
def retrain_model(new_data: pd.DataFrame, features: list, target: str,
                  strategy: str = 'full',
                  window_size: int = 1500) -> GradientBoostingClassifier:
    """Retrain the churn model on fresh data.

    Strategies:
    - 'full' (default): train on all available rows.
    - 'sliding_window': train only on the most recent `window_size` rows
      (default 1500 ≈ 3 months at 500 rows/month; now a parameter instead
      of a hard-coded constant).
    - 'online' incremental updates (warm_start) are NOT implemented yet;
      any strategy other than 'sliding_window' falls back to full training.

    Returns the newly fitted model.  (The previous `-> tuple` annotation
    was wrong — the function has always returned a single estimator.)
    """
    model_new = GradientBoostingClassifier(n_estimators=100, random_state=42)
    if strategy == 'sliding_window':
        # Most recent rows only — assumes new_data is in chronological order
        window_data = new_data.tail(window_size)
        model_new.fit(window_data[features], window_data[target])
    else:
        model_new.fit(new_data[features], new_data[target])
    return model_new
# Simulate retraining at month 5 (May)
# Combine training + first 4 months of production
extended_data = pd.concat([
    train_data,
    monthly_data['Feb'],
    monthly_data['Mar'],
    monthly_data['Apr'],
], ignore_index=True)
retrained_model = retrain_model(extended_data, features, 'churned')
# Compare original vs retrained on May, Jun and Jul (all held out from
# the retraining set).  Fixed: the flag literal was a string broken
# across two lines (syntax error) with mojibake'd emoji.
print('Performance Comparison: Original vs Retrained Model')
print(f'{"Month":<8} {"Original AUC":<16} {"Retrained AUC":<16} {"Improvement"}')
print('-' * 55)
for month in ['May', 'Jun', 'Jul']:
    data = monthly_data[month]
    X, y = data[features], data['churned']
    auc_orig = roc_auc_score(y, model.predict_proba(X)[:, 1])
    auc_retrain = roc_auc_score(y, retrained_model.predict_proba(X)[:, 1])
    improvement = auc_retrain - auc_orig
    flag = '✅' if improvement > 0 else '❌'
    # Column widths match the header printed above
    print(f'{month:<8} {auc_orig:<16.4f} {auc_retrain:<16.4f} {improvement:+.4f} {flag}')
print()
print('Retraining best practices:')
print('  1. Shadow mode: new model runs in parallel, compare before switching')
print('  2. Champion/challenger: new model handles 10% of traffic first')
print('  3. Rollback plan: keep N-1 model ready to restore in < 5 minutes')
print('  4. Scheduled retraining: weekly/monthly even without drift signals')
print('  5. Track model lineage: which data version trained which model version')
ML Monitoring Cheat Sheet
Signal Type Metric Threshold Tool
────────────────────────────────────────────────────────────────
Model performance AUC/Accuracy drop > 2-5% Custom, Evidently
Prediction quality Calibration / bias > 5pp scipy, scikit-learn
Data drift KS test p < 0.05 Evidently, NannyML
Data drift PSI > 0.10 Custom
Feature health Null rate spike > 2x normal Pandas / Great Expectations
Prediction drift Output distribution shift Evidently
Monitoring Stack (open source):
Evidently — Drift reports + dashboards (easiest to start)
NannyML — Performance estimation without labels
Grafana — Time-series dashboards from any metric source
Prometheus — Metrics collection from your FastAPI endpoint
MLflow — Model tracking + experiment comparison
Retraining Triggers:
Scheduled: Weekly / monthly regardless of drift
Event-based: PSI > 0.2, AUC drop > 5%, bias > 5pp
Performance: Business KPI deviation (revenue, conversion)
Exercises
1. Implement the Chi-square test as an alternative to the KS test for categorical feature drift.
2. Use NannyML to estimate model performance WITHOUT ground truth labels (useful when labels arrive with delay).
3. Build an Evidently HTML drift report for the train vs Jul production data.
4. Implement a sliding-window retraining pipeline that automatically retrains when PSI > 0.2 on any feature.
5. Create a Grafana dashboard JSON definition that tracks AUC and PSI over time using Prometheus metrics.