Phase 20: Real-Time & Streaming AI
Overview
Learn how to build real-time AI applications with streaming responses, WebSocket connections, and progressive loading.
Duration: 8 hours (5 notebooks + materials)
Topics Covered:
Streaming LLM Responses
WebSocket Connections
Chunked Generation
Real-Time RAG
Production Streaming Systems
Learning Objectives
By the end of this phase, you will be able to:
Implement Server-Sent Events (SSE) for streaming
Build WebSocket-based real-time chat applications
Handle progressive loading and chunked responses
Create streaming RAG pipelines
Deploy production-ready streaming systems
Optimize for latency and throughput
Prerequisites
Strong Python programming skills
Understanding of LLMs and APIs
Basic knowledge of async/await
Familiarity with web technologies
Completed Phases 1-10
Course Content
1. Streaming Responses (90 minutes)
File: 01_streaming_responses.ipynb
Topics:
OpenAI streaming API (stream=True)
Server-Sent Events (SSE) protocol
Handling stream chunks
Real-time token processing
Error handling in streams
Progress indicators
Key Code:
# OpenAI Streaming
for chunk in client.chat.completions.create(
    model="gpt-4",
    messages=[{"role": "user", "content": "Tell me a story"}],
    stream=True,
):
    if chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")

# FastAPI SSE
from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

@app.get("/stream")
async def stream_response():
    async def generate():
        for chunk in get_llm_stream():
            yield f"data: {chunk}\n\n"
    return StreamingResponse(generate(), media_type="text/event-stream")
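The SSE endpoint above can also be consumed from Python. A minimal client sketch, assuming the /stream endpoint is running on localhost:8000 and that httpx (listed under Libraries below) is installed; the helper and URL are illustrative:

```python
import asyncio

def parse_sse_line(line: str):
    """Extract the payload from an SSE 'data:' line; None for other lines."""
    prefix = "data: "
    return line[len(prefix):] if line.startswith(prefix) else None

async def consume_sse(url: str) -> None:
    import httpx  # async HTTP client with response streaming support
    async with httpx.AsyncClient(timeout=None) as client:
        # Open the response as a stream instead of buffering the whole body
        async with client.stream("GET", url) as response:
            async for line in response.aiter_lines():
                payload = parse_sse_line(line)
                if payload is not None:
                    print(payload, end="", flush=True)

# asyncio.run(consume_sse("http://localhost:8000/stream"))
```

Note that SSE events are separated by blank lines, which is why the server yields `\n\n` after each chunk.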
2. WebSocket Connections (90 minutes)
File: 02_websocket_connections.ipynb
Topics:
WebSocket protocol basics
Bidirectional communication
FastAPI WebSocket endpoints
Client-side WebSocket handling
Connection management
Heartbeat and reconnection
Key Code:
# Server
from fastapi import FastAPI, WebSocket

app = FastAPI()

@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    while True:
        data = await websocket.receive_text()
        response = await process_message(data)
        await websocket.send_text(response)

# Client
import asyncio
import websockets

async def main():
    async with websockets.connect("ws://localhost:8000/ws") as ws:
        await ws.send("Hello")
        response = await ws.recv()

asyncio.run(main())
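The topics above include heartbeat and reconnection. A sketch of client-side reconnection with exponential backoff and jitter, assuming the websockets library and the /ws endpoint from the server example; the delay parameters are illustrative:

```python
import asyncio
import random

def backoff_delays(base: float = 1.0, cap: float = 30.0):
    """Yield exponentially growing delays with jitter, capped at `cap` seconds."""
    attempt = 0
    while True:
        delay = min(cap, base * (2 ** attempt))
        yield delay * random.uniform(0.5, 1.0)  # jitter avoids thundering herds
        attempt += 1

async def connect_forever(url: str):
    import websockets  # third-party WebSocket client/server library
    delays = backoff_delays()
    while True:
        try:
            async with websockets.connect(url) as ws:
                delays = backoff_delays()  # reset backoff after a successful connect
                async for message in ws:
                    print(message)
        except OSError:
            # Connection refused/dropped: wait, then retry with a longer delay
            await asyncio.sleep(next(delays))

# asyncio.run(connect_forever("ws://localhost:8000/ws"))
```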
3. Chunked Generation (75 minutes)
File: 03_chunked_generation.ipynb
Topics:
Progressive rendering
Chunk size optimization
Buffering strategies
Token accumulation
UI update patterns
Performance optimization
Key Techniques:
Sentence-level chunking
Token buffering
Delta updates
Smooth rendering
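Sentence-level chunking from the list above can be sketched as a small buffering generator: raw tokens accumulate until a sentence boundary, so the UI updates in readable units. The token stream and sentence-ending set are illustrative assumptions:

```python
# Characters treated as sentence boundaries for flushing (an assumption;
# production systems often use a proper sentence splitter)
SENTENCE_ENDINGS = (".", "!", "?", "\n")

def buffer_sentences(token_stream):
    """Yield complete sentences from an iterable of token strings."""
    buffer = ""
    for token in token_stream:
        buffer += token
        # Flush whenever the buffer ends with sentence-final punctuation
        if buffer.endswith(SENTENCE_ENDINGS):
            yield buffer
            buffer = ""
    if buffer:  # flush any trailing partial sentence at end of stream
        yield buffer

tokens = ["Hello", " world", ".", " How", " are", " you", "?"]
print(list(buffer_sentences(tokens)))  # → ['Hello world.', ' How are you?']
```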
4. Real-Time RAG (90 minutes)
File: 04_real_time_rag.ipynb
Topics:
Streaming search results
Progressive context loading
Incremental vector search
Streaming summarization
Real-time document processing
Hybrid search streaming
Architecture:
User Query → Vector Search (stream) → Document Retrieval (progressive)
        ↓
Context Assembly (incremental)
        ↓
LLM Generation (streaming)
        ↓
Response (real-time)
5. Production Streaming (120 minutes)
File: 05_production_streaming.ipynb
Topics:
Load balancing streaming connections
Connection pooling
Rate limiting
Backpressure handling
Monitoring and metrics
Error recovery
Scaling strategies
Production Considerations:
Connection limits
Timeout management
Memory management
Graceful degradation
Observability
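Backpressure handling from the topics above can be illustrated with a bounded asyncio.Queue between the LLM producer and the network consumer: when the client reads slowly, `put` blocks and naturally throttles the producer instead of growing memory without bound. A minimal self-contained sketch:

```python
import asyncio

async def producer(queue: asyncio.Queue, chunks):
    for chunk in chunks:
        await queue.put(chunk)      # blocks when the queue is full (backpressure)
    await queue.put(None)           # sentinel: end of stream

async def consumer(queue: asyncio.Queue, out: list):
    while True:
        chunk = await queue.get()
        if chunk is None:
            break
        out.append(chunk)           # stand-in for a network write

async def main():
    queue = asyncio.Queue(maxsize=4)  # small bound so backpressure kicks in early
    received: list = []
    await asyncio.gather(
        producer(queue, [f"tok{i}" for i in range(10)]),
        consumer(queue, received),
    )
    return received

print(asyncio.run(main()))  # all 10 chunks arrive, in order
```

The same idea applies between an LLM token stream and a slow WebSocket or SSE client.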
Assessment
Pre-Quiz (10 questions)
File: pre-quiz.md
Test your baseline knowledge of:
Streaming protocols
Async programming
Real-time systems
Performance concepts
Post-Quiz (18 questions + 3 bonus)
File: post-quiz.md
Comprehensive assessment covering:
SSE vs WebSocket trade-offs
Streaming implementation patterns
Production deployment
Performance optimization
Error handling strategies
Assignment (100 points)
File: assignment.md
Project: Build a Production Streaming RAG Chatbot
Requirements:
Streaming Backend (30 points)
FastAPI with SSE or WebSocket
OpenAI streaming integration
Vector database connection
Error handling
Real-Time Frontend (25 points)
React or vanilla JS
Smooth typing animation
Progress indicators
Reconnection logic
RAG Pipeline (25 points)
Document indexing
Streaming search
Progressive context loading
Citation tracking
Production Features (20 points)
Rate limiting
Monitoring
Load testing
Documentation
Bonus (10 points):
Multi-user support
Advanced caching
WebSocket fallback
Challenges (7 progressive tasks)
File: challenges.md
Basic Streaming - Implement OpenAI streaming (30 min)
SSE Endpoint - Create FastAPI SSE endpoint (45 min)
WebSocket Chat - Build bi-directional chat (60 min)
Chunked UI - Progressive rendering (45 min)
Streaming RAG - Real-time document search (90 min)
Load Testing - Handle 100 concurrent connections (60 min)
Production Deploy - Deploy with monitoring (120 min)
Technical Stack
Backend:
FastAPI
OpenAI Python SDK
WebSockets library
asyncio
Frontend:
HTML/CSS/JavaScript
EventSource API
WebSocket API
React (optional)
Infrastructure:
Nginx (reverse proxy)
Redis (connection management)
Prometheus (monitoring)
Docker
Best Practices
Performance
Use connection pooling
Implement backpressure
Buffer appropriately
Monitor latency
Reliability
Handle disconnections gracefully
Implement retry logic
Timeout management
Circuit breakers
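The circuit-breaker item above can be sketched as a small class: after a threshold of consecutive failures the breaker opens and rejects calls until a reset window passes. The threshold and window values are illustrative, not from the course materials:

```python
import time

class CircuitBreaker:
    def __init__(self, threshold: int = 3, reset_after: float = 30.0):
        self.threshold = threshold      # consecutive failures before opening
        self.reset_after = reset_after  # seconds to stay open
        self.failures = 0
        self.opened_at = None           # timestamp when the breaker opened

    def allow(self) -> bool:
        """Is a call currently permitted?"""
        if self.opened_at is None:
            return True
        if time.monotonic() - self.opened_at >= self.reset_after:
            self.opened_at = None       # half-open: let an attempt through
            self.failures = 0
            return True
        return False

    def record(self, success: bool) -> None:
        """Report the outcome of a call to update breaker state."""
        if success:
            self.failures = 0
            self.opened_at = None
        else:
            self.failures += 1
            if self.failures >= self.threshold:
                self.opened_at = time.monotonic()
```

In a streaming backend, `allow()` would guard calls to the LLM or vector store so a failing dependency is not hammered during an outage.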
Security
Rate limiting per user
Input validation
Authentication tokens
CORS configuration
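Per-user rate limiting from the list above is often implemented as a token bucket: each user gets a bucket that refills at a steady rate, and each request spends one token. A minimal in-memory sketch; the rate and capacity values are illustrative:

```python
import time

class TokenBucket:
    def __init__(self, rate: float, capacity: float):
        self.rate = rate            # tokens added per second
        self.capacity = capacity    # maximum burst size
        self.tokens = capacity
        self.updated = time.monotonic()

    def allow(self) -> bool:
        now = time.monotonic()
        # Refill in proportion to elapsed time, never above capacity
        self.tokens = min(self.capacity,
                          self.tokens + (now - self.updated) * self.rate)
        self.updated = now
        if self.tokens >= 1:
            self.tokens -= 1
            return True
        return False

buckets: dict[str, TokenBucket] = {}  # one bucket per user id

def check_rate_limit(user_id: str) -> bool:
    bucket = buckets.setdefault(user_id, TokenBucket(rate=5.0, capacity=10.0))
    return bucket.allow()
```

For multi-instance deployments the per-user state would live in Redis (listed under Infrastructure) rather than process memory.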
User Experience
Loading indicators
Smooth animations
Error messages
Offline support
Common Patterns
Pattern 1: Simple SSE Streaming
import json

async def stream_generator():
    async for chunk in llm_stream():
        yield f"data: {json.dumps({'text': chunk})}\n\n"
Pattern 2: WebSocket with Heartbeat
import asyncio

async def heartbeat(websocket):
    while True:
        await asyncio.sleep(30)
        await websocket.send_json({"type": "ping"})
Pattern 3: Streaming RAG
async def streaming_rag(query):
    # Search
    docs = await vector_search(query)
    yield {"type": "sources", "data": docs}
    # Generate
    async for chunk in llm_generate(query, docs):
        yield {"type": "text", "data": chunk}
Real-World Examples
ChatGPT-style Interface
Streaming responses
Typing indicators
Stop generation
Copy/retry
Live Document Q&A
Upload and index
Real-time search
Streaming answers
Source citations
Multi-User Chat
WebSocket rooms
Broadcast messages
User presence
Typing indicators
Resources
Documentation
Libraries
fastapi - Modern Python web framework
websockets - WebSocket client/server
sse-starlette - SSE for Starlette/FastAPI
httpx - Async HTTP client
Tools
Postman - API testing with WebSocket support
k6 - Load testing
WebSocket King - WebSocket client tester
Troubleshooting
Issue: Stream stops unexpectedly
Solution: Check timeout settings, implement heartbeat
Issue: High latency
Solution: Optimize chunk size, reduce buffering, check network
Issue: Connection drops
Solution: Implement reconnection logic, use exponential backoff
Issue: Memory leaks
Solution: Close connections properly, clean up event listeners
Next Steps
After completing this phase:
Review Phase 18 (AI Safety) for securing streaming apps
Explore Phase 14 (AI Agents) for multi-agent streaming
Check Phase 17 (Low-Code) for Gradio/Streamlit streaming
Build your own production streaming application
Time Estimates
Total Duration: 8 hours
Notebooks: 6-7 hours
Assignment: 4-6 hours
Challenges: 6-8 hours
Total with Practice: 16-20 hours
Success Criteria
✅ Implement SSE and WebSocket endpoints
✅ Build real-time chat interface
✅ Create streaming RAG pipeline
✅ Handle 100+ concurrent connections
✅ Deploy production streaming app
✅ Monitor and optimize performance
Note: This is a foundational module for building modern AI applications. Master these concepts to create responsive, real-time user experiences.