Streaming Data: Kafka, Windowed Aggregations, and Real-Time Pipelines
Batch pipelines process data hours after it arrives; stream processing handles it within milliseconds. This notebook covers the core concepts (Kafka architecture, windowed aggregations, exactly-once semantics) with Python simulations you can run today.
1. Setup
import numpy as np
import pandas as pd
from collections import defaultdict, deque
from datetime import datetime, timedelta
import threading
import queue
import time
import json
import hashlib
import random
# Optional: kafka-python
try:
from kafka import KafkaProducer, KafkaConsumer
HAS_KAFKA = True
except ImportError:
HAS_KAFKA = False
# Optional: faust
try:
import faust
HAS_FAUST = True
except ImportError:
HAS_FAUST = False
print("Streaming simulation starting...")
print(f" kafka-python available : {HAS_KAFKA}")
print(f" faust available : {HAS_FAUST}")
print(" Using in-memory simulation for all exercises.")
2. Batch vs Stream Processing
The Fundamental Difference
| Mode | How It Works | Latency | Storage Requirement |
|---|---|---|---|
| Batch | Reads a snapshot of accumulated data and processes it all at once | Hours | Must store all data before processing |
| Micro-batch | Collects events into small time buckets and runs mini-batches | Minutes | Small rolling buffer |
| Streaming | Reacts to each event as it arrives, maintaining running state | Milliseconds | Only the current state |
When to Use Each
Batch: End-of-day reports, ML training pipelines, historical analytics
Micro-batch (Spark Structured Streaming): Near-real-time dashboards, log aggregation
Streaming (Kafka + Flink/Faust): Fraud detection, live recommendations, IoT sensors, financial ticks
Use Cases for True Streaming
Real-time fraud detection: Flag a suspicious transaction before it clears (< 100ms)
Live dashboards: Shopify/Stripe showing revenue as it happens
Recommendation systems: "Users who just viewed X also bought Y" in the same session
IoT sensors: Alert when a temperature sensor crosses a threshold
# Simulate an e-commerce event log for a single day
random.seed(42)
np.random.seed(42)
NUM_EVENTS = 200
base_time = datetime(2024, 1, 15, 0, 0, 0)
events = []
for i in range(NUM_EVENTS):
events.append({
"event_id": i,
"user_id": f"user_{random.randint(1, 50)}",
"product_id": f"prod_{random.randint(1, 20)}",
"event_type": random.choice(["view", "add_to_cart", "purchase"]),
"amount": round(random.uniform(5.0, 500.0), 2),
"timestamp": base_time + timedelta(seconds=i * 432), # ~5 min apart over 24h
})
# ---------------------------------------------------------------------------
# BATCH MODE: collect everything, run once at end of day
# ---------------------------------------------------------------------------
def batch_revenue_report(events):
"""O(n) β runs once over the entire accumulated dataset."""
purchases = [e for e in events if e["event_type"] == "purchase"]
total_revenue = sum(e["amount"] for e in purchases)
per_product = defaultdict(float)
for e in purchases:
per_product[e["product_id"]] += e["amount"]
top_products = sorted(per_product.items(), key=lambda x: -x[1])[:5]
return {
"total_revenue": round(total_revenue, 2),
"num_purchases": len(purchases),
"top_products": top_products,
}
batch_start = time.perf_counter()
batch_result = batch_revenue_report(events)
batch_elapsed = (time.perf_counter() - batch_start) * 1000
print("=== BATCH RESULT (runs after all 200 events collected) ===")
print(f" Total revenue : ${batch_result['total_revenue']:,.2f}")
print(f" Total purchases : {batch_result['num_purchases']}")
print(f" Top products : {batch_result['top_products'][:3]}")
print(f" Latency : {batch_elapsed:.3f} ms (but only runs at end-of-day)")
# ---------------------------------------------------------------------------
# STREAM MODE: update running totals with each arriving event
# ---------------------------------------------------------------------------
def stream_revenue_update(event, state):
"""O(1) per event β maintains running state, reacts immediately."""
if event["event_type"] == "purchase":
state["total_revenue"] += event["amount"]
state["num_purchases"] += 1
state["per_product"][event["product_id"]] += event["amount"]
state["last_updated"] = event["timestamp"]
return state
stream_state = {"total_revenue": 0.0, "num_purchases": 0, "per_product": defaultdict(float), "last_updated": None}
stream_start = time.perf_counter()
for event in events:
stream_state = stream_revenue_update(event, stream_state)
stream_elapsed = (time.perf_counter() - stream_start) * 1000
print("\n=== STREAM RESULT (updated after every single event) ===")
print(f" Total revenue : ${stream_state['total_revenue']:,.2f}")
print(f" Total purchases : {stream_state['num_purchases']}")
print(f" Latency per evt : {stream_elapsed / NUM_EVENTS * 1000:.3f} Β΅s")
print(f" State available : after EVERY event, not just end-of-day")
print("\n=== KEY INSIGHT ===")
print(" Both produce identical totals, but streaming gives you the answer")
print(" in real-time β not hours later when the batch job finally runs.")
3. Kafka Architecture
Core Components
Producer → Topic (Partition 0) → Consumer Group A
         → Topic (Partition 1) → Consumer Group B
         → Topic (Partition 2) → Consumer Group B
Within a partition: messages are ordered, offset-tracked
Across partitions: parallelism, no ordering guarantee
Concept Breakdown
| Concept | Description |
|---|---|
| Topic | A named, durable log of messages (like a database table, but append-only) |
| Partition | An ordered, immutable sequence of records within a topic. The unit of parallelism. |
| Offset | Integer position of a message within a partition. Consumers commit their offset to track progress. |
| Consumer Group | A set of consumers that collectively read a topic. Each partition is assigned to exactly one consumer in the group. |
| Broker | A Kafka server. A cluster typically has 3+ brokers for fault tolerance. |
| Replication Factor | Number of copies of each partition across brokers. A factor of 3 tolerates 2 broker failures. |
| Retention | How long Kafka keeps messages (default: 7 days). Consumers can re-read historical data. |
Why Partitions?
Partitions are Kafka's scalability mechanism:
Parallelism: 10 partitions → up to 10 consumers in a group can process simultaneously
Ordering: Within a partition, order is guaranteed. Events for the same user_id go to the same partition (via key hashing), so per-user ordering is preserved.
Throughput: A single Kafka topic can handle millions of messages per second by distributing load across partitions.
Consumer Groups: The Key to Scalability
Topic: orders (3 partitions)

Group A (Analytics):
  Consumer A1 → Partitions 0, 1
  Consumer A2 → Partition 2

Group B (Fraud Detection):
  Consumer B1 → Partition 0
  Consumer B2 → Partition 1
  Consumer B3 → Partition 2
Each group reads the topic independently: Group A and Group B both get every message, but each consumer within a group only sees a subset of partitions.
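The assignment rule can be sketched in a few lines of plain Python. This is a simplified round-robin assignor invented here for illustration (Kafka's default range assignor may group partitions differently):

```python
from collections import defaultdict

def assign_partitions(num_partitions, consumers):
    """Round-robin partition assignment within one consumer group
    (a simplified stand-in for Kafka's built-in assignors)."""
    assignment = defaultdict(list)
    for p in range(num_partitions):
        assignment[consumers[p % len(consumers)]].append(p)
    return dict(assignment)

# Two groups read the same 3-partition topic independently
group_a = assign_partitions(3, ["A1", "A2"])
group_b = assign_partitions(3, ["B1", "B2", "B3"])

print(group_a)  # {'A1': [0, 2], 'A2': [1]}
print(group_b)  # {'B1': [0], 'B2': [1], 'B3': [2]}

# Every partition is covered exactly once within each group,
# so both groups collectively see every message.
assert sorted(p for ps in group_a.values() for p in ps) == [0, 1, 2]
assert sorted(p for ps in group_b.values() for p in ps) == [0, 1, 2]
```

Rebalancing works the same way: when a consumer joins or leaves, the group re-runs the assignment over the remaining members.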
4. Kafka Producer/Consumer Simulation
class MockKafkaTopic:
"""Thread-safe in-memory simulation of a Kafka topic."""
def __init__(self, name, num_partitions=3):
self.name = name
self.num_partitions = num_partitions
# Each partition is a list of (offset, key, value, timestamp) tuples
self._partitions = [[] for _ in range(num_partitions)]
        # consumer_group → {partition_id: next_offset}
self._offsets = defaultdict(lambda: defaultdict(int))
self._lock = threading.Lock()
self._total_produced = 0
def _partition_for_key(self, key):
"""Deterministic partition assignment via key hash (mirrors Kafka default partitioner)."""
if key is None:
return self._total_produced % self.num_partitions
key_bytes = str(key).encode("utf-8")
return int(hashlib.md5(key_bytes).hexdigest(), 16) % self.num_partitions
def produce(self, key, value, timestamp=None):
"""Append a message to the appropriate partition."""
if timestamp is None:
timestamp = datetime.utcnow()
partition_id = self._partition_for_key(key)
with self._lock:
offset = len(self._partitions[partition_id])
self._partitions[partition_id].append(
{"offset": offset, "key": key, "value": value, "timestamp": timestamp}
)
self._total_produced += 1
return partition_id, offset
def consume(self, consumer_group, partition=None, max_records=10, timeout=1.0):
"""Fetch next batch of records for a consumer group."""
results = []
partitions_to_read = [partition] if partition is not None else range(self.num_partitions)
with self._lock:
for p in partitions_to_read:
current_offset = self._offsets[consumer_group][p]
partition_data = self._partitions[p]
batch = partition_data[current_offset: current_offset + max_records]
if batch:
results.extend(batch)
self._offsets[consumer_group][p] = current_offset + len(batch)
return results
def get_lag(self, consumer_group):
"""Messages produced but not yet consumed by this consumer group."""
with self._lock:
lag = 0
for p in range(self.num_partitions):
produced = len(self._partitions[p])
consumed = self._offsets[consumer_group][p]
lag += max(0, produced - consumed)
return lag
def partition_stats(self):
with self._lock:
return {f"partition_{i}": len(self._partitions[i]) for i in range(self.num_partitions)}
# ---------------------------------------------------------------------------
# Produce 1000 e-commerce events
# ---------------------------------------------------------------------------
random.seed(0)
topic = MockKafkaTopic("ecommerce-events", num_partitions=3)
event_types = ["page_view", "add_to_cart", "remove_from_cart", "purchase", "refund"]
event_weights = [0.50, 0.25, 0.10, 0.12, 0.03]
base_ts = datetime(2024, 1, 15, 9, 0, 0)
for i in range(1000):
user_id = f"user_{random.randint(1, 200)}"
event = {
"event_id": f"evt_{i:05d}",
"user_id": user_id,
"product_id": f"prod_{random.randint(1, 50)}",
"event_type": random.choices(event_types, weights=event_weights)[0],
"amount": round(random.uniform(9.99, 499.99), 2),
"session_id": f"sess_{random.randint(1, 500)}",
}
topic.produce(key=user_id, value=event, timestamp=base_ts + timedelta(milliseconds=i * 100))
print(f"Topic '{topic.name}' β partition distribution:")
for p, count in topic.partition_stats().items():
print(f" {p}: {count} messages")
print(f" Total produced: {topic._total_produced}")
# ---------------------------------------------------------------------------
# Two independent consumer groups
# ---------------------------------------------------------------------------
print("\n--- Consumer Group: analytics ---")
analytics_batch = topic.consume("analytics", max_records=50)
print(f" Consumed {len(analytics_batch)} records")
print(f" Lag after first batch: {topic.get_lag('analytics')}")
# Drain analytics
total_analytics = len(analytics_batch)
while True:
batch = topic.consume("analytics", max_records=100)
if not batch:
break
total_analytics += len(batch)
print(f" Total consumed (all batches): {total_analytics}")
print(f" Final lag: {topic.get_lag('analytics')}")
print("\n--- Consumer Group: fraud-detection (only consumed 200 so far) ---")
fraud_consumed = 0
for _ in range(2):
batch = topic.consume("fraud-detection", max_records=100)
fraud_consumed += len(batch)
print(f" Consumed so far: {fraud_consumed}")
print(f" Lag: {topic.get_lag('fraud-detection')} (independent of analytics group)")
5. Windowed Aggregations
Windowed aggregations are the heart of stream processing. Rather than aggregating over all history, we aggregate over a window of time.
TUMBLING WINDOW (size=60s, no overlap)
[0s────60s] [60s───120s] [120s──180s]

SLIDING WINDOW (size=60s, slide=20s)
[0s────60s]
    [20s───80s]
        [40s──100s]

SESSION WINDOW (gap=30s)
[click──click──click]   30s gap   [click──click]
└──── session 1 ────┘             └─ session 2 ─┘
import math
# ---------------------------------------------------------------------------
# Tumbling Window
# ---------------------------------------------------------------------------
class TumblingWindowAggregator:
"""Fixed-size, non-overlapping windows."""
def __init__(self, window_size_seconds):
self.window_size = window_size_seconds
self._windows = defaultdict(lambda: {"count": 0, "sum": 0.0, "events": []})
self._max_event_time = None
def _window_key(self, event_time_ts):
"""Map a timestamp to its window start (epoch seconds)."""
return int(event_time_ts // self.window_size) * self.window_size
def add(self, event_time: datetime, value: float):
ts = event_time.timestamp()
if self._max_event_time is None or ts > self._max_event_time:
self._max_event_time = ts
wk = self._window_key(ts)
self._windows[wk]["count"] += 1
self._windows[wk]["sum"] += value
self._windows[wk]["events"].append(value)
def get_completed_windows(self):
"""Return windows that are fully closed (event time has moved past them)."""
if self._max_event_time is None:
return []
current_window_start = self._window_key(self._max_event_time)
completed = []
for wk, stats in sorted(self._windows.items()):
if wk < current_window_start:
completed.append({
"window_start": datetime.utcfromtimestamp(wk),
"window_end": datetime.utcfromtimestamp(wk + self.window_size),
"count": stats["count"],
"sum": round(stats["sum"], 2),
"avg": round(stats["sum"] / stats["count"], 2) if stats["count"] else 0,
})
return completed
# ---------------------------------------------------------------------------
# Sliding Window
# ---------------------------------------------------------------------------
class SlidingWindowAggregator:
"""Fixed-size overlapping windows."""
def __init__(self, window_size_seconds, slide_interval_seconds):
self.window_size = window_size_seconds
self.slide_interval = slide_interval_seconds
self._events = [] # list of (timestamp_float, value)
    def add(self, event_time: datetime, value: float):
        # O(1) append; events are sorted once when windows are requested
        self._events.append((event_time.timestamp(), value))

    def get_windows(self):
        if not self._events:
            return []
        self._events.sort(key=lambda x: x[0])
        first_ts = self._events[0][0]
        last_ts = self._events[-1][0]
        # Align window starts to the slide_interval grid
        start = int(first_ts // self.slide_interval) * self.slide_interval
results = []
w_start = start
while w_start <= last_ts:
w_end = w_start + self.window_size
window_events = [v for ts, v in self._events if w_start <= ts < w_end]
if window_events:
results.append({
"window_start": datetime.utcfromtimestamp(w_start),
"window_end": datetime.utcfromtimestamp(w_end),
"count": len(window_events),
"sum": round(sum(window_events), 2),
"avg": round(sum(window_events) / len(window_events), 2),
})
w_start += self.slide_interval
return results
# ---------------------------------------------------------------------------
# Session Window
# ---------------------------------------------------------------------------
class SessionWindowAggregator:
"""Gap-based sessions per key."""
def __init__(self, inactivity_gap_seconds):
self.gap = inactivity_gap_seconds
        self._sessions = defaultdict(list)  # key → list of (timestamp, value)
def add(self, key, event_time: datetime, value: float):
self._sessions[key].append((event_time.timestamp(), value))
def get_sessions(self, key=None):
keys = [key] if key else list(self._sessions.keys())
all_sessions = []
for k in keys:
events = sorted(self._sessions[k], key=lambda x: x[0])
if not events:
continue
session_start = events[0][0]
prev_ts = events[0][0]
session_values = [events[0][1]]
for ts, val in events[1:]:
if ts - prev_ts > self.gap:
all_sessions.append({
"key": k,
"session_start": datetime.utcfromtimestamp(session_start),
"session_end": datetime.utcfromtimestamp(prev_ts),
"duration_s": round(prev_ts - session_start, 1),
"event_count": len(session_values),
"total_value": round(sum(session_values), 2),
})
session_start = ts
session_values = [val]
else:
session_values.append(val)
prev_ts = ts
# Flush final session
all_sessions.append({
"key": k,
"session_start": datetime.utcfromtimestamp(session_start),
"session_end": datetime.utcfromtimestamp(prev_ts),
"duration_s": round(prev_ts - session_start, 1),
"event_count": len(session_values),
"total_value": round(sum(session_values), 2),
})
return all_sessions
# ---------------------------------------------------------------------------
# Generate a stream of clickstream events (10 minutes, purchase amounts)
# ---------------------------------------------------------------------------
random.seed(7)
click_base = datetime(2024, 1, 15, 10, 0, 0)
click_events = []
for i in range(300):
    # One event roughly every 2 seconds with sub-second jitter (~10 minutes total)
    click_events.append({
        "user_id": f"user_{random.randint(1, 10)}",
        "timestamp": click_base + timedelta(seconds=i * 2 + random.uniform(0, 1)),
        "amount": round(random.uniform(1.0, 200.0), 2),
    })
# Feed events into all three window types
tumbling = TumblingWindowAggregator(window_size_seconds=60)
sliding = SlidingWindowAggregator(window_size_seconds=120, slide_interval_seconds=30)
session = SessionWindowAggregator(inactivity_gap_seconds=30)
for e in click_events:
tumbling.add(e["timestamp"], e["amount"])
sliding.add(e["timestamp"], e["amount"])
session.add(e["user_id"], e["timestamp"], e["amount"])
# Results
print("=== TUMBLING WINDOW (60s buckets) β first 5 windows ===")
for w in tumbling.get_completed_windows()[:5]:
print(f" {w['window_start'].strftime('%H:%M:%S')} β {w['window_end'].strftime('%H:%M:%S')} "
f"count={w['count']:3d} sum=${w['sum']:7.2f} avg=${w['avg']:6.2f}")
print("\n=== SLIDING WINDOW (120s window, 30s slide) β first 5 windows ===")
for w in sliding.get_windows()[:5]:
print(f" {w['window_start'].strftime('%H:%M:%S')} β {w['window_end'].strftime('%H:%M:%S')} "
f"count={w['count']:3d} sum=${w['sum']:7.2f} avg=${w['avg']:6.2f}")
print("\n=== SESSION WINDOW (30s inactivity gap) β user_1 sessions ===")
user1_sessions = session.get_sessions("user_1")
for s in user1_sessions[:5]:
print(f" {s['session_start'].strftime('%H:%M:%S')} β {s['session_end'].strftime('%H:%M:%S')} "
f"duration={s['duration_s']}s events={s['event_count']} total=${s['total_value']:.2f}")
print(f" (user_1 has {len(user1_sessions)} sessions total)")
6. Late Data Handling
In distributed systems, events don't always arrive in the order they were generated. A mobile app might buffer events while offline, network delays vary, and retries create ordering chaos.
Without a watermark: windows close based on wall-clock time, so late-arriving events are missed.
With a watermark: define a tolerance (e.g., 3 seconds): "I'll accept events that are up to 3 seconds late." This trades some additional latency for completeness.
Event time:    t=0  t=1  t=2  t=3  t=4  t=5
Arrival time:  t=0  t=1  t=4  t=3  t=2  t=5   ← disordered!

Watermark at t=5: max_seen(5) - tolerance(3) = 2
→ windows ending before t=2 are now CLOSED
→ events with event_time ≥ 2 are still accepted
class WatermarkWindowAggregator:
"""Tumbling window aggregator that tolerates late events via watermarking."""
def __init__(self, window_size_seconds, watermark_seconds):
self.window_size = window_size_seconds
self.watermark_lag = watermark_seconds
self._windows = defaultdict(lambda: {"count": 0, "sum": 0.0})
self._max_event_ts = None
self._dropped = 0
self._accepted_late = 0
def _window_key(self, ts):
return int(ts // self.window_size) * self.window_size
@property
def watermark(self):
if self._max_event_ts is None:
return None
return self._max_event_ts - self.watermark_lag
def add(self, event_time: datetime, value: float, is_late_flag=False):
ts = event_time.timestamp()
# Advance watermark
if self._max_event_ts is None or ts > self._max_event_ts:
self._max_event_ts = ts
wk = self._window_key(ts)
window_end = wk + self.window_size
# Drop event if its window has already closed (past watermark)
if self.watermark is not None and window_end <= self.watermark:
self._dropped += 1
return "dropped"
if is_late_flag:
self._accepted_late += 1
self._windows[wk]["count"] += 1
self._windows[wk]["sum"] += value
return "accepted"
def get_closed_windows(self):
if self.watermark is None:
return []
closed = []
for wk, stats in sorted(self._windows.items()):
if wk + self.window_size <= self.watermark:
closed.append({
"window_start": datetime.utcfromtimestamp(wk),
"window_end": datetime.utcfromtimestamp(wk + self.window_size),
"count": stats["count"],
"sum": round(stats["sum"], 2),
})
return closed
# Generate events with random delays (simulating network jitter)
random.seed(42)
event_base = datetime(2024, 1, 15, 12, 0, 0).timestamp()
# True event times: one event per second for 60 seconds
true_events = [(event_base + i, round(random.uniform(10.0, 100.0), 2)) for i in range(60)]
# Simulate arrival order: each event is delayed by 0β5 seconds
delayed_events = []
for true_ts, value in true_events:
delay = random.uniform(0, 5)
arrival_ts = true_ts + delay
delayed_events.append({"event_ts": true_ts, "arrival_ts": arrival_ts, "value": value, "delay": delay})
# Sort by ARRIVAL time (as a real stream would deliver them)
delayed_events.sort(key=lambda x: x["arrival_ts"])
# Processor A: no watermark tolerance (watermark_seconds=0)
strict_agg = WatermarkWindowAggregator(window_size_seconds=10, watermark_seconds=0)
# Processor B: 3-second watermark
lenient_agg = WatermarkWindowAggregator(window_size_seconds=10, watermark_seconds=3)
max_seen_ts = float("-inf")
for e in delayed_events:
    # "Late" here means out of order: a newer event has already arrived
    is_late = e["event_ts"] < max_seen_ts
    max_seen_ts = max(max_seen_ts, e["event_ts"])
    event_dt = datetime.utcfromtimestamp(e["event_ts"])
    strict_agg.add(event_dt, e["value"], is_late_flag=is_late)
    lenient_agg.add(event_dt, e["value"], is_late_flag=is_late)
print("=== WITHOUT WATERMARK TOLERANCE (watermark=0s) ===")
print(f" Events dropped (arrived late): {strict_agg._dropped}")
strict_windows = strict_agg.get_closed_windows()
for w in strict_windows[:4]:
    print(f" {w['window_start'].strftime('%H:%M:%S')} → {w['window_end'].strftime('%H:%M:%S')} "
          f"count={w['count']:2d} sum=${w['sum']:6.2f}")
print("\n=== WITH 3-SECOND WATERMARK TOLERANCE ===")
print(f" Events dropped: {lenient_agg._dropped}")
print(f" Late events accepted: {lenient_agg._accepted_late}")
lenient_windows = lenient_agg.get_closed_windows()
for w in lenient_windows[:4]:
    print(f" {w['window_start'].strftime('%H:%M:%S')} → {w['window_end'].strftime('%H:%M:%S')} "
          f"count={w['count']:2d} sum=${w['sum']:6.2f}")
print("\n=== TRADEOFF ===")
print(f" Strict : {strict_agg._dropped} dropped events β lower latency, less complete")
print(f" Lenient: {lenient_agg._dropped} dropped events β 3s extra latency, more complete")
7. Stateful Stream Processing
class FraudDetector:
"""
Track per-user statistics in a rolling window.
Detects anomalous patterns using three rules:
1. More than 5 transactions in a 5-minute window
2. Total amount exceeding $1000 in a 5-minute window
3. Transaction velocity spike (amount > 3x user's rolling average)
"""
def __init__(self, window_seconds=300):
self.window_seconds = window_seconds
        # user_id → deque of (timestamp, amount)
self.user_state = defaultdict(lambda: {
"transactions": deque(),
"lifetime_amounts": [],
"txn_count": 0,
})
self.fraud_events = []
def _evict_old(self, user_id, current_ts):
"""Remove transactions outside the rolling window."""
state = self.user_state[user_id]
while state["transactions"] and (current_ts - state["transactions"][0][0]) > self.window_seconds:
state["transactions"].popleft()
def process(self, event) -> dict:
uid = event["user_id"]
ts = event["timestamp"].timestamp()
amount = event["amount"]
state = self.user_state[uid]
self._evict_old(uid, ts)
# Append new transaction
state["transactions"].append((ts, amount))
state["lifetime_amounts"].append(amount)
state["txn_count"] += 1
# Compute window statistics
window_txns = list(state["transactions"])
window_count = len(window_txns)
window_total = sum(a for _, a in window_txns)
lifetime_avg = sum(state["lifetime_amounts"]) / len(state["lifetime_amounts"])
# Evaluate rules
alerts = []
if window_count > 5:
alerts.append(f"HIGH_VELOCITY:{window_count}_txns_in_{self.window_seconds}s")
if window_total > 1000:
alerts.append(f"HIGH_AMOUNT:${window_total:.0f}_in_{self.window_seconds}s")
if len(state["lifetime_amounts"]) >= 5 and amount > 3 * lifetime_avg:
alerts.append(f"AMOUNT_SPIKE:${amount:.2f}_vs_avg_${lifetime_avg:.2f}")
result = {
"event_id": event["event_id"],
"user_id": uid,
"timestamp": event["timestamp"],
"amount": amount,
"window_count": window_count,
"window_total": round(window_total, 2),
"is_fraud": len(alerts) > 0,
"alerts": alerts,
}
if result["is_fraud"]:
self.fraud_events.append(result)
return result
# Generate 10000 synthetic transactions
random.seed(99)
txn_base = datetime(2024, 1, 15, 8, 0, 0)
transactions = []
# Most users are normal
for i in range(9700):
uid = f"user_{random.randint(1, 500)}"
transactions.append({
"event_id": f"txn_{i:05d}",
"user_id": uid,
"amount": round(abs(random.gauss(50, 30)), 2),
"timestamp": txn_base + timedelta(seconds=i * 3 + random.uniform(0, 3)),
})
# Inject fraudulent patterns
fraud_base = txn_base + timedelta(minutes=30)
for j in range(300): # 300 rapid transactions from user_999
transactions.append({
"event_id": f"txn_fraud_{j:04d}",
"user_id": "user_999",
"amount": round(random.uniform(150, 400), 2),
"timestamp": fraud_base + timedelta(seconds=j * 1), # 1 per second
})
# Sort by timestamp
transactions.sort(key=lambda x: x["timestamp"])
detector = FraudDetector(window_seconds=300)
for txn in transactions:
detector.process(txn)
fraud_df = pd.DataFrame(detector.fraud_events)
print(f"Processed {len(transactions):,} transactions")
print(f"Detected {len(detector.fraud_events):,} fraud signals\n")
if not fraud_df.empty:
by_user = fraud_df.groupby("user_id").agg(
fraud_events=("event_id", "count"),
total_flagged_amount=("amount", "sum"),
max_window_total=("window_total", "max"),
max_window_count=("window_count", "max"),
).sort_values("fraud_events", ascending=False)
print("Top flagged users:")
print(by_user.head(10).to_string())
print("\nSample fraud alerts for user_999:")
sample = fraud_df[fraud_df["user_id"] == "user_999"].head(3)
for _, row in sample.iterrows():
print(f" {row['timestamp'].strftime('%H:%M:%S')} ${row['amount']:.2f} "
f"window={row['window_count']} txns/${row['window_total']:.0f} alerts={row['alerts']}")
8. Exactly-Once Semantics
One of the hardest problems in distributed systems is ensuring a message is processed exactly once: not lost, not duplicated.
The Three Delivery Guarantees
| Guarantee | Mechanism | Risk | Use Case |
|---|---|---|---|
| At-most-once | Fire and forget, no ack | Lost messages | Metrics/telemetry where loss is acceptable |
| At-least-once | Retry until acked | Duplicate processing | Default Kafka; safe if downstream is idempotent |
| Exactly-once | Idempotent writes + transactional producers | None (but more complex) | Financial transactions, inventory updates |
How Exactly-Once Works in Kafka
Idempotent producer: the broker deduplicates retried produces using a sequence number per (producer_id, partition)
Transactional API: the producer opens a transaction, sends to multiple partitions, and commits atomically
Consumer offset in transaction: the offset commit is part of the same transaction as the output write, so there is no partial state
The pattern: read → process → write output + commit offset, all in one atomic transaction.
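In the spirit of this notebook's in-memory simulations, here is a minimal sketch of that atomic pattern. `MiniTransaction` is an invented toy, not the Kafka transactional API; it only shows why staging the output write and the offset commit together prevents partial state:

```python
class MiniTransaction:
    """Toy transaction: output writes and the offset commit are staged
    together and applied atomically (all or nothing). Illustration only;
    real Kafka uses the transactional producer + read_committed consumers."""
    def __init__(self, output_log, offsets, group):
        self.output_log = output_log
        self.offsets = offsets
        self.group = group
        self._staged_output = []
        self._staged_offset = None

    def write(self, record):
        self._staged_output.append(record)

    def commit_offset(self, offset):
        self._staged_offset = offset

    def commit(self):
        # Output and offset become visible in one step
        self.output_log.extend(self._staged_output)
        self.offsets[self.group] = self._staged_offset

    def abort(self):
        self._staged_output.clear()
        self._staged_offset = None

input_log = [{"amount": 10.0}, {"amount": 20.0}, {"amount": 30.0}]
output_log, offsets = [], {"etl": 0}

def process_next(fail=False):
    """Read → process → write output + commit offset in one transaction."""
    offset = offsets["etl"]
    txn = MiniTransaction(output_log, offsets, "etl")
    txn.write({"doubled": input_log[offset]["amount"] * 2})
    txn.commit_offset(offset + 1)
    if fail:
        txn.abort()   # crash before commit: nothing is visible
    else:
        txn.commit()

process_next()            # processes record 0
process_next(fail=True)   # "crash": no output written, offset unchanged
process_next()            # retry succeeds; record 1 processed exactly once

print(offsets["etl"], len(output_log))  # 2 2
```

Because the failed attempt left neither a stray output record nor an advanced offset, the retry reprocesses the same input without duplicating anything: exactly-once from the outside, at-least-once underneath.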
class IdempotentProcessor:
"""Use message ID to deduplicate retried messages."""
def __init__(self):
self.processed_ids = set()
self.total_calls = 0
self.total_processed = 0
self.total_skipped = 0
def _do_work(self, payload):
return {"revenue": payload.get("amount", 0), "processed_at": datetime.utcnow()}
def process(self, message_id, payload):
self.total_calls += 1
if message_id in self.processed_ids:
self.total_skipped += 1
            return None  # duplicate: skip
result = self._do_work(payload)
self.processed_ids.add(message_id)
self.total_processed += 1
return result
class NonIdempotentProcessor:
"""Naive processor that counts duplicates as new work."""
def __init__(self):
self.total_calls = 0
self.total_processed = 0
self.total_revenue = 0.0
def process(self, message_id, payload):
self.total_calls += 1
self.total_processed += 1
self.total_revenue += payload.get("amount", 0)
# Simulate at-least-once delivery: some messages are delivered multiple times
random.seed(5)
messages = [
{"message_id": f"msg_{i:04d}", "payload": {"amount": round(random.uniform(10, 500), 2)}}
for i in range(100)
]
# Simulate retries: 20% of messages are "retried" (delivered twice)
delivered = list(messages)
retried = random.sample(messages, k=20)
delivered.extend(retried) # duplicates appended
random.shuffle(delivered)
true_revenue = sum(m["payload"]["amount"] for m in messages)
idempotent = IdempotentProcessor()
naive = NonIdempotentProcessor()
idempotent_revenue = 0.0
for msg in delivered:
result = idempotent.process(msg["message_id"], msg["payload"])
if result:
idempotent_revenue += result["revenue"]
naive.process(msg["message_id"], msg["payload"])
print("=== DELIVERY SIMULATION ===")
print(f" Unique messages : {len(messages)}")
print(f" Total deliveries : {len(delivered)} ({len(retried)} retries)")
print(f" True revenue : ${true_revenue:,.2f}")
print("\n--- Without deduplication (naive / at-least-once) ---")
print(f" Processed count : {naive.total_processed} (WRONG β should be {len(messages)})")
print(f" Computed revenue : ${naive.total_revenue:,.2f} (WRONG β overcounted by ${naive.total_revenue - true_revenue:,.2f})")
print("\n--- With deduplication (idempotent / exactly-once) ---")
print(f" Processed count : {idempotent.total_processed} (CORRECT)")
print(f" Skipped duplicates : {idempotent.total_skipped}")
print(f" Computed revenue : ${idempotent_revenue:,.2f} (CORRECT β matches ${true_revenue:,.2f})")
9. Stream Processing Frameworks Comparison
Real production systems use dedicated frameworks. Here's how the same "filter high-value events and count per user over 5-minute tumbling windows" pattern looks across the three major options.
When to Choose Which
| Framework | Language | Latency | Exactly-Once | Best For |
|---|---|---|---|---|
| Faust | Python | Low (ms) | Yes (with Kafka) | Python shops, moderate scale |
| Kafka Streams | Java/Kotlin | Very low (ms) | Yes (native Kafka) | JVM shops, tight Kafka integration |
| Apache Flink | Java/Python/Scala | Ultra-low (µs to ms) | Yes | High-scale, complex stateful ops |
faust_code = '''
# ── Faust (Python) ──────────────────────────────────────────────────────────
from datetime import timedelta

import faust

app = faust.App('revenue-aggregator', broker='kafka://localhost:9092')
class Event(faust.Record):
user_id: str
amount: float
event_type: str
input_topic = app.topic('ecommerce-events', value_type=Event)
output_topic = app.topic('revenue-alerts')
@app.agent(input_topic)
async def process_event(events):
"""Stateless transformation β filter and forward high-value purchases."""
async for event in events:
if event.event_type == 'purchase' and event.amount > 100:
await output_topic.send(value={
'user_id': event.user_id,
'amount': event.amount,
'flag': 'high_value_purchase',
})
# Windowed aggregation (Faust Table)
revenue_table = app.Table('revenue', default=float).tumbling(60, expires=timedelta(hours=1))
@app.agent(input_topic)
async def aggregate_revenue(events):
async for event in events.group_by(Event.user_id):
revenue_table[event.user_id] += event.amount
'''
kafka_streams_code = '''
// ── Kafka Streams (Java) ────────────────────────────────────────────────────
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Event> events = builder.stream("ecommerce-events");
events
.filter((key, event) -> event.eventType.equals("purchase") && event.amount > 100)
.groupBy((key, event) -> event.userId)
.windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
.aggregate(
() -> 0.0,
(userId, event, total) -> total + event.amount,
Materialized.as("revenue-store")
)
.toStream()
.filter((windowedKey, total) -> total > 1000.0) // alert threshold
.to("fraud-alerts");
KafkaStreams app = new KafkaStreams(builder.build(), config);
app.start();
'''
flink_code = '''
# ── Apache Flink (Python API) ───────────────────────────────────────────────
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time
env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)
kafka_source = FlinkKafkaConsumer(
topics='ecommerce-events',
deserialization_schema=EventSchema(),
properties={'bootstrap.servers': 'localhost:9092'},
)
revenue_per_user = (
env.add_source(kafka_source)
.assign_timestamps_and_watermarks(BoundedOutOfOrdernessWatermarks(Duration.ofSeconds(3)))
.filter(lambda e: e.event_type == 'purchase' and e.amount > 100)
.key_by(lambda e: e.user_id)
.window(TumblingEventTimeWindows.of(Time.minutes(5)))
.aggregate(RevenueAggregate()) # custom AggregateFunction
.filter(lambda (user_id, total): total > 1000.0)
)
revenue_per_user.add_sink(FlinkKafkaProducer('fraud-alerts', ProducerSchema()))
env.execute('Revenue Fraud Detector')
'''
print("=" * 70)
print("FAUST (Python: easiest for Python teams)")
print("=" * 70)
print(faust_code)
print("=" * 70)
print("KAFKA STREAMS (Java: native Kafka integration, no extra cluster)")
print("=" * 70)
print(kafka_streams_code)
print("=" * 70)
print("APACHE FLINK (Python API: highest throughput, most powerful)")
print("=" * 70)
print(flink_code)
10. Backpressure Simulation
Backpressure is a mechanism by which a slow consumer signals the upstream producer to slow down, preventing unbounded queue growth and memory exhaustion.
Without backpressure:
Producer at 1000 msg/s, consumer at 800 msg/s → the queue grows by 200 msg/s indefinitely
After N seconds: OOM; the consumer falls arbitrarily far behind and latency grows without bound
With backpressure:
Consumer signals "queue is full" → producer blocks or drops
System reaches a stable equilibrium at the consumer's processing rate
def simulate_backpressure(producer_rate, consumer_rate, duration_seconds=5,
                          max_queue_size=None, use_backpressure=False):
    """
    Simulate a producer/consumer pair with optional backpressure.
    Returns (timeline, backpressure_events): timeline holds one dict per
    second with queue size, running totals, lag, estimated latency, and
    the producer's effective rate.
    """
    q_size = 0
    total_produced = 0
    total_consumed = 0
    timeline = []
    backpressure_events = 0
    effective_producer_rate = producer_rate
    for second in range(duration_seconds):
        # Apply backpressure: throttle the producer once the queue is 80% full
        if use_backpressure and max_queue_size and q_size > max_queue_size * 0.8:
            # Consumer signals the producer to slow down to the consumer's rate
            effective_producer_rate = consumer_rate
            backpressure_events += 1
        elif use_backpressure:
            # Gradually ramp back up toward the full producer rate
            effective_producer_rate = min(producer_rate, effective_producer_rate + 50)
        produced_this_second = effective_producer_rate
        consumed_this_second = min(consumer_rate, q_size + produced_this_second)
        total_produced += produced_this_second
        total_consumed += consumed_this_second
        q_size = max(0, q_size + produced_this_second - consumed_this_second)
        if max_queue_size:
            q_size = min(q_size, max_queue_size)  # overflow beyond this is dropped
        latency_ms = (q_size / consumer_rate) * 1000 if consumer_rate > 0 else float('inf')
        timeline.append({
            "second": second + 1,
            "queue_size": int(q_size),
            "total_produced": int(total_produced),
            "total_consumed": int(total_consumed),
            "lag": int(total_produced - total_consumed),
            "latency_ms": round(latency_ms, 1),
            "effective_rate": effective_producer_rate,
            "backpressure_active": use_backpressure and effective_producer_rate < producer_rate,
        })
    return timeline, backpressure_events
# Scenario: producer 1000/s, consumer 800/s over 10 seconds
PRODUCER_RATE = 1000
CONSUMER_RATE = 800
DURATION = 10
MAX_Q = 5000
timeline_no_bp, _ = simulate_backpressure(
    PRODUCER_RATE, CONSUMER_RATE, DURATION, max_queue_size=MAX_Q, use_backpressure=False
)
timeline_bp, bp_events = simulate_backpressure(
    PRODUCER_RATE, CONSUMER_RATE, DURATION, max_queue_size=MAX_Q, use_backpressure=True
)
print(f"{'='*65}")
print(f"{'Sec':>4} {'Without Backpressure':^28} {'With Backpressure':^28}")
print(f"{'':>4} {'Queue':>8} {'Lag':>8} {'Latency':>9} {'Queue':>8} {'Lag':>8} {'BP?':>5}")
print(f"{'='*65}")
for no_bp, bp in zip(timeline_no_bp, timeline_bp):
    print(f"{no_bp['second']:>4} "
          f"{no_bp['queue_size']:>8,} {no_bp['lag']:>8,} {no_bp['latency_ms']:>8.0f}ms "
          f"{bp['queue_size']:>8,} {bp['lag']:>8,} {'YES' if bp['backpressure_active'] else 'no':>5}")
print(f"{'='*65}")
final_no_bp = timeline_no_bp[-1]
final_bp = timeline_bp[-1]
print(f"\nFinal state after {DURATION}s:")
print(f" WITHOUT backpressure: queue={final_no_bp['queue_size']:,} lag={final_no_bp['lag']:,} latency={final_no_bp['latency_ms']:.0f}ms")
print(f" WITH backpressure: queue={final_bp['queue_size']:,} lag={final_bp['lag']:,} latency={final_bp['latency_ms']:.0f}ms")
print(f" Backpressure was active for {bp_events} of {DURATION} seconds")
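The simulation above models backpressure by adjusting rates each second. In real Python code the same effect falls out of a bounded queue: `queue.Queue(maxsize=...)` makes `put()` block whenever the buffer is full, which is exactly the "producer blocks" strategy. A minimal sketch (the message count and sleep duration are illustrative, not from this notebook):

```python
import queue
import threading
import time

N = 200
q = queue.Queue(maxsize=10)  # bounded buffer: the backpressure boundary
timings = {}

def producer():
    start = time.monotonic()
    for i in range(N):
        q.put(i)  # blocks while the queue is full -> producer is throttled
    timings["producer_s"] = time.monotonic() - start

def consumer():
    for _ in range(N):
        q.get()
        time.sleep(0.001)  # ~1 ms per message: slower than the producer
        q.task_done()

t1 = threading.Thread(target=producer)
t2 = threading.Thread(target=consumer)
t1.start(); t2.start()
t1.join(); t2.join()

# The producer could have finished almost instantly, but blocking put()
# forced it to run at roughly the consumer's pace, never getting more
# than ~10 messages ahead.
print(f"producer ran for {timings['producer_s']:.3f}s (throttled to consumer speed)")
```

No explicit signalling code is needed: the bounded queue is the signal, and the equilibrium at the consumer's rate emerges from `put()` blocking.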
11. Cheat Sheet
| Concept | Description |
|---|---|
| Topic | Named stream of messages (like a table in batch) |
| Partition | Ordered, immutable sequence within a topic |
| Consumer group | Multiple consumers sharing work (each reads distinct partitions) |
| Offset | Position in a partition; committed after processing |
| Retention | How long Kafka stores messages (default: 7 days) |
| Replication factor | Number of partition copies (3 = tolerate 2 broker failures) |
| Watermark | How far behind real time we tolerate late events |
| Tumbling window | [0-60s], [60-120s], ...: non-overlapping buckets |
| Sliding window | [0-60s], [10-70s], ...: overlapping, richer aggregations |
| Session window | Gap-based: events <30s apart = same session |
| Exactly-once | Idempotent writes + transactional producers |
| Backpressure | Consumer signals producer to slow down, preventing OOM |
| State store | Per-key mutable state in a stream processor (e.g., Flink's RocksDB) |
| Checkpoint | Periodic snapshot of state for fault tolerance |
| Dead letter queue | Destination for unprocessable events (for later investigation) |
Window selection guide:
Need fixed time buckets for metrics? → Tumbling
Need a smooth rolling aggregation? → Sliding
Grouping user activity into logical interactions? → Session
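The tumbling/sliding distinction comes down to how many windows a single event belongs to. A small sketch makes it concrete; `window_starts` is a hypothetical helper (not part of the aggregators above) that assumes windows open at multiples of the slide interval:

```python
def window_starts(event_ts, size, slide):
    """Start times of every window [start, start + size) that contains
    event_ts, assuming windows open at multiples of `slide` seconds."""
    start = int(event_ts // slide) * slide  # latest boundary at or before the event
    starts = []
    while start > event_ts - size:          # window still covers the event
        starts.append(start)
        start -= slide
    return sorted(starts)

# Tumbling (slide == size): each event lands in exactly one bucket
print(window_starts(65, size=60, slide=60))   # [60]

# Sliding (slide < size): each event lands in size/slide overlapping windows
print(window_starts(65, size=60, slide=10))   # [10, 20, 30, 40, 50, 60]
```

Session windows cannot be computed this way: their boundaries depend on the gaps between events, not on the clock.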
Delivery guarantee selection:
Losing events is acceptable (telemetry, metrics) → At-most-once
Can tolerate duplicates (idempotent downstream) → At-least-once (Kafka's default)
Financial / inventory / ledger operations → Exactly-once
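The "idempotent downstream" condition for at-least-once is easy to sketch: if the sink remembers which messages it has already applied, redeliveries become harmless. `IdempotentSink` below is a hypothetical in-memory illustration; a production system would persist the dedupe keys across restarts and prefer a producer-assigned unique ID over a content hash (two genuinely distinct messages with identical payloads would otherwise collide):

```python
import hashlib
import json

class IdempotentSink:
    """Turn at-least-once delivery into effectively-once writes by
    remembering which messages have already been applied."""

    def __init__(self):
        self.applied = set()  # in production: a durable store, not a set
        self.balance = 0.0

    def apply(self, message: dict) -> bool:
        # Deterministic dedupe key from the message content
        key = hashlib.sha256(
            json.dumps(message, sort_keys=True).encode()
        ).hexdigest()
        if key in self.applied:
            return False      # duplicate redelivery: skip, state unchanged
        self.applied.add(key)
        self.balance += message["amount"]
        return True

sink = IdempotentSink()
msg = {"txn_id": "t-1", "amount": 25.0}
print(sink.apply(msg))   # True  -- first delivery applied
print(sink.apply(msg))   # False -- redelivery ignored
print(sink.balance)      # 25.0, not 50.0
```

With a sink like this, the broker is free to redeliver on consumer failure, and the end-to-end result still looks exactly-once.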
When to use streaming vs batch:
Latency requirement < 1 minute → Streaming
Complex ML training / full-history scans → Batch
Near-real-time dashboards with simpler ETL → Micro-batch (Spark Structured Streaming)
12. Exercises
Exercise 1: SlidingWindowAggregator
The SlidingWindowAggregator above recomputes all windows from scratch each time get_windows() is called (O(n × w/s)). Rewrite it to maintain incremental state: as each new event arrives, update only the windows it falls into. Your implementation should have O(window_size / slide_interval) cost per add() call and O(1) cost per completed window emission.
class EfficientSlidingWindowAggregator:
    def __init__(self, window_size_seconds: int, slide_interval_seconds: int):
        ...
    def add(self, event_time: datetime, value: float) -> list:
        """Returns any newly-completed windows."""
        ...
Exercise 2: Real-Time Leaderboard
Build a real-time gaming leaderboard:
Input: stream of (player_id, score_delta, timestamp) events
Maintain a rolling 1-hour tumbling-window leaderboard
Output: top-10 players with their cumulative scores in the current window
When a new window starts, scores reset
class RollingLeaderboard:
    def __init__(self, window_seconds=3600, top_k=10):
        ...
    def record_score(self, player_id: str, score_delta: int, event_time: datetime):
        ...
    def get_top_k(self, as_of: datetime) -> list:
        """Returns [(rank, player_id, score), ...] for the current window."""
        ...
Test with 50,000 synthetic events from 1000 players over 3 hours.
Exercise 3: Consumer Group Rebalancing
Extend MockKafkaTopic with a consumer group registry. Implement partition rebalancing:
register_consumer(group, consumer_id): add a consumer to the group
deregister_consumer(group, consumer_id): remove a consumer
get_assignment(group, consumer_id): return which partitions this consumer owns
Use round-robin assignment (Kafka's default is the range assignor, but round-robin is simpler and spreads partitions evenly)
When a consumer joins or leaves, trigger a rebalance: redistribute all partitions evenly
Demonstrate: start with 2 consumers, add a 3rd, and show that the partitions redistribute. Then remove one and show the rebalance again.
Exercise 4: Dead Letter Queue
Add a DLQ to the processing pipeline:
class DLQProcessor:
    def __init__(self, main_topic: MockKafkaTopic, dlq_topic: MockKafkaTopic,
                 max_retries=3):
        ...
    def process(self, message, processing_fn):
        """Try processing_fn up to max_retries times.
        On final failure, route to DLQ with error metadata."""
        ...
Simulate a processing function that fails 10% of the time with a transient error and 2% of the time with a permanent error. Show that transient failures eventually succeed, and permanent failures land in the DLQ with retry count and error reason attached.
Exercise 5: Stream Join
Implement a windowed stream join between two topics:
Left stream: purchase_events with (purchase_id, product_id, user_id, amount, timestamp)
Right stream: product_catalog with (product_id, name, category, cost_price, timestamp)
Join on product_id within a 5-minute window (events on both sides within 5 minutes of each other match)
Output: an enriched purchase with product name, category, and computed margin (amount - cost_price)
class WindowedStreamJoiner:
    def __init__(self, join_window_seconds=300):
        self.left_buffer = defaultdict(list)   # product_id → [(ts, event)]
        self.right_buffer = defaultdict(list)  # product_id → [(ts, event)]
    def add_left(self, key, event_time, event): ...
    def add_right(self, key, event_time, event): ...
    def get_joined_results(self) -> list: ...
Handle the case where the catalog update arrives after the purchase event (late right-side join).