Streaming Data: Kafka, Windowed Aggregations, and Real-Time Pipelines

Batch pipelines process data hours later. Stream processing handles it in milliseconds. This notebook covers the core concepts (Kafka architecture, windowed aggregations, exactly-once semantics) with Python simulations you can run today.

1. Setup

import numpy as np
import pandas as pd
from collections import defaultdict, deque
from datetime import datetime, timedelta
import threading
import queue
import time
import json
import hashlib
import random

# Optional: kafka-python
try:
    from kafka import KafkaProducer, KafkaConsumer
    HAS_KAFKA = True
except ImportError:
    HAS_KAFKA = False

# Optional: faust
try:
    import faust
    HAS_FAUST = True
except ImportError:
    HAS_FAUST = False

print("Streaming simulation starting...")
print(f"  kafka-python available : {HAS_KAFKA}")
print(f"  faust available        : {HAS_FAUST}")
print("  Using in-memory simulation for all exercises.")

2. Batch vs Stream Processing

The Fundamental Difference

Mode         How It Works                                                    Latency       Storage Requirement
─────────────────────────────────────────────────────────────────────────────────────────────────────────────────
Batch        Reads a snapshot of accumulated data and processes it at once   Hours         Must store all data before processing
Micro-batch  Collects events into small time buckets and runs mini-batches   Minutes       Small rolling buffer
Streaming    Reacts to each event as it arrives, maintaining running state   Milliseconds  Only the current state

When to Use Each

  • Batch: End-of-day reports, ML training pipelines, historical analytics

  • Micro-batch (Spark Structured Streaming): Near-real-time dashboards, log aggregation

  • Streaming (Kafka + Flink/Faust): Fraud detection, live recommendations, IoT sensors, financial ticks

Use Cases for True Streaming

  • Real-time fraud detection: Flag a suspicious transaction before it clears (< 100ms)

  • Live dashboards: Shopify/Stripe showing revenue as it happens

  • Recommendation systems: "Users who just viewed X also bought Y" in the same session

  • IoT sensors: Alert when a temperature sensor crosses a threshold

# Simulate an e-commerce event log for a single day
random.seed(42)
np.random.seed(42)

NUM_EVENTS = 200
base_time = datetime(2024, 1, 15, 0, 0, 0)

events = []
for i in range(NUM_EVENTS):
    events.append({
        "event_id": i,
        "user_id": f"user_{random.randint(1, 50)}",
        "product_id": f"prod_{random.randint(1, 20)}",
        "event_type": random.choice(["view", "add_to_cart", "purchase"]),
        "amount": round(random.uniform(5.0, 500.0), 2),
        "timestamp": base_time + timedelta(seconds=i * 432),  # ~5 min apart over 24h
    })

# ---------------------------------------------------------------------------
# BATCH MODE: collect everything, run once at end of day
# ---------------------------------------------------------------------------
def batch_revenue_report(events):
    """O(n) β€” runs once over the entire accumulated dataset."""
    purchases = [e for e in events if e["event_type"] == "purchase"]
    total_revenue = sum(e["amount"] for e in purchases)
    per_product = defaultdict(float)
    for e in purchases:
        per_product[e["product_id"]] += e["amount"]
    top_products = sorted(per_product.items(), key=lambda x: -x[1])[:5]
    return {
        "total_revenue": round(total_revenue, 2),
        "num_purchases": len(purchases),
        "top_products": top_products,
    }

batch_start = time.perf_counter()
batch_result = batch_revenue_report(events)
batch_elapsed = (time.perf_counter() - batch_start) * 1000

print("=== BATCH RESULT (runs after all 200 events collected) ===")
print(f"  Total revenue   : ${batch_result['total_revenue']:,.2f}")
print(f"  Total purchases : {batch_result['num_purchases']}")
print(f"  Top products    : {batch_result['top_products'][:3]}")
print(f"  Latency         : {batch_elapsed:.3f} ms (but only runs at end-of-day)")

# ---------------------------------------------------------------------------
# STREAM MODE: update running totals with each arriving event
# ---------------------------------------------------------------------------
def stream_revenue_update(event, state):
    """O(1) per event β€” maintains running state, reacts immediately."""
    if event["event_type"] == "purchase":
        state["total_revenue"] += event["amount"]
        state["num_purchases"] += 1
        state["per_product"][event["product_id"]] += event["amount"]
    state["last_updated"] = event["timestamp"]
    return state

stream_state = {"total_revenue": 0.0, "num_purchases": 0, "per_product": defaultdict(float), "last_updated": None}
stream_start = time.perf_counter()
for event in events:
    stream_state = stream_revenue_update(event, stream_state)
stream_elapsed = (time.perf_counter() - stream_start) * 1000

print("\n=== STREAM RESULT (updated after every single event) ===")
print(f"  Total revenue   : ${stream_state['total_revenue']:,.2f}")
print(f"  Total purchases : {stream_state['num_purchases']}")
print(f"  Latency per evt : {stream_elapsed / NUM_EVENTS * 1000:.3f} Β΅s")
print(f"  State available : after EVERY event, not just end-of-day")

print("\n=== KEY INSIGHT ===")
print("  Both produce identical totals, but streaming gives you the answer")
print("  in real-time β€” not hours later when the batch job finally runs.")

3. Kafka Architecture

Core Components

Producer  →  Topic (Partitions 0, 1, 2)  →  Consumer Group A (reads all partitions)
                                         →  Consumer Group B (reads all partitions)

Within a partition: messages are ordered, offset-tracked
Across partitions: parallelism, no ordering guarantee

Concept Breakdown

Concept             Description
────────────────────────────────────────────────────────────────────────────────────────────────────
Topic               A named, durable log of messages (like a database table, but append-only)
Partition           An ordered, immutable sequence of records within a topic; the unit of parallelism
Offset              Integer position of a message within a partition; consumers commit their offset to track progress
Consumer Group      A set of consumers that collectively read a topic; each partition is assigned to exactly one consumer in the group
Broker              A Kafka server; a cluster typically has 3+ brokers for fault tolerance
Replication Factor  Number of copies of each partition across brokers; a factor of 3 tolerates 2 broker failures
Retention           How long Kafka keeps messages (default: 7 days); consumers can re-read historical data

Why Partitions?

Partitions are Kafka’s scalability mechanism:

  • Parallelism: 10 partitions → 10 consumers can process simultaneously

  • Ordering: Within a partition, order is guaranteed. Events for the same user_id go to the same partition (via key hashing) so per-user ordering is preserved.

  • Throughput: A single Kafka topic can handle millions of messages/second by distributing across partitions.
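
With a real broker available, the same key-to-partition behavior can be observed with kafka-python, guarded by the HAS_KAFKA flag from Setup. This is a sketch: the broker address and topic name are assumptions, and the topic must already exist.

# Sketch: keyed produce with kafka-python (runs only when a broker is reachable).
# 'localhost:9092' and the topic 'ecommerce-events' are assumptions.
if HAS_KAFKA:
    producer = KafkaProducer(
        bootstrap_servers="localhost:9092",
        key_serializer=str.encode,
        value_serializer=lambda v: json.dumps(v).encode("utf-8"),
    )
    for i in range(6):
        # Same key -> same partition, so per-user ordering is preserved
        meta = producer.send(
            "ecommerce-events",
            key=f"user_{i % 3}",
            value={"event_id": i, "event_type": "page_view"},
        ).get(timeout=10)
        print(f"  user_{i % 3} -> partition {meta.partition}, offset {meta.offset}")
    producer.flush()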

Consumer Groups: The Key to Scalability

Topic: orders (3 partitions)

Group A - Analytics:
  Consumer A1 → Partitions 0, 1
  Consumer A2 → Partition 2

Group B - Fraud Detection:
  Consumer B1 → Partition 0
  Consumer B2 → Partition 1
  Consumer B3 → Partition 2

Each group reads the topic independently: Group A and Group B both get every message, but each consumer within a group only sees its assigned subset of partitions.
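
The consumer side of the same sketch, again guarded by HAS_KAFKA: because each group id tracks its own offsets, an "analytics" consumer and a "fraud-detection" consumer would each independently see every message. Topic and group names are illustrative.

# Sketch: one consumer in the "analytics" group (kafka-python; broker assumed).
if HAS_KAFKA:
    consumer = KafkaConsumer(
        "ecommerce-events",
        bootstrap_servers="localhost:9092",
        group_id="analytics",                  # offsets are tracked per group
        auto_offset_reset="earliest",          # start at the beginning if no offset yet
        value_deserializer=lambda b: json.loads(b.decode("utf-8")),
        consumer_timeout_ms=5000,              # stop iterating once the topic goes idle
    )
    for record in consumer:
        print(f"  partition={record.partition} offset={record.offset} "
              f"type={record.value['event_type']}")
    consumer.close()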

4. Kafka Producer/Consumer Simulation

class MockKafkaTopic:
    """Thread-safe in-memory simulation of a Kafka topic."""

    def __init__(self, name, num_partitions=3):
        self.name = name
        self.num_partitions = num_partitions
        # Each partition is a list of (offset, key, value, timestamp) tuples
        self._partitions = [[] for _ in range(num_partitions)]
        # consumer_group β†’ {partition_id: next_offset}
        self._offsets = defaultdict(lambda: defaultdict(int))
        self._lock = threading.Lock()
        self._total_produced = 0

    def _partition_for_key(self, key):
        """Deterministic partition assignment via key hash (mirrors Kafka default partitioner)."""
        if key is None:
            return self._total_produced % self.num_partitions
        key_bytes = str(key).encode("utf-8")
        return int(hashlib.md5(key_bytes).hexdigest(), 16) % self.num_partitions

    def produce(self, key, value, timestamp=None):
        """Append a message to the appropriate partition."""
        if timestamp is None:
            timestamp = datetime.utcnow()
        partition_id = self._partition_for_key(key)
        with self._lock:
            offset = len(self._partitions[partition_id])
            self._partitions[partition_id].append(
                {"offset": offset, "key": key, "value": value, "timestamp": timestamp}
            )
            self._total_produced += 1
        return partition_id, offset

    def consume(self, consumer_group, partition=None, max_records=10, timeout=1.0):
        """Fetch next batch of records for a consumer group."""
        results = []
        partitions_to_read = [partition] if partition is not None else range(self.num_partitions)
        with self._lock:
            for p in partitions_to_read:
                current_offset = self._offsets[consumer_group][p]
                partition_data = self._partitions[p]
                batch = partition_data[current_offset: current_offset + max_records]
                if batch:
                    results.extend(batch)
                    self._offsets[consumer_group][p] = current_offset + len(batch)
        return results

    def get_lag(self, consumer_group):
        """Messages produced but not yet consumed by this consumer group."""
        with self._lock:
            lag = 0
            for p in range(self.num_partitions):
                produced = len(self._partitions[p])
                consumed = self._offsets[consumer_group][p]
                lag += max(0, produced - consumed)
            return lag

    def partition_stats(self):
        with self._lock:
            return {f"partition_{i}": len(self._partitions[i]) for i in range(self.num_partitions)}


# ---------------------------------------------------------------------------
# Produce 1000 e-commerce events
# ---------------------------------------------------------------------------
random.seed(0)
topic = MockKafkaTopic("ecommerce-events", num_partitions=3)

event_types = ["page_view", "add_to_cart", "remove_from_cart", "purchase", "refund"]
event_weights = [0.50, 0.25, 0.10, 0.12, 0.03]

base_ts = datetime(2024, 1, 15, 9, 0, 0)
for i in range(1000):
    user_id = f"user_{random.randint(1, 200)}"
    event = {
        "event_id": f"evt_{i:05d}",
        "user_id": user_id,
        "product_id": f"prod_{random.randint(1, 50)}",
        "event_type": random.choices(event_types, weights=event_weights)[0],
        "amount": round(random.uniform(9.99, 499.99), 2),
        "session_id": f"sess_{random.randint(1, 500)}",
    }
    topic.produce(key=user_id, value=event, timestamp=base_ts + timedelta(milliseconds=i * 100))

print(f"Topic '{topic.name}' β€” partition distribution:")
for p, count in topic.partition_stats().items():
    print(f"  {p}: {count} messages")
print(f"  Total produced: {topic._total_produced}")

# ---------------------------------------------------------------------------
# Two independent consumer groups
# ---------------------------------------------------------------------------
print("\n--- Consumer Group: analytics ---")
analytics_batch = topic.consume("analytics", max_records=50)
print(f"  Consumed {len(analytics_batch)} records")
print(f"  Lag after first batch: {topic.get_lag('analytics')}")

# Drain analytics
total_analytics = len(analytics_batch)
while True:
    batch = topic.consume("analytics", max_records=100)
    if not batch:
        break
    total_analytics += len(batch)
print(f"  Total consumed (all batches): {total_analytics}")
print(f"  Final lag: {topic.get_lag('analytics')}")

print("\n--- Consumer Group: fraud-detection (only consumed 200 so far) ---")
fraud_consumed = 0
for _ in range(2):
    batch = topic.consume("fraud-detection", max_records=100)
    fraud_consumed += len(batch)
print(f"  Consumed so far: {fraud_consumed}")
print(f"  Lag: {topic.get_lag('fraud-detection')} (independent of analytics group)")

5. Windowed Aggregations

Windowed aggregations are the heart of stream processing. Rather than aggregating over all history, we aggregate over a window of time.

TUMBLING WINDOW (size=60s, no overlap)
  [0s────60s]  [60s───120s]  [120s──180s]

SLIDING WINDOW (size=60s, slide=20s)
  [0s────60s]
       [20s───80s]
            [40s──100s]

SESSION WINDOW (gap=30s)
  [click──click──click]  30s gap  [click──click]
  └──── session 1 ────┘           └─ session 2 ─┘

import bisect

# ---------------------------------------------------------------------------
# Tumbling Window
# ---------------------------------------------------------------------------
class TumblingWindowAggregator:
    """Fixed-size, non-overlapping windows."""

    def __init__(self, window_size_seconds):
        self.window_size = window_size_seconds
        self._windows = defaultdict(lambda: {"count": 0, "sum": 0.0, "events": []})
        self._max_event_time = None

    def _window_key(self, event_time_ts):
        """Map a timestamp to its window start (epoch seconds)."""
        return int(event_time_ts // self.window_size) * self.window_size

    def add(self, event_time: datetime, value: float):
        ts = event_time.timestamp()
        if self._max_event_time is None or ts > self._max_event_time:
            self._max_event_time = ts
        wk = self._window_key(ts)
        self._windows[wk]["count"] += 1
        self._windows[wk]["sum"] += value
        self._windows[wk]["events"].append(value)

    def get_completed_windows(self):
        """Return windows that are fully closed (event time has moved past them)."""
        if self._max_event_time is None:
            return []
        current_window_start = self._window_key(self._max_event_time)
        completed = []
        for wk, stats in sorted(self._windows.items()):
            if wk < current_window_start:
                completed.append({
                    "window_start": datetime.utcfromtimestamp(wk),
                    "window_end": datetime.utcfromtimestamp(wk + self.window_size),
                    "count": stats["count"],
                    "sum": round(stats["sum"], 2),
                    "avg": round(stats["sum"] / stats["count"], 2) if stats["count"] else 0,
                })
        return completed


# ---------------------------------------------------------------------------
# Sliding Window
# ---------------------------------------------------------------------------
class SlidingWindowAggregator:
    """Fixed-size overlapping windows."""

    def __init__(self, window_size_seconds, slide_interval_seconds):
        self.window_size = window_size_seconds
        self.slide_interval = slide_interval_seconds
        self._events = []  # list of (timestamp_float, value)

    def add(self, event_time: datetime, value: float):
        # Keep the event list sorted as events arrive (binary search + O(n) insert)
        bisect.insort(self._events, (event_time.timestamp(), value))

    def get_windows(self):
        if not self._events:
            return []
        first_ts = self._events[0][0]
        last_ts = self._events[-1][0]
        # Align window starts to slide_interval grid
        start = int(first_ts // self.slide_interval) * self.slide_interval
        results = []
        w_start = start
        while w_start <= last_ts:
            w_end = w_start + self.window_size
            window_events = [v for ts, v in self._events if w_start <= ts < w_end]
            if window_events:
                results.append({
                    "window_start": datetime.utcfromtimestamp(w_start),
                    "window_end": datetime.utcfromtimestamp(w_end),
                    "count": len(window_events),
                    "sum": round(sum(window_events), 2),
                    "avg": round(sum(window_events) / len(window_events), 2),
                })
            w_start += self.slide_interval
        return results


# ---------------------------------------------------------------------------
# Session Window
# ---------------------------------------------------------------------------
class SessionWindowAggregator:
    """Gap-based sessions per key."""

    def __init__(self, inactivity_gap_seconds):
        self.gap = inactivity_gap_seconds
        self._sessions = defaultdict(list)  # key β†’ list of (timestamp, value)

    def add(self, key, event_time: datetime, value: float):
        self._sessions[key].append((event_time.timestamp(), value))

    def get_sessions(self, key=None):
        keys = [key] if key else list(self._sessions.keys())
        all_sessions = []
        for k in keys:
            events = sorted(self._sessions[k], key=lambda x: x[0])
            if not events:
                continue
            session_start = events[0][0]
            prev_ts = events[0][0]
            session_values = [events[0][1]]
            for ts, val in events[1:]:
                if ts - prev_ts > self.gap:
                    all_sessions.append({
                        "key": k,
                        "session_start": datetime.utcfromtimestamp(session_start),
                        "session_end": datetime.utcfromtimestamp(prev_ts),
                        "duration_s": round(prev_ts - session_start, 1),
                        "event_count": len(session_values),
                        "total_value": round(sum(session_values), 2),
                    })
                    session_start = ts
                    session_values = [val]
                else:
                    session_values.append(val)
                prev_ts = ts
            # Flush final session
            all_sessions.append({
                "key": k,
                "session_start": datetime.utcfromtimestamp(session_start),
                "session_end": datetime.utcfromtimestamp(prev_ts),
                "duration_s": round(prev_ts - session_start, 1),
                "event_count": len(session_values),
                "total_value": round(sum(session_values), 2),
            })
        return all_sessions


# ---------------------------------------------------------------------------
# Generate a stream of clickstream events (10 minutes, purchase amounts)
# ---------------------------------------------------------------------------
random.seed(7)
click_base = datetime(2024, 1, 15, 10, 0, 0)
click_events = []
for i in range(300):
    # One event roughly every 2 seconds with sub-second jitter (~10 minutes total)
    click_events.append({
        "user_id": f"user_{random.randint(1, 10)}",
        "timestamp": click_base + timedelta(seconds=i * 2 + random.uniform(0, 1)),
        "amount": round(random.uniform(1.0, 200.0), 2),
    })

# Feed events into all three window types
tumbling = TumblingWindowAggregator(window_size_seconds=60)
sliding  = SlidingWindowAggregator(window_size_seconds=120, slide_interval_seconds=30)
session  = SessionWindowAggregator(inactivity_gap_seconds=30)

for e in click_events:
    tumbling.add(e["timestamp"], e["amount"])
    sliding.add(e["timestamp"], e["amount"])
    session.add(e["user_id"], e["timestamp"], e["amount"])

# Results
print("=== TUMBLING WINDOW (60s buckets) β€” first 5 windows ===")
for w in tumbling.get_completed_windows()[:5]:
    print(f"  {w['window_start'].strftime('%H:%M:%S')} β†’ {w['window_end'].strftime('%H:%M:%S')}  "
          f"count={w['count']:3d}  sum=${w['sum']:7.2f}  avg=${w['avg']:6.2f}")

print("\n=== SLIDING WINDOW (120s window, 30s slide) β€” first 5 windows ===")
for w in sliding.get_windows()[:5]:
    print(f"  {w['window_start'].strftime('%H:%M:%S')} β†’ {w['window_end'].strftime('%H:%M:%S')}  "
          f"count={w['count']:3d}  sum=${w['sum']:7.2f}  avg=${w['avg']:6.2f}")

print("\n=== SESSION WINDOW (30s inactivity gap) β€” user_1 sessions ===")
user1_sessions = session.get_sessions("user_1")
for s in user1_sessions[:5]:
    print(f"  {s['session_start'].strftime('%H:%M:%S')} β†’ {s['session_end'].strftime('%H:%M:%S')}  "
          f"duration={s['duration_s']}s  events={s['event_count']}  total=${s['total_value']:.2f}")
print(f"  (user_1 has {len(user1_sessions)} sessions total)")

6. Late Data Handling

In distributed systems, events don’t always arrive in the order they were generated. A mobile app might buffer events while offline, network delays vary, and retries create ordering chaos.

Without a watermark: close windows based on wall-clock time → late-arriving events are missed.

With a watermark: define a tolerance (e.g., 3 seconds): "I’ll accept events that are up to 3 seconds late." This trades a little extra latency for completeness.

Event time:   t=0  t=1  t=2  t=3  t=4  t=5
Arrival time: t=0  t=1  t=4  t=3  t=2  t=5   ← disordered!

Watermark at t=5:  max_seen(5) - tolerance(3) = 2
  → windows ending before t=2 are now CLOSED
  → events with event_time ≥ 2 are still accepted

class WatermarkWindowAggregator:
    """Tumbling window aggregator that tolerates late events via watermarking."""

    def __init__(self, window_size_seconds, watermark_seconds):
        self.window_size = window_size_seconds
        self.watermark_lag = watermark_seconds
        self._windows = defaultdict(lambda: {"count": 0, "sum": 0.0})
        self._max_event_ts = None
        self._dropped = 0
        self._accepted_late = 0

    def _window_key(self, ts):
        return int(ts // self.window_size) * self.window_size

    @property
    def watermark(self):
        if self._max_event_ts is None:
            return None
        return self._max_event_ts - self.watermark_lag

    def add(self, event_time: datetime, value: float, is_late_flag=False):
        ts = event_time.timestamp()
        # Advance watermark
        if self._max_event_ts is None or ts > self._max_event_ts:
            self._max_event_ts = ts
        wk = self._window_key(ts)
        window_end = wk + self.window_size
        # Drop event if its window has already closed (past watermark)
        if self.watermark is not None and window_end <= self.watermark:
            self._dropped += 1
            return "dropped"
        if is_late_flag:
            self._accepted_late += 1
        self._windows[wk]["count"] += 1
        self._windows[wk]["sum"] += value
        return "accepted"

    def get_closed_windows(self):
        if self.watermark is None:
            return []
        closed = []
        for wk, stats in sorted(self._windows.items()):
            if wk + self.window_size <= self.watermark:
                closed.append({
                    "window_start": datetime.utcfromtimestamp(wk),
                    "window_end": datetime.utcfromtimestamp(wk + self.window_size),
                    "count": stats["count"],
                    "sum": round(stats["sum"], 2),
                })
        return closed


# Generate events with random delays (simulating network jitter)
random.seed(42)
event_base = datetime(2024, 1, 15, 12, 0, 0).timestamp()

# True event times: one event per second for 60 seconds
true_events = [(event_base + i, round(random.uniform(10.0, 100.0), 2)) for i in range(60)]

# Simulate arrival order: each event is delayed by 0–5 seconds
delayed_events = []
for true_ts, value in true_events:
    delay = random.uniform(0, 5)
    arrival_ts = true_ts + delay
    delayed_events.append({"event_ts": true_ts, "arrival_ts": arrival_ts, "value": value, "delay": delay})

# Sort by ARRIVAL time (as a real stream would deliver them)
delayed_events.sort(key=lambda x: x["arrival_ts"])

# Processor A: no watermark tolerance (watermark_seconds=0)
strict_agg = WatermarkWindowAggregator(window_size_seconds=10, watermark_seconds=0)
# Processor B: 3-second watermark
lenient_agg = WatermarkWindowAggregator(window_size_seconds=10, watermark_seconds=3)

max_event_ts_seen = None
for e in delayed_events:
    # "Late" = arrives after an event with a later event time has already been seen
    is_late = max_event_ts_seen is not None and e["event_ts"] < max_event_ts_seen
    max_event_ts_seen = max(max_event_ts_seen or e["event_ts"], e["event_ts"])
    event_dt = datetime.utcfromtimestamp(e["event_ts"])
    strict_agg.add(event_dt, e["value"], is_late_flag=is_late)
    lenient_agg.add(event_dt, e["value"], is_late_flag=is_late)

print("=== WITHOUT WATERMARK TOLERANCE (watermark=0s) ===")
print(f"  Events dropped (arrived late): {strict_agg._dropped}")
strict_windows = strict_agg.get_closed_windows()
for w in strict_windows[:4]:
    print(f"  {w['window_start'].strftime('%H:%M:%S')} β†’ {w['window_end'].strftime('%H:%M:%S')}  "
          f"count={w['count']:2d}  sum=${w['sum']:6.2f}")

print("\n=== WITH 3-SECOND WATERMARK TOLERANCE ===")
print(f"  Events dropped: {lenient_agg._dropped}")
print(f"  Late events accepted: {lenient_agg._accepted_late}")
lenient_windows = lenient_agg.get_closed_windows()
for w in lenient_windows[:4]:
    print(f"  {w['window_start'].strftime('%H:%M:%S')} β†’ {w['window_end'].strftime('%H:%M:%S')}  "
          f"count={w['count']:2d}  sum=${w['sum']:6.2f}")

print("\n=== TRADEOFF ===")
print(f"  Strict : {strict_agg._dropped} dropped events β†’ lower latency, less complete")
print(f"  Lenient: {lenient_agg._dropped} dropped events β†’ 3s extra latency, more complete")

7. Stateful Stream Processing

class FraudDetector:
    """
    Track per-user statistics in a rolling window.
    Detects anomalous patterns using three rules:
      1. More than 5 transactions in a 5-minute window
      2. Total amount exceeding $1000 in a 5-minute window
      3. Transaction velocity spike (amount > 3x user's rolling average)
    """

    def __init__(self, window_seconds=300):
        self.window_seconds = window_seconds
        # user_id → deque of (timestamp, amount)
        self.user_state = defaultdict(lambda: {
            "transactions": deque(),
            "lifetime_amounts": [],
            "txn_count": 0,
        })
        self.fraud_events = []

    def _evict_old(self, user_id, current_ts):
        """Remove transactions outside the rolling window."""
        state = self.user_state[user_id]
        while state["transactions"] and (current_ts - state["transactions"][0][0]) > self.window_seconds:
            state["transactions"].popleft()

    def process(self, event) -> dict:
        uid = event["user_id"]
        ts = event["timestamp"].timestamp()
        amount = event["amount"]
        state = self.user_state[uid]

        self._evict_old(uid, ts)

        # Append new transaction
        state["transactions"].append((ts, amount))
        state["lifetime_amounts"].append(amount)
        state["txn_count"] += 1

        # Compute window statistics
        window_txns = list(state["transactions"])
        window_count = len(window_txns)
        window_total = sum(a for _, a in window_txns)
        lifetime_avg = sum(state["lifetime_amounts"]) / len(state["lifetime_amounts"])

        # Evaluate rules
        alerts = []
        if window_count > 5:
            alerts.append(f"HIGH_VELOCITY:{window_count}_txns_in_{self.window_seconds}s")
        if window_total > 1000:
            alerts.append(f"HIGH_AMOUNT:${window_total:.0f}_in_{self.window_seconds}s")
        if len(state["lifetime_amounts"]) >= 5 and amount > 3 * lifetime_avg:
            alerts.append(f"AMOUNT_SPIKE:${amount:.2f}_vs_avg_${lifetime_avg:.2f}")

        result = {
            "event_id": event["event_id"],
            "user_id": uid,
            "timestamp": event["timestamp"],
            "amount": amount,
            "window_count": window_count,
            "window_total": round(window_total, 2),
            "is_fraud": len(alerts) > 0,
            "alerts": alerts,
        }
        if result["is_fraud"]:
            self.fraud_events.append(result)
        return result


# Generate 10000 synthetic transactions
random.seed(99)
txn_base = datetime(2024, 1, 15, 8, 0, 0)

transactions = []
# Most users are normal
for i in range(9700):
    uid = f"user_{random.randint(1, 500)}"
    transactions.append({
        "event_id": f"txn_{i:05d}",
        "user_id": uid,
        "amount": round(abs(random.gauss(50, 30)), 2),
        "timestamp": txn_base + timedelta(seconds=i * 3 + random.uniform(0, 3)),
    })

# Inject fraudulent patterns
fraud_base = txn_base + timedelta(minutes=30)
for j in range(300):  # 300 rapid transactions from user_999
    transactions.append({
        "event_id": f"txn_fraud_{j:04d}",
        "user_id": "user_999",
        "amount": round(random.uniform(150, 400), 2),
        "timestamp": fraud_base + timedelta(seconds=j * 1),  # 1 per second
    })

# Sort by timestamp
transactions.sort(key=lambda x: x["timestamp"])

detector = FraudDetector(window_seconds=300)
for txn in transactions:
    detector.process(txn)

fraud_df = pd.DataFrame(detector.fraud_events)
print(f"Processed {len(transactions):,} transactions")
print(f"Detected  {len(detector.fraud_events):,} fraud signals\n")

if not fraud_df.empty:
    by_user = fraud_df.groupby("user_id").agg(
        fraud_events=("event_id", "count"),
        total_flagged_amount=("amount", "sum"),
        max_window_total=("window_total", "max"),
        max_window_count=("window_count", "max"),
    ).sort_values("fraud_events", ascending=False)
    print("Top flagged users:")
    print(by_user.head(10).to_string())

    print("\nSample fraud alerts for user_999:")
    sample = fraud_df[fraud_df["user_id"] == "user_999"].head(3)
    for _, row in sample.iterrows():
        print(f"  {row['timestamp'].strftime('%H:%M:%S')}  ${row['amount']:.2f}  "
              f"window={row['window_count']} txns/${row['window_total']:.0f}  alerts={row['alerts']}")

8. Exactly-Once Semantics

One of the hardest problems in distributed systems is ensuring a message is processed exactly once: not lost, not duplicated.

The Three Delivery Guarantees

Guarantee      Mechanism                                    Risk                     Use Case
──────────────────────────────────────────────────────────────────────────────────────────────────────────
At-most-once   Fire and forget, no ack                      Lost messages            Metrics/telemetry where loss is acceptable
At-least-once  Retry until acked                            Duplicate processing     Kafka default; safe if downstream is idempotent
Exactly-once   Idempotent writes + transactional producers  None (but more complex)  Financial transactions, inventory updates

How Exactly-Once Works in Kafka

  1. Idempotent producer: Kafka broker deduplicates retried produces using a sequence number per (producer_id, partition)

  2. Transactional API: Producer opens a transaction, sends to multiple partitions, commits atomically

  3. Consumer offset in transaction: The offset commit is part of the same transaction as the output write → no partial state

The pattern: read → process → write output + commit offset, all in one atomic transaction.
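
A minimal sketch of that loop, assuming the confluent-kafka client (not imported in Setup) and illustrative broker, topic, group, and transactional ids; error handling (abort_transaction on failure) is omitted.

# Sketch: transactional read -> process -> write with confluent-kafka.
# Broker address, topic names, group.id, and transactional.id are assumptions.
from confluent_kafka import Consumer, Producer, TopicPartition

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "revenue-aggregator",
    "enable.auto.commit": False,           # offsets commit inside the transaction
    "isolation.level": "read_committed",   # never read aborted writes
})
producer = Producer({
    "bootstrap.servers": "localhost:9092",
    "transactional.id": "revenue-agg-1",   # enables idempotence + transactions
})
consumer.subscribe(["ecommerce-events"])
producer.init_transactions()

while True:
    msg = consumer.poll(1.0)
    if msg is None or msg.error():
        continue
    producer.begin_transaction()
    producer.produce("revenue-alerts", value=msg.value())   # processed output
    # Commit the input offset as part of the SAME transaction:
    producer.send_offsets_to_transaction(
        [TopicPartition(msg.topic(), msg.partition(), msg.offset() + 1)],
        consumer.consumer_group_metadata(),
    )
    producer.commit_transaction()   # output + offset become visible atomically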

class IdempotentProcessor:
    """Use message ID to deduplicate retried messages."""

    def __init__(self):
        self.processed_ids = set()
        self.total_calls = 0
        self.total_processed = 0
        self.total_skipped = 0

    def _do_work(self, payload):
        return {"revenue": payload.get("amount", 0), "processed_at": datetime.utcnow()}

    def process(self, message_id, payload):
        self.total_calls += 1
        if message_id in self.processed_ids:
            self.total_skipped += 1
            return None  # Duplicate - skip
        result = self._do_work(payload)
        self.processed_ids.add(message_id)
        self.total_processed += 1
        return result


class NonIdempotentProcessor:
    """Naive processor that counts duplicates as new work."""

    def __init__(self):
        self.total_calls = 0
        self.total_processed = 0
        self.total_revenue = 0.0

    def process(self, message_id, payload):
        self.total_calls += 1
        self.total_processed += 1
        self.total_revenue += payload.get("amount", 0)


# Simulate at-least-once delivery: some messages are delivered multiple times
random.seed(5)
messages = [
    {"message_id": f"msg_{i:04d}", "payload": {"amount": round(random.uniform(10, 500), 2)}}
    for i in range(100)
]

# Simulate retries: 20% of messages are "retried" (delivered twice)
delivered = list(messages)
retried = random.sample(messages, k=20)
delivered.extend(retried)  # duplicates appended
random.shuffle(delivered)

true_revenue = sum(m["payload"]["amount"] for m in messages)

idempotent = IdempotentProcessor()
naive = NonIdempotentProcessor()

idempotent_revenue = 0.0
for msg in delivered:
    result = idempotent.process(msg["message_id"], msg["payload"])
    if result:
        idempotent_revenue += result["revenue"]
    naive.process(msg["message_id"], msg["payload"])

print("=== DELIVERY SIMULATION ===")
print(f"  Unique messages    : {len(messages)}")
print(f"  Total deliveries   : {len(delivered)} ({len(retried)} retries)")
print(f"  True revenue       : ${true_revenue:,.2f}")

print("\n--- Without deduplication (naive / at-least-once) ---")
print(f"  Processed count    : {naive.total_processed}  (WRONG β€” should be {len(messages)})")
print(f"  Computed revenue   : ${naive.total_revenue:,.2f}  (WRONG β€” overcounted by ${naive.total_revenue - true_revenue:,.2f})")

print("\n--- With deduplication (idempotent / exactly-once) ---")
print(f"  Processed count    : {idempotent.total_processed}  (CORRECT)")
print(f"  Skipped duplicates : {idempotent.total_skipped}")
print(f"  Computed revenue   : ${idempotent_revenue:,.2f}  (CORRECT β€” matches ${true_revenue:,.2f})")

9. Stream Processing Frameworks Comparison

Real production systems use dedicated frameworks. Here’s how the same "filter high-value events and count per user over 5-minute tumbling windows" pattern looks across the three major options.

When to Choose Which

Framework      Language           Latency            Exactly-Once        Best For
──────────────────────────────────────────────────────────────────────────────────────────────
Faust          Python             Low (ms)           Yes (with Kafka)    Python shops, moderate scale
Kafka Streams  Java/Kotlin        Very low (ms)      Yes (native Kafka)  JVM shops, tight Kafka integration
Apache Flink   Java/Python/Scala  Ultra-low (µs–ms)  Yes                 High-scale, complex stateful ops

faust_code = '''
# ── Faust (Python) ─────────────────────────────────────────────────────────
import faust
from datetime import timedelta  # needed by the tumbling table below

app = faust.App('revenue-aggregator', broker='kafka://localhost:9092')

class Event(faust.Record):
    user_id: str
    amount: float
    event_type: str

input_topic  = app.topic('ecommerce-events', value_type=Event)
output_topic = app.topic('revenue-alerts')

@app.agent(input_topic)
async def process_event(events):
    """Stateless transformation β€” filter and forward high-value purchases."""
    async for event in events:
        if event.event_type == 'purchase' and event.amount > 100:
            await output_topic.send(value={
                'user_id': event.user_id,
                'amount': event.amount,
                'flag': 'high_value_purchase',
            })

# Windowed aggregation (Faust Table)
revenue_table = app.Table('revenue', default=float).tumbling(60, expires=timedelta(hours=1))

@app.agent(input_topic)
async def aggregate_revenue(events):
    async for event in events.group_by(Event.user_id):
        revenue_table[event.user_id] += event.amount
'''

kafka_streams_code = '''
// ── Kafka Streams (Java) ───────────────────────────────────────────────────
StreamsBuilder builder = new StreamsBuilder();

KStream<String, Event> events = builder.stream("ecommerce-events");

events
    .filter((key, event) -> event.eventType.equals("purchase") && event.amount > 100)
    .groupBy((key, event) -> event.userId)
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .aggregate(
        () -> 0.0,
        (userId, event, total) -> total + event.amount,
        Materialized.as("revenue-store")
    )
    .toStream()
    .filter((windowedKey, total) -> total > 1000.0)  // alert threshold
    .to("fraud-alerts");

KafkaStreams app = new KafkaStreams(builder.build(), config);
app.start();
'''

flink_code = '''
# ── Apache Flink (Python API) ───────────────────────────────────────────────
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.datastream.connectors import FlinkKafkaConsumer
from pyflink.datastream.window import TumblingEventTimeWindows
from pyflink.common.time import Time
from pyflink.common import Duration, WatermarkStrategy

env = StreamExecutionEnvironment.get_execution_environment()
env.set_parallelism(4)

kafka_source = FlinkKafkaConsumer(
    topics='ecommerce-events',
    deserialization_schema=EventSchema(),
    properties={'bootstrap.servers': 'localhost:9092'},
)

revenue_per_user = (
    env.add_source(kafka_source)
       # Bounded out-of-orderness watermarks (a real job also attaches a
       # timestamp assigner to the strategy)
       .assign_timestamps_and_watermarks(
           WatermarkStrategy.for_bounded_out_of_orderness(Duration.of_seconds(3)))
       .filter(lambda e: e.event_type == 'purchase' and e.amount > 100)
       .key_by(lambda e: e.user_id)
       .window(TumblingEventTimeWindows.of(Time.minutes(5)))
       .aggregate(RevenueAggregate())  # custom AggregateFunction
       .filter(lambda kv: kv[1] > 1000.0)  # kv = (user_id, total)
)

revenue_per_user.add_sink(FlinkKafkaProducer('fraud-alerts', ProducerSchema()))
env.execute('Revenue Fraud Detector')
'''

print("=" * 70)
print("FAUST (Python β€” easiest for Python teams)")
print("=" * 70)
print(faust_code)

print("=" * 70)
print("KAFKA STREAMS (Java β€” native Kafka integration, no extra cluster)")
print("=" * 70)
print(kafka_streams_code)

print("=" * 70)
print("APACHE FLINK (Python β€” highest throughput, most powerful)")
print("=" * 70)
print(flink_code)

10. Backpressure Simulation

Backpressure is a mechanism for a slow consumer to signal an upstream producer to slow down, preventing unbounded queue growth and memory exhaustion.

Without backpressure:

  • Producer at 1000 msg/s, consumer at 800 msg/s → queue grows by 200 msg/s indefinitely

  • After N seconds: OOM, consumer falls arbitrarily far behind, latency grows without bound

With backpressure:

  • Consumer signals "queue is full" → producer blocks or drops

  • System reaches a stable equilibrium at the consumer’s processing rate
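
The standard library shows the blocking flavor of backpressure directly: a bounded queue.Queue makes a fast producer stall inside put() until the slower consumer frees a slot. Below is a small sketch using the threading and queue modules imported in Setup; the rates are illustrative.

# Bounded buffer = built-in backpressure: put() blocks when the queue is full
buf = queue.Queue(maxsize=100)

def fast_producer(n=2000):
    for i in range(n):
        buf.put(i)              # blocks whenever the consumer falls behind
        time.sleep(0.0005)      # attempts ~2000 msg/s

def slow_consumer(n=2000):
    for _ in range(n):
        buf.get()
        time.sleep(0.001)       # drains at ~1000 msg/s

threads = [threading.Thread(target=fast_producer), threading.Thread(target=slow_consumer)]
start = time.perf_counter()
for t in threads:
    t.start()
for t in threads:
    t.join()
print(f"Done in {time.perf_counter() - start:.2f}s; queue depth never exceeded {buf.maxsize}.")

The rate-level simulation below models the same effect without real threads.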


def simulate_backpressure(producer_rate, consumer_rate, duration_seconds=5,
                           max_queue_size=None, use_backpressure=False):
    """
    Simulate producer/consumer with optional backpressure.
    Returns a timeline of (second, queue_size, total_produced, total_consumed).
    """
    q_size = 0
    total_produced = 0
    total_consumed = 0
    timeline = []
    backpressure_events = 0
    effective_producer_rate = producer_rate

    for second in range(duration_seconds):
        # Apply backpressure: throttle producer if queue exceeds threshold
        if use_backpressure and max_queue_size and q_size > max_queue_size * 0.8:
            # Consumer signals producer to slow down to match consumer rate
            effective_producer_rate = consumer_rate
            backpressure_events += 1
        elif use_backpressure:
            # Gradually ramp back up
            effective_producer_rate = min(producer_rate, effective_producer_rate + 50)

        produced_this_second = effective_producer_rate
        consumed_this_second = min(consumer_rate, q_size + produced_this_second)

        total_produced += produced_this_second
        total_consumed += consumed_this_second
        q_size = max(0, q_size + produced_this_second - consumed_this_second)

        if max_queue_size:
            q_size = min(q_size, max_queue_size)

        latency_ms = (q_size / consumer_rate) * 1000 if consumer_rate > 0 else float('inf')
        timeline.append({
            "second": second + 1,
            "queue_size": int(q_size),
            "total_produced": int(total_produced),
            "total_consumed": int(total_consumed),
            "lag": int(total_produced - total_consumed),
            "latency_ms": round(latency_ms, 1),
            "effective_rate": effective_producer_rate,
            "backpressure_active": use_backpressure and effective_producer_rate < producer_rate,
        })

    return timeline, backpressure_events


# Scenario: producer 1000/s, consumer 800/s over 10 seconds
PRODUCER_RATE = 1000
CONSUMER_RATE = 800
DURATION = 10
MAX_Q = 5000

timeline_no_bp, _ = simulate_backpressure(
    PRODUCER_RATE, CONSUMER_RATE, DURATION, max_queue_size=MAX_Q, use_backpressure=False
)
timeline_bp, bp_events = simulate_backpressure(
    PRODUCER_RATE, CONSUMER_RATE, DURATION, max_queue_size=MAX_Q, use_backpressure=True
)

print(f"{'='*65}")
print(f"{'Sec':>4}  {'Without Backpressure':^28}  {'With Backpressure':^28}")
print(f"{'':>4}  {'Queue':>8} {'Lag':>8} {'Latency':>9}  {'Queue':>8} {'Lag':>8} {'BP?':>5}")
print(f"{'='*65}")
for no_bp, bp in zip(timeline_no_bp, timeline_bp):
    print(f"{no_bp['second']:>4}  "
          f"{no_bp['queue_size']:>8,} {no_bp['lag']:>8,} {no_bp['latency_ms']:>8.0f}ms  "
          f"{bp['queue_size']:>8,} {bp['lag']:>8,} {'YES' if bp['backpressure_active'] else 'no':>5}")

print(f"{'='*65}")
final_no_bp = timeline_no_bp[-1]
final_bp = timeline_bp[-1]
print(f"\nFinal state after {DURATION}s:")
print(f"  WITHOUT backpressure: queue={final_no_bp['queue_size']:,}  lag={final_no_bp['lag']:,}  latency={final_no_bp['latency_ms']:.0f}ms")
print(f"  WITH backpressure:    queue={final_bp['queue_size']:,}  lag={final_bp['lag']:,}  latency={final_bp['latency_ms']:.0f}ms")
print(f"  Backpressure was active for {bp_events} of {DURATION} seconds")

11. Cheat Sheet

Concept               Description
─────────────────────────────────────────────────────────────────────────────
Topic                 Named stream of messages (like a table in batch)
Partition             Ordered, immutable sequence within a topic
Consumer group        Multiple consumers sharing work (each reads distinct partitions)
Offset                Position in a partition, committed after processing
Retention             How long Kafka stores messages (default: 7 days)
Replication factor    Number of partition copies (3 = tolerate 2 broker failures)
Watermark             How far behind real-time we tolerate for late events
Tumbling window       [0-60s], [60-120s], ...: non-overlapping buckets
Sliding window        [0-60s], [10-70s], ...: overlapping, richer aggregations
Session window        Gap-based: events <30s apart = same session
Exactly-once          Idempotent writes + transactional producers
Backpressure          Consumer signals producer to slow down, preventing OOM
State store           Per-key mutable state in stream processor (e.g., Flink RocksDB)
Checkpoint            Periodic snapshot of state for fault tolerance
Dead letter queue     Destination for unprocessable events (for later investigation)
─────────────────────────────────────────────────────────────────────────────

Window selection guide:
  Need fixed time buckets for metrics?              → Tumbling
  Need smooth rolling aggregation?                  → Sliding
  Grouping user activity into logical interactions? → Session

Delivery guarantee selection:
  Losing events is acceptable (telemetry, metrics)  → At-most-once
  Can tolerate duplicates (idempotent downstream)   → At-least-once  (Kafka default)
  Financial / inventory / ledger operations         → Exactly-once

When to use streaming vs batch:
  Latency requirement < 1 minute                    → Streaming
  Complex ML training / full-history scans          → Batch
  Near-real-time dashboards with simpler ETL        → Micro-batch (Spark Structured Streaming)

12. Exercises

Exercise 1: SlidingWindowAggregator

The SlidingWindowAggregator above recomputes every window from scratch each time get_windows() is called (O(n × w/s)). Rewrite it to maintain incremental state: as each new event arrives, update only the windows it falls into. Your implementation should have O(window_size / slide_interval) cost per add() call and O(1) cost per completed window emission.

class EfficientSlidingWindowAggregator:
    def __init__(self, window_size_seconds: int, slide_interval_seconds: int):
        ...
    def add(self, event_time: datetime, value: float) -> list:
        """Returns any newly-completed windows."""
        ...

Exercise 2: Real-Time Leaderboard

Build a real-time gaming leaderboard:

  • Input: stream of (player_id, score_delta, timestamp) events

  • Maintain the leaderboard over 1-hour tumbling windows

  • Output: top-10 players with their cumulative score in the current window

  • When a new window starts, scores reset

class RollingLeaderboard:
    def __init__(self, window_seconds=3600, top_k=10):
        ...
    def record_score(self, player_id: str, score_delta: int, event_time: datetime):
        ...
    def get_top_k(self, as_of: datetime) -> list:
        """Returns [(rank, player_id, score), ...] for the current window."""
        ...

Test with 50,000 synthetic events from 1000 players over 3 hours.

Exercise 3: Consumer Group Rebalancing

Extend MockKafkaTopic with a consumer group registry. Implement partition rebalancing:

  • register_consumer(group, consumer_id): add a consumer to the group

  • deregister_consumer(group, consumer_id): remove a consumer

  • get_assignment(group, consumer_id): returns which partitions this consumer owns

  • Use round-robin assignment (one of Kafka’s built-in assignment strategies)

  • When a consumer joins or leaves, trigger rebalance: redistribute all partitions evenly

Demonstrate: start with 2 consumers, add a 3rd, show partitions redistribute. Then remove one, show rebalance again.

Exercise 4: Dead Letter Queue

Add a DLQ to the processing pipeline:

class DLQProcessor:
    def __init__(self, main_topic: MockKafkaTopic, dlq_topic: MockKafkaTopic,
                 max_retries=3):
        ...
    def process(self, message, processing_fn):
        """Try processing_fn up to max_retries times.
           On final failure, route to DLQ with error metadata."""
        ...

Simulate a processing function that fails 10% of the time with a transient error and 2% of the time with a permanent error. Show that transient failures eventually succeed, and permanent failures land in the DLQ with retry count and error reason attached.

Exercise 5: Stream Join

Implement a windowed stream join between two topics:

  • Left stream: purchase_events (purchase_id, product_id, user_id, amount, timestamp)

  • Right stream: product_catalog (product_id, name, category, cost_price, timestamp)

  • Join on product_id within a 5-minute window (events on both sides within 5 minutes of each other match)

  • Output: enriched purchase with product name, category, and computed margin (amount - cost_price)

class WindowedStreamJoiner:
    def __init__(self, join_window_seconds=300):
        self.left_buffer = defaultdict(list)   # product_id → [(ts, event)]
        self.right_buffer = defaultdict(list)  # product_id → [(ts, event)]
    
    def add_left(self, key, event_time, event): ...
    def add_right(self, key, event_time, event): ...
    def get_joined_results(self) -> list: ...

Handle the case where the catalog update arrives after the purchase event (late right-side join).