LLM Infrastructure for Production (2025-2026 Essential Stack)

This notebook covers the critical infrastructure tools every ML engineer needs for deploying LLMs at scale in 2025-2026.

What You Will Learn

Part  Topic                     Key Benefit
----  ------------------------  ---------------------------------
1     LiteLLM                   Unified gateway for 100+ LLM APIs
2     Anthropic Prompt Caching  Up to 90% cost reduction
3     Speculative Decoding      2-3x faster inference
4     W&B Weave                 LLM observability and tracing
5     FlashAttention-3          85% GPU utilization on H100

Part 1: LiteLLM — Unified LLM Gateway

The Problem LiteLLM Solves

In 2025, teams work with multiple LLM providers:

  • OpenAI (GPT-4o, o3)

  • Anthropic (Claude 3.5/3.7)

  • Google (Gemini 2.0 Flash, 2.5 Pro)

  • AWS Bedrock (hosted Claude, Llama, Mistral)

  • Self-hosted via Ollama, vLLM

Each has a different SDK, different API format, different error codes. LiteLLM gives you one unified OpenAI-compatible interface for all of them.

Your Code → LiteLLM → OpenAI
                    → Anthropic
                    → Gemini
                    → Bedrock
                    → Ollama (local)
                    → Azure OpenAI
                    → 95+ more
# Install LiteLLM
# !pip install litellm

# Verify installation
try:
    import litellm
    print(f"LiteLLM version: {litellm.__version__}")
except ImportError:
    print("Run: pip install litellm")

1.1 Calling Any LLM with the Same Code

import os
from litellm import completion

# --- The Universal LLM Call ---
# Set your API keys (or use environment variables)
# os.environ["OPENAI_API_KEY"] = "sk-..."
# os.environ["ANTHROPIC_API_KEY"] = "sk-ant-..."
# os.environ["GEMINI_API_KEY"] = "..."

def call_any_llm(model: str, prompt: str, dry_run: bool = True):
    """Demonstrate LiteLLM's unified interface."""
    if dry_run:
        print(f"[DRY RUN] Would call model='{model}' with: '{prompt[:60]}...'")
        return None

    response = completion(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        max_tokens=200
    )
    return response.choices[0].message.content


# The EXACT same code works for all these providers:
models_to_try = [
    "gpt-4o",                                  # OpenAI
    "anthropic/claude-opus-4-6",               # Anthropic
    "gemini/gemini-2.0-flash",                 # Google
    "ollama/llama3.2",                         # Local via Ollama
    "bedrock/anthropic.claude-3-5-sonnet",     # AWS Bedrock
    "azure/gpt-4o",                            # Azure OpenAI
]

prompt = "Explain gradient descent in one sentence."

for model in models_to_try:
    call_any_llm(model, prompt, dry_run=True)

print("\nTo actually call: set dry_run=False and configure API keys")

1.2 Fallback Routing — Never Let One Provider's Failure Break Your App

from litellm import Router

# Configure a router with primary + fallback models
model_list = [
    {
        "model_name": "gpt-4-production",       # Logical alias
        "litellm_params": {
            "model": "gpt-4o",                  # Primary: OpenAI
            "api_key": os.environ.get("OPENAI_API_KEY", "placeholder"),
        },
    },
    {
        "model_name": "gpt-4-production",       # Same alias = fallback
        "litellm_params": {
            "model": "anthropic/claude-opus-4-6",  # Fallback: Anthropic
            "api_key": os.environ.get("ANTHROPIC_API_KEY", "placeholder"),
        },
    },
    {
        "model_name": "gpt-4-production",
        "litellm_params": {
            "model": "gemini/gemini-2.0-flash",    # Second fallback: Gemini
            "api_key": os.environ.get("GEMINI_API_KEY", "placeholder"),
        },
    },
]

# Create router with load balancing strategy
router = Router(
    model_list=model_list,
    routing_strategy="least-busy",   # Options: least-busy, simple-shuffle, latency-based-routing
    num_retries=3,
    fallbacks=[{"gpt-4-production": ["anthropic/claude-opus-4-6"]}],
    retry_after=5,  # seconds between retries
    allowed_fails=2,  # circuit breaker: disable provider after 2 failures
    cooldown_time=60,  # seconds before retrying failed provider
)

print("Router configured with:")
print(f"  Primary: OpenAI GPT-4o")
print(f"  Fallback 1: Anthropic Claude Opus 4.6")
print(f"  Fallback 2: Google Gemini 2.0 Flash")
print(f"  Strategy: least-busy")
print(f"  Circuit breaker: disables provider after 2 consecutive failures")

# Usage (same as regular completion)
# response = await router.acompletion(
#     model="gpt-4-production",
#     messages=[{"role": "user", "content": "Hello!"}]
# )

1.3 Cost Tracking and Budget Limits

from litellm import completion_cost, cost_per_token

# --- Cost calculation for common models ---
model_costs = {
    "gpt-4o":                              {"input": 2.50,  "output": 10.00},
    "gpt-4o-mini":                         {"input": 0.15,  "output": 0.60},
    "claude-opus-4-6":                     {"input": 15.00, "output": 75.00},
    "claude-3-5-sonnet-20241022":          {"input": 3.00,  "output": 15.00},
    "gemini/gemini-2.0-flash":             {"input": 0.075, "output": 0.30},
    "gemini/gemini-2.5-pro":               {"input": 1.25,  "output": 10.00},
}

print("Cost per 1M tokens (USD):")
print(f"{'Model':<40} {'Input':>10} {'Output':>10}")
print("-" * 62)
for model, costs in model_costs.items():
    print(f"{model:<40} ${costs['input']:>9.3f} ${costs['output']:>9.3f}")

# --- Real-world cost simulation ---
print("\n--- Real-world Cost Simulation ---")
print("Scenario: 10,000 API calls/day, 500 input tokens, 200 output tokens")
print()

daily_calls = 10_000
input_tokens = 500
output_tokens = 200

for model, costs in model_costs.items():
    daily_cost = daily_calls * (
        (input_tokens * costs["input"] / 1_000_000) +
        (output_tokens * costs["output"] / 1_000_000)
    )
    monthly_cost = daily_cost * 30
    print(f"{model:<40} ${daily_cost:>8.2f}/day  ${monthly_cost:>9.2f}/month")

1.4 LiteLLM Proxy — Run as a Local API Gateway

The LiteLLM Proxy lets you run a local server that any OpenAI-compatible client can point to. It adds:

  • Cost tracking per team/user

  • Budget limits (block requests when budget exceeded)

  • Rate limiting per API key

  • Logging to Langfuse, LangSmith, Helicone, W&B

# LiteLLM Proxy configuration file: config.yaml
proxy_config = """
# litellm_config.yaml
model_list:
  - model_name: gpt-4o
    litellm_params:
      model: gpt-4o
      api_key: os.environ/OPENAI_API_KEY

  - model_name: claude-3-5-sonnet
    litellm_params:
      model: anthropic/claude-3-5-sonnet-20241022
      api_key: os.environ/ANTHROPIC_API_KEY

  - model_name: gemini-flash
    litellm_params:
      model: gemini/gemini-2.0-flash
      api_key: os.environ/GEMINI_API_KEY

litellm_settings:
  success_callback: ["langfuse", "wandb"]
  failure_callback: ["langfuse"]
  budget_duration: "1mo"

general_settings:
  master_key: sk-my-proxy-key
  database_url: os.environ/DATABASE_URL   # PostgreSQL for persistent tracking
"""

print("LiteLLM Proxy config (litellm_config.yaml):")
print(proxy_config)

print("Start the proxy server:")
print("  litellm --config litellm_config.yaml --port 4000")
print()
print("Then use it exactly like OpenAI:")
print("""
from openai import OpenAI

client = OpenAI(
    base_url="http://0.0.0.0:4000",   # Your local LiteLLM proxy
    api_key="sk-my-proxy-key"
)

response = client.chat.completions.create(
    model="claude-3-5-sonnet",          # Maps to Anthropic automatically
    messages=[{"role": "user", "content": "Hello!"}]
)
""")

1.5 Logging to Observability Platforms

import litellm

# Enable logging to multiple platforms simultaneously
litellm.success_callback = ["langfuse", "wandb", "helicone"]
litellm.failure_callback = ["langfuse", "wandb"]

# Per-request metadata for tracking
metadata_example = {
    "generation_name": "rag-query-v2",
    "generation_id": "gen_abc123",
    "trace_user_id": "user_456",
    "tags": ["production", "rag", "legal-team"],
    "session_id": "session_789",
}

print("Logging configuration set:")
print(f"  Success callbacks: {litellm.success_callback}")
print(f"  Failure callbacks: {litellm.failure_callback}")
print()
print("Example call with metadata tracking:")
print("""
response = completion(
    model="gpt-4o",
    messages=[{"role": "user", "content": query}],
    metadata={
        "generation_name": "rag-query-v2",
        "trace_user_id": "user_456",
        "session_id": "session_789",
    }
)
# Automatically logged to LangFuse, W&B, Helicone
""")

# Rate limiting per user
print("\nRate limiting config (in proxy config.yaml):")
print("""
router_settings:
  redis_url: "redis://localhost:6379"
  routing_strategy: "usage-based-routing"

# Per-key rate limits
litellm_settings:
  default_team_settings:
    - team_id: "engineering"
      budget_limit: 100    # $100/month
      tpm_limit: 100000    # 100K tokens/minute
      rpm_limit: 1000      # 1K requests/minute
""")

Part 2: Anthropic Prompt Caching — 90% Cost Reduction

What Is Prompt Caching?

Every API call processes your entire prompt from scratch — even if 90% of it is the same system prompt or reference document. Prompt caching stores the KV cache (key-value attention states) on Anthropic's servers so identical prefix content only needs to be processed once.

Cost breakdown:

Token Type                     Cost vs Normal
-----------------------------  ----------------------------
Cache write (first call)       1.25x normal price
Cache read (subsequent calls)  0.1x normal price (90% off!)
Regular (no cache)             1.0x baseline

Cache duration:

  • Default: 5 minutes after last access (resets on each hit)

  • Extended: 1 hour (available on Claude Sonnet 3.7+)

Minimum cacheable tokens: 1,024 tokens for Claude 3+ models
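The break-even point follows directly from the multipliers above: the first call pays 1.25x for the cache write, every later call pays 0.1x, so caching already wins on the second call. A quick sketch of the arithmetic (relative input-token cost only; `cache_break_even` is an illustrative helper, not part of any SDK):

```python
def cache_break_even(num_calls: int) -> tuple[float, float]:
    """Relative input-token cost without vs. with caching.

    Multipliers from the table above: 1.25x for the initial cache
    write, 0.1x for every subsequent cache read.
    """
    uncached = num_calls * 1.0
    cached = 1.25 + (num_calls - 1) * 0.1
    return uncached, cached

for n in (1, 2, 10):
    uncached, cached = cache_break_even(n)
    print(f"{n:>3} calls: uncached {uncached:5.2f}x   cached {cached:5.2f}x")
```

A single call is actually 25% more expensive with caching; by ten calls the cached cost is roughly a fifth of the uncached cost.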

import anthropic
import time

# --- Prompt Caching Implementation ---

# Simulate a large legal document (in production this would be 10,000+ words)
large_legal_document = """
MASTER SERVICE AGREEMENT

This Master Service Agreement ("Agreement") is entered into as of January 1, 2025,
between Acme Corporation ("Client") and TechVentures Inc ("Provider").

Section 1: Definitions
1.1 "Services" means the software development and consulting services described herein.
1.2 "Deliverables" means all work product created under this Agreement.
1.3 "Confidential Information" means any non-public information disclosed by either party.

Section 2: Services and Deliverables
2.1 Provider shall perform the Services in a professional manner.
2.2 Client shall provide timely feedback on all Deliverables.
2.3 Changes to scope require written amendment signed by both parties.

Section 3: Payment Terms
3.1 Client shall pay invoices within 30 days of receipt.
3.2 Late payments accrue interest at 1.5% per month.
3.3 Provider may suspend services after 60 days of non-payment.

[... imagine 9,977 more words of legal text here ...]
""" * 50  # Repeat to reach minimum 1,024 tokens

def call_with_cache(client: anthropic.Anthropic, question: str) -> dict:
    """Make an API call with prompt caching enabled."""
    response = client.messages.create(
        model="claude-opus-4-6",
        max_tokens=1024,
        system=[
            {
                "type": "text",
                "text": f"You are a legal expert assistant. Here is the full contract:\n\n{large_legal_document}",
                "cache_control": {"type": "ephemeral"}  # Mark for caching
            }
        ],
        messages=[
            {"role": "user", "content": question}
        ]
    )
    return {
        "answer": response.content[0].text[:200],
        "input_tokens": response.usage.input_tokens,
        "output_tokens": response.usage.output_tokens,
        "cache_write_tokens": getattr(response.usage, "cache_creation_input_tokens", 0),
        "cache_read_tokens": getattr(response.usage, "cache_read_input_tokens", 0),
    }

print("Prompt caching function defined.")
print(f"Document size: ~{len(large_legal_document.split()):,} words")
print(f"Estimated tokens: ~{len(large_legal_document) // 4:,}")
# --- Cost Savings Calculator ---

def calculate_cache_savings(
    system_prompt_tokens: int,
    num_turns: int,
    output_tokens_per_turn: int = 500,
    model: str = "claude-opus-4-6"
) -> dict:
    """Calculate real cost savings from prompt caching."""
    
    # Pricing per 1M tokens (as of 2025)
    pricing = {
        "claude-opus-4-6": {"input": 15.0, "output": 75.0, "cache_write": 18.75, "cache_read": 1.50},
        "claude-3-5-sonnet-20241022": {"input": 3.0, "output": 15.0, "cache_write": 3.75, "cache_read": 0.30},
    }
    
    p = pricing.get(model, pricing["claude-3-5-sonnet-20241022"])
    
    # Without caching: pay full input price every turn
    no_cache_cost = num_turns * (
        (system_prompt_tokens / 1_000_000) * p["input"] +
        (output_tokens_per_turn / 1_000_000) * p["output"]
    )
    
    # With caching: pay cache_write once, cache_read for remaining turns
    with_cache_cost = (
        (system_prompt_tokens / 1_000_000) * p["cache_write"] +          # First call: write
        (num_turns - 1) * (system_prompt_tokens / 1_000_000) * p["cache_read"] +  # Subsequent: read
        num_turns * (output_tokens_per_turn / 1_000_000) * p["output"]   # Output always same
    )
    
    savings = no_cache_cost - with_cache_cost
    savings_pct = (savings / no_cache_cost) * 100
    
    return {
        "model": model,
        "system_prompt_tokens": system_prompt_tokens,
        "num_turns": num_turns,
        "no_cache_cost": no_cache_cost,
        "with_cache_cost": with_cache_cost,
        "savings": savings,
        "savings_pct": savings_pct,
    }


# Simulate multi-turn agent with large system prompt
print("=" * 65)
print("PROMPT CACHING SAVINGS CALCULATOR")
print("=" * 65)

scenarios = [
    {"system_prompt_tokens": 5_000,   "num_turns": 10,  "label": "Small system prompt, short session"},
    {"system_prompt_tokens": 50_000,  "num_turns": 20,  "label": "Large doc (50K tokens), 20 questions"},
    {"system_prompt_tokens": 100_000, "num_turns": 100, "label": "100K token codebase, 100 queries/day"},
]

for scenario in scenarios:
    result = calculate_cache_savings(
        system_prompt_tokens=scenario["system_prompt_tokens"],
        num_turns=scenario["num_turns"],
        model="claude-opus-4-6"
    )
    print(f"\nScenario: {scenario['label']}")
    print(f"  Without caching:  ${result['no_cache_cost']:.4f}")
    print(f"  With caching:     ${result['with_cache_cost']:.4f}")
    print(f"  Savings:          ${result['savings']:.4f} ({result['savings_pct']:.1f}% saved)")

2.2 Prompt Caching on OpenAI and Gemini

OpenAI applies prefix caching automatically — no special API parameters needed. Gemini requires an explicit caching API.

OpenAI (as of Oct 2024):

  • Automatic prefix caching on GPT-4o, GPT-4o-mini, o1, o3

  • Minimum prefix: 1,024 tokens

  • Cache read discount: 50% off input tokens

  • Visible in usage.prompt_tokens_details.cached_tokens

Google Gemini:

  • Explicit CachedContent API required

  • Minimum: 32,768 tokens (much larger requirement)

  • Cache duration: 1 hour minimum, up to 1 month

# OpenAI Prompt Caching (automatic - just check the usage response)

openai_caching_example = """
from openai import OpenAI
client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "system", "content": large_system_prompt},  # Auto-cached if >=1024 tokens
        {"role": "user", "content": "What does section 3 say?"}
    ]
)

# Check if cache was hit
cached = response.usage.prompt_tokens_details.cached_tokens
total  = response.usage.prompt_tokens
print(f"Cache hit: {cached}/{total} tokens ({100*cached/total:.1f}% from cache)")
"""

# Gemini Explicit Caching
gemini_caching_example = """
import datetime
import google.generativeai as genai
from google.generativeai import caching

# Create a cached content object
cache = caching.CachedContent.create(
    model="gemini-1.5-flash",
    system_instruction="You are a legal expert.",
    contents=[huge_document],  # Must be >= 32,768 tokens
    ttl=datetime.timedelta(hours=1),
    display_name="legal-doc-cache",
)

# Use the cached content in requests
model = genai.GenerativeModel.from_cached_content(cached_content=cache)
response = model.generate_content("Summarize section 3")
print(response.text)
"""

print("OpenAI Caching (automatic):")
print(openai_caching_example)
print("\nGemini Caching (explicit API):")
print(gemini_caching_example)

print("\nComparison summary:")
comparison = [
    ("Provider",    "Min Tokens",  "Setup",     "Discount",  "TTL"),
    ("Anthropic",   "1,024",       "Explicit",  "90% off",   "5 min / 1 hr"),
    ("OpenAI",      "1,024",       "Automatic", "50% off",   "~5-10 min"),
    ("Gemini",      "32,768",      "Explicit",  "75% off",   "1 hr - 1 month"),
]
for row in comparison:
    print(f"  {row[0]:<12} {row[1]:<12} {row[2]:<12} {row[3]:<12} {row[4]}")

Part 3: Speculative Decoding — 2-3x Faster Inference

How Speculative Decoding Works

Standard autoregressive generation is sequential — each token requires a full forward pass through the model. With a 70B-parameter model, generating 100 tokens means 100 serial forward passes.

Speculative decoding breaks this bottleneck:

Standard:  [Token 1] → [Token 2] → [Token 3] → [Token 4] → ...
           (full model each step, sequential)

Speculative:
  Step 1: Draft model generates 5 candidate tokens FAST
           [t1?, t2?, t3?, t4?, t5?]

  Step 2: Large model VERIFIES all 5 in parallel (one forward pass!)
           Accept: [t1✓, t2✓, t3✓]  Reject: [t4✗]  → generate correct t4

  Result: 3-4 tokens in the time of 1  →  2-4x speedup!

Key insight: Verification is cheap — one batched forward pass checks all draft tokens simultaneously. The draft model is tiny (1B vs 70B parameters), so its forward pass is negligible.
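The draft/verify loop can be sketched with greedy verification: accept the drafted prefix until the first token where the large model disagrees, then emit the large model's own token at that position. The two "models" below are hypothetical toy callables standing in for real forward passes:

```python
def speculative_step(draft_next, target_next, context, k=5):
    """One speculative decoding step with greedy verification.

    draft_next / target_next: callables mapping a token sequence to a
    model's next token (stand-ins for real model forward passes).
    Returns the tokens actually emitted by this step.
    """
    # 1. Draft model proposes k tokens autoregressively (cheap).
    drafted, ctx = [], list(context)
    for _ in range(k):
        t = draft_next(ctx)
        drafted.append(t)
        ctx.append(t)

    # 2. Target model verifies the k positions (in practice a single
    #    batched forward pass): accept the agreeing prefix.
    accepted, ctx = [], list(context)
    for t in drafted:
        if target_next(ctx) != t:
            break
        accepted.append(t)
        ctx.append(t)

    # 3. The verification pass also yields the target's own token at
    #    the mismatch position, so one correct token always comes free.
    accepted.append(target_next(ctx))
    return accepted

# Toy models: the draft agrees with the target except every 4th position.
target = lambda ctx: str(len(ctx) % 7)
draft = lambda ctx: str(len(ctx) % 7) if len(ctx) % 4 else "?"

print(speculative_step(draft, target, ["<s>"]))  # ['1', '2', '3', '4']
```

One step emits four tokens here for the cost of one verification pass, matching the diagram above.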

Eagle3: State-of-the-Art Speculative Decoding (2025)

Eagle3 is the current state-of-the-art speculative decoding method:

  • 3-layer hidden state drafting using features from multiple transformer layers

  • 2.5x speedup on H100 for Llama 3.3 70B

  • Acceptance rate > 80% on most tasks

  • Supported in vLLM as of 0.6.x
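The acceptance rate translates to speedup via the expected-tokens formula from the speculative decoding literature: with k draft tokens and per-token acceptance rate α, one verify pass emits on average (1 − α^(k+1)) / (1 − α) tokens. A quick sketch (draft-model overhead assumed negligible):

```python
def expected_tokens_per_step(alpha: float, k: int) -> float:
    """Expected tokens emitted per target-model forward pass,
    given per-token draft acceptance rate alpha and k draft tokens."""
    return (1 - alpha ** (k + 1)) / (1 - alpha)

for alpha in (0.6, 0.8, 0.9):
    print(f"alpha={alpha}: {expected_tokens_per_step(alpha, k=5):.2f} tokens/step")
```

With α = 0.8 and k = 5 this gives about 3.7 tokens per step, consistent with the 2-3x end-to-end speedups quoted above once draft and scheduling overhead are subtracted.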

# Speculative Decoding in vLLM

vllm_spec_decode_commands = """
# --- Method 1: Standard speculative decoding (draft model) ---
python -m vllm.entrypoints.openai.api_server \\
    --model meta-llama/Llama-3.3-70B-Instruct \\
    --speculative-model meta-llama/Llama-3.2-1B-Instruct \\
    --num-speculative-tokens 5 \\
    --tensor-parallel-size 4

# --- Method 2: Eagle3 (best performance, 2025) ---
python -m vllm.entrypoints.openai.api_server \\
    --model meta-llama/Llama-3.3-70B-Instruct \\
    --speculative-model lmzheng/Eagle3-Llama3.3-Instruct-70B \\
    --speculative-draft-tensor-parallel-size 1 \\
    --num-speculative-tokens 5 \\
    --tensor-parallel-size 4

# --- Method 3: Medusa heads (no separate draft model) ---
python -m vllm.entrypoints.openai.api_server \\
    --model FasterDecoding/medusa-1.0-llama-2-13b-chat \\
    --speculative-model [medusa] \\
    --num-speculative-tokens 3
"""

print("vLLM Speculative Decoding Commands:")
print(vllm_spec_decode_commands)
import time
import statistics

# Benchmark: Compare throughput with/without speculative decoding
# (This simulates results β€” actual benchmark requires GPU hardware)

def simulate_spec_decode_benchmark():
    """Simulate benchmark results for speculative decoding."""
    
    # Representative benchmarks from vLLM docs and papers
    benchmarks = {
        "Scenario": [
            "Long generation (512 tokens)",
            "Short generation (64 tokens)",
            "Code generation",
            "Summarization",
            "High-entropy (creative writing)",
        ],
        "Without Spec Decode (tok/s)": [45, 52, 48, 44, 46],
        "With Spec Decode Eagle3 (tok/s)": [112, 89, 128, 105, 58],
        "Speedup": [2.49, 1.71, 2.67, 2.39, 1.26],
        "Notes": [
            "Best case: consistent output",
            "Overhead reduces gains",
            "High acceptance rate",
            "Good repetitive structure",
            "Low acceptance due to randomness",
        ]
    }
    
    print("Speculative Decoding Benchmark (Llama 3.3 70B, 4x H100)")
    print("=" * 85)
    print(f"{'Scenario':<35} {'Baseline':>12} {'Eagle3':>12} {'Speedup':>10} {'Notes'}")
    print("-" * 85)
    
    for i in range(len(benchmarks["Scenario"])):
        print(
            f"{benchmarks['Scenario'][i]:<35} "
            f"{benchmarks['Without Spec Decode (tok/s)'][i]:>10} t/s "
            f"{benchmarks['With Spec Decode Eagle3 (tok/s)'][i]:>10} t/s "
            f"{benchmarks['Speedup'][i]:>8.2f}x "
            f"  {benchmarks['Notes'][i]}"
        )
    
    print()
    print("Key insight: Speculative decoding helps MOST when:")
    print("  - Output is long (amortizes draft overhead)")
    print("  - Content is structured/predictable (code, templates, summaries)")
    print("  - Using greedy decode or low temperature (<0.3)")
    print()
    print("Speculative decoding helps LEAST when:")
    print("  - Output is short (<50 tokens) — startup overhead dominates")
    print("  - High temperature / creative writing — draft acceptance rate drops")
    print("  - Batch size is very large — GPU already saturated")

simulate_spec_decode_benchmark()
# Python client to benchmark an OpenAI-compatible endpoint
# (Works against vLLM, with or without spec decode)

import asyncio
import statistics
import time

async def benchmark_tokens_per_second(
    base_url: str,
    model: str,
    prompt: str,
    max_tokens: int = 200,
    num_runs: int = 5,
    temperature: float = 0.0,
) -> dict:
    """
    Benchmark LLM inference speed against an OpenAI-compatible endpoint.
    Use this to compare vLLM with/without speculative decoding.
    """
    try:
        from openai import AsyncOpenAI
        client = AsyncOpenAI(base_url=base_url, api_key="none")
    except ImportError:
        print("Install openai: pip install openai")
        return {}
    
    latencies = []
    tokens_per_second = []
    
    for run in range(num_runs):
        start = time.perf_counter()
        try:
            response = await client.chat.completions.create(
                model=model,
                messages=[{"role": "user", "content": prompt}],
                max_tokens=max_tokens,
                temperature=temperature,
            )
            elapsed = time.perf_counter() - start
            output_tokens = response.usage.completion_tokens
            tps = output_tokens / elapsed
            latencies.append(elapsed)
            tokens_per_second.append(tps)
        except Exception as e:
            print(f"Run {run+1} failed: {e}")
    
    if not tokens_per_second:
        return {}
    
    return {
        "model": model,
        "num_runs": num_runs,
        "mean_latency_s": statistics.mean(latencies),
        "mean_tokens_per_sec": statistics.mean(tokens_per_second),
        "p50_tps": statistics.median(tokens_per_second),
        "min_tps": min(tokens_per_second),
        "max_tps": max(tokens_per_second),
    }


# Example usage β€” requires a running vLLM server
print("Benchmark usage example:")
print("""
# Start vLLM WITHOUT spec decode:
#   python -m vllm.entrypoints.openai.api_server --model Llama-3.3-70B-Instruct
# Start vLLM WITH spec decode:
#   ... --speculative-model Llama-3.2-1B-Instruct --num-speculative-tokens 5

import asyncio

baseline = asyncio.run(benchmark_tokens_per_second(
    base_url="http://localhost:8000/v1",
    model="meta-llama/Llama-3.3-70B-Instruct",
    prompt="Write a detailed explanation of transformer architecture.",
    max_tokens=500,
    num_runs=10,
))
print(f"Baseline: {baseline['mean_tokens_per_sec']:.1f} tok/s")
""")

Part 4: W&B Weave — LLM Observability

Why LLM Observability Matters

Traditional ML monitoring tracks accuracy metrics. LLMs need more:

  • Debugging: Why did the agent hallucinate? Which retrieval step failed?

  • Cost tracking: Which user/workflow is most expensive?

  • Quality monitoring: Are outputs degrading over time?

  • Latency profiling: Where is the bottleneck in my RAG pipeline?

  • Eval regression: Did my prompt change break anything?

W&B Weave provides all of this with minimal instrumentation.

# Install: pip install weave
# weave is part of the wandb ecosystem

try:
    import weave
    print("Weave available")
except ImportError:
    print("Install: pip install weave")

# --- Basic Weave Setup ---
weave_setup_code = """
import weave
import wandb

# Initialize - connects to your W&B project
weave.init("my-llm-project")

# Any OpenAI or Anthropic calls are NOW AUTOMATICALLY TRACED!
from openai import OpenAI
client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Hello!"}]
)
# ↑ Automatically logged to W&B Weave with:
#   - Full prompt and response
#   - Token counts
#   - Latency
#   - Model parameters
#   - Cost estimate
"""
print("Auto-tracing code:")
print(weave_setup_code)
# --- Custom Operation Tracing with @weave.op() ---

custom_tracing_code = """
import weave
from openai import OpenAI

weave.init("rag-production")
client = OpenAI()

@weave.op()
def retrieve_documents(query: str, top_k: int = 5) -> list[str]:
    '''Retrieve relevant documents from vector store.'''
    # Your vector search here (Pinecone, Weaviate, pgvector, etc.)
    results = vector_db.search(query, top_k=top_k)
    return [doc.content for doc in results]


@weave.op()
def generate_answer(question: str, context: list[str]) -> str:
    '''Generate answer using retrieved context.'''
    context_str = "\\n\\n".join(context)
    response = client.chat.completions.create(
        model="gpt-4o",
        messages=[
            {"role": "system", "content": "Answer based on the provided context only."},
            {"role": "user", "content": f"Context:\\n{context_str}\\n\\nQuestion: {question}"}
        ],
        max_tokens=500,
    )
    return response.choices[0].message.content


@weave.op()
def rag_pipeline(question: str) -> str:
    '''Full RAG pipeline - each sub-step is traced separately.'''
    docs = retrieve_documents(question)
    answer = generate_answer(question, docs)
    return answer


# Every call creates a trace showing:
# rag_pipeline
#   β”œβ”€β”€ retrieve_documents  [latency: 45ms, returned 5 docs]
#   └── generate_answer     [latency: 1200ms, 387 tokens, $0.0058]
result = rag_pipeline("What are the payment terms in our contract?")
"""

print("Custom RAG pipeline tracing:")
print(custom_tracing_code)
# --- Building Evaluations in Weave ---

weave_eval_code = """
import weave
import asyncio

weave.init("rag-evals")

# Define a scoring function
@weave.op()
def correctness_scorer(question: str, answer: str, expected: str) -> dict:
    '''LLM-as-judge correctness evaluation.'''
    from openai import OpenAI
    client = OpenAI()

    prompt = f'''
    Question: {question}
    Expected Answer: {expected}
    Actual Answer: {answer}

    Is the actual answer correct? Reply with JSON: {{"correct": true/false, "reason": "..."}}
    '''
    response = client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": prompt}],
        response_format={"type": "json_object"}
    )
    import json
    return json.loads(response.choices[0].message.content)


# Create evaluation dataset
eval_dataset = weave.Dataset(
    name="contract-qa-v1",
    rows=[
        {"question": "What are the payment terms?", "expected": "30 days net"},
        {"question": "What is the contract duration?", "expected": "1 year with auto-renewal"},
        {"question": "What is the liability cap?", "expected": "12 months of fees paid"},
    ]
)

# Run evaluation
evaluation = weave.Evaluation(
    name="rag-pipeline-eval-v1",
    dataset=eval_dataset,
    scorers=[correctness_scorer],
)

results = asyncio.run(evaluation.evaluate(rag_pipeline))
print(f"Correctness: {results['correctness_scorer']['correct']['mean']:.1%}")
"""

print("Weave Evaluations:")
print(weave_eval_code)
# --- Observability Platform Comparison ---

print("LLM Observability Platform Comparison (2025)")
print("=" * 95)

comparison = [
    {
        "Platform": "W&B Weave",
        "Best For": "Teams already using W&B for ML training",
        "Auto-trace": "OpenAI, Anthropic, LiteLLM",
        "Evals": "Built-in datasets + LLM-as-judge",
        "Pricing": "Free up to 1M traces/month",
        "Integration": "Native W&B, Hugging Face",
    },
    {
        "Platform": "LangSmith",
        "Best For": "LangChain/LangGraph users",
        "Auto-trace": "All LangChain components",
        "Evals": "Excellent, including human annotation",
        "Pricing": "Free up to 5K traces/month",
        "Integration": "Deep LangChain, LiteLLM",
    },
    {
        "Platform": "Helicone",
        "Best For": "Cost-focused teams, minimal setup",
        "Auto-trace": "Proxy-based, zero-code",
        "Evals": "Basic",
        "Pricing": "Free up to 100K requests/month",
        "Integration": "Any OpenAI-compatible API",
    },
    {
        "Platform": "Langfuse",
        "Best For": "Privacy-conscious / self-hosted",
        "Auto-trace": "SDK or proxy",
        "Evals": "Good, with human annotation UI",
        "Pricing": "Open source, self-hostable",
        "Integration": "LiteLLM, LangChain, OpenAI",
    },
    {
        "Platform": "Arize Phoenix",
        "Best For": "Advanced eval + RAG debugging",
        "Auto-trace": "LlamaIndex, LangChain, OpenAI",
        "Evals": "Very strong, RAGAS integration",
        "Pricing": "Open source",
        "Integration": "LlamaIndex native",
    },
]

for platform in comparison:
    print(f"\n{platform['Platform']}")
    for key, val in platform.items():
        if key != "Platform":
            print(f"  {key:<15}: {val}")

Part 5: FlashAttention-3 — Maximum GPU Utilization

The Attention Bottleneck

Standard attention is memory-bandwidth bound: it repeatedly reads and writes the O(N²)-sized attention matrix to GPU HBM. For long contexts and large models, attention becomes the dominant cost.
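A back-of-envelope calculation shows why the O(N²) matrix cannot simply be materialized (assumptions: 64 heads, 16-bit elements, batch size 1, per layer):

```python
def attn_matrix_bytes(seq_len: int, num_heads: int = 64, bytes_per_el: int = 2) -> float:
    """Size in GiB of the full attention-score matrix (per layer, batch 1)."""
    return num_heads * seq_len ** 2 * bytes_per_el / 2 ** 30

for n in (4_096, 32_768, 131_072):
    print(f"N={n:>7,}: {attn_matrix_bytes(n):10,.1f} GiB per layer")
```

At a 128K context the score matrix alone would be about 2 TiB per layer — far beyond HBM. FlashAttention's answer is to never materialize it, streaming K/V tiles through on-chip SRAM instead.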

FlashAttention evolution:

Version   GPU Util   Key Innovation                    Release
--------  ---------  --------------------------------  -------
Standard  ~30% H100  Baseline O(N²) memory             -
FA1       ~50% A100  IO-aware tiling, O(N) memory      2022
FA2       ~72% A100  Better parallelism, causal mask   2023
FA3       ~85% H100  Warp specialization, FP8 support  2024

FA3 key improvements:

  • Warp specialization: Separate producer/consumer warps run concurrently on Hopper (H100/H800)

  • Asynchronous pipelining: Overlaps GEMM and softmax computation

  • FP8 support: 2x memory bandwidth at half precision

  • Requires: H100/H800 GPU and CUDA >= 12.3
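The "IO-aware tiling" that all FlashAttention versions build on is the online-softmax recurrence: process K/V in blocks, carry a running max and normalizer, and never hold the full N×N score matrix. A NumPy sketch for a single query row, no masking (`flash_attention_row` is an illustrative name, not a library API):

```python
import numpy as np

def flash_attention_row(q, k, v, block=128):
    """Attention for one query row via online softmax over K/V blocks.

    Numerically equal to softmax(q @ k.T / sqrt(d)) @ v, but scores are
    computed one block at a time; the running max `m` and normalizer `l`
    rescale previously accumulated state as each new block arrives.
    """
    d = q.shape[-1]
    m, l = -np.inf, 0.0                       # running max / normalizer
    acc = np.zeros_like(q)                    # unnormalized output
    for i in range(0, k.shape[0], block):
        s = q @ k[i:i + block].T / np.sqrt(d)  # scores for this block only
        m_new = max(m, s.max())
        scale = np.exp(m - m_new)              # rescale previous state
        p = np.exp(s - m_new)
        l = l * scale + p.sum()
        acc = acc * scale + p @ v[i:i + block]
        m = m_new
    return acc / l

rng = np.random.default_rng(0)
q = rng.normal(size=(1, 64))
k = rng.normal(size=(1000, 64))
v = rng.normal(size=(1000, 64))

s = q @ k.T / 8.0                             # reference: full score row
w = np.exp(s - s.max())
print(np.allclose(flash_attention_row(q, k, v), w @ v / w.sum()))
```

The real kernels do this per tile in SRAM with fused GEMMs (and, in FA3, FP8 and warp-specialized pipelining), but the rescaling math is the same.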

# FlashAttention-3 with HuggingFace Transformers

flash_attn_code = """
# Install: pip install flash-attn --no-build-isolation
#          (requires CUDA 12.3+ and H100/H800 for FA3)

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

model_name = "meta-llama/Llama-3.3-70B-Instruct"

# --- FA3 (H100/H800 only, CUDA >= 12.3) ---
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    attn_implementation="flash_attention_3",
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

# --- FA2 (A100, H100, RTX 3090+, CUDA >= 11.6) ---
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    attn_implementation="flash_attention_2",
    torch_dtype=torch.bfloat16,
    device_map="auto",
)

# --- SDPA (PyTorch built-in, any GPU) ---
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    attn_implementation="sdpa",   # Fallback, no install needed
    torch_dtype=torch.bfloat16,
    device_map="auto",
)
"""

print("FlashAttention implementation options:")
print(flash_attn_code)
# Auto-detect best attention implementation

def get_best_attn_implementation() -> str:
    """
    Detect available hardware and return the best attention implementation.
    Falls back gracefully: FA3 -> FA2 -> SDPA -> eager
    """
    try:
        import torch
        if not torch.cuda.is_available():
            print("No CUDA GPU detected, using 'sdpa' (CPU/MPS)")
            return "sdpa"
        
        gpu_name = torch.cuda.get_device_name(0)
        cuda_version = torch.version.cuda
        compute_cap = torch.cuda.get_device_capability(0)
        
        print(f"GPU: {gpu_name}")
        print(f"CUDA: {cuda_version}")
        print(f"Compute Capability: {compute_cap[0]}.{compute_cap[1]}")
        
        # H100/H800: compute capability 9.0
        # A100: compute capability 8.0
        # RTX 3090/4090: compute capability 8.6/8.9
        
        is_hopper = compute_cap[0] >= 9  # H100, H800
        cuda_ver = tuple(int(x) for x in cuda_version.split(".")[:2]) if cuda_version else (0, 0)
        
        try:
            import flash_attn
            fa_version = flash_attn.__version__
            fa_major = int(fa_version.split(".")[0])
            
            # FA3 needs Hopper and CUDA >= 12.3
            if is_hopper and cuda_ver >= (12, 3) and fa_major >= 3:
                print(f"FlashAttention {fa_version} detected on Hopper GPU - using FA3")
                return "flash_attention_3"
            elif fa_major >= 2:
                print(f"FlashAttention {fa_version} detected - using FA2")
                return "flash_attention_2"
        except ImportError:
            print("flash-attn not installed - using PyTorch SDPA")
            print("Install: pip install flash-attn --no-build-isolation")
        
        return "sdpa"
    
    except ImportError:
        return "eager"


best_impl = get_best_attn_implementation()
print(f"\nRecommended: attn_implementation='{best_impl}'")
# FlashAttention-3 in vLLM (automatic)

print("FlashAttention in vLLM:")
print("""
vLLM automatically selects the best attention backend:
  - H100/H800 + flash-attn>=3.0: FLASH_ATTN_VLLM_V1 (FA3)
  - A100 + flash-attn>=2.0: FLASH_ATTN (FA2)
  - Fallback: TORCH_SDPA

Manual override (if needed):
  VLLM_ATTENTION_BACKEND=FLASH_ATTN python -m vllm.entrypoints.openai.api_server ...
  VLLM_ATTENTION_BACKEND=FLASHINFER  python -m vllm.entrypoints.openai.api_server ...

FlashInfer is another high-performance option, especially good for:
  - Speculative decoding
  - Chunked prefill
  - Sliding window attention
""")  

print("Memory savings with FA3 on long contexts:")
print()

# Memory comparison: standard attention vs FlashAttention
# Standard attention materializes the full N×N attention matrix in HBM;
# FlashAttention keeps only O(N) extra state there.

print(f"{'Context Length':<20} {'Standard Attn (GB)':<22} {'FlashAttn (GB)':>16} {'Reduction':>11}")
print("-" * 72)

for ctx_len in [4096, 8192, 16384, 32768, 65536, 131072]:
    # Standard: N×N attention matrix, float16 = 2 bytes
    standard_gb = (ctx_len ** 2 * 2) / (1024 ** 3)
    # FA: O(N) extra HBM (~128-row tiles), so the saving grows with N
    fa_gb = max(0.001, standard_gb / (ctx_len / 128))
    reduction = (1 - fa_gb / standard_gb) * 100
    print(f"{ctx_len:<20,} {standard_gb:<22.3f} {fa_gb:>16.4f} {reduction:>10.1f}%")

Production Stack Summary¶

Here is the recommended production LLM infrastructure stack for 2025-2026:

production_stack = {
    "LLM Gateway": {
        "tool": "LiteLLM Proxy",
        "why": "Unified API for all providers, cost tracking, fallback routing",
        "install": "pip install litellm",
        "key_features": ["100+ LLM providers", "Budget limits", "Rate limiting", "Load balancing"],
    },
    "Cost Optimization": {
        "tool": "Anthropic Prompt Caching",
        "why": "Up to 90% cost reduction for repeated large prompts",
        "install": "pip install anthropic",
        "key_features": ["cache_control: ephemeral", "5min / 1hr TTL", "1024 token minimum"],
    },
    "Inference Speed": {
        "tool": "vLLM + Speculative Decoding (Eagle3)",
        "why": "2-3x throughput improvement for self-hosted models",
        "install": "pip install vllm",
        "key_features": ["Eagle3 draft model", "5 speculative tokens", "Greedy/low-temp best"],
    },
    "Observability": {
        "tool": "W&B Weave",
        "why": "Full LLM tracing, evals, cost tracking, debugging",
        "install": "pip install weave",
        "key_features": ["@weave.op() decorator", "Auto OpenAI/Anthropic tracing", "Built-in evals"],
    },
    "GPU Efficiency": {
        "tool": "FlashAttention-3",
        "why": "85% H100 GPU utilization, O(N) memory for long contexts",
        "install": "pip install flash-attn --no-build-isolation",
        "key_features": ["H100/H800 only", "CUDA 12.3+", "Auto-selected by vLLM"],
    },
}

print("PRODUCTION LLM INFRASTRUCTURE STACK 2025-2026")
print("=" * 60)

for layer, details in production_stack.items():
    print(f"\n{layer.upper()}")
    print(f"  Tool:    {details['tool']}")
    print(f"  Why:     {details['why']}")
    print(f"  Install: {details['install']}")
    print(f"  Features: {', '.join(details['key_features'])}")

print("\n" + "=" * 60)
print("Quick install for the full stack:")
print("  pip install litellm anthropic weave vllm")
print("  pip install flash-attn --no-build-isolation  # needs CUDA toolchain")
# Decision guide: which tool to reach for first

decision_guide = """
DECISION GUIDE: Which Tool to Use?
====================================

Q: I'm juggling multiple LLM providers, each with its own SDK?
A: Use LiteLLM - one unified OpenAI-compatible interface, one line of code

Q: My LLM costs are too high?
A: 1. Add Anthropic prompt caching (cache_control: ephemeral) for large system prompts
   2. Use LiteLLM cost tracking to find expensive queries
   3. Route simple queries to cheaper models (gpt-4o-mini, gemini-flash)

Q: My LLM is too slow for production?
A: 1. Self-host with vLLM + speculative decoding (Eagle3)
   2. Enable FlashAttention-3 if on H100
   3. Use smaller models with prompt caching for context

Q: I can't debug why my agent/RAG is failing?
A: Add W&B Weave - weave.init() + @weave.op() on each step

Q: I need to monitor LLM quality in production?
A: W&B Weave evaluations with LLM-as-judge scoring

Q: I need audit logs and cost control per team?
A: LiteLLM Proxy with PostgreSQL backend

Q: I'm building on H100s and want maximum GPU efficiency?
A: FlashAttention-3 (auto-enabled in vLLM on H100)
"""

print(decision_guide)
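The cost-routing advice above ("route simple queries to cheaper models") can be sketched in LiteLLM Router's `model_list` format. Plain dicts only, so nothing is called; the model names and the length-based heuristic are illustrative assumptions, not a recommendation.

```python
# Sketch: tiered routing config in LiteLLM Router's model_list shape.
# Two aliases ("cheap", "strong"); LiteLLM load-balances within an alias.
model_list = [
    {"model_name": "cheap",  "litellm_params": {"model": "gpt-4o-mini"}},
    {"model_name": "cheap",  "litellm_params": {"model": "gemini/gemini-2.0-flash"}},
    {"model_name": "strong", "litellm_params": {"model": "anthropic/claude-3-7-sonnet-latest"}},
]

def pick_group(prompt: str) -> str:
    """Toy heuristic: short prompts go to the cheap tier."""
    return "cheap" if len(prompt) < 500 else "strong"

# With litellm installed, this config would feed Router(model_list=...)
# and you'd call completion with model=pick_group(prompt).
print(pick_group("What's 2+2?"))
```

In production you would replace the length heuristic with a classifier or a rules table, but the Router config shape stays the same.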

Key Takeaways¶

LiteLLM eliminates vendor lock-in and adds enterprise controls (budgets, rate limits, routing) across 100+ LLM providers behind a single OpenAI-compatible interface.

Prompt Caching is often the single highest-ROI optimization: for any workflow with a large, repeated system prompt or document, caching cuts costs by 80-90% with two lines of code change.
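The "two lines" are a `cache_control` marker on the large, repeated prefix. A minimal sketch in Anthropic Messages API shape, built as a plain dict so nothing is sent; the system-prompt text is a stand-in chosen only to clear the ~1024-token cache minimum:

```python
# Sketch of an Anthropic Messages request with prompt caching enabled.
# The cached prefix must meet the minimum cacheable size (~1024 tokens).
LARGE_SYSTEM_PROMPT = "You are a contract-review assistant. " * 300  # stand-in prefix

request = {
    "model": "claude-3-7-sonnet-latest",
    "max_tokens": 1024,
    "system": [
        {
            "type": "text",
            "text": LARGE_SYSTEM_PROMPT,
            "cache_control": {"type": "ephemeral"},  # <- the caching change
        }
    ],
    "messages": [{"role": "user", "content": "Summarize clause 4."}],
}
print(request["system"][0]["cache_control"])
```

The first call writes the cache; subsequent calls within the TTL that share the exact prefix read it at the discounted rate.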

Speculative Decoding with Eagle3 provides 2-3x throughput improvement for self-hosted models, especially for structured outputs, code generation, and long responses at low temperature.
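As a sketch of what enabling this looks like in vLLM: the `speculative_config` schema below follows recent vLLM releases, but the key names vary by version, so treat them as assumptions and check the docs for your install; the draft-model name is illustrative.

```python
# Assumed vLLM-style speculative decoding config (key names per recent
# vLLM releases -- verify against your installed version).
spec_config = {
    "method": "eagle3",                               # Eagle3 draft-model speculation
    "model": "yuhuili/EAGLE3-LLaMA3.1-Instruct-8B",   # illustrative draft model
    "num_speculative_tokens": 5,                      # the "5 speculative tokens" above
}

# With vLLM installed, this would be passed at engine construction:
# from vllm import LLM
# llm = LLM(model="meta-llama/Llama-3.1-8B-Instruct",
#           speculative_config=spec_config)
print(spec_config)
```

The draft model proposes `num_speculative_tokens` tokens per step and the target model verifies them in one pass, which is why the speedup is largest for predictable, low-temperature outputs.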

W&B Weave makes LLM pipelines debuggable and measurable. The @weave.op() decorator is all you need to get full tracing of every sub-step in your RAG or agent workflow.

FlashAttention-3 extracts maximum GPU utilization on H100 hardware, and its O(N) memory footprint is what makes long-context inference (128K+ tokens) practical.

Part of the Zero to AI series - 09 MLOps