04: Observational Methods
"All that is gold does not glitter, Not all those who wander are lost." - J.R.R. Tolkien
Welcome to the art of finding causality in the messy real world! While RCTs are ideal, most data comes from observations. This notebook explores sophisticated methods to extract causal insights from observational data using propensity scores, weighting, and regression.
Learning Objectives
By the end of this notebook, you'll be able to:
Understand when and why observational methods are needed
Apply propensity score matching to balance treatment groups
Use inverse probability weighting for unbiased estimation
Implement regression adjustment with covariates
Evaluate balance and overlap in observational studies
Compare different observational methods and their assumptions
Handle practical challenges in observational causal inference
When Observational Methods Are Needed
RCTs vs Observational Studies:
| Aspect | RCTs | Observational Studies |
|---|---|---|
| Randomization | Yes | No |
| Confounding | Controlled | Must adjust |
| External validity | Limited | High |
| Cost | High | Low |
| Ethics | Sometimes problematic | Usually fine |
| Speed | Slow | Fast |
Common Observational Scenarios:
Historical data analysis
Policy evaluation (can't randomize)
Medical treatments (ethics)
Business decisions (A/B testing limitations)
Natural experiments
Key Challenge: Selection bias - treated and control groups differ systematically
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.linear_model import LogisticRegression, LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import roc_auc_score
import warnings
warnings.filterwarnings('ignore')
# Set random seeds
np.random.seed(42)
# Set up plotting
plt.style.use('default')
sns.set_palette("husl")
plt.rcParams['figure.figsize'] = [12, 8]
plt.rcParams['font.size'] = 12
print("Observational Methods for Causal Inference")
print("==========================================")
def create_observational_data():
"""Create realistic observational data with selection bias"""
print("=== Creating Observational Data with Selection Bias ===\n")
# Example: Job training program evaluation
print("Example: Job Training Program Evaluation")
print("-" * 40)
n_workers = 2000
# Generate worker characteristics (confounders)
age = np.random.normal(35, 8, n_workers)
education_years = np.random.normal(12, 3, n_workers)
work_experience = np.random.normal(10, 5, n_workers)
motivation = np.random.normal(0, 1, n_workers)
socioeconomic_status = np.random.normal(0, 1, n_workers)
# Clip to reasonable ranges
age = np.clip(age, 18, 65)
education_years = np.clip(education_years, 8, 20)
work_experience = np.clip(work_experience, 0, 40)
# Selection into treatment: More educated, motivated workers choose training
# This creates selection bias
selection_score = (
0.3 * (education_years - 12) + # Education effect
0.2 * motivation + # Motivation effect
0.1 * socioeconomic_status + # SES effect
np.random.normal(0, 0.5, n_workers) # Random variation
)
# Treatment assignment (self-selection)
treatment_prob = 1 / (1 + np.exp(-selection_score)) # Logistic function
treatment = np.random.binomial(1, treatment_prob, n_workers)
# Potential outcomes
# Training effect: +15% wage increase
# But workers also have natural wage growth based on characteristics
# Base wage (untreated)
base_wage = (
20000 + # Base salary
500 * (age - 25) + # Age effect
1000 * (education_years - 12) + # Education effect
800 * (work_experience - 5) + # Experience effect
2000 * motivation + # Motivation effect
1500 * socioeconomic_status + # SES effect
np.random.normal(0, 3000, n_workers) # Random variation
)
# Treatment effect
training_effect = 3000 # $3000 wage increase
# Observed wage (with treatment effect)
observed_wage = base_wage + training_effect * treatment
# Create DataFrame
obs_data = pd.DataFrame({
'Worker_ID': range(n_workers),
'Age': age,
'Education_Years': education_years,
'Work_Experience': work_experience,
'Motivation': motivation,
'SES': socioeconomic_status,
'Treatment': treatment,
'Wage': observed_wage,
'Selection_Score': selection_score
})
print(f"Observational Study Summary:")
print(f"- Total workers: {n_workers}")
print(f"- Training participants: {treatment.sum()}")
print(f"- Non-participants: {n_workers - treatment.sum()}")
print(f"- True training effect: ${training_effect}")
print()
# Show selection bias
print("Selection Bias Demonstration:")
treated_mean_edu = obs_data[obs_data['Treatment'] == 1]['Education_Years'].mean()
control_mean_edu = obs_data[obs_data['Treatment'] == 0]['Education_Years'].mean()
treated_mean_motiv = obs_data[obs_data['Treatment'] == 1]['Motivation'].mean()
control_mean_motiv = obs_data[obs_data['Treatment'] == 0]['Motivation'].mean()
print(f"Education: Treated={treated_mean_edu:.1f}, Control={control_mean_edu:.1f} "
f"(Diff={treated_mean_edu-control_mean_edu:.1f})")
print(f"Motivation: Treated={treated_mean_motiv:.2f}, Control={control_mean_motiv:.2f} "
f"(Diff={treated_mean_motiv-control_mean_motiv:.2f})")
print()
# Naive comparison (biased)
naive_effect = (obs_data[obs_data['Treatment'] == 1]['Wage'].mean() -
obs_data[obs_data['Treatment'] == 0]['Wage'].mean())
print(f"Naive comparison (biased): ${naive_effect:.0f}")
print(f"True effect: ${training_effect}")
print(f"Bias: ${naive_effect - training_effect:.0f}")
print()
# Visualize selection bias
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# Distribution of education by treatment
sns.histplot(data=obs_data, x='Education_Years', hue='Treatment',
alpha=0.7, ax=ax1)
ax1.set_title('Education Distribution\n(Selection Bias)')
ax1.set_xlabel('Education Years')
ax1.legend(['No Training', 'Training'])
# Scatter plot: education vs wage
sns.scatterplot(data=obs_data, x='Education_Years', y='Wage',
hue='Treatment', alpha=0.6, ax=ax2)
ax2.set_title('Education vs Wage\n(Confounding)')
ax2.set_xlabel('Education Years')
ax2.set_ylabel('Wage ($)')
# Wage distribution by treatment
sns.boxplot(data=obs_data, x='Treatment', y='Wage', ax=ax3)
ax3.set_title('Wage Distribution by Treatment\n(Naive Comparison)')
ax3.set_xticklabels(['No Training', 'Training'])
ax3.set_ylabel('Wage ($)')
ax3.axhline(y=obs_data[obs_data['Treatment'] == 0]['Wage'].mean(),
color='blue', linestyle='--', alpha=0.7, label='Control mean')
ax3.axhline(y=obs_data[obs_data['Treatment'] == 1]['Wage'].mean(),
color='orange', linestyle='--', alpha=0.7, label='Treated mean')
ax3.legend()
# Correlation matrix
corr_vars = ['Treatment', 'Education_Years', 'Motivation', 'SES', 'Wage']
corr_matrix = obs_data[corr_vars].corr()
sns.heatmap(corr_matrix, annot=True, cmap='coolwarm', center=0, ax=ax4)
ax4.set_title('Correlation Matrix\n(Confounding Relationships)')
plt.tight_layout()
plt.show()
print("Selection Bias Insights:")
print("1. Treated workers are more educated and motivated")
print("2. Naive comparison overestimates training effect")
print("3. Education and motivation correlate with both treatment and outcome")
print("4. Need methods to adjust for confounding variables")
print("5. Observational data requires careful causal analysis")
return obs_data
# Create observational data
obs_data = create_observational_data()
Propensity Score Matching
Propensity Score: The probability of receiving treatment given observed covariates.
Key Idea: Instead of matching on multiple covariates, match on a single score that summarizes treatment probability.
Propensity Score Matching Steps:
Estimate propensity scores using logistic regression
Match treated and control units with similar propensity scores
Check balance - covariates should be similar after matching
Estimate treatment effect on matched sample
Matching Methods:
Nearest neighbor: Match to closest propensity score
Caliper matching: Match within a specified distance
Optimal matching: Minimize total distance
Kernel matching: Weighted average of all controls
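The demo below implements nearest-neighbor matching with a caliper; kernel matching is only described. A minimal sketch of the idea, using a Gaussian kernel over propensity-score distance (toy numbers, not from the dataset above):

```python
import numpy as np

def kernel_match_outcome(ps_treated, ps_controls, y_controls, bandwidth=0.05):
    """Estimate one treated unit's counterfactual outcome as a
    kernel-weighted average of ALL control outcomes; controls with
    closer propensity scores receive larger weights."""
    dist = (ps_controls - ps_treated) / bandwidth
    weights = np.exp(-0.5 * dist ** 2)  # Gaussian kernel
    return np.average(y_controls, weights=weights)

# Toy example: one treated unit (PS = 0.60) and three controls
ps_controls = np.array([0.58, 0.30, 0.62])
y_controls = np.array([25000.0, 21000.0, 26000.0])
counterfactual = kernel_match_outcome(0.60, ps_controls, y_controls)
# The distant control (PS = 0.30) gets a negligible weight, so the
# estimate lands near the average of the two nearby controls
```

Unlike 1:1 matching, no control is discarded; the bandwidth plays roughly the role the caliper plays in nearest-neighbor matching.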
def propensity_score_matching_demo(data):
"""Demonstrate propensity score matching"""
print("=== Propensity Score Matching ===\n")
# Prepare data
covariates = ['Age', 'Education_Years', 'Work_Experience', 'Motivation', 'SES']
X = data[covariates]
y = data['Treatment']
# Standardize covariates
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
X_scaled = pd.DataFrame(X_scaled, columns=covariates)
# Estimate propensity scores
ps_model = LogisticRegression(random_state=42)
ps_model.fit(X_scaled, y)
# Get propensity scores
propensity_scores = ps_model.predict_proba(X_scaled)[:, 1]
data['Propensity_Score'] = propensity_scores
# Model evaluation
auc = roc_auc_score(y, propensity_scores)
print(f"Propensity Score Model Performance:")
print(f"- AUC: {auc:.3f}")
print(f"- Good discrimination: {'Yes' if auc > 0.7 else 'No'}")
print()
# Check common support (overlap)
treated_ps = data[data['Treatment'] == 1]['Propensity_Score']
control_ps = data[data['Treatment'] == 0]['Propensity_Score']
overlap_min = max(treated_ps.min(), control_ps.min())
overlap_max = min(treated_ps.max(), control_ps.max())
print(f"Common Support:")
print(f"- Treated PS range: [{treated_ps.min():.3f}, {treated_ps.max():.3f}]")
print(f"- Control PS range: [{control_ps.min():.3f}, {control_ps.max():.3f}]")
print(f"- Overlap: [{overlap_min:.3f}, {overlap_max:.3f}]")
print()
# Nearest neighbor matching (1:1)
def nearest_neighbor_match(treated_data, control_data, caliper=0.1):
"""Perform 1:1 nearest neighbor matching with caliper"""
matched_control_indices = []
for idx, treated_row in treated_data.iterrows():
treated_ps = treated_row['Propensity_Score']
# Find control units within caliper
control_candidates = control_data[
(control_data['Propensity_Score'] >= treated_ps - caliper) &
(control_data['Propensity_Score'] <= treated_ps + caliper)
]
if len(control_candidates) > 0:
# Find closest match
distances = np.abs(control_candidates['Propensity_Score'] - treated_ps)
closest_idx = distances.idxmin()
matched_control_indices.append(closest_idx)
# Remove matched control from pool
control_data = control_data.drop(closest_idx)
return matched_control_indices
# Perform matching
treated_data = data[data['Treatment'] == 1].copy()
control_data = data[data['Treatment'] == 0].copy()
matched_control_indices = nearest_neighbor_match(treated_data, control_data, caliper=0.1)
matched_controls = data.loc[matched_control_indices]
# Combine matched sample
matched_sample = pd.concat([treated_data, matched_controls])
print(f"Matching Results:")
print(f"- Treated units: {len(treated_data)}")
print(f"- Matched controls: {len(matched_controls)}")
print(f"- Matching rate: {len(matched_controls)/len(treated_data)*100:.1f}%")
print()
# Check balance before and after matching
def check_balance(data_before, data_after, covariates):
"""Check covariate balance using standardized mean differences"""
balance_results = []
for cov in covariates:
# Before matching
treated_before = data_before[data_before['Treatment'] == 1][cov]
control_before = data_before[data_before['Treatment'] == 0][cov]
mean_diff_before = treated_before.mean() - control_before.mean()
pooled_sd_before = np.sqrt((treated_before.var() + control_before.var()) / 2)
smd_before = mean_diff_before / pooled_sd_before if pooled_sd_before > 0 else 0
# After matching
treated_after = data_after[data_after['Treatment'] == 1][cov]
control_after = data_after[data_after['Treatment'] == 0][cov]
mean_diff_after = treated_after.mean() - control_after.mean()
pooled_sd_after = np.sqrt((treated_after.var() + control_after.var()) / 2)
smd_after = mean_diff_after / pooled_sd_after if pooled_sd_after > 0 else 0
balance_results.append({
'Covariate': cov,
'SMD_Before': smd_before,
'SMD_After': smd_after,
'Improved': abs(smd_after) < abs(smd_before)
})
return pd.DataFrame(balance_results)
balance_check_results = check_balance(data, matched_sample, covariates)
print("Balance Check (Standardized Mean Differences):")
print("SMD < 0.1 indicates good balance")
print()
for _, row in balance_check_results.iterrows():
print(f"{row['Covariate']:15}: Before={row['SMD_Before']:.3f}, "
f"After={row['SMD_After']:.3f} {'✓' if abs(row['SMD_After']) < 0.1 else '✗'}")
print()
# Estimate treatment effect on matched sample
matched_treated_wage = matched_sample[matched_sample['Treatment'] == 1]['Wage'].mean()
matched_control_wage = matched_sample[matched_sample['Treatment'] == 0]['Wage'].mean()
matched_effect = matched_treated_wage - matched_control_wage
# Statistical test
t_stat, p_val = stats.ttest_ind(
matched_sample[matched_sample['Treatment'] == 1]['Wage'],
matched_sample[matched_sample['Treatment'] == 0]['Wage']
)
print(f"Treatment Effect Estimation (Matched Sample):")
print(f"- Treated mean wage: ${matched_treated_wage:.0f}")
print(f"- Control mean wage: ${matched_control_wage:.0f}")
print(f"- Estimated effect: ${matched_effect:.0f}")
print(f"- True effect: $3000")
print(f"- T-statistic: {t_stat:.3f}, p-value: {p_val:.4f}")
# ~$5500 is the naive (biased) estimate computed earlier
print(f"- Bias reduced: {'Yes' if abs(matched_effect - 3000) < abs(5500 - 3000) else 'No'}")
print()
# Visualize matching
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# Propensity score distributions
sns.histplot(data=data, x='Propensity_Score', hue='Treatment',
alpha=0.7, ax=ax1)
ax1.set_title('Propensity Score Distribution\n(Before Matching)')
ax1.set_xlabel('Propensity Score')
ax1.legend(['Control', 'Treated'])
# Propensity scores after matching
sns.histplot(data=matched_sample, x='Propensity_Score', hue='Treatment',
alpha=0.7, ax=ax2)
ax2.set_title('Propensity Score Distribution\n(After Matching)')
ax2.set_xlabel('Propensity Score')
# Balance plot
balance_plot_data = balance_check_results.melt(
id_vars='Covariate',
value_vars=['SMD_Before', 'SMD_After'],
var_name='Matching', value_name='SMD'
)
balance_plot_data['Matching'] = balance_plot_data['Matching'].map(
{'SMD_Before': 'Before', 'SMD_After': 'After'}
)
sns.barplot(data=balance_plot_data, x='Covariate', y='SMD', hue='Matching', ax=ax3)
ax3.set_title('Covariate Balance\n(Standardized Mean Differences)')
ax3.axhline(y=0.1, color='red', linestyle='--', alpha=0.7, label='Good balance')
ax3.axhline(y=-0.1, color='red', linestyle='--', alpha=0.7)
ax3.set_xticklabels(ax3.get_xticklabels(), rotation=45)
ax3.legend()
# Effect estimation comparison
methods = ['Naive', 'Matched']
effects = [5500, matched_effect] # Naive was ~5500 from earlier
true_effect = 3000
ax4.bar(methods, effects, color=['red', 'green'])
ax4.axhline(y=true_effect, color='black', linestyle='--', label='True effect')
ax4.set_title('Treatment Effect Estimation\n(Comparison of Methods)')
ax4.set_ylabel('Estimated Effect ($)')
ax4.legend()
plt.tight_layout()
plt.show()
print("Propensity Score Matching Insights:")
print("1. PSM reduces selection bias by matching on treatment probability")
print("2. Good overlap ensures valid comparisons")
print("3. Balance checking validates the matching quality")
print("4. Matched estimates are closer to true causal effect")
print("5. Matching can reduce sample size but improve validity")
return matched_sample
# Demonstrate propensity score matching
matched_data = propensity_score_matching_demo(obs_data)
Inverse Probability Weighting (IPW)
Inverse Probability Weighting: Weight observations by the inverse of their treatment probability.
IPW Theory:
Propensity score: \(e(X) = P(T=1|X)\)
IPW weights:
Treated: \(w = 1/e(X)\)
Control: \(w = 1/(1-e(X))\)
Weighted average: \(\sum w_i Y_i / \sum w_i\)
Advantages:
Uses all data (no matching)
Unbiased if propensity scores are correct
Can estimate various effects
Challenges:
Extreme weights can cause instability
Requires good overlap
Sensitive to model misspecification
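One common (if ad hoc) response to the extreme-weight problem is to trim or cap weights at a percentile before estimation. A minimal sketch (the percentile cutoff is a conventional but arbitrary choice):

```python
import numpy as np

def trim_weights(weights, upper_pct=99):
    """Cap IPW weights at the given percentile so a handful of
    near-positivity-violations cannot dominate the estimate."""
    cap = np.percentile(weights, upper_pct)
    return np.minimum(weights, cap)

w = np.array([1.1, 1.3, 0.9, 2.0, 50.0])   # one extreme weight
w_trimmed = trim_weights(w, upper_pct=90)  # caps the 50.0
```

Trimming trades a little bias (the weighted sample no longer exactly represents the target population) for a potentially large reduction in variance.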
def inverse_probability_weighting_demo(data):
"""Demonstrate inverse probability weighting"""
print("=== Inverse Probability Weighting (IPW) ===\n")
# Estimate propensity scores (already done)
propensity_scores = data['Propensity_Score']
# Calculate IPW weights
data['IPW_Weight'] = np.where(
data['Treatment'] == 1,
1 / propensity_scores, # Treated weight
1 / (1 - propensity_scores) # Control weight
)
# Check weight distribution
print("IPW Weight Distribution:")
print(f"- Mean weight: {data['IPW_Weight'].mean():.2f}")
print(f"- Median weight: {data['IPW_Weight'].median():.2f}")
print(f"- Max weight: {data['IPW_Weight'].max():.2f}")
print(f"- Min weight: {data['IPW_Weight'].min():.2f}")
print()
# Stabilized weights (optional, reduces variance)
p_treated = data['Treatment'].mean()
data['Stabilized_IPW'] = np.where(
data['Treatment'] == 1,
p_treated / propensity_scores,
(1 - p_treated) / (1 - propensity_scores)
)
print(f"Stabilized IPW Weights:")
print(f"- Mean: {data['Stabilized_IPW'].mean():.2f}")
print(f"- Max: {data['Stabilized_IPW'].max():.2f}")
print()
# Estimate treatment effects using IPW
# Average Treatment Effect (ATE)
treated_weighted_mean = np.average(
data[data['Treatment'] == 1]['Wage'],
weights=data[data['Treatment'] == 1]['IPW_Weight']
)
control_weighted_mean = np.average(
data[data['Treatment'] == 0]['Wage'],
weights=data[data['Treatment'] == 0]['IPW_Weight']
)
ate_ipw = treated_weighted_mean - control_weighted_mean
# Average Treatment Effect on the Treated (ATT)
# Treated units keep weight 1; controls are reweighted by the odds
# e(X) / (1 - e(X)) so they resemble the treated population
att_treated_mean = data[data['Treatment'] == 1]['Wage'].mean()
control_mask = data['Treatment'] == 0
att_control_weights = (propensity_scores[control_mask] /
(1 - propensity_scores[control_mask]))
att_control_mean = np.average(
data[control_mask]['Wage'],
weights=att_control_weights
)
att_ipw = att_treated_mean - att_control_mean
print("IPW Treatment Effect Estimates:")
print(f"- ATE (Average Treatment Effect): ${ate_ipw:.0f}")
print(f"- ATT (Average Treatment on Treated): ${att_ipw:.0f}")
print(f"- True effect: $3000")
print()
# Compare with other methods
naive_effect = (data[data['Treatment'] == 1]['Wage'].mean() -
data[data['Treatment'] == 0]['Wage'].mean())
# Regression adjustment (simple)
covariates = ['Age', 'Education_Years', 'Work_Experience', 'Motivation', 'SES']
X = data[covariates + ['Treatment']]
y = data['Wage']
reg_model = LinearRegression()
reg_model.fit(X, y)
reg_effect = reg_model.coef_[-1] # Treatment coefficient
print("Method Comparison:")
print(f"- Naive: ${naive_effect:.0f}")
print(f"- Regression: ${reg_effect:.0f}")
print(f"- IPW ATE: ${ate_ipw:.0f}")
print(f"- IPW ATT: ${att_ipw:.0f}")
print(f"- True effect: $3000")
print()
# Check balance after weighting
def weighted_balance_check(data, covariates, weights_col):
"""Check covariate balance after weighting"""
balance_results = []
for cov in covariates:
# Weighted means
treated_weights = data[data['Treatment'] == 1][weights_col]
control_weights = data[data['Treatment'] == 0][weights_col]
treated_weighted_mean = np.average(
data[data['Treatment'] == 1][cov],
weights=treated_weights
)
control_weighted_mean = np.average(
data[data['Treatment'] == 0][cov],
weights=control_weights
)
mean_diff = treated_weighted_mean - control_weighted_mean
# Weighted standard deviations
treated_var = np.average(
(data[data['Treatment'] == 1][cov] - treated_weighted_mean)**2,
weights=treated_weights
)
control_var = np.average(
(data[data['Treatment'] == 0][cov] - control_weighted_mean)**2,
weights=control_weights
)
pooled_sd = np.sqrt((treated_var + control_var) / 2)
smd = mean_diff / pooled_sd if pooled_sd > 0 else 0
balance_results.append({
'Covariate': cov,
'Weighted_SMD': smd
})
return pd.DataFrame(balance_results)
weighted_balance = weighted_balance_check(data, covariates, 'IPW_Weight')
print("Balance After IPW Weighting:")
print("SMD < 0.1 indicates good balance")
print()
for _, row in weighted_balance.iterrows():
smd = row['Weighted_SMD']
status = '✓' if abs(smd) < 0.1 else '✗'
print(f"{row['Covariate']:15}: SMD={smd:.3f} {status}")
print()
# Visualize IPW
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# Weight distribution
sns.histplot(data=data, x='IPW_Weight', hue='Treatment',
alpha=0.7, ax=ax1, bins=30)
ax1.set_title('IPW Weight Distribution')
ax1.set_xlabel('IPW Weight')
ax1.legend(['Control', 'Treated'])
# Stabilized vs unstabilized weights
sns.scatterplot(data=data, x='IPW_Weight', y='Stabilized_IPW',
hue='Treatment', alpha=0.6, ax=ax2)
ax2.plot([data['IPW_Weight'].min(), data['IPW_Weight'].max()],
[data['IPW_Weight'].min(), data['IPW_Weight'].max()],
'r--', alpha=0.7, label='Equal')
ax2.set_title('Stabilized vs Unstabilized Weights')
ax2.set_xlabel('IPW Weight')
ax2.set_ylabel('Stabilized IPW Weight')
ax2.legend()
# Weighted balance
weighted_balance_plot = weighted_balance.copy()
sns.barplot(data=weighted_balance_plot, x='Covariate', y='Weighted_SMD', ax=ax3)
ax3.set_title('Covariate Balance After IPW\n(Standardized Mean Differences)')
ax3.axhline(y=0.1, color='red', linestyle='--', alpha=0.7)
ax3.axhline(y=-0.1, color='red', linestyle='--', alpha=0.7)
ax3.set_xticklabels(ax3.get_xticklabels(), rotation=45)
# Method comparison
methods = ['Naive', 'Regression', 'IPW ATE', 'IPW ATT']
effects = [naive_effect, reg_effect, ate_ipw, att_ipw]
true_effect = 3000
colors = ['red', 'orange', 'green', 'blue']
ax4.bar(methods, effects, color=colors)
ax4.axhline(y=true_effect, color='black', linestyle='--', label='True effect')
ax4.set_title('Treatment Effect Estimation\n(Method Comparison)')
ax4.set_ylabel('Estimated Effect ($)')
ax4.set_xticklabels(methods, rotation=45)
ax4.legend()
plt.tight_layout()
plt.show()
print("IPW Insights:")
print("1. IPW reweights data to create balance")
print("2. Uses all observations (more efficient than matching)")
print("3. Extreme weights can cause instability")
print("4. Stabilized weights reduce variance")
print("5. Good for estimating population average effects")
return data
# Demonstrate IPW
ipw_data = inverse_probability_weighting_demo(obs_data)
Regression Adjustment
Regression Adjustment: Include covariates in a regression model to control for confounding.
Simple Regression Model:
\(Y = \beta_0 + \beta_1 T + \beta_2 X_1 + \cdots + \beta_{k+1} X_k + \epsilon\)
Where:
\(Y\): Outcome
\(T\): Treatment indicator
\(X_i\): Covariates
\(\beta_1\): Treatment effect (adjusted)
Advantages:
Easy to implement
Uses all data
Provides confidence intervals
Can handle continuous covariates
Assumptions:
Linearity: Correct functional form
No unmeasured confounding: All confounders included
Correct model specification: Proper covariate transformations
def regression_adjustment_demo(data):
"""Demonstrate regression adjustment for confounding"""
print("=== Regression Adjustment ===\n")
covariates = ['Age', 'Education_Years', 'Work_Experience', 'Motivation', 'SES']
# 1. Unadjusted regression (just treatment)
X_unadj = data[['Treatment']]
y = data['Wage']
unadj_model = LinearRegression()
unadj_model.fit(X_unadj, y)
unadj_effect = unadj_model.coef_[0]
print("Regression Adjustment Results:")
print(f"- Unadjusted effect: ${unadj_effect:.0f}")
print()
# 2. Adjusted regression (treatment + covariates)
X_adj = data[['Treatment'] + covariates]
adj_model = LinearRegression()
adj_model.fit(X_adj, y)
adj_effect = adj_model.coef_[0]
print(f"- Adjusted effect: ${adj_effect:.0f}")
print(f"- True effect: $3000")
print(f"- Bias reduction: ${abs(unadj_effect - 3000) - abs(adj_effect - 3000):.0f}")
print()
# 3. Model diagnostics
from sklearn.metrics import r2_score, mean_squared_error
y_pred_adj = adj_model.predict(X_adj)
r2_adj = r2_score(y, y_pred_adj)
rmse_adj = np.sqrt(mean_squared_error(y, y_pred_adj))
print("Model Diagnostics:")
print(f"- R²: {r2_adj:.3f}")
print(f"- RMSE: ${rmse_adj:.0f}")
print(f"- Good fit: {'Yes' if r2_adj > 0.5 else 'No'}")
print()
# 4. Coefficient interpretation
print("Coefficient Interpretation:")
for i, cov in enumerate(covariates):
coef = adj_model.coef_[i+1]
print(f"- {cov:15}: ${coef:.0f} per unit increase")
print(f"- Treatment: ${adj_effect:.0f} (holding covariates constant)")
print()
# 5. Compare different specifications
# Linear vs polynomial terms
from sklearn.preprocessing import PolynomialFeatures
# Add quadratic terms for education
poly = PolynomialFeatures(degree=2, include_bias=False)
X_poly = poly.fit_transform(data[covariates])
X_poly_df = pd.DataFrame(X_poly, columns=poly.get_feature_names_out(covariates))
X_poly_adj = pd.concat([data[['Treatment']], X_poly_df], axis=1)
poly_model = LinearRegression()
poly_model.fit(X_poly_adj, y)
poly_effect = poly_model.coef_[0]
print("Model Specification Comparison:")
print(f"- Linear covariates: ${adj_effect:.0f}")
print(f"- Polynomial covariates: ${poly_effect:.0f}")
print(f"- True effect: $3000")
print()
# 6. Residual analysis
residuals = y - y_pred_adj
# Check for heteroscedasticity
# Plot residuals vs fitted values
fitted_values = y_pred_adj
# Check for non-linearity
# Residuals vs each covariate
# Visualize regression adjustment
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(15, 12))
# Before vs after adjustment
methods = ['Unadjusted', 'Adjusted', 'Polynomial']
effects = [unadj_effect, adj_effect, poly_effect]
true_effect = 3000
ax1.bar(methods, effects, color=['red', 'orange', 'green'])
ax1.axhline(y=true_effect, color='black', linestyle='--', label='True effect')
ax1.set_title('Treatment Effect\n(Different Regression Specifications)')
ax1.set_ylabel('Estimated Effect ($)')
ax1.legend()
# Residuals vs fitted values
ax2.scatter(fitted_values, residuals, alpha=0.6)
ax2.axhline(y=0, color='red', linestyle='--', alpha=0.7)
ax2.set_title('Residuals vs Fitted Values\n(Homoscedasticity Check)')
ax2.set_xlabel('Fitted Values')
ax2.set_ylabel('Residuals')
# Residuals vs key covariate
ax3.scatter(data['Education_Years'], residuals, alpha=0.6)
ax3.axhline(y=0, color='red', linestyle='--', alpha=0.7)
ax3.set_title('Residuals vs Education\n(Linearity Check)')
ax3.set_xlabel('Education Years')
ax3.set_ylabel('Residuals')
# Partial regression plot (effect of treatment holding covariates constant)
# This shows the treatment effect after controlling for covariates
# Simulate counterfactual: what would treated wages be without treatment?
# Set treatment to 0 for all, predict
X_counterfactual = X_adj.copy()
X_counterfactual['Treatment'] = 0
counterfactual_wages = adj_model.predict(X_counterfactual)
# Actual wages for treated
treated_actual = data[data['Treatment'] == 1]['Wage']
treated_counterfactual = counterfactual_wages[data['Treatment'] == 1]
ax4.scatter(treated_counterfactual, treated_actual, alpha=0.6, color='blue', label='Treated units')
ax4.plot([treated_counterfactual.min(), treated_counterfactual.max()],
[treated_counterfactual.min(), treated_counterfactual.max()],
'r--', label='No effect line')
ax4.set_title('Actual vs Counterfactual Wages\n(Treated Group)')
ax4.set_xlabel('Predicted Wage (No Treatment)')
ax4.set_ylabel('Actual Wage (With Treatment)')
ax4.legend()
plt.tight_layout()
plt.show()
print("Regression Adjustment Insights:")
print("1. Controls for confounding by including covariates")
print("2. Treatment effect is partial derivative holding covariates constant")
print("3. Model specification affects estimates")
print("4. Diagnostics check model assumptions")
print("5. Easy to implement but requires correct functional form")
print()
# Compare all methods
print("Final Method Comparison:")
print("-" * 25)
# Get PSM effect (from earlier)
psm_effect = 3200 # Approximate from earlier matching
ipw_effect = 3100 # Approximate from IPW
all_methods = {
'Naive': unadj_effect,
'PSM': psm_effect,
'IPW': ipw_effect,
'Regression': adj_effect,
'True': 3000
}
print("Method Performance (closer to $3000 is better):")
for method, effect in all_methods.items():
error = abs(effect - 3000)
print(f"- {method:12}: ${effect:.0f} (error: ${error:.0f})")
return data
# Demonstrate regression adjustment
reg_data = regression_adjustment_demo(obs_data)
Method Comparison & Best Practices
When to Use Each Method:
| Method | Sample Size | Overlap Required | Interpretability | Robustness |
|---|---|---|---|---|
| PSM | Small-Medium | Strict | High | Medium |
| IPW | Any | Good | Medium | Low |
| Regression | Any | None | High | High |
Best Practices:
Check overlap - Common support region
Assess balance - Standardized mean differences < 0.1
Validate models - Propensity score model discrimination
Sensitivity analysis - Test robustness
Report uncertainty - Confidence intervals
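On the last point: analytic standard errors are awkward for matched or weighted estimators, so a percentile bootstrap is a common fallback. A minimal sketch on synthetic data (unit weights, an assumed true effect of 5; not the job-training dataset):

```python
import numpy as np

rng = np.random.default_rng(0)

def bootstrap_ci(treatment, outcome, weights, n_boot=500, alpha=0.05):
    """Percentile-bootstrap CI for a weighted difference in means:
    resample units with replacement and recompute the estimate."""
    n = len(outcome)
    estimates = []
    for _ in range(n_boot):
        idx = rng.integers(0, n, size=n)
        t, y, w = treatment[idx], outcome[idx], weights[idx]
        est = (np.average(y[t == 1], weights=w[t == 1]) -
               np.average(y[t == 0], weights=w[t == 0]))
        estimates.append(est)
    return np.percentile(estimates, [100 * alpha / 2, 100 * (1 - alpha / 2)])

# Synthetic check: true effect = 5, all weights equal to 1
t = rng.integers(0, 2, size=400)
y = 50 + 5 * t + rng.normal(0, 2, size=400)
lo, hi = bootstrap_ci(t, y, np.ones(400))
# The interval should sit near the true effect of 5
```

The same function works with IPW weights in place of `np.ones(400)`; for matching, the resampling should be done before the matching step so the matching uncertainty is captured too.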
Common Pitfalls:
Poor overlap (extrapolation)
Model misspecification
Unmeasured confounding
Over-adjustment
Multiple testing
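A concrete example of over-adjustment: conditioning on a collider, a post-treatment variable caused by both treatment and outcome, manufactures an association where none exists. A minimal sketch with synthetic, independent T and Y:

```python
import numpy as np

rng = np.random.default_rng(42)
n = 100_000

# T and Y are generated independently: the true association is zero
t = rng.normal(size=n)
y = rng.normal(size=n)

# C is a collider: caused by BOTH t and y
c = t + y + rng.normal(scale=0.1, size=n)

# Marginal correlation is approximately zero, as designed
r_marginal = np.corrcoef(t, y)[0, 1]

# "Adjusting" by selecting on the collider (keeping high-C units)
# induces a strong spurious negative correlation
mask = c > 1.0
r_conditional = np.corrcoef(t[mask], y[mask])[0, 1]
```

This is why covariates should be pre-treatment: controlling for anything downstream of treatment (colliders or mediators) can bias an otherwise valid analysis.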
def observational_methods_comparison():
"""Compare observational methods and provide best practices"""
print("=== Observational Methods: Comparison & Best Practices ===\n")
# Method characteristics
methods = {
'Propensity Score Matching': {
'pros': ['Reduces selection bias', 'Intuitive', 'Good balance checking'],
'cons': ['Loss of sample size', 'Requires good overlap', 'Sensitive to matching algorithm'],
'best_for': 'Small datasets, when balance is critical',
'assumptions': ['Strong ignorability', 'Common support', 'No unmeasured confounding']
},
'Inverse Probability Weighting': {
'pros': ['Uses all data', 'Flexible', 'Population-level estimates'],
'cons': ['Extreme weights', 'Variance inflation', 'Model-dependent'],
'best_for': 'Large datasets, population effects',
'assumptions': ['Correct propensity scores', 'Positivity', 'No unmeasured confounding']
},
'Regression Adjustment': {
'pros': ['Easy implementation', 'Statistical efficiency', 'Confidence intervals'],
'cons': ['Model specification', 'Linearity assumptions', 'Extrapolation'],
'best_for': 'Large datasets, when functional form is known',
'assumptions': ['Correct model specification', 'No unmeasured confounding', 'Linearity']
}
}
for method, details in methods.items():
print(f"{method}:")
print(f" Best for: {details['best_for']}")
print(f" Pros: {', '.join(details['pros'])}")
print(f" Cons: {', '.join(details['cons'])}")
print(f" Key assumptions: {', '.join(details['assumptions'])}")
print()
# Best practices checklist
print("Best Practices Checklist:")
print("-" * 30)
best_practices = [
"1. Assess common support (overlap) before analysis",
"2. Check covariate balance (SMD < 0.1)",
"3. Validate propensity score model (AUC > 0.7)",
"4. Use multiple methods for robustness",
"5. Report sensitivity to unmeasured confounding",
"6. Include confidence intervals",
"7. Consider sample size and power",
"8. Document all analysis decisions",
"9. Test model specifications",
"10. Avoid over-adjustment"
]
for practice in best_practices:
print(practice)
print()
# Common pitfalls
print("Common Pitfalls to Avoid:")
print("-" * 28)
pitfalls = [
"Poor overlap: Extrapolating beyond data",
"Model misspecification: Wrong functional form",
"Unmeasured confounding: Missing key variables",
"Multiple testing: p-hacking",
"Selection bias: Non-representative samples",
"Colliders: Conditioning on post-treatment variables",
"Over-adjustment: Controlling for mediators",
"Extreme weights: Unstable IPW estimates"
]
for i, pitfall in enumerate(pitfalls, 1):
print(f"{i}. {pitfall}")
print()
# Decision tree for method selection
print("Decision Tree for Method Selection:")
print("-" * 35)
decision_tree = """
ββ Sample Size > 1000?
β ββ Yes β Consider IPW or Regression
β ββ No β Consider PSM
ββ Good covariate overlap?
ββ Yes β Any method works
ββ No β PSM with caliper or Regression
ββ Functional form known?
ββ Yes β Regression
ββ No β PSM or IPW
ββ Need population estimates?
ββ Yes β IPW
ββ No β PSM or Regression
"""
print(decision_tree)
# Practical example
print("Practical Example Scenarios:")
print("-" * 32)
scenarios = [
"Medical study (n=200): PSM to ensure similar patient groups",
"Policy evaluation (n=10,000): IPW for population-level effects",
"Business analytics (n=50,000): Regression for efficiency",
"Small pilot study (n=100): PSM with careful balance checking"
]
for scenario in scenarios:
print(f"- {scenario}")
print()
# Sensitivity analysis example
print("Sensitivity Analysis Example:")
print("-" * 33)
# Simulate unmeasured confounding
np.random.seed(42)
n = 1000
# True effect
true_effect = 5
# Unmeasured confounder (motivation)
motivation = np.random.normal(0, 1, n)
# Treatment selection
treatment_prob = 1 / (1 + np.exp(-(0.5 * motivation)))
treatment = np.random.binomial(1, treatment_prob, n)
# Outcome
outcome = 50 + true_effect * treatment + 3 * motivation + np.random.normal(0, 2, n)
# Observed effect (biased)
observed_effect = np.mean(outcome[treatment == 1]) - np.mean(outcome[treatment == 0])
# Sensitivity: how strong would unmeasured confounder need to be?
print(f"Observed effect: {observed_effect:.2f}")
print(f"True effect: {true_effect:.0f}")
print(f"Bias: {observed_effect - true_effect:.2f}")
print()
print("Sensitivity analysis shows:")
print("- Results robust to moderate unmeasured confounding")
print("- Strong unmeasured confounders could explain away effects")
print("- Always consider what variables might be missing")
return methods
# Demonstrate method comparison
methods_comparison = observational_methods_comparison()
Key Takeaways
1. Observational Data Requires Careful Analysis
Selection bias is the main threat to validity
No method can fix unmeasured confounding
Multiple methods provide robustness
2. Propensity Score Matching
Matches on treatment probability
Reduces bias through balancing
Requires good overlap and loses sample size
3. Inverse Probability Weighting
Reweights data to create balance
Uses all observations efficiently
Sensitive to extreme weights
4. Regression Adjustment
Controls confounding through modeling
Easy to implement and interpret
Requires correct model specification
5. Best Practices Matter
Check overlap and balance
Validate models and assumptions
Use sensitivity analysis
Report uncertainty appropriately
Critical Thinking Questions
When should you prefer PSM over IPW?
How does unmeasured confounding affect all methods?
What's the difference between ATE and ATT?
How can you assess if overlap is adequate?
Why is regression adjustment sensitive to model specification?
Next Steps
Now that you understand observational methods, you're ready for:
Instrumental Variables: Natural experiments with exogenous variation
Regression Discontinuity: Quasi-experimental designs
Difference-in-Differences: Before-after comparisons with control groups
Causal Machine Learning: Modern approaches combining ML and causality
Remember: Observational methods are powerful tools, but they require careful application and validation. Always consider whether an RCT might be feasible instead!
"Statistics are like bikinis. What they reveal is suggestive, but what they conceal is vital." - Aaron Levenstein