LLM Infrastructure for Production (2025-2026 Essential Stack)
This notebook covers the critical infrastructure tools every ML engineer needs for deploying LLMs at scale in 2025-2026.
What You Will Learn

| Part | Topic | Key Benefit |
|---|---|---|
| 1 | LiteLLM | Unified gateway for 100+ LLM APIs |
| 2 | Anthropic Prompt Caching | Up to 90% cost reduction |
| 3 | Speculative Decoding | 2-3x faster inference |
| 4 | W&B Weave | LLM observability and tracing |
| 5 | FlashAttention-3 | 85% GPU utilization on H100 |
Part 1: LiteLLM – Unified LLM Gateway
The Problem LiteLLM Solves
In 2025, teams work with multiple LLM providers:
OpenAI (GPT-4o, o3)
Anthropic (Claude 3.5/3.7)
Google (Gemini 2.0 Flash, 2.5 Pro)
AWS Bedrock (hosted Claude, Llama, Mistral)
Self-hosted via Ollama, vLLM
Each has a different SDK, different API format, different error codes. LiteLLM gives you one unified OpenAI-compatible interface for all of them.
Your Code → LiteLLM → OpenAI
                    → Anthropic
                    → Gemini
                    → Bedrock
                    → Ollama (local)
                    → Azure OpenAI
                    → 95+ more
# Install LiteLLM
# !pip install litellm

# Verify installation
try:
    import litellm
    print(f"LiteLLM version: {litellm.__version__}")
except ImportError:
    print("Run: pip install litellm")
1.1 Calling Any LLM with the Same Code
import os
from litellm import completion

# --- The Universal LLM Call ---
# Set your API keys (or use environment variables)
# os.environ["OPENAI_API_KEY"] = "sk-..."
# os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
# os.environ["GEMINI_API_KEY"] = "..."

def call_any_llm(model: str, prompt: str, dry_run: bool = True):
    """Demonstrate LiteLLM's unified interface."""
    if dry_run:
        print(f"[DRY RUN] Would call model='{model}' with: '{prompt[:60]}...'")
        return None
    response = completion(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=200,
    )
    return response.choices[0].message.content

# The EXACT same code works for all these providers:
models_to_try = [
    "gpt-4o",                               # OpenAI
    "anthropic/claude-opus-4-6",            # Anthropic
    "gemini/gemini-2.0-flash",              # Google
    "ollama/llama3.2",                      # Local via Ollama
    "bedrock/anthropic.claude-3-5-sonnet",  # AWS Bedrock
    "azure/gpt-4o",                         # Azure OpenAI
]
prompt = "Explain gradient descent in one sentence."
for model in models_to_try:
    call_any_llm(model, prompt, dry_run=True)
print("\nTo actually call: set dry_run=False and configure API keys")
1.2 Fallback Routing – Never Let One Provider Failure Break Your App
from litellm import Router

# Configure a router with primary + fallback models
model_list = [
    {
        "model_name": "gpt-4-production",  # Logical alias
        "litellm_params": {
            "model": "gpt-4o",  # Primary: OpenAI
            "api_key": os.environ.get("OPENAI_API_KEY", "placeholder"),
        },
    },
    {
        "model_name": "gpt-4-production",  # Same alias = fallback
        "litellm_params": {
            "model": "anthropic/claude-opus-4-6",  # Fallback: Anthropic
            "api_key": os.environ.get("ANTHROPIC_API_KEY", "placeholder"),
        },
    },
    {
        "model_name": "gpt-4-production",
        "litellm_params": {
            "model": "gemini/gemini-2.0-flash",  # Second fallback: Gemini
            "api_key": os.environ.get("GEMINI_API_KEY", "placeholder"),
        },
    },
]

# Create router with load balancing strategy
router = Router(
    model_list=model_list,
    routing_strategy="least-busy",  # Options: least-busy, simple-shuffle, latency-based-routing
    num_retries=3,
    fallbacks=[{"gpt-4-production": ["anthropic/claude-opus-4-6"]}],
    retry_after=5,     # seconds between retries
    allowed_fails=2,   # circuit breaker: disable provider after 2 failures
    cooldown_time=60,  # seconds before retrying failed provider
)

print("Router configured with:")
print("  Primary:    OpenAI GPT-4o")
print("  Fallback 1: Anthropic Claude Opus 4.6")
print("  Fallback 2: Google Gemini 2.0 Flash")
print("  Strategy:   least-busy")
print("  Circuit breaker: disables provider after 2 consecutive failures")

# Usage (same as regular completion)
# response = await router.acompletion(
#     model="gpt-4-production",
#     messages=[{"role": "user", "content": "Hello!"}]
# )
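The fallback-with-circuit-breaker behavior can be sketched in plain Python. This is a hedged toy to make the control flow concrete, not LiteLLM's actual implementation: try each deployment in order, and skip any provider once it has exceeded `allowed_fails`.

```python
# Toy sketch of fallback routing with a circuit breaker (illustrative only).
from typing import Callable

def route_with_fallback(providers: list[tuple[str, Callable[[], str]]],
                        fail_counts: dict[str, int],
                        allowed_fails: int = 2) -> str:
    """Try providers in order; skip any whose failure count tripped the breaker."""
    for name, call in providers:
        if fail_counts.get(name, 0) >= allowed_fails:
            continue  # circuit open: provider is cooling down
        try:
            return call()
        except RuntimeError:
            fail_counts[name] = fail_counts.get(name, 0) + 1
    raise RuntimeError("all providers failed or are cooling down")

def flaky():
    """Primary provider that is currently down."""
    raise RuntimeError("503 from provider")

fail_counts: dict[str, int] = {}
providers = [("openai", flaky), ("anthropic", lambda: "ok-from-anthropic")]

print(route_with_fallback(providers, fail_counts))  # falls through to anthropic
```

After `allowed_fails` consecutive failures the primary is skipped entirely, which is what keeps a flapping provider from adding latency to every request.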
1.3 Cost Tracking and Budget Limits
from litellm import completion_cost, cost_per_token  # helpers for costing live responses

# --- Cost calculation for common models ---
model_costs = {
    "gpt-4o": {"input": 2.50, "output": 10.00},
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
    "claude-opus-4-6": {"input": 15.00, "output": 75.00},
    "claude-3-5-sonnet-20241022": {"input": 3.00, "output": 15.00},
    "gemini/gemini-2.0-flash": {"input": 0.075, "output": 0.30},
    "gemini/gemini-2.5-pro": {"input": 1.25, "output": 10.00},
}

print("Cost per 1M tokens (USD):")
print(f"{'Model':<40} {'Input':>10} {'Output':>10}")
print("-" * 62)
for model, costs in model_costs.items():
    print(f"{model:<40} ${costs['input']:>9.3f} ${costs['output']:>9.3f}")

# --- Real-world cost simulation ---
print("\n--- Real-world Cost Simulation ---")
print("Scenario: 10,000 API calls/day, 500 input tokens, 200 output tokens")
print()
daily_calls = 10_000
input_tokens = 500
output_tokens = 200
for model, costs in model_costs.items():
    daily_cost = daily_calls * (
        (input_tokens * costs["input"] / 1_000_000) +
        (output_tokens * costs["output"] / 1_000_000)
    )
    monthly_cost = daily_cost * 30
    print(f"{model:<40} ${daily_cost:>8.2f}/day ${monthly_cost:>9.2f}/month")
1.4 LiteLLM Proxy – Run as a Local API Gateway
The LiteLLM Proxy lets you run a local server that any OpenAI-compatible client can point to. It adds:
Cost tracking per team/user
Budget limits (block requests when budget exceeded)
Rate limiting per API key
Logging to Langfuse, LangSmith, Helicone, W&B
# LiteLLM Proxy configuration file: config.yaml
proxy_config = """
# litellm_config.yaml
model_list:
  - model_name: gpt-4o
    litellm_params:
      model: gpt-4o
      api_key: os.environ/OPENAI_API_KEY
  - model_name: claude-3-5-sonnet
    litellm_params:
      model: anthropic/claude-3-5-sonnet-20241022
      api_key: os.environ/ANTHROPIC_API_KEY
  - model_name: gemini-flash
    litellm_params:
      model: gemini/gemini-2.0-flash
      api_key: os.environ/GEMINI_API_KEY

litellm_settings:
  success_callback: ["langfuse", "wandb"]
  failure_callback: ["langfuse"]
  budget_duration: "1mo"

general_settings:
  master_key: sk-my-proxy-key
  database_url: os.environ/DATABASE_URL  # PostgreSQL for persistent tracking
"""

print("LiteLLM Proxy config (litellm_config.yaml):")
print(proxy_config)
print("Start the proxy server:")
print("  litellm --config litellm_config.yaml --port 4000")
print()
print("Then use it exactly like OpenAI:")
print("""
from openai import OpenAI

client = OpenAI(
    base_url="http://0.0.0.0:4000",  # Your local LiteLLM proxy
    api_key="sk-my-proxy-key"
)
response = client.chat.completions.create(
    model="claude-3-5-sonnet",  # Maps to Anthropic automatically
    messages=[{"role": "user", "content": "Hello!"}]
)
""")
1.5 Logging to Observability Platforms
import litellm

# Enable logging to multiple platforms simultaneously
litellm.success_callback = ["langfuse", "wandb", "helicone"]
litellm.failure_callback = ["langfuse", "wandb"]

# Per-request metadata for tracking
metadata_example = {
    "generation_name": "rag-query-v2",
    "generation_id": "gen_abc123",
    "trace_user_id": "user_456",
    "tags": ["production", "rag", "legal-team"],
    "session_id": "session_789",
}

print("Logging configuration set:")
print(f"  Success callbacks: {litellm.success_callback}")
print(f"  Failure callbacks: {litellm.failure_callback}")
print()
print("Example call with metadata tracking:")
print("""
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": query}],
    metadata={
        "generation_name": "rag-query-v2",
        "trace_user_id": "user_456",
        "session_id": "session_789",
    }
)
# Automatically logged to Langfuse, W&B, Helicone
""")

# Rate limiting per user
print("\nRate limiting config (in proxy config.yaml):")
print("""
router_settings:
  redis_url: "redis://localhost:6379"
  routing_strategy: "usage-based-routing"

# Per-key rate limits
litellm_settings:
  default_team_settings:
    - team_id: "engineering"
      budget_limit: 100   # $100/month
      tpm_limit: 100000   # 100K tokens/minute
      rpm_limit: 1000     # 1K requests/minute
""")
Part 2: Anthropic Prompt Caching – 90% Cost Reduction
What Is Prompt Caching?
Every API call processes your entire prompt from scratch, even if 90% of it is the same system prompt or reference document. Prompt caching stores the KV cache (key-value attention states) on Anthropic's servers so identical prefix content only needs to be processed once.
Cost breakdown:

| Token Type | Cost vs Normal |
|---|---|
| Cache write (first call) | 1.25x normal price |
| Cache read (subsequent calls) | 0.1x normal price (90% off!) |
| Regular (no cache) | 1.0x baseline |
Cache duration:
Default: 5 minutes after last access (resets on each hit)
Extended: 1 hour (available on Claude Sonnet 3.7+)
Minimum cacheable tokens: 1,024 tokens for Claude 3+ models
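Given the 1.25x write premium and 0.1x read price, caching on a repeated prefix pays for itself from the second call onward. A quick back-of-the-envelope check, using only the multipliers from the table above:

```python
def cached_vs_uncached_ratio(num_calls: int,
                             write_mult: float = 1.25,
                             read_mult: float = 0.10) -> float:
    """Ratio of cached to uncached input-token cost for a repeated prefix.

    Uncached: every call pays 1.0x for the prefix.
    Cached:   first call pays the write premium, the rest pay the read price.
    """
    uncached = num_calls * 1.0
    cached = write_mult + (num_calls - 1) * read_mult
    return cached / uncached

# With two calls the cached prefix is already cheaper:
print(f"{cached_vs_uncached_ratio(2):.3f}")   # 1.35 / 2.0 = 0.675
# With many calls, cost approaches the 0.1x floor (~90% savings):
print(f"{cached_vs_uncached_ratio(100):.3f}")
```

Note the corollary: a prefix you only ever send once costs 1.25x, so caching a prompt that never repeats is a (small) net loss.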
import anthropic

# --- Prompt Caching Implementation ---
# Simulate a large legal document (in production this would be 10,000+ words)
large_legal_document = """
MASTER SERVICE AGREEMENT

This Master Service Agreement ("Agreement") is entered into as of January 1, 2025,
between Acme Corporation ("Client") and TechVentures Inc ("Provider").

Section 1: Definitions
1.1 "Services" means the software development and consulting services described herein.
1.2 "Deliverables" means all work product created under this Agreement.
1.3 "Confidential Information" means any non-public information disclosed by either party.

Section 2: Services and Deliverables
2.1 Provider shall perform the Services in a professional manner.
2.2 Client shall provide timely feedback on all Deliverables.
2.3 Changes to scope require written amendment signed by both parties.

Section 3: Payment Terms
3.1 Client shall pay invoices within 30 days of receipt.
3.2 Late payments accrue interest at 1.5% per month.
3.3 Provider may suspend services after 60 days of non-payment.

[... imagine 9,977 more words of legal text here ...]
""" * 50  # Repeat to reach minimum 1,024 tokens

def call_with_cache(client: anthropic.Anthropic, question: str) -> dict:
    """Make an API call with prompt caching enabled."""
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        system=[
            {
                "type": "text",
                "text": f"You are a legal expert assistant. Here is the full contract:\n\n{large_legal_document}",
                "cache_control": {"type": "ephemeral"}  # Mark for caching
            }
        ],
        messages=[
            {"role": "user", "content": question}
        ]
    )
    return {
        "answer": response.content[0].text[:200],
        "input_tokens": response.usage.input_tokens,
        "output_tokens": response.usage.output_tokens,
        "cache_write_tokens": getattr(response.usage, "cache_creation_input_tokens", 0),
        "cache_read_tokens": getattr(response.usage, "cache_read_input_tokens", 0),
    }

print("Prompt caching function defined.")
print(f"Document size: ~{len(large_legal_document.split()):,} words")
print(f"Estimated tokens: ~{len(large_legal_document) // 4:,}")
# --- Cost Savings Calculator ---
def calculate_cache_savings(
    system_prompt_tokens: int,
    num_turns: int,
    output_tokens_per_turn: int = 500,
    model: str = "claude-opus-4-6",
) -> dict:
    """Calculate real cost savings from prompt caching."""
    # Pricing per 1M tokens (as of 2025)
    pricing = {
        "claude-opus-4-6": {"input": 15.0, "output": 75.0, "cache_write": 18.75, "cache_read": 1.50},
        "claude-3-5-sonnet-20241022": {"input": 3.0, "output": 15.0, "cache_write": 3.75, "cache_read": 0.30},
    }
    p = pricing.get(model, pricing["claude-3-5-sonnet-20241022"])

    # Without caching: pay full input price every turn
    no_cache_cost = num_turns * (
        (system_prompt_tokens / 1_000_000) * p["input"] +
        (output_tokens_per_turn / 1_000_000) * p["output"]
    )

    # With caching: pay cache_write once, cache_read for remaining turns
    with_cache_cost = (
        (system_prompt_tokens / 1_000_000) * p["cache_write"] +                   # First call: write
        (num_turns - 1) * (system_prompt_tokens / 1_000_000) * p["cache_read"] +  # Subsequent: read
        num_turns * (output_tokens_per_turn / 1_000_000) * p["output"]            # Output cost unchanged
    )

    savings = no_cache_cost - with_cache_cost
    savings_pct = (savings / no_cache_cost) * 100
    return {
        "model": model,
        "system_prompt_tokens": system_prompt_tokens,
        "num_turns": num_turns,
        "no_cache_cost": no_cache_cost,
        "with_cache_cost": with_cache_cost,
        "savings": savings,
        "savings_pct": savings_pct,
    }

# Simulate multi-turn agent with large system prompt
print("=" * 65)
print("PROMPT CACHING SAVINGS CALCULATOR")
print("=" * 65)

scenarios = [
    {"system_prompt_tokens": 5_000, "num_turns": 10, "label": "Small system prompt, short session"},
    {"system_prompt_tokens": 50_000, "num_turns": 20, "label": "Large doc (50K tokens), 20 questions"},
    {"system_prompt_tokens": 100_000, "num_turns": 100, "label": "100K token codebase, 100 queries/day"},
]
for scenario in scenarios:
    result = calculate_cache_savings(
        system_prompt_tokens=scenario["system_prompt_tokens"],
        num_turns=scenario["num_turns"],
        model="claude-opus-4-6",
    )
    print(f"\nScenario: {scenario['label']}")
    print(f"  Without caching: ${result['no_cache_cost']:.4f}")
    print(f"  With caching:    ${result['with_cache_cost']:.4f}")
    print(f"  Savings:         ${result['savings']:.4f} ({result['savings_pct']:.1f}% saved)")
2.2 Prompt Caching on OpenAI and Gemini
OpenAI caches prompt prefixes automatically with no special API parameters; Gemini requires an explicit caching API.

OpenAI (as of Oct 2024):
- Automatic prefix caching on GPT-4o, GPT-4o-mini, o1, o3
- Minimum prefix: 1,024 tokens
- Cache read discount: 50% off input tokens
- Visible in usage.prompt_tokens_details.cached_tokens

Google Gemini:
- Explicit CachedContent API required
- Minimum: 32,768 tokens (much larger requirement)
- Cache duration: 1 hour minimum, up to 1 month
# OpenAI Prompt Caching (automatic - just check the usage response)
openai_caching_example = """
from openai import OpenAI

client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": large_system_prompt},  # Auto-cached if >= 1024 tokens
        {"role": "user", "content": "What does section 3 say?"}
    ]
)

# Check if cache was hit
cached = response.usage.prompt_tokens_details.cached_tokens
total = response.usage.prompt_tokens
print(f"Cache hit: {cached}/{total} tokens ({100*cached/total:.1f}% from cache)")
"""

# Gemini Explicit Caching
gemini_caching_example = """
import datetime
import google.generativeai as genai
from google.generativeai import caching

# Create a cached content object
cache = caching.CachedContent.create(
    model="gemini-1.5-flash",
    system_instruction="You are a legal expert.",
    contents=[huge_document],  # Must be >= 32,768 tokens
    ttl=datetime.timedelta(hours=1),
    display_name="legal-doc-cache",
)

# Use the cached content in requests
model = genai.GenerativeModel.from_cached_content(cached_content=cache)
response = model.generate_content("Summarize section 3")
print(response.text)
"""

print("OpenAI Caching (automatic):")
print(openai_caching_example)
print("\nGemini Caching (explicit API):")
print(gemini_caching_example)

print("\nComparison summary:")
comparison = [
    ("Provider", "Min Tokens", "Setup", "Discount", "TTL"),
    ("Anthropic", "1,024", "Explicit", "90% off", "5 min / 1 hr"),
    ("OpenAI", "1,024", "Automatic", "50% off", "~5-10 min"),
    ("Gemini", "32,768", "Explicit", "75% off", "1 hr - 1 month"),
]
for row in comparison:
    print(f"  {row[0]:<12} {row[1]:<12} {row[2]:<12} {row[3]:<12} {row[4]}")
Part 3: Speculative Decoding – 2-3x Faster Inference
How Speculative Decoding Works
Standard autoregressive generation is sequential: each token requires a full forward pass through the model. With a 70B parameter model, generating 100 tokens means 100 serial forward passes.
Speculative decoding breaks this bottleneck:

Standard:    [Token 1] → [Token 2] → [Token 3] → [Token 4] → ...
             (full model each step, sequential)

Speculative:
  Step 1: Draft model generates 5 candidate tokens FAST
          [t1?, t2?, t3?, t4?, t5?]
  Step 2: Large model VERIFIES all 5 in parallel (one forward pass!)
          Accept: [t1 ✓, t2 ✓, t3 ✓]  Reject: [t4 ✗] → generate correct t4

Result: 3-4 tokens in the time of 1 token → 2-4x speedup!

Key insight: verification is cheap, because one batched forward pass checks all draft tokens simultaneously. The draft model is tiny (1B vs 70B parameters), so its forward pass is negligible.
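The draft-then-verify loop can be made concrete with a toy simulation. This is a hedged sketch of the control flow only: the "models" below are trivial character-level stand-ins (the target string plays the role of the big model's greedy output), not real LLMs, and real systems verify token distributions, not exact matches.

```python
# Toy draft-and-verify loop over characters, to illustrate speculative decoding.
TARGET = "the quick brown fox"

def big_model_next(prefix: str) -> str:
    """Stand-in for the large model: always produces the true next character."""
    return TARGET[len(prefix)]

def draft_model_next(prefix: str) -> str:
    """Stand-in for the cheap draft model: usually right, wrong on vowels."""
    ch = TARGET[len(prefix)]
    return "?" if ch in "aeiou" else ch

def speculative_decode(prefix: str, num_draft: int = 5) -> tuple[str, int]:
    """Generate TARGET from prefix, counting big-model verification passes."""
    big_calls = 0
    while len(prefix) < len(TARGET):
        # Step 1: draft k candidate tokens cheaply
        draft = ""
        for _ in range(min(num_draft, len(TARGET) - len(prefix))):
            draft += draft_model_next(prefix + draft)
        # Step 2: one (batched) big-model pass verifies all candidates
        big_calls += 1
        accepted = ""
        for ch in draft:
            true_ch = big_model_next(prefix + accepted)
            if ch == true_ch:
                accepted += ch          # draft token accepted
            else:
                accepted += true_ch     # rejection: big model supplies the fix
                break
        prefix += accepted
    return prefix, big_calls

out, calls = speculative_decode("")
print(out, "| big-model passes:", calls, "vs", len(TARGET), "sequential")
```

The output is guaranteed identical to plain sequential decoding; the win is purely that the big model runs far fewer times than the number of tokens generated.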
Eagle3: State-of-the-Art Speculative Decoding (2025)
Eagle3 is the current best spec-decode method:
3-layer hidden state drafting using features from multiple transformer layers
2.5x speedup on H100 for Llama 3.3 70B
Acceptance rate > 80% on most tasks
Supported in vLLM as of 0.6.x
# Speculative Decoding in vLLM
vllm_spec_decode_commands = """
# --- Method 1: Standard speculative decoding (draft model) ---
python -m vllm.entrypoints.openai.api_server \\
    --model meta-llama/Llama-3.3-70B-Instruct \\
    --speculative-model meta-llama/Llama-3.2-1B-Instruct \\
    --num-speculative-tokens 5 \\
    --tensor-parallel-size 4

# --- Method 2: Eagle3 (best performance, 2025) ---
python -m vllm.entrypoints.openai.api_server \\
    --model meta-llama/Llama-3.3-70B-Instruct \\
    --speculative-model lmzheng/Eagle3-Llama3.3-Instruct-70B \\
    --speculative-draft-tensor-parallel-size 1 \\
    --num-speculative-tokens 5 \\
    --tensor-parallel-size 4

# --- Method 3: Medusa heads (no separate draft model) ---
python -m vllm.entrypoints.openai.api_server \\
    --model FasterDecoding/medusa-1.0-llama-2-13b-chat \\
    --speculative-model [medusa] \\
    --num-speculative-tokens 3
"""
print("vLLM Speculative Decoding Commands:")
print(vllm_spec_decode_commands)
import time
import statistics

# Benchmark: Compare throughput with/without speculative decoding
# (This simulates results; an actual benchmark requires GPU hardware)
def simulate_spec_decode_benchmark():
    """Simulate benchmark results for speculative decoding."""
    # Representative benchmarks from vLLM docs and papers
    benchmarks = {
        "Scenario": [
            "Long generation (512 tokens)",
            "Short generation (64 tokens)",
            "Code generation",
            "Summarization",
            "High-entropy (creative writing)",
        ],
        "Without Spec Decode (tok/s)": [45, 52, 48, 44, 46],
        "With Spec Decode Eagle3 (tok/s)": [112, 89, 128, 105, 58],
        "Speedup": [2.49, 1.71, 2.67, 2.39, 1.26],
        "Notes": [
            "Best case: consistent output",
            "Overhead reduces gains",
            "High acceptance rate",
            "Good repetitive structure",
            "Low acceptance due to randomness",
        ],
    }

    print("Speculative Decoding Benchmark (Llama 3.3 70B, 4x H100)")
    print("=" * 85)
    print(f"{'Scenario':<35} {'Baseline':>12} {'Eagle3':>12} {'Speedup':>10} {'Notes'}")
    print("-" * 85)
    for i in range(len(benchmarks["Scenario"])):
        print(
            f"{benchmarks['Scenario'][i]:<35} "
            f"{benchmarks['Without Spec Decode (tok/s)'][i]:>10} t/s "
            f"{benchmarks['With Spec Decode Eagle3 (tok/s)'][i]:>10} t/s "
            f"{benchmarks['Speedup'][i]:>8.2f}x "
            f" {benchmarks['Notes'][i]}"
        )

    print()
    print("Key insight: Speculative decoding helps MOST when:")
    print("  - Output is long (amortizes draft overhead)")
    print("  - Content is structured/predictable (code, templates, summaries)")
    print("  - Using greedy decode or low temperature (<0.3)")
    print()
    print("Speculative decoding helps LEAST when:")
    print("  - Output is short (<50 tokens): startup overhead dominates")
    print("  - High temperature / creative writing: draft acceptance rate drops")
    print("  - Batch size is very large: GPU already saturated")

simulate_spec_decode_benchmark()
# Python client to benchmark an OpenAI-compatible endpoint
# (Works against vLLM, with or without spec decode)
import asyncio
import time
import statistics

async def benchmark_tokens_per_second(
    base_url: str,
    model: str,
    prompt: str,
    max_tokens: int = 200,
    num_runs: int = 5,
    temperature: float = 0.0,
) -> dict:
    """
    Benchmark LLM inference speed against an OpenAI-compatible endpoint.
    Use this to compare vLLM with/without speculative decoding.
    """
    try:
        from openai import AsyncOpenAI
        client = AsyncOpenAI(base_url=base_url, api_key="none")
    except ImportError:
        print("Install openai: pip install openai")
        return {}

    latencies = []
    tokens_per_second = []
    for run in range(num_runs):
        start = time.perf_counter()
        try:
            response = await client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens,
                temperature=temperature,
            )
            elapsed = time.perf_counter() - start
            output_tokens = response.usage.completion_tokens
            tps = output_tokens / elapsed
            latencies.append(elapsed)
            tokens_per_second.append(tps)
        except Exception as e:
            print(f"Run {run+1} failed: {e}")

    if not tokens_per_second:
        return {}
    return {
        "model": model,
        "num_runs": num_runs,
        "mean_latency_s": statistics.mean(latencies),
        "mean_tokens_per_sec": statistics.mean(tokens_per_second),
        "p50_tps": statistics.median(tokens_per_second),
        "min_tps": min(tokens_per_second),
        "max_tps": max(tokens_per_second),
    }

# Example usage -- requires a running vLLM server
print("Benchmark usage example:")
print("""
# Start vLLM WITHOUT spec decode:
#   python -m vllm.entrypoints.openai.api_server --model Llama-3.3-70B-Instruct

# Start vLLM WITH spec decode:
#   ... --speculative-model Llama-3.2-1B-Instruct --num-speculative-tokens 5

import asyncio

baseline = asyncio.run(benchmark_tokens_per_second(
    base_url="http://localhost:8000/v1",
    model="meta-llama/Llama-3.3-70B-Instruct",
    prompt="Write a detailed explanation of transformer architecture.",
    max_tokens=500,
    num_runs=10,
))
print(f"Baseline: {baseline['mean_tokens_per_sec']:.1f} tok/s")
""")
Part 4: W&B Weave – LLM Observability
Why LLM Observability Matters
Traditional ML monitoring tracks accuracy metrics. LLMs need more:
Debugging: Why did the agent hallucinate? Which retrieval step failed?
Cost tracking: Which user/workflow is most expensive?
Quality monitoring: Are outputs degrading over time?
Latency profiling: Where is the bottleneck in my RAG pipeline?
Eval regression: Did my prompt change break anything?
W&B Weave provides all of this with minimal instrumentation.
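To see what a tracing decorator buys you, here is a dependency-free sketch of the pattern: wrap each pipeline step, record its name, inputs, latency, and output into a trace. This is an illustration of the idea only, not Weave's actual implementation (Weave ships spans to the W&B backend and nests them into a call tree).

```python
# Minimal sketch of decorator-based tracing (illustrative, not Weave's API).
import functools
import time

TRACE: list[dict] = []  # a real tracer sends spans to a backend instead

def traced(fn):
    """Record name, args, latency, and an output preview for each call."""
    @functools.wraps(fn)
    def wrapper(*args, **kwargs):
        start = time.perf_counter()
        result = fn(*args, **kwargs)
        TRACE.append({
            "op": fn.__name__,
            "args": args,
            "latency_ms": (time.perf_counter() - start) * 1000,
            "output_preview": str(result)[:60],
        })
        return result
    return wrapper

@traced
def retrieve(query: str) -> list[str]:
    return [f"doc about {query}"]  # stand-in for a vector search

@traced
def generate(question: str, docs: list[str]) -> str:
    return f"Answer to '{question}' using {len(docs)} doc(s)"  # stand-in for an LLM call

answer = generate("payment terms?", retrieve("payment terms?"))
for span in TRACE:
    print(f"{span['op']:<10} {span['latency_ms']:.2f}ms  {span['output_preview']}")
```

Because every step is recorded with its inputs and latency, answering "which step failed, and how slow was it?" becomes a lookup rather than a reproduction exercise.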
# Install: pip install weave
# weave is part of the wandb ecosystem
try:
    import weave
    print("Weave available")
except ImportError:
    print("Install: pip install weave")
# --- Basic Weave Setup ---
weave_setup_code = """
import weave
import wandb

# Initialize - connects to your W&B project
weave.init("my-llm-project")

# Any OpenAI or Anthropic calls are NOW AUTOMATICALLY TRACED!
from openai import OpenAI

client = OpenAI()
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}]
)
# -> Automatically logged to W&B Weave with:
#    - Full prompt and response
#    - Token counts
#    - Latency
#    - Model parameters
#    - Cost estimate
"""
print("Auto-tracing code:")
print(weave_setup_code)
# --- Custom Operation Tracing with @weave.op() ---
# Note: outer string uses single quotes so the docstrings inside don't terminate it
custom_tracing_code = '''
import weave
from openai import OpenAI

weave.init("rag-production")
client = OpenAI()

@weave.op()
def retrieve_documents(query: str, top_k: int = 5) -> list[str]:
    """Retrieve relevant documents from vector store."""
    # Your vector search here (Pinecone, Weaviate, pgvector, etc.)
    results = vector_db.search(query, top_k=top_k)
    return [doc.content for doc in results]

@weave.op()
def generate_answer(question: str, context: list[str]) -> str:
    """Generate answer using retrieved context."""
    context_str = "\\n\\n".join(context)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Answer based on the provided context only."},
            {"role": "user", "content": f"Context:\\n{context_str}\\n\\nQuestion: {question}"}
        ],
        max_tokens=500,
    )
    return response.choices[0].message.content

@weave.op()
def rag_pipeline(question: str) -> str:
    """Full RAG pipeline - each sub-step is traced separately."""
    docs = retrieve_documents(question)
    answer = generate_answer(question, docs)
    return answer

# Every call creates a trace showing:
#   rag_pipeline
#   ├── retrieve_documents [latency: 45ms, returned 5 docs]
#   └── generate_answer    [latency: 1200ms, 387 tokens, $0.0058]
result = rag_pipeline("What are the payment terms in our contract?")
'''
print("Custom RAG pipeline tracing:")
print(custom_tracing_code)
# --- Building Evaluations in Weave ---
# Note: outer string uses single quotes so the triple-quoted strings inside survive
weave_eval_code = '''
import weave
import asyncio

weave.init("rag-evals")

# Define a scoring function
@weave.op()
def correctness_scorer(question: str, answer: str, expected: str) -> dict:
    """LLM-as-judge correctness evaluation."""
    import json
    from openai import OpenAI
    client = OpenAI()
    prompt = f"""
    Question: {question}
    Expected Answer: {expected}
    Actual Answer: {answer}

    Is the actual answer correct? Reply with JSON: {{"correct": true/false, "reason": "..."}}
    """
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )
    return json.loads(response.choices[0].message.content)

# Create evaluation dataset
eval_dataset = weave.Dataset(
    name="contract-qa-v1",
    rows=[
        {"question": "What are the payment terms?", "expected": "30 days net"},
        {"question": "What is the contract duration?", "expected": "1 year with auto-renewal"},
        {"question": "What is the liability cap?", "expected": "12 months of fees paid"},
    ]
)

# Run evaluation
evaluation = weave.Evaluation(
    name="rag-pipeline-eval-v1",
    dataset=eval_dataset,
    scorers=[correctness_scorer],
)
results = asyncio.run(evaluation.evaluate(rag_pipeline))
print(f"Correctness: {results['correctness_scorer']['correct']['mean']:.1%}")
'''
print("Weave Evaluations:")
print(weave_eval_code)
# --- Observability Platform Comparison ---
print("LLM Observability Platform Comparison (2025)")
print("=" * 95)
comparison = [
    {
        "Platform": "W&B Weave",
        "Best For": "Teams already using W&B for ML training",
        "Auto-trace": "OpenAI, Anthropic, LiteLLM",
        "Evals": "Built-in datasets + LLM-as-judge",
        "Pricing": "Free up to 1M traces/month",
        "Integration": "Native W&B, Hugging Face",
    },
    {
        "Platform": "LangSmith",
        "Best For": "LangChain/LangGraph users",
        "Auto-trace": "All LangChain components",
        "Evals": "Excellent, including human annotation",
        "Pricing": "Free up to 5K traces/month",
        "Integration": "Deep LangChain, LiteLLM",
    },
    {
        "Platform": "Helicone",
        "Best For": "Cost-focused teams, minimal setup",
        "Auto-trace": "Proxy-based, zero-code",
        "Evals": "Basic",
        "Pricing": "Free up to 100K requests/month",
        "Integration": "Any OpenAI-compatible API",
    },
    {
        "Platform": "Langfuse",
        "Best For": "Privacy-conscious / self-hosted",
        "Auto-trace": "SDK or proxy",
        "Evals": "Good, with human annotation UI",
        "Pricing": "Open source, self-hostable",
        "Integration": "LiteLLM, LangChain, OpenAI",
    },
    {
        "Platform": "Arize Phoenix",
        "Best For": "Advanced eval + RAG debugging",
        "Auto-trace": "LlamaIndex, LangChain, OpenAI",
        "Evals": "Very strong, RAGAS integration",
        "Pricing": "Open source",
        "Integration": "LlamaIndex native",
    },
]
for platform in comparison:
    print(f"\n{platform['Platform']}")
    for key, val in platform.items():
        if key != "Platform":
            print(f"  {key:<15}: {val}")
Part 5: FlashAttention-3 – Maximum GPU Utilization
The Attention Bottleneck
Standard attention is memory-bandwidth bound: repeatedly reading/writing the attention matrix (O(N²) size) to GPU HBM. For long contexts and large models, attention becomes the dominant cost.
FlashAttention evolution:

| Version | GPU Util | Key Innovation | Release |
|---|---|---|---|
| Standard | ~30% H100 | Baseline O(N²) memory | - |
| FA1 | ~50% A100 | IO-aware tiling, O(N) memory | 2022 |
| FA2 | ~72% A100 | Better parallelism, causal mask | 2023 |
| FA3 | ~85% H100 | Warp specialization, FP8 support | 2024 |
FA3 key improvements:
Warp specialization: Separate producer/consumer warps run concurrently on Hopper (H100/H800)
Asynchronous pipelining: Overlaps GEMM and softmax computation
FP8 support: 2x memory bandwidth at half precision
Requires: H100/H800 GPU and CUDA >= 12.3
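The O(N) memory footprint comes from never materializing the N×N score matrix: FlashAttention streams over key/value blocks while maintaining a running ("online") softmax. Here is a hedged pure-Python sketch of that online-softmax trick for a single query position with scalar values — a conceptual illustration, not the kernel's actual tiled implementation:

```python
import math

def softmax_weighted_sum(scores: list[float], values: list[float]) -> float:
    """Reference: full softmax over all scores at once (stores all N exps)."""
    m = max(scores)
    exps = [math.exp(s - m) for s in scores]
    z = sum(exps)
    return sum(e * v for e, v in zip(exps, values)) / z

def online_softmax_sum(scores: list[float], values: list[float]) -> float:
    """Streaming version: one pass, O(1) state (running max, normalizer, accumulator)."""
    m = float("-inf")  # running max, for numerical stability
    z = 0.0            # running softmax normalizer
    acc = 0.0          # running weighted sum of values
    for s, v in zip(scores, values):
        m_new = max(m, s)
        # Rescale previous state when the running max changes
        scale = math.exp(m - m_new) if m != float("-inf") else 0.0
        z = z * scale + math.exp(s - m_new)
        acc = acc * scale + math.exp(s - m_new) * v
        m = m_new
    return acc / z

scores = [0.1, 2.0, -1.0, 3.5]
values = [1.0, 2.0, 3.0, 4.0]
# Both agree to float precision, but the streaming version never holds all exps:
print(softmax_weighted_sum(scores, values), online_softmax_sum(scores, values))
```

FlashAttention applies the same rescaling idea blockwise on GPU tiles, which is why attention memory grows with N instead of N².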
# FlashAttention-3 with HuggingFace Transformers
flash_attn_code = """
# Install: pip install flash-attn --no-build-isolation
# (requires CUDA 12.3+ and H100/H800 for FA3)
import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "meta-llama/Llama-3.3-70B-Instruct"

# --- FA3 (H100/H800 only, CUDA >= 12.3) ---
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    attn_implementation="flash_attention_3",
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

# --- FA2 (A100, H100, RTX 3090+, CUDA >= 11.6) ---
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    attn_implementation="flash_attention_2",
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

# --- SDPA (PyTorch built-in, any GPU) ---
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    attn_implementation="sdpa",  # Fallback, no install needed
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
"""
print("FlashAttention implementation options:")
print(flash_attn_code)
# Auto-detect best attention implementation
def get_best_attn_implementation() -> str:
    """
    Detect available hardware and return the best attention implementation.
    Falls back gracefully: FA3 -> FA2 -> SDPA -> eager
    """
    try:
        import torch
    except ImportError:
        return "eager"

    if not torch.cuda.is_available():
        print("No CUDA GPU detected, using 'sdpa' (CPU/MPS)")
        return "sdpa"

    gpu_name = torch.cuda.get_device_name(0)
    cuda_version = torch.version.cuda
    compute_cap = torch.cuda.get_device_capability(0)
    print(f"GPU: {gpu_name}")
    print(f"CUDA: {cuda_version}")
    print(f"Compute Capability: {compute_cap[0]}.{compute_cap[1]}")

    # H100/H800: compute capability 9.0
    # A100: compute capability 8.0
    # RTX 3090/4090: compute capability 8.6/8.9
    is_hopper = compute_cap[0] >= 9  # H100, H800
    cuda_major = int(cuda_version.split(".")[0]) if cuda_version else 0

    try:
        import flash_attn
        fa_version = flash_attn.__version__
        fa_major = int(fa_version.split(".")[0])
        if is_hopper and cuda_major >= 12 and fa_major >= 3:
            print(f"FlashAttention {fa_version} detected on H100 -> using FA3")
            return "flash_attention_3"
        elif fa_major >= 2:
            print(f"FlashAttention {fa_version} detected -> using FA2")
            return "flash_attention_2"
    except ImportError:
        print("flash-attn not installed -> using PyTorch SDPA")
        print("Install: pip install flash-attn --no-build-isolation")
    return "sdpa"

best_impl = get_best_attn_implementation()
print(f"\nRecommended: attn_implementation='{best_impl}'")
# FlashAttention-3 in vLLM (automatic)
print("FlashAttention in vLLM:")
print("""
vLLM automatically selects the best attention backend:
- H100/H800 + flash-attn>=3.0: FLASH_ATTN_VLLM_V1 (FA3)
- A100 + flash-attn>=2.0: FLASH_ATTN (FA2)
- Fallback: TORCH_SDPA
Manual override (if needed):
VLLM_ATTENTION_BACKEND=FLASH_ATTN python -m vllm.entrypoints.openai.api_server ...
VLLM_ATTENTION_BACKEND=FLASHINFER python -m vllm.entrypoints.openai.api_server ...
FlashInfer is another high-performance option, especially good for:
- Speculative decoding
- Chunked prefill
- Sliding window attention
""")
print("Memory savings with FA3 on long contexts:")
print()
# Memory comparison: standard vs FA3
# Standard attention stores the full N×N attention matrix
# FA stores only O(N) in HBM
print(f"{'Context Length':<20} {'Standard Attn (GB)':<22} {'FlashAttn (GB)':>16} {'Reduction':<12}")
print("-" * 72)
for ctx_len in [4096, 8192, 16384, 32768, 65536, 131072]:
    # Standard: N×N attention matrix, float16 = 2 bytes
    standard_gb = (ctx_len ** 2 * 2) / (1024 ** 3)
    # FA: O(N) memory, roughly block_size * seq_len * 2
    # In practice ~128x smaller for long sequences
    fa_gb = max(0.001, standard_gb / (ctx_len / 128))
    reduction = (1 - fa_gb / standard_gb) * 100
    print(f"{ctx_len:<20,} {standard_gb:<22.3f} {fa_gb:>16.4f} {reduction:>10.1f}%")
Production Stack Summary
Here is the recommended production LLM infrastructure stack for 2025-2026:
production_stack = {
    "LLM Gateway": {
        "tool": "LiteLLM Proxy",
        "why": "Unified API for all providers, cost tracking, fallback routing",
        "install": "pip install litellm",
        "key_features": ["100+ LLM providers", "Budget limits", "Rate limiting", "Load balancing"],
    },
    "Cost Optimization": {
        "tool": "Anthropic Prompt Caching",
        "why": "Up to 90% cost reduction for repeated large prompts",
        "install": "pip install anthropic",
        "key_features": ["cache_control: ephemeral", "5min / 1hr TTL", "1024 token minimum"],
    },
    "Inference Speed": {
        "tool": "vLLM + Speculative Decoding (Eagle3)",
        "why": "2-3x throughput improvement for self-hosted models",
        "install": "pip install vllm",
        "key_features": ["Eagle3 draft model", "5 speculative tokens", "Greedy/low-temp best"],
    },
    "Observability": {
        "tool": "W&B Weave",
        "why": "Full LLM tracing, evals, cost tracking, debugging",
        "install": "pip install weave",
        "key_features": ["@weave.op() decorator", "Auto OpenAI/Anthropic tracing", "Built-in evals"],
    },
    "GPU Efficiency": {
        "tool": "FlashAttention-3",
        "why": "85% H100 GPU utilization, O(N) memory for long contexts",
        "install": "pip install flash-attn --no-build-isolation",
        "key_features": ["H100/H800 only", "CUDA 12.3+", "Auto-selected by vLLM"],
    },
}

print("PRODUCTION LLM INFRASTRUCTURE STACK 2025-2026")
print("=" * 60)
for layer, details in production_stack.items():
    print(f"\n{layer.upper()}")
    print(f"  Tool:     {details['tool']}")
    print(f"  Why:      {details['why']}")
    print(f"  Install:  {details['install']}")
    print(f"  Features: {', '.join(details['key_features'])}")

print("\n" + "=" * 60)
print("Quick install for the full stack:")
print("  pip install litellm anthropic weave vllm flash-attn")
# Decision guide: which tool to reach for first
decision_guide = """
DECISION GUIDE: Which Tool to Use?
====================================

Q: I'm using multiple LLM providers and tired of different SDKs?
A: Use LiteLLM -- unified interface, one line of code

Q: My LLM costs are too high?
A: 1. Add Anthropic prompt caching (cache_control: ephemeral) for large system prompts
   2. Use LiteLLM cost tracking to find expensive queries
   3. Route simple queries to cheaper models (gpt-4o-mini, gemini-flash)

Q: My LLM is too slow for production?
A: 1. Self-host with vLLM + speculative decoding (Eagle3)
   2. Enable FlashAttention-3 if on H100
   3. Use smaller models with prompt caching for context

Q: I can't debug why my agent/RAG is failing?
A: Add W&B Weave -- weave.init() + @weave.op() on each step

Q: I need to monitor LLM quality in production?
A: W&B Weave evaluations with LLM-as-judge scoring

Q: I need audit logs and cost control per team?
A: LiteLLM Proxy with PostgreSQL backend

Q: I'm building on H100s and want maximum GPU efficiency?
A: FlashAttention-3 (auto-enabled in vLLM on H100)
"""
print(decision_guide)
Key Takeaways
LiteLLM eliminates vendor lock-in and adds enterprise controls (budgets, rate limits, routing) across 100+ LLM providers with zero code changes to your application.
Prompt Caching is often the single highest-ROI optimization: for any workflow with a large, repeated system prompt or document, caching cuts costs by 80-90% with two lines of code change.
Speculative Decoding with Eagle3 provides 2-3x throughput improvement for self-hosted models, especially for structured outputs, code generation, and long responses at low temperature.
W&B Weave makes LLM pipelines debuggable and measurable. The @weave.op() decorator is all you need to get full tracing of every sub-step in your RAG or agent workflow.
FlashAttention-3 extracts maximum GPU utilization on H100 hardware and is required for practical long-context inference (128K+ tokens) due to its O(N) memory footprint.
Part of the Zero to AI series – 09 MLOps