Data Cleaning Pipelines: From Messy Data to Model-Ready Features

Real-world data is never clean. This notebook teaches a systematic approach (detect → decide → fix) and then bakes it all into a reproducible sklearn pipeline that works on both train and test data.

import pandas as pd
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import StandardScaler, OneHotEncoder, LabelEncoder
from sklearn.impute import SimpleImputer, KNNImputer
from sklearn.base import BaseEstimator, TransformerMixin
import warnings
warnings.filterwarnings('ignore')

# Build a deliberately messy dataset.
# NOTE: the np.random calls below run in a fixed order (mask first, then values,
# column by column), which is what keeps the seeded output reproducible.
np.random.seed(42)
n = 500


def _with_nan_holes(missing_rate, draw_values):
    """Draw a NaN mask, then the values, and blank out the masked entries.

    `draw_values` is a zero-argument callable so that the mask is sampled
    from the global RNG before the values are.
    """
    hole_mask = np.random.random(n) < missing_rate
    return np.where(hole_mask, np.nan, draw_values())


# Category pools intentionally mix casing, aliases and None.
education_levels = ['high school', 'bachelor', 'master', 'phd', None, 'High School', 'BACHELOR']
education_weights = [0.3, 0.3, 0.2, 0.1, 0.05, 0.03, 0.02]
city_labels = ['New York', 'Los Angeles', 'Chicago', 'Houston', None, 'new york', 'NY']

raw_data = {
    'age': _with_nan_holes(0.1, lambda: np.random.randint(18, 80, n)).astype(float),
    'income': _with_nan_holes(0.05, lambda: np.random.lognormal(10.5, 0.8, n)).round(2),
    'education': np.random.choice(education_levels, n, p=education_weights),
    'city': np.random.choice(city_labels, n),
    'score': _with_nan_holes(0.08, lambda: np.random.uniform(0, 100, n)).round(1),
    'target': np.random.randint(0, 2, n),
}
df_raw = pd.DataFrame(raw_data)

# Append 20 re-sampled rows so the frame contains duplicates.
duplicate_rows = df_raw.sample(20)
df_raw = pd.concat([df_raw, duplicate_rows], ignore_index=True)

# Overwrite the first five incomes with extreme / impossible values.
df_raw.loc[:4, 'income'] = [999999, -5000, 1500000, 0, -100]

print(f'Raw dataset: {df_raw.shape}')
df_raw.head()

1. Detect — Find All Problems First

def diagnose_dataframe(df: pd.DataFrame) -> dict:
    """Run a set of data-quality checks on ``df`` and collect the findings.

    Args:
        df: Frame to inspect. It is only read, never modified.

    Returns:
        A dict with one entry per check:

        * ``'duplicates'``: number of rows that exactly repeat an earlier row.
        * ``'missing'``: ``{column: null count}`` for columns containing nulls.
        * ``'constant_cols'``: numeric columns with exactly one distinct
          non-null value.
        * ``'high_cardinality'``: ``{column: distinct count}`` for object
          columns with more than 50 distinct values.
        * ``'text_inconsistencies'``: ``{column: summary}`` for object columns
          whose distinct string values collapse once lowercased and stripped.
        * ``'outliers'``: ``{column: count}`` of numeric values outside the
          ``1.5 * IQR`` fences.

        All counts are plain Python ``int`` values.
    """
    issues = {}

    # 1. Duplicates (cast so the report does not leak numpy scalar types)
    issues['duplicates'] = int(df.duplicated().sum())

    # 2. Missing values
    missing = df.isnull().sum()
    issues['missing'] = missing[missing > 0].to_dict()

    # 3. Constant columns (zero variance)
    numeric_cols = df.select_dtypes(include='number').columns
    issues['constant_cols'] = [c for c in numeric_cols if df[c].nunique() == 1]

    # 4. High cardinality text (may be IDs); count each column only once
    obj_cols = df.select_dtypes(include='object').columns
    distinct_counts = {c: df[c].nunique() for c in obj_cols}
    issues['high_cardinality'] = {
        c: count for c, count in distinct_counts.items() if count > 50
    }

    # 5. Inconsistent text values (case / surrounding whitespace).
    # Object columns can hold non-string values, which have no .lower();
    # only the strings take part in this check.
    text_issues = {}
    for c in obj_cols:
        text_vals = [v for v in df[c].dropna().unique() if isinstance(v, str)]
        normalized = {v.lower().strip() for v in text_vals}
        if len(normalized) < len(text_vals):
            text_issues[c] = f'{len(text_vals)} raw → {len(normalized)} normalized'
    issues['text_inconsistencies'] = text_issues

    # 6. Outliers (IQR fences)
    outlier_cols = {}
    for c in numeric_cols:
        q1, q3 = df[c].quantile([0.25, 0.75])
        iqr = q3 - q1
        low_fence = q1 - 1.5 * iqr
        high_fence = q3 + 1.5 * iqr
        n_outliers = int(((df[c] < low_fence) | (df[c] > high_fence)).sum())
        if n_outliers > 0:
            outlier_cols[c] = n_outliers
    issues['outliers'] = outlier_cols

    return issues

# Diagnose the raw frame and print one labelled section per check.
issues = diagnose_dataframe(df_raw)
print('=== DATA QUALITY REPORT ===')
for check_name, findings in issues.items():
    section_title = check_name.upper()
    print(f'\n{section_title}:')
    print(f'  {findings}')

2. Decide & Fix — Column by Column

def clean_dataframe(df: pd.DataFrame) -> pd.DataFrame:
    """Return a cleaned copy of ``df``; the input frame is left untouched.

    Steps, in order: drop exact duplicate rows, lowercase and strip every
    object column, map ``education`` and ``city`` onto their canonical labels
    (values missing from the lookup become NaN), and clip ``income`` to
    ``[0, 99th percentile]``. Two progress lines are printed along the way.
    """
    n_rows_in = len(df)
    cleaned = df.copy().drop_duplicates()
    print(f'Dropped {n_rows_in - len(cleaned)} duplicate rows')

    # Text normalisation: lowercase first, then trim surrounding whitespace.
    text_columns = cleaned.select_dtypes(include='object').columns
    for name in text_columns:
        lowered = cleaned[name].str.lower()
        cleaned[name] = lowered.str.strip()

    # Canonical labels; .map() turns anything outside the lookup into NaN.
    canonical_education = {
        level: level for level in ('high school', 'bachelor', 'master', 'phd')
    }
    cleaned['education'] = cleaned['education'].map(canonical_education)

    canonical_city = {
        city: city for city in ('new york', 'los angeles', 'chicago', 'houston')
    }
    canonical_city['ny'] = 'new york'  # known alias
    cleaned['city'] = cleaned['city'].map(canonical_city)

    # Winsorise income: negatives are floored at 0, the top is capped at the
    # 99th percentile of the de-duplicated data.
    income_floor = 0
    income_cap = cleaned['income'].quantile(0.99)
    cleaned['income'] = cleaned['income'].clip(lower=income_floor, upper=income_cap)
    print(f'Capped income at ${income_cap:,.0f}')

    return cleaned

# Run the full cleaning pass on the raw frame. clean_dataframe works on a copy,
# so df_raw itself stays unchanged and df_clean holds the cleaned result.
df_clean = clean_dataframe(df_raw)
print(f'\nClean dataset: {df_clean.shape}')

3. Missing Value Strategies

# Columns that can still contain NaN after cleaning, grouped by type.
numeric_with_missing = ['age', 'income', 'score']
cat_with_missing = ['education', 'city']

# Candidate numeric imputers to compare side by side.
# NOTE(review): KNNImputer measures distance on the raw columns, so the
# large-valued income column likely dominates neighbour selection - consider
# scaling before KNN imputation; confirm before relying on these numbers.
strategies = {
    'mean': SimpleImputer(strategy='mean'),
    'median': SimpleImputer(strategy='median'),
    'knn5': KNNImputer(n_neighbors=5),
}

print('Imputation strategy comparison (age):')
age_data = df_clean[numeric_with_missing]
for name, imputer in strategies.items():
    filled_values = imputer.fit_transform(age_data)
    imputed = pd.DataFrame(filled_values, columns=age_data.columns)
    print(f'  {name:8s}: mean={imputed["age"].mean():.1f}, std={imputed["age"].std():.1f}')

# Record *that* a value was missing before any imputation hides it; the flag
# itself can carry predictive signal.
for col in ('age', 'income'):
    was_missing = df_clean[col].isna()
    df_clean[f'{col}_was_missing'] = was_missing.astype(int)

print('\nMissing indicator columns added: age_was_missing, income_was_missing')

4. Custom Transformers for sklearn Pipelines

class OutlierWinsorizer(BaseEstimator, TransformerMixin):
    """Clip every column to percentile bounds learned during ``fit``.

    The bounds come from the data passed to ``fit`` (train) and are then
    re-used unchanged in ``transform`` (train or test).

    Args:
        lower_pct: Quantile in ``[0, 1]`` used as the per-column lower bound.
        upper_pct: Quantile in ``[0, 1]`` used as the per-column upper bound;
            must not be smaller than ``lower_pct``.

    Attributes:
        lower_: ``pd.Series`` of per-column lower bounds (set by ``fit``).
        upper_: ``pd.Series`` of per-column upper bounds (set by ``fit``).
    """

    def __init__(self, lower_pct: float = 0.01, upper_pct: float = 0.99):
        self.lower_pct = lower_pct
        self.upper_pct = upper_pct

    def fit(self, X, y=None):
        """Learn the clipping bounds from ``X``; ``y`` is ignored.

        Raises:
            ValueError: If the percentiles are not ordered within ``[0, 1]``.
        """
        # Validate here rather than in __init__, following sklearn convention
        # that the constructor only stores parameters.
        if not 0.0 <= self.lower_pct <= self.upper_pct <= 1.0:
            raise ValueError(
                'Expected 0 <= lower_pct <= upper_pct <= 1, got '
                f'lower_pct={self.lower_pct!r}, upper_pct={self.upper_pct!r}'
            )
        frame = pd.DataFrame(X)
        self.lower_ = frame.quantile(self.lower_pct)
        self.upper_ = frame.quantile(self.upper_pct)
        return self

    def transform(self, X):
        """Return a clipped copy of ``X`` as a DataFrame."""
        frame = pd.DataFrame(X).copy()
        lower, upper = self.lower_, self.upper_

        # DataFrame.clip aligns the bounds by column label. If fit saw a
        # DataFrame and transform gets an ndarray (or the reverse), no label
        # matches and nothing would be clipped. When the column count agrees
        # but the labels are disjoint, fall back to positional alignment.
        same_width = frame.shape[1] == len(lower)
        labels_disjoint = not frame.columns.isin(lower.index).any()
        if same_width and labels_disjoint:
            lower = pd.Series(lower.to_numpy(), index=frame.columns)
            upper = pd.Series(upper.to_numpy(), index=frame.columns)

        return frame.clip(lower=lower, upper=upper, axis=1)


class MissingIndicator(BaseEstimator, TransformerMixin):
    """Append a 0/1 flag column for every column that had nulls during ``fit``.

    The flags mark values that are missing in the data given to
    ``transform``; this transformer does not impute anything itself.

    Attributes:
        missing_cols_: Labels of the columns that contained nulls in ``fit``.
        missing_idx_: Positions of those same columns in the fitted input.
    """

    def fit(self, X, y=None):
        """Remember which columns of ``X`` contain nulls; ``y`` is ignored."""
        frame = pd.DataFrame(X)
        has_null = frame.isnull().any()
        self.missing_cols_ = frame.columns[has_null].tolist()
        # Keep positions as well so transform still works when the input
        # container (DataFrame vs. ndarray) differs from the one seen in fit.
        self.missing_idx_ = [pos for pos, flag in enumerate(has_null) if flag]
        return self

    def transform(self, X):
        """Return ``X`` with the flag columns appended, as a NumPy array."""
        frame = pd.DataFrame(X)
        if all(col in frame.columns for col in self.missing_cols_):
            flagged = frame[self.missing_cols_]
        else:
            # Labels from fit are absent (e.g. ndarray input): use positions.
            flagged = frame.iloc[:, self.missing_idx_]
        indicators = flagged.isnull().astype(int)
        indicators.columns = [f'{c}_missing' for c in self.missing_cols_]
        return pd.concat([frame, indicators], axis=1).values

# Confirmation message for the notebook cell that defines the two transformers.
print('Custom transformers: OutlierWinsorizer, MissingIndicator')

5. Full sklearn ColumnTransformer Pipeline

from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report

# Feature matrix and label: the target and the hand-made *_was_missing flags
# are dropped from X.
non_feature_columns = ['target', 'age_was_missing', 'income_was_missing']
X = df_clean.drop(columns=non_feature_columns)
y = df_clean['target']

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42,
)

# Column groups routed to each branch of the preprocessor.
numeric_features = ['age', 'income', 'score']
categorical_features = ['education', 'city']

# Numeric branch: winsorize, then median-impute, then standardize.
numeric_steps = [
    ('winsorize', OutlierWinsorizer(0.01, 0.99)),
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
]
numeric_transformer = Pipeline(numeric_steps)

# Categorical branch: fill with the most frequent label, then one-hot encode.
categorical_steps = [
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot', OneHotEncoder(handle_unknown='ignore', sparse_output=False)),
]
categorical_transformer = Pipeline(categorical_steps)

# Route each column group to its branch; columns not listed are dropped.
preprocessor = ColumnTransformer(
    [
        ('num', numeric_transformer, numeric_features),
        ('cat', categorical_transformer, categorical_features),
    ],
    remainder='drop',
)

# End-to-end estimator: preprocessing followed by the classifier.
full_pipeline = Pipeline([
    ('preprocessor', preprocessor),
    ('classifier', RandomForestClassifier(n_estimators=100, random_state=42)),
])

# Fit on the training split only; fit() returns the pipeline itself, so the
# held-out predictions can be chained directly.
y_pred = full_pipeline.fit(X_train, y_train).predict(X_test)

print('Pipeline Performance:')
print(classification_report(y_test, y_pred))
# Save and reload the pipeline (production-ready)
import tempfile
from pathlib import Path

import joblib

# Use the platform's temp directory instead of a hard-coded '/tmp', which does
# not exist on every operating system.
pipeline_path = Path(tempfile.gettempdir()) / 'cleaning_pipeline.pkl'
joblib.dump(full_pipeline, pipeline_path)

# NOTE: joblib files are pickle-based; only load artifacts you created or trust.
loaded_pipeline = joblib.load(pipeline_path)

# Verify the reloaded pipeline still predicts (here on the first held-out rows)
sample = X_test.iloc[:3]
print('Predictions on new data:')
print(loaded_pipeline.predict(sample))
print('Pipeline saved and reloaded successfully.')

Golden Rules of Data Cleaning

1. NEVER modify raw data — always work on a copy
2. ALWAYS fit transformers on train data only — then apply them to test
3. DOCUMENT every decision — why did you drop/impute/cap?
4. AUDIT after cleaning — run diagnose_dataframe() again to confirm
5. THINK before imputing — sometimes NaN IS the signal
6. WRAP it in a pipeline — code that works once should work always

Exercises

  1. Add a LogTransformer custom sklearn transformer that log-transforms skewed columns, and add it to the numeric pipeline.

  2. The education column has inconsistent values — write a fuzzy-matching cleaner using difflib.get_close_matches.

  3. Extend diagnose_dataframe to detect columns with >50% missing and recommend dropping them.

  4. Rebuild the pipeline to use KNNImputer for numeric columns — does it improve model accuracy?

  5. Use pipeline.set_params(classifier__n_estimators=200) to tune the model without rewriting the pipeline.