sklearn Pipelines: The Right Way to Build ML Workflows

Pipelines prevent data leakage, make code reproducible, and enable one-line cross-validation. Learn to combine preprocessing and modeling into a single object.

1. Why Pipelines? The Data Leakage Problem

Data leakage happens when information from the test set "leaks" into training, leading to overoptimistic evaluation scores that don't generalize.

The most common form: fitting a scaler on the entire dataset before splitting.

import numpy as np
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score

# Synthetic dataset
X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

# ❌ WRONG: Scaler fitted on ALL data before split
scaler = StandardScaler()
X_scaled_wrong = scaler.fit_transform(X)  # uses test data statistics!
X_train_wrong, X_test_wrong, y_train, y_test = train_test_split(
    X_scaled_wrong, y, test_size=0.2, random_state=42
)
model = LogisticRegression(random_state=42)
model.fit(X_train_wrong, y_train)
wrong_score = accuracy_score(y_test, model.predict(X_test_wrong))
print(f"Wrong approach (leaky) accuracy:   {wrong_score:.4f}")

# ✅ CORRECT: Scaler fitted only on train data
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42
)
scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train)  # fit only on train
X_test_scaled  = scaler.transform(X_test)        # transform only
model = LogisticRegression(random_state=42)
model.fit(X_train_scaled, y_train)
correct_score = accuracy_score(y_test, model.predict(X_test_scaled))
print(f"Correct approach (no leakage) accuracy: {correct_score:.4f}")

print(f"\nDifference: {wrong_score - correct_score:.4f} (leaky inflates score)")

The difference looks small here, but with small datasets or aggressive preprocessing (e.g., feature selection), leakage can inflate accuracy by 5–20%.

Pipelines eliminate this problem entirely by ensuring fit only ever sees training data.
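Because the scaler lives inside the pipeline, each cross-validation fold refits it on that fold's training portion only. A minimal sketch (reusing the same synthetic data as above) of leakage-free evaluation in one line with cross_val_score:

```python
from sklearn.datasets import make_classification
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import cross_val_score

X, y = make_classification(n_samples=1000, n_features=20, random_state=42)

# The scaler is refit inside every fold, so no fold's held-out statistics leak in
pipe = Pipeline([
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression(random_state=42)),
])
scores = cross_val_score(pipe, X, y, cv=5, scoring='accuracy')
print(f"CV accuracy: {scores.mean():.4f} +/- {scores.std():.4f}")
```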

2. Building Your First Pipeline

We'll use the Titanic dataset from seaborn: a classic ML dataset with mixed types and missing values.

import seaborn as sns
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score, classification_report

# Load Titanic from seaborn
titanic = sns.load_dataset('titanic')
print(titanic[['age', 'fare', 'pclass', 'survived']].head())
print(f"\nMissing values:\n{titanic[['age', 'fare']].isnull().sum()}")
# Use only numeric features for this simple example
features = ['age', 'fare', 'pclass']
target = 'survived'

df = titanic[features + [target]].dropna(subset=[target])
X = df[features]
y = df[target].astype(int)

X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=42, stratify=y
)

# Build the pipeline: steps are (name, estimator) tuples
pipe = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),   # step 1: fill NaNs
    ('scaler',  StandardScaler()),                    # step 2: normalize
    ('clf',     LogisticRegression(random_state=42)) # step 3: model
])

# One call handles ALL steps, with zero leakage
pipe.fit(X_train, y_train)

y_pred = pipe.predict(X_test)
print(f"Accuracy: {accuracy_score(y_test, y_pred):.4f}")
print("\n", classification_report(y_test, y_pred))

Key insight: pipe.fit(X_train, y_train) calls fit_transform on each transformer and fit on the final estimator. During pipe.predict(), only transform is called, with no refitting.
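Every fitted step stays inspectable afterwards through named_steps (or index access). A self-contained sketch on made-up synthetic data (so the snippet runs on its own), showing that the imputer and scaler statistics are stored on the fitted steps:

```python
import numpy as np
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LogisticRegression

rng = np.random.default_rng(42)
X = rng.normal(size=(100, 3))
X[rng.random(X.shape) < 0.1] = np.nan   # sprinkle in some missing values
y = (rng.random(100) > 0.5).astype(int)

pipe = Pipeline([
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression()),
])
pipe.fit(X, y)

# Each fitted step is accessible by name (or by position)
print("Imputer medians:", pipe.named_steps['imputer'].statistics_)
print("Scaler means:   ", pipe['scaler'].mean_)
print("Model coefs:    ", pipe[-1].coef_.ravel())
```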

3. ColumnTransformer: Handling Mixed Data Types

Real datasets have numeric AND categorical columns. ColumnTransformer lets you apply different preprocessing to each column type.

from sklearn.compose import ColumnTransformer
from sklearn.preprocessing import OneHotEncoder
from sklearn.ensemble import RandomForestClassifier

# Use more features including categorical
features_num = ['age', 'fare', 'pclass']
features_cat = ['sex', 'embarked']
all_features = features_num + features_cat

df2 = titanic[all_features + ['survived']].copy()
df2 = df2.dropna(subset=['survived'])
X2 = df2[all_features]
y2 = df2['survived'].astype(int)

X_train2, X_test2, y_train2, y_test2 = train_test_split(
    X2, y2, test_size=0.2, random_state=42, stratify=y2
)

# Define preprocessing for each column type
numeric_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler', StandardScaler())
])

categorical_transformer = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='most_frequent')),
    ('onehot',  OneHotEncoder(handle_unknown='ignore', sparse_output=False))
])

# Combine into a ColumnTransformer
preprocessor = ColumnTransformer(transformers=[
    ('num', numeric_transformer, features_num),
    ('cat', categorical_transformer, features_cat)
])

# Full pipeline: preprocessor + model
full_pipe = Pipeline(steps=[
    ('preprocessor', preprocessor),
    ('clf', RandomForestClassifier(n_estimators=100, random_state=42))
])

full_pipe.fit(X_train2, y_train2)
print(f"Accuracy with full feature set: {full_pipe.score(X_test2, y_test2):.4f}")
print("\nPipeline structure:")
print(full_pipe)

4. Custom Transformers

You can write your own sklearn-compatible transformers by inheriting from BaseEstimator and TransformerMixin. The magic: implement fit and transform, get fit_transform for free.

from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import PowerTransformer
import numpy as np

class SkewnessReducer(BaseEstimator, TransformerMixin):
    """
    Apply Yeo-Johnson transform only to columns with |skewness| > threshold.
    Columns below threshold are left untouched.
    """
    def __init__(self, threshold=0.5):
        self.threshold = threshold

    def fit(self, X, y=None):
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
        self.skewed_cols_ = X.columns[
            X.skew().abs() > self.threshold
        ].tolist()
        # Fit a PowerTransformer only on skewed columns
        if self.skewed_cols_:
            self.pt_ = PowerTransformer(method='yeo-johnson')
            self.pt_.fit(X[self.skewed_cols_])
        return self

    def transform(self, X, y=None):
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X.copy()
        if self.skewed_cols_:
            X[self.skewed_cols_] = self.pt_.transform(X[self.skewed_cols_])
        return X.values

# Test it
from sklearn.datasets import make_regression
X_demo, _ = make_regression(n_samples=200, n_features=4, random_state=42)
X_demo_df = pd.DataFrame(X_demo, columns=['a', 'b', 'c', 'd'])
# Add a skewed column (seeded for reproducibility)
rng = np.random.default_rng(42)
X_demo_df['e'] = np.exp(rng.standard_normal(200))  # log-normal, highly skewed

reducer = SkewnessReducer(threshold=0.5)
reducer.fit(X_demo_df)
print(f"Skewed columns found: {reducer.skewed_cols_}")
X_transformed = reducer.transform(X_demo_df)
print(f"Output shape: {X_transformed.shape}")
print("Custom transformer works seamlessly in a Pipeline!")
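To back up that claim, here is the transformer dropped into a Pipeline ahead of a scaler and classifier. The class is repeated so the snippet runs on its own, and the data is made up for illustration:

```python
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.preprocessing import PowerTransformer, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.linear_model import LogisticRegression

class SkewnessReducer(BaseEstimator, TransformerMixin):
    """Same class as above, repeated here so this snippet is standalone."""
    def __init__(self, threshold=0.5):
        self.threshold = threshold

    def fit(self, X, y=None):
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X
        self.skewed_cols_ = X.columns[X.skew().abs() > self.threshold].tolist()
        if self.skewed_cols_:
            self.pt_ = PowerTransformer(method='yeo-johnson')
            self.pt_.fit(X[self.skewed_cols_])
        return self

    def transform(self, X, y=None):
        X = pd.DataFrame(X) if not isinstance(X, pd.DataFrame) else X.copy()
        if self.skewed_cols_:
            X[self.skewed_cols_] = self.pt_.transform(X[self.skewed_cols_])
        return X.values

rng = np.random.default_rng(0)
X = pd.DataFrame({'a': rng.normal(size=300),
                  'b': np.exp(rng.normal(size=300))})   # 'b' is log-normal (skewed)
y = (X['a'] + np.log(X['b']) > 0).astype(int)           # toy target

pipe = Pipeline([
    ('deskew', SkewnessReducer(threshold=0.5)),
    ('scaler', StandardScaler()),
    ('clf', LogisticRegression()),
])
pipe.fit(X, y)
print("Skewed columns:", pipe.named_steps['deskew'].skewed_cols_)
print("Train accuracy:", pipe.score(X, y))
```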

5. Pipeline + GridSearchCV: The Power Combo

The __ (double underscore) notation lets you set parameters on any step inside the pipeline. This makes hyperparameter tuning dead simple.

from sklearn.model_selection import GridSearchCV
from sklearn.svm import SVC

# Build a pipeline with SVC
pipe_svc = Pipeline(steps=[
    ('imputer', SimpleImputer(strategy='median')),
    ('scaler',  StandardScaler()),
    ('clf',     SVC(random_state=42))
])

# Access params with: <step_name>__<param_name>
param_grid = {
    'imputer__strategy': ['mean', 'median'],  # tune imputer
    'clf__C':            [0.1, 1.0, 10.0],   # tune model
    'clf__kernel':       ['rbf', 'linear'],
}

grid_search = GridSearchCV(
    pipe_svc,
    param_grid,
    cv=5,
    scoring='accuracy',
    n_jobs=-1,
    verbose=1
)

# Reuse the numeric-only Titanic split from Section 2
grid_search.fit(X_train, y_train)

print(f"Best params:  {grid_search.best_params_}")
print(f"Best CV score: {grid_search.best_score_:.4f}")
print(f"Test score:    {grid_search.score(X_test, y_test):.4f}")
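Beyond the best score, the full search history lives in grid_search.cv_results_, which loads straight into a DataFrame for comparing configurations. A self-contained sketch with a small grid on synthetic data so it runs quickly:

```python
import pandas as pd
from sklearn.datasets import make_classification
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from sklearn.svm import SVC
from sklearn.model_selection import GridSearchCV

X, y = make_classification(n_samples=300, n_features=10, random_state=42)
pipe = Pipeline([('scaler', StandardScaler()), ('clf', SVC(random_state=42))])

grid = GridSearchCV(pipe,
                    {'clf__C': [0.1, 1.0], 'clf__kernel': ['rbf', 'linear']},
                    cv=3)
grid.fit(X, y)

# One row per configuration, with per-fold and aggregate scores
results = pd.DataFrame(grid.cv_results_)
cols = ['param_clf__C', 'param_clf__kernel', 'mean_test_score', 'rank_test_score']
print(results[cols].sort_values('rank_test_score'))
```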

6. Saving and Loading Pipelines with joblib

import joblib
import os
import tempfile

# Save the best pipeline from GridSearchCV
best_pipe = grid_search.best_estimator_

with tempfile.TemporaryDirectory() as tmpdir:
    model_path = os.path.join(tmpdir, 'titanic_pipeline.joblib')
    
    # Save
    joblib.dump(best_pipe, model_path)
    size_kb = os.path.getsize(model_path) / 1024
    print(f"Pipeline saved to: {model_path}")
    print(f"File size: {size_kb:.1f} KB")
    
    # Load
    loaded_pipe = joblib.load(model_path)
    
    # Verify identical predictions
    preds_original = best_pipe.predict(X_test)
    preds_loaded   = loaded_pipe.predict(X_test)
    print(f"\nPredictions match: {np.array_equal(preds_original, preds_loaded)}")
    print("\nProduction usage:")
    print("  joblib.dump(pipe, 'model.joblib')  # Save once")
    print("  pipe = joblib.load('model.joblib') # Load anywhere")
    print("  pipe.predict(new_data)             # Preprocessing included!")

7. Common Gotchas

| Gotcha | Problem | Fix |
|--------|---------|-----|
| Fitting the scaler before the split | Data leakage | Always use a Pipeline |
| Calling fit_transform on test data | Leakage | Pipeline calls transform automatically |
| Forgetting handle_unknown='ignore' in OneHotEncoder | Crashes on new categories in prod | Always set it |
| Wrong step name in set_params | ValueError | Use pipe.get_params().keys() to check |
| Saving only the model (not the scaler) | Predictions wrong in prod | Save the whole pipeline |
| OneHotEncoder's sparse output (the default) breaking downstream steps | Shape issues | Set sparse_output=False, or use an estimator that accepts sparse input |

Tip: Run pipe.get_params() to see all tunable parameters; it's the fastest way to build your param_grid.

# Useful debugging: list all parameters
for key in sorted(full_pipe.get_params().keys()):
    print(f"  {key}")

Exercises

  1. Leakage audit: Take the "wrong" approach code from Section 1 and measure how much the accuracy changes across 10 different random seeds. Is the leakage consistent?

  2. Pipeline extension: Add a SelectKBest feature selector as a step between preprocessor and clf in the full_pipe. Tune k in GridSearchCV.

  3. Custom transformer: Write a CyclicalEncoder that transforms a month column (1–12) into sin and cos features. Make it sklearn-compatible (inherits BaseEstimator, TransformerMixin).

  4. Production pipeline: Build a pipeline on the diamonds seaborn dataset (predict price). Save it with joblib, reload it, and verify predictions match. Include both numeric and categorical columns.

  5. Compare strategies: Build two pipelines, one with SimpleImputer(strategy='mean') and one with KNNImputer(n_neighbors=5). Which performs better on Titanic? Use cross-validation to compare fairly.