import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import linalg

sns.set_style('whitegrid')
np.random.seed(42)

1. State Space ModelΒΆ

Linear Gaussian State Space ModelΒΆ

State equation (dynamics): $\(x_t = A x_{t-1} + B u_t + w_t, \quad w_t \sim \mathcal{N}(0, Q)\)$

Observation equation (measurement): $\(y_t = C x_t + D u_t + v_t, \quad v_t \sim \mathcal{N}(0, R)\)$

where:

  • \(x_t \in \mathbb{R}^n\): Hidden state

  • \(y_t \in \mathbb{R}^m\): Observation

  • \(u_t \in \mathbb{R}^p\): Control input (optional)

  • \(A\): State transition matrix

  • \(C\): Observation matrix

  • \(Q\): Process noise covariance

  • \(R\): Measurement noise covariance

Example: Position-Velocity TrackingΒΆ

State: \(x_t = [\text{position}, \text{velocity}]^T\)

\[\begin{split}x_t = \begin{bmatrix} 1 & \Delta t \\ 0 & 1 \end{bmatrix} x_{t-1} + w_t\end{split}\]

Observation: \(y_t = [1, 0] x_t + v_t\) (observe position only)

2. Kalman Filter AlgorithmΒΆ

GoalΒΆ

Estimate \(p(x_t | y_{1:t})\) recursively.

Prediction StepΒΆ

Prior: $\(\hat{x}_{t|t-1} = A \hat{x}_{t-1|t-1} + B u_t\)\( \)\(P_{t|t-1} = A P_{t-1|t-1} A^T + Q\)$

Update StepΒΆ

Kalman gain: $\(K_t = P_{t|t-1} C^T (C P_{t|t-1} C^T + R)^{-1}\)$

Posterior: $\(\hat{x}_{t|t} = \hat{x}_{t|t-1} + K_t (y_t - C \hat{x}_{t|t-1})\)\( \)\(P_{t|t} = (I - K_t C) P_{t|t-1}\)$

PropertiesΒΆ

  • Optimal for linear Gaussian systems

  • Recursive: Only need previous estimate

  • Closed-form: No sampling required

class KalmanFilter:
    """Linear Kalman Filter."""
    def __init__(self, A, C, Q, R, x0=None, P0=None):
        self.A = A  # State transition
        self.C = C  # Observation
        self.Q = Q  # Process noise
        self.R = R  # Measurement noise
        
        self.n = A.shape[0]  # State dimension
        self.m = C.shape[0]  # Observation dimension
        
        # Initialize
        self.x = x0 if x0 is not None else np.zeros(self.n)
        self.P = P0 if P0 is not None else np.eye(self.n)
    
    def predict(self, u=None):
        """Prediction step."""
        # State prediction
        self.x = self.A @ self.x
        if u is not None:
            self.x += u
        
        # Covariance prediction
        self.P = self.A @ self.P @ self.A.T + self.Q
        
        return self.x, self.P
    
    def update(self, y):
        """Update step."""
        # Innovation
        y_pred = self.C @ self.x
        innovation = y - y_pred
        
        # Innovation covariance
        S = self.C @ self.P @ self.C.T + self.R
        
        # Kalman gain
        K = self.P @ self.C.T @ np.linalg.inv(S)
        
        # State update
        self.x = self.x + K @ innovation
        
        # Covariance update
        self.P = (np.eye(self.n) - K @ self.C) @ self.P
        
        return self.x, self.P
    
    def filter(self, observations):
        """Run full filtering on sequence."""
        filtered_states = []
        filtered_covs = []
        
        for y in observations:
            self.predict()
            x, P = self.update(y)
            filtered_states.append(x.copy())
            filtered_covs.append(P.copy())
        
        return np.array(filtered_states), np.array(filtered_covs)
# Generate synthetic data: tracking object with constant velocity
dt = 0.1  # Time step
T = 100   # Number of steps

# State transition: [position, velocity]
A = np.array([[1, dt],
              [0, 1]])

# Observation: position only
C = np.array([[1, 0]])

# Noise covariances
Q = np.array([[0.01, 0],
              [0, 0.01]])  # Process noise
R = np.array([[1.0]])      # Measurement noise

# Generate true trajectory
true_states = np.zeros((T, 2))
true_states[0] = [0, 1]  # Start at position 0, velocity 1

for t in range(1, T):
    true_states[t] = A @ true_states[t-1] + np.random.multivariate_normal([0, 0], Q)

# Generate noisy observations
observations = true_states @ C.T + np.random.normal(0, np.sqrt(R[0, 0]), (T, 1))

# Run Kalman filter
kf = KalmanFilter(A, C, Q, R, x0=np.array([0, 0]), P0=np.eye(2))
filtered_states, _ = kf.filter(observations)

# Visualize
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# Position
axes[0].plot(true_states[:, 0], 'g-', linewidth=2, label='True position')
axes[0].scatter(range(T), observations[:, 0], s=10, alpha=0.5, label='Noisy observations')
axes[0].plot(filtered_states[:, 0], 'r-', linewidth=2, label='Kalman filter estimate')
axes[0].set_ylabel('Position', fontsize=11)
axes[0].set_title('Kalman Filter: Position Tracking', fontsize=12)
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Velocity
axes[1].plot(true_states[:, 1], 'g-', linewidth=2, label='True velocity')
axes[1].plot(filtered_states[:, 1], 'r-', linewidth=2, label='Kalman filter estimate')
axes[1].set_xlabel('Time step', fontsize=11)
axes[1].set_ylabel('Velocity', fontsize=11)
axes[1].set_title('Velocity Estimation (unobserved)', fontsize=12)
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# Compute errors
position_rmse = np.sqrt(np.mean((filtered_states[:, 0] - true_states[:, 0])**2))
velocity_rmse = np.sqrt(np.mean((filtered_states[:, 1] - true_states[:, 1])**2))

print(f"Position RMSE: {position_rmse:.4f}")
print(f"Velocity RMSE: {velocity_rmse:.4f}")

3. Kalman Smoother (RTS)ΒΆ

Rauch-Tung-Striebel SmootherΒΆ

Compute \(p(x_t | y_{1:T})\) using all observations.

Backward pass after forward filtering:

\[G_t = P_{t|t} A^T P_{t+1|t}^{-1}\]
\[\hat{x}_{t|T} = \hat{x}_{t|t} + G_t (\hat{x}_{t+1|T} - \hat{x}_{t+1|t})\]
\[P_{t|T} = P_{t|t} + G_t (P_{t+1|T} - P_{t+1|t}) G_t^T\]
def rts_smoother(A, filtered_states, filtered_covs):
    """Rauch-Tung-Striebel smoother."""
    T = len(filtered_states)
    n = A.shape[0]
    
    # Initialize
    smoothed_states = np.zeros_like(filtered_states)
    smoothed_covs = np.zeros_like(filtered_covs)
    
    # Last time step
    smoothed_states[-1] = filtered_states[-1]
    smoothed_covs[-1] = filtered_covs[-1]
    
    # Backward pass
    for t in range(T-2, -1, -1):
        # Predicted state and covariance
        x_pred = A @ filtered_states[t]
        P_pred = A @ filtered_covs[t] @ A.T + Q
        
        # Smoother gain
        G = filtered_covs[t] @ A.T @ np.linalg.inv(P_pred)
        
        # Smoothed estimates
        smoothed_states[t] = filtered_states[t] + G @ (smoothed_states[t+1] - x_pred)
        smoothed_covs[t] = filtered_covs[t] + G @ (smoothed_covs[t+1] - P_pred) @ G.T
    
    return smoothed_states, smoothed_covs

# Apply smoother
smoothed_states, _ = rts_smoother(A, filtered_states, filtered_covs)

# Compare
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

axes[0].plot(true_states[:, 0], 'g-', linewidth=2, label='True')
axes[0].plot(filtered_states[:, 0], 'r--', linewidth=2, alpha=0.7, label='Filter')
axes[0].plot(smoothed_states[:, 0], 'b-', linewidth=2, label='Smoother')
axes[0].set_ylabel('Position', fontsize=11)
axes[0].set_title('Filter vs Smoother', fontsize=12)
axes[0].legend()
axes[0].grid(True, alpha=0.3)

axes[1].plot(true_states[:, 1], 'g-', linewidth=2, label='True')
axes[1].plot(filtered_states[:, 1], 'r--', linewidth=2, alpha=0.7, label='Filter')
axes[1].plot(smoothed_states[:, 1], 'b-', linewidth=2, label='Smoother')
axes[1].set_xlabel('Time step', fontsize=11)
axes[1].set_ylabel('Velocity', fontsize=11)
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# RMSE comparison
filter_rmse = np.sqrt(np.mean((filtered_states[:, 0] - true_states[:, 0])**2))
smoother_rmse = np.sqrt(np.mean((smoothed_states[:, 0] - true_states[:, 0])**2))

print(f"Filter RMSE: {filter_rmse:.4f}")
print(f"Smoother RMSE: {smoother_rmse:.4f}")
print(f"Improvement: {(filter_rmse - smoother_rmse) / filter_rmse * 100:.1f}%")

SummaryΒΆ

State Space Model:ΒΆ

\[x_t = A x_{t-1} + w_t, \quad y_t = C x_t + v_t\]

Kalman Filter (Online):ΒΆ

Predict: $\(\hat{x}_{t|t-1} = A \hat{x}_{t-1|t-1}\)\( \)\(P_{t|t-1} = A P_{t-1|t-1} A^T + Q\)$

Update: $\(K_t = P_{t|t-1} C^T (C P_{t|t-1} C^T + R)^{-1}\)\( \)\(\hat{x}_{t|t} = \hat{x}_{t|t-1} + K_t (y_t - C \hat{x}_{t|t-1})\)$

Properties:ΒΆ

  • Optimal for linear Gaussian systems

  • Recursive: \(O(n^3)\) per step

  • Estimates hidden states from noisy observations

Extensions:ΒΆ

  • Extended KF (EKF): Nonlinear via linearization

  • Unscented KF (UKF): Nonlinear via sigma points

  • Particle filter: Nonlinear non-Gaussian

Applications:ΒΆ

  • Robotics (SLAM, navigation)

  • Finance (portfolio tracking)

  • Signal processing

  • Time series forecasting

Next Steps:ΒΆ

  • 06_variational_inference.ipynb - Bayesian inference

  • 09_expectation_maximization.ipynb - Parameter learning

  • Phase 20 for real-time streaming applications