Streaming LLM Responses

Phase 20 - Notebook 1

What you will learn:

  • Why streaming matters for user experience (TTFT vs total latency)

  • OpenAI streaming with stream=True

  • Anthropic streaming with context manager

  • HuggingFace TextIteratorStreamer for local models

  • Server-Sent Events (SSE) protocol and wire format

  • FastAPI SSE endpoint implementation

  • Handling stream chunks and delta content

  • Error handling in streams (timeouts, disconnections)

  • Progress tracking and cancellation

  • Multi-provider streaming adapter pattern

  • Token counting during streaming with tiktoken

  • Measuring TTFT and TPS

  • Simple HTML frontend for streaming

Prerequisites: Python async/await basics, OpenAI and Anthropic API keys

# Install required packages
!pip install openai anthropic fastapi sse-starlette uvicorn httpx tiktoken tqdm python-dotenv -q

import os
import time
import json
import asyncio
from typing import Generator, Optional, AsyncGenerator
from dotenv import load_dotenv

import openai
import anthropic

load_dotenv()

# Verify API keys are present
openai_key = os.getenv("OPENAI_API_KEY", "")
anthropic_key = os.getenv("ANTHROPIC_API_KEY", "")

print("Environment setup:")
print(f"  OpenAI key:    {'set' if openai_key else 'NOT SET - set OPENAI_API_KEY in .env'}")
print(f"  Anthropic key: {'set' if anthropic_key else 'NOT SET - set ANTHROPIC_API_KEY in .env'}")
print("\nAll imports successful.")

Part 1: Why Streaming Matters

The Problem with Non-Streaming APIs

When you call an LLM API without streaming, the server generates the entire response before sending a single byte to the client. For a 500-token response at 50 tokens/second, that means 10 seconds of silence before any output appears.

Non-Streaming Timeline:
t=0s   [Request sent] ──────────────────────────────────────────── t=10s [Full response received]
        User stares at blank screen for 10 seconds

Streaming Timeline:
t=0s   [Request sent]
t=0.3s [FIRST TOKEN] ── token ── token ── token ── ... ── t=10s [Last token]
        User sees output almost immediately (TTFT = 0.3s)

Key Metrics

Metric                      | Definition                                | Target
----------------------------|-------------------------------------------|------------------------
TTFT (Time To First Token)  | Time from request to first token received | < 500ms
TPS (Tokens Per Second)     | How many tokens arrive per second         | 30-100 TPS
Total Latency               | Time from request to last token           | Depends on length
E2E Latency                 | Full round-trip including network         | TTFT + generation time
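These metrics are easy to derive from chunk arrival timestamps. A minimal tracker sketch (StreamMetrics is a hypothetical helper, not part of any SDK):

```python
import time

class StreamMetrics:
    """Record content-chunk arrival times; derive TTFT, total latency, TPS."""

    def __init__(self):
        self.start = time.time()
        self.arrivals = []          # timestamp of each content chunk

    def record(self):
        """Call once per content chunk as it arrives."""
        self.arrivals.append(time.time())

    @property
    def ttft(self):
        return self.arrivals[0] - self.start if self.arrivals else None

    @property
    def total(self):
        return self.arrivals[-1] - self.start if self.arrivals else 0.0

    @property
    def tps(self):
        # Chunks per second over the whole stream (roughly tokens per second)
        return len(self.arrivals) / self.total if self.total else 0.0
```

Create one tracker before the request, call record() whenever delta content arrives, then read ttft/tps after the loop.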

Why This Matters for UX

  • 0-100ms: Feels instantaneous

  • 100-300ms: Feels responsive

  • 300-1000ms: User notices the delay

  • > 1000ms: User feels the app is slow

Streaming lets you hit the "feels responsive" range even when total generation takes 10+ seconds.

# Demonstrate the non-streaming vs streaming difference

client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
PROMPT = "Write a short poem about the Python programming language (4 lines)."

# --- Non-streaming call ---
print("=" * 50)
print("NON-STREAMING: waiting for complete response...")
print("=" * 50)
start = time.time()

response = client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": PROMPT}],
    stream=False
)

elapsed_non_stream = time.time() - start
print(f"\nWaited {elapsed_non_stream:.2f}s before seeing anything.")
print("\nResponse:")
print(response.choices[0].message.content)
print(f"\nTotal tokens: {response.usage.total_tokens}")

print()
print("=" * 50)
print("STREAMING: tokens appear as they are generated...")
print("=" * 50)
start = time.time()
first_token_time = None
token_count = 0  # counts content chunks (roughly one per token)

for chunk in client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": PROMPT}],
    stream=True
):
    if chunk.choices[0].delta.content:
        if first_token_time is None:
            first_token_time = time.time()
            print(f"\n[TTFT: {first_token_time - start:.3f}s] ", end="")
        print(chunk.choices[0].delta.content, end="", flush=True)
        token_count += 1

elapsed_stream = time.time() - start
print(f"\n\nTotal time:  {elapsed_stream:.2f}s")
print(f"TTFT:        {first_token_time - start:.3f}s  (user saw output after this)")
print(f"Tokens:      {token_count}")
print(f"TPS:         {token_count / elapsed_stream:.1f}")

Part 2: OpenAI Streaming API

How It Works

Add stream=True to any chat.completions.create() call. The method returns an iterator of ChatCompletionChunk objects instead of a single ChatCompletion.

Chunk Structure

ChatCompletionChunk(
    id='chatcmpl-...',
    choices=[
        Choice(
            delta=ChoiceDelta(
                content='Hello',   # The new text fragment (can be None)
                role=None,         # Only set on first chunk
            ),
            finish_reason=None,    # 'stop', 'length', 'content_filter', or None
            index=0
        )
    ],
    model='gpt-4o-mini',
    usage=None  # Only in last chunk if stream_options={'include_usage': True}
)

Important NotesΒΆ

  • delta.content can be None on the first and last chunks (role assignment / finish)

  • finish_reason is only non-None on the last chunk

  • finish_reason='stop' = natural completion

  • finish_reason='length' = hit max_tokens limit

  • finish_reason='content_filter' = content policy triggered

# OpenAI streaming with full chunk inspection and metrics

def openai_stream(
    prompt: str,
    model: str = "gpt-4o-mini",
    max_tokens: int = 200,
    verbose_chunks: bool = False
) -> dict:
    """Stream OpenAI response with TTFT and TPS measurement."""
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    collected_content = []
    finish_reason = None
    chunk_count = 0
    token_count = 0
    start = time.time()
    first_token_time = None

    print(f"\nPrompt: {prompt[:60]}...\n")
    print("-" * 40)

    for chunk in client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=max_tokens
    ):
        chunk_count += 1
        delta = chunk.choices[0].delta
        reason = chunk.choices[0].finish_reason

        if verbose_chunks:
            print(f"  [chunk {chunk_count}] content={repr(delta.content)} finish={reason}")

        if delta.content:
            if first_token_time is None:
                first_token_time = time.time()
            print(delta.content, end="", flush=True)
            collected_content.append(delta.content)
            token_count += 1

        if reason:
            finish_reason = reason

    total_time = time.time() - start
    ttft = (first_token_time - start) if first_token_time else 0

    print("\n" + "-" * 40)
    print(f"Finish reason:  {finish_reason}")
    print(f"Chunks received: {chunk_count}")
    print(f"Tokens emitted: {token_count}")
    print(f"TTFT:           {ttft:.3f}s")
    print(f"Total time:     {total_time:.3f}s")
    print(f"TPS:            {token_count / total_time:.1f}")

    return {
        "content": "".join(collected_content),
        "ttft": ttft,
        "total_time": total_time,
        "tps": token_count / total_time,
        "finish_reason": finish_reason
    }


# Run it
result = openai_stream(
    "Explain what a Python generator is and give a simple example.",
    model="gpt-4o-mini",
    max_tokens=150
)
print(f"\nFull response length: {len(result['content'])} characters")
# Inspect raw chunk structure to understand the wire format
print("First 5 chunks from a streaming call:")
print("=" * 60)

client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

for i, chunk in enumerate(client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say hello in 5 words."}],
    stream=True
)):
    print(f"\nChunk {i + 1}:")
    print(f"  id:            {chunk.id[:20]}...")
    print(f"  model:         {chunk.model}")
    print(f"  delta.role:    {chunk.choices[0].delta.role}")
    print(f"  delta.content: {repr(chunk.choices[0].delta.content)}")
    print(f"  finish_reason: {chunk.choices[0].finish_reason}")
    if i >= 4:
        print("  ... (more chunks follow)")
        break

Part 3: Anthropic Streaming

Key Differences from OpenAI

Anthropic's streaming uses a context manager pattern (with client.messages.stream(...) as stream:). This ensures the underlying HTTP connection is properly closed even if an exception occurs.

# OpenAI style (iterator)
for chunk in client.chat.completions.create(..., stream=True):
    text = chunk.choices[0].delta.content

# Anthropic style (context manager)
with client.messages.stream(...) as stream:
    for text in stream.text_stream:  # yields str directly, no None checks needed
        print(text)

stream.text_stream vs stream

  • stream.text_stream - yields str fragments only (None filtered out)

  • stream itself - yields raw MessageStreamEvent objects for lower-level control

  • stream.get_final_message() - returns the complete Message after streaming

# Anthropic streaming with TTFT and TPS measurement

def anthropic_stream(
    prompt: str,
    model: str = "claude-3-haiku-20240307",
    max_tokens: int = 200
) -> dict:
    """Stream Anthropic response with metrics."""
    anth_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))

    collected = []
    token_count = 0
    start = time.time()
    first_token_time = None

    print(f"Model: {model}")
    print(f"Prompt: {prompt[:60]}...\n")
    print("-" * 40)

    with anth_client.messages.stream(
        model=model,
        max_tokens=max_tokens,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            if first_token_time is None:
                first_token_time = time.time()
            print(text, end="", flush=True)
            collected.append(text)
            token_count += 1

        # Get final message for usage stats
        final_msg = stream.get_final_message()

    total_time = time.time() - start
    ttft = (first_token_time - start) if first_token_time else 0

    print("\n" + "-" * 40)
    print(f"Input tokens:   {final_msg.usage.input_tokens}")
    print(f"Output tokens:  {final_msg.usage.output_tokens}")
    print(f"Stop reason:    {final_msg.stop_reason}")
    print(f"TTFT:           {ttft:.3f}s")
    print(f"Total time:     {total_time:.3f}s")
    print(f"TPS (approx):   {token_count / total_time:.1f}")

    return {
        "content": "".join(collected),
        "ttft": ttft,
        "total_time": total_time,
        "input_tokens": final_msg.usage.input_tokens,
        "output_tokens": final_msg.usage.output_tokens
    }


result = anthropic_stream(
    "Explain recursion with a simple Python example.",
    model="claude-3-haiku-20240307",
    max_tokens=150
)

Part 4: Server-Sent Events (SSE) Protocol

What is SSE?

Server-Sent Events is a W3C standard for pushing data from a server to a browser over a plain HTTP connection. It is one-directional (server to client only) and uses HTTP chunked transfer encoding under the hood.

Wire Format

An SSE event is a block of field: value lines terminated by a blank line, so each event ends with two newlines (\n\n):

data: {"text": "Hello"}\n\n
data: {"text": " world"}\n\n

# Named event:
event: token\n
data: {"text": "Hello"}\n\n

# Comment (keepalive):
: heartbeat\n\n

# Termination signal (convention):
data: [DONE]\n\n
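The format above can be parsed line by line. A sketch (parse_sse_line is a hypothetical helper covering only the fields shown here; a full client would also buffer multi-line data fields before dispatching):

```python
def parse_sse_line(line: str):
    """Classify one line of an SSE stream.
    Returns (kind, value): 'comment', 'event', 'data', 'done', 'blank', or 'other'."""
    if not line.strip():
        return ("blank", None)                       # blank line terminates an event
    if line.startswith(":"):
        return ("comment", line[1:].strip())         # keepalive / comment
    if line.startswith("event:"):
        return ("event", line[len("event:"):].strip())
    if line.startswith("data:"):
        value = line[len("data:"):].strip()
        if value == "[DONE]":
            return ("done", None)                    # termination convention
        return ("data", value)
    return ("other", line)                           # id:, retry:, etc. not handled here
```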

SSE vs WebSocket Comparison

Feature           | SSE                                | WebSocket
------------------|------------------------------------|---------------------------------------------
Direction         | Server → Client only               | Bidirectional
Protocol          | HTTP/1.1 or HTTP/2                 | Separate protocol (ws://), via HTTP upgrade
Auto-reconnect    | Built into browser (EventSource)   | Must implement manually
Browser support   | All modern browsers                | All modern browsers
Proxies/firewalls | Usually fine (it's plain HTTP)     | May be blocked
Use case          | LLM streaming, logs, notifications | Chat, games, collaboration
Complexity        | Simple                             | More complex

When to Use SSE

Use SSE when you only need server-to-client push: LLM streaming, live feeds, progress updates. Use WebSockets when you need bidirectional communication: chat, collaborative editing, games.
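One practical SSE detail: proxies may close idle HTTP connections, which is what the comment keepalive in the wire format is for. A hedged sketch of a wrapper that interleaves keepalives into any async event generator (with_heartbeat is hypothetical; it assumes sse-starlette renders a {"comment": ...} dict as a ": ..." line):

```python
import asyncio

async def with_heartbeat(source, interval: float = 15.0):
    """Forward events from `source`; if the stream is quiet for `interval`
    seconds, emit a comment event so intermediaries keep the connection open."""
    queue: asyncio.Queue = asyncio.Queue()
    DONE = object()

    async def pump():
        # Drain the source into a queue so we can wait on it with a timeout
        async for event in source:
            await queue.put(event)
        await queue.put(DONE)

    task = asyncio.create_task(pump())
    try:
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=interval)
            except asyncio.TimeoutError:
                yield {"comment": "keepalive"}   # rendered as ": keepalive"
                continue
            if event is DONE:
                return
            yield event
    finally:
        task.cancel()
```

Usage would look like EventSourceResponse(with_heartbeat(some_token_generator(prompt), interval=15)).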

# FastAPI SSE endpoint implementation
# This defines the server app. Run it with:
#   uvicorn <script_name>:app --reload --port 8000

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
import asyncio
import json

app = FastAPI(title="LLM Streaming API")

# Allow browser requests from any origin
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["GET", "POST"],
    allow_headers=["*"]
)


async def openai_sse_generator(prompt: str, model: str = "gpt-4o-mini"):
    """Async generator that yields SSE-formatted events from OpenAI streaming."""
    oai = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    try:
        stream = await oai.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async for chunk in stream:
            if not chunk.choices:   # defensive: usage-only chunks have no choices
                continue
            token = chunk.choices[0].delta.content
            if token:
                # Yield structured JSON payload
                yield {"data": json.dumps({"text": token, "type": "token"})}

        # Signal completion
        yield {"data": json.dumps({"type": "done"})}

    except Exception as e:
        yield {"data": json.dumps({"type": "error", "message": str(e)})}


@app.get("/stream")
async def stream_endpoint(request: Request, prompt: str = "Tell me a fun fact about space."):
    """
    SSE endpoint. Client connects once; server pushes tokens as they arrive.
    
    Usage:
        curl 'http://localhost:8000/stream?prompt=Hello+world'
        EventSource('http://localhost:8000/stream?prompt=Hello') in browser
    """
    return EventSourceResponse(
        openai_sse_generator(prompt),
        media_type="text/event-stream"
    )


@app.get("/stream/anthropic")
async def stream_anthropic_endpoint(
    request: Request,
    prompt: str = "Explain asyncio in two sentences."
):
    """SSE endpoint backed by Anthropic Claude."""

    async def anthropic_generator():
        # Use the async client so streaming does not block the event loop
        anth = anthropic.AsyncAnthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
        try:
            async with anth.messages.stream(
                model="claude-3-haiku-20240307",
                max_tokens=300,
                messages=[{"role": "user", "content": prompt}]
            ) as stream:
                async for text in stream.text_stream:
                    yield {"data": json.dumps({"text": text, "type": "token"})}
            yield {"data": json.dumps({"type": "done"})}
        except Exception as e:
            yield {"data": json.dumps({"type": "error", "message": str(e)})}

    return EventSourceResponse(anthropic_generator())


print("FastAPI SSE app defined.")
print()
print("To run the server:")
print("  1. Save this cell's app to a file: streaming_server.py")
print("  2. Run: uvicorn streaming_server:app --reload --port 8000")
print()
print("Endpoints:")
print("  GET /stream?prompt=...          (OpenAI)")
print("  GET /stream/anthropic?prompt=... (Anthropic)")
# Save the FastAPI server to a runnable Python file
server_code = '''
import os, json, asyncio
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
import openai
import anthropic
from dotenv import load_dotenv

load_dotenv()

app = FastAPI(title="LLM Streaming API")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])

async def openai_sse_generator(prompt: str, model: str = "gpt-4o-mini"):
    oai = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    try:
        stream = await oai.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async for chunk in stream:
            token = chunk.choices[0].delta.content
            if token:
                yield {"data": json.dumps({"text": token, "type": "token"})}
        yield {"data": json.dumps({"type": "done"})}
    except Exception as e:
        yield {"data": json.dumps({"type": "error", "message": str(e)})}

@app.get("/stream")
async def stream_endpoint(request: Request, prompt: str = "Tell me a fun fact."):
    return EventSourceResponse(openai_sse_generator(prompt))

@app.get("/")
async def root():
    return {"message": "LLM Streaming API", "endpoints": ["/stream"]}

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)
'''

server_path = "/tmp/streaming_server.py"
with open(server_path, "w") as f:
    f.write(server_code)

print(f"Server saved to {server_path}")
print("Start with: python /tmp/streaming_server.py")
print("Or:         uvicorn streaming_server:app --reload --port 8000")
print()
print("Test with curl (requires server running):")
print("  curl 'http://localhost:8000/stream?prompt=Hello+world'")
# Consuming an SSE endpoint from Python using httpx
# This is useful for testing your SSE server from another Python process

import httpx
import asyncio
import json


async def consume_sse(
    url: str,
    timeout: float = 30.0,
    on_token=None
) -> str:
    """
    Consume an SSE endpoint and return the full accumulated text.
    
    Args:
        url: SSE endpoint URL
        timeout: Seconds to wait before giving up
        on_token: Optional callback called with each token string
    """
    collected = []
    token_count = 0
    start = time.time()

    async with httpx.AsyncClient() as client:
        async with client.stream("GET", url, timeout=timeout) as response:
            response.raise_for_status()
            print(f"Connected. Status: {response.status_code}")
            print(f"Content-Type: {response.headers.get('content-type')}")
            print("-" * 40)

            async for line in response.aiter_lines():
                if not line or line.startswith(":"):  # Skip comments/keepalives
                    continue

                if line.startswith("data: "):
                    raw_data = line[6:]  # Strip "data: " prefix

                    try:
                        payload = json.loads(raw_data)
                    except json.JSONDecodeError:
                        continue

                    if payload.get("type") == "done":
                        print("\n[Stream complete]")
                        break

                    if payload.get("type") == "error":
                        print(f"\n[Error] {payload.get('message')}")
                        break

                    if payload.get("type") == "token":
                        text = payload["text"]
                        collected.append(text)
                        token_count += 1
                        print(text, end="", flush=True)
                        if on_token:
                            on_token(text)

    elapsed = time.time() - start
    print(f"\nTokens: {token_count} | Time: {elapsed:.2f}s | TPS: {token_count/elapsed:.1f}")
    return "".join(collected)


# Usage (requires the FastAPI server running on port 8000)
print("SSE consumer defined.")
print("Usage (requires server on port 8000):")
print()
print("  import asyncio")
print("  result = asyncio.run(consume_sse(")
print("      'http://localhost:8000/stream?prompt=Tell+me+a+joke'")
print("  ))")
print()
print("# Uncomment to run if server is active:")
# result = asyncio.run(consume_sse("http://localhost:8000/stream?prompt=Tell+me+a+joke"))

Part 5: Handling Stream Chunks and Delta Content

Accumulation Pattern

Always accumulate the full response alongside streaming. You need it for:

  • Storing in a database

  • Passing to the next step in a pipeline

  • Computing final token counts

  • Retry logic if something fails after streaming

Defensive Chunk Handling

# Bad: assumes delta.content is always a non-empty string
for chunk in stream:
    print(chunk.choices[0].delta.content)  # Can print "None"

# Good: check before using
for chunk in stream:
    content = chunk.choices[0].delta.content
    if content:  # Handles None and empty string
        print(content, end="", flush=True)

Finish Reason Values

Value          | Meaning                    | Action
---------------|----------------------------|----------------------------------------
stop           | Natural completion         | Normal
length         | Hit max_tokens             | May want to continue with another call
content_filter | Safety filter triggered    | Handle gracefully, log
tool_calls     | Model wants to call a tool | Process tool call
None           | Not the last chunk         | Continue iteration
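For finish_reason='length', a common follow-up is to resend the conversation with the truncated text as an assistant turn and ask the model to continue. A message-building sketch (continuation_messages is a hypothetical helper; whether the model resumes seamlessly is not guaranteed):

```python
def continuation_messages(original_messages, partial_text):
    """Build a follow-up message list asking the model to resume a
    response that was truncated by max_tokens."""
    return original_messages + [
        {"role": "assistant", "content": partial_text},
        {"role": "user", "content": "Continue exactly where you left off."},
    ]
```

The returned list can be passed straight back to a second streaming call; the original list is left untouched.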

# Robust chunk handler with accumulation and finish reason detection

def stream_with_accumulation(
    prompt: str,
    model: str = "gpt-4o-mini",
    max_tokens: int = 200,
    system: Optional[str] = None
) -> dict:
    """
    Stream and accumulate a response, returning stats and full text.
    Handles all edge cases in chunk processing.
    """
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    messages = []
    if system:
        messages.append({"role": "system", "content": system})
    messages.append({"role": "user", "content": prompt})

    full_response = []
    finish_reason = None
    model_used = None

    print("Streaming response:\n")

    for chunk in client.chat.completions.create(
        model=model,
        messages=messages,
        stream=True,
        max_tokens=max_tokens
    ):
        # Capture model name from first chunk
        if model_used is None:
            model_used = chunk.model

        choice = chunk.choices[0]

        # Safe content extraction
        content = choice.delta.content
        if content is not None and content != "":
            full_response.append(content)
            print(content, end="", flush=True)

        # Capture finish reason when it arrives
        if choice.finish_reason is not None:
            finish_reason = choice.finish_reason

    final_text = "".join(full_response)

    print("\n")
    if finish_reason == "length":
        print("[WARNING] Response was cut off (hit max_tokens limit).")
        print("  Consider increasing max_tokens or splitting the request.")
    elif finish_reason == "content_filter":
        print("[WARNING] Content was filtered by safety policy.")
    elif finish_reason == "stop":
        print("[OK] Response completed naturally.")

    return {
        "text": final_text,
        "finish_reason": finish_reason,
        "model": model_used,
        "char_count": len(final_text)
    }


# Test with a truncated response (low max_tokens)
print("Test 1: Normal completion")
r1 = stream_with_accumulation("What is 2 + 2?", max_tokens=50)
print(f"Finish reason: {r1['finish_reason']}")

print("\nTest 2: Truncated (max_tokens=10)")
r2 = stream_with_accumulation("Write a long story about dragons.", max_tokens=10)
print(f"Finish reason: {r2['finish_reason']}")

Part 6: Error Handling in Streams

Error Categories

  1. Pre-stream errors - API key invalid, rate limit before first token, network unreachable

  2. Mid-stream errors - Connection dropped, timeout while streaming, server restart

  3. Content errors - Content filter triggered mid-stream

Key Principle: Partial Response Recovery

Always save what you have before propagating an error. A partial response is almost always better than nothing.

try:
    for chunk in stream:
        collected.append(chunk.choices[0].delta.content or "")
except openai.APITimeoutError:
    # We have a partial response - use it rather than failing completely
    return {"text": "".join(collected), "partial": True}
# Comprehensive error handling for streaming

import openai


def stream_with_error_handling(
    prompt: str,
    model: str = "gpt-4o-mini",
    timeout: float = 30.0,
    max_tokens: int = 200
) -> dict:
    """
    Stream with comprehensive error handling and partial response recovery.
    Returns a result dict with 'text', 'error', and 'partial' fields.
    """
    client = openai.OpenAI(
        api_key=os.getenv("OPENAI_API_KEY"),
        timeout=timeout
    )

    collected = []
    error_info = None
    is_partial = False
    start = time.time()

    try:
        for chunk in client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            max_tokens=max_tokens
        ):
            content = chunk.choices[0].delta.content
            if content:
                collected.append(content)
                print(content, end="", flush=True)

    except openai.APITimeoutError as e:
        error_info = {"type": "timeout", "message": str(e)}
        is_partial = len(collected) > 0
        print(f"\n[TIMEOUT after {time.time() - start:.1f}s]")
        if is_partial:
            print(f"  Partial response has {len(collected)} tokens.")

    except openai.APIConnectionError as e:
        error_info = {"type": "connection", "message": str(e)}
        is_partial = len(collected) > 0
        print(f"\n[CONNECTION ERROR] {e}")

    except openai.RateLimitError as e:
        error_info = {"type": "rate_limit", "message": str(e)}
        print(f"\n[RATE LIMIT] Retry after cooling off.")
        # In production: implement exponential backoff here

    except openai.AuthenticationError as e:
        error_info = {"type": "auth", "message": "Invalid API key"}
        print(f"\n[AUTH ERROR] Check your OPENAI_API_KEY.")

    except openai.BadRequestError as e:
        error_info = {"type": "bad_request", "message": str(e)}
        print(f"\n[BAD REQUEST] {e}")

    except KeyboardInterrupt:
        error_info = {"type": "cancelled", "message": "User cancelled"}
        is_partial = len(collected) > 0
        print(f"\n[CANCELLED by user]")

    except Exception as e:
        error_info = {"type": "unknown", "message": str(e)}
        print(f"\n[UNEXPECTED ERROR] {type(e).__name__}: {e}")

    # Always return what we have, even after an error
    partial_text = "".join(collected)

    return {
        "text": partial_text,
        "partial": is_partial,
        "error": error_info,
        "elapsed": time.time() - start
    }


# Test normal operation
print("Test: Normal streaming")
result = stream_with_error_handling("Say 'Hello, streaming!' and nothing else.")
print(f"\nResult: text={repr(result['text'])}, error={result['error']}")

# Test with very short timeout to simulate timeout error
print("\n" + "=" * 40)
print("Test: Simulated timeout (timeout=0.001s)")
result2 = stream_with_error_handling(
    "Write a 500 word essay on AI.",
    timeout=0.001  # Extremely short timeout - will almost always time out
)
print(f"Error type: {result2['error']['type'] if result2['error'] else None}")
print(f"Partial text collected: {repr(result2['text'][:50])}")
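The rate-limit branch above notes that production code should back off and retry. One possible wrapper over a streaming function that returns dicts shaped like stream_with_error_handling's result (retry_stream is a sketch, not a library API; the sleep parameter is injectable for testing):

```python
import time

def retry_stream(stream_fn, attempts: int = 3, base_delay: float = 1.0,
                 sleep=time.sleep):
    """Call stream_fn() with exponential backoff on retryable errors.
    Returns the first clean result, or the longest partial if all attempts fail."""
    best = None
    for attempt in range(attempts):
        result = stream_fn()
        if result.get("error") is None:
            return result                           # clean completion
        if best is None or len(result.get("text", "")) > len(best.get("text", "")):
            best = result                           # keep the longest partial
        if result["error"].get("type") not in {"timeout", "connection", "rate_limit"}:
            break                                   # auth / bad_request: retrying won't help
        if attempt < attempts - 1:
            sleep(base_delay * (2 ** attempt))      # 1s, 2s, 4s, ...
    return best
```

Usage: retry_stream(lambda: stream_with_error_handling("...", timeout=30)).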

Part 7: Progress Tracking and Cancellation

Token Budget Tracking

Use max_tokens to set a hard limit, and track consumed tokens in real-time:

token_budget = 100
tokens_used = 0
for chunk in stream:
    if tokens_used >= token_budget:
        break  # Soft cancellation
    tokens_used += 1
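The same budget check can be packaged as a wrapper around any fragment iterator (take_budget is a hypothetical helper; it works with the provider generators defined later in this notebook or any iterator of strings):

```python
def take_budget(stream, budget: int):
    """Yield at most `budget` fragments from `stream`, then stop.
    A soft client-side cancellation that works with any provider adapter."""
    for count, fragment in enumerate(stream, start=1):
        yield fragment
        if count >= budget:
            break   # closing the generator runs upstream cleanup (context managers)
```

Usage: for text in take_budget(some_fragment_stream, 50): print(text, end="").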

Cooperative Cancellation with asyncio.Event

For async code, use an asyncio.Event as a cancellation token:

cancel = asyncio.Event()
# From another coroutine: cancel.set()  (set() is synchronous, not awaited)
async for chunk in stream:
    if cancel.is_set():
        break
# Progress tracking with tqdm
from tqdm.auto import tqdm


def stream_with_progress(
    prompt: str,
    max_tokens: int = 100,
    model: str = "gpt-4o-mini"
) -> str:
    """
    Stream with a live progress bar showing token budget consumption.
    """
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    collected = []

    pbar = tqdm(
        total=max_tokens,
        desc="Generating",
        unit="tok",
        bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} tokens [{elapsed}]"
    )

    try:
        for chunk in client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            max_tokens=max_tokens
        ):
            content = chunk.choices[0].delta.content
            if content:
                collected.append(content)
                pbar.update(1)
    finally:
        pbar.close()

    result = "".join(collected)
    print(f"\nGenerated text:\n{result}")
    return result


text = stream_with_progress(
    "List 5 Python best practices, one per line.",
    max_tokens=120
)
# Async cancellation using asyncio.Event

import asyncio


async def cancellable_stream(
    prompt: str,
    cancel_event: asyncio.Event,
    model: str = "gpt-4o-mini"
) -> dict:
    """
    Stream that can be cancelled by setting cancel_event.
    The cancel_event can be set from another coroutine.
    """
    oai = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    collected = []
    token_count = 0
    cancelled = False

    try:
        stream = await oai.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async with stream:  # close the connection even if we break out early
            async for chunk in stream:
                # Check cancellation before each token
                if cancel_event.is_set():
                    cancelled = True
                    print("\n[Stream cancelled at user request]")
                    break

                content = chunk.choices[0].delta.content
                if content:
                    collected.append(content)
                    token_count += 1
                    print(content, end="", flush=True)

    except asyncio.CancelledError:
        print("\n[Task was cancelled externally]")
        cancelled = True

    return {
        "text": "".join(collected),
        "tokens": token_count,
        "cancelled": cancelled
    }


async def demo_cancellation():
    """Demo: cancel the stream shortly after it starts."""
    cancel = asyncio.Event()

    # Schedule cancellation ~0.3s after streaming begins
    async def auto_cancel():
        await asyncio.sleep(0.3)
        print("\n[Auto-cancel triggered after 0.3s]")
        cancel.set()

    canceller = asyncio.create_task(auto_cancel())
    result = await cancellable_stream(
        "Count from 1 to 100, one number per line.",
        cancel_event=cancel
    )
    await canceller

    print(f"\nTokens collected before cancel: {result['tokens']}")
    print(f"Was cancelled: {result['cancelled']}")
    print(f"Partial text: {repr(result['text'][:50])}")


# Run the cancellation demo
await demo_cancellation()

Part 8: Multi-Provider Streaming Adapter

The Adapter Pattern

Different providers have different streaming APIs. A thin adapter layer lets you swap providers without changing the rest of your code:

Your App Code
      |
      ▼
stream_from_provider(provider, prompt)  ← unified interface
      |              |              |
      ▼              ▼              ▼
 stream_openai   stream_anthropic  stream_ollama
      |              |              |
      ▼              ▼              ▼
  OpenAI API    Anthropic API   Local Ollama
# Multi-provider streaming adapter

from typing import Generator
import httpx


def stream_openai(
    prompt: str,
    model: str = "gpt-4o-mini",
    max_tokens: int = 200
) -> Generator[str, None, None]:
    """Yield text fragments from OpenAI."""
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    for chunk in client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=max_tokens
    ):
        if chunk.choices[0].delta.content:
            yield chunk.choices[0].delta.content


def stream_anthropic(
    prompt: str,
    model: str = "claude-3-haiku-20240307",
    max_tokens: int = 200
) -> Generator[str, None, None]:
    """Yield text fragments from Anthropic."""
    client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
    with client.messages.stream(
        model=model,
        max_tokens=max_tokens,
        messages=[{"role": "user", "content": prompt}]
    ) as stream:
        for text in stream.text_stream:
            yield text


def stream_ollama(
    prompt: str,
    model: str = "llama3.2",
    base_url: str = "http://localhost:11434"
) -> Generator[str, None, None]:
    """
    Yield text fragments from a local Ollama instance.
    Requires: ollama running locally (https://ollama.ai)
    """
    try:
        with httpx.stream(
            "POST",
            f"{base_url}/api/generate",
            json={"model": model, "prompt": prompt, "stream": True},
            timeout=60.0
        ) as response:
            for line in response.iter_lines():
                if not line:
                    continue
                data = json.loads(line)
                if data.get("response"):
                    yield data["response"]
                if data.get("done"):
                    break
    except httpx.ConnectError:
        raise RuntimeError(
            "Cannot connect to Ollama. Is it running? "
            "Install from https://ollama.ai and run: ollama pull llama3.2"
        )


# Unified interface
_PROVIDER_MAP = {
    "openai": stream_openai,
    "anthropic": stream_anthropic,
    "ollama": stream_ollama,
}


def stream_from_provider(
    provider: str,
    prompt: str,
    **kwargs
) -> dict:
    """
    Stream from any supported provider using a unified interface.
    
    providers: 'openai', 'anthropic', 'ollama'
    """
    if provider not in _PROVIDER_MAP:
        raise ValueError(f"Unknown provider '{provider}'. Choose from: {list(_PROVIDER_MAP)}")

    print(f"\n--- Streaming from {provider} ---")
    start = time.time()
    first_token_time = None
    token_count = 0
    collected = []

    try:
        for text in _PROVIDER_MAP[provider](prompt, **kwargs):
            if first_token_time is None:
                first_token_time = time.time()
            print(text, end="", flush=True)
            collected.append(text)
            token_count += 1  # approximate: one chunk ≈ one token
    except RuntimeError as e:
        print(f"[SKIPPED] {e}")
        return {"provider": provider, "error": str(e)}

    elapsed = time.time() - start
    ttft = (first_token_time - start) if first_token_time else 0

    print(f"\nTTFT={ttft:.3f}s | TPS={token_count/elapsed:.1f} | tokens={token_count}")
    return {"provider": provider, "text": "".join(collected), "ttft": ttft, "tps": token_count/elapsed}


# Run on OpenAI (always available)
r1 = stream_from_provider("openai", "What is a Python decorator? One sentence.", max_tokens=60)

# Uncomment to test Anthropic:
# r2 = stream_from_provider("anthropic", "What is a Python decorator? One sentence.", max_tokens=60)

# Uncomment to test Ollama (requires local Ollama):
# r3 = stream_from_provider("ollama", "What is a Python decorator? One sentence.", model="llama3.2")

Part 9: Token Counting During Streaming

Why Count Tokens During Streaming?

  • Cost control: Stop generation when approaching a budget

  • Rate limit awareness: Track token usage in real-time

  • UX: Show "X tokens generated" in the UI

  • Analytics: Log actual token usage per request

tiktoken

OpenAI's tiktoken library tokenizes text the same way the models do, enabling accurate token counting without waiting for the API response.

import tiktoken
enc = tiktoken.encoding_for_model("gpt-4o-mini")
tokens = enc.encode("Hello, world!")
print(len(tokens))  # 4
# Token counting during streaming using tiktoken

import tiktoken


def stream_with_token_count(
    prompt: str,
    model: str = "gpt-4o-mini",
    max_tokens: int = 200,
    token_budget: Optional[int] = None
) -> dict:
    """
    Stream with real-time token counting.
    Optionally stops when token_budget is exceeded.
    """
    # Fall back to cl100k_base when tiktoken doesn't recognize the model name
    try:
        enc = tiktoken.encoding_for_model(model)
    except KeyError:
        enc = tiktoken.get_encoding("cl100k_base")

    client_oai = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

    # Count input tokens
    input_token_count = len(enc.encode(prompt))
    output_token_count = 0
    collected = []
    budget_hit = False

    start = time.time()
    first_token_time = None

    print(f"Input tokens: {input_token_count}")
    print(f"Token budget: {token_budget or 'unlimited'}")
    print(f"Max tokens:   {max_tokens}")
    print("-" * 40)

    for chunk in client_oai.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        max_tokens=max_tokens
    ):
        content = chunk.choices[0].delta.content
        if content:
            if first_token_time is None:
                first_token_time = time.time()

            # Count tokens in this fragment
            fragment_tokens = len(enc.encode(content))
            output_token_count += fragment_tokens

            # Check token budget
            if token_budget and output_token_count > token_budget:
                budget_hit = True
                print(f"\n[BUDGET EXCEEDED: {output_token_count} > {token_budget} tokens]")
                break

            collected.append(content)
            print(content, end="", flush=True)

    total_time = time.time() - start
    ttft = (first_token_time - start) if first_token_time else 0

    print("\n" + "-" * 40)
    print(f"Input tokens:  {input_token_count}")
    print(f"Output tokens: {output_token_count}")
    print(f"Total tokens:  {input_token_count + output_token_count}")
    print(f"TTFT:          {ttft:.3f}s")
    print(f"Total time:    {total_time:.3f}s")
    print(f"TPS:           {output_token_count / total_time:.1f}")
    if budget_hit:
        print("Budget hit:    YES (stopped early)")

    return {
        "text": "".join(collected),
        "input_tokens": input_token_count,
        "output_tokens": output_token_count,
        "ttft": ttft,
        "tps": output_token_count / total_time,
        "budget_hit": budget_hit
    }


# Normal streaming with token counts
print("Test 1: Normal streaming with token count")
r = stream_with_token_count(
    "Explain the difference between a list and a tuple in Python.",
    max_tokens=100
)

# Streaming with token budget enforcement
print("\n" + "=" * 40)
print("Test 2: With token budget (stop at 20 tokens)")
r2 = stream_with_token_count(
    "Tell me everything about machine learning.",
    max_tokens=200,
    token_budget=20
)

Part 10: TTFT and TPS Benchmarking

Why Benchmark?

  • Compare models to choose the right cost/latency trade-off

  • Detect degraded performance in production

  • Set SLA thresholds for your application

Industry Benchmarks (approximate, as of 2025)

| Model             | Typical TTFT | Typical TPS |
|-------------------|--------------|-------------|
| gpt-4o-mini       | 200-400ms    | 80-120 TPS  |
| gpt-4o            | 300-600ms    | 50-80 TPS   |
| claude-3-haiku    | 200-500ms    | 80-150 TPS  |
| claude-3.5-sonnet | 300-700ms    | 60-100 TPS  |
| Local (llama3.2)  | 50-200ms     | 15-60 TPS   |

Numbers vary with load, region, prompt length, and system prompt.

# Benchmark multiple OpenAI models

import statistics


def benchmark_model(
    model: str,
    prompt: str,
    max_tokens: int = 50,
    runs: int = 2
) -> dict:
    """Benchmark a single model with multiple runs."""
    client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    ttfts, tps_list, total_times = [], [], []

    for run in range(runs):
        start = time.time()
        first = None
        tokens = 0

        for chunk in client.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            stream=True,
            max_tokens=max_tokens
        ):
            if chunk.choices[0].delta.content:
                if first is None:
                    first = time.time()
                tokens += 1  # approximate: one chunk ≈ one token

        elapsed = time.time() - start
        if first and tokens > 0:
            ttfts.append(first - start)
            tps_list.append(tokens / elapsed)
            total_times.append(elapsed)

    return {
        "model": model,
        "runs": runs,
        "avg_ttft": statistics.mean(ttfts) if ttfts else 0,
        "avg_tps": statistics.mean(tps_list) if tps_list else 0,
        "avg_total": statistics.mean(total_times) if total_times else 0,
        "ttft_stdev": statistics.stdev(ttfts) if len(ttfts) > 1 else 0
    }


def benchmark_models(models: list, prompt: str, max_tokens: int = 50, runs: int = 2):
    """Benchmark multiple models and print a comparison table."""
    print(f"Benchmark: {runs} runs each, max_tokens={max_tokens}")
    print(f"Prompt: {prompt[:50]}...\n")
    print(f"{'Model':<25} {'Avg TTFT':>10} {'Avg TPS':>10} {'Avg Total':>12} {'TTFT Stdev':>12}")
    print("-" * 72)

    results = []
    for model in models:
        print(f"  Testing {model}...", end="", flush=True)
        r = benchmark_model(model, prompt, max_tokens, runs)
        results.append(r)
        print(f"\r{r['model']:<25} {r['avg_ttft']:>9.3f}s {r['avg_tps']:>9.1f} {r['avg_total']:>11.3f}s {r['ttft_stdev']:>11.3f}s")

    # Highlight best TTFT and TPS
    best_ttft = min(results, key=lambda x: x['avg_ttft'])
    best_tps = max(results, key=lambda x: x['avg_tps'])
    print("-" * 72)
    print(f"Best TTFT: {best_ttft['model']} ({best_ttft['avg_ttft']:.3f}s)")
    print(f"Best TPS:  {best_tps['model']} ({best_tps['avg_tps']:.1f} tok/s)")
    return results


# Benchmark gpt-4o-mini vs gpt-3.5-turbo
results = benchmark_models(
    models=["gpt-4o-mini", "gpt-3.5-turbo"],
    prompt="Write a haiku about software engineering.",
    max_tokens=50,
    runs=2
)

Part 11: HuggingFace TextIteratorStreamer

Local Model Streaming

For local models via HuggingFace Transformers, use TextIteratorStreamer. It runs generation in a background thread and makes tokens available through an iterator in the main thread.

Architecture

Main Thread                     Background Thread
    |                                  |
    |   Thread(target=model.generate,  |
    |           kwargs={streamer=...}) ──
    |                                  | → generates tokens
    |   for text in streamer:  ←────────    puts into queue
    |       print(text)                |

When to Use Local Models

  • Privacy-sensitive data (nothing leaves your machine)

  • Cost at high volume (no per-token fees)

  • Offline deployment

  • Customized fine-tuned models

Trade-offs vs API Models

|         | Local (HuggingFace) | API (OpenAI/Anthropic) |
|---------|---------------------|------------------------|
| TTFT    | 50-500ms            | 200-700ms              |
| TPS     | 5-60 (CPU/GPU)      | 50-150                 |
| Cost    | Hardware only       | Per token              |
| Privacy | Complete            | Data sent to vendor    |
| Setup   | Complex             | Simple                 |

# HuggingFace TextIteratorStreamer
# Uses GPT-2 (smallest model, ~500MB) to demonstrate the pattern
# For production: replace 'gpt2' with 'meta-llama/Llama-3.2-1B' or similar

from threading import Thread


def stream_huggingface(
    prompt: str,
    model_name: str = "gpt2",
    max_new_tokens: int = 80
) -> dict:
    """
    Stream tokens from a local HuggingFace model using TextIteratorStreamer.
    
    Requires: pip install transformers torch
    """
    try:
        from transformers import (
            AutoModelForCausalLM,
            AutoTokenizer,
            TextIteratorStreamer
        )
    except ImportError:
        print("transformers not installed. Run: pip install transformers torch")
        return {"error": "transformers not installed"}

    print(f"Loading {model_name}... (first run downloads model)")
    tokenizer = AutoTokenizer.from_pretrained(model_name)
    model = AutoModelForCausalLM.from_pretrained(model_name)
    print("Model loaded.")

    # Tokenize input
    inputs = tokenizer([prompt], return_tensors="pt")

    # Create the streamer - skip_special_tokens removes <eos> etc.
    streamer = TextIteratorStreamer(
        tokenizer,
        skip_prompt=True,       # Don't re-emit the input prompt
        skip_special_tokens=True
    )

    # Generation kwargs - pass the streamer here
    gen_kwargs = {
        **inputs,
        "streamer": streamer,
        "max_new_tokens": max_new_tokens,
        "do_sample": True,
        "temperature": 0.7,
        "repetition_penalty": 1.3
    }

    # Start generation in a background thread
    thread = Thread(target=model.generate, kwargs=gen_kwargs)
    thread.start()

    # Consume tokens as they become available
    start = time.time()
    first_token_time = None
    token_count = 0
    collected = []

    print(f"\nStreaming from {model_name}:\n")
    print("-" * 40)

    for text in streamer:
        if first_token_time is None:
            first_token_time = time.time()
        print(text, end="", flush=True)
        collected.append(text)
        token_count += 1

    thread.join()  # Wait for generation to complete

    elapsed = time.time() - start
    ttft = (first_token_time - start) if first_token_time else 0

    print("\n" + "-" * 40)
    print(f"TTFT:  {ttft:.3f}s")
    print(f"TPS:   {token_count / elapsed:.1f}")
    print(f"Total: {elapsed:.3f}s")

    return {
        "text": "".join(collected),
        "ttft": ttft,
        "tps": token_count / elapsed
    }


# Uncomment to run (downloads ~500MB gpt2 model on first run):
# result = stream_huggingface("Python is a programming language that", model_name="gpt2")

print("HuggingFace TextIteratorStreamer function defined.")
print()
print("Uncomment the last line to run (requires ~500MB download for gpt2).")
print("For instruction-following models try: 'microsoft/phi-2' or 'google/gemma-2b-it'")

Part 12: Simple HTML Frontend for Streaming

Browser EventSource API

The browser has a built-in EventSource class that connects to SSE endpoints:

const evtSource = new EventSource('/stream?prompt=Hello');

evtSource.onmessage = (event) => {
    const data = JSON.parse(event.data);
    if (data.type === 'done') {
        evtSource.close();
        return;
    }
    document.getElementById('output').textContent += data.text;
};

evtSource.onerror = () => evtSource.close();

The browser reconnects automatically if the connection drops; the server can set the reconnection delay with the SSE retry: field.
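The wire format behind this is plain text: optional retry:/event: lines, one or more data: lines, and a blank line terminating each event. A minimal sketch of that framing (the sse_event helper is illustrative, not part of sse-starlette):

```python
from typing import Optional


def sse_event(data: str, retry_ms: Optional[int] = None) -> str:
    """Format one SSE event as it appears on the wire.

    A `retry:` line tells the browser how many milliseconds to wait
    before auto-reconnecting after a dropped connection.
    """
    lines = []
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    # A multi-line payload becomes multiple data: lines; the browser rejoins them
    for part in data.splitlines() or [""]:
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"  # the blank line terminates the event


print(sse_event('{"type": "token", "text": "Hi"}', retry_ms=3000))
# retry: 3000
# data: {"type": "token", "text": "Hi"}
```

EventSource parses exactly this framing, which is why the onmessage handler above receives one `e.data` string per blank-line-terminated event.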

# Generate a complete HTML streaming demo frontend

html_content = '''<!DOCTYPE html>
<html lang="en">
<head>
  <meta charset="UTF-8">
  <meta name="viewport" content="width=device-width, initial-scale=1.0">
  <title>LLM Streaming Demo</title>
  <style>
    body {
      font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
      max-width: 800px;
      margin: 50px auto;
      padding: 0 20px;
      background: #f5f5f5;
    }
    h1 { color: #333; }
    .controls { display: flex; gap: 8px; margin-bottom: 12px; }
    #prompt {
      flex: 1;
      padding: 10px;
      border: 1px solid #ddd;
      border-radius: 6px;
      font-size: 14px;
    }
    button {
      padding: 10px 18px;
      border: none;
      border-radius: 6px;
      cursor: pointer;
      font-size: 14px;
    }
    #streamBtn { background: #4CAF50; color: white; }
    #streamBtn:disabled { background: #aaa; cursor: not-allowed; }
    #stopBtn { background: #f44336; color: white; }
    .stats {
      font-size: 12px;
      color: #666;
      margin-bottom: 8px;
    }
    #output {
      background: white;
      border: 1px solid #ddd;
      border-radius: 6px;
      padding: 16px;
      min-height: 120px;
      white-space: pre-wrap;
      font-size: 15px;
      line-height: 1.6;
    }
    .cursor {
      display: inline-block;
      width: 2px;
      height: 1.1em;
      background: #333;
      vertical-align: text-bottom;
      animation: blink 0.8s step-end infinite;
    }
    @keyframes blink { 0%,100%{opacity:1} 50%{opacity:0} }
    .provider-select { padding: 8px; border-radius: 6px; border: 1px solid #ddd; }
  </style>
</head>
<body>
  <h1>LLM Streaming Demo</h1>
  <p>Connects to FastAPI SSE endpoint at <code>http://localhost:8000/stream</code></p>

  <div class="controls">
    <select id="provider" class="provider-select">
      <option value="openai">OpenAI (gpt-4o-mini)</option>
      <option value="anthropic">Anthropic (claude-haiku)</option>
    </select>
    <input id="prompt" type="text"
           placeholder="Enter your prompt..."
           value="Explain streaming in 3 sentences.">
    <button id="streamBtn" onclick="startStream()">Stream</button>
    <button id="stopBtn" onclick="stopStream()">Stop</button>
  </div>

  <div class="stats">
    Status: <span id="status">Ready</span> |
    Tokens: <span id="tokenCount">0</span> |
    TTFT: <span id="ttft">-</span> |
    TPS: <span id="tps">-</span>
  </div>

  <div id="output">Output will appear here...</div>

  <script>
    let evtSource = null;
    let tokenCount = 0;
    let startTime = null;
    let firstTokenTime = null;
    let tpsInterval = null;

    function startStream() {
      const prompt = document.getElementById("prompt").value.trim();
      const provider = document.getElementById("provider").value;
      if (!prompt) return;

      // Reset state
      stopStream();
      tokenCount = 0;
      startTime = Date.now();
      firstTokenTime = null;

      document.getElementById("output").innerHTML = "<span class=\\"cursor\\"></span>";
      document.getElementById("status").textContent = "Connecting...";
      document.getElementById("tokenCount").textContent = "0";
      document.getElementById("ttft").textContent = "-";
      document.getElementById("tps").textContent = "-";
      document.getElementById("streamBtn").disabled = true;

      const endpoint = provider === "anthropic"
        ? "/stream/anthropic"
        : "/stream";
      const url = `http://localhost:8000${endpoint}?prompt=${encodeURIComponent(prompt)}`;

      evtSource = new EventSource(url);

      evtSource.onopen = () => {
        document.getElementById("status").textContent = "Streaming...";
      };

      evtSource.onmessage = (e) => {
        let payload;
        try { payload = JSON.parse(e.data); } catch { return; }

        if (payload.type === "done") {
          stopStream();
          document.getElementById("status").textContent = "Done";
          const cursor = document.querySelector(".cursor");
          if (cursor) cursor.remove();
          return;
        }

        if (payload.type === "error") {
          document.getElementById("status").textContent = "Error: " + payload.message;
          stopStream();
          return;
        }

        if (payload.type === "token") {
          if (!firstTokenTime) {
            firstTokenTime = Date.now();
            document.getElementById("ttft").textContent =
              ((firstTokenTime - startTime) / 1000).toFixed(3) + "s";
          }
          tokenCount++;
          document.getElementById("tokenCount").textContent = tokenCount;

          const elapsed = (Date.now() - startTime) / 1000;
          document.getElementById("tps").textContent = (tokenCount / elapsed).toFixed(1);

          // Insert text before cursor
          const cursor = document.querySelector(".cursor");
          if (cursor) {
            cursor.insertAdjacentText("beforebegin", payload.text);
          } else {
            document.getElementById("output").textContent += payload.text;
          }
        }
      };

      evtSource.onerror = () => {
        if (document.getElementById("status").textContent === "Streaming...") {
          document.getElementById("status").textContent = "Disconnected";
        }
        const cursor = document.querySelector(".cursor");
        if (cursor) cursor.remove();
        stopStream();
      };
    }

    function stopStream() {
      if (evtSource) { evtSource.close(); evtSource = null; }
      if (tpsInterval) { clearInterval(tpsInterval); tpsInterval = null; }
      document.getElementById("streamBtn").disabled = false;
      if (document.getElementById("status").textContent === "Streaming...") {
        document.getElementById("status").textContent = "Stopped";
      }
    }

    // Allow Enter key to submit
    document.addEventListener("DOMContentLoaded", () => {
      document.getElementById("prompt").addEventListener("keydown", (e) => {
        if (e.key === "Enter") startStream();
      });
    });
  </script>
</body>
</html>
'''

html_path = "/tmp/streaming_demo.html"
with open(html_path, "w") as f:
    f.write(html_content)

print(f"HTML frontend written to: {html_path}")
print()
print("To use it:")
print("  1. Start the FastAPI server: python /tmp/streaming_server.py")
print(f"  2. Open {html_path} in your browser")
print("  3. Type a prompt and click 'Stream'")
print()
print("The frontend displays:")
print("  - Animated cursor during generation")
print("  - Live token count and TPS")
print("  - TTFT measurement")
print("  - Stop button for cancellation")

Summary

What We Covered

  1. Why streaming matters: TTFT is more important than total latency for perceived performance. Users tolerate long responses if they see output immediately.

  2. OpenAI streaming: stream=True returns an iterator of ChatCompletionChunk objects. Always check delta.content for None before using.

  3. Anthropic streaming: Context manager pattern. stream.text_stream yields clean str fragments. stream.get_final_message() retrieves usage stats.

  4. SSE protocol: Plain HTTP, server-to-client only, data: payload\n\n format. Browser auto-reconnects. Best for LLM streaming use cases.

  5. FastAPI SSE: EventSourceResponse + async generator function. Add CORS middleware for browser access.

  6. Error handling: Always collect partial responses before surfacing errors. Handle APITimeoutError, APIConnectionError, RateLimitError separately.

  7. Cancellation: asyncio.Event for cooperative async cancellation. tqdm for synchronous progress bars.

  8. Multi-provider adapter: Wrap each provider in a generator with the same signature. Swap providers without changing application code.

  9. Token counting: tiktoken for real-time input/output token tracking. Enforce token budgets to control cost.

  10. HuggingFace local models: TextIteratorStreamer + background Thread. Best for privacy-sensitive or high-volume deployments.

  11. HTML frontend: EventSource API is the browser's native SSE client. Shows animated cursor, live TPS, and TTFT metrics.

Decision Guide

Need bidirectional communication? ─── YES ──► Use WebSockets (see Notebook 2)
         │
         NO
         │
         ▼
Need browser support? ──────────────── YES ──► Use SSE (EventSource API)
         │
         NO
         │
         ▼
Server-to-server only? ─────────────── YES ──► Use httpx streaming (chunked HTTP)
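For the server-to-server branch, you don't need EventSource at all: httpx can consume the same SSE endpoint over chunked HTTP. A sketch, assuming the {"type": "token"/"done"} payload convention used by this notebook's FastAPI endpoint (parse_sse_line and consume_sse are illustrative helper names):

```python
import json
from typing import Iterator, Optional


def parse_sse_line(line: str) -> Optional[dict]:
    """Return the JSON payload of a `data:` line, or None for anything
    else (comments, retry: hints, blank event separators)."""
    if not line.startswith("data: "):
        return None
    return json.loads(line[len("data: "):])


def consume_sse(url: str, prompt: str) -> Iterator[str]:
    """Yield token text from an SSE endpoint without a browser."""
    import httpx  # local import so parse_sse_line works without httpx installed

    with httpx.stream("GET", url, params={"prompt": prompt}, timeout=60.0) as resp:
        for line in resp.iter_lines():
            payload = parse_sse_line(line)
            if payload is None:
                continue
            if payload.get("type") == "done":
                break
            if payload.get("type") == "token":
                yield payload["text"]


# Example (requires the FastAPI server from earlier in this notebook):
# for text in consume_sse("http://localhost:8000/stream", "Hello"):
#     print(text, end="", flush=True)
```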

Quick Reference

# OpenAI
for chunk in client.chat.completions.create(stream=True, ...):
    text = chunk.choices[0].delta.content or ""

# Anthropic
with client.messages.stream(...) as s:
    for text in s.text_stream:
        ...

# FastAPI SSE
@app.get("/stream")
async def endpoint():
    return EventSourceResponse(my_async_generator())

# Browser
const src = new EventSource('/stream?prompt=...');
src.onmessage = e => console.log(JSON.parse(e.data));

Next: Notebook 2 covers WebSocket connections for bidirectional real-time AI chat.