Solutions: Python & Data Science Track
Worked solutions to all exercises from the python-data-science/ notebooks.
01 — Pandas Fundamentals
import pandas as pd
import numpy as np
# ── Exercise 1: Chain 3+ method calls ──────────────────────────────────────
# Key insight: method chaining keeps transformations readable and avoids temp
# variables.
# Fix: the original chained .transform('mean'), which returns a row-aligned
# Series (one value per passenger); following it with .reset_index(name=...)
# produced duplicated group means plus a stray positional-index column. The
# intended "mean fare by class and sex" table needs the aggregation .mean(),
# whose .reset_index(name=...) flattens the (pclass, sex) MultiIndex.
rng = np.random.default_rng(42)
n = 891
titanic = pd.DataFrame({
    'survived': rng.integers(0, 2, n),
    'pclass': rng.choice([1, 2, 3], n),
    'age': rng.normal(30, 14, n).clip(1, 80),
    'fare': rng.exponential(32, n),
    'sex': rng.choice(['male', 'female'], n),
})
result = (
    titanic
    .query('age > 18')                                # filter adults
    .groupby(['pclass', 'sex'])['fare']               # group by class and sex
    .mean()                                           # aggregate: one mean per group
    .reset_index(name='mean_fare_by_class_sex')       # flatten the MultiIndex
    .sort_values('mean_fare_by_class_sex', ascending=False)
    .head(10)
)
print(result)
# ── Exercise 2: Reshape wide→long with melt, then back with pivot_table ────
# Key insight: melt unpivots value columns into rows; pivot_table is the inverse.
wide = pd.DataFrame({
    'student': ['Alice', 'Bob', 'Carol'],
    'math': [90, 75, 85],
    'english': [80, 95, 70],
    'science': [88, 60, 92],
})
# Wide → Long: one (student, subject, score) row per grade
long = wide.melt(id_vars='student', var_name='subject', value_name='score')
print('Long format:')
print(long)
# Long → Wide: pivot back, strip the columns-axis name, restore a flat index
wide_again = (
    long.pivot_table(index='student', columns='subject', values='score')
    .rename_axis(columns=None)
    .reset_index()
)
print('\nReconstructed wide format:')
print(wide_again)
# ── Exercise 3: Optimize 1M-row DataFrame memory ───────────────────────────
# Key insight: int64→int16/int8 and float64→float32 halve/quarter memory;
# object columns with few unique values become category.
rng = np.random.default_rng(0)
N = 1_000_000
df = pd.DataFrame({
    'age': rng.integers(0, 120, N).astype('int64'),
    'score': rng.random(N).astype('float64'),
    'count': rng.integers(0, 50_000, N).astype('int64'),
    'category': rng.choice(['A', 'B', 'C', 'D'], N),
    'status': rng.choice(['active', 'inactive', 'pending'], N),
})
before_mb = df.memory_usage(deep=True).sum() / 1024**2
# Shrink integer columns to the smallest dtype that holds their range
for int_col in ('age', 'count'):
    df[int_col] = pd.to_numeric(df[int_col], downcast='integer')
df['score'] = df['score'].astype('float32')
# Low-cardinality strings → category (codes + small lookup table)
for cat_col in ('category', 'status'):
    df[cat_col] = df[cat_col].astype('category')
after_mb = df.memory_usage(deep=True).sum() / 1024**2
print(f'Before: {before_mb:.1f} MB | After: {after_mb:.1f} MB | Reduction: {1 - after_mb/before_mb:.0%}')
# ── Exercise 4: Custom aggregation with groupby.apply ──────────────────────
# Key insight: apply receives a sub-DataFrame and can return a Series with
# arbitrary keys — useful when you need multiple stats at once.
from scipy import stats as sp_stats
def rich_stats(group: pd.DataFrame) -> pd.Series:
    """Return a bundle of distribution statistics for the group's fares."""
    fares = group['fare']
    q25, q75 = fares.quantile([0.25, 0.75])
    return pd.Series({
        'mean': fares.mean(),
        'median': fares.median(),
        'std': fares.std(),
        'skew': fares.skew(),
        'q25': q25,
        'q75': q75,
        'count': len(fares),
    })
agg_result = titanic.groupby('pclass').apply(rich_stats)
print(agg_result)
# ── Exercise 5: Merge 3 DataFrames with different join types ───────────────
# Key insight: always verify row counts after each merge to catch fan-out or
# dropped rows from inner joins.
customers = pd.DataFrame({'cid': [1, 2, 3, 4], 'name': ['Alice', 'Bob', 'Carol', 'Dan']})
orders = pd.DataFrame({'oid': [10, 11, 12], 'cid': [1, 2, 1], 'amount': [100, 200, 150]})
payments = pd.DataFrame({'oid': [10, 11, 99], 'paid': [True, False, True]})
# Left join: keep all customers even without orders
cust_orders = pd.merge(customers, orders, on='cid', how='left')
print(f'customers({len(customers)}) LEFT orders({len(orders)}) β {len(cust_orders)} rows')
# Inner join: only orders that have a matching payment
order_payments = pd.merge(orders, payments, on='oid', how='inner')
print(f'orders({len(orders)}) INNER payments({len(payments)}) β {len(order_payments)} rows')
# Outer join on everything (unmatched rows from both sides survive)
full = pd.merge(cust_orders, payments, on='oid', how='outer')
print(f'Full outer result: {len(full)} rows')
print(full)
02 — Exploratory Data Analysis
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.spatial.distance import mahalanobis
from scipy import stats
rng = np.random.default_rng(7)
X = rng.multivariate_normal([0, 0], [[1, 0.8], [0.8, 1]], 200)
# Inject 10 outliers far from the correlated cloud
outliers = rng.uniform(3, 5, (10, 2))
X = np.vstack([X, outliers])
df_eda = pd.DataFrame(X, columns=['x', 'y'])
# ── Exercise 1: Mahalanobis-based multivariate outlier detection ───────────
# Key insight: Mahalanobis accounts for variable correlations — a point far
# from the centroid in the covariance-adjusted sense is an outlier.
cov = np.cov(df_eda[['x', 'y']].T)
inv_cov = np.linalg.inv(cov)
center = df_eda[['x', 'y']].mean().values
df_eda['mahal'] = [
    mahalanobis(point, center, inv_cov)
    for point in df_eda[['x', 'y']].to_numpy()
]
# Chi-squared threshold at the 97.5th percentile with 2 degrees of freedom
threshold = np.sqrt(stats.chi2.ppf(0.975, df=2))
df_eda['outlier'] = df_eda['mahal'] > threshold
fig, ax = plt.subplots(figsize=(6, 5))
colors = df_eda['outlier'].map({False: 'steelblue', True: 'crimson'})
ax.scatter(df_eda['x'], df_eda['y'], c=colors, alpha=0.7)
ax.set_title(f'Mahalanobis Outliers (threshold={threshold:.2f})')
ax.set_xlabel('x')
ax.set_ylabel('y')
plt.tight_layout()
plt.show()
print(f'Detected {df_eda["outlier"].sum()} outliers')
# ── Exercise 2: Pairplot with regression lines, color by target ────────────
# Key insight: sns.pairplot with kind='reg' adds regression lines per pair;
# hue separates classes so you can spot separability visually.
from sklearn.datasets import load_iris
iris = load_iris(as_frame=True)
df_iris = iris.frame.copy()
df_iris['species'] = pd.Categorical.from_codes(df_iris['target'], iris.target_names)
pair_grid = sns.pairplot(
    df_iris.drop(columns='target'), hue='species', kind='reg',
    plot_kws={'scatter_kws': {'alpha': 0.4}, 'line_kws': {'lw': 1}},
)
pair_grid.fig.suptitle('Iris Pairplot β regression lines, colored by species', y=1.02)
plt.show()
# Correlation matrix over the numeric measurements only
corr = df_iris.drop(columns=['target', 'species']).corr()
print('Correlation matrix:')
print(corr.round(2))
# ── Exercise 3: Custom profiling function ──────────────────────────────────
# Key insight: a good profiler surfaces everything a data scientist needs
# at a glance before modelling — dtypes, nulls, cardinality, distribution.
def profile_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """One profile row per column: dtype, null %, cardinality, top values,
    plus moment statistics (mean/std/skew/kurtosis) for numeric columns."""
    records = []
    for name in df.columns:
        series = df[name]
        record = {
            'column': name,
            'dtype': str(series.dtype),
            'null_pct': f"{series.isna().mean()*100:.1f}%",
            'unique': series.nunique(),
            'top5': str(series.value_counts().head(5).index.tolist()),
        }
        # Moment statistics only make sense for numeric dtypes
        if pd.api.types.is_numeric_dtype(series):
            record['mean'] = round(series.mean(), 4)
            record['std'] = round(series.std(), 4)
            record['skew'] = round(series.skew(), 4)
            record['kurt'] = round(series.kurt(), 4)
        records.append(record)
    return pd.DataFrame(records).set_index('column')
rng = np.random.default_rng(1)
# Demo frame: numeric, numeric-with-NaNs, and a categorical string column
demo = pd.DataFrame({
    'age': rng.integers(18, 80, 500).astype(float),
    'income': np.where(rng.random(500) < 0.05, np.nan, rng.exponential(50_000, 500)),
    'city': rng.choice(['NYC', 'LA', 'Chicago', 'Houston', 'Phoenix'], 500),
})
profile = profile_dataframe(demo)
print(profile.to_string())
# ── Exercise 4: pd.cut vs pd.qcut ──────────────────────────────────────────
# Key insight: pd.cut uses equal-width bins (good for uniform distributions);
# pd.qcut uses equal-frequency bins (good for skewed distributions).
rng = np.random.default_rng(42)
income = pd.Series(rng.exponential(50_000, 1000), name='income')
cut_labels = [f'cut_{i}' for i in range(1, 6)]
qcut_labels = [f'qcut_{i}' for i in range(1, 6)]
cut_bins = pd.cut(income, bins=5, labels=cut_labels)
qcut_bins = pd.qcut(income, q=5, labels=qcut_labels)
# Side-by-side bar charts of the bin counts from each strategy
fig, axes = plt.subplots(1, 2, figsize=(12, 4))
for axis, binned, color, title in (
    (axes[0], cut_bins, 'steelblue', 'pd.cut β equal-width bins (skewed counts)'),
    (axes[1], qcut_bins, 'darkorange', 'pd.qcut β equal-frequency bins (uniform counts)'),
):
    binned.value_counts().sort_index().plot(kind='bar', ax=axis, color=color)
    axis.set_title(title)
    axis.set_ylabel('count')
plt.tight_layout()
plt.show()
# ── Exercise 5: Feature drift detection using Kolmogorov-Smirnov test ──────
# Key insight: KS test compares two empirical distributions without assuming
# normality; a small p-value signals that the feature has drifted between
# time periods.
from scipy.stats import ks_2samp
rng = np.random.default_rng(0)
n = 500
# Period A (stable)
period_a = pd.DataFrame({
    'age': rng.normal(35, 10, n),
    'income': rng.normal(60_000, 15_000, n),
    'score': rng.uniform(0, 1, n),
})
# Period B (age drifts, income stays, score drifts)
period_b = pd.DataFrame({
    'age': rng.normal(40, 12, n),             # shifted mean
    'income': rng.normal(61_000, 15_200, n),  # negligible drift
    'score': rng.beta(2, 5, n),               # distribution shape change
})
drift_records = []
for feature in period_a.columns:
    stat, p = ks_2samp(period_a[feature], period_b[feature])
    drift_records.append({
        'feature': feature,
        'ks_stat': round(stat, 4),
        'p_value': round(p, 4),
        'drifted': p < 0.05,
    })
drift_df = pd.DataFrame(drift_records)
print(drift_df.to_string(index=False))
03 — Data Visualization
import matplotlib.pyplot as plt
import seaborn as sns
import numpy as np
import pandas as pd
rng = np.random.default_rng(42)
# ── Exercise 1: Multi-panel 2×3 dashboard ──────────────────────────────────
# Key insight: plt.subplots creates an axis grid; tight_layout prevents
# overlapping labels.
months = ['Jan','Feb','Mar','Apr','May','Jun']
sales = rng.integers(100, 400, 6)
temps = rng.normal(15, 8, 100)
x_sc = rng.normal(0, 1, 80)
y_sc = x_sc * 0.7 + rng.normal(0, 0.5, 80)
corr_m = np.corrcoef(rng.normal(0, 1, (4, 50)))
groups = rng.choice(['A','B','C'], 150)
values = rng.exponential(1, 150) + (groups == 'B') * 0.5
fig, axes = plt.subplots(2, 3, figsize=(14, 8))
(ax_bar, ax_line, ax_scatter), (ax_heat, ax_box, ax_hist) = axes
# Bar chart of monthly sales
ax_bar.bar(months, sales, color='steelblue')
ax_bar.set_title('Monthly Sales (Bar)')
# Same series as a line with markers
ax_line.plot(months, sales, marker='o', color='darkorange')
ax_line.set_title('Monthly Sales (Line)')
# Scatter of the correlated pair
ax_scatter.scatter(x_sc, y_sc, alpha=0.5, color='green')
ax_scatter.set_title('Scatter: x vs y')
# Correlation heatmap with a compact colorbar
heat = ax_heat.imshow(corr_m, cmap='coolwarm', vmin=-1, vmax=1)
fig.colorbar(heat, ax=ax_heat, fraction=0.046)
ax_heat.set_title('Correlation Heatmap')
# Box plots, one per group
data_box = [values[groups == g] for g in ['A','B','C']]
ax_box.boxplot(data_box, labels=['A','B','C'], patch_artist=True)
ax_box.set_title('Box: Values by Group')
# Histogram of temperatures
ax_hist.hist(temps, bins=20, color='purple', edgecolor='white')
ax_hist.set_title('Temperature Distribution')
plt.suptitle('2Γ3 Dashboard', fontsize=14, y=1.01)
plt.tight_layout()
plt.show()
# ── Exercise 2: Animated cumulative sales (matplotlib FuncAnimation) ───────
# Key insight: FuncAnimation calls update() for each frame; blit=True only
# redraws changed artists — critical for smooth animation.
from matplotlib.animation import FuncAnimation
from IPython.display import HTML
rng = np.random.default_rng(5)
# 24 months of synthetic sales, accumulated into a running total
months = [f'M{i:02d}' for i in range(1, 25)]
monthly_sales = rng.integers(50, 200, 24)
cumulative = np.cumsum(monthly_sales)
fig, ax = plt.subplots(figsize=(8, 4))
# Fix the axis limits up front so blitting never has to rescale mid-animation
ax.set_xlim(0, len(months) - 1)
ax.set_ylim(0, cumulative.max() * 1.1)
ax.set_xticks(range(len(months))); ax.set_xticklabels(months, rotation=45, ha='right')
ax.set_title('Cumulative Sales (Animated)')
ax.set_ylabel('Cumulative Sales')
# Start with an empty line; update() extends it one month per frame
line, = ax.plot([], [], 'o-', color='steelblue', lw=2)
def update(frame):
    # Reveal the first `frame + 1` points; returning the artist tuple is what
    # enables blit=True to redraw only this line.
    line.set_data(range(frame + 1), cumulative[:frame + 1])
    return (line,)
ani = FuncAnimation(fig, update, frames=len(months), interval=150, blit=True)
plt.tight_layout()
# Display as HTML in Jupyter; save with ani.save('sales.gif') elsewhere.
# NOTE(review): `display` is the notebook builtin — a plain script would need
# `from IPython.display import display`.
display(HTML(ani.to_jshtml()))
plt.close()
# ── Exercise 3: Recreate FacetGrid from scratch with matplotlib ────────────
# Key insight: a FacetGrid is just a loop over unique values creating subplots;
# sharing axes via sharex/sharey gives aligned scales automatically.
from sklearn.datasets import load_iris
iris = load_iris(as_frame=True)
df = iris.frame.copy()
df['species'] = pd.Categorical.from_codes(df['target'], iris.target_names)
species_list = df['species'].cat.categories.tolist()
ncols = len(species_list)
fig, axes = plt.subplots(1, ncols, figsize=(5 * ncols, 4), sharey=True)
# One facet (subplot) per species, all sharing the y scale
for axis, species in zip(axes, species_list):
    facet = df.loc[df['species'] == species]
    axis.scatter(facet['sepal length (cm)'], facet['petal length (cm)'],
                 alpha=0.6, edgecolors='k', linewidths=0.3)
    axis.set_title(species, fontsize=12)
    axis.set_xlabel('Sepal Length (cm)')
axes[0].set_ylabel('Petal Length (cm)')
plt.suptitle('Sepal Length vs Petal Length by Species (manual FacetGrid)', y=1.02)
plt.tight_layout()
plt.show()
# ── Exercise 4: Deterministic categorical color palette function ───────────
# Key insight: a *stable* hash of the category name, modded by 360, maps it to
# an HSL hue so the same string always gets the same color.
# Fix: the original used the built-in hash(), which is salted per process for
# str (PYTHONHASHSEED) — colors silently changed between runs, defeating the
# whole point. zlib.crc32 is stable across runs and platforms.
import colorsys
import zlib
def make_palette(categories, saturation=0.65, lightness=0.50):
    """Map each category to a hex color deterministically.

    Parameters
    ----------
    categories : iterable of labels (coerced to str before hashing).
    saturation, lightness : HSL components shared by every color.

    Returns
    -------
    dict mapping each category to a '#rrggbb' hex string; the same label
    always yields the same color, across processes.
    """
    palette = {}
    for cat in categories:
        # crc32 is deterministic across processes, unlike hash() on str
        hue = (zlib.crc32(str(cat).encode('utf-8')) % 360) / 360.0
        r, g, b = colorsys.hls_to_rgb(hue, lightness, saturation)
        palette[cat] = '#{:02x}{:02x}{:02x}'.format(int(r*255), int(g*255), int(b*255))
    return palette
# Demo: render one labelled swatch per category
cats = ['Alpha', 'Beta', 'Gamma', 'Delta', 'Epsilon']
pal = make_palette(cats)
fig, ax = plt.subplots(figsize=(7, 2))
for i, (cat, color) in enumerate(pal.items()):
    ax.add_patch(plt.Rectangle((i, 0), 0.9, 1, color=color))
    ax.text(i + 0.45, -0.2, f'{cat}\n{color}', ha='center', va='top', fontsize=8)
ax.set_xlim(0, len(cats))
ax.set_ylim(-0.5, 1.1)
ax.axis('off')
ax.set_title('Deterministic color palette')
plt.tight_layout()
plt.show()
print(pal)
# ── Exercise 5: Plotly Express scatter with marginal distributions ─────────
# Key insight: px.scatter's marginal_x/marginal_y parameters add side histograms
# or box plots with zero extra code; hover_data enriches tooltips.
import plotly.express as px
rng = np.random.default_rng(3)
n = 300
x_vals = rng.normal(0, 1, n)
y_vals = rng.normal(0, 1, n) * 0.5 + rng.normal(0, 1, n) * 0.5
df_px = pd.DataFrame({
    'x': x_vals,
    'y': y_vals,
    'category': rng.choice(['Group A', 'Group B', 'Group C'], n),
    'size_val': rng.uniform(5, 20, n),
    'id': range(n),
})
fig = px.scatter(
    df_px, x='x', y='y',
    color='category',
    size='size_val',
    hover_data=['id', 'size_val'],
    marginal_x='histogram',
    marginal_y='box',
    title='Scatter with Marginal Distributions (Plotly Express)',
    opacity=0.7,
)
fig.show()
04 — Data Cleaning Pipelines
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.pipeline import Pipeline, make_pipeline
from sklearn.preprocessing import StandardScaler, OrdinalEncoder, OneHotEncoder
from sklearn.impute import SimpleImputer
from sklearn.compose import ColumnTransformer
from sklearn.decomposition import PCA
from sklearn.linear_model import LogisticRegression
from sklearn.datasets import make_classification
# ── Exercise 1: IQR Outlier Flagger transformer ────────────────────────────
# Key insight: a custom transformer that fits the IQR on training data and flags
# outliers at predict time avoids data leakage.
class IQROutlierFlagger(BaseEstimator, TransformerMixin):
    """Adds a binary `is_outlier` column based on IQR of a target numeric column."""

    def __init__(self, column: str, multiplier: float = 1.5):
        self.column = column
        self.multiplier = multiplier

    def fit(self, X: pd.DataFrame, y=None):
        """Learn the IQR fences from the training data only."""
        q1, q3 = X[self.column].quantile([0.25, 0.75])
        spread = self.multiplier * (q3 - q1)
        self.lower_ = q1 - spread
        self.upper_ = q3 + spread
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of X with an extra 0/1 `is_outlier` column."""
        out = X.copy()
        beyond_fences = (out[self.column] < self.lower_) | (out[self.column] > self.upper_)
        out['is_outlier'] = beyond_fences.astype(int)
        return out
rng = np.random.default_rng(0)
df_clean = pd.DataFrame({'income': rng.normal(50_000, 10_000, 200)})
# Inject one extreme-high and one extreme-low income
df_clean.loc[[0, 1], 'income'] = [200_000, -5_000]
flagger = IQROutlierFlagger(column='income')
result = flagger.fit_transform(df_clean)
print(f'Outliers flagged: {result["is_outlier"].sum()}')
print(result[result['is_outlier'] == 1].head())
# ── Exercise 2: ColumnTransformer handling multiple feature types ──────────
# Key insight: ColumnTransformer applies different transformers to different
# column subsets and concatenates results — the standard pattern
# for heterogeneous tabular data.
rng = np.random.default_rng(1)
n = 300
signup_dates = (
    pd.date_range('2020-01-01', periods=n, freq='D')
    .to_series()
    .sample(n, replace=True, random_state=0)
    .values
)
df_het = pd.DataFrame({
    'age': rng.integers(18, 70, n).astype(float),
    'income': np.where(rng.random(n) < 0.1, np.nan, rng.normal(50_000, 10_000, n)),
    'education': rng.choice(['high_school', 'bachelors', 'masters', 'phd'], n),
    'city': rng.choice(['NYC', 'LA', 'Chicago'], n),
    'signup_dt': signup_dates,
})
# Expand the timestamp into model-friendly parts, then drop the raw column
signup_ts = pd.to_datetime(df_het['signup_dt'])
df_het['year'] = signup_ts.dt.year
df_het['month'] = signup_ts.dt.month
df_het['dayofweek'] = signup_ts.dt.dayofweek
df_het = df_het.drop(columns='signup_dt')
numeric_features = ['age', 'income', 'year', 'month', 'dayofweek']
ordinal_features = ['education']
nominal_features = ['city']
ordinal_categories = [['high_school', 'bachelors', 'masters', 'phd']]
numeric_pipeline = Pipeline([('imp', SimpleImputer()), ('scl', StandardScaler())])
preprocessor = ColumnTransformer([
    ('num', numeric_pipeline, numeric_features),
    ('ord', OrdinalEncoder(categories=ordinal_categories), ordinal_features),
    ('nom', OneHotEncoder(handle_unknown='ignore', sparse_output=False), nominal_features),
])
X_transformed = preprocessor.fit_transform(df_het)
print(f'Input shape: {df_het.shape} β Output shape: {X_transformed.shape}')
# ── Exercise 3: DataCleaner class with configurable pipeline steps ─────────
# Key insight: encapsulating each cleaning step as a method with config params
# makes the cleaner reusable, testable, and easy to extend.
class DataCleaner:
    """Configurable DataFrame cleaner: imputation, dtype fixes, dedup, capping.

    Parameters
    ----------
    missing_strategy : 'median' or 'mean' — imputation for numeric columns.
    outlier_method : outlier strategy name; only 'iqr' is implemented.
    iqr_multiplier : fence width in IQRs for outlier detection.
    cap_instead_of_drop : clip outliers to the fences instead of dropping rows.
    """

    def __init__(self,
                 missing_strategy='median',
                 outlier_method='iqr',
                 iqr_multiplier=1.5,
                 cap_instead_of_drop=True):
        self.missing_strategy = missing_strategy
        self.outlier_method = outlier_method
        self.iqr_multiplier = iqr_multiplier
        self.cap_instead_of_drop = cap_instead_of_drop

    def handle_missing(self, df: pd.DataFrame) -> pd.DataFrame:
        """Impute numeric NaNs with median/mean and object NaNs with the mode."""
        df = df.copy()
        for col in df.select_dtypes(include='number').columns:
            if self.missing_strategy == 'median':
                df[col] = df[col].fillna(df[col].median())
            elif self.missing_strategy == 'mean':
                df[col] = df[col].fillna(df[col].mean())
        for col in df.select_dtypes(include='object').columns:
            mode = df[col].mode()
            # Fix: mode() is empty for an all-NaN column, so the original
            # `mode()[0]` raised IndexError; skip imputation in that case.
            if not mode.empty:
                df[col] = df[col].fillna(mode[0])
        return df

    def fix_dtypes(self, df: pd.DataFrame) -> pd.DataFrame:
        """Convert low-cardinality (<5% unique) object columns to category."""
        df = df.copy()
        for col in df.select_dtypes(include='object').columns:
            if df[col].nunique() / len(df) < 0.05:
                df[col] = df[col].astype('category')
        return df

    def remove_duplicates(self, df: pd.DataFrame) -> pd.DataFrame:
        """Drop exact duplicate rows, reporting how many were removed."""
        before = len(df)
        df = df.drop_duplicates()
        print(f' remove_duplicates: {before - len(df)} rows removed')
        return df

    def cap_outliers(self, df: pd.DataFrame) -> pd.DataFrame:
        """Clip (or drop) numeric values outside the IQR fences."""
        df = df.copy()
        for col in df.select_dtypes(include='number').columns:
            q1, q3 = df[col].quantile([0.25, 0.75])
            iqr = q3 - q1
            lo, hi = q1 - self.iqr_multiplier * iqr, q3 + self.iqr_multiplier * iqr
            if self.cap_instead_of_drop:
                df[col] = df[col].clip(lo, hi)
            else:
                df = df[(df[col] >= lo) & (df[col] <= hi)]
        return df

    def clean(self, df: pd.DataFrame) -> pd.DataFrame:
        """Run all steps: impute → fix dtypes → dedup → cap outliers."""
        df = self.handle_missing(df)
        df = self.fix_dtypes(df)
        df = self.remove_duplicates(df)
        df = self.cap_outliers(df)
        return df
rng = np.random.default_rng(42)
messy = pd.DataFrame({
    'age': np.where(rng.random(200) < 0.1, np.nan, rng.integers(18, 80, 200).astype(float)),
    'income': rng.normal(50_000, 15_000, 200),
    'city': rng.choice(['NYC', 'LA', 'Chicago'], 200),
})
# Inject one extreme income and a handful of duplicated rows
messy.loc[0, 'income'] = 500_000
messy = pd.concat([messy, messy.iloc[:5]])
cleaner = DataCleaner()
clean = cleaner.clean(messy)
print(f'Shape: {messy.shape} β {clean.shape}')
print(clean.dtypes)
# ── Exercise 4: Full sklearn Pipeline — impute→scale→PCA→LogReg ────────────
# Key insight: Pipeline ensures all transformations are fitted on training data
# only, preventing leakage; make_pipeline infers step names.
from sklearn.model_selection import cross_val_score
X_raw, y = make_classification(n_samples=500, n_features=20, n_informative=10,
                               random_state=42)
# Punch ~5% holes in the matrix so the imputer has work to do
nan_mask = np.random.default_rng(0).random(X_raw.shape) < 0.05
X_raw[nan_mask] = np.nan
pipe = make_pipeline(
    SimpleImputer(strategy='mean'),
    StandardScaler(),
    PCA(n_components=8),
    LogisticRegression(max_iter=500, random_state=42),
)
cv_scores = cross_val_score(pipe, X_raw, y, cv=5, scoring='accuracy')
print(f'CV accuracy: {cv_scores.mean():.4f} Β± {cv_scores.std():.4f}')
print(f'Pipeline steps: {[name for name, _ in pipe.steps]}')
# ── Exercise 5: Schema validation function ─────────────────────────────────
# Key insight: validating data against a schema at pipeline ingestion catches
# upstream errors early before they silently corrupt model outputs.
from typing import Any
def validate_schema(df: pd.DataFrame, schema: dict) -> list[str]:
    """
    schema = {
        'col_name': {
            'dtype': 'numeric' | 'categorical' | 'datetime',
            'min': ...,        # optional, numeric only
            'max': ...,        # optional, numeric only
            'allowed': [...],  # optional, categorical only
            'nullable': bool,  # default False
        }
    }
    Returns a list of violation messages (empty = pass).
    """
    violations = []
    for col, rules in schema.items():
        if col not in df.columns:
            violations.append(f'MISSING column: {col}')
            continue
        series = df[col]
        # Null check — columns are non-nullable unless the schema opts in
        if series.isna().any() and not rules.get('nullable', False):
            violations.append(f'{col}: unexpected nulls ({series.isna().sum()})')
        # Dtype checks ('categorical' has no dtype assertion; 'allowed' covers it)
        expected = rules.get('dtype')
        if expected == 'numeric' and not pd.api.types.is_numeric_dtype(series):
            violations.append(f'{col}: expected numeric, got {series.dtype}')
        if expected == 'datetime' and not pd.api.types.is_datetime64_any_dtype(series):
            violations.append(f'{col}: expected datetime, got {series.dtype}')
        # Range checks (Series.min/max skip NaNs)
        if 'min' in rules and series.min() < rules['min']:
            violations.append(f'{col}: min={series.min()} below allowed {rules["min"]}')
        if 'max' in rules and series.max() > rules['max']:
            violations.append(f'{col}: max={series.max()} above allowed {rules["max"]}')
        # Categorical whitelist
        if 'allowed' in rules:
            bad = set(series.dropna().unique()) - set(rules['allowed'])
            if bad:
                violations.append(f'{col}: disallowed values {bad}')
    return violations
# Demo: one conforming frame, one with range/category violations
schema = {
    'age': {'dtype': 'numeric', 'min': 0, 'max': 120, 'nullable': False},
    'status': {'dtype': 'categorical', 'allowed': ['active', 'inactive'], 'nullable': False},
    'score': {'dtype': 'numeric', 'min': 0.0, 'max': 1.0, 'nullable': True},
}
good_df = pd.DataFrame({'age': [25, 40], 'status': ['active', 'inactive'], 'score': [0.9, np.nan]})
bad_df = pd.DataFrame({'age': [-1, 200], 'status': ['active', 'UNKNOWN'], 'score': [1.5, 0.2]})
print('Good DataFrame violations:', validate_schema(good_df, schema))
print('Bad DataFrame violations:')
for violation in validate_schema(bad_df, schema):
    print(' ', violation)
05 — Feature Engineering
import pandas as pd
import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.model_selection import KFold
from sklearn.feature_selection import SelectKBest, mutual_info_classif
from sklearn.preprocessing import PolynomialFeatures
from sklearn.datasets import make_classification
# ── Exercise 1: Target encoding with cross-validation (leak-free) ──────────
# Key insight: encoding the target on the FULL training set leaks information;
# using OOF predictions to compute encodings prevents this.
rng = np.random.default_rng(0)
n = 1000
df_te = pd.DataFrame({
    'city': rng.choice(['NYC', 'LA', 'Chicago', 'Houston'], n),
    'target': rng.integers(0, 2, n),
})
def target_encode_cv(df: pd.DataFrame, col: str, target: str, n_splits=5) -> pd.Series:
    """Out-of-fold target encoding: each row's encoding is computed from the
    folds that exclude it, so no row ever sees its own target value."""
    kf = KFold(n_splits=n_splits, shuffle=True, random_state=42)
    encoded = pd.Series(np.nan, index=df.index)
    global_mean = df[target].mean()
    for fit_idx, oof_idx in kf.split(df):
        # Category means learned on the other folds only
        fold_means = df.iloc[fit_idx].groupby(col)[target].mean()
        # Unseen categories fall back to the global mean
        encoded.iloc[oof_idx] = df.iloc[oof_idx][col].map(fold_means).fillna(global_mean)
    return encoded
df_te['city_encoded'] = target_encode_cv(df_te, 'city', 'target')
print(df_te.groupby('city')[['target', 'city_encoded']].mean())
# ── Exercise 2: Interaction features via pairwise products → SelectKBest ───
# Key insight: pairwise products capture multiplicative relationships that linear
# models cannot express; SelectKBest prunes uninformative combos.
X, y = make_classification(n_samples=500, n_features=10, n_informative=5, random_state=1)
feature_names = [f'f{i}' for i in range(X.shape[1])]
df_x = pd.DataFrame(X, columns=feature_names)
top5 = df_x.var().nlargest(5).index.tolist()
# Products for every unordered pair of the 5 highest-variance features
interaction_cols = [
    (df_x[a] * df_x[b]).rename(f'{a}x{b}')
    for idx, a in enumerate(top5)
    for b in top5[idx + 1:]
]
df_inter = pd.concat([df_x] + interaction_cols, axis=1)
selector = SelectKBest(score_func=mutual_info_classif, k=10)
X_selected = selector.fit_transform(df_inter, y)
selected_names = df_inter.columns[selector.get_support()].tolist()
print(f'Original features: {df_x.shape[1]}, with interactions: {df_inter.shape[1]}')
print(f'Top-10 selected: {selected_names}')
# ── Exercise 3: DateFeatureExtractor transformer ───────────────────────────
# Key insight: raw datetimes are useless to ML models; decomposing them into
# cyclic and ordinal parts captures seasonality and trends.
class DateFeatureExtractor(BaseEstimator, TransformerMixin):
    """Extracts year, month, day, hour, day_of_week, is_weekend, quarter,
    days_since_epoch from a datetime column."""

    def __init__(self, date_col: str, drop_original: bool = True):
        self.date_col = date_col
        self.drop_original = drop_original

    def fit(self, X, y=None):
        # Stateless: nothing is learned from the data
        return self

    def transform(self, X: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of X with the decomposed datetime columns appended."""
        out = X.copy()
        dt = pd.to_datetime(out[self.date_col])
        prefix = self.date_col
        derived = {
            f'{prefix}_year': dt.dt.year,
            f'{prefix}_month': dt.dt.month,
            f'{prefix}_day': dt.dt.day,
            f'{prefix}_hour': dt.dt.hour,
            f'{prefix}_dayofweek': dt.dt.dayofweek,
            f'{prefix}_is_weekend': (dt.dt.dayofweek >= 5).astype(int),
            f'{prefix}_quarter': dt.dt.quarter,
            f'{prefix}_days_since_epoch': (dt - pd.Timestamp('1970-01-01')).dt.days,
        }
        for name, values in derived.items():
            out[name] = values
        if self.drop_original:
            out = out.drop(columns=[self.date_col])
        return out
rng = np.random.default_rng(7)
df_dates = pd.DataFrame({
    'event_date': pd.date_range('2022-01-01', periods=6, freq='45D'),
    'value': rng.integers(1, 100, 6),
})
extractor = DateFeatureExtractor(date_col='event_date')
print(extractor.fit_transform(df_dates))
# ── Exercise 4: Manual degree-2 polynomial features vs sklearn ─────────────
# Key insight: polynomial features include the original, squares, and pairwise
# cross terms. The manual implementation verifies understanding;
# sklearn's version is optimised and handles edge cases.
from itertools import combinations_with_replacement
def manual_poly2(X: np.ndarray) -> np.ndarray:
    """Degree-2 polynomial features: 1, x_i, x_i*x_j (i<=j)."""
    n_rows, n_feats = X.shape
    # Bias column followed by the untouched degree-1 features
    blocks = [np.ones((n_rows, 1)), X]
    # Degree-2 terms in combinations_with_replacement order (matches sklearn)
    blocks.extend(
        (X[:, i] * X[:, j])[:, None]
        for i, j in combinations_with_replacement(range(n_feats), 2)
    )
    return np.hstack(blocks)
X_small = np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0]])
manual_result = manual_poly2(X_small)
sklearn_result = PolynomialFeatures(degree=2, include_bias=True).fit_transform(X_small)
print('Manual result:\n', manual_result)
print('sklearn result:\n', sklearn_result)
# The two constructions should agree element-wise
print('Match:', np.allclose(manual_result, sklearn_result))
# ── Exercise 5: Mutual-information FeatureSelector for sklearn pipelines ───
# Key insight: wrapping SelectKBest in a custom transformer lets you tune k
# as a pipeline hyperparameter in GridSearchCV.
class MIFeatureSelector(BaseEstimator, TransformerMixin):
    """Select top-k features by mutual information. Works in sklearn pipelines.

    Parameters
    ----------
    k : number of features to keep.
    task : 'classification' or 'regression' — picks the MI score function.
    """

    def __init__(self, k: int = 10, task: str = 'classification'):
        self.k = k
        self.task = task

    def fit(self, X, y):
        """Fit the underlying SelectKBest with the task-appropriate scorer."""
        # Fix: the original resolved mutual_info_regression through an opaque
        # __import__('sklearn.feature_selection', fromlist=[...]) call; a
        # plain import is equivalent and readable.
        if self.task == 'classification':
            score_fn = mutual_info_classif
        else:
            from sklearn.feature_selection import mutual_info_regression
            score_fn = mutual_info_regression
        self.selector_ = SelectKBest(score_func=score_fn, k=self.k)
        self.selector_.fit(X, y)
        return self

    def transform(self, X):
        """Reduce X to the selected feature columns."""
        return self.selector_.transform(X)

    def get_feature_names_out(self, input_features=None):
        """Names (or positional indices) of the selected features."""
        names = (np.array(input_features) if input_features is not None
                 else np.arange(self.selector_.n_features_in_))
        return names[self.selector_.get_support()]
from sklearn.pipeline import make_pipeline
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score
X, y = make_classification(n_samples=400, n_features=30, n_informative=8, random_state=0)
# Selector then classifier; k=8 matches the number of informative features
pipe = make_pipeline(MIFeatureSelector(k=8), LogisticRegression(max_iter=300))
scores = cross_val_score(pipe, X, y, cv=5)
print(f'CV accuracy with MI selection (k=8): {scores.mean():.4f} Β± {scores.std():.4f}')