1. The EM Framework

Motivation

Problem: Maximize likelihood with latent (hidden) variables

  • Observed data: \(X = \{x_1, ..., x_n\}\)

  • Latent variables: \(Z = \{z_1, ..., z_n\}\)

  • Parameters: \(\theta\)

Goal: Maximize the incomplete-data log-likelihood:

\[\max_\theta \log p(X|\theta) = \max_\theta \log \sum_Z p(X, Z|\theta)\]

Challenge: Summation inside log makes direct optimization hard!

EM Solution

Iteratively:

  1. E-step: Compute expected complete log-likelihood

  2. M-step: Maximize this expectation

Key Insight: Easier to maximize \(\log p(X, Z|\theta)\) if we knew \(Z\)!
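
To make the difficulty concrete, here is a minimal numerical sketch (with assumed toy parameters `pi` and `mu` for a unit-variance 1D mixture) that evaluates the incomplete-data log-likelihood directly. Note how the sum over components sits inside the log, so the objective does not decompose into per-component log terms:

```python
import numpy as np
from scipy.stats import norm
from scipy.special import logsumexp

# Assumed toy 1D mixture: two unit-variance components
pi = np.array([0.4, 0.6])       # mixing weights
mu = np.array([-1.0, 2.0])      # component means
x = np.array([0.0, 1.5, -2.0])  # observed points

# log p(x_i | theta) = log sum_k pi_k N(x_i | mu_k, 1).
# The sum over k is INSIDE the log, so it cannot be split
# into a sum of per-component log-densities.
log_terms = np.log(pi) + norm.logpdf(x[:, None], loc=mu)  # shape (n, K)
log_px = logsumexp(log_terms, axis=1)                     # log p(x_i)
print("Incomplete-data log-likelihood:", log_px.sum())
```

If we knew each \(z_i\), the log would apply to a single Gaussian term per point and the objective would decompose; that is exactly the leverage EM exploits.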

import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from scipy.special import logsumexp
from matplotlib.patches import Ellipse

# Set style
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (12, 4)
np.random.seed(42)

2. Mathematical Derivation

Lower Bound on Log-Likelihood

For any distribution \(q(Z)\):

\[\log p(X|\theta) = \log \sum_Z p(X,Z|\theta) = \log \sum_Z q(Z) \frac{p(X,Z|\theta)}{q(Z)}\]

By Jensen’s inequality:

\[\log p(X|\theta) \geq \sum_Z q(Z) \log \frac{p(X,Z|\theta)}{q(Z)} = \mathcal{L}(q, \theta)\]

This is the Evidence Lower Bound (ELBO).

EM Algorithm

E-step: Set \(q(Z) = p(Z|X, \theta^{old})\) (makes bound tight)

M-step: \(\theta^{new} = \arg\max_\theta \mathcal{L}(q, \theta)\)

Equivalently:

\[\theta^{new} = \arg\max_\theta Q(\theta, \theta^{old})\]

where:

\[Q(\theta, \theta^{old}) = \mathbb{E}_{Z|X,\theta^{old}}[\log p(X,Z|\theta)]\]
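
These two properties — the ELBO lies below the log-likelihood for any \(q\), and touches it when \(q\) is the E-step posterior — can be verified numerically. A sketch with assumed toy parameters for a unit-variance 1D mixture:

```python
import numpy as np
from scipy.stats import norm
from scipy.special import logsumexp

rng = np.random.default_rng(0)
pi = np.array([0.4, 0.6])   # assumed toy mixing weights
mu = np.array([-1.0, 2.0])  # assumed toy means (unit variances)
x = rng.normal(size=5)      # a few observed points

log_joint = np.log(pi) + norm.logpdf(x[:, None], loc=mu)  # log p(x_i, z_i=k)
log_lik = logsumexp(log_joint, axis=1).sum()              # log p(X|theta)

def elbo(q):
    # L(q, theta) = sum_i sum_k q_ik [log p(x_i, z_i=k) - log q_ik]
    return np.sum(q * (log_joint - np.log(q)))

# An arbitrary q(Z): the bound holds with slack
q_arb = rng.dirichlet(np.ones(2), size=len(x))
assert elbo(q_arb) <= log_lik + 1e-9

# The E-step choice q(Z) = p(Z|X, theta^old): the bound is tight
q_post = np.exp(log_joint - logsumexp(log_joint, axis=1, keepdims=True))
assert np.isclose(elbo(q_post), log_lik)
print(f"ELBO(arbitrary q) = {elbo(q_arb):.4f} <= log-lik = {log_lik:.4f}")
```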

3. Convergence Proof

Theorem: EM Increases Likelihood

At each iteration: \(\log p(X|\theta^{new}) \geq \log p(X|\theta^{old})\)

Proof:

  1. After E-step: \(q^{new}(Z) = p(Z|X, \theta^{old})\)

  2. ELBO equals log-likelihood: \(\mathcal{L}(q^{new}, \theta^{old}) = \log p(X|\theta^{old})\)

  3. M-step increases ELBO: \(\mathcal{L}(q^{new}, \theta^{new}) \geq \mathcal{L}(q^{new}, \theta^{old})\)

  4. ELBO lower bounds likelihood: \(\log p(X|\theta^{new}) \geq \mathcal{L}(q^{new}, \theta^{new})\)

Combining: \(\log p(X|\theta^{new}) \geq \log p(X|\theta^{old})\) ✓

Convergence

  • Likelihood increases monotonically

  • Bounded above → converges

  • To local optimum (not global!)
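
The monotone-increase guarantee is easy to observe empirically. A minimal 1D sketch (two components with variances held fixed at 1 purely for brevity — an assumption, not the general algorithm) tracks the log-likelihood across iterations:

```python
import numpy as np
from scipy.stats import norm
from scipy.special import logsumexp

rng = np.random.default_rng(1)
x = np.concatenate([rng.normal(-2, 1, 150), rng.normal(3, 1, 150)])

# Deliberately poor initial guesses (assumed values)
pi, mu = np.array([0.5, 0.5]), np.array([-1.0, 1.0])
lls = []
for _ in range(30):
    # E-step: responsibilities gamma_ik = p(z_i = k | x_i, theta^old)
    log_joint = np.log(pi) + norm.logpdf(x[:, None], loc=mu)
    log_px = logsumexp(log_joint, axis=1)
    lls.append(log_px.sum())
    gamma = np.exp(log_joint - log_px[:, None])
    # M-step: update weights and means (variances held at 1)
    Nk = gamma.sum(axis=0)
    pi, mu = Nk / len(x), (gamma * x[:, None]).sum(axis=0) / Nk

# The sequence never decreases, as the theorem guarantees
assert all(b >= a - 1e-9 for a, b in zip(lls, lls[1:]))
print(f"LL went from {lls[0]:.1f} to {lls[-1]:.1f} over {len(lls)} iterations")
```

A different initialization can end at a different (worse) plateau, which is the local-optimum caveat in action.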

4. Gaussian Mixture Model (GMM)

Model

Data generated from \(K\) Gaussian components:

\[p(x) = \sum_{k=1}^K \pi_k \mathcal{N}(x | \mu_k, \Sigma_k)\]

Latent variable: \(z_i \in \{1, ..., K\}\) (component membership)

Parameters: \(\theta = \{\pi, \mu, \Sigma\}\)

  • \(\pi_k\): mixture weight

  • \(\mu_k\): mean of component \(k\)

  • \(\Sigma_k\): covariance of component \(k\)
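
Given the parameters, the mixture density itself is straightforward to evaluate; a small sketch with assumed values for \(K = 2\) components in 2D:

```python
import numpy as np
from scipy.stats import multivariate_normal

# Assumed toy parameters for a 2-component mixture in 2D
pi = np.array([0.3, 0.7])
mu = np.array([[0.0, 0.0], [3.0, 3.0]])
Sigma = np.array([np.eye(2), [[1.0, 0.5], [0.5, 1.0]]])

def gmm_density(x):
    """p(x) = sum_k pi_k N(x | mu_k, Sigma_k)"""
    return sum(pi[k] * multivariate_normal.pdf(x, mu[k], Sigma[k])
               for k in range(len(pi)))

print("p([1, 1]) =", gmm_density([1.0, 1.0]))
```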

# Generate synthetic GMM data
def generate_gmm_data(n_samples=300, n_components=3):
    """Generate samples from a 2D Gaussian Mixture"""
    # True parameters
    means = np.array([[0, 0], [3, 3], [-2, 4]])
    covs = np.array([
        [[1, 0.5], [0.5, 1]],
        [[0.8, -0.3], [-0.3, 0.8]],
        [[1.2, 0], [0, 0.6]]
    ])
    weights = np.array([0.3, 0.5, 0.2])
    
    # Sample components
    components = np.random.choice(n_components, n_samples, p=weights)
    
    # Sample from each component
    samples = np.zeros((n_samples, 2))
    for k in range(n_components):
        mask = (components == k)
        samples[mask] = np.random.multivariate_normal(
            means[k], covs[k], mask.sum()
        )
    
    return samples, components, means, covs, weights

# Generate data
X, true_labels, true_means, true_covs, true_weights = generate_gmm_data(n_samples=500)

# Visualize
plt.figure(figsize=(10, 6))
scatter = plt.scatter(X[:, 0], X[:, 1], c=true_labels, cmap='viridis', alpha=0.6, edgecolors='k')
plt.scatter(true_means[:, 0], true_means[:, 1], c='red', marker='X', s=300, edgecolors='black', linewidth=2, label='True means')
plt.colorbar(scatter, label='True Component')
plt.title('Generated GMM Data (3 components)', fontsize=14, fontweight='bold')
plt.xlabel('$x_1$')
plt.ylabel('$x_2$')
plt.legend()
plt.grid(True, alpha=0.3)
plt.axis('equal')
plt.show()

print(f"Data shape: {X.shape}")
print(f"True mixing weights: {true_weights}")

5. EM for GMM

E-Step: Compute Responsibilities

Responsibility (posterior probability):

\[\gamma_{ik} = p(z_i = k | x_i, \theta^{old}) = \frac{\pi_k \mathcal{N}(x_i|\mu_k, \Sigma_k)}{\sum_{j=1}^K \pi_j \mathcal{N}(x_i|\mu_j, \Sigma_j)}\]

M-Step: Update Parameters

Mixing weights:

\[\pi_k^{new} = \frac{1}{n} \sum_{i=1}^n \gamma_{ik}\]

Means:

\[\mu_k^{new} = \frac{\sum_{i=1}^n \gamma_{ik} x_i}{\sum_{i=1}^n \gamma_{ik}}\]

Covariances:

\[\Sigma_k^{new} = \frac{\sum_{i=1}^n \gamma_{ik} (x_i - \mu_k^{new})(x_i - \mu_k^{new})^T}{\sum_{i=1}^n \gamma_{ik}}\]

class GaussianMixtureEM:
    """EM algorithm for Gaussian Mixture Models"""
    
    def __init__(self, n_components=3, max_iter=100, tol=1e-4):
        self.K = n_components
        self.max_iter = max_iter
        self.tol = tol
        
    def _initialize(self, X):
        """Initialize parameters randomly"""
        n, d = X.shape
        
        # Random initialization
        self.weights = np.ones(self.K) / self.K
        
        # Initialize means at randomly chosen data points
        indices = np.random.choice(n, self.K, replace=False)
        self.means = X[indices]
        
        # Initialize with spherical covariances
        self.covs = np.array([np.eye(d) for _ in range(self.K)])
        
    def _e_step(self, X):
        """E-step: Compute responsibilities"""
        n = X.shape[0]
        responsibilities = np.zeros((n, self.K))
        
        for k in range(self.K):
            # Compute p(x|z=k)
            responsibilities[:, k] = self.weights[k] * stats.multivariate_normal.pdf(
                X, self.means[k], self.covs[k]
            )
        
        # Normalize to get p(z|x)
        responsibilities /= responsibilities.sum(axis=1, keepdims=True) + 1e-10
        
        return responsibilities
    
    def _m_step(self, X, responsibilities):
        """M-step: Update parameters"""
        n, d = X.shape
        
        # Effective number of points per component
        Nk = responsibilities.sum(axis=0)
        
        # Update weights
        self.weights = Nk / n
        
        # Update means
        self.means = (responsibilities.T @ X) / Nk[:, np.newaxis]
        
        # Update covariances
        for k in range(self.K):
            diff = X - self.means[k]
            self.covs[k] = (responsibilities[:, k:k+1] * diff).T @ diff / Nk[k]
            # Add small regularization for numerical stability
            self.covs[k] += np.eye(d) * 1e-6
    
    def _compute_log_likelihood(self, X):
        """Compute log-likelihood"""
        n = X.shape[0]
        log_prob = np.zeros((n, self.K))
        
        for k in range(self.K):
            log_prob[:, k] = np.log(self.weights[k] + 1e-10) + \
                            stats.multivariate_normal.logpdf(X, self.means[k], self.covs[k])
        
        return logsumexp(log_prob, axis=1).sum()
    
    def fit(self, X, verbose=True):
        """Fit GMM using EM"""
        self._initialize(X)
        
        log_likelihood_history = []
        
        for iteration in range(self.max_iter):
            # E-step
            responsibilities = self._e_step(X)
            
            # M-step
            self._m_step(X, responsibilities)
            
            # Compute log-likelihood
            log_likelihood = self._compute_log_likelihood(X)
            log_likelihood_history.append(log_likelihood)
            
            if verbose and (iteration + 1) % 10 == 0:
                print(f"Iteration {iteration+1}: Log-likelihood = {log_likelihood:.2f}")
            
            # Check convergence
            if iteration > 0:
                if abs(log_likelihood_history[-1] - log_likelihood_history[-2]) < self.tol:
                    if verbose:
                        print(f"\nβœ“ Converged at iteration {iteration+1}")
                    break
        
        self.log_likelihood_history = log_likelihood_history
        return self
    
    def predict(self, X):
        """Predict component labels"""
        responsibilities = self._e_step(X)
        return responsibilities.argmax(axis=1)
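
The `_e_step` above multiplies densities in the probability domain, which can underflow for high-dimensional data or far-outlying points (hence the `1e-10` guard). A numerically safer variant — a standalone sketch, not part of the class — computes everything in the log domain and normalizes with `logsumexp`:

```python
import numpy as np
from scipy import stats
from scipy.special import logsumexp

def e_step_log(X, weights, means, covs):
    """Responsibilities computed in the log domain for numerical stability."""
    K = len(weights)
    # log [ pi_k N(x_i | mu_k, Sigma_k) ] for each point and component
    log_r = np.column_stack([
        np.log(weights[k]) + stats.multivariate_normal.logpdf(X, means[k], covs[k])
        for k in range(K)
    ])
    # Subtracting the per-row log-normalizer is division in the log domain
    return np.exp(log_r - logsumexp(log_r, axis=1, keepdims=True))
```

For well-scaled 2D data both versions give the same responsibilities; the log-domain form simply never forms a product of tiny densities.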

# Fit GMM using EM
print("Fitting GMM with EM algorithm...")
print("="*50)
gmm = GaussianMixtureEM(n_components=3, max_iter=100)
gmm.fit(X, verbose=True)
# Plot convergence
plt.figure(figsize=(10, 5))
plt.plot(gmm.log_likelihood_history, 'o-', linewidth=2, markersize=6)
plt.xlabel('Iteration', fontsize=12)
plt.ylabel('Log-Likelihood', fontsize=12)
plt.title('EM Convergence: Log-Likelihood vs Iteration', fontsize=14, fontweight='bold')
plt.grid(True, alpha=0.3)
plt.show()

print(f"\nFinal log-likelihood: {gmm.log_likelihood_history[-1]:.2f}")
print(f"Converged in {len(gmm.log_likelihood_history)} iterations")
# Visualize results
def plot_gaussian_ellipse(mean, cov, ax, color, alpha=0.3):
    """Plot ellipse representing 2-sigma contour of Gaussian"""
    # Eigenvalues and eigenvectors
    eigenvalues, eigenvectors = np.linalg.eigh(cov)
    angle = np.degrees(np.arctan2(eigenvectors[1, 0], eigenvectors[0, 0]))
    
    # 2-sigma ellipse
    width, height = 2 * 2 * np.sqrt(eigenvalues)
    ellipse = Ellipse(mean, width, height, angle=angle,
                     facecolor=color, alpha=alpha, edgecolor=color, linewidth=2)
    ax.add_patch(ellipse)

# Get predicted labels
predicted_labels = gmm.predict(X)

fig, axes = plt.subplots(1, 2, figsize=(15, 6))

# Plot 1: Predicted clustering
axes[0].scatter(X[:, 0], X[:, 1], c=predicted_labels, cmap='viridis', alpha=0.6, edgecolors='k')
axes[0].scatter(gmm.means[:, 0], gmm.means[:, 1], c='red', marker='X', s=300, 
               edgecolors='black', linewidth=2, label='Estimated means')

# Plot Gaussian ellipses
colors = plt.cm.viridis(np.linspace(0, 1, gmm.K))
for k in range(gmm.K):
    plot_gaussian_ellipse(gmm.means[k], gmm.covs[k], axes[0], colors[k])

axes[0].set_title('EM Result: Predicted Clusters', fontsize=14, fontweight='bold')
axes[0].set_xlabel('$x_1$')
axes[0].set_ylabel('$x_2$')
axes[0].legend()
axes[0].grid(True, alpha=0.3)
axes[0].axis('equal')

# Plot 2: True clustering
axes[1].scatter(X[:, 0], X[:, 1], c=true_labels, cmap='viridis', alpha=0.6, edgecolors='k')
axes[1].scatter(true_means[:, 0], true_means[:, 1], c='red', marker='X', s=300,
               edgecolors='black', linewidth=2, label='True means')
axes[1].set_title('Ground Truth: True Clusters', fontsize=14, fontweight='bold')
axes[1].set_xlabel('$x_1$')
axes[1].set_ylabel('$x_2$')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
axes[1].axis('equal')

plt.tight_layout()
plt.show()

print("\nEstimated mixing weights:", gmm.weights.round(3))
print("True mixing weights:     ", true_weights.round(3))

6. Visualizing the E and M Steps

Let’s see how parameters evolve during EM iterations.

# Re-run EM with visualization
class GaussianMixtureEM_Visual(GaussianMixtureEM):
    """EM with parameter history for visualization"""
    
    def fit(self, X, verbose=True):
        """Fit GMM and record parameter history"""
        self._initialize(X)
        
        self.history = {
            'means': [self.means.copy()],
            'weights': [self.weights.copy()],
            'log_likelihood': []
        }
        
        for iteration in range(self.max_iter):
            # E-step
            responsibilities = self._e_step(X)
            
            # M-step
            self._m_step(X, responsibilities)
            
            # Record
            self.history['means'].append(self.means.copy())
            self.history['weights'].append(self.weights.copy())
            
            # Log-likelihood
            log_likelihood = self._compute_log_likelihood(X)
            self.history['log_likelihood'].append(log_likelihood)
            
            # Check convergence
            if iteration > 0:
                if abs(self.history['log_likelihood'][-1] - 
                      self.history['log_likelihood'][-2]) < self.tol:
                    break
        
        return self

# Fit with history
gmm_visual = GaussianMixtureEM_Visual(n_components=3, max_iter=50)
gmm_visual.fit(X, verbose=False)

# Visualize evolution
n_recorded = len(gmm_visual.history['means'])
# Clamp requested iterations in case EM converged before iteration 20
iterations_to_plot = [min(i, n_recorded - 1) for i in [0, 2, 5, 10, 20, n_recorded - 1]]
fig, axes = plt.subplots(2, 3, figsize=(16, 10))
axes = axes.ravel()

for idx, iteration in enumerate(iterations_to_plot):
    ax = axes[idx]
    
    # Plot data
    ax.scatter(X[:, 0], X[:, 1], alpha=0.4, s=20)
    
    # Plot means at this iteration
    means = gmm_visual.history['means'][iteration]
    ax.scatter(means[:, 0], means[:, 1], c='red', marker='X', s=300,
              edgecolors='black', linewidth=2)
    
    # Title
    if iteration == 0:
        ax.set_title(f'Initialization', fontsize=12, fontweight='bold')
    else:
        ll = gmm_visual.history['log_likelihood'][iteration-1]
        ax.set_title(f'Iteration {iteration} (LL={ll:.1f})', fontsize=12, fontweight='bold')
    
    ax.set_xlabel('$x_1$')
    ax.set_ylabel('$x_2$')
    ax.grid(True, alpha=0.3)
    ax.axis('equal')

plt.tight_layout()
plt.show()

print("βœ“ Visualization shows EM converging to optimal cluster centers!")

7. Summary

EM Algorithm Framework

✅ E-step: Compute posterior \(p(Z|X, \theta)\) (responsibilities)

✅ M-step: Maximize the expected complete log-likelihood

✅ Convergence: Monotonically increases the likelihood to a local optimum

Key Properties

  1. General framework for latent variable models

  2. Guaranteed convergence to local optimum

  3. Computationally efficient for many models

  4. Interpretable updates (soft assignment → parameter update)

GMM Implementation

✅ Derived the E and M steps mathematically

✅ Implemented from scratch in Python

✅ Visualized convergence and clustering

✅ Compared with ground truth

Limitations

⚠️ Converges to a local optimum (initialization matters)

⚠️ Requires specifying the number of components

⚠️ Sensitive to initialization (use K-means++ or multiple restarts)

⚠️ Covariance matrices can become singular
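
Multiple restarts and covariance regularization are both built into scikit-learn's `GaussianMixture` (assuming scikit-learn is installed; this is a library alternative, not the from-scratch class above): `n_init` runs EM from several initializations and keeps the best log-likelihood, and `reg_covar` adds the same diagonal jitter used in our `_m_step`.

```python
import numpy as np
from sklearn.mixture import GaussianMixture  # assumes scikit-learn is available

rng = np.random.default_rng(0)
X_toy = np.vstack([rng.normal(0, 1, (100, 2)), rng.normal(5, 1, (100, 2))])

# n_init=5: five EM runs, keep the one with the highest lower bound;
# reg_covar: diagonal regularization guarding against singular covariances
gm = GaussianMixture(n_components=2, n_init=5, reg_covar=1e-6,
                     random_state=0).fit(X_toy)
print("weights:", gm.weights_.round(3), " final lower bound:", gm.lower_bound_)
```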

Extensions

  • Bayesian EM: Prior on parameters

  • Variational EM: Approximate posterior

  • Infinite mixtures: Dirichlet Process GMM

  • Online EM: Streaming data

Next Notebook: 10_markov_chain_monte_carlo.ipynb