Anomaly Detection: Finding the Signal in Noisy Time Series
Anomalies are events that don't fit the expected pattern — a spike in server errors, an unusual drop in sales, a sensor reading that's physically impossible. This notebook covers statistical, ML, and deep learning approaches to finding them.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import LocalOutlierFactor
# Silence third-party deprecation/runtime warnings so notebook output stays readable.
import warnings
warnings.filterwarnings('ignore')
# Fix the RNG seed so the synthetic series (and its injected noise) is reproducible.
np.random.seed(42)
# Synthetic server metrics time series
n = 500
t = np.arange(n)

# Baseline behavior: a daily (24-step) cycle layered on a weekly
# (24*7-step) cycle, plus Gaussian measurement noise.
baseline = (
    100
    + 20 * np.sin(2 * np.pi * t / 24)        # daily pattern
    + 15 * np.sin(2 * np.pi * t / (24 * 7))  # weekly pattern
    + np.random.normal(0, 5, n)              # noise
)

# Corrupt the baseline with four different anomaly types.
corrupted = baseline.copy()
corrupted[80] += 120       # single positive spike
corrupted[200] -= 80       # single negative dip
corrupted[300:310] += 60   # sustained (collective) elevation
corrupted[420] = 0         # hard drop to zero

# Ground-truth labels: 1 at every injected anomaly, 0 elsewhere.
true_anomaly_idx = [80, 200, *range(300, 310), 420]
true_labels = np.zeros(n, dtype=int)
true_labels[true_anomaly_idx] = 1

ts = pd.Series(corrupted, name='server_requests')
print(f'Series: {n} points, {true_labels.sum()} injected anomalies')
1. Statistical Methods — Fast and Interpretable
# Method 1: Global z-score — flags points far from the series-wide mean
def zscore_anomaly(series: pd.Series, threshold: float = 3.0) -> np.ndarray:
    """Return a 0/1 array with 1 where |z-score| exceeds *threshold*."""
    deviation = np.abs(stats.zscore(series))
    return np.where(deviation > threshold, 1, 0)
# Method 2: Rolling Z-score — detects local anomalies relative to recent window
def rolling_zscore_anomaly(series: pd.Series, window: int = 50, threshold: float = 3.0) -> np.ndarray:
    """Flag points that deviate from their local (centered) rolling mean.

    Parameters:
        series: values to scan.
        window: centered rolling-window length used for the local mean/std.
        threshold: |z| above which a point is flagged.

    Returns:
        0/1 int ndarray, 1 = anomaly (matches the declared annotation; the
        original returned a pandas Series despite annotating np.ndarray).
    """
    rolling_mean = series.rolling(window, center=True, min_periods=1).mean()
    rolling_std = series.rolling(window, center=True, min_periods=1).std().fillna(1)
    # Guard: a constant window yields std == 0, which would divide to
    # NaN/inf instead of a usable z-score. The residual there is 0 anyway,
    # so substituting 1 gives a clean z of 0.
    rolling_std = rolling_std.mask(rolling_std == 0, 1.0)
    z = np.abs((series - rolling_mean) / rolling_std)
    return (z > threshold).to_numpy().astype(int)
# Method 3: IQR fences — robust because quartiles ignore extreme values
def iqr_anomaly(series: pd.Series, factor: float = 3.0) -> np.ndarray:
    """Mark values lying beyond *factor* interquartile ranges of the quartiles."""
    q1 = series.quantile(0.25)
    q3 = series.quantile(0.75)
    spread = q3 - q1
    lower_fence = q1 - factor * spread
    upper_fence = q3 + factor * spread
    outside = (series < lower_fence) | (series > upper_fence)
    return outside.astype(int)
# Method 4: Moving average with sigma bands (classic dashboard-style control limits)
def moving_avg_anomaly(series: pd.Series, window: int = 30, sigma: float = 3.0) -> np.ndarray:
    """Flag points outside a trailing-mean band of ±sigma rolling stds."""
    center = series.rolling(window, min_periods=1).mean()
    spread = series.rolling(window, min_periods=1).std().fillna(0)
    half_width = sigma * spread
    outside_band = (series < center - half_width) | (series > center + half_width)
    return outside_band.astype(int)
# Run every statistical detector over the series and tabulate how each
# one's flags line up with the injected ground-truth labels.
results = {
    'Z-score (global)': zscore_anomaly(ts),
    'Rolling Z-score': rolling_zscore_anomaly(ts),
    'IQR (3×)': iqr_anomaly(ts),  # fixed mis-encoded label (was '3Γ')
    'Moving Avg Bands': moving_avg_anomaly(ts),
}
print('Detection summary:')
print(f'{"Method":<25} {"Detected":<10} {"True Positives":<15} {"False Positives"}')
print('-' * 65)
for method, preds in results.items():
    det = preds.sum()
    # True positive: the detector fired on an injected anomaly.
    tp = (preds & true_labels).sum()
    # Use (1 - true_labels) rather than bitwise ~: on an int array ~ maps
    # 0 -> -1 and 1 -> -2, which only gave the right counts by a
    # bit-pattern accident. The explicit complement is equivalent and clear.
    fp = (preds & (1 - true_labels)).sum()
    print(f'{method:<25} {det:<10} {tp:<15} {fp}')
2. Isolation Forest — ML-Based Anomaly Detection
# Create features for ML approach
def create_ts_features(series: pd.Series, window: int = 20) -> pd.DataFrame:
    """Build a per-point feature matrix (rolling stats, lags, first difference)
    for unsupervised anomaly detectors."""
    mean_w = series.rolling(window, min_periods=1).mean()
    std_w = series.rolling(window, min_periods=1).std().fillna(0)
    fallback = series.mean()  # fill value for lags that precede the series

    features = pd.DataFrame({'value': series})
    features['rolling_mean'] = mean_w
    features['rolling_std'] = std_w
    features['residual'] = series - mean_w
    features['z_score'] = features['residual'] / (std_w + 1e-6)
    features['lag1'] = series.shift(1).fillna(fallback)
    features['lag2'] = series.shift(2).fillna(fallback)
    features['diff1'] = series.diff().fillna(0)
    return features
# Feature matrix shared by both ML detectors.
X_features = create_ts_features(ts)
# Isolation Forest: builds random trees, anomalies have shorter paths
iforest = IsolationForest(
    contamination=0.05,  # Expected proportion of anomalies
    n_estimators=100,
    random_state=42,
)
# fit_predict returns -1 for outliers and +1 for inliers; map to 1/0 labels.
iforest_labels = (iforest.fit_predict(X_features) == -1).astype(int)
iforest_scores = -iforest.score_samples(X_features)  # Higher = more anomalous
# LOF: Local Outlier Factor — density of each point vs. its n_neighbors
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
lof_labels = (lof.fit_predict(X_features) == -1).astype(int)
# negative_outlier_factor_ is more negative for outliers; negate so higher = worse.
lof_scores = -lof.negative_outlier_factor_
for method, labels in [('Isolation Forest', iforest_labels), ('LOF', lof_labels)]:
    tp = (labels & true_labels).sum()
    # NOTE(review): ~true_labels on an int 0/1 array yields -1/-2; the &
    # still produces the intended mask here (1 & -1 == 1, 1 & -2 == 0),
    # but (1 - true_labels) would say the same thing without the bit trick.
    fp = (labels & ~true_labels).sum()
    print(f'{method}: detected={labels.sum()}, TP={tp}, FP={fp}')
# Visualize: raw series with ground truth (top), then each detector's
# anomaly score with its decision threshold (middle, bottom).
fig, axes = plt.subplots(3, 1, figsize=(14, 10), sharex=True)
axes[0].plot(ts.values, color='steelblue', linewidth=0.8, alpha=0.8)
axes[0].scatter(np.where(true_labels)[0], ts.values[true_labels==1],
                color='red', s=50, zorder=5, label='True anomaly')
axes[0].set_title('Time Series with True Anomalies')
axes[0].legend()
# Isolation Forest scores with a 95th-percentile cutoff (matches contamination=0.05).
axes[1].plot(iforest_scores, color='orange', linewidth=1)
threshold = np.percentile(iforest_scores, 95)
axes[1].axhline(threshold, color='red', linestyle='--', label=f'Threshold (95th pct)')
axes[1].scatter(np.where(iforest_labels)[0], iforest_scores[iforest_labels==1],
                color='red', s=50, zorder=5, label='Detected')
axes[1].set_title('Isolation Forest Anomaly Scores')
axes[1].legend()
# LOF scores normalized to [0, 1] so the panel is comparable at a glance.
axes[2].plot(lof_scores / lof_scores.max(), color='purple', linewidth=1)
lof_threshold = np.percentile(lof_scores / lof_scores.max(), 95)
axes[2].axhline(lof_threshold, color='red', linestyle='--')
axes[2].scatter(np.where(lof_labels)[0], (lof_scores/lof_scores.max())[lof_labels==1],
                color='red', s=50, zorder=5)
axes[2].set_title('LOF Scores (Normalized)')
plt.tight_layout()
plt.show()
3. Residual-Based Detection — Predict then Detect
# Residual-based detection: predict the expected value, then flag points
# whose residual (actual - predicted) is large.
# Works well when you have a good forecasting model.
from sklearn.linear_model import LinearRegression
# Fit a simple model on nominally normal data.
# In production: train on known-clean data, score production data.
# NOTE(review): the "clean" slice [:400] still contains the injected
# anomalies at 80, 200, and 300-309 (see true_anomaly_idx); only index 420
# lies beyond it — confirm this leakage is intentional for the demo.
# NOTE(review): X_features includes the raw 'value' column, so the
# regression can fit a near-identity map of the target; residuals may be
# tiny as a result — verify this is the intended demonstration.
X_clean = X_features.values[:400]  # Assume first 400 are mostly clean
y_clean = ts.values[:400]
lr = LinearRegression()
lr.fit(X_clean, y_clean)
# Predict on full series
y_pred = lr.predict(X_features.values)
residuals = ts.values - y_pred
# Anomaly = |residual| exceeds 3 sigma of the training-period residuals
residual_std = residuals[:400].std()  # Std from clean period
residual_threshold = 3 * residual_std
residual_labels = (np.abs(residuals) > residual_threshold).astype(int)
tp = (residual_labels & true_labels).sum()
fp = (residual_labels & ~true_labels).sum()
print(f'Residual-based detection: TP={tp}, FP={fp}')
# Plot predicted vs actual (top) and residuals against the ±3σ band (bottom).
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(13, 7), sharex=True)
ax1.plot(ts.values, color='steelblue', linewidth=0.8)
ax1.plot(y_pred, color='orange', linewidth=1.5, label='Predicted')
ax1.scatter(np.where(true_labels)[0], ts.values[true_labels==1],
            color='red', s=60, zorder=5, label='True anomaly')
ax1.set_title('Predicted vs Actual')
ax1.legend()
# Shade the residual range considered normal.
ax2.fill_between(range(n), -residual_threshold, residual_threshold,
                 alpha=0.2, color='green', label='Normal range')
ax2.plot(residuals, color='steelblue', linewidth=0.8)
ax2.scatter(np.where(residual_labels)[0], residuals[residual_labels==1],
            color='red', s=60, zorder=5, label='Detected anomaly')
ax2.axhline(0, color='black', linewidth=0.5)
# Fixed mis-encoded (mojibake) title text: was 'Residuals β ... Β±3Ο'.
ax2.set_title('Residuals — Detect by Exceeding ±3σ')
ax2.legend()
plt.tight_layout()
plt.show()
Anomaly Detection Cheat Sheet

Method                  Best For                       Complexity
------------------------------------------------------------------
Z-score                 Simple global outliers         ★
Rolling Z-score         Local outliers in trends       ★★
Moving Avg Bands        Business dashboards            ★
Isolation Forest        Multivariate, no labels        ★★★
LOF                     Density-based clusters         ★★★
Residual (model-based)  Known pattern + deviation      ★★★
Autoencoders            Complex sequences              ★★★★
Key decisions:
Supervised vs unsupervised: Do you have labeled anomalies?
Point vs contextual: Is the value out of range, or just unusual for this time?
Point vs collective: Single point, or a run of unusual values?
Threshold tuning:
precision > recall: Raise threshold (fewer alerts, more misses)
recall > precision: Lower threshold (catch more, more false alarms)
Use F-beta score: β=0.5 weights precision 2× (alert fatigue matters)
Exercises
Implement a CUSUM (Cumulative Sum) detector for gradual drift — compare it to the rolling Z-score.
Train an LSTM autoencoder: encode the time series, decode it, flag high reconstruction error.
Build an alerting system that only fires after N consecutive anomalous points (reduces false alerts).
Evaluate all methods using F1 score and plot a precision-recall curve.
Apply the full pipeline to real data:
yfinance stock prices — flag unusual daily returns.