Feature Stores: From Training to Production Without Data Leakage
A feature store is the missing piece between data engineering and ML. It tackles three of the hardest problems in production ML: point-in-time correctness, training-serving skew, and feature reuse. This notebook builds a minimal feature store from scratch.
1. Setup
import pandas as pd
import numpy as np
from datetime import datetime, timedelta
import sqlite3
from pathlib import Path
import json
import time
import warnings
warnings.filterwarnings('ignore')
try:
import feast
HAS_FEAST = True
print("Feast is available:", feast.__version__)
except ImportError:
HAS_FEAST = False
print("Feast not installed. Install with: pip install feast")
print("Building a feature store from scratch with pandas + SQLite.")
2. The Problem with Naive Feature Pipelines
Most ML engineers make this mistake at least once: they compute features on the full dataset, train a model, and get suspiciously good metrics. Then the model falls apart in production.
The culprit is data leakage, specifically temporal leakage.
The Leaky Pipeline
Imagine predicting customer churn. You want to use "total purchases in the last 30 days" as a feature. If your label is "did the customer churn in the next 30 days?", then computing the feature on all historical data (including purchases that happen after the label date) bakes future information into training.
Naive (wrong) approach:
features = aggregate(all_events_ever) # includes future!
labels = churn_label
model = train(features, labels) # learns from the future
Correct approach:
for each (customer, label_date):
features = aggregate(events WHERE timestamp <= label_date)
labels = churn_label_at(label_date)
model = train(features, labels) # only sees the past
Letβs quantify how bad the leak can be.
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import roc_auc_score
from sklearn.model_selection import train_test_split
np.random.seed(42)
n_customers = 1000
# Simulate: customers who churn tend to buy less in the 30 days BEFORE churn
# but they have the SAME historical purchase rate before that window
will_churn = np.random.binomial(1, 0.3, n_customers).astype(bool)
# Purchases BEFORE the observation window (both groups are similar)
purchases_before = np.random.poisson(10, n_customers).astype(float)
# Purchases AFTER label_date (churners buy less -- this is future data!)
purchases_after = np.where(
will_churn,
np.random.poisson(1, n_customers), # churners: almost no future purchases
np.random.poisson(8, n_customers) # non-churners: continue buying
).astype(float)
# ----- NAIVE (leaky) pipeline -----
# Feature includes both past AND future purchases
X_leaky = (purchases_before + purchases_after).reshape(-1, 1)
y = will_churn.astype(int)
X_train_l, X_test_l, y_train, y_test = train_test_split(
X_leaky, y, test_size=0.3, random_state=42
)
model_leaky = LogisticRegression()
model_leaky.fit(X_train_l, y_train)
auc_leaky = roc_auc_score(y_test, model_leaky.predict_proba(X_test_l)[:, 1])
# ----- CORRECT (point-in-time) pipeline -----
# Feature uses only purchases BEFORE the label date
X_correct = purchases_before.reshape(-1, 1)
X_train_c, X_test_c, _, _ = train_test_split(
X_correct, y, test_size=0.3, random_state=42
)
model_correct = LogisticRegression()
model_correct.fit(X_train_c, y_train)
auc_correct = roc_auc_score(y_test, model_correct.predict_proba(X_test_c)[:, 1])
print("=" * 55)
print("Leaky pipeline AUC (training includes future data):", f"{auc_leaky:.3f}")
print("Correct pipeline AUC (point-in-time features only): ", f"{auc_correct:.3f}")
print("Inflation from leak:", f"+{auc_leaky - auc_correct:.3f}")
print("=" * 55)
print()
print("Key insight: You need features as they existed at the")
print("time of the prediction, not as they exist today.")
3. Feature Store Architecture
A feature store has four main components:
Data Sources           Feature Store                   Serving
------------           -------------                   -------
Raw events ---> Transform ---> Offline Store  -------> Training
                           |      (historical)
                           +-> Online Store  --------> Real-time serving
                                  (low latency)
Components

| Component | Purpose | Technology |
|---|---|---|
| Offline Store | Historical feature values for training | Parquet, BigQuery, Hive, SQLite |
| Online Store | Latest feature values for low-latency serving | Redis, DynamoDB, Cassandra |
| Feature Registry | Metadata catalog: descriptions, owners, SLAs | Git, database, REST API |
| Materialization Job | Copies offline → online on a schedule | Spark, Flink, Airflow |
Offline Store
Used during training. Stores every version of every feature value with its timestamp. You can reconstruct what any feature looked like on any date in the past. Supports point-in-time joins.
Online Store
Used during serving. Stores only the latest value for each feature. Optimized for sub-millisecond reads. No history, just the current state.
Materialization
A scheduled job that reads the offline store, computes the latest values, and pushes them to the online store. This is the bridge between training and serving.
Training-Serving Skew
When features are computed differently in training vs serving, you get silent model degradation. The feature store solves this by sharing the same feature transformation logic for both.
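A minimal sketch of the shared-logic idea (the function name and event shape are illustrative, not part of any library API): the batch training job and the request-time handler both import one transformation instead of re-implementing it twice.

```python
from datetime import datetime, timedelta

def purchase_count_30d(events, as_of):
    """Purchases in the 30 days up to as_of -- one definition, used everywhere."""
    return float(sum(
        1 for e in events
        if e["type"] == "purchase"
        and as_of - timedelta(days=30) <= e["ts"] <= as_of
    ))

events = [
    {"type": "purchase", "ts": datetime(2023, 5, 20)},
    {"type": "view",     "ts": datetime(2023, 5, 25)},
    {"type": "purchase", "ts": datetime(2023, 3, 1)},   # outside the 30-day window
]

# Batch job (training) and request handler (serving) call the same function,
# so the feature definition cannot drift between the two paths.
train_value = purchase_count_30d(events, datetime(2023, 6, 1))
serve_value = purchase_count_30d(events, datetime(2023, 6, 1))
print(train_value, serve_value)  # 1.0 1.0
```

Any change to the window or the event filter now automatically applies to both paths.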
4. Build an Offline Feature Store
class OfflineFeatureStore:
"""SQLite-backed offline feature store with point-in-time correct lookup."""
def __init__(self, db_path=":memory:"):
self.db_path = db_path
self.conn = sqlite3.connect(db_path)
self._create_table()
def _create_table(self):
self.conn.execute("""
CREATE TABLE IF NOT EXISTS features (
entity_id TEXT NOT NULL,
feature_name TEXT NOT NULL,
value REAL,
value_str TEXT,
timestamp TEXT NOT NULL
)
""")
self.conn.execute("""
CREATE INDEX IF NOT EXISTS idx_entity_feature_time
ON features (entity_id, feature_name, timestamp)
""")
self.conn.commit()
def write_features(self, entity_id, feature_name, value, timestamp):
"""Write a single feature observation."""
if isinstance(timestamp, datetime):
timestamp = timestamp.isoformat()
value_num = float(value) if isinstance(value, (int, float, np.integer, np.floating)) else None
value_str = str(value) if value_num is None else None
self.conn.execute(
"INSERT INTO features VALUES (?, ?, ?, ?, ?)",
(str(entity_id), feature_name, value_num, value_str, str(timestamp))
)
self.conn.commit()
def write_features_batch(self, records):
"""Batch insert: records is a list of (entity_id, feature_name, value, timestamp)."""
rows = []
for entity_id, feature_name, value, timestamp in records:
if isinstance(timestamp, datetime):
timestamp = timestamp.isoformat()
value_num = float(value) if isinstance(value, (int, float, np.integer, np.floating)) else None
value_str = str(value) if value_num is None else None
rows.append((str(entity_id), feature_name, value_num, value_str, str(timestamp)))
self.conn.executemany("INSERT INTO features VALUES (?, ?, ?, ?, ?)", rows)
self.conn.commit()
def get_historical_features(self, entity_df, feature_names, as_of_col="label_date"):
"""
Point-in-time correct lookup.
For each (entity_id, as_of_timestamp) in entity_df, return the most
recent value of each feature that was written AT OR BEFORE as_of_timestamp.
Parameters
----------
entity_df : DataFrame with columns ['entity_id', as_of_col, ...]
feature_names : list of feature name strings
as_of_col : column in entity_df that holds the "as of" timestamp
Returns
-------
DataFrame with one column per feature, aligned to entity_df rows
"""
results = entity_df.copy()
for feature in feature_names:
values = []
for _, row in entity_df.iterrows():
as_of = row[as_of_col]
if isinstance(as_of, datetime):
as_of = as_of.isoformat()
else:
as_of = str(as_of)
# Point-in-time SQL: most recent value <= request timestamp
cur = self.conn.execute("""
SELECT value, value_str
FROM features
WHERE entity_id = ?
AND feature_name = ?
AND timestamp <= ?
ORDER BY timestamp DESC
LIMIT 1
""", (str(row["entity_id"]), feature, as_of))
row_result = cur.fetchone()
if row_result is None:
values.append(np.nan)
elif row_result[0] is not None:
values.append(row_result[0])
else:
values.append(row_result[1])
results[feature] = values
return results
def get_latest_values(self, feature_names):
"""Return the most recent value of each feature for every entity."""
rows = []
for feature in feature_names:
cur = self.conn.execute("""
SELECT entity_id, value, value_str, MAX(timestamp) AS ts
FROM features
WHERE feature_name = ?
GROUP BY entity_id
""", (feature,))
for entity_id, val, val_str, ts in cur.fetchall():
rows.append({
"entity_id": entity_id,
"feature_name": feature,
"value": val if val is not None else val_str,
"timestamp": ts
})
return pd.DataFrame(rows)
# Quick smoke test
store = OfflineFeatureStore()
store.write_features("cust_001", "purchase_count_30d", 5, datetime(2023, 1, 15))
store.write_features("cust_001", "purchase_count_30d", 12, datetime(2023, 3, 15))
store.write_features("cust_001", "purchase_count_30d", 3, datetime(2023, 6, 15))
entity_df = pd.DataFrame([
{"entity_id": "cust_001", "label_date": datetime(2023, 2, 1)}, # should see value=5
{"entity_id": "cust_001", "label_date": datetime(2023, 4, 1)}, # should see value=12
{"entity_id": "cust_001", "label_date": datetime(2023, 7, 1)}, # should see value=3
])
result = store.get_historical_features(entity_df, ["purchase_count_30d"])
print("Point-in-time lookup smoke test:")
print(result[["entity_id", "label_date", "purchase_count_30d"]].to_string(index=False))
print()
print("Expected: 5, 12, 3 -- each row sees only the past!")
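The same point-in-time lookup can also be expressed with pandas alone. A sketch using `pd.merge_asof`, whose default `direction="backward"` joins each request to the most recent feature row at or before its timestamp (the toy frames mirror the smoke test above):

```python
import pandas as pd
from datetime import datetime

# Feature history: one row per written value, sorted by timestamp
feature_log = pd.DataFrame({
    "entity_id": ["cust_001"] * 3,
    "timestamp": [datetime(2023, 1, 15), datetime(2023, 3, 15), datetime(2023, 6, 15)],
    "purchase_count_30d": [5.0, 12.0, 3.0],
}).sort_values("timestamp")

# One row per (entity, as-of date) we want features for
requests = pd.DataFrame({
    "entity_id": ["cust_001"] * 3,
    "label_date": [datetime(2023, 2, 1), datetime(2023, 4, 1), datetime(2023, 7, 1)],
}).sort_values("label_date")

# Backward as-of join: most recent feature row with timestamp <= label_date,
# matched within each entity_id group
joined = pd.merge_asof(
    requests, feature_log,
    left_on="label_date", right_on="timestamp",
    by="entity_id",
)
print(joined["purchase_count_30d"].tolist())  # [5.0, 12.0, 3.0]
```

Both inputs must be sorted on their join keys; for large offline stores this vectorized join is far faster than the per-row SQL loop above.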
5. Generate Synthetic E-Commerce Data
np.random.seed(0)
# -- Customers ---------------------------------------------------------------
N_CUSTOMERS = 500
START_DATE = datetime(2022, 1, 1)
END_DATE = datetime(2024, 1, 1)
customer_ids = [f"cust_{i:04d}" for i in range(N_CUSTOMERS)]
signup_dates = [
START_DATE + timedelta(days=int(d))
for d in np.random.uniform(0, 180, N_CUSTOMERS)
]
tiers = np.random.choice(["bronze", "silver", "gold"], N_CUSTOMERS, p=[0.6, 0.3, 0.1])
customers_df = pd.DataFrame({
"customer_id": customer_ids,
"signup_date": signup_dates,
"tier": tiers
})
print(f"Customers: {len(customers_df)} rows")
print(customers_df.head(3).to_string(index=False))
# -- Events ------------------------------------------------------------------
N_EVENTS = 10_000
total_days = (END_DATE - START_DATE).days
event_customer_ids = np.random.choice(customer_ids, N_EVENTS)
event_types = np.random.choice(
["purchase", "view", "return"], N_EVENTS, p=[0.50, 0.40, 0.10]
)
amounts = np.where(
event_types == "purchase", np.random.exponential(50, N_EVENTS),
np.where(event_types == "return", np.random.exponential(30, N_EVENTS), 0.0)
)
event_timestamps = [
START_DATE + timedelta(days=float(d))
for d in np.random.uniform(0, total_days, N_EVENTS)
]
events_df = pd.DataFrame({
"customer_id": event_customer_ids,
"event_type": event_types,
"amount": amounts,
"event_timestamp": event_timestamps
}).sort_values("event_timestamp").reset_index(drop=True)
print(f"\nEvents: {len(events_df)} rows")
print(events_df.head(3).to_string(index=False))
# -- Labels (churn) ----------------------------------------------------------
# Churn label: did the customer make zero purchases in the 90 days after label_date?
LABEL_DATE = datetime(2023, 6, 1) # snapshot date for all training examples
label_rows = []
for cid in customer_ids:
future_purchases = events_df[
(events_df["customer_id"] == cid) &
(events_df["event_type"] == "purchase") &
(events_df["event_timestamp"] > LABEL_DATE) &
(events_df["event_timestamp"] <= LABEL_DATE + timedelta(days=90))
]
label_rows.append({
"customer_id": cid,
"label_date": LABEL_DATE,
"churned": int(len(future_purchases) == 0)
})
labels_df = pd.DataFrame(label_rows)
churn_rate = labels_df["churned"].mean()
print(f"\nLabels: {len(labels_df)} rows, churn rate = {churn_rate:.1%}")
print(labels_df.head(3).to_string(index=False))
# -- Feature computation functions -------------------------------------------
def _filter_events(customer_id, as_of_date, event_type=None, days_back=None):
"""Return events for a customer strictly before as_of_date, optionally filtered."""
mask = (
(events_df["customer_id"] == customer_id) &
(events_df["event_timestamp"] <= as_of_date)
)
if days_back is not None:
window_start = as_of_date - timedelta(days=days_back)
mask &= (events_df["event_timestamp"] >= window_start)
if event_type is not None:
mask &= (events_df["event_type"] == event_type)
return events_df[mask]
def compute_purchase_count_30d(customer_id, as_of_date):
"""Number of purchases in the 30 days up to (and including) as_of_date."""
return float(len(_filter_events(customer_id, as_of_date,
event_type="purchase", days_back=30)))
def compute_avg_purchase_value_90d(customer_id, as_of_date):
"""Mean purchase amount over the last 90 days. NaN if no purchases."""
df = _filter_events(customer_id, as_of_date, event_type="purchase", days_back=90)
return float(df["amount"].mean()) if len(df) > 0 else np.nan
def compute_days_since_last_purchase(customer_id, as_of_date):
"""Days elapsed since the most recent purchase before as_of_date."""
df = _filter_events(customer_id, as_of_date, event_type="purchase")
if len(df) == 0:
return np.nan
last_purchase = df["event_timestamp"].max()
return float((as_of_date - last_purchase).days)
def compute_return_rate_180d(customer_id, as_of_date):
"""Returns / (Purchases + Returns) in the last 180 days."""
purchases = len(_filter_events(customer_id, as_of_date,
event_type="purchase", days_back=180))
returns = len(_filter_events(customer_id, as_of_date,
event_type="return", days_back=180))
total = purchases + returns
return float(returns / total) if total > 0 else 0.0
# Spot check
cid = customer_ids[0]
aod = LABEL_DATE
print(f"Spot-check for {cid} as of {aod.date()}:")
print(f" purchase_count_30d = {compute_purchase_count_30d(cid, aod):.0f}")
print(f" avg_purchase_value_90d = {compute_avg_purchase_value_90d(cid, aod):.2f}")
print(f" days_since_last_purchase = {compute_days_since_last_purchase(cid, aod):.0f}")
print(f" return_rate_180d = {compute_return_rate_180d(cid, aod):.3f}")
6. Point-in-Time Correct Training Set
# Populate the offline store with point-in-time correct features
# We compute features for a subset of customers (for speed)
SAMPLE_CUSTOMERS = customer_ids[:200]
offline_store = OfflineFeatureStore()
FEATURE_FUNCS = {
"purchase_count_30d": compute_purchase_count_30d,
"avg_purchase_value_90d": compute_avg_purchase_value_90d,
"days_since_last_purchase": compute_days_since_last_purchase,
"return_rate_180d": compute_return_rate_180d,
}
print("Computing and writing features to offline store...")
records = []
for cid in SAMPLE_CUSTOMERS:
for fname, ffunc in FEATURE_FUNCS.items():
val = ffunc(cid, LABEL_DATE)
if not np.isnan(val):
records.append((cid, fname, val, LABEL_DATE))
offline_store.write_features_batch(records)
print(f"Written {len(records)} feature records for {len(SAMPLE_CUSTOMERS)} customers.")
# Build training set using the offline store (point-in-time correct)
sample_labels = labels_df[labels_df["customer_id"].isin(SAMPLE_CUSTOMERS)].copy()
sample_labels = sample_labels.rename(columns={"customer_id": "entity_id"})
feature_names = list(FEATURE_FUNCS.keys())
training_df = offline_store.get_historical_features(
sample_labels, feature_names, as_of_col="label_date"
)
print("Point-in-time training set shape:", training_df.shape)
print(training_df[feature_names].describe().round(2))
# Compare: naive (all-history) features vs point-in-time features
# Naive: use ALL purchases ever recorded, including after label_date
naive_rows = []
for cid in SAMPLE_CUSTOMERS:
# Use END_DATE as "as_of" -- this leaks future data!
naive_rows.append({
"entity_id": cid,
"purchase_count_30d": compute_purchase_count_30d(cid, END_DATE),
"avg_purchase_value_90d": compute_avg_purchase_value_90d(cid, END_DATE),
"days_since_last_purchase": compute_days_since_last_purchase(cid, END_DATE),
"return_rate_180d": compute_return_rate_180d(cid, END_DATE),
})
naive_df = pd.DataFrame(naive_rows)
naive_df = naive_df.merge(
sample_labels[["entity_id", "churned"]], on="entity_id"
)
# Prepare clean datasets for model comparison
pit_df = training_df.dropna(subset=feature_names + ["churned"])
nav_df = naive_df.dropna(subset=feature_names + ["churned"])
def train_and_evaluate(df, feature_cols, label_col="churned", label="model"):
X = df[feature_cols].fillna(df[feature_cols].median())
y = df[label_col]
X_tr, X_te, y_tr, y_te = train_test_split(X, y, test_size=0.3, random_state=42)
clf = LogisticRegression(max_iter=500)
clf.fit(X_tr, y_tr)
auc = roc_auc_score(y_te, clf.predict_proba(X_te)[:, 1])
print(f" [{label}] AUC = {auc:.4f} (n={len(df)})")
return auc
print("Model comparison β same data, different feature computation:")
auc_naive = train_and_evaluate(nav_df, feature_names, label="NAIVE (uses future data)")
auc_pit = train_and_evaluate(pit_df, feature_names, label="CORRECT (point-in-time) ")
print()
print(f"AUC inflation from leakage: +{auc_naive - auc_pit:.4f}")
print("The naive model appears better in validation but will fail in production.")
7. Online Feature Store (Redis Pattern)
class OnlineFeatureStore:
"""
In-memory online feature store (dict-backed).
In production this would be Redis, DynamoDB, or Cassandra.
"""
def __init__(self):
self._store = {} # {entity_id: {feature_name: value}}
self._updated_at = {} # {entity_id: datetime}
def write(self, entity_id: str, features: dict):
"""Write (overwrite) a dict of features for an entity."""
entity_id = str(entity_id)
if entity_id not in self._store:
self._store[entity_id] = {}
self._store[entity_id].update(features)
self._updated_at[entity_id] = datetime.utcnow()
def read(self, entity_id: str, feature_names: list) -> dict:
"""Read selected features for an entity. Missing features return None."""
entity_id = str(entity_id)
entity_data = self._store.get(entity_id, {})
return {f: entity_data.get(f) for f in feature_names}
def bulk_write(self, df: pd.DataFrame, entity_col: str):
"""Write every row of a DataFrame into the online store."""
feature_cols = [c for c in df.columns if c != entity_col]
for _, row in df.iterrows():
eid = str(row[entity_col])
self.write(eid, {c: row[c] for c in feature_cols})
def last_updated(self, entity_id: str):
return self._updated_at.get(str(entity_id))
def __len__(self):
return len(self._store)
# -- Materialization: offline -> online --------------------------------------
online_store = OnlineFeatureStore()
print("Materializing offline store β online store...")
t0 = time.perf_counter()
latest_df = offline_store.get_latest_values(feature_names)
pivot_df = latest_df.pivot(index="entity_id", columns="feature_name", values="value").reset_index()
online_store.bulk_write(pivot_df, entity_col="entity_id")
elapsed = time.perf_counter() - t0
print(f"Materialized {len(online_store)} entities in {elapsed*1000:.1f} ms")
# -- Serving latency comparison ----------------------------------------------
test_customer = SAMPLE_CUSTOMERS[5]
# Online store read
t0 = time.perf_counter()
online_features = online_store.read(test_customer, feature_names)
online_latency_us = (time.perf_counter() - t0) * 1_000_000
# Recompute from raw events
t0 = time.perf_counter()
_ = {fname: ffunc(test_customer, datetime.utcnow()) for fname, ffunc in FEATURE_FUNCS.items()}
recompute_latency_ms = (time.perf_counter() - t0) * 1_000
print()
print("Serving latency comparison:")
print(f" Online store lookup : {online_latency_us:.1f} Β΅s")
print(f" Recompute from raw : {recompute_latency_ms:.1f} ms")
print(f" Speedup : ~{recompute_latency_ms*1000/online_latency_us:.0f}x")
print()
print("Fetched features for", test_customer, ":")
for k, v in online_features.items():
print(f" {k:<30} = {v}")
8. Feature Registry
class FeatureRegistry:
"""Track feature metadata: name, description, owner, data type, freshness SLA."""
def __init__(self):
self._registry = {} # {name: metadata_dict}
def register(self, name, description, owner, dtype, freshness_hours):
"""
Register a feature.
Parameters
----------
name : unique feature identifier
description : human-readable description
owner : team or person responsible
dtype : expected data type (e.g. 'float', 'int', 'str')
freshness_hours : maximum acceptable staleness in hours
"""
if name in self._registry:
print(f"Warning: overwriting existing registration for '{name}'")
self._registry[name] = {
"name": name,
"description": description,
"owner": owner,
"dtype": dtype,
"freshness_hours": freshness_hours,
"registered_at": datetime.utcnow().isoformat(),
}
def get_feature_info(self, name):
"""Return metadata dict for a feature, or None if not registered."""
return self._registry.get(name)
def list_features(self):
"""Return a DataFrame of all registered features."""
if not self._registry:
return pd.DataFrame()
return pd.DataFrame(list(self._registry.values()))
# Register all four features
registry = FeatureRegistry()
registry.register(
name="purchase_count_30d",
description="Number of completed purchases by the customer in the last 30 days",
owner="growth-team",
dtype="float",
freshness_hours=24
)
registry.register(
name="avg_purchase_value_90d",
description="Mean purchase amount (USD) over the last 90 days; NaN if no purchases",
owner="growth-team",
dtype="float",
freshness_hours=24
)
registry.register(
name="days_since_last_purchase",
description="Calendar days since the customer's most recent purchase event",
owner="growth-team",
dtype="float",
freshness_hours=1
)
registry.register(
name="return_rate_180d",
description="Ratio of returns to (purchases + returns) over the last 180 days",
owner="risk-team",
dtype="float",
freshness_hours=48
)
print("Feature Registry:")
print("=" * 80)
reg_df = registry.list_features()[["name", "owner", "dtype", "freshness_hours", "description"]]
for _, row in reg_df.iterrows():
print(f" {row['name']:<30} owner={row['owner']:<12} "
f"dtype={row['dtype']:<7} freshness={row['freshness_hours']}h")
print(f" {row['description']}")
print("=" * 80)
9. Training-Serving Skew Detection
import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from scipy import stats
# Training distribution: point-in-time features at LABEL_DATE
train_features = pit_df[feature_names].fillna(pit_df[feature_names].median())
# Serving distribution: current values from online store
serve_rows = []
for cid in SAMPLE_CUSTOMERS:
row = online_store.read(cid, feature_names)
serve_rows.append(row)
serve_features = pd.DataFrame(serve_rows).fillna(train_features.median())
# KS test for each feature
print("Training-Serving Skew Detection (KS Test)")
print("=" * 65)
skew_results = []
for feat in feature_names:
t = train_features[feat].dropna()
s = serve_features[feat].dropna()
if len(t) > 0 and len(s) > 0:
ks_stat, p_val = stats.ks_2samp(t, s)
flagged = "SKEWED" if p_val < 0.05 else "OK"
skew_results.append({"feature": feat, "ks_stat": ks_stat, "p_value": p_val, "status": flagged})
print(f" {feat:<30} KS={ks_stat:.3f} p={p_val:.4f} [{flagged}]")
print("=" * 65)
print("Features with p < 0.05 show statistically significant skew.")
# Plot distributions
fig, axes = plt.subplots(2, 2, figsize=(12, 8))
fig.suptitle("Training vs Serving Feature Distributions", fontsize=14, y=1.01)
for ax, feat in zip(axes.flat, feature_names):
t_vals = train_features[feat].dropna()
s_vals = serve_features[feat].dropna()
# Clip to common range for readability
lo = min(t_vals.quantile(0.01), s_vals.quantile(0.01))
hi = max(t_vals.quantile(0.99), s_vals.quantile(0.99))
bins = np.linspace(lo, hi, 30)
ax.hist(t_vals.clip(lo, hi), bins=bins, alpha=0.55, label="Training", color="steelblue", density=True)
ax.hist(s_vals.clip(lo, hi), bins=bins, alpha=0.55, label="Serving", color="tomato", density=True)
res = next((r for r in skew_results if r["feature"] == feat), None)
title_suffix = f" [p={res['p_value']:.3f}]" if res else ""
ax.set_title(feat + title_suffix, fontsize=9)
ax.legend(fontsize=8)
ax.set_xlabel("value", fontsize=8)
ax.set_ylabel("density", fontsize=8)
plt.tight_layout()
plt.savefig("/tmp/feature_skew.png", dpi=90, bbox_inches="tight")
plt.close()
print("\nDistribution plot saved to /tmp/feature_skew.png")
10. Feast Quick Reference
Feast is one of the most widely used open-source feature stores. Below is a minimal walkthrough of its API. If Feast is installed in your environment the code will execute; otherwise it serves as a reference template.
Install: pip install feast
Docs: https://docs.feast.dev
if HAS_FEAST:
from feast import FeatureStore, Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64
# Define entity
customer = Entity(name="customer_id", description="Customer ID")
# Define data source (Parquet/BigQuery/Snowflake)
source = FileSource(
path="data/customer_features.parquet",
timestamp_field="event_timestamp"
)
# Define feature view
customer_features = FeatureView(
name="customer_features",
entities=[customer],
ttl=timedelta(days=1),
schema=[
Field(name="purchase_count_30d", dtype=Float32),
Field(name="avg_purchase_value_90d", dtype=Float32),
Field(name="days_since_last_purchase", dtype=Float32),
Field(name="return_rate_180d", dtype=Float32),
],
source=source,
)
print("Feast objects created:")
print(" Entity :", customer.name)
print(" FeatureView :", customer_features.name)
print(" Features :", [f.name for f in customer_features.schema])
print()
print("In a real project, initialize and apply with:")
print(" store = FeatureStore(repo_path='.')")
print(" store.apply([customer, customer_features])")
else:
print("Feast not installed. Reference code shown below.")
print()
print("""
# --- Feast minimal example ---------------------------------------------------
# Feast: open-source feature store
from feast import FeatureStore, Entity, FeatureView, Field, FileSource
from feast.types import Float32, Int64
# Define entity
customer = Entity(name="customer_id", description="Customer ID")
# Define data source (Parquet/BigQuery/Snowflake)
source = FileSource(path="data/customer_features.parquet", timestamp_field="event_timestamp")
# Define feature view
customer_features = FeatureView(
name="customer_features",
entities=[customer],
ttl=timedelta(days=1),
schema=[Field(name="purchase_count_30d", dtype=Float32), ...],
source=source,
)
# Initialize store
store = FeatureStore(repo_path=".")
store.apply([customer, customer_features])
# Retrieve training data (point-in-time correct)
training_df = store.get_historical_features(
entity_df=entity_df, # DataFrame with entity_id + event_timestamp
features=["customer_features:purchase_count_30d"]
).to_df()
# Retrieve online features (serving)
features = store.get_online_features(
features=["customer_features:purchase_count_30d"],
entity_rows=[{"customer_id": 42}]
).to_dict()
# -----------------------------------------------------------------------------
""")
11. Cheat Sheet
Concept                  Description
---------------------------------------------------------------------
Point-in-time join       Features as they were at prediction time; prevents leakage
Offline store            Historical feature values; used for training
Online store             Latest feature values; used for serving (<5 ms latency)
Materialization          Copying offline -> online store (runs as a scheduled job)
Feature group            Logical grouping of related features (e.g., "user_activity")
TTL (time-to-live)       How long a feature value is considered fresh
Training-serving skew    Features computed differently in training vs serving
When to use a feature store
Multiple models reuse the same features
You need reproducible training sets (time-travel queries)
Serving latency is critical (<10 ms)
You want to monitor feature drift between training and serving
Alternatives

| Tool | Notes |
|---|---|
| Feast | Open-source, integrates with GCP/AWS/Azure |
| Tecton | Managed, supports streaming features |
| Hopsworks | Open-source + managed, strong on Python |
| Vertex AI Feature Store | GCP-native, serverless |
| SageMaker Feature Store | AWS-native, offline + online |
| Databricks Feature Store | Tight Unity Catalog integration |
12. Exercises
Work through these to deepen your understanding.
Exercise 1. Add a feature_freshness_check() function that accepts the online store and the feature registry, then flags any entity whose most recently materialized feature violates its freshness_hours SLA. Return a DataFrame of violations.
Exercise 2. Implement a create_lag_features(df, feature_col, lags, entity_col, time_col) utility that creates N-day lag versions of a feature column (e.g., purchase_count_30d_lag7, ..._lag14, ..._lag30) with proper point-in-time semantics: each lag value must also be looked up against the correct historical timestamp.
Exercise 3. Detect training-serving skew using the Population Stability Index (PSI) instead of the KS test. PSI is defined as:
PSI = Σ (actual_pct - expected_pct) * ln(actual_pct / expected_pct)
where "expected" is the training distribution binned into deciles, and "actual" is the serving distribution mapped to the same bins. Flag features with PSI > 0.2 as unstable.
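A starting sketch of PSI under that definition, to make the formula concrete (the epsilon clip to avoid log(0) on empty bins is an implementation choice, not part of the exercise statement):

```python
import numpy as np

def psi(expected, actual, n_bins=10):
    """Population Stability Index of `actual` against `expected` decile bins."""
    edges = np.quantile(expected, np.linspace(0, 1, n_bins + 1))
    # Map out-of-range serving values into the outermost bins
    actual = np.clip(actual, edges[0], edges[-1])
    e_pct = np.histogram(expected, bins=edges)[0] / len(expected)
    a_pct = np.histogram(actual, bins=edges)[0] / len(actual)
    eps = 1e-6  # avoid log(0) / division by zero for empty bins
    e_pct = np.clip(e_pct, eps, None)
    a_pct = np.clip(a_pct, eps, None)
    return float(np.sum((a_pct - e_pct) * np.log(a_pct / e_pct)))

rng = np.random.default_rng(0)
train_dist = rng.normal(0, 1, 5000)
stable_psi = psi(train_dist, rng.normal(0, 1, 5000))   # same distribution: near 0
shifted_psi = psi(train_dist, rng.normal(1, 1, 5000))  # mean shift: well above 0.2
print(f"stable={stable_psi:.4f}  shifted={shifted_psi:.4f}")
```

The remaining exercise work is looping this over the feature columns and returning the flags as a DataFrame.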
Exercise 4. Build a mock real-time feature compute_session_value(customer_id, session_start) that aggregates all purchase events in the 30 minutes following session_start. Demonstrate writing it to the online store on each new session event, and compare its latency with and without an online store.
Exercise 5. Extend OfflineFeatureStore to support feature versioning. Add a version column to the SQLite table and modify write_features / get_historical_features to accept an optional version parameter. Show that purchase_count_30d version 1 (raw count) and version 2 (log-transformed count) can coexist and be retrieved independently.