Solutions: SQL & Data Engineering TrackΒΆ

Worked solutions to all exercises from the sql-data-engineering/ notebooks. All SQL runs with sqlite3 or DuckDB (in-memory, no server needed).

import sqlite3
import pandas as pd
import numpy as np
import warnings; warnings.filterwarnings('ignore')

# Helper: run SQL on an in-memory SQLite connection and return a DataFrame
def run_sql(conn, sql, params=None):
    return pd.read_sql_query(sql, conn, params=params)

def execute(conn, sql):
    conn.executescript(sql) if ';' in sql.strip()[:-1] else conn.execute(sql)
    conn.commit()

01 β€” Advanced SQLΒΆ

conn1 = sqlite3.connect(':memory:')

# Seed employee org-chart table
conn1.executescript("""
CREATE TABLE employees (id INTEGER PRIMARY KEY, name TEXT, manager_id INTEGER);
INSERT INTO employees VALUES (1, 'Alice', NULL),
  (2, 'Bob', 1), (3, 'Carol', 1), (4, 'Dave', 2),
  (5, 'Eve', 2), (6, 'Frank', 3), (7, 'Grace', 4);
""")
conn1.commit()

# Exercise 1: Recursive CTE β€” all reports (direct + indirect) for a given manager
# Key insight: Recursive CTEs walk the adjacency list; the anchor seeds the root,
# and the recursive step follows manager_id until no more rows are found.
sql_recursive = """
WITH RECURSIVE reports(id, name, depth) AS (
    -- Anchor: direct reports of manager with id = 1 (Alice)
    SELECT id, name, 1 FROM employees WHERE manager_id = 1
    UNION ALL
    -- Recursive: reports of reports
    SELECT e.id, e.name, r.depth + 1
    FROM employees e
    JOIN reports r ON e.manager_id = r.id
)
SELECT id, name, depth FROM reports ORDER BY depth, id;
"""
print(run_sql(conn1, sql_recursive))
# Exercise 2: Running 30-day revenue per customer using ROWS BETWEEN
# Key insight: ROWS BETWEEN 29 PRECEDING AND CURRENT ROW uses physical row counts
# within the ordered window β€” for irregular dates use RANGE with date arithmetic.

try:
    import duckdb
    DUCKDB = True
    duck = duckdb.connect(':memory:')
    duck.execute("""
    CREATE TABLE orders AS
    SELECT
        'C' || cast(((i % 3) + 1) as varchar) AS customer_id,
        date '2023-01-01' + INTERVAL (i) DAY AS order_date,
        (random() * 100 + 10)::int AS revenue
    FROM range(0, 90) t(i);
    """)
    sql_window = """
    SELECT customer_id, order_date, revenue,
           SUM(revenue) OVER (
               PARTITION BY customer_id
               ORDER BY order_date
               ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
           ) AS running_30d_revenue
    FROM orders
    ORDER BY customer_id, order_date
    LIMIT 10;
    """
    print(duck.execute(sql_window).df())
except ImportError:
    print('DuckDB not installed. Install with: pip install duckdb')
    print('Equivalent SQLite (using row-based window):')
    conn1.executescript("""
    CREATE TABLE orders (
      customer_id TEXT, order_date TEXT, revenue INTEGER);
    """)
    dates = pd.date_range('2023-01-01', periods=90).strftime('%Y-%m-%d')
    for i, d in enumerate(dates):
        conn1.execute('INSERT INTO orders VALUES (?,?,?)',
                      (f'C{(i%3)+1}', d, np.random.randint(10,110)))
    conn1.commit()
    sql_sqlite = """
    SELECT customer_id, order_date, revenue,
           SUM(revenue) OVER (
               PARTITION BY customer_id ORDER BY order_date
               ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
           ) AS running_30d_revenue
    FROM orders ORDER BY customer_id, order_date LIMIT 10"""
    print(run_sql(conn1, sql_sqlite))
# Exercise 3: PIVOT β€” rows of (month, category, revenue) -> columns per month
# Key insight: SQL PIVOT uses conditional aggregation (SUM CASE WHEN); this pattern
# works in any SQL dialect without a native PIVOT keyword.

conn3 = sqlite3.connect(':memory:')
conn3.executescript("""
CREATE TABLE monthly_sales (month TEXT, category TEXT, revenue REAL);
INSERT INTO monthly_sales VALUES
  ('2023-01','Electronics',500), ('2023-01','Clothing',200), ('2023-01','Food',150),
  ('2023-02','Electronics',600), ('2023-02','Clothing',250), ('2023-02','Food',180),
  ('2023-03','Electronics',550), ('2023-03','Clothing',220), ('2023-03','Food',160);
""")
conn3.commit()

sql_pivot = """
SELECT month,
       SUM(CASE WHEN category = 'Electronics' THEN revenue ELSE 0 END) AS Electronics,
       SUM(CASE WHEN category = 'Clothing'    THEN revenue ELSE 0 END) AS Clothing,
       SUM(CASE WHEN category = 'Food'        THEN revenue ELSE 0 END) AS Food
FROM monthly_sales
GROUP BY month
ORDER BY month;
"""
print(run_sql(conn3, sql_pivot))
# Exercise 4: Gap-and-island β€” consecutive active days per user
# Key insight: Subtract ROW_NUMBER() from the date to create an "island key";
# rows in the same island share the same key because they are consecutive.

conn4 = sqlite3.connect(':memory:')
conn4.executescript("""
CREATE TABLE user_activity (user_id TEXT, activity_date TEXT);
INSERT INTO user_activity VALUES
  ('u1','2023-01-01'),('u1','2023-01-02'),('u1','2023-01-03'),
  ('u1','2023-01-05'),('u1','2023-01-06'),
  ('u2','2023-01-01'),('u2','2023-01-04');
""")
conn4.commit()

# SQLite doesn't support date arithmetic easily; compute julian day difference
sql_island = """
WITH numbered AS (
    SELECT user_id, activity_date,
           ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY activity_date) AS rn
    FROM user_activity
),
islands AS (
    SELECT user_id, activity_date,
           -- julianday diff minus row_number is constant within a consecutive run
           CAST(julianday(activity_date) - rn AS INTEGER) AS island_key
    FROM numbered
)
SELECT user_id,
       MIN(activity_date) AS streak_start,
       MAX(activity_date) AS streak_end,
       COUNT(*) AS streak_length
FROM islands
GROUP BY user_id, island_key
ORDER BY user_id, streak_start;
"""
print(run_sql(conn4, sql_island))
# Exercise 5: Top-3 per group with ties β€” RANK vs DENSE_RANK vs ROW_NUMBER
# Key insight:
#   ROW_NUMBER: no ties, always 1-N unique ranks.
#   RANK:       ties get same rank, next rank skips (1,1,3).
#   DENSE_RANK: ties get same rank, next rank does NOT skip (1,1,2).

conn5 = sqlite3.connect(':memory:')
conn5.executescript("""
CREATE TABLE scores (dept TEXT, emp TEXT, score INTEGER);
INSERT INTO scores VALUES
  ('Eng','Alice',95),('Eng','Bob',95),('Eng','Carol',88),('Eng','Dave',80),
  ('Sales','Eve',90),('Sales','Frank',85),('Sales','Grace',85),('Sales','Hank',70);
""")
conn5.commit()

sql_ranks = """
SELECT dept, emp, score,
       ROW_NUMBER()  OVER (PARTITION BY dept ORDER BY score DESC) AS row_num,
       RANK()        OVER (PARTITION BY dept ORDER BY score DESC) AS rnk,
       DENSE_RANK()  OVER (PARTITION BY dept ORDER BY score DESC) AS dense_rnk
FROM scores
ORDER BY dept, score DESC;
"""
print(run_sql(conn5, sql_ranks))
print("\nTop-3 with ties (use RANK <= 3):")
sql_top3 = """
SELECT * FROM (
  SELECT dept, emp, score,
         RANK() OVER (PARTITION BY dept ORDER BY score DESC) AS rnk
  FROM scores) t
WHERE rnk <= 3;
"""
print(run_sql(conn5, sql_top3))

02 β€” Query OptimizationΒΆ

# Exercise 1: EXPLAIN comparison β€” nested loop vs hash join
# Key insight: Small tables β†’ nested loop; large tables β†’ hash join is cheaper
# because it avoids O(N*M) comparisons; use EXPLAIN QUERY PLAN in SQLite.

conn_opt = sqlite3.connect(':memory:')
conn_opt.executescript("""
CREATE TABLE customers (cust_id INTEGER PRIMARY KEY, name TEXT, city TEXT);
CREATE TABLE orders2   (order_id INTEGER PRIMARY KEY, cust_id INTEGER, amount REAL);
""")
# Seed with synthetic data
cust_rows  = [(i, f'Customer_{i}', np.random.choice(['NY','LA','SF'])) for i in range(1000)]
order_rows = [(i, np.random.randint(1,1000), round(np.random.uniform(10,500),2)) for i in range(5000)]
conn_opt.executemany('INSERT INTO customers VALUES (?,?,?)', cust_rows)
conn_opt.executemany('INSERT INTO orders2   VALUES (?,?,?)', order_rows)
conn_opt.commit()

explain_sql = "EXPLAIN QUERY PLAN SELECT c.name, SUM(o.amount) FROM customers c JOIN orders2 o ON c.cust_id = o.cust_id GROUP BY c.cust_id;"
plan = run_sql(conn_opt, explain_sql)
print('EXPLAIN QUERY PLAN (no index):')
print(plan.to_string(index=False))
# Exercise 2: Covering index β€” query plan changes before/after
# Key insight: A covering index contains all columns needed by the query,
# so SQLite can satisfy it from the index alone (no table lookup = "Using index").

explain_before = run_sql(conn_opt,
    "EXPLAIN QUERY PLAN SELECT amount FROM orders2 WHERE cust_id = 42;")
print('Before index:')
print(explain_before.to_string(index=False))

conn_opt.execute('CREATE INDEX idx_orders_cust_amount ON orders2 (cust_id, amount);')
conn_opt.commit()

explain_after = run_sql(conn_opt,
    "EXPLAIN QUERY PLAN SELECT amount FROM orders2 WHERE cust_id = 42;")
print('\nAfter covering index (cust_id, amount):')
print(explain_after.to_string(index=False))
# Exercise 3: Rewrite correlated subquery as a JOIN
# Key insight: A correlated subquery runs once per outer row (O(N)); rewriting
# as a JOIN lets the optimizer choose a hash/merge strategy (O(N + M)).

# Correlated subquery version
sql_correlated = """
SELECT c.cust_id, c.name,
       (SELECT SUM(o.amount) FROM orders2 o WHERE o.cust_id = c.cust_id) AS total_spent
FROM customers c
LIMIT 5;
"""
# JOIN version
sql_join = """
SELECT c.cust_id, c.name, COALESCE(agg.total_spent, 0) AS total_spent
FROM customers c
LEFT JOIN (SELECT cust_id, SUM(amount) AS total_spent FROM orders2 GROUP BY cust_id) agg
  ON c.cust_id = agg.cust_id
LIMIT 5;
"""

r1 = run_sql(conn_opt, sql_correlated)
r2 = run_sql(conn_opt, sql_join)
print('Correlated subquery result:')
print(r1)
print('\nJOIN rewrite result (should match):')
print(r2)

plan_corr = run_sql(conn_opt, 'EXPLAIN QUERY PLAN ' + sql_correlated.strip())
plan_join = run_sql(conn_opt, 'EXPLAIN QUERY PLAN ' + sql_join.strip())
print('\nEXPLAIN β€” Correlated:')
print(plan_corr['detail'].values)
print('EXPLAIN β€” JOIN rewrite:')
print(plan_join['detail'].values)
try:
    import duckdb
    duck2 = duckdb.connect(':memory:')
    # Exercise 4: Partition pruning (DuckDB supports PARTITION BY)
    # Key insight: Partitioned tables store data in separate files per partition;
    # a WHERE clause on the partition column causes the engine to skip irrelevant partitions.
    duck2.execute("""
    CREATE TABLE orders_partitioned AS
    SELECT order_id,
           '2023-0' || cast((order_id % 3 + 1) as varchar) AS order_year,
           (random()*100)::int AS amount
    FROM range(1, 10001) t(order_id);
    """)
    print('EXPLAIN (partition pruning):'); print('DuckDB does not expose explicit pruning in EXPLAIN text.')
    print('Query for specific year β€” only scans matching rows:')
    result = duck2.execute("SELECT COUNT(*) FROM orders_partitioned WHERE order_year = '2023-01'").fetchone()
    print(f'Rows matching 2023-01: {result[0]}')
    print('In production (Parquet/Hive layout), only 2023-01 partition files are opened.')
except ImportError:
    print('DuckDB not installed; partition pruning demo skipped.')
# Exercise 5: Slow query improvement β€” WHERE clause selectivity
# Key insight: Adding a highly selective WHERE predicate (e.g., cust_id = X)
# prunes the scan from O(N) to O(log N) when an index exists on that column.

import time

# Without index / without selective WHERE
start = time.perf_counter()
conn_opt.execute('SELECT * FROM orders2').fetchall()
t_full = time.perf_counter() - start

# With selective WHERE and covering index (already created)
start = time.perf_counter()
conn_opt.execute('SELECT * FROM orders2 WHERE cust_id = 42').fetchall()
t_selective = time.perf_counter() - start

print(f'Full scan time:       {t_full*1000:.3f} ms')
print(f'Selective WHERE time: {t_selective*1000:.3f} ms')
print('Selective WHERE + covering index avoids scanning all 5000 rows.')

03 β€” Data Pipelines (Airflow patterns, no server required)ΒΆ

# These exercises demonstrate Airflow DAG patterns in pure Python.
# Real Airflow requires a running scheduler; here we simulate the logic.

import os, time, random, logging
from pathlib import Path
from datetime import datetime

logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')

# Exercise 1: FileSensor β€” wait for a file to exist before running the pipeline
# Key insight: Sensors poll at configurable intervals; poke_interval and timeout
# are critical to avoid stalling the scheduler indefinitely.

def file_sensor(filepath, timeout_sec=5, poke_interval_sec=1):
    """Returns True when file exists; raises TimeoutError on timeout."""
    deadline = time.time() + timeout_sec
    while time.time() < deadline:
        if Path(filepath).exists():
            logging.info(f'Sensor: file found at {filepath}')
            return True
        logging.info(f'Sensor: {filepath} not found, waiting {poke_interval_sec}s...')
        time.sleep(poke_interval_sec)
    raise TimeoutError(f'File {filepath} did not appear within {timeout_sec}s')

# Demo: create temp file mid-wait
import tempfile
tmp_path = tempfile.mktemp(suffix='.csv')

import threading
def create_file_later(path, delay=1.5):
    time.sleep(delay)
    Path(path).write_text('id,value\n1,42\n')

t = threading.Thread(target=create_file_later, args=(tmp_path, 1.5))
t.start()
file_sensor(tmp_path, timeout_sec=5)
t.join()
Path(tmp_path).unlink(missing_ok=True)
# Exercise 2: Retry with exponential backoff for a flaky API call
# Key insight: Exponential backoff (wait *= 2) reduces thundering-herd pressure
# on the API; jitter (random offset) prevents synchronized retries across workers.

def with_exponential_backoff(fn, max_retries=4, base_wait=0.1, jitter=True):
    wait = base_wait
    for attempt in range(max_retries + 1):
        try:
            return fn()
        except Exception as e:
            if attempt == max_retries:
                raise
            actual_wait = wait + (random.uniform(0, wait) if jitter else 0)
            logging.warning(f'Attempt {attempt+1} failed: {e}. Retrying in {actual_wait:.2f}s')
            time.sleep(actual_wait)
            wait *= 2

call_count = {'n': 0}
def flaky_api():
    call_count['n'] += 1
    if call_count['n'] < 3:  # fails first 2 times
        raise ConnectionError('API unavailable')
    return {'data': 'success', 'attempt': call_count['n']}

result = with_exponential_backoff(flaky_api)
print('API result:', result)
# Exercise 3: Dynamic DAG β€” generate tasks programmatically from a config list
# Key insight: Dynamic DAGs avoid boilerplate by generating operator instances
# in a loop; each task is uniquely named and receives its config at runtime.

config_list = [
    {'table': 'orders',    'source': 's3://bucket/orders/'},
    {'table': 'customers', 'source': 's3://bucket/customers/'},
    {'table': 'products',  'source': 's3://bucket/products/'},
]

def make_ingest_task(cfg):
    """Simulates an ingestion task operator."""
    def _task():
        logging.info(f"Ingesting {cfg['table']} from {cfg['source']}")
        return f"{cfg['table']}: {np.random.randint(100,1000)} rows loaded"
    _task.__name__ = f"ingest_{cfg['table']}"
    return _task

dynamic_tasks = {cfg['table']: make_ingest_task(cfg) for cfg in config_list}

results = {}
for name, task_fn in dynamic_tasks.items():
    results[name] = task_fn()
    print(f'  {name}: {results[name]}')
# Exercise 4: Email notification on task failure (mocked)
# Key insight: Airflow's on_failure_callback fires the notification function;
# here we mock SMTP send so the test is hermetic and doesn't need a mail server.

from unittest.mock import MagicMock, patch

def send_failure_email(context):
    """Mock email notification function (replace smtplib.SMTP with real SMTP in prod)."""
    subject = f"AIRFLOW FAILURE: {context['task_id']} in DAG {context['dag_id']}"
    body    = f"Task failed at {context['execution_date']}. Exception: {context['exception']}"
    with patch('smtplib.SMTP') as mock_smtp:
        smtp_instance = mock_smtp.return_value.__enter__.return_value
        smtp_instance.sendmail('airflow@company.com', 'team@company.com',
                               f'Subject: {subject}\n\n{body}')
        logging.info(f'Email sent (mocked): {subject}')
        return smtp_instance.sendmail.call_args

ctx = {'task_id': 'ingest_orders', 'dag_id': 'etl_pipeline',
       'execution_date': datetime.now(), 'exception': ValueError('S3 read timeout')}
call_args = send_failure_email(ctx)
print('sendmail called with:', call_args)
# Exercise 5: Task branching β€” full_load vs incremental_load based on row_count
# Key insight: BranchPythonOperator returns the task_id to execute next;
# the skipped branch must be marked as 'skipped' (handled by Airflow trigger_rule).

def branch_decision(row_count, threshold=10_000):
    """Return the next task id based on row count."""
    if row_count > threshold:
        return 'full_load'
    return 'incremental_load'

def full_load():
    logging.info('Running FULL LOAD: truncate + reload all data')
    return 'full_load complete'

def incremental_load():
    logging.info('Running INCREMENTAL LOAD: append new rows only')
    return 'incremental_load complete'

tasks = {'full_load': full_load, 'incremental_load': incremental_load}

for count in [5_000, 50_000]:
    chosen = branch_decision(count)
    result = tasks[chosen]()
    print(f'row_count={count:>6} -> branch={chosen} -> {result}')

04 β€” Spark / PySpark BasicsΒΆ

# PySpark requires Java + a Spark installation.
# These cells show the full PySpark code and fall back to pandas equivalents.

try:
    from pyspark.sql import SparkSession
    from pyspark.sql import functions as F
    from pyspark.sql.window import Window
    spark = SparkSession.builder.master('local[2]').appName('solutions').getOrCreate()
    spark.sparkContext.setLogLevel('ERROR')
    SPARK = True
    print('PySpark session started')
except Exception as e:
    SPARK = False
    print(f'PySpark not available ({e}). Showing pandas equivalents.')

import pandas as pd
import numpy as np

np.random.seed(42)
N_orders = 500
orders_df = pd.DataFrame({
    'order_id':   range(N_orders),
    'product_id': np.random.randint(1, 20, N_orders),
    'status':     np.random.choice(['completed','pending','cancelled'], N_orders, p=[0.7,0.2,0.1]),
    'quantity':   np.random.randint(1, 10, N_orders),
    'unit_price': np.random.uniform(5, 100, N_orders).round(2),
    'order_date': pd.date_range('2023-01-01', periods=N_orders, freq='h').date
})
orders_df['total_value'] = orders_df['quantity'] * orders_df['unit_price']
# Exercise 1: PySpark pipeline β€” filter completed -> add total_value -> top-5 by revenue
# Key insight: PySpark lazy evaluation builds a logical plan; only the final
# .show()/.collect() triggers execution, enabling the optimizer to push filters down.

if SPARK:
    sdf = spark.createDataFrame(orders_df)
    result_sdf = (sdf
        .filter(F.col('status') == 'completed')
        .withColumn('total_value', F.col('quantity') * F.col('unit_price'))
        .groupBy('product_id')
        .agg(F.sum('total_value').alias('revenue'))
        .orderBy(F.desc('revenue'))
        .limit(5))
    result_sdf.show()
else:
    # Pandas equivalent
    top5 = (orders_df[orders_df['status'] == 'completed']
            .groupby('product_id')['total_value']
            .sum().nlargest(5).reset_index()
            .rename(columns={'total_value': 'revenue'}))
    print(top5.to_string(index=False))
# Exercise 2: Running 7-day total revenue per product ordered by date
# Key insight: Spark window functions with ROWS/RANGE BETWEEN mirror SQL semantics;
# always partition first (reduces data shuffled) then apply the window.

daily_df = (orders_df.groupby(['product_id','order_date'])['total_value']
            .sum().reset_index().rename(columns={'total_value':'daily_revenue'}))
daily_df['order_date'] = pd.to_datetime(daily_df['order_date'])
daily_df = daily_df.sort_values(['product_id','order_date'])

if SPARK:
    sdf_daily = spark.createDataFrame(daily_df)
    w7 = (Window.partitionBy('product_id')
                .orderBy(F.col('order_date').cast('long'))
                .rowsBetween(-6, 0))
    (sdf_daily
     .withColumn('rolling_7d', F.sum('daily_revenue').over(w7))
     .filter(F.col('product_id') == 1)
     .orderBy('order_date').show(10))
else:
    daily_df['rolling_7d'] = (daily_df.groupby('product_id')['daily_revenue']
                              .transform(lambda x: x.rolling(7, min_periods=1).sum()))
    print(daily_df[daily_df['product_id']==1].head(10).to_string(index=False))
# Exercise 3: Broadcast join with discounts DataFrame
# Key insight: Broadcasting a small lookup table eliminates the shuffle join;
# .explain() should show BroadcastHashJoin instead of SortMergeJoin.

discounts = pd.DataFrame({'product_id': range(1, 20), 'discount': np.random.uniform(0, 0.3, 19).round(2)})

if SPARK:
    sdf_orders    = spark.createDataFrame(orders_df)
    sdf_discounts = spark.createDataFrame(discounts)
    from pyspark.sql.functions import broadcast
    joined = sdf_orders.join(broadcast(sdf_discounts), on='product_id', how='left')
    joined = joined.withColumn('discounted_value', F.col('total_value') * (1 - F.col('discount')))
    print('Physical plan excerpt:')
    joined.explain(mode='simple')  # should show BroadcastHashJoin
else:
    merged = orders_df.merge(discounts, on='product_id', how='left')
    merged['discounted_value'] = merged['total_value'] * (1 - merged['discount'])
    print('Pandas merge (simulates broadcast join):')
    print(merged[['order_id','product_id','total_value','discount','discounted_value']].head())
# Exercise 4: Repartition by category; count rows per partition
# Key insight: repartition() shuffles data to exactly N partitions;
# coalesce() reduces without full shuffle β€” use coalesce to shrink, repartition to redistribute.

orders_df['category'] = np.where(orders_df['product_id'] <= 6, 'A',
                         np.where(orders_df['product_id'] <= 13, 'B', 'C'))

if SPARK:
    sdf_cat = spark.createDataFrame(orders_df)
    sdf_repartitioned = sdf_cat.repartition(3, 'category')
    # Count per partition via RDD
    partition_counts = sdf_repartitioned.rdd.mapPartitions(
        lambda it: [sum(1 for _ in it)]).collect()
    print('Rows per partition after repartition by category:', partition_counts)
else:
    print('Partition simulation (pandas groupby):')
    print(orders_df.groupby('category').size())
# Exercise 5: Spark SQL RANK() top-3 per category
# Key insight: Spark SQL and the DataFrame API produce the same physical plan;
# SQL is often more readable for window operations β€” compare .explain() outputs.

if SPARK:
    sdf_cat = spark.createDataFrame(orders_df)
    sdf_cat.createOrReplaceTempView('orders_cat')

    sql_top3 = """
    SELECT * FROM (
        SELECT category, product_id,
               SUM(total_value) AS revenue,
               RANK() OVER (PARTITION BY category ORDER BY SUM(total_value) DESC) AS rnk
        FROM orders_cat
        GROUP BY category, product_id
    ) WHERE rnk <= 3
    ORDER BY category, rnk;
    """
    spark.sql(sql_top3).show()

    # Equivalent DataFrame API
    from pyspark.sql.window import Window
    w_cat = Window.partitionBy('category').orderBy(F.desc('revenue'))
    (sdf_cat.groupBy('category','product_id')
           .agg(F.sum('total_value').alias('revenue'))
           .withColumn('rnk', F.rank().over(w_cat))
           .filter(F.col('rnk') <= 3)
           .orderBy('category','rnk')
           .show())
else:
    agg = orders_df.groupby(['category','product_id'])['total_value'].sum().reset_index()
    agg['rnk'] = agg.groupby('category')['total_value'].rank(ascending=False, method='min').astype(int)
    print(agg[agg['rnk'] <= 3].sort_values(['category','rnk']).to_string(index=False))

05 β€” dbt Data Modeling (pattern demonstrations)ΒΆ

# dbt runs against a warehouse; here we demonstrate each pattern as SQL/Python.
# In a real project, each snippet maps directly to a dbt model file.

conn_dbt = sqlite3.connect(':memory:')
conn_dbt.executescript("""
CREATE TABLE raw_events (
    EventId INTEGER, UserId TEXT, EventType TEXT,
    EventTimestamp TEXT, Revenue REAL);
INSERT INTO raw_events VALUES
    (1,'u1','purchase','2023-06-01 10:00:00',99.99),
    (2,'u2','view',    '2023-06-01 11:00:00',NULL),
    (3,'u1','purchase','2023-06-02 09:30:00',49.50);
""")
conn_dbt.commit()

# Exercise 1: Staging model β€” cast types, rename to snake_case, add loaded_at
# Key insight: Staging models are the single point of type casting and renaming;
# downstream models should never read from raw tables directly.
sql_staging = """
SELECT
    CAST(EventId   AS INTEGER)  AS event_id,
    CAST(UserId    AS TEXT)     AS user_id,
    CAST(EventType AS TEXT)     AS event_type,
    CAST(EventTimestamp AS TEXT) AS event_timestamp,
    CAST(COALESCE(Revenue, 0)   AS REAL) AS revenue,
    CURRENT_TIMESTAMP           AS loaded_at
FROM raw_events;
"""
print('Staging model output:')
print(run_sql(conn_dbt, sql_staging))
# Exercise 2: dbt schema tests β€” not_null, unique, accepted_values, relationships
# Key insight: Schema tests run as SQL assertions; failures block the DAG,
# preventing bad data from reaching downstream consumers.

def dbt_test_not_null(conn, table, column):
    n = run_sql(conn, f'SELECT COUNT(*) AS n FROM {table} WHERE {column} IS NULL').iloc[0,0]
    status = 'PASS' if n == 0 else f'FAIL ({n} nulls)'
    print(f'  not_null({table}.{column}): {status}')
    return n == 0

def dbt_test_unique(conn, table, column):
    n = run_sql(conn, f'SELECT COUNT(*) - COUNT(DISTINCT {column}) AS n FROM {table}').iloc[0,0]
    status = 'PASS' if n == 0 else f'FAIL ({n} duplicates)'
    print(f'  unique({table}.{column}): {status}')
    return n == 0

def dbt_test_accepted_values(conn, table, column, values):
    placeholders = ','.join('?' * len(values))
    n = conn.execute(
        f'SELECT COUNT(*) FROM {table} WHERE {column} NOT IN ({placeholders})',
        values).fetchone()[0]
    status = 'PASS' if n == 0 else f'FAIL ({n} bad values)'
    print(f'  accepted_values({table}.{column}): {status}')
    return n == 0

conn_dbt.execute("CREATE VIEW stg_events AS " + sql_staging)
conn_dbt.commit()

print('Running schema tests on stg_events:')
dbt_test_not_null(conn_dbt, 'stg_events', 'event_id')
dbt_test_unique(conn_dbt, 'stg_events', 'event_id')
dbt_test_accepted_values(conn_dbt, 'stg_events', 'event_type', ['purchase','view','click','cart'])
# Exercise 3: Incremental model β€” load only orders from last 3 days
# Key insight: Incremental models use a WHERE clause on the max timestamp
# already in the target table; dbt's {{ is_incremental() }} macro toggles this.

# Simulate the dbt incremental pattern
def incremental_load_orders(conn, target_table, source_table='raw_events', lookback_days=3):
    # Check if target exists
    exists = conn.execute(
        f"SELECT name FROM sqlite_master WHERE type='table' AND name='{target_table}'"
    ).fetchone()

    if exists:
        # Incremental: only new rows
        sql = f"""
        INSERT INTO {target_table}
        SELECT * FROM {source_table}
        WHERE date(EventTimestamp) >= date('now', '-{lookback_days} days')
          AND EventId NOT IN (SELECT EventId FROM {target_table})"""
    else:
        # Full load on first run
        sql = f"CREATE TABLE {target_table} AS SELECT * FROM {source_table}"

    conn.execute(sql); conn.commit()
    return run_sql(conn, f'SELECT COUNT(*) AS rows FROM {target_table}').iloc[0,0]

rows = incremental_load_orders(conn_dbt, 'fct_orders_incremental')
print(f'Rows in incremental target: {rows}')
print('Re-run (simulates incremental):')
rows2 = incremental_load_orders(conn_dbt, 'fct_orders_incremental')
print(f'Rows after second run (no duplicates): {rows2}')
# Exercise 4: Macro safe_divide β€” return NULL instead of division by zero
# Key insight: In dbt, macros are Jinja2 functions; safe_divide compiles to
# NULLIF(denominator, 0) which causes the DB to return NULL on zero division.

# Python simulation of the compiled SQL macro output
def safe_divide_sql(numerator, denominator):
    """Returns the SQL expression that dbt's safe_divide macro would render."""
    return f'CAST({numerator} AS REAL) / NULLIF({denominator}, 0)'

# Test in SQLite
for num, den in [(10, 2), (10, 0), (0, 5), (7, 3)]:
    expr = safe_divide_sql(num, den)
    val  = conn_dbt.execute(f'SELECT {expr}').fetchone()[0]
    print(f'safe_divide({num}, {den}) = {val}')
# Exercise 5: SCD Type 2 snapshot β€” track valid_from/valid_to for customer address
# Key insight: SCD2 inserts a new row when a dimension attribute changes,
# setting valid_to on the old row and valid_from=now on the new row.

conn_scd = sqlite3.connect(':memory:')
conn_scd.executescript("""
CREATE TABLE dim_customers_scd2 (
    customer_id TEXT, address TEXT,
    valid_from TEXT, valid_to TEXT, is_current INTEGER);
CREATE TABLE source_customers (customer_id TEXT, address TEXT);
INSERT INTO source_customers VALUES ('C1','123 Main St'),('C2','456 Oak Ave');
""")
conn_scd.commit()

def scd2_upsert(conn, source_rows):
    today = datetime.today().strftime('%Y-%m-%d')
    for cust_id, new_addr in source_rows:
        cur = conn.execute(
            'SELECT address FROM dim_customers_scd2 WHERE customer_id=? AND is_current=1',
            (cust_id,)).fetchone()
        if cur is None:
            conn.execute('INSERT INTO dim_customers_scd2 VALUES (?,?,?,NULL,1)',
                         (cust_id, new_addr, today))
        elif cur[0] != new_addr:
            # Expire old row
            conn.execute('UPDATE dim_customers_scd2 SET valid_to=?, is_current=0 '
                         'WHERE customer_id=? AND is_current=1', (today, cust_id))
            # Insert new row
            conn.execute('INSERT INTO dim_customers_scd2 VALUES (?,?,?,NULL,1)',
                         (cust_id, new_addr, today))
    conn.commit()

scd2_upsert(conn_scd, [('C1','123 Main St'), ('C2','456 Oak Ave')])  # first load
scd2_upsert(conn_scd, [('C1','999 New Blvd')])  # C1 moves
print(run_sql(conn_scd, 'SELECT * FROM dim_customers_scd2 ORDER BY customer_id, valid_from'))

06 β€” Streaming BasicsΒΆ

# These exercises simulate streaming primitives in pure Python.
# Real deployments use Kafka + Flink/Spark Structured Streaming.

from collections import deque, defaultdict
import heapq

# Exercise 1: SlidingWindowAggregator with configurable window_size and slide_interval
# Key insight: Sliding windows compute aggregates over overlapping time buckets;
# window_size determines how far back events are included, slide_interval how often to emit.

class SlidingWindowAggregator:
    def __init__(self, window_size, slide_interval, agg_fn=sum):
        self.window_size     = window_size
        self.slide_interval  = slide_interval
        self.agg_fn          = agg_fn
        self.buffer          = deque()  # (timestamp, value)
        self.last_emit       = None
        self.results         = []

    def process(self, timestamp, value):
        self.buffer.append((timestamp, value))
        # Evict events outside the window
        while self.buffer and self.buffer[0][0] < timestamp - self.window_size:
            self.buffer.popleft()
        # Emit if slide_interval has passed
        if self.last_emit is None or timestamp - self.last_emit >= self.slide_interval:
            agg = self.agg_fn(v for _, v in self.buffer)
            self.results.append((timestamp, agg))
            self.last_emit = timestamp
        return self.results[-1] if self.results else None

# Simulate stream: sales events every second
agg = SlidingWindowAggregator(window_size=5, slide_interval=2, agg_fn=sum)
stream = [(t, np.random.randint(1, 20)) for t in range(20)]
for ts, val in stream:
    out = agg.process(ts, val)
    if out and out[0] == ts:
        print(f't={ts:2d}: window_sum={out[1]}')
# Exercise 2: Rolling leaderboard β€” top-10 players in a 1-hour window
# Key insight: Maintain a heap of (score, player_id) with eviction of events
# older than 1 hour; recompute top-K from active scores using a score_map.

class RollingLeaderboard:
    def __init__(self, window_seconds=3600, top_k=10):
        self.window   = window_seconds
        self.top_k    = top_k
        self.events   = deque()  # (timestamp, player_id, score)
        self.scores   = defaultdict(float)

    def add_event(self, timestamp, player_id, score):
        self.events.append((timestamp, player_id, score))
        self.scores[player_id] += score
        # Evict old events
        while self.events and self.events[0][0] < timestamp - self.window:
            _, old_player, old_score = self.events.popleft()
            self.scores[old_player] -= old_score
            if self.scores[old_player] <= 0:
                del self.scores[old_player]

    def top_players(self):
        return sorted(self.scores.items(), key=lambda x: -x[1])[:self.top_k]

lb = RollingLeaderboard(window_seconds=3600, top_k=5)
np.random.seed(3)
for t in range(200):
    lb.add_event(t * 30, f'player_{np.random.randint(1,20)}', np.random.randint(10,500))

print('Top-5 leaderboard (1-hour window):')
for rank, (player, score) in enumerate(lb.top_players(), 1):
    print(f'  {rank}. {player}: {score:.0f} pts')
# Exercise 3: Consumer group rebalancing
# Key insight: When a consumer joins or leaves, partitions are redistributed
# evenly (round-robin or range strategy); this is triggered by a group coordinator.

def rebalance_partitions(partitions, consumers):
    """Round-robin partition assignment."""
    assignment = defaultdict(list)
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return dict(assignment)

partitions = [f'p{i}' for i in range(6)]

print('Initial (3 consumers):')
consumers = ['C1','C2','C3']
assign1   = rebalance_partitions(partitions, consumers)
for c, ps in assign1.items(): print(f'  {c}: {ps}')

print('\nAfter C4 joins (4 consumers):')
consumers2 = ['C1','C2','C3','C4']
assign2    = rebalance_partitions(partitions, consumers2)
for c, ps in assign2.items(): print(f'  {c}: {ps}')

print('\nAfter C2 leaves (3 consumers):')
consumers3 = ['C1','C3','C4']
assign3    = rebalance_partitions(partitions, consumers3)
for c, ps in assign3.items(): print(f'  {c}: {ps}')
# Exercise 4: Dead letter queue β€” failed events routed for inspection
# Key insight: A DLQ captures events that cannot be processed (schema errors,
# downstream failures) so they can be replayed or inspected without data loss.

from queue import Queue

class StreamProcessor:
    def __init__(self):
        self.main_queue = Queue()
        self.dlq        = Queue()
        self.processed  = []

    def process_event(self, event):
        try:
            # Simulate: events with negative amount are invalid
            if not isinstance(event.get('amount'), (int, float)):
                raise ValueError(f'Invalid amount: {event.get("amount")}')
            if event['amount'] < 0:
                raise ValueError(f'Negative amount: {event["amount"]}')
            self.processed.append(event)
            return 'processed'
        except Exception as e:
            self.dlq.put({'event': event, 'error': str(e), 'timestamp': datetime.now()})
            return 'dlq'

events = [
    {'id': 1, 'amount': 100}, {'id': 2, 'amount': -50},
    {'id': 3, 'amount': 'bad'}, {'id': 4, 'amount': 200}
]

proc = StreamProcessor()
for ev in events:
    status = proc.process_event(ev)
    print(f'Event {ev["id"]}: {status}')

print(f'\nProcessed: {len(proc.processed)}, DLQ: {proc.dlq.qsize()}')
print('DLQ contents:')
while not proc.dlq.empty():
    item = proc.dlq.get()
    print(f'  {item["event"]} -> {item["error"]}')
# Exercise 5: Stream join β€” match purchase events with product catalog within 5-minute window
# Key insight: Stream-table joins look up the catalog on each event;
# stream-stream joins buffer events and match within a time tolerance window.

import time as time_module

class StreamJoiner:
    def __init__(self, window_seconds=300):
        self.window      = window_seconds
        self.left_buffer = deque()   # purchase events
        self.catalog     = {}        # product_id -> product_info
        self.matches     = []

    def add_catalog(self, product_id, info):
        self.catalog[product_id] = info

    def add_purchase(self, timestamp, event):
        self.left_buffer.append((timestamp, event))
        # Evict old events
        while self.left_buffer and self.left_buffer[0][0] < timestamp - self.window:
            self.left_buffer.popleft()
        # Try to join with catalog
        pid = event.get('product_id')
        if pid in self.catalog:
            self.matches.append({**event, **self.catalog[pid], 'matched_at': timestamp})
        return self.matches[-1] if self.matches else None

joiner = StreamJoiner(window_seconds=300)
joiner.add_catalog(101, {'name': 'Widget A', 'category': 'Gadgets'})
joiner.add_catalog(202, {'name': 'Widget B', 'category': 'Tools'})

purchases = [
    (0,   {'user_id': 'u1', 'product_id': 101, 'amount': 29.99}),
    (120, {'user_id': 'u2', 'product_id': 999, 'amount': 9.99}),   # no catalog match
    (240, {'user_id': 'u3', 'product_id': 202, 'amount': 49.99}),
]

for ts, purchase in purchases:
    match = joiner.add_purchase(ts, purchase)
    print(f't={ts}s: {"JOINED -> " + str(match) if match and match.get("name") else "NO MATCH"}')