Deploying Agents to Production

Agent monitoring, observability (LangSmith), error recovery, cost management, and production deployment patterns.

# Install dependencies
# !pip install fastapi uvicorn redis prometheus-client

Error Handling & Retries

import time
import logging
from typing import Callable, Any, Optional
from functools import wraps

# Configure root logging (timestamp - logger name - level - message) and
# create the module-level logger used throughout this file
logging.basicConfig(
    format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

def retry_with_backoff(
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    exponential: bool = True
):
    """Build a decorator that re-invokes a function when it raises.

    Args:
        max_retries: Total number of attempts, including the first call.
            Values below 1 are treated as 1, so the wrapped function is
            always invoked at least once (previously such values skipped
            the call entirely and returned ``None``).
        base_delay: Seconds to sleep before the second attempt.
        max_delay: Upper bound, in seconds, on any single sleep.
        exponential: When true, double the delay after every failed
            attempt; otherwise sleep ``base_delay`` each time.

    Returns:
        A decorator. The wrapped function returns whatever the original
        returns and re-raises the last exception once attempts run out.
    """
    def decorator(func: Callable) -> Callable:
        @wraps(func)
        def wrapper(*args, **kwargs) -> Any:
            # Never silently skip the call: a non-positive budget means
            # "one attempt, no retries".
            attempts = max(1, max_retries)
            delay = base_delay

            for attempt in range(1, attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception as e:
                    if attempt >= attempts:
                        logger.error(f"Max retries reached for {func.__name__}: {e}")
                        raise

                    logger.warning(f"Retry {attempt}/{max_retries} for {func.__name__}: {e}")
                    # Cap each individual sleep; the uncapped delay keeps
                    # growing so the cap stays in effect once reached.
                    time.sleep(min(delay, max_delay))

                    if exponential:
                        delay *= 2
        return wrapper
    return decorator

# Example usage
@retry_with_backoff(max_retries=3, base_delay=1.0)
def unreliable_api_call(success_rate: float = 0.5):
    """Pretend to call a flaky API.

    Draws one random number per invocation; returns "Success!" when the draw
    falls below ``success_rate`` and raises ``Exception`` otherwise.
    """
    import random

    succeeded = random.random() < success_rate
    if not succeeded:
        raise Exception("API temporarily unavailable")
    return "Success!"

# Demo: call the flaky function with a 30% per-attempt success rate; it is
# decorated above with max_retries=3.
print("Testing retry logic:")
try:
    result = unreliable_api_call(success_rate=0.3)
    print(f"Result: {result}")
except Exception as e:
    # Reached when the decorator gives up and re-raises the last error
    print(f"Failed after retries: {e}")

Cost Tracking

from datetime import datetime
from typing import Dict, List
from dataclasses import dataclass, field

@dataclass
class UsageMetrics:
    """Record of API usage and cost attributed to one model.

    Only ``model`` is required; every counter defaults to zero and
    ``timestamp`` defaults to the moment the record is created.
    """
    # Model identifier the usage belongs to
    model: str
    # Tokens sent to the model
    input_tokens: int = 0
    # Tokens produced by the model
    output_tokens: int = 0
    # Cost of this usage (CostTracker fills it in USD)
    total_cost: float = 0.0
    # Number of requests this record represents
    requests: int = 0
    # Error count; NOTE(review): nothing visible in this file sets it
    errors: int = 0
    # Creation time from datetime.now (naive local time)
    timestamp: datetime = field(default_factory=datetime.now)

class CostTracker:
    """Accumulate per-call token usage and derive cost reports from it."""

    # USD per 1K tokens (2024 price list)
    PRICING = {
        "gpt-4": {"input": 0.03, "output": 0.06},
        "gpt-4-turbo": {"input": 0.01, "output": 0.03},
        "gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
        "claude-3-opus": {"input": 0.015, "output": 0.075},
        "claude-3-sonnet": {"input": 0.003, "output": 0.015},
    }

    def __init__(self):
        # One UsageMetrics entry per tracked call, in call order
        self.usage_history: List[UsageMetrics] = []

    def track_usage(self, model: str, input_tokens: int, output_tokens: int):
        """Record one API call and return its cost in USD.

        Models missing from ``PRICING`` are still recorded, at zero cost,
        and a warning is logged.
        """
        rates = self.PRICING.get(model)
        if rates is None:
            logger.warning(f"Unknown model: {model}")
            cost = 0.0
        else:
            cost = (input_tokens / 1000) * rates["input"] + (output_tokens / 1000) * rates["output"]

        self.usage_history.append(
            UsageMetrics(
                model=model,
                input_tokens=input_tokens,
                output_tokens=output_tokens,
                total_cost=cost,
                requests=1
            )
        )
        logger.info(f"Tracked: {model} | Tokens: {input_tokens + output_tokens} | Cost: ${cost:.4f}")

        return cost

    def get_total_cost(self) -> float:
        """Return the summed cost of every recorded call."""
        return sum(entry.total_cost for entry in self.usage_history)

    def get_stats(self) -> Dict:
        """Return overall totals plus a per-model breakdown.

        The result has keys ``total_requests``, ``total_tokens``,
        ``total_cost`` and ``by_model``; each ``by_model`` value holds
        ``requests``, ``tokens`` and ``cost`` for that model.
        """
        history = self.usage_history

        per_model = {}
        for entry in history:
            bucket = per_model.setdefault(
                entry.model, {"requests": 0, "tokens": 0, "cost": 0.0}
            )
            bucket["requests"] += 1
            bucket["tokens"] += entry.input_tokens + entry.output_tokens
            bucket["cost"] += entry.total_cost

        return {
            "total_requests": len(history),
            "total_tokens": sum(e.input_tokens + e.output_tokens for e in history),
            "total_cost": self.get_total_cost(),
            "by_model": per_model
        }

    def print_report(self):
        """Print the statistics from ``get_stats`` as a framed text report."""
        stats = self.get_stats()
        rule = '=' * 60

        print(f"\n{rule}")
        print("COST REPORT")
        print(rule)
        print(f"Total Requests: {stats['total_requests']}")
        print(f"Total Tokens: {stats['total_tokens']:,}")
        print(f"Total Cost: ${stats['total_cost']:.4f}")

        per_model = stats['by_model']
        if per_model:
            print("\nBy Model:")
            for model, data in per_model.items():
                print(f"  {model}:")
                print(f"    Requests: {data['requests']}")
                print(f"    Tokens: {data['tokens']:,}")
                print(f"    Cost: ${data['cost']:.4f}")

        print(f"{rule}\n")

# Test cost tracking
tracker = CostTracker()

# Simulated API calls: (model, input tokens, output tokens)
for model_name, prompt_tokens, completion_tokens in [
    ("gpt-4", 500, 200),
    ("gpt-4-turbo", 1000, 500),
    ("gpt-3.5-turbo", 2000, 1000),
    ("claude-3-sonnet", 800, 400),
]:
    tracker.track_usage(
        model_name,
        input_tokens=prompt_tokens,
        output_tokens=completion_tokens,
    )

# Summarise everything recorded above
tracker.print_report()

Rate Limiting

import time
from collections import deque
from threading import Lock

class RateLimiter:
    """Sliding-window rate limiter.

    Keeps the timestamps of recently admitted requests and admits a new one
    only while fewer than ``max_requests`` of them fall inside the trailing
    ``time_window`` seconds. Timestamps come from ``time.monotonic()`` so
    that system clock adjustments cannot stretch or shrink the window.
    """

    def __init__(self, max_requests: int, time_window: float):
        """
        Args:
            max_requests: Maximum requests allowed
            time_window: Time window in seconds
        """
        self.max_requests = max_requests
        self.time_window = time_window
        # Admission timestamps (monotonic clock), oldest first
        self.requests = deque()
        # Guards self.requests so allow_request can be called from several threads
        self.lock = Lock()

    def allow_request(self) -> bool:
        """Admit the request if the window has room.

        Returns:
            True (and records the request) when under the limit, else False.
        """
        with self.lock:
            now = time.monotonic()
            cutoff = now - self.time_window

            # Drop admissions that have aged out of the window
            while self.requests and self.requests[0] < cutoff:
                self.requests.popleft()

            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return True
            return False

    def wait_if_needed(self):
        """Block, polling every 0.1 s, until a request is admitted.

        Raises:
            ValueError: if ``max_requests`` is not positive, because no
                request could ever be admitted and the call would never return.
        """
        if self.max_requests <= 0:
            raise ValueError("max_requests must be positive to wait for a slot")
        while not self.allow_request():
            time.sleep(0.1)

# Test rate limiter
limiter = RateLimiter(max_requests=5, time_window=10.0)  # 5 requests per 10 seconds

print("Testing rate limiter (5 requests per 10 seconds):")
# Seven attempts spaced 0.5 s apart all land inside one 10 s window
for i in range(7):
    allowed = limiter.allow_request()
    print(f"Request {i+1}: {'✓ Allowed' if allowed else '✗ Rate limited'}")
    time.sleep(0.5)

print("\nWaiting for rate limit to reset...")
# Sleep a full window so every earlier admission has expired
time.sleep(10)
print(f"After reset: {'✓ Allowed' if limiter.allow_request() else '✗ Still limited'}")

Safety Guardrails

from typing import List, Tuple
import re

class SafetyGuardrails:
    """Regex- and keyword-based content safety checks.

    ``blocked_patterns`` are regular expressions for secrets and personal
    data; ``sensitive_keywords`` are substrings that mark a topic as
    sensitive. Both are deliberately simple stand-ins for a real
    moderation service.
    """

    def __init__(self):
        # Blocked patterns (simplified - use proper content filtering in production)
        self.blocked_patterns = [
            # Credentials written as an assignment ("api_key=...", "token: ...")
            # or in plain language ("my password is ...", "API key is ...")
            r'\b(password|api[ _-]?key|secret|token)(?:\s*[=:]\s*|\s+is\s+)[\w-]+',
            r'\b\d{3}-\d{2}-\d{4}\b',  # SSN
            # 16-digit card number, contiguous or in 4-digit groups
            # separated by a space or hyphen
            r'\b(?:\d{4}[ -]?){3}\d{4}\b',
        ]

        # Sensitive topics (use AI moderation API in production)
        self.sensitive_keywords = [
            'violence', 'illegal', 'harmful', 'dangerous'
        ]

    def check_input(self, text: str) -> Tuple[bool, List[str]]:
        """Check whether ``text`` is safe.

        Returns:
            ``(is_safe, violations)``. ``violations`` has one
            "Blocked pattern detected" entry per matching pattern, followed
            by one "Sensitive topic: <keyword>" entry per keyword found
            (case-insensitive substring match). ``is_safe`` is True only
            when the list is empty.
        """
        violations = [
            "Blocked pattern detected"
            for pattern in self.blocked_patterns
            if re.search(pattern, text, re.IGNORECASE)
        ]

        lowered = text.lower()
        violations.extend(
            f"Sensitive topic: {keyword}"
            for keyword in self.sensitive_keywords
            if keyword in lowered
        )

        return not violations, violations

    def check_output(self, text: str) -> Tuple[bool, List[str]]:
        """Check whether generated output is safe (same rules as input)."""
        return self.check_input(text)

    def sanitize(self, text: str) -> str:
        """Return ``text`` with every blocked-pattern match replaced by "[REDACTED]".

        Sensitive keywords are not altered; only pattern matches are redacted.
        """
        sanitized = text
        for pattern in self.blocked_patterns:
            sanitized = re.sub(pattern, '[REDACTED]', sanitized, flags=re.IGNORECASE)
        return sanitized

# Test guardrails
guardrails = SafetyGuardrails()

# Sample inputs: benign questions, a credential mention, and a sensitive topic
test_inputs = [
    "What's the weather like?",
    "My password is secret123",
    "Help me with this illegal activity",
    "How do I bake a cake?"
]

print("Testing safety guardrails:\n")
for text in test_inputs:
    is_safe, violations = guardrails.check_input(text)
    status = "✓ Safe" if is_safe else f"✗ Unsafe: {', '.join(violations)}"
    # Pad/truncate the input to 40 columns so the status column lines up
    print(f"{text[:40]:40} | {status}")

Production Agent Template

class ProductionAgent:
    """Agent wrapper that combines rate limiting, safety checks, retries and cost tracking."""

    def __init__(self, name: str):
        """Create the agent with its own tracker, limiter (10 requests / 60 s) and guardrails."""
        self.name = name
        self.cost_tracker = CostTracker()
        self.rate_limiter = RateLimiter(max_requests=10, time_window=60)
        self.guardrails = SafetyGuardrails()
        logger.info(f"Production agent '{name}' initialized")

    @retry_with_backoff(max_retries=3)
    def process(self, user_input: str) -> str:
        """Run one request through the full pipeline.

        Steps: rate limit, input safety check, (mock) generation, cost
        tracking, output safety check.

        Returns:
            The response text, or a fixed refusal message when the request
            is rate limited or fails the input safety check.

        Raises:
            Exception: any error raised during generation or tracking is
                logged and re-raised (the retry decorator then decides
                whether to try again).
        """
        # Redact before truncating and logging: the raw input has not been
        # safety-checked yet and may contain credentials or personal data.
        preview = self.guardrails.sanitize(user_input)[:50]
        logger.info(f"Processing request: {preview}...")

        # 1. Rate limiting
        # NOTE: the retry decorator wraps this whole method, so each retry
        # consumes another rate-limit slot.
        if not self.rate_limiter.allow_request():
            logger.warning("Rate limit exceeded")
            return "Too many requests. Please try again later."

        # 2. Input safety check
        is_safe, violations = self.guardrails.check_input(user_input)
        if not is_safe:
            logger.warning(f"Unsafe input: {violations}")
            return "I can't process that request due to safety concerns."

        try:
            # 3. Generate response (mock)
            response = f"Processed: {user_input}"

            # 4. Track costs (mock token counts)
            input_tokens = len(user_input.split()) * 1.3  # Rough estimate
            output_tokens = len(response.split()) * 1.3
            self.cost_tracker.track_usage(
                "gpt-4-turbo",
                int(input_tokens),
                int(output_tokens)
            )

            # 5. Output safety check
            is_safe, violations = self.guardrails.check_output(response)
            if not is_safe:
                logger.warning(f"Unsafe output: {violations}")
                # Only pattern matches are redacted; the response is still returned
                response = self.guardrails.sanitize(response)

            logger.info("Request processed successfully")
            return response

        except Exception as e:
            logger.error(f"Error processing request: {e}")
            raise

    def get_metrics(self) -> Dict:
        """Return the agent name together with the cost tracker's statistics."""
        return {
            "name": self.name,
            "usage_stats": self.cost_tracker.get_stats()
        }

# Test production agent
# Build one agent and push a few sample requests through the full pipeline
agent = ProductionAgent("ProductionBot")

print("\nTesting production agent:\n")
test_requests = [
    "What's the weather?",
    "My API key is abc123",
    "Tell me a joke",
]

for req in test_requests:
    print(f"User: {req}")
    # process() returns either the mock reply or a fixed refusal / rate-limit message
    response = agent.process(req)
    print(f"Agent: {response}\n")

# Show metrics
print("\nAgent Metrics:")
agent.cost_tracker.print_report()

FastAPI Deployment Example

# Example FastAPI app for agent deployment.
# The app is kept inside a bare string literal, so none of it runs here; copy
# it into its own module (e.g. main.py) to serve the agent over HTTP.
# NOTE(review): /chat is declared async but calls the synchronous
#   agent.process, whose retry decorator sleeps with time.sleep — confirm
#   whether a plain `def` handler or a thread executor is intended.
# NOTE(review): `cost` in the response is get_total_cost(), i.e. the running
#   total over every tracked call, not the cost of this single request.
'''
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
import uvicorn

app = FastAPI(title="AI Agent API")
agent = ProductionAgent("APIAgent")

class ChatRequest(BaseModel):
    message: str
    user_id: str

class ChatResponse(BaseModel):
    response: str
    cost: float

@app.post("/chat", response_model=ChatResponse)
async def chat(request: ChatRequest):
    try:
        response = agent.process(request.message)
        cost = agent.cost_tracker.get_total_cost()
        return ChatResponse(response=response, cost=cost)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

@app.get("/metrics")
async def metrics():
    return agent.get_metrics()

@app.get("/health")
async def health():
    return {"status": "healthy"}

if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8000)
'''

# Usage hints for running the example once it lives in main.py
print("FastAPI deployment example (commented)")
print("Run with: uvicorn main:app --reload")
print("Access at: http://localhost:8000")
print("API docs: http://localhost:8000/docs")

Best Practices Summary

1. Reliability

  • ✅ Implement retries with exponential backoff

  • ✅ Handle all exceptions gracefully

  • ✅ Set timeouts for external calls

  • ✅ Use circuit breakers for failing services

2. Cost Control

  • ✅ Track token usage and costs

  • ✅ Set budget limits per user/day

  • ✅ Cache common responses

  • ✅ Use cheaper models when possible

3. Safety

  • ✅ Validate all inputs

  • ✅ Content filtering (input & output)

  • ✅ Rate limiting per user

  • ✅ Audit logging for compliance

4. Monitoring

  • ✅ Log all requests and responses

  • ✅ Track key metrics (latency, errors, costs)

  • ✅ Set up alerts for anomalies

  • ✅ Use APM tools (DataDog, NewRelic, etc.)

5. Scalability

  • ✅ Use async/await for I/O

  • ✅ Implement caching (Redis)

  • ✅ Use load balancing

  • ✅ Design for horizontal scaling

Key Takeaways

✅ Production agents need robust error handling

✅ Always track costs and implement rate limiting

✅ Safety guardrails are non-negotiable

✅ Comprehensive logging enables debugging

✅ Use FastAPI or similar for easy deployment

✅ Monitor everything in production