Deploying Agents to Production
Agent monitoring, observability (LangSmith), error recovery, cost management, and production deployment patterns.
# Install dependencies
# !pip install fastapi uvicorn redis prometheus-client
Error Handling & Retries
import time
import logging
from typing import Callable, Any, Optional
from functools import wraps
# Setup logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)
def retry_with_backoff(
max_retries: int = 3,
base_delay: float = 1.0,
max_delay: float = 60.0,
exponential: bool = True
):
"""Retry decorator with exponential backoff"""
def decorator(func: Callable) -> Callable:
@wraps(func)
def wrapper(*args, **kwargs) -> Any:
retries = 0
delay = base_delay
while retries < max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
retries += 1
if retries >= max_retries:
logger.error(f"Max retries reached for {func.__name__}: {e}")
raise
logger.warning(f"Retry {retries}/{max_retries} for {func.__name__}: {e}")
time.sleep(min(delay, max_delay))
if exponential:
delay *= 2
return None
return wrapper
return decorator
# Example usage
@retry_with_backoff(max_retries=3, base_delay=1.0)
def unreliable_api_call(success_rate: float = 0.5):
"""Simulates an unreliable API"""
import random
if random.random() < success_rate:
return "Success!"
else:
raise Exception("API temporarily unavailable")
print("Testing retry logic:")
try:
result = unreliable_api_call(success_rate=0.3)
print(f"Result: {result}")
except Exception as e:
print(f"Failed after retries: {e}")
Cost Tracking
from datetime import datetime
from typing import Dict, List
from dataclasses import dataclass, field
@dataclass
class UsageMetrics:
"""Track API usage and costs"""
model: str
input_tokens: int = 0
output_tokens: int = 0
total_cost: float = 0.0
requests: int = 0
errors: int = 0
timestamp: datetime = field(default_factory=datetime.now)
class CostTracker:
"""Track and report costs"""
# Pricing (as of 2024, in USD per 1K tokens)
PRICING = {
"gpt-4": {"input": 0.03, "output": 0.06},
"gpt-4-turbo": {"input": 0.01, "output": 0.03},
"gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
"claude-3-opus": {"input": 0.015, "output": 0.075},
"claude-3-sonnet": {"input": 0.003, "output": 0.015},
}
def __init__(self):
self.usage_history: List[UsageMetrics] = []
def track_usage(self, model: str, input_tokens: int, output_tokens: int):
"""Track a single API call"""
if model not in self.PRICING:
logger.warning(f"Unknown model: {model}")
cost = 0.0
else:
input_cost = (input_tokens / 1000) * self.PRICING[model]["input"]
output_cost = (output_tokens / 1000) * self.PRICING[model]["output"]
cost = input_cost + output_cost
metrics = UsageMetrics(
model=model,
input_tokens=input_tokens,
output_tokens=output_tokens,
total_cost=cost,
requests=1
)
self.usage_history.append(metrics)
logger.info(f"Tracked: {model} | Tokens: {input_tokens + output_tokens} | Cost: ${cost:.4f}")
return cost
def get_total_cost(self) -> float:
"""Get total cost across all calls"""
return sum(m.total_cost for m in self.usage_history)
def get_stats(self) -> Dict:
"""Get usage statistics"""
total_requests = len(self.usage_history)
total_tokens = sum(m.input_tokens + m.output_tokens for m in self.usage_history)
total_cost = self.get_total_cost()
# By model
by_model = {}
for metrics in self.usage_history:
if metrics.model not in by_model:
by_model[metrics.model] = {"requests": 0, "tokens": 0, "cost": 0.0}
by_model[metrics.model]["requests"] += 1
by_model[metrics.model]["tokens"] += metrics.input_tokens + metrics.output_tokens
by_model[metrics.model]["cost"] += metrics.total_cost
return {
"total_requests": total_requests,
"total_tokens": total_tokens,
"total_cost": total_cost,
"by_model": by_model
}
def print_report(self):
"""Print usage report"""
stats = self.get_stats()
print(f"\n{'='*60}")
print("COST REPORT")
print(f"{'='*60}")
print(f"Total Requests: {stats['total_requests']}")
print(f"Total Tokens: {stats['total_tokens']:,}")
print(f"Total Cost: ${stats['total_cost']:.4f}")
if stats['by_model']:
print(f"\nBy Model:")
for model, data in stats['by_model'].items():
print(f" {model}:")
print(f" Requests: {data['requests']}")
print(f" Tokens: {data['tokens']:,}")
print(f" Cost: ${data['cost']:.4f}")
print(f"{'='*60}\n")
# Test cost tracking
tracker = CostTracker()
# Simulate some API calls
tracker.track_usage("gpt-4", input_tokens=500, output_tokens=200)
tracker.track_usage("gpt-4-turbo", input_tokens=1000, output_tokens=500)
tracker.track_usage("gpt-3.5-turbo", input_tokens=2000, output_tokens=1000)
tracker.track_usage("claude-3-sonnet", input_tokens=800, output_tokens=400)
# Print report
tracker.print_report()
Rate Limiting
import time
from collections import deque
from threading import Lock
class RateLimiter:
"""Token bucket rate limiter"""
def __init__(self, max_requests: int, time_window: float):
"""
Args:
max_requests: Maximum requests allowed
time_window: Time window in seconds
"""
self.max_requests = max_requests
self.time_window = time_window
self.requests = deque()
self.lock = Lock()
def allow_request(self) -> bool:
"""Check if request is allowed"""
with self.lock:
now = time.time()
# Remove old requests outside time window
while self.requests and self.requests[0] < now - self.time_window:
self.requests.popleft()
# Check if under limit
if len(self.requests) < self.max_requests:
self.requests.append(now)
return True
else:
return False
def wait_if_needed(self):
"""Wait until request is allowed"""
while not self.allow_request():
time.sleep(0.1)
# Test rate limiter
limiter = RateLimiter(max_requests=5, time_window=10.0) # 5 requests per 10 seconds
print("Testing rate limiter (5 requests per 10 seconds):")
for i in range(7):
allowed = limiter.allow_request()
print(f"Request {i+1}: {'β Allowed' if allowed else 'β Rate limited'}")
time.sleep(0.5)
print("\nWaiting for rate limit to reset...")
time.sleep(10)
print(f"After reset: {'β Allowed' if limiter.allow_request() else 'β Still limited'}")
Safety Guardrails
from typing import List, Tuple
import re
class SafetyGuardrails:
"""Content safety checks"""
def __init__(self):
# Blocked patterns (simplified - use proper content filtering in production)
self.blocked_patterns = [
r'\b(password|api[_-]?key|secret|token)\s*[=:]\s*[\w-]+',
r'\b\d{3}-\d{2}-\d{4}\b', # SSN
r'\b\d{16}\b', # Credit card
]
# Sensitive topics (use AI moderation API in production)
self.sensitive_keywords = [
'violence', 'illegal', 'harmful', 'dangerous'
]
def check_input(self, text: str) -> Tuple[bool, List[str]]:
"""Check if input is safe"""
violations = []
# Check for sensitive patterns
for pattern in self.blocked_patterns:
if re.search(pattern, text, re.IGNORECASE):
violations.append(f"Blocked pattern detected")
# Check for sensitive keywords
text_lower = text.lower()
for keyword in self.sensitive_keywords:
if keyword in text_lower:
violations.append(f"Sensitive topic: {keyword}")
is_safe = len(violations) == 0
return is_safe, violations
def check_output(self, text: str) -> Tuple[bool, List[str]]:
"""Check if output is safe"""
# Similar checks for output
return self.check_input(text)
def sanitize(self, text: str) -> str:
"""Remove sensitive information"""
sanitized = text
# Redact sensitive patterns
for pattern in self.blocked_patterns:
sanitized = re.sub(pattern, '[REDACTED]', sanitized, flags=re.IGNORECASE)
return sanitized
# Test guardrails
guardrails = SafetyGuardrails()
test_inputs = [
"What's the weather like?",
"My password is secret123",
"Help me with this illegal activity",
"How do I bake a cake?"
]
print("Testing safety guardrails:\n")
for text in test_inputs:
is_safe, violations = guardrails.check_input(text)
status = "β Safe" if is_safe else f"β Unsafe: {', '.join(violations)}"
print(f"{text[:40]:40} | {status}")
Production Agent Template
class ProductionAgent:
"""Production-ready agent with all safeguards"""
def __init__(self, name: str):
self.name = name
self.cost_tracker = CostTracker()
self.rate_limiter = RateLimiter(max_requests=10, time_window=60)
self.guardrails = SafetyGuardrails()
logger.info(f"Production agent '{name}' initialized")
@retry_with_backoff(max_retries=3)
def process(self, user_input: str) -> str:
"""Process request with all safety checks"""
logger.info(f"Processing request: {user_input[:50]}...")
# 1. Rate limiting
if not self.rate_limiter.allow_request():
logger.warning("Rate limit exceeded")
return "Too many requests. Please try again later."
# 2. Input safety check
is_safe, violations = self.guardrails.check_input(user_input)
if not is_safe:
logger.warning(f"Unsafe input: {violations}")
return "I can't process that request due to safety concerns."
try:
# 3. Generate response (mock)
response = f"Processed: {user_input}"
# 4. Track costs (mock token counts)
input_tokens = len(user_input.split()) * 1.3 # Rough estimate
output_tokens = len(response.split()) * 1.3
self.cost_tracker.track_usage(
"gpt-4-turbo",
int(input_tokens),
int(output_tokens)
)
# 5. Output safety check
is_safe, violations = self.guardrails.check_output(response)
if not is_safe:
logger.warning(f"Unsafe output: {violations}")
response = self.guardrails.sanitize(response)
logger.info("Request processed successfully")
return response
except Exception as e:
logger.error(f"Error processing request: {e}")
raise
def get_metrics(self) -> Dict:
"""Get agent metrics"""
return {
"name": self.name,
"usage_stats": self.cost_tracker.get_stats()
}
# Test production agent
agent = ProductionAgent("ProductionBot")
print("\nTesting production agent:\n")
test_requests = [
"What's the weather?",
"My API key is abc123",
"Tell me a joke",
]
for req in test_requests:
print(f"User: {req}")
response = agent.process(req)
print(f"Agent: {response}\n")
# Show metrics
print("\nAgent Metrics:")
agent.cost_tracker.print_report()
FastAPI Deployment Example
# Example FastAPI app for agent deployment
'''
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import uvicorn
app = FastAPI(title="AI Agent API")
agent = ProductionAgent("APIAgent")
class ChatRequest(BaseModel):
message: str
user_id: str
class ChatResponse(BaseModel):
response: str
cost: float
@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
try:
response = agent.process(request.message)
cost = agent.cost_tracker.get_total_cost()
return ChatResponse(response=response, cost=cost)
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.get("/metrics")
async def metrics():
return agent.get_metrics()
@app.get("/health")
async def health():
return {"status": "healthy"}
if __name__ == "__main__":
uvicorn.run(app, host="0.0.0.0", port=8000)
'''
print("FastAPI deployment example (commented)")
print("Run with: uvicorn main:app --reload")
print("Access at: http://localhost:8000")
print("API docs: http://localhost:8000/docs")
Best Practices Summary
1. Reliability
✅ Implement retries with exponential backoff
✅ Handle all exceptions gracefully
✅ Set timeouts for external calls
✅ Use circuit breakers for failing services
2. Cost Control
✅ Track token usage and costs
✅ Set budget limits per user/day
✅ Cache common responses
✅ Use cheaper models when possible
3. Safety
✅ Validate all inputs
✅ Content filtering (input & output)
✅ Rate limiting per user
✅ Audit logging for compliance
4. Monitoring
✅ Log all requests and responses
✅ Track key metrics (latency, errors, costs)
✅ Set up alerts for anomalies
✅ Use APM tools (DataDog, NewRelic, etc.)
5. Scalability
✅ Use async/await for I/O
✅ Implement caching (Redis)
✅ Use load balancing
✅ Design for horizontal scaling
Key Takeaways
✅ Production agents need robust error handling
✅ Always track costs and implement rate limiting
✅ Safety guardrails are non-negotiable
✅ Comprehensive logging enables debugging
✅ Use FastAPI or similar for easy deployment
✅ Monitor everything in production