Data Cleaning Pipelines: From Messy Data to Model-Ready Features
Real-world data is never clean. This notebook teaches a systematic approach — detect → decide → fix — and then bakes it all into a reproducible sklearn pipeline that works on both train and test data.
import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.base import BaseEstimator, TransformerMixin
import warnings
# Silence ALL warnings to keep notebook output readable.
# NOTE(review): this is fine for a teaching demo but far too broad for
# production code — it also hides deprecation and dtype warnings.
warnings.filterwarnings('ignore')
# Create a deliberately messy dataset: missing values, inconsistent text,
# duplicate rows, and outliers — one of each problem class handled below.
# Reproducibility note: every column draws from np.random under a single
# fixed seed, so the call order below matters — don't reorder these lines.
np.random.seed(42)
n = 500
raw_data = {
# ~10% missing; cast to float so NaN can coexist with the integer ages
'age': np.where(np.random.random(n) < 0.1, np.nan, np.random.randint(18, 80, n)).astype(float),
# ~5% missing; lognormal gives a realistic right-skewed income distribution
'income': np.where(np.random.random(n) < 0.05, np.nan, np.random.lognormal(10.5, 0.8, n)).round(2),
# Mixed-case duplicates ('High School', 'BACHELOR') plus explicit Nones
'education': np.random.choice(['high school', 'bachelor', 'master', 'phd', None, 'High School', 'BACHELOR'], n, p=[0.3, 0.3, 0.2, 0.1, 0.05, 0.03, 0.02]),
# Aliases for the same city ('new york', 'NY') plus explicit Nones
'city': np.random.choice(['New York', 'Los Angeles', 'Chicago', 'Houston', None, 'new york', 'NY'], n),
# ~8% missing scores
'score': np.where(np.random.random(n) < 0.08, np.nan, np.random.uniform(0, 100, n)).round(1),
'target': np.random.randint(0, 2, n),
}
# Add duplicates: re-append 20 random rows so drop_duplicates has work to do
df_raw = pd.DataFrame(raw_data)
df_raw = pd.concat([df_raw, df_raw.sample(20)], ignore_index=True)
# Add outliers: implausible incomes, including impossible negative values
df_raw.loc[0:4, 'income'] = [999999, -5000, 1500000, 0, -100]
print(f'Raw dataset: {df_raw.shape}')
df_raw.head()
1. Detect — Find All Problems First
def diagnose_dataframe(df: pd.DataFrame) -> dict:
    """Comprehensive data-quality diagnosis of *df* (read-only).

    Scans for six common problem classes and returns a report dict with
    keys: ``duplicates``, ``missing``, ``constant_cols``,
    ``high_cardinality``, ``text_inconsistencies``, ``outliers``.

    Parameters
    ----------
    df : pd.DataFrame
        The raw frame to audit. Never modified.

    Returns
    -------
    dict
        Problem name -> summary (counts, column lists, or per-column details).
    """
    issues = {}

    # 1. Exact duplicate rows (all columns equal).
    issues['duplicates'] = df.duplicated().sum()

    # 2. Missing values — report only columns that actually have any.
    missing = df.isnull().sum()
    issues['missing'] = missing[missing > 0].to_dict()

    # 3. Constant numeric columns (zero variance) carry no signal.
    numeric_cols = df.select_dtypes(include='number').columns
    issues['constant_cols'] = [c for c in numeric_cols if df[c].nunique() == 1]

    # 4. High-cardinality text columns (> 50 distinct values) are often IDs
    #    in disguise; one-hot encoding them would explode the feature space.
    #    Compute nunique() once per column instead of twice.
    obj_cols = df.select_dtypes(include='object').columns
    cardinality = {c: df[c].nunique() for c in obj_cols}
    issues['high_cardinality'] = {c: k for c, k in cardinality.items() if k > 50}

    # 5. Case/whitespace inconsistencies: if lowercasing + stripping collapses
    #    distinct raw values, the column mixes spellings of the same category.
    text_issues = {}
    for c in obj_cols:
        unique_vals = df[c].dropna().unique()
        normalized = {v.lower().strip() for v in unique_vals}
        if len(normalized) < len(unique_vals):
            # FIX: the arrow here was mojibake ('β') in the original export.
            text_issues[c] = f'{len(unique_vals)} raw → {len(normalized)} normalized'
    issues['text_inconsistencies'] = text_issues

    # 6. Outliers by the 1.5*IQR rule (Tukey fences), per numeric column.
    outlier_cols = {}
    for c in numeric_cols:
        Q1, Q3 = df[c].quantile([0.25, 0.75])
        IQR = Q3 - Q1
        n_outliers = ((df[c] < Q1 - 1.5 * IQR) | (df[c] > Q3 + 1.5 * IQR)).sum()
        if n_outliers > 0:
            outlier_cols[c] = n_outliers
    issues['outliers'] = outlier_cols
    return issues
# Run the audit on the raw frame and print one section per problem class.
issues = diagnose_dataframe(df_raw)
print('=== DATA QUALITY REPORT ===')
for k, v in issues.items():
    print(f'\n{k.upper()}:')
    print(f' {v}')
2. Decide &amp; Fix — Column by Column
def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return a cleaned copy of *df*: dedup, normalize text, winsorize income.

    Steps, in order: drop exact duplicate rows, lowercase/strip every text
    column, collapse known education/city aliases (unknown values become
    NaN on purpose), then clip income to [0, 99th percentile]. The input
    frame is never mutated; progress is printed per step.
    """
    df = df.copy()

    # Step 1: exact duplicates add no information and bias row counts.
    before = len(df)
    df = df.drop_duplicates()
    print(f'Dropped {before - len(df)} duplicate rows')

    # Step 2: canonicalize text so 'High School ' == 'high school'.
    text_cols = df.select_dtypes(include='object').columns
    for col in text_cols:
        df[col] = df[col].str.lower().str.strip()

    # Step 3: collapse known aliases to canonical labels; anything not in
    # the map (typos, unknown categories) deliberately becomes NaN so the
    # imputation step can deal with it.
    edu_map = {'high school': 'high school', 'bachelor': 'bachelor', 'master': 'master', 'phd': 'phd'}
    df['education'] = df['education'].map(edu_map)
    city_map = {'new york': 'new york', 'ny': 'new york', 'los angeles': 'los angeles',
                'chicago': 'chicago', 'houston': 'houston'}
    df['city'] = df['city'].map(city_map)

    # Step 4: winsorize income — cap at the 99th percentile and floor at 0,
    # since negative income makes no sense for this dataset.
    income_cap = df['income'].quantile(0.99)
    income_floor = 0
    df['income'] = df['income'].clip(lower=income_floor, upper=income_cap)
    print(f'Capped income at ${income_cap:,.0f}')
    return df
# Apply the cleaning steps to the raw frame (prints a per-step summary).
df_clean = clean_dataframe(df_raw)
print(f'\nClean dataset: {df_clean.shape}')
3. Missing Value Strategies
# Different imputation strategies and when to use them:
# - mean:   fast, but outlier-sensitive and shrinks variance
# - median: robust default for skewed numeric data
# - KNN:    exploits correlations between features, but is distance-based.
#   NOTE(review): features are unscaled here, so 'income' (large magnitudes)
#   dominates the KNN distance — consider scaling before KNN imputation.
numeric_with_missing = ['age', 'income', 'score']
cat_with_missing = ['education', 'city']  # NOTE(review): defined but not used below
# Compare imputation strategies
strategies = {
    'mean': SimpleImputer(strategy='mean'),
    'median': SimpleImputer(strategy='median'),
    'knn5': KNNImputer(n_neighbors=5),
}
print('Imputation strategy comparison (age):')
age_data = df_clean[['age', 'income', 'score']]
for name, imputer in strategies.items():
    # fit_transform returns a bare ndarray — rewrap to keep column names
    imputed = pd.DataFrame(imputer.fit_transform(age_data), columns=age_data.columns)
    print(f' {name:8s}: mean={imputed["age"].mean():.1f}, std={imputed["age"].std():.1f}')
# Add a missing indicator: flag that a value WAS missing (can be predictive!)
for col in ['age', 'income']:
    df_clean[f'{col}_was_missing'] = df_clean[col].isnull().astype(int)
print('\nMissing indicator columns added: age_was_missing, income_was_missing')
4. Custom Transformers for sklearn Pipelines
class OutlierWinsorizer(BaseEstimator, TransformerMixin):
    """Winsorize numeric columns: clip each value to percentile bounds.

    The bounds are learned in ``fit`` (on training data only) and applied
    unchanged in ``transform``, so test data is clipped with the *training*
    percentiles — no leakage of test-set statistics.
    """

    def __init__(self, lower_pct: float = 0.01, upper_pct: float = 0.99):
        # sklearn convention: __init__ only stores hyper-parameters verbatim.
        self.lower_pct = lower_pct
        self.upper_pct = upper_pct

    def fit(self, X, y=None):
        """Learn per-column clip bounds at the configured percentiles."""
        frame = pd.DataFrame(X)
        self.lower_ = frame.quantile(self.lower_pct)
        self.upper_ = frame.quantile(self.upper_pct)
        return self

    def transform(self, X):
        """Clip each column of X to the bounds learned in ``fit``."""
        frame = pd.DataFrame(X).copy()
        return frame.clip(lower=self.lower_, upper=self.upper_, axis=1)
class MissingIndicator(BaseEstimator, TransformerMixin):
    """Append 0/1 columns flagging values that were missing pre-imputation.

    The columns to flag are chosen in ``fit`` (those with any NaN in the
    training data), so train and test always get the same output width.
    NOTE(review): this name shadows ``sklearn.impute.MissingIndicator`` —
    harmless here since that class is never imported, but consider renaming.
    """

    def fit(self, X, y=None):
        """Record which columns contain any missing values."""
        frame = pd.DataFrame(X)
        self.missing_cols_ = frame.columns[frame.isnull().any()].tolist()
        return self

    def transform(self, X):
        """Return X widened with one '<col>_missing' flag per recorded column."""
        frame = pd.DataFrame(X)
        flags = frame[self.missing_cols_].isnull().astype(int)
        flags.columns = [f'{c}_missing' for c in self.missing_cols_]
        return pd.concat([frame, flags], axis=1).values

print('Custom transformers: OutlierWinsorizer, MissingIndicator')
5. Full sklearn ColumnTransformer Pipeline
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report
# Prep: drop the target and the hand-made indicator columns so the
# ColumnTransformer below fully controls which features are used.
X = df_clean.drop(columns=['target', 'age_was_missing', 'income_was_missing'])
y = df_clean['target']
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
# Define column groups
numeric_features = ['age', 'income', 'score']
categorical_features = ['education', 'city']
# Numeric pipeline: Winsorize → Impute median → Scale
numeric_transformer = Pipeline(steps=[
    ('winsorize', OutlierWinsorizer(0.01, 0.99)),
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
])
# Categorical pipeline: Impute most_frequent → One-hot encode.
# handle_unknown='ignore' keeps transform from crashing on categories seen
# only in the test set (they encode as all-zeros).
categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),  # sparse_output needs sklearn >= 1.2
])
# Combine
preprocessor = ColumnTransformer(transformers=[
    ('num', numeric_transformer, numeric_features),
    ('cat', categorical_transformer, categorical_features),
], remainder='drop')  # Drop any other columns
# Full pipeline: preprocess + model
full_pipeline = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
])
# Fit on train → ONLY fit on train, transform both (prevents data leakage)
full_pipeline.fit(X_train, y_train)
y_pred = full_pipeline.predict(X_test)
print('Pipeline Performance:')
print(classification_report(y_test, y_pred))
# Save and reload the pipeline (production-ready).
# NOTE(review): joblib pickles are not portable across sklearn versions —
# reload with the same version that saved them.
import joblib
joblib.dump(full_pipeline, '/tmp/cleaning_pipeline.pkl')
loaded_pipeline = joblib.load('/tmp/cleaning_pipeline.pkl')
# Verify it works on new data
sample = X_test.iloc[:3]
print('Predictions on new data:')
print(loaded_pipeline.predict(sample))
print('Pipeline saved and reloaded successfully.')
Golden Rules of Data Cleaning
1. NEVER modify raw data — always work on a copy
2. ALWAYS fit transformers on train data only — then apply them to test
3. DOCUMENT every decision — why did you drop/impute/cap?
4. AUDIT after cleaning — run diagnose_dataframe() again to confirm
5. THINK before imputing — sometimes NaN IS the signal
6. WRAP it in a pipeline — code that works once should work always
Exercises
1. Add a `LogTransformer` custom sklearn transformer that log-transforms skewed columns, and add it to the numeric pipeline.
2. The `education` column has inconsistent values — write a fuzzy-matching cleaner using `difflib.get_close_matches`.
3. Extend `diagnose_dataframe` to detect columns with more than 50% missing values and recommend dropping them.
4. Rebuild the pipeline to use `KNNImputer` for the numeric columns — does it improve model accuracy?
5. Use `pipeline.set_params(classifier__n_estimators=200)` to tune the model without rewriting the pipeline.