1. The EM Framework
Motivation
Problem: Maximize likelihood with latent (hidden) variables
Observed data: \(X = \{x_1, ..., x_n\}\)
Latent variables: \(Z = \{z_1, ..., z_n\}\)
Parameters: \(\theta\)
Goal: Maximize the incomplete-data log-likelihood: $$\max_\theta \log p(X|\theta) = \max_\theta \log \sum_Z p(X, Z|\theta)$$
Challenge: The summation inside the log makes direct optimization hard!
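A quick numerical check of the difficulty (a minimal sketch with made-up 1D mixture parameters): the log of the mixture sum has to be computed jointly, e.g. via `logsumexp`; it is not the sum of the per-component log terms.

```python
import numpy as np
from scipy.stats import norm
from scipy.special import logsumexp

# Hypothetical 1D mixture: two components with weights 0.4 / 0.6
weights = np.array([0.4, 0.6])
means = np.array([-1.0, 2.0])
sigmas = np.array([1.0, 0.5])

x = 0.7  # a single observation

# log p(x) = log sum_k pi_k N(x | mu_k, sigma_k): the sum sits inside the log
log_terms = np.log(weights) + norm.logpdf(x, means, sigmas)
log_px = logsumexp(log_terms)

# Naively swapping log and sum gives a different (wrong) answer
wrong = np.sum(log_terms)

print(f"log p(x)    = {log_px:.4f}")
print(f"sum of logs = {wrong:.4f}   (not the same quantity)")
```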
EM Solution
Iteratively:
E-step: Compute expected complete log-likelihood
M-step: Maximize this expectation
Key Insight: Easier to maximize \(\log p(X, Z|\theta)\) if we knew \(Z\)!
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.special import logsumexp
from matplotlib.patches import Ellipse
# Set style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 4)
np.random.seed(42)
2. Mathematical Derivation
Lower Bound on Log-Likelihood
For any distribution \(q(Z)\):
By Jensen's inequality: $$\log p(X|\theta) \geq \sum_Z q(Z) \log \frac{p(X,Z|\theta)}{q(Z)} = \mathcal{L}(q, \theta)$$
This is the Evidence Lower Bound (ELBO).
EM Algorithm
E-step: Set \(q(Z) = p(Z|X, \theta^{old})\) (makes bound tight)
M-step: \(\theta^{new} = \arg\max_\theta \mathcal{L}(q, \theta)\)
Equivalently: $$\theta^{new} = \arg\max_\theta Q(\theta, \theta^{old})$$
where: $$Q(\theta, \theta^{old}) = \mathbb{E}_{Z|X,\theta^{old}}[\log p(X,Z|\theta)]$$
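The quantities above can be computed explicitly for a tiny model. Below is a hedged sketch for a two-component 1D mixture with fixed unit variances (all parameter values are made up for illustration): the E-step fills in the responsibilities, and the closed-form M-step candidate never decreases Q.

```python
import numpy as np
from scipy.stats import norm
from scipy.special import logsumexp

rng = np.random.default_rng(0)
x = rng.normal(0, 1, size=50)  # toy observations

# Hypothetical "old" parameters of a two-component 1D mixture (unit variances)
pi_old = np.array([0.5, 0.5])
mu_old = np.array([-1.0, 1.0])

# E-step: responsibilities gamma_ik = p(z_i = k | x_i, theta_old)
log_joint = np.log(pi_old) + norm.logpdf(x[:, None], mu_old, 1.0)
gamma = np.exp(log_joint - logsumexp(log_joint, axis=1, keepdims=True))

# Q(theta, theta_old) = sum_i sum_k gamma_ik [log pi_k + log N(x_i | mu_k, 1)]
def Q(pi, mu):
    return np.sum(gamma * (np.log(pi) + norm.logpdf(x[:, None], mu, 1.0)))

# Candidate "new" parameters from the closed-form M-step
Nk = gamma.sum(axis=0)
pi_new = Nk / len(x)
mu_new = (gamma * x[:, None]).sum(axis=0) / Nk

print(f"Q(old, old) = {Q(pi_old, mu_old):.2f}")
print(f"Q(new, old) = {Q(pi_new, mu_new):.2f}")
```

Since the M-step maximizes Q with the responsibilities held fixed, `Q(pi_new, mu_new)` is always at least `Q(pi_old, mu_old)`.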
3. Convergence Proof
Theorem: EM Increases the Likelihood
At each iteration: \(\log p(X|\theta^{new}) \geq \log p(X|\theta^{old})\)
Proof:
After E-step: \(q^{new}(Z) = p(Z|X, \theta^{old})\)
ELBO equals log-likelihood: $$\mathcal{L}(q^{new}, \theta^{old}) = \log p(X|\theta^{old})$$
M-step increases ELBO: $$\mathcal{L}(q^{new}, \theta^{new}) \geq \mathcal{L}(q^{new}, \theta^{old})$$
ELBO lower bounds likelihood: $$\log p(X|\theta^{new}) \geq \mathcal{L}(q^{new}, \theta^{new})$$
Combining: \(\log p(X|\theta^{new}) \geq \log p(X|\theta^{old})\) ✓
Convergence
Likelihood increases monotonically
Bounded above β converges
To local optimum (not global!)
4. Gaussian Mixture Model (GMM)
Model
Data generated from \(K\) Gaussian components:
Latent variable: \(z_i \in \{1, ..., K\}\) (component membership)
Parameters: \(\theta = \{\pi, \mu, \Sigma\}\)
\(\pi_k\): mixture weight
\(\mu_k\): mean of component \(k\)
\(\Sigma_k\): covariance of component \(k\)
# Generate synthetic GMM data
def generate_gmm_data(n_samples=300, n_components=3):
    """Generate samples from a 2D Gaussian mixture."""
    # True parameters
    means = np.array([[0, 0], [3, 3], [-2, 4]])
    covs = np.array([
        [[1, 0.5], [0.5, 1]],
        [[0.8, -0.3], [-0.3, 0.8]],
        [[1.2, 0], [0, 0.6]]
    ])
    weights = np.array([0.3, 0.5, 0.2])
    # Sample component memberships
    components = np.random.choice(n_components, n_samples, p=weights)
    # Sample from each component
    samples = np.zeros((n_samples, 2))
    for k in range(n_components):
        mask = (components == k)
        samples[mask] = np.random.multivariate_normal(
            means[k], covs[k], mask.sum()
        )
    return samples, components, means, covs, weights
# Generate data
X, true_labels, true_means, true_covs, true_weights = generate_gmm_data(n_samples=500)
# Visualize
plt.figure(figsize=(10, 6))
scatter = plt.scatter(X[:, 0], X[:, 1], c=true_labels, cmap='viridis', alpha=0.6, edgecolors='k')
plt.scatter(true_means[:, 0], true_means[:, 1], c='red', marker='X', s=300, edgecolors='black', linewidth=2, label='True means')
plt.colorbar(scatter, label='True Component')
plt.title('Generated GMM Data (3 components)', fontsize=14, fontweight='bold')
plt.xlabel('$x_1$')
plt.ylabel('$x_2$')
plt.legend()
plt.grid(True, alpha=0.3)
plt.axis('equal')
plt.show()
print(f"Data shape: {X.shape}")
print(f"True mixing weights: {true_weights}")
5. EM for GMM
E-Step: Compute Responsibilities
Responsibility (posterior probability): $$\gamma_{ik} = p(z_i = k | x_i, \theta^{old}) = \frac{\pi_k \mathcal{N}(x_i|\mu_k, \Sigma_k)}{\sum_{j=1}^K \pi_j \mathcal{N}(x_i|\mu_j, \Sigma_j)}$$
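In practice the responsibilities are best computed in log space to avoid underflow when the densities are tiny. A minimal sketch with hypothetical parameters (note the implementation below normalizes in probability space instead, with a small epsilon for safety):

```python
import numpy as np
from scipy import stats
from scipy.special import logsumexp

# Hypothetical parameters of a 2-component, 2D mixture
weights = np.array([0.3, 0.7])
means = np.array([[0.0, 0.0], [3.0, 3.0]])
covs = np.array([np.eye(2), np.eye(2)])

X_toy = np.array([[0.1, -0.2], [2.9, 3.1], [1.5, 1.5]])

# log gamma_ik = log pi_k + log N(x_i | mu_k, Sigma_k) - log p(x_i)
log_resp = np.column_stack([
    np.log(weights[k]) + stats.multivariate_normal.logpdf(X_toy, means[k], covs[k])
    for k in range(2)
])
log_resp -= logsumexp(log_resp, axis=1, keepdims=True)
gamma = np.exp(log_resp)

print(gamma.round(3))  # each row sums to 1
```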
M-Step: Update Parameters
Mixing weights: $$\pi_k^{new} = \frac{1}{n} \sum_{i=1}^n \gamma_{ik}$$
Means: $$\mu_k^{new} = \frac{\sum_{i=1}^n \gamma_{ik} x_i}{\sum_{i=1}^n \gamma_{ik}}$$
Covariances: $$\Sigma_k^{new} = \frac{\sum_{i=1}^n \gamma_{ik} (x_i - \mu_k^{new})(x_i - \mu_k^{new})^T}{\sum_{i=1}^n \gamma_{ik}}$$
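These three updates vectorize cleanly. As a sketch (using random stand-in data and fake responsibilities, not fitted values), the per-component covariance loop can be collapsed into a single `einsum`:

```python
import numpy as np

rng = np.random.default_rng(1)
n, d, K = 200, 2, 3
X_toy = rng.normal(size=(n, d))
gamma = rng.dirichlet(np.ones(K), size=n)    # fake responsibilities; rows sum to 1

Nk = gamma.sum(axis=0)                        # effective counts per component
pi_new = Nk / n                               # mixing weights
mu_new = (gamma.T @ X_toy) / Nk[:, None]      # means

# Covariances in one einsum: Sigma_k = sum_i gamma_ik d_i d_i^T / N_k
diff = X_toy[None, :, :] - mu_new[:, None, :]             # (K, n, d)
Sigma_new = np.einsum('kn,kni,knj->kij', gamma.T, diff, diff) / Nk[:, None, None]

print(Sigma_new.shape)  # one d-by-d covariance per component
```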
class GaussianMixtureEM:
    """EM algorithm for Gaussian mixture models."""

    def __init__(self, n_components=3, max_iter=100, tol=1e-4):
        self.K = n_components
        self.max_iter = max_iter
        self.tol = tol

    def _initialize(self, X):
        """Initialize parameters randomly."""
        n, d = X.shape
        # Uniform mixing weights
        self.weights = np.ones(self.K) / self.K
        # Pick K distinct data points as initial means
        indices = np.random.choice(n, self.K, replace=False)
        self.means = X[indices]
        # Start with spherical covariances
        self.covs = np.array([np.eye(d) for _ in range(self.K)])

    def _e_step(self, X):
        """E-step: compute responsibilities."""
        n = X.shape[0]
        responsibilities = np.zeros((n, self.K))
        for k in range(self.K):
            # Compute pi_k * p(x | z=k)
            responsibilities[:, k] = self.weights[k] * stats.multivariate_normal.pdf(
                X, self.means[k], self.covs[k]
            )
        # Normalize rows to get p(z | x)
        responsibilities /= responsibilities.sum(axis=1, keepdims=True) + 1e-10
        return responsibilities

    def _m_step(self, X, responsibilities):
        """M-step: update parameters."""
        n, d = X.shape
        # Effective number of points per component
        Nk = responsibilities.sum(axis=0)
        # Update weights
        self.weights = Nk / n
        # Update means
        self.means = (responsibilities.T @ X) / Nk[:, np.newaxis]
        # Update covariances
        for k in range(self.K):
            diff = X - self.means[k]
            self.covs[k] = (responsibilities[:, k:k+1] * diff).T @ diff / Nk[k]
            # Small ridge for numerical stability
            self.covs[k] += np.eye(d) * 1e-6

    def _compute_log_likelihood(self, X):
        """Compute the log-likelihood of X under the current parameters."""
        n = X.shape[0]
        log_prob = np.zeros((n, self.K))
        for k in range(self.K):
            log_prob[:, k] = np.log(self.weights[k] + 1e-10) + \
                stats.multivariate_normal.logpdf(X, self.means[k], self.covs[k])
        return logsumexp(log_prob, axis=1).sum()

    def fit(self, X, verbose=True):
        """Fit the GMM using EM."""
        self._initialize(X)
        log_likelihood_history = []
        for iteration in range(self.max_iter):
            # E-step
            responsibilities = self._e_step(X)
            # M-step
            self._m_step(X, responsibilities)
            # Track log-likelihood
            log_likelihood = self._compute_log_likelihood(X)
            log_likelihood_history.append(log_likelihood)
            if verbose and (iteration + 1) % 10 == 0:
                print(f"Iteration {iteration+1}: Log-likelihood = {log_likelihood:.2f}")
            # Check convergence
            if iteration > 0:
                if abs(log_likelihood_history[-1] - log_likelihood_history[-2]) < self.tol:
                    if verbose:
                        print(f"\n✓ Converged at iteration {iteration+1}")
                    break
        self.log_likelihood_history = log_likelihood_history
        return self

    def predict(self, X):
        """Predict the most probable component for each point."""
        responsibilities = self._e_step(X)
        return responsibilities.argmax(axis=1)
# Fit GMM using EM
print("Fitting GMM with EM algorithm...")
print("="*50)
gmm = GaussianMixtureEM(n_components=3, max_iter=100)
gmm.fit(X, verbose=True)
# Plot convergence
plt.figure(figsize=(10, 5))
plt.plot(gmm.log_likelihood_history, 'o-', linewidth=2, markersize=6)
plt.xlabel('Iteration', fontsize=12)
plt.ylabel('Log-Likelihood', fontsize=12)
plt.title('EM Convergence: Log-Likelihood vs Iteration', fontsize=14, fontweight='bold')
plt.grid(True, alpha=0.3)
plt.show()
print(f"\nFinal log-likelihood: {gmm.log_likelihood_history[-1]:.2f}")
print(f"Converged in {len(gmm.log_likelihood_history)} iterations")
# Visualize results
def plot_gaussian_ellipse(mean, cov, ax, color, alpha=0.3):
    """Plot the 2-sigma contour ellipse of a 2D Gaussian."""
    # Eigendecomposition gives axis lengths and orientation
    eigenvalues, eigenvectors = np.linalg.eigh(cov)
    angle = np.degrees(np.arctan2(eigenvectors[1, 0], eigenvectors[0, 0]))
    # Full width/height of the 2-sigma ellipse
    width, height = 2 * 2 * np.sqrt(eigenvalues)
    ellipse = Ellipse(mean, width, height, angle=angle,
                      facecolor=color, alpha=alpha, edgecolor=color, linewidth=2)
    ax.add_patch(ellipse)
# Get predicted labels
predicted_labels = gmm.predict(X)
fig, axes = plt.subplots(1, 2, figsize=(15, 6))
# Plot 1: Predicted clustering
axes[0].scatter(X[:, 0], X[:, 1], c=predicted_labels, cmap='viridis', alpha=0.6, edgecolors='k')
axes[0].scatter(gmm.means[:, 0], gmm.means[:, 1], c='red', marker='X', s=300,
edgecolors='black', linewidth=2, label='Estimated means')
# Plot Gaussian ellipses
colors = plt.cm.viridis(np.linspace(0, 1, gmm.K))
for k in range(gmm.K):
plot_gaussian_ellipse(gmm.means[k], gmm.covs[k], axes[0], colors[k])
axes[0].set_title('EM Result: Predicted Clusters', fontsize=14, fontweight='bold')
axes[0].set_xlabel('$x_1$')
axes[0].set_ylabel('$x_2$')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[0].axis('equal')
# Plot 2: True clustering
axes[1].scatter(X[:, 0], X[:, 1], c=true_labels, cmap='viridis', alpha=0.6, edgecolors='k')
axes[1].scatter(true_means[:, 0], true_means[:, 1], c='red', marker='X', s=300,
edgecolors='black', linewidth=2, label='True means')
axes[1].set_title('Ground Truth: True Clusters', fontsize=14, fontweight='bold')
axes[1].set_xlabel('$x_1$')
axes[1].set_ylabel('$x_2$')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
axes[1].axis('equal')
plt.tight_layout()
plt.show()
print("\nEstimated mixing weights:", gmm.weights.round(3))
print("True mixing weights: ", true_weights.round(3))
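One caveat when comparing against the ground truth: EM recovers the components only up to a permutation of labels, so position k in `gmm.weights` need not correspond to component k of the generator. A small sketch (with made-up means, not the fitted values above) of aligning components by matching estimated means to true means with the Hungarian algorithm:

```python
import numpy as np
from scipy.optimize import linear_sum_assignment
from scipy.spatial.distance import cdist

# Toy example: estimated means are a permuted, slightly noisy copy of the true means
true_means_toy = np.array([[0.0, 0.0], [3.0, 3.0], [-2.0, 4.0]])
est_means_toy = np.array([[3.1, 2.9], [-2.1, 4.0], [0.1, -0.1]])  # permuted order

# Match each true component to its closest estimated component, one-to-one
cost = cdist(true_means_toy, est_means_toy)   # pairwise distances
row, col = linear_sum_assignment(cost)        # optimal assignment

print("true -> estimated index mapping:", dict(zip(row.tolist(), col.tolist())))
```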
6. Visualizing the E and M Steps
Let's see how the parameters evolve across EM iterations.
# Re-run EM with visualization
class GaussianMixtureEM_Visual(GaussianMixtureEM):
    """EM with parameter history for visualization."""

    def fit(self, X, verbose=True):
        """Fit the GMM and record the parameter history."""
        self._initialize(X)
        self.history = {
            'means': [self.means.copy()],
            'weights': [self.weights.copy()],
            'log_likelihood': []
        }
        for iteration in range(self.max_iter):
            # E-step
            responsibilities = self._e_step(X)
            # M-step
            self._m_step(X, responsibilities)
            # Record parameters
            self.history['means'].append(self.means.copy())
            self.history['weights'].append(self.weights.copy())
            # Log-likelihood
            log_likelihood = self._compute_log_likelihood(X)
            self.history['log_likelihood'].append(log_likelihood)
            # Check convergence
            if iteration > 0:
                if abs(self.history['log_likelihood'][-1] -
                       self.history['log_likelihood'][-2]) < self.tol:
                    break
        return self
# Fit with history
gmm_visual = GaussianMixtureEM_Visual(n_components=3, max_iter=50)
gmm_visual.fit(X, verbose=False)
# Visualize evolution
# Clamp indices so early convergence (a short history) cannot cause an IndexError
n_hist = len(gmm_visual.history['means'])
iterations_to_plot = [min(i, n_hist - 1) for i in [0, 2, 5, 10, 20, n_hist - 1]]
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
axes = axes.ravel()
for idx, iteration in enumerate(iterations_to_plot):
    ax = axes[idx]
    # Plot data
    ax.scatter(X[:, 0], X[:, 1], alpha=0.4, s=20)
    # Plot means at this iteration
    means = gmm_visual.history['means'][iteration]
    ax.scatter(means[:, 0], means[:, 1], c='red', marker='X', s=300,
               edgecolors='black', linewidth=2)
    # Title
    if iteration == 0:
        ax.set_title('Initialization', fontsize=12, fontweight='bold')
    else:
        ll = gmm_visual.history['log_likelihood'][iteration - 1]
        ax.set_title(f'Iteration {iteration} (LL={ll:.1f})', fontsize=12, fontweight='bold')
    ax.set_xlabel('$x_1$')
    ax.set_ylabel('$x_2$')
    ax.grid(True, alpha=0.3)
    ax.axis('equal')
plt.tight_layout()
plt.show()
print("✓ The visualization shows EM converging to the cluster centers!")
7. Summary
EM Algorithm Framework
✓ E-step: Compute the posterior \(p(Z|X, \theta)\) (responsibilities)
✓ M-step: Maximize the expected complete log-likelihood
✓ Convergence: Monotonically increases the likelihood to a local optimum
Key Properties
General framework for latent variable models
Guaranteed convergence to local optimum
Computationally efficient for many models
Interpretable updates (soft assignment β parameter update)
GMM Implementation
✓ Derived the E and M steps mathematically
✓ Implemented from scratch in Python
✓ Visualized convergence and clustering
✓ Compared with ground truth
Limitations
⚠️ Converges to a local optimum (initialization matters)
⚠️ Requires specifying the number of components
⚠️ Sensitive to initialization (use K-means++, multiple restarts)
⚠️ Covariance matrices can become singular
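The local-optimum and initialization caveats are commonly handled with random restarts: run EM several times from different initializations and keep the run with the best final log-likelihood. A minimal self-contained sketch using a stripped-down 1D EM with fixed unit variances (deliberately simpler than the class above):

```python
import numpy as np
from scipy.stats import norm
from scipy.special import logsumexp

def em_1d(x, K, rng, n_iter=50):
    """Minimal EM for a 1D Gaussian mixture with fixed unit variances (sketch)."""
    mu = rng.choice(x, K, replace=False)      # random data points as initial means
    pi = np.ones(K) / K
    for _ in range(n_iter):
        # E-step: responsibilities in log space
        log_r = np.log(pi) + norm.logpdf(x[:, None], mu, 1.0)
        log_r -= logsumexp(log_r, axis=1, keepdims=True)
        r = np.exp(log_r)
        # M-step: closed-form updates for weights and means
        Nk = r.sum(axis=0)
        pi, mu = Nk / len(x), (r * x[:, None]).sum(axis=0) / Nk
    ll = logsumexp(np.log(pi) + norm.logpdf(x[:, None], mu, 1.0), axis=1).sum()
    return ll, mu

rng = np.random.default_rng(0)
x = np.concatenate([rng.normal(-3, 1, 100), rng.normal(3, 1, 100)])

# Multiple restarts: keep the run with the best final log-likelihood
runs = [em_1d(x, K=2, rng=rng) for _ in range(5)]
best_ll, best_mu = max(runs, key=lambda run: run[0])
print(f"best log-likelihood: {best_ll:.2f}, means: {np.sort(best_mu).round(2)}")
```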
Extensions
Bayesian EM: Prior on parameters
Variational EM: Approximate posterior
Infinite mixtures: Dirichlet Process GMM
Online EM: Streaming data
Next Notebook: 10_markov_chain_monte_carlo.ipynb