# Install required packages
!pip install presidio-analyzer presidio-anonymizer spacy faker openai python-dotenv -q
!python -m spacy download en_core_web_lg -q
import re
import hashlib
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from faker import Faker
import json
Understanding PII Types: A Risk-Based Classification
Personally Identifiable Information (PII) spans a spectrum from direct identifiers (SSN, passport number) that uniquely identify an individual to quasi-identifiers (zip code, age, gender) that can re-identify individuals when combined. The risk classification below organizes PII into five categories: direct identifiers (highest risk — a single leaked SSN enables identity theft), contact information (medium risk — enables targeted phishing), financial data (critical risk — enables direct financial fraud), health information (protected by HIPAA with penalties up to $1.5M per violation), and online identifiers (variable risk depending on linkability).
Why PII detection matters for AI systems: LLMs can inadvertently memorize and regurgitate PII from training data, and users may unknowingly paste sensitive information into chat interfaces. A production AI system must detect and handle PII at three points: on input (prevent PII from reaching the model), during processing (anonymize PII before storing conversation logs), and on output (filter any PII the model generates from memorized training data).
# Risk-oriented taxonomy of PII: category name -> representative examples.
PII_CATEGORIES = {
    "Direct Identifiers": [
        "Full name",
        "Social Security Number (SSN)",
        "Driver's license number",
        "Passport number",
        "Biometric data",
        "Account numbers",
    ],
    "Contact Information": [
        "Email address",
        "Phone number",
        "Physical address",
        "IP address",
    ],
    "Financial Data": [
        "Credit card numbers",
        "Bank account numbers",
        "Routing numbers",
        "Financial account credentials",
    ],
    "Health Information": [
        "Medical record numbers",
        "Health insurance numbers",
        "Diagnosis information",
        "Prescription data",
    ],
    "Online Identifiers": [
        "Username/passwords",
        "Device IDs",
        "Cookie IDs",
        "Session tokens",
    ],
}

# Print every category followed by one bullet line per example.
print("π PII Categories and Examples\n" + "=" * 60)
for category, examples in PII_CATEGORIES.items():
    print(f"\n{category}:")
    for example in examples:
        print(f" β’ {example}")

# Print the coarse three-tier risk legend.
print("\n\nβ οΈ Risk Levels:")
for risk_line in (
    " π΄ HIGH: SSN, Credit Card, Medical Records",
    " π‘ MEDIUM: Email, Phone, Address",
    " π’ LOW: First name only, General location",
):
    print(risk_line)
Basic PII Detection with Regex: Fast Pattern Matching
Regex-based PII detection provides the fastest detection layer, running in microseconds per text and catching structured PII formats that follow predictable patterns: email addresses (user@domain.tld), US phone numbers (10 digits with optional separators), SSNs (XXX-XX-XXXX with validation constraints), credit card numbers (Luhn-checkable 13-19 digit sequences), and IP addresses. The RegexPIIDetector class below uses compiled patterns for efficiency, detects all matches with their positions in the text, and supports redaction by replacing matches with [REDACTED] tokens.
Limitations of regex-based detection: regex catches only PII with consistent formatting. A name like “John Smith” has no regex-detectable pattern, addresses vary wildly in format across countries, and medical record numbers follow institution-specific schemas. For these unstructured PII types, ML-based Named Entity Recognition (NER) is essential — which is why production systems layer regex detection (fast, high precision) with NER-based detection (slower, better recall) as shown in Part 3 with Presidio.
@dataclass
class PIIMatch:
    """One PII hit reported by a detector."""

    type: str  # detector label, e.g. "EMAIL" or "SSN"
    value: str  # the exact substring that matched
    start: int  # index of the first matched character in the scanned text
    end: int  # index one past the last matched character
    confidence: float  # score assigned by the detector


class RegexPIIDetector:
    """Pattern-based PII detection.

    Holds one compiled regular expression per PII type and offers
    ``detect`` (list every hit with its position) and ``redact`` (replace
    every hit with a placeholder).
    """

    def __init__(self):
        self.patterns = {
            "EMAIL": re.compile(
                # TLD class is letters only; the previous ``[A-Z|a-z]``
                # also accepted a literal ``|``.
                r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
            ),
            "PHONE": re.compile(
                r'\b(?:\+?1[-.]?)?\(?([0-9]{3})\)?[-.]?([0-9]{3})[-.]?([0-9]{4})\b'
            ),
            "SSN": re.compile(
                r'\b(?!000|666|9\d{2})([0-8]\d{2}|7([0-6]\d))[-]?(?!00)\d{2}[-]?(?!0000)\d{4}\b'
            ),
            "CREDIT_CARD": re.compile(
                r'\b(?:'
                # Contiguous digits (Visa 13/16, Mastercard, Amex, Discover).
                r'4[0-9]{12}(?:[0-9]{3})?|5[1-5][0-9]{14}|3[47][0-9]{13}'
                r'|6(?:011|5[0-9]{2})[0-9]{12}'
                # Same 16-digit brands written as 4-4-4-4 with '-' or ' '.
                r'|(?:4[0-9]{3}|5[1-5][0-9]{2}|6011|65[0-9]{2})(?:[- ][0-9]{4}){3}'
                # Amex written as 4-6-5 with '-' or ' '.
                r'|3[47][0-9]{2}[- ][0-9]{6}[- ][0-9]{5}'
                r')\b'
            ),
            "IP_ADDRESS": re.compile(
                r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b'
            ),
            "DATE_OF_BIRTH": re.compile(
                r'\b(?:0?[1-9]|1[0-2])[/-](?:0?[1-9]|[12][0-9]|3[01])[/-](?:19|20)\d{2}\b'
            ),
        }

    def detect(self, text: str) -> List[PIIMatch]:
        """Detect PII in text using regex patterns.

        Args:
            text: The string to scan.

        Returns:
            Every match of every pattern, ordered by start index. Matches
            from different patterns may overlap.
        """
        matches = [
            PIIMatch(
                type=pii_type,
                value=match.group(),
                start=match.start(),
                end=match.end(),
                confidence=0.9,  # High confidence for regex matches
            )
            for pii_type, pattern in self.patterns.items()
            for match in pattern.finditer(text)
        ]
        return sorted(matches, key=lambda x: x.start)

    def redact(self, text: str, placeholder: str = "[REDACTED]") -> str:
        """Redact PII from text.

        Args:
            text: The string to scrub.
            placeholder: Replacement written over each detected span.

        Returns:
            ``text`` with every detected span replaced by ``placeholder``.
            Overlapping hits are merged and replaced once.
        """
        # Merge overlapping hits first. Replacing them one by one would
        # apply the outer span's indices to text an inner replacement has
        # already resized.
        spans = []
        for match in self.detect(text):  # already ordered by start
            if spans and match.start < spans[-1][1]:
                spans[-1][1] = max(spans[-1][1], match.end)
            else:
                spans.append([match.start, match.end])

        # Replace from end to start to preserve indices
        for start, end in reversed(spans):
            text = text[:start] + placeholder + text[end:]
        return text
# Exercise the regex detector on a sample holding several structured PII formats.
detector = RegexPIIDetector()
test_text = """
Contact me at john.doe@email.com or call 555-123-4567.
My SSN is 123-45-6789 and card number is 4532-1234-5678-9010.
Server IP: 192.168.1.1, DOB: 01/15/1990
"""

# Run both operations up front; the report below only formats their results.
matches = detector.detect(test_text)
redacted = detector.redact(test_text)

print("\nπ Regex PII Detection Results\n" + "=" * 60)
print(f"Original Text:\n{test_text}")
print(f"\nDetected PII ({len(matches)} matches):")
for match in matches:
    print(f" β’ {match.type}: '{match.value}' (confidence: {match.confidence})")
print(f"\nRedacted Text:\n{redacted}")
Advanced PII Detection with Presidio: ML-Powered Entity Recognition
Microsoft Presidio combines spaCy’s NER models (which detect names, locations, and organizations from context) with customizable pattern recognizers and configurable anonymization strategies. Unlike regex, Presidio’s ML backbone can identify “John Smith” as a PERSON entity even without structural formatting cues, and can distinguish between “Washington” the person and “Washington” the city based on surrounding context. The AnalyzerEngine orchestrates multiple recognizers (regex + NER + custom) and returns entity spans with confidence scores.
Custom recognizers extend Presidio’s built-in capabilities for domain-specific PII. The example below adds an employee ID recognizer (pattern: EMP-XXXXXX) that standard NER models would not detect. In production, add recognizers for your organization’s internal identifiers, customer account numbers, and domain-specific sensitive fields. The AnonymizerEngine then applies configurable strategies (replace, mask, hash, or pseudonymize) to each detected entity, and the choice of strategy depends on the downstream use case — analysis requires pseudonymization (preserving statistical properties), while logging requires irreversible hashing.
class PresidioPIIDetector:
    """Advanced PII detection using Microsoft Presidio.

    Wraps an ``AnalyzerEngine`` (entity detection) and an
    ``AnonymizerEngine`` (text rewriting), and registers one custom
    recognizer for ``EMP-`` employee IDs.
    """

    def __init__(self):
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()
        # Add custom recognizers
        self._add_custom_recognizers()

    def _add_custom_recognizers(self):
        """Register the ``EMPLOYEE_ID`` pattern recognizer on the analyzer."""
        # Example: Employee ID pattern ("EMP-" followed by exactly six digits)
        employee_id_pattern = Pattern(
            name="employee_id_pattern",
            regex=r"\bEMP-\d{6}\b",
            score=0.8
        )
        employee_id_recognizer = PatternRecognizer(
            supported_entity="EMPLOYEE_ID",
            patterns=[employee_id_pattern]
        )
        self.analyzer.registry.add_recognizer(employee_id_recognizer)

    def analyze(self, text: str, language: str = "en") -> List[Dict]:
        """Analyze text for PII.

        Args:
            text: The string to scan.
            language: Language code forwarded to the analyzer.

        Returns:
            One dict per analyzer result scoring at least 0.5, with keys
            ``type``, ``start``, ``end``, ``score`` and ``text`` (the
            matched substring).
        """
        results = self.analyzer.analyze(
            text=text,
            language=language,
            score_threshold=0.5
        )
        return [
            {
                "type": result.entity_type,
                "start": result.start,
                "end": result.end,
                "score": result.score,
                "text": text[result.start:result.end]
            }
            for result in results
        ]

    def anonymize(
        self,
        text: str,
        strategy: str = "replace",
        mask_char: str = "*"
    ) -> str:
        """Anonymize PII in text.

        Args:
            text: The string to anonymize.
            strategy: ``"replace"`` (``<ENTITY_TYPE>`` labels), ``"mask"``
                or ``"hash"``. Any other value leaves the operator map
                empty, so the anonymizer's own default operator is used.
            mask_char: Character used by the ``"mask"`` strategy.

        Returns:
            The anonymized text.
        """
        # Analyze first (no score threshold is passed here, unlike analyze()).
        analysis_results = self.analyzer.analyze(
            text=text,
            language="en"
        )
        entity_types = {result.entity_type for result in analysis_results}

        # One operator per entity type actually present in the text.
        if strategy == "replace":
            operators = {
                entity_type: OperatorConfig(
                    "replace",
                    {"new_value": f"<{entity_type}>"}
                )
                for entity_type in entity_types
            }
        elif strategy == "mask":
            operators = {
                entity_type: OperatorConfig(
                    "mask",
                    {"masking_char": mask_char, "chars_to_mask": 100, "from_end": False}
                )
                for entity_type in entity_types
            }
        elif strategy == "hash":
            operators = {
                entity_type: OperatorConfig("hash", {})
                for entity_type in entity_types
            }
        else:
            operators = {}

        anonymized = self.anonymizer.anonymize(
            text=text,
            analyzer_results=analysis_results,
            operators=operators
        )
        return anonymized.text

    def pseudonymize(self, text: str) -> Tuple[str, Dict]:
        """Replace PII with fake but realistic values.

        Each distinct detected value gets its own Faker-generated stand-in,
        reused for every occurrence of that value. Entity types without a
        generator are replaced by their ``<ENTITY_TYPE>`` label.

        Args:
            text: The string to pseudonymize.

        Returns:
            ``(pseudonymized_text, mapping)`` where ``mapping`` maps each
            detected original value to the replacement chosen for it.
            Treat the mapping as sensitive: it allows re-identification.
        """
        faker = Faker()
        analysis_results = self.analyzer.analyze(text=text, language="en")

        generators = {
            "PERSON": faker.name,
            "EMAIL_ADDRESS": faker.email,
            "PHONE_NUMBER": faker.phone_number,
            "LOCATION": faker.city,
            "CREDIT_CARD": faker.credit_card_number,
        }

        # Map original values to pseudonyms. Drawn once per distinct value
        # so repeated mentions stay consistent and distinct values do not
        # collapse onto one fake.
        mapping = {}
        for result in analysis_results:
            original = text[result.start:result.end]
            if original in mapping:
                continue
            make_fake = generators.get(result.entity_type)
            mapping[original] = (
                make_fake() if make_fake else f"<{result.entity_type}>"
            )

        def _lookup_operator(entity_type: str) -> OperatorConfig:
            # The "custom" operator calls the lambda with the span text.
            # NOTE(review): if the anonymizer merges or trims spans, the
            # text it passes may not be a key in `mapping`; it then falls
            # back to the entity label -- confirm against the installed
            # presidio-anonymizer version.
            fallback = f"<{entity_type}>"
            return OperatorConfig(
                "custom",
                {"lambda": lambda original: mapping.get(original, fallback)}
            )

        operators = {
            entity_type: _lookup_operator(entity_type)
            for entity_type in generators
        }
        anonymized = self.anonymizer.anonymize(
            text=text,
            analyzer_results=analysis_results,
            operators=operators
        )
        return anonymized.text, mapping
# Status banner. The string literal had been split across two physical lines
# by a mis-decoded check-mark emoji, which made the statement a SyntaxError.
print("✅ Presidio PII Detector initialized")
print("Supports: PERSON, EMAIL, PHONE, SSN, CREDIT_CARD, LOCATION, and more")
Anonymization Strategies: Choosing the Right Approach
Four anonymization strategies serve different use cases. Replace substitutes PII with entity-type labels (<PERSON>, <EMAIL_ADDRESS>), preserving sentence structure while completely removing identifying information — ideal for displaying redacted text to users. Mask overwrites characters with a placeholder symbol (****), preserving the visual length of the original value — useful for receipts and confirmations where users expect to see a partially visible value. Hash applies SHA-256 to produce a deterministic, irreversible fingerprint — useful for deduplication and joining records across datasets without exposing the original PII. Pseudonymize replaces real PII with realistic fake values (via the Faker library), preserving statistical properties and readability — essential for creating realistic test datasets and for analytics where demographic distributions must remain valid.
Reversibility spectrum: replacement and masking are irreversible by design (the original value is destroyed). Hashing is deterministic but computationally irreversible (the same input always produces the same hash, enabling record linkage, but you cannot recover the input from the hash). Pseudonymization with a stored mapping is reversible (the mapping can re-identify individuals), which means pseudonymized data is still considered personal data under GDPR and requires appropriate safeguards.
# Test different anonymization strategies
# Builds a detector and a short sample holding a name, a location, an email
# address, a phone number and an EMP-style employee ID, then prints the sample.
presidio_detector = PresidioPIIDetector()
sample_text = """
Hi, I'm John Smith from New York.
You can reach me at john.smith@company.com or 555-0123.
My employee ID is EMP-123456.
"""
print("\nπ Anonymization Strategy Comparison\n" + "=" * 70)
print(f"Original Text:\n{sample_text}")
# Note: Uncomment to run (requires spacy model)
# The calls below are left disabled; each one runs a single strategy on
# sample_text and prints the result.
# print("\n1οΈβ£ Replace Strategy:")
# replaced = presidio_detector.anonymize(sample_text, strategy="replace")
# print(replaced)
# print("\n2οΈβ£ Mask Strategy:")
# masked = presidio_detector.anonymize(sample_text, strategy="mask", mask_char="*")
# print(masked)
# print("\n3οΈβ£ Hash Strategy:")
# hashed = presidio_detector.anonymize(sample_text, strategy="hash")
# print(hashed)
# print("\n4οΈβ£ Pseudonymize Strategy:")
# pseudonymized, mapping = presidio_detector.pseudonymize(sample_text)
# print(pseudonymized)
# print(f"\nMapping: {json.dumps(mapping, indent=2)}")
# Example outputs
# The text printed below is hand-written illustration, not captured output.
# NOTE(review): the Hash sample shows 32-hex-character digests and omits the
# employee-ID line, and the Pseudonymize sample shows a rewritten employee ID
# although pseudonymize() defines no EMPLOYEE_ID operator -- expect real
# output to differ.
print("""
Expected Outputs:
1οΈβ£ Replace:
Hi, I'm <PERSON> from <LOCATION>.
You can reach me at <EMAIL_ADDRESS> or <PHONE_NUMBER>.
My employee ID is <EMPLOYEE_ID>.
2οΈβ£ Mask:
Hi, I'm ********** from ********.
You can reach me at ********************** or ********.
My employee ID is **********.
3οΈβ£ Hash:
Hi, I'm 5d41402abc4b2a76b9719d911017c592 from 8b1a9953c4611296a827abf8c47804d7.
You can reach me at 098f6bcd4621d373cade4e832627b4f6 or e4da3b7fbbce2345d7772b0674a318d5.
4οΈβ£ Pseudonymize:
Hi, I'm Jennifer Martinez from Phoenix.
You can reach me at robert.johnson@example.com or 555-9876.
My employee ID is EMP-987654.
""")
Privacy Compliance: GDPR and CCPA Requirements
Privacy regulations impose legal obligations on how AI systems handle personal data. GDPR (EU) requires a lawful basis for processing (consent, legitimate interest, legal obligation, etc.), data minimization (collect only what is necessary), purpose limitation (use data only for stated purposes), and data subject rights (access, deletion, portability, rectification). CCPA (California) grants consumers the right to know what data is collected, delete their data, opt out of data selling, and non-discrimination for exercising these rights.
The PrivacyComplianceManager below implements data retention policies with automatic expiration checks, data subject request handlers for all four GDPR rights, and automated privacy notice generation. Key implementation details: retention periods vary by purpose (90 days for analytics, 7 years for financial records due to tax law), the legal basis determines whether consent is required (marketing requires explicit consent; fraud prevention uses legitimate interest), and deletion requests must propagate to all copies including backups (with a documented timeline for backup purging).
from datetime import datetime, timedelta
from typing import Literal
@dataclass
class DataRetentionPolicy:
    """Retention rule for one category of stored data."""

    purpose: str  # human-readable reason the data is kept
    retention_days: int  # maximum age in days before the data is due for deletion
    legal_basis: str  # GDPR Article 6 basis
    requires_consent: bool  # whether processing under this policy needs consent
class PrivacyComplianceManager:
    """Manage privacy compliance requirements.

    Holds a table of retention policies keyed by data type and an
    in-memory ``user_data`` store, and offers retention checks, data
    subject request handling and privacy-notice generation.
    """

    def __init__(self):
        # Define retention policies
        self.retention_policies = {
            "customer_service": DataRetentionPolicy(
                purpose="Customer support and service improvement",
                retention_days=365,
                legal_basis="Legitimate interest (Article 6(1)(f))",
                requires_consent=False
            ),
            "marketing": DataRetentionPolicy(
                purpose="Marketing and promotional communications",
                retention_days=730,
                legal_basis="Consent (Article 6(1)(a))",
                requires_consent=True
            ),
            "financial": DataRetentionPolicy(
                purpose="Financial records and tax compliance",
                retention_days=2555,  # 7 years
                legal_basis="Legal obligation (Article 6(1)(c))",
                requires_consent=False
            ),
            "analytics": DataRetentionPolicy(
                purpose="Product analytics and improvement",
                retention_days=90,
                legal_basis="Legitimate interest (Article 6(1)(f))",
                requires_consent=False
            ),
        }
        self.user_data = {}  # In production, use a database

    def check_retention_compliance(
        self,
        data_type: str,
        created_date: datetime
    ) -> Dict:
        """Check if data should be retained or deleted.

        Args:
            data_type: Key into ``retention_policies``.
            created_date: When the data was created; naive or
                timezone-aware.

        Returns:
            ``{"action": "unknown", ...}`` when no policy exists for
            ``data_type``; otherwise a dict with ``action`` (``"delete"``
            once the age exceeds the retention period, else ``"retain"``),
            ``age_days``, ``retention_days``, ``days_remaining``,
            ``reason`` and ``legal_basis``.
        """
        policy = self.retention_policies.get(data_type)
        if not policy:
            return {"action": "unknown", "reason": "No policy defined"}

        # Take "now" in the same timezone-awareness as created_date:
        # subtracting a naive and an aware datetime raises TypeError.
        now = datetime.now(tz=created_date.tzinfo)
        age_days = (now - created_date).days
        should_delete = age_days > policy.retention_days
        return {
            "action": "delete" if should_delete else "retain",
            "age_days": age_days,
            "retention_days": policy.retention_days,
            "days_remaining": max(0, policy.retention_days - age_days),
            "reason": policy.purpose,
            "legal_basis": policy.legal_basis
        }

    def handle_data_subject_request(
        self,
        request_type: Literal["access", "deletion", "portability", "rectification"],
        user_id: str
    ) -> Dict:
        """Handle GDPR/CCPA data subject requests.

        Args:
            request_type: One of ``"access"``, ``"deletion"``,
                ``"portability"`` or ``"rectification"``.
            user_id: Key into ``user_data``.

        Returns:
            A status dict whose shape depends on the request type. An
            unrecognised ``request_type`` yields
            ``{"status": "unsupported", ...}`` rather than ``None``.
        """
        if request_type == "access":
            # Right to access (GDPR Art. 15)
            return {
                "status": "fulfilled",
                "data": self.user_data.get(user_id, {}),
                "format": "JSON",
                "timestamp": datetime.now().isoformat()
            }
        if request_type == "deletion":
            # Right to erasure (GDPR Art. 17) / Right to delete (CCPA)
            if user_id not in self.user_data:
                return {"status": "not_found"}
            del self.user_data[user_id]
            return {
                "status": "deleted",
                "user_id": user_id,
                "timestamp": datetime.now().isoformat(),
                "note": "Data removed within 30 days (may be retained in backups)"
            }
        if request_type == "portability":
            # Right to data portability (GDPR Art. 20)
            return {
                "status": "fulfilled",
                "data": self.user_data.get(user_id, {}),
                "format": "machine-readable JSON",
                "timestamp": datetime.now().isoformat()
            }
        if request_type == "rectification":
            # Right to rectification (GDPR Art. 16)
            return {
                "status": "pending",
                "message": "Please provide corrected data",
                "timestamp": datetime.now().isoformat()
            }
        # Mirror check_retention_compliance: report unknown input explicitly
        # instead of falling off the end and returning None.
        return {
            "status": "unsupported",
            "reason": f"Unknown request type: {request_type}"
        }

    @staticmethod
    def _format_retention(days: int) -> str:
        """Render a retention period, adding a year count only from one year up."""
        years, remainder = divmod(days, 365)
        if years == 0:
            return f"{days} days"
        approx = "" if remainder == 0 else "about "
        unit = "year" if years == 1 else "years"
        return f"{days} days ({approx}{years} {unit})"

    def generate_privacy_notice(self) -> str:
        """Generate privacy notice based on policies.

        Returns:
            Markdown text with one section per retention policy followed
            by a fixed "Your Rights" section.
        """
        lines = [
            "# PRIVACY NOTICE",
            "## Data We Collect and Why",
        ]
        for data_type, policy in self.retention_policies.items():
            lines += [
                f"### {data_type.replace('_', ' ').title()}",
                f"- **Purpose**: {policy.purpose}",
                f"- **Retention**: {self._format_retention(policy.retention_days)}",
                f"- **Legal Basis**: {policy.legal_basis}",
                f"- **Consent Required**: {'Yes' if policy.requires_consent else 'No'}",
            ]
        lines += [
            "## Your Rights",
            "Under GDPR and CCPA, you have the right to:",
            "- Access your personal data",
            "- Request deletion of your data",
            "- Port your data to another service",
            "- Correct inaccurate data",
            "- Opt-out of data selling (CCPA)",
            "- Withdraw consent at any time",
            "To exercise these rights, contact: privacy@example.com",
        ]
        return "\n".join(lines) + "\n"
# Exercise the compliance manager: one retention check, one deletion request.
compliance_mgr = PrivacyComplianceManager()
print("\nβοΈ Privacy Compliance Check\n" + "=" * 60)

# A record created 400 days ago, checked against the customer-service policy.
old_data_date = datetime.now() - timedelta(days=400)
result = compliance_mgr.check_retention_compliance("customer_service", old_data_date)
print("Customer Service Data (400 days old):")
for label, value in (
    ("Action", result["action"].upper()),
    ("Reason", result["reason"]),
    ("Legal Basis", result["legal_basis"]),
):
    print(f" {label}: {value}")

# Erasure request for a user id, printed as indented JSON.
print("\nπ Data Subject Request (Right to Deletion):")
deletion_result = compliance_mgr.handle_data_subject_request("deletion", "user123")
print(json.dumps(deletion_result, indent=2))
Production PII Protection Pipeline: End-to-End System
The ProductionPIIProtection class combines all PII detection and anonymization components into a single process_input() method suitable for integration into any LLM application. The pipeline follows a two-stage detection strategy: a fast regex scan identifies structured PII patterns in microseconds; if any PII is detected, Presidio’s ML-based analyzer runs a deeper analysis for unstructured entities (names, addresses). This tiered approach minimizes latency for PII-free inputs (the common case) while providing thorough detection when PII is present.
Risk-level classification drives downstream handling: high-risk PII (SSNs, credit cards) triggers immediate anonymization and audit logging; medium-risk PII (emails, phone numbers) is anonymized with optional logging; low-risk content passes through unmodified. The audit log stores only metadata (PII types detected, risk level, text length) — never the actual PII values — ensuring that the monitoring system itself does not become a data liability. The get_pii_statistics() method provides aggregate analytics for compliance dashboards: what types of PII are users submitting most frequently, and are high-risk submissions increasing over time?
class ProductionPIIProtection:
    """Production-ready PII protection system.

    Combines a regex detector (fast first pass) with a Presidio detector
    (anonymization) and keeps a metadata-only audit log.

    NOTE: Presidio is only consulted when the regex pass finds something,
    so input with no regex-detectable PII is returned unmodified.
    """

    def __init__(
        self,
        anonymization_strategy: str = "replace",
        enable_audit_log: bool = True
    ):
        self.regex_detector = RegexPIIDetector()
        self.presidio_detector = PresidioPIIDetector()
        self.anonymization_strategy = anonymization_strategy
        self.enable_audit_log = enable_audit_log
        self.audit_log = []

    def process_input(self, text: str, user_id: Optional[str] = None) -> Dict:
        """Process user input with PII protection.

        Args:
            text: Raw user input.
            user_id: Optional identifier stored with the audit entry.

        Returns:
            Dict with ``original_length``, ``pii_detected``, ``pii_types``
            (sorted regex type labels), ``safe_text`` and ``risk_level``
            (``"low"``, ``"medium"`` or ``"high"``).
        """
        result = {
            "original_length": len(text),
            "pii_detected": False,
            "pii_types": [],
            "safe_text": text,
            "risk_level": "low"
        }

        # Step 1: Quick regex check
        regex_matches = self.regex_detector.detect(text)
        if not regex_matches:
            return result

        result["pii_detected"] = True
        # Sorted so the reported order is deterministic across runs.
        result["pii_types"] = sorted({m.type for m in regex_matches})

        # Step 2: Deep analysis with Presidio, then a regex redaction pass.
        # Presidio's recognizers need not flag every span the regex layer
        # matched, so without the second pass such a span would survive.
        anonymized = self.presidio_detector.anonymize(
            text,
            strategy=self.anonymization_strategy
        )
        result["safe_text"] = self.regex_detector.redact(anonymized)

        # Determine risk level
        high_risk_types = {"SSN", "CREDIT_CARD", "PASSWORD"}
        if high_risk_types.intersection(result["pii_types"]):
            result["risk_level"] = "high"
        else:
            result["risk_level"] = "medium"

        # Step 3: Audit logging
        if self.enable_audit_log:
            self._log_pii_detection(user_id, result)
        return result

    def _log_pii_detection(self, user_id: Optional[str], result: Dict):
        """Log PII detection for auditing.

        Stores metadata only (timestamp, user id, type labels, risk level,
        input length); the input text itself is never written to the log.
        """
        log_entry = {
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            # Copy so later mutation of the caller's result dict cannot
            # rewrite the audit record.
            "pii_types": list(result["pii_types"]),
            "risk_level": result["risk_level"],
            "text_length": result["original_length"]
        }
        self.audit_log.append(log_entry)

    def get_pii_statistics(self) -> Dict:
        """Generate statistics from audit log.

        Returns:
            ``{"total_detections": 0}`` for an empty log; otherwise the
            total count, per-type and per-risk-level counts, and the last
            five log entries.
        """
        if not self.audit_log:
            return {"total_detections": 0}

        pii_type_counts = {}
        risk_level_counts = {"low": 0, "medium": 0, "high": 0}
        for entry in self.audit_log:
            for pii_type in entry["pii_types"]:
                pii_type_counts[pii_type] = pii_type_counts.get(pii_type, 0) + 1
            risk_level_counts[entry["risk_level"]] += 1

        return {
            "total_detections": len(self.audit_log),
            "pii_type_distribution": pii_type_counts,
            "risk_level_distribution": risk_level_counts,
            "recent_detections": self.audit_log[-5:]
        }
# Run the end-to-end pipeline on three inputs of increasing sensitivity.
pii_protection = ProductionPIIProtection(
    anonymization_strategy="replace",
    enable_audit_log=True,
)
test_inputs = [
    "Hello, how are you?",  # No PII
    "My email is test@example.com",  # Medium risk
    "My SSN is 123-45-6789 and card 4532-1234-5678-9010",  # High risk
]

print("\nπ‘οΈ Production PII Protection Pipeline\n" + "=" * 70)
for i, test_input in enumerate(test_inputs, 1):
    result = pii_protection.process_input(test_input, user_id=f"user{i}")
    # Collect the report lines first, then emit them in a single print.
    report = [
        f"\nTest {i}:",
        f" Input: {test_input}",
        f" PII Detected: {result['pii_detected']}",
        f" Risk Level: {result['risk_level'].upper()}",
    ]
    if result['pii_detected']:
        report.append(f" Types: {', '.join(result['pii_types'])}")
        report.append(f" Safe Output: {result['safe_text']}")
    print("\n".join(report))

# Aggregate view built from the audit log the runs above populated.
print("\n\nπ PII Detection Statistics:")
stats = pii_protection.get_pii_statistics()
print(json.dumps(stats, indent=2, default=str))
Summary & Best Practices
Key Takeaways
Detect Early: Catch PII at input stage
Multiple Layers: Use both regex and ML-based detection
Choose Right Strategy: Different anonymization for different use cases
Compliance First: Build in GDPR/CCPA from day one
Audit Everything: Log PII handling for accountability
Privacy Checklist
PII detection at all input points
Anonymization before storage
Data retention policies defined
User consent management
Data subject request handlers
Privacy notice published
Audit logging enabled
Regular privacy audits
Encryption at rest and in transit
Access controls and authentication
Data breach response plan
Third-party data sharing agreements