Solutions: Time Series & Forecasting Track
Worked solutions to all exercises from the time-series-forecasting/ notebooks.
01 — Time Series Fundamentals
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import STL
# Synthetic monthly sales with trend and seasonality.
# This series (`ts`) is the shared fixture for every exercise below.
np.random.seed(42)
n = 120
dates = pd.date_range('2014-01', periods=n, freq='MS')
trend = np.linspace(100, 300, n)
seasonal = 30 * np.sin(2 * np.pi * np.arange(n) / 12)
noise = np.random.normal(0, 10, n)
sales = trend + seasonal + noise
ts = pd.Series(sales, index=dates, name='sales')
# Exercise 1: Box-Cox transformation to stabilize variance
# Key insight: Box-Cox finds lambda that makes variance constant across the series.
# lambda=0 => log transform; ADF p-value should improve (lower) after transform.
# NOTE(review): ADF mainly detects trend non-stationarity; a variance-stabilizing
# transform alone may not move the p-value much on a trended series — confirm intent.
shifted = ts - ts.min() + 1 # Box-Cox requires positive values
transformed, lam = stats.boxcox(shifted)
transformed_series = pd.Series(transformed, index=dates)
adf_orig = adfuller(ts)[1]
adf_bc = adfuller(transformed_series)[1]
print(f'Box-Cox lambda: {lam:.3f}')
print(f'ADF p-value BEFORE Box-Cox: {adf_orig:.4f}')
print(f'ADF p-value AFTER Box-Cox: {adf_bc:.4f}')
print('Interpretation: lower p-value => more evidence of stationarity after transform.')
# Exercise 2: Additive vs Multiplicative STL decomposition
# Key insight: Additive when seasonal amplitude is constant; multiplicative when it
# grows proportionally with the level (heteroscedastic seasonality).
stl_add = STL(ts, period=12, robust=True).fit()
# Multiplicative: decompose log-transformed series then exponentiate
# NOTE(review): the components below are plotted on the log scale and are never
# exponentiated back — confirm whether that matches the exercise intent.
log_ts = np.log(shifted)
stl_mult = STL(log_ts, period=12, robust=True).fit()
fig, axes = plt.subplots(4, 2, figsize=(14, 10))
# Left column: additive components; right column: components of the log series.
for i, (comp, label) in enumerate(zip(
[stl_add.observed, stl_add.trend, stl_add.seasonal, stl_add.resid],
['Observed', 'Trend', 'Seasonal', 'Residual'])):
axes[i, 0].plot(comp); axes[i, 0].set_title(f'Additive {label}')
axes[i, 1].plot([stl_mult.observed, stl_mult.trend,
stl_mult.seasonal, stl_mult.resid][i])
axes[i, 1].set_title(f'Multiplicative (log) {label}')
plt.tight_layout()
plt.show()
print('Use additive when seasonal swings stay constant; multiplicative when they grow with trend.')
# Exercise 3: Rolling correlation between two time series
# Key insight: Static correlation misses time-varying relationships;
# rolling correlation reveals when sales and temperature become coupled.
# Temperature shares the same 12-month sinusoid, so correlation exists by construction.
temperature = 15 + 10 * np.sin(2 * np.pi * np.arange(n) / 12) + np.random.normal(0, 2, n)
temp_series = pd.Series(temperature, index=dates, name='temperature')
df = pd.DataFrame({'sales': ts, 'temperature': temp_series})
# 12-month window: the first 11 values are NaN until the window fills.
rolling_corr = df['sales'].rolling(window=12).corr(df['temperature'])
fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6), sharex=True)
ax1.plot(ts, label='Sales'); ax1.plot(temp_series, label='Temperature'); ax1.legend()
ax2.plot(rolling_corr, color='purple', label='12-month rolling correlation')
ax2.axhline(0, linestyle='--', color='grey')
ax2.set_ylabel('Correlation'); ax2.legend()
plt.tight_layout(); plt.show()
# Exercise 4: CUSUM change point detection
# Key insight: CUSUM accumulates deviations from target mean; when the cumulative
# sum exceeds a threshold h it signals a structural shift in the mean.
def cusum_detector(series, k=0.5, h=5):
    """Two-sided CUSUM change-point detector on a standardized pandas Series.

    Parameters
    ----------
    series : pd.Series — standardized internally with its own mean/std.
    k : float — allowance (half slack) subtracted at every step.
    h : float — decision interval; an alarm fires when either sum exceeds it.

    Returns
    -------
    (upper, lower, alarms): the two cumulative-sum arrays and the list of
    index labels at which either sum crossed the threshold.
    """
    center = series.mean()
    spread = series.std()
    n_obs = len(series)
    upper = np.zeros(n_obs)
    lower = np.zeros(n_obs)
    alarms = []
    for t in range(1, n_obs):
        z = (series.iloc[t] - center) / spread
        upper[t] = max(0, upper[t - 1] + z - k)
        lower[t] = max(0, lower[t - 1] - z - k)
        if upper[t] > h or lower[t] > h:
            alarms.append(series.index[t])
    return upper, lower, alarms
# Inject a mean shift at month 60
ts_shift = ts.copy()
ts_shift.iloc[60:] += 80
sp, sn, change_points = cusum_detector(ts_shift)
print(f'Change points detected at: {change_points[:5]}')
plt.figure(figsize=(12, 4))
plt.plot(ts_shift, label='Series with shift')
# Mark only the first few alarms; CUSUM keeps flagging while the sum stays above h.
for cp in change_points[:3]:
plt.axvline(cp, color='red', linestyle='--', alpha=0.7)
plt.title('CUSUM Change Point Detection'); plt.legend(); plt.show()
# Exercise 5: Seasonal subseries plot
# Key insight: Each panel shows the same month across all years, making
# year-over-year trends within each season clearly visible.
df_ss = ts.to_frame()  # single column named 'sales' (taken from the Series name)
df_ss['month'] = df_ss.index.month
df_ss['year'] = df_ss.index.year
month_names = ['Jan','Feb','Mar','Apr','May','Jun',
'Jul','Aug','Sep','Oct','Nov','Dec']
fig, axes = plt.subplots(3, 4, figsize=(14, 8), sharey=True)
# One panel per calendar month; the red dashed line is that month's all-years mean.
for m, ax in enumerate(axes.flatten(), start=1):
subset = df_ss[df_ss['month'] == m]
ax.plot(subset['year'], subset['sales'], marker='o')
ax.axhline(subset['sales'].mean(), color='red', linestyle='--', linewidth=1)
ax.set_title(month_names[m-1])
ax.set_xlabel('Year')
fig.suptitle('Seasonal Subseries Plot', fontsize=14)
plt.tight_layout(); plt.show()
02 — ARIMA / SARIMA
from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.stats.diagnostic import acorr_ljungbox
from sklearn.metrics import mean_absolute_percentage_error
train, test = ts[:-24], ts[-24:]  # hold out the final 24 months
# Exercise 1: auto_arima vs manual SARIMA order selection
# Key insight: auto_arima uses AIC/BIC to search the order space; comparing to
# manual order reveals whether domain knowledge can beat grid search.
try:
from pmdarima import auto_arima
auto_model = auto_arima(train, seasonal=True, m=12,
stepwise=True, suppress_warnings=True, error_action='ignore')
print('Auto ARIMA order:', auto_model.order, auto_model.seasonal_order)
except ImportError:
print('pmdarima not installed; using manual order (1,1,1)(1,1,1,12)')
# Manual order based on ACF/PACF inspection
# NOTE(review): auto_model is only printed, never used for forecasting below.
manual_model = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,1,12)).fit(disp=False)
manual_fc = manual_model.forecast(24)
manual_mape = mean_absolute_percentage_error(test, manual_fc)
print(f'Manual SARIMA MAPE on holdout: {manual_mape:.4f}')
# Exercise 2: Rolling forecast with expanding window (refit every 4 weeks)
# Key insight: Expanding window uses all data up to each point, reducing parameter
# variance but increasing compute; simulates real deployment conditions.
rolling_preds = []
refit_freq = 4 # refit every 4 periods
model_fit = None
for i in range(len(test)):
    if i % refit_freq == 0 or model_fit is None:
        # Refit on all observations up to (but excluding) the target step.
        window = ts[:len(train) + i]
        model_fit = SARIMAX(window, order=(1,1,1),
                            seasonal_order=(1,1,1,12)).fit(disp=False)
    # FIX: the original always called forecast(1), so between refits it repeated
    # the SAME one-step-ahead value for up to 4 different target dates. Forecast
    # far enough past the last refit origin to reach step i, and take the last value.
    steps_ahead = i % refit_freq + 1
    pred = model_fit.forecast(steps_ahead)
    rolling_preds.append(pred.iloc[-1])
rolling_mape = mean_absolute_percentage_error(test, rolling_preds)
print(f'Rolling SARIMA MAPE (refit every {refit_freq} periods): {rolling_mape:.4f}')
# Exercise 3: Ljung-Box test on SARIMA residuals
# Key insight: If we reject H0 (p < 0.05), the residuals still contain autocorrelation,
# meaning the model has not captured all structure — higher-order terms may be needed.
residuals = manual_model.resid
# Test jointly at lags 12 and 24 (one and two seasonal cycles).
lb_result = acorr_ljungbox(residuals, lags=[12, 24], return_df=True)
print(lb_result)
print('\nInterpretation:')
# The result frame is indexed by lag, so iterrows() yields (lag, row) pairs.
for lag, row in lb_result.iterrows():
verdict = 'REJECT (residual autocorrelation remains)' if row['lb_pvalue'] < 0.05 else 'FAIL TO REJECT (residuals look white-noise)'
print(f' Lag {lag}: p={row["lb_pvalue"]:.4f} => {verdict}')
# Exercise 4: SARIMA vs Holt-Winters on the same holdout
# Key insight: Holt-Winters is simpler and faster; SARIMA may edge it out
# when autocorrelation structure is more complex than exponential smoothing captures.
from statsmodels.tsa.holtwinters import ExponentialSmoothing
# Additive trend + additive seasonality matches how the synthetic series was built.
hw_model = ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=12).fit()
hw_fc = hw_model.forecast(24)
hw_mape = mean_absolute_percentage_error(test, hw_fc)
print(f'SARIMA MAPE: {manual_mape:.4f}')
print(f'Holt-Winters MAPE: {hw_mape:.4f}')
plt.figure(figsize=(12, 4))
plt.plot(test.index, test.values, label='Actual')
plt.plot(test.index, manual_fc.values, label=f'SARIMA (MAPE={manual_mape:.3f})')
plt.plot(test.index, hw_fc.values, label=f'Holt-Winters (MAPE={hw_mape:.3f})')
plt.legend(); plt.title('SARIMA vs Holt-Winters'); plt.show()
# Exercise 5: SARIMAX with exogenous promo flag
# Key insight: Adding an exogenous feature lets SARIMAX absorb promotional lift
# that would otherwise show up as unexplained spikes in residuals.
promo = pd.Series(np.random.binomial(1, 0.15, n), index=dates, name='promo')
ts_promo = ts + promo * 40 # promotions add ~40 units
train_p, test_p = ts_promo[:-24], ts_promo[-24:]
promo_train, promo_test = promo[:-24], promo[-24:]
# Exog must be 2-D (n_obs, n_features); future promo values are assumed known
# at forecast time, which is what makes them usable as exogenous regressors.
sarimax = SARIMAX(train_p, exog=promo_train.values.reshape(-1,1),
order=(1,1,1), seasonal_order=(1,1,1,12)).fit(disp=False)
sarimax_fc = sarimax.forecast(24, exog=promo_test.values.reshape(-1,1))
sarima_base = SARIMAX(train_p, order=(1,1,1), seasonal_order=(1,1,1,12)).fit(disp=False)
sarima_fc = sarima_base.forecast(24)
mape_x = mean_absolute_percentage_error(test_p, sarimax_fc)
mape_no = mean_absolute_percentage_error(test_p, sarima_fc)
print(f'SARIMA MAPE (no exog): {mape_no:.4f}')
print(f'SARIMAX MAPE (w/ promo): {mape_x:.4f}')
print(f'Improvement: {(mape_no - mape_x)/mape_no*100:.1f}%')
03 — Prophet Forecasting
# Exercises require Prophet. Install with: pip install prophet
try:
from prophet import Prophet
from prophet.diagnostics import cross_validation, performance_metrics
from prophet.plot import plot_cross_validation_metric
PROPHET_AVAILABLE = True
except ImportError:
print('Prophet not installed. Run: pip install prophet')
PROPHET_AVAILABLE = False
# Prepare prophet-format dataframe
# Prophet requires columns named exactly 'ds' (datestamp) and 'y' (target).
df_prophet = pd.DataFrame({'ds': dates, 'y': ts.values})
if PROPHET_AVAILABLE:
# Exercise 1: Cross-validate Prophet — 6-month horizon, 1-month step
# Key insight: CV over multiple cutoffs shows whether MAPE degrades for
# longer horizons, helping decide how far ahead the model is trustworthy.
m = Prophet(yearly_seasonality=True, weekly_seasonality=False, daily_seasonality=False)
m.fit(df_prophet)
# Prophet CV windows are expressed in days even for monthly data:
# 730d initial (~2 years), 30d period (~1 month), 180d horizon (~6 months).
df_cv = cross_validation(m, initial='730 days', period='30 days', horizon='180 days')
df_perf = performance_metrics(df_cv)
print(df_perf[['horizon','mape']].head(10))
ax = plot_cross_validation_metric(df_cv, metric='mape')
plt.title('Prophet CV: MAPE over forecast horizon'); plt.show()
if PROPHET_AVAILABLE:
# Exercise 2: Changepoint sensitivity — more vs fewer changepoints
# Key insight: More changepoints → flexible trend but risk of overfitting;
# fewer → smoother but may miss real structural breaks.
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
for ax, n_cp, title in zip(axes, [5, 50], ['Few changepoints (n=5)', 'Many changepoints (n=50)']):
m_cp = Prophet(n_changepoints=n_cp, yearly_seasonality=True,
weekly_seasonality=False, daily_seasonality=False)
m_cp.fit(df_prophet)
future = m_cp.make_future_dataframe(periods=24, freq='MS')
fc = m_cp.predict(future)
# Plot the fitted trend component only — full yhat would hide the difference.
ax.plot(df_prophet['ds'], df_prophet['y'], label='Actual')
ax.plot(fc['ds'], fc['trend'], label='Trend', color='red')
ax.set_title(title); ax.legend()
plt.tight_layout(); plt.show()
if PROPHET_AVAILABLE:
# Exercise 3: Custom holiday effect for a major shopping event
# Key insight: Explicit holiday components isolate the event's lift so the
# trend doesn't absorb it and future forecasts reflect expected promotions.
black_fridays = pd.DataFrame({
'holiday': 'black_friday',
'ds': pd.to_datetime(['2014-11-28','2015-11-27','2016-11-25',
'2017-11-24','2018-11-23','2019-11-29',
'2020-11-27','2021-11-26','2022-11-25','2023-11-24']),
'lower_window': -2,
'upper_window': 4
})
m_h = Prophet(holidays=black_fridays, yearly_seasonality=True,
weekly_seasonality=False, daily_seasonality=False)
m_h.fit(df_prophet)
future = m_h.make_future_dataframe(periods=24, freq='MS')
fc_h = m_h.predict(future)
# NOTE(review): the data is monthly (month-start stamps), so these late-November
# holiday dates — even with the -2/+4 day window — never coincide with any ds;
# the fitted 'black_friday' component is therefore expected to be ~0 here.
# Daily data is needed for a real effect — confirm intent.
holiday_effect = fc_h[fc_h['ds'].dt.month == 11]['black_friday'].mean()
print(f'Estimated Black Friday lift in November: {holiday_effect:.2f} units')
if PROPHET_AVAILABLE:
# Exercise 4: Prophet with temperature as add_regressor
# Key insight: External regressors must be available at forecast time;
# adding temperature captures weather-driven demand not explained by seasonality alone.
df_reg = df_prophet.copy()
df_reg['temperature'] = temp_series.values
m_reg = Prophet(yearly_seasonality=True, weekly_seasonality=False, daily_seasonality=False)
m_reg.add_regressor('temperature')
m_reg.fit(df_reg)
future_reg = m_reg.make_future_dataframe(periods=24, freq='MS')
# Forecast future temperature using seasonal pattern
# (arange starts at 0 on the first in-sample month, so the phase matches training).
future_reg['temperature'] = 15 + 10 * np.sin(2 * np.pi * np.arange(len(future_reg)) / 12)
fc_reg = m_reg.predict(future_reg)
# NOTE(review): params['beta'] holds coefficients for ALL model features
# (seasonality terms + regressors) in column order — index 0 is not necessarily
# temperature. Verify the column mapping before trusting this number.
print('Regressor coefficient (temperature):', m_reg.params['beta'][0])
if PROPHET_AVAILABLE:
    # Exercise 5: Prophet ensemble — average 3 models with different priors
    # Key insight: Ensembling diversifies model uncertainty; different changepoint
    # and seasonality priors cover different structural assumptions.
    # FIX: fit on the training portion only. The original fit every model on the
    # FULL series and then compared future-dated yhat (dates beyond the data)
    # against the last 24 in-sample actuals — a date misalignment plus leakage.
    configs = [
        dict(changepoint_prior_scale=0.01, seasonality_prior_scale=1.0),
        dict(changepoint_prior_scale=0.10, seasonality_prior_scale=5.0),
        dict(changepoint_prior_scale=0.50, seasonality_prior_scale=10.0),
    ]
    df_train_prophet = df_prophet.iloc[:-24]  # hold out the last 24 months
    ensemble_preds = []
    for cfg in configs:
        m_e = Prophet(yearly_seasonality=True, weekly_seasonality=False,
                      daily_seasonality=False, **cfg)
        m_e.fit(df_train_prophet)
        future_e = m_e.make_future_dataframe(periods=24, freq='MS')
        fc_e = m_e.predict(future_e)
        ensemble_preds.append(fc_e['yhat'].values)
    ensemble_mean = np.mean(ensemble_preds, axis=0)
    # The last 24 rows of the future frame are exactly the held-out dates.
    ensemble_forecast = pd.Series(ensemble_mean[-24:], index=test.index)
    ens_mape = mean_absolute_percentage_error(test, ensemble_forecast)
    print(f'Prophet Ensemble MAPE: {ens_mape:.4f}')
04 — LSTM Time Series
import warnings
warnings.filterwarnings('ignore')  # silence torch/sklearn chatter in notebook output
try:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
TORCH_AVAILABLE = True
except ImportError:
print('PyTorch not installed. Run: pip install torch')
TORCH_AVAILABLE = False
from sklearn.preprocessing import MinMaxScaler
# Scale data
# NOTE(review): the scaler is fit on the FULL series, so later train/val splits
# see min/max information from the future — acceptable for a demo, leaky in production.
scaler = MinMaxScaler()
scaled = scaler.fit_transform(ts.values.reshape(-1, 1)).flatten()
def make_sequences(data, look_back):
    """Slice a 1-D array into overlapping (window, next-value) supervised pairs.

    Returns (X, y): X has shape (len(data) - look_back, look_back) and y holds
    the value immediately following each window.
    """
    n_pairs = len(data) - look_back
    windows = [data[start:start + look_back] for start in range(n_pairs)]
    targets = [data[start + look_back] for start in range(n_pairs)]
    return np.array(windows), np.array(targets)
if TORCH_AVAILABLE:
class SimpleLSTM(nn.Module):
"""Single-layer LSTM regressor: the final time step's hidden output is
passed through dropout and a linear head to produce one scalar."""
def __init__(self, input_size=1, hidden=32, dropout=0.0):
super().__init__()
self.lstm = nn.LSTM(input_size, hidden, batch_first=True)
self.drop = nn.Dropout(dropout)
self.fc = nn.Linear(hidden, 1)
def forward(self, x):
# x: (batch, seq, features); out[:, -1, :] is the last step's output.
out, _ = self.lstm(x)
return self.fc(self.drop(out[:, -1, :]))
def train_lstm(X_tr, y_tr, X_val, y_val, input_size=1, epochs=30, dropout=0.0, clip_norm=None):
"""Fit a SimpleLSTM with full-batch Adam; return (model, per-epoch val losses).

X arrays are (n, look_back) for univariate input (a feature axis is appended),
or (n, look_back, features) when input_size > 1. If clip_norm is given the
gradient norm is clipped each step.
"""
model = SimpleLSTM(input_size=input_size, dropout=dropout)
opt = torch.optim.Adam(model.parameters(), lr=1e-3)
criterion = nn.MSELoss()
# Univariate input arrives as (n, look_back); add a trailing feature axis.
Xt = torch.tensor(X_tr, dtype=torch.float32).unsqueeze(-1) if input_size==1 else torch.tensor(X_tr, dtype=torch.float32)
yt = torch.tensor(y_tr, dtype=torch.float32).unsqueeze(-1)
Xv = torch.tensor(X_val, dtype=torch.float32).unsqueeze(-1) if input_size==1 else torch.tensor(X_val, dtype=torch.float32)
yv = torch.tensor(y_val, dtype=torch.float32).unsqueeze(-1)
val_losses = []
for _ in range(epochs):
model.train()
opt.zero_grad()
loss = criterion(model(Xt), yt)  # full-batch step; no DataLoader needed at this size
loss.backward()
if clip_norm:
nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
opt.step()
model.eval()
with torch.no_grad():
val_losses.append(criterion(model(Xv), yv).item())
return model, val_losses
# Exercise 1: Compare look_back values
# Key insight: Short look_back misses long seasonal cycles; too long adds noise.
# Optimal look_back often aligns with the dominant seasonality period.
split = int(0.8 * len(scaled))  # time-ordered 80/20 split point
rmse_by_lb = {}
for lb in [7, 14, 30, 60]:
if lb >= split:
continue
X, y = make_sequences(scaled, lb)
# Offset by lb so validation targets start exactly at the split point.
X_tr, y_tr = X[:split-lb], y[:split-lb]
X_val, y_val = X[split-lb:], y[split-lb:]
if len(X_tr) < 5 or len(X_val) < 2:
continue
_, val_losses = train_lstm(X_tr, y_tr, X_val, y_val, epochs=20)
rmse_by_lb[lb] = np.sqrt(val_losses[-1])  # val loss is MSE, so sqrt gives RMSE
print('Val RMSE by look_back:', rmse_by_lb)
plt.figure(figsize=(7, 4))
plt.plot(list(rmse_by_lb.keys()), list(rmse_by_lb.values()), marker='o')
plt.xlabel('look_back'); plt.ylabel('Val RMSE')
plt.title('LSTM: Val RMSE vs look_back'); plt.show()
if TORCH_AVAILABLE:
# Exercise 2: Add day-of-week and month-of-year as extra features
# Key insight: Calendar features inject domain knowledge about periodicity
# that the LSTM would otherwise need many epochs to learn from raw values.
look_back = 12
month_feat = (dates.month - 1) / 11.0 # normalize 0-1
dow_feat = dates.dayofweek / 6.0
feat_matrix = np.column_stack([scaled, month_feat, dow_feat])
X_mf, y_mf = [], []
for i in range(len(feat_matrix) - look_back):
X_mf.append(feat_matrix[i:i+look_back])
y_mf.append(scaled[i+look_back])
X_mf, y_mf = np.array(X_mf), np.array(y_mf)
split2 = int(0.8 * len(X_mf))
_, vl_base = train_lstm(X_mf[:split2, :, 0], y_mf[:split2], X_mf[split2:, :, 0], y_mf[split2:], input_size=1, epochs=30)
class MultiLSTM(nn.Module):
def __init__(self, input_size=3, hidden=32):
super().__init__()
self.lstm = nn.LSTM(input_size, hidden, batch_first=True)
self.fc = nn.Linear(hidden, 1)
def forward(self, x): out, _ = self.lstm(x); return self.fc(out[:, -1, :])
model_mf = MultiLSTM(input_size=3)
opt = torch.optim.Adam(model_mf.parameters(), lr=1e-3)
criterion = nn.MSELoss()
Xt = torch.tensor(X_mf[:split2], dtype=torch.float32)
yt = torch.tensor(y_mf[:split2], dtype=torch.float32).unsqueeze(-1)
Xv = torch.tensor(X_mf[split2:], dtype=torch.float32)
yv = torch.tensor(y_mf[split2:], dtype=torch.float32).unsqueeze(-1)
vl_multi = []
for _ in range(30):
model_mf.train(); opt.zero_grad()
criterion(model_mf(Xt), yt).backward(); opt.step()
model_mf.eval()
with torch.no_grad(): vl_multi.append(criterion(model_mf(Xv), yv).item())
print(f'Final val loss β univariate: {vl_base[-1]:.6f}, with calendar features: {vl_multi[-1]:.6f}')
if TORCH_AVAILABLE:
# Exercise 3: Dropout tuning
# Key insight: Dropout acts as regularization; too little → overfit,
# too much → underfit. Optimal value minimizes the val/train loss gap.
look_back = 12
X12, y12 = make_sequences(scaled, look_back)
sp = int(0.8 * len(X12))
Xtr12, ytr12 = X12[:sp], y12[:sp]
Xv12, yv12 = X12[sp:], y12[sp:]
dropout_results = {}
# One validation-loss curve per dropout rate, same data and epochs throughout.
for dr in [0.0, 0.1, 0.2, 0.3]:
_, vl = train_lstm(Xtr12, ytr12, Xv12, yv12, epochs=40, dropout=dr)
dropout_results[dr] = vl
plt.figure(figsize=(10, 4))
for dr, vl in dropout_results.items():
plt.plot(vl, label=f'dropout={dr}')
plt.xlabel('Epoch'); plt.ylabel('Val Loss')
plt.title('LSTM Dropout Tuning'); plt.legend(); plt.show()
if TORCH_AVAILABLE:
# Exercise 4: Walk-forward evaluation with monthly refitting
# Key insight: Walk-forward avoids look-ahead bias; retraining every 30 steps
# keeps the model fresh without the cost of per-step retraining.
look_back = 12
wf_preds = []
refit_every = 10 # every 10 steps (reduced for demo speed)
test_start = int(0.8 * len(scaled))
for i in range(test_start, min(test_start + 20, len(scaled))):
# Refit on all history up to i; the last 2 sequences act as a tiny val set.
if (i - test_start) % refit_every == 0:
X_wf, y_wf = make_sequences(scaled[:i], look_back)
# NOTE(review): if the first refit ever had len(X_wf) <= 5, wf_model would
# be undefined below; with 96+ observations here that cannot trigger.
if len(X_wf) > 5:
wf_model, _ = train_lstm(X_wf[:-2], y_wf[:-2], X_wf[-2:], y_wf[-2:], epochs=15)
seq = torch.tensor(scaled[i-look_back:i], dtype=torch.float32).unsqueeze(0).unsqueeze(-1)
wf_model.eval()
with torch.no_grad():
wf_preds.append(wf_model(seq).item())
actual_wf = scaled[test_start:test_start + len(wf_preds)]
rmse_wf = np.sqrt(np.mean((np.array(wf_preds) - actual_wf)**2))
print(f'Walk-forward RMSE (scaled): {rmse_wf:.5f}')
if TORCH_AVAILABLE:
# Exercise 5: Gradient clipping — compare training stability
# Key insight: Without clipping, exploding gradients cause loss spikes;
# max_norm=1.0 keeps updates bounded and smooths the training curve.
look_back = 12
X12, y12 = make_sequences(scaled, look_back)
sp = int(0.8 * len(X12))
# Same data and architecture; the only difference between runs is clip_norm.
_, vl_no_clip = train_lstm(X12[:sp], y12[:sp], X12[sp:], y12[sp:], epochs=50, clip_norm=None)
_, vl_clipped = train_lstm(X12[:sp], y12[:sp], X12[sp:], y12[sp:], epochs=50, clip_norm=1.0)
plt.figure(figsize=(10, 4))
plt.plot(vl_no_clip, label='No gradient clipping', alpha=0.8)
plt.plot(vl_clipped, label='Gradient clipping max_norm=1.0', alpha=0.8)
plt.xlabel('Epoch'); plt.ylabel('Val Loss')
plt.title('Training Stability: Gradient Clipping'); plt.legend(); plt.show()
05 — Anomaly Detection
from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import f1_score, average_precision_score
# Synthetic series with injected anomalies
np.random.seed(0)
N = 200
base_series = np.random.normal(0, 1, N)  # clean N(0,1) background
anomaly_idx = np.sort(np.random.choice(N, 20, replace=False))  # 10% contamination
anom_series = base_series.copy()
anom_series[anomaly_idx] += np.random.choice([-6, 6], len(anomaly_idx))  # ±6-sigma spikes
true_labels = np.zeros(N, dtype=int)
true_labels[anomaly_idx] = 1  # ground truth for F1 / PR-AUC scoring below
# Exercise 1: CUSUM anomaly detector
# Key insight: CUSUM flags points where the cumulative deviation from the mean
# exceeds a threshold, making it sensitive to sustained shifts rather than single spikes.
def cusum_anomaly(series, k=1.0, h=4.0):
    """Flag points where a two-sided CUSUM of standardized values exceeds h.

    Works on any sequence supporting .mean(), .std() and integer indexing.
    Returns an int array of 0/1 flags with the same length as `series`.
    """
    center, spread = series.mean(), series.std()
    n_obs = len(series)
    up = np.zeros(n_obs)
    down = np.zeros(n_obs)
    flags = np.zeros(n_obs, dtype=int)
    for t in range(1, n_obs):
        z = (series[t] - center) / spread
        up[t] = max(0, up[t - 1] + z - k)
        down[t] = max(0, down[t - 1] - z - k)
        if up[t] > h or down[t] > h:
            flags[t] = 1
    return flags
# Score CUSUM against the known injected anomaly labels.
cusum_flags = cusum_anomaly(anom_series)
print(f'CUSUM F1: {f1_score(true_labels, cusum_flags):.3f}')
if TORCH_AVAILABLE:
# Exercise 2: LSTM autoencoder anomaly detection
# Key insight: The autoencoder learns normal patterns; high reconstruction error
# indicates the window deviates from what was seen during training.
window = 30
class LSTMAutoencoder(nn.Module):
"""Seq2seq autoencoder: encode a window into a final hidden state, then decode
by repeating that state across the sequence length and mapping back to 1-d."""
def __init__(self, hidden=16):
super().__init__()
self.enc = nn.LSTM(1, hidden, batch_first=True)
self.dec = nn.LSTM(hidden, 1, batch_first=True)
def forward(self, x):
# h: (1, batch, hidden) -> (batch, 1, hidden), repeated to (batch, seq, hidden)
_, (h, c) = self.enc(x)
dec_input = h.permute(1, 0, 2).expand(-1, x.size(1), -1)
out, _ = self.dec(dec_input)
return out
# Train only on windows drawn from non-anomalous positions.
# NOTE(review): normal_idx positions are not necessarily contiguous, so a
# "window" may span gaps where anomalous points were skipped — confirm intent.
normal_idx = np.where(true_labels == 0)[0]
windows_normal = []
for i in range(0, len(normal_idx) - window, 5):
w = normal_idx[i:i+window]
if len(w) == window:
windows_normal.append(anom_series[w])
if len(windows_normal) > 0:
Xae = torch.tensor(np.array(windows_normal), dtype=torch.float32).unsqueeze(-1)
ae_model = LSTMAutoencoder()
ae_opt = torch.optim.Adam(ae_model.parameters(), lr=1e-3)
ae_crit = nn.MSELoss()
for _ in range(30):
ae_model.train(); ae_opt.zero_grad()
ae_crit(ae_model(Xae), Xae).backward(); ae_opt.step()
ae_model.eval()
# Score every position by the reconstruction error of its trailing window;
# the first `window` positions stay 0 (unscored).
recon_errors = np.zeros(N)
with torch.no_grad():
for i in range(window, N):
seq = torch.tensor(anom_series[i-window:i], dtype=torch.float32).unsqueeze(0).unsqueeze(-1)
rec = ae_model(seq)
recon_errors[i] = nn.MSELoss()(rec, seq).item()
# Threshold at the 90th percentile of the scored (non-zero) positions.
threshold = np.percentile(recon_errors[recon_errors > 0], 90)
ae_flags = (recon_errors > threshold).astype(int)
print(f'LSTM Autoencoder F1: {f1_score(true_labels, ae_flags):.3f}')
# Exercise 3: Consecutive anomaly filter
# Key insight: Requiring 3+ consecutive anomalous points eliminates isolated false positives
# caused by random noise spikes, at the cost of detecting slowly evolving anomalies.
def consecutive_filter(flags, min_consecutive=3):
    """Keep only anomaly flags that occur in runs of at least `min_consecutive`.

    Returns a new 0/1 array of the same shape; isolated (short-run) flags are
    suppressed, full qualifying runs are preserved.
    """
    out = np.zeros_like(flags)
    run = 0
    for pos, val in enumerate(flags):
        if val != 1:
            run = 0
            continue
        run += 1
        if run >= min_consecutive:
            # Mark the trailing window; earlier parts of a long run were
            # already marked on previous iterations.
            out[pos - min_consecutive + 1:pos + 1] = 1
    return out
# Inject a block anomaly (3 consecutive)
block_series = base_series.copy()
block_series[50:53] += 8
block_true = np.zeros(N, dtype=int); block_true[50:53] = 1
# Plain z-score flags first, then require >= 3 consecutive hits to keep a flag.
z_scores = np.abs((block_series - block_series.mean()) / block_series.std())
raw_flags = (z_scores > 2.5).astype(int)
filtered_flags = consecutive_filter(raw_flags, min_consecutive=3)
print(f'Raw Z-score F1: {f1_score(block_true, raw_flags):.3f}')
print(f'After consecutive filter F1: {f1_score(block_true, filtered_flags):.3f}')
# Exercise 4: Compare all methods using F1 and PR-AUC
# Key insight: F1 balances precision and recall at a single threshold;
# PR-AUC captures performance across all thresholds — prefer it for imbalanced data.
from sklearn.preprocessing import StandardScaler
X_ad = anom_series.reshape(-1, 1)  # sklearn detectors expect 2-D input
# Z-score
z_flags = (np.abs((anom_series - anom_series.mean()) / anom_series.std()) > 3).astype(int)
# Rolling Z-score
s_roll = pd.Series(anom_series)
roll_z = (s_roll - s_roll.rolling(20, min_periods=5).mean()) / s_roll.rolling(20, min_periods=5).std()
rz_flags = (roll_z.abs().fillna(0) > 2.5).astype(int).values
# IQR
q1, q3 = np.percentile(anom_series, 25), np.percentile(anom_series, 75)
iqr_flags = ((anom_series < q1 - 1.5*(q3-q1)) | (anom_series > q3 + 1.5*(q3-q1))).astype(int)
# Isolation Forest
if_model = IsolationForest(contamination=0.1, random_state=42).fit(X_ad)
if_flags = (if_model.predict(X_ad) == -1).astype(int)
if_scores = -if_model.score_samples(X_ad)  # NOTE(review): computed but unused below
# LOF
lof_model = LocalOutlierFactor(contamination=0.1)
lof_flags = (lof_model.fit_predict(X_ad) == -1).astype(int)
print(f'{"Method":<18} {"F1":>6} {"PR-AUC":>8}')
print('-' * 35)
# NOTE(review): PR-AUC is computed from binary flags rather than continuous
# scores, so it collapses to a single precision/recall point per method.
for name, flags in [('Z-score', z_flags), ('Rolling-Z', rz_flags),
('IQR', iqr_flags), ('IsolationForest', if_flags),
('LOF', lof_flags), ('CUSUM', cusum_flags)]:
f1 = f1_score(true_labels, flags, zero_division=0)
ap = average_precision_score(true_labels, flags)
print(f'{name:<18} {f1:>6.3f} {ap:>8.3f}')
# Exercise 5: Apply pipeline to synthetic stock returns
# Key insight: Daily returns are approximately normal; outlier days (crashes/rallies)
# stand out clearly in standardized space, making Z-score effective for equities.
try:
import yfinance as yf
# NOTE(review): newer yfinance versions auto-adjust prices and may omit the
# 'Adj Close' column; the resulting KeyError falls through to the synthetic path.
spy = yf.download('SPY', start='2020-01-01', end='2023-12-31', progress=False)
returns = spy['Adj Close'].pct_change().dropna().values
print('Using real SPY data')
except Exception:
np.random.seed(1)
returns = np.random.normal(0.0005, 0.012, 756)
returns[[50, 200, 400, 600]] += [-0.08, -0.05, 0.06, -0.10] # inject crashes
print('Using synthetic stock return data')
# Flag days beyond 3.5 standard deviations from the mean daily return.
z_ret = np.abs((returns - returns.mean()) / returns.std())
ret_flags = (z_ret > 3.5).astype(int)
flagged_days = np.where(ret_flags)[0]
print(f'Flagged {len(flagged_days)} unusual return days')
plt.figure(figsize=(12, 4))
plt.plot(returns, alpha=0.7, label='Daily returns')
plt.scatter(flagged_days, returns[flagged_days], color='red', zorder=5, label='Anomalies')
plt.title('Anomaly Detection on Stock Returns'); plt.legend(); plt.show()
06 — Forecasting Competition
# Exercise 1: Add LightGBM with lag features as 4th competitor
# Key insight: Gradient boosted trees on lag features often outperform classical
# time series models when nonlinear feature interactions exist.
try:
import lightgbm as lgb
LGB_AVAILABLE = True
except ImportError:
print('lightgbm not installed. Run: pip install lightgbm'); LGB_AVAILABLE = False
from sklearn.metrics import mean_absolute_error
if LGB_AVAILABLE:
def make_lag_features(series, lags=(1, 2, 3, 6, 12)):
    """Build a supervised-learning frame from a time series.

    Parameters
    ----------
    series : pd.Series with a DatetimeIndex (``.index.month`` is used).
    lags : iterable of int — lagged copies of ``y`` added as feature columns.
        FIX: was a mutable list default; now an immutable tuple so the default
        cannot be mutated across calls.

    Returns
    -------
    pd.DataFrame with columns ``y``, ``lag_<k>``..., ``month``; rows whose lag
    history is incomplete (leading NaNs) are dropped.
    """
    df_lag = pd.DataFrame({'y': series})
    for lag in lags:
        df_lag[f'lag_{lag}'] = df_lag['y'].shift(lag)
    df_lag['month'] = series.index.month
    return df_lag.dropna()
df_lag = make_lag_features(ts)
tr_cut = int(0.8 * len(df_lag))  # time-ordered 80/20 split
X_lg = df_lag.drop('y', axis=1)
y_lg = df_lag['y']
lgb_model = lgb.LGBMRegressor(n_estimators=200, learning_rate=0.05, random_state=42)
lgb_model.fit(X_lg.iloc[:tr_cut], y_lg.iloc[:tr_cut])
lgb_preds = lgb_model.predict(X_lg.iloc[tr_cut:])
lgb_mae = mean_absolute_error(y_lg.iloc[tr_cut:], lgb_preds)
print(f'LightGBM MAE: {lgb_mae:.3f}')
# NOTE(review): the three MAEs are computed on different evaluation windows
# (LightGBM on its own 20% tail vs. SARIMA/HW on the 24-month holdout), so
# this leaderboard is indicative rather than strictly comparable.
leaderboard = pd.DataFrame({'Model': ['SARIMA', 'Holt-Winters', 'LightGBM'],
'MAE': [mean_absolute_error(test, manual_fc),
mean_absolute_error(test, hw_fc),
lgb_mae]})
print(leaderboard.sort_values('MAE').to_string(index=False))
# Exercise 2: TimeSeriesSplit cross-validation for ARIMA
# Key insight: Standard k-fold leaks future data into training;
# TimeSeriesSplit respects temporal ordering for unbiased CV MAE.
from sklearn.model_selection import TimeSeriesSplit
tscv = TimeSeriesSplit(n_splits=5)
cv_maes = []
for tr_idx, val_idx in tscv.split(ts):
    if len(tr_idx) < 24:
        # Need at least two seasonal cycles to fit a (1,1,1)(1,1,1,12) model.
        continue
    tr_cv, val_cv = ts.iloc[tr_idx], ts.iloc[val_idx]
    try:
        m_cv = SARIMAX(tr_cv, order=(1,1,1), seasonal_order=(1,1,1,12)).fit(disp=False)
        fc_cv = m_cv.forecast(len(val_cv))
        cv_maes.append(mean_absolute_error(val_cv, fc_cv))
    except Exception as e:
        # FIX: the original swallowed every failure silently ('except ...: pass'),
        # so a systematically failing fold could masquerade as a clean CV run.
        print(f'CV fold skipped ({type(e).__name__}): {e}')
holdout_mae = mean_absolute_error(test, manual_fc)
# Guard against np.mean([]) when every fold failed.
if cv_maes:
    print(f'ARIMA CV MAE (avg): {np.mean(cv_maes):.3f}')
else:
    print('ARIMA CV MAE: no folds completed successfully')
print(f'ARIMA Holdout MAE: {holdout_mae:.3f}')
# Exercise 3: SARIMAX with exogenous promo — already computed in Ex 02-05 above
# Recap for the competition leaderboard context.
print(f'SARIMA MAE (no exog): {mean_absolute_error(test_p, sarima_fc):.3f}')
print(f'SARIMAX MAE (w/ promo): {mean_absolute_error(test_p, sarimax_fc):.3f}')
if TORCH_AVAILABLE:
# Exercise 4: Conformal prediction intervals for LSTM
# Key insight: Conformal prediction gives distribution-free coverage guarantees;
# calibrate residuals on a held-out set, then apply the quantile at inference.
look_back = 12
X12, y12 = make_sequences(scaled, look_back)
n_total = len(X12)
# Chronological 60/20/20 split: train / calibration / test.
n_train = int(0.6 * n_total)
n_cal = int(0.2 * n_total)
Xtr_, ytr_ = X12[:n_train], y12[:n_train]
Xca_, yca_ = X12[n_train:n_train+n_cal], y12[n_train:n_train+n_cal]
Xte_, yte_ = X12[n_train+n_cal:], y12[n_train+n_cal:]
# The calibration set also serves as train_lstm's val set; only the residual
# quantile computed below is used to form the interval.
conf_model, _ = train_lstm(Xtr_, ytr_, Xca_, yca_, epochs=30)
conf_model.eval()
with torch.no_grad():
cal_preds = conf_model(torch.tensor(Xca_, dtype=torch.float32).unsqueeze(-1)).numpy().flatten()
test_preds = conf_model(torch.tensor(Xte_, dtype=torch.float32).unsqueeze(-1)).numpy().flatten()
cal_residuals = np.abs(cal_preds - yca_)
alpha = 0.1 # target 90% coverage
quantile = np.quantile(cal_residuals, 1 - alpha)
coverage = np.mean(np.abs(test_preds - yte_) <= quantile)
print(f'Conformal interval half-width: {quantile:.5f}')
print(f'Empirical coverage on test set: {coverage:.3f} (target: {1-alpha:.2f})')
# Exercise 5: Horizon-aware ensemble (meta-model selects best forecaster per horizon step)
# Key insight: Different models may excel at different horizons;
# a meta-model trained on per-horizon errors learns to route predictions optimally.
from sklearn.linear_model import Ridge
# Use simple models: naive (last value), rolling mean, linear extrapolation
def naive_fc(history, h):
    """Naive forecast: repeat the most recent observation h steps ahead."""
    last_value = history.iloc[-1]
    return np.full(h, last_value)
def roll_mean_fc(history, h, w=6):
    """Forecast h steps ahead with the mean of the last w observations."""
    recent_mean = history.iloc[-w:].mean()
    return np.full(h, recent_mean)
def linreg_fc(history, h):
    """Extrapolate h steps ahead along an OLS line fit to the whole history."""
    t = np.arange(len(history))
    slope, intercept = np.polyfit(t, history.values, 1)
    horizon = np.arange(len(history), len(history) + h)
    return slope * horizon + intercept
H = 12  # forecast horizon per training window
meta_X, meta_y = [], []
# Slide an expanding forecast origin forward every 6 months; stack the three
# base forecasts plus the horizon step index as meta-features per target.
for start in range(60, len(ts) - H, 6):
hist = ts.iloc[:start]
actual_h = ts.iloc[start:start+H].values
preds = np.stack([naive_fc(hist, H), roll_mean_fc(hist, H), linreg_fc(hist, H)])
for h_step in range(H):
meta_X.append([preds[0,h_step], preds[1,h_step], preds[2,h_step], h_step])
meta_y.append(actual_h[h_step])
meta_X, meta_y = np.array(meta_X), np.array(meta_y)
split_meta = int(0.8 * len(meta_X))
# Ridge learns per-horizon blending weights over the three base forecasters.
meta_model = Ridge().fit(meta_X[:split_meta], meta_y[:split_meta])
meta_preds = meta_model.predict(meta_X[split_meta:])
meta_mae = mean_absolute_error(meta_y[split_meta:], meta_preds)
print(f'Horizon-aware meta-ensemble MAE: {meta_mae:.3f}')