Neural Collaborative Filtering: Deep Learning for Recommendations¶

Embedding layers learn dense latent representations of users and items that are far richer than the linear factors of matrix factorization. This notebook implements embedding-based CF, NeuMF, and the two-tower architecture used in production at YouTube, Spotify, and Pinterest.

Environment Check¶

try:
    import torch
    import torch.nn as nn
    import torch.optim as optim
    HAS_TORCH = True
    print(f'PyTorch {torch.__version__} available. Device: {"cuda" if torch.cuda.is_available() else "cpu"}')
except ImportError:
    HAS_TORCH = False
    print('PyTorch not found. Code blocks will show the architecture but skip training.')
    print('Install with: pip install torch')

import numpy as np
import pandas as pd
import warnings
warnings.filterwarnings('ignore')

np.random.seed(42)

N_USERS = 500
N_ITEMS = 1000
EMBED_DIM = 32
DEVICE = 'cpu'

1. The Embedding Intuition¶

An embedding is a dense, low-dimensional vector representation learned from data.

| Concept | Matrix Factorization | Neural Embedding |
| --- | --- | --- |
| User representation | Row of \(P\) (learned by ALS/SGD) | `nn.Embedding(n_users, d)` (learned by backprop) |
| Item representation | Row of \(Q\) (learned by ALS/SGD) | `nn.Embedding(n_items, d)` (learned by backprop) |
| Interaction | Dot product | Dot product or concatenation → MLP |
| Capacity | Linear | Non-linear (arbitrary depth) |

Geometrically: users and items that interact frequently end up close in embedding space. Individual embedding dimensions often align with interpretable features (genre preference, production style, etc.).
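A minimal numpy sketch of this geometry, using hand-picked 2-D vectors rather than learned embeddings (the item names are purely illustrative):

```python
import numpy as np

# Hand-picked 2-D "embeddings" (illustrative, not learned):
# two sci-fi items point in roughly the same direction,
# a cooking item points elsewhere.
scifi_a = np.array([0.9, 0.1])
scifi_b = np.array([0.8, 0.2])
cooking = np.array([0.1, 0.9])

def cosine(a, b):
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

print(f'sim(scifi_a, scifi_b) = {cosine(scifi_a, scifi_b):.3f}')  # high: close in space
print(f'sim(scifi_a, cooking) = {cosine(scifi_a, cooking):.3f}')  # low: far apart
```

A trained model arrives at the same picture automatically: co-interacted items get pulled together by the loss.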

# Generate synthetic implicit feedback data (clicks/views)
rng = np.random.default_rng(42)

# Simulate user-item interactions: 5000 positive interactions
n_interactions = 5000
user_ids_raw = rng.integers(0, N_USERS, n_interactions)
item_ids_raw = rng.integers(0, N_ITEMS, n_interactions)

# Remove duplicates
interactions = pd.DataFrame({'user': user_ids_raw, 'item': item_ids_raw}).drop_duplicates()
interactions = interactions.reset_index(drop=True)

print(f'Unique interactions: {len(interactions)}')
print(f'Users: {interactions["user"].nunique()}  Items: {interactions["item"].nunique()}')
print(f'Density: {len(interactions) / (N_USERS * N_ITEMS) * 100:.3f}%')
interactions.head()

2. Basic Embedding Model: Dot-Product Prediction¶

The simplest neural CF model: learn user and item embeddings, predict rating as their dot product.

\[\hat{r}_{ui} = \mathbf{e}_u^T \mathbf{e}_i\]

This is equivalent to biased matrix factorization, except the gradients are computed automatically by autograd rather than derived by hand.
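To make the autograd claim concrete: for a single squared-error term \((r - \mathbf{p}_u^T \mathbf{q}_i)^2\), the analytic gradient with respect to \(\mathbf{p}_u\) is \(-2(r - \mathbf{p}_u^T \mathbf{q}_i)\,\mathbf{q}_i\), exactly the update classic MF-by-SGD hard-codes. A small numpy check with toy values (a sketch, not part of the model) confirms it against a finite-difference estimate, the kind of derivative bookkeeping autograd automates:

```python
import numpy as np

rng = np.random.default_rng(0)
p = rng.normal(size=4)   # user embedding p_u
q = rng.normal(size=4)   # item embedding q_i
r = 3.5                  # observed rating

def loss(p_vec):
    return (r - p_vec @ q) ** 2

# Analytic MF gradient w.r.t. the user vector
grad_analytic = -2 * (r - p @ q) * q

# Central finite differences: the bookkeeping autograd spares us from
eps = 1e-6
grad_fd = np.array([
    (loss(p + eps * np.eye(4)[k]) - loss(p - eps * np.eye(4)[k])) / (2 * eps)
    for k in range(4)
])

print(np.max(np.abs(grad_analytic - grad_fd)))  # near zero: they agree
```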

if HAS_TORCH:
    class DotProductCF(nn.Module):
        def __init__(self, n_users, n_items, embed_dim):
            super().__init__()
            self.user_embed = nn.Embedding(n_users, embed_dim)
            self.item_embed = nn.Embedding(n_items, embed_dim)
            # Bias terms
            self.user_bias  = nn.Embedding(n_users, 1)
            self.item_bias  = nn.Embedding(n_items, 1)
            # Init
            nn.init.normal_(self.user_embed.weight, 0, 0.01)
            nn.init.normal_(self.item_embed.weight, 0, 0.01)
            nn.init.zeros_(self.user_bias.weight)
            nn.init.zeros_(self.item_bias.weight)

        def forward(self, user_ids, item_ids):
            u = self.user_embed(user_ids)          # (B, D)
            i = self.item_embed(item_ids)          # (B, D)
            dot = (u * i).sum(dim=1, keepdim=True) # (B, 1)
            bias = self.user_bias(user_ids) + self.item_bias(item_ids)  # (B, 1)
            return (dot + bias).squeeze(1)         # (B,)

    model_dot = DotProductCF(N_USERS, N_ITEMS, EMBED_DIM)
    print(model_dot)
    total_params = sum(p.numel() for p in model_dot.parameters())
    print(f'Total parameters: {total_params:,}')
else:
    print('PyTorch not available. Architecture description:')
    print('  user_embed: Embedding(500, 32)')
    print('  item_embed: Embedding(1000, 32)')
    print('  user_bias:  Embedding(500, 1)')
    print('  item_bias:  Embedding(1000, 1)')
    print('  forward:    dot(u, i) + bias_u + bias_i')

3. Two-Tower Model¶

The two-tower (or dual encoder) architecture is the standard retrieval model at YouTube, Google, and Spotify:

  • User tower: MLP that maps user ID (+ features) → user embedding

  • Item tower: MLP that maps item ID (+ features) → item embedding

  • Score: dot product between the two embedding outputs

The towers are trained together. At serving time, item embeddings are precomputed and indexed for fast ANN search.

if HAS_TORCH:
    class Tower(nn.Module):
        """Single tower: ID embedding -> MLP -> output embedding."""
        def __init__(self, n_entities, embed_dim, hidden_dims, output_dim):
            super().__init__()
            self.embed = nn.Embedding(n_entities, embed_dim)
            layers = []
            in_dim = embed_dim
            for h in hidden_dims:
                layers += [nn.Linear(in_dim, h), nn.ReLU(), nn.LayerNorm(h)]
                in_dim = h
            layers.append(nn.Linear(in_dim, output_dim))
            self.mlp = nn.Sequential(*layers)

        def forward(self, ids):
            e = self.embed(ids)     # (B, embed_dim)
            return self.mlp(e)      # (B, output_dim)

    class TwoTowerModel(nn.Module):
        def __init__(self, n_users, n_items, embed_dim=64,
                     hidden_dims=(128, 64), output_dim=32):
            super().__init__()
            self.user_tower = Tower(n_users, embed_dim, hidden_dims, output_dim)
            self.item_tower = Tower(n_items, embed_dim, hidden_dims, output_dim)

        def forward(self, user_ids, item_ids):
            u_out = self.user_tower(user_ids)   # (B, output_dim)
            i_out = self.item_tower(item_ids)   # (B, output_dim)
            # L2-normalize before dot product (cosine similarity)
            u_norm = nn.functional.normalize(u_out, dim=1)
            i_norm = nn.functional.normalize(i_out, dim=1)
            return (u_norm * i_norm).sum(dim=1)  # (B,)

        def get_user_embedding(self, user_ids):
            return nn.functional.normalize(self.user_tower(user_ids), dim=1)

        def get_item_embeddings_all(self, n_items):
            all_ids = torch.arange(n_items)
            return nn.functional.normalize(self.item_tower(all_ids), dim=1)

    two_tower = TwoTowerModel(N_USERS, N_ITEMS)
    print(f'User tower: {sum(p.numel() for p in two_tower.user_tower.parameters()):,} params')
    print(f'Item tower: {sum(p.numel() for p in two_tower.item_tower.parameters()):,} params')
    total = sum(p.numel() for p in two_tower.parameters())
    print(f'Total: {total:,} params')
    # Test forward pass
    u = torch.tensor([0, 1, 2])
    i = torch.tensor([10, 20, 30])
    scores = two_tower(u, i)
    print(f'Test scores (cosine sim): {scores.detach().numpy()}')
else:
    print('Two-Tower architecture (no PyTorch):')
    print('  user_tower: Embedding(500,64) -> Linear(64,128) -> ReLU -> LayerNorm -> Linear(128,64) -> ReLU -> LayerNorm -> Linear(64,32)')
    print('  item_tower: same structure')
    print('  score: cosine_similarity(user_tower(u), item_tower(i))')

4. Training Loop: BPR Loss (Bayesian Personalized Ranking)¶

For implicit feedback (clicks and views, with no explicit ratings), we use the BPR loss:

\[\mathcal{L}_{BPR} = -\sum_{(u,i,j)} \log \sigma(\hat{r}_{ui} - \hat{r}_{uj})\]

where \(i\) is an item the user interacted with (positive) and \(j\) is a random item they didn’t (negative). This trains the model to rank positive items above random negatives.
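A quick numpy look at how a single term of this loss behaves (a toy sketch, independent of the training code below): \(-\log \sigma(\text{pos} - \text{neg})\) shrinks as the positive is scored further above the negative, and sits at \(-\log \sigma(0) \approx 0.693\) when the model cannot tell them apart.

```python
import numpy as np

def sigmoid(x):
    return 1.0 / (1.0 + np.exp(-x))

def bpr_pair_loss(pos, neg):
    """BPR loss for a single (positive, negative) score pair."""
    return -np.log(sigmoid(pos - neg))

# Loss as a function of the score margin (pos - neg):
# large when the negative outranks the positive, near zero once the
# positive is well above it.
for margin in [-2.0, 0.0, 2.0, 5.0]:
    print(f'margin {margin:+.1f} -> loss {bpr_pair_loss(margin, 0.0):.4f}')
```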

if HAS_TORCH:
    def bpr_loss(pos_scores, neg_scores):
        """BPR loss: maximize the margin between positive and negative scores."""
        return -torch.log(torch.sigmoid(pos_scores - neg_scores) + 1e-8).mean()

    def sample_negatives(interactions_df, n_items, n_neg_per_pos=1, rng=None):
        """Sample random negatives (items not in the user's history)."""
        if rng is None:
            rng = np.random.default_rng(0)
        user_history = interactions_df.groupby('user')['item'].apply(set).to_dict()
        neg_items = []
        for _, row in interactions_df.iterrows():
            u = row['user']
            history = user_history[u]
            j = rng.integers(0, n_items)
            while j in history:
                j = rng.integers(0, n_items)
            neg_items.append(j)
        return np.array(neg_items)

    # Training setup
    model = TwoTowerModel(N_USERS, N_ITEMS, embed_dim=32, hidden_dims=(64,), output_dim=16)
    optimizer = optim.Adam(model.parameters(), lr=1e-3, weight_decay=1e-5)

    users_t = torch.tensor(interactions['user'].values, dtype=torch.long)
    items_t = torch.tensor(interactions['item'].values, dtype=torch.long)

    N_EPOCHS = 10
    BATCH_SIZE = 256
    n = len(interactions)

    print('Training Two-Tower with BPR loss...')
    train_rng = np.random.default_rng(0)
    for epoch in range(N_EPOCHS):
        model.train()
        # Sample negatives each epoch
        neg_items_np = sample_negatives(interactions, N_ITEMS, rng=train_rng)
        neg_items_t  = torch.tensor(neg_items_np, dtype=torch.long)

        # Shuffle
        idx = torch.randperm(n)
        epoch_loss = 0.0
        n_batches  = 0

        for start in range(0, n, BATCH_SIZE):
            batch_idx  = idx[start:start + BATCH_SIZE]
            u_batch    = users_t[batch_idx]
            pos_batch  = items_t[batch_idx]
            neg_batch  = neg_items_t[batch_idx]

            pos_scores = model(u_batch, pos_batch)
            neg_scores = model(u_batch, neg_batch)
            loss = bpr_loss(pos_scores, neg_scores)

            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            epoch_loss += loss.item()
            n_batches  += 1

        if (epoch + 1) % 2 == 0:
            print(f'  Epoch {epoch+1:2d}  Avg BPR Loss: {epoch_loss/n_batches:.4f}')

    print('Training done.')
else:
    print('PyTorch not available. BPR training loop skipped.')
    print("""Pseudocode:
for epoch in range(N_EPOCHS):
    neg_items = sample_negatives(interactions, N_ITEMS)
    for batch in DataLoader(dataset, batch_size=256):
        pos_scores = model(batch.user, batch.pos_item)
        neg_scores = model(batch.user, batch.neg_item)
        loss = -log(sigmoid(pos_scores - neg_scores)).mean()
        optimizer.zero_grad(); loss.backward(); optimizer.step()
    """)

5. Approximate Nearest Neighbor (ANN) Search for Serving¶

Problem: At serving time, scoring 1M items for every user request is too slow.

Solution: Precompute all item embeddings, build an ANN index (e.g., FAISS), and retrieve the top-K in sublinear time.

| Library | Algorithm | Notes |
| --- | --- | --- |
| FAISS (Facebook) | IVF, HNSW, PQ | Industry standard, GPU support |
| ScaNN (Google) | Asymmetric quantization | Best recall/latency tradeoff |
| Annoy (Spotify) | Random projection trees | Simple, good for static indices |
| hnswlib | HNSW graph | Fast, pure C++ |
We demonstrate the concept with a numpy brute-force search, then show the FAISS API pattern.

# Numpy brute-force ANN (concept demo, not production)
if HAS_TORCH:
    model.eval()
    with torch.no_grad():
        # Precompute ALL item embeddings (offline step in production)
        item_embeddings = model.get_item_embeddings_all(N_ITEMS)  # (N_ITEMS, output_dim)
        item_emb_np = item_embeddings.numpy()                     # numpy for indexing

        # Query: get top-10 items for user 0
        user_emb = model.get_user_embedding(torch.tensor([0]))     # (1, output_dim)
        user_emb_np = user_emb.numpy()

    # Exact cosine search (brute force)
    scores_all = (user_emb_np @ item_emb_np.T).flatten()           # (N_ITEMS,)
    top10_exact = np.argsort(scores_all)[::-1][:10]
    print(f'Top-10 items for user 0 (exact):    {top10_exact.tolist()}')
    print(f'Scores: {scores_all[top10_exact]}')
else:
    print('Showing FAISS usage pattern (requires: pip install faiss-cpu):')

print("""
--- FAISS usage pattern ---
import faiss

d = output_dim                          # embedding dimension
index = faiss.IndexFlatIP(d)            # Inner Product (cosine after L2-norm)
# Or, for faster approximate search over large catalogs:
#   quantizer = faiss.IndexFlatIP(d)
#   index = faiss.IndexIVFFlat(quantizer, d, n_cells)
#   index.train(item_embeddings)        # IVF needs a training pass before add()

# Build index (offline, once)
faiss.normalize_L2(item_embeddings)     # normalize for cosine
index.add(item_embeddings)              # add all N_ITEMS vectors

# Query (online, per-request)
faiss.normalize_L2(user_embedding)
D, I = index.search(user_embedding, k=100)   # top-100 in ~1ms for 1M items
# I[0] = item indices, D[0] = similarity scores
""")

6. Comparison: Matrix Factorization vs Neural CF vs Two-Tower¶

| Criterion | Matrix Factorization (ALS/SVD) | Neural CF (NeuMF) | Two-Tower |
| --- | --- | --- | --- |
| Model capacity | Linear (dot product only) | Non-linear (MLP) | Non-linear per tower |
| Training | Closed-form or SGD | Backprop | Backprop |
| Serving latency | Fast (dot product) | Slow (MLP per pair) | Fast (ANN after precompute) |
| Cold start | Bad | Bad | Can incorporate features |
| Scalability | Good (sparse data) | Moderate | Excellent (precomputed) |
| Explainability | Moderate (factor analysis) | Low (black box) | Low |
| Used at | Netflix (early), Spotify | Academic research | YouTube, Pinterest, Airbnb |

# NeuMF (Neural Matrix Factorization): combines an MF path and an MLP path
if HAS_TORCH:
    class NeuMF(nn.Module):
        """
        He et al. 2017: Neural Collaborative Filtering.
        Two parallel paths:
          1. GMF path: element-wise product of embeddings (generalizes MF)
          2. MLP path: concatenation -> deep network
        Outputs are concatenated and fed to a final prediction layer.
        """
        def __init__(self, n_users, n_items, mf_dim=16, mlp_dims=(64, 32, 16)):
            super().__init__()
            # GMF embeddings
            self.user_embed_gmf = nn.Embedding(n_users, mf_dim)
            self.item_embed_gmf = nn.Embedding(n_items, mf_dim)
            # MLP embeddings
            mlp_in_dim = mlp_dims[0]
            self.user_embed_mlp = nn.Embedding(n_users, mlp_in_dim // 2)
            self.item_embed_mlp = nn.Embedding(n_items, mlp_in_dim // 2)
            # MLP layers
            layers = []
            in_d = mlp_in_dim
            for out_d in mlp_dims[1:]:
                layers += [nn.Linear(in_d, out_d), nn.ReLU()]
                in_d = out_d
            self.mlp = nn.Sequential(*layers)
            # Final layer
            self.out = nn.Linear(mf_dim + in_d, 1)

        def forward(self, user_ids, item_ids):
            # GMF path
            gmf = self.user_embed_gmf(user_ids) * self.item_embed_gmf(item_ids)
            # MLP path
            mlp_in = torch.cat([self.user_embed_mlp(user_ids),
                                 self.item_embed_mlp(item_ids)], dim=1)
            mlp_out = self.mlp(mlp_in)
            # Concatenate and predict
            concat = torch.cat([gmf, mlp_out], dim=1)
            return self.out(concat).squeeze(1)

    neumf = NeuMF(N_USERS, N_ITEMS)
    u = torch.tensor([0, 1, 2])
    i = torch.tensor([10, 20, 30])
    print('NeuMF architecture:')
    print(neumf)
    print(f'Test output: {neumf(u, i).detach().numpy()}')
else:
    print('NeuMF architecture (no PyTorch):')
    print('  GMF path: embed_user(u) * embed_item(i) -> (B, mf_dim)')
    print('  MLP path: cat(embed_user(u), embed_item(i)) -> MLP -> (B, mlp_out)')
    print('  Final:    cat(GMF_out, MLP_out) -> Linear(1)')

7. Cheat Sheet + Exercises¶

Neural CF Cheat Sheet¶

# Embedding model skeleton (PyTorch)
class EmbedCF(nn.Module):
    def __init__(self, n_users, n_items, D):
        super().__init__()
        self.U = nn.Embedding(n_users, D)
        self.V = nn.Embedding(n_items, D)
    def forward(self, u, i):
        return (self.U(u) * self.V(i)).sum(1)  # dot product

# BPR loss
def bpr_loss(pos, neg):
    return -torch.log(torch.sigmoid(pos - neg) + 1e-8).mean()

# Two-tower serving
# Offline: precompute item_embs = item_tower(all_items) -> FAISS index
# Online:  query_emb = user_tower(user_id) -> index.search(query_emb, k)

| Loss | Use case | Formula |
| --- | --- | --- |
| MSE | Explicit ratings | (r_hat - r)^2 |
| BCE | Implicit (0/1) | -y log(p) - (1-y) log(1-p) |
| BPR | Implicit (ranking) | -log σ(pos - neg) |
| Sampled Softmax | Large catalogs | NCE / InfoNCE |
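The table's last row can be sketched in numpy (a toy in-batch softmax standing in for a full sampled-softmax/InfoNCE implementation; batch size and dimensions here are arbitrary): score every user in a batch against every item in the same batch, then treat each row's matching item, the diagonal, as the correct class of a B-way classification.

```python
import numpy as np

rng = np.random.default_rng(0)
B, D = 4, 8
user_embs = rng.normal(size=(B, D))   # user tower outputs for one batch
item_embs = rng.normal(size=(B, D))   # the matching positive items, same order

logits = user_embs @ item_embs.T              # (B, B): user u vs every item in batch
logits -= logits.max(axis=1, keepdims=True)   # numerical stabilization
log_probs = logits - np.log(np.exp(logits).sum(axis=1, keepdims=True))

# Cross-entropy with the diagonal as the label:
# item b is the true positive for user b; the other B-1 batch items act as negatives.
loss = -np.mean(np.diag(log_probs))
print(f'in-batch softmax loss: {loss:.4f}')
```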

Exercises¶

  1. Add user/item features: Extend the Two-Tower model so each tower also accepts a feature vector (e.g., user age + genre preferences for users, genre + year for items) concatenated with the embedding. Retrain and compare Recall@10.

  2. In-batch negatives: Instead of sampling random negatives, use the other items in the same batch as negatives ("in-batch negatives"). Implement this by computing the full score matrix (B, B) for all user-item pairs in the batch, then applying cross-entropy loss on the diagonal. This is the approach used in SimCLR and Google's two-tower paper.

  3. Hard negatives: Easy negatives (random items) make training fast but less discriminative. Implement a hard-negative mining strategy: after each epoch, find the top-50 items each user hasn’t interacted with and sample negatives from those. Compare training curves and final Recall@10.

  4. Embedding visualization: Train the dot-product CF model for 50 epochs. Extract the item embeddings, apply UMAP (or t-SNE) to project to 2D, and color by item ID (simulating genre clusters). What structure do you observe?

  5. Evaluation harness: Implement proper leave-one-out evaluation: for each user, hold out their most recent interaction as the test positive, and rank it against 99 random negatives. Report Hit Rate@10 (HR@10) and NDCG@10, the standard protocol from the NeuMF paper.