import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.stats import multivariate_normal
import pandas as pd
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
np.random.seed(42)
1. Monte Carlo Basics
Goal
Estimate expectations under a distribution \(p(x)\):
\[\mathbb{E}_{p(x)}[f(x)] \approx \frac{1}{N}\sum_{i=1}^{N} f\big(x^{(i)}\big)\]
where \(x^{(i)} \sim p(x)\).
Law of Large Numbers
As \(N \to \infty\), the sample average converges to the true expectation.
Central Limit Theorem
The estimation error decreases as \(O(1/\sqrt{N})\).
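Before the π demo below, here is a minimal sketch of the estimator itself; the test function \(f(x) = x^2\) and the standard normal target are illustrative choices, not prescribed by the text above.
# Minimal sketch: Monte Carlo estimate of E[f(x)] with f(x) = x^2 and x ~ N(0, 1).
# The true value is 1; the error shrinks roughly like 1/sqrt(N).
import numpy as np

rng = np.random.default_rng(0)
for n in [100, 10_000, 1_000_000]:
    x = rng.standard_normal(n)
    estimate = np.mean(x**2)   # (1/N) * sum_i f(x^(i))
    print(f"N={n:>9,}  estimate = {estimate:.4f}  |error| = {abs(estimate - 1):.4f}")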
# Demonstrate Monte Carlo estimation
def monte_carlo_demo():
"""Estimate Ο using Monte Carlo"""
# Sample points uniformly in [0,1] x [0,1]
# Count how many fall inside unit circle
    # π ≈ 4 * (points inside circle) / (total points)
n_samples_list = [100, 1000, 10000, 100000]
fig, axes = plt.subplots(2, 2, figsize=(14, 12))
axes = axes.flatten()
for idx, n_samples in enumerate(n_samples_list):
# Generate random points
x = np.random.uniform(0, 1, n_samples)
y = np.random.uniform(0, 1, n_samples)
# Check if inside circle
inside = (x**2 + y**2) <= 1
# Estimate Ο
pi_estimate = 4 * np.sum(inside) / n_samples
error = abs(pi_estimate - np.pi)
# Plot
ax = axes[idx]
# Plot subset of points for visibility
plot_n = min(n_samples, 2000)
ax.scatter(x[:plot_n][inside[:plot_n]], y[:plot_n][inside[:plot_n]],
c='blue', s=1, alpha=0.5, label='Inside')
ax.scatter(x[:plot_n][~inside[:plot_n]], y[:plot_n][~inside[:plot_n]],
c='red', s=1, alpha=0.5, label='Outside')
# Draw quarter circle
theta = np.linspace(0, np.pi/2, 100)
ax.plot(np.cos(theta), np.sin(theta), 'k-', linewidth=2)
ax.set_xlim(0, 1)
ax.set_ylim(0, 1)
ax.set_aspect('equal')
        ax.set_title(f'N={n_samples}\nπ ≈ {pi_estimate:.6f}\nError: {error:.6f}')
ax.legend(loc='upper right')
ax.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
monte_carlo_demo()
print("Monte Carlo Estimation:")
print("- Error decreases as O(1/βN)")
print("- Works in high dimensions")
print("- Requires ability to sample from p(x)")
2. Metropolis-Hastings Algorithm
Problem
We want to sample from \(p(x)\) but can only evaluate \(p(x)\) up to a constant:
\[p(x) = \frac{\tilde{p}(x)}{Z}\]
where \(Z\) is unknown.
Algorithm
Start with \(x^{(0)}\)
For \(t = 1, 2, \ldots\):
Propose: \(x' \sim q(x'|x^{(t-1)})\)
Compute acceptance ratio:
\[\alpha = \min\left(1, \frac{\tilde{p}(x')\,q(x^{(t-1)} \mid x')}{\tilde{p}(x^{(t-1)})\,q(x' \mid x^{(t-1)})}\right)\]
Accept with probability \(\alpha\):
If accept: \(x^{(t)} = x'\)
If reject: \(x^{(t)} = x^{(t-1)}\)
Symmetric Proposal (Metropolis)
If \(q(x' \mid x) = q(x \mid x')\) (e.g., a Gaussian random walk), the proposal terms cancel:
\[\alpha = \min\left(1, \frac{\tilde{p}(x')}{\tilde{p}(x^{(t-1)})}\right)\]
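As a minimal sketch of this symmetric (random-walk) case, the following samples a one-dimensional standard normal from its unnormalized log density; the target and the proposal scale are illustrative choices, not part of the algorithm statement above.
# Minimal sketch: random-walk Metropolis for a 1D unnormalized target.
# Target: p_tilde(x) = exp(-x^2/2) (standard normal up to a constant).
import numpy as np

def metropolis_1d(log_p_tilde, x0=0.0, step=1.0, n=5000, seed=0):
    rng = np.random.default_rng(seed)
    x = x0
    out = np.empty(n)
    for t in range(n):
        x_prop = x + step * rng.standard_normal()         # symmetric proposal
        log_alpha = log_p_tilde(x_prop) - log_p_tilde(x)   # q terms cancel
        if np.log(rng.random()) < log_alpha:
            x = x_prop                                     # accept
        out[t] = x                                         # on rejection, keep the old x
    return out

samples = metropolis_1d(lambda x: -0.5 * x**2)
print(samples.mean(), samples.std())   # should be near 0 and 1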
MCMC Theory: Advanced Mathematical Foundations
1. Markov Chain Fundamentals
A Markov chain is a sequence of random variables \(X_0, X_1, X_2, \ldots\) satisfying the Markov property:
\[P(X_{t+1} \mid X_t, X_{t-1}, \ldots, X_0) = P(X_{t+1} \mid X_t)\]
The transition kernel \(T(x' \mid x) = P(X_{t+1} = x' \mid X_t = x)\) governs state transitions.
Key Properties:
Irreducibility: Any state can be reached from any other state (ensures exploration)
Aperiodicity: Chain doesn't get stuck in cycles (ensures convergence)
Ergodicity: Irreducible + aperiodic ⇒ unique stationary distribution
2. Detailed Balance and Stationarity
A distribution \(\pi(x)\) is stationary if:
\[\pi(x') = \int T(x' \mid x)\,\pi(x)\,dx\]
Detailed Balance (Stronger Condition):
If the transition kernel satisfies:
\[\pi(x)\,T(x' \mid x) = \pi(x')\,T(x \mid x')\]
then \(\pi(x)\) is stationary. This is the reversibility condition.
Proof of Stationarity from Detailed Balance:
\[\int \pi(x)\,T(x' \mid x)\,dx = \int \pi(x')\,T(x \mid x')\,dx = \pi(x') \int T(x \mid x')\,dx = \pi(x')\]
where we used \(\int T(x \mid x')\,dx = 1\) (the transition kernel is normalized).
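To make detailed balance concrete, here is a small numerical check on a hand-picked reversible 3-state chain; the transition matrix and the candidate \(\pi\) below are illustrative, not drawn from the text.
# Numerical check of detailed balance and stationarity on a small 3-state chain.
import numpy as np

T = np.array([[0.50, 0.50, 0.00],
              [0.25, 0.50, 0.25],
              [0.00, 0.50, 0.50]])   # row-stochastic: T[i, j] = P(next = j | current = i)
pi = np.array([0.25, 0.50, 0.25])    # candidate stationary distribution

# Detailed balance: pi_i * T_ij == pi_j * T_ji for all i, j
flows = pi[:, None] * T
print("detailed balance holds:", np.allclose(flows, flows.T))

# Stationarity: pi T = pi, and any start converges to pi under repeated transitions
print("stationary:", np.allclose(pi @ T, pi))
p = np.array([1.0, 0.0, 0.0])
for _ in range(50):
    p = p @ T
print("after 50 steps from a point mass:", p)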
3. Metropolis-Hastings: Complete Derivation
We want to design \(T(x'|x)\) such that \(\pi(x)\) (target) is stationary.
Construction:
\[T(x' \mid x) = q(x' \mid x)\,\alpha(x' \mid x) + r(x)\,\delta(x' - x)\]
where:
\(q(x' \mid x)\): Proposal distribution
\(\alpha(x' \mid x)\): Acceptance probability
\(r(x) = 1 - \int q(x' \mid x)\,\alpha(x' \mid x)\,dx'\): Rejection probability (stay at \(x\))
Enforce Detailed Balance:
For \(x' \neq x\) we require
\[\pi(x)\,q(x' \mid x)\,\alpha(x' \mid x) = \pi(x')\,q(x \mid x')\,\alpha(x \mid x')\]
Symmetric Choice:
\[\alpha(x' \mid x) = \min\left(1, \frac{\pi(x')\,q(x \mid x')}{\pi(x)\,q(x' \mid x)}\right)\]
Verification of Detailed Balance:
Case 1: If \(\pi(x')\,q(x \mid x') \geq \pi(x)\,q(x' \mid x)\), then \(\alpha(x' \mid x) = 1\) and \(\alpha(x \mid x') = \frac{\pi(x)\,q(x' \mid x)}{\pi(x')\,q(x \mid x')}\), so both sides of the balance equation equal \(\pi(x)\,q(x' \mid x)\). Case 2 (the reverse inequality) follows by symmetry.
Convergence Rate:
Under regularity conditions, the distribution \(p_t(x)\) of the chain after \(t\) steps converges to \(\pi(x)\) exponentially fast:
\[\|p_t - \pi\|_{TV} \leq C\,\rho^t\]
where \(\rho < 1\) is governed by the spectral gap of the transition operator.
Total Variation Distance:
\[\|p - q\|_{TV} = \frac{1}{2}\int |p(x) - q(x)|\,dx\]
5. Gibbs Sampling
For multivariate \(x = (x_1, \ldots, x_d)\), Gibbs sampling updates one component at a time:
Algorithm:
For \(t = 1, 2, \ldots\):
Sample \(x_1^{(t)} \sim \pi(x_1 \mid x_2^{(t-1)}, \ldots, x_d^{(t-1)})\)
Sample \(x_2^{(t)} \sim \pi(x_2 \mid x_1^{(t)}, x_3^{(t-1)}, \ldots, x_d^{(t-1)})\)
…
Sample \(x_d^{(t)} \sim \pi(x_d \mid x_1^{(t)}, \ldots, x_{d-1}^{(t)})\)
Key Property: Acceptance probability is always 1 (no rejections).
Detailed Balance:
Updating coordinate \(i\) uses the proposal \(\pi(x_i' \mid x_{-i})\) while keeping \(x_{-i}\) fixed, and this kernel satisfies detailed balance with respect to \(\pi\):
\[\pi(x)\,\pi(x_i' \mid x_{-i}) = \pi(x_i \mid x_{-i})\,\pi(x_{-i})\,\pi(x_i' \mid x_{-i}) = \pi(x')\,\pi(x_i \mid x_{-i})\]
where \(x_{-i} = (x_1, \ldots, x_{i-1}, x_{i+1}, \ldots, x_d)\).
6. Hamiltonian Monte Carlo (HMC)
HMC uses Hamiltonian dynamics from physics to propose distant states efficiently.
Setup:
Position: \(q\) (parameters we want to sample)
Momentum: \(p \sim \mathcal{N}(0, M)\) (auxiliary variable)
Joint Distribution:
\[\pi(q, p) \propto \exp(-H(q, p)) = \exp(-U(q) - K(p))\]
where:
\(U(q) = -\log \pi(q)\): Potential energy (negative log posterior)
\(K(p) = \frac{1}{2}p^T M^{-1} p\): Kinetic energy (quadratic in momentum)
Hamiltonian Dynamics:
\[\frac{dq}{dt} = \frac{\partial H}{\partial p} = M^{-1}p, \qquad \frac{dp}{dt} = -\frac{\partial H}{\partial q} = -\nabla U(q)\]
Properties:
Energy Conservation: \(H(q(t), p(t)) = \text{constant}\)
Reversibility: \(T(q', p' | q, p) = T(q, p | q', -p')\)
Volume Preservation: Determinant of Jacobian = 1
Leapfrog Integrator (Discretization):
For step size \(\epsilon\):
\[p\!\left(t + \tfrac{\epsilon}{2}\right) = p(t) - \tfrac{\epsilon}{2}\,\nabla U(q(t))\]
\[q(t + \epsilon) = q(t) + \epsilon\, M^{-1} p\!\left(t + \tfrac{\epsilon}{2}\right)\]
\[p(t + \epsilon) = p\!\left(t + \tfrac{\epsilon}{2}\right) - \tfrac{\epsilon}{2}\,\nabla U(q(t + \epsilon))\]
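A minimal sketch of the leapfrog update for a one-dimensional standard Gaussian target (\(U(q) = q^2/2\), \(M = I\)); the step size and number of steps are illustrative. The Hamiltonian should be nearly conserved along the trajectory.
# Minimal sketch: leapfrog integration for U(q) = q^2/2 (standard Gaussian target, M = I).
import numpy as np

def leapfrog(q, p, grad_U, eps, L):
    p = p - 0.5 * eps * grad_U(q)           # initial half step for momentum
    for i in range(L):
        q = q + eps * p                      # full step for position (M = I)
        if i < L - 1:
            p = p - eps * grad_U(q)          # full step for momentum
    p = p - 0.5 * eps * grad_U(q)            # final half step for momentum
    return q, p

grad_U = lambda q: q                         # gradient of U(q) = q^2 / 2
q, p = 1.0, 0.5
H0 = 0.5 * q**2 + 0.5 * p**2
q_new, p_new = leapfrog(q, p, grad_U, eps=0.1, L=20)
H1 = 0.5 * q_new**2 + 0.5 * p_new**2
print(f"H before: {H0:.6f}, H after: {H1:.6f}")   # nearly equal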
HMC Algorithm:
For each iteration:
1. Sample momentum: p ~ N(0, M)
2. Simulate Hamiltonian dynamics for L steps using leapfrog
3. Compute acceptance ratio:
α = min(1, exp(-H(q', p') + H(q, p)))
4. Accept/reject (q', -p') with probability α
5. Return q' (discard momentum)
Advantages:
Proposes distant states (reduces random walk behavior)
Uses gradient information (efficient in high dimensions)
Acceptance rate typically > 65%
Tuning:
\(\epsilon\): Step size (too large → numerical error, too small → random walk)
\(L\): Number of steps (longer → more distant proposals)
7. No-U-Turn Sampler (NUTS)
Problem with HMC: Need to tune \(L\) (trajectory length).
NUTS Solution: Automatically determine when to stop simulation.
Stopping Criterion:
Build a binary tree of states by doubling the trajectory length. Stop when:
\[(\theta_+ - \theta_-) \cdot p_- < 0 \quad \text{or} \quad (\theta_+ - \theta_-) \cdot p_+ < 0\]
where \(\theta_+, p_+\) are the forward endpoint and \(\theta_-, p_-\) are the backward endpoint.
Intuition: The trajectory starts making a "U-turn" (moving back toward its starting point).
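Below is a sketch of the U-turn check in isolation; the full NUTS tree-building and the sampling along the trajectory are omitted, and the function name u_turn is only illustrative.
# Sketch of the NUTS U-turn check only, following the notation above.
import numpy as np

def u_turn(theta_minus, theta_plus, p_minus, p_plus):
    """Return True when the trajectory starts doubling back on itself."""
    delta = theta_plus - theta_minus
    return (delta @ p_minus < 0) or (delta @ p_plus < 0)

# Endpoints still moving apart (no U-turn) vs. forward momentum pointing back toward the start
print(u_turn(np.array([0., 0.]), np.array([1., 1.]), np.array([1., 1.]), np.array([1., 1.])))    # False
print(u_turn(np.array([0., 0.]), np.array([1., 1.]), np.array([1., 1.]), np.array([-1., -1.])))  # True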
Advantages:
No manual tuning of trajectory length
Adapts to local geometry of posterior
Used in Stan, PyMC3, TensorFlow Probability
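In practice one rarely hand-codes NUTS. As a hedged sketch, assuming PyMC is installed and using a trivial standard-normal model chosen only to show the call, pm.sample uses NUTS by default, so no trajectory length needs to be tuned by hand:
# Hedged sketch (assumes PyMC is installed): NUTS is the default sampler behind pm.sample.
import pymc as pm

with pm.Model():
    x = pm.Normal("x", mu=0.0, sigma=1.0)
    idata = pm.sample(1000, tune=1000, chains=2)   # NUTS chosen automatically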
8. Convergence Diagnostics
A. Trace Plots:
Visual inspection of sample paths
Should look like a "fuzzy caterpillar"
Bad: trends, patterns, getting stuck
B. Autocorrelation:
Should decay to 0 quickly
High autocorrelation ⇒ poor mixing
C. Effective Sample Size (ESS):
Accounts for autocorrelation
\(n_{\text{eff}} \ll n\) indicates high autocorrelation
D. Gelman-Rubin \(\hat{R}\) Statistic:
Run \(M\) chains with different initializations. Compute:
\[\hat{R} = \sqrt{\frac{\hat{V}}{W}}\]
where:
\(W\): Within-chain variance (average variance across chains)
\(\hat{V}\): Total variance (pooled variance estimate)
Interpretation:
\(\hat{R} \approx 1\): Chains have converged
\(\hat{R} > 1.1\): Chains have not converged (need more iterations)
Formula:
\[W = \frac{1}{M}\sum_{m=1}^{M} s_m^2, \qquad B = \frac{n}{M-1}\sum_{m=1}^{M} (\bar{x}_m - \bar{x})^2, \qquad \hat{V} = \frac{n-1}{n}W + \frac{1}{n}B\]
where \(s_m^2\) and \(\bar{x}_m\) are the sample variance and mean of chain \(m\), \(\bar{x}\) is the mean over all chains, and \(n\) is the chain length.
9. Practical MCMC Best Practices
Initialization:
Use multiple chains with overdispersed starting points
Check convergence with \(\hat{R}\)
Burn-in:
Discard initial samples (transient phase)
Typically 10-50% of samples
Thinning:
Keep every \(k\)-th sample to reduce autocorrelation
Trades sample size for independence
Often unnecessary if ESS is sufficient
Acceptance Rate Guidelines:
Random walk MH: 20-40% (high-dim), 40-60% (low-dim)
HMC: 60-80%
Gibbs: 100% (always accepts)
Reparameterization:
Transform to make posterior closer to Gaussian
Reduces correlations between parameters
Example: Log-transform positive parameters (sketched below)
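As a sketch of that last point, assuming an Exponential(1) target density for a positive parameter \(\sigma\) (an illustrative choice): sampling \(\theta = \log\sigma\) with a random-walk step never proposes invalid values, and the change of variables adds the log-Jacobian \(\theta\) to the log density.
# Sketch: reparameterize a positive parameter sigma via theta = log(sigma).
# The Jacobian |d sigma / d theta| = exp(theta) contributes + theta in log space.
import numpy as np

def log_target_sigma(sigma):
    return -sigma if sigma > 0 else -np.inf          # Exponential(1), up to a constant

def log_target_theta(theta):
    return log_target_sigma(np.exp(theta)) + theta   # + log|Jacobian|

rng = np.random.default_rng(0)
theta, draws = 0.0, []
for _ in range(20000):
    prop = theta + 0.5 * rng.standard_normal()       # random walk on the log scale
    if np.log(rng.random()) < log_target_theta(prop) - log_target_theta(theta):
        theta = prop
    draws.append(np.exp(theta))                      # transform back to sigma
print("E[sigma] estimate:", np.mean(draws[5000:]))   # Exponential(1) has mean 1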
def metropolis_hastings(log_target, x_init, proposal_std, n_samples, burn_in=1000):
"""
Metropolis-Hastings with Gaussian random walk proposal
Parameters:
- log_target: Log of target distribution (unnormalized)
- x_init: Initial state
- proposal_std: Standard deviation of Gaussian proposal
- n_samples: Number of samples to generate
- burn_in: Number of initial samples to discard
"""
d = len(x_init)
samples = np.zeros((n_samples + burn_in, d))
samples[0] = x_init
n_accepted = 0
for t in range(1, n_samples + burn_in):
# Propose new state (Gaussian random walk)
x_current = samples[t-1]
x_proposed = x_current + np.random.randn(d) * proposal_std
# Compute acceptance ratio (in log space)
log_alpha = log_target(x_proposed) - log_target(x_current)
# Accept or reject
if np.log(np.random.rand()) < log_alpha:
samples[t] = x_proposed
n_accepted += 1
else:
samples[t] = x_current
acceptance_rate = n_accepted / (n_samples + burn_in)
# Discard burn-in
samples = samples[burn_in:]
return samples, acceptance_rate
# Example: Sample from 2D Gaussian mixture
def log_mixture_gaussian(x):
"""Log probability of Gaussian mixture"""
# Two components
mu1 = np.array([2, 2])
mu2 = np.array([-2, -2])
sigma = np.eye(2) * 0.5
log_p1 = multivariate_normal.logpdf(x, mu1, sigma)
log_p2 = multivariate_normal.logpdf(x, mu2, sigma)
# log(p1 + p2) = log(exp(log_p1) + exp(log_p2))
return np.logaddexp(log_p1, log_p2) + np.log(0.5)
# Run MH with different proposal variances
x_init = np.array([0.0, 0.0])
n_samples = 5000
proposal_stds = [0.1, 0.5, 2.0]
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
for idx, proposal_std in enumerate(proposal_stds):
samples, accept_rate = metropolis_hastings(
log_mixture_gaussian, x_init, proposal_std, n_samples, burn_in=1000)
# Trace plot
axes[0, idx].plot(samples[:, 0], alpha=0.7, linewidth=0.5)
axes[0, idx].set_xlabel('Iteration')
    axes[0, idx].set_ylabel('x₁')
    axes[0, idx].set_title(f'σ={proposal_std}\nAccept: {accept_rate:.1%}')
axes[0, idx].grid(True, alpha=0.3)
# Scatter plot
axes[1, idx].scatter(samples[:, 0], samples[:, 1], alpha=0.3, s=1)
    axes[1, idx].set_xlabel('x₁')
    axes[1, idx].set_ylabel('x₂')
axes[1, idx].set_xlim(-5, 5)
axes[1, idx].set_ylim(-5, 5)
# True distribution contours
x1 = np.linspace(-5, 5, 100)
x2 = np.linspace(-5, 5, 100)
X1, X2 = np.meshgrid(x1, x2)
Z = np.zeros_like(X1)
for i in range(len(x1)):
for j in range(len(x2)):
Z[j, i] = np.exp(log_mixture_gaussian(np.array([X1[j, i], X2[j, i]])))
axes[1, idx].contour(X1, X2, Z, levels=10, alpha=0.5, colors='red')
axes[1, idx].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("Metropolis-Hastings:")
print("- Small proposal Ο: High acceptance, slow exploration")
print("- Large proposal Ο: Low acceptance, can get stuck")
print("- Optimal acceptance rate: ~20-50% for high dimensions")
# Advanced MCMC: HMC and Convergence Diagnostics Implementation
import numpy as np
import matplotlib.pyplot as plt
from scipy.stats import multivariate_normal
# ============================================================
# 1. Hamiltonian Monte Carlo Implementation
# ============================================================
def hamiltonian_monte_carlo(log_prob, grad_log_prob, x_init, n_samples,
epsilon=0.1, L=20, mass=None):
"""
Hamiltonian Monte Carlo sampler.
Parameters:
-----------
log_prob : callable
Log probability function (unnormalized)
grad_log_prob : callable
Gradient of log probability
x_init : array
Initial state
n_samples : int
Number of samples
epsilon : float
Step size for leapfrog integrator
L : int
Number of leapfrog steps
mass : array
Mass matrix (inverse covariance of momentum)
"""
d = len(x_init)
if mass is None:
mass = np.eye(d)
mass_inv = np.linalg.inv(mass)
samples = np.zeros((n_samples, d))
samples[0] = x_init
n_accepted = 0
energies = []
for i in range(1, n_samples):
q = samples[i-1].copy()
# Sample momentum
p = np.random.multivariate_normal(np.zeros(d), mass)
# Compute initial Hamiltonian
current_U = -log_prob(q)
current_K = 0.5 * p.T @ mass_inv @ p
current_H = current_U + current_K
# Leapfrog integration
q_new = q.copy()
p_new = p.copy()
# Half step for momentum
p_new = p_new + 0.5 * epsilon * grad_log_prob(q_new)
        # L full position steps, with L-1 full momentum steps in between
for _ in range(L):
# Full step for position
q_new = q_new + epsilon * mass_inv @ p_new
# Full step for momentum (except last)
if _ < L - 1:
p_new = p_new + epsilon * grad_log_prob(q_new)
# Final half step for momentum
p_new = p_new + 0.5 * epsilon * grad_log_prob(q_new)
# Negate momentum for reversibility
p_new = -p_new
# Compute proposed Hamiltonian
proposed_U = -log_prob(q_new)
proposed_K = 0.5 * p_new.T @ mass_inv @ p_new
proposed_H = proposed_U + proposed_K
# Metropolis acceptance
log_accept_ratio = current_H - proposed_H
if np.log(np.random.rand()) < log_accept_ratio:
samples[i] = q_new
n_accepted += 1
else:
samples[i] = q
energies.append(proposed_H - current_H)
acceptance_rate = n_accepted / n_samples
return samples, acceptance_rate, np.array(energies)
# ============================================================
# 2. Convergence Diagnostics
# ============================================================
def compute_autocorrelation(samples, max_lag=100):
"""Compute autocorrelation function."""
n = len(samples)
mean = np.mean(samples)
var = np.var(samples)
acf = np.zeros(max_lag)
for k in range(max_lag):
if k < n:
acf[k] = np.mean((samples[:n-k] - mean) * (samples[k:] - mean)) / var
return acf
def effective_sample_size(samples, max_lag=100):
"""
Compute effective sample size accounting for autocorrelation.
    ESS = n / (1 + 2 * sum(ρ_k))
"""
acf = compute_autocorrelation(samples, max_lag)
# Sum until autocorrelation becomes insignificant
# (or use all lags)
sum_rho = 0
for k in range(1, max_lag):
if acf[k] < 0.05: # Cutoff at 5%
break
sum_rho += acf[k]
ess = len(samples) / (1 + 2 * sum_rho)
return ess, acf
def gelman_rubin(chains):
"""
    Compute Gelman-Rubin R̂ statistic for multiple chains.
Parameters:
-----------
chains : list of arrays
        List of MCMC chains (each chain is an n×d array)
Returns:
--------
R_hat : float
        R̂ statistic (should be ≈ 1 for convergence)
"""
M = len(chains) # Number of chains
n = len(chains[0]) # Length of each chain
# Compute chain means
chain_means = np.array([np.mean(chain, axis=0) for chain in chains])
overall_mean = np.mean(chain_means, axis=0)
# Within-chain variance
W = np.mean([np.var(chain, axis=0, ddof=1) for chain in chains], axis=0)
# Between-chain variance
B = n / (M - 1) * np.sum((chain_means - overall_mean)**2, axis=0)
# Pooled variance estimate
var_plus = ((n - 1) / n) * W + (1 / n) * B
# R-hat
R_hat = np.sqrt(var_plus / W)
return R_hat
# ============================================================
# 3. Example: 2D Banana-Shaped Distribution (Rosenbrock)
# ============================================================
def rosenbrock_log_prob(x, a=1, b=100):
"""
Log probability of banana-shaped distribution.
Unnormalized: p(x) β exp(-f(x)) where f is Rosenbrock function.
"""
return -(a - x[0])**2 - b * (x[1] - x[0]**2)**2
def rosenbrock_grad(x, a=1, b=100):
"""Gradient of log probability."""
grad = np.zeros(2)
grad[0] = 2 * (a - x[0]) + 4 * b * x[0] * (x[1] - x[0]**2)
grad[1] = -2 * b * (x[1] - x[0]**2)
return grad
# Run HMC
print("Running HMC on banana-shaped distribution...")
x_init = np.array([0.0, 0.0])
n_samples = 3000
# HMC with different step sizes
step_sizes = [0.01, 0.05, 0.15]
hmc_results = []
fig, axes = plt.subplots(2, 3, figsize=(18, 11))
for idx, eps in enumerate(step_sizes):
samples, accept_rate, energies = hamiltonian_monte_carlo(
lambda x: rosenbrock_log_prob(x, a=1, b=20),
lambda x: rosenbrock_grad(x, a=1, b=20),
x_init, n_samples, epsilon=eps, L=20
)
hmc_results.append((samples, accept_rate))
# Trace plot
    axes[0, idx].plot(samples[:, 0], alpha=0.7, linewidth=0.5, label='x₁')
    axes[0, idx].plot(samples[:, 1], alpha=0.7, linewidth=0.5, label='x₂')
axes[0, idx].set_xlabel('Iteration', fontsize=11)
axes[0, idx].set_ylabel('Value', fontsize=11)
    axes[0, idx].set_title(f'HMC Trace (ε={eps})\nAccept: {accept_rate:.1%}',
fontsize=12, fontweight='bold')
axes[0, idx].legend()
axes[0, idx].grid(True, alpha=0.3)
# Samples plot
axes[1, idx].scatter(samples[::10, 0], samples[::10, 1],
alpha=0.5, s=5, c=range(0, len(samples), 10),
cmap='viridis')
    axes[1, idx].set_xlabel('x₁', fontsize=11)
    axes[1, idx].set_ylabel('x₂', fontsize=11)
axes[1, idx].set_title('Sample Distribution', fontsize=12)
axes[1, idx].set_xlim(-2, 3)
axes[1, idx].set_ylim(-2, 5)
axes[1, idx].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('hmc_banana_distribution.png', dpi=150, bbox_inches='tight')
plt.show()
print("\n" + "="*70)
print("HMC RESULTS")
print("="*70)
for idx, eps in enumerate(step_sizes):
_, accept_rate = hmc_results[idx]
print(f"Step size Ξ΅={eps:5.3f} β Acceptance rate: {accept_rate:.1%}")
print("="*70)
print("Optimal HMC acceptance rate: 60-80%")
print("="*70)
3. Gibbs Sampling
Algorithm
For multivariate distribution \(p(x_1, \ldots, x_d)\):
Initialize \(\mathbf{x}^{(0)} = (x_1^{(0)}, \ldots, x_d^{(0)})\)
For \(t = 1, 2, \ldots\):
Sample \(x_1^{(t)} \sim p(x_1 | x_2^{(t-1)}, \ldots, x_d^{(t-1)})\)
Sample \(x_2^{(t)} \sim p(x_2 | x_1^{(t)}, x_3^{(t-1)}, \ldots, x_d^{(t-1)})\)
…
Sample \(x_d^{(t)} \sim p(x_d | x_1^{(t)}, \ldots, x_{d-1}^{(t)})\)
Key Property
Acceptance rate is always 100%!
Requires knowing conditional distributions
Special case of Metropolis-Hastings
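To see why no proposal is ever rejected, substitute the Gibbs proposal \(q(x' \mid x) = p(x_i' \mid x_{-i})\) (with \(x'_{-i} = x_{-i}\)) into the Metropolis-Hastings acceptance ratio:
\[\frac{p(x')\,q(x \mid x')}{p(x)\,q(x' \mid x)} = \frac{p(x_i' \mid x_{-i})\,p(x_{-i})\;p(x_i \mid x_{-i})}{p(x_i \mid x_{-i})\,p(x_{-i})\;p(x_i' \mid x_{-i})} = 1\]
so \(\alpha = \min(1, 1) = 1\).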
def gibbs_sampling_gaussian(mu, Sigma, n_samples, burn_in=1000):
"""
Gibbs sampling for 2D Gaussian
For Gaussian, conditional distributions are also Gaussian:
    p(x₁|x₂) = N(μ₁ + Σ₁₂/Σ₂₂ (x₂ - μ₂), Σ₁₁ - Σ₁₂²/Σ₂₂)
"""
samples = np.zeros((n_samples + burn_in, 2))
samples[0] = [0, 0] # Initialize
# Extract parameters
mu1, mu2 = mu
s11, s12, s22 = Sigma[0,0], Sigma[0,1], Sigma[1,1]
for t in range(1, n_samples + burn_in):
        # Sample x₁ given x₂
x2_current = samples[t-1, 1]
mu_cond = mu1 + (s12/s22) * (x2_current - mu2)
var_cond = s11 - s12**2/s22
samples[t, 0] = np.random.randn() * np.sqrt(var_cond) + mu_cond
        # Sample x₂ given x₁
x1_current = samples[t, 0]
mu_cond = mu2 + (s12/s11) * (x1_current - mu1)
var_cond = s22 - s12**2/s11
samples[t, 1] = np.random.randn() * np.sqrt(var_cond) + mu_cond
return samples[burn_in:]
# True distribution parameters
mu_true = np.array([1, -1])
Sigma_true = np.array([[1.0, 0.8],
[0.8, 1.0]])
# Run Gibbs sampling
gibbs_samples = gibbs_sampling_gaussian(mu_true, Sigma_true, n_samples=5000)
# Compare with direct sampling
direct_samples = np.random.multivariate_normal(mu_true, Sigma_true, 5000)
# Visualize
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
# Gibbs trace
axes[0, 0].plot(gibbs_samples[:500, 0], alpha=0.7)
axes[0, 0].axhline(y=mu_true[0], color='r', linestyle='--', label='True mean')
axes[0, 0].set_ylabel('x₁')
axes[0, 0].set_title('Gibbs Trace (x₁)')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
axes[0, 1].plot(gibbs_samples[:500, 1], alpha=0.7)
axes[0, 1].axhline(y=mu_true[1], color='r', linestyle='--', label='True mean')
axes[0, 1].set_ylabel('x₂')
axes[0, 1].set_title('Gibbs Trace (x₂)')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Show Gibbs path
axes[0, 2].plot(gibbs_samples[:100, 0], gibbs_samples[:100, 1],
'o-', alpha=0.5, markersize=3, linewidth=0.5)
axes[0, 2].plot(gibbs_samples[0, 0], gibbs_samples[0, 1], 'go',
markersize=10, label='Start')
axes[0, 2].set_xlabel('x₁')
axes[0, 2].set_ylabel('x₂')
axes[0, 2].set_title('Gibbs Sampling Path (first 100)')
axes[0, 2].legend()
axes[0, 2].grid(True, alpha=0.3)
# Scatter plots
axes[1, 0].scatter(gibbs_samples[:, 0], gibbs_samples[:, 1], alpha=0.3, s=1)
axes[1, 0].set_xlabel('x₁')
axes[1, 0].set_ylabel('x₂')
axes[1, 0].set_title('Gibbs Samples')
axes[1, 0].grid(True, alpha=0.3)
axes[1, 1].scatter(direct_samples[:, 0], direct_samples[:, 1], alpha=0.3, s=1)
axes[1, 1].set_xlabel('x₁')
axes[1, 1].set_ylabel('x₂')
axes[1, 1].set_title('Direct Samples')
axes[1, 1].grid(True, alpha=0.3)
# Histograms comparison
axes[1, 2].hist(gibbs_samples[:, 0], bins=50, alpha=0.5, density=True, label='Gibbs')
axes[1, 2].hist(direct_samples[:, 0], bins=50, alpha=0.5, density=True, label='Direct')
axes[1, 2].set_xlabel('x₁')
axes[1, 2].set_ylabel('Density')
axes[1, 2].set_title('Marginal Distribution')
axes[1, 2].legend()
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("Gibbs Sampling Results:")
print(f"\nTrue mean: {mu_true}")
print(f"Gibbs mean: {gibbs_samples.mean(axis=0)}")
print(f"Direct mean: {direct_samples.mean(axis=0)}")
print(f"\nTrue covariance:\n{Sigma_true}")
print(f"\nGibbs covariance:\n{np.cov(gibbs_samples.T)}")
print(f"\nDirect covariance:\n{np.cov(direct_samples.T)}")
4. Convergence Diagnostics
How do we know when MCMC has converged?
Trace plots: Visual inspection of sample paths
Running mean: Should stabilize after convergence
Autocorrelation: Measure dependence between samples
Effective sample size (ESS): Accounts for autocorrelation
Gelman-Rubin statistic: Compare multiple chains
# Renamed with a _simple suffix so these cell-local helpers do not shadow the versions
# defined earlier, which a later diagnostics cell still calls with max_lag.
def compute_autocorrelation_simple(samples, max_lag=100):
"""Compute autocorrelation function"""
n = len(samples)
samples_centered = samples - samples.mean()
autocorr = np.zeros(max_lag + 1)
variance = np.var(samples)
for lag in range(max_lag + 1):
if lag == 0:
autocorr[lag] = 1.0
else:
autocorr[lag] = np.mean(
samples_centered[:-lag] * samples_centered[lag:]
) / variance
return autocorr
def effective_sample_size_simple(samples):
"""Compute effective sample size"""
n = len(samples)
    autocorr = compute_autocorrelation_simple(samples, max_lag=min(n//2, 500))
# Sum autocorrelations until they become negative
# ESS = N / (1 + 2 * sum of autocorrelations)
sum_autocorr = 0
for rho in autocorr[1:]:
if rho < 0:
break
sum_autocorr += rho
ess = n / (1 + 2 * sum_autocorr)
return ess
# Analyze Gibbs samples
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
# Running mean
running_mean = np.cumsum(gibbs_samples[:, 0]) / np.arange(1, len(gibbs_samples) + 1)
axes[0, 0].plot(running_mean, linewidth=2)
axes[0, 0].axhline(y=mu_true[0], color='r', linestyle='--', linewidth=2, label='True mean')
axes[0, 0].set_xlabel('Iteration')
axes[0, 0].set_ylabel('Running Mean')
axes[0, 0].set_title('Convergence of Mean')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
# Autocorrelation
autocorr = compute_autocorrelation_simple(gibbs_samples[:, 0], max_lag=100)
axes[0, 1].bar(range(len(autocorr)), autocorr, alpha=0.7)
axes[0, 1].set_xlabel('Lag')
axes[0, 1].set_ylabel('Autocorrelation')
axes[0, 1].set_title('Autocorrelation Function')
axes[0, 1].axhline(y=0, color='k', linestyle='-', linewidth=0.5)
axes[0, 1].grid(True, alpha=0.3)
# Histogram with true density
axes[1, 0].hist(gibbs_samples[:, 0], bins=50, density=True, alpha=0.7, label='Samples')
x_range = np.linspace(-3, 5, 200)
# Marginal distribution
true_marginal = stats.norm.pdf(x_range, mu_true[0], np.sqrt(Sigma_true[0, 0]))
axes[1, 0].plot(x_range, true_marginal, 'r-', linewidth=2, label='True density')
axes[1, 0].set_xlabel('x₁')
axes[1, 0].set_ylabel('Density')
axes[1, 0].set_title('Sample Distribution vs True')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
# ESS over time
ess_values = []
sample_sizes = np.arange(100, len(gibbs_samples), 100)
for n in sample_sizes:
    ess = effective_sample_size_simple(gibbs_samples[:n, 0])
ess_values.append(ess)
axes[1, 1].plot(sample_sizes, ess_values, linewidth=2, label='ESS')
axes[1, 1].plot(sample_sizes, sample_sizes, 'r--', linewidth=2, label='Perfect (ESS=N)')
axes[1, 1].set_xlabel('Number of samples')
axes[1, 1].set_ylabel('Effective Sample Size')
axes[1, 1].set_title('Effective Sample Size Growth')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Compute final ESS
ess_final = effective_sample_size_simple(gibbs_samples[:, 0])
print(f"\nConvergence Diagnostics:")
print(f"Total samples: {len(gibbs_samples)}")
print(f"Effective sample size: {ess_final:.0f}")
print(f"ESS ratio: {ess_final/len(gibbs_samples):.2%}")
print(f"\nInterpretation:")
print(f" {len(gibbs_samples)} correlated samples β {ess_final:.0f} independent samples")
5. Bayesian Linear Regression with MCMC
Model
\[y = Xw + \epsilon, \qquad \epsilon \sim \mathcal{N}(0, \sigma^2 I)\]
Prior
\[w \sim \mathcal{N}(0, \lambda^{-1} I)\]
Posterior (analytic)
\[p(w \mid X, y) = \mathcal{N}(w \mid w_N, S_N), \qquad S_N = \left(\lambda I + \sigma^{-2} X^T X\right)^{-1}, \qquad w_N = \sigma^{-2} S_N X^T y\]
We'll use MCMC to sample from this posterior and compare with the analytic solution.
# Generate regression data
np.random.seed(42)
n = 50
X_reg = np.random.randn(n, 1)
X_reg = np.column_stack([np.ones(n), X_reg]) # Add intercept
w_true = np.array([2.0, 3.0])
sigma_true = 1.0
y_reg = X_reg @ w_true + np.random.randn(n) * sigma_true
# Prior parameters
lambda_prior = 1.0
# Log posterior (up to constant)
def log_posterior(w):
# Log likelihood
residuals = y_reg - X_reg @ w
log_lik = -0.5 * np.sum(residuals**2) / sigma_true**2
# Log prior
log_prior = -0.5 * lambda_prior * np.sum(w**2)
return log_lik + log_prior
# MCMC sampling
w_init = np.zeros(2)
mcmc_samples, accept_rate = metropolis_hastings(
log_posterior, w_init, proposal_std=0.2, n_samples=10000, burn_in=2000)
# Analytic posterior
S_N = np.linalg.inv(lambda_prior * np.eye(2) + (1/sigma_true**2) * X_reg.T @ X_reg)
w_N = (1/sigma_true**2) * S_N @ X_reg.T @ y_reg
# Sample from analytic posterior
analytic_samples = np.random.multivariate_normal(w_N, S_N, 10000)
print("Bayesian Linear Regression Results:")
print(f"\nTrue weights: {w_true}")
print(f"\nMCMC estimates:")
print(f" Mean: {mcmc_samples.mean(axis=0)}")
print(f" Std: {mcmc_samples.std(axis=0)}")
print(f"\nAnalytic posterior:")
print(f" Mean: {w_N}")
print(f" Std: {np.sqrt(np.diag(S_N))}")
print(f"\nMCMC acceptance rate: {accept_rate:.2%}")
# Visualize
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
# Trace plots
axes[0, 0].plot(mcmc_samples[:, 0], alpha=0.7, linewidth=0.5)
axes[0, 0].axhline(y=w_true[0], color='r', linestyle='--', label='True')
axes[0, 0].axhline(y=w_N[0], color='g', linestyle='--', label='Posterior mean')
axes[0, 0].set_ylabel('w₀ (intercept)')
axes[0, 0].set_title('MCMC Trace')
axes[0, 0].legend()
axes[0, 0].grid(True, alpha=0.3)
axes[0, 1].plot(mcmc_samples[:, 1], alpha=0.7, linewidth=0.5)
axes[0, 1].axhline(y=w_true[1], color='r', linestyle='--', label='True')
axes[0, 1].axhline(y=w_N[1], color='g', linestyle='--', label='Posterior mean')
axes[0, 1].set_ylabel('w₁ (slope)')
axes[0, 1].set_title('MCMC Trace')
axes[0, 1].legend()
axes[0, 1].grid(True, alpha=0.3)
# Joint posterior
axes[0, 2].scatter(mcmc_samples[:, 0], mcmc_samples[:, 1], alpha=0.3, s=1, label='MCMC')
axes[0, 2].scatter(analytic_samples[:, 0], analytic_samples[:, 1],
alpha=0.1, s=1, color='red', label='Analytic')
axes[0, 2].plot(w_true[0], w_true[1], 'g*', markersize=15, label='True')
axes[0, 2].set_xlabel('w₀')
axes[0, 2].set_ylabel('w₁')
axes[0, 2].set_title('Joint Posterior')
axes[0, 2].legend()
axes[0, 2].grid(True, alpha=0.3)
# Marginal histograms
axes[1, 0].hist(mcmc_samples[:, 0], bins=50, density=True, alpha=0.7, label='MCMC')
axes[1, 0].hist(analytic_samples[:, 0], bins=50, density=True, alpha=0.5, label='Analytic')
axes[1, 0].axvline(x=w_true[0], color='r', linestyle='--', linewidth=2, label='True')
axes[1, 0].set_xlabel('w₀')
axes[1, 0].set_ylabel('Density')
axes[1, 0].set_title('Marginal Posterior (Intercept)')
axes[1, 0].legend()
axes[1, 0].grid(True, alpha=0.3)
axes[1, 1].hist(mcmc_samples[:, 1], bins=50, density=True, alpha=0.7, label='MCMC')
axes[1, 1].hist(analytic_samples[:, 1], bins=50, density=True, alpha=0.5, label='Analytic')
axes[1, 1].axvline(x=w_true[1], color='r', linestyle='--', linewidth=2, label='True')
axes[1, 1].set_xlabel('w₁')
axes[1, 1].set_ylabel('Density')
axes[1, 1].set_title('Marginal Posterior (Slope)')
axes[1, 1].legend()
axes[1, 1].grid(True, alpha=0.3)
# Posterior predictive
x_test = np.linspace(-3, 3, 100)
X_test = np.column_stack([np.ones(len(x_test)), x_test])
# Sample predictions
y_pred_samples = X_test @ mcmc_samples[:1000].T
y_pred_mean = y_pred_samples.mean(axis=1)
y_pred_std = y_pred_samples.std(axis=1)
axes[1, 2].scatter(X_reg[:, 1], y_reg, alpha=0.6, label='Data')
axes[1, 2].plot(x_test, y_pred_mean, 'b-', linewidth=2, label='Posterior mean')
axes[1, 2].fill_between(x_test,
y_pred_mean - 2*y_pred_std,
y_pred_mean + 2*y_pred_std,
alpha=0.3, label='95% credible interval')
axes[1, 2].plot(x_test, X_test @ w_true, 'r--', linewidth=2, label='True line')
axes[1, 2].set_xlabel('x')
axes[1, 2].set_ylabel('y')
axes[1, 2].set_title('Posterior Predictive')
axes[1, 2].legend()
axes[1, 2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Convergence Diagnostics Visualization
# ============================================================
# 1. Autocorrelation Analysis
# ============================================================
# Take best HMC results (middle step size)
best_samples, _ = hmc_results[1]
# Compute diagnostics for both dimensions
ess_x1, acf_x1 = effective_sample_size(best_samples[:, 0], max_lag=100)
ess_x2, acf_x2 = effective_sample_size(best_samples[:, 1], max_lag=100)
fig, axes = plt.subplots(1, 3, figsize=(18, 5))
# Autocorrelation plots
lags = np.arange(len(acf_x1))
axes[0].plot(lags, acf_x1, 'b-', linewidth=2, label='x₁')
axes[0].plot(lags, acf_x2, 'r-', linewidth=2, label='x₂')
axes[0].axhline(y=0, color='k', linestyle='--', alpha=0.3)
axes[0].axhline(y=0.05, color='gray', linestyle=':', alpha=0.5, label='5% threshold')
axes[0].axhline(y=-0.05, color='gray', linestyle=':', alpha=0.5)
axes[0].set_xlabel('Lag', fontsize=12)
axes[0].set_ylabel('Autocorrelation', fontsize=12)
axes[0].set_title('Autocorrelation Function (ACF)', fontsize=13, fontweight='bold')
axes[0].legend(fontsize=11)
axes[0].grid(True, alpha=0.3)
# ESS comparison
axes[1].bar(['x₁', 'x₂'], [ess_x1, ess_x2], color=['blue', 'red'], alpha=0.7, edgecolor='black')
axes[1].axhline(y=n_samples, color='green', linestyle='--', linewidth=2, label=f'Total samples ({n_samples})')
axes[1].set_ylabel('Effective Sample Size', fontsize=12)
axes[1].set_title('Effective Sample Size (ESS)', fontsize=13, fontweight='bold')
axes[1].legend(fontsize=11)
axes[1].grid(True, alpha=0.3, axis='y')
# Add text annotations
for i, (dim, ess) in enumerate([('x₁', ess_x1), ('x₂', ess_x2)]):
efficiency = ess / n_samples * 100
axes[1].text(i, ess + 50, f'{ess:.0f}\n({efficiency:.1f}%)',
ha='center', va='bottom', fontsize=10, fontweight='bold')
# Efficiency comparison: MH vs HMC
# Run quick MH for comparison
mh_samples, mh_accept = metropolis_hastings(
lambda x: rosenbrock_log_prob(x, a=1, b=20),
np.array([0.0, 0.0]), 0.5, 1000, burn_in=500
)
ess_mh, _ = effective_sample_size(mh_samples[:, 0], max_lag=50)
ess_hmc, _ = effective_sample_size(best_samples[:1000, 0], max_lag=50)
methods = ['Metropolis-Hastings', 'HMC']
ess_values = [ess_mh, ess_hmc]
colors = ['orange', 'green']
axes[2].bar(methods, ess_values, color=colors, alpha=0.7, edgecolor='black')
axes[2].set_ylabel('ESS (1000 samples)', fontsize=12)
axes[2].set_title('Sampling Efficiency: MH vs HMC', fontsize=13, fontweight='bold')
axes[2].grid(True, alpha=0.3, axis='y')
for i, (method, ess) in enumerate(zip(methods, ess_values)):
axes[2].text(i, ess + 10, f'{ess:.0f}', ha='center', va='bottom',
fontsize=11, fontweight='bold')
plt.tight_layout()
plt.savefig('mcmc_convergence_diagnostics.png', dpi=150, bbox_inches='tight')
plt.show()
print("\n" + "="*70)
print("CONVERGENCE DIAGNOSTICS SUMMARY")
print("="*70)
print(f"Dimension xβ:")
print(f" Total samples: {n_samples}")
print(f" Effective samples: {ess_x1:.0f}")
print(f" Efficiency: {ess_x1/n_samples*100:.1f}%")
print(f"\nDimension xβ:")
print(f" Total samples: {n_samples}")
print(f" Effective samples: {ess_x2:.0f}")
print(f" Efficiency: {ess_x2/n_samples*100:.1f}%")
print("\n" + "="*70)
print("METHOD COMPARISON (1000 samples)")
print("="*70)
print(f"Metropolis-Hastings ESS: {ess_mh:.0f}")
print(f"HMC ESS: {ess_hmc:.0f}")
print(f"HMC speedup: {ess_hmc/ess_mh:.1f}x")
print("="*70)
# ============================================================
# 2. Gelman-Rubin Convergence Test
# ============================================================
print("\n" + "="*70)
print("GELMAN-RUBIN CONVERGENCE TEST")
print("="*70)
# Run 4 chains with different initializations
n_chains = 4
chain_length = 2000
initializations = [
np.array([0.0, 0.0]),
np.array([2.0, 2.0]),
np.array([-1.0, 1.0]),
np.array([1.5, -1.0])
]
chains = []
for init in initializations:
samples, _, _ = hamiltonian_monte_carlo(
lambda x: rosenbrock_log_prob(x, a=1, b=20),
lambda x: rosenbrock_grad(x, a=1, b=20),
init, chain_length, epsilon=0.05, L=20
)
chains.append(samples)
# Compute R-hat
R_hat = gelman_rubin(chains)
print(f"Number of chains: {n_chains}")
print(f"Chain length: {chain_length}")
print(f"\nRΜ statistics:")
print(f" Dimension xβ: RΜ = {R_hat[0]:.4f}")
print(f" Dimension xβ: RΜ = {R_hat[1]:.4f}")
print(f"\nInterpretation:")
if np.max(R_hat) < 1.01:
    print("  ✓ EXCELLENT convergence (R̂ < 1.01)")
elif np.max(R_hat) < 1.05:
    print("  ✓ Good convergence (R̂ < 1.05)")
elif np.max(R_hat) < 1.1:
    print("  ⚠ Acceptable convergence (R̂ < 1.1)")
else:
    print("  ✗ Poor convergence (R̂ ≥ 1.1) - Need more iterations!")
print("="*70)
# Visualize chains
fig, axes = plt.subplots(2, 2, figsize=(16, 10))
# Trace plots for both dimensions
for dim in range(2):
ax = axes[dim, 0]
for i, chain in enumerate(chains):
ax.plot(chain[:, dim], alpha=0.7, linewidth=0.8, label=f'Chain {i+1}')
ax.set_xlabel('Iteration', fontsize=11)
ax.set_ylabel(f'x_{dim+1}', fontsize=11)
    ax.set_title(f'Trace Plot: x_{dim+1} (R̂={R_hat[dim]:.4f})',
fontsize=12, fontweight='bold')
ax.legend(fontsize=9, loc='upper right')
ax.grid(True, alpha=0.3)
# Combined scatter for both dimensions
axes[0, 1].set_title('All Chains Combined', fontsize=12, fontweight='bold')
for i, chain in enumerate(chains):
axes[0, 1].scatter(chain[::5, 0], chain[::5, 1],
alpha=0.4, s=2, label=f'Chain {i+1}')
axes[0, 1].set_xlabel('x₁', fontsize=11)
axes[0, 1].set_ylabel('x₂', fontsize=11)
axes[0, 1].legend(fontsize=9)
axes[0, 1].grid(True, alpha=0.3)
# Density comparison
from scipy.stats import gaussian_kde
axes[1, 1].set_title('Marginal Density: x₁', fontsize=12, fontweight='bold')
x_grid = np.linspace(-2, 3, 200)
for i, chain in enumerate(chains):
kde = gaussian_kde(chain[:, 0])
density = kde(x_grid)
axes[1, 1].plot(x_grid, density, alpha=0.7, linewidth=2, label=f'Chain {i+1}')
axes[1, 1].set_xlabel('x₁', fontsize=11)
axes[1, 1].set_ylabel('Density', fontsize=11)
axes[1, 1].legend(fontsize=9)
axes[1, 1].grid(True, alpha=0.3)
plt.tight_layout()
plt.savefig('gelman_rubin_convergence.png', dpi=150, bbox_inches='tight')
plt.show()
print("\nβ Convergence diagnostics complete!")
print(" - Autocorrelation analysis: Measures sample independence")
print(" - Effective Sample Size: Accounts for autocorrelation")
print(" - Gelman-Rubin RΜ: Multi-chain convergence assessment")
Summary
In this notebook, we covered:
Monte Carlo Basics: Approximating expectations via sampling
Metropolis-Hastings: General-purpose MCMC algorithm
Gibbs Sampling: Coordinate-wise sampling for conjugate models
Convergence Diagnostics: ESS, autocorrelation, trace plots
Bayesian Linear Regression: MCMC for posterior inference
Key Takeaways
MCMC: Generate samples from complex distributions
Metropolis-Hastings:
Works when you can evaluate p(x) up to constant
Tuning proposal variance is critical
Target 20-50% acceptance rate
Gibbs Sampling:
Special case with 100% acceptance
Requires conditional distributions
Can be slow if variables highly correlated
Convergence:
Always use burn-in period
Monitor multiple diagnostics
Run multiple chains from different initializations
Effective Sample Size: Accounts for autocorrelation
Comparison

| Algorithm | Pros | Cons |
|---|---|---|
| Metropolis-Hastings | General, simple | Need to tune proposal |
| Gibbs | 100% acceptance | Need conditionals, slow mixing |
| HMC | Fast mixing, efficient | Complex, needs gradients |
Modern MCMC
Hamiltonian Monte Carlo (HMC): Uses gradient information
NUTS: No-U-Turn Sampler (used in Stan, PyMC3)
Variational Inference: Faster approximate alternative
Exercises
Implement Hamiltonian Monte Carlo
Derive the Gibbs sampler for a mixture of Gaussians
Implement the Gelman-Rubin convergence diagnostic
Use MCMC for Bayesian logistic regression
Compare MCMC with variational inference