Solutions: Time Series & Forecasting TrackΒΆ

Worked solutions to all exercises from the time-series-forecasting/ notebooks.

01 β€” Time Series FundamentalsΒΆ

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy import stats
from statsmodels.tsa.stattools import adfuller
from statsmodels.tsa.seasonal import STL

# Synthetic monthly sales with trend and seasonality
np.random.seed(42)
n = 120
dates = pd.date_range('2014-01', periods=n, freq='MS')
trend = np.linspace(100, 300, n)
seasonal = 30 * np.sin(2 * np.pi * np.arange(n) / 12)
noise = np.random.normal(0, 10, n)
sales = trend + seasonal + noise
ts = pd.Series(sales, index=dates, name='sales')

# Exercise 1: Box-Cox transformation to stabilize variance
# Key insight: Box-Cox finds lambda that makes variance constant across the series.
# lambda=0 => log transform; ADF p-value should improve (lower) after transform.
shifted = ts - ts.min() + 1  # Box-Cox requires positive values
transformed, lam = stats.boxcox(shifted)
transformed_series = pd.Series(transformed, index=dates)

adf_orig = adfuller(ts)[1]
adf_bc   = adfuller(transformed_series)[1]
print(f'Box-Cox lambda: {lam:.3f}')
print(f'ADF p-value  BEFORE Box-Cox: {adf_orig:.4f}')
print(f'ADF p-value  AFTER  Box-Cox: {adf_bc:.4f}')
print('Interpretation: lower p-value => more evidence of stationarity after transform.')
# Exercise 2: Additive vs Multiplicative STL decomposition
# Key insight: Additive when seasonal amplitude is constant; multiplicative when it
# grows proportionally with the level (heteroscedastic seasonality).

stl_add  = STL(ts, period=12, robust=True).fit()

# Multiplicative: decompose log-transformed series then exponentiate
log_ts   = np.log(shifted)
stl_mult = STL(log_ts, period=12, robust=True).fit()

fig, axes = plt.subplots(4, 2, figsize=(14, 10))
for i, (comp, label) in enumerate(zip(
    [stl_add.observed, stl_add.trend, stl_add.seasonal, stl_add.resid],
    ['Observed', 'Trend', 'Seasonal', 'Residual'])):
    axes[i, 0].plot(comp); axes[i, 0].set_title(f'Additive {label}')
    axes[i, 1].plot([stl_mult.observed, stl_mult.trend,
                     stl_mult.seasonal, stl_mult.resid][i])
    axes[i, 1].set_title(f'Multiplicative (log) {label}')
plt.tight_layout()
plt.show()
print('Use additive when seasonal swings stay constant; multiplicative when they grow with trend.')
# Exercise 3: Rolling correlation between two time series
# Key insight: Static correlation misses time-varying relationships;
# rolling correlation reveals when sales and temperature become coupled.

temperature = 15 + 10 * np.sin(2 * np.pi * np.arange(n) / 12) + np.random.normal(0, 2, n)
temp_series = pd.Series(temperature, index=dates, name='temperature')

df = pd.DataFrame({'sales': ts, 'temperature': temp_series})
rolling_corr = df['sales'].rolling(window=12).corr(df['temperature'])

fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(12, 6), sharex=True)
ax1.plot(ts, label='Sales'); ax1.plot(temp_series, label='Temperature'); ax1.legend()
ax2.plot(rolling_corr, color='purple', label='12-month rolling correlation')
ax2.axhline(0, linestyle='--', color='grey')
ax2.set_ylabel('Correlation'); ax2.legend()
plt.tight_layout(); plt.show()
# Exercise 4: CUSUM change point detection
# Key insight: CUSUM accumulates deviations from target mean; when the cumulative
# sum exceeds a threshold h it signals a structural shift in the mean.

def cusum_detector(series, k=0.5, h=5):
    """k=allowance (half slack), h=decision interval threshold (in std units)."""
    mu    = series.mean()
    sigma = series.std()
    s_pos = np.zeros(len(series))
    s_neg = np.zeros(len(series))
    flags = []
    for i in range(1, len(series)):
        xi     = (series.iloc[i] - mu) / sigma
        s_pos[i] = max(0, s_pos[i-1] + xi - k)
        s_neg[i] = max(0, s_neg[i-1] - xi - k)
        if s_pos[i] > h or s_neg[i] > h:
            flags.append(series.index[i])
    return s_pos, s_neg, flags

# Inject a mean shift at month 60
ts_shift = ts.copy()
ts_shift.iloc[60:] += 80

sp, sn, change_points = cusum_detector(ts_shift)
print(f'Change points detected at: {change_points[:5]}')

plt.figure(figsize=(12, 4))
plt.plot(ts_shift, label='Series with shift')
for cp in change_points[:3]:
    plt.axvline(cp, color='red', linestyle='--', alpha=0.7)
plt.title('CUSUM Change Point Detection'); plt.legend(); plt.show()
# Exercise 5: Seasonal subseries plot
# Key insight: Each panel shows the same month across all years, making
# year-over-year trends within each season clearly visible.

df_ss = ts.to_frame()
df_ss['month'] = df_ss.index.month
df_ss['year']  = df_ss.index.year

month_names = ['Jan','Feb','Mar','Apr','May','Jun',
               'Jul','Aug','Sep','Oct','Nov','Dec']
fig, axes = plt.subplots(3, 4, figsize=(14, 8), sharey=True)
for m, ax in enumerate(axes.flatten(), start=1):
    subset = df_ss[df_ss['month'] == m]
    ax.plot(subset['year'], subset['sales'], marker='o')
    ax.axhline(subset['sales'].mean(), color='red', linestyle='--', linewidth=1)
    ax.set_title(month_names[m-1])
    ax.set_xlabel('Year')
fig.suptitle('Seasonal Subseries Plot', fontsize=14)
plt.tight_layout(); plt.show()

02 β€” ARIMA / SARIMAΒΆ

from statsmodels.tsa.statespace.sarimax import SARIMAX
from statsmodels.stats.diagnostic import acorr_ljungbox
from sklearn.metrics import mean_absolute_percentage_error

train, test = ts[:-24], ts[-24:]

# Exercise 1: auto_arima vs manual SARIMA order selection
# Key insight: auto_arima uses AIC/BIC to search the order space; comparing to
# manual order reveals whether domain knowledge can beat grid search.
try:
    from pmdarima import auto_arima
    auto_model = auto_arima(train, seasonal=True, m=12,
                             stepwise=True, suppress_warnings=True, error_action='ignore')
    print('Auto ARIMA order:', auto_model.order, auto_model.seasonal_order)
except ImportError:
    print('pmdarima not installed; using manual order (1,1,1)(1,1,1,12)')

# Manual order based on ACF/PACF inspection
manual_model = SARIMAX(train, order=(1,1,1), seasonal_order=(1,1,1,12)).fit(disp=False)
manual_fc    = manual_model.forecast(24)
manual_mape  = mean_absolute_percentage_error(test, manual_fc)
print(f'Manual SARIMA MAPE on holdout: {manual_mape:.4f}')
# Exercise 2: Rolling forecast with expanding window (refit every 4 weeks)
# Key insight: Expanding window uses all data up to each point, reducing parameter
# variance but increasing compute; simulates real deployment conditions.

rolling_preds = []
refit_freq = 4  # refit every 4 periods
model_fit = None

for i in range(len(test)):
    if i % refit_freq == 0 or model_fit is None:
        window = ts[:len(train) + i]
        model_fit = SARIMAX(window, order=(1,1,1),
                            seasonal_order=(1,1,1,12)).fit(disp=False)
    pred = model_fit.forecast(1)
    rolling_preds.append(pred.iloc[0])

rolling_mape = mean_absolute_percentage_error(test, rolling_preds)
print(f'Rolling SARIMA MAPE (refit every {refit_freq} periods): {rolling_mape:.4f}')
# Exercise 3: Ljung-Box test on SARIMA residuals
# Key insight: If we reject H0 (p < 0.05), the residuals still contain autocorrelation,
# meaning the model has not captured all structure β€” higher-order terms may be needed.

residuals = manual_model.resid
lb_result  = acorr_ljungbox(residuals, lags=[12, 24], return_df=True)
print(lb_result)
print('\nInterpretation:')
for lag, row in lb_result.iterrows():
    verdict = 'REJECT (residual autocorrelation remains)' if row['lb_pvalue'] < 0.05 else 'FAIL TO REJECT (residuals look white-noise)'
    print(f'  Lag {lag}: p={row["lb_pvalue"]:.4f} => {verdict}')
# Exercise 4: SARIMA vs Holt-Winters on the same holdout
# Key insight: Holt-Winters is simpler and faster; SARIMA may edge it out
# when autocorrelation structure is more complex than exponential smoothing captures.

from statsmodels.tsa.holtwinters import ExponentialSmoothing

hw_model = ExponentialSmoothing(train, trend='add', seasonal='add', seasonal_periods=12).fit()
hw_fc    = hw_model.forecast(24)
hw_mape  = mean_absolute_percentage_error(test, hw_fc)

print(f'SARIMA MAPE:       {manual_mape:.4f}')
print(f'Holt-Winters MAPE: {hw_mape:.4f}')

plt.figure(figsize=(12, 4))
plt.plot(test.index, test.values, label='Actual')
plt.plot(test.index, manual_fc.values, label=f'SARIMA (MAPE={manual_mape:.3f})')
plt.plot(test.index, hw_fc.values,    label=f'Holt-Winters (MAPE={hw_mape:.3f})')
plt.legend(); plt.title('SARIMA vs Holt-Winters'); plt.show()
# Exercise 5: SARIMAX with exogenous promo flag
# Key insight: Adding an exogenous feature lets SARIMAX absorb promotional lift
# that would otherwise show up as unexplained spikes in residuals.

promo = pd.Series(np.random.binomial(1, 0.15, n), index=dates, name='promo')
ts_promo = ts + promo * 40  # promotions add ~40 units

train_p, test_p   = ts_promo[:-24], ts_promo[-24:]
promo_train, promo_test = promo[:-24], promo[-24:]

sarimax = SARIMAX(train_p, exog=promo_train.values.reshape(-1,1),
                  order=(1,1,1), seasonal_order=(1,1,1,12)).fit(disp=False)
sarimax_fc = sarimax.forecast(24, exog=promo_test.values.reshape(-1,1))

sarima_base = SARIMAX(train_p, order=(1,1,1), seasonal_order=(1,1,1,12)).fit(disp=False)
sarima_fc   = sarima_base.forecast(24)

mape_x  = mean_absolute_percentage_error(test_p, sarimax_fc)
mape_no = mean_absolute_percentage_error(test_p, sarima_fc)
print(f'SARIMA MAPE  (no exog): {mape_no:.4f}')
print(f'SARIMAX MAPE (w/ promo): {mape_x:.4f}')
print(f'Improvement: {(mape_no - mape_x)/mape_no*100:.1f}%')

03 β€” Prophet ForecastingΒΆ

# Exercises require Prophet. Install with: pip install prophet
try:
    from prophet import Prophet
    from prophet.diagnostics import cross_validation, performance_metrics
    from prophet.plot import plot_cross_validation_metric
    PROPHET_AVAILABLE = True
except ImportError:
    print('Prophet not installed. Run: pip install prophet')
    PROPHET_AVAILABLE = False

# Prepare prophet-format dataframe
df_prophet = pd.DataFrame({'ds': dates, 'y': ts.values})
if PROPHET_AVAILABLE:
    # Exercise 1: Cross-validate Prophet β€” 6-month horizon, 1-month step
    # Key insight: CV over multiple cutoffs shows whether MAPE degrades for
    # longer horizons, helping decide how far ahead the model is trustworthy.
    m = Prophet(yearly_seasonality=True, weekly_seasonality=False, daily_seasonality=False)
    m.fit(df_prophet)
    df_cv = cross_validation(m, initial='730 days', period='30 days', horizon='180 days')
    df_perf = performance_metrics(df_cv)
    print(df_perf[['horizon','mape']].head(10))
    ax = plot_cross_validation_metric(df_cv, metric='mape')
    plt.title('Prophet CV: MAPE over forecast horizon'); plt.show()
if PROPHET_AVAILABLE:
    # Exercise 2: Changepoint sensitivity β€” more vs fewer changepoints
    # Key insight: More changepoints β†’ flexible trend but risk of overfitting;
    # fewer β†’ smoother but may miss real structural breaks.
    fig, axes = plt.subplots(1, 2, figsize=(14, 4))
    for ax, n_cp, title in zip(axes, [5, 50], ['Few changepoints (n=5)', 'Many changepoints (n=50)']):
        m_cp = Prophet(n_changepoints=n_cp, yearly_seasonality=True,
                       weekly_seasonality=False, daily_seasonality=False)
        m_cp.fit(df_prophet)
        future = m_cp.make_future_dataframe(periods=24, freq='MS')
        fc = m_cp.predict(future)
        ax.plot(df_prophet['ds'], df_prophet['y'], label='Actual')
        ax.plot(fc['ds'], fc['trend'], label='Trend', color='red')
        ax.set_title(title); ax.legend()
    plt.tight_layout(); plt.show()
if PROPHET_AVAILABLE:
    # Exercise 3: Custom holiday effect for a major shopping event
    # Key insight: Explicit holiday components isolate the event's lift so the
    # trend doesn't absorb it and future forecasts reflect expected promotions.
    black_fridays = pd.DataFrame({
        'holiday': 'black_friday',
        'ds': pd.to_datetime(['2014-11-28','2015-11-27','2016-11-25',
                              '2017-11-24','2018-11-23','2019-11-29',
                              '2020-11-27','2021-11-26','2022-11-25','2023-11-24']),
        'lower_window': -2,
        'upper_window': 4
    })
    m_h = Prophet(holidays=black_fridays, yearly_seasonality=True,
                  weekly_seasonality=False, daily_seasonality=False)
    m_h.fit(df_prophet)
    future = m_h.make_future_dataframe(periods=24, freq='MS')
    fc_h = m_h.predict(future)
    holiday_effect = fc_h[fc_h['ds'].dt.month == 11]['black_friday'].mean()
    print(f'Estimated Black Friday lift in November: {holiday_effect:.2f} units')
if PROPHET_AVAILABLE:
    # Exercise 4: Prophet with temperature as add_regressor
    # Key insight: External regressors must be available at forecast time;
    # adding temperature captures weather-driven demand not explained by seasonality alone.
    df_reg = df_prophet.copy()
    df_reg['temperature'] = temp_series.values

    m_reg = Prophet(yearly_seasonality=True, weekly_seasonality=False, daily_seasonality=False)
    m_reg.add_regressor('temperature')
    m_reg.fit(df_reg)

    future_reg = m_reg.make_future_dataframe(periods=24, freq='MS')
    # Forecast future temperature using seasonal pattern
    future_reg['temperature'] = 15 + 10 * np.sin(2 * np.pi * np.arange(len(future_reg)) / 12)
    fc_reg = m_reg.predict(future_reg)
    print('Regressor coefficient (temperature):', m_reg.params['beta'][0])
if PROPHET_AVAILABLE:
    # Exercise 5: Prophet ensemble β€” average 3 models with different priors
    # Key insight: Ensembling diversifies model uncertainty; different changepoint
    # and seasonality priors cover different structural assumptions.
    configs = [
        dict(changepoint_prior_scale=0.01, seasonality_prior_scale=1.0),
        dict(changepoint_prior_scale=0.10, seasonality_prior_scale=5.0),
        dict(changepoint_prior_scale=0.50, seasonality_prior_scale=10.0),
    ]
    ensemble_preds = []
    for cfg in configs:
        m_e = Prophet(yearly_seasonality=True, weekly_seasonality=False,
                      daily_seasonality=False, **cfg)
        m_e.fit(df_prophet)
        future_e = m_e.make_future_dataframe(periods=24, freq='MS')
        fc_e = m_e.predict(future_e)
        ensemble_preds.append(fc_e['yhat'].values)

    ensemble_mean = np.mean(ensemble_preds, axis=0)
    ensemble_forecast = pd.Series(ensemble_mean[-24:], index=test.index)
    ens_mape = mean_absolute_percentage_error(test, ensemble_forecast)
    print(f'Prophet Ensemble MAPE: {ens_mape:.4f}')

04 β€” LSTM Time SeriesΒΆ

import warnings
warnings.filterwarnings('ignore')

try:
    import torch
    import torch.nn as nn
    from torch.utils.data import DataLoader, TensorDataset
    TORCH_AVAILABLE = True
except ImportError:
    print('PyTorch not installed. Run: pip install torch')
    TORCH_AVAILABLE = False

from sklearn.preprocessing import MinMaxScaler

# Scale data
scaler = MinMaxScaler()
scaled = scaler.fit_transform(ts.values.reshape(-1, 1)).flatten()

def make_sequences(data, look_back):
    X, y = [], []
    for i in range(len(data) - look_back):
        X.append(data[i:i+look_back])
        y.append(data[i+look_back])
    return np.array(X), np.array(y)
if TORCH_AVAILABLE:
    class SimpleLSTM(nn.Module):
        def __init__(self, input_size=1, hidden=32, dropout=0.0):
            super().__init__()
            self.lstm = nn.LSTM(input_size, hidden, batch_first=True)
            self.drop = nn.Dropout(dropout)
            self.fc   = nn.Linear(hidden, 1)
        def forward(self, x):
            out, _ = self.lstm(x)
            return self.fc(self.drop(out[:, -1, :]))

    def train_lstm(X_tr, y_tr, X_val, y_val, input_size=1, epochs=30, dropout=0.0, clip_norm=None):
        model = SimpleLSTM(input_size=input_size, dropout=dropout)
        opt   = torch.optim.Adam(model.parameters(), lr=1e-3)
        criterion = nn.MSELoss()
        Xt = torch.tensor(X_tr, dtype=torch.float32).unsqueeze(-1) if input_size==1 else torch.tensor(X_tr, dtype=torch.float32)
        yt = torch.tensor(y_tr, dtype=torch.float32).unsqueeze(-1)
        Xv = torch.tensor(X_val, dtype=torch.float32).unsqueeze(-1) if input_size==1 else torch.tensor(X_val, dtype=torch.float32)
        yv = torch.tensor(y_val, dtype=torch.float32).unsqueeze(-1)
        val_losses = []
        for _ in range(epochs):
            model.train()
            opt.zero_grad()
            loss = criterion(model(Xt), yt)
            loss.backward()
            if clip_norm:
                nn.utils.clip_grad_norm_(model.parameters(), clip_norm)
            opt.step()
            model.eval()
            with torch.no_grad():
                val_losses.append(criterion(model(Xv), yv).item())
        return model, val_losses

    # Exercise 1: Compare look_back values
    # Key insight: Short look_back misses long seasonal cycles; too long adds noise.
    # Optimal look_back often aligns with the dominant seasonality period.
    split = int(0.8 * len(scaled))
    rmse_by_lb = {}
    for lb in [7, 14, 30, 60]:
        if lb >= split:
            continue
        X, y = make_sequences(scaled, lb)
        X_tr, y_tr = X[:split-lb], y[:split-lb]
        X_val, y_val = X[split-lb:], y[split-lb:]
        if len(X_tr) < 5 or len(X_val) < 2:
            continue
        _, val_losses = train_lstm(X_tr, y_tr, X_val, y_val, epochs=20)
        rmse_by_lb[lb] = np.sqrt(val_losses[-1])

    print('Val RMSE by look_back:', rmse_by_lb)
    plt.figure(figsize=(7, 4))
    plt.plot(list(rmse_by_lb.keys()), list(rmse_by_lb.values()), marker='o')
    plt.xlabel('look_back'); plt.ylabel('Val RMSE')
    plt.title('LSTM: Val RMSE vs look_back'); plt.show()
if TORCH_AVAILABLE:
    # Exercise 2: Add day-of-week and month-of-year as extra features
    # Key insight: Calendar features inject domain knowledge about periodicity
    # that the LSTM would otherwise need many epochs to learn from raw values.
    look_back = 12
    month_feat = (dates.month - 1) / 11.0  # normalize 0-1
    dow_feat   = dates.dayofweek / 6.0
    feat_matrix = np.column_stack([scaled, month_feat, dow_feat])

    X_mf, y_mf = [], []
    for i in range(len(feat_matrix) - look_back):
        X_mf.append(feat_matrix[i:i+look_back])
        y_mf.append(scaled[i+look_back])
    X_mf, y_mf = np.array(X_mf), np.array(y_mf)

    split2 = int(0.8 * len(X_mf))
    _, vl_base = train_lstm(X_mf[:split2, :, 0], y_mf[:split2], X_mf[split2:, :, 0], y_mf[split2:], input_size=1, epochs=30)
    
    class MultiLSTM(nn.Module):
        def __init__(self, input_size=3, hidden=32):
            super().__init__()
            self.lstm = nn.LSTM(input_size, hidden, batch_first=True)
            self.fc   = nn.Linear(hidden, 1)
        def forward(self, x): out, _ = self.lstm(x); return self.fc(out[:, -1, :])

    model_mf = MultiLSTM(input_size=3)
    opt = torch.optim.Adam(model_mf.parameters(), lr=1e-3)
    criterion = nn.MSELoss()
    Xt = torch.tensor(X_mf[:split2], dtype=torch.float32)
    yt = torch.tensor(y_mf[:split2], dtype=torch.float32).unsqueeze(-1)
    Xv = torch.tensor(X_mf[split2:], dtype=torch.float32)
    yv = torch.tensor(y_mf[split2:], dtype=torch.float32).unsqueeze(-1)
    vl_multi = []
    for _ in range(30):
        model_mf.train(); opt.zero_grad()
        criterion(model_mf(Xt), yt).backward(); opt.step()
        model_mf.eval()
        with torch.no_grad(): vl_multi.append(criterion(model_mf(Xv), yv).item())

    print(f'Final val loss β€” univariate: {vl_base[-1]:.6f}, with calendar features: {vl_multi[-1]:.6f}')
if TORCH_AVAILABLE:
    # Exercise 3: Dropout tuning
    # Key insight: Dropout acts as regularization; too little β†’ overfit,
    # too much β†’ underfit. Optimal value minimizes the val/train loss gap.
    look_back = 12
    X12, y12 = make_sequences(scaled, look_back)
    sp = int(0.8 * len(X12))
    Xtr12, ytr12 = X12[:sp], y12[:sp]
    Xv12, yv12   = X12[sp:], y12[sp:]

    dropout_results = {}
    for dr in [0.0, 0.1, 0.2, 0.3]:
        _, vl = train_lstm(Xtr12, ytr12, Xv12, yv12, epochs=40, dropout=dr)
        dropout_results[dr] = vl

    plt.figure(figsize=(10, 4))
    for dr, vl in dropout_results.items():
        plt.plot(vl, label=f'dropout={dr}')
    plt.xlabel('Epoch'); plt.ylabel('Val Loss')
    plt.title('LSTM Dropout Tuning'); plt.legend(); plt.show()
if TORCH_AVAILABLE:
    # Exercise 4: Walk-forward evaluation with monthly refitting
    # Key insight: Walk-forward avoids look-ahead bias; retraining every 30 steps
    # keeps the model fresh without the cost of per-step retraining.
    look_back = 12
    wf_preds = []
    refit_every = 10  # every 10 steps (reduced for demo speed)
    test_start = int(0.8 * len(scaled))

    for i in range(test_start, min(test_start + 20, len(scaled))):
        if (i - test_start) % refit_every == 0:
            X_wf, y_wf = make_sequences(scaled[:i], look_back)
            if len(X_wf) > 5:
                wf_model, _ = train_lstm(X_wf[:-2], y_wf[:-2], X_wf[-2:], y_wf[-2:], epochs=15)
        seq = torch.tensor(scaled[i-look_back:i], dtype=torch.float32).unsqueeze(0).unsqueeze(-1)
        wf_model.eval()
        with torch.no_grad():
            wf_preds.append(wf_model(seq).item())

    actual_wf = scaled[test_start:test_start + len(wf_preds)]
    rmse_wf = np.sqrt(np.mean((np.array(wf_preds) - actual_wf)**2))
    print(f'Walk-forward RMSE (scaled): {rmse_wf:.5f}')
if TORCH_AVAILABLE:
    # Exercise 5: Gradient clipping β€” compare training stability
    # Key insight: Without clipping, exploding gradients cause loss spikes;
    # max_norm=1.0 keeps updates bounded and smooths the training curve.
    look_back = 12
    X12, y12 = make_sequences(scaled, look_back)
    sp = int(0.8 * len(X12))

    _, vl_no_clip  = train_lstm(X12[:sp], y12[:sp], X12[sp:], y12[sp:], epochs=50, clip_norm=None)
    _, vl_clipped  = train_lstm(X12[:sp], y12[:sp], X12[sp:], y12[sp:], epochs=50, clip_norm=1.0)

    plt.figure(figsize=(10, 4))
    plt.plot(vl_no_clip, label='No gradient clipping', alpha=0.8)
    plt.plot(vl_clipped,  label='Gradient clipping max_norm=1.0', alpha=0.8)
    plt.xlabel('Epoch'); plt.ylabel('Val Loss')
    plt.title('Training Stability: Gradient Clipping'); plt.legend(); plt.show()

05 β€” Anomaly DetectionΒΆ

from sklearn.ensemble import IsolationForest
from sklearn.neighbors import LocalOutlierFactor
from sklearn.metrics import f1_score, average_precision_score

# Synthetic series with injected anomalies
np.random.seed(0)
N = 200
base_series = np.random.normal(0, 1, N)
anomaly_idx = np.sort(np.random.choice(N, 20, replace=False))
anom_series = base_series.copy()
anom_series[anomaly_idx] += np.random.choice([-6, 6], len(anomaly_idx))
true_labels = np.zeros(N, dtype=int)
true_labels[anomaly_idx] = 1

# Exercise 1: CUSUM anomaly detector
# Key insight: CUSUM flags points where the cumulative deviation from the mean
# exceeds a threshold, making it sensitive to sustained shifts rather than single spikes.
def cusum_anomaly(series, k=1.0, h=4.0):
    mu, sigma = series.mean(), series.std()
    s_pos, s_neg = np.zeros(len(series)), np.zeros(len(series))
    flags = np.zeros(len(series), dtype=int)
    for i in range(1, len(series)):
        xi = (series[i] - mu) / sigma
        s_pos[i] = max(0, s_pos[i-1] + xi - k)
        s_neg[i] = max(0, s_neg[i-1] - xi - k)
        if s_pos[i] > h or s_neg[i] > h:
            flags[i] = 1
    return flags

cusum_flags = cusum_anomaly(anom_series)
print(f'CUSUM F1: {f1_score(true_labels, cusum_flags):.3f}')
if TORCH_AVAILABLE:
    # Exercise 2: LSTM autoencoder anomaly detection
    # Key insight: The autoencoder learns normal patterns; high reconstruction error
    # indicates the window deviates from what was seen during training.
    window = 30

    class LSTMAutoencoder(nn.Module):
        def __init__(self, hidden=16):
            super().__init__()
            self.enc = nn.LSTM(1, hidden, batch_first=True)
            self.dec = nn.LSTM(hidden, 1, batch_first=True)
        def forward(self, x):
            _, (h, c) = self.enc(x)
            dec_input = h.permute(1, 0, 2).expand(-1, x.size(1), -1)
            out, _ = self.dec(dec_input)
            return out

    normal_idx = np.where(true_labels == 0)[0]
    windows_normal = []
    for i in range(0, len(normal_idx) - window, 5):
        w = normal_idx[i:i+window]
        if len(w) == window:
            windows_normal.append(anom_series[w])
    if len(windows_normal) > 0:
        Xae = torch.tensor(np.array(windows_normal), dtype=torch.float32).unsqueeze(-1)
        ae_model = LSTMAutoencoder()
        ae_opt   = torch.optim.Adam(ae_model.parameters(), lr=1e-3)
        ae_crit  = nn.MSELoss()
        for _ in range(30):
            ae_model.train(); ae_opt.zero_grad()
            ae_crit(ae_model(Xae), Xae).backward(); ae_opt.step()

        ae_model.eval()
        recon_errors = np.zeros(N)
        with torch.no_grad():
            for i in range(window, N):
                seq = torch.tensor(anom_series[i-window:i], dtype=torch.float32).unsqueeze(0).unsqueeze(-1)
                rec = ae_model(seq)
                recon_errors[i] = nn.MSELoss()(rec, seq).item()

        threshold = np.percentile(recon_errors[recon_errors > 0], 90)
        ae_flags  = (recon_errors > threshold).astype(int)
        print(f'LSTM Autoencoder F1: {f1_score(true_labels, ae_flags):.3f}')
# Exercise 3: Consecutive anomaly filter
# Key insight: Requiring 3+ consecutive anomalous points eliminates isolated false positives
# caused by random noise spikes, at the cost of detecting slowly evolving anomalies.
def consecutive_filter(flags, min_consecutive=3):
    filtered = np.zeros_like(flags)
    count = 0
    for i, f in enumerate(flags):
        if f == 1:
            count += 1
            if count >= min_consecutive:
                filtered[i-min_consecutive+1:i+1] = 1
        else:
            count = 0
    return filtered

# Inject a block anomaly (3 consecutive)
block_series = base_series.copy()
block_series[50:53] += 8
block_true   = np.zeros(N, dtype=int); block_true[50:53] = 1

z_scores = np.abs((block_series - block_series.mean()) / block_series.std())
raw_flags = (z_scores > 2.5).astype(int)
filtered_flags = consecutive_filter(raw_flags, min_consecutive=3)

print(f'Raw Z-score F1:             {f1_score(block_true, raw_flags):.3f}')
print(f'After consecutive filter F1: {f1_score(block_true, filtered_flags):.3f}')
# Exercise 4: Compare all methods using F1 and PR-AUC
# Key insight: F1 balances precision and recall at a single threshold;
# PR-AUC captures performance across all thresholds β€” prefer it for imbalanced data.
from sklearn.preprocessing import StandardScaler

X_ad = anom_series.reshape(-1, 1)

# Z-score
z_flags  = (np.abs((anom_series - anom_series.mean()) / anom_series.std()) > 3).astype(int)

# Rolling Z-score
s_roll = pd.Series(anom_series)
roll_z = (s_roll - s_roll.rolling(20, min_periods=5).mean()) / s_roll.rolling(20, min_periods=5).std()
rz_flags = (roll_z.abs().fillna(0) > 2.5).astype(int).values

# IQR
q1, q3   = np.percentile(anom_series, 25), np.percentile(anom_series, 75)
iqr_flags = ((anom_series < q1 - 1.5*(q3-q1)) | (anom_series > q3 + 1.5*(q3-q1))).astype(int)

# Isolation Forest
if_model  = IsolationForest(contamination=0.1, random_state=42).fit(X_ad)
if_flags  = (if_model.predict(X_ad) == -1).astype(int)
if_scores = -if_model.score_samples(X_ad)

# LOF
lof_model = LocalOutlierFactor(contamination=0.1)
lof_flags = (lof_model.fit_predict(X_ad) == -1).astype(int)

print(f'{"Method":<18} {"F1":>6} {"PR-AUC":>8}')
print('-' * 35)
for name, flags in [('Z-score', z_flags), ('Rolling-Z', rz_flags),
                     ('IQR', iqr_flags), ('IsolationForest', if_flags),
                     ('LOF', lof_flags), ('CUSUM', cusum_flags)]:
    f1  = f1_score(true_labels, flags, zero_division=0)
    ap  = average_precision_score(true_labels, flags)
    print(f'{name:<18} {f1:>6.3f} {ap:>8.3f}')
# Exercise 5: Apply pipeline to synthetic stock returns
# Key insight: Daily returns are approximately normal; outlier days (crashes/rallies)
# stand out clearly in standardized space, making Z-score effective for equities.
try:
    import yfinance as yf
    spy = yf.download('SPY', start='2020-01-01', end='2023-12-31', progress=False)
    returns = spy['Adj Close'].pct_change().dropna().values
    print('Using real SPY data')
except Exception:
    np.random.seed(1)
    returns = np.random.normal(0.0005, 0.012, 756)
    returns[[50, 200, 400, 600]] += [-0.08, -0.05, 0.06, -0.10]  # inject crashes
    print('Using synthetic stock return data')

z_ret   = np.abs((returns - returns.mean()) / returns.std())
ret_flags = (z_ret > 3.5).astype(int)
flagged_days = np.where(ret_flags)[0]
print(f'Flagged {len(flagged_days)} unusual return days')

plt.figure(figsize=(12, 4))
plt.plot(returns, alpha=0.7, label='Daily returns')
plt.scatter(flagged_days, returns[flagged_days], color='red', zorder=5, label='Anomalies')
plt.title('Anomaly Detection on Stock Returns'); plt.legend(); plt.show()

06 β€” Forecasting CompetitionΒΆ

# Exercise 1: Add LightGBM with lag features as 4th competitor
# Key insight: Gradient boosted trees on lag features often outperform classical
# time series models when nonlinear feature interactions exist.
try:
    import lightgbm as lgb
    LGB_AVAILABLE = True
except ImportError:
    print('lightgbm not installed. Run: pip install lightgbm'); LGB_AVAILABLE = False

from sklearn.metrics import mean_absolute_error

if LGB_AVAILABLE:
    def make_lag_features(series, lags=[1,2,3,6,12]):
        df_lag = pd.DataFrame({'y': series})
        for l in lags:
            df_lag[f'lag_{l}'] = df_lag['y'].shift(l)
        df_lag['month'] = series.index.month
        return df_lag.dropna()

    df_lag = make_lag_features(ts)
    tr_cut = int(0.8 * len(df_lag))
    X_lg = df_lag.drop('y', axis=1)
    y_lg = df_lag['y']
    lgb_model = lgb.LGBMRegressor(n_estimators=200, learning_rate=0.05, random_state=42)
    lgb_model.fit(X_lg.iloc[:tr_cut], y_lg.iloc[:tr_cut])
    lgb_preds = lgb_model.predict(X_lg.iloc[tr_cut:])
    lgb_mae   = mean_absolute_error(y_lg.iloc[tr_cut:], lgb_preds)
    print(f'LightGBM MAE: {lgb_mae:.3f}')

    leaderboard = pd.DataFrame({'Model': ['SARIMA', 'Holt-Winters', 'LightGBM'],
                                  'MAE': [mean_absolute_error(test, manual_fc),
                                          mean_absolute_error(test, hw_fc),
                                          lgb_mae]})
    print(leaderboard.sort_values('MAE').to_string(index=False))
# Exercise 2: TimeSeriesSplit cross-validation for ARIMA
# Key insight: Standard k-fold leaks future data into training;
# TimeSeriesSplit respects temporal ordering for unbiased CV MAE.
from sklearn.model_selection import TimeSeriesSplit

tscv   = TimeSeriesSplit(n_splits=5)
cv_maes = []
for tr_idx, val_idx in tscv.split(ts):
    if len(tr_idx) < 24:
        continue
    tr_cv, val_cv = ts.iloc[tr_idx], ts.iloc[val_idx]
    try:
        m_cv = SARIMAX(tr_cv, order=(1,1,1), seasonal_order=(1,1,1,12)).fit(disp=False)
        fc_cv = m_cv.forecast(len(val_cv))
        cv_maes.append(mean_absolute_error(val_cv, fc_cv))
    except Exception as e:
        pass

holdout_mae = mean_absolute_error(test, manual_fc)
print(f'ARIMA CV MAE (avg): {np.mean(cv_maes):.3f}')
print(f'ARIMA Holdout MAE:  {holdout_mae:.3f}')
# Exercise 3: SARIMAX with exogenous promo β€” already computed in Ex 02-05 above
# Recap for the competition leaderboard context.
print(f'SARIMA  MAE (no exog):  {mean_absolute_error(test_p, sarima_fc):.3f}')
print(f'SARIMAX MAE (w/ promo): {mean_absolute_error(test_p, sarimax_fc):.3f}')
if TORCH_AVAILABLE:
    # Exercise 4: Conformal prediction intervals for LSTM
    # Key insight: Conformal prediction gives distribution-free coverage guarantees;
    # calibrate residuals on a held-out set, then apply the quantile at inference.
    look_back = 12
    X12, y12 = make_sequences(scaled, look_back)
    n_total   = len(X12)
    n_train   = int(0.6 * n_total)
    n_cal     = int(0.2 * n_total)

    Xtr_, ytr_ = X12[:n_train], y12[:n_train]
    Xca_, yca_ = X12[n_train:n_train+n_cal], y12[n_train:n_train+n_cal]
    Xte_, yte_ = X12[n_train+n_cal:], y12[n_train+n_cal:]

    conf_model, _ = train_lstm(Xtr_, ytr_, Xca_, yca_, epochs=30)
    conf_model.eval()

    with torch.no_grad():
        cal_preds = conf_model(torch.tensor(Xca_, dtype=torch.float32).unsqueeze(-1)).numpy().flatten()
        test_preds = conf_model(torch.tensor(Xte_, dtype=torch.float32).unsqueeze(-1)).numpy().flatten()

    cal_residuals = np.abs(cal_preds - yca_)
    alpha = 0.1  # target 90% coverage
    quantile = np.quantile(cal_residuals, 1 - alpha)
    coverage = np.mean(np.abs(test_preds - yte_) <= quantile)
    print(f'Conformal interval half-width: {quantile:.5f}')
    print(f'Empirical coverage on test set: {coverage:.3f} (target: {1-alpha:.2f})')
# Exercise 5: Horizon-aware ensemble (meta-model selects best forecaster per horizon step)
# Key insight: Different models may excel at different horizons;
# a meta-model trained on per-horizon errors learns to route predictions optimally.
from sklearn.linear_model import Ridge

# Use simple models: naive (last value), rolling mean, linear extrapolation
def naive_fc(history, h): return np.full(h, history.iloc[-1])
def roll_mean_fc(history, h, w=6): return np.full(h, history.iloc[-w:].mean())
def linreg_fc(history, h):
    x = np.arange(len(history)); y = history.values
    c = np.polyfit(x, y, 1)
    return c[0] * np.arange(len(history), len(history)+h) + c[1]

H = 12
meta_X, meta_y = [], []
for start in range(60, len(ts) - H, 6):
    hist = ts.iloc[:start]
    actual_h = ts.iloc[start:start+H].values
    preds = np.stack([naive_fc(hist, H), roll_mean_fc(hist, H), linreg_fc(hist, H)])
    for h_step in range(H):
        meta_X.append([preds[0,h_step], preds[1,h_step], preds[2,h_step], h_step])
        meta_y.append(actual_h[h_step])

meta_X, meta_y = np.array(meta_X), np.array(meta_y)
split_meta = int(0.8 * len(meta_X))
meta_model = Ridge().fit(meta_X[:split_meta], meta_y[:split_meta])
meta_preds  = meta_model.predict(meta_X[split_meta:])
meta_mae    = mean_absolute_error(meta_y[split_meta:], meta_preds)
print(f'Horizon-aware meta-ensemble MAE: {meta_mae:.3f}')