import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
import pandas as pd
# Global notebook defaults: seaborn grid style, a wide default figure size,
# and a fixed RNG seed so sampled state/observation sequences are reproducible.
sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
np.random.seed(42)
2. Forward Algorithm (Filtering)
Compute \(p(\mathbf{x}_{1:T})\) and \(p(z_t|\mathbf{x}_{1:t})\) efficiently.
Forward Message
Recursion
Base case: \(\alpha_1(j) = \pi_j p(x_1|z_1=j)\)
Recursive case: \(\alpha_t(j) = p(x_t|z_t=j) \sum_{i=1}^K \alpha_{t-1}(i) A_{ij}\)
Marginal Likelihood
def forward_algorithm(hmm, observations):
    """
    Forward algorithm (filtering) for a discrete HMM.

    Runs entirely in log space to avoid numerical underflow on long
    sequences, then converts back and normalizes per time step.

    Parameters
    ----------
    hmm : object
        Must expose ``n_states`` (int K), ``pi`` ((K,) initial distribution),
        ``A`` ((K, K) transitions, ``A[i, j] = p(z_t=j | z_{t-1}=i)``), and
        ``B`` ((K, M) emissions, ``B[j, k] = p(x_t=k | z_t=j)``).
    observations : sequence of int
        Observation symbol indices, length T.

    Returns
    -------
    alpha : (T, K) ndarray
        Filtered state posteriors p(z_t | x_{1:t}); each row is the forward
        message normalized to sum to 1 (NOT the raw forward probabilities).
    log_likelihood : float
        log p(x_{1:T}).
    """
    T = len(observations)
    n_states = hmm.n_states

    log_alpha = np.zeros((T, n_states))
    # Hoist the log transforms out of the time loop.
    log_A = np.log(hmm.A)
    log_B = np.log(hmm.B)

    # Base case: alpha_1(j) = pi_j * p(x_1 | z_1 = j)
    log_alpha[0] = np.log(hmm.pi) + log_B[:, observations[0]]

    # Recursion, vectorized over the destination state j:
    # alpha_t(j) = p(x_t|z_t=j) * sum_i alpha_{t-1}(i) A[i, j]
    # (K, 1) broadcast against (K, K), log-sum-exp over source-state axis 0.
    for t in range(1, T):
        log_alpha[t] = (np.logaddexp.reduce(log_alpha[t - 1][:, None] + log_A, axis=0)
                        + log_B[:, observations[t]])

    # Marginal likelihood: p(x_{1:T}) = sum_j alpha_T(j)
    log_likelihood = np.logaddexp.reduce(log_alpha[-1])

    # Back to probability space; per-step normalization yields the
    # filtering posteriors p(z_t | x_{1:t}).
    alpha = np.exp(log_alpha)
    alpha = alpha / alpha.sum(axis=1, keepdims=True)

    return alpha, log_likelihood
# ---- Demo: filtering on the weather HMM ----------------------------------
# NOTE(review): relies on `weather_hmm`, `observations`, `true_states`, and
# `state_names` defined in an earlier notebook cell (not shown in this chunk).
# Run forward algorithm
alpha, log_likelihood = forward_algorithm(weather_hmm, observations)
print(f"Forward Algorithm Results:")
print(f"Log-likelihood: {log_likelihood:.4f}")
print(f"Likelihood: {np.exp(log_likelihood):.6e}")
# Visualize forward probabilities (filtering)
fig, axes = plt.subplots(3, 1, figsize=(14, 10))
# True states
axes[0].plot(true_states, 'o-', linewidth=2, markersize=8, color='blue')
axes[0].set_yticks([0, 1])
axes[0].set_yticklabels(state_names)
axes[0].set_ylabel('True State')
axes[0].set_title('True Hidden States', fontweight='bold')
axes[0].grid(True, alpha=0.3)
# Filtered probabilities, one curve per hidden state
for state_idx, name in enumerate(state_names):
    axes[1].plot(alpha[:, state_idx], linewidth=2, label=name, marker='o', markersize=4)
axes[1].set_ylabel('Probability')
axes[1].set_title('Filtered State Probabilities p(z_t | x_{1:t})', fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)
# Most likely state from filtering
# (argmax over the filtered posterior gives the per-step MAP state)
filtered_states = np.argmax(alpha, axis=1)
axes[2].plot(filtered_states, 's-', linewidth=2, markersize=8, color='green')
axes[2].set_yticks([0, 1])
axes[2].set_yticklabels(state_names)
axes[2].set_xlabel('Time')
axes[2].set_ylabel('Predicted State')
axes[2].set_title('Most Likely State (from Filtering)', fontweight='bold')
axes[2].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
# Accuracy: fraction of steps where the filtered MAP state matches the truth
accuracy = np.mean(filtered_states == true_states)
print(f"\nFiltering accuracy: {accuracy:.2%}")
3. Backward Algorithm
Backward Message
Recursion
Base case: \(\beta_T(i) = 1\)
Recursive case: \(\beta_t(i) = \sum_{j=1}^K A_{ij} p(x_{t+1}|z_{t+1}=j) \beta_{t+1}(j)\)
Smoothing
Combining forward and backward messages gives the smoothed posterior: \(\gamma_t(j) = \frac{\alpha_t(j)\,\beta_t(j)}{p(\mathbf{x}_{1:T})}\)
def backward_algorithm(hmm, observations):
    """
    Backward algorithm for a discrete HMM.

    Computes the backward messages beta_t(i) = p(x_{t+1:T} | z_t = i),
    working in log space for numerical stability.

    Returns:
    - beta: Backward probabilities [T x n_states]
    """
    T = len(observations)
    K = hmm.n_states

    # Base case beta_T(i) = 1, i.e. log beta_T(i) = 0 -- zeros() covers it.
    log_beta = np.zeros((T, K))

    # Sweep backwards from t = T-2 down to t = 0.
    for t in reversed(range(T - 1)):
        # Emission log-likelihoods of the *next* observation, per next state.
        next_emis = np.log(hmm.B[:, observations[t + 1]])
        for i in range(K):
            terms = np.log(hmm.A[i, :]) + next_emis + log_beta[t + 1]
            log_beta[t, i] = np.logaddexp.reduce(terms)

    return np.exp(log_beta)
def forward_backward(hmm, observations):
    """
    Forward-backward algorithm (smoothing).

    Returns:
    - gamma: Smoothed state probabilities p(z_t | x_{1:T})
    - xi: Pairwise probabilities p(z_t, z_{t+1} | x_{1:T})
    - log_likelihood: log p(x_{1:T})
    """
    T = len(observations)
    K = hmm.n_states

    # Messages from both directions.
    alpha, log_likelihood = forward_algorithm(hmm, observations)
    beta = backward_algorithm(hmm, observations)

    # Smoothed marginals: gamma_t(j) ∝ alpha_t(j) * beta_t(j).
    # Per-step normalization absorbs the 1/p(x_{1:T}) factor (alpha is
    # already normalized per step, so only the proportionality matters).
    gamma = alpha * beta
    gamma = gamma / gamma.sum(axis=1, keepdims=True)

    # Pairwise posteriors xi_t(i,j) ∝ alpha_t(i) A[i,j] B[j, x_{t+1}] beta_{t+1}(j),
    # built per time step via broadcasting instead of a triple loop.
    xi = np.zeros((T - 1, K, K))
    for t in range(T - 1):
        unnorm = (alpha[t][:, None]
                  * hmm.A
                  * hmm.B[:, observations[t + 1]][None, :]
                  * beta[t + 1][None, :])
        xi[t] = unnorm / unnorm.sum()

    return gamma, xi, log_likelihood
# ---- Demo: smoothing vs filtering on the weather HMM ---------------------
# NOTE(review): relies on `weather_hmm`, `observations`, `true_states`,
# `state_names`, `alpha`, and `filtered_states` from earlier cells.

# Run forward-backward
gamma, xi, _ = forward_backward(weather_hmm, observations)

# Visualize smoothing vs filtering
fig, axes = plt.subplots(4, 1, figsize=(14, 12))

# Panel 1: ground-truth hidden states
axes[0].plot(true_states, 'o-', linewidth=2, markersize=8, color='blue')
axes[0].set_yticks([0, 1])
axes[0].set_yticklabels(state_names)
axes[0].set_ylabel('True State')
axes[0].set_title('True Hidden States', fontweight='bold')
axes[0].grid(True, alpha=0.3)

# Panel 2: filtered probabilities (past observations only)
for state_idx, name in enumerate(state_names):
    axes[1].plot(alpha[:, state_idx], linewidth=2, label=name, alpha=0.7)
axes[1].set_ylabel('Probability')
axes[1].set_title('Filtered: p(z_t | x_{1:t}) - Only past observations', fontweight='bold')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

# Panel 3: smoothed probabilities (conditioned on the full sequence)
for state_idx, name in enumerate(state_names):
    axes[2].plot(gamma[:, state_idx], linewidth=2, label=name, marker='o', markersize=4)
axes[2].set_ylabel('Probability')
axes[2].set_title('Smoothed: p(z_t | x_{1:T}) - All observations', fontweight='bold')
axes[2].legend()
axes[2].grid(True, alpha=0.3)

# Panel 4: per-step MAP state from the smoothed marginals
smoothed_states = np.argmax(gamma, axis=1)
axes[3].plot(smoothed_states, 's-', linewidth=2, markersize=8, color='purple')
axes[3].set_yticks([0, 1])
axes[3].set_yticklabels(state_names)
axes[3].set_xlabel('Time')
axes[3].set_ylabel('Predicted State')
axes[3].set_title('Most Likely State (from Smoothing)', fontweight='bold')
axes[3].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()

print("Comparison:")
print(f"Filtering accuracy: {np.mean(filtered_states == true_states):.2%}")
print(f"Smoothing accuracy: {np.mean(smoothed_states == true_states):.2%}")
# BUG FIX: the original string contained a mojibake character ("β") where an
# arrow/dash belonged; replaced with a plain ASCII arrow.
print("\nSmoothing uses all observations -> better estimates!")
4. Viterbi Algorithm (Decoding)
Find the most likely state sequence: \(\mathbf{z}^* = \arg\max_{\mathbf{z}_{1:T}} p(\mathbf{z}_{1:T} | \mathbf{x}_{1:T})\)
Algorithm
Base case: \(\delta_1(j) = \pi_j p(x_1|z_1=j)\)
Recursion: \(\delta_t(j) = \max_i [\delta_{t-1}(i) A_{ij}]\, p(x_t|z_t=j)\)
Backtracking: Store the argmax at each step, then trace back from the end.
def viterbi_algorithm(hmm, observations):
    """
    Viterbi decoding: the single most likely hidden-state sequence.

    Dynamic program over log scores with backpointers for path recovery.

    Returns:
    - path: Most likely state sequence
    - log_prob: Log probability of this sequence
    """
    T = len(observations)
    K = hmm.n_states

    # Hoist the log transforms out of the DP sweep.
    log_A = np.log(hmm.A)
    log_B = np.log(hmm.B)

    log_delta = np.zeros((T, K))            # best log-score ending in each state
    backptr = np.zeros((T, K), dtype=int)   # argmax predecessor per (t, state)

    # Initialization: delta_1(j) = pi_j * p(x_1 | z_1 = j)
    log_delta[0] = np.log(hmm.pi) + log_B[:, observations[0]]

    # DP sweep: best predecessor for each destination state j.
    for t in range(1, T):
        for j in range(K):
            scores = log_delta[t - 1] + log_A[:, j]
            best = np.argmax(scores)
            backptr[t, j] = best
            log_delta[t, j] = scores[best] + log_B[j, observations[t]]

    # Trace the optimal path backwards from the final argmax.
    path = np.zeros(T, dtype=int)
    path[-1] = np.argmax(log_delta[-1])
    for t in reversed(range(T - 1)):
        path[t] = backptr[t + 1, path[t + 1]]

    return path, np.max(log_delta[-1])
# ---- Demo: compare filtering, smoothing, and Viterbi decoding ------------
# NOTE(review): relies on `weather_hmm`, `observations`, `true_states`,
# `state_names`, `obs_names`, `filtered_states`, and `smoothed_states`
# from earlier cells (not shown in this chunk).
# Run Viterbi
viterbi_path, viterbi_log_prob = viterbi_algorithm(weather_hmm, observations)
print(f"Viterbi Algorithm Results:")
print(f"Log probability of best path: {viterbi_log_prob:.4f}")
print(f"Viterbi accuracy: {np.mean(viterbi_path == true_states):.2%}")
# Compare all methods in a 5-panel figure
fig, axes = plt.subplots(5, 1, figsize=(14, 14))
# True states
axes[0].plot(true_states, 'o-', linewidth=2, markersize=8, color='blue')
axes[0].set_yticks([0, 1])
axes[0].set_yticklabels(state_names)
axes[0].set_ylabel('State')
axes[0].set_title('True Hidden States', fontweight='bold')
axes[0].grid(True, alpha=0.3)
# Observations
axes[1].plot(observations, 's-', linewidth=2, markersize=8, color='red')
axes[1].set_yticks([0, 1, 2])
axes[1].set_yticklabels(obs_names)
axes[1].set_ylabel('Activity')
axes[1].set_title('Observations', fontweight='bold')
axes[1].grid(True, alpha=0.3)
# Filtered (online estimate, past observations only)
axes[2].plot(filtered_states, '^-', linewidth=2, markersize=8, color='green', alpha=0.7)
axes[2].set_yticks([0, 1])
axes[2].set_yticklabels(state_names)
axes[2].set_ylabel('State')
axes[2].set_title(f'Filtered (Online) - Acc: {np.mean(filtered_states == true_states):.1%}',
                  fontweight='bold')
axes[2].grid(True, alpha=0.3)
# Smoothed (offline, per-step marginal MAP)
axes[3].plot(smoothed_states, 's-', linewidth=2, markersize=8, color='purple', alpha=0.7)
axes[3].set_yticks([0, 1])
axes[3].set_yticklabels(state_names)
axes[3].set_ylabel('State')
axes[3].set_title(f'Smoothed (Offline, Marginal) - Acc: {np.mean(smoothed_states == true_states):.1%}',
                  fontweight='bold')
axes[3].grid(True, alpha=0.3)
# Viterbi (offline, jointly most likely path)
axes[4].plot(viterbi_path, 'd-', linewidth=2, markersize=8, color='orange')
axes[4].set_yticks([0, 1])
axes[4].set_yticklabels(state_names)
axes[4].set_xlabel('Time')
axes[4].set_ylabel('State')
axes[4].set_title(f'Viterbi (Offline, Joint MAP) - Acc: {np.mean(viterbi_path == true_states):.1%}',
                  fontweight='bold')
axes[4].grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
print("\nKey Differences:")
print("- Filtering: Uses only past observations (online)")
print("- Smoothing: Uses all observations, marginal probabilities")
print("- Viterbi: Uses all observations, finds single best path (joint MAP)")
5. Baum-Welch Algorithm (Learning)
Learn HMM parameters from observations using the EM algorithm.
E-Step
Compute expected sufficient statistics using forward-backward:
\(\gamma_t(i) = p(z_t=i|\mathbf{x}_{1:T})\)
\(\xi_t(i,j) = p(z_t=i, z_{t+1}=j|\mathbf{x}_{1:T})\)
M-Step
Update parameters by normalizing the expected counts: \(\hat{\pi}_i = \gamma_1(i)\), \(\hat{A}_{ij} = \frac{\sum_{t=1}^{T-1}\xi_t(i,j)}{\sum_{t=1}^{T-1}\gamma_t(i)}\), \(\hat{B}_{jk} = \frac{\sum_{t:\,x_t=k}\gamma_t(j)}{\sum_{t=1}^{T}\gamma_t(j)}\)
def baum_welch(observations, n_states, n_observations, n_iterations=50, verbose=True):
    """
    Baum-Welch algorithm (EM for HMM).

    Alternates E-steps (forward-backward for expected sufficient statistics)
    with M-steps (closed-form re-estimation of pi, A, and B).

    Returns:
    - hmm: Learned HMM
    - log_likelihoods: Log-likelihood at each iteration
    """
    # Random initialization; EM climbs from wherever this lands.
    hmm = DiscreteHMM(n_states, n_observations)
    log_likelihoods = []

    for iteration in range(n_iterations):
        # E-step: smoothed marginals gamma and pairwise posteriors xi.
        gamma, xi, log_likelihood = forward_backward(hmm, observations)
        log_likelihoods.append(log_likelihood)
        if verbose and iteration % 10 == 0:
            print(f"Iteration {iteration}: log-likelihood = {log_likelihood:.4f}")

        # M-step -- initial distribution: smoothed posterior at t = 0.
        hmm.pi = gamma[0]

        # M-step -- transitions: expected i->j counts / expected visits to i.
        for i in range(n_states):
            expected_from_i = np.sum(gamma[:-1, i])
            for j in range(n_states):
                hmm.A[i, j] = np.sum(xi[:, i, j]) / expected_from_i

        # M-step -- emissions: expected symbol-k emissions in state j
        # divided by total expected occupancy of state j.
        for j in range(n_states):
            occupancy_j = np.sum(gamma[:, j])
            for k in range(n_observations):
                hmm.B[j, k] = np.sum(gamma[observations == k, j]) / occupancy_j

        # Re-normalize rows to wash out accumulated rounding error.
        hmm.A = hmm.A / hmm.A.sum(axis=1, keepdims=True)
        hmm.B = hmm.B / hmm.B.sum(axis=1, keepdims=True)

    return hmm, log_likelihoods
# ---- Demo: learn an HMM from sampled data and inspect the fit ------------
# NOTE(review): relies on `weather_hmm`, `state_names`, and `obs_names` from
# earlier cells; `DiscreteHMM.sample` is defined elsewhere in the notebook.
# Generate training data
np.random.seed(42)
train_states, train_obs = weather_hmm.sample(500)
print("Learning HMM from observations...\n")
learned_hmm, log_likelihoods = baum_welch(train_obs, n_states=2, n_observations=3,
                                          n_iterations=50, verbose=True)
# Compare learned vs true parameters.
# NOTE(review): EM is only identifiable up to a permutation of the hidden
# states, so learned rows/columns may appear swapped relative to the true
# parameters even when the fit is good.
print("\n" + "="*60)
print("PARAMETER COMPARISON")
print("="*60)
print("\nInitial Distribution:")
print(f"True: {weather_hmm.pi}")
print(f"Learned: {learned_hmm.pi}")
print("\nTransition Matrix:")
print("True:")
print(pd.DataFrame(weather_hmm.A, index=state_names, columns=state_names))
print("\nLearned:")
print(pd.DataFrame(learned_hmm.A, index=state_names, columns=state_names))
print("\nEmission Matrix:")
print("True:")
print(pd.DataFrame(weather_hmm.B, index=state_names, columns=obs_names))
print("\nLearned:")
print(pd.DataFrame(learned_hmm.B, index=state_names, columns=obs_names))
# Plot learning curve (log-likelihood should increase monotonically under EM)
plt.figure(figsize=(12, 5))
plt.plot(log_likelihoods, linewidth=2)
plt.xlabel('Iteration')
plt.ylabel('Log-Likelihood')
plt.title('Baum-Welch Learning Curve', fontweight='bold')
plt.grid(True, alpha=0.3)
plt.tight_layout()
plt.show()
6. Application: Part-of-Speech Tagging
What: This code applies the HMM framework to part-of-speech (POS) tagging — assigning grammatical labels (DET, NOUN, VERB, ADJ) to each word in a sentence. The hidden states are POS tags, the observations are words, and the Viterbi algorithm decodes the most likely tag sequence given a sentence.
Why: POS tagging is a classic sequence labeling problem that maps directly onto the HMM framework. The transition matrix \(\mathbf{A}\) encodes grammar rules (e.g., determiners are likely followed by nouns or adjectives), while the emission matrix \(\mathbf{B}\) encodes lexical preferences (e.g., "the" is almost always a determiner). The Viterbi algorithm finds the globally optimal tag sequence by considering all words simultaneously, rather than greedily tagging each word independently.
Connection: HMM-based POS taggers were the standard in NLP for decades and still serve as baselines. Modern systems use conditional random fields (CRFs) or neural sequence models (BiLSTM-CRF, transformers), but the HMM formulation provides the conceptual foundation for all structured prediction in NLP, including named entity recognition, chunking, and information extraction.
# Simple POS tagging example
# Hidden states: POS tags (DET, NOUN, VERB, ADJ); observations: words.
# NOTE(review): relies on `DiscreteHMM` and `viterbi_algorithm` from
# earlier cells.

# Create a toy corpus
sentences = [
    "the cat runs fast",
    "the dog runs",
    "fast cats run",
    "the fast dog",
]

# True POS tags (reference only; not used by the decoder below)
true_tags = [
    ["DET", "NOUN", "VERB", "ADJ"],
    ["DET", "NOUN", "VERB"],
    ["ADJ", "NOUN", "VERB"],
    ["DET", "ADJ", "NOUN"],
]

# Build vocabulary (sorted for a deterministic word -> index mapping)
vocab = sorted(set(" ".join(sentences).split()))
word_to_idx = {w: i for i, w in enumerate(vocab)}
tag_list = ["DET", "NOUN", "VERB", "ADJ"]
tag_to_idx = {t: i for i, t in enumerate(tag_list)}

print("POS Tagging Example")
print(f"\nVocabulary: {vocab}")
print(f"POS Tags: {tag_list}")

# Create HMM for POS tagging
pos_hmm = DiscreteHMM(n_states=4, n_observations=len(vocab))

# Set reasonable parameters (normally learned from data)
pos_hmm.pi = np.array([0.5, 0.2, 0.1, 0.2])  # Sentences often start with DET

# Transition matrix (simplified grammar rules):
# DET -> NOUN/ADJ likely, NOUN -> VERB likely, ADJ -> NOUN likely
pos_hmm.A = np.array([
    [0.1, 0.5, 0.1, 0.3],  # From DET
    [0.2, 0.2, 0.5, 0.1],  # From NOUN
    [0.3, 0.3, 0.1, 0.3],  # From VERB
    [0.2, 0.6, 0.1, 0.1],  # From ADJ
])

# Emission matrix (word likelihoods for each POS).
# BUG FIX: the previous hard-coded matrix had 6 columns, but the corpus
# vocabulary has 7 words ('cat','cats','dog','fast','run','runs','the').
# Columns were therefore misaligned with `word_to_idx`, and looking up
# "the" (index 6) raised an IndexError during decoding. Build the matrix
# from the vocabulary instead, so it stays aligned if the corpus changes.
emission_prefs = {
    "DET":  {"the": 0.9},
    "NOUN": {"cat": 0.3, "cats": 0.3, "dog": 0.3},
    "VERB": {"run": 0.45, "runs": 0.45},
    "ADJ":  {"fast": 0.9},
}
pos_hmm.B = np.full((len(tag_list), len(vocab)), 0.01)  # small floor for unseen pairs
for tag, prefs in emission_prefs.items():
    for word, prob in prefs.items():
        pos_hmm.B[tag_to_idx[tag], word_to_idx[word]] = prob
pos_hmm.B = pos_hmm.B / pos_hmm.B.sum(axis=1, keepdims=True)

# Test on a sentence
test_sentence = "the fast cat runs"
test_words = test_sentence.split()
test_obs = np.array([word_to_idx[w] for w in test_words])

# Use Viterbi to find the most likely POS sequence
predicted_tags, _ = viterbi_algorithm(pos_hmm, test_obs)
predicted_tag_names = [tag_list[i] for i in predicted_tags]

print(f"\nTest sentence: {test_sentence}")
print(f"\nPredicted POS tags:")
for word, tag in zip(test_words, predicted_tag_names):
    print(f" {word:10s} -> {tag}")

print("\nNote: This is a simplified example.")
print("Real POS taggers use large corpora and more sophisticated models.")
Summary
In this notebook, we covered:
HMM Definition: States, observations, transitions, emissions
Forward Algorithm: Filtering and computing marginal likelihood
Backward Algorithm: Used with forward for smoothing
Viterbi Algorithm: Finding most likely state sequence (decoding)
Baum-Welch Algorithm: Learning HMM parameters (EM)
Applications: Weather modeling, POS tagging
Key Takeaways
Three fundamental problems: Evaluation, Decoding, Learning
Filtering vs Smoothing:
Filtering: Online, uses past observations
Smoothing: Offline, uses all observations, more accurate
Smoothing vs Viterbi:
Smoothing: Marginal probabilities, each state independently
Viterbi: Joint MAP, finds single best path
Baum-Welch: EM algorithm for learning HMM parameters
Applications: Speech recognition, POS tagging, bioinformatics, finance
Computational Complexity
For sequence of length T with K states:
Forward/Backward: O(TKΒ²)
Viterbi: O(TKΒ²)
Baum-Welch iteration: O(TKΒ²)
Limitations
Assumes observations are conditionally independent given states
Limited by discrete states
Markov assumption may be too strong
Extensions
Gaussian HMMs: Continuous observations
Conditional Random Fields: Discriminative alternative
Recurrent Neural Networks: Modern deep learning alternative
Exercises
Implement a Gaussian HMM for continuous observations
Derive the forward-backward equations
Implement the scaled forward-backward algorithm
Create an HMM for DNA sequence analysis
Compare HMM vs CRF for sequence labeling