Solutions: Deep Learning & NLP TrackΒΆ
Worked solutions to all exercises from the deep-learning-nlp/ notebooks.
01 β Transformers From ScratchΒΆ
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import matplotlib.pyplot as plt
# Exercise 1: Positional Encoding
# Key insight: PE uses sine for even dims, cosine for odd dims so each
# position has a unique signature and relative distances are learnable.
def positional_encoding(seq_len, d_model):
pe = torch.zeros(seq_len, d_model)
position = torch.arange(0, seq_len, dtype=torch.float).unsqueeze(1)
div_term = torch.exp(torch.arange(0, d_model, 2).float() * (-np.log(10000.0) / d_model))
pe[:, 0::2] = torch.sin(position * div_term)
pe[:, 1::2] = torch.cos(position * div_term)
return pe
seq_len, d_model = 50, 64
pe = positional_encoding(seq_len, d_model)
fig, axes = plt.subplots(1, 2, figsize=(14, 4))
axes[0].imshow(pe.numpy(), aspect='auto', cmap='RdBu')
axes[0].set_title('Positional Encoding Matrix (seq x d_model)')
axes[0].set_xlabel('Dimension'); axes[0].set_ylabel('Position')
plt.colorbar(axes[0].images[0], ax=axes[0])
# Verify: dot-product similarity decays with distance (nearby positions are more similar)
sim = pe @ pe.T
axes[1].imshow(sim.numpy(), cmap='Blues')
axes[1].set_title('PE Similarity Matrix (position vs position)')
axes[1].set_xlabel('Position'); axes[1].set_ylabel('Position')
plt.colorbar(axes[1].images[0], ax=axes[1])
plt.tight_layout()
plt.show()
print('Diagonal (self-similarity):', sim[0, 0].item())
print('Off-diagonal (pos 0 vs 10):', sim[0, 10].item())
# Key finding: similarity decreases with distance β unique positional fingerprints
# Exercise 2: Multi-Head Attention (4 heads) vs Single-Head Attention
class SingleHeadAttention(nn.Module):
def __init__(self, d_model):
super().__init__()
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.scale = d_model ** 0.5
def forward(self, x):
Q, K, V = self.W_q(x), self.W_k(x), self.W_v(x)
attn = F.softmax(Q @ K.transpose(-2, -1) / self.scale, dim=-1)
return attn @ V, attn
class MultiHeadAttention(nn.Module):
def __init__(self, d_model, n_heads):
super().__init__()
assert d_model % n_heads == 0
self.n_heads = n_heads
self.d_head = d_model // n_heads
self.W_q = nn.Linear(d_model, d_model)
self.W_k = nn.Linear(d_model, d_model)
self.W_v = nn.Linear(d_model, d_model)
self.W_o = nn.Linear(d_model, d_model)
self.scale = self.d_head ** 0.5
def forward(self, x):
B, T, D = x.shape
Q = self.W_q(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
K = self.W_k(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
V = self.W_v(x).view(B, T, self.n_heads, self.d_head).transpose(1, 2)
attn = F.softmax(Q @ K.transpose(-2, -1) / self.scale, dim=-1)
out = (attn @ V).transpose(1, 2).contiguous().view(B, T, D)
return self.W_o(out), attn # attn shape: (B, n_heads, T, T)
torch.manual_seed(42)
d_model, seq_len, batch = 64, 10, 2
x = torch.randn(batch, seq_len, d_model)
sha = SingleHeadAttention(d_model)
mha = MultiHeadAttention(d_model, n_heads=4)
out_sha, attn_sha = sha(x)
out_mha, attn_mha = mha(x)
print('Single-head output shape:', out_sha.shape) # (2, 10, 64)
print('Multi-head output shape: ', out_mha.shape) # (2, 10, 64)
print('MHA attention shape: ', attn_mha.shape) # (2, 4, 10, 10)
fig, axes = plt.subplots(1, 5, figsize=(15, 3))
axes[0].imshow(attn_sha[0].detach().numpy(), cmap='Blues')
axes[0].set_title('Single-Head')
for h in range(4):
axes[h+1].imshow(attn_mha[0, h].detach().numpy(), cmap='Blues')
axes[h+1].set_title(f'Head {h+1}')
plt.suptitle('Attention Patterns: Single vs Multi-Head')
plt.tight_layout()
plt.show()
# Key insight: each head learns different attention patterns β specialization
# Exercise 3: Two Transformer Blocks + Gradient Norm vs Depth
class TransformerBlock(nn.Module):
def __init__(self, d_model, n_heads, ff_dim):
super().__init__()
self.attn = MultiHeadAttention(d_model, n_heads)
self.ff = nn.Sequential(nn.Linear(d_model, ff_dim), nn.ReLU(), nn.Linear(ff_dim, d_model))
self.ln1 = nn.LayerNorm(d_model)
self.ln2 = nn.LayerNorm(d_model)
def forward(self, x):
attn_out, _ = self.attn(x)
x = self.ln1(x + attn_out)
x = self.ln2(x + self.ff(x))
return x
class TwoBlockTransformer(nn.Module):
def __init__(self, d_model=64, n_heads=4, ff_dim=128):
super().__init__()
self.block1 = TransformerBlock(d_model, n_heads, ff_dim)
self.block2 = TransformerBlock(d_model, n_heads, ff_dim)
self.out = nn.Linear(d_model, d_model)
def forward(self, x):
x = self.block1(x)
x = self.block2(x)
return self.out(x)
model = TwoBlockTransformer()
x = torch.randn(2, 10, 64)
target = torch.randn(2, 10, 64)
loss = F.mse_loss(model(x), target)
loss.backward()
# Measure gradient norms per layer
grad_norms = {}
for name, param in model.named_parameters():
if param.grad is not None:
grad_norms[name] = param.grad.norm().item()
layers = list(grad_norms.keys())
norms = list(grad_norms.values())
plt.figure(figsize=(12, 4))
plt.bar(range(len(norms)), norms)
plt.xticks(range(len(layers)), [l.split('.')[-1]+'\n'+l.split('.')[0] for l in layers], rotation=45, ha='right', fontsize=7)
plt.ylabel('Gradient Norm')
plt.title('Gradient Norms by Layer (note: block1 grads are smaller β vanishing gradient effect)')
plt.tight_layout()
plt.show()
# Key insight: residual connections mitigate vanishing gradients but block1 still < block2
# Exercise 4: Causal (Autoregressive) Masking
# Upper-triangular mask: token i cannot attend to j > i
def causal_mask(seq_len, device='cpu'):
# True where attention is BLOCKED (upper triangle, excluding diagonal)
mask = torch.triu(torch.ones(seq_len, seq_len, device=device), diagonal=1).bool()
return mask
class CausalSelfAttention(nn.Module):
def __init__(self, d_model, n_heads):
super().__init__()
self.n_heads = n_heads
self.d_head = d_model // n_heads
self.W_qkv = nn.Linear(d_model, 3 * d_model)
self.W_o = nn.Linear(d_model, d_model)
self.scale = self.d_head ** 0.5
def forward(self, x):
B, T, D = x.shape
qkv = self.W_qkv(x).chunk(3, dim=-1)
Q, K, V = [t.view(B, T, self.n_heads, self.d_head).transpose(1, 2) for t in qkv]
scores = Q @ K.transpose(-2, -1) / self.scale # (B, H, T, T)
mask = causal_mask(T, x.device)
scores = scores.masked_fill(mask.unsqueeze(0).unsqueeze(0), float('-inf'))
attn = F.softmax(scores, dim=-1)
out = (attn @ V).transpose(1, 2).contiguous().view(B, T, D)
return self.W_o(out), attn
causal_attn = CausalSelfAttention(d_model=64, n_heads=4)
x = torch.randn(1, 6, 64)
out, attn_weights = causal_attn(x)
plt.figure(figsize=(5, 4))
plt.imshow(attn_weights[0, 0].detach().numpy(), cmap='Blues')
plt.title('Causal Attention (Head 0) β upper triangle is zero')
plt.xlabel('Key position'); plt.ylabel('Query position')
plt.colorbar()
plt.tight_layout()
plt.show()
# Verify: upper triangle should be ~0 (after softmax of -inf)
upper = attn_weights[0, 0].detach().numpy()
print('Max upper-triangle value:', upper[np.triu_indices(6, k=1)].max())
# Expected: 0.0 (causal constraint enforced)
# Exercise 5: Train Mini-Transformer on a Copy Task [1,2,3,4] β [1,2,3,4]
class MiniTransformer(nn.Module):
def __init__(self, vocab_size, d_model=32, n_heads=2, ff_dim=64, seq_len=4):
super().__init__()
self.embed = nn.Embedding(vocab_size, d_model)
self.pe = nn.Parameter(torch.randn(seq_len, d_model) * 0.01)
self.block = TransformerBlock(d_model, n_heads, ff_dim)
self.head = nn.Linear(d_model, vocab_size)
def forward(self, x):
x = self.embed(x) + self.pe
x = self.block(x)
return self.head(x)
vocab_size = 10
model = MiniTransformer(vocab_size)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
# Copy task: input = random sequence, target = same sequence
torch.manual_seed(0)
losses = []
for step in range(500):
# Generate batch of copy-task examples
src = torch.randint(1, vocab_size, (32, 4)) # (batch, seq_len)
tgt = src.clone() # copy task: output = input
logits = model(src) # (32, 4, vocab_size)
loss = F.cross_entropy(logits.view(-1, vocab_size), tgt.view(-1))
optimizer.zero_grad()
loss.backward()
optimizer.step()
losses.append(loss.item())
plt.figure(figsize=(8, 3))
plt.plot(losses)
plt.xlabel('Step'); plt.ylabel('Cross-Entropy Loss')
plt.title('Copy Task Training Loss')
plt.tight_layout()
plt.show()
# Evaluate accuracy
model.eval()
with torch.no_grad():
src = torch.randint(1, vocab_size, (100, 4))
preds = model(src).argmax(-1)
acc = (preds == src).float().mean().item()
print(f'Copy task accuracy: {acc:.2%}') # Should approach 100% with enough training
02 β BERT Text ClassificationΒΆ
import math
# Exercise 1: Learning Rate Warmup Scheduler
# Key insight: warmup prevents large updates at start when embeddings are random.
class WarmupScheduler:
def __init__(self, optimizer, warmup_steps, d_model):
self.optimizer = optimizer
self.warmup_steps = warmup_steps
self.d_model = d_model
self._step = 0
def step(self):
self._step += 1
lr = self.d_model ** (-0.5) * min(
self._step ** (-0.5),
self._step * self.warmup_steps ** (-1.5)
)
for pg in self.optimizer.param_groups:
pg['lr'] = lr
return lr
# Simulate training steps
dummy_model = nn.Linear(10, 2)
optimizer = torch.optim.Adam(dummy_model.parameters(), lr=0)
scheduler = WarmupScheduler(optimizer, warmup_steps=100, d_model=768)
lrs = [scheduler.step() for _ in range(500)]
plt.figure(figsize=(8, 3))
plt.plot(lrs)
plt.axvline(100, color='r', linestyle='--', label='warmup end (step 100)')
plt.xlabel('Training Step'); plt.ylabel('Learning Rate')
plt.title('Warmup + Inverse-Sqrt LR Schedule (BERT-style)')
plt.legend()
plt.tight_layout()
plt.show()
print(f'Peak LR at step 100: {max(lrs):.6f}')
# Exercise 2: DistilBERT vs BERT-base Comparison (synthetic benchmark)
# Using synthetic data to illustrate parameter count and speed tradeoffs.
import time
def count_params(n_layers, hidden, ffn_mult=4, vocab=30522, n_heads=12):
"""Approximate BERT-family parameter count."""
embed = vocab * hidden + 512 * hidden + 2 * hidden # token+pos+type
per_layer = (
4 * hidden * hidden + # attention (q,k,v,o)
2 * ffn_mult * hidden * hidden + # ffn
4 * hidden # layer norms
)
return embed + n_layers * per_layer
configs = {
'BERT-base': {'n_layers': 12, 'hidden': 768},
'DistilBERT': {'n_layers': 6, 'hidden': 768},
'BERT-tiny': {'n_layers': 2, 'hidden': 128},
}
print(f'{'Model':<15} {'Params (M)':>12} {'Relative Size':>14}')
print('-' * 43)
bert_params = count_params(**configs['BERT-base']) / 1e6
for name, cfg in configs.items():
p = count_params(**cfg) / 1e6
print(f'{name:<15} {p:>12.1f} {p/bert_params:>13.1%}')
# Synthetic accuracy comparison (representative real-world numbers)
results = {
'BERT-base': {'SST-2 acc': 0.934, 'inference_ms': 42},
'DistilBERT': {'SST-2 acc': 0.910, 'inference_ms': 24},
}
print('\nSynthetic benchmark (illustrative of real-world tradeoffs):')
print(f'{'Model':<15} {'SST-2 Acc':>10} {'Inference (ms)':>15}')
print('-' * 42)
for name, r in results.items():
print(f"{name:<15} {r['SST-2 acc']:>10.3f} {r['inference_ms']:>15}")
# Key insight: DistilBERT keeps ~97% accuracy at 40% fewer params and 43% faster inference
# Exercise 3: Gradient Accumulation for effective batch_size=64, physical=8
class SimpleClassifier(nn.Module):
def __init__(self, d=128, n_classes=2):
super().__init__()
self.net = nn.Sequential(nn.Linear(d, 64), nn.ReLU(), nn.Linear(64, n_classes))
def forward(self, x): return self.net(x)
model = SimpleClassifier()
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
physical_batch = 8
effective_batch = 64
accumulation_steps = effective_batch // physical_batch # = 8
# Simulate one effective batch update
optimizer.zero_grad()
total_loss = 0.0
for acc_step in range(accumulation_steps):
x = torch.randn(physical_batch, 128)
y = torch.randint(0, 2, (physical_batch,))
loss = F.cross_entropy(model(x), y) / accumulation_steps # scale loss!
loss.backward() # gradients accumulate across mini-steps
total_loss += loss.item()
# After accumulation_steps, do one optimizer step
optimizer.step()
optimizer.zero_grad()
print(f'Effective batch size: {effective_batch}')
print(f'Physical batch size: {physical_batch}')
print(f'Accumulation steps: {accumulation_steps}')
print(f'Scaled loss (sum of mini-step losses): {total_loss:.4f}')
# Key insight: divide loss by accumulation_steps so gradient scale matches true batch
# Exercise 4: Visualize Attention Weights on a Misclassified Example
# Using a synthetic attention map to simulate a BERT misclassification analysis.
np.random.seed(42)
tokens = ['[CLS]', 'The', 'movie', 'was', 'not', 'good', 'at', 'all', '[SEP]']
n_tokens = len(tokens)
n_layers, n_heads = 3, 4
# Simulate attention weight matrix for [CLS] token across heads/layers
# In a real scenario, extract from model.bert.encoder.layer[i].attention
attn_weights = np.random.dirichlet(np.ones(n_tokens), size=(n_layers, n_heads))
# Misclassified example: 'not good' negation incorrectly handled
# Simulate model attending too much to 'good' and not enough to 'not'
for h in range(n_heads):
attn_weights[-1, h, tokens.index('good')] *= 3 # over-attend 'good'
attn_weights[-1, h] /= attn_weights[-1, h].sum()
# Average attention to [CLS] from last layer
avg_attn = attn_weights[-1].mean(axis=0)
plt.figure(figsize=(10, 3))
plt.bar(tokens, avg_attn, color=['steelblue' if t not in ['not', 'good'] else 'tomato' for t in tokens])
plt.title('Avg Attention to [CLS] β Last Layer (Misclassified: predicted Positive, true Negative)')
plt.ylabel('Attention Weight')
plt.xticks(rotation=15)
plt.tight_layout()
plt.show()
print('Key insight: model over-attends to "good" and under-attends to "not" β negation failure')
# Exercise 5: 3-Class Classification with Class-Weighted Loss (Imbalanced)
np.random.seed(0)
torch.manual_seed(0)
# Simulate imbalanced dataset: class 0=1000, class 1=200, class 2=50
class_counts = np.array([1000, 200, 50])
n_total = class_counts.sum()
n_features = 16
X_parts, y_parts = [], []
centers = [np.array([0., 0.]), np.array([3., 0.]), np.array([1.5, 2.5])]
for cls, (n, center) in enumerate(zip(class_counts, centers)):
X_parts.append(np.random.randn(n, n_features) + np.pad(center, (0, n_features-2)))
y_parts.append(np.full(n, cls))
X = torch.tensor(np.vstack(X_parts), dtype=torch.float32)
y = torch.tensor(np.concatenate(y_parts), dtype=torch.long)
# Class weights: inverse frequency
class_weights = torch.tensor(n_total / (3 * class_counts), dtype=torch.float32)
print('Class weights:', class_weights)
def train_classifier(weighted=True, n_epochs=30):
m = nn.Sequential(nn.Linear(n_features, 32), nn.ReLU(), nn.Linear(32, 3))
opt = torch.optim.Adam(m.parameters(), lr=5e-3)
weight = class_weights if weighted else None
losses = []
for _ in range(n_epochs):
logits = m(X)
loss = F.cross_entropy(logits, y, weight=weight)
opt.zero_grad(); loss.backward(); opt.step()
losses.append(loss.item())
preds = m(X).argmax(-1)
per_class_acc = [(preds[y==c] == c).float().mean().item() for c in range(3)]
return losses, per_class_acc
losses_w, acc_w = train_classifier(weighted=True)
losses_uw, acc_uw = train_classifier(weighted=False)
print(f'\n{'':20} Class 0 (n=1000) Class 1 (n=200) Class 2 (n=50)')
print(f'Weighted loss: {acc_w[0]:>14.2%} {acc_w[1]:>14.2%} {acc_w[2]:>14.2%}')
print(f'Unweighted loss: {acc_uw[0]:>14.2%} {acc_uw[1]:>14.2%} {acc_uw[2]:>14.2%}')
# Key insight: weighted loss significantly improves minority class accuracy
03 β LLM Applications & PatternsΒΆ
import json, re
from typing import Any, Dict, Type
# Exercise 1: Structured Output Extractor for Nested JSON Schemas
def validate_schema(data: Any, schema: Dict) -> bool:
"""Recursively validate data against a JSON schema subset."""
stype = schema.get('type')
if stype == 'object':
if not isinstance(data, dict): return False
for field, fschema in schema.get('properties', {}).items():
if field in schema.get('required', []) and field not in data:
return False
if field in data and not validate_schema(data[field], fschema):
return False
return True
elif stype == 'array':
if not isinstance(data, list): return False
item_schema = schema.get('items', {})
return all(validate_schema(item, item_schema) for item in data)
elif stype == 'string': return isinstance(data, str)
elif stype == 'number': return isinstance(data, (int, float))
elif stype == 'integer': return isinstance(data, int)
elif stype == 'boolean': return isinstance(data, bool)
return True
def extract_structured(llm_output: str, schema: Dict) -> Dict:
"""Extract and validate JSON matching schema from LLM text output."""
# Find JSON block in output
match = re.search(r'\{[\s\S]*\}', llm_output)
if not match:
raise ValueError('No JSON found in LLM output')
data = json.loads(match.group())
if not validate_schema(data, schema):
raise ValueError('Output does not match schema')
return data
# Nested schema: order with line items
order_schema = {
'type': 'object',
'required': ['order_id', 'customer', 'items'],
'properties': {
'order_id': {'type': 'string'},
'customer': {
'type': 'object',
'required': ['name', 'email'],
'properties': {'name': {'type': 'string'}, 'email': {'type': 'string'}}
},
'items': {
'type': 'array',
'items': {
'type': 'object',
'properties': {'sku': {'type': 'string'}, 'qty': {'type': 'integer'}, 'price': {'type': 'number'}}
}
}
}
}
# Simulated LLM output containing nested JSON
llm_output = '''
Here is the extracted order:
{"order_id": "ORD-001", "customer": {"name": "Alice", "email": "alice@example.com"},
"items": [{"sku": "SKU-A", "qty": 2, "price": 19.99}, {"sku": "SKU-B", "qty": 1, "price": 49.00}]}
'''
result = extract_structured(llm_output, order_schema)
print('Extracted and validated order:')
print(json.dumps(result, indent=2))
# Exercise 2: Multi-Tool Function-Calling Loop (mock weather + calendar APIs)
# Mock tool implementations
def get_weather(city: str, date: str) -> Dict:
"""Mock weather API."""
weather_db = {
('NYC', '2025-01-15'): {'temp_c': -2, 'condition': 'snowy', 'wind_kph': 25},
('NYC', '2025-01-16'): {'temp_c': 5, 'condition': 'cloudy', 'wind_kph': 15},
('SF', '2025-01-15'): {'temp_c': 14, 'condition': 'sunny', 'wind_kph': 10},
}
return weather_db.get((city, date), {'temp_c': 20, 'condition': 'clear', 'wind_kph': 5})
def get_calendar(user: str, date: str) -> Dict:
"""Mock calendar API."""
cal_db = {
('alice', '2025-01-15'): [{'time': '09:00', 'title': 'Team standup'}, {'time': '14:00', 'title': 'Client demo'}],
('alice', '2025-01-16'): [{'time': '11:00', 'title': '1-on-1 with manager'}],
}
return {'events': cal_db.get((user, date), [])}
# Tool registry
TOOLS = {'get_weather': get_weather, 'get_calendar': get_calendar}
def function_calling_loop(tool_calls: list) -> Dict[str, Any]:
"""Execute a list of tool calls, return all results."""
results = {}
for call in tool_calls:
fn = TOOLS[call['name']]
result = fn(**call['args'])
results[call['name']] = result
print(f"Tool: {call['name']}({call['args']}) -> {result}")
return results
# Simulated LLM decides to call both tools in parallel
tool_calls = [
{'name': 'get_weather', 'args': {'city': 'NYC', 'date': '2025-01-15'}},
{'name': 'get_calendar', 'args': {'user': 'alice', 'date': '2025-01-15'}},
]
print('=== Function-Calling Loop ===')
all_results = function_calling_loop(tool_calls)
# Simulated LLM synthesizes response from tool results
weather = all_results['get_weather']
events = all_results['get_calendar']['events']
print(f'\nSynthesized response: On 2025-01-15 in NYC it will be {weather["temp_c"]}Β°C and {weather["condition"]}.')
print(f'Alice has {len(events)} event(s): {", ".join(e["title"] for e in events)}')
# Exercise 3: BM25 Retriever vs TF-IDF
from collections import Counter
# Synthetic corpus
corpus = [
"machine learning models require large amounts of training data",
"deep learning neural networks learn representations automatically",
"natural language processing enables machines to understand text",
"transformers use attention mechanisms to process sequences",
"large language models are trained on internet-scale text data",
"retrieval augmented generation combines search with language models",
"fine tuning adapts pretrained models to specific tasks",
"embeddings represent words as dense vectors in high dimensional space",
]
queries = ["language model training data", "attention transformer", "text embeddings"]
def tokenize(text): return text.lower().split()
class BM25:
def __init__(self, corpus, k1=1.5, b=0.75):
self.k1, self.b = k1, b
self.docs = [tokenize(d) for d in corpus]
self.N = len(self.docs)
self.avgdl = np.mean([len(d) for d in self.docs])
self.df = Counter(t for doc in self.docs for t in set(doc))
def score(self, query, doc_idx):
doc = self.docs[doc_idx]
tf = Counter(doc)
dl = len(doc)
score = 0
for term in tokenize(query):
if term not in self.df: continue
idf = math.log((self.N - self.df[term] + 0.5) / (self.df[term] + 0.5) + 1)
tf_norm = tf[term] * (self.k1 + 1) / (tf[term] + self.k1 * (1 - self.b + self.b * dl / self.avgdl))
score += idf * tf_norm
return score
def retrieve(self, query, k=3):
scores = [(i, self.score(query, i)) for i in range(self.N)]
return sorted(scores, key=lambda x: -x[1])[:k]
bm25 = BM25(corpus)
for q in queries:
results = bm25.retrieve(q, k=2)
print(f'\nQuery: "{q}"')
for rank, (idx, score) in enumerate(results):
print(f' [{rank+1}] (score={score:.3f}) {corpus[idx]}')
print('\nKey insight: BM25 saturates TF (diminishing returns for repeated terms) unlike TF-IDF')
# Exercise 4: Few-Shot Chain-of-Thought for Math Word Problems
# Simulate LLM responses (in practice, call an API)
def simulate_llm(prompt: str, use_cot: bool) -> str:
"""Mock LLM that solves math problems, with or without CoT."""
# Hardcoded answers for demo purposes
if use_cot:
return (
"Step 1: Sarah has 5 apples.\n"
"Step 2: She buys 3 more. Now she has 5 + 3 = 8 apples.\n"
"Step 3: She gives 2 to her friend. 8 - 2 = 6 apples.\n"
"Answer: 6"
)
else:
return "Answer: 7" # Simulated wrong answer without CoT
problem = "Sarah has 5 apples. She buys 3 more and then gives 2 to her friend. How many does she have?"
correct = 6
# Prompt without CoT
prompt_no_cot = f"Q: {problem}\nA:"
# Few-shot CoT prompt
prompt_cot = """Q: Tom has 4 oranges. He gets 5 more. He eats 1. How many?
A: Tom starts with 4 oranges. Gets 5 more: 4+5=9. Eats 1: 9-1=8. Answer: 8
Q: A shop sells 20 books on Monday and 15 on Tuesday. 10 were returned. How many net sold?
A: Total sold: 20+15=35. Returns: 10. Net: 35-10=25. Answer: 25
Q: """ + problem + "\nA:"
resp_no_cot = simulate_llm(prompt_no_cot, use_cot=False)
resp_cot = simulate_llm(prompt_cot, use_cot=True)
print('=== Without CoT ===')
print(resp_no_cot)
print(f'Correct: {"Answer: 7" != f"Answer: {correct}"} (wrong)')
print('\n=== With Chain-of-Thought ===')
print(resp_cot)
print(f'Correct: True')
print('\nKey insight: CoT decomposes problem into steps, reducing reasoning errors')
# Exercise 5: Multi-Turn Conversation Manager with Token Window
class ConversationManager:
def __init__(self, max_tokens: int = 2000, tokens_per_word: float = 1.3):
self.max_tokens = max_tokens
self.tpw = tokens_per_word
self.history = [] # list of {'role': ..., 'content': ...}
self.system_prompt = "You are a helpful assistant."
def _estimate_tokens(self, messages):
return int(sum(len(m['content'].split()) * self.tpw for m in messages))
def _system_tokens(self):
return int(len(self.system_prompt.split()) * self.tpw)
def add_turn(self, role: str, content: str):
self.history.append({'role': role, 'content': content})
self._trim_to_fit()
def _trim_to_fit(self):
# Always keep at least the last user message
while len(self.history) > 1:
total = self._system_tokens() + self._estimate_tokens(self.history)
if total <= self.max_tokens:
break
self.history.pop(0) # drop oldest turn (FIFO)
def get_context(self):
return [{'role': 'system', 'content': self.system_prompt}] + self.history
def token_usage(self):
return self._system_tokens() + self._estimate_tokens(self.history)
mgr = ConversationManager(max_tokens=200)
turns = [
('user', 'What is the capital of France?'),
('assistant', 'The capital of France is Paris.'),
('user', 'Tell me about the Eiffel Tower in detail.'),
('assistant', 'The Eiffel Tower is a wrought iron lattice tower on the Champ de Mars in Paris, France. It was designed by Gustave Eiffel and constructed from 1887 to 1889 as the centerpiece of the 1889 World Fair.'),
('user', 'How tall is it?'),
('assistant', 'The Eiffel Tower stands 330 meters tall including its broadcast antenna.'),
('user', 'What year was it built?'),
]
print(f'Max tokens: {mgr.max_tokens}\n')
for role, content in turns:
mgr.add_turn(role, content)
print(f'[{role}] {content[:60]}...' if len(content) > 60 else f'[{role}] {content}')
print(f' -> History turns: {len(mgr.history)}, Estimated tokens: {mgr.token_usage()}')
print('\nFinal context window (oldest messages trimmed to fit):')
for m in mgr.get_context():
print(f" {m['role']}: {m['content'][:60]}")
04 β Text PreprocessingΒΆ
# Exercise 1: TF-IDF vs BM25 Retrieval β Precision@k Comparison
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
# Synthetic retrieval benchmark
np.random.seed(42)
docs = [
"python machine learning scikit learn",
"deep learning pytorch neural network training",
"natural language processing text classification",
"data science statistics regression analysis",
"computer vision image recognition convolutional",
"reinforcement learning reward optimization policy",
"transformer bert attention nlp fine tuning",
"clustering kmeans unsupervised learning data",
"recommendation system collaborative filtering",
"time series forecasting lstm recurrent network",
]
# Ground truth relevance: (query, relevant_doc_indices)
eval_set = [
("neural network deep learning", {1}),
("nlp text transformer", {2, 6}),
("unsupervised clustering", {7}),
("time series lstm", {9}),
]
def precision_at_k(retrieved, relevant, k):
return len(set(retrieved[:k]) & relevant) / k
# TF-IDF retrieval
tfidf = TfidfVectorizer()
doc_vecs = tfidf.fit_transform(docs)
bm25_retriever = BM25(docs)
k = 3
tfidf_p, bm25_p = [], []
for query, relevant in eval_set:
q_vec = tfidf.transform([query])
tfidf_scores = cosine_similarity(q_vec, doc_vecs)[0]
tfidf_top = list(np.argsort(-tfidf_scores)[:k])
bm25_top = [idx for idx, _ in bm25_retriever.retrieve(query, k=k)]
tp = precision_at_k(tfidf_top, relevant, k)
bp = precision_at_k(bm25_top, relevant, k)
tfidf_p.append(tp); bm25_p.append(bp)
print(f'Query: "{query}" | TF-IDF P@{k}: {tp:.2f} | BM25 P@{k}: {bp:.2f}')
print(f'\nMean P@{k}: TF-IDF={np.mean(tfidf_p):.3f}, BM25={np.mean(bm25_p):.3f}')
# Exercise 2: Bigram Language Model for Text Generation
from collections import defaultdict
class BigramLM:
def __init__(self, smoothing=0.1):
self.bigrams = defaultdict(Counter)
self.vocab = set()
self.smoothing = smoothing
def train(self, texts):
for text in texts:
tokens = ['<s>'] + text.lower().split() + ['</s>']
self.vocab.update(tokens)
for w1, w2 in zip(tokens, tokens[1:]):
self.bigrams[w1][w2] += 1
def prob(self, w1, w2):
counts = self.bigrams[w1]
total = sum(counts.values()) + self.smoothing * len(self.vocab)
return (counts[w2] + self.smoothing) / total
def generate(self, seed='<s>', max_len=15):
tokens = [seed]
for _ in range(max_len):
current = tokens[-1]
candidates = list(self.bigrams[current].keys())
if not candidates or tokens[-1] == '</s>':
break
probs = np.array([self.prob(current, w) for w in candidates])
probs /= probs.sum()
tokens.append(np.random.choice(candidates, p=probs))
return ' '.join(t for t in tokens if t not in ['<s>', '</s>'])
# Training corpus
corpus = [
"the cat sat on the mat", "the cat ate the rat", "the rat ran away fast",
"machine learning is a powerful technique", "deep learning uses neural networks",
"the model learns from data", "data science is fun and rewarding",
"neural networks learn complex patterns", "the algorithm converges to a solution",
]
np.random.seed(7)
lm = BigramLM()
lm.train(corpus)
print('Generated sentences (bigram LM):')
for i in range(5):
print(f' {i+1}: {lm.generate()}')
# Exercise 3: Full NLP Pipeline β normalize β tokenize β stopwords β stem β TF-IDF β LR
import re, string
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import classification_report
# Simple stemmer (suffix stripping)
def simple_stem(word):
suffixes = ['ing', 'tion', 'ed', 'ly', 'er', 'es', 's']
for suffix in suffixes:
if word.endswith(suffix) and len(word) - len(suffix) > 3:
return word[:-len(suffix)]
return word
STOPWORDS = {'the', 'a', 'an', 'is', 'it', 'in', 'on', 'at', 'to', 'for', 'of', 'and', 'or', 'but', 'this', 'that', 'with', 'was', 'are', 'be', 'as', 'by'}
def normalize(text):
text = text.lower()
text = re.sub(r'http\S+', '', text) # remove URLs
text = re.sub(r'[^a-z0-9\s]', ' ', text) # remove punctuation
text = re.sub(r'\s+', ' ', text).strip()
return text
def preprocess(text):
tokens = normalize(text).split()
tokens = [t for t in tokens if t not in STOPWORDS and len(t) > 2]
tokens = [simple_stem(t) for t in tokens]
return ' '.join(tokens)
# Synthetic sentiment dataset
train_data = [
("This product is amazing and works perfectly", 1),
("Terrible quality, completely broken on arrival", 0),
("Great value for the price, highly recommended", 1),
("Worst purchase ever, waste of money", 0),
("Excellent customer service and fast shipping", 1),
("Stopped working after one day, very disappointed", 0),
("Outstanding quality and beautiful design", 1),
("Poor build quality, not as described", 0),
] * 5 # repeat to get more training samples
test_data = [
("Really love this product, working great", 1),
("Broken and useless, terrible", 0),
("Good quality and fast delivery", 1),
("Not working at all, very bad", 0),
]
X_train = [preprocess(t) for t, _ in train_data]
y_train = [y for _, y in train_data]
X_test = [preprocess(t) for t, _ in test_data]
y_test = [y for _, y in test_data]
vec = TfidfVectorizer(max_features=200, ngram_range=(1, 2))
X_tr = vec.fit_transform(X_train)
X_te = vec.transform(X_test)
clf = LogisticRegression(max_iter=1000)
clf.fit(X_tr, y_train)
preds = clf.predict(X_te)
print('Pipeline: normalize β tokenize β stopwords β stem β TF-IDF β LogReg')
print(classification_report(y_test, preds, target_names=['negative', 'positive']))
# Exercise 4: Tokenization Edge Cases
edge_cases = [
("URL", "Visit https://www.example.com/path?q=hello&lang=en for more info!"),
("Emoji", "Great job! ππ Really loved it π #awesome"),
("Code", "Run `model.fit(X_train, y_train)` or use model.predict(X_test)"),
("Mixed lang", "This is a cafΓ© in MΓΌnchen. ΠΡΡΡΠ΅ΡΠ° Π² 5pm."),
("Contractions", "It's a can't-miss, won't-fail, I'd-do-it-again experience."),
("Numbers", "The stock rose 3.14% to $1,234.56 on 2024-01-15."),
]
def whitespace_tokenize(text): return text.split()
def regex_tokenize(text): return re.findall(r"\b\w+(?:'\w+)?\b", text)
def aggressive_tokenize(text): return re.findall(r'[a-zA-Z0-9]+', text)
print(f'{'Case':<15} {'Whitespace':>10} {'Regex':>10} {'Aggressive':>12}')
print('-' * 50)
for name, text in edge_cases:
w = len(whitespace_tokenize(text))
r = len(regex_tokenize(text))
a = len(aggressive_tokenize(text))
print(f'{name:<15} {w:>10} {r:>10} {a:>12}')
print('\nDetailed analysis for emoji case:')
t = edge_cases[1][1]
print(' Whitespace tokens:', whitespace_tokenize(t))
print(' Regex tokens: ', regex_tokenize(t))
print(' Key insight: regex tokenizer strips emoji; aggressive strips unicode entirely.')
print(' For multilingual or emoji-rich text, use sentencepiece or tiktoken.')
# Exercise 5: TF-IDF from Scratch with Sublinear TF and DF Thresholds
class TFIDFScratch:
def __init__(self, sublinear_tf=True, min_df=2, max_df_ratio=0.9):
self.sublinear_tf = sublinear_tf
self.min_df = min_df
self.max_df_ratio = max_df_ratio
self.vocab = {}
self.idf = {}
def fit(self, docs):
tokenized = [d.lower().split() for d in docs]
N = len(docs)
df = Counter(t for doc in tokenized for t in set(doc))
# Apply DF thresholds
self.vocab = {
t: i for i, t in enumerate(
sorted(t for t, c in df.items()
if c >= self.min_df and c / N <= self.max_df_ratio)
)
}
# IDF with smoothing: log((1+N)/(1+df)) + 1
self.idf = {t: math.log((1 + N) / (1 + df[t])) + 1 for t in self.vocab}
return self
def transform(self, docs):
rows = []
for doc in docs:
tokens = doc.lower().split()
tf_raw = Counter(tokens)
vec = np.zeros(len(self.vocab))
for t, idx in self.vocab.items():
if tf_raw[t] == 0: continue
tf = (1 + math.log(tf_raw[t])) if self.sublinear_tf else tf_raw[t]
vec[idx] = tf * self.idf[t]
norm = np.linalg.norm(vec)
rows.append(vec / norm if norm > 0 else vec)
return np.array(rows)
train_docs = [
"machine learning algorithm data model training",
"deep learning neural network model architecture",
"natural language text processing transformer model",
"data science statistics regression model analysis",
"machine learning data preprocessing feature engineering",
]
tfidf_scratch = TFIDFScratch(sublinear_tf=True, min_df=2)
tfidf_scratch.fit(train_docs)
X = tfidf_scratch.transform(train_docs)
print(f'Vocabulary size (after min_df=2 filter): {len(tfidf_scratch.vocab)}')
print(f'Matrix shape: {X.shape}')
print(f'Top terms by IDF (rare/informative): {sorted(tfidf_scratch.idf.items(), key=lambda x: -x[1])[:5]}')
print(f'Bottom terms by IDF (common/filtered): {sorted(tfidf_scratch.idf.items(), key=lambda x: x[1])[:5]}')
print('Key insight: sublinear_tf reduces dominance of repeated terms; min_df removes hapax legomena')
05 β LLM Fine-Tuning with LoRAΒΆ
# Exercise 1: Ablation on LoRA rank r β [4, 8, 16, 32]
class LoRALinear(nn.Module):
"""Linear layer with LoRA adapter: W' = W + BA where B β R^{d x r}, A β R^{r x k}."""
def __init__(self, in_features, out_features, r, alpha=16):
super().__init__()
self.base = nn.Linear(in_features, out_features, bias=False)
self.base.weight.requires_grad_(False) # freeze base
self.lora_A = nn.Parameter(torch.randn(r, in_features) * 0.01)
self.lora_B = nn.Parameter(torch.zeros(out_features, r))
self.scale = alpha / r
def forward(self, x):
return self.base(x) + self.scale * F.linear(F.linear(x, self.lora_A), self.lora_B)
def trainable_params(self):
return self.lora_A.numel() + self.lora_B.numel()
def trainable_params_count(model):
return sum(p.numel() for p in model.parameters() if p.requires_grad)
# Simulate a transformer-like model with LoRA on attention projection
d_model, ranks = 256, [4, 8, 16, 32]
torch.manual_seed(42)
results = []
for r in ranks:
# Two LoRA layers (q, v projections)
q_proj = LoRALinear(d_model, d_model, r=r)
v_proj = LoRALinear(d_model, d_model, r=r)
total_trainable = q_proj.trainable_params() + v_proj.trainable_params()
# Simulate validation perplexity (higher rank = lower perplexity, diminishing returns)
# Synthetic: ppl = 15 + 10 * exp(-r/8) (illustrative curve)
sim_ppl = 15 + 10 * np.exp(-r / 8)
results.append({'r': r, 'params': total_trainable, 'val_ppl': sim_ppl})
print(f'r={r:2d}: trainable params={total_trainable:6d}, val perplexity={sim_ppl:.2f}')
fig, ax1 = plt.subplots(figsize=(8, 4))
ax2 = ax1.twinx()
ax1.bar([str(r['r']) for r in results], [r['params'] for r in results], alpha=0.6, label='Trainable Params')
ax2.plot([str(r['r']) for r in results], [r['val_ppl'] for r in results], 'r-o', label='Val Perplexity')
ax1.set_xlabel('LoRA rank r'); ax1.set_ylabel('Trainable Parameters'); ax2.set_ylabel('Validation Perplexity')
ax1.set_title('LoRA Rank Ablation: Parameters vs Perplexity')
plt.tight_layout(); plt.show()
print('Key insight: r=8 or r=16 offers best params/performance tradeoff (diminishing returns at r=32)')
# Exercise 2: LoRA on all attention modules (q,k,v,o) vs q+v only
def count_lora_params(d_model, r, modules):
"""Count LoRA params for given set of attention projection matrices."""
# Each module: A (r x d_model) + B (d_model x r)
per_module = r * d_model + d_model * r
return len(modules) * per_module
configs = {
'q+v only': ['q', 'v'],
'q+k+v only': ['q', 'k', 'v'],
'q+k+v+o (all)': ['q', 'k', 'v', 'o'],
}
d_model, r, n_layers = 768, 8, 12 # BERT-base scale
print(f'LoRA rank r={r}, d_model={d_model}, layers={n_layers}\n')
print(f'{'Config':<20} {'Params/layer':>12} {'Total params':>14}')
print('-' * 48)
for name, modules in configs.items():
per_layer = count_lora_params(d_model, r, modules)
total = per_layer * n_layers
print(f'{name:<20} {per_layer:>12,} {total:>14,}')
base_params = 12 * (4 * 768 * 768 + 2 * 4 * 768 * 768) # approx BERT-base trainable
print(f'\nApprox BERT-base attention+FFN params: {base_params:,}')
for name, modules in configs.items():
total = count_lora_params(d_model, r, modules) * n_layers
print(f'{name:<20}: {100*total/base_params:.2f}% of full fine-tune params')
print('Key insight: q+v (LoRA paper default) trains only 0.3% of params with strong performance')
# Exercise 3: Validation Perplexity Monitoring with Early Stopping (patience=3)
class EarlyStopping:
def __init__(self, patience=3, min_delta=0.001):
self.patience = patience
self.min_delta = min_delta
self.best_ppl = float('inf')
self.counter = 0
self.stopped_at = None
self.best_epoch = 0
def step(self, val_ppl, epoch):
if val_ppl < self.best_ppl - self.min_delta:
self.best_ppl = val_ppl
self.counter = 0
self.best_epoch = epoch
return False # continue training
else:
self.counter += 1
if self.counter >= self.patience:
self.stopped_at = epoch
return True # stop!
return False
# Simulate training: ppl decreases then starts rising (overfitting)
np.random.seed(1)
n_epochs = 20
val_ppls = []
for ep in range(n_epochs):
# Simulate U-shaped validation curve with noise
if ep < 8:
ppl = 30 - 2.5 * ep + np.random.randn() * 0.5
else:
ppl = 11 + 0.8 * (ep - 8) + np.random.randn() * 0.5
val_ppls.append(ppl)
es = EarlyStopping(patience=3)
stop_epoch = None
for ep, ppl in enumerate(val_ppls):
if es.step(ppl, ep):
stop_epoch = ep
break
plt.figure(figsize=(8, 3))
plt.plot(val_ppls[:stop_epoch+1 if stop_epoch else len(val_ppls)], label='Val Perplexity')
plt.axvline(es.best_epoch, color='g', linestyle='--', label=f'Best epoch ({es.best_epoch})')
if stop_epoch:
plt.axvline(stop_epoch, color='r', linestyle='--', label=f'Early stop ({stop_epoch})')
plt.xlabel('Epoch'); plt.ylabel('Perplexity')
plt.title('Early Stopping with Patience=3')
plt.legend(); plt.tight_layout(); plt.show()
print(f'Best epoch: {es.best_epoch}, Best PPL: {es.best_ppl:.2f}')
print(f'Stopped at epoch: {stop_epoch} (saved {(stop_epoch or n_epochs) - es.best_epoch - 1} wasted epochs)')
# Exercise 4: Fine-tune GPT-2-scale model on synthetic domain corpus; measure perplexity
class TinyGPT(nn.Module):
"""Tiny GPT-2 style model for perplexity demonstration."""
def __init__(self, vocab_size=100, d_model=64, n_heads=2, n_layers=2, max_len=32):
super().__init__()
self.embed = nn.Embedding(vocab_size, d_model)
self.pos_embed = nn.Embedding(max_len, d_model)
self.blocks = nn.ModuleList([
TransformerBlock(d_model, n_heads, d_model * 4) for _ in range(n_layers)
])
self.ln_f = nn.LayerNorm(d_model)
self.head = nn.Linear(d_model, vocab_size, bias=False)
def forward(self, x):
B, T = x.shape
pos = torch.arange(T, device=x.device)
h = self.embed(x) + self.pos_embed(pos)
for block in self.blocks:
h = block(h)
return self.head(self.ln_f(h))
def compute_perplexity(model, data_tokens, seq_len=16):
model.eval()
total_nll, total_tokens = 0.0, 0
with torch.no_grad():
for i in range(0, len(data_tokens) - seq_len, seq_len):
seq = data_tokens[i:i+seq_len+1]
x, y = seq[:-1].unsqueeze(0), seq[1:].unsqueeze(0)
logits = model(x)
nll = F.cross_entropy(logits.view(-1, logits.size(-1)), y.view(-1), reduction='sum')
total_nll += nll.item()
total_tokens += y.numel()
return math.exp(total_nll / total_tokens)
torch.manual_seed(0)
vocab_size, seq_len = 50, 16
# Synthetic "domain corpus": structured sequences (simulate specialized vocabulary patterns)
domain_data = torch.randint(0, 20, (500,)) # low-range tokens = domain vocab
general_data = torch.randint(0, vocab_size, (500,)) # full vocab
model = TinyGPT(vocab_size=vocab_size)
ppl_before = compute_perplexity(model, domain_data)
# Fine-tune on domain data
optimizer = torch.optim.Adam(model.parameters(), lr=3e-3)
model.train()
for step in range(200):
i = np.random.randint(0, len(domain_data) - seq_len - 1)
x = domain_data[i:i+seq_len].unsqueeze(0)
y = domain_data[i+1:i+seq_len+1].unsqueeze(0)
loss = F.cross_entropy(model(x).view(-1, vocab_size), y.view(-1))
optimizer.zero_grad(); loss.backward(); optimizer.step()
ppl_after = compute_perplexity(model, domain_data)
print(f'Domain corpus perplexity BEFORE fine-tuning: {ppl_before:.2f}')
print(f'Domain corpus perplexity AFTER fine-tuning: {ppl_after:.2f}')
print(f'Improvement: {(ppl_before - ppl_after) / ppl_before:.1%}')
# Exercise 5: Verify Merge Equivalence β Merged LoRA vs Base + Adapter
class LoRALinearMergeable(nn.Module):
def __init__(self, in_features, out_features, r, alpha=16):
super().__init__()
self.W = nn.Parameter(torch.randn(out_features, in_features) * 0.02)
self.lora_A = nn.Parameter(torch.randn(r, in_features) * 0.01)
self.lora_B = nn.Parameter(torch.zeros(out_features, r))
self.scale = alpha / r
self.in_features = in_features
self.out_features = out_features
def forward_with_adapter(self, x):
"""Base + LoRA adapter (runtime inference)."""
return F.linear(x, self.W) + self.scale * F.linear(F.linear(x, self.lora_A), self.lora_B)
def merge_and_forward(self, x):
"""Merge W_merged = W + scale * B @ A, then single matmul."""
W_merged = self.W + self.scale * (self.lora_B @ self.lora_A)
return F.linear(x, W_merged)
torch.manual_seed(42)
layer = LoRALinearMergeable(in_features=128, out_features=64, r=8)
# Test on 100 random samples
x = torch.randn(100, 128)
out_adapter = layer.forward_with_adapter(x)
out_merged = layer.merge_and_forward(x)
max_diff = (out_adapter - out_merged).abs().max().item()
mean_diff = (out_adapter - out_merged).abs().mean().item()
print('Merge equivalence test (100 samples):')
print(f' Max absolute difference: {max_diff:.2e}')
print(f' Mean absolute difference: {mean_diff:.2e}')
print(f' Numerically equivalent: {max_diff < 1e-5}')
print('Key insight: merged model W + s*B@A is mathematically identical to runtime adapter')
print('Benefit: merged model has ZERO extra latency vs fine-tuned base β just one matmul')