Model Deployment Strategies

Learning Objectives
Understand deployment patterns
Implement model versioning
Handle model updates safely
Monitor model performance
Scale serving infrastructure
Deployment Patterns

1. Blue-Green Deployment

Blue (v1)  ────┐
               ├── Load Balancer → Users
Green (v2) ────┘
        ↑
  Switch traffic

2. Canary Deployment

Old (v1): 90% traffic
New (v2): 10% traffic → Monitor → 100% if good

3. A/B Testing

Model A: 50% users → Compare metrics
Model B: 50% users → Choose winner
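All three patterns reduce to the same routing decision made per request. A minimal sketch of percentage-based traffic splitting (the `route_request` helper is hypothetical, not part of the classes later in this notebook):

```python
import random

def route_request(split_to_new: float) -> str:
    """Route one request: 'new' with probability split_to_new, else 'old'.

    split_to_new=0.0 is blue-green before the switch,
    0.1 is a 10% canary, 0.5 is a 50/50 A/B test.
    """
    return "new" if random.random() < split_to_new else "old"

# Simulate 10,000 requests through a 10% canary
random.seed(42)
counts = {"old": 0, "new": 0}
for _ in range(10_000):
    counts[route_request(0.10)] += 1

print(counts)  # roughly 9,000 old / 1,000 new
```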
# Install dependencies
# !pip install fastapi uvicorn joblib scikit-learn
Model Versioning
A model registry provides version control for trained models, analogous to how Git versions source code. The ModelRegistry class below saves each model with a timestamp-based version, along with metadata like creation time and model type. In production, versioning is critical because you need to roll back to a previous model if a new one underperforms, audit which model was serving predictions at any given time, and reproduce results months later. Tools like MLflow Model Registry, DVC, and cloud-specific registries (SageMaker Model Registry, Vertex AI Model Registry) implement this pattern at scale with additional features like stage transitions and approval workflows.
import json
import os
import joblib
from datetime import datetime
from pathlib import Path

class ModelRegistry:
    """Simple model registry for versioning."""

    def __init__(self, base_path="models"):
        self.base_path = Path(base_path)
        self.base_path.mkdir(exist_ok=True)

    def save_model(self, model, model_name, version=None):
        """Save a model under a new (or given) version."""
        if version is None:
            version = datetime.now().strftime("%Y%m%d_%H%M%S")

        model_path = self.base_path / model_name / version
        model_path.mkdir(parents=True, exist_ok=True)

        # Save the model artifact
        joblib.dump(model, model_path / "model.pkl")

        # Save metadata alongside it
        metadata = {
            "version": version,
            "created_at": datetime.now().isoformat(),
            "model_type": type(model).__name__
        }
        with open(model_path / "metadata.json", "w") as f:
            json.dump(metadata, f, indent=2)

        print(f"✓ Model saved: {model_name}/{version}")
        return version

    def load_model(self, model_name, version="latest"):
        """Load a specific model version ("latest" picks the newest)."""
        model_dir = self.base_path / model_name
        if version == "latest":
            versions = self.list_versions(model_name)
            version = versions[-1] if versions else None
        if version is None:
            raise ValueError(f"No versions found for {model_name}")

        model = joblib.load(model_dir / version / "model.pkl")
        print(f"✓ Loaded: {model_name}/{version}")
        return model

    def list_versions(self, model_name):
        """List all versions of a model, oldest first."""
        model_dir = self.base_path / model_name
        if not model_dir.exists():
            return []
        return sorted(os.listdir(model_dir))

# Example usage
registry = ModelRegistry()
print("Model registry created")
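Before wiring the registry into an API, it helps to see the on-disk layout this versioning scheme produces. A standalone sketch of the same pattern, using a temporary directory, pickle (a stdlib stand-in for joblib here), and a plain dict as the "model":

```python
import pickle
import tempfile
from pathlib import Path

base = Path(tempfile.mkdtemp()) / "iris_classifier"

# Save two "versions" (a dict stands in for a trained model)
for i in range(2):
    version_dir = base / f"v{i + 1}_20250101_00000{i}"
    version_dir.mkdir(parents=True)
    with open(version_dir / "model.pkl", "wb") as f:
        pickle.dump({"weights": [0.1 * i, 0.2]}, f)

# "latest" is simply the last directory in sorted order
all_versions = sorted(p.name for p in base.iterdir())
latest = all_versions[-1]
with open(base / latest / "model.pkl", "rb") as f:
    model = pickle.load(f)

print(all_versions)
print("latest:", latest, "->", model)
```

Because versions sort lexicographically, timestamp-based names give you "latest" for free; semantic versions like v2 vs v10 would need a smarter sort.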
Deploying with Version Control
Integrating the model registry with a FastAPI endpoint lets callers request predictions from a specific model version. The PredictionRequest includes an optional model_version field that defaults to "latest", giving you flexibility: most traffic hits the current best model, but you can also direct specific requests to older versions for comparison or debugging. The /models/{model_name}/versions endpoint exposes available versions so clients and monitoring dashboards can discover what is deployed. This pattern is the foundation for more advanced strategies like canary deployments and A/B testing.
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel
import numpy as np

app = FastAPI(title="Versioned Model API")

# Initialize registry
registry = ModelRegistry()

class PredictionRequest(BaseModel):
    features: list
    model_version: str = "latest"

class PredictionResponse(BaseModel):
    prediction: float
    model_version: str

@app.post("/predict", response_model=PredictionResponse)
async def predict(request: PredictionRequest):
    try:
        # Load the requested model version
        model = registry.load_model(
            "iris_classifier",
            version=request.model_version
        )

        # Predict
        features = np.array(request.features).reshape(1, -1)
        prediction = model.predict(features)[0]

        return PredictionResponse(
            prediction=float(prediction),
            model_version=request.model_version
        )
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/models/{model_name}/versions")
async def list_model_versions(model_name: str):
    versions = registry.list_versions(model_name)
    return {"model": model_name, "versions": versions}

print("✓ Versioned API created")
A/B Testing Framework

A/B testing in ML goes beyond traditional web experiments: you are comparing two models' predictions against real-world outcomes. The ABTestController randomly assigns each incoming request to model A or model B based on a configurable traffic split, then logs the result so you can compute statistical significance. Unlike offline evaluation on a test set, A/B tests capture how users respond to different model behaviors, for example whether a new recommendation model actually increases click-through rate. The split parameter controls risk: start with a small fraction on the challenger model and increase it once you have confidence it performs well.
import random
from typing import Dict

class ABTestController:
    """Control A/B testing between model versions."""

    def __init__(self, registry: ModelRegistry):
        self.registry = registry
        self.experiments: Dict[str, dict] = {}

    def create_experiment(self, name, model_a, model_b, split=0.5):
        """Create a new A/B test."""
        self.experiments[name] = {
            "model_a": model_a,
            "model_b": model_b,
            "split": split,  # fraction of traffic sent to model_b
            "results": {"a": [], "b": []}
        }
        print(f"✓ Experiment '{name}' created")

    def get_model(self, experiment_name):
        """Pick a model for this request based on the A/B split."""
        exp = self.experiments[experiment_name]

        # Randomly assign the request to A or B
        if random.random() < exp["split"]:
            variant = "b"
            model_version = exp["model_b"]
        else:
            variant = "a"
            model_version = exp["model_a"]

        model = self.registry.load_model("classifier", model_version)
        return model, variant

    def log_result(self, experiment_name, variant, metric_value):
        """Log an observed outcome for one variant."""
        self.experiments[experiment_name]["results"][variant].append(metric_value)

    def get_results(self, experiment_name):
        """Summarize experiment results so far."""
        results = self.experiments[experiment_name]["results"]
        return {
            "model_a": {
                "count": len(results["a"]),
                "mean": np.mean(results["a"]) if results["a"] else 0
            },
            "model_b": {
                "count": len(results["b"]),
                "mean": np.mean(results["b"]) if results["b"] else 0
            }
        }

# Example
ab_controller = ABTestController(registry)
ab_controller.create_experiment(
    "sentiment_test",
    model_a="v1.0",
    model_b="v2.0",
    split=0.2  # 20% of traffic to the new model
)
print("A/B test configured")
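Once results accumulate, you need a decision rule rather than eyeballing the two means. A rough standalone sketch comparing logged click-through outcomes with a hand-computed Welch t-statistic (a simplified stand-in for a proper test from scipy.stats; the simulated CTRs are made up for illustration):

```python
import math
import random

random.seed(0)
# Simulated binary outcomes (click / no click) for each variant
results_a = [1 if random.random() < 0.10 else 0 for _ in range(2000)]  # ~10% CTR
results_b = [1 if random.random() < 0.12 else 0 for _ in range(2000)]  # ~12% CTR

def mean_var(xs):
    """Sample mean and (unbiased) sample variance."""
    m = sum(xs) / len(xs)
    v = sum((x - m) ** 2 for x in xs) / (len(xs) - 1)
    return m, v

mean_a, var_a = mean_var(results_a)
mean_b, var_b = mean_var(results_b)

# Welch's t-statistic: difference in means over its standard error
t = (mean_b - mean_a) / math.sqrt(var_a / len(results_a) + var_b / len(results_b))

print(f"CTR A: {mean_a:.3f}, CTR B: {mean_b:.3f}, t = {t:.2f}")
# |t| above ~1.96 suggests the difference is unlikely to be noise (~5% level)
```

With only a few hundred samples per arm, a 2-point CTR difference often will not reach significance, which is exactly why A/B tests run until enough traffic has been observed.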
Canary Deployment
A canary deployment gradually shifts traffic from the stable model to a new candidate, monitoring for regressions at each step. Unlike A/B testing, which runs both models indefinitely to gather statistical evidence, a canary is a rollout mechanism: start at 5-10% traffic, watch latency and error metrics, then ramp up to 100% if everything looks healthy. The CanaryDeployment class below supports increase_canary() to widen the rollout, rollback() to instantly revert to the stable version, and promote_canary() to make the new model the default. In production, platforms like Kubernetes with Istio or AWS App Mesh automate this traffic splitting at the infrastructure level.
class CanaryDeployment:
    """Gradual rollout controller."""

    def __init__(self, registry, model_name):
        self.registry = registry
        self.model_name = model_name
        self.canary_percentage = 0  # start at 0%
        self.stable_version = None
        self.canary_version = None

    def start_canary(self, stable_version, canary_version, initial_percentage=10):
        """Start a canary deployment."""
        self.stable_version = stable_version
        self.canary_version = canary_version
        self.canary_percentage = initial_percentage
        print(f"Canary started: {initial_percentage}% traffic to {canary_version}")

    def increase_canary(self, increment=10):
        """Increase canary traffic."""
        self.canary_percentage = min(100, self.canary_percentage + increment)
        print(f"Canary increased to {self.canary_percentage}%")

    def rollback(self):
        """Roll back to the stable version."""
        self.canary_percentage = 0
        print(f"Rolled back to {self.stable_version}")

    def promote_canary(self):
        """Promote the canary to stable."""
        self.stable_version = self.canary_version
        self.canary_percentage = 0
        print(f"✓ Promoted {self.canary_version} to stable")

    def get_model(self):
        """Pick a model version based on the canary percentage."""
        if random.random() * 100 < self.canary_percentage:
            version = self.canary_version
        else:
            version = self.stable_version
        return self.registry.load_model(self.model_name, version)

# Example
canary = CanaryDeployment(registry, "classifier")
canary.start_canary(stable_version="v1.0", canary_version="v2.0", initial_percentage=5)
print("Canary deployment ready")
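The ramp-up itself is usually a loop: increase traffic, check health, and bail out on the first failure. A standalone sketch of that control loop, with health checks simulated by a list of booleans (in production these would come from real monitoring metrics):

```python
def run_canary_rollout(health_checks, start=5, increment=20):
    """Ramp canary traffic until 100% or the first failed health check.

    health_checks yields one boolean per monitoring window.
    Returns (status, final_percentage).
    """
    pct = start
    for healthy in health_checks:
        if not healthy:
            return "rolled_back", 0   # instant revert to stable
        if pct >= 100:
            break                     # fully ramped; ready to promote
        pct = min(100, pct + increment)
    return ("promoted", pct) if pct >= 100 else ("in_progress", pct)

print(run_canary_rollout([True] * 6))        # ('promoted', 100)
print(run_canary_rollout([True, False]))     # ('rolled_back', 0)
```

The key design choice is that rollback is a single cheap state change (traffic back to 0% canary), not a redeploy, which is what makes the pattern safe.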
Monitoring Deployments

Deployment monitoring tracks operational health metrics for each model version: latency (average, p95, p99), success rate, and request volume. The DeploymentMonitor class logs every prediction and provides aggregated statistics along with a check_health() method that compares metrics against configurable thresholds. In a real deployment, these metrics feed into alerting systems: if the canary model's p99 latency spikes above the threshold or its error rate exceeds 5%, an automated rollback is triggered. Monitoring is what turns a deployment from "hope it works" into a controlled, observable process.
from collections import defaultdict
from datetime import datetime

class DeploymentMonitor:
    """Monitor model performance in production."""

    def __init__(self):
        self.metrics = defaultdict(list)

    def log_prediction(self, model_version, latency_ms, success=True):
        """Log metrics for a single prediction."""
        self.metrics[model_version].append({
            "timestamp": datetime.now(),
            "latency_ms": latency_ms,
            "success": success
        })

    def get_stats(self, model_version):
        """Get aggregated statistics for a model version."""
        data = self.metrics[model_version]
        if not data:
            return None

        latencies = [d["latency_ms"] for d in data]
        successes = [d["success"] for d in data]

        return {
            "total_requests": len(data),
            "success_rate": sum(successes) / len(successes),
            "avg_latency_ms": np.mean(latencies),
            "p95_latency_ms": np.percentile(latencies, 95),
            "p99_latency_ms": np.percentile(latencies, 99)
        }

    def check_health(self, model_version, max_latency_ms=100, min_success_rate=0.95):
        """Check whether a model version is healthy."""
        stats = self.get_stats(model_version)
        if not stats:
            return True  # no data yet; nothing to flag

        if stats["avg_latency_ms"] > max_latency_ms:
            print(f"⚠ High latency: {stats['avg_latency_ms']:.2f}ms")
            return False
        if stats["success_rate"] < min_success_rate:
            print(f"⚠ Low success rate: {stats['success_rate']:.2%}")
            return False

        print(f"✓ Model {model_version} is healthy")
        return True

monitor = DeploymentMonitor()
print("Deployment monitor created")
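To see a health check in action without a live service, you can simulate a batch of request metrics. A standalone sketch using statistics.quantiles for the percentile math (the class above uses np.percentile, whose interpolation method may differ slightly); the latency distribution here is invented for illustration:

```python
import random
import statistics

random.seed(7)
# Simulate 500 request latencies: mostly fast, with occasional slow outliers
latencies = [random.gauss(40, 8) if random.random() < 0.97 else random.gauss(200, 30)
             for _ in range(500)]
successes = [random.random() < 0.99 for _ in range(500)]

qs = statistics.quantiles(latencies, n=100)  # qs[k-1] is the k-th percentile
stats = {
    "avg_latency_ms": statistics.fmean(latencies),
    "p95_latency_ms": qs[94],
    "p99_latency_ms": qs[98],
    "success_rate": sum(successes) / len(successes),
}

healthy = stats["avg_latency_ms"] <= 100 and stats["success_rate"] >= 0.95
print({k: round(v, 2) for k, v in stats.items()}, "healthy:", healthy)
```

Note how a small fraction of slow outliers barely moves the average but dominates p99, which is why tail percentiles, not means, usually drive rollback decisions.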
Best Practices

1. Version everything
   - Models
   - Training data
   - Code
   - Dependencies

2. Test before deploy
   - Unit tests
   - Integration tests
   - Load tests
   - Shadow testing

3. Deploy gradually
   - Start with a small percentage
   - Monitor metrics closely
   - Be ready to roll back

4. Monitor continuously
   - Latency (p50, p95, p99)
   - Error rate
   - Model metrics
   - Resource usage

5. Enable rollback
   - Keep previous versions
   - Automated rollback triggers
   - Clear rollback procedures
Key Takeaways

- Multiple deployment strategies are available
- Version control is critical
- Gradual rollouts reduce risk
- Continuous monitoring enables fast response
- Always have a rollback plan