Solutions: Python & Data Science Track

Worked solutions to all exercises from the python-data-science/ notebooks.

01 — Pandas Fundamentals

import pandas as pd
import numpy as np

# ── Exercise 1: Chain 3+ method calls ──────────────────────────────────────
# Key insight: method chaining keeps transformations readable and avoids temp variables.
# Fix: the original used .transform('mean'), which broadcasts each group's mean
# back onto every row — the following reset_index/sort then produced per-row
# values, not a per-group summary. Aggregating with .mean() yields one row per
# (pclass, sex) group, which is what the chain's comments describe.

rng = np.random.default_rng(42)
n = 891
titanic = pd.DataFrame({
    'survived': rng.integers(0, 2, n),
    'pclass':   rng.choice([1, 2, 3], n),
    'age':      rng.normal(30, 14, n).clip(1, 80),
    'fare':     rng.exponential(32, n),
    'sex':      rng.choice(['male', 'female'], n),
})

result = (
    titanic
    .query('age > 18')                                    # filter adults
    .groupby(['pclass', 'sex'])['fare']                   # groupby
    .mean()                                               # aggregate: one mean per group
    .reset_index(name='mean_fare_by_class_sex')           # flatten the MultiIndex
    .sort_values('mean_fare_by_class_sex', ascending=False)
    .head(10)
)
print(result)
# ── Exercise 2: Reshape wide → long with melt, then back with pivot_table ──
# Key insight: melt unpivots value columns into rows; pivot_table is the inverse.

wide = pd.DataFrame({
    'student':  ['Alice', 'Bob', 'Carol'],
    'math':     [90, 75, 85],
    'english':  [80, 95, 70],
    'science':  [88, 60, 92],
})

# Wide → Long: each (student, subject) pair becomes its own row.
long = wide.melt(id_vars='student', var_name='subject', value_name='score')
print('Long format:')
print(long)

# Long → Wide: pivot back so each subject is a column again,
# clearing the residual columns-axis name left by pivot_table.
wide_again = (
    long.pivot_table(index='student', columns='subject', values='score')
    .rename_axis(columns=None)
    .reset_index()
)
print('\nReconstructed wide format:')
print(wide_again)
# ── Exercise 3: Optimize 1M-row DataFrame memory ──────────────────────────
# Key insight: int64→int16/int8 and float64→float32 halve/quarter memory;
#              object columns with few unique values become category.

rng = np.random.default_rng(0)
N = 1_000_000
df = pd.DataFrame({
    'age':      rng.integers(0, 120, N).astype('int64'),
    'score':    rng.random(N).astype('float64'),
    'count':    rng.integers(0, 50_000, N).astype('int64'),
    'category': rng.choice(['A', 'B', 'C', 'D'], N),
    'status':   rng.choice(['active', 'inactive', 'pending'], N),
})

def _mb(frame: pd.DataFrame) -> float:
    # deep=True counts the actual string payloads of object columns.
    return frame.memory_usage(deep=True).sum() / 1024**2

before_mb = _mb(df)

# Shrink each integer column to the smallest dtype that fits its range.
for col in ('age', 'count'):
    df[col] = pd.to_numeric(df[col], downcast='integer')
# float64 → float32 halves the float storage.
df['score'] = df['score'].astype('float32')
# Low-cardinality strings → category (small int codes + one lookup table).
for col in ('category', 'status'):
    df[col] = df[col].astype('category')

after_mb = _mb(df)
print(f'Before: {before_mb:.1f} MB  |  After: {after_mb:.1f} MB  |  Reduction: {1 - after_mb/before_mb:.0%}')
# ── Exercise 4: Custom aggregation with groupby.apply ─────────────────────
# Key insight: apply receives a sub-DataFrame and can return a Series with
#              arbitrary keys — useful when you need multiple stats at once.

from scipy import stats as sp_stats  # NOTE(review): unused here — verify before removing

def rich_stats(group: pd.DataFrame) -> pd.Series:
    """Return a bundle of summary statistics of the group's `fare` column."""
    vals = group['fare']
    return pd.Series({
        'mean':     vals.mean(),
        'median':   vals.median(),
        'std':      vals.std(),
        'skew':     vals.skew(),
        'q25':      vals.quantile(0.25),
        'q75':      vals.quantile(0.75),
        'count':    len(vals),
    })

# Fix: select only the column(s) the function needs before apply — passing the
# grouping column through apply is deprecated in pandas 2.2+ and emits a warning.
agg_result = titanic.groupby('pclass')[['fare']].apply(rich_stats)
print(agg_result)
# ── Exercise 5: Merge 3 DataFrames with different join types ──────────────
# Key insight: always verify row counts after each merge to catch fan-out or
#              dropped rows from inner joins.
# Fix: the arrow in the printed summaries was mojibake ('β†’'); restored to '→'.

customers = pd.DataFrame({'cid': [1, 2, 3, 4], 'name': ['Alice', 'Bob', 'Carol', 'Dan']})
orders    = pd.DataFrame({'oid': [10, 11, 12], 'cid': [1, 2, 1], 'amount': [100, 200, 150]})
payments  = pd.DataFrame({'oid': [10, 11, 99], 'paid': [True, False, True]})

# Left join: keep all customers even without orders (order columns become NaN)
cust_orders = customers.merge(orders, on='cid', how='left')
print(f'customers({len(customers)}) LEFT orders({len(orders)}) → {len(cust_orders)} rows')

# Inner join: only orders that have a matching payment
order_payments = orders.merge(payments, on='oid', how='inner')
print(f'orders({len(orders)}) INNER payments({len(payments)}) → {len(order_payments)} rows')

# Outer join on everything (keeps unmatched rows from both sides)
full = cust_orders.merge(payments, on='oid', how='outer')
print(f'Full outer result: {len(full)} rows')
print(full)

02 — Exploratory Data Analysis

import matplotlib.pyplot as plt
import seaborn as sns
from scipy.spatial.distance import mahalanobis
from scipy import stats

# Synthetic data: a strongly correlated bivariate Gaussian cloud...
rng = np.random.default_rng(7)
X = rng.multivariate_normal([0, 0], [[1, 0.8], [0.8, 1]], 200)
# ...contaminated with 10 points far outside the bulk of the distribution.
X = np.vstack([X, rng.uniform(3, 5, (10, 2))])
df_eda = pd.DataFrame(X, columns=['x', 'y'])

# ── Exercise 1: Mahalanobis-based multivariate outlier detection ───────────
# Key insight: Mahalanobis accounts for variable correlations — a point far
#              from the centroid in the covariance-adjusted sense is an outlier.

feats = df_eda[['x', 'y']]
inv_cov = np.linalg.inv(np.cov(feats.T))
center = feats.mean().values

# Distance of every point from the centroid in covariance-adjusted units.
df_eda['mahal'] = feats.apply(
    lambda row: mahalanobis(row.values, center, inv_cov), axis=1
)
# Flag points beyond the chi-square 97.5th-percentile radius (2 degrees of freedom).
threshold = np.sqrt(stats.chi2.ppf(0.975, df=2))
df_eda['outlier'] = df_eda['mahal'] > threshold

fig, ax = plt.subplots(figsize=(6, 5))
point_colors = df_eda['outlier'].map({False: 'steelblue', True: 'crimson'})
ax.scatter(df_eda['x'], df_eda['y'], c=point_colors, alpha=0.7)
ax.set_title(f'Mahalanobis Outliers (threshold={threshold:.2f})')
ax.set_xlabel('x')
ax.set_ylabel('y')
plt.tight_layout()
plt.show()
print(f'Detected {df_eda["outlier"].sum()} outliers')
# ── Exercise 2: Pairplot with regression lines, color by target ───────────
# Key insight: sns.pairplot with kind='reg' adds regression lines per pair;
#              hue separates classes so you can spot separability visually.
# Fix: the em dash in the runtime figure title was mojibake ('β€”'); restored to '—'.

from sklearn.datasets import load_iris
iris = load_iris(as_frame=True)
df_iris = iris.frame.copy()
# Map integer target codes to readable species names for the hue legend.
df_iris['species'] = pd.Categorical.from_codes(df_iris['target'], iris.target_names)

g = sns.pairplot(df_iris.drop(columns='target'), hue='species', kind='reg',
                 plot_kws={'scatter_kws': {'alpha': 0.4}, 'line_kws': {'lw': 1}})
g.fig.suptitle('Iris Pairplot — regression lines, colored by species', y=1.02)
plt.show()

# Correlation matrix of the four numeric measurements
corr = df_iris.drop(columns=['target', 'species']).corr()
print('Correlation matrix:')
print(corr.round(2))
# ── Exercise 3: Custom profiling function ─────────────────────────────────
# Key insight: a good profiler surfaces everything a data scientist needs
#              at a glance before modelling — dtypes, nulls, cardinality, distribution.

def profile_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return one row of profiling facts per column, indexed by column name.

    Every column reports dtype, null percentage, cardinality and its five most
    frequent values; numeric columns additionally report mean/std/skew/kurtosis.
    """
    def _profile_column(s: pd.Series, name) -> dict:
        record = {
            'column':     name,
            'dtype':      str(s.dtype),
            'null_pct':   f"{s.isna().mean()*100:.1f}%",
            'unique':     s.nunique(),
            'top5':       str(s.value_counts().head(5).index.tolist()),
        }
        if pd.api.types.is_numeric_dtype(s):
            record['mean'] = round(s.mean(), 4)
            record['std']  = round(s.std(),  4)
            record['skew'] = round(s.skew(), 4)
            record['kurt'] = round(s.kurt(), 4)
        return record

    profiles = [_profile_column(df[col], col) for col in df.columns]
    return pd.DataFrame(profiles).set_index('column')

# Demo: a small mixed-type frame with ~5% missing incomes.
# NOTE: the draw order (integers, random, exponential, choice) fixes the output
# for this seed, so it must not be rearranged.
rng = np.random.default_rng(1)
ages = rng.integers(18, 80, 500).astype(float)
missing_mask = rng.random(500) < 0.05
incomes = np.where(missing_mask, np.nan, rng.exponential(50_000, 500))
cities = rng.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix'], 500)
demo = pd.DataFrame({'age': ages, 'income': incomes, 'city': cities})

print(profile_dataframe(demo).to_string())
# ── Exercise 4: pd.cut vs pd.qcut ─────────────────────────────────────────
# Key insight: pd.cut uses equal-width bins (good for uniform distributions);
#              pd.qcut uses equal-frequency bins (good for skewed distributions).
# Fix: the em dashes in the runtime subplot titles were mojibake ('β€”'); restored.

rng = np.random.default_rng(42)
income = pd.Series(rng.exponential(50_000, 1000), name='income')

# Same skewed data, two binning strategies
cut_bins  = pd.cut(income,  bins=5, labels=[f'cut_{i}' for i in range(1,6)])
qcut_bins = pd.qcut(income, q=5,   labels=[f'qcut_{i}' for i in range(1,6)])

fig, axes = plt.subplots(1, 2, figsize=(12, 4))
cut_bins.value_counts().sort_index().plot(kind='bar', ax=axes[0], color='steelblue')
axes[0].set_title('pd.cut — equal-width bins (skewed counts)'); axes[0].set_ylabel('count')

qcut_bins.value_counts().sort_index().plot(kind='bar', ax=axes[1], color='darkorange')
axes[1].set_title('pd.qcut — equal-frequency bins (uniform counts)'); axes[1].set_ylabel('count')

plt.tight_layout(); plt.show()
# ── Exercise 5: Feature drift detection using Kolmogorov-Smirnov test ─────
# Key insight: KS test compares two empirical distributions without assuming normality;
#              a small p-value signals that the feature has drifted between time periods.

from scipy.stats import ks_2samp

rng = np.random.default_rng(0)
n = 500
# Period A (stable baseline)
period_a = pd.DataFrame({
    'age':    rng.normal(35, 10, n),
    'income': rng.normal(60_000, 15_000, n),
    'score':  rng.uniform(0, 1, n),
})
# Period B (age drifts, income stays, score drifts)
period_b = pd.DataFrame({
    'age':    rng.normal(40, 12, n),      # shifted mean
    'income': rng.normal(61_000, 15_200, n),  # negligible drift
    'score':  rng.beta(2, 5, n),          # distribution shape change
})

def _drift_record(col: str) -> dict:
    # Two-sample KS: H0 = both periods come from the same distribution.
    stat, p = ks_2samp(period_a[col], period_b[col])
    return {'feature': col, 'ks_stat': round(stat, 4), 'p_value': round(p, 4),
            'drifted': p < 0.05}

drift_df = pd.DataFrame([_drift_record(col) for col in period_a.columns])
print(drift_df.to_string(index=False))

03 — Data Visualization

import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd

rng = np.random.default_rng(42)

# ── Exercise 1: Multi-panel 2×3 dashboard ─────────────────────────────────
# Key insight: plt.subplots creates an axis grid; tight_layout prevents overlapping labels.
# Fix: the runtime suptitle text was mojibake ('2Γ—3'); restored to '2×3'.
# NOTE: the rng draws below must stay in this order to keep outputs reproducible.

months = ['Jan','Feb','Mar','Apr','May','Jun']
sales  = rng.integers(100, 400, 6)
temps  = rng.normal(15, 8, 100)
x_sc   = rng.normal(0, 1, 80)
y_sc   = x_sc * 0.7 + rng.normal(0, 0.5, 80)  # correlated with x_sc plus noise
corr_m = np.corrcoef(rng.normal(0,1,(4,50)))
groups = rng.choice(['A','B','C'], 150)
values = rng.exponential(1, 150) + (groups == 'B') * 0.5  # group B shifted up

fig, axes = plt.subplots(2, 3, figsize=(14, 8))

# Bar
axes[0,0].bar(months, sales, color='steelblue')
axes[0,0].set_title('Monthly Sales (Bar)')

# Line
axes[0,1].plot(months, sales, marker='o', color='darkorange')
axes[0,1].set_title('Monthly Sales (Line)')

# Scatter
axes[0,2].scatter(x_sc, y_sc, alpha=0.5, color='green')
axes[0,2].set_title('Scatter: x vs y')

# Heatmap
im = axes[1,0].imshow(corr_m, cmap='coolwarm', vmin=-1, vmax=1)
fig.colorbar(im, ax=axes[1,0], fraction=0.046)
axes[1,0].set_title('Correlation Heatmap')

# Box
data_box = [values[groups == g] for g in ['A','B','C']]
axes[1,1].boxplot(data_box, labels=['A','B','C'], patch_artist=True)
axes[1,1].set_title('Box: Values by Group')

# Histogram
axes[1,2].hist(temps, bins=20, color='purple', edgecolor='white')
axes[1,2].set_title('Temperature Distribution')

plt.suptitle('2×3 Dashboard', fontsize=14, y=1.01)
plt.tight_layout()
plt.show()
# ── Exercise 2: Animated cumulative sales (matplotlib FuncAnimation) ───────
# Key insight: FuncAnimation calls update() for each frame; blit=True only
#              redraws changed artists — critical for smooth animation.

from matplotlib.animation import FuncAnimation
from IPython.display import HTML

# 24 months of synthetic sales, accumulated into a running total.
rng = np.random.default_rng(5)
months = [f'M{i:02d}' for i in range(1, 25)]
monthly_sales = rng.integers(50, 200, 24)
cumulative    = np.cumsum(monthly_sales)

# Fix the axis limits up front so the view doesn't rescale between frames.
fig, ax = plt.subplots(figsize=(8, 4))
ax.set_xlim(0, len(months) - 1)
ax.set_ylim(0, cumulative.max() * 1.1)
ax.set_xticks(range(len(months))); ax.set_xticklabels(months, rotation=45, ha='right')
ax.set_title('Cumulative Sales (Animated)')
ax.set_ylabel('Cumulative Sales')
# Start with an empty line artist; each frame extends its data.
line, = ax.plot([], [], 'o-', color='steelblue', lw=2)

def update(frame):
    # Reveal the series one month at a time; return the changed artists
    # as a tuple so blitting can redraw only the line.
    line.set_data(range(frame + 1), cumulative[:frame + 1])
    return (line,)

ani = FuncAnimation(fig, update, frames=len(months), interval=150, blit=True)
plt.tight_layout()
# Display as HTML in Jupyter; save with ani.save('sales.gif') elsewhere
# NOTE(review): `display` is injected by IPython — this cell assumes a notebook
# runtime; a plain script would need `from IPython.display import display`.
display(HTML(ani.to_jshtml()))
plt.close()
# ── Exercise 3: Recreate FacetGrid from scratch with matplotlib ───────────
# Key insight: a FacetGrid is just a loop over unique values creating subplots;
#              sharing axes via sharex/sharey gives aligned scales automatically.

from sklearn.datasets import load_iris
iris = load_iris(as_frame=True)
df = iris.frame.copy()
df['species'] = pd.Categorical.from_codes(df['target'], iris.target_names)

species_list = df['species'].cat.categories.tolist()
ncols = len(species_list)
fig, axes = plt.subplots(1, ncols, figsize=(5 * ncols, 4), sharey=True)

# One panel per species — the manual equivalent of sns.FacetGrid(col='species').
for panel, species in zip(axes, species_list):
    rows = df[df['species'] == species]
    panel.scatter(rows['sepal length (cm)'], rows['petal length (cm)'],
                  alpha=0.6, edgecolors='k', linewidths=0.3)
    panel.set_title(species, fontsize=12)
    panel.set_xlabel('Sepal Length (cm)')

axes[0].set_ylabel('Petal Length (cm)')
plt.suptitle('Sepal Length vs Petal Length by Species (manual FacetGrid)', y=1.02)
plt.tight_layout()
plt.show()
# ── Exercise 4: Deterministic categorical color palette function ───────────
# Key insight: a stable digest of the category name maps to an HSL hue, so the
#              same string always gets the same color — across runs and machines.
# Fix: the original used the builtin hash(), which is salted per process
# (PYTHONHASHSEED), so the palette was only "deterministic" within one run.
# An MD5 digest of the name is stable everywhere.

import colorsys
import hashlib

def make_palette(categories, saturation=0.65, lightness=0.50):
    """Map each category to a hex color deterministically via a stable digest.

    Args:
        categories: iterable of labels (converted to str before hashing).
        saturation: HSL saturation shared by every color.
        lightness:  HSL lightness shared by every color.

    Returns:
        dict mapping each category to an RGB hex string like '#a1b2c3'.
    """
    palette = {}
    for cat in categories:
        digest = hashlib.md5(str(cat).encode('utf-8')).hexdigest()
        hue = (int(digest, 16) % 360) / 360.0
        r, g, b = colorsys.hls_to_rgb(hue, lightness, saturation)
        palette[cat] = '#{:02x}{:02x}{:02x}'.format(int(r*255), int(g*255), int(b*255))
    return palette

cats = ['Alpha', 'Beta', 'Gamma', 'Delta', 'Epsilon']
pal = make_palette(cats)

# Render each swatch as a unit square with its name and hex code underneath.
fig, ax = plt.subplots(figsize=(7, 2))
for idx, (name, hex_color) in enumerate(pal.items()):
    ax.add_patch(plt.Rectangle((idx, 0), 0.9, 1, color=hex_color))
    ax.text(idx + 0.45, -0.2, f'{name}\n{hex_color}', ha='center', va='top', fontsize=8)
ax.set_xlim(0, len(cats))
ax.set_ylim(-0.5, 1.1)
ax.axis('off')
ax.set_title('Deterministic color palette')
plt.tight_layout()
plt.show()
print(pal)
# ── Exercise 5: Plotly Express scatter with marginal distributions ─────────
# Key insight: px.scatter's marginal_x/marginal_y parameters add side histograms
#              or box plots with zero extra code; hover_data enriches tooltips.

import plotly.express as px

rng = np.random.default_rng(3)
n = 300
df_px = pd.DataFrame({
    'x':        rng.normal(0, 1, n),
    'y':        rng.normal(0, 1, n) * 0.5 + rng.normal(0, 1, n) * 0.5,
    'category': rng.choice(['Group A', 'Group B', 'Group C'], n),
    'size_val':  rng.uniform(5, 20, n),
    'id':        range(n),
})

# Collect the styling options once, then hand them to px.scatter.
scatter_options = dict(
    color='category',
    size='size_val',
    hover_data=['id', 'size_val'],
    marginal_x='histogram',
    marginal_y='box',
    title='Scatter with Marginal Distributions (Plotly Express)',
    opacity=0.7,
)
fig = px.scatter(df_px, x='x', y='y', **scatter_options)
fig.show()

04 — Data Cleaning Pipelines

import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler, OrdinalEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification

# ── Exercise 1: IQR Outlier Flagger transformer ────────────────────────────
# Key insight: a custom transformer that fits the IQR on training data and flags
#              outliers at predict time avoids data leakage.

class IQROutlierFlagger(BaseEstimator, TransformerMixin):
    """Adds a binary `is_outlier` column based on IQR of a target numeric column."""

    def __init__(self, column: str, multiplier: float = 1.5):
        self.column = column
        self.multiplier = multiplier

    def fit(self, X: pd.DataFrame, y=None):
        """Learn the IQR fences from the training data only."""
        q1, q3 = X[self.column].quantile([0.25, 0.75])
        spread = self.multiplier * (q3 - q1)
        self.lower_ = q1 - spread
        self.upper_ = q3 + spread
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Append `is_outlier` (0/1) using the fences learned in fit()."""
        X = X.copy()
        outside = (X[self.column] < self.lower_) | (X[self.column] > self.upper_)
        X['is_outlier'] = outside.astype(int)
        return X

# Demo: normal incomes plus one extreme value on each side of the distribution.
rng = np.random.default_rng(0)
df_clean = pd.DataFrame({'income': rng.normal(50_000, 10_000, 200)})
for row, extreme in ((0, 200_000), (1, -5_000)):
    df_clean.loc[row, 'income'] = extreme

flagger = IQROutlierFlagger(column='income')
result = flagger.fit_transform(df_clean)
print(f'Outliers flagged: {result["is_outlier"].sum()}')
print(result[result['is_outlier'] == 1].head())
# ── Exercise 2: ColumnTransformer handling multiple feature types ──────────
# Key insight: ColumnTransformer applies different transformers to different
#              column subsets and concatenates results — the standard pattern
#              for heterogeneous tabular data.
# Fix: the arrow in the printed shape summary was mojibake ('β†’'); restored.
#      Also parse the timestamp column once instead of three times.

rng = np.random.default_rng(1)
n = 300
df_het = pd.DataFrame({
    'age':       rng.integers(18, 70, n).astype(float),
    'income':    np.where(rng.random(n) < 0.1, np.nan, rng.normal(50_000, 10_000, n)),
    'education': rng.choice(['high_school', 'bachelors', 'masters', 'phd'], n),
    'city':      rng.choice(['NYC', 'LA', 'Chicago'], n),
    'signup_dt': pd.date_range('2020-01-01', periods=n, freq='D').to_series().sample(n, replace=True, random_state=0).values,
})

# Extract datetime features before ColumnTransformer, then drop the raw column
signup = pd.to_datetime(df_het['signup_dt'])
df_het['year']      = signup.dt.year
df_het['month']     = signup.dt.month
df_het['dayofweek'] = signup.dt.dayofweek
df_het = df_het.drop(columns='signup_dt')

numeric_features  = ['age', 'income', 'year', 'month', 'dayofweek']
ordinal_features  = ['education']
nominal_features  = ['city']
ordinal_categories = [['high_school', 'bachelors', 'masters', 'phd']]

# One sub-pipeline/encoder per column kind; outputs are concatenated column-wise.
numeric_pipeline = Pipeline([('imp', SimpleImputer()), ('scl', StandardScaler())])
preprocessor = ColumnTransformer([
    ('num', numeric_pipeline,                                               numeric_features),
    ('ord', OrdinalEncoder(categories=ordinal_categories),                  ordinal_features),
    ('nom', OneHotEncoder(handle_unknown='ignore', sparse_output=False),    nominal_features),
])

X_transformed = preprocessor.fit_transform(df_het)
print(f'Input shape: {df_het.shape}  →  Output shape: {X_transformed.shape}')
# ── Exercise 3: DataCleaner class with configurable pipeline steps ─────────
# Key insight: encapsulating each cleaning step as a method with config params
#              makes the cleaner reusable, testable, and easy to extend.

class DataCleaner:
    """Four-step cleaning pipeline: fill missing values, fix dtypes,
    drop duplicate rows, then cap (or drop) IQR outliers."""

    def __init__(self,
                 missing_strategy='median',
                 outlier_method='iqr',
                 iqr_multiplier=1.5,
                 cap_instead_of_drop=True):
        # missing_strategy: 'median' or 'mean' (anything else leaves numeric NaNs)
        self.missing_strategy  = missing_strategy
        # outlier_method kept for API symmetry; only the IQR method is implemented
        self.outlier_method    = outlier_method
        self.iqr_multiplier    = iqr_multiplier
        self.cap_instead_of_drop = cap_instead_of_drop

    def handle_missing(self, df: pd.DataFrame) -> pd.DataFrame:
        """Fill numeric NaNs with the configured statistic; object NaNs with the mode."""
        out = df.copy()
        if self.missing_strategy in ('median', 'mean'):
            for col in out.select_dtypes(include='number').columns:
                fill = getattr(out[col], self.missing_strategy)()
                out[col] = out[col].fillna(fill)
        for col in out.select_dtypes(include='object').columns:
            out[col] = out[col].fillna(out[col].mode()[0])
        return out

    def fix_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
        """Convert low-cardinality (<5% unique) object columns to category."""
        out = df.copy()
        for col in out.select_dtypes(include='object').columns:
            if out[col].nunique() / len(out) < 0.05:
                out[col] = out[col].astype('category')
        return out

    def remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop exact duplicate rows, reporting how many were removed."""
        deduped = df.drop_duplicates()
        print(f'  remove_duplicates: {len(df) - len(deduped)} rows removed')
        return deduped

    def cap_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clip (or drop) values outside the IQR fences, one numeric column at a time."""
        out = df.copy()
        for col in out.select_dtypes(include='number').columns:
            q1, q3 = out[col].quantile([0.25, 0.75])
            spread = self.iqr_multiplier * (q3 - q1)
            lo, hi = q1 - spread, q3 + spread
            if self.cap_instead_of_drop:
                out[col] = out[col].clip(lo, hi)
            else:
                out = out[(out[col] >= lo) & (out[col] <= hi)]
        return out

    def clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run all four steps in order and return the cleaned frame."""
        for step in (self.handle_missing, self.fix_dtypes,
                     self.remove_duplicates, self.cap_outliers):
            df = step(df)
        return df

# Demo: ~10% missing ages, one extreme income, and five duplicated rows.
# Fix: the arrow in the printed shape summary was mojibake ('β†’'); restored.
rng = np.random.default_rng(42)
messy = pd.DataFrame({
    'age':    np.where(rng.random(200) < 0.1, np.nan, rng.integers(18, 80, 200).astype(float)),
    'income': rng.normal(50_000, 15_000, 200),
    'city':   rng.choice(['NYC', 'LA', 'Chicago'], 200),
})
messy.loc[0, 'income'] = 500_000  # outlier
messy = pd.concat([messy, messy.iloc[:5]])  # add duplicates

cleaner = DataCleaner()
clean   = cleaner.clean(messy)
print(f'Shape: {messy.shape} → {clean.shape}')
print(clean.dtypes)
# ── Exercise 4: Full sklearn Pipeline — impute→scale→PCA→LogReg ────────────
# Key insight: Pipeline ensures all transformations are fitted on training data
#              only, preventing leakage; make_pipeline infers step names.
# Fix: the plus-minus sign in the printed summary was mojibake ('Β±'); restored.

from sklearn.model_selection import cross_val_score

X_raw, y = make_classification(n_samples=500, n_features=20, n_informative=10,
                                random_state=42)
# Add ~5% missing values so the imputer has real work to do
mask = np.random.default_rng(0).random(X_raw.shape) < 0.05
X_raw[mask] = np.nan

pipe = make_pipeline(
    SimpleImputer(strategy='mean'),
    StandardScaler(),
    PCA(n_components=8),
    LogisticRegression(max_iter=500, random_state=42)
)

cv_scores = cross_val_score(pipe, X_raw, y, cv=5, scoring='accuracy')
print(f'CV accuracy: {cv_scores.mean():.4f} ± {cv_scores.std():.4f}')
print(f'Pipeline steps: {[name for name, _ in pipe.steps]}')
# ── Exercise 5: Schema validation function ────────────────────────────────
# Key insight: validating data against a schema at pipeline ingestion catches
#              upstream errors early before they silently corrupt model outputs.

from typing import Any

def validate_schema(df: pd.DataFrame, schema: dict) -> list[str]:
    """Check `df` against a declarative column schema.

    schema = {
        'col_name': {
            'dtype': 'numeric' | 'categorical' | 'datetime',
            'min':   ...,  # optional, numeric only
            'max':   ...,  # optional, numeric only
            'allowed': [...],  # optional, categorical only
            'nullable': bool,   # default False
        }
    }
    Returns a list of violation messages (empty = pass).
    """
    violations = []
    for col, rules in schema.items():
        if col not in df.columns:
            violations.append(f'MISSING column: {col}')
            continue

        s = df[col]

        # Nullability (nulls are disallowed unless explicitly permitted)
        if s.isna().any() and not rules.get('nullable', False):
            violations.append(f'{col}: unexpected nulls ({s.isna().sum()})')

        # Declared kind vs actual pandas dtype
        expected_kind = rules.get('dtype')
        if expected_kind == 'numeric' and not pd.api.types.is_numeric_dtype(s):
            violations.append(f'{col}: expected numeric, got {s.dtype}')
        if expected_kind == 'datetime' and not pd.api.types.is_datetime64_any_dtype(s):
            violations.append(f'{col}: expected datetime, got {s.dtype}')

        # Range constraints (numeric columns)
        if 'min' in rules and s.min() < rules['min']:
            violations.append(f'{col}: min={s.min()} below allowed {rules["min"]}')
        if 'max' in rules and s.max() > rules['max']:
            violations.append(f'{col}: max={s.max()} above allowed {rules["max"]}')

        # Value whitelist (categorical columns); nulls are handled above
        if 'allowed' in rules:
            unexpected = set(s.dropna().unique()) - set(rules['allowed'])
            if unexpected:
                violations.append(f'{col}: disallowed values {unexpected}')

    return violations


# Demo schema plus one conforming and one violating frame.
schema = {
    'age':    {'dtype': 'numeric', 'min': 0, 'max': 120, 'nullable': False},
    'status': {'dtype': 'categorical', 'allowed': ['active', 'inactive'], 'nullable': False},
    'score':  {'dtype': 'numeric', 'min': 0.0, 'max': 1.0, 'nullable': True},
}

good_df = pd.DataFrame({'age': [25, 40], 'status': ['active', 'inactive'], 'score': [0.9, np.nan]})
bad_df  = pd.DataFrame({'age': [-1, 200], 'status': ['active', 'UNKNOWN'], 'score': [1.5, 0.2]})

print('Good DataFrame violations:', validate_schema(good_df, schema))
print('Bad DataFrame violations:')
for violation in validate_schema(bad_df, schema):
    print(' ', violation)

05 — Feature Engineering

import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import KFold
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.preprocessing import PolynomialFeatures
from sklearn.datasets import make_classification

# ── Exercise 1: Target encoding with cross-validation (leak-free) ──────────
# Key insight: encoding the target on the FULL training set leaks information;
#              using OOF predictions to compute encodings prevents this.

rng = np.random.default_rng(0)
n = 1000
df_te = pd.DataFrame({
    'city':   rng.choice(['NYC', 'LA', 'Chicago', 'Houston'], n),
    'target': rng.integers(0, 2, n),
})

def target_encode_cv(df: pd.DataFrame, col: str, target: str, n_splits=5) -> pd.Series:
    """Out-of-fold target encoding: each row's value is the category's target
    mean computed on folds that do NOT contain that row (no leakage)."""
    folds = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    encoded = pd.Series(np.nan, index=df.index)
    # Fallback for categories that never appear in a given training fold.
    global_mean = df[target].mean()

    for fit_idx, oof_idx in folds.split(df):
        fold_means = df.iloc[fit_idx].groupby(col)[target].mean()
        encoded.iloc[oof_idx] = df.iloc[oof_idx][col].map(fold_means).fillna(global_mean)

    return encoded

df_te['city_encoded'] = target_encode_cv(df_te, 'city', 'target')
print(df_te.groupby('city')[['target', 'city_encoded']].mean())
# ── Exercise 2: Interaction features via pairwise products → SelectKBest ───
# Key insight: pairwise products capture multiplicative relationships that linear
#              models cannot express; SelectKBest prunes uninformative combos.

X, y = make_classification(n_samples=500, n_features=10, n_informative=5, random_state=1)
feature_names = [f'f{i}' for i in range(X.shape[1])]
df_x = pd.DataFrame(X, columns=feature_names)

# Cross the five highest-variance features pairwise (each unordered pair once).
top5 = df_x.var().nlargest(5).index.tolist()
product_cols = [
    (df_x[a] * df_x[b]).rename(f'{a}x{b}')
    for i, a in enumerate(top5)
    for b in top5[i + 1:]
]

df_inter = pd.concat([df_x, *product_cols], axis=1)

# Keep only the 10 most informative columns by mutual information with y.
selector = SelectKBest(score_func=mutual_info_classif, k=10)
X_selected = selector.fit_transform(df_inter, y)
selected_names = df_inter.columns[selector.get_support()].tolist()

print(f'Original features: {df_x.shape[1]}, with interactions: {df_inter.shape[1]}')
print(f'Top-10 selected: {selected_names}')
# ── Exercise 3: DateFeatureExtractor transformer ───────────────────────────
# Key insight: raw datetimes are useless to ML models; decomposing them into
#              cyclic and ordinal parts captures seasonality and trends.

class DateFeatureExtractor(BaseEstimator, TransformerMixin):
    """Extracts year, month, day, hour, day_of_week, is_weekend, quarter,
       days_since_epoch from a datetime column."""

    def __init__(self, date_col: str, drop_original: bool = True):
        self.date_col      = date_col
        self.drop_original = drop_original

    def fit(self, X, y=None):
        # Nothing to learn — every feature is derived row-wise.
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Append the derived calendar columns, optionally dropping the original."""
        X = X.copy()
        dt = pd.to_datetime(X[self.date_col])
        derived = {
            'year':             dt.dt.year,
            'month':            dt.dt.month,
            'day':              dt.dt.day,
            'hour':             dt.dt.hour,
            'dayofweek':        dt.dt.dayofweek,
            'is_weekend':       (dt.dt.dayofweek >= 5).astype(int),
            'quarter':          dt.dt.quarter,
            'days_since_epoch': (dt - pd.Timestamp('1970-01-01')).dt.days,
        }
        for suffix, series in derived.items():
            X[f'{self.date_col}_{suffix}'] = series
        if self.drop_original:
            X = X.drop(columns=[self.date_col])
        return X

# Demo: six events spaced 45 days apart with random values attached.
rng = np.random.default_rng(7)
df_dates = pd.DataFrame({
    'event_date': pd.date_range('2022-01-01', periods=6, freq='45D'),
    'value':      rng.integers(1, 100, 6),
})
extractor = DateFeatureExtractor(date_col='event_date')
print(extractor.fit_transform(df_dates))
# ── Exercise 4: Manual degree-2 polynomial features vs sklearn ────────────
# Key insight: polynomial features include the original, squares, and pairwise
#              cross terms. The manual implementation verifies understanding;
#              sklearn's version is optimised and handles edge cases.

from itertools import combinations_with_replacement

def manual_poly2(X: np.ndarray) -> np.ndarray:
    """Degree-2 polynomial features: 1, x_i, x_i*x_j (i<=j)."""
    n_rows, n_feats = X.shape
    # Column order mirrors PolynomialFeatures(degree=2): bias, linear terms,
    # then products in lexicographic (i, j) order with i <= j.
    blocks = [np.ones((n_rows, 1)), X]
    blocks.extend(
        (X[:, i] * X[:, j])[:, None]
        for i, j in combinations_with_replacement(range(n_feats), 2)
    )
    return np.hstack(blocks)

# Cross-check the manual implementation against sklearn on a tiny matrix.
X_small = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])

manual_result = manual_poly2(X_small)
sklearn_result = PolynomialFeatures(degree=2, include_bias=True).fit_transform(X_small)

print('Manual  result:\n', manual_result)
print('sklearn result:\n', sklearn_result)
print('Match:', np.allclose(manual_result, sklearn_result))
# ── Exercise 5: Mutual-information FeatureSelector for sklearn pipelines ───
# Key insight: wrapping SelectKBest in a custom transformer lets you tune k
#              as a pipeline hyperparameter in GridSearchCV.

class MIFeatureSelector(BaseEstimator, TransformerMixin):
    """Select top-k features by mutual information. Works in sklearn pipelines."""

    def __init__(self, k: int = 10, task: str = 'classification'):
        # sklearn convention: store constructor args verbatim, no validation here.
        self.k    = k
        self.task = task

    def fit(self, X, y):
        """Fit the underlying SelectKBest with the task-appropriate MI scorer."""
        # Fix: the original resolved mutual_info_regression via a cryptic
        # __import__(..., fromlist=...) call; a plain conditional import is
        # equivalent and readable.
        if self.task == 'classification':
            score_fn = mutual_info_classif
        else:
            from sklearn.feature_selection import mutual_info_regression
            score_fn = mutual_info_regression
        self.selector_ = SelectKBest(score_func=score_fn, k=self.k)
        self.selector_.fit(X, y)
        return self

    def transform(self, X):
        """Reduce X to the k features chosen during fit()."""
        return self.selector_.transform(X)

    def get_feature_names_out(self, input_features=None):
        """Names (or integer indices) of the selected features."""
        names = (np.array(input_features) if input_features is not None
                 else np.arange(self.selector_.n_features_in_))
        return names[self.selector_.get_support()]

# Use the custom selector as a regular pipeline step.
# Fix: the plus-minus sign in the printed summary was mojibake ('Β±'); restored.
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

X, y = make_classification(n_samples=400, n_features=30, n_informative=8, random_state=0)

pipe = make_pipeline(
    MIFeatureSelector(k=8),
    LogisticRegression(max_iter=300)
)

scores = cross_val_score(pipe, X, y, cv=5)
print(f'CV accuracy with MI selection (k=8): {scores.mean():.4f} ± {scores.std():.4f}')