Solutions: SQL & Data Engineering Track
Worked solutions to all exercises from the sql-data-engineering/ notebooks. All SQL runs with sqlite3 or DuckDB (in-memory, no server needed).
import sqlite3
import pandas as pd
import numpy as np
import warnings; warnings.filterwarnings('ignore')
# Helper: run SQL on an in-memory SQLite connection and return a DataFrame
def run_sql(conn, sql, params=None):
return pd.read_sql_query(sql, conn, params=params)
def execute(conn, sql):
    if ';' in sql.strip()[:-1]:
        conn.executescript(sql)  # multi-statement script
    else:
        conn.execute(sql)        # single statement
    conn.commit()
01 – Advanced SQL
conn1 = sqlite3.connect(':memory:')
# Seed employee org-chart table
conn1.executescript("""
CREATE TABLE employees (id INTEGER PRIMARY KEY, name TEXT, manager_id INTEGER);
INSERT INTO employees VALUES (1, 'Alice', NULL),
(2, 'Bob', 1), (3, 'Carol', 1), (4, 'Dave', 2),
(5, 'Eve', 2), (6, 'Frank', 3), (7, 'Grace', 4);
""")
conn1.commit()
# Exercise 1: Recursive CTE – all reports (direct + indirect) for a given manager
# Key insight: Recursive CTEs walk the adjacency list; the anchor seeds the root,
# and the recursive step follows manager_id until no more rows are found.
sql_recursive = """
WITH RECURSIVE reports(id, name, depth) AS (
-- Anchor: direct reports of manager with id = 1 (Alice)
SELECT id, name, 1 FROM employees WHERE manager_id = 1
UNION ALL
-- Recursive: reports of reports
SELECT e.id, e.name, r.depth + 1
FROM employees e
JOIN reports r ON e.manager_id = r.id
)
SELECT id, name, depth FROM reports ORDER BY depth, id;
"""
print(run_sql(conn1, sql_recursive))
# Exercise 2: Running 30-day revenue per customer using ROWS BETWEEN
# Key insight: ROWS BETWEEN 29 PRECEDING AND CURRENT ROW uses physical row counts
# within the ordered window; for irregular dates use a RANGE frame with an INTERVAL bound
# (see the sketch after this exercise).
try:
import duckdb
DUCKDB = True
duck = duckdb.connect(':memory:')
duck.execute("""
CREATE TABLE orders AS
SELECT
'C' || cast(((i % 3) + 1) as varchar) AS customer_id,
date '2023-01-01' + INTERVAL (i) DAY AS order_date,
(random() * 100 + 10)::int AS revenue
FROM range(0, 90) t(i);
""")
sql_window = """
SELECT customer_id, order_date, revenue,
SUM(revenue) OVER (
PARTITION BY customer_id
ORDER BY order_date
ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
) AS running_30d_revenue
FROM orders
ORDER BY customer_id, order_date
LIMIT 10;
"""
print(duck.execute(sql_window).df())
except ImportError:
print('DuckDB not installed. Install with: pip install duckdb')
print('Equivalent SQLite (using row-based window):')
conn1.executescript("""
CREATE TABLE orders (
customer_id TEXT, order_date TEXT, revenue INTEGER);
""")
dates = pd.date_range('2023-01-01', periods=90).strftime('%Y-%m-%d')
for i, d in enumerate(dates):
conn1.execute('INSERT INTO orders VALUES (?,?,?)',
(f'C{(i%3)+1}', d, np.random.randint(10,110)))
conn1.commit()
sql_sqlite = """
SELECT customer_id, order_date, revenue,
SUM(revenue) OVER (
PARTITION BY customer_id ORDER BY order_date
ROWS BETWEEN 29 PRECEDING AND CURRENT ROW
) AS running_30d_revenue
FROM orders ORDER BY customer_id, order_date LIMIT 10"""
print(run_sql(conn1, sql_sqlite))
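# Follow-up sketch for the note above: a calendar-true 30-day window keys the frame on
# dates rather than row counts. DuckDB accepts an INTERVAL bound in a RANGE frame when the
# ORDER BY column is a date/timestamp; this only runs if the DuckDB connection `duck`
# from the previous cell exists, otherwise it is skipped.
sql_range_30d = """
SELECT customer_id, order_date, revenue,
       SUM(revenue) OVER (
           PARTITION BY customer_id
           ORDER BY order_date
           RANGE BETWEEN INTERVAL 29 DAYS PRECEDING AND CURRENT ROW
       ) AS true_30d_revenue
FROM orders
ORDER BY customer_id, order_date
LIMIT 10;
"""
try:
    print(duck.execute(sql_range_30d).df())
except Exception as e:
    print(f'RANGE-frame demo skipped ({e})')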
# Exercise 3: PIVOT – rows of (month, category, revenue) -> columns per month
# Key insight: SQL PIVOT uses conditional aggregation (SUM CASE WHEN); this pattern
# works in any SQL dialect without a native PIVOT keyword.
conn3 = sqlite3.connect(':memory:')
conn3.executescript("""
CREATE TABLE monthly_sales (month TEXT, category TEXT, revenue REAL);
INSERT INTO monthly_sales VALUES
('2023-01','Electronics',500), ('2023-01','Clothing',200), ('2023-01','Food',150),
('2023-02','Electronics',600), ('2023-02','Clothing',250), ('2023-02','Food',180),
('2023-03','Electronics',550), ('2023-03','Clothing',220), ('2023-03','Food',160);
""")
conn3.commit()
sql_pivot = """
SELECT month,
SUM(CASE WHEN category = 'Electronics' THEN revenue ELSE 0 END) AS Electronics,
SUM(CASE WHEN category = 'Clothing' THEN revenue ELSE 0 END) AS Clothing,
SUM(CASE WHEN category = 'Food' THEN revenue ELSE 0 END) AS Food
FROM monthly_sales
GROUP BY month
ORDER BY month;
"""
print(run_sql(conn3, sql_pivot))
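# Hedged alternative: some engines (DuckDB, Snowflake, SQL Server) offer a native PIVOT.
# This sketch uses DuckDB's simplified PIVOT syntax to mirror the result above; it assumes
# duckdb is importable, and the connection/view names are illustrative.
try:
    import duckdb
    duck_pivot = duckdb.connect(':memory:')
    duck_pivot.register('monthly_sales', run_sql(conn3, 'SELECT * FROM monthly_sales'))
    print(duck_pivot.execute(
        "PIVOT monthly_sales ON category USING SUM(revenue) GROUP BY month"
    ).df())
except Exception as e:
    print(f'Native PIVOT demo skipped ({e})')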
# Exercise 4: Gap-and-island – consecutive active days per user
# Key insight: Subtract ROW_NUMBER() from the date to create an "island key";
# rows in the same island share the same key because they are consecutive.
conn4 = sqlite3.connect(':memory:')
conn4.executescript("""
CREATE TABLE user_activity (user_id TEXT, activity_date TEXT);
INSERT INTO user_activity VALUES
('u1','2023-01-01'),('u1','2023-01-02'),('u1','2023-01-03'),
('u1','2023-01-05'),('u1','2023-01-06'),
('u2','2023-01-01'),('u2','2023-01-04');
""")
conn4.commit()
# SQLite doesn't support date arithmetic easily; compute julian day difference
sql_island = """
WITH numbered AS (
SELECT user_id, activity_date,
ROW_NUMBER() OVER (PARTITION BY user_id ORDER BY activity_date) AS rn
FROM user_activity
),
islands AS (
SELECT user_id, activity_date,
-- julianday diff minus row_number is constant within a consecutive run
CAST(julianday(activity_date) - rn AS INTEGER) AS island_key
FROM numbered
)
SELECT user_id,
MIN(activity_date) AS streak_start,
MAX(activity_date) AS streak_end,
COUNT(*) AS streak_length
FROM islands
GROUP BY user_id, island_key
ORDER BY user_id, streak_start;
"""
print(run_sql(conn4, sql_island))
# Exercise 5: Top-3 per group with ties – RANK vs DENSE_RANK vs ROW_NUMBER
# Key insight:
# ROW_NUMBER: no ties, always 1-N unique ranks.
# RANK: ties get same rank, next rank skips (1,1,3).
# DENSE_RANK: ties get same rank, next rank does NOT skip (1,1,2).
conn5 = sqlite3.connect(':memory:')
conn5.executescript("""
CREATE TABLE scores (dept TEXT, emp TEXT, score INTEGER);
INSERT INTO scores VALUES
('Eng','Alice',95),('Eng','Bob',95),('Eng','Carol',88),('Eng','Dave',80),
('Sales','Eve',90),('Sales','Frank',85),('Sales','Grace',85),('Sales','Hank',70);
""")
conn5.commit()
sql_ranks = """
SELECT dept, emp, score,
ROW_NUMBER() OVER (PARTITION BY dept ORDER BY score DESC) AS row_num,
RANK() OVER (PARTITION BY dept ORDER BY score DESC) AS rnk,
DENSE_RANK() OVER (PARTITION BY dept ORDER BY score DESC) AS dense_rnk
FROM scores
ORDER BY dept, score DESC;
"""
print(run_sql(conn5, sql_ranks))
print("\nTop-3 with ties (use RANK <= 3):")
sql_top3 = """
SELECT * FROM (
SELECT dept, emp, score,
RANK() OVER (PARTITION BY dept ORDER BY score DESC) AS rnk
FROM scores) t
WHERE rnk <= 3;
"""
print(run_sql(conn5, sql_top3))
02 – Query Optimization
# Exercise 1: EXPLAIN comparison – nested loop vs hash join
# Key insight: Small inputs -> nested loop; large inputs -> hash join is cheaper because it
# avoids O(N*M) comparisons. SQLite only ever uses nested loops (EXPLAIN QUERY PLAN shows
# SCAN/SEARCH), so a DuckDB plan with a hash join is shown after this exercise.
conn_opt = sqlite3.connect(':memory:')
conn_opt.executescript("""
CREATE TABLE customers (cust_id INTEGER PRIMARY KEY, name TEXT, city TEXT);
CREATE TABLE orders2 (order_id INTEGER PRIMARY KEY, cust_id INTEGER, amount REAL);
""")
# Seed with synthetic data
cust_rows = [(i, f'Customer_{i}', np.random.choice(['NY','LA','SF'])) for i in range(1000)]
order_rows = [(i, np.random.randint(1,1000), round(np.random.uniform(10,500),2)) for i in range(5000)]
conn_opt.executemany('INSERT INTO customers VALUES (?,?,?)', cust_rows)
conn_opt.executemany('INSERT INTO orders2 VALUES (?,?,?)', order_rows)
conn_opt.commit()
explain_sql = "EXPLAIN QUERY PLAN SELECT c.name, SUM(o.amount) FROM customers c JOIN orders2 o ON c.cust_id = o.cust_id GROUP BY c.cust_id;"
plan = run_sql(conn_opt, explain_sql)
print('EXPLAIN QUERY PLAN (no index):')
print(plan.to_string(index=False))
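# Hedged follow-up: to actually see a hash join in a plan, load the same synthetic rows
# into DuckDB (if installed) and print its EXPLAIN output; look for HASH_JOIN and
# HASH_GROUP_BY operators. The names cust_df/ord_df/dk are illustrative.
try:
    import duckdb
    dk = duckdb.connect(':memory:')
    cust_df = pd.DataFrame(cust_rows, columns=['cust_id', 'name', 'city'])
    ord_df = pd.DataFrame(order_rows, columns=['order_id', 'cust_id', 'amount'])
    dk.register('customers_d', cust_df)
    dk.register('orders_d', ord_df)
    plan_rows = dk.execute(
        "EXPLAIN SELECT c.name, SUM(o.amount) FROM customers_d c "
        "JOIN orders_d o ON c.cust_id = o.cust_id GROUP BY c.name").fetchall()
    print('\n'.join(r[1] for r in plan_rows))
except ImportError:
    print('DuckDB not installed; hash-join EXPLAIN skipped.')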
# Exercise 2: Covering index – query plan changes before/after
# Key insight: A covering index contains every column the query needs, so SQLite can
# answer it from the index alone (the plan reads "USING COVERING INDEX", no table lookup).
explain_before = run_sql(conn_opt,
"EXPLAIN QUERY PLAN SELECT amount FROM orders2 WHERE cust_id = 42;")
print('Before index:')
print(explain_before.to_string(index=False))
conn_opt.execute('CREATE INDEX idx_orders_cust_amount ON orders2 (cust_id, amount);')
conn_opt.commit()
explain_after = run_sql(conn_opt,
"EXPLAIN QUERY PLAN SELECT amount FROM orders2 WHERE cust_id = 42;")
print('\nAfter covering index (cust_id, amount):')
print(explain_after.to_string(index=False))
# Exercise 3: Rewrite correlated subquery as a JOIN
# Key insight: A correlated subquery re-executes once per outer row; pre-aggregating in a
# derived table and joining computes the totals once, so the optimizer can use a single pass.
# Correlated subquery version
sql_correlated = """
SELECT c.cust_id, c.name,
(SELECT SUM(o.amount) FROM orders2 o WHERE o.cust_id = c.cust_id) AS total_spent
FROM customers c
LIMIT 5;
"""
# JOIN version
sql_join = """
SELECT c.cust_id, c.name, COALESCE(agg.total_spent, 0) AS total_spent
FROM customers c
LEFT JOIN (SELECT cust_id, SUM(amount) AS total_spent FROM orders2 GROUP BY cust_id) agg
ON c.cust_id = agg.cust_id
LIMIT 5;
"""
r1 = run_sql(conn_opt, sql_correlated)
r2 = run_sql(conn_opt, sql_join)
print('Correlated subquery result:')
print(r1)
print('\nJOIN rewrite result (should match):')
print(r2)
plan_corr = run_sql(conn_opt, 'EXPLAIN QUERY PLAN ' + sql_correlated.strip())
plan_join = run_sql(conn_opt, 'EXPLAIN QUERY PLAN ' + sql_join.strip())
print('\nEXPLAIN – Correlated:')
print(plan_corr['detail'].values)
print('EXPLAIN – JOIN rewrite:')
print(plan_join['detail'].values)
try:
import duckdb
duck2 = duckdb.connect(':memory:')
    # Exercise 4: Partition pruning (simulated here; a Parquet-based demo follows this cell)
    # Key insight: Partitioned storage keeps data in separate files per partition value;
    # a WHERE clause on the partition column lets the engine skip irrelevant files entirely.
duck2.execute("""
CREATE TABLE orders_partitioned AS
SELECT order_id,
'2023-0' || cast((order_id % 3 + 1) as varchar) AS order_year,
(random()*100)::int AS amount
FROM range(1, 10001) t(order_id);
""")
    print('A plain in-memory table has nothing to prune; the filter below only simulates it:')
    print("Query for one partition value ('2023-01') touches just the matching rows:")
result = duck2.execute("SELECT COUNT(*) FROM orders_partitioned WHERE order_year = '2023-01'").fetchone()
print(f'Rows matching 2023-01: {result[0]}')
print('In production (Parquet/Hive layout), only 2023-01 partition files are opened.')
except ImportError:
print('DuckDB not installed; partition pruning demo skipped.')
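# Hedged demo of real pruning: write the table above as Hive-partitioned Parquet and read
# it back with a filter on the partition column, so only the matching folder is opened.
# The directory name is illustrative and this requires the DuckDB connection duck2 from
# the previous cell; otherwise the demo is skipped.
try:
    import os, tempfile
    with tempfile.TemporaryDirectory() as tmp:
        out_dir = os.path.join(tmp, 'orders_by_month')
        duck2.execute(f"COPY orders_partitioned TO '{out_dir}' "
                      "(FORMAT PARQUET, PARTITION_BY (order_year))")
        n = duck2.execute(
            f"SELECT COUNT(*) FROM read_parquet('{out_dir}/*/*.parquet', hive_partitioning=true) "
            "WHERE order_year = '2023-01'").fetchone()[0]
        print(f'Rows read from the 2023-01 partition directory only: {n}')
except Exception as e:
    print(f'Partitioned-Parquet pruning demo skipped ({e})')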
# Exercise 5: Slow query improvement – WHERE clause selectivity
# Key insight: A highly selective WHERE predicate (e.g., cust_id = X) together with an
# index on that column turns an O(N) scan into an O(log N + k) index seek (k = matching rows).
import time
# Without index / without selective WHERE
start = time.perf_counter()
conn_opt.execute('SELECT * FROM orders2').fetchall()
t_full = time.perf_counter() - start
# With selective WHERE and covering index (already created)
start = time.perf_counter()
conn_opt.execute('SELECT * FROM orders2 WHERE cust_id = 42').fetchall()
t_selective = time.perf_counter() - start
print(f'Full scan time: {t_full*1000:.3f} ms')
print(f'Selective WHERE time: {t_selective*1000:.3f} ms')
print('Selective WHERE + covering index avoids scanning all 5000 rows.')
03 – Data Pipelines (Airflow patterns, no server required)
# These exercises demonstrate Airflow DAG patterns in pure Python.
# Real Airflow requires a running scheduler; here we simulate the logic.
import os, time, random, logging
from pathlib import Path
from datetime import datetime
logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
# Exercise 1: FileSensor – wait for a file to exist before running the pipeline
# Key insight: Sensors poll at configurable intervals; poke_interval and timeout
# are critical to avoid stalling the scheduler indefinitely.
def file_sensor(filepath, timeout_sec=5, poke_interval_sec=1):
"""Returns True when file exists; raises TimeoutError on timeout."""
deadline = time.time() + timeout_sec
while time.time() < deadline:
if Path(filepath).exists():
logging.info(f'Sensor: file found at {filepath}')
return True
logging.info(f'Sensor: {filepath} not found, waiting {poke_interval_sec}s...')
time.sleep(poke_interval_sec)
raise TimeoutError(f'File {filepath} did not appear within {timeout_sec}s')
# Demo: create temp file mid-wait
import tempfile
# tempfile.mktemp is deprecated; build a path in a fresh temp dir that does not exist yet
tmp_dir = tempfile.mkdtemp()
tmp_path = str(Path(tmp_dir) / 'incoming.csv')
import threading
def create_file_later(path, delay=1.5):
time.sleep(delay)
Path(path).write_text('id,value\n1,42\n')
t = threading.Thread(target=create_file_later, args=(tmp_path, 1.5))
t.start()
file_sensor(tmp_path, timeout_sec=5)
t.join()
Path(tmp_path).unlink(missing_ok=True)
# Exercise 2: Retry with exponential backoff for a flaky API call
# Key insight: Exponential backoff (wait *= 2) reduces thundering-herd pressure
# on the API; jitter (random offset) prevents synchronized retries across workers.
def with_exponential_backoff(fn, max_retries=4, base_wait=0.1, jitter=True):
wait = base_wait
for attempt in range(max_retries + 1):
try:
return fn()
except Exception as e:
if attempt == max_retries:
raise
actual_wait = wait + (random.uniform(0, wait) if jitter else 0)
logging.warning(f'Attempt {attempt+1} failed: {e}. Retrying in {actual_wait:.2f}s')
time.sleep(actual_wait)
wait *= 2
call_count = {'n': 0}
def flaky_api():
call_count['n'] += 1
if call_count['n'] < 3: # fails first 2 times
raise ConnectionError('API unavailable')
return {'data': 'success', 'attempt': call_count['n']}
result = with_exponential_backoff(flaky_api)
print('API result:', result)
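# Hedged note: in a real DAG the same policy is usually declared on the task instead of
# hand-rolled; the keyword arguments below are standard Airflow 2.x BaseOperator
# parameters, shown here only as a plain dict (no Airflow import required).
from datetime import timedelta
airflow_retry_args = dict(
    retries=4,                              # matches max_retries above
    retry_delay=timedelta(seconds=30),      # base wait between attempts
    retry_exponential_backoff=True,         # double the delay on each retry
    max_retry_delay=timedelta(minutes=10),  # cap the backoff
)
print('Airflow task retry kwargs:', airflow_retry_args)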
# Exercise 3: Dynamic DAG – generate tasks programmatically from a config list
# Key insight: Dynamic DAGs avoid boilerplate by generating operator instances
# in a loop; each task is uniquely named and receives its config at runtime.
config_list = [
{'table': 'orders', 'source': 's3://bucket/orders/'},
{'table': 'customers', 'source': 's3://bucket/customers/'},
{'table': 'products', 'source': 's3://bucket/products/'},
]
def make_ingest_task(cfg):
"""Simulates an ingestion task operator."""
def _task():
logging.info(f"Ingesting {cfg['table']} from {cfg['source']}")
return f"{cfg['table']}: {np.random.randint(100,1000)} rows loaded"
_task.__name__ = f"ingest_{cfg['table']}"
return _task
dynamic_tasks = {cfg['table']: make_ingest_task(cfg) for cfg in config_list}
results = {}
for name, task_fn in dynamic_tasks.items():
results[name] = task_fn()
print(f' {name}: {results[name]}')
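# Hedged sketch of the same pattern in an actual Airflow 2.x DAG file, kept as a string so
# nothing here needs a scheduler; the decorators follow the TaskFlow API and config_list
# would live in (or be imported by) the DAG file.
airflow_dynamic_dag = '''
from airflow.decorators import dag, task

@dag(schedule=None, catchup=False)
def ingest_all_tables():
    for cfg in config_list:
        @task(task_id=f"ingest_{cfg['table']}")
        def ingest(cfg=cfg):
            print(f"Ingesting {cfg['table']} from {cfg['source']}")
        ingest()

ingest_all_tables()
'''
print(airflow_dynamic_dag)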
# Exercise 4: Email notification on task failure (mocked)
# Key insight: Airflow's on_failure_callback fires the notification function;
# here we mock SMTP send so the test is hermetic and doesn't need a mail server.
from unittest.mock import MagicMock, patch
def send_failure_email(context):
"""Mock email notification function (replace smtplib.SMTP with real SMTP in prod)."""
subject = f"AIRFLOW FAILURE: {context['task_id']} in DAG {context['dag_id']}"
body = f"Task failed at {context['execution_date']}. Exception: {context['exception']}"
with patch('smtplib.SMTP') as mock_smtp:
smtp_instance = mock_smtp.return_value.__enter__.return_value
smtp_instance.sendmail('airflow@company.com', 'team@company.com',
f'Subject: {subject}\n\n{body}')
logging.info(f'Email sent (mocked): {subject}')
return smtp_instance.sendmail.call_args
ctx = {'task_id': 'ingest_orders', 'dag_id': 'etl_pipeline',
'execution_date': datetime.now(), 'exception': ValueError('S3 read timeout')}
call_args = send_failure_email(ctx)
print('sendmail called with:', call_args)
# Exercise 5: Task branching – full_load vs incremental_load based on row_count
# Key insight: BranchPythonOperator returns the task_id to execute next; Airflow marks the
# other branch's tasks as skipped, so a downstream join task needs a trigger_rule such as
# 'none_failed_min_one_success' to still run (see the DAG sketch after this demo).
def branch_decision(row_count, threshold=10_000):
"""Return the next task id based on row count."""
if row_count > threshold:
return 'full_load'
return 'incremental_load'
def full_load():
logging.info('Running FULL LOAD: truncate + reload all data')
return 'full_load complete'
def incremental_load():
logging.info('Running INCREMENTAL LOAD: append new rows only')
return 'incremental_load complete'
tasks = {'full_load': full_load, 'incremental_load': incremental_load}
for count in [5_000, 50_000]:
chosen = branch_decision(count)
result = tasks[chosen]()
print(f'row_count={count:>6} -> branch={chosen} -> {result}')
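# Hedged sketch of how this wires up in a real Airflow 2.x DAG, again kept as a string;
# get_row_count() is a hypothetical helper, and the join task's trigger_rule lets it run
# even though one branch is skipped.
airflow_branching = '''
from airflow.operators.python import BranchPythonOperator, PythonOperator
from airflow.operators.empty import EmptyOperator

branch = BranchPythonOperator(task_id='choose_load',
                              python_callable=lambda: branch_decision(get_row_count()))
full = PythonOperator(task_id='full_load', python_callable=full_load)
incr = PythonOperator(task_id='incremental_load', python_callable=incremental_load)
join = EmptyOperator(task_id='join', trigger_rule='none_failed_min_one_success')
branch >> [full, incr] >> join
'''
print(airflow_branching)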
04 – Spark / PySpark Basics
# PySpark requires Java + a Spark installation.
# These cells show the full PySpark code and fall back to pandas equivalents.
try:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
spark = SparkSession.builder.master('local[2]').appName('solutions').getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
SPARK = True
print('PySpark session started')
except Exception as e:
SPARK = False
print(f'PySpark not available ({e}). Showing pandas equivalents.')
import pandas as pd
import numpy as np
np.random.seed(42)
N_orders = 500
orders_df = pd.DataFrame({
'order_id': range(N_orders),
'product_id': np.random.randint(1, 20, N_orders),
'status': np.random.choice(['completed','pending','cancelled'], N_orders, p=[0.7,0.2,0.1]),
'quantity': np.random.randint(1, 10, N_orders),
'unit_price': np.random.uniform(5, 100, N_orders).round(2),
'order_date': pd.date_range('2023-01-01', periods=N_orders, freq='h').date
})
orders_df['total_value'] = orders_df['quantity'] * orders_df['unit_price']
# Exercise 1: PySpark pipeline – filter completed -> add total_value -> top-5 by revenue
# Key insight: PySpark lazy evaluation builds a logical plan; only the final
# .show()/.collect() triggers execution, enabling the optimizer to push filters down.
if SPARK:
sdf = spark.createDataFrame(orders_df)
result_sdf = (sdf
.filter(F.col('status') == 'completed')
.withColumn('total_value', F.col('quantity') * F.col('unit_price'))
.groupBy('product_id')
.agg(F.sum('total_value').alias('revenue'))
.orderBy(F.desc('revenue'))
.limit(5))
result_sdf.show()
else:
# Pandas equivalent
top5 = (orders_df[orders_df['status'] == 'completed']
.groupby('product_id')['total_value']
.sum().nlargest(5).reset_index()
.rename(columns={'total_value': 'revenue'}))
print(top5.to_string(index=False))
# Exercise 2: Running 7-day total revenue per product ordered by date
# Key insight: Spark window functions with ROWS/RANGE BETWEEN mirror SQL semantics;
# always partition first (reduces data shuffled) then apply the window.
daily_df = (orders_df.groupby(['product_id','order_date'])['total_value']
.sum().reset_index().rename(columns={'total_value':'daily_revenue'}))
daily_df['order_date'] = pd.to_datetime(daily_df['order_date'])
daily_df = daily_df.sort_values(['product_id','order_date'])
if SPARK:
sdf_daily = spark.createDataFrame(daily_df)
w7 = (Window.partitionBy('product_id')
.orderBy(F.col('order_date').cast('long'))
.rowsBetween(-6, 0))
(sdf_daily
.withColumn('rolling_7d', F.sum('daily_revenue').over(w7))
.filter(F.col('product_id') == 1)
.orderBy('order_date').show(10))
else:
daily_df['rolling_7d'] = (daily_df.groupby('product_id')['daily_revenue']
.transform(lambda x: x.rolling(7, min_periods=1).sum()))
print(daily_df[daily_df['product_id']==1].head(10).to_string(index=False))
# Exercise 3: Broadcast join with discounts DataFrame
# Key insight: Broadcasting a small lookup table eliminates the shuffle join;
# .explain() should show BroadcastHashJoin instead of SortMergeJoin.
discounts = pd.DataFrame({'product_id': range(1, 20), 'discount': np.random.uniform(0, 0.3, 19).round(2)})
if SPARK:
sdf_orders = spark.createDataFrame(orders_df)
sdf_discounts = spark.createDataFrame(discounts)
from pyspark.sql.functions import broadcast
joined = sdf_orders.join(broadcast(sdf_discounts), on='product_id', how='left')
joined = joined.withColumn('discounted_value', F.col('total_value') * (1 - F.col('discount')))
print('Physical plan excerpt:')
joined.explain(mode='simple') # should show BroadcastHashJoin
else:
merged = orders_df.merge(discounts, on='product_id', how='left')
merged['discounted_value'] = merged['total_value'] * (1 - merged['discount'])
print('Pandas merge (simulates broadcast join):')
print(merged[['order_id','product_id','total_value','discount','discounted_value']].head())
# Exercise 4: Repartition by category; count rows per partition
# Key insight: repartition() shuffles data to exactly N partitions;
# coalesce() reduces without a full shuffle – use coalesce to shrink, repartition to redistribute.
orders_df['category'] = np.where(orders_df['product_id'] <= 6, 'A',
np.where(orders_df['product_id'] <= 13, 'B', 'C'))
if SPARK:
sdf_cat = spark.createDataFrame(orders_df)
sdf_repartitioned = sdf_cat.repartition(3, 'category')
# Count per partition via RDD
partition_counts = sdf_repartitioned.rdd.mapPartitions(
lambda it: [sum(1 for _ in it)]).collect()
print('Rows per partition after repartition by category:', partition_counts)
else:
print('Partition simulation (pandas groupby):')
print(orders_df.groupby('category').size())
# Exercise 5: Spark SQL RANK() top-3 per category
# Key insight: Spark SQL and the DataFrame API produce the same physical plan;
# SQL is often more readable for window operations – compare .explain() outputs.
if SPARK:
sdf_cat = spark.createDataFrame(orders_df)
sdf_cat.createOrReplaceTempView('orders_cat')
sql_top3 = """
SELECT * FROM (
SELECT category, product_id,
SUM(total_value) AS revenue,
RANK() OVER (PARTITION BY category ORDER BY SUM(total_value) DESC) AS rnk
FROM orders_cat
GROUP BY category, product_id
) WHERE rnk <= 3
ORDER BY category, rnk;
"""
spark.sql(sql_top3).show()
# Equivalent DataFrame API
from pyspark.sql.window import Window
w_cat = Window.partitionBy('category').orderBy(F.desc('revenue'))
(sdf_cat.groupBy('category','product_id')
.agg(F.sum('total_value').alias('revenue'))
.withColumn('rnk', F.rank().over(w_cat))
.filter(F.col('rnk') <= 3)
.orderBy('category','rnk')
.show())
else:
agg = orders_df.groupby(['category','product_id'])['total_value'].sum().reset_index()
agg['rnk'] = agg.groupby('category')['total_value'].rank(ascending=False, method='min').astype(int)
print(agg[agg['rnk'] <= 3].sort_values(['category','rnk']).to_string(index=False))
05 – dbt Data Modeling (pattern demonstrations)
# dbt runs against a warehouse; here we demonstrate each pattern as SQL/Python.
# In a real project, each snippet maps directly to a dbt model file.
conn_dbt = sqlite3.connect(':memory:')
conn_dbt.executescript("""
CREATE TABLE raw_events (
EventId INTEGER, UserId TEXT, EventType TEXT,
EventTimestamp TEXT, Revenue REAL);
INSERT INTO raw_events VALUES
(1,'u1','purchase','2023-06-01 10:00:00',99.99),
(2,'u2','view', '2023-06-01 11:00:00',NULL),
(3,'u1','purchase','2023-06-02 09:30:00',49.50);
""")
conn_dbt.commit()
# Exercise 1: Staging model – cast types, rename to snake_case, add loaded_at
# Key insight: Staging models are the single point of type casting and renaming;
# downstream models should never read from raw tables directly.
sql_staging = """
SELECT
CAST(EventId AS INTEGER) AS event_id,
CAST(UserId AS TEXT) AS user_id,
CAST(EventType AS TEXT) AS event_type,
CAST(EventTimestamp AS TEXT) AS event_timestamp,
CAST(COALESCE(Revenue, 0) AS REAL) AS revenue,
CURRENT_TIMESTAMP AS loaded_at
FROM raw_events;
"""
print('Staging model output:')
print(run_sql(conn_dbt, sql_staging))
# Exercise 2: dbt schema tests – not_null, unique, accepted_values, relationships
# Key insight: Schema tests run as SQL assertions; failures block the DAG,
# preventing bad data from reaching downstream consumers.
def dbt_test_not_null(conn, table, column):
n = run_sql(conn, f'SELECT COUNT(*) AS n FROM {table} WHERE {column} IS NULL').iloc[0,0]
status = 'PASS' if n == 0 else f'FAIL ({n} nulls)'
print(f' not_null({table}.{column}): {status}')
return n == 0
def dbt_test_unique(conn, table, column):
n = run_sql(conn, f'SELECT COUNT(*) - COUNT(DISTINCT {column}) AS n FROM {table}').iloc[0,0]
status = 'PASS' if n == 0 else f'FAIL ({n} duplicates)'
print(f' unique({table}.{column}): {status}')
return n == 0
def dbt_test_accepted_values(conn, table, column, values):
placeholders = ','.join('?' * len(values))
n = conn.execute(
f'SELECT COUNT(*) FROM {table} WHERE {column} NOT IN ({placeholders})',
values).fetchone()[0]
status = 'PASS' if n == 0 else f'FAIL ({n} bad values)'
print(f' accepted_values({table}.{column}): {status}')
return n == 0
conn_dbt.execute("CREATE VIEW stg_events AS " + sql_staging)
conn_dbt.commit()
print('Running schema tests on stg_events:')
dbt_test_not_null(conn_dbt, 'stg_events', 'event_id')
dbt_test_unique(conn_dbt, 'stg_events', 'event_id')
dbt_test_accepted_values(conn_dbt, 'stg_events', 'event_type', ['purchase','view','click','cart'])
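# Hedged sketch of the schema.yml that declares the same tests in a real dbt project,
# plus the 'relationships' test (which needs a second model to point at); 'dim_users'
# is illustrative, the other names mirror the staging view above.
dbt_schema_yml = """
version: 2
models:
  - name: stg_events
    columns:
      - name: event_id
        tests: [not_null, unique]
      - name: event_type
        tests:
          - accepted_values:
              values: ['purchase', 'view', 'click', 'cart']
      - name: user_id
        tests:
          - relationships:
              to: ref('dim_users')
              field: user_id
"""
print(dbt_schema_yml)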
# Exercise 3: Incremental model – load only orders from the last 3 days
# Key insight: Incremental models use a WHERE clause on the max timestamp
# already in the target table; dbt's {{ is_incremental() }} macro toggles this.
# Simulate the dbt incremental pattern
def incremental_load_orders(conn, target_table, source_table='raw_events', lookback_days=3):
# Check if target exists
exists = conn.execute(
f"SELECT name FROM sqlite_master WHERE type='table' AND name='{target_table}'"
).fetchone()
if exists:
# Incremental: only new rows
sql = f"""
INSERT INTO {target_table}
SELECT * FROM {source_table}
WHERE date(EventTimestamp) >= date('now', '-{lookback_days} days')
AND EventId NOT IN (SELECT EventId FROM {target_table})"""
else:
# Full load on first run
sql = f"CREATE TABLE {target_table} AS SELECT * FROM {source_table}"
conn.execute(sql); conn.commit()
return run_sql(conn, f'SELECT COUNT(*) AS rows FROM {target_table}').iloc[0,0]
rows = incremental_load_orders(conn_dbt, 'fct_orders_incremental')
print(f'Rows in incremental target: {rows}')
print('Re-run (simulates incremental):')
rows2 = incremental_load_orders(conn_dbt, 'fct_orders_incremental')
print(f'Rows after second run (no duplicates): {rows2}')
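# Hedged sketch of the corresponding dbt incremental model (e.g. models/fct_orders_incremental.sql);
# is_incremental() and the config block are standard dbt, and the column names mirror stg_events.
dbt_incremental_sql = """
{{ config(materialized='incremental', unique_key='event_id') }}

SELECT * FROM {{ ref('stg_events') }}
{% if is_incremental() %}
WHERE event_timestamp > (SELECT MAX(event_timestamp) FROM {{ this }})
{% endif %}
"""
print(dbt_incremental_sql)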
# Exercise 4: Macro safe_divide – return NULL instead of failing on division by zero
# Key insight: In dbt, macros are Jinja2 functions; safe_divide wraps the denominator in
# NULLIF(denominator, 0), so the database returns NULL instead of erroring on a zero divisor.
# Python simulation of the compiled SQL macro output
def safe_divide_sql(numerator, denominator):
"""Returns the SQL expression that dbt's safe_divide macro would render."""
return f'CAST({numerator} AS REAL) / NULLIF({denominator}, 0)'
# Test in SQLite
for num, den in [(10, 2), (10, 0), (0, 5), (7, 3)]:
expr = safe_divide_sql(num, den)
val = conn_dbt.execute(f'SELECT {expr}').fetchone()[0]
print(f'safe_divide({num}, {den}) = {val}')
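# Hedged sketch of the macro file itself (e.g. macros/safe_divide.sql) that would render
# the expression built above; this uses standard dbt/Jinja macro syntax.
dbt_safe_divide_macro = """
{% macro safe_divide(numerator, denominator) %}
    CAST({{ numerator }} AS REAL) / NULLIF({{ denominator }}, 0)
{% endmacro %}
"""
print(dbt_safe_divide_macro)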
# Exercise 5: SCD Type 2 snapshot – track valid_from/valid_to for customer address
# Key insight: SCD2 inserts a new row when a dimension attribute changes,
# setting valid_to on the old row and valid_from=now on the new row.
conn_scd = sqlite3.connect(':memory:')
conn_scd.executescript("""
CREATE TABLE dim_customers_scd2 (
customer_id TEXT, address TEXT,
valid_from TEXT, valid_to TEXT, is_current INTEGER);
CREATE TABLE source_customers (customer_id TEXT, address TEXT);
INSERT INTO source_customers VALUES ('C1','123 Main St'),('C2','456 Oak Ave');
""")
conn_scd.commit()
def scd2_upsert(conn, source_rows):
today = datetime.today().strftime('%Y-%m-%d')
for cust_id, new_addr in source_rows:
cur = conn.execute(
'SELECT address FROM dim_customers_scd2 WHERE customer_id=? AND is_current=1',
(cust_id,)).fetchone()
if cur is None:
conn.execute('INSERT INTO dim_customers_scd2 VALUES (?,?,?,NULL,1)',
(cust_id, new_addr, today))
elif cur[0] != new_addr:
# Expire old row
conn.execute('UPDATE dim_customers_scd2 SET valid_to=?, is_current=0 '
'WHERE customer_id=? AND is_current=1', (today, cust_id))
# Insert new row
conn.execute('INSERT INTO dim_customers_scd2 VALUES (?,?,?,NULL,1)',
(cust_id, new_addr, today))
conn.commit()
scd2_upsert(conn_scd, [('C1','123 Main St'), ('C2','456 Oak Ave')]) # first load
scd2_upsert(conn_scd, [('C1','999 New Blvd')]) # C1 moves
print(run_sql(conn_scd, 'SELECT * FROM dim_customers_scd2 ORDER BY customer_id, valid_from'))
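# Hedged sketch of the equivalent dbt snapshot (e.g. snapshots/dim_customers.sql); the
# 'check' strategy with check_cols is standard dbt snapshot config, while the source name
# ('crm', 'customers') is illustrative.
dbt_snapshot_sql = """
{% snapshot dim_customers_snapshot %}
{{ config(target_schema='snapshots', unique_key='customer_id',
          strategy='check', check_cols=['address']) }}
SELECT customer_id, address FROM {{ source('crm', 'customers') }}
{% endsnapshot %}
"""
print(dbt_snapshot_sql)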
06 – Streaming Basics
# These exercises simulate streaming primitives in pure Python.
# Real deployments use Kafka + Flink/Spark Structured Streaming.
from collections import deque, defaultdict
import heapq
# Exercise 1: SlidingWindowAggregator with configurable window_size and slide_interval
# Key insight: Sliding windows compute aggregates over overlapping time buckets;
# window_size determines how far back events are included, slide_interval how often to emit.
class SlidingWindowAggregator:
def __init__(self, window_size, slide_interval, agg_fn=sum):
self.window_size = window_size
self.slide_interval = slide_interval
self.agg_fn = agg_fn
self.buffer = deque() # (timestamp, value)
self.last_emit = None
self.results = []
def process(self, timestamp, value):
self.buffer.append((timestamp, value))
# Evict events outside the window
while self.buffer and self.buffer[0][0] < timestamp - self.window_size:
self.buffer.popleft()
# Emit if slide_interval has passed
if self.last_emit is None or timestamp - self.last_emit >= self.slide_interval:
agg = self.agg_fn(v for _, v in self.buffer)
self.results.append((timestamp, agg))
self.last_emit = timestamp
return self.results[-1] if self.results else None
# Simulate stream: sales events every second
agg = SlidingWindowAggregator(window_size=5, slide_interval=2, agg_fn=sum)
stream = [(t, np.random.randint(1, 20)) for t in range(20)]
for ts, val in stream:
out = agg.process(ts, val)
if out and out[0] == ts:
print(f't={ts:2d}: window_sum={out[1]}')
# Exercise 2: Rolling leaderboard – top-10 players in a 1-hour window
# Key insight: Keep a deque of raw events for time-based eviction plus a running score per
# player; the current top-K is re-derived from the active scores whenever it is requested.
class RollingLeaderboard:
def __init__(self, window_seconds=3600, top_k=10):
self.window = window_seconds
self.top_k = top_k
self.events = deque() # (timestamp, player_id, score)
self.scores = defaultdict(float)
def add_event(self, timestamp, player_id, score):
self.events.append((timestamp, player_id, score))
self.scores[player_id] += score
# Evict old events
while self.events and self.events[0][0] < timestamp - self.window:
_, old_player, old_score = self.events.popleft()
self.scores[old_player] -= old_score
if self.scores[old_player] <= 0:
del self.scores[old_player]
def top_players(self):
return sorted(self.scores.items(), key=lambda x: -x[1])[:self.top_k]
lb = RollingLeaderboard(window_seconds=3600, top_k=5)
np.random.seed(3)
for t in range(200):
lb.add_event(t * 30, f'player_{np.random.randint(1,20)}', np.random.randint(10,500))
print('Top-5 leaderboard (1-hour window):')
for rank, (player, score) in enumerate(lb.top_players(), 1):
print(f' {rank}. {player}: {score:.0f} pts')
# Exercise 3: Consumer group rebalancing
# Key insight: When a consumer joins or leaves, partitions are redistributed
# evenly (round-robin or range strategy); this is triggered by a group coordinator.
def rebalance_partitions(partitions, consumers):
"""Round-robin partition assignment."""
assignment = defaultdict(list)
for i, p in enumerate(partitions):
assignment[consumers[i % len(consumers)]].append(p)
return dict(assignment)
partitions = [f'p{i}' for i in range(6)]
print('Initial (3 consumers):')
consumers = ['C1','C2','C3']
assign1 = rebalance_partitions(partitions, consumers)
for c, ps in assign1.items(): print(f' {c}: {ps}')
print('\nAfter C4 joins (4 consumers):')
consumers2 = ['C1','C2','C3','C4']
assign2 = rebalance_partitions(partitions, consumers2)
for c, ps in assign2.items(): print(f' {c}: {ps}')
print('\nAfter C2 leaves (3 consumers):')
consumers3 = ['C1','C3','C4']
assign3 = rebalance_partitions(partitions, consumers3)
for c, ps in assign3.items(): print(f' {c}: {ps}')
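# Hedged sketch of the alternative "range" strategy mentioned above: each consumer gets a
# contiguous slice of the partition list (roughly how Kafka's RangeAssignor behaves per
# topic); the helper name is illustrative.
def rebalance_partitions_range(partitions, consumers):
    per, extra = divmod(len(partitions), len(consumers))
    assignment, start = {}, 0
    for i, c in enumerate(sorted(consumers)):
        take = per + (1 if i < extra else 0)  # first `extra` consumers get one extra partition
        assignment[c] = partitions[start:start + take]
        start += take
    return assignment
print('Range strategy (3 consumers):', rebalance_partitions_range(partitions, ['C1', 'C2', 'C3']))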
# Exercise 4: Dead letter queue – failed events routed for inspection
# Key insight: A DLQ captures events that cannot be processed (schema errors,
# downstream failures) so they can be replayed or inspected without data loss.
from queue import Queue
class StreamProcessor:
def __init__(self):
self.main_queue = Queue()
self.dlq = Queue()
self.processed = []
def process_event(self, event):
try:
# Simulate: events with negative amount are invalid
if not isinstance(event.get('amount'), (int, float)):
raise ValueError(f'Invalid amount: {event.get("amount")}')
if event['amount'] < 0:
raise ValueError(f'Negative amount: {event["amount"]}')
self.processed.append(event)
return 'processed'
except Exception as e:
self.dlq.put({'event': event, 'error': str(e), 'timestamp': datetime.now()})
return 'dlq'
events = [
{'id': 1, 'amount': 100}, {'id': 2, 'amount': -50},
{'id': 3, 'amount': 'bad'}, {'id': 4, 'amount': 200}
]
proc = StreamProcessor()
for ev in events:
status = proc.process_event(ev)
print(f'Event {ev["id"]}: {status}')
print(f'\nProcessed: {len(proc.processed)}, DLQ: {proc.dlq.qsize()}')
print('DLQ contents:')
while not proc.dlq.empty():
item = proc.dlq.get()
print(f' {item["event"]} -> {item["error"]}')
# Exercise 5: Stream join – match purchase events with the product catalog within a 5-minute window
# Key insight: Stream-table joins look up the catalog on each event;
# stream-stream joins buffer events and match within a time tolerance window.
class StreamJoiner:
def __init__(self, window_seconds=300):
self.window = window_seconds
self.left_buffer = deque() # purchase events
self.catalog = {} # product_id -> product_info
self.matches = []
def add_catalog(self, product_id, info):
self.catalog[product_id] = info
def add_purchase(self, timestamp, event):
self.left_buffer.append((timestamp, event))
# Evict old events
while self.left_buffer and self.left_buffer[0][0] < timestamp - self.window:
self.left_buffer.popleft()
        # Join against the catalog and return the match made for THIS event (or None),
        # so an unmatched purchase is not reported with a stale earlier match
        pid = event.get('product_id')
        if pid in self.catalog:
            match = {**event, **self.catalog[pid], 'matched_at': timestamp}
            self.matches.append(match)
            return match
        return None
joiner = StreamJoiner(window_seconds=300)
joiner.add_catalog(101, {'name': 'Widget A', 'category': 'Gadgets'})
joiner.add_catalog(202, {'name': 'Widget B', 'category': 'Tools'})
purchases = [
(0, {'user_id': 'u1', 'product_id': 101, 'amount': 29.99}),
(120, {'user_id': 'u2', 'product_id': 999, 'amount': 9.99}), # no catalog match
(240, {'user_id': 'u3', 'product_id': 202, 'amount': 49.99}),
]
for ts, purchase in purchases:
match = joiner.add_purchase(ts, purchase)
print(f't={ts}s: {"JOINED -> " + str(match) if match and match.get("name") else "NO MATCH"}')