Streaming LLM Responses¶
Phase 20 - Notebook 1¶
What you will learn:
- Why streaming matters for user experience (TTFT vs total latency)
- OpenAI streaming with `stream=True`
- Anthropic streaming with context manager
- HuggingFace `TextIteratorStreamer` for local models
- Server-Sent Events (SSE) protocol and wire format
- FastAPI SSE endpoint implementation
- Handling stream chunks and delta content
- Error handling in streams (timeouts, disconnections)
- Progress tracking and cancellation
- Multi-provider streaming adapter pattern
- Token counting during streaming with `tiktoken`
- Measuring TTFT and TPS
- Simple HTML frontend for streaming
Prerequisites: Python async/await basics, OpenAI and Anthropic API keys
# Install required packages
!pip install openai anthropic fastapi sse-starlette uvicorn httpx tiktoken tqdm python-dotenv -q
import os
import time
import json
import asyncio
from typing import Generator, Optional, AsyncGenerator
from dotenv import load_dotenv
import openai
import anthropic
load_dotenv()
# Verify API keys are present
openai_key = os.getenv("OPENAI_API_KEY", "")
anthropic_key = os.getenv("ANTHROPIC_API_KEY", "")
print("Environment setup:")
print(f" OpenAI key: {'set' if openai_key else 'NOT SET - set OPENAI_API_KEY in .env'}")
print(f" Anthropic key: {'set' if anthropic_key else 'NOT SET - set ANTHROPIC_API_KEY in .env'}")
print("\nAll imports successful.")
Part 1: Why Streaming Matters¶
The Problem with Non-Streaming APIs¶
When you call an LLM API without streaming, the server generates the entire response before sending a single byte to the client. For a 500-token response at 50 tokens/second, that means 10 seconds of silence before any output appears.
Non-Streaming Timeline:
t=0s [Request sent] ──────────────────────────────────────────── t=10s [Full response received]
User stares at blank screen for 10 seconds

Streaming Timeline:
t=0s [Request sent]
t=0.3s [FIRST TOKEN] → token → token → token → ... → t=10s [Last token]
User sees output almost immediately (TTFT = 0.3s)
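The perceived-latency gap is easy to make concrete with a toy calculation (no API call needed; the function name here is ours, purely for illustration):

```python
def perceived_wait(total_tokens: int, tps: float, streaming: bool,
                   ttft: float = 0.3) -> float:
    """Seconds the user stares at a blank screen before any output appears."""
    if streaming:
        return ttft                 # first token arrives almost immediately
    return total_tokens / tps       # must wait for the entire generation

# The 500-token / 50 tokens-per-second example from above:
print(perceived_wait(500, 50, streaming=False))  # 10.0
print(perceived_wait(500, 50, streaming=True))   # 0.3
```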
Key Metrics¶

| Metric | Definition | Target |
|---|---|---|
| TTFT (Time To First Token) | Time from request to first token received | < 500ms |
| TPS (Tokens Per Second) | How many tokens arrive per second | 30-100 TPS |
| Total Latency | Time from request to last token | Depends on length |
| E2E Latency | Full round-trip including network | TTFT + generation time |
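All of these fall out of three timestamps plus a token count. A minimal helper along these lines (`stream_metrics` is our name, not an SDK function):

```python
def stream_metrics(t_request: float, t_first_token: float,
                   t_last_token: float, tokens: int) -> dict:
    """Derive TTFT, total latency, and TPS from raw timestamps (seconds)."""
    ttft = t_first_token - t_request
    total = t_last_token - t_request
    # Measure TPS over the generation window (first token to last token)
    gen_window = max(t_last_token - t_first_token, 1e-9)
    return {"ttft": ttft, "total_latency": total, "tps": tokens / gen_window}

print(stream_metrics(0.0, 0.5, 10.5, 500))
# {'ttft': 0.5, 'total_latency': 10.5, 'tps': 50.0}
```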
Why This Matters for UX¶
0-100ms: Feels instantaneous
100-300ms: Feels responsive
300-1000ms: User notices the delay
> 1000ms: User feels the app is slow
Streaming lets you hit the "feels responsive" range even when total generation takes 10+ seconds.
# Demonstrate the non-streaming vs streaming difference
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
PROMPT = "Write a short poem about the Python programming language (4 lines)."
# --- Non-streaming call ---
print("=" * 50)
print("NON-STREAMING: waiting for complete response...")
print("=" * 50)
start = time.time()
response = client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": PROMPT}],
stream=False
)
elapsed_non_stream = time.time() - start
print(f"\nWaited {elapsed_non_stream:.2f}s before seeing anything.")
print("\nResponse:")
print(response.choices[0].message.content)
print(f"\nTotal tokens: {response.usage.total_tokens}")
print()
print("=" * 50)
print("STREAMING: tokens appear as they are generated...")
print("=" * 50)
start = time.time()
first_token_time = None
token_count = 0
for chunk in client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": PROMPT}],
stream=True
):
if chunk.choices[0].delta.content:
if first_token_time is None:
first_token_time = time.time()
print(f"\n[TTFT: {first_token_time - start:.3f}s] ", end="")
print(chunk.choices[0].delta.content, end="", flush=True)
        token_count += 1  # one content chunk is roughly one token
elapsed_stream = time.time() - start
print(f"\n\nTotal time: {elapsed_stream:.2f}s")
print(f"TTFT: {first_token_time - start:.3f}s (user saw output after this)")
print(f"Tokens: {token_count}")
print(f"TPS: {token_count / elapsed_stream:.1f}")
Part 2: OpenAI Streaming API¶
How It Works¶
Add stream=True to any chat.completions.create() call. The method returns an iterator of
ChatCompletionChunk objects instead of a single ChatCompletion.
Chunk Structure¶
ChatCompletionChunk(
id='chatcmpl-...',
choices=[
Choice(
delta=ChoiceDelta(
content='Hello', # The new text fragment (can be None)
role=None, # Only set on first chunk
),
finish_reason=None, # 'stop', 'length', 'content_filter', or None
index=0
)
],
model='gpt-4o-mini',
usage=None # Only in last chunk if stream_options={'include_usage': True}
)
Important Notes¶
- `delta.content` can be `None` on the first and last chunks (role assignment / finish)
- `finish_reason` is only non-None on the last chunk
- `finish_reason='stop'` = natural completion
- `finish_reason='length'` = hit `max_tokens` limit
- `finish_reason='content_filter'` = content policy triggered
# OpenAI streaming with full chunk inspection and metrics
def openai_stream(
prompt: str,
model: str = "gpt-4o-mini",
max_tokens: int = 200,
verbose_chunks: bool = False
) -> dict:
"""Stream OpenAI response with TTFT and TPS measurement."""
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
collected_content = []
finish_reason = None
chunk_count = 0
token_count = 0
start = time.time()
first_token_time = None
print(f"\nPrompt: {prompt[:60]}...\n")
print("-" * 40)
for chunk in client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
max_tokens=max_tokens
):
chunk_count += 1
delta = chunk.choices[0].delta
reason = chunk.choices[0].finish_reason
if verbose_chunks:
print(f" [chunk {chunk_count}] content={repr(delta.content)} finish={reason}")
if delta.content:
if first_token_time is None:
first_token_time = time.time()
print(delta.content, end="", flush=True)
collected_content.append(delta.content)
token_count += 1
if reason:
finish_reason = reason
total_time = time.time() - start
ttft = (first_token_time - start) if first_token_time else 0
print("\n" + "-" * 40)
print(f"Finish reason: {finish_reason}")
    print(f"Chunks received: {chunk_count}")
print(f"Tokens emitted: {token_count}")
print(f"TTFT: {ttft:.3f}s")
print(f"Total time: {total_time:.3f}s")
print(f"TPS: {token_count / total_time:.1f}")
return {
"content": "".join(collected_content),
"ttft": ttft,
"total_time": total_time,
"tps": token_count / total_time,
"finish_reason": finish_reason
}
# Run it
result = openai_stream(
"Explain what a Python generator is and give a simple example.",
model="gpt-4o-mini",
max_tokens=150
)
print(f"\nFull response length: {len(result['content'])} characters")
# Inspect raw chunk structure to understand the wire format
print("First 5 chunks from a streaming call:")
print("=" * 60)
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
for i, chunk in enumerate(client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": "Say hello in 5 words."}],
stream=True
)):
print(f"\nChunk {i + 1}:")
print(f" id: {chunk.id[:20]}...")
print(f" model: {chunk.model}")
print(f" delta.role: {chunk.choices[0].delta.role}")
print(f" delta.content: {repr(chunk.choices[0].delta.content)}")
print(f" finish_reason: {chunk.choices[0].finish_reason}")
if i >= 4:
print(" ... (more chunks follow)")
break
Part 3: Anthropic Streaming¶
Key Differences from OpenAI¶
Anthropic's streaming uses a context manager pattern (with client.messages.stream(...) as stream:).
This ensures the underlying HTTP connection is properly closed even if an exception occurs.
# OpenAI style (iterator)
for chunk in client.chat.completions.create(stream=True, ...):
text = chunk.choices[0].delta.content
# Anthropic style (context manager)
with client.messages.stream(...) as stream:
for text in stream.text_stream: # yields str directly, no None checks needed
print(text)
stream.text_stream vs stream¶
- `stream.text_stream` - yields `str` fragments only (`None` filtered out)
- `stream` itself - yields raw `MessageStreamEvent` objects for lower-level control
- `stream.get_final_message()` - returns the complete `Message` after streaming
# Anthropic streaming with TTFT and TPS measurement
def anthropic_stream(
prompt: str,
model: str = "claude-3-haiku-20240307",
max_tokens: int = 200
) -> dict:
"""Stream Anthropic response with metrics."""
anth_client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
collected = []
token_count = 0
start = time.time()
first_token_time = None
print(f"Model: {model}")
print(f"Prompt: {prompt[:60]}...\n")
print("-" * 40)
with anth_client.messages.stream(
model=model,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
if first_token_time is None:
first_token_time = time.time()
print(text, end="", flush=True)
collected.append(text)
token_count += 1
# Get final message for usage stats
final_msg = stream.get_final_message()
total_time = time.time() - start
ttft = (first_token_time - start) if first_token_time else 0
print("\n" + "-" * 40)
print(f"Input tokens: {final_msg.usage.input_tokens}")
print(f"Output tokens: {final_msg.usage.output_tokens}")
print(f"Stop reason: {final_msg.stop_reason}")
print(f"TTFT: {ttft:.3f}s")
print(f"Total time: {total_time:.3f}s")
print(f"TPS (approx): {token_count / total_time:.1f}")
return {
"content": "".join(collected),
"ttft": ttft,
"total_time": total_time,
"input_tokens": final_msg.usage.input_tokens,
"output_tokens": final_msg.usage.output_tokens
}
result = anthropic_stream(
"Explain recursion with a simple Python example.",
model="claude-3-haiku-20240307",
max_tokens=150
)
Part 4: Server-Sent Events (SSE) Protocol¶
What is SSE?¶
Server-Sent Events is a W3C standard for pushing data from a server to a browser over a plain HTTP connection. It is one-directional (server to client only) and uses HTTP chunked transfer encoding under the hood.
Wire Format¶
Each event is separated by two newlines (\n\n). Fields:
data: {"text": "Hello"}\n\n
data: {"text": " world"}\n\n
# Named event:
event: token\n
data: {"text": "Hello"}\n\n
# Comment (keepalive):
: heartbeat\n\n
# Termination signal (convention):
data: [DONE]\n\n
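The format is simple enough to emit and parse by hand. A minimal sketch of both directions (the field syntax follows the SSE spec; the helper names are ours):

```python
import json
from typing import Optional

def format_sse(data: dict, event: Optional[str] = None) -> str:
    """Serialize one SSE event: optional 'event:' field, a 'data:' field, blank line."""
    lines = []
    if event:
        lines.append(f"event: {event}")
    lines.append(f"data: {json.dumps(data)}")
    return "\n".join(lines) + "\n\n"

def parse_sse_data(line: str) -> Optional[dict]:
    """Return the JSON payload of a 'data:' line; None for comments and other fields."""
    if line.startswith("data: "):
        return json.loads(line[len("data: "):])
    return None

wire = format_sse({"text": "Hello"}, event="token")
print(repr(wire))   # 'event: token\ndata: {"text": "Hello"}\n\n'
print(parse_sse_data('data: {"text": "Hello"}'))  # {'text': 'Hello'}
print(parse_sse_data(": heartbeat"))              # None
```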
SSE vs WebSocket Comparison¶

| Feature | SSE | WebSocket |
|---|---|---|
| Direction | Server → Client only | Bidirectional |
| Protocol | HTTP/1.1 or HTTP/2 | Custom (ws://) |
| Auto-reconnect | Built into browser | Must implement manually |
| Browser support | All modern browsers | All modern browsers |
| Proxies/firewalls | Usually fine (it's HTTP) | May be blocked |
| Use case | LLM streaming, logs, notifications | Chat, games, collaboration |
| Complexity | Simple | More complex |
When to Use SSE¶
Use SSE when you only need server-to-client push: LLM streaming, live feeds, progress updates. Use WebSockets when you need bidirectional communication: chat, collaborative editing, games.
# FastAPI SSE endpoint implementation
# This defines the server app. Run it with:
# uvicorn <script_name>:app --reload --port 8000
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
import asyncio
import json
app = FastAPI(title="LLM Streaming API")
# Allow browser requests from any origin
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["GET", "POST"],
allow_headers=["*"]
)
async def openai_sse_generator(prompt: str, model: str = "gpt-4o-mini"):
"""Async generator that yields SSE-formatted events from OpenAI streaming."""
oai = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
    try:
        # create(stream=True) returns an async iterator of ChatCompletionChunk objects
        stream = await oai.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async for chunk in stream:
            token = chunk.choices[0].delta.content
            if token:
                # Yield structured JSON payload
                yield {"data": json.dumps({"text": token, "type": "token"})}
        # Signal completion
        yield {"data": json.dumps({"type": "done"})}
    except Exception as e:
        yield {"data": json.dumps({"type": "error", "message": str(e)})}
@app.get("/stream")
async def stream_endpoint(request: Request, prompt: str = "Tell me a fun fact about space."):
"""
SSE endpoint. Client connects once; server pushes tokens as they arrive.
Usage:
curl 'http://localhost:8000/stream?prompt=Hello+world'
EventSource('http://localhost:8000/stream?prompt=Hello') in browser
"""
return EventSourceResponse(
openai_sse_generator(prompt),
media_type="text/event-stream"
)
@app.get("/stream/anthropic")
async def stream_anthropic_endpoint(
request: Request,
prompt: str = "Explain asyncio in two sentences."
):
"""SSE endpoint backed by Anthropic Claude."""
async def anthropic_generator():
anth = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
try:
with anth.messages.stream(
model="claude-3-haiku-20240307",
max_tokens=300,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
yield {"data": json.dumps({"text": text, "type": "token"})}
yield {"data": json.dumps({"type": "done"})}
except Exception as e:
yield {"data": json.dumps({"type": "error", "message": str(e)})}
return EventSourceResponse(anthropic_generator())
print("FastAPI SSE app defined.")
print()
print("To run the server:")
print(" 1. Save this cell's app to a file: streaming_server.py")
print(" 2. Run: uvicorn streaming_server:app --reload --port 8000")
print()
print("Endpoints:")
print(" GET /stream?prompt=... (OpenAI)")
print(" GET /stream/anthropic?prompt=... (Anthropic)")
# Save the FastAPI server to a runnable Python file
server_code = '''
import os, json, asyncio
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from sse_starlette.sse import EventSourceResponse
import openai
import anthropic
from dotenv import load_dotenv
load_dotenv()
app = FastAPI(title="LLM Streaming API")
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
async def openai_sse_generator(prompt: str, model: str = "gpt-4o-mini"):
oai = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
try:
        stream = await oai.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async for chunk in stream:
            token = chunk.choices[0].delta.content
            if token:
                yield {"data": json.dumps({"text": token, "type": "token"})}
        yield {"data": json.dumps({"type": "done"})}
except Exception as e:
yield {"data": json.dumps({"type": "error", "message": str(e)})}
@app.get("/stream")
async def stream_endpoint(request: Request, prompt: str = "Tell me a fun fact."):
return EventSourceResponse(openai_sse_generator(prompt))
@app.get("/")
async def root():
return {"message": "LLM Streaming API", "endpoints": ["/stream"]}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
'''
server_path = "/tmp/streaming_server.py"
with open(server_path, "w") as f:
f.write(server_code)
print(f"Server saved to {server_path}")
print("Start with: python /tmp/streaming_server.py")
print("Or: uvicorn streaming_server:app --reload --port 8000")
print()
print("Test with curl (requires server running):")
print(" curl 'http://localhost:8000/stream?prompt=Hello+world'")
# Consuming an SSE endpoint from Python using httpx
# This is useful for testing your SSE server from another Python process
import httpx
import asyncio
import json
async def consume_sse(
url: str,
timeout: float = 30.0,
on_token=None
) -> str:
"""
Consume an SSE endpoint and return the full accumulated text.
Args:
url: SSE endpoint URL
timeout: Seconds to wait before giving up
on_token: Optional callback called with each token string
"""
collected = []
token_count = 0
start = time.time()
async with httpx.AsyncClient() as client:
async with client.stream("GET", url, timeout=timeout) as response:
response.raise_for_status()
print(f"Connected. Status: {response.status_code}")
print(f"Content-Type: {response.headers.get('content-type')}")
print("-" * 40)
async for line in response.aiter_lines():
if not line or line.startswith(":"): # Skip comments/keepalives
continue
if line.startswith("data: "):
raw_data = line[6:] # Strip "data: " prefix
try:
payload = json.loads(raw_data)
except json.JSONDecodeError:
continue
if payload.get("type") == "done":
print("\n[Stream complete]")
break
if payload.get("type") == "error":
print(f"\n[Error] {payload.get('message')}")
break
if payload.get("type") == "token":
text = payload["text"]
collected.append(text)
token_count += 1
print(text, end="", flush=True)
if on_token:
on_token(text)
elapsed = time.time() - start
print(f"\nTokens: {token_count} | Time: {elapsed:.2f}s | TPS: {token_count/elapsed:.1f}")
return "".join(collected)
# Usage (requires the FastAPI server running on port 8000)
print("SSE consumer defined.")
print("Usage (requires server on port 8000):")
print()
print(" import asyncio")
print(" result = asyncio.run(consume_sse(")
print(" 'http://localhost:8000/stream?prompt=Tell+me+a+joke'")
print(" ))")
print()
print("# Uncomment to run if server is active:")
# result = asyncio.run(consume_sse("http://localhost:8000/stream?prompt=Tell+me+a+joke"))
Part 5: Handling Stream Chunks and Delta Content¶
Accumulation Pattern¶
Always accumulate the full response alongside streaming. You need it for:
Storing in a database
Passing to the next step in a pipeline
Computing final token counts
Retry logic if something fails after streaming
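One way to keep accumulation separate from display logic is a small accumulator object (a sketch; the class name is ours):

```python
class StreamAccumulator:
    """Collects delta fragments while they are displayed, for storage/retry later."""
    def __init__(self) -> None:
        self.fragments = []

    def add(self, fragment) -> str:
        """Record a fragment, tolerating None and empty deltas; return printable text."""
        if fragment:
            self.fragments.append(fragment)
        return fragment or ""

    @property
    def text(self) -> str:
        return "".join(self.fragments)

acc = StreamAccumulator()
for piece in ["Hel", None, "lo", "", "!"]:   # simulated delta.content values
    print(acc.add(piece), end="")
print("\nAccumulated:", acc.text)            # Accumulated: Hello!
```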
Defensive Chunk Handling¶
# Bad: assumes delta.content is always a non-empty string
for chunk in stream:
print(chunk.choices[0].delta.content) # Can print "None"
# Good: check before using
for chunk in stream:
content = chunk.choices[0].delta.content
if content: # Handles None and empty string
print(content, end="", flush=True)
Finish Reason Values¶

| Value | Meaning | Action |
|---|---|---|
| `stop` | Natural completion | Normal |
| `length` | Hit max_tokens | May want to continue with another call |
| `content_filter` | Safety filter triggered | Handle gracefully, log |
| `tool_calls` | Model wants to call a tool | Process tool call |
| `None` | Not the last chunk | Continue iteration |
# Robust chunk handler with accumulation and finish reason detection
def stream_with_accumulation(
prompt: str,
model: str = "gpt-4o-mini",
max_tokens: int = 200,
system: str = None
) -> dict:
"""
Stream and accumulate a response, returning stats and full text.
Handles all edge cases in chunk processing.
"""
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
messages = []
if system:
messages.append({"role": "system", "content": system})
messages.append({"role": "user", "content": prompt})
full_response = []
finish_reason = None
model_used = None
print(f"Streaming response:\n")
for chunk in client.chat.completions.create(
model=model,
messages=messages,
stream=True,
max_tokens=max_tokens
):
# Capture model name from first chunk
if model_used is None:
model_used = chunk.model
choice = chunk.choices[0]
# Safe content extraction
content = choice.delta.content
if content is not None and content != "":
full_response.append(content)
print(content, end="", flush=True)
# Capture finish reason when it arrives
if choice.finish_reason is not None:
finish_reason = choice.finish_reason
final_text = "".join(full_response)
print("\n")
if finish_reason == "length":
print("[WARNING] Response was cut off (hit max_tokens limit).")
print(" Consider increasing max_tokens or splitting the request.")
elif finish_reason == "content_filter":
print("[WARNING] Content was filtered by safety policy.")
elif finish_reason == "stop":
print("[OK] Response completed naturally.")
return {
"text": final_text,
"finish_reason": finish_reason,
"model": model_used,
"char_count": len(final_text)
}
# Test with a truncated response (low max_tokens)
print("Test 1: Normal completion")
r1 = stream_with_accumulation("What is 2 + 2?", max_tokens=50)
print(f"Finish reason: {r1['finish_reason']}")
print("\nTest 2: Truncated (max_tokens=10)")
r2 = stream_with_accumulation("Write a long story about dragons.", max_tokens=10)
print(f"Finish reason: {r2['finish_reason']}")
Part 6: Error Handling in Streams¶
Error Categories¶
Pre-stream errors - API key invalid, rate limit before first token, network unreachable
Mid-stream errors - Connection dropped, timeout while streaming, server restart
Content errors - Content filter triggered mid-stream
Key Principle: Partial Response Recovery¶
Always save what you have before propagating an error. A partial response is almost always better than nothing.
try:
for chunk in stream:
collected.append(chunk.choices[0].delta.content or "")
except openai.APITimeoutError:
# We have a partial response - use it rather than failing completely
return {"text": "".join(collected), "partial": True}
# Comprehensive error handling for streaming
import openai
def stream_with_error_handling(
prompt: str,
model: str = "gpt-4o-mini",
timeout: float = 30.0,
max_tokens: int = 200
) -> dict:
"""
Stream with comprehensive error handling and partial response recovery.
Returns a result dict with 'text', 'error', and 'partial' fields.
"""
client = openai.OpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
timeout=timeout
)
collected = []
error_info = None
is_partial = False
start = time.time()
try:
for chunk in client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
max_tokens=max_tokens
):
content = chunk.choices[0].delta.content
if content:
collected.append(content)
print(content, end="", flush=True)
except openai.APITimeoutError as e:
error_info = {"type": "timeout", "message": str(e)}
is_partial = len(collected) > 0
print(f"\n[TIMEOUT after {time.time() - start:.1f}s]")
if is_partial:
print(f" Partial response has {len(collected)} tokens.")
except openai.APIConnectionError as e:
error_info = {"type": "connection", "message": str(e)}
is_partial = len(collected) > 0
print(f"\n[CONNECTION ERROR] {e}")
except openai.RateLimitError as e:
error_info = {"type": "rate_limit", "message": str(e)}
print(f"\n[RATE LIMIT] Retry after cooling off.")
# In production: implement exponential backoff here
except openai.AuthenticationError as e:
error_info = {"type": "auth", "message": "Invalid API key"}
print(f"\n[AUTH ERROR] Check your OPENAI_API_KEY.")
except openai.BadRequestError as e:
error_info = {"type": "bad_request", "message": str(e)}
print(f"\n[BAD REQUEST] {e}")
except KeyboardInterrupt:
error_info = {"type": "cancelled", "message": "User cancelled"}
is_partial = len(collected) > 0
print(f"\n[CANCELLED by user]")
except Exception as e:
error_info = {"type": "unknown", "message": str(e)}
print(f"\n[UNEXPECTED ERROR] {type(e).__name__}: {e}")
    # All errors above are caught, so this runs in every case - return what we have
    partial_text = "".join(collected)
return {
"text": partial_text,
"partial": is_partial,
"error": error_info,
"elapsed": time.time() - start
}
# Test normal operation
print("Test: Normal streaming")
result = stream_with_error_handling("Say 'Hello, streaming!' and nothing else.")
print(f"\nResult: text={repr(result['text'])}, error={result['error']}")
# Test with very short timeout to simulate timeout error
print("\n" + "=" * 40)
print("Test: Simulated timeout (timeout=0.001s)")
result2 = stream_with_error_handling(
"Write a 500 word essay on AI.",
timeout=0.001 # Extremely short timeout - will almost always time out
)
print(f"Error type: {result2['error']['type'] if result2['error'] else None}")
print(f"Partial text collected: {repr(result2['text'][:50])}")
Part 7: Progress Tracking and Cancellation¶
Token Budget Tracking¶
Use max_tokens to set a hard limit, and track consumed tokens in real-time:
token_budget = 100
tokens_used = 0
for chunk in stream:
if tokens_used >= token_budget:
break # Soft cancellation
tokens_used += 1
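The budget check can also be factored into a reusable wrapper that works with any iterator of fragments, not just an API stream (a sketch; `with_budget` is our name):

```python
def with_budget(tokens, budget: int):
    """Yield from any token stream, stopping once the budget is exhausted."""
    used = 0
    for token in tokens:
        if used >= budget:
            break          # soft cancellation: stop consuming the stream
        used += 1
        yield token

# Works with a plain generator standing in for an API stream
fragments = (f"tok{i} " for i in range(1000))
print(list(with_budget(fragments, budget=3)))  # ['tok0 ', 'tok1 ', 'tok2 ']
```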
Cooperative Cancellation with asyncio.Event¶
For async code, use an asyncio.Event as a cancellation token:
cancel = asyncio.Event()
# From another coroutine: cancel.set()  (set() is synchronous - no await needed)
async for chunk in stream:
    if cancel.is_set():
        break
# Progress tracking with tqdm
from tqdm.auto import tqdm
def stream_with_progress(
prompt: str,
max_tokens: int = 100,
model: str = "gpt-4o-mini"
) -> str:
"""
Stream with a live progress bar showing token budget consumption.
"""
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
collected = []
pbar = tqdm(
total=max_tokens,
desc="Generating",
unit="tok",
bar_format="{l_bar}{bar}| {n_fmt}/{total_fmt} tokens [{elapsed}]"
)
try:
for chunk in client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
max_tokens=max_tokens
):
content = chunk.choices[0].delta.content
if content:
collected.append(content)
pbar.update(1)
finally:
pbar.close()
result = "".join(collected)
print(f"\nGenerated text:\n{result}")
return result
text = stream_with_progress(
"List 5 Python best practices, one per line.",
max_tokens=120
)
# Async cancellation using asyncio.Event
import asyncio
async def cancellable_stream(
prompt: str,
cancel_event: asyncio.Event,
model: str = "gpt-4o-mini"
) -> dict:
"""
Stream that can be cancelled by setting cancel_event.
The cancel_event can be set from another coroutine.
"""
oai = openai.AsyncOpenAI(api_key=os.getenv("OPENAI_API_KEY"))
collected = []
token_count = 0
cancelled = False
    try:
        # create(stream=True) returns an async iterator of ChatCompletionChunk objects
        stream = await oai.chat.completions.create(
            model=model,
            messages=[{"role": "user", "content": prompt}],
            stream=True
        )
        async for chunk in stream:
            # Check cancellation before each token
            if cancel_event.is_set():
                cancelled = True
                print("\n[Stream cancelled at user request]")
                break
            content = chunk.choices[0].delta.content
            if content:
                collected.append(content)
                token_count += 1
                print(content, end="", flush=True)
    except asyncio.CancelledError:
        print("\n[Task was cancelled externally]")
        cancelled = True
return {
"text": "".join(collected),
"tokens": token_count,
"cancelled": cancelled
}
async def demo_cancellation():
"""Demo: cancel stream after 3 tokens."""
cancel = asyncio.Event()
# Schedule cancellation after 3 tokens worth of time (~0.2s)
async def auto_cancel():
await asyncio.sleep(0.3)
print("\n[Auto-cancel triggered after 0.3s]")
cancel.set()
canceller = asyncio.create_task(auto_cancel())
result = await cancellable_stream(
"Count from 1 to 100, one number per line.",
cancel_event=cancel
)
await canceller
print(f"\nTokens collected before cancel: {result['tokens']}")
print(f"Was cancelled: {result['cancelled']}")
print(f"Partial text: {repr(result['text'][:50])}")
# Run the cancellation demo
await demo_cancellation()
Part 8: Multi-Provider Streaming Adapter¶
The Adapter Pattern¶
Different providers have different streaming APIs. A thin adapter layer lets you swap providers without changing the rest of your code:
Your App Code
      |
      ▼
stream_from_provider(provider, prompt)   ← unified interface
    |                |                |
    ▼                ▼                ▼
stream_openai   stream_anthropic   stream_ollama
    |                |                |
    ▼                ▼                ▼
OpenAI API      Anthropic API      Local Ollama
# Multi-provider streaming adapter
from typing import Generator
import httpx
def stream_openai(
prompt: str,
model: str = "gpt-4o-mini",
max_tokens: int = 200
) -> Generator[str, None, None]:
"""Yield text fragments from OpenAI."""
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
for chunk in client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
max_tokens=max_tokens
):
if chunk.choices[0].delta.content:
yield chunk.choices[0].delta.content
def stream_anthropic(
prompt: str,
model: str = "claude-3-haiku-20240307",
max_tokens: int = 200
) -> Generator[str, None, None]:
"""Yield text fragments from Anthropic."""
client = anthropic.Anthropic(api_key=os.getenv("ANTHROPIC_API_KEY"))
with client.messages.stream(
model=model,
max_tokens=max_tokens,
messages=[{"role": "user", "content": prompt}]
) as stream:
for text in stream.text_stream:
yield text
def stream_ollama(
prompt: str,
model: str = "llama3.2",
base_url: str = "http://localhost:11434"
) -> Generator[str, None, None]:
"""
Yield text fragments from a local Ollama instance.
Requires: ollama running locally (https://ollama.ai)
"""
try:
with httpx.stream(
"POST",
f"{base_url}/api/generate",
json={"model": model, "prompt": prompt, "stream": True},
timeout=60.0
) as response:
for line in response.iter_lines():
if not line:
continue
data = json.loads(line)
if data.get("response"):
yield data["response"]
if data.get("done"):
break
except httpx.ConnectError:
raise RuntimeError(
"Cannot connect to Ollama. Is it running? "
"Install from https://ollama.ai and run: ollama pull llama3.2"
)
# Unified interface
_PROVIDER_MAP = {
"openai": stream_openai,
"anthropic": stream_anthropic,
"ollama": stream_ollama,
}
def stream_from_provider(
provider: str,
prompt: str,
**kwargs
) -> dict:
"""
Stream from any supported provider using a unified interface.
providers: 'openai', 'anthropic', 'ollama'
"""
if provider not in _PROVIDER_MAP:
raise ValueError(f"Unknown provider '{provider}'. Choose from: {list(_PROVIDER_MAP)}")
print(f"\n--- Streaming from {provider} ---")
start = time.time()
first_token_time = None
token_count = 0
collected = []
try:
for text in _PROVIDER_MAP[provider](prompt, **kwargs):
if first_token_time is None:
first_token_time = time.time()
print(text, end="", flush=True)
collected.append(text)
token_count += 1
except RuntimeError as e:
print(f"[SKIPPED] {e}")
return {"provider": provider, "error": str(e)}
elapsed = time.time() - start
ttft = (first_token_time - start) if first_token_time else 0
print(f"\nTTFT={ttft:.3f}s | TPS={token_count/elapsed:.1f} | tokens={token_count}")
return {"provider": provider, "text": "".join(collected), "ttft": ttft, "tps": token_count/elapsed}
# Run on OpenAI (always available)
r1 = stream_from_provider("openai", "What is a Python decorator? One sentence.", max_tokens=60)
# Uncomment to test Anthropic:
# r2 = stream_from_provider("anthropic", "What is a Python decorator? One sentence.", max_tokens=60)
# Uncomment to test Ollama (requires local Ollama):
# r3 = stream_from_provider("ollama", "What is a Python decorator? One sentence.", model="llama3.2")
Part 9: Token Counting During Streaming¶
Why Count Tokens During Streaming?¶
Cost control: Stop generation when approaching a budget
Rate limit awareness: Track token usage in real-time
UX: Show βX tokens generatedβ in the UI
Analytics: Log actual token usage per request
tiktoken¶
OpenAI's tiktoken library tokenizes text the same way the models do, enabling accurate
token counting without waiting for the API response.
import tiktoken
enc = tiktoken.encoding_for_model("gpt-4o-mini")
tokens = enc.encode("Hello, world!")
print(len(tokens)) # 4
# Token counting during streaming using tiktoken
import tiktoken
def stream_with_token_count(
prompt: str,
model: str = "gpt-4o-mini",
max_tokens: int = 200,
token_budget: int = None
) -> dict:
"""
Stream with real-time token counting.
Optionally stops when token_budget is exceeded.
"""
    # Fall back to a generic encoding if tiktoken doesn't know this model name
try:
enc = tiktoken.encoding_for_model(model)
except KeyError:
enc = tiktoken.get_encoding("cl100k_base")
client_oai = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
# Count input tokens
input_token_count = len(enc.encode(prompt))
output_token_count = 0
collected = []
budget_hit = False
start = time.time()
first_token_time = None
print(f"Input tokens: {input_token_count}")
print(f"Token budget: {token_budget or 'unlimited'}")
print(f"Max tokens: {max_tokens}")
print("-" * 40)
for chunk in client_oai.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
max_tokens=max_tokens
):
content = chunk.choices[0].delta.content
if content:
if first_token_time is None:
first_token_time = time.time()
# Count tokens in this fragment
fragment_tokens = len(enc.encode(content))
output_token_count += fragment_tokens
# Check token budget
if token_budget and output_token_count > token_budget:
budget_hit = True
print(f"\n[BUDGET EXCEEDED: {output_token_count} > {token_budget} tokens]")
break
collected.append(content)
print(content, end="", flush=True)
total_time = time.time() - start
ttft = (first_token_time - start) if first_token_time else 0
print("\n" + "-" * 40)
print(f"Input tokens: {input_token_count}")
print(f"Output tokens: {output_token_count}")
print(f"Total tokens: {input_token_count + output_token_count}")
print(f"TTFT: {ttft:.3f}s")
print(f"Total time: {total_time:.3f}s")
print(f"TPS: {output_token_count / total_time:.1f}")
if budget_hit:
print(f"Budget hit: YES (stopped early)")
return {
"text": "".join(collected),
"input_tokens": input_token_count,
"output_tokens": output_token_count,
"ttft": ttft,
"tps": output_token_count / total_time,
"budget_hit": budget_hit
}
# Normal streaming with token counts
print("Test 1: Normal streaming with token count")
r = stream_with_token_count(
"Explain the difference between a list and a tuple in Python.",
max_tokens=100
)
# Streaming with token budget enforcement
print("\n" + "=" * 40)
print("Test 2: With token budget (stop at 20 tokens)")
r2 = stream_with_token_count(
"Tell me everything about machine learning.",
max_tokens=200,
token_budget=20
)
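Once you are counting tokens in real time, turning counts into a running cost estimate is one line of arithmetic. A minimal sketch, assuming illustrative placeholder prices (the numbers below are NOT real pricing; check your provider's current price sheet):

```python
# Illustrative per-million-token prices (PLACEHOLDERS -- always check
# your provider's current pricing page before relying on these numbers).
PRICES_PER_MTOK = {
    "gpt-4o-mini": {"input": 0.15, "output": 0.60},
}

def estimate_cost_usd(model: str, input_tokens: int, output_tokens: int) -> float:
    """Convert token counts into an approximate dollar cost."""
    p = PRICES_PER_MTOK[model]
    return (input_tokens * p["input"] + output_tokens * p["output"]) / 1_000_000

# Example: numbers like those returned by stream_with_token_count() above
cost = estimate_cost_usd("gpt-4o-mini", input_tokens=14, output_tokens=100)
print(f"~${cost:.6f}")  # ~$0.000062
```

Combined with the token_budget parameter above, this lets you cap spend per request rather than per token.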
Part 10: TTFT and TPS Benchmarking¶
Why Benchmark?¶
Compare models to choose the right cost/latency trade-off
Detect degraded performance in production
Set SLA thresholds for your application
Industry Benchmarks (approximate, as of 2025)¶

| Model | Typical TTFT | Typical TPS |
|---|---|---|
| gpt-4o-mini | 200-400ms | 80-120 |
| gpt-4o | 300-600ms | 50-80 |
| claude-3-haiku | 200-500ms | 80-150 |
| claude-3.5-sonnet | 300-700ms | 60-100 |
| Local (llama3.2) | 50-200ms | 15-60 |
Numbers vary with load, region, prompt length, and system prompt.
# Benchmark multiple OpenAI models
import statistics
def benchmark_model(
model: str,
prompt: str,
max_tokens: int = 50,
runs: int = 2
) -> dict:
"""Benchmark a single model with multiple runs."""
client = openai.OpenAI(api_key=os.getenv("OPENAI_API_KEY"))
ttfts, tps_list, total_times = [], [], []
for run in range(runs):
start = time.time()
first = None
tokens = 0
for chunk in client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
max_tokens=max_tokens
):
if chunk.choices[0].delta.content:
if first is None:
first = time.time()
tokens += 1  # counts chunks; for OpenAI streams each chunk is roughly one token
elapsed = time.time() - start
if first and tokens > 0:
ttfts.append(first - start)
tps_list.append(tokens / elapsed)
total_times.append(elapsed)
return {
"model": model,
"runs": runs,
"avg_ttft": statistics.mean(ttfts) if ttfts else 0,
"avg_tps": statistics.mean(tps_list) if tps_list else 0,
"avg_total": statistics.mean(total_times) if total_times else 0,
"ttft_stdev": statistics.stdev(ttfts) if len(ttfts) > 1 else 0
}
def benchmark_models(models: list, prompt: str, max_tokens: int = 50, runs: int = 2):
"""Benchmark multiple models and print a comparison table."""
print(f"Benchmark: {runs} runs each, max_tokens={max_tokens}")
print(f"Prompt: {prompt[:50]}...\n")
print(f"{'Model':<25} {'Avg TTFT':>10} {'Avg TPS':>10} {'Avg Total':>12} {'TTFT Stdev':>12}")
print("-" * 72)
results = []
for model in models:
print(f" Testing {model}...", end="", flush=True)
r = benchmark_model(model, prompt, max_tokens, runs)
results.append(r)
print(f"\r{r['model']:<25} {r['avg_ttft']:>9.3f}s {r['avg_tps']:>9.1f} {r['avg_total']:>11.3f}s {r['ttft_stdev']:>11.3f}s")
# Highlight best TTFT and TPS
best_ttft = min(results, key=lambda x: x['avg_ttft'])
best_tps = max(results, key=lambda x: x['avg_tps'])
print("-" * 72)
print(f"Best TTFT: {best_ttft['model']} ({best_ttft['avg_ttft']:.3f}s)")
print(f"Best TPS: {best_tps['model']} ({best_tps['avg_tps']:.1f} tok/s)")
return results
# Benchmark gpt-4o-mini vs gpt-3.5-turbo
results = benchmark_models(
models=["gpt-4o-mini", "gpt-3.5-turbo"],
prompt="Write a haiku about software engineering.",
max_tokens=50,
runs=2
)
Part 11: HuggingFace TextIteratorStreamer¶
Local Model Streaming¶
For local models via HuggingFace Transformers, use TextIteratorStreamer. It runs generation
in a background thread and makes tokens available through an iterator in the main thread.
Architecture¶
Main Thread                              Background Thread
    |                                         |
    |  Thread(target=model.generate,          |
    |         kwargs={"streamer": ...})  ---> | generates tokens
    |                                         |
    |  for text in streamer:  <-------------- | puts text into a queue
    |      print(text)                        |
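The queue-plus-sentinel mechanism in the diagram above is simple enough to reproduce with the standard library. This toy class is illustrative only (use transformers.TextIteratorStreamer with real models); SimpleTextStreamer and fake_generate are names invented for this sketch:

```python
import queue
import threading

class SimpleTextStreamer:
    """Toy re-implementation of the TextIteratorStreamer pattern:
    a producer thread puts text fragments into a queue; the consumer
    iterates until a sentinel value signals the end of generation."""
    _SENTINEL = object()

    def __init__(self):
        self._q = queue.Queue()

    def put(self, text: str):
        # Called from the producer (generation) thread
        self._q.put(text)

    def end(self):
        # Producer signals that generation is complete
        self._q.put(self._SENTINEL)

    def __iter__(self):
        while True:
            item = self._q.get()  # blocks until the next fragment arrives
            if item is self._SENTINEL:
                return
            yield item

def fake_generate(streamer, fragments):
    """Stand-in for model.generate(streamer=...)."""
    for f in fragments:
        streamer.put(f)
    streamer.end()

streamer = SimpleTextStreamer()
t = threading.Thread(target=fake_generate, args=(streamer, ["Hel", "lo", "!"]))
t.start()
out = "".join(streamer)  # main thread consumes as fragments arrive
t.join()
print(out)  # Hello!
```

The real TextIteratorStreamer adds tokenizer-aware decoding and timeouts on top of exactly this producer/consumer shape.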
When to Use Local Models¶
Privacy-sensitive data (nothing leaves your machine)
Cost at high volume (no per-token fees)
Offline deployment
Customized fine-tuned models
Trade-offs vs API Models¶

| | Local (HuggingFace) | API (OpenAI/Anthropic) |
|---|---|---|
| TTFT | 50-500ms | 200-700ms |
| TPS | 5-60 (CPU/GPU) | 50-150 |
| Cost | Hardware only | Per token |
| Privacy | Complete | Data sent to vendor |
| Setup | Complex | Simple |
# HuggingFace TextIteratorStreamer
# Uses GPT-2 (smallest model, ~500MB) to demonstrate the pattern
# For production: replace 'gpt2' with 'meta-llama/Llama-3.2-1B' or similar
from threading import Thread
def stream_huggingface(
prompt: str,
model_name: str = "gpt2",
max_new_tokens: int = 80
) -> dict:
"""
Stream tokens from a local HuggingFace model using TextIteratorStreamer.
Requires: pip install transformers torch
"""
try:
from transformers import (
AutoModelForCausalLM,
AutoTokenizer,
TextIteratorStreamer
)
except ImportError:
print("transformers not installed. Run: pip install transformers torch")
return {"error": "transformers not installed"}
print(f"Loading {model_name}... (first run downloads model)")
tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(model_name)
print(f"Model loaded.")
# Tokenize input
inputs = tokenizer([prompt], return_tensors="pt")
# Create the streamer - skip_special_tokens removes <eos> etc.
streamer = TextIteratorStreamer(
tokenizer,
skip_prompt=True, # Don't re-emit the input prompt
skip_special_tokens=True
)
# Generation kwargs - pass the streamer here
gen_kwargs = {
**inputs,
"streamer": streamer,
"max_new_tokens": max_new_tokens,
"do_sample": True,
"temperature": 0.7,
"repetition_penalty": 1.3
}
# Start generation in a background thread
thread = Thread(target=model.generate, kwargs=gen_kwargs)
thread.start()
# Consume tokens as they become available
start = time.time()
first_token_time = None
token_count = 0
collected = []
print(f"\nStreaming from {model_name}:\n")
print("-" * 40)
for text in streamer:
if first_token_time is None:
first_token_time = time.time()
print(text, end="", flush=True)
collected.append(text)
token_count += 1
thread.join() # Wait for generation to complete
elapsed = time.time() - start
ttft = (first_token_time - start) if first_token_time else 0
print("\n" + "-" * 40)
print(f"TTFT: {ttft:.3f}s")
print(f"TPS: {token_count / elapsed:.1f}")
print(f"Total: {elapsed:.3f}s")
return {
"text": "".join(collected),
"ttft": ttft,
"tps": token_count / elapsed
}
# Uncomment to run (downloads ~500MB gpt2 model on first run):
# result = stream_huggingface("Python is a programming language that", model_name="gpt2")
print("HuggingFace TextIteratorStreamer function defined.")
print()
print("Uncomment the last line to run (requires ~500MB download for gpt2).")
print("For instruction-following models try: 'microsoft/phi-2' or 'google/gemma-2b-it'")
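To serve a local model through the SSE endpoint from earlier, the streamer's blocking iterator must be bridged onto the event loop so it doesn't stall FastAPI. A sketch of that bridge using asyncio.to_thread; the function name iterate_in_thread is invented here, and a plain list stands in for a real streamer:

```python
import asyncio

async def iterate_in_thread(blocking_iterable):
    """Bridge a blocking iterator (e.g. TextIteratorStreamer) into an
    async generator by pulling each item in a worker thread, keeping
    the event loop free to serve other requests."""
    it = iter(blocking_iterable)
    sentinel = object()
    while True:
        # next(it, sentinel) runs in a thread; sentinel means exhausted
        item = await asyncio.to_thread(next, it, sentinel)
        if item is sentinel:
            return
        yield item

# Demo with a plain list standing in for the streamer:
async def demo():
    out = []
    async for text in iterate_in_thread(["Hel", "lo", "!"]):
        out.append(text)
    return "".join(out)

print(asyncio.run(demo()))  # Hello!
```

An EventSourceResponse can then wrap iterate_in_thread(streamer) the same way it wraps the API-provider generators shown earlier.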
Part 12: Simple HTML Frontend for Streaming¶
Browser EventSource API¶
The browser has a built-in EventSource class that connects to SSE endpoints:
const evtSource = new EventSource('/stream?prompt=Hello');
evtSource.onmessage = (event) => {
const data = JSON.parse(event.data);
if (data.type === 'done') {
evtSource.close();
return;
}
document.getElementById('output').textContent += data.text;
};
evtSource.onerror = () => evtSource.close();
The browser automatically reconnects if the connection drops (using the retry: field in SSE).
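What EventSource does under the hood is simple enough to sketch: split the stream on blank lines and collect the data: fields of each event. A simplified Python parser for illustration (it ignores the event:, id:, and retry: fields that a spec-complete client must also handle):

```python
def parse_sse_events(raw: str) -> list:
    """Extract the data payload of each event from a raw SSE stream.
    Per the SSE spec, events are separated by a blank line, and multiple
    data: lines within one event are joined with newlines."""
    events = []
    for block in raw.split("\n\n"):
        data_lines = []
        for line in block.split("\n"):
            if line.startswith("data:"):
                value = line[5:]
                # The spec strips a single leading space after the colon
                data_lines.append(value[1:] if value.startswith(" ") else value)
        if data_lines:
            events.append("\n".join(data_lines))
    return events

raw = (
    'data: {"type": "token", "text": "Hel"}\n\n'
    'data: {"type": "token", "text": "lo"}\n\n'
    'data: {"type": "done"}\n\n'
)
print(parse_sse_events(raw))
```

This is the same wire format the FastAPI endpoint emits, which is why the JavaScript above can simply JSON.parse each event.data.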
# Generate a complete HTML streaming demo frontend
html_content = '''<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>LLM Streaming Demo</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
max-width: 800px;
margin: 50px auto;
padding: 0 20px;
background: #f5f5f5;
}
h1 { color: #333; }
.controls { display: flex; gap: 8px; margin-bottom: 12px; }
#prompt {
flex: 1;
padding: 10px;
border: 1px solid #ddd;
border-radius: 6px;
font-size: 14px;
}
button {
padding: 10px 18px;
border: none;
border-radius: 6px;
cursor: pointer;
font-size: 14px;
}
#streamBtn { background: #4CAF50; color: white; }
#streamBtn:disabled { background: #aaa; cursor: not-allowed; }
#stopBtn { background: #f44336; color: white; }
.stats {
font-size: 12px;
color: #666;
margin-bottom: 8px;
}
#output {
background: white;
border: 1px solid #ddd;
border-radius: 6px;
padding: 16px;
min-height: 120px;
white-space: pre-wrap;
font-size: 15px;
line-height: 1.6;
}
.cursor {
display: inline-block;
width: 2px;
height: 1.1em;
background: #333;
vertical-align: text-bottom;
animation: blink 0.8s step-end infinite;
}
@keyframes blink { 0%,100%{opacity:1} 50%{opacity:0} }
.provider-select { padding: 8px; border-radius: 6px; border: 1px solid #ddd; }
</style>
</head>
<body>
<h1>LLM Streaming Demo</h1>
<p>Connects to FastAPI SSE endpoint at <code>http://localhost:8000/stream</code></p>
<div class="controls">
<select id="provider" class="provider-select">
<option value="openai">OpenAI (gpt-4o-mini)</option>
<option value="anthropic">Anthropic (claude-haiku)</option>
</select>
<input id="prompt" type="text"
placeholder="Enter your prompt..."
value="Explain streaming in 3 sentences.">
<button id="streamBtn" onclick="startStream()">Stream</button>
<button id="stopBtn" onclick="stopStream()">Stop</button>
</div>
<div class="stats">
Status: <span id="status">Ready</span> |
Tokens: <span id="tokenCount">0</span> |
TTFT: <span id="ttft">-</span> |
TPS: <span id="tps">-</span>
</div>
<div id="output">Output will appear here...</div>
<script>
let evtSource = null;
let tokenCount = 0;
let startTime = null;
let firstTokenTime = null;
let tpsInterval = null;
function startStream() {
const prompt = document.getElementById("prompt").value.trim();
const provider = document.getElementById("provider").value;
if (!prompt) return;
// Reset state
stopStream();
tokenCount = 0;
startTime = Date.now();
firstTokenTime = null;
document.getElementById("output").innerHTML = "<span class=\\"cursor\\"></span>";
document.getElementById("status").textContent = "Connecting...";
document.getElementById("tokenCount").textContent = "0";
document.getElementById("ttft").textContent = "-";
document.getElementById("tps").textContent = "-";
document.getElementById("streamBtn").disabled = true;
const endpoint = provider === "anthropic"
? "/stream/anthropic"
: "/stream";
const url = `http://localhost:8000${endpoint}?prompt=${encodeURIComponent(prompt)}`;
evtSource = new EventSource(url);
evtSource.onopen = () => {
document.getElementById("status").textContent = "Streaming...";
};
evtSource.onmessage = (e) => {
let payload;
try { payload = JSON.parse(e.data); } catch { return; }
if (payload.type === "done") {
stopStream();
document.getElementById("status").textContent = "Done";
const cursor = document.querySelector(".cursor");
if (cursor) cursor.remove();
return;
}
if (payload.type === "error") {
document.getElementById("status").textContent = "Error: " + payload.message;
stopStream();
return;
}
if (payload.type === "token") {
if (!firstTokenTime) {
firstTokenTime = Date.now();
document.getElementById("ttft").textContent =
((firstTokenTime - startTime) / 1000).toFixed(3) + "s";
}
tokenCount++;
document.getElementById("tokenCount").textContent = tokenCount;
const elapsed = (Date.now() - startTime) / 1000;
document.getElementById("tps").textContent = (tokenCount / elapsed).toFixed(1);
// Insert text before cursor
const cursor = document.querySelector(".cursor");
if (cursor) {
cursor.insertAdjacentText("beforebegin", payload.text);
} else {
document.getElementById("output").textContent += payload.text;
}
}
};
evtSource.onerror = () => {
if (document.getElementById("status").textContent === "Streaming...") {
document.getElementById("status").textContent = "Disconnected";
}
const cursor = document.querySelector(".cursor");
if (cursor) cursor.remove();
stopStream();
};
}
function stopStream() {
if (evtSource) { evtSource.close(); evtSource = null; }
if (tpsInterval) { clearInterval(tpsInterval); tpsInterval = null; }
document.getElementById("streamBtn").disabled = false;
if (document.getElementById("status").textContent === "Streaming...") {
document.getElementById("status").textContent = "Stopped";
}
}
// Allow Enter key to submit
document.addEventListener("DOMContentLoaded", () => {
document.getElementById("prompt").addEventListener("keydown", (e) => {
if (e.key === "Enter") startStream();
});
});
</script>
</body>
</html>
'''
html_path = "/tmp/streaming_demo.html"
with open(html_path, "w") as f:
f.write(html_content)
print(f"HTML frontend written to: {html_path}")
print()
print("To use it:")
print(" 1. Start the FastAPI server: python /tmp/streaming_server.py")
print(f" 2. Open {html_path} in your browser")
print(" 3. Type a prompt and click 'Stream'")
print()
print("The frontend displays:")
print(" - Animated cursor during generation")
print(" - Live token count and TPS")
print(" - TTFT measurement")
print(" - Stop button for cancellation")
Summary¶
What We Covered¶
Why streaming matters: TTFT is more important than total latency for perceived performance. Users tolerate long responses if they see output immediately.
OpenAI streaming: stream=True returns an iterator of ChatCompletionChunk objects. Always check delta.content for None before using.
Anthropic streaming: Context manager pattern. stream.text_stream yields clean str fragments. stream.get_final_message() retrieves usage stats.
SSE protocol: Plain HTTP, server-to-client only, data: payload\n\n format. Browser auto-reconnects. Best for LLM streaming use cases.
FastAPI SSE: EventSourceResponse + async generator function. Add CORS middleware for browser access.
Error handling: Always collect partial responses before surfacing errors. Handle APITimeoutError, APIConnectionError, RateLimitError separately.
Cancellation: asyncio.Event for cooperative async cancellation; tqdm for synchronous progress bars.
Multi-provider adapter: Wrap each provider in a generator with the same signature. Swap providers without changing application code.
Token counting: tiktoken for real-time input/output token tracking. Enforce token budgets to control cost.
HuggingFace local models: TextIteratorStreamer + background Thread. Best for privacy-sensitive or high-volume deployments.
HTML frontend: EventSource API is the browser's native SSE client. Shows animated cursor, live TPS, and TTFT metrics.
Decision Guide¶
Need bidirectional communication? ---- YES ---> Use WebSockets (see Notebook 2)
        |
        NO
        |
        v
Need browser support? ---------------- YES ---> Use SSE (EventSource API)
        |
        NO
        |
        v
Server-to-server only? --------------- YES ---> Use httpx streaming (chunked HTTP)
Quick Reference¶
# OpenAI
for chunk in client.chat.completions.create(stream=True, ...):
text = chunk.choices[0].delta.content or ""
# Anthropic
with client.messages.stream(...) as s:
for text in s.text_stream:
...
# FastAPI SSE
@app.get("/stream")
async def endpoint():
return EventSourceResponse(my_async_generator())
# Browser
const src = new EventSource('/stream?prompt=...');
src.onmessage = e => console.log(JSON.parse(e.data));
Next: Notebook 2 covers WebSocket connections for bidirectional real-time AI chat.