Section 5: End-to-End Pipeline Validation
Data → Model → Postprocessing
Duration: 5 hours
Difficulty: Intermediate
5.1 Why E2E Pipeline Validation Matters
Real AI systems aren't just a model; they're a pipeline:

Raw Data → Preprocessing → Tokenization/Transforms → Model Inference → Postprocessing → Output
    ↑                                                                        ↓
Storage/                                                                 API / UI
Streaming                                                                 Client
Each stage can introduce errors. Pipeline validation ensures the entire chain produces correct results on your hardware.
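Every validator in this section repeats one pattern per stage: start a timer, run the stage, check an invariant on its output, and append a result record. A minimal sketch of that pattern in isolation (`timed_stage` and the toy string stages are illustrative, not part of any framework):

```python
import time

def timed_stage(name, fn, *args, validate=None):
    """Run one pipeline stage, time it, and apply an optional validity check."""
    start = time.perf_counter()
    out = fn(*args)
    elapsed_ms = (time.perf_counter() - start) * 1000
    if validate is not None:
        assert validate(out), f"validation failed in stage '{name}'"
    return out, {"name": name, "time_ms": elapsed_ms, "status": "PASS"}

# Toy two-stage "pipeline" on a string, standing in for real stages
stages = []
text, record = timed_stage("preprocess", str.strip, "  hello  ",
                           validate=lambda s: len(s) > 0)
stages.append(record)
text, record = timed_stage("infer", str.upper, text,
                           validate=lambda s: s.isupper())
stages.append(record)
print(text)
```

The LLM, CV, and RAG validators below are this pattern applied stage by stage, with domain-specific invariants at each step.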
5.2 LLM Inference Pipeline Validation
Complete LLM Pipeline
import torch
from transformers import AutoTokenizer, AutoModelForCausalLM
import time
class LLMPipelineValidator:
    """Validate a complete LLM inference pipeline end-to-end."""

    def __init__(self, model_name, device='cuda', dtype=torch.float16):
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForCausalLM.from_pretrained(
            model_name, torch_dtype=dtype, device_map=device
        ).eval()
        self.device = device
        self.dtype = dtype

    def validate_pipeline(self, prompt, max_tokens=100):
        """Run full pipeline and validate each stage."""
        results = {"stages": []}

        # Stage 1: Tokenization
        start = time.perf_counter()
        inputs = self.tokenizer(prompt, return_tensors="pt")
        tokenize_time = time.perf_counter() - start

        input_ids = inputs["input_ids"]
        assert input_ids.dim() == 2, f"Expected 2D tensor, got {input_ids.dim()}D"
        assert input_ids.min() >= 0, "Negative token IDs"
        # len(tokenizer) includes added special tokens; vocab_size may not
        assert input_ids.max() < len(self.tokenizer), "Token ID exceeds vocab"
        results["stages"].append({
            "name": "tokenization",
            "time_ms": tokenize_time * 1000,
            "status": "PASS",
            "tokens": input_ids.shape[1],
        })

        # Stage 2: Device transfer
        start = time.perf_counter()
        inputs = {k: v.to(self.device) for k, v in inputs.items()}
        transfer_time = time.perf_counter() - start
        results["stages"].append({
            "name": "device_transfer",
            "time_ms": transfer_time * 1000,
            "status": "PASS",
        })

        # Stage 3: Model inference
        start = time.perf_counter()
        with torch.no_grad():
            outputs = self.model.generate(
                **inputs, max_new_tokens=max_tokens, do_sample=False
            )
        torch.cuda.synchronize()
        inference_time = time.perf_counter() - start

        # Validate output: generated IDs are integers, so check range, not NaN
        assert outputs.shape[1] > input_ids.shape[1], "No tokens generated"
        assert outputs.max() < len(self.tokenizer), "Generated ID exceeds vocab"
        new_tokens = outputs.shape[1] - input_ids.shape[1]
        results["stages"].append({
            "name": "inference",
            "time_ms": inference_time * 1000,
            "status": "PASS",
            "new_tokens": new_tokens,
            "tokens_per_sec": new_tokens / inference_time,
        })

        # Stage 4: Detokenization
        start = time.perf_counter()
        text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        detokenize_time = time.perf_counter() - start

        assert isinstance(text, str), "Output is not a string"
        assert len(text) > len(prompt), "Output shorter than input"
        results["stages"].append({
            "name": "detokenization",
            "time_ms": detokenize_time * 1000,
            "status": "PASS",
            "output_length": len(text),
        })

        # Stage 5: Postprocessing (e.g., stop sequence trimming)
        start = time.perf_counter()
        stop_sequences = ["\n\n", "###", "<|end|>"]
        final_text = text
        for stop in stop_sequences:
            if stop in final_text[len(prompt):]:
                final_text = final_text[:final_text.index(stop, len(prompt))]
                break
        postprocess_time = time.perf_counter() - start
        results["stages"].append({
            "name": "postprocessing",
            "time_ms": postprocess_time * 1000,
            "status": "PASS",
        })

        # Summary
        total_time = sum(s["time_ms"] for s in results["stages"])
        results["total_time_ms"] = total_time
        results["output"] = final_text
        for stage in results["stages"]:
            pct = stage["time_ms"] / total_time * 100
            print(f"  {stage['name']:20s}: {stage['time_ms']:8.2f}ms ({pct:.1f}%)")
        print(f"  {'TOTAL':20s}: {total_time:8.2f}ms")
        return results
# Usage
# validator = LLMPipelineValidator("meta-llama/Llama-2-7b-hf")
# result = validator.validate_pipeline("Explain quantum computing in simple terms:")
5.3 Computer Vision Pipeline Validation
Image Classification Pipeline
The CV pipeline validator tests the complete path from raw image bytes to classification predictions: decode (JPEG/PNG to RGB tensor), preprocess (resize, crop, normalize), device transfer (CPU to GPU), inference (model forward pass), and postprocess (softmax + top-k). Each stage is timed independently to identify bottlenecks: in production, image decoding or CPU-based preprocessing often takes longer than GPU inference, making the pipeline CPU-bound despite a powerful GPU. Validation also checks correctness at each stage: no NaN after normalization, the correct tensor shape after preprocessing, probabilities summing to 1.0 after softmax. This end-to-end approach catches integration bugs that kernel-level testing misses.
import torch
import torchvision.transforms as T
from PIL import Image
import io
import time
class CVPipelineValidator:
    """Validate a complete CV inference pipeline."""

    def __init__(self, model_name="resnet50", device='cuda', dtype=torch.float16):
        from torchvision import models
        model_fn = getattr(models, model_name)
        self.model = model_fn(weights="DEFAULT").eval().to(device, dtype)
        self.device = device
        self.dtype = dtype
        self.transform = T.Compose([
            T.Resize(256),
            T.CenterCrop(224),
            T.ToTensor(),
            T.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])

    def validate_pipeline(self, image_bytes):
        """Run full pipeline: bytes → decode → preprocess → inference → postprocess."""
        results = {"stages": []}

        # Stage 1: Decode image
        start = time.perf_counter()
        image = Image.open(io.BytesIO(image_bytes)).convert("RGB")
        decode_time = time.perf_counter() - start
        assert image.size[0] > 0 and image.size[1] > 0, "Invalid image"
        results["stages"].append({
            "name": "decode",
            "time_ms": decode_time * 1000,
            "image_size": image.size,
        })

        # Stage 2: Preprocessing
        start = time.perf_counter()
        tensor = self.transform(image).unsqueeze(0)
        preprocess_time = time.perf_counter() - start
        assert tensor.shape == (1, 3, 224, 224), f"Wrong shape: {tensor.shape}"
        assert not torch.isnan(tensor).any(), "NaN after preprocessing"
        results["stages"].append({
            "name": "preprocess",
            "time_ms": preprocess_time * 1000,
        })

        # Stage 3: Device transfer
        start = time.perf_counter()
        tensor = tensor.to(self.device, self.dtype)
        transfer_time = time.perf_counter() - start
        results["stages"].append({
            "name": "device_transfer",
            "time_ms": transfer_time * 1000,
        })

        # Stage 4: Inference
        start = time.perf_counter()
        with torch.no_grad():
            logits = self.model(tensor)
        torch.cuda.synchronize()
        inference_time = time.perf_counter() - start
        assert logits.shape == (1, 1000), f"Wrong output shape: {logits.shape}"
        assert not torch.isnan(logits).any(), "NaN in output"
        results["stages"].append({
            "name": "inference",
            "time_ms": inference_time * 1000,
        })

        # Stage 5: Postprocessing (softmax + top-k)
        start = time.perf_counter()
        probs = torch.softmax(logits, dim=-1)
        top5_probs, top5_ids = probs.topk(5, dim=-1)
        postprocess_time = time.perf_counter() - start
        assert abs(probs.sum().item() - 1.0) < 0.01, "Probabilities don't sum to 1"
        results["stages"].append({
            "name": "postprocess",
            "time_ms": postprocess_time * 1000,
            "top5_ids": top5_ids[0].tolist(),
            "top5_probs": top5_probs[0].tolist(),
        })

        total = sum(s["time_ms"] for s in results["stages"])
        results["total_time_ms"] = total
        return results
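The CV validator's `validate_pipeline` takes raw encoded bytes, so tests need an image file in memory. One stdlib-only way to fabricate one is to emit a minimal solid-color PNG by hand; `synthetic_png_bytes` below is a hypothetical test helper (not a Pillow or torchvision API), sketched directly from the PNG chunk layout:

```python
import struct
import zlib

def synthetic_png_bytes(width=64, height=48, rgb=(200, 30, 30)):
    """Build a minimal solid-color 8-bit RGB PNG using only the stdlib."""
    def chunk(tag, payload):
        # Each PNG chunk: length, tag, payload, CRC over tag+payload
        data = tag + payload
        return struct.pack(">I", len(payload)) + data + struct.pack(">I", zlib.crc32(data))

    # IHDR: width, height, bit depth 8, color type 2 (truecolor), no interlace
    ihdr = struct.pack(">IIBBBBB", width, height, 8, 2, 0, 0, 0)
    row = b"\x00" + bytes(rgb) * width        # filter byte 0 + raw pixels
    idat = zlib.compress(row * height)
    return (b"\x89PNG\r\n\x1a\n"
            + chunk(b"IHDR", ihdr)
            + chunk(b"IDAT", idat)
            + chunk(b"IEND", b""))

png = synthetic_png_bytes()
print(len(png))
```

Pillow decodes this like any other PNG, so something like `CVPipelineValidator().validate_pipeline(synthetic_png_bytes())` exercises the full decode-to-top-k path without shipping image assets.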
5.4 RAG Pipeline Validation
Retrieval-Augmented Generation Pipeline
RAG (Retrieval-Augmented Generation) is the dominant architecture for enterprise LLM applications, combining a vector database with an LLM to answer questions grounded in domain-specific documents. The pipeline has five stages: query embedding (encode the question), vector search (find relevant documents), context assembly (format retrieved passages into a prompt), LLM generation (produce the answer), and validation (check that retrieved context contains relevant keywords). Each stage runs on different hardware (embedding on GPU, vector search on CPU or specialized hardware, the LLM on GPU), so pipeline validation must measure end-to-end latency and verify that data flows correctly between components without type mismatches or serialization errors.
import time

class RAGPipelineValidator:
    """Validate a RAG pipeline end-to-end."""

    def __init__(self, embedding_model, llm_model, vector_store):
        self.embedding_model = embedding_model
        self.llm_model = llm_model
        self.vector_store = vector_store

    def validate_pipeline(self, query, expected_context_keywords=None):
        """Validate each stage of the RAG pipeline."""
        results = {"stages": []}

        # Stage 1: Query embedding
        start = time.perf_counter()
        query_embedding = self.embedding_model.encode(query)
        embed_time = time.perf_counter() - start
        assert query_embedding is not None, "Embedding returned None"
        assert len(query_embedding.shape) >= 1, "Invalid embedding shape"
        results["stages"].append({
            "name": "query_embedding",
            "time_ms": embed_time * 1000,
            "embedding_dim": query_embedding.shape[-1],
        })

        # Stage 2: Vector search (retrieval)
        start = time.perf_counter()
        retrieved_docs = self.vector_store.similarity_search(
            query_embedding, k=5
        )
        retrieval_time = time.perf_counter() - start
        assert len(retrieved_docs) > 0, "No documents retrieved"
        results["stages"].append({
            "name": "retrieval",
            "time_ms": retrieval_time * 1000,
            "num_docs": len(retrieved_docs),
        })

        # Stage 3: Context assembly
        start = time.perf_counter()
        context = "\n\n".join(doc.page_content for doc in retrieved_docs)
        prompt = f"Context:\n{context}\n\nQuestion: {query}\nAnswer:"
        assembly_time = time.perf_counter() - start
        results["stages"].append({
            "name": "context_assembly",
            "time_ms": assembly_time * 1000,
            "context_length": len(context),
        })

        # Stage 4: LLM generation
        start = time.perf_counter()
        response = self.llm_model.generate(prompt)
        generation_time = time.perf_counter() - start
        assert len(response) > 0, "Empty response"
        results["stages"].append({
            "name": "generation",
            "time_ms": generation_time * 1000,
            "response_length": len(response),
        })

        # Stage 5: Validation checks
        if expected_context_keywords:
            found = [kw for kw in expected_context_keywords
                     if kw.lower() in context.lower()]
            results["context_keyword_recall"] = len(found) / len(expected_context_keywords)

        return results
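`RAGPipelineValidator` only assumes duck-typed components: an embedder with `.encode()`, a store whose `.similarity_search()` returns objects carrying `.page_content`, and an LLM with `.generate()`. Under that assumption, a set of CPU-only stubs (hypothetical names; bag-of-words cosine retrieval) is enough to smoke-test the wiring without a GPU or a real model:

```python
import numpy as np

class StubEmbedder:
    """Bag-of-words embedding over a fixed vocabulary, L2-normalized."""
    def __init__(self, vocab):
        self.index = {w: i for i, w in enumerate(sorted(vocab))}
        self.dim = len(self.index)
    def encode(self, text):
        vec = np.zeros(self.dim)
        for tok in text.lower().split():
            if tok in self.index:
                vec[self.index[tok]] += 1.0
        norm = np.linalg.norm(vec)
        return vec / norm if norm else vec

class StubDoc:
    def __init__(self, page_content):
        self.page_content = page_content

class StubVectorStore:
    """Exact cosine search (dot product over unit vectors)."""
    def __init__(self, texts, embedder):
        self.docs = [StubDoc(t) for t in texts]
        self.matrix = np.stack([embedder.encode(t) for t in texts])
    def similarity_search(self, query_embedding, k=5):
        scores = self.matrix @ query_embedding
        return [self.docs[i] for i in np.argsort(-scores)[:k]]

class StubLLM:
    def generate(self, prompt):
        return "stub answer grounded in: " + prompt[:60]

corpus = ["gpu memory bandwidth", "the quick brown fox", "soup recipes"]
embedder = StubEmbedder({w for t in corpus for w in t.split()})
store = StubVectorStore(corpus, embedder)
hits = store.similarity_search(embedder.encode("gpu memory bandwidth"), k=2)
print(hits[0].page_content)
```

These plug straight into `RAGPipelineValidator(embedder, StubLLM(), store)` for a wiring-level test of the five stages; swap in real components for actual hardware validation.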
5.5 Data Pipeline Validation
Data Loading & Preprocessing Performance
The data pipeline must deliver preprocessed batches to the GPU faster than the model can consume them; otherwise the GPU idles waiting for data, destroying effective utilization. Validation measures throughput in samples per second for the complete path: disk read, decompression, augmentation, tensor conversion, and host-to-device transfer. The pin_memory=True option allocates page-locked host memory for faster DMA transfers to the GPU, and prefetch_factor=2 keeps two batches per worker ready in advance. The GPU data transfer benchmark separately measures PCIe host-to-device bandwidth, typically 12-32 GB/s depending on PCIe generation (Gen4 vs Gen5) and lane width; for models with large input tensors (e.g., high-resolution images or long token sequences), this transfer is often the limiting factor.
import torch
from torch.utils.data import DataLoader
import time
def validate_data_pipeline(dataset, batch_size=32, num_workers=4):
    """Validate data pipeline throughput and correctness."""
    loader = DataLoader(
        dataset,
        batch_size=batch_size,
        shuffle=True,
        num_workers=num_workers,
        pin_memory=True,
        prefetch_factor=2,  # requires num_workers > 0
    )

    # Measure throughput
    start = time.perf_counter()
    batches = 0
    total_samples = 0
    for batch in loader:
        data = batch[0] if isinstance(batch, (list, tuple)) else batch

        # Correctness checks
        assert not torch.isnan(data).any(), f"NaN in batch {batches}"
        assert not torch.isinf(data).any(), f"Inf in batch {batches}"

        total_samples += data.shape[0]
        batches += 1
        if batches >= 100:  # Test 100 batches
            break
    elapsed = time.perf_counter() - start

    samples_per_sec = total_samples / elapsed
    print(f"Data pipeline: {samples_per_sec:.0f} samples/sec "
          f"({batches} batches in {elapsed:.1f}s)")
    return samples_per_sec


def validate_gpu_data_transfer(batch_size=32, data_shape=(3, 224, 224)):
    """Measure host-to-device transfer throughput."""
    data = torch.randn(batch_size, *data_shape, pin_memory=True)

    # Warmup
    for _ in range(5):
        _ = data.to('cuda', non_blocking=True)
    torch.cuda.synchronize()

    # Benchmark
    iterations = 100
    start = time.perf_counter()
    for _ in range(iterations):
        _ = data.to('cuda', non_blocking=True)
    torch.cuda.synchronize()
    elapsed = time.perf_counter() - start

    bytes_total = data.numel() * data.element_size() * iterations
    bandwidth_gbs = bytes_total / elapsed / 1e9
    print(f"H2D transfer: {bandwidth_gbs:.1f} GB/s "
          f"(batch={batch_size}, PCIe bandwidth)")
    return bandwidth_gbs
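As a sanity check on whatever number `validate_gpu_data_transfer` prints, the time one batch should take at a given link rate is simple arithmetic. The 16 GB/s figure below is an assumed Gen4-class effective rate, not a measurement:

```python
# Back-of-envelope PCIe budget for one fp32 image batch
batch, channels, height, width = 32, 3, 224, 224
bytes_per_elem = 4                       # fp32
payload_bytes = batch * channels * height * width * bytes_per_elem
assumed_bw_bytes_per_s = 16e9            # ~Gen4 x16 effective rate (assumption)
transfer_ms = payload_bytes / assumed_bw_bytes_per_s * 1000
print(f"{payload_bytes / 1e6:.1f} MB -> {transfer_ms:.2f} ms per batch")
```

At roughly 19.3 MB and 1.2 ms per batch under that assumption, the loader only has to sustain about 830 batches/s before PCIe, rather than the loader, bounds the pipeline.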
5.6 Pipeline Latency Breakdown
Identifying Bottlenecks
The latency breakdown profiles each stage of the inference pipeline independently across multiple runs, computing average times and identifying the bottleneck stage. For LLM inference, the typical breakdown is: tokenization (~1%), host-to-device transfer (~1%), prefill (~10-20%), decode (~70-80%), device-to-host transfer (<1%), detokenization (<1%). The decode phase dominates because it generates tokens sequentially, each requiring a full forward pass that loads the model weights from HBM. When the bottleneck is NOT the expected stage (e.g., tokenization taking 30% of the time), it signals a software configuration issue: perhaps the tokenizer is running in pure Python without the Rust backend, or the data transfer is not using pinned memory. This diagnostic is the first tool a validation engineer reaches for when performance numbers are below expectations.
def pipeline_latency_breakdown(model, tokenizer, prompt, device='cuda',
                               num_runs=10):
    """Profile each pipeline stage and identify the bottleneck."""
    stages = {
        "tokenize": [],
        "h2d_transfer": [],
        "prefill": [],
        "decode": [],
        "d2h_transfer": [],
        "detokenize": [],
    }

    for _ in range(num_runs):
        # Tokenize
        start = time.perf_counter()
        inputs = tokenizer(prompt, return_tensors="pt")
        stages["tokenize"].append(time.perf_counter() - start)

        # H2D transfer
        start = time.perf_counter()
        inputs = {k: v.to(device) for k, v in inputs.items()}
        torch.cuda.synchronize()
        stages["h2d_transfer"].append(time.perf_counter() - start)

        # Prefill (first forward pass)
        start = time.perf_counter()
        with torch.no_grad():
            out = model(**inputs)
        torch.cuda.synchronize()
        stages["prefill"].append(time.perf_counter() - start)

        # Decode (token-by-token generation, simplified: generate() repeats
        # the prefill internally, so this slightly overstates pure decode)
        start = time.perf_counter()
        with torch.no_grad():
            generated = model.generate(
                **inputs, max_new_tokens=50, do_sample=False
            )
        torch.cuda.synchronize()
        stages["decode"].append(time.perf_counter() - start)

        # D2H transfer
        start = time.perf_counter()
        output_ids = generated.cpu()
        stages["d2h_transfer"].append(time.perf_counter() - start)

        # Detokenize
        start = time.perf_counter()
        text = tokenizer.decode(output_ids[0], skip_special_tokens=True)
        stages["detokenize"].append(time.perf_counter() - start)

    # Summary
    print(f"\nPipeline Latency Breakdown (avg over {num_runs} runs):")
    print("-" * 50)
    total = 0
    for name, times in stages.items():
        avg = sum(times) / len(times) * 1000
        total += avg
        print(f"  {name:20s}: {avg:8.2f}ms")
    print(f"  {'TOTAL':20s}: {total:8.2f}ms")

    # Identify bottleneck
    bottleneck = max(stages, key=lambda k: sum(stages[k]))
    print(f"\n  Bottleneck: {bottleneck}")
5.7 Pipeline Stress Testing
Stress testing sends hundreds to thousands of sequential inference requests through the pipeline to expose failure modes that appear only under sustained load: memory leaks (GPU memory slowly climbing), latency degradation (P99 increasing over time), error accumulation, and CUDA/HIP context corruption. The test tracks P50, P95, and P99 latency percentiles because AI serving SLAs are typically defined in terms of tail latency: a system that serves P50 in 50 ms but P99 in 5000 ms is unusable for real-time applications. Error rate monitoring catches intermittent failures (e.g., OOM at certain input lengths) that would not appear in a single-request correctness test.
def stress_test_pipeline(model, tokenizer, device='cuda', num_requests=1000):
    """Stress test the inference pipeline with many sequential requests."""
    prompts = [
        f"Question {i}: What is {i} + {i*2}? Answer:" for i in range(num_requests)
    ]
    errors = []
    latencies = []

    start_total = time.time()
    for i, prompt in enumerate(prompts):
        try:
            start = time.perf_counter()
            inputs = tokenizer(prompt, return_tensors="pt").to(device)
            with torch.no_grad():
                outputs = model.generate(**inputs, max_new_tokens=20, do_sample=False)
            torch.cuda.synchronize()
            latencies.append(time.perf_counter() - start)

            # Sanity check output
            text = tokenizer.decode(outputs[0], skip_special_tokens=True)
            if len(text) == 0:
                errors.append({"request": i, "error": "empty output"})
        except Exception as e:
            errors.append({"request": i, "error": str(e)})

        if (i + 1) % 100 == 0:
            elapsed = time.time() - start_total
            print(f"  {i+1}/{num_requests} requests, "
                  f"{len(errors)} errors, {elapsed:.1f}s elapsed")

    # Report
    total_time = time.time() - start_total
    print("\nStress Test Results:")
    print(f"  Requests:   {num_requests}")
    print(f"  Errors:     {len(errors)} ({100*len(errors)/num_requests:.1f}%)")
    print(f"  Total time: {total_time:.1f}s")
    print(f"  Throughput: {num_requests/total_time:.1f} req/s")
    if latencies:
        latencies.sort()
        print(f"  P50 latency: {latencies[len(latencies)//2]*1000:.1f}ms")
        print(f"  P95 latency: {latencies[int(len(latencies)*0.95)]*1000:.1f}ms")
        print(f"  P99 latency: {latencies[int(len(latencies)*0.99)]*1000:.1f}ms")
    return {"errors": errors, "latencies": latencies}
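The sorted-index percentile picks in the report above are fine at 1000 requests; for shorter runs, an interpolated estimate is less jumpy. A sketch using the stdlib's `statistics.quantiles` (the helper name is ours):

```python
import statistics

def latency_percentiles(latencies_s, pcts=(50, 95, 99)):
    """Interpolated percentiles in ms from per-request latencies in seconds."""
    # quantiles(n=100) returns the 99 cut points P1..P99 (exclusive method)
    cuts = statistics.quantiles(latencies_s, n=100)
    return {p: cuts[p - 1] * 1000 for p in pcts}

# 98 fast requests plus two slow outliers: the tail dominates P99
sample = [0.050] * 98 + [0.500, 2.000]
print(latency_percentiles(sample))
```

Note how P50 and P95 stay at 50 ms while P99 shoots past the slowest "normal" request, which is exactly why SLAs are written against tail percentiles.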
5.8 Exercises
1. LLM Pipeline Profiling: Implement the LLMPipelineValidator and run it on GPT-2. Which stage is the bottleneck? How much time does tokenization take vs. inference?
2. Data Pipeline Throughput: Create a synthetic ImageNet-like dataset (random tensors). Measure data loading throughput with {1, 2, 4, 8} workers. What's the optimal num_workers?
3. PCIe Bandwidth Test: Measure host-to-device transfer speed for tensor sizes from 1 KB to 1 GB. Plot bandwidth vs. size. At what point do you saturate PCIe?
4. Pipeline Stress Test: Run 500 inference requests through a model pipeline. Report error rate, P50/P95/P99 latency, and throughput.
5. E2E Correctness: Build a pipeline that reads a CSV of prompts, runs them through an LLM, and writes outputs. Validate that rerunning on the same hardware with the same inputs produces identical outputs (determinism test).
Key Takeaways
Pipeline validation catches issues that kernel-level testing misses (data loading, PCIe transfers, postprocessing)
Latency breakdown reveals which stage is the bottleneck (often it's NOT the model)
Stress testing exposes memory leaks, error accumulation, and throughput degradation
E2E determinism is essential for reproducibility
Data pipeline throughput must match or exceed model throughput to avoid stalls
Previous: 04_model_performance_validation.ipynb
Next: 06_distributed_training_validation.ipynb
Back to Overview: README.md