Anomaly Detection: Finding the Signal in Noisy Time Series¶

Anomalies are events that don't fit the expected pattern — a spike in server errors, an unusual drop in sales, a sensor reading that's physically impossible. This notebook covers statistical, ML, and deep learning approaches to finding them.

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.preprocessing import StandardScaler
from sklearn.neighbors import LocalOutlierFactor
import warnings
warnings.filterwarnings('ignore')

np.random.seed(42)

# Synthetic server-load series: a seasonal baseline plus noise, with
# hand-planted anomalies so detector quality can be measured exactly.
n = 500
t = np.arange(n)

# Expected behavior: a 24-step "daily" cycle on top of a 168-step "weekly" one.
baseline = (
    100
    + 20 * np.sin(2 * np.pi * t / 24)         # daily pattern
    + 15 * np.sin(2 * np.pi * t / (24 * 7))   # weekly pattern
    + np.random.normal(0, 5, n)               # measurement noise
)

# Plant four kinds of anomalies on top of the baseline.
anomalies = baseline.copy()
anomalies[80]  += 120      # point spike
anomalies[200] -= 80       # point dip
anomalies[300:310] += 60   # collective anomaly: sustained elevation
anomalies[420] = 0         # drop to zero (outage)

# Ground-truth labels: 1 at every injected-anomaly index.
true_anomaly_idx = [80, 200, *range(300, 310), 420]
true_labels = np.zeros(n, dtype=int)
true_labels[true_anomaly_idx] = 1

ts = pd.Series(anomalies, name='server_requests')
print(f'Series: {n} points, {true_labels.sum()} injected anomalies')

1. Statistical Methods — Fast and Interpretable¶

# Method 1: global Z-score — flags points far from the series-wide mean
def zscore_anomaly(series: pd.Series, threshold: float = 3.0) -> np.ndarray:
    """Flag points whose |z-score| over the whole series exceeds
    ``threshold``; returns 0/1 flags aligned with ``series``."""
    scores = stats.zscore(series)
    flagged = np.abs(scores) > threshold
    return flagged.astype(int)

# Method 2: Rolling Z-score — detects local anomalies relative to recent window
def rolling_zscore_anomaly(series: pd.Series, window: int = 50, threshold: float = 3.0) -> np.ndarray:
    """Flag points that deviate from their local (centered rolling-window)
    mean by more than ``threshold`` local standard deviations.

    Parameters
    ----------
    series : values to score.
    window : rolling window width (centered; shrinks at the series edges).
    threshold : |z| cutoff for flagging.

    Returns
    -------
    np.ndarray of 0/1 flags aligned with ``series``.
    """
    rolling_mean = series.rolling(window, center=True, min_periods=1).mean()
    # A single-point window has undefined std (NaN); treat it as 1 so the
    # z-score there is just the raw deviation.
    rolling_std  = series.rolling(window, center=True, min_periods=1).std().fillna(1)
    z = np.abs((series - rolling_mean) / rolling_std)
    # .to_numpy() so the return type matches the annotation (the original
    # returned a pd.Series despite declaring np.ndarray).
    return (z > threshold).astype(int).to_numpy()

# Method 3: IQR-based (robust: quartiles barely move when outliers are present)
def iqr_anomaly(series: pd.Series, factor: float = 3.0) -> np.ndarray:
    """Tukey-fence outlier test: flag points outside
    [Q1 - factor*IQR, Q3 + factor*IQR]; returns 0/1 flags."""
    q1, q3 = series.quantile(0.25), series.quantile(0.75)
    iqr = q3 - q1
    outside = (series < q1 - factor * iqr) | (series > q3 + factor * iqr)
    # .to_numpy() so the return matches the declared np.ndarray (the
    # original returned a pd.Series).
    return outside.astype(int).to_numpy()

# Method 4: Moving average ± bands (trailing window)
def moving_avg_anomaly(series: pd.Series, window: int = 30, sigma: float = 3.0) -> np.ndarray:
    """Flag points outside a trailing moving-average band of
    mean ± ``sigma`` * rolling std; returns 0/1 flags."""
    ma = series.rolling(window, min_periods=1).mean()
    # std of a single-point window is NaN; treat it as 0 (degenerate band).
    std = series.rolling(window, min_periods=1).std().fillna(0)
    lower = ma - sigma * std
    upper = ma + sigma * std
    # .to_numpy() so the return matches the declared np.ndarray (the
    # original returned a pd.Series).
    return ((series < lower) | (series > upper)).astype(int).to_numpy()

# Run all four statistical detectors on the same series and tabulate how
# many points each flags, split into true/false positives.
results = {
    'Z-score (global)':    zscore_anomaly(ts),
    'Rolling Z-score':     rolling_zscore_anomaly(ts),
    # Key text repaired: source had mojibake 'IQR (3Γ—)' from a bad
    # encoding round-trip; '3×' is the intended label.
    'IQR (3×)':            iqr_anomaly(ts),
    'Moving Avg Bands':    moving_avg_anomaly(ts),
}

print('Detection summary:')
print(f'{"Method":<25} {"Detected":<10} {"True Positives":<15} {"False Positives"}')
print('-' * 65)
for method, preds in results.items():
    det = preds.sum()
    tp = (preds & true_labels).sum()    # flagged AND truly anomalous
    fp = (preds & ~true_labels).sum()   # flagged but actually normal
    print(f'{method:<25} {det:<10} {tp:<15} {fp}')

2. Isolation Forest — ML-Based Anomaly Detection¶

# Build a per-point feature matrix for the ML detectors
def create_ts_features(series: pd.Series, window: int = 20) -> pd.DataFrame:
    """Turn a univariate series into a small feature matrix
    (rolling stats, local z-score, lags, first difference)."""
    roll = series.rolling(window, min_periods=1)
    mean_fill = series.mean()

    features = pd.DataFrame({'value': series})
    features['rolling_mean'] = roll.mean()
    features['rolling_std']  = roll.std().fillna(0)
    features['residual']     = series - features['rolling_mean']
    # Small epsilon keeps the z-score finite where the local std is 0.
    features['z_score']      = features['residual'] / (features['rolling_std'] + 1e-6)
    features['lag1']         = series.shift(1).fillna(mean_fill)
    features['lag2']         = series.shift(2).fillna(mean_fill)
    features['diff1']        = series.diff().fillna(0)
    return features

X_features = create_ts_features(ts)

# Isolation Forest: random axis-aligned splits isolate anomalous points
# quickly, so they end up on short tree paths.
iforest = IsolationForest(
    contamination=0.05,   # expected share of anomalous points
    n_estimators=100,
    random_state=42,
)
raw_if = iforest.fit_predict(X_features)              # +1 inlier, -1 anomaly
iforest_labels = (raw_if == -1).astype(int)
iforest_scores = -iforest.score_samples(X_features)   # negate: larger = more anomalous

# Local Outlier Factor: compares each point's local density to that of
# its nearest neighbors.
lof = LocalOutlierFactor(n_neighbors=20, contamination=0.05)
raw_lof = lof.fit_predict(X_features)
lof_labels = (raw_lof == -1).astype(int)
lof_scores = -lof.negative_outlier_factor_

# True/false positives against the injected ground truth.
for name, flags in (('Isolation Forest', iforest_labels), ('LOF', lof_labels)):
    hits = (flags & true_labels).sum()
    false_hits = (flags & ~true_labels).sum()
    print(f'{name}: detected={flags.sum()}, TP={hits}, FP={false_hits}')

# Three stacked panels: the raw series with ground truth, then each
# model's anomaly score with its 95th-percentile detection threshold.
fig, (ax_ts, ax_if, ax_lof) = plt.subplots(3, 1, figsize=(14, 10), sharex=True)

true_idx = np.where(true_labels)[0]
ax_ts.plot(ts.values, color='steelblue', linewidth=0.8, alpha=0.8)
ax_ts.scatter(true_idx, ts.values[true_labels == 1],
              color='red', s=50, zorder=5, label='True anomaly')
ax_ts.set_title('Time Series with True Anomalies')
ax_ts.legend()

ax_if.plot(iforest_scores, color='orange', linewidth=1)
threshold = np.percentile(iforest_scores, 95)
ax_if.axhline(threshold, color='red', linestyle='--', label='Threshold (95th pct)')
ax_if.scatter(np.where(iforest_labels)[0], iforest_scores[iforest_labels == 1],
              color='red', s=50, zorder=5, label='Detected')
ax_if.set_title('Isolation Forest Anomaly Scores')
ax_if.legend()

# LOF scores are normalized to [0, 1] purely for readability.
lof_norm = lof_scores / lof_scores.max()
ax_lof.plot(lof_norm, color='purple', linewidth=1)
lof_threshold = np.percentile(lof_norm, 95)
ax_lof.axhline(lof_threshold, color='red', linestyle='--')
ax_lof.scatter(np.where(lof_labels)[0], lof_norm[lof_labels == 1],
               color='red', s=50, zorder=5)
ax_lof.set_title('LOF Scores (Normalized)')

plt.tight_layout()
plt.show()

3. Residual-Based Detection — Predict then Detect¶

# Predict-then-detect: fit a model of "expected" behavior, then flag
# points whose residual (actual - predicted) is unusually large.
# Works well when you have a good forecasting model.

from sklearn.linear_model import LinearRegression

# Fit on an assumed-clean prefix of the series.
# In production: train on known-clean history, score the live data.
train_end = 400
X_clean = X_features.values[:train_end]
y_clean = ts.values[:train_end]

lr = LinearRegression()
lr.fit(X_clean, y_clean)

# Score every point against the model's expectation.
y_pred = lr.predict(X_features.values)
residuals = ts.values - y_pred

# Anomaly = |residual| beyond 3 sigma, with sigma taken from the clean period.
residual_std = residuals[:train_end].std()
residual_threshold = 3 * residual_std
residual_labels = (np.abs(residuals) > residual_threshold).astype(int)

tp = (residual_labels & true_labels).sum()
fp = (residual_labels & ~true_labels).sum()
print(f'Residual-based detection: TP={tp}, FP={fp}')

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(13, 7), sharex=True)

# Top panel: actual series vs model prediction, with ground truth marked.
ax1.plot(ts.values, color='steelblue', linewidth=0.8)
ax1.plot(y_pred, color='orange', linewidth=1.5, label='Predicted')
ax1.scatter(np.where(true_labels)[0], ts.values[true_labels==1],
            color='red', s=60, zorder=5, label='True anomaly')
ax1.set_title('Predicted vs Actual')
ax1.legend()

# Bottom panel: residuals with the ±3σ acceptance band shaded.
ax2.fill_between(range(n), -residual_threshold, residual_threshold,
                 alpha=0.2, color='green', label='Normal range')
ax2.plot(residuals, color='steelblue', linewidth=0.8)
ax2.scatter(np.where(residual_labels)[0], residuals[residual_labels==1],
            color='red', s=60, zorder=5, label='Detected anomaly')
ax2.axhline(0, color='black', linewidth=0.5)
# Title repaired: source had mojibake ('β€”', 'Β±3Οƒ') from a bad encoding
# round-trip; the intended text uses '—' and '±3σ'.
ax2.set_title('Residuals — Detect by Exceeding ±3σ')
ax2.legend()

plt.tight_layout()
plt.show()

Anomaly Detection Cheat Sheet¶

Method                  Best For                  Complexity
────────────────────────────────────────────────────────────
Z-score                 Simple global outliers    ★☆☆
Rolling Z-score         Local outliers in trends  ★★☆
Moving Avg Bands        Business dashboards       ★☆☆
Isolation Forest        Multivariate, no labels   ★★☆
LOF                     Density-based clusters    ★★☆
Residual (model-based)  Known pattern + deviation ★★★
Autoencoders            Complex sequences         ★★★★

Key decisions:
  Supervised vs unsupervised: Do you have labeled anomalies?
  Point vs contextual: Is the value out of range, or just unusual for this time?
  Point vs collective: Single point, or a run of unusual values?

Threshold tuning:
  precision > recall: Raise threshold (fewer alerts, more misses)
  recall > precision: Lower threshold (catch more, more false alarms)
  Use F-beta score: β=0.5 weights precision 2× (alert fatigue matters)

Exercises¶

  1. Implement a CUSUM (Cumulative Sum) detector for gradual drift — compare it to the rolling Z-score.

  2. Train an LSTM autoencoder: encode the time series, decode it, flag high reconstruction error.

  3. Build an alerting system that only fires after N consecutive anomalous points (reduces false alerts).

  4. Evaluate all methods using F1 score and plot a precision-recall curve.

  5. Apply the full pipeline to real data: yfinance stock prices, flag unusual daily returns.