import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.naive_bayes import GaussianNB, MultinomialNB
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, confusion_matrix, classification_report
from sklearn.datasets import load_iris, fetch_20newsgroups
from sklearn.feature_extraction.text import CountVectorizer
import pandas as pd
import networkx as nx

sns.set_style('whitegrid')
plt.rcParams['figure.figsize'] = (12, 6)
np.random.seed(42)

1. Bayesian Network BasicsΒΆ

DefinitionΒΆ

A Bayesian Network (BN) is a directed acyclic graph (DAG) where:

  • Nodes represent random variables

  • Edges represent direct dependencies

  • Each node has a conditional probability distribution (CPD)

FactorizationΒΆ

The joint distribution factorizes as:

\[p(X_1, X_2, \ldots, X_n) = \prod_{i=1}^n p(X_i | \text{Pa}(X_i))\]

Where \(\text{Pa}(X_i)\) are the parents of \(X_i\) in the graph.

Example: Student NetworkΒΆ

  • Difficulty (D) β†’ Grade (G)

  • Intelligence (I) β†’ Grade (G)

  • Grade (G) β†’ Letter (L)

\[p(D, I, G, L) = p(D) \cdot p(I) \cdot p(G|D, I) \cdot p(L|G)\]
# Visualize a simple Bayesian Network
def plot_bayesian_network(edges, node_labels=None, title="Bayesian Network"):
    """Plot a directed acyclic graph"""
    G = nx.DiGraph()
    G.add_edges_from(edges)
    
    plt.figure(figsize=(10, 6))
    pos = nx.spring_layout(G, seed=42)
    
    # Draw nodes
    nx.draw_networkx_nodes(G, pos, node_size=2000, node_color='lightblue',
                          edgecolors='black', linewidths=2)
    
    # Draw edges
    nx.draw_networkx_edges(G, pos, edge_color='gray', arrows=True,
                          arrowsize=20, arrowstyle='->', width=2)
    
    # Draw labels
    if node_labels is None:
        node_labels = {node: str(node) for node in G.nodes()}
    nx.draw_networkx_labels(G, pos, node_labels, font_size=12, font_weight='bold')
    
    plt.title(title, fontsize=14, fontweight='bold')
    plt.axis('off')
    plt.tight_layout()
    plt.show()

# Example 1: Student Network
student_edges = [
    ('Difficulty', 'Grade'),
    ('Intelligence', 'Grade'),
    ('Intelligence', 'SAT'),
    ('Grade', 'Letter')
]

plot_bayesian_network(student_edges, title="Student Bayesian Network")

print("Student Network Factorization:")
print("p(D, I, G, S, L) = p(D) Β· p(I) Β· p(G|D,I) Β· p(S|I) Β· p(L|G)")
print("\nIndependencies:")
print("- D βŠ₯ I (Difficulty independent of Intelligence)")
print("- L βŠ₯ D,I | G (Letter independent of D,I given Grade)")
print("- S βŠ₯ D,G,L | I (SAT independent of others given Intelligence)")
# Example 2: Medical Diagnosis Network
medical_edges = [
    ('Smoking', 'Cancer'),
    ('Smoking', 'Bronchitis'),
    ('Cancer', 'X-ray'),
    ('Cancer', 'Dyspnea'),
    ('Bronchitis', 'Dyspnea')
]

plot_bayesian_network(medical_edges, title="Medical Diagnosis Network")

print("Medical Network Factorization:")
print("p(S, C, B, X, D) = p(S) Β· p(C|S) Β· p(B|S) Β· p(X|C) Β· p(D|C,B)")
print("\nKey relationships:")
print("- Smoking causes both Cancer and Bronchitis")
print("- Cancer and Bronchitis both cause Dyspnea")
print("- X-ray is affected by Cancer")

2. Conditional Independence and d-SeparationΒΆ

Three Basic StructuresΒΆ

  1. Chain: \(X \to Y \to Z\)

    • \(X \perp Z | Y\) (X independent of Z given Y)

  2. Fork: \(X \leftarrow Y \rightarrow Z\)

    • \(X \perp Z | Y\) (Y is common cause)

  3. Collider (V-structure): \(X \to Y \leftarrow Z\)

    • \(X \perp Z\) (unconditionally independent)

    • \(X \not\perp Z | Y\) (conditioning creates dependence!)

d-SeparationΒΆ

Two sets of nodes A and B are d-separated by C if all paths between A and B are β€œblocked” by C.

A path is blocked if:

  1. It contains a chain or fork with the middle node in C, OR

  2. It contains a collider NOT in C (and no descendants of the collider in C)

# Demonstrate the three basic structures
fig, axes = plt.subplots(1, 3, figsize=(16, 4))

# 1. Chain
plt.sca(axes[0])
plot_bayesian_network([('X', 'Y'), ('Y', 'Z')], title="Chain: X β†’ Y β†’ Z")
plt.sca(axes[0])
plt.text(0.5, -0.15, 'X βŠ₯ Z | Y', transform=axes[0].transAxes,
         ha='center', fontsize=12, bbox=dict(boxstyle='round', facecolor='wheat'))

# 2. Fork  
plt.sca(axes[1])
plot_bayesian_network([('Y', 'X'), ('Y', 'Z')], title="Fork: X ← Y β†’ Z")
plt.sca(axes[1])
plt.text(0.5, -0.15, 'X βŠ₯ Z | Y', transform=axes[1].transAxes,
         ha='center', fontsize=12, bbox=dict(boxstyle='round', facecolor='wheat'))

# 3. Collider
plt.sca(axes[2])
plot_bayesian_network([('X', 'Y'), ('Z', 'Y')], title="Collider: X β†’ Y ← Z")
plt.sca(axes[2])
plt.text(0.5, -0.15, 'X βŠ₯ Z (unconditionally)\nX βŠ₯ΜΈ Z | Y (dependent!)', 
         transform=axes[2].transAxes, ha='center', fontsize=12,
         bbox=dict(boxstyle='round', facecolor='lightcoral'))

plt.tight_layout()

print("\n" + "="*60)
print("EXPLAINING AWAY (Collider Effect)")
print("="*60)
print("\nExample: Two causes of a car not starting")
print("- X = Dead Battery")
print("- Z = Out of Gas")
print("- Y = Car Won't Start")
print("\nWithout knowing Y (car's state):")
print("  β†’ Battery and gas are independent")
print("\nGiven Y=True (car won't start):")
print("  β†’ If we learn battery is fine (X=False)")
print("  β†’ More likely out of gas (Z=True)")
print("  β†’ Battery status 'explains away' gas status!")

3. Naive Bayes ClassifierΒΆ

ModelΒΆ

Naive Bayes assumes features are conditionally independent given the class:

\[p(y, x_1, \ldots, x_d) = p(y) \prod_{j=1}^d p(x_j | y)\]

Classification RuleΒΆ

\[\hat{y} = \arg\max_y p(y) \prod_{j=1}^d p(x_j | y)\]

Graphical ModelΒΆ

       Y (class)
      /|\ 
     / | \
   X₁ Xβ‚‚ ... Xₐ (features)
class NaiveBayesClassifier:
    """Gaussian Naive Bayes from scratch"""
    
    def fit(self, X, y):
        self.classes = np.unique(y)
        n_classes = len(self.classes)
        n_features = X.shape[1]
        
        # Store mean and variance for each (class, feature) pair
        self.means = np.zeros((n_classes, n_features))
        self.vars = np.zeros((n_classes, n_features))
        self.priors = np.zeros(n_classes)
        
        for idx, c in enumerate(self.classes):
            X_c = X[y == c]
            self.means[idx, :] = X_c.mean(axis=0)
            self.vars[idx, :] = X_c.var(axis=0) + 1e-10  # Add small constant for stability
            self.priors[idx] = len(X_c) / len(X)
        
        return self
    
    def _gaussian_pdf(self, x, mean, var):
        """Compute Gaussian PDF"""
        return (1 / np.sqrt(2 * np.pi * var)) * np.exp(-((x - mean)**2) / (2 * var))
    
    def predict_proba(self, X):
        """Compute class probabilities"""
        n_samples = X.shape[0]
        n_classes = len(self.classes)
        probs = np.zeros((n_samples, n_classes))
        
        for idx in range(n_classes):
            # Log prior
            log_prior = np.log(self.priors[idx])
            
            # Log likelihood (sum of log probabilities for independence)
            log_likelihood = np.sum(
                np.log(self._gaussian_pdf(X, self.means[idx, :], self.vars[idx, :])),
                axis=1
            )
            
            probs[:, idx] = log_prior + log_likelihood
        
        # Convert log probabilities to probabilities
        # Subtract max for numerical stability
        probs = probs - np.max(probs, axis=1, keepdims=True)
        probs = np.exp(probs)
        probs = probs / np.sum(probs, axis=1, keepdims=True)
        
        return probs
    
    def predict(self, X):
        probs = self.predict_proba(X)
        return self.classes[np.argmax(probs, axis=1)]

# Test on Iris dataset
iris = load_iris()
X_iris = iris.data
y_iris = iris.target

X_train, X_test, y_train, y_test = train_test_split(
    X_iris, y_iris, test_size=0.3, random_state=42)

# Custom implementation
nb_custom = NaiveBayesClassifier()
nb_custom.fit(X_train, y_train)
y_pred_custom = nb_custom.predict(X_test)

# Sklearn implementation
nb_sklearn = GaussianNB()
nb_sklearn.fit(X_train, y_train)
y_pred_sklearn = nb_sklearn.predict(X_test)

print("Naive Bayes Results (Iris):")
print(f"\nCustom Implementation:")
print(f"  Accuracy: {accuracy_score(y_test, y_pred_custom):.4f}")
print(f"\nSklearn Implementation:")
print(f"  Accuracy: {accuracy_score(y_test, y_pred_sklearn):.4f}")

# Confusion matrix
fig, axes = plt.subplots(1, 2, figsize=(14, 5))

cm_custom = confusion_matrix(y_test, y_pred_custom)
sns.heatmap(cm_custom, annot=True, fmt='d', cmap='Blues', ax=axes[0],
           xticklabels=iris.target_names, yticklabels=iris.target_names)
axes[0].set_title('Custom Naive Bayes')
axes[0].set_ylabel('True Label')
axes[0].set_xlabel('Predicted Label')

cm_sklearn = confusion_matrix(y_test, y_pred_sklearn)
sns.heatmap(cm_sklearn, annot=True, fmt='d', cmap='Blues', ax=axes[1],
           xticklabels=iris.target_names, yticklabels=iris.target_names)
axes[1].set_title('Sklearn Naive Bayes')
axes[1].set_ylabel('True Label')
axes[1].set_xlabel('Predicted Label')

plt.tight_layout()
plt.show()

Text Classification with Naive BayesΒΆ

What: This code applies Multinomial Naive Bayes to the 20 Newsgroups text classification benchmark. Raw text documents are converted to word-count vectors using CountVectorizer, then classified into four categories. The top words per class reveal what the model has learned about each topic.

Why: Text classification is the most natural application of the Naive Bayes graphical model, where each word is a child node of the class label in the Bayesian network: \(p(\text{words}|y) = \prod_j p(w_j|y)\). Despite the obvious violation of word independence (e.g., β€œmachine” and β€œlearning” co-occur), Naive Bayes remains competitive because classification only requires the correct class ranking, not calibrated probabilities. The MultinomialNB variant is specifically designed for count data, modeling each document as a draw from a class-conditional multinomial distribution.

Connection: Naive Bayes text classifiers were among the first deployed ML systems in production (spam filtering at Yahoo and Google in the early 2000s). The interpretable top-words analysis shown here is still used for model debugging and feature engineering in modern NLP pipelines, even when the final model is a transformer.

# Load subset of 20newsgroups dataset
categories = ['alt.atheism', 'soc.religion.christian', 'comp.graphics', 'sci.med']
newsgroups_train = fetch_20newsgroups(subset='train', categories=categories,
                                      remove=('headers', 'footers', 'quotes'),
                                      random_state=42)
newsgroups_test = fetch_20newsgroups(subset='test', categories=categories,
                                     remove=('headers', 'footers', 'quotes'),
                                     random_state=42)

# Convert text to word counts
vectorizer = CountVectorizer(max_features=1000, stop_words='english')
X_train_text = vectorizer.fit_transform(newsgroups_train.data)
X_test_text = vectorizer.transform(newsgroups_test.data)

y_train_text = newsgroups_train.target
y_test_text = newsgroups_test.target

# Train Multinomial Naive Bayes (for count data)
nb_text = MultinomialNB()
nb_text.fit(X_train_text, y_train_text)
y_pred_text = nb_text.predict(X_test_text)

print("Text Classification with Naive Bayes:")
print(f"\nDataset: 20 newsgroups (4 categories)")
print(f"Training samples: {len(y_train_text)}")
print(f"Test samples: {len(y_test_text)}")
print(f"Vocabulary size: {len(vectorizer.get_feature_names_out())}")
print(f"\nTest Accuracy: {accuracy_score(y_test_text, y_pred_text):.4f}")

# Classification report
print(f"\nClassification Report:")
print(classification_report(y_test_text, y_pred_text, 
                          target_names=newsgroups_test.target_names))

# Show most informative features per class
feature_names = vectorizer.get_feature_names_out()
n_top_words = 10

print(f"\nTop {n_top_words} words per category:")
for idx, category in enumerate(newsgroups_test.target_names):
    # Get log probabilities for this class
    log_probs = nb_text.feature_log_prob_[idx]
    top_indices = np.argsort(log_probs)[-n_top_words:][::-1]
    top_words = [feature_names[i] for i in top_indices]
    print(f"\n{category}:")
    print("  ", ", ".join(top_words))

4. Markov ChainsΒΆ

First-Order Markov ChainΒΆ

\[p(x_1, x_2, \ldots, x_T) = p(x_1) \prod_{t=2}^T p(x_t | x_{t-1})\]

Markov Property: Future is independent of past given present

\[p(x_t | x_1, \ldots, x_{t-1}) = p(x_t | x_{t-1})\]
class MarkovChain:
    """First-order Markov Chain"""
    
    def __init__(self, transition_matrix, initial_dist):
        """
        Parameters:
        - transition_matrix: P[i,j] = p(x_t=j | x_{t-1}=i)
        - initial_dist: p(x_1)
        """
        self.P = np.array(transition_matrix)
        self.pi = np.array(initial_dist)
        self.n_states = len(initial_dist)
    
    def sample(self, n_steps):
        """Generate a sample path"""
        path = np.zeros(n_steps, dtype=int)
        
        # Initial state
        path[0] = np.random.choice(self.n_states, p=self.pi)
        
        # Subsequent states
        for t in range(1, n_steps):
            path[t] = np.random.choice(self.n_states, p=self.P[path[t-1]])
        
        return path
    
    def stationary_distribution(self):
        """Compute stationary distribution (eigenvector of P^T with eigenvalue 1)"""
        eigenvalues, eigenvectors = np.linalg.eig(self.P.T)
        # Find eigenvector with eigenvalue close to 1
        idx = np.argmin(np.abs(eigenvalues - 1))
        stationary = eigenvectors[:, idx].real
        stationary = stationary / stationary.sum()
        return stationary

# Example: Weather model
# States: 0=Sunny, 1=Rainy, 2=Cloudy
transition_matrix = [
    [0.7, 0.2, 0.1],  # From Sunny
    [0.3, 0.4, 0.3],  # From Rainy
    [0.4, 0.3, 0.3],  # From Cloudy
]

initial_dist = [0.5, 0.3, 0.2]

mc = MarkovChain(transition_matrix, initial_dist)

# Sample paths
n_paths = 5
n_steps = 30
state_names = ['Sunny', 'Rainy', 'Cloudy']
colors = ['gold', 'blue', 'gray']

fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# Plot sample paths
for i in range(n_paths):
    path = mc.sample(n_steps)
    axes[0].plot(path, alpha=0.7, marker='o', markersize=4, label=f'Path {i+1}')

axes[0].set_xlabel('Time step')
axes[0].set_ylabel('State')
axes[0].set_yticks([0, 1, 2])
axes[0].set_yticklabels(state_names)
axes[0].set_title('Sample Paths from Markov Chain')
axes[0].legend()
axes[0].grid(True, alpha=0.3)

# Empirical distribution over time
n_simulations = 1000
long_steps = 100
state_counts = np.zeros((long_steps, 3))

for _ in range(n_simulations):
    path = mc.sample(long_steps)
    for t in range(long_steps):
        state_counts[t, path[t]] += 1

state_freqs = state_counts / n_simulations

for state_idx, (name, color) in enumerate(zip(state_names, colors)):
    axes[1].plot(state_freqs[:, state_idx], label=name, linewidth=2, color=color)

# Add stationary distribution
stationary = mc.stationary_distribution()
for state_idx, (name, color) in enumerate(zip(state_names, colors)):
    axes[1].axhline(y=stationary[state_idx], color=color, linestyle='--', alpha=0.7)

axes[1].set_xlabel('Time step')
axes[1].set_ylabel('Probability')
axes[1].set_title('Empirical State Distribution (convergence to stationary)')
axes[1].legend()
axes[1].grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

print("Markov Chain Results:")
print(f"\nInitial distribution: {initial_dist}")
print(f"Stationary distribution: {stationary}")
print(f"\nTransition Matrix:")
print(pd.DataFrame(transition_matrix, 
                  index=state_names, 
                  columns=state_names))

SummaryΒΆ

In this notebook, we covered:

  1. Bayesian Networks: DAGs representing conditional dependencies

  2. Factorization: Joint distribution as product of CPDs

  3. d-Separation: Determining conditional independence from graph structure

  4. Naive Bayes: Simple but effective classifier assuming conditional independence

  5. Markov Chains: Temporal models with Markov property

Key TakeawaysΒΆ

  • Graphical Models: Compact representation of joint distributions

  • Independence: Critical for efficient computation and understanding

  • Colliders: Create dependence when conditioned (explaining away)

  • Naive Bayes:

    • Fast and simple

    • Works well despite β€œnaive” independence assumption

    • Excellent for text classification

  • Markov Chains: Foundation for HMMs and sequential modeling

ExercisesΒΆ

  1. Implement variable elimination for exact inference in BNs

  2. Derive the Naive Bayes decision boundary

  3. Implement belief propagation for tree-structured graphs

  4. Create a Markov chain text generator

  5. Implement the forward-backward algorithm for HMMs