# Install required packages
!pip install presidio-analyzer presidio-anonymizer spacy faker openai python-dotenv -q
!python -m spacy download en_core_web_lg -q
import re
import hashlib
from typing import Dict, List, Tuple, Optional
from dataclasses import dataclass
from presidio_analyzer import AnalyzerEngine, PatternRecognizer, Pattern
from presidio_anonymizer import AnonymizerEngine
from presidio_anonymizer.entities import OperatorConfig
from faker import Faker
import json

Understanding PII Types: A Risk-Based Classification

Personally Identifiable Information (PII) spans a spectrum from direct identifiers (SSN, passport number) that uniquely identify an individual to quasi-identifiers (zip code, age, gender) that can re-identify individuals when combined. The risk classification below organizes PII into five categories: direct identifiers (highest risk – a single leaked SSN enables identity theft), contact information (medium risk – enables targeted phishing), financial data (critical risk – enables direct financial fraud), health information (protected by HIPAA with penalties up to $1.5M per violation), and online identifiers (variable risk depending on linkability).

Why PII detection matters for AI systems: LLMs can inadvertently memorize and regurgitate PII from training data, and users may unknowingly paste sensitive information into chat interfaces. A production AI system must detect and handle PII at three points: on input (prevent PII from reaching the model), during processing (anonymize PII before storing conversation logs), and on output (filter any PII the model generates from memorized training data).

# Reference taxonomy of PII types used throughout this notebook.
# Keys are risk-oriented category labels; values list example identifiers
# belonging to that category (printed by the demo below).
PII_CATEGORIES = {
    # A single leaked value can uniquely identify a person on its own.
    "Direct Identifiers": [
        "Full name",
        "Social Security Number (SSN)",
        "Driver's license number",
        "Passport number",
        "Biometric data",
        "Account numbers"
    ],
    # Enables targeted contact (phishing, spam) but not identity theft alone.
    "Contact Information": [
        "Email address",
        "Phone number",
        "Physical address",
        "IP address"
    ],
    # Enables direct financial fraud if exposed.
    "Financial Data": [
        "Credit card numbers",
        "Bank account numbers",
        "Routing numbers",
        "Financial account credentials"
    ],
    # Protected health information (HIPAA-regulated in the US).
    "Health Information": [
        "Medical record numbers",
        "Health insurance numbers",
        "Diagnosis information",
        "Prescription data"
    ],
    # Risk varies with how easily the identifier links back to a person.
    "Online Identifiers": [
        "Username/passwords",
        "Device IDs",
        "Cookie IDs",
        "Session tokens"
    ]
}

# Render the PII taxonomy defined above, then a colour-coded risk legend.
print("πŸ“‹ PII Categories and Examples\n" + "=" * 60)
for cat_label, samples in PII_CATEGORIES.items():
    print(f"\n{cat_label}:")
    for sample in samples:
        print(f"  β€’ {sample}")

print("\n\n⚠️ Risk Levels:")
for legend_line in (
    "  πŸ”΄ HIGH: SSN, Credit Card, Medical Records",
    "  🟑 MEDIUM: Email, Phone, Address",
    "  🟒 LOW: First name only, General location",
):
    print(legend_line)

Basic PII Detection with Regex: Fast Pattern Matching

Regex-based PII detection provides the fastest detection layer, running in microseconds per text and catching structured PII formats that follow predictable patterns: email addresses (user@domain.tld), US phone numbers (10 digits with optional separators), SSNs (XXX-XX-XXXX with validation constraints), credit card numbers (Luhn-checkable 13-19 digit sequences), and IP addresses. The RegexPIIDetector class below uses compiled patterns for efficiency, detects all matches with their positions in the text, and supports redaction by replacing matches with [REDACTED] tokens.

Limitations of regex-based detection: regex catches only PII with consistent formatting. A name like "John Smith" has no regex-detectable pattern, addresses vary wildly in format across countries, and medical record numbers follow institution-specific schemas. For these unstructured PII types, ML-based Named Entity Recognition (NER) is essential – which is why production systems layer regex detection (fast, high precision) with NER-based detection (slower, better recall) as shown in Part 3 with Presidio.

@dataclass
class PIIMatch:
    """A single PII occurrence found in a text."""
    type: str          # PII type label, e.g. "EMAIL" or "SSN"
    value: str         # the matched substring
    start: int         # start offset in the scanned text (inclusive)
    end: int           # end offset in the scanned text (exclusive)
    confidence: float  # heuristic confidence in [0, 1]

class RegexPIIDetector:
    """Pattern-based PII detection for structured identifiers.

    Catches PII with predictable formats (emails, US phone numbers, SSNs,
    credit card numbers, IPv4 addresses, dates of birth). Unstructured PII
    such as bare names has no regex signature and needs NER-based detection.
    """

    def __init__(self):
        self.patterns = {
            "EMAIL": re.compile(
                # TLD class fixed: the original [A-Z|a-z] also accepted a
                # literal '|' character.
                r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'
            ),
            "PHONE": re.compile(
                # US 10-digit numbers with optional +1/1 prefix, parentheses,
                # and -/. separators.
                r'\b(?:\+?1[-.]?)?\(?([0-9]{3})\)?[-.]?([0-9]{3})[-.]?([0-9]{4})\b'
            ),
            "SSN": re.compile(
                # Area 001-899 excluding 666 (the 9xx lookahead already covers
                # what the original's redundant 7[0-6]\d branch matched);
                # group != 00; serial != 0000.
                r'\b(?!000|666|9\d{2})\d{3}-?(?!00)\d{2}-?(?!0000)\d{4}\b'
            ),
            "CREDIT_CARD": re.compile(
                # Visa / MasterCard / Discover (16 digits), Amex (15 digits),
                # legacy 13-digit Visa. Optional space/dash group separators:
                # the original only matched unbroken digit runs and missed
                # the common "4532-1234-5678-9010" formatting.
                r'\b(?:(?:4\d{3}|5[1-5]\d{2}|6011|65\d{2})(?:[ -]?\d{4}){3}'
                r'|3[47]\d{2}[ -]?\d{6}[ -]?\d{5}'
                r'|4\d{12})\b'
            ),
            "IP_ADDRESS": re.compile(
                # IPv4 with each octet constrained to 0-255; the original
                # accepted impossible addresses such as 999.999.999.999.
                r'\b(?:(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\.){3}'
                r'(?:25[0-5]|2[0-4]\d|1\d\d|[1-9]?\d)\b'
            ),
            "DATE_OF_BIRTH": re.compile(
                # MM/DD/YYYY or MM-DD-YYYY; the backreference forces the same
                # separator on both sides (the original allowed "01/15-1990").
                r'\b(?:0?[1-9]|1[0-2])([/-])(?:0?[1-9]|[12]\d|3[01])\1(?:19|20)\d{2}\b'
            ),
        }

    def detect(self, text: str) -> List[PIIMatch]:
        """Scan *text* with every pattern; return matches sorted by position."""
        matches = [
            PIIMatch(
                type=pii_type,
                value=m.group(),
                start=m.start(),
                end=m.end(),
                confidence=0.9,  # regex hits on structured formats are high confidence
            )
            for pii_type, pattern in self.patterns.items()
            for m in pattern.finditer(text)
        ]
        return sorted(matches, key=lambda m: m.start)

    def redact(self, text: str, placeholder: str = "[REDACTED]") -> str:
        """Return *text* with every detected PII span replaced by *placeholder*."""
        # Replace right-to-left so earlier offsets stay valid, and skip any
        # match overlapping a span already redacted (overlaps previously
        # produced corrupted output).
        last_start = len(text) + 1
        for m in reversed(self.detect(text)):
            if m.end > last_start:
                continue
            text = text[:m.start] + placeholder + text[m.end:]
            last_start = m.start
        return text

# Smoke-test the regex detector on a sample containing every pattern type.
detector = RegexPIIDetector()

test_text = """
Contact me at john.doe@email.com or call 555-123-4567.
My SSN is 123-45-6789 and card number is 4532-1234-5678-9010.
Server IP: 192.168.1.1, DOB: 01/15/1990
"""

# Run detection and redaction up front; the prints below only report.
matches = detector.detect(test_text)
redacted = detector.redact(test_text)

print("\nπŸ” Regex PII Detection Results\n" + "=" * 60)
print(f"Original Text:\n{test_text}")

print(f"\nDetected PII ({len(matches)} matches):")
for hit in matches:
    print(f"  β€’ {hit.type}: '{hit.value}' (confidence: {hit.confidence})")

print(f"\nRedacted Text:\n{redacted}")

Advanced PII Detection with Presidio: ML-Powered Entity Recognition

Microsoft Presidio combines spaCy's NER models (which detect names, locations, and organizations from context) with customizable pattern recognizers and configurable anonymization strategies. Unlike regex, Presidio's ML backbone can identify "John Smith" as a PERSON entity even without structural formatting cues, and can distinguish between "Washington" the person and "Washington" the city based on surrounding context. The AnalyzerEngine orchestrates multiple recognizers (regex + NER + custom) and returns entity spans with confidence scores.

Custom recognizers extend Presidio's built-in capabilities for domain-specific PII. The example below adds an employee ID recognizer (pattern: EMP-XXXXXX) that standard NER models would not detect. In production, add recognizers for your organization's internal identifiers, customer account numbers, and domain-specific sensitive fields. The AnonymizerEngine then applies configurable strategies (replace, mask, hash, or pseudonymize) to each detected entity, and the choice of strategy depends on the downstream use case – analysis requires pseudonymization (preserving statistical properties), while logging requires irreversible hashing.

class PresidioPIIDetector:
    """Advanced PII detection using Microsoft Presidio.

    Combines Presidio's built-in recognizers (regex + spaCy NER) with a
    custom EMPLOYEE_ID recognizer and exposes three operations: analyze()
    to detect entities, anonymize() to scrub them with a chosen strategy,
    and pseudonymize() to substitute realistic fake values.
    """

    # Minimum confidence for an entity to be reported or anonymized.
    # Shared by all three operations so they act on the same entity set
    # (previously anonymize()/pseudonymize() used the engine default of 0
    # and could scrub entities that analyze() never reported).
    SCORE_THRESHOLD = 0.5

    def __init__(self):
        self.analyzer = AnalyzerEngine()
        self.anonymizer = AnonymizerEngine()

        # Add custom recognizers
        self._add_custom_recognizers()

    def _add_custom_recognizers(self):
        """Register organization-specific recognizers (EMP-XXXXXX employee ids)."""
        employee_id_pattern = Pattern(
            name="employee_id_pattern",
            regex=r"\bEMP-\d{6}\b",
            score=0.8
        )
        employee_id_recognizer = PatternRecognizer(
            supported_entity="EMPLOYEE_ID",
            patterns=[employee_id_pattern]
        )
        self.analyzer.registry.add_recognizer(employee_id_recognizer)

    def analyze(self, text: str, language: str = "en") -> List[Dict]:
        """Analyze *text* for PII.

        Returns one dict per detected entity: its type, character span,
        confidence score, and the matched text.
        """
        results = self.analyzer.analyze(
            text=text,
            language=language,
            score_threshold=self.SCORE_THRESHOLD
        )

        return [
            {
                "type": result.entity_type,
                "start": result.start,
                "end": result.end,
                "score": result.score,
                "text": text[result.start:result.end]
            }
            for result in results
        ]

    def anonymize(
        self,
        text: str,
        strategy: str = "replace",
        mask_char: str = "*"
    ) -> str:
        """Anonymize PII in *text*.

        strategy:
            "replace" - substitute each entity with its type label, e.g. <PERSON>
            "mask"    - overwrite the value with *mask_char*
            "hash"    - replace the value with a deterministic hash
        Any other value falls through to Presidio's default operator
        (replace with <ENTITY_TYPE>).
        """
        analysis_results = self.analyzer.analyze(
            text=text,
            language="en",
            score_threshold=self.SCORE_THRESHOLD
        )

        # One operator per distinct entity type (the original rebuilt the
        # same operator once per detection result).
        entity_types = {result.entity_type for result in analysis_results}
        if strategy == "replace":
            operators = {
                etype: OperatorConfig("replace", {"new_value": f"<{etype}>"})
                for etype in entity_types
            }
        elif strategy == "mask":
            operators = {
                # chars_to_mask=100 masks the whole value for any realistic PII.
                etype: OperatorConfig(
                    "mask",
                    {"masking_char": mask_char, "chars_to_mask": 100, "from_end": False}
                )
                for etype in entity_types
            }
        elif strategy == "hash":
            operators = {etype: OperatorConfig("hash", {}) for etype in entity_types}
        else:
            operators = {}

        anonymized = self.anonymizer.anonymize(
            text=text,
            analyzer_results=analysis_results,
            operators=operators
        )

        return anonymized.text

    def pseudonymize(self, text: str) -> Tuple[str, Dict]:
        """Replace PII with fake but realistic values.

        Returns the pseudonymized text and a mapping from each original PII
        value to the pseudonym that replaced it (the original implementation
        mapped originals to their entity *types*, contradicting its own
        docstring).
        """
        faker = Faker()
        analysis_results = self.analyzer.analyze(
            text=text,
            language="en",
            score_threshold=self.SCORE_THRESHOLD
        )

        # Draw one fake value per supported entity type; Presidio applies the
        # same replacement to every occurrence of that type.
        fake_values = {
            "PERSON": faker.name(),
            "EMAIL_ADDRESS": faker.email(),
            "PHONE_NUMBER": faker.phone_number(),
            "LOCATION": faker.city(),
            "CREDIT_CARD": faker.credit_card_number(),
        }
        operators = {
            etype: OperatorConfig("replace", {"new_value": value})
            for etype, value in fake_values.items()
        }

        anonymized = self.anonymizer.anonymize(
            text=text,
            analyzer_results=analysis_results,
            operators=operators
        )

        # Map each original value to its pseudonym; types without a faker
        # value fall back to Presidio's default <ENTITY_TYPE> placeholder.
        mapping = {
            text[result.start:result.end]: fake_values.get(
                result.entity_type, f"<{result.entity_type}>"
            )
            for result in analysis_results
        }

        return anonymized.text, mapping

# Confirmation banner for the notebook reader.
print(
    "βœ… Presidio PII Detector initialized",
    "Supports: PERSON, EMAIL, PHONE, SSN, CREDIT_CARD, LOCATION, and more",
    sep="\n",
)

Anonymization Strategies: Choosing the Right Approach

Four anonymization strategies serve different use cases. Replace substitutes PII with entity-type labels (<PERSON>, <EMAIL_ADDRESS>), preserving sentence structure while completely removing identifying information – ideal for displaying redacted text to users. Mask overwrites characters with a placeholder symbol (****), preserving the visual length of the original value – useful for receipts and confirmations where users expect to see a partially visible value. Hash applies SHA-256 to produce a deterministic, irreversible fingerprint – useful for deduplication and joining records across datasets without exposing the original PII. Pseudonymize replaces real PII with realistic fake values (via the Faker library), preserving statistical properties and readability – essential for creating realistic test datasets and for analytics where demographic distributions must remain valid.

Reversibility spectrum: replacement and masking are irreversible by design (the original value is destroyed). Hashing is deterministic but computationally irreversible (the same input always produces the same hash, enabling record linkage, but you cannot recover the input from the hash). Pseudonymization with a stored mapping is reversible (the mapping can re-identify individuals), which means pseudonymized data is still considered personal data under GDPR and requires appropriate safeguards.

# Test different anonymization strategies
presidio_detector = PresidioPIIDetector()

# Sample containing a person name, location, email, phone number, and a
# custom EMPLOYEE_ID (EMP-XXXXXX) for the recognizer registered above.
sample_text = """
Hi, I'm John Smith from New York. 
You can reach me at john.smith@company.com or 555-0123.
My employee ID is EMP-123456.
"""

print("\nπŸ“ Anonymization Strategy Comparison\n" + "=" * 70)
print(f"Original Text:\n{sample_text}")

# Note: Uncomment to run (requires spacy model)
# print("\n1️⃣ Replace Strategy:")
# replaced = presidio_detector.anonymize(sample_text, strategy="replace")
# print(replaced)

# print("\n2️⃣ Mask Strategy:")
# masked = presidio_detector.anonymize(sample_text, strategy="mask", mask_char="*")
# print(masked)

# print("\n3️⃣ Hash Strategy:")
# hashed = presidio_detector.anonymize(sample_text, strategy="hash")
# print(hashed)

# print("\n4️⃣ Pseudonymize Strategy:")
# pseudonymized, mapping = presidio_detector.pseudonymize(sample_text)
# print(pseudonymized)
# print(f"\nMapping: {json.dumps(mapping, indent=2)}")

# Example outputs
# Canned illustration so the notebook renders without the spaCy model
# downloaded; live runs will produce different hash/fake values.
print("""
Expected Outputs:

1️⃣ Replace:
Hi, I'm <PERSON> from <LOCATION>.
You can reach me at <EMAIL_ADDRESS> or <PHONE_NUMBER>.
My employee ID is <EMPLOYEE_ID>.

2️⃣ Mask:
Hi, I'm ********** from ********.
You can reach me at ********************** or ********.
My employee ID is **********.

3️⃣ Hash:
Hi, I'm 5d41402abc4b2a76b9719d911017c592 from 8b1a9953c4611296a827abf8c47804d7.
You can reach me at 098f6bcd4621d373cade4e832627b4f6 or e4da3b7fbbce2345d7772b0674a318d5.

4️⃣ Pseudonymize:
Hi, I'm Jennifer Martinez from Phoenix.
You can reach me at robert.johnson@example.com or 555-9876.
My employee ID is EMP-987654.
""")

Privacy Compliance: GDPR and CCPA Requirements

Privacy regulations impose legal obligations on how AI systems handle personal data. GDPR (EU) requires a lawful basis for processing (consent, legitimate interest, legal obligation, etc.), data minimization (collect only what is necessary), purpose limitation (use data only for stated purposes), and data subject rights (access, deletion, portability, rectification). CCPA (California) grants consumers the right to know what data is collected, delete their data, opt out of data selling, and non-discrimination for exercising these rights.

The PrivacyComplianceManager below implements data retention policies with automatic expiration checks, data subject request handlers for all four GDPR rights, and automated privacy notice generation. Key implementation details: retention periods vary by purpose (90 days for analytics, 7 years for financial records due to tax law), the legal basis determines whether consent is required (marketing requires explicit consent; fraud prevention uses legitimate interest), and deletion requests must propagate to all copies including backups (with a documented timeline for backup purging).

from datetime import datetime, timedelta
from typing import Literal

@dataclass
class DataRetentionPolicy:
    """How long one category of personal data may be kept, and on what legal basis."""
    purpose: str            # human-readable processing purpose
    retention_days: int     # maximum retention period in days
    legal_basis: str        # GDPR Article 6 basis
    requires_consent: bool  # True when processing needs explicit user consent

class PrivacyComplianceManager:
    """Manage privacy compliance requirements (GDPR / CCPA).

    Holds per-purpose retention policies, answers retain/delete questions
    for aged data, services data subject requests, and renders a privacy
    notice from the configured policies.
    """

    def __init__(self):
        # Define retention policies, keyed by data type.
        self.retention_policies = {
            "customer_service": DataRetentionPolicy(
                purpose="Customer support and service improvement",
                retention_days=365,
                legal_basis="Legitimate interest (Article 6(1)(f))",
                requires_consent=False
            ),
            "marketing": DataRetentionPolicy(
                purpose="Marketing and promotional communications",
                retention_days=730,
                legal_basis="Consent (Article 6(1)(a))",
                requires_consent=True
            ),
            "financial": DataRetentionPolicy(
                purpose="Financial records and tax compliance",
                retention_days=2555,  # 7 years, per tax record-keeping rules
                legal_basis="Legal obligation (Article 6(1)(c))",
                requires_consent=False
            ),
            "analytics": DataRetentionPolicy(
                purpose="Product analytics and improvement",
                retention_days=90,
                legal_basis="Legitimate interest (Article 6(1)(f))",
                requires_consent=False
            ),
        }

        self.user_data = {}  # In production, use a database

    def check_retention_compliance(
        self,
        data_type: str,
        created_date: datetime
    ) -> Dict:
        """Decide whether data of *data_type* created at *created_date* must go.

        Returns a dict with the action ("delete" / "retain" / "unknown"),
        the data's age, and the policy details that justify the decision.
        """
        policy = self.retention_policies.get(data_type)
        if not policy:
            return {"action": "unknown", "reason": "No policy defined"}

        age_days = (datetime.now() - created_date).days
        should_delete = age_days > policy.retention_days

        return {
            "action": "delete" if should_delete else "retain",
            "age_days": age_days,
            "retention_days": policy.retention_days,
            "days_remaining": max(0, policy.retention_days - age_days),
            "reason": policy.purpose,
            "legal_basis": policy.legal_basis
        }

    def handle_data_subject_request(
        self,
        request_type: Literal["access", "deletion", "portability", "rectification"],
        user_id: str
    ) -> Dict:
        """Handle GDPR/CCPA data subject requests.

        Always returns a status dict; unknown request types get an explicit
        "unsupported" response instead of the implicit None the original
        returned (Literal is not enforced at runtime).
        """

        if request_type == "access":
            # Right to access (GDPR Art. 15)
            return {
                "status": "fulfilled",
                "data": self.user_data.get(user_id, {}),
                "format": "JSON",
                "timestamp": datetime.now().isoformat()
            }

        elif request_type == "deletion":
            # Right to erasure (GDPR Art. 17) / Right to delete (CCPA)
            if user_id in self.user_data:
                del self.user_data[user_id]
                return {
                    "status": "deleted",
                    "user_id": user_id,
                    "timestamp": datetime.now().isoformat(),
                    "note": "Data removed within 30 days (may be retained in backups)"
                }
            return {"status": "not_found"}

        elif request_type == "portability":
            # Right to data portability (GDPR Art. 20)
            return {
                "status": "fulfilled",
                "data": self.user_data.get(user_id, {}),
                "format": "machine-readable JSON",
                "timestamp": datetime.now().isoformat()
            }

        elif request_type == "rectification":
            # Right to rectification (GDPR Art. 16)
            return {
                "status": "pending",
                "message": "Please provide corrected data",
                "timestamp": datetime.now().isoformat()
            }

        # Fallback for request types outside the Literal annotation.
        return {
            "status": "unsupported",
            "message": f"Unknown request type: {request_type}",
            "timestamp": datetime.now().isoformat()
        }

    @staticmethod
    def _format_retention(days: int) -> str:
        """Render a retention period, adding a years approximation when >= 1 year.

        Fixes the original rendering, which showed "90 days (0 years)" for
        sub-year periods and the ungrammatical "(1 years)".
        """
        if days < 365:
            return f"{days} days"
        years = days // 365
        return f"{days} days (~{years} year{'s' if years != 1 else ''})"

    def generate_privacy_notice(self) -> str:
        """Generate a markdown privacy notice from the configured policies."""
        notice = """# PRIVACY NOTICE

## Data We Collect and Why

"""
        for data_type, policy in self.retention_policies.items():
            notice += f"""### {data_type.replace('_', ' ').title()}
- **Purpose**: {policy.purpose}
- **Retention**: {self._format_retention(policy.retention_days)}
- **Legal Basis**: {policy.legal_basis}
- **Consent Required**: {'Yes' if policy.requires_consent else 'No'}

"""

        notice += """## Your Rights

Under GDPR and CCPA, you have the right to:
- Access your personal data
- Request deletion of your data
- Port your data to another service
- Correct inaccurate data
- Opt-out of data selling (CCPA)
- Withdraw consent at any time

To exercise these rights, contact: privacy@example.com
"""
        return notice

# Exercise the compliance manager: one retention decision, one deletion request.
compliance_mgr = PrivacyComplianceManager()

print("\nβš–οΈ Privacy Compliance Check\n" + "=" * 60)

# 400 days is past the 365-day customer-service retention window.
old_data_date = datetime.now() - timedelta(days=400)
result = compliance_mgr.check_retention_compliance("customer_service", old_data_date)
print(
    "Customer Service Data (400 days old):\n"
    f"  Action: {result['action'].upper()}\n"
    f"  Reason: {result['reason']}\n"
    f"  Legal Basis: {result['legal_basis']}"
)

# Deletion request for a user we never stored.
print("\nπŸ“‹ Data Subject Request (Right to Deletion):")
deletion_result = compliance_mgr.handle_data_subject_request("deletion", "user123")
print(json.dumps(deletion_result, indent=2))

Production PII Protection Pipeline: End-to-End System

The ProductionPIIProtection class combines all PII detection and anonymization components into a single process_input() method suitable for integration into any LLM application. The pipeline follows a two-stage detection strategy: a fast regex scan identifies structured PII patterns in microseconds; if any PII is detected, Presidio's ML-based analyzer runs a deeper analysis for unstructured entities (names, addresses). This tiered approach minimizes latency for PII-free inputs (the common case) while providing thorough detection when PII is present.

Risk-level classification drives downstream handling: high-risk PII (SSNs, credit cards) triggers immediate anonymization and audit logging; medium-risk PII (emails, phone numbers) is anonymized with optional logging; low-risk content passes through unmodified. The audit log stores only metadata (PII types detected, risk level, text length) – never the actual PII values – ensuring that the monitoring system itself does not become a data liability. The get_pii_statistics() method provides aggregate analytics for compliance dashboards: what types of PII are users submitting most frequently, and are high-risk submissions increasing over time?

class ProductionPIIProtection:
    """Production-ready PII protection pipeline.

    Tiers a fast regex scan in front of Presidio's ML-based analysis,
    applies the configured anonymization strategy to anything detected,
    and keeps a metadata-only audit trail (raw PII is never logged).
    """

    # Detections of these types escalate the input to "high" risk.
    _HIGH_RISK_TYPES = {"SSN", "CREDIT_CARD", "PASSWORD"}

    def __init__(
        self,
        anonymization_strategy: str = "replace",
        enable_audit_log: bool = True
    ):
        self.regex_detector = RegexPIIDetector()
        self.presidio_detector = PresidioPIIDetector()
        self.anonymization_strategy = anonymization_strategy
        self.enable_audit_log = enable_audit_log
        self.audit_log = []

    def process_input(self, text: str, user_id: Optional[str] = None) -> Dict:
        """Screen *text* for PII and return a report with an anonymized copy.

        The report carries the original length, whether PII was found, the
        detected types, a safe version of the text, and a risk level
        ("low" / "medium" / "high").
        """
        report = {
            "original_length": len(text),
            "pii_detected": False,
            "pii_types": [],
            "safe_text": text,
            "risk_level": "low"
        }

        # Tier 1: microsecond regex scan. Clean inputs short-circuit here,
        # untouched and unlogged.
        hits = self.regex_detector.detect(text)
        if not hits:
            return report

        report["pii_detected"] = True
        report["pii_types"] = list({hit.type for hit in hits})

        # Tier 2: deeper ML-backed anonymization via Presidio.
        report["safe_text"] = self.presidio_detector.anonymize(
            text,
            strategy=self.anonymization_strategy
        )
        if self._HIGH_RISK_TYPES.intersection(report["pii_types"]):
            report["risk_level"] = "high"
        else:
            report["risk_level"] = "medium"

        if self.enable_audit_log:
            self._log_pii_detection(user_id, report)

        return report

    def _log_pii_detection(self, user_id: Optional[str], result: Dict):
        """Append a metadata-only audit record (no raw PII is stored)."""
        self.audit_log.append({
            "timestamp": datetime.now().isoformat(),
            "user_id": user_id,
            "pii_types": result["pii_types"],
            "risk_level": result["risk_level"],
            "text_length": result["original_length"]
        })

    def get_pii_statistics(self) -> Dict:
        """Aggregate the audit log into counts suitable for a dashboard."""
        if not self.audit_log:
            return {"total_detections": 0}

        type_counts = {}
        level_counts = {"low": 0, "medium": 0, "high": 0}
        for entry in self.audit_log:
            level_counts[entry["risk_level"]] += 1
            for detected_type in entry["pii_types"]:
                type_counts[detected_type] = type_counts.get(detected_type, 0) + 1

        return {
            "total_detections": len(self.audit_log),
            "pii_type_distribution": type_counts,
            "risk_level_distribution": level_counts,
            "recent_detections": self.audit_log[-5:]
        }

# End-to-end smoke test: inputs of increasing sensitivity through the pipeline.
pii_protection = ProductionPIIProtection(
    anonymization_strategy="replace",
    enable_audit_log=True
)

test_inputs = [
    "Hello, how are you?",  # clean input, passes through untouched
    "My email is test@example.com",  # contact info -> medium risk
    "My SSN is 123-45-6789 and card 4532-1234-5678-9010",  # direct identifiers -> high risk
]

print("\nπŸ›‘οΈ Production PII Protection Pipeline\n" + "=" * 70)
for idx, sample in enumerate(test_inputs, start=1):
    outcome = pii_protection.process_input(sample, user_id=f"user{idx}")
    print(f"\nTest {idx}:")
    print(f"  Input: {sample}")
    print(f"  PII Detected: {outcome['pii_detected']}")
    print(f"  Risk Level: {outcome['risk_level'].upper()}")
    if outcome['pii_detected']:
        print(f"  Types: {', '.join(outcome['pii_types'])}")
        print(f"  Safe Output: {outcome['safe_text']}")

print("\n\nπŸ“Š PII Detection Statistics:")
stats = pii_protection.get_pii_statistics()
print(json.dumps(stats, indent=2, default=str))

Summary & Best Practices

Key Takeaways

  1. Detect Early: Catch PII at input stage

  2. Multiple Layers: Use both regex and ML-based detection

  3. Choose Right Strategy: Different anonymization for different use cases

  4. Compliance First: Build in GDPR/CCPA from day one

  5. Audit Everything: Log PII handling for accountability

Privacy Checklist

  • PII detection at all input points

  • Anonymization before storage

  • Data retention policies defined

  • User consent management

  • Data subject request handlers

  • Privacy notice published

  • Audit logging enabled

  • Regular privacy audits

  • Encryption at rest and in transit

  • Access controls and authentication

  • Data breach response plan

  • Third-party data sharing agreements

Resources