Streaming RAG Pipeline
Overview
Retrieval-Augmented Generation (RAG) normally works in two sequential phases: retrieve then generate. This creates an awkward wait: the user sees nothing until both phases complete. Streaming RAG breaks this bottleneck.
Learning Objectives
Understand why naive RAG blocks streaming and how to fix it
Implement parallel retrieval + streaming generation
Stream source documents to the client before generation starts
Build progressive context loading for large document sets
Track citations in real time as the LLM generates
Create a FastAPI streaming RAG endpoint (SSE)
Use LangChain streaming callback handlers
Build a complete, runnable streaming RAG system with ChromaDB + OpenAI
Prerequisites
pip install langchain langchain-openai langchain-community chromadb openai fastapi uvicorn httpx python-dotenv tiktoken
1. Challenges of RAG with Streaming
The Problem: Retrieval Blocks Generation
Naive RAG Timeline
──────────────────
t=0ms User sends query
t=50ms Embed query
t=200ms Vector search completes <-- user waits
t=250ms Build prompt with context <-- user waits
t=300ms First token from LLM <-- user FINALLY sees something
t=2500ms Last token <-- done
User perceived wait: 300ms before anything appears
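The serial stall is easy to reproduce with stubbed stage timings. A minimal sketch; the function names and delays below are illustrative, not part of the pipeline built later in this notebook:

```python
import asyncio
import time

# Stub stages with rough durations from the timeline above (illustrative)
async def embed(query):
    await asyncio.sleep(0.05)
    return [0.0]

async def vector_search(vec):
    await asyncio.sleep(0.15)
    return ["doc-001"]

async def generate(prompt):
    await asyncio.sleep(0.10)
    return "answer"

async def naive_rag(query):
    # Strictly sequential awaits: nothing reaches the client
    # until generate() has returned in full
    vec = await embed(query)
    docs = await vector_search(vec)
    return await generate(f"{docs}\n{query}")

start = time.perf_counter()
answer = asyncio.run(naive_rag("How does RAG work?"))
# In a notebook cell, use: answer = await naive_rag("How does RAG work?")
elapsed_ms = (time.perf_counter() - start) * 1000
print(f"{answer!r} after {elapsed_ms:.0f} ms")  # stage delays add up serially
```

The total wait is the sum of every stage, which is exactly what the streaming variants below break apart.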
The Solution: Overlap Retrieval and Generation
Streaming RAG Timeline
──────────────────────
t=0ms User sends query
t=50ms Embed query
t=60ms Stream source doc #1 to client <-- user sees sources immediately
t=80ms Stream source doc #2 to client
t=100ms Stream source doc #3 to client
t=200ms Vector search fully complete
t=250ms Build prompt
t=300ms First token streams to client
t=2500ms Last token
User perceived wait: 60ms -- sources appear almost instantly
Key Techniques
| Technique | Benefit |
|---|---|
| Stream sources first | User sees provenance immediately |
| Parallel embed + fetch | Reduce retrieval latency |
| Progressive context | Start generation with partial context |
| Incremental vector search | Stream results as they are found |
| Citation streaming | Track which sources are cited in real time |
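The "parallel embed + fetch" technique can be sketched with asyncio.gather: when retrieval needs two independent pieces of work (say, a dense embedding and a keyword-candidate lookup, both hypothetical stand-ins here), running them concurrently costs the slower of the two rather than their sum:

```python
import asyncio
import time

# Illustrative stand-ins for two independent retrieval stages
async def embed_query(query):
    await asyncio.sleep(0.05)   # simulated embedding API call (~50ms)
    return [0.1, 0.2, 0.3]

async def fetch_keyword_candidates(query):
    await asyncio.sleep(0.04)   # simulated keyword/BM25 lookup (~40ms)
    return ["doc-001", "doc-002"]

async def parallel_retrieve(query):
    # Both coroutines start immediately; the await costs
    # max(50ms, 40ms) instead of 50ms + 40ms
    return await asyncio.gather(
        embed_query(query),
        fetch_keyword_candidates(query),
    )

start = time.perf_counter()
embedding, candidates = asyncio.run(parallel_retrieve("streaming RAG"))
# In a notebook cell, use: embedding, candidates = await parallel_retrieve(...)
elapsed_ms = (time.perf_counter() - start) * 1000
print(f"{len(candidates)} candidates, {len(embedding)}-dim embedding in {elapsed_ms:.0f} ms")
```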
# Install dependencies (uncomment if needed)
# !pip install langchain langchain-openai langchain-community chromadb openai fastapi uvicorn httpx python-dotenv tiktoken
import asyncio
import json
import os
import time
import uuid
from typing import AsyncGenerator, Dict, List, Optional, Any
from dataclasses import dataclass, field
import chromadb
from chromadb.config import Settings
# OpenAI
try:
from openai import AsyncOpenAI
OPENAI_AVAILABLE = True
except ImportError:
OPENAI_AVAILABLE = False
print("OpenAI not installed. Install with: pip install openai")
# LangChain
try:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.callbacks.base import BaseCallbackHandler
from langchain.schema import LLMResult
from langchain.text_splitter import RecursiveCharacterTextSplitter
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
LANGCHAIN_AVAILABLE = True
except ImportError:
LANGCHAIN_AVAILABLE = False
print("LangChain not fully installed. Install with: pip install langchain langchain-openai langchain-community")
# FastAPI
try:
from fastapi import FastAPI, HTTPException
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
FASTAPI_AVAILABLE = True
except ImportError:
FASTAPI_AVAILABLE = False
print("FastAPI not installed. Install with: pip install fastapi uvicorn")
# Load API key from environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
if not OPENAI_API_KEY:
print("WARNING: OPENAI_API_KEY not set. Code sections requiring OpenAI will use mock responses.")
print("Set it with: export OPENAI_API_KEY='sk-...'")
else:
print(f"OpenAI API key found: {OPENAI_API_KEY[:8]}...")
print("\nImports complete.")
2. Build the Knowledge Base with ChromaDB
Vector Indexing as the Foundation of RAG
Before any retrieval can happen, documents must be converted into dense vector representations and stored in a vector database. ChromaDB provides an embedded, open-source vector store that handles embedding generation, storage, and ANN search. The indexing step computes an embedding for each document and inserts it into an HNSW index, enabling sub-linear search time.
Why this matters for streaming RAG: retrieval latency is the single largest contributor to TTFT in a RAG pipeline. A well-indexed collection typically returns results in 5-20ms. In production, indexing runs as a batch or incremental pipeline while the query path remains fast and stateless.
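Before building the collection, it helps to recall what the similarity score actually is. The collection below is created with the cosine space, and ChromaDB reports distances as 1 - similarity, which is why later snippets convert back with `1 - distance`. A quick refresher:

```python
import numpy as np

def cosine_sim(a, b):
    """Cosine similarity: the dot product of the two vectors, normalized."""
    a = np.asarray(a, dtype=float)
    b = np.asarray(b, dtype=float)
    return float(a @ b / (np.linalg.norm(a) * np.linalg.norm(b)))

# Identical direction -> 1.0; orthogonal -> 0.0
print(round(cosine_sim([1.0, 2.0, 3.0], [2.0, 4.0, 6.0]), 6))  # 1.0
print(round(cosine_sim([1.0, 0.0], [0.0, 1.0]), 6))            # 0.0
```

Because the score depends only on direction, documents of very different lengths can still match closely, which is the behavior you want for semantic retrieval.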
# Sample knowledge base documents
DOCUMENTS = [
{
"id": "doc-001",
"content": "Retrieval-Augmented Generation (RAG) is a technique that combines information retrieval with large language models. It retrieves relevant documents from a knowledge base and uses them as context for generating responses. RAG reduces hallucinations by grounding answers in real documents.",
"metadata": {"source": "rag_overview.txt", "topic": "RAG", "year": 2023}
},
{
"id": "doc-002",
"content": "Vector databases store high-dimensional embeddings and enable fast approximate nearest-neighbor search. Popular options include ChromaDB (local), Pinecone (cloud), Weaviate, and Qdrant. They use HNSW or IVF indexes for efficient similarity search at scale.",
"metadata": {"source": "vector_dbs.txt", "topic": "Infrastructure", "year": 2023}
},
{
"id": "doc-003",
"content": "Server-Sent Events (SSE) is a one-way HTTP streaming protocol where the server pushes data to the client over a persistent connection. It is simpler than WebSockets for streaming text generation since LLM responses are naturally unidirectional. SSE uses Content-Type: text/event-stream.",
"metadata": {"source": "sse_guide.txt", "topic": "Streaming", "year": 2023}
},
{
"id": "doc-004",
"content": "LangChain provides a unified interface for building LLM applications. It includes chains for sequential operations, agents for dynamic tool use, and callback handlers for streaming. The streaming=True flag enables token-by-token output from LLMs. Callbacks like StreamingStdOutCallbackHandler print tokens as they arrive.",
"metadata": {"source": "langchain_docs.txt", "topic": "LangChain", "year": 2024}
},
{
"id": "doc-005",
"content": "OpenAI's streaming API uses chunk objects with choices[0].delta.content fields. Set stream=True to enable streaming. The stream returns an async iterator of ChatCompletionChunk objects. The final chunk has finish_reason='stop'. Always handle the StopAsyncIteration exception.",
"metadata": {"source": "openai_streaming.txt", "topic": "OpenAI", "year": 2024}
},
{
"id": "doc-006",
"content": "Text chunking strategies for RAG include fixed-size chunking (split every N characters), recursive splitting (split on paragraphs then sentences), semantic chunking (split at topic boundaries), and sliding window (overlapping chunks). Chunk size affects retrieval precision: smaller chunks are more precise but lose context.",
"metadata": {"source": "chunking.txt", "topic": "RAG", "year": 2023}
},
{
"id": "doc-007",
"content": "Embedding models convert text to dense vectors that capture semantic meaning. OpenAI text-embedding-3-small produces 1536-dimensional vectors. Sentence Transformers (all-MiniLM-L6-v2) produce 384-dimensional vectors locally. Cosine similarity measures semantic closeness between embeddings.",
"metadata": {"source": "embeddings.txt", "topic": "Embeddings", "year": 2024}
},
{
"id": "doc-008",
"content": "Streaming response architectures must handle backpressure when clients consume data slower than servers produce it. Techniques include bounded queues, client-side flow control, and dropping slow consumers. In FastAPI, use asyncio.Queue with a maxsize to buffer tokens and detect slow clients.",
"metadata": {"source": "backpressure.txt", "topic": "Streaming", "year": 2024}
},
{
"id": "doc-009",
"content": "Prompt engineering for RAG systems: Always include retrieved context before the question. Use clear delimiters like XML tags (<context></context>) to separate context from instructions. Instruct the model to cite sources by document ID. Add a fallback instruction: if the context does not contain the answer, say so.",
"metadata": {"source": "rag_prompts.txt", "topic": "RAG", "year": 2024}
},
{
"id": "doc-010",
"content": "FastAPI supports async generators for streaming responses. Use StreamingResponse with an async generator function that yields bytes. For SSE, format each chunk as 'data: {json}\\n\\n'. Set media_type='text/event-stream'. Add Cache-Control: no-cache and Connection: keep-alive headers.",
"metadata": {"source": "fastapi_streaming.txt", "topic": "FastAPI", "year": 2024}
}
]
print(f"Knowledge base: {len(DOCUMENTS)} documents")
for doc in DOCUMENTS:
print(f" {doc['id']}: [{doc['metadata']['topic']}] {doc['content'][:60]}...")
import numpy as np
# Initialize ChromaDB in-memory (use PersistentClient for production)
chroma_client = chromadb.Client()
# Reset if collection already exists (idempotent for demo)
try:
chroma_client.delete_collection("streaming_rag")
except Exception:
pass
collection = chroma_client.create_collection(
name="streaming_rag",
metadata={"hnsw:space": "cosine"}
)
# --- Embedding function ---
# Use OpenAI embeddings if key available, else simple random vectors for demo
async def embed_text(text: str) -> List[float]:
"""Embed text using OpenAI or a mock fallback."""
if OPENAI_API_KEY and OPENAI_AVAILABLE:
client = AsyncOpenAI(api_key=OPENAI_API_KEY)
response = await client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return response.data[0].embedding
else:
        # Mock embedding seeded from the text hash (384-dim; stable within
        # one run, but Python randomizes str hashes across processes)
rng = np.random.default_rng(abs(hash(text)) % (2**32))
vec = rng.standard_normal(384).astype(np.float32)
vec = vec / np.linalg.norm(vec)
return vec.tolist()
# --- Index documents ---
async def index_documents(docs: List[Dict]) -> None:
"""Embed and index documents into ChromaDB."""
print("Indexing documents...")
ids = []
embeddings = []
texts = []
metadatas = []
for doc in docs:
embedding = await embed_text(doc["content"])
ids.append(doc["id"])
embeddings.append(embedding)
texts.append(doc["content"])
metadatas.append(doc["metadata"])
print(f" Indexed: {doc['id']}")
collection.add(
ids=ids,
embeddings=embeddings,
documents=texts,
metadatas=metadatas
)
print(f"\nTotal documents in collection: {collection.count()}")
# Run indexing
await index_documents(DOCUMENTS)
3. Streaming Search Results Before Generation
Progressive Document Delivery
In a traditional RAG pipeline, the client sees nothing until all retrieval and generation complete, a latency chain that can exceed several seconds. Streaming search results breaks this pattern by yielding each retrieved document to the client as soon as it is scored, using an async generator that produces results incrementally. The cosine similarity score is computed per-document and streamed immediately.
Why this matters: user-perceived latency drops from total retrieval-plus-generation time to time-to-first-document, typically under 100ms. This is the same pattern that makes Perplexity AI feel fast: sources appear instantly while the answer streams in below. The async generator pattern integrates naturally with FastAPI StreamingResponse and SSE, making it trivial to pipe search results directly to the HTTP response.
async def stream_search_results(
query: str,
n_results: int = 3,
delay_between_results: float = 0.05 # simulate progressive loading
) -> AsyncGenerator[Dict, None]:
"""
Stream search results one-by-one as they are retrieved.
In practice, each yield happens as the vector DB returns each hit.
"""
query_embedding = await embed_text(query)
results = collection.query(
query_embeddings=[query_embedding],
n_results=n_results
)
docs = results["documents"][0]
ids = results["ids"][0]
distances = results["distances"][0]
metadatas = results["metadatas"][0]
for i, (doc_id, doc_text, distance, meta) in enumerate(zip(ids, docs, distances, metadatas)):
# Yield one source at a time -- client can display it immediately
yield {
"type": "source",
"index": i,
"id": doc_id,
"content": doc_text,
"score": round(1 - distance, 4), # cosine similarity
"metadata": meta
}
await asyncio.sleep(delay_between_results) # simulate network/IO
# --- Demo ---
query = "How does RAG work with vector databases?"
print(f"Query: {query}\n")
print("Streaming sources as they arrive:")
print("-" * 60)
async def demo_streaming_search():
async for event in stream_search_results(query, n_results=3):
print(f"[t={time.time():.3f}] Source #{event['index']+1}: {event['id']}")
print(f" Score: {event['score']}")
print(f" Topic: {event['metadata']['topic']}")
print(f" Content: {event['content'][:80]}...")
print()
await demo_streaming_search()
4. Progressive Context Loading Pattern
Start Fast, Enrich Later
Progressive context loading addresses a fundamental tension in RAG: more context improves answer quality, but waiting for all documents delays generation. The solution is to load context in waves: start LLM generation with only the top-1 most relevant document, then augment the context window with additional documents as they arrive. This mirrors how human experts work: begin with the most authoritative source, then cross-reference.
The latency math is compelling. If retrieval returns \(k\) documents in \(T_{retrieval}\) ms and generation time-to-first-token is \(T_{gen}\) ms, naive RAG has a TTFT of \(T_{retrieval} + T_{gen}\). With progressive loading, generation starts after the top-1 result, so TTFT drops to roughly \(T_{retrieval}/k + T_{gen}\), since the first result arrives in about \(1/k\) of the total retrieval time. For \(k=6\) this cuts the retrieval share of the wait by over 80%. The tradeoff is that early tokens may be revised as more context arrives, which is why this pattern works best with sources-first-then-answer UX layouts.
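Plugging assumed numbers into that estimate (both timings are illustrative, not measured):

```python
# Progressive-loading latency estimate with assumed numbers
T_retrieval = 150.0  # ms to retrieve the full top-k (assumed)
T_gen = 300.0        # ms from prompt to first LLM token (assumed)
k = 6

naive_ttft = T_retrieval + T_gen            # wait for all k docs, then generate
progressive_ttft = T_retrieval / k + T_gen  # start generating after the top-1 doc

retrieval_wait_cut = 1 - (T_retrieval / k) / T_retrieval
print(f"naive TTFT:       {naive_ttft:.0f} ms")           # 450 ms
print(f"progressive TTFT: {progressive_ttft:.0f} ms")     # 325 ms
print(f"retrieval wait cut by {retrieval_wait_cut:.0%}")  # 83%
```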
async def progressive_context_loader(
query: str,
k_increments: List[int] = [1, 2, 3]
) -> AsyncGenerator[Dict, None]:
"""
Load context progressively:
- First yield: 1 document (start generation fast)
- Second yield: 2 documents (more context)
- Third yield: 3 documents (full context for final answer)
"""
query_embedding = await embed_text(query)
max_k = max(k_increments)
# Single vector search, then serve results progressively
results = collection.query(
query_embeddings=[query_embedding],
n_results=max_k
)
all_docs = [
{
"id": doc_id,
"content": text,
"score": round(1 - dist, 4),
"metadata": meta
}
for doc_id, text, dist, meta in zip(
results["ids"][0],
results["documents"][0],
results["distances"][0],
results["metadatas"][0]
)
]
prev_k = 0
for k in k_increments:
new_docs = all_docs[prev_k:k]
context_so_far = all_docs[:k]
yield {
"type": "context_update",
"wave": k_increments.index(k) + 1,
"total_waves": len(k_increments),
"new_docs": new_docs,
"context": context_so_far,
"context_text": "\n\n".join(
f"[{d['id']}] {d['content']}" for d in context_so_far
)
}
prev_k = k
await asyncio.sleep(0.05) # simulate latency between waves
# --- Demo ---
query = "What is LangChain and how does it support streaming?"
print(f"Query: {query}\n")
async def demo_progressive_context():
async for update in progressive_context_loader(query, k_increments=[1, 2, 3]):
print(f"Wave {update['wave']}/{update['total_waves']}: +{len(update['new_docs'])} doc(s)")
for d in update["new_docs"]:
print(f" Added: {d['id']} (score={d['score']}) [{d['metadata']['topic']}]")
print(f" Context length: {len(update['context_text'])} chars")
        print(f"    -> Would start/update generation with {len(update['context'])} doc(s) of context")
print()
await demo_progressive_context()
5. Sources First, Then Streaming Answer
The Canonical Streaming RAG UX Pattern
This is the dominant pattern used by Perplexity AI, ChatGPT with Browse, Google AI Overviews, and most production RAG applications. The pipeline executes in three strict phases: (1) retrieve all sources (typically 50-200ms), (2) immediately stream each source document to the client as a structured event, (3) build the prompt from sources and stream LLM tokens as they arrive. The client renders a Sources panel first, then the answer streams in below it.
Why this ordering matters: by showing sources before the answer, users can assess provenance while waiting for generation. This builds trust: the user sees where the answer comes from before seeing what the answer says. From an engineering perspective, the pattern also enables client-side optimizations: the frontend can pre-fetch source URLs, render thumbnails, or highlight relevant passages while tokens are still streaming. The streaming_rag_pipeline async generator yields typed events (source, token, done) that map directly to SSE data payloads.
async def mock_streaming_llm(
prompt: str,
model: str = "gpt-4o-mini"
) -> AsyncGenerator[str, None]:
"""
Mock streaming LLM. Yields tokens word-by-word with small delays.
Replace with real OpenAI/Anthropic call in production.
"""
response = (
"Based on the retrieved context, RAG combines retrieval systems with "
"language models to produce grounded, citation-backed answers. "
"The vector database (doc-001, doc-002) handles fast similarity search, "
"while the LLM (doc-004, doc-005) generates coherent prose from the context."
)
words = response.split(" ")
for word in words:
yield word + " "
await asyncio.sleep(0.04) # simulate ~25 tokens/sec
async def real_streaming_llm(
prompt: str,
model: str = "gpt-4o-mini"
) -> AsyncGenerator[str, None]:
"""
Real OpenAI streaming. Requires OPENAI_API_KEY.
"""
client = AsyncOpenAI(api_key=OPENAI_API_KEY)
stream = await client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
stream=True,
temperature=0.3,
max_tokens=512,
)
async for chunk in stream:
delta = chunk.choices[0].delta.content
if delta:
yield delta
async def streaming_rag_pipeline(
query: str,
n_results: int = 3
) -> AsyncGenerator[Dict, None]:
"""
Full streaming RAG pipeline:
1. Retrieve and stream sources
2. Build prompt
3. Stream LLM tokens
"""
sources = []
# Phase 1: Stream sources to client
yield {"type": "status", "message": "Searching knowledge base..."}
query_embedding = await embed_text(query)
results = collection.query(
query_embeddings=[query_embedding],
n_results=n_results
)
for i, (doc_id, text, distance, meta) in enumerate(zip(
results["ids"][0],
results["documents"][0],
results["distances"][0],
results["metadatas"][0]
)):
source = {
"type": "source",
"index": i,
"id": doc_id,
"content": text,
"score": round(1 - distance, 4),
"metadata": meta
}
sources.append(source)
yield source # Client sees this immediately
# Phase 2: Build prompt
yield {"type": "status", "message": "Generating answer..."}
context = "\n\n".join(
f"[Source {i+1}: {s['id']}]\n{s['content']}"
for i, s in enumerate(sources)
)
prompt = f"""You are a helpful assistant. Answer the question using ONLY the provided sources.
Cite sources by their ID (e.g., [doc-001]). If the sources don't contain enough information, say so.
<context>
{context}
</context>
Question: {query}
Answer:"""
# Phase 3: Stream tokens
llm_stream = real_streaming_llm(prompt) if OPENAI_API_KEY and OPENAI_AVAILABLE else mock_streaming_llm(prompt)
async for token in llm_stream:
yield {"type": "token", "content": token}
yield {"type": "done", "source_ids": [s["id"] for s in sources]}
# --- Demo ---
query = "How does vector search work in RAG systems?"
print(f"Query: {query}\n")
print("=" * 60)
answer_tokens = []
async def demo_full_pipeline():
async for event in streaming_rag_pipeline(query, n_results=3):
if event["type"] == "status":
print(f"\n[STATUS] {event['message']}")
elif event["type"] == "source":
print(f" [SOURCE #{event['index']+1}] {event['id']} (score={event['score']})")
print(f" {event['content'][:80]}...")
elif event["type"] == "token":
print(event["content"], end="", flush=True)
answer_tokens.append(event["content"])
elif event["type"] == "done":
print(f"\n\n[DONE] Sources used: {event['source_ids']}")
await demo_full_pipeline()
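The typed events yielded above map one-to-one onto SSE frames (the text/event-stream format described in doc-010). A framework-free sketch of the wire format; the JSON payload shape is this notebook's convention, not an SSE requirement:

```python
import json

def to_sse_frame(event: dict) -> str:
    # An SSE frame is 'data: <payload>' terminated by a blank line
    return f"data: {json.dumps(event)}\n\n"

frame = to_sse_frame({"type": "token", "content": "RAG "})
print(repr(frame))
```

In a FastAPI endpoint, each event from streaming_rag_pipeline would pass through a formatter like this inside the async generator handed to StreamingResponse with media_type='text/event-stream'.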
6. Streaming Citation Tracking
Real-Time Source Attribution
As LLM tokens stream in, citation tracking detects source references (e.g., [doc-001]) in real time using a rolling buffer and regex pattern matching. The CitationTracker maintains a buffer of the last \(N\) characters (enough to capture a full citation token like [doc-001]) and scans for matches after each new token arrives. Newly detected citations are returned immediately, enabling the UI to highlight the corresponding source card the moment it is referenced.
Why this matters for trust and UX: without real-time citation tracking, citations are only resolved after generation completes; the user reads an entire paragraph before knowing which source backs each claim. With streaming detection, the source panel lights up progressively as the answer references each document. This is also critical for hallucination detection: if the model cites a source ID not in the retrieved set, the tracker flags it immediately, allowing the system to insert a warning or trigger re-retrieval.
import re
class CitationTracker:
"""
Tracks citations (e.g., [doc-001]) as tokens stream in.
Maintains a rolling buffer to detect citations that span multiple tokens.
"""
CITATION_PATTERN = re.compile(r'\[doc-\d+\]')
def __init__(self, known_source_ids: List[str]):
self.known_source_ids = set(known_source_ids)
self.buffer = "" # rolling window for pattern matching
self.buffer_max = 20 # enough to capture [doc-001]
self.cited: List[str] = [] # ordered list of first-appearance citations
self.seen: set = set() # dedup
self.total_tokens = 0
self.full_text = ""
def feed(self, token: str) -> List[str]:
"""
Feed a new token. Returns list of newly detected citation IDs.
"""
self.full_text += token
self.buffer += token
self.total_tokens += 1
# Trim buffer
if len(self.buffer) > self.buffer_max:
self.buffer = self.buffer[-self.buffer_max:]
        new_citations = []
        # Scan only the rolling buffer (not the full text) so per-token cost
        # stays constant; `seen` dedups so each citation is reported once
        for match in self.CITATION_PATTERN.finditer(self.buffer):
            citation_id = match.group(0).strip("[]")  # "doc-001"
            if citation_id not in self.seen and citation_id in self.known_source_ids:
                self.seen.add(citation_id)
                self.cited.append(citation_id)
                new_citations.append(citation_id)
        return new_citations
def get_summary(self) -> Dict:
return {
"total_tokens": self.total_tokens,
"cited_sources": self.cited,
"uncited_sources": [
s for s in self.known_source_ids if s not in self.seen
]
}
# --- Demo with simulated streaming text ---
source_ids = ["doc-001", "doc-002", "doc-003"]
tracker = CitationTracker(source_ids)
# Simulate streaming tokens that include citations
simulated_tokens = [
"RAG ", "combines ", "retrieval ", "[doc-001] ", "with ", "generation. ",
"Vector ", "databases ", "[doc-002] ", "store ", "embeddings. ",
"The ", "system ", "also ", "relies ", "on ", "[doc-001] ", "for ", "context."
]
print("Tracking citations as tokens stream in:")
print("-" * 50)
for i, token in enumerate(simulated_tokens):
new_citations = tracker.feed(token)
if new_citations:
print(f" Token #{i+1} '{token.strip()}' -> NEW CITATION: {new_citations}")
print()
summary = tracker.get_summary()
print(f"Summary:")
print(f" Total tokens processed: {summary['total_tokens']}")
print(f" Cited sources: {summary['cited_sources']}")
print(f" Uncited sources: {summary['uncited_sources']}")
print(f" Full text: {tracker.full_text}")
7. Incremental Vector Search
Adaptive Retrieval with Early Stopping
Incremental vector search expands the retrieval radius progressively (starting with top-2, then top-4, then top-6) rather than fetching all \(k\) results upfront. At each step, newly retrieved documents are checked against a relevance threshold: if the cosine similarity falls below the threshold, the search stops early because additional documents are unlikely to improve answer quality.
Why this matters: in production RAG systems, the optimal \(k\) varies dramatically by query. A factual lookup ("What is the capital of France?") may need only 1 document, while a synthesis question ("Compare RAG architectures") may need 6-10. Fetching a fixed top-\(k\) wastes latency and tokens on easy queries while under-serving hard ones. Incremental search adapts automatically: easy queries stop at step 1 (minimal latency), while hard queries expand to the full budget. Combined with progressive context loading, this creates a pipeline that is both fast and thorough.
async def incremental_vector_search(
query: str,
k_steps: List[int] = [2, 4, 6],
relevance_threshold: float = 0.3
) -> AsyncGenerator[Dict, None]:
"""
Incrementally expand search results.
Yields new documents at each step, stopping early if new results are below threshold.
"""
query_embedding = await embed_text(query)
max_k = max(k_steps)
seen_ids = set()
# Fetch all at once from ChromaDB, stream in steps
results = collection.query(
query_embeddings=[query_embedding],
n_results=min(max_k, collection.count())
)
all_hits = list(zip(
results["ids"][0],
results["documents"][0],
results["distances"][0],
results["metadatas"][0]
))
prev_k = 0
for step_num, k in enumerate(k_steps):
new_hits = all_hits[prev_k:k]
step_docs = []
stop_early = False
for doc_id, text, distance, meta in new_hits:
score = 1 - distance
if score < relevance_threshold:
stop_early = True
break
if doc_id not in seen_ids:
seen_ids.add(doc_id)
step_docs.append({
"id": doc_id,
"content": text,
"score": round(score, 4),
"metadata": meta
})
yield {
"type": "search_step",
"step": step_num + 1,
"k": k,
"new_docs": step_docs,
"total_retrieved": len(seen_ids),
"stopped_early": stop_early
}
if stop_early:
print(f" Stopping early at step {step_num+1}: new results below threshold")
break
prev_k = k
await asyncio.sleep(0.02)
# --- Demo ---
query = "What are embeddings and how are they used in RAG?"
print(f"Query: {query}")
print(f"Relevance threshold: 0.3")
print("-" * 60)
async def demo_incremental_search():
total_docs = []
async for step in incremental_vector_search(query, k_steps=[2, 4, 6], relevance_threshold=0.3):
print(f"\nStep {step['step']}: fetched top-{step['k']}")
for doc in step["new_docs"]:
print(f" + {doc['id']} (score={doc['score']}) [{doc['metadata']['topic']}]")
total_docs.append(doc)
print(f" Total unique docs retrieved: {step['total_retrieved']}")
if step["stopped_early"]:
print(" [Stopped early - threshold reached]")
return total_docs
retrieved = await demo_incremental_search()
print(f"\nFinal: {len(retrieved)} relevant documents retrieved")
8. Streaming Summarization of Long Documents
Chunked Progressive Summarization
When a retrieved document exceeds the LLM context window or is too long for a single prompt, it must be chunked and summarized progressively. The document is split into overlapping segments (using a sliding window with configurable chunk_size and overlap), and each chunk is summarized sequentially. The rolling summary builds up in real time, with each chunk appended to the running output and streamed to the client.
Why this matters for RAG: many enterprise documents (legal contracts, technical manuals, research papers) run 10,000-100,000+ tokens, far exceeding typical context windows. The naive approach of truncating to the first \(N\) tokens loses critical information. Progressive summarization processes the entire document while giving the user immediate feedback: they see the summary build chunk by chunk. The chunk_document function with overlap ensures no information is lost at chunk boundaries, and the streaming architecture means the user starts reading within seconds even for very long documents.
def chunk_document(text: str, chunk_size: int = 300, overlap: int = 50) -> List[str]:
"""Split a document into overlapping chunks."""
chunks = []
start = 0
while start < len(text):
end = min(start + chunk_size, len(text))
chunks.append(text[start:end])
start += chunk_size - overlap
return chunks
async def stream_document_summary(
document: str,
document_id: str = "unknown",
chunk_size: int = 300
) -> AsyncGenerator[Dict, None]:
"""
Stream a rolling summary of a long document.
Processes each chunk and yields partial summaries.
"""
chunks = chunk_document(document, chunk_size=chunk_size)
yield {
"type": "summary_start",
"document_id": document_id,
"total_chunks": len(chunks),
"total_chars": len(document)
}
rolling_summary = ""
for i, chunk in enumerate(chunks):
# In production: send rolling_summary + chunk to LLM
# Here we simulate a mock summary update
chunk_summary = f"[Chunk {i+1}/{len(chunks)}]: {chunk[:80].strip()}..."
rolling_summary += (" " if rolling_summary else "") + chunk_summary
yield {
"type": "summary_chunk",
"chunk_index": i,
"chunk_text": chunk[:60] + "...",
"partial_summary": rolling_summary[-200:], # last 200 chars of rolling summary
"progress": round((i + 1) / len(chunks) * 100, 1)
}
await asyncio.sleep(0.03)
yield {
"type": "summary_done",
"document_id": document_id,
"final_summary": f"Document '{document_id}' covers {len(chunks)} sections on the topic."
}
# --- Demo ---
long_doc = (
"Retrieval-Augmented Generation is a powerful paradigm in modern NLP. "
"It works by first retrieving relevant documents from a knowledge base "
"using semantic similarity, then feeding those documents as context to a "
"large language model for generation. "
"The retrieval component typically uses a vector database like ChromaDB, "
"Pinecone, or Weaviate. Queries are embedded using models like "
"text-embedding-3-small or all-MiniLM-L6-v2. The top-k most similar "
"documents are returned and inserted into the prompt. "
"The generation component can be any LLM: GPT-4, Claude, Llama, etc. "
"RAG reduces hallucinations by grounding the model in factual documents. "
"Advanced RAG techniques include HyDE (Hypothetical Document Embeddings), "
"multi-query retrieval, reranking with cross-encoders, and "
"self-RAG where the model decides when to retrieve."
) * 3 # Make it longer
print(f"Document length: {len(long_doc)} chars")
print("Streaming summary:\n")
async def demo_streaming_summary():
async for event in stream_document_summary(long_doc, document_id="rag_overview", chunk_size=200):
if event["type"] == "summary_start":
print(f"Starting summary: {event['total_chunks']} chunks, {event['total_chars']} chars")
elif event["type"] == "summary_chunk":
print(f" [{event['progress']:5.1f}%] Processed chunk {event['chunk_index']+1}")
elif event["type"] == "summary_done":
print(f"\nFinal summary: {event['final_summary']}")
await demo_streaming_summary()
9. Real-Time Document Processing Pipeline
Streaming Ingestion with Status Updates
Production RAG systems need to ingest new documents continuously, not just at batch-indexing time. The real-time document processing pipeline takes each new document through four stages: received (acknowledged), chunked (split into segments), embedded (vector representations computed), and indexed (inserted into ChromaDB). At each stage, a status event is streamed to the caller via an async generator, providing live visibility into pipeline progress.
Why this matters: in enterprise deployments, users often ask questions about documents they just uploaded. Without real-time ingestion, there is an unpredictable delay between upload and queryability. By streaming status updates, the application can show a progress indicator ("Embedding document... 3/5 chunks complete") and notify the user the instant their document becomes searchable. The pipeline also handles errors per-document rather than failing the entire batch, making it robust to malformed inputs or transient embedding API failures.
async def realtime_doc_pipeline(
new_docs: List[Dict]
) -> AsyncGenerator[Dict, None]:
"""
Process and index new documents in real time.
Streams status for each document as it moves through the pipeline:
received -> chunking -> embedding -> indexing -> ready
"""
yield {
"type": "pipeline_start",
"total_docs": len(new_docs),
"timestamp": time.time()
}
indexed_count = 0
for doc in new_docs:
doc_id = doc.get("id", str(uuid.uuid4())[:8])
content = doc["content"]
# Stage 1: Received
yield {"type": "doc_stage", "doc_id": doc_id, "stage": "received",
"chars": len(content)}
await asyncio.sleep(0.01)
# Stage 2: Chunking
chunks = chunk_document(content, chunk_size=500)
yield {"type": "doc_stage", "doc_id": doc_id, "stage": "chunked",
"num_chunks": len(chunks)}
await asyncio.sleep(0.01)
        # Stages 3 & 4: Embedding and indexing.
        # Wrapped in try/except so one bad document (malformed input,
        # transient embedding API failure) doesn't fail the whole batch.
        try:
            yield {"type": "doc_stage", "doc_id": doc_id, "stage": "embedding"}
            chunk_embeddings = []
            for chunk in chunks:
                emb = await embed_text(chunk)
                chunk_embeddings.append(emb)
            yield {"type": "doc_stage", "doc_id": doc_id, "stage": "embedded",
                   "embedding_dims": len(chunk_embeddings[0])}
            await asyncio.sleep(0.01)
            yield {"type": "doc_stage", "doc_id": doc_id, "stage": "indexing"}
            collection.add(
                ids=[f"{doc_id}_chunk_{i}" for i in range(len(chunks))],
                embeddings=chunk_embeddings,
                documents=chunks,
                metadatas=[doc.get("metadata", {}) for _ in chunks]
            )
            indexed_count += 1
            yield {"type": "doc_stage", "doc_id": doc_id, "stage": "ready",
                   "num_chunks_indexed": len(chunks)}
        except Exception as e:
            yield {"type": "doc_error", "doc_id": doc_id, "error": str(e)}
    yield {
        "type": "pipeline_done",
        "indexed": indexed_count,
        "failed": len(new_docs) - indexed_count,
        "collection_size": collection.count(),
        "timestamp": time.time()
    }
# --- Demo ---
new_documents = [
{
"id": "new-doc-A",
"content": "Agentic RAG extends standard RAG by allowing the LLM to iteratively refine its retrieval strategy. The agent can decide to search for more information, rephrase queries, or combine results from multiple searches before generating an answer.",
"metadata": {"source": "agentic_rag.txt", "topic": "Agents"}
},
{
"id": "new-doc-B",
"content": "Reranking in RAG uses a cross-encoder model to re-score initial retrieval results. Unlike bi-encoders used for initial retrieval, cross-encoders process the query and document together, producing more accurate relevance scores at the cost of higher latency.",
"metadata": {"source": "reranking.txt", "topic": "RAG"}
}
]
print("Real-time document processing pipeline:")
print("=" * 60)
async def demo_pipeline():
async for event in realtime_doc_pipeline(new_documents):
t = event["type"]
if t == "pipeline_start":
print(f"Starting pipeline for {event['total_docs']} documents")
elif t == "doc_stage":
extra = {k: v for k, v in event.items() if k not in ("type", "doc_id", "stage")}
print(f" [{event['doc_id']}] {event['stage']:12s} {extra}")
elif t == "doc_error":
print(f" [{event['doc_id']}] ERROR: {event['error']}")
elif t == "pipeline_done":
print(f"\nPipeline complete: {event['indexed']} indexed, {event['failed']} failed")
print(f"Collection now has {event['collection_size']} total entries")
await demo_pipeline()
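As noted above, the streamed status events can drive a UI progress indicator. A small illustrative sketch (the `progress_pct` helper is hypothetical) that converts `doc_stage` events into a completion percentage, assuming the six stage events the pipeline emits per successfully indexed document:

```python
# Stage names matching the events emitted by the pipeline above
STAGES = ["received", "chunked", "embedding", "embedded", "indexing", "ready"]

def progress_pct(events, total_docs):
    """Percentage of total ingestion work completed, given events seen so far."""
    done_stages = sum(1 for e in events if e.get("type") == "doc_stage")
    return round(100 * done_stages / (total_docs * len(STAGES)), 1)

# Example: three stage events seen while ingesting two documents
events = [
    {"type": "doc_stage", "doc_id": "A", "stage": "received"},
    {"type": "doc_stage", "doc_id": "A", "stage": "chunked"},
    {"type": "doc_stage", "doc_id": "A", "stage": "embedding"},
]
print(progress_pct(events, total_docs=2))  # 25.0
```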
10. FastAPI Streaming RAG Endpoint
Production SSE API for RAG
This section assembles the streaming RAG components into a production-ready FastAPI application with three endpoints: POST /rag/stream for SSE-based streaming RAG queries, POST /index for adding new documents, and GET /health for load balancer health checks. Each SSE event is a JSON object with a type field that the client uses for routing (status, source, token, done, error).
Key architectural decisions: the /rag/stream endpoint uses StreamingResponse with media_type text/event-stream and sets Cache-Control: no-cache, Connection: keep-alive, and X-Accel-Buffering: no (to disable nginx buffering). The stream_sources parameter lets clients opt out of source streaming if they only want tokens. Cancellation is handled via asyncio.CancelledError: when a client disconnects mid-stream, the generator cleans up gracefully rather than leaving orphaned coroutines. This pattern maps directly to what production services like the Vercel AI SDK and LangServe implement.
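The cancellation behavior described above can be demonstrated in isolation. This self-contained sketch (independent of the FastAPI app) shows how asyncio.CancelledError reaches a suspended async generator when the task consuming it is cancelled, letting the generator run its cleanup:

```python
import asyncio

cleanup_log = []

async def token_stream():
    """Async generator that notices client disconnects via CancelledError."""
    try:
        for tok in [f"t{i}" for i in range(20)]:
            await asyncio.sleep(0.01)
            yield tok
    except asyncio.CancelledError:
        cleanup_log.append("cancelled: cleaned up resources")
        raise  # re-raise so cancellation propagates to the task

async def main():
    received = []

    async def consume():
        async for tok in token_stream():
            received.append(tok)

    task = asyncio.create_task(consume())
    await asyncio.sleep(0.025)  # let a couple of tokens through
    task.cancel()               # simulate the client disconnecting
    try:
        await task
    except asyncio.CancelledError:
        pass
    return received

received = asyncio.run(main())
print(received, cleanup_log)
```

The key detail is the `raise` after cleanup: swallowing CancelledError would leave the task running after the client is gone.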
from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn
app = FastAPI(title="Streaming RAG API", version="1.0.0")
class RAGQuery(BaseModel):
query: str
n_results: int = 3
stream_sources: bool = True
class IndexRequest(BaseModel):
documents: List[Dict]
def sse_format(data: Dict) -> str:
"""Format a dict as an SSE event."""
return f"data: {json.dumps(data)}\n\n"
async def rag_event_stream(
query: str,
n_results: int,
stream_sources: bool
) -> AsyncGenerator[str, None]:
"""Async generator that yields SSE-formatted events."""
try:
async for event in streaming_rag_pipeline(query, n_results=n_results):
# Skip sources if client doesn't want them
if not stream_sources and event["type"] == "source":
continue
yield sse_format(event)
except asyncio.CancelledError:
# Client disconnected
yield sse_format({"type": "cancelled", "reason": "client_disconnected"})
except Exception as e:
yield sse_format({"type": "error", "message": str(e)})
@app.post("/rag/stream")
async def stream_rag(request: RAGQuery, http_request: Request):
"""
Streaming RAG endpoint using Server-Sent Events.
Response event types:
- {"type": "status", "message": "..."}
- {"type": "source", "id": "doc-001", "content": "...", "score": 0.92}
- {"type": "token", "content": "word "}
- {"type": "done", "source_ids": [...]}
- {"type": "error", "message": "..."}
"""
return StreamingResponse(
rag_event_stream(request.query, request.n_results, request.stream_sources),
media_type="text/event-stream",
headers={
"Cache-Control": "no-cache",
"Connection": "keep-alive",
"X-Accel-Buffering": "no", # Disable nginx buffering
}
)
@app.post("/index")
async def index_new_docs(request: IndexRequest):
"""
Index new documents. Returns a summary of the indexing operation.
For streaming indexing status, use /index/stream.
"""
results = []
async for event in realtime_doc_pipeline(request.documents):
results.append(event)
done_event = next((e for e in results if e["type"] == "pipeline_done"), {})
return {"status": "ok", "summary": done_event}
@app.get("/health")
async def health():
return {
"status": "healthy",
"collection_size": collection.count(),
"openai_configured": bool(OPENAI_API_KEY)
}
# --- Show the app's routes ---
print("FastAPI app configured. Routes:")
for route in app.routes:
if hasattr(route, 'methods'):
print(f" {list(route.methods)} {route.path}")
print("\nTo run the server:")
print(" uvicorn notebook_rag_app:app --reload --port 8000")
print("\nTo test streaming:")
print(' curl -N -X POST http://localhost:8000/rag/stream \\')
print(' -H "Content-Type: application/json" \\')
print(' -d \'{"query": "What is RAG?", "n_results": 3}\'')
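For a programmatic client, the same stream can be consumed with httpx. This is a sketch, not part of the server code: the helper names are illustrative, and the network function assumes the server above is running on localhost:8000. Only the offline parser check actually runs here.

```python
import json

def parse_sse_line(line: str):
    """Decode one 'data: <json>' SSE line into an event dict, else None."""
    if line.startswith("data: "):
        return json.loads(line[len("data: "):])
    return None

async def consume_rag_stream(query: str,
                             url: str = "http://localhost:8000/rag/stream"):
    """Consume the /rag/stream SSE endpoint and print events as they arrive."""
    import httpx  # imported lazily so the parser above works without httpx
    async with httpx.AsyncClient(timeout=None) as client:
        async with client.stream("POST", url,
                                 json={"query": query, "n_results": 3}) as resp:
            async for line in resp.aiter_lines():
                event = parse_sse_line(line)
                if event is None:
                    continue
                if event["type"] == "token":
                    print(event["content"], end="", flush=True)
                elif event["type"] == "source":
                    print(f"[source {event['id']} score={event['score']}]")
                elif event["type"] in ("done", "error", "cancelled"):
                    print(f"\n[{event['type']}]")

# Offline check of the parser (no server or httpx needed):
print(parse_sse_line('data: {"type": "token", "content": "RAG "}'))
# {'type': 'token', 'content': 'RAG '}
```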
# Test the streaming RAG endpoint directly (without HTTP)
# This simulates what the HTTP client would receive
print("Simulating SSE stream for: 'How does FastAPI support streaming?'")
print("=" * 60)
test_query = RAGQuery(query="How does FastAPI support streaming?", n_results=2)
async def test_rag_endpoint():
event_count = 0
answer = ""
sources_received = []
async for sse_line in rag_event_stream(
test_query.query,
test_query.n_results,
test_query.stream_sources
):
event_count += 1
# Parse the SSE line
if sse_line.startswith("data: "):
data = json.loads(sse_line[6:].strip())
if data["type"] == "status":
print(f"[STATUS] {data['message']}")
elif data["type"] == "source":
sources_received.append(data["id"])
print(f"[SOURCE] {data['id']} score={data['score']}")
elif data["type"] == "token":
answer += data["content"]
print(data["content"], end="", flush=True)
elif data["type"] == "done":
print(f"\n[DONE] {data}")
print(f"\n\nTotal SSE events: {event_count}")
print(f"Sources received: {sources_received}")
print(f"Answer length: {len(answer)} chars")
await test_rag_endpoint()
11. LangChain Streaming Callback Handlers
Framework-Native Streaming Integration
LangChain's callback system provides hooks into every stage of chain execution: retrieval start/end, LLM start/end, and, crucially, on_llm_new_token for streaming. The StreamingRAGCallback class implements these hooks to capture retrieval events and route individual tokens to an asyncio.Queue, enabling a consumer (such as a FastAPI SSE endpoint) to yield tokens as they arrive without polling.
Why callbacks over raw streaming: LangChain chains compose multiple components (retriever, prompt template, LLM, output parser), and the callback system provides visibility into all of them through a single interface. The on_retriever_end callback fires when documents are retrieved (before the LLM even starts), enabling the sources-first pattern. The on_llm_new_token callback is the hot path: it must be fast (no blocking I/O) to avoid slowing generation. Using asyncio.Queue with put_nowait ensures the callback returns immediately, with backpressure handled by the queue's maxsize parameter.
# LangChain streaming callback handler
class StreamingRAGCallback:
"""
A callback handler that captures LangChain streaming events.
Works with both sync and async LangChain pipelines.
"""
def __init__(self):
self.tokens: List[str] = []
self.retrieved_docs: List[Dict] = []
self.llm_started = False
self.llm_ended = False
self.errors: List[str] = []
        self._token_queue: "asyncio.Queue | None" = None
self._done = False
    def setup_async_queue(self, maxsize: int = 0) -> asyncio.Queue:
        # A bounded queue (maxsize > 0) provides backpressure if the consumer lags
        self._token_queue = asyncio.Queue(maxsize=maxsize)
        return self._token_queue
# --- Simulated callback methods (matching LangChain's BaseCallbackHandler interface) ---
def on_retriever_start(self, serialized: Dict, query: str, **kwargs):
"""Called when retriever starts."""
print(f"[Callback] Retriever started: query='{query[:50]}'")
def on_retriever_end(self, documents: List, **kwargs):
"""Called when retriever finishes -- documents are available."""
for doc in documents:
self.retrieved_docs.append({
"content": str(doc)[:100] if doc else "",
"source": "retrieved"
})
print(f"[Callback] Retriever done: {len(documents)} docs retrieved")
def on_llm_start(self, serialized: Dict, prompts: List[str], **kwargs):
"""Called when LLM generation starts."""
self.llm_started = True
print(f"[Callback] LLM started with {len(prompts)} prompt(s)")
def on_llm_new_token(self, token: str, **kwargs):
"""
Called for each new token during streaming.
This is the hot path -- keep it fast.
"""
self.tokens.append(token)
# In async context, put token in queue for the consumer
if self._token_queue is not None:
try:
self._token_queue.put_nowait({"type": "token", "content": token})
except asyncio.QueueFull:
pass # Drop token if consumer is too slow
def on_llm_end(self, response, **kwargs):
"""Called when LLM generation ends."""
self.llm_ended = True
self._done = True
if self._token_queue is not None:
self._token_queue.put_nowait({"type": "done", "total_tokens": len(self.tokens)})
print(f"[Callback] LLM ended: {len(self.tokens)} tokens generated")
def on_llm_error(self, error: Exception, **kwargs):
"""Called on LLM error."""
self.errors.append(str(error))
if self._token_queue is not None:
self._token_queue.put_nowait({"type": "error", "message": str(error)})
print(f"[Callback] LLM error: {error}")
def get_full_response(self) -> str:
return "".join(self.tokens)
# --- Simulate a LangChain-style streaming call ---
callback = StreamingRAGCallback()
# Simulate the retrieval phase
callback.on_retriever_start({"name": "ChromaRetriever"}, "What is RAG?")
mock_docs = ["RAG combines retrieval with generation.", "Vector DBs store embeddings."]
callback.on_retriever_end(mock_docs)
# Simulate LLM streaming phase
callback.on_llm_start({"name": "ChatOpenAI"}, ["Answer: What is RAG?"])
mock_tokens = ["RAG ", "stands ", "for ", "Retrieval-Augmented ", "Generation. ",
"It ", "combines ", "a ", "retriever ", "with ", "an ", "LLM."]
for tok in mock_tokens:
callback.on_llm_new_token(tok)
callback.on_llm_end({"generations": []})
print(f"\nFull response: {callback.get_full_response()}")
print(f"Retrieved docs: {len(callback.retrieved_docs)}")
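The callback-to-consumer handoff described above hinges on an asyncio.Queue shared between the hot-path callback and an async consumer. A minimal self-contained sketch of that handoff (the `produce`/`consume` names are illustrative; `produce` stands in for on_llm_new_token firing, and the done event stands in for on_llm_end):

```python
import asyncio

async def produce(queue: asyncio.Queue):
    # Simulates on_llm_new_token firing on the hot path: put_nowait, no blocking
    for tok in ["Hello", ", ", "world", "!"]:
        await asyncio.sleep(0.005)
        queue.put_nowait({"type": "token", "content": tok})
    queue.put_nowait({"type": "done"})  # sentinel, as in on_llm_end

async def consume(queue: asyncio.Queue) -> str:
    # What an SSE endpoint would do: await each event and forward it
    parts = []
    while True:
        event = await queue.get()
        if event["type"] == "done":
            break
        parts.append(event["content"])
    return "".join(parts)

async def main():
    queue = asyncio.Queue(maxsize=100)  # bounded queue gives backpressure headroom
    producer = asyncio.create_task(produce(queue))
    result = await consume(queue)
    await producer
    return result

result = asyncio.run(main())
print(result)  # Hello, world!
```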
# Real LangChain streaming RAG with ChromaDB (requires OPENAI_API_KEY)
# This shows the actual LangChain integration pattern
LANGCHAIN_RAG_CODE = '''
# Requires: pip install langchain langchain-openai langchain-community chromadb
import os
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.schema import Document
# 1. Build vector store
embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")
docs = [
Document(page_content="RAG combines retrieval with generation.", metadata={"source": "doc1"}),
Document(page_content="Vector databases store embeddings for fast search.", metadata={"source": "doc2"}),
Document(page_content="FastAPI supports streaming via async generators.", metadata={"source": "doc3"}),
]
vectorstore = Chroma.from_documents(
documents=docs,
embedding=embedding_model,
collection_name="langchain_demo"
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
# 2. Build streaming LLM
streaming_callback = StreamingStdOutCallbackHandler()
llm = ChatOpenAI(
model="gpt-4o-mini",
streaming=True,
callbacks=[streaming_callback],
temperature=0.2
)
# 3. Build RAG chain
rag_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True,
verbose=True
)
# 4. Run with streaming
# Tokens print in real time via StreamingStdOutCallbackHandler
result = rag_chain.invoke({"query": "How does RAG work with FastAPI?"})
print("\\nSource documents:")
for doc in result["source_documents"]:
print(f" {doc.metadata['source']}: {doc.page_content[:60]}")
'''
print("LangChain streaming RAG code (run when OPENAI_API_KEY is set):")
print("=" * 60)
print(LANGCHAIN_RAG_CODE)
if LANGCHAIN_AVAILABLE and OPENAI_API_KEY:
print("\nOpenAI key found -- running real LangChain example...")
try:
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.schema import Document
embedding_model = OpenAIEmbeddings(
model="text-embedding-3-small",
api_key=OPENAI_API_KEY
)
lc_docs = [
Document(page_content=d["content"], metadata=d["metadata"])
for d in DOCUMENTS[:5]
]
vectorstore = Chroma.from_documents(lc_docs, embedding_model)
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
llm = ChatOpenAI(
model="gpt-4o-mini",
streaming=True,
callbacks=[StreamingStdOutCallbackHandler()],
api_key=OPENAI_API_KEY
)
rag_chain = RetrievalQA.from_chain_type(
llm=llm,
chain_type="stuff",
retriever=retriever,
return_source_documents=True
)
result = rag_chain.invoke({"query": "What is streaming RAG?"})
print("\nSources:", [d.metadata for d in result.get("source_documents", [])])
except Exception as e:
print(f"Error: {e}")
else:
print("\n(Skipped: requires OPENAI_API_KEY and langchain packages)")
12. Complete Working Streaming RAG - ChromaDB + OpenAI
End-to-End Integration
This section ties together every pattern from the notebook into a single, self-contained StreamingRAGPipeline class: vector indexing with ChromaDB, streaming search results, citation tracking, and SSE-formatted token delivery. The class encapsulates the full lifecycle β from document ingestion to streaming query response β behind a clean async generator interface that any web framework can consume.
Why a unified class matters: in production, streaming RAG involves coordinating five async subsystems (embedding, retrieval, prompt construction, generation, citation tracking) with shared state (the ChromaDB collection, API clients, citation tracker). Encapsulating these in a single class ensures consistent error handling, resource cleanup, and configuration. The async generator pattern (async for event in pipeline.query(β¦)) makes it trivial to plug into FastAPI, Django Channels, or any ASGI framework.
class StreamingRAGPipeline:
"""
Complete streaming RAG pipeline.
Features:
- Source documents streamed before generation
- Real-time citation tracking
- Incremental context loading
- OpenAI streaming (with mock fallback)
- Performance metrics
"""
def __init__(
self,
chroma_collection,
openai_api_key: str = "",
model: str = "gpt-4o-mini",
n_results: int = 3
):
self.collection = chroma_collection
self.api_key = openai_api_key
self.model = model
self.n_results = n_results
self._openai_client = None
def _get_openai_client(self):
if self._openai_client is None and self.api_key and OPENAI_AVAILABLE:
self._openai_client = AsyncOpenAI(api_key=self.api_key)
return self._openai_client
def _build_prompt(self, query: str, sources: List[Dict]) -> str:
context_parts = [
f"[{s['id']}] (relevance: {s['score']})\n{s['content']}"
for s in sources
]
context = "\n\n---\n\n".join(context_parts)
return f"""You are a precise, citation-focused assistant.
Answer the question using ONLY the provided context documents.
Cite sources using their IDs like [doc-001]. If unsure, say so.
<context>
{context}
</context>
Question: {query}
Answer:"""
async def _stream_llm(self, prompt: str) -> AsyncGenerator[str, None]:
client = self._get_openai_client()
if client:
stream = await client.chat.completions.create(
model=self.model,
messages=[{"role": "user", "content": prompt}],
stream=True,
temperature=0.2,
max_tokens=512
)
            async for chunk in stream:
                # The final chunk can arrive with an empty choices list
                if chunk.choices and chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content
else:
# Mock fallback
async for token in mock_streaming_llm(prompt):
yield token
async def query(
self,
question: str
) -> AsyncGenerator[Dict, None]:
"""
Execute a streaming RAG query.
Yields typed events: status, source, token, citation, metrics, done.
"""
start_time = time.perf_counter()
first_token_time = None
token_count = 0
sources = []
yield {"type": "status", "phase": "retrieving", "message": "Searching knowledge base..."}
# Retrieval
query_embedding = await embed_text(question)
results = self.collection.query(
query_embeddings=[query_embedding],
n_results=self.n_results
)
retrieval_time_ms = round((time.perf_counter() - start_time) * 1000, 1)
for i, (doc_id, text, dist, meta) in enumerate(zip(
results["ids"][0],
results["documents"][0],
results["distances"][0],
results["metadatas"][0]
)):
source = {
"type": "source",
"index": i,
"id": doc_id,
"content": text,
"score": round(1 - dist, 4),
"metadata": meta
}
sources.append(source)
yield source
yield {"type": "status", "phase": "generating",
"message": f"Generating answer from {len(sources)} sources...",
"retrieval_ms": retrieval_time_ms}
# Citation tracker
tracker = CitationTracker([s["id"] for s in sources])
# Generation
prompt = self._build_prompt(question, sources)
async for token in self._stream_llm(prompt):
if first_token_time is None:
first_token_time = time.perf_counter()
ttft_ms = round((first_token_time - start_time) * 1000, 1)
yield {"type": "ttft", "ms": ttft_ms} # Time to first token
token_count += 1
new_citations = tracker.feed(token)
yield {"type": "token", "content": token}
for cite_id in new_citations:
yield {"type": "citation", "id": cite_id, "token_index": token_count}
# Final metrics
total_time = time.perf_counter() - start_time
tokens_per_sec = round(token_count / total_time, 1) if total_time > 0 else 0
yield {
"type": "done",
"cited_sources": tracker.cited,
"uncited_sources": tracker.get_summary()["uncited_sources"],
"metrics": {
"total_ms": round(total_time * 1000, 1),
"retrieval_ms": retrieval_time_ms,
"token_count": token_count,
"tokens_per_sec": tokens_per_sec
}
}
# --- Run the complete pipeline ---
pipeline = StreamingRAGPipeline(
chroma_collection=collection,
openai_api_key=OPENAI_API_KEY,
model="gpt-4o-mini",
n_results=3
)
print("Complete Streaming RAG Pipeline")
print("=" * 60)
question = "What are the best practices for chunking documents in RAG?"
print(f"Question: {question}")
print("-" * 60)
async def run_complete_pipeline():
answer = ""
async for event in pipeline.query(question):
t = event["type"]
if t == "status":
print(f"\n[{event['phase'].upper()}] {event['message']}")
elif t == "source":
print(f" Source {event['index']+1}: {event['id']} (score={event['score']})")
elif t == "ttft":
print(f"\n[TTFT: {event['ms']}ms] Answer:")
elif t == "token":
answer += event["content"]
print(event["content"], end="", flush=True)
elif t == "citation":
print(f" [+cite:{event['id']}]", end="", flush=True)
elif t == "done":
m = event["metrics"]
print(f"\n\n[METRICS]")
print(f" Total time: {m['total_ms']}ms")
print(f" Retrieval time: {m['retrieval_ms']}ms")
print(f" Tokens: {m['token_count']}")
print(f" Tokens/sec: {m['tokens_per_sec']}")
print(f" Cited sources: {event['cited_sources']}")
await run_complete_pipeline()
Summary
Streaming RAG Patterns

| Pattern | When to Use | Benefit |
|---|---|---|
| Sources first | Always | User sees provenance in <100ms |
| Progressive context | Large doc sets | Start generation with partial context |
| Incremental search | Uncertain query scope | Expand only if needed |
| Citation tracking | Verifiable answers | Real-time source attribution |
| Streaming summarization | Long documents | Progressive understanding |
| Realtime indexing | Live data | Always-fresh knowledge base |
Key Implementation Points
Always stream sources first: users tolerate waiting for the answer but want to see that retrieval worked immediately.
Use async generators everywhere: async for + AsyncGenerator[T, None] is the composable primitive for streaming pipelines.
SSE for web clients: text/event-stream with the data: {json}\n\n format is the standard for server-to-browser streaming.
Typed events: all events should have a type field so clients can route them correctly.
Measure TTFT: Time To First Token is the most important UX metric for streaming RAG. Target <500ms.
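TTFT can be measured generically for any async token stream; a minimal sketch (the `measure_ttft` helper and `fake_stream` are illustrative, with a simulated 20ms latency standing in for retrieval plus model startup):

```python
import asyncio
import time

async def measure_ttft(stream):
    """Return (first_token, ttft_ms) for any async token stream."""
    start = time.perf_counter()
    async for tok in stream:
        return tok, round((time.perf_counter() - start) * 1000, 1)
    return None, None  # empty stream

async def fake_stream():
    await asyncio.sleep(0.02)  # simulated retrieval + model latency
    yield "first"
    yield "second"

tok, ttft_ms = asyncio.run(measure_ttft(fake_stream()))
print(tok, ttft_ms)
```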
Next Steps
See 04_production_streaming.ipynb for production hardening: rate limiting, circuit breakers, Prometheus metrics, load testing.