import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from scipy import stats
from scipy.special import logsumexp
from scipy.stats import multivariate_normal
from sklearn.cluster import KMeans
from sklearn.mixture import GaussianMixture

# Global plotting defaults: seaborn grid theme and a wide default figure.
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
# Seed NumPy's global RNG so the notebook's sampling is reproducible.
np.random.seed(42)

1. Gaussian Mixture Models (GMM)¶

Definition¶

A Gaussian Mixture Model is a weighted sum of Gaussian distributions:

\[p(\mathbf{x}) = \sum_{k=1}^K \pi_k \mathcal{N}(\mathbf{x}|\boldsymbol{\mu}_k, \boldsymbol{\Sigma}_k)\]

where:

  • \(K\) = number of components

  • \(\pi_k\) = mixing coefficient for component \(k\) (\(\sum_{k=1}^K \pi_k = 1\))

  • \(\boldsymbol{\mu}_k\) = mean of component \(k\)

  • \(\boldsymbol{\Sigma}_k\) = covariance of component \(k\)

Latent Variable Interpretation¶

Introduce latent variable \(z_i \in \{1, \ldots, K\}\) indicating which component generated \(\mathbf{x}_i\):

\[p(z_i = k) = \pi_k\]
\[p(\mathbf{x}_i | z_i = k) = \mathcal{N}(\mathbf{x}_i | \boldsymbol{\mu}_k, \boldsymbol{\Sigma}_k)\]
# Generate synthetic data from a GMM
def generate_gmm_data(n_samples=300, random_state=42):
    """Generate data from a 3-component 2D GMM"""
    np.random.seed(random_state)

    # Ground-truth mixture parameters: three 2-D Gaussians.
    means = np.array([[0, 0], [3, 3], [-2, 3]])
    covariances = np.array([
        [[1.0, 0.3], [0.3, 1.0]],
        [[0.8, -0.2], [-0.2, 0.8]],
        [[1.5, 0.0], [0.0, 0.5]]
    ])
    weights = np.array([0.4, 0.35, 0.25])

    # Decide how many samples each component contributes, then draw them.
    counts = np.random.multinomial(n_samples, weights)

    samples = [
        np.random.multivariate_normal(means[k], covariances[k], counts[k])
        for k in range(len(weights))
    ]
    labels = np.repeat(np.arange(len(weights)), counts)

    X = np.vstack(samples)

    # Shuffle points and their component labels together.
    order = np.random.permutation(len(X))
    return X[order], labels[order], means, covariances, weights

# Draw one dataset from the true mixture defined above.
X, y_true, true_means, true_covs, true_weights = generate_gmm_data()

# Show the sample twice: with its generating labels, and as the
# unlabeled cloud a clustering algorithm actually observes.
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

# Plot with true labels
axes[0].scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis', alpha=0.6, s=30)
axes[0].scatter(true_means[:, 0], true_means[:, 1], c='red', marker='X', 
               s=200, edgecolors='black', linewidth=2, label='True means')
axes[0].set_xlabel('x₁')
# NOTE(review): the 'xβ‚‚' label below looks mojibake-damaged (probably 'x₂') — fix at source.
axes[0].set_ylabel('xβ‚‚')
axes[0].set_title('True Component Assignments')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Plot without labels (what we observe)
axes[1].scatter(X[:, 0], X[:, 1], alpha=0.6, s=30, color='gray')
axes[1].set_xlabel('x₁')
axes[1].set_ylabel('xβ‚‚')
axes[1].set_title('Observed Data (unlabeled)')
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("True GMM Parameters:")
print(f"Weights: {true_weights}")
print(f"\nMeans:\n{true_means}")

2. K-Means as Hard EM¶

K-means is a special case of EM with:

  • Hard assignments (not probabilistic)

  • Equal spherical covariances: \(\boldsymbol{\Sigma}_k = \sigma^2 \mathbf{I}\)

Algorithm¶

  1. E-step: Assign each point to the nearest cluster center: \(z_i = \arg\min_k \|\mathbf{x}_i - \boldsymbol{\mu}_k\|^2\)

  2. M-step: Update each cluster center to the mean of its assigned points: \(\boldsymbol{\mu}_k = \frac{1}{|C_k|} \sum_{i \in C_k} \mathbf{x}_i\)

class KMeansEM:
    """K-Means clustering from scratch.

    K-means is "hard EM" for a mixture of equal spherical Gaussians:
    the E-step assigns each point to its nearest center, the M-step
    moves each center to the mean of its assigned points.

    Parameters
    ----------
    n_clusters : int
        Number of clusters K.
    max_iter : int
        Maximum number of E/M iterations.
    tol : float
        Convergence threshold on the total squared shift of the centers.
    random_state : int or None
        Optional seed for the center initialization.  ``None`` (the
        default) preserves the original behavior of drawing from
        NumPy's global RNG.
    """

    def __init__(self, n_clusters=3, max_iter=100, tol=1e-4, random_state=None):
        self.n_clusters = n_clusters
        self.max_iter = max_iter
        self.tol = tol
        self.random_state = random_state
        self.centers = None          # (K, d) cluster centers after fit
        self.labels = None           # (n,) hard assignments after fit
        self.inertia_history = []    # within-cluster SS recorded each iteration

    def fit(self, X):
        """Run Lloyd's algorithm on X of shape (n_samples, n_features).

        Returns self; populates ``centers``, ``labels`` and
        ``inertia_history``.
        """
        n_samples, n_features = X.shape

        # Initialize centers as a random subset of the data points.
        rng = np.random if self.random_state is None \
            else np.random.RandomState(self.random_state)
        idx = rng.choice(n_samples, self.n_clusters, replace=False)
        self.centers = X[idx].copy()

        for iteration in range(self.max_iter):
            # E-step: squared distance of every point to every center.
            distances = np.zeros((n_samples, self.n_clusters))
            for k in range(self.n_clusters):
                distances[:, k] = np.sum((X - self.centers[k])**2, axis=1)

            self.labels = np.argmin(distances, axis=1)

            # Inertia = each point's squared distance to its own center.
            # Vectorized fancy indexing replaces the original per-point
            # Python loop (same value, O(n) NumPy work instead of O(n)
            # interpreter iterations).
            inertia = distances[np.arange(n_samples), self.labels].sum()
            self.inertia_history.append(inertia)

            # M-step: move each center to the mean of its assigned points.
            old_centers = self.centers.copy()
            for k in range(self.n_clusters):
                mask = self.labels == k
                if np.sum(mask) > 0:  # empty clusters keep their old center
                    self.centers[k] = X[mask].mean(axis=0)

            # Converged once the centers barely move.
            center_shift = np.sum((self.centers - old_centers)**2)
            if center_shift < self.tol:
                break

        return self

    def predict(self, X):
        """Assign each row of X to the nearest fitted center."""
        dists = np.array([np.sum((X - c)**2, axis=1) for c in self.centers])
        return np.argmin(dists, axis=0)

# Fit the from-scratch K-Means implementation.
kmeans_custom = KMeansEM(n_clusters=3)
kmeans_custom.fit(X)

# Compare with sklearn (n_init=10 restarts, best run kept).
kmeans_sklearn = KMeans(n_clusters=3, random_state=42, n_init=10)
kmeans_sklearn.fit(X)

# Visualize
fig, axes = plt.subplots(1, 3, figsize=(18, 5))

# Custom implementation
axes[0].scatter(X[:, 0], X[:, 1], c=kmeans_custom.labels, cmap='viridis', alpha=0.6, s=30)
axes[0].scatter(kmeans_custom.centers[:, 0], kmeans_custom.centers[:, 1], 
               c='red', marker='X', s=200, edgecolors='black', linewidth=2)
axes[0].set_xlabel('x₁')
# NOTE(review): 'xβ‚‚' labels look mojibake-damaged (probably 'x₂') — fix at source.
axes[0].set_ylabel('xβ‚‚')
axes[0].set_title('K-Means (Custom Implementation)')
axes[0].grid(True, alpha=0.3)

# Sklearn implementation
axes[1].scatter(X[:, 0], X[:, 1], c=kmeans_sklearn.labels_, cmap='viridis', alpha=0.6, s=30)
axes[1].scatter(kmeans_sklearn.cluster_centers_[:, 0], kmeans_sklearn.cluster_centers_[:, 1], 
               c='red', marker='X', s=200, edgecolors='black', linewidth=2)
axes[1].set_xlabel('x₁')
axes[1].set_ylabel('xβ‚‚')
axes[1].set_title('K-Means (sklearn)')
axes[1].grid(True, alpha=0.3)

# Convergence plot: inertia recorded after each E-step of the custom run.
axes[2].plot(kmeans_custom.inertia_history, 'b-', linewidth=2, marker='o')
axes[2].set_xlabel('Iteration')
axes[2].set_ylabel('Inertia (Within-cluster SS)')
axes[2].set_title('K-Means Convergence')
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print(f"K-Means converged in {len(kmeans_custom.inertia_history)} iterations")
print(f"Final inertia: {kmeans_custom.inertia_history[-1]:.2f}")

3. EM Algorithm Theory¶

Goal¶

Maximize the marginal log-likelihood: \(\ell(\boldsymbol{\theta}) = \sum_{i=1}^N \log p(\mathbf{x}_i | \boldsymbol{\theta})\)

But \(p(\mathbf{x}_i | \boldsymbol{\theta})\) involves a sum over latent variables, making direct optimization hard.

EM Decomposition¶

For any distribution \(q(\mathbf{z}_i)\) over latent variables:

\[\log p(\mathbf{x}_i | \boldsymbol{\theta}) = \mathcal{L}(q, \boldsymbol{\theta}) + \text{KL}(q \| p)\]

where:

  • \(\mathcal{L}(q, \boldsymbol{\theta}) = \sum_{\mathbf{z}_i} q(\mathbf{z}_i) \log \frac{p(\mathbf{x}_i, \mathbf{z}_i | \boldsymbol{\theta})}{q(\mathbf{z}_i)}\) (ELBO)

  • \(\text{KL}(q \| p) = -\sum_{\mathbf{z}_i} q(\mathbf{z}_i) \log \frac{p(\mathbf{z}_i | \mathbf{x}_i, \boldsymbol{\theta})}{q(\mathbf{z}_i)}\)

EM Algorithm¶

E-step: Set \(q(\mathbf{z}_i) = p(\mathbf{z}_i | \mathbf{x}_i, \boldsymbol{\theta}^{\text{old}})\)

  • This makes \(\text{KL}(q \| p) = 0\)

  • So \(\mathcal{L} = \log p(\mathbf{x}_i | \boldsymbol{\theta})\)

M-step: Maximize \(\mathcal{L}\) with respect to \(\boldsymbol{\theta}\): \(\boldsymbol{\theta}^{\text{new}} = \arg\max_{\boldsymbol{\theta}} \sum_i \mathbb{E}_{q}[\log p(\mathbf{x}_i, \mathbf{z}_i | \boldsymbol{\theta})]\)

Guarantee¶

Each iteration increases the log-likelihood (or keeps it constant): \(\ell(\boldsymbol{\theta}^{\text{new}}) \geq \ell(\boldsymbol{\theta}^{\text{old}})\)

4. EM for GMM¶

E-step: Compute Responsibilities¶

\[r_{ik} = p(z_i = k | \mathbf{x}_i, \boldsymbol{\theta}^{\text{old}}) = \frac{\pi_k \mathcal{N}(\mathbf{x}_i | \boldsymbol{\mu}_k, \boldsymbol{\Sigma}_k)}{\sum_{j=1}^K \pi_j \mathcal{N}(\mathbf{x}_i | \boldsymbol{\mu}_j, \boldsymbol{\Sigma}_j)}\]

M-step: Update Parameters¶

\[\pi_k = \frac{1}{N} \sum_{i=1}^N r_{ik}\]
\[\boldsymbol{\mu}_k = \frac{\sum_{i=1}^N r_{ik} \mathbf{x}_i}{\sum_{i=1}^N r_{ik}}\]
\[\boldsymbol{\Sigma}_k = \frac{\sum_{i=1}^N r_{ik} (\mathbf{x}_i - \boldsymbol{\mu}_k)(\mathbf{x}_i - \boldsymbol{\mu}_k)^T}{\sum_{i=1}^N r_{ik}}\]
class GaussianMixtureEM:
    """Gaussian Mixture Model fit with the EM algorithm.

    E-step: compute responsibilities r_ik = p(z_i = k | x_i).
    M-step: re-estimate weights, means and covariances from the
    responsibility-weighted data.

    The E-step and ``predict_proba`` work in log space (log-sum-exp),
    so points far from every component no longer underflow the way a
    direct product/sum of pdf values can (the original implementation
    divided by a sum of pdfs that can round to exactly 0).

    Parameters
    ----------
    n_components : int
        Number of Gaussian components K.
    max_iter : int
        Maximum number of EM iterations.
    tol : float
        Convergence threshold on the change in log-likelihood.
    reg_covar : float
        Ridge added to each covariance diagonal for numerical stability.
    """

    def __init__(self, n_components=3, max_iter=100, tol=1e-4, reg_covar=1e-6):
        self.n_components = n_components
        self.max_iter = max_iter
        self.tol = tol
        self.reg_covar = reg_covar  # For numerical stability

        self.weights = None           # (K,) mixing coefficients pi_k
        self.means = None             # (K, d) component means
        self.covariances = None       # (K, d, d) component covariances
        self.responsibilities = None  # (n, K) posteriors, set by the E-step
        self.log_likelihood_history = []

    def _initialize_parameters(self, X):
        """Initialize weights/means/covariances from a k-means partition."""
        n_samples, n_features = X.shape

        # A single k-means run gives a reasonable hard partition to start from.
        kmeans = KMeans(n_clusters=self.n_components, n_init=1, random_state=42)
        labels = kmeans.fit_predict(X)

        self.means = kmeans.cluster_centers_
        self.weights = np.zeros(self.n_components)
        self.covariances = np.zeros((self.n_components, n_features, n_features))

        for k in range(self.n_components):
            mask = labels == k
            self.weights[k] = np.sum(mask) / n_samples
            if np.sum(mask) > 0:
                X_k = X[mask]
                self.covariances[k] = np.cov(X_k.T) + self.reg_covar * np.eye(n_features)
            else:
                # Empty cluster: fall back to an identity covariance.
                self.covariances[k] = np.eye(n_features)

    def _log_joint(self, X):
        """Return log pi_k + log N(x_i | mu_k, Sigma_k) as an (n, K) array."""
        n_samples = X.shape[0]
        log_joint = np.empty((n_samples, self.n_components))
        for k in range(self.n_components):
            # Clamp the weight so an empty component gives -inf, not a NaN.
            log_joint[:, k] = np.log(np.maximum(self.weights[k], 1e-300)) + \
                multivariate_normal.logpdf(X, self.means[k], self.covariances[k])
        return log_joint

    def _e_step(self, X):
        """E-step: compute responsibilities; return the data log-likelihood.

        Uses the log-sum-exp trick:
        log p(x_i) = logsumexp_k [log pi_k + log N(x_i | mu_k, Sigma_k)],
        then r_ik = exp(log joint - log p(x_i)).
        """
        log_joint = self._log_joint(X)
        log_norm = logsumexp(log_joint, axis=1, keepdims=True)
        self.responsibilities = np.exp(log_joint - log_norm)
        return float(np.sum(log_norm))

    def _m_step(self, X):
        """M-step: update weights, means and covariances from responsibilities."""
        n_samples, n_features = X.shape

        # Effective number of points assigned to each component.
        N_k = self.responsibilities.sum(axis=0)

        # pi_k = N_k / N
        self.weights = N_k / n_samples

        # mu_k = responsibility-weighted mean of the data.
        self.means = (self.responsibilities.T @ X) / N_k[:, np.newaxis]

        # Sigma_k = responsibility-weighted scatter around mu_k.
        for k in range(self.n_components):
            diff = X - self.means[k]
            weighted_diff = diff * np.sqrt(self.responsibilities[:, k])[:, np.newaxis]
            self.covariances[k] = (weighted_diff.T @ weighted_diff) / N_k[k]
            # Ridge keeps the covariance positive definite.
            self.covariances[k] += self.reg_covar * np.eye(n_features)

    def fit(self, X):
        """Fit the GMM to X (shape (n_samples, n_features)) via EM."""
        self._initialize_parameters(X)

        prev_log_likelihood = -np.inf

        for iteration in range(self.max_iter):
            # E-step (also yields the current observed-data log-likelihood).
            log_likelihood = self._e_step(X)
            self.log_likelihood_history.append(log_likelihood)

            # Stop once the likelihood gain falls below tol.
            if abs(log_likelihood - prev_log_likelihood) < self.tol:
                break

            prev_log_likelihood = log_likelihood

            # M-step
            self._m_step(X)

        return self

    def predict_proba(self, X):
        """Return posterior component probabilities p(z=k|x), shape (n, K)."""
        log_joint = self._log_joint(X)
        return np.exp(log_joint - logsumexp(log_joint, axis=1, keepdims=True))

    def predict(self, X):
        """Return the most probable component index for each row of X."""
        return np.argmax(self.predict_proba(X), axis=1)

# Fit the from-scratch GMM and the sklearn reference on the same data.
gmm_custom = GaussianMixtureEM(n_components=3)
gmm_custom.fit(X)
labels_custom = gmm_custom.predict(X)

# Fit sklearn GMM
gmm_sklearn = GaussianMixture(n_components=3, random_state=42)
gmm_sklearn.fit(X)
labels_sklearn = gmm_sklearn.predict(X)

# NOTE(review): component order is arbitrary, so the learned rows may be
# a permutation of the true rows when comparing below.
print("GMM EM Results:")
print(f"Converged in {len(gmm_custom.log_likelihood_history)} iterations")
print(f"Final log-likelihood: {gmm_custom.log_likelihood_history[-1]:.2f}")
print(f"\nLearned weights: {gmm_custom.weights}")
print(f"True weights: {true_weights}")
print(f"\nLearned means:\n{gmm_custom.means}")
print(f"\nTrue means:\n{true_means}")
# Visualize results: clusterings (top row), then EM convergence,
# a responsibility map, and the learned density (bottom row).
fig, axes = plt.subplots(2, 3, figsize=(18, 12))

# Custom GMM clustering
axes[0, 0].scatter(X[:, 0], X[:, 1], c=labels_custom, cmap='viridis', alpha=0.6, s=30)
axes[0, 0].scatter(gmm_custom.means[:, 0], gmm_custom.means[:, 1], 
                  c='red', marker='X', s=200, edgecolors='black', linewidth=2)
axes[0, 0].set_xlabel('x₁')
# NOTE(review): 'xβ‚‚' labels look mojibake-damaged (probably 'x₂') — fix at source.
axes[0, 0].set_ylabel('xβ‚‚')
axes[0, 0].set_title('GMM EM (Custom)')
axes[0, 0].grid(True, alpha=0.3)

# Sklearn GMM clustering
axes[0, 1].scatter(X[:, 0], X[:, 1], c=labels_sklearn, cmap='viridis', alpha=0.6, s=30)
axes[0, 1].scatter(gmm_sklearn.means_[:, 0], gmm_sklearn.means_[:, 1], 
                  c='red', marker='X', s=200, edgecolors='black', linewidth=2)
axes[0, 1].set_xlabel('x₁')
axes[0, 1].set_ylabel('xβ‚‚')
axes[0, 1].set_title('GMM EM (sklearn)')
axes[0, 1].grid(True, alpha=0.3)

# True clustering
axes[0, 2].scatter(X[:, 0], X[:, 1], c=y_true, cmap='viridis', alpha=0.6, s=30)
axes[0, 2].scatter(true_means[:, 0], true_means[:, 1], 
                  c='red', marker='X', s=200, edgecolors='black', linewidth=2)
axes[0, 2].set_xlabel('x₁')
axes[0, 2].set_ylabel('xβ‚‚')
axes[0, 2].set_title('True Components')
axes[0, 2].grid(True, alpha=0.3)

# Convergence plot: one log-likelihood value per EM iteration.
axes[1, 0].plot(gmm_custom.log_likelihood_history, 'b-', linewidth=2, marker='o')
axes[1, 0].set_xlabel('Iteration')
axes[1, 0].set_ylabel('Log-likelihood')
axes[1, 0].set_title('EM Convergence')
axes[1, 0].grid(True, alpha=0.3)

# Responsibility heatmap for one component
# Create a 100x100 grid covering the data with a 1-unit margin.
x_min, x_max = X[:, 0].min() - 1, X[:, 0].max() + 1
y_min, y_max = X[:, 1].min() - 1, X[:, 1].max() + 1
xx, yy = np.meshgrid(np.linspace(x_min, x_max, 100),
                     np.linspace(y_min, y_max, 100))
X_grid = np.c_[xx.ravel(), yy.ravel()]
proba = gmm_custom.predict_proba(X_grid)

# Plot responsibility for component 0
Z = proba[:, 0].reshape(xx.shape)
im = axes[1, 1].contourf(xx, yy, Z, levels=20, cmap='viridis', alpha=0.8)
axes[1, 1].scatter(X[:, 0], X[:, 1], c='white', edgecolors='black', 
                  alpha=0.3, s=20, linewidth=0.5)
axes[1, 1].scatter(gmm_custom.means[0, 0], gmm_custom.means[0, 1], 
                  c='red', marker='X', s=200, edgecolors='black', linewidth=2)
axes[1, 1].set_xlabel('x₁')
axes[1, 1].set_ylabel('xβ‚‚')
axes[1, 1].set_title('Responsibility p(z=0|x)')
plt.colorbar(im, ax=axes[1, 1])
# Density plot
def gmm_density(X, gmm):
    """Evaluate the learned mixture density p(x) at each row of X.

    p(x) = sum_k pi_k * N(x | mu_k, Sigma_k), using the fitted
    weights/means/covariances stored on ``gmm``.
    """
    total = np.zeros(len(X))
    for k in range(gmm.n_components):
        component_pdf = multivariate_normal.pdf(X, gmm.means[k], gmm.covariances[k])
        total += gmm.weights[k] * component_pdf
    return total

# Evaluate the learned density p(x) on the grid and contour it.
density = gmm_density(X_grid, gmm_custom).reshape(xx.shape)
axes[1, 2].contourf(xx, yy, density, levels=20, cmap='viridis', alpha=0.8)
axes[1, 2].scatter(X[:, 0], X[:, 1], c='white', edgecolors='black', 
                  alpha=0.3, s=20, linewidth=0.5)
axes[1, 2].scatter(gmm_custom.means[:, 0], gmm_custom.means[:, 1], 
                  c='red', marker='X', s=200, edgecolors='black', linewidth=2)
axes[1, 2].set_xlabel('x₁')
axes[1, 2].set_ylabel('xβ‚‚')
axes[1, 2].set_title('Learned Density p(x)')

plt.tight_layout()
plt.show()

5. Model Selection (BIC, AIC)¶

How do we choose the number of components \(K\)?

Bayesian Information Criterion (BIC)¶

\[\text{BIC} = -2 \log p(\mathcal{D}|\hat{\boldsymbol{\theta}}) + p \log N\]

where \(p\) = number of parameters

Akaike Information Criterion (AIC)¶

\[\text{AIC} = -2 \log p(\mathcal{D}|\hat{\boldsymbol{\theta}}) + 2p\]

Lower is better for both criteria.

# Model selection
def compute_bic_aic(X, gmm):
    """Compute BIC and AIC for a fitted GMM.

    Returns ``(bic, aic, log_likelihood)``; lower BIC/AIC is better.
    """
    n_samples, n_features = X.shape

    # Final observed-data log-likelihood reached by the EM run.
    log_likelihood = gmm.log_likelihood_history[-1]

    # Free parameters of a full-covariance K-component GMM in d dimensions:
    # (K - 1) mixing weights + K*d means + K*d*(d+1)/2 covariance entries.
    K, d = gmm.n_components, n_features
    n_params = (K - 1) + K * d + K * d * (d + 1) // 2

    bic = -2 * log_likelihood + n_params * np.log(n_samples)
    aic = -2 * log_likelihood + 2 * n_params

    return bic, aic, log_likelihood

# Try different numbers of components: fit one GMM per candidate K and
# score each with the penalized-likelihood criteria above.
k_values = range(1, 8)
bic_scores = []
aic_scores = []
log_likelihoods = []

for k in k_values:
    # Each fit re-initializes from k-means inside GaussianMixtureEM.fit.
    gmm = GaussianMixtureEM(n_components=k, max_iter=100)
    gmm.fit(X)
    bic, aic, ll = compute_bic_aic(X, gmm)
    bic_scores.append(bic)
    aic_scores.append(aic)
    log_likelihoods.append(ll)

# Visualize BIC, AIC and the raw log-likelihood as functions of K.
fig, axes = plt.subplots(1, 3, figsize=(16, 5))

axes[0].plot(k_values, bic_scores, 'b-o', linewidth=2, markersize=8)
axes[0].axvline(x=3, color='r', linestyle='--', label='True K=3')
axes[0].set_xlabel('Number of Components (K)')
axes[0].set_ylabel('BIC')
axes[0].set_title('Bayesian Information Criterion\n(lower is better)')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

axes[1].plot(k_values, aic_scores, 'g-o', linewidth=2, markersize=8)
axes[1].axvline(x=3, color='r', linestyle='--', label='True K=3')
axes[1].set_xlabel('Number of Components (K)')
axes[1].set_ylabel('AIC')
axes[1].set_title('Akaike Information Criterion\n(lower is better)')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

axes[2].plot(k_values, log_likelihoods, 'purple', linewidth=2, marker='o', markersize=8)
axes[2].axvline(x=3, color='r', linestyle='--', label='True K=3')
axes[2].set_xlabel('Number of Components (K)')
axes[2].set_ylabel('Log-likelihood')
axes[2].set_title('Log-likelihood\n(higher is better, but overfits)')
axes[2].legend()
axes[2].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

# argmin over the score lists; a range supports integer indexing.
best_k_bic = k_values[np.argmin(bic_scores)]
best_k_aic = k_values[np.argmin(aic_scores)]

print("Model Selection Results:")
print(f"Best K according to BIC: {best_k_bic}")
print(f"Best K according to AIC: {best_k_aic}")
print(f"True K: 3")
print(f"\nNote: Log-likelihood always increases with K (overfitting)")
print(f"BIC/AIC penalize complexity to prevent overfitting")

6. Mixture of Bernoullis¶

GMM is for continuous data. For binary data, use a Mixture of Bernoullis.

Model¶

\[p(\mathbf{x}) = \sum_{k=1}^K \pi_k \prod_{j=1}^d \text{Ber}(x_j | \mu_{kj})\]

where \(\mu_{kj}\) is the probability of feature \(j\) being 1 in component \(k\).

Application: Image clustering¶

# Generate binary image data
def generate_binary_patterns(n_samples=300):
    """Generate binary patterns (simplified images)"""
    np.random.seed(42)

    n_per_class = n_samples // 3
    patterns, labels = [], []

    # Class 0: vertical bars over a sparse random background.
    for _ in range(n_per_class):
        img = np.random.rand(8, 8) < 0.2
        img[:, [1, 3, 5, 7]] = True
        patterns.append(img.ravel())
        labels.append(0)

    # Class 1: horizontal bars over a sparse random background.
    for _ in range(n_per_class):
        img = np.random.rand(8, 8) < 0.2
        img[[1, 3, 5, 7], :] = True
        patterns.append(img.ravel())
        labels.append(1)

    # Class 2: checkerboard with XOR bit-flip noise.
    for _ in range(n_per_class):
        board = np.zeros((8, 8), dtype=bool)
        # Cells where (row + col) is even: both indices even or both odd.
        board[::2, ::2] = True
        board[1::2, 1::2] = True
        noise = np.random.rand(8, 8) < 0.1
        patterns.append(np.logical_xor(board, noise).ravel())
        labels.append(2)

    X = np.array(patterns, dtype=float)
    y = np.array(labels)

    # Return the samples in a random order.
    idx = np.random.permutation(len(X))
    return X[idx], y[idx]

X_binary, y_binary = generate_binary_patterns()

# Show six examples of each of the three generating patterns.
fig, axes = plt.subplots(3, 6, figsize=(12, 6))
for i in range(3):
    for j in range(6):
        idx = np.where(y_binary == i)[0][j]
        axes[i, j].imshow(X_binary[idx].reshape(8, 8), cmap='gray')
        axes[i, j].axis('off')
        if j == 0:
            # NOTE(review): axis('off') hides axis decorations, so this row
            # label may not render — verify or use fig.text instead.
            axes[i, j].set_ylabel(f'Pattern {i}', fontsize=12)

plt.suptitle('Sample Binary Patterns', fontsize=14)
plt.tight_layout()
plt.show()

print(f"Dataset: {len(X_binary)} binary images of size 8x8")
print(f"Feature dimension: {X_binary.shape[1]}")
class MixtureOfBernoullis:
    """Mixture of Bernoulli distributions fit with EM.

    Models binary vectors x in {0,1}^d as
        p(x) = sum_k pi_k * prod_j Ber(x_j | theta_kj),
    with all per-component likelihoods handled in log space.

    Parameters
    ----------
    n_components : int
        Number of mixture components K.
    max_iter : int
        Maximum number of EM iterations.
    tol : float
        Convergence threshold on the change in log-likelihood.
    """

    def __init__(self, n_components=3, max_iter=100, tol=1e-4):
        self.n_components = n_components
        self.max_iter = max_iter
        self.tol = tol

        self.weights = None  # (K,) mixing coefficients pi_k
        self.theta = None    # (K, d) Bernoulli means P(x_j=1 | z=k)
        self.log_likelihood_history = []

    def _log_joint(self, X):
        """Return log pi_k + log p(x_i | z_i = k) as an (n, K) array.

        Shared by fit() and predict() so the Bernoulli log-likelihood is
        computed in exactly one place (the original duplicated this code).
        """
        n_samples = X.shape[0]
        log_joint = np.zeros((n_samples, self.n_components))
        for k in range(self.n_components):
            # log p(x|z=k) = sum_j [x_j log theta_kj + (1-x_j) log(1-theta_kj)]
            log_prob = X @ np.log(self.theta[k] + 1e-10) + \
                      (1 - X) @ np.log(1 - self.theta[k] + 1e-10)
            log_joint[:, k] = np.log(self.weights[k] + 1e-10) + log_prob
        return log_joint

    def fit(self, X):
        """Fit the mixture to binary data X of shape (n_samples, n_features)."""
        n_samples, n_features = X.shape

        # Uniform weights; theta drawn uniformly in [0.3, 0.7].
        self.weights = np.ones(self.n_components) / self.n_components
        self.theta = np.random.rand(self.n_components, n_features) * 0.4 + 0.3

        prev_ll = -np.inf

        for iteration in range(self.max_iter):
            # E-step: responsibilities from the normalized log joint.
            log_resp = self._log_joint(X)
            log_sum = np.logaddexp.reduce(log_resp, axis=1, keepdims=True)
            responsibilities = np.exp(log_resp - log_sum)

            # Observed-data log-likelihood.
            log_likelihood = np.sum(log_sum)
            self.log_likelihood_history.append(log_likelihood)

            # Stop once the likelihood gain falls below tol.
            if abs(log_likelihood - prev_ll) < self.tol:
                break
            prev_ll = log_likelihood

            # M-step: weighted counts give new weights and Bernoulli means.
            N_k = responsibilities.sum(axis=0)
            self.weights = N_k / n_samples
            self.theta = (responsibilities.T @ X) / N_k[:, np.newaxis]

            # Clip away exact 0/1 so the logs above stay finite.
            self.theta = np.clip(self.theta, 1e-10, 1 - 1e-10)

        return self

    def predict(self, X):
        """Return the most probable component index for each row of X."""
        return np.argmax(self._log_joint(X), axis=1)

# Fit the Bernoulli mixture to the binary patterns.
mob = MixtureOfBernoullis(n_components=3)
mob.fit(X_binary)
labels_mob = mob.predict(X_binary)

# Visualize learned patterns
fig, axes = plt.subplots(2, 3, figsize=(12, 8))

# Top row: learned Bernoulli means theta_k (per-pixel P(pixel=1 | component k)).
for k in range(3):
    axes[0, k].imshow(mob.theta[k].reshape(8, 8), cmap='gray', vmin=0, vmax=1)
    axes[0, k].set_title(f'Learned Pattern {k}\nWeight: {mob.weights[k]:.3f}')
    axes[0, k].axis('off')

# Bottom row: example images assigned to each cluster
for k in range(3):
    idx = np.where(labels_mob == k)[0]
    if len(idx) > 0:
        # Show average of assigned images
        avg_img = X_binary[idx].mean(axis=0)
        axes[1, k].imshow(avg_img.reshape(8, 8), cmap='gray', vmin=0, vmax=1)
        axes[1, k].set_title(f'Average of {len(idx)} assigned images')
    axes[1, k].axis('off')

plt.tight_layout()
plt.show()

# Convergence: one log-likelihood value per EM iteration.
plt.figure(figsize=(10, 5))
plt.plot(mob.log_likelihood_history, linewidth=2, marker='o')
plt.xlabel('Iteration')
plt.ylabel('Log-likelihood')
plt.title('Mixture of Bernoullis Convergence')
plt.grid(True, alpha=0.3)
plt.show()

print(f"Converged in {len(mob.log_likelihood_history)} iterations")

Summary¶

In this notebook, we covered:

  1. Gaussian Mixture Models: Soft clustering with probabilistic assignments

  2. K-Means: Hard EM algorithm with spherical clusters

  3. EM Algorithm: General framework for maximum likelihood with latent variables

  4. EM for GMM: Detailed derivation and implementation

  5. Model Selection: Using BIC/AIC to choose the number of components

  6. Mixture of Bernoullis: Extension to binary data

Key Takeaways¶

EM Algorithm¶

  • E-step: Compute posterior over latent variables (responsibilities)

  • M-step: Maximize expected complete-data log-likelihood

  • Guarantee: Monotonically increases likelihood

  • Limitation: Can get stuck in local optima

GMM vs K-Means¶

| Feature       | K-Means      | GMM                     |
|---------------|--------------|-------------------------|
| Assignments   | Hard         | Soft (probabilistic)    |
| Cluster shape | Spherical    | Elliptical              |
| Covariance    | Same for all | Different per component |
| Uncertainty   | No           | Yes                     |

Initialization¶

  • Use k-means++ or multiple random restarts

  • EM is sensitive to initialization

  • Run multiple times and pick the best log-likelihood

Model Selection¶

  • BIC: More conservative (larger penalty)

  • AIC: More liberal (smaller penalty)

  • Cross-validation for predictive tasks

Extensions¶

  • Diagonal covariances: Faster, more stable

  • Tied covariances: All components share the same shape

  • Bayesian GMM: Put priors on parameters

  • Infinite mixtures: Dirichlet process mixtures

  • Other distributions: Student-t, von Mises, etc.

Exercises¶

  1. Implement a diagonal-covariance GMM

  2. Add multiple random restarts to avoid local optima

  3. Implement cross-validation for model selection

  4. Apply GMM to real data (e.g., the iris dataset)

  5. Implement a mixture of multinomials for text clustering