Streaming RAG Pipeline¶

Overview¶

Retrieval-Augmented Generation (RAG) normally works in two sequential phases: retrieve then generate. This creates an awkward wait: the user sees nothing until both phases complete. Streaming RAG breaks this bottleneck.

Learning Objectives¶

  • Understand why naive RAG blocks streaming and how to fix it

  • Implement parallel retrieval + streaming generation

  • Stream source documents to the client before generation starts

  • Build progressive context loading for large document sets

  • Track citations in real time as the LLM generates

  • Create a FastAPI streaming RAG endpoint (SSE)

  • Use LangChain streaming callback handlers

  • Build a complete, runnable streaming RAG system with ChromaDB + OpenAI

Prerequisites¶

pip install langchain langchain-openai langchain-community chromadb openai fastapi uvicorn httpx python-dotenv tiktoken

1. Challenges of RAG with Streaming¶

The Problem: Retrieval Blocks Generation¶

Naive RAG Timeline
──────────────────
t=0ms   User sends query
t=50ms  Embed query
t=200ms Vector search completes        <-- user waits
t=250ms Build prompt with context      <-- user waits
t=300ms First token from LLM           <-- user FINALLY sees something
t=2500ms Last token                    <-- done

User-perceived wait: 300ms before anything appears

The Solution: Overlap Retrieval and Generation¶

Streaming RAG Timeline
──────────────────────
t=0ms   User sends query
t=50ms  Embed query
t=60ms  Stream source doc #1 to client  <-- user sees sources immediately
t=80ms  Stream source doc #2 to client
t=100ms Stream source doc #3 to client
t=200ms Vector search fully complete
t=250ms Build prompt
t=300ms First token streams to client
t=2500ms Last token

User-perceived wait: 60ms -- sources appear almost instantly

Key Techniques¶

Technique                   Benefit
--------------------------  ------------------------------------------
Stream sources first        User sees provenance immediately
Parallel embed + fetch      Reduce retrieval latency
Progressive context         Start generation with partial context
Incremental vector search   Stream results as they are found
Citation streaming          Track which sources are cited in real time

# Install dependencies (uncomment if needed)
# !pip install langchain langchain-openai langchain-community chromadb openai fastapi uvicorn httpx python-dotenv tiktoken

import asyncio
import json
import os
import time
import uuid
from typing import AsyncGenerator, Dict, List, Optional, Any
from dataclasses import dataclass, field

import chromadb
from chromadb.config import Settings

# OpenAI
try:
    from openai import AsyncOpenAI
    OPENAI_AVAILABLE = True
except ImportError:
    OPENAI_AVAILABLE = False
    print("OpenAI not installed. Install with: pip install openai")

# LangChain
try:
    from langchain_openai import ChatOpenAI, OpenAIEmbeddings
    from langchain.callbacks.base import BaseCallbackHandler
    from langchain.schema import LLMResult
    from langchain.text_splitter import RecursiveCharacterTextSplitter
    from langchain_community.vectorstores import Chroma
    from langchain.chains import RetrievalQA
    LANGCHAIN_AVAILABLE = True
except ImportError:
    LANGCHAIN_AVAILABLE = False
    print("LangChain not fully installed. Install with: pip install langchain langchain-openai langchain-community")

# FastAPI
try:
    from fastapi import FastAPI, HTTPException
    from fastapi.responses import StreamingResponse
    from pydantic import BaseModel
    FASTAPI_AVAILABLE = True
except ImportError:
    FASTAPI_AVAILABLE = False
    print("FastAPI not installed. Install with: pip install fastapi uvicorn")

# Load API key from environment
OPENAI_API_KEY = os.getenv("OPENAI_API_KEY", "")
if not OPENAI_API_KEY:
    print("WARNING: OPENAI_API_KEY not set. Code sections requiring OpenAI will use mock responses.")
    print("Set it with: export OPENAI_API_KEY='sk-...'")
else:
    print(f"OpenAI API key found: {OPENAI_API_KEY[:8]}...")

print("\nImports complete.")

2. Build the Knowledge Base with ChromaDB¶

Vector Indexing as the Foundation of RAG¶

Before any retrieval can happen, documents must be converted into dense vector representations and stored in a vector database. ChromaDB provides an embedded, open-source vector store that handles embedding generation, storage, and approximate nearest-neighbor (ANN) search. The indexing step computes an embedding for each document and inserts it into an HNSW index, enabling sub-linear search time.

Why this matters for streaming RAG: retrieval latency is the single largest contributor to time to first token (TTFT) in a RAG pipeline. A well-indexed collection typically returns results in 5-20ms. In production, indexing runs as a batch or incremental pipeline while the query path remains fast and stateless.

# Sample knowledge base documents
DOCUMENTS = [
    {
        "id": "doc-001",
        "content": "Retrieval-Augmented Generation (RAG) is a technique that combines information retrieval with large language models. It retrieves relevant documents from a knowledge base and uses them as context for generating responses. RAG reduces hallucinations by grounding answers in real documents.",
        "metadata": {"source": "rag_overview.txt", "topic": "RAG", "year": 2023}
    },
    {
        "id": "doc-002",
        "content": "Vector databases store high-dimensional embeddings and enable fast approximate nearest-neighbor search. Popular options include ChromaDB (local), Pinecone (cloud), Weaviate, and Qdrant. They use HNSW or IVF indexes for efficient similarity search at scale.",
        "metadata": {"source": "vector_dbs.txt", "topic": "Infrastructure", "year": 2023}
    },
    {
        "id": "doc-003",
        "content": "Server-Sent Events (SSE) is a one-way HTTP streaming protocol where the server pushes data to the client over a persistent connection. It is simpler than WebSockets for streaming text generation since LLM responses are naturally unidirectional. SSE uses Content-Type: text/event-stream.",
        "metadata": {"source": "sse_guide.txt", "topic": "Streaming", "year": 2023}
    },
    {
        "id": "doc-004",
        "content": "LangChain provides a unified interface for building LLM applications. It includes chains for sequential operations, agents for dynamic tool use, and callback handlers for streaming. The streaming=True flag enables token-by-token output from LLMs. Callbacks like StreamingStdOutCallbackHandler print tokens as they arrive.",
        "metadata": {"source": "langchain_docs.txt", "topic": "LangChain", "year": 2024}
    },
    {
        "id": "doc-005",
        "content": "OpenAI's streaming API uses chunk objects with choices[0].delta.content fields. Set stream=True to enable streaming. The stream returns an async iterator of ChatCompletionChunk objects. The final chunk has finish_reason='stop'. Always handle the StopAsyncIteration exception.",
        "metadata": {"source": "openai_streaming.txt", "topic": "OpenAI", "year": 2024}
    },
    {
        "id": "doc-006",
        "content": "Text chunking strategies for RAG include fixed-size chunking (split every N characters), recursive splitting (split on paragraphs then sentences), semantic chunking (split at topic boundaries), and sliding window (overlapping chunks). Chunk size affects retrieval precision: smaller chunks are more precise but lose context.",
        "metadata": {"source": "chunking.txt", "topic": "RAG", "year": 2023}
    },
    {
        "id": "doc-007",
        "content": "Embedding models convert text to dense vectors that capture semantic meaning. OpenAI text-embedding-3-small produces 1536-dimensional vectors. Sentence Transformers (all-MiniLM-L6-v2) produce 384-dimensional vectors locally. Cosine similarity measures semantic closeness between embeddings.",
        "metadata": {"source": "embeddings.txt", "topic": "Embeddings", "year": 2024}
    },
    {
        "id": "doc-008",
        "content": "Streaming response architectures must handle backpressure when clients consume data slower than servers produce it. Techniques include bounded queues, client-side flow control, and dropping slow consumers. In FastAPI, use asyncio.Queue with a maxsize to buffer tokens and detect slow clients.",
        "metadata": {"source": "backpressure.txt", "topic": "Streaming", "year": 2024}
    },
    {
        "id": "doc-009",
        "content": "Prompt engineering for RAG systems: Always include retrieved context before the question. Use clear delimiters like XML tags (<context></context>) to separate context from instructions. Instruct the model to cite sources by document ID. Add a fallback instruction: if the context does not contain the answer, say so.",
        "metadata": {"source": "rag_prompts.txt", "topic": "RAG", "year": 2024}
    },
    {
        "id": "doc-010",
        "content": "FastAPI supports async generators for streaming responses. Use StreamingResponse with an async generator function that yields bytes. For SSE, format each chunk as 'data: {json}\\n\\n'. Set media_type='text/event-stream'. Add Cache-Control: no-cache and Connection: keep-alive headers.",
        "metadata": {"source": "fastapi_streaming.txt", "topic": "FastAPI", "year": 2024}
    }
]

print(f"Knowledge base: {len(DOCUMENTS)} documents")
for doc in DOCUMENTS:
    print(f"  {doc['id']}: [{doc['metadata']['topic']}] {doc['content'][:60]}...")
import numpy as np

# Initialize ChromaDB in-memory (use PersistentClient for production)
chroma_client = chromadb.Client()

# Reset if collection already exists (idempotent for demo)
try:
    chroma_client.delete_collection("streaming_rag")
except Exception:
    pass

collection = chroma_client.create_collection(
    name="streaming_rag",
    metadata={"hnsw:space": "cosine"}
)

# --- Embedding function ---
# Use OpenAI embeddings if key available, else simple random vectors for demo
async def embed_text(text: str) -> List[float]:
    """Embed text using OpenAI or a mock fallback."""
    if OPENAI_API_KEY and OPENAI_AVAILABLE:
        client = AsyncOpenAI(api_key=OPENAI_API_KEY)
        response = await client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return response.data[0].embedding
    else:
        # Deterministic mock embedding (384-dim) derived from a SHA-256 digest
        # of the text. Python's built-in hash() is salted per process, so it
        # would NOT produce stable embeddings across runs.
        import hashlib
        seed = int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:4], "big")
        rng = np.random.default_rng(seed)
        vec = rng.standard_normal(384).astype(np.float32)
        vec = vec / np.linalg.norm(vec)
        return vec.tolist()

# --- Index documents ---
async def index_documents(docs: List[Dict]) -> None:
    """Embed and index documents into ChromaDB."""
    print("Indexing documents...")
    ids = []
    embeddings = []
    texts = []
    metadatas = []

    for doc in docs:
        embedding = await embed_text(doc["content"])
        ids.append(doc["id"])
        embeddings.append(embedding)
        texts.append(doc["content"])
        metadatas.append(doc["metadata"])
        print(f"  Indexed: {doc['id']}")

    collection.add(
        ids=ids,
        embeddings=embeddings,
        documents=texts,
        metadatas=metadatas
    )
    print(f"\nTotal documents in collection: {collection.count()}")

# Run indexing
await index_documents(DOCUMENTS)

3. Streaming Search Results Before Generation¶

Progressive Document Delivery¶

In a traditional RAG pipeline, the client sees nothing until all retrieval and generation complete – a latency chain that can exceed several seconds. Streaming search results breaks this pattern by yielding each retrieved document to the client as soon as it is scored, using an async generator that produces results incrementally. The cosine similarity score is computed per-document and streamed immediately.

Why this matters: user-perceived latency drops from total retrieval plus generation time to time-to-first-document – typically under 100ms. This is the same pattern that makes Perplexity AI feel fast: sources appear instantly while the answer streams in below. The async generator pattern integrates naturally with FastAPI StreamingResponse and SSE, making it trivial to pipe search results directly to the HTTP response.

async def stream_search_results(
    query: str,
    n_results: int = 3,
    delay_between_results: float = 0.05  # simulate progressive loading
) -> AsyncGenerator[Dict, None]:
    """
    Stream search results one-by-one as they are retrieved.
    In practice, each yield happens as the vector DB returns each hit.
    """
    query_embedding = await embed_text(query)

    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results
    )

    docs = results["documents"][0]
    ids = results["ids"][0]
    distances = results["distances"][0]
    metadatas = results["metadatas"][0]

    for i, (doc_id, doc_text, distance, meta) in enumerate(zip(ids, docs, distances, metadatas)):
        # Yield one source at a time -- client can display it immediately
        yield {
            "type": "source",
            "index": i,
            "id": doc_id,
            "content": doc_text,
            "score": round(1 - distance, 4),   # cosine similarity
            "metadata": meta
        }
        await asyncio.sleep(delay_between_results)  # simulate network/IO


# --- Demo ---
query = "How does RAG work with vector databases?"
print(f"Query: {query}\n")
print("Streaming sources as they arrive:")
print("-" * 60)

async def demo_streaming_search():
    async for event in stream_search_results(query, n_results=3):
        print(f"[t={time.time():.3f}] Source #{event['index']+1}: {event['id']}")
        print(f"  Score: {event['score']}")
        print(f"  Topic: {event['metadata']['topic']}")
        print(f"  Content: {event['content'][:80]}...")
        print()

await demo_streaming_search()

4. Progressive Context Loading Pattern¶

Start Fast, Enrich Later¶

Progressive context loading addresses a fundamental tension in RAG: more context improves answer quality, but waiting for all documents delays generation. The solution is to load context in waves – start LLM generation with only the top-1 most relevant document, then augment the context window with additional documents as they arrive. This mirrors how human experts work: begin with the most authoritative source, then cross-reference.

The latency math is compelling. If retrieval returns \(k\) documents in \(T_{retrieval}\) ms and generation TTFT is \(T_{gen}\) ms, naive RAG waits for the full retrieval before starting generation. With progressive loading at \(k=1\), TTFT drops dramatically since the first result arrives in roughly \(1/k\) of total retrieval time. For \(k=6\), this can reduce perceived wait by 80%. The tradeoff is that early tokens may be revised as more context arrives, which is why this pattern works best with sources-first-then-answer UX layouts.
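
The estimate above can be sanity-checked with back-of-the-envelope arithmetic. This is a toy model with illustrative numbers (not measurements), assuming the \(k\) results arrive evenly spaced across the retrieval window:

```python
def pre_generation_wait_ms(t_retrieval_ms: float, k: int, progressive: bool) -> float:
    """Wait before LLM generation can begin.

    Naive RAG waits for all k results; progressive loading starts after
    the first one, which under the even-spacing assumption arrives in
    roughly 1/k of the total retrieval time.
    """
    return t_retrieval_ms / k if progressive else t_retrieval_ms

T_RETRIEVAL = 240.0  # illustrative total retrieval time for k=6 documents
K = 6

naive = pre_generation_wait_ms(T_RETRIEVAL, K, progressive=False)
progressive = pre_generation_wait_ms(T_RETRIEVAL, K, progressive=True)
reduction = 1 - progressive / naive

print(f"naive: {naive:.0f} ms, progressive: {progressive:.0f} ms, "
      f"reduction: {reduction:.0%}")  # 1 - 1/6, i.e. ~83% less pre-generation wait
```

Generation TTFT is unchanged in both cases; only the retrieval wait in front of it shrinks.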

async def progressive_context_loader(
    query: str,
    k_increments: List[int] = [1, 2, 3]
) -> AsyncGenerator[Dict, None]:
    """
    Load context progressively:
    - First yield: 1 document (start generation fast)
    - Second yield: 2 documents (more context)
    - Third yield: 3 documents (full context for final answer)
    """
    query_embedding = await embed_text(query)
    max_k = max(k_increments)

    # Single vector search, then serve results progressively
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=max_k
    )

    all_docs = [
        {
            "id": doc_id,
            "content": text,
            "score": round(1 - dist, 4),
            "metadata": meta
        }
        for doc_id, text, dist, meta in zip(
            results["ids"][0],
            results["documents"][0],
            results["distances"][0],
            results["metadatas"][0]
        )
    ]

    prev_k = 0
    for wave, k in enumerate(k_increments, start=1):  # avoids list.index(), which breaks on duplicate k values
        new_docs = all_docs[prev_k:k]
        context_so_far = all_docs[:k]

        yield {
            "type": "context_update",
            "wave": wave,
            "total_waves": len(k_increments),
            "new_docs": new_docs,
            "context": context_so_far,
            "context_text": "\n\n".join(
                f"[{d['id']}] {d['content']}" for d in context_so_far
            )
        }
        prev_k = k
        await asyncio.sleep(0.05)  # simulate latency between waves


# --- Demo ---
query = "What is LangChain and how does it support streaming?"
print(f"Query: {query}\n")

async def demo_progressive_context():
    async for update in progressive_context_loader(query, k_increments=[1, 2, 3]):
        print(f"Wave {update['wave']}/{update['total_waves']}: +{len(update['new_docs'])} doc(s)")
        for d in update["new_docs"]:
            print(f"  Added: {d['id']} (score={d['score']}) [{d['metadata']['topic']}]")
        print(f"  Context length: {len(update['context_text'])} chars")
        print(f"  -> Would start/update generation with {len(update['context'])} doc(s) of context")
        print()

await demo_progressive_context()

5. Sources First, Then Streaming Answer¶

The Canonical Streaming RAG UX Pattern¶

This is the dominant pattern used by Perplexity AI, ChatGPT with Browse, Google AI Overviews, and most production RAG applications. The pipeline executes in three strict phases: (1) retrieve all sources (typically 50-200ms), (2) immediately stream each source document to the client as a structured event, (3) build the prompt from sources and stream LLM tokens as they arrive. The client renders a Sources panel first, then the answer streams in below it.

Why this ordering matters: by showing sources before the answer, users can assess provenance while waiting for generation. This builds trust – the user sees where the answer comes from before seeing what the answer says. From an engineering perspective, the pattern also enables client-side optimizations: the frontend can pre-fetch source URLs, render thumbnails, or highlight relevant passages while tokens are still streaming. The streaming_rag_pipeline async generator yields typed events (status, source, token, done) that map directly to SSE data payloads.

async def mock_streaming_llm(
    prompt: str,
    model: str = "gpt-4o-mini"
) -> AsyncGenerator[str, None]:
    """
    Mock streaming LLM. Yields tokens word-by-word with small delays.
    Replace with real OpenAI/Anthropic call in production.
    """
    response = (
        "Based on the retrieved context, RAG combines retrieval systems with "
        "language models to produce grounded, citation-backed answers. "
        "The vector database (doc-001, doc-002) handles fast similarity search, "
        "while the LLM (doc-004, doc-005) generates coherent prose from the context."
    )
    words = response.split(" ")
    for word in words:
        yield word + " "
        await asyncio.sleep(0.04)  # simulate ~25 tokens/sec


async def real_streaming_llm(
    prompt: str,
    model: str = "gpt-4o-mini"
) -> AsyncGenerator[str, None]:
    """
    Real OpenAI streaming. Requires OPENAI_API_KEY.
    """
    client = AsyncOpenAI(api_key=OPENAI_API_KEY)
    stream = await client.chat.completions.create(
        model=model,
        messages=[{"role": "user", "content": prompt}],
        stream=True,
        temperature=0.3,
        max_tokens=512,
    )
    async for chunk in stream:
        delta = chunk.choices[0].delta.content
        if delta:
            yield delta


async def streaming_rag_pipeline(
    query: str,
    n_results: int = 3
) -> AsyncGenerator[Dict, None]:
    """
    Full streaming RAG pipeline:
    1. Retrieve and stream sources
    2. Build prompt
    3. Stream LLM tokens
    """
    sources = []

    # Phase 1: Stream sources to client
    yield {"type": "status", "message": "Searching knowledge base..."}

    query_embedding = await embed_text(query)
    results = collection.query(
        query_embeddings=[query_embedding],
        n_results=n_results
    )

    for i, (doc_id, text, distance, meta) in enumerate(zip(
        results["ids"][0],
        results["documents"][0],
        results["distances"][0],
        results["metadatas"][0]
    )):
        source = {
            "type": "source",
            "index": i,
            "id": doc_id,
            "content": text,
            "score": round(1 - distance, 4),
            "metadata": meta
        }
        sources.append(source)
        yield source  # Client sees this immediately

    # Phase 2: Build prompt
    yield {"type": "status", "message": "Generating answer..."}

    context = "\n\n".join(
        f"[Source {i+1}: {s['id']}]\n{s['content']}"
        for i, s in enumerate(sources)
    )
    prompt = f"""You are a helpful assistant. Answer the question using ONLY the provided sources.
Cite sources by their ID (e.g., [doc-001]). If the sources don't contain enough information, say so.

<context>
{context}
</context>

Question: {query}

Answer:"""

    # Phase 3: Stream tokens
    llm_stream = real_streaming_llm(prompt) if OPENAI_API_KEY and OPENAI_AVAILABLE else mock_streaming_llm(prompt)

    async for token in llm_stream:
        yield {"type": "token", "content": token}

    yield {"type": "done", "source_ids": [s["id"] for s in sources]}


# --- Demo ---
query = "How does vector search work in RAG systems?"
print(f"Query: {query}\n")
print("=" * 60)

answer_tokens = []

async def demo_full_pipeline():
    async for event in streaming_rag_pipeline(query, n_results=3):
        if event["type"] == "status":
            print(f"\n[STATUS] {event['message']}")
        elif event["type"] == "source":
            print(f"  [SOURCE #{event['index']+1}] {event['id']} (score={event['score']})")
            print(f"    {event['content'][:80]}...")
        elif event["type"] == "token":
            print(event["content"], end="", flush=True)
            answer_tokens.append(event["content"])
        elif event["type"] == "done":
            print(f"\n\n[DONE] Sources used: {event['source_ids']}")

await demo_full_pipeline()

6. Streaming Citation Tracking¶

Real-Time Source Attribution¶

As LLM tokens stream in, citation tracking detects source references (e.g., [doc-001]) in real time using a rolling buffer and regex pattern matching. The CitationTracker maintains a buffer of the last \(N\) characters (enough to capture a full citation token like [doc-001]) and scans for matches after each new token arrives. Newly detected citations are returned immediately, enabling the UI to highlight the corresponding source card the moment it is referenced.

Why this matters for trust and UX: without real-time citation tracking, citations are only resolved after generation completes – the user reads an entire paragraph before knowing which source backs each claim. With streaming detection, the source panel lights up progressively as the answer references each document. This is also critical for hallucination detection: if the model cites a source ID not in the retrieved set, the tracker flags it immediately, allowing the system to insert a warning or trigger re-retrieval.

import re

class CitationTracker:
    """
    Tracks citations (e.g., [doc-001]) as tokens stream in.
    Maintains a rolling buffer to detect citations that span multiple tokens.
    """

    CITATION_PATTERN = re.compile(r'\[doc-\d+\]')

    def __init__(self, known_source_ids: List[str]):
        self.known_source_ids = set(known_source_ids)
        self.buffer = ""            # rolling window for pattern matching
        self.buffer_max = 20        # enough to capture e.g. [doc-001]
        self.cited: List[str] = []  # citations in order of first appearance
        self.seen: set = set()      # dedup for known citations
        self.unknown: List[str] = []  # cited IDs not in the retrieved set
        self.total_tokens = 0
        self.full_text = ""

    def feed(self, token: str) -> List[str]:
        """
        Feed a new token. Returns the list of newly detected citation IDs.
        Scans only the rolling buffer, so per-token cost stays constant
        instead of growing with the length of the generated text.
        """
        self.full_text += token
        self.buffer += token
        self.total_tokens += 1

        # Trim buffer to the last buffer_max characters
        if len(self.buffer) > self.buffer_max:
            self.buffer = self.buffer[-self.buffer_max:]

        new_citations = []
        for match in self.CITATION_PATTERN.finditer(self.buffer):
            citation_id = match.group(0).strip("[]")  # e.g. "doc-001"
            if citation_id in self.seen:
                continue
            if citation_id in self.known_source_ids:
                self.seen.add(citation_id)
                self.cited.append(citation_id)
                new_citations.append(citation_id)
            elif citation_id not in self.unknown:
                # Possible hallucination: cited ID is not in the retrieved set
                self.unknown.append(citation_id)

        return new_citations

    def get_summary(self) -> Dict:
        return {
            "total_tokens": self.total_tokens,
            "cited_sources": self.cited,
            "unknown_citations": self.unknown,
            "uncited_sources": [
                s for s in self.known_source_ids if s not in self.seen
            ]
        }


# --- Demo with simulated streaming text ---
source_ids = ["doc-001", "doc-002", "doc-003"]
tracker = CitationTracker(source_ids)

# Simulate streaming tokens that include citations
simulated_tokens = [
    "RAG ", "combines ", "retrieval ", "[doc-001] ", "with ", "generation. ",
    "Vector ", "databases ", "[doc-002] ", "store ", "embeddings. ",
    "The ", "system ", "also ", "relies ", "on ", "[doc-001] ", "for ", "context."
]

print("Tracking citations as tokens stream in:")
print("-" * 50)

for i, token in enumerate(simulated_tokens):
    new_citations = tracker.feed(token)
    if new_citations:
        print(f"  Token #{i+1} '{token.strip()}' -> NEW CITATION: {new_citations}")

print()
summary = tracker.get_summary()
print(f"Summary:")
print(f"  Total tokens processed: {summary['total_tokens']}")
print(f"  Cited sources: {summary['cited_sources']}")
print(f"  Uncited sources: {summary['uncited_sources']}")
print(f"  Full text: {tracker.full_text}")
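
7. FastAPI Streaming RAG Endpoint (SSE)¶

The typed events from streaming_rag_pipeline map one-to-one onto SSE frames. The following is a minimal sketch, not a definitive implementation: the /rag/stream path, the RAGQuery model, and the format_sse helper are illustrative names introduced here, and the endpoint assumes the streaming_rag_pipeline generator defined in section 5 is in scope. Each event is serialized as one data: {json}\n\n frame and pushed through StreamingResponse, following the headers described in doc-010.

```python
import json

def format_sse(event: dict) -> str:
    """Serialize one pipeline event as a single SSE frame."""
    return f"data: {json.dumps(event)}\n\n"

try:
    from fastapi import FastAPI
    from fastapi.responses import StreamingResponse
    from pydantic import BaseModel

    app = FastAPI()

    class RAGQuery(BaseModel):
        query: str
        n_results: int = 3

    @app.post("/rag/stream")
    async def rag_stream(body: RAGQuery):
        async def event_stream():
            # streaming_rag_pipeline is the async generator from section 5
            async for event in streaming_rag_pipeline(body.query, body.n_results):
                yield format_sse(event)

        return StreamingResponse(
            event_stream(),
            media_type="text/event-stream",
            headers={"Cache-Control": "no-cache", "Connection": "keep-alive"},
        )
except ImportError:
    print("FastAPI not installed; endpoint definition skipped.")

print(format_sse({"type": "token", "content": "Hi "}), end="")
```

Serve with uvicorn (e.g. uvicorn your_module:app) and consume from the browser with EventSource, or from Python with an httpx streaming request.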

8. Streaming Summarization of Long Documents¶

Chunked Progressive Summarization¶

When a retrieved document exceeds the LLM context window or is too long for a single prompt, it must be chunked and summarized progressively. The document is split into overlapping segments (using a sliding window with configurable chunk_size and overlap), and each chunk is summarized sequentially. The rolling summary builds up in real time, with each chunk appended to the running output and streamed to the client.

Why this matters for RAG: many enterprise documents (legal contracts, technical manuals, research papers) are 10,000-100,000+ tokens – far exceeding typical context windows. The naive approach of truncating to the first \(N\) tokens loses critical information. Progressive summarization processes the entire document while giving the user immediate feedback: they see the summary build chunk by chunk. The chunk_document function with overlap ensures no information is lost at chunk boundaries, and the streaming architecture means the user starts reading within seconds even for very long documents.

def chunk_document(text: str, chunk_size: int = 300, overlap: int = 50) -> List[str]:
    """Split a document into overlapping chunks."""
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        chunks.append(text[start:end])
        start += chunk_size - overlap
    return chunks


async def stream_document_summary(
    document: str,
    document_id: str = "unknown",
    chunk_size: int = 300
) -> AsyncGenerator[Dict, None]:
    """
    Stream a rolling summary of a long document.
    Processes each chunk and yields partial summaries.
    """
    chunks = chunk_document(document, chunk_size=chunk_size)
    yield {
        "type": "summary_start",
        "document_id": document_id,
        "total_chunks": len(chunks),
        "total_chars": len(document)
    }

    rolling_summary = ""

    for i, chunk in enumerate(chunks):
        # In production: send rolling_summary + chunk to LLM
        # Here we simulate a mock summary update
        chunk_summary = f"[Chunk {i+1}/{len(chunks)}]: {chunk[:80].strip()}..."

        rolling_summary += (" " if rolling_summary else "") + chunk_summary

        yield {
            "type": "summary_chunk",
            "chunk_index": i,
            "chunk_text": chunk[:60] + "...",
            "partial_summary": rolling_summary[-200:],  # last 200 chars of rolling summary
            "progress": round((i + 1) / len(chunks) * 100, 1)
        }
        await asyncio.sleep(0.03)

    yield {
        "type": "summary_done",
        "document_id": document_id,
        "final_summary": f"Document '{document_id}' covers {len(chunks)} sections on the topic."
    }


# --- Demo ---
long_doc = (
    "Retrieval-Augmented Generation is a powerful paradigm in modern NLP. "
    "It works by first retrieving relevant documents from a knowledge base "
    "using semantic similarity, then feeding those documents as context to a "
    "large language model for generation. "
    "The retrieval component typically uses a vector database like ChromaDB, "
    "Pinecone, or Weaviate. Queries are embedded using models like "
    "text-embedding-3-small or all-MiniLM-L6-v2. The top-k most similar "
    "documents are returned and inserted into the prompt. "
    "The generation component can be any LLM: GPT-4, Claude, Llama, etc. "
    "RAG reduces hallucinations by grounding the model in factual documents. "
    "Advanced RAG techniques include HyDE (Hypothetical Document Embeddings), "
    "multi-query retrieval, reranking with cross-encoders, and "
    "self-RAG where the model decides when to retrieve."
) * 3  # Make it longer

print(f"Document length: {len(long_doc)} chars")
print("Streaming summary:\n")

async def demo_streaming_summary():
    async for event in stream_document_summary(long_doc, document_id="rag_overview", chunk_size=200):
        if event["type"] == "summary_start":
            print(f"Starting summary: {event['total_chunks']} chunks, {event['total_chars']} chars")
        elif event["type"] == "summary_chunk":
            print(f"  [{event['progress']:5.1f}%] Processed chunk {event['chunk_index']+1}")
        elif event["type"] == "summary_done":
            print(f"\nFinal summary: {event['final_summary']}")

await demo_streaming_summary()

9. Real-Time Document Processing Pipeline¶

Streaming Ingestion with Status Updates¶

Production RAG systems need to ingest new documents continuously – not just at batch-indexing time. The real-time document processing pipeline takes each new document through four stages: received (acknowledged), chunked (split into segments), embedded (vector representations computed), and indexed (inserted into ChromaDB). At each stage, a status event is streamed to the caller via an async generator, providing live visibility into pipeline progress.

Why this matters: in enterprise deployments, users often ask questions about documents they just uploaded. Without real-time ingestion, there is an unpredictable delay between upload and queryability. By streaming status updates, the application can show a progress indicator ("Embedding document... 3/5 chunks complete") and notify the user the instant their document becomes searchable. The pipeline also handles errors per-document rather than failing the entire batch, making it robust to malformed inputs or transient embedding API failures.

async def realtime_doc_pipeline(
    new_docs: List[Dict]
) -> AsyncGenerator[Dict, None]:
    """
    Process and index new documents in real time.
    Streams status for each document as it moves through the pipeline:
      received -> chunking -> embedding -> indexing -> ready
    """
    yield {
        "type": "pipeline_start",
        "total_docs": len(new_docs),
        "timestamp": time.time()
    }

    start_time = time.time()
    indexed_count = 0

    for doc in new_docs:
        doc_id = doc.get("id", str(uuid.uuid4())[:8])
        content = doc["content"]

        # Stage 1: Received
        yield {"type": "doc_stage", "doc_id": doc_id, "stage": "received",
               "chars": len(content)}
        await asyncio.sleep(0.01)

        # Stage 2: Chunking
        chunks = chunk_document(content, chunk_size=500)
        yield {"type": "doc_stage", "doc_id": doc_id, "stage": "chunked",
               "num_chunks": len(chunks)}
        await asyncio.sleep(0.01)

        # Stage 3: Embedding
        yield {"type": "doc_stage", "doc_id": doc_id, "stage": "embedding"}
        chunk_embeddings = []
        for chunk in chunks:
            emb = await embed_text(chunk)
            chunk_embeddings.append(emb)
        yield {"type": "doc_stage", "doc_id": doc_id, "stage": "embedded",
               "embedding_dims": len(chunk_embeddings[0])}
        await asyncio.sleep(0.01)

        # Stage 4: Indexing
        yield {"type": "doc_stage", "doc_id": doc_id, "stage": "indexing"}
        try:
            collection.add(
                ids=[f"{doc_id}_chunk_{i}" for i in range(len(chunks))],
                embeddings=chunk_embeddings,
                documents=chunks,
                metadatas=[doc.get("metadata", {}) for _ in chunks]
            )
            indexed_count += 1
            yield {"type": "doc_stage", "doc_id": doc_id, "stage": "ready",
                   "num_chunks_indexed": len(chunks)}
        except Exception as e:
            yield {"type": "doc_error", "doc_id": doc_id, "error": str(e)}

    yield {
        "type": "pipeline_done",
        "indexed": indexed_count,
        "failed": len(new_docs) - indexed_count,
        "collection_size": collection.count(),
        "elapsed_ms": round((time.time() - start_time) * 1000)
    }


# --- Demo ---
new_documents = [
    {
        "id": "new-doc-A",
        "content": "Agentic RAG extends standard RAG by allowing the LLM to iteratively refine its retrieval strategy. The agent can decide to search for more information, rephrase queries, or combine results from multiple searches before generating an answer.",
        "metadata": {"source": "agentic_rag.txt", "topic": "Agents"}
    },
    {
        "id": "new-doc-B",
        "content": "Reranking in RAG uses a cross-encoder model to re-score initial retrieval results. Unlike bi-encoders used for initial retrieval, cross-encoders process the query and document together, producing more accurate relevance scores at the cost of higher latency.",
        "metadata": {"source": "reranking.txt", "topic": "RAG"}
    }
]

print("Real-time document processing pipeline:")
print("=" * 60)

async def demo_pipeline():
    async for event in realtime_doc_pipeline(new_documents):
        t = event["type"]
        if t == "pipeline_start":
            print(f"Starting pipeline for {event['total_docs']} documents")
        elif t == "doc_stage":
            extra = {k: v for k, v in event.items() if k not in ("type", "doc_id", "stage")}
            print(f"  [{event['doc_id']}] {event['stage']:12s} {extra}")
        elif t == "doc_error":
            print(f"  [{event['doc_id']}] ERROR: {event['error']}")
        elif t == "pipeline_done":
            print(f"\nPipeline complete: {event['indexed']} indexed, {event['failed']} failed")
            print(f"Collection now has {event['collection_size']} total entries")

await demo_pipeline()

10. FastAPI Streaming RAG Endpoint

Production SSE API for RAG

This section assembles the streaming RAG components into a production-ready FastAPI application with three endpoints: POST /rag/stream for SSE-based streaming RAG queries, POST /index for adding new documents, and GET /health for load balancer health checks. Each SSE event is a JSON object with a type field that the client uses for routing (status, source, token, done, error).

Key architectural decisions: the /rag/stream endpoint uses StreamingResponse with media_type text/event-stream and sets Cache-Control: no-cache, Connection: keep-alive, and X-Accel-Buffering: no (to disable nginx buffering). The stream_sources parameter lets clients opt out of source streaming if they only want tokens. Cancellation is handled via asyncio.CancelledError – when a client disconnects mid-stream, the generator cleans up gracefully rather than leaving orphaned coroutines. This pattern maps directly to what production services like Vercel AI SDK and LangServe implement.

from fastapi import FastAPI, Request
from fastapi.responses import StreamingResponse
from pydantic import BaseModel
import uvicorn

app = FastAPI(title="Streaming RAG API", version="1.0.0")


class RAGQuery(BaseModel):
    query: str
    n_results: int = 3
    stream_sources: bool = True


class IndexRequest(BaseModel):
    documents: List[Dict]


def sse_format(data: Dict) -> str:
    """Format a dict as an SSE event."""
    return f"data: {json.dumps(data)}\n\n"


async def rag_event_stream(
    query: str,
    n_results: int,
    stream_sources: bool
) -> AsyncGenerator[str, None]:
    """Async generator that yields SSE-formatted events."""
    try:
        async for event in streaming_rag_pipeline(query, n_results=n_results):
            # Skip sources if client doesn't want them
            if not stream_sources and event["type"] == "source":
                continue
            yield sse_format(event)
    except asyncio.CancelledError:
        # Client disconnected
        yield sse_format({"type": "cancelled", "reason": "client_disconnected"})
    except Exception as e:
        yield sse_format({"type": "error", "message": str(e)})


@app.post("/rag/stream")
async def stream_rag(request: RAGQuery, http_request: Request):
    """
    Streaming RAG endpoint using Server-Sent Events.

    Response event types:
    - {"type": "status", "message": "..."}
    - {"type": "source", "id": "doc-001", "content": "...", "score": 0.92}
    - {"type": "token", "content": "word "}
    - {"type": "done", "source_ids": [...]}
    - {"type": "error", "message": "..."}
    """
    return StreamingResponse(
        rag_event_stream(request.query, request.n_results, request.stream_sources),
        media_type="text/event-stream",
        headers={
            "Cache-Control": "no-cache",
            "Connection": "keep-alive",
            "X-Accel-Buffering": "no",  # Disable nginx buffering
        }
    )


@app.post("/index")
async def index_new_docs(request: IndexRequest):
    """
    Index new documents. Returns a summary of the indexing operation.
    For streaming indexing status, use /index/stream.
    """
    results = []
    async for event in realtime_doc_pipeline(request.documents):
        results.append(event)
    done_event = next((e for e in results if e["type"] == "pipeline_done"), {})
    return {"status": "ok", "summary": done_event}


@app.get("/health")
async def health():
    return {
        "status": "healthy",
        "collection_size": collection.count(),
        "openai_configured": bool(OPENAI_API_KEY)
    }


# --- Show the app's routes ---
print("FastAPI app configured. Routes:")
for route in app.routes:
    if hasattr(route, 'methods'):
        print(f"  {list(route.methods)} {route.path}")

print("\nTo run the server:")
print("  uvicorn notebook_rag_app:app --reload --port 8000")
print("\nTo test streaming:")
print('  curl -X POST http://localhost:8000/rag/stream \\')
print('    -H "Content-Type: application/json" \\')
print('    -d \'{"query": "What is RAG?", "n_results": 3}\'')
# Test the streaming RAG endpoint directly (without HTTP)
# This simulates what the HTTP client would receive

print("Simulating SSE stream for: 'How does FastAPI support streaming?'")
print("=" * 60)

test_query = RAGQuery(query="How does FastAPI support streaming?", n_results=2)

async def test_rag_endpoint():
    event_count = 0
    answer = ""
    sources_received = []

    async for sse_line in rag_event_stream(
        test_query.query,
        test_query.n_results,
        test_query.stream_sources
    ):
        event_count += 1
        # Parse the SSE line
        if sse_line.startswith("data: "):
            data = json.loads(sse_line[6:].strip())
            if data["type"] == "status":
                print(f"[STATUS] {data['message']}")
            elif data["type"] == "source":
                sources_received.append(data["id"])
                print(f"[SOURCE] {data['id']} score={data['score']}")
            elif data["type"] == "token":
                answer += data["content"]
                print(data["content"], end="", flush=True)
            elif data["type"] == "done":
                print(f"\n[DONE] {data}")

    print(f"\n\nTotal SSE events: {event_count}")
    print(f"Sources received: {sources_received}")
    print(f"Answer length: {len(answer)} chars")

await test_rag_endpoint()

11. LangChain Streaming Callback Handlers

Framework-Native Streaming Integration

LangChain's callback system provides hooks into every stage of a chain's execution – retrieval start/end, LLM start/end, and crucially on_llm_new_token for streaming. The StreamingRAGCallback class implements these hooks to capture retrieval events and route individual tokens to an asyncio.Queue, enabling a consumer (such as a FastAPI SSE endpoint) to yield tokens as they arrive without polling.

Why callbacks over raw streaming: LangChain chains compose multiple components (retriever, prompt template, LLM, output parser), and the callback system provides visibility into all of them through a single interface. The on_retriever_end callback fires when documents are retrieved (before the LLM even starts), enabling the sources-first pattern. The on_llm_new_token callback is the hot path – it must be fast (no blocking I/O) to avoid slowing generation. Using asyncio.Queue with put_nowait ensures the callback returns immediately, with backpressure handled by the queue maxsize parameter.


# LangChain streaming callback handler
class StreamingRAGCallback:
    """
    A callback handler that captures LangChain streaming events.
    Works with both sync and async LangChain pipelines.
    """

    def __init__(self):
        self.tokens: List[str] = []
        self.retrieved_docs: List[Dict] = []
        self.llm_started = False
        self.llm_ended = False
        self.errors: List[str] = []
        self._token_queue = None  # asyncio.Queue, created by setup_async_queue()
        self._done = False

    def setup_async_queue(self, maxsize: int = 256):
        # Bounded queue so a slow consumer exerts backpressure (see QueueFull below)
        self._token_queue = asyncio.Queue(maxsize=maxsize)
        return self._token_queue

    # --- Simulated callback methods (matching LangChain's BaseCallbackHandler interface) ---

    def on_retriever_start(self, serialized: Dict, query: str, **kwargs):
        """Called when retriever starts."""
        print(f"[Callback] Retriever started: query='{query[:50]}'")

    def on_retriever_end(self, documents: List, **kwargs):
        """Called when retriever finishes -- documents are available."""
        for doc in documents:
            self.retrieved_docs.append({
                "content": str(doc)[:100] if doc else "",
                "source": "retrieved"
            })
        print(f"[Callback] Retriever done: {len(documents)} docs retrieved")

    def on_llm_start(self, serialized: Dict, prompts: List[str], **kwargs):
        """Called when LLM generation starts."""
        self.llm_started = True
        print(f"[Callback] LLM started with {len(prompts)} prompt(s)")

    def on_llm_new_token(self, token: str, **kwargs):
        """
        Called for each new token during streaming.
        This is the hot path -- keep it fast.
        """
        self.tokens.append(token)
        # In async context, put token in queue for the consumer
        if self._token_queue is not None:
            try:
                self._token_queue.put_nowait({"type": "token", "content": token})
            except asyncio.QueueFull:
                pass  # Drop token if consumer is too slow

    def on_llm_end(self, response, **kwargs):
        """Called when LLM generation ends."""
        self.llm_ended = True
        self._done = True
        if self._token_queue is not None:
            self._token_queue.put_nowait({"type": "done", "total_tokens": len(self.tokens)})
        print(f"[Callback] LLM ended: {len(self.tokens)} tokens generated")

    def on_llm_error(self, error: Exception, **kwargs):
        """Called on LLM error."""
        self.errors.append(str(error))
        if self._token_queue is not None:
            self._token_queue.put_nowait({"type": "error", "message": str(error)})
        print(f"[Callback] LLM error: {error}")

    def get_full_response(self) -> str:
        return "".join(self.tokens)


# --- Simulate a LangChain-style streaming call ---
callback = StreamingRAGCallback()

# Simulate the retrieval phase
callback.on_retriever_start({"name": "ChromaRetriever"}, "What is RAG?")
mock_docs = ["RAG combines retrieval with generation.", "Vector DBs store embeddings."]
callback.on_retriever_end(mock_docs)

# Simulate LLM streaming phase
callback.on_llm_start({"name": "ChatOpenAI"}, ["Answer: What is RAG?"])

mock_tokens = ["RAG ", "stands ", "for ", "Retrieval-Augmented ", "Generation. ",
               "It ", "combines ", "a ", "retriever ", "with ", "an ", "LLM."]
for tok in mock_tokens:
    callback.on_llm_new_token(tok)

callback.on_llm_end({"generations": []})

print(f"\nFull response: {callback.get_full_response()}")
print(f"Retrieved docs: {len(callback.retrieved_docs)}")
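The demo above only exercises the producer side of the queue. On the consumer side, an SSE endpoint would drain the queue that setup_async_queue returns until a terminal event arrives. A self-contained sketch of that pattern, with plain dict events standing in for the callback's output:

```python
import asyncio
from typing import AsyncGenerator, Dict, List

async def drain_token_queue(queue: "asyncio.Queue") -> AsyncGenerator[Dict, None]:
    """Yield events from the callback's queue until a terminal event arrives."""
    while True:
        event = await queue.get()
        yield event
        if event["type"] in ("done", "error"):
            return

async def demo() -> List[Dict]:
    queue: asyncio.Queue = asyncio.Queue(maxsize=256)
    # Producer side: what on_llm_new_token / on_llm_end push via put_nowait
    for tok in ["RAG ", "streams ", "tokens."]:
        queue.put_nowait({"type": "token", "content": tok})
    queue.put_nowait({"type": "done", "total_tokens": 3})
    return [event async for event in drain_token_queue(queue)]

events = asyncio.run(demo())  # inside the notebook, use `events = await demo()`
print("".join(e["content"] for e in events if e["type"] == "token"))
# -> RAG streams tokens.
```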
# Real LangChain streaming RAG with ChromaDB (requires OPENAI_API_KEY)
# This shows the actual LangChain integration pattern

LANGCHAIN_RAG_CODE = '''
# Requires: pip install langchain langchain-openai langchain-community chromadb
import os
from langchain_openai import ChatOpenAI, OpenAIEmbeddings
from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
from langchain_community.vectorstores import Chroma
from langchain.chains import RetrievalQA
from langchain.schema import Document

# 1. Build vector store
embedding_model = OpenAIEmbeddings(model="text-embedding-3-small")

docs = [
    Document(page_content="RAG combines retrieval with generation.", metadata={"source": "doc1"}),
    Document(page_content="Vector databases store embeddings for fast search.", metadata={"source": "doc2"}),
    Document(page_content="FastAPI supports streaming via async generators.", metadata={"source": "doc3"}),
]

vectorstore = Chroma.from_documents(
    documents=docs,
    embedding=embedding_model,
    collection_name="langchain_demo"
)
retriever = vectorstore.as_retriever(search_kwargs={"k": 2})

# 2. Build streaming LLM
streaming_callback = StreamingStdOutCallbackHandler()

llm = ChatOpenAI(
    model="gpt-4o-mini",
    streaming=True,
    callbacks=[streaming_callback],
    temperature=0.2
)

# 3. Build RAG chain
rag_chain = RetrievalQA.from_chain_type(
    llm=llm,
    chain_type="stuff",
    retriever=retriever,
    return_source_documents=True,
    verbose=True
)

# 4. Run with streaming
# Tokens print in real time via StreamingStdOutCallbackHandler
result = rag_chain.invoke({"query": "How does RAG work with FastAPI?"})
print("\\nSource documents:")
for doc in result["source_documents"]:
    print(f"  {doc.metadata['source']}: {doc.page_content[:60]}")
'''

print("LangChain streaming RAG code (run when OPENAI_API_KEY is set):")
print("=" * 60)
print(LANGCHAIN_RAG_CODE)

if LANGCHAIN_AVAILABLE and OPENAI_API_KEY:
    print("\nOpenAI key found -- running real LangChain example...")
    try:
        from langchain_openai import ChatOpenAI, OpenAIEmbeddings
        from langchain.callbacks.streaming_stdout import StreamingStdOutCallbackHandler
        from langchain_community.vectorstores import Chroma
        from langchain.chains import RetrievalQA
        from langchain.schema import Document

        embedding_model = OpenAIEmbeddings(
            model="text-embedding-3-small",
            api_key=OPENAI_API_KEY
        )
        lc_docs = [
            Document(page_content=d["content"], metadata=d["metadata"])
            for d in DOCUMENTS[:5]
        ]
        vectorstore = Chroma.from_documents(lc_docs, embedding_model)
        retriever = vectorstore.as_retriever(search_kwargs={"k": 2})
        llm = ChatOpenAI(
            model="gpt-4o-mini",
            streaming=True,
            callbacks=[StreamingStdOutCallbackHandler()],
            api_key=OPENAI_API_KEY
        )
        rag_chain = RetrievalQA.from_chain_type(
            llm=llm,
            chain_type="stuff",
            retriever=retriever,
            return_source_documents=True
        )
        result = rag_chain.invoke({"query": "What is streaming RAG?"})
        print("\nSources:", [d.metadata for d in result.get("source_documents", [])])
    except Exception as e:
        print(f"Error: {e}")
else:
    print("\n(Skipped: requires OPENAI_API_KEY and langchain packages)")

12. Complete Working Streaming RAG - ChromaDB + OpenAI

End-to-End Integration

This section ties together every pattern from the notebook into a single, self-contained StreamingRAGPipeline class: vector indexing with ChromaDB, streaming search results, citation tracking, and SSE-formatted token delivery. The class encapsulates the full lifecycle – from document ingestion to streaming query response – behind a clean async generator interface that any web framework can consume.

Why a unified class matters: in production, streaming RAG involves coordinating five async subsystems (embedding, retrieval, prompt construction, generation, citation tracking) with shared state (the ChromaDB collection, API clients, citation tracker). Encapsulating these in a single class ensures consistent error handling, resource cleanup, and configuration. The async generator pattern (async for event in pipeline.query(…)) makes it trivial to plug into FastAPI, Django Channels, or any ASGI framework.

class StreamingRAGPipeline:
    """
    Complete streaming RAG pipeline.

    Features:
    - Source documents streamed before generation
    - Real-time citation tracking
    - Incremental context loading
    - OpenAI streaming (with mock fallback)
    - Performance metrics
    """

    def __init__(
        self,
        chroma_collection,
        openai_api_key: str = "",
        model: str = "gpt-4o-mini",
        n_results: int = 3
    ):
        self.collection = chroma_collection
        self.api_key = openai_api_key
        self.model = model
        self.n_results = n_results
        self._openai_client = None

    def _get_openai_client(self):
        if self._openai_client is None and self.api_key and OPENAI_AVAILABLE:
            self._openai_client = AsyncOpenAI(api_key=self.api_key)
        return self._openai_client

    def _build_prompt(self, query: str, sources: List[Dict]) -> str:
        context_parts = [
            f"[{s['id']}] (relevance: {s['score']})\n{s['content']}"
            for s in sources
        ]
        context = "\n\n---\n\n".join(context_parts)
        return f"""You are a precise, citation-focused assistant.
Answer the question using ONLY the provided context documents.
Cite sources using their IDs like [doc-001]. If unsure, say so.

<context>
{context}
</context>

Question: {query}
Answer:"""

    async def _stream_llm(self, prompt: str) -> AsyncGenerator[str, None]:
        client = self._get_openai_client()
        if client:
            stream = await client.chat.completions.create(
                model=self.model,
                messages=[{"role": "user", "content": prompt}],
                stream=True,
                temperature=0.2,
                max_tokens=512
            )
            async for chunk in stream:
                # Guard: some stream chunks carry no choices or an empty delta
                if chunk.choices and chunk.choices[0].delta.content:
                    yield chunk.choices[0].delta.content
        else:
            # Mock fallback
            async for token in mock_streaming_llm(prompt):
                yield token

    async def query(
        self,
        question: str
    ) -> AsyncGenerator[Dict, None]:
        """
        Execute a streaming RAG query.
        Yields typed events: status, source, token, citation, metrics, done.
        """
        start_time = time.perf_counter()
        first_token_time = None
        token_count = 0
        sources = []

        yield {"type": "status", "phase": "retrieving", "message": "Searching knowledge base..."}

        # Retrieval
        query_embedding = await embed_text(question)
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=self.n_results
        )

        retrieval_time_ms = round((time.perf_counter() - start_time) * 1000, 1)

        for i, (doc_id, text, dist, meta) in enumerate(zip(
            results["ids"][0],
            results["documents"][0],
            results["distances"][0],
            results["metadatas"][0]
        )):
            source = {
                "type": "source",
                "index": i,
                "id": doc_id,
                "content": text,
                "score": round(1 - dist, 4),
                "metadata": meta
            }
            sources.append(source)
            yield source

        yield {"type": "status", "phase": "generating",
               "message": f"Generating answer from {len(sources)} sources...",
               "retrieval_ms": retrieval_time_ms}

        # Citation tracker
        tracker = CitationTracker([s["id"] for s in sources])

        # Generation
        prompt = self._build_prompt(question, sources)
        async for token in self._stream_llm(prompt):
            if first_token_time is None:
                first_token_time = time.perf_counter()
                ttft_ms = round((first_token_time - start_time) * 1000, 1)
                yield {"type": "ttft", "ms": ttft_ms}  # Time to first token

            token_count += 1
            new_citations = tracker.feed(token)

            yield {"type": "token", "content": token}

            for cite_id in new_citations:
                yield {"type": "citation", "id": cite_id, "token_index": token_count}

        # Final metrics
        total_time = time.perf_counter() - start_time
        tokens_per_sec = round(token_count / total_time, 1) if total_time > 0 else 0

        yield {
            "type": "done",
            "cited_sources": tracker.cited,
            "uncited_sources": tracker.get_summary()["uncited_sources"],
            "metrics": {
                "total_ms": round(total_time * 1000, 1),
                "retrieval_ms": retrieval_time_ms,
                "token_count": token_count,
                "tokens_per_sec": tokens_per_sec
            }
        }


# --- Run the complete pipeline ---
pipeline = StreamingRAGPipeline(
    chroma_collection=collection,
    openai_api_key=OPENAI_API_KEY,
    model="gpt-4o-mini",
    n_results=3
)

print("Complete Streaming RAG Pipeline")
print("=" * 60)
question = "What are the best practices for chunking documents in RAG?"
print(f"Question: {question}")
print("-" * 60)

async def run_complete_pipeline():
    answer = ""
    async for event in pipeline.query(question):
        t = event["type"]
        if t == "status":
            print(f"\n[{event['phase'].upper()}] {event['message']}")
        elif t == "source":
            print(f"  Source {event['index']+1}: {event['id']} (score={event['score']})")
        elif t == "ttft":
            print(f"\n[TTFT: {event['ms']}ms] Answer:")
        elif t == "token":
            answer += event["content"]
            print(event["content"], end="", flush=True)
        elif t == "citation":
            print(f" [+cite:{event['id']}]", end="", flush=True)
        elif t == "done":
            m = event["metrics"]
            print(f"\n\n[METRICS]")
            print(f"  Total time:     {m['total_ms']}ms")
            print(f"  Retrieval time: {m['retrieval_ms']}ms")
            print(f"  Tokens:         {m['token_count']}")
            print(f"  Tokens/sec:     {m['tokens_per_sec']}")
            print(f"  Cited sources:  {event['cited_sources']}")

await run_complete_pipeline()

Summary

Streaming RAG Patterns

Pattern                  When to Use            Benefit
-----------------------  ---------------------  -------------------------------------
Sources first            Always                 User sees provenance in <100ms
Progressive context      Large doc sets         Start generation with partial context
Incremental search       Uncertain query scope  Expand only if needed
Citation tracking        Verifiable answers     Real-time source attribution
Streaming summarization  Long documents         Progressive understanding
Realtime indexing        Live data              Always-fresh knowledge base

Key Implementation Points

  1. Always stream sources first – users tolerate waiting for the answer but want to see that retrieval worked immediately.

  2. Use async generators everywhere – async for + AsyncGenerator[T, None] is the composable primitive for streaming pipelines.

  3. SSE for web clients – text/event-stream with data: {json}\n\n format is the standard for server-to-browser streaming.

  4. Typed events – all events should have a type field so clients can route them correctly.

  5. Measure TTFT – Time To First Token is the most important UX metric for streaming RAG. Target <500ms.
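Point 5 is easy to instrument: wrap any async token stream and record the delay before the first item arrives. A small sketch, where measure_ttft and fake_llm are illustrative helpers rather than part of the pipeline above:

```python
import asyncio
import time
from typing import AsyncGenerator, List, Tuple

async def measure_ttft(stream: AsyncGenerator[str, None]) -> Tuple[float, List[str]]:
    """Consume a token stream; return (time-to-first-token in ms, all tokens)."""
    start = time.perf_counter()
    ttft_ms = 0.0
    tokens: List[str] = []
    async for token in stream:
        if not tokens:
            ttft_ms = (time.perf_counter() - start) * 1000
        tokens.append(token)
    return ttft_ms, tokens

async def fake_llm() -> AsyncGenerator[str, None]:
    # Stand-in for an LLM stream with roughly 20ms before the first token
    await asyncio.sleep(0.02)
    for token in ["Streaming ", "RAG ", "works."]:
        yield token

ttft_ms, tokens = asyncio.run(measure_ttft(fake_llm()))
print(f"TTFT: {ttft_ms:.1f}ms over {len(tokens)} tokens")
```

The same wrapper can sit between _stream_llm and the SSE generator to log TTFT per request.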

Next Steps

  • See 04_production_streaming.ipynb for production hardening: rate limiting, circuit breakers, Prometheus metrics, load testing.