Feature Stores: From Training to Production Without Data Leakage

A feature store is the missing piece between data engineering and ML. It tackles three of the hardest problems in production ML: point-in-time correctness, training-serving skew, and feature reuse. This notebook builds a minimal feature store from scratch.

1. Setup

import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import sqlite3
from pathlib import Path
import json
import time
import warnings
warnings.filterwarnings('ignore')

try:
    import feast
    HAS_FEAST = True
    print("Feast is available:", feast.__version__)
except ImportError:
    HAS_FEAST = False
    print("Feast not installed. Install with: pip install feast")

print("Building a feature store from scratch with pandas + SQLite.")

2. The Problem with Naive Feature Pipelines

Most ML engineers make this mistake at least once: they compute features on the full dataset, train a model, and get suspiciously good metrics. Then the model falls apart in production.

The culprit is data leakage; specifically, temporal leakage.

The Leaky Pipeline

Imagine predicting customer churn. You want to use "total purchases in the last 30 days" as a feature. If your label is "did the customer churn in the next 30 days?", then computing the feature on all historical data, including purchases that happen after the label date, bakes future information into training.

Naive (wrong) approach:

features = aggregate(all_events_ever)  # includes future!
labels   = churn_label
model    = train(features, labels)     # learns from the future

Correct approach:

for each (customer, label_date):
    features = aggregate(events WHERE timestamp <= label_date)
    labels   = churn_label_at(label_date)
model = train(features, labels)        # only sees the past

Let's quantify how bad the leak can be.

from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split

np.random.seed(42)
n_customers = 1000

# Simulate: customers who churn tend to buy less in the 30 days BEFORE churn
# but they have the SAME historical purchase rate before that window
will_churn = np.random.binomial(1, 0.3, n_customers).astype(bool)

# Purchases BEFORE the observation window (both groups are similar)
purchases_before = np.random.poisson(10, n_customers).astype(float)

# Purchases AFTER label_date (churners buy less; this is future data!)
purchases_after = np.where(
    will_churn,
    np.random.poisson(1, n_customers),   # churners: almost no future purchases
    np.random.poisson(8, n_customers)    # non-churners: continue buying
).astype(float)

# ----- NAIVE (leaky) pipeline -----
# Feature includes both past AND future purchases
X_leaky = (purchases_before + purchases_after).reshape(-1, 1)
y = will_churn.astype(int)

X_train_l, X_test_l, y_train, y_test = train_test_split(
    X_leaky, y, test_size=0.3, random_state=42
)
model_leaky = LogisticRegression()
model_leaky.fit(X_train_l, y_train)
auc_leaky = roc_auc_score(y_test, model_leaky.predict_proba(X_test_l)[:, 1])

# ----- CORRECT (point-in-time) pipeline -----
# Feature uses only purchases BEFORE the label date
X_correct = purchases_before.reshape(-1, 1)

X_train_c, X_test_c, y_train_c, y_test_c = train_test_split(
    X_correct, y, test_size=0.3, random_state=42
)
model_correct = LogisticRegression()
model_correct.fit(X_train_c, y_train_c)
auc_correct = roc_auc_score(y_test_c, model_correct.predict_proba(X_test_c)[:, 1])

print("=" * 55)
print("Leaky pipeline AUC  (training includes future data):", f"{auc_leaky:.3f}")
print("Correct pipeline AUC (point-in-time features only): ", f"{auc_correct:.3f}")
print("Inflation from leak:", f"+{auc_leaky - auc_correct:.3f}")
print("=" * 55)
print()
print("Key insight: You need features as they existed at the")
print("time of the prediction, not as they exist today.")

3. Feature Store Architecture

A feature store has four main components:

Data Sources                Feature Store              Serving
────────────                ─────────────              ───────
Raw events ──→ Transform ──→ Offline Store ──→ Training
                         ↘  (historical)
                            Online Store  ──→ Real-time serving
                            (low latency)

Components

Component            Purpose                                         Technology
───────────────────  ──────────────────────────────────────────────  ───────────────────────────────
Offline Store        Historical feature values for training          Parquet, BigQuery, Hive, SQLite
Online Store         Latest feature values for low-latency serving   Redis, DynamoDB, Cassandra
Feature Registry     Metadata catalog: descriptions, owners, SLAs    Git, database, REST API
Materialization Job  Copies offline → online on a schedule           Spark, Flink, Airflow

Offline Store

Used during training. Stores every version of every feature value with its timestamp. You can reconstruct what any feature looked like on any date in the past. Supports point-in-time joins.
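Pandas can express a point-in-time join directly with `merge_asof`: for each label row, it picks the most recent feature row at or before the label timestamp. A minimal sketch with toy data (column names here are illustrative, not the schema used later in this notebook):

```python
import pandas as pd

# Feature values as they were written over time (one entity, three snapshots)
features = pd.DataFrame({
    "entity_id": ["c1", "c1", "c1"],
    "timestamp": pd.to_datetime(["2023-01-15", "2023-03-15", "2023-06-15"]),
    "purchase_count_30d": [5.0, 12.0, 3.0],
})

# Label rows with their "as of" dates
labels = pd.DataFrame({
    "entity_id": ["c1", "c1", "c1"],
    "label_date": pd.to_datetime(["2023-02-01", "2023-04-01", "2023-07-01"]),
})

# merge_asof requires both frames sorted on their time keys;
# direction="backward" takes the latest feature row <= label_date
pit = pd.merge_asof(
    labels.sort_values("label_date"),
    features.sort_values("timestamp"),
    left_on="label_date", right_on="timestamp",
    by="entity_id", direction="backward",
)
print(pit[["entity_id", "label_date", "purchase_count_30d"]])
# Each label row sees only values written at or before its date: 5, 12, 3
```

This is the same semantics the SQL lookup in Section 4 implements row by row; `merge_asof` does it in one vectorized pass.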

Online Store

Used during serving. Stores only the latest value for each feature. Optimized for sub-millisecond reads. No history, just the current state.

Materialization

A scheduled job that reads the offline store, computes the latest values, and pushes them to the online store. This is the bridge between training and serving.
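In pandas terms, materialization is "latest row per (entity, feature), then key-value write". A minimal sketch with a plain dict standing in for the online store (toy data; in production the write target would be Redis or DynamoDB):

```python
import pandas as pd

# Offline store contents: every historical value with its timestamp
offline = pd.DataFrame({
    "entity_id":    ["c1", "c1", "c2"],
    "feature_name": ["purchase_count_30d"] * 3,
    "value":        [5.0, 12.0, 7.0],
    "timestamp":    pd.to_datetime(["2023-01-15", "2023-03-15", "2023-02-01"]),
})

# Keep only the most recent row per (entity, feature)
latest = (
    offline.sort_values("timestamp")
           .drop_duplicates(["entity_id", "feature_name"], keep="last")
)

# Push into the online key-value layout: {entity: {feature: value}}
online = {}
for row in latest.itertuples():
    online.setdefault(row.entity_id, {})[row.feature_name] = row.value

print(online)
# c1 holds only its latest value (12.0); the history stays offline
```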

Training-Serving Skew

When features are computed differently in training vs serving, you get silent model degradation. The feature store solves this by sharing the same feature transformation logic for both.
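One way to see what "sharing the same logic" means concretely: route both the batch training path and the per-request serving path through a single transformation function. A sketch (the `transform` function is hypothetical, not part of the store built below):

```python
import numpy as np
import pandas as pd

def transform(raw_amounts):
    """Single source of truth: log-scaled total spend."""
    return float(np.log1p(np.sum(raw_amounts)))

# Training path: batch-compute the feature over a DataFrame of events
events = pd.DataFrame({"customer_id": ["c1", "c1", "c2"],
                       "amount": [10.0, 20.0, 5.0]})
train_feature = events.groupby("customer_id")["amount"].apply(
    lambda s: transform(s.to_numpy())
)

# Serving path: the SAME function applied to a single request payload
serve_feature = transform([10.0, 20.0])

# Both paths agree exactly; reimplementing transform() inside the
# serving service is precisely how skew creeps in
assert train_feature["c1"] == serve_feature
```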

4. Build an Offline Feature Store

class OfflineFeatureStore:
    """SQLite-backed offline feature store with point-in-time correct lookup."""

    def __init__(self, db_path=":memory:"):
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self._create_table()

    def _create_table(self):
        self.conn.execute("""
            CREATE TABLE IF NOT EXISTS features (
                entity_id   TEXT    NOT NULL,
                feature_name TEXT   NOT NULL,
                value        REAL,
                value_str    TEXT,
                timestamp    TEXT    NOT NULL
            )
        """)
        self.conn.execute("""
            CREATE INDEX IF NOT EXISTS idx_entity_feature_time
            ON features (entity_id, feature_name, timestamp)
        """)
        self.conn.commit()

    def write_features(self, entity_id, feature_name, value, timestamp):
        """Write a single feature observation."""
        if isinstance(timestamp, datetime):
            timestamp = timestamp.isoformat()
        value_num = float(value) if isinstance(value, (int, float, np.integer, np.floating)) else None
        value_str = str(value) if value_num is None else None
        self.conn.execute(
            "INSERT INTO features VALUES (?, ?, ?, ?, ?)",
            (str(entity_id), feature_name, value_num, value_str, str(timestamp))
        )
        self.conn.commit()

    def write_features_batch(self, records):
        """Batch insert: records is a list of (entity_id, feature_name, value, timestamp)."""
        rows = []
        for entity_id, feature_name, value, timestamp in records:
            if isinstance(timestamp, datetime):
                timestamp = timestamp.isoformat()
            value_num = float(value) if isinstance(value, (int, float, np.integer, np.floating)) else None
            value_str = str(value) if value_num is None else None
            rows.append((str(entity_id), feature_name, value_num, value_str, str(timestamp)))
        self.conn.executemany("INSERT INTO features VALUES (?, ?, ?, ?, ?)", rows)
        self.conn.commit()

    def get_historical_features(self, entity_df, feature_names, as_of_col="label_date"):
        """
        Point-in-time correct lookup.

        For each (entity_id, as_of_timestamp) in entity_df, return the most
        recent value of each feature that was written AT OR BEFORE as_of_timestamp.

        Parameters
        ----------
        entity_df   : DataFrame with columns ['entity_id', as_of_col, ...]
        feature_names : list of feature name strings
        as_of_col   : column in entity_df that holds the "as of" timestamp

        Returns
        -------
        DataFrame with one column per feature, aligned to entity_df rows
        """
        results = entity_df.copy()
        for feature in feature_names:
            values = []
            for _, row in entity_df.iterrows():
                as_of = row[as_of_col]
                if isinstance(as_of, datetime):
                    as_of = as_of.isoformat()
                else:
                    as_of = str(as_of)
                # Point-in-time SQL: most recent value <= request timestamp
                cur = self.conn.execute("""
                    SELECT value, value_str
                    FROM features
                    WHERE entity_id    = ?
                      AND feature_name = ?
                      AND timestamp   <= ?
                    ORDER BY timestamp DESC
                    LIMIT 1
                """, (str(row["entity_id"]), feature, as_of))
                row_result = cur.fetchone()
                if row_result is None:
                    values.append(np.nan)
                elif row_result[0] is not None:
                    values.append(row_result[0])
                else:
                    values.append(row_result[1])
            results[feature] = values
        return results

    def get_latest_values(self, feature_names):
        """Return the most recent value of each feature for every entity."""
        rows = []
        for feature in feature_names:
            # Relies on a SQLite extension: with MAX(), the bare columns
            # (value, value_str) come from the row holding the max timestamp.
            cur = self.conn.execute("""
                SELECT entity_id, value, value_str, MAX(timestamp) AS ts
                FROM features
                WHERE feature_name = ?
                GROUP BY entity_id
            """, (feature,))
            for entity_id, val, val_str, ts in cur.fetchall():
                rows.append({
                    "entity_id": entity_id,
                    "feature_name": feature,
                    "value": val if val is not None else val_str,
                    "timestamp": ts
                })
        return pd.DataFrame(rows)


# Quick smoke test
store = OfflineFeatureStore()
store.write_features("cust_001", "purchase_count_30d", 5,  datetime(2023, 1, 15))
store.write_features("cust_001", "purchase_count_30d", 12, datetime(2023, 3, 15))
store.write_features("cust_001", "purchase_count_30d", 3,  datetime(2023, 6, 15))

entity_df = pd.DataFrame([
    {"entity_id": "cust_001", "label_date": datetime(2023, 2, 1)},   # should see value=5
    {"entity_id": "cust_001", "label_date": datetime(2023, 4, 1)},   # should see value=12
    {"entity_id": "cust_001", "label_date": datetime(2023, 7, 1)},   # should see value=3
])

result = store.get_historical_features(entity_df, ["purchase_count_30d"])
print("Point-in-time lookup smoke test:")
print(result[["entity_id", "label_date", "purchase_count_30d"]].to_string(index=False))
print()
print("Expected:  5, 12, 3  (each row sees only the past!)")

5. Generate Synthetic E-Commerce Data

np.random.seed(0)

# ── Customers ──────────────────────────────────────────────────────────────
N_CUSTOMERS = 500
START_DATE  = datetime(2022, 1, 1)
END_DATE    = datetime(2024, 1, 1)

customer_ids = [f"cust_{i:04d}" for i in range(N_CUSTOMERS)]
signup_dates = [
    START_DATE + timedelta(days=int(d))
    for d in np.random.uniform(0, 180, N_CUSTOMERS)
]
tiers = np.random.choice(["bronze", "silver", "gold"], N_CUSTOMERS, p=[0.6, 0.3, 0.1])

customers_df = pd.DataFrame({
    "customer_id": customer_ids,
    "signup_date": signup_dates,
    "tier": tiers
})
print(f"Customers: {len(customers_df)} rows")
print(customers_df.head(3).to_string(index=False))

# ── Events ─────────────────────────────────────────────────────────────────
N_EVENTS = 10_000
total_days = (END_DATE - START_DATE).days

event_customer_ids = np.random.choice(customer_ids, N_EVENTS)
event_types = np.random.choice(
    ["purchase", "view", "return"], N_EVENTS, p=[0.50, 0.40, 0.10]
)
amounts = np.where(
    event_types == "purchase", np.random.exponential(50, N_EVENTS),
    np.where(event_types == "return", np.random.exponential(30, N_EVENTS), 0.0)
)
event_timestamps = [
    START_DATE + timedelta(days=float(d))
    for d in np.random.uniform(0, total_days, N_EVENTS)
]

events_df = pd.DataFrame({
    "customer_id":      event_customer_ids,
    "event_type":       event_types,
    "amount":           amounts,
    "event_timestamp":  event_timestamps
}).sort_values("event_timestamp").reset_index(drop=True)

print(f"\nEvents: {len(events_df)} rows")
print(events_df.head(3).to_string(index=False))

# ── Labels (churn) ─────────────────────────────────────────────────────────
# Churn label: did the customer make zero purchases in the 90 days after label_date?
LABEL_DATE = datetime(2023, 6, 1)   # snapshot date for all training examples

label_rows = []
for cid in customer_ids:
    future_purchases = events_df[
        (events_df["customer_id"] == cid) &
        (events_df["event_type"] == "purchase") &
        (events_df["event_timestamp"] > LABEL_DATE) &
        (events_df["event_timestamp"] <= LABEL_DATE + timedelta(days=90))
    ]
    label_rows.append({
        "customer_id": cid,
        "label_date":  LABEL_DATE,
        "churned":     int(len(future_purchases) == 0)
    })

labels_df = pd.DataFrame(label_rows)
churn_rate = labels_df["churned"].mean()
print(f"\nLabels: {len(labels_df)} rows, churn rate = {churn_rate:.1%}")
print(labels_df.head(3).to_string(index=False))
# ── Feature computation functions ──────────────────────────────────────────

def _filter_events(customer_id, as_of_date, event_type=None, days_back=None):
    """Return events for a customer at or before as_of_date, optionally filtered."""
    mask = (
        (events_df["customer_id"] == customer_id) &
        (events_df["event_timestamp"] <= as_of_date)
    )
    if days_back is not None:
        window_start = as_of_date - timedelta(days=days_back)
        mask &= (events_df["event_timestamp"] >= window_start)
    if event_type is not None:
        mask &= (events_df["event_type"] == event_type)
    return events_df[mask]


def compute_purchase_count_30d(customer_id, as_of_date):
    """Number of purchases in the 30 days up to (and including) as_of_date."""
    return float(len(_filter_events(customer_id, as_of_date,
                                    event_type="purchase", days_back=30)))


def compute_avg_purchase_value_90d(customer_id, as_of_date):
    """Mean purchase amount over the last 90 days. NaN if no purchases."""
    df = _filter_events(customer_id, as_of_date, event_type="purchase", days_back=90)
    return float(df["amount"].mean()) if len(df) > 0 else np.nan


def compute_days_since_last_purchase(customer_id, as_of_date):
    """Days elapsed since the most recent purchase before as_of_date."""
    df = _filter_events(customer_id, as_of_date, event_type="purchase")
    if len(df) == 0:
        return np.nan
    last_purchase = df["event_timestamp"].max()
    return float((as_of_date - last_purchase).days)


def compute_return_rate_180d(customer_id, as_of_date):
    """Returns / (Purchases + Returns) in the last 180 days."""
    purchases = len(_filter_events(customer_id, as_of_date,
                                   event_type="purchase", days_back=180))
    returns   = len(_filter_events(customer_id, as_of_date,
                                   event_type="return",   days_back=180))
    total = purchases + returns
    return float(returns / total) if total > 0 else 0.0


# Spot check
cid   = customer_ids[0]
aod   = LABEL_DATE
print(f"Spot-check for {cid} as of {aod.date()}:")
print(f"  purchase_count_30d        = {compute_purchase_count_30d(cid, aod):.0f}")
print(f"  avg_purchase_value_90d    = {compute_avg_purchase_value_90d(cid, aod):.2f}")
print(f"  days_since_last_purchase  = {compute_days_since_last_purchase(cid, aod):.0f}")
print(f"  return_rate_180d          = {compute_return_rate_180d(cid, aod):.3f}")

6. Point-in-Time Correct Training Set

# Populate the offline store with point-in-time correct features
# We compute features for a subset of customers (for speed)
SAMPLE_CUSTOMERS = customer_ids[:200]

offline_store = OfflineFeatureStore()

FEATURE_FUNCS = {
    "purchase_count_30d":       compute_purchase_count_30d,
    "avg_purchase_value_90d":   compute_avg_purchase_value_90d,
    "days_since_last_purchase": compute_days_since_last_purchase,
    "return_rate_180d":         compute_return_rate_180d,
}

print("Computing and writing features to offline store...")
records = []
for cid in SAMPLE_CUSTOMERS:
    for fname, ffunc in FEATURE_FUNCS.items():
        val = ffunc(cid, LABEL_DATE)
        if not np.isnan(val):
            records.append((cid, fname, val, LABEL_DATE))

offline_store.write_features_batch(records)
print(f"Written {len(records)} feature records for {len(SAMPLE_CUSTOMERS)} customers.")
# Build training set using the offline store (point-in-time correct)
sample_labels = labels_df[labels_df["customer_id"].isin(SAMPLE_CUSTOMERS)].copy()
sample_labels = sample_labels.rename(columns={"customer_id": "entity_id"})

feature_names = list(FEATURE_FUNCS.keys())
training_df = offline_store.get_historical_features(
    sample_labels, feature_names, as_of_col="label_date"
)

print("Point-in-time training set shape:", training_df.shape)
print(training_df[feature_names].describe().round(2))
# Compare: naive (all-history) features vs point-in-time features
# Naive: use ALL purchases ever recorded, including after label_date
naive_rows = []
for cid in SAMPLE_CUSTOMERS:
    # Use END_DATE as "as_of" (this leaks future data!)
    naive_rows.append({
        "entity_id": cid,
        "purchase_count_30d":       compute_purchase_count_30d(cid, END_DATE),
        "avg_purchase_value_90d":   compute_avg_purchase_value_90d(cid, END_DATE),
        "days_since_last_purchase": compute_days_since_last_purchase(cid, END_DATE),
        "return_rate_180d":         compute_return_rate_180d(cid, END_DATE),
    })
naive_df = pd.DataFrame(naive_rows)
naive_df = naive_df.merge(
    sample_labels[["entity_id", "churned"]], on="entity_id"
)

# Prepare clean datasets for model comparison
pit_df = training_df.dropna(subset=feature_names + ["churned"])
nav_df = naive_df.dropna(subset=feature_names + ["churned"])

def train_and_evaluate(df, feature_cols, label_col="churned", label="model"):
    X = df[feature_cols].fillna(df[feature_cols].median())
    y = df[label_col]
    X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3, random_state=42)
    clf = LogisticRegression(max_iter=500)
    clf.fit(X_tr, y_tr)
    auc = roc_auc_score(y_te, clf.predict_proba(X_te)[:, 1])
    print(f"  [{label}] AUC = {auc:.4f}  (n={len(df)})")
    return auc

print("Model comparison β€” same data, different feature computation:")
auc_naive = train_and_evaluate(nav_df, feature_names, label="NAIVE  (uses future data)")
auc_pit   = train_and_evaluate(pit_df, feature_names, label="CORRECT (point-in-time)  ")
print()
print(f"AUC inflation from leakage: +{auc_naive - auc_pit:.4f}")
print("The naive model appears better in validation but will fail in production.")

7. Online Feature Store (Redis Pattern)

class OnlineFeatureStore:
    """
    In-memory online feature store (dict-backed).
    In production this would be Redis, DynamoDB, or Cassandra.
    """

    def __init__(self):
        self._store = {}   # {entity_id: {feature_name: value}}
        self._updated_at = {}  # {entity_id: datetime}

    def write(self, entity_id: str, features: dict):
        """Write (overwrite) a dict of features for an entity."""
        entity_id = str(entity_id)
        if entity_id not in self._store:
            self._store[entity_id] = {}
        self._store[entity_id].update(features)
        self._updated_at[entity_id] = datetime.utcnow()

    def read(self, entity_id: str, feature_names: list) -> dict:
        """Read selected features for an entity. Missing features return None."""
        entity_id = str(entity_id)
        entity_data = self._store.get(entity_id, {})
        return {f: entity_data.get(f) for f in feature_names}

    def bulk_write(self, df: pd.DataFrame, entity_col: str):
        """Write every row of a DataFrame into the online store."""
        feature_cols = [c for c in df.columns if c != entity_col]
        for _, row in df.iterrows():
            eid = str(row[entity_col])
            self.write(eid, {c: row[c] for c in feature_cols})

    def last_updated(self, entity_id: str):
        return self._updated_at.get(str(entity_id))

    def __len__(self):
        return len(self._store)


# ── Materialization: offline β†’ online ──────────────────────────────────────
online_store = OnlineFeatureStore()

print("Materializing offline store β†’ online store...")
t0 = time.perf_counter()
latest_df = offline_store.get_latest_values(feature_names)
pivot_df  = latest_df.pivot(index="entity_id", columns="feature_name", values="value").reset_index()
online_store.bulk_write(pivot_df, entity_col="entity_id")
elapsed = time.perf_counter() - t0
print(f"Materialized {len(online_store)} entities in {elapsed*1000:.1f} ms")

# ── Serving latency comparison ─────────────────────────────────────────────
test_customer = SAMPLE_CUSTOMERS[5]

# Online store read
t0 = time.perf_counter()
online_features = online_store.read(test_customer, feature_names)
online_latency_us = (time.perf_counter() - t0) * 1_000_000

# Recompute from raw events
t0 = time.perf_counter()
_ = {fname: ffunc(test_customer, datetime.utcnow()) for fname, ffunc in FEATURE_FUNCS.items()}
recompute_latency_ms = (time.perf_counter() - t0) * 1_000

print()
print("Serving latency comparison:")
print(f"  Online store lookup : {online_latency_us:.1f} µs")
print(f"  Recompute from raw  : {recompute_latency_ms:.1f} ms")
print(f"  Speedup             : ~{recompute_latency_ms*1000/online_latency_us:.0f}x")
print()
print("Fetched features for", test_customer, ":")
for k, v in online_features.items():
    print(f"  {k:<30} = {v}")

8. Feature Registry

class FeatureRegistry:
    """Track feature metadata: name, description, owner, data type, freshness SLA."""

    def __init__(self):
        self._registry = {}  # {name: metadata_dict}

    def register(self, name, description, owner, dtype, freshness_hours):
        """
        Register a feature.

        Parameters
        ----------
        name              : unique feature identifier
        description       : human-readable description
        owner             : team or person responsible
        dtype             : expected data type (e.g. 'float', 'int', 'str')
        freshness_hours   : maximum acceptable staleness in hours
        """
        if name in self._registry:
            print(f"Warning: overwriting existing registration for '{name}'")
        self._registry[name] = {
            "name":             name,
            "description":      description,
            "owner":            owner,
            "dtype":            dtype,
            "freshness_hours":  freshness_hours,
            "registered_at":    datetime.utcnow().isoformat(),
        }

    def get_feature_info(self, name):
        """Return metadata dict for a feature, or None if not registered."""
        return self._registry.get(name)

    def list_features(self):
        """Return a DataFrame of all registered features."""
        if not self._registry:
            return pd.DataFrame()
        return pd.DataFrame(list(self._registry.values()))


# Register all four features
registry = FeatureRegistry()
registry.register(
    name="purchase_count_30d",
    description="Number of completed purchases by the customer in the last 30 days",
    owner="growth-team",
    dtype="float",
    freshness_hours=24
)
registry.register(
    name="avg_purchase_value_90d",
    description="Mean purchase amount (USD) over the last 90 days; NaN if no purchases",
    owner="growth-team",
    dtype="float",
    freshness_hours=24
)
registry.register(
    name="days_since_last_purchase",
    description="Calendar days since the customer's most recent purchase event",
    owner="growth-team",
    dtype="float",
    freshness_hours=1
)
registry.register(
    name="return_rate_180d",
    description="Ratio of returns to (purchases + returns) over the last 180 days",
    owner="risk-team",
    dtype="float",
    freshness_hours=48
)

print("Feature Registry:")
print("=" * 80)
reg_df = registry.list_features()[["name", "owner", "dtype", "freshness_hours", "description"]]
for _, row in reg_df.iterrows():
    print(f"  {row['name']:<30} owner={row['owner']:<12} "
          f"dtype={row['dtype']:<7} freshness={row['freshness_hours']}h")
    print(f"    {row['description']}")
print("=" * 80)

9. Training-Serving Skew Detection

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from scipy import stats

# Training distribution: point-in-time features at LABEL_DATE
train_features = pit_df[feature_names].fillna(pit_df[feature_names].median())

# Serving distribution: current values from online store
serve_rows = []
for cid in SAMPLE_CUSTOMERS:
    row = online_store.read(cid, feature_names)
    serve_rows.append(row)
serve_features = pd.DataFrame(serve_rows).fillna(train_features.median())

# KS test for each feature
print("Training-Serving Skew Detection (KS Test)")
print("=" * 65)
skew_results = []
for feat in feature_names:
    t = train_features[feat].dropna()
    s = serve_features[feat].dropna()
    if len(t) > 0 and len(s) > 0:
        ks_stat, p_val = stats.ks_2samp(t, s)
        flagged = "SKEWED" if p_val < 0.05 else "OK"
        skew_results.append({"feature": feat, "ks_stat": ks_stat, "p_value": p_val, "status": flagged})
        print(f"  {feat:<30} KS={ks_stat:.3f}  p={p_val:.4f}  [{flagged}]")
print("=" * 65)
print("Features with p < 0.05 show statistically significant skew.")

# Plot distributions
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
fig.suptitle("Training vs Serving Feature Distributions", fontsize=14, y=1.01)

for ax, feat in zip(axes.flat, feature_names):
    t_vals = train_features[feat].dropna()
    s_vals = serve_features[feat].dropna()
    # Clip to common range for readability
    lo = min(t_vals.quantile(0.01), s_vals.quantile(0.01))
    hi = max(t_vals.quantile(0.99), s_vals.quantile(0.99))
    bins = np.linspace(lo, hi, 30)
    ax.hist(t_vals.clip(lo, hi), bins=bins, alpha=0.55, label="Training",  color="steelblue", density=True)
    ax.hist(s_vals.clip(lo, hi), bins=bins, alpha=0.55, label="Serving",   color="tomato",    density=True)
    res = next((r for r in skew_results if r["feature"] == feat), None)
    title_suffix = f" [p={res['p_value']:.3f}]" if res else ""
    ax.set_title(feat + title_suffix, fontsize=9)
    ax.legend(fontsize=8)
    ax.set_xlabel("value", fontsize=8)
    ax.set_ylabel("density", fontsize=8)

plt.tight_layout()
plt.savefig("/tmp/feature_skew.png", dpi=90, bbox_inches="tight")
plt.close()
print("\nDistribution plot saved to /tmp/feature_skew.png")

10. Feast Quick Reference

Feast is one of the most widely used open-source feature stores. Below is a minimal walkthrough of its API. If Feast is installed in your environment, the code will execute; otherwise it serves as a reference template.

Install: pip install feast
Docs: https://docs.feast.dev

if HAS_FEAST:
    from feast import FeatureStore, Entity, FeatureView, Field, FileSource
    from feast.types import Float32, Int64

    # Define entity
    customer = Entity(name="customer_id", description="Customer ID")

    # Define data source (Parquet/BigQuery/Snowflake)
    source = FileSource(
        path="data/customer_features.parquet",
        timestamp_field="event_timestamp"
    )

    # Define feature view
    customer_features = FeatureView(
        name="customer_features",
        entities=[customer],
        ttl=timedelta(days=1),
        schema=[
            Field(name="purchase_count_30d",       dtype=Float32),
            Field(name="avg_purchase_value_90d",   dtype=Float32),
            Field(name="days_since_last_purchase", dtype=Float32),
            Field(name="return_rate_180d",         dtype=Float32),
        ],
        source=source,
    )

    print("Feast objects created:")
    print(" Entity      :", customer.name)
    print(" FeatureView :", customer_features.name)
    print(" Features    :", [f.name for f in customer_features.schema])
    print()
    print("In a real project, initialize and apply with:")
    print("  store = FeatureStore(repo_path='.')")
    print("  store.apply([customer, customer_features])")
else:
    print("Feast not installed. Reference code shown below.")
    print()

print("""
# ─── Feast minimal example ───────────────────────────────────────────────────

# Feast: open-source feature store
from feast import FeatureStore, Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64

# Define entity
customer = Entity(name="customer_id", description="Customer ID")

# Define data source (Parquet/BigQuery/Snowflake)
source = FileSource(path="data/customer_features.parquet", timestamp_field="event_timestamp")

# Define feature view
customer_features = FeatureView(
    name="customer_features",
    entities=[customer],
    ttl=timedelta(days=1),
    schema=[Field(name="purchase_count_30d", dtype=Float32), ...],
    source=source,
)

# Initialize store
store = FeatureStore(repo_path=".")
store.apply([customer, customer_features])

# Retrieve training data (point-in-time correct)
training_df = store.get_historical_features(
    entity_df=entity_df,  # DataFrame with entity_id + event_timestamp
    features=["customer_features:purchase_count_30d"]
).to_df()

# Retrieve online features (serving)
features = store.get_online_features(
    features=["customer_features:purchase_count_30d"],
    entity_rows=[{"customer_id": 42}]
).to_dict()
# ──────────────────────────────────────────────────────────────────────────────
""")

11. Cheat Sheet

Concept               Description
─────────────────────────────────────────────────────
Point-in-time join    Features as they were at prediction time; prevents leakage
Offline store         Historical feature values; used for training
Online store          Latest feature values; used for serving (<5 ms latency)
Materialization       Copying offline → online store (runs as a scheduled job)
Feature group         Logical grouping of related features (e.g., "user_activity")
TTL (time-to-live)    How long a feature value is considered fresh
Training-serving skew Features computed differently in training vs serving

When to use a feature store

  • Multiple models reuse the same features

  • You need reproducible training sets (time-travel queries)

  • Serving latency is critical (<10 ms)

  • You want to monitor feature drift between training and serving

Alternatives

Tool                        Notes
──────────────────────────  ──────────────────────────────────────────
Feast                       Open-source, integrates with GCP/AWS/Azure
Tecton                      Managed, supports streaming features
Hopsworks                   Open-source + managed, strong on Python
Vertex AI Feature Store     GCP-native, serverless
SageMaker Feature Store     AWS-native, offline + online
Databricks Feature Store    Tight Unity Catalog integration

12. Exercises

Work through these to deepen your understanding.

Exercise 1. Add a feature_freshness_check() function that accepts the online store and the feature registry, then flags any entity whose most recently materialized feature violates its freshness_hours SLA. Return a DataFrame of violations.

Exercise 2. Implement a create_lag_features(df, feature_col, lags, entity_col, time_col) utility that creates N-day lag versions of a feature column (e.g., purchase_count_30d_lag7, ..._lag14, ..._lag30) with proper point-in-time semantics: each lag value must also be looked up against the correct historical timestamp.

Exercise 3. Detect training-serving skew using the Population Stability Index (PSI) instead of the KS test. PSI is defined as:

PSI = Ξ£ (actual_pct - expected_pct) * ln(actual_pct / expected_pct)

where β€œexpected” is the training distribution binned into deciles, and β€œactual” is the serving distribution mapped to the same bins. Flag features with PSI > 0.2 as unstable.

Exercise 4. Build a mock real-time feature compute_session_value(customer_id, session_start) that aggregates all purchase events in the 30 minutes following session_start. Demonstrate writing it to the online store on each new session event, and compare its latency with and without an online store.

Exercise 5. Extend OfflineFeatureStore to support feature versioning. Add a version column to the SQLite table and modify write_features / get_historical_features to accept an optional version parameter. Show that purchase_count_30d version 1 (raw count) and version 2 (log-transformed count) can coexist and be retrieved independently.