# Install required packages
!pip install openai python-dotenv detoxify transformers -q
import os
import re
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from openai import OpenAI
from dotenv import load_dotenv

# Read configuration (presumably from a local .env file) before the key lookup below.
load_dotenv()
# Shared OpenAI client; the API key is taken from the OPENAI_API_KEY environment variable.
client = OpenAI(api_key=os.getenv("OPENAI_API_KEY"))

OpenAI Moderation API: Production-Grade Content Classification

OpenAI’s Moderation endpoint is a free, purpose-built classifier that evaluates text against 11 content policy categories (hate, harassment, self-harm, sexual, violence, and their subcategories). Each category returns both a boolean flag and a continuous confidence score, enabling nuanced policy enforcement – you might warn at 0.5 confidence but block at 0.8. The endpoint is designed for high throughput and low latency, making it suitable as a pre-filter before expensive LLM calls.

Why use the Moderation API instead of the LLM itself: LLMs can be manipulated into ignoring safety instructions (the entire prompt injection problem), but the Moderation endpoint is a separate, hardened classifier that cannot be influenced by user input. The “fail secure” pattern shown below – flagging content on API errors – ensures that network failures or service outages do not accidentally allow harmful content through. In production, combine the Moderation API with application-specific filters for domain-specific content policies that OpenAI’s general categories do not cover.

@dataclass
class ModerationResult:
    """Condensed outcome of one moderation check."""

    # True when the content should be treated as a violation.
    flagged: bool
    # Per-category boolean verdicts, keyed by category name.
    categories: Dict[str, bool]
    # Per-category confidence scores, keyed by category name.
    category_scores: Dict[str, float]
    # Name of the category with the largest score.
    highest_risk_category: str
    # Score of that category.
    highest_risk_score: float

class OpenAIModerator:
    """Wrapper for the OpenAI Moderation API.

    Sends text to ``client.moderations.create`` and condenses the first
    result into a ``ModerationResult``. Any failure is reported as flagged
    content ("fail secure") so an outage never lets unchecked text through.
    """

    def __init__(self, client: OpenAI):
        """Store the client and the category descriptions used when printing.

        Args:
            client: OpenAI client whose ``moderations.create`` method is called.
        """
        self.client = client
        # Keyed by the API's category names (hyphen/slash form).
        self.category_descriptions = {
            "hate": "Content that expresses, incites, or promotes hate based on identity",
            "hate/threatening": "Hateful content that also includes violence or serious harm",
            "harassment": "Content that expresses, incites, or promotes harassing language",
            "harassment/threatening": "Harassment content that also includes violence or serious harm",
            "self-harm": "Content that promotes, encourages, or depicts acts of self-harm",
            "self-harm/intent": "Content where the speaker expresses intent to engage in self-harm",
            "self-harm/instructions": "Content that encourages or provides instructions for self-harm",
            "sexual": "Content meant to arouse sexual excitement",
            "sexual/minors": "Sexual content involving individuals under 18",
            "violence": "Content that depicts death, violence, or physical injury",
            "violence/graphic": "Content that depicts death, violence, or physical injury in graphic detail"
        }

    def moderate(self, text: str) -> ModerationResult:
        """Check content using the OpenAI Moderation API.

        Args:
            text: Content to classify.

        Returns:
            A ``ModerationResult`` built from the first API result. If the
            call or the parsing raises, a result with ``flagged=True``,
            category ``"error"`` and score ``1.0`` is returned instead.
        """
        try:
            response = self.client.moderations.create(input=text)
            result = response.results[0]

            # Dump by alias so keys are the API category names
            # ("self-harm/intent") rather than Python attribute names
            # ("self_harm_intent"); category_descriptions is keyed by the
            # API names.
            raw_scores = result.category_scores.model_dump(by_alias=True)
            raw_flags = result.categories.model_dump(by_alias=True)

            # Optional categories may be reported as None; drop them so
            # max() never has to compare None with a float.
            scores = {
                name: value for name, value in raw_scores.items() if value is not None
            }
            categories = {
                name: bool(value) for name, value in raw_flags.items() if value is not None
            }

            # Find highest risk category
            if scores:
                highest_category = max(scores, key=scores.get)
                highest_score = scores[highest_category]
            else:
                highest_category = "unknown"
                highest_score = 1.0 if result.flagged else 0.0

            return ModerationResult(
                flagged=result.flagged,
                categories=categories,
                category_scores=scores,
                highest_risk_category=highest_category,
                highest_risk_score=highest_score
            )
        except Exception as e:
            print(f"Error in moderation: {e}")
            # Fail secure: flag on error
            return ModerationResult(
                flagged=True,
                categories={},
                category_scores={},
                highest_risk_category="error",
                highest_risk_score=1.0
            )

    def print_result(self, text: str, result: ModerationResult):
        """Pretty print moderation results.

        Args:
            text: The moderated text; only the first 80 characters are shown
                when it is longer than that.
            result: Outcome returned by ``moderate``.
        """
        status = "🔴 FLAGGED" if result.flagged else "✅ SAFE"
        print(f"\n{status}")
        print(f"Text: {text[:80]}..." if len(text) > 80 else f"Text: {text}")
        print(f"Highest Risk: {result.highest_risk_category} ({result.highest_risk_score:.3f})")

        if result.flagged:
            print("\nFlagged Categories:")
            for category, flagged in result.categories.items():
                if flagged:
                    # .get() keeps printing robust if a category has no score.
                    score = result.category_scores.get(category, 0.0)
                    desc = self.category_descriptions.get(category, "")
                    print(f"  • {category}: {score:.3f} - {desc}")

# Test with sample content
moderator = OpenAIModerator(client)

# Inputs for the (commented-out) live run below.
test_samples = [
    "I love learning about AI and machine learning!",
    "I want to harm myself",  # Triggers self-harm
    "How can I build a chatbot?",
]

print("🔍 OpenAI Moderation API Demo\n" + "=" * 60)

# Note: Uncomment to run (requires API key)
# for sample in test_samples:
#     result = moderator.moderate(sample)
#     moderator.print_result(sample, result)

# Static sample of what the live run prints, so the cell is useful offline.
print("""\nExample Output:
✅ SAFE
Text: I love learning about AI and machine learning!
Highest Risk: violence (0.001)

🔴 FLAGGED
Text: I want to harm myself
Highest Risk: self-harm (0.985)

Flagged Categories:
  • self-harm: 0.985 - Content that promotes, encourages, or depicts acts of self-harm
  • self-harm/intent: 0.892 - Content where the speaker expresses intent to engage in self-harm
""")

Toxicity Detection with Detoxify: Local ML-Based Analysis

Detoxify provides open-source toxicity classification models that run locally without API calls, offering three variants: “original” (trained on the Jigsaw Toxic Comment dataset), “unbiased” (debiased to reduce false positives on identity-related terms), and “multilingual” (supports 7 languages). Each variant scores text across six dimensions: toxicity, severe toxicity, obscenity, threat, insult, and identity attack, with configurable thresholds per category.

When to use Detoxify over the OpenAI Moderation API: Detoxify runs entirely on your infrastructure with no external API dependency, which is critical for applications with strict data residency requirements (healthcare, government, GDPR-sensitive workloads) or offline environments. The “unbiased” model variant is particularly important for applications serving diverse user bases – standard toxicity classifiers often have higher false positive rates for text mentioning marginalized identities (e.g., “I’m a gay man” flagged as toxic), and the debiased variant specifically addresses this problem.

from detoxify import Detoxify

class ToxicityDetector:
    """Detect toxicity using Detoxify models.

    Wraps a Detoxify model and compares each predicted score against a
    per-category threshold.
    """

    # Threshold applied to categories that have no entry in ``thresholds``.
    DEFAULT_THRESHOLD = 0.7

    def __init__(self, model_name: str = "original"):
        """
        Initialize detector

        Args:
            model_name: 'original', 'unbiased', or 'multilingual'
        """
        self.model = Detoxify(model_name)
        self.thresholds = {
            "toxicity": 0.7,
            "severe_toxicity": 0.5,
            "obscene": 0.7,
            "threat": 0.6,
            "insult": 0.7,
            "identity_attack": 0.6
        }

    def analyze(self, text: str) -> Dict:
        """Analyze text for toxicity.

        Args:
            text: Content passed to the model's ``predict`` method.

        Returns:
            Dict with ``scores`` (category -> float), ``violations`` (the
            subset of scores strictly above their threshold), ``is_toxic``,
            ``max_score`` and ``max_category``. When the model returns no
            scores, ``max_score`` is ``0.0`` and ``max_category`` is ``None``.
        """
        raw_scores = self.model.predict(text)

        # Normalize to built-in floats: the model may hand back numeric
        # scalar types that are not plain ``float`` (and so are awkward to
        # serialize or compare).
        scores = {category: float(score) for category, score in raw_scores.items()}

        # Check which categories exceed thresholds
        violations = {
            category: score
            for category, score in scores.items()
            if score > self.thresholds.get(category, self.DEFAULT_THRESHOLD)
        }

        # max() raises on an empty mapping, so handle that case explicitly.
        if scores:
            max_category = max(scores, key=scores.get)
            max_score = scores[max_category]
        else:
            max_category = None
            max_score = 0.0

        return {
            "scores": scores,
            "violations": violations,
            "is_toxic": len(violations) > 0,
            "max_score": max_score,
            "max_category": max_category
        }

    def set_thresholds(self, thresholds: Dict[str, float]):
        """Update detection thresholds.

        Args:
            thresholds: Category -> threshold; merged over the current values.
        """
        self.thresholds.update(thresholds)

# Describe the Detoxify layer without loading the model.
print(
    "⚠️ Toxicity Detection with Detoxify",
    "This would use ML models to detect toxic content",
    "Categories: toxicity, severe_toxicity, obscene, threat, insult, identity_attack",
    sep="\n",
)

# Uncomment to run (downloads ~100MB model)
# detector = ToxicityDetector()
# result = detector.analyze("You are such an idiot!")
# print(result)

Custom Content Filters: Domain-Specific Safety Rules

General-purpose moderation APIs cover broad safety categories but miss application-specific policies. A customer support chatbot for a pharmacy needs to flag self-medication advice; a children’s education platform needs stricter content thresholds than an adult-oriented creative writing tool. The CustomContentFilter below combines keyword blocklists (organized by category: violence, adult, spam, PII) with regex patterns for structured data detection (emails, phone numbers, SSNs, credit cards).

Design principles for custom filters: maintain an allowlist for terms that trigger false positives in your domain (a medical chatbot should not block “injection” or “needle”), use category-based organization so different policy violations trigger different response strategies (spam gets a warning, PII gets redacted, violence gets escalated), and enable PII checking as a toggle so the same filter class works for both input validation (where PII detection is critical) and internal content routing (where PII checking adds unnecessary latency).

class CustomContentFilter:
    """Custom content filtering with keyword lists and patterns.

    Keywords are grouped by category and matched case-insensitively at the
    start of a word, so "killing" still matches "kill" but "skills" does
    not. Regex patterns detect structured data (email, phone, SSN, URL).
    """

    def __init__(self):
        """Set up the default keyword lists, patterns and an empty allow list."""
        # Blocked keywords (in practice, load from file)
        self.blocked_keywords = {
            "violence": ["kill", "murder", "attack", "weapon"],
            "adult": ["nsfw", "explicit"],
            "spam": ["click here", "buy now", "limited offer"],
            "personal_info": ["ssn", "social security", "credit card"]
        }

        # Regex patterns for more complex matching
        self.patterns = {
            # TLD class is [A-Za-z]; a "|" inside a character class is a
            # literal pipe, not alternation, so it must not appear here.
            "email": re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
            "phone": re.compile(r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b'),
            "ssn": re.compile(r'\b\d{3}-\d{2}-\d{4}\b'),
            # NOTE(review): "[$-_@.&+]" is a character range from "$" to "_",
            # which is broader than the listed punctuation; left unchanged.
            "url": re.compile(r'http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+'),
        }

        self.allow_list = set()  # Whitelisted terms

    @staticmethod
    def _keyword_in_text(keyword_lower: str, text_lower: str) -> bool:
        """Return True if the keyword occurs in the text at the start of a word."""
        # The lookbehind rejects hits embedded in a longer word ("skills"),
        # while inflected forms that start with the keyword still match.
        return re.search(r'(?<!\w)' + re.escape(keyword_lower), text_lower) is not None

    def filter(self, text: str, check_pii: bool = True) -> Dict:
        """Apply custom content filters.

        Args:
            text: Content to check.
            check_pii: When True, also run every regex in ``self.patterns``.

        Returns:
            Dict with ``safe`` (bool), ``blocked_keywords`` (list of
            ``{"keyword", "category"}``), ``detected_patterns`` (list of
            ``{"type", "count"}``) and ``reason`` (summary string, or None
            when the text is safe).
        """
        results = {
            "safe": True,
            "blocked_keywords": [],
            "detected_patterns": [],
            "reason": None
        }

        text_lower = text.lower()
        # Compare case-insensitively; allow_list may have been edited directly.
        allowed = {term.lower() for term in self.allow_list}

        # Check blocked keywords
        for category, keywords in self.blocked_keywords.items():
            for keyword in keywords:
                needle = keyword.lower()
                # An empty keyword would match every text; ignore it.
                if not needle or needle in allowed:
                    continue
                if self._keyword_in_text(needle, text_lower):
                    results["safe"] = False
                    results["blocked_keywords"].append({"keyword": keyword, "category": category})

        # Check patterns (e.g., PII)
        if check_pii:
            for pattern_name, pattern in self.patterns.items():
                matches = pattern.findall(text)
                if matches:
                    results["safe"] = False
                    results["detected_patterns"].append({
                        "type": pattern_name,
                        "count": len(matches)
                    })

        if not results["safe"]:
            reasons = []
            if results["blocked_keywords"]:
                reasons.append(f"{len(results['blocked_keywords'])} blocked keywords")
            if results["detected_patterns"]:
                reasons.append(f"{len(results['detected_patterns'])} PII patterns")
            results["reason"] = ", ".join(reasons)

        return results

    def add_to_allowlist(self, terms: List[str]):
        """Add terms to allow list (won't be blocked).

        A blocked keyword is skipped when it equals an allow-listed term,
        ignoring case.
        """
        self.allow_list.update(terms)

    def add_blocked_keywords(self, category: str, keywords: List[str]):
        """Add new blocked keywords to a category, creating it if needed."""
        if category not in self.blocked_keywords:
            self.blocked_keywords[category] = []
        self.blocked_keywords[category].extend(keywords)

# Test custom filter
filter_system = CustomContentFilter()

# One clean input plus one input per kind of rule (pattern, keyword, spam phrases).
test_cases = [
    "How do I build a chatbot?",
    "My email is john@example.com",
    "I want to attack this problem",
    "Click here for limited offer!",
]

print("\n🔎 Custom Content Filter Results\n" + "=" * 60)
for test in test_cases:
    result = filter_system.filter(test)
    status = "✅ SAFE" if result["safe"] else "🔴 BLOCKED"
    print(f"\n{status}")
    print(f"Text: {test}")
    if not result["safe"]:
        # Show the summary first, then every individual hit.
        print(f"Reason: {result['reason']}")
        if result["blocked_keywords"]:
            for item in result["blocked_keywords"]:
                print(f"  • Keyword '{item['keyword']}' ({item['category']})")
        if result["detected_patterns"]:
            for item in result["detected_patterns"]:
                print(f"  • Pattern '{item['type']}' ({item['count']} matches)")

Multi-Layer Content Moderation: Defense in Depth for Safety

Multi-layer moderation chains multiple detection systems from fast-and-local to slow-and-comprehensive: (1) custom keyword/regex filters run in microseconds with zero API latency, catching obvious violations immediately; (2) Detoxify’s ML-based toxicity classifier runs locally in milliseconds, catching nuanced harmful language that keywords miss; (3) the OpenAI Moderation API provides a final external validation. The check_all_layers parameter controls whether to short-circuit on the first failure (fast, for high-traffic applications) or run all layers regardless (thorough, for audit and reporting).

Why layered moderation outperforms any single system: keyword filters have low recall (miss rephrased harmful content), ML classifiers have imperfect precision (flag benign content as harmful), and API-based systems have latency and availability risks. Combining all three with an “any-layer-flags” union strategy maximizes recall (catching the most harmful content), while the overall_risk score enables downstream systems to differentiate between borderline cases (flagged by one layer at low confidence) and clear violations (flagged by multiple layers at high confidence).

class MultiLayerModerator:
    """Combine multiple moderation techniques.

    Layers run in order: custom keyword/pattern filter, optional Detoxify
    toxicity detection, then the OpenAI Moderation API. Content is unsafe
    as soon as any layer flags it.
    """

    def __init__(
        self,
        openai_client: OpenAI,
        use_toxicity: bool = False,
        custom_filter: Optional[CustomContentFilter] = None
    ):
        """Build the layers.

        Args:
            openai_client: Client handed to ``OpenAIModerator``.
            use_toxicity: When True, create a ``ToxicityDetector`` layer.
            custom_filter: Filter to use for layer 1; a default
                ``CustomContentFilter`` is created when omitted.
        """
        self.openai_moderator = OpenAIModerator(openai_client)
        self.custom_filter = custom_filter or CustomContentFilter()
        self.use_toxicity = use_toxicity

        # Always define the attribute so it can be inspected even when the
        # layer is disabled.
        self.toxicity_detector = ToxicityDetector() if use_toxicity else None

    def moderate(self, text: str, check_all_layers: bool = True) -> Dict:
        """Run text through multiple moderation layers.

        Args:
            text: Content to check.
            check_all_layers: When False, return as soon as the custom
                filter or the toxicity layer flags the text.

        Returns:
            Dict with ``safe`` (bool), ``layers`` (per-layer raw results or
            ``{"error": ...}``), ``overall_risk`` (max risk seen) and
            ``blocked_by`` (names of the layers that flagged the text).
        """
        results = {
            "safe": True,
            "layers": {},
            "overall_risk": 0.0,
            "blocked_by": []
        }

        # Layer 1: Custom Filter (fast, local)
        custom_result = self.custom_filter.filter(text)
        results["layers"]["custom_filter"] = custom_result
        if not custom_result["safe"]:
            results["safe"] = False
            results["blocked_by"].append("custom_filter")
            # The custom filter has no score of its own; a hit counts as 0.8.
            results["overall_risk"] = max(results["overall_risk"], 0.8)

            if not check_all_layers:
                return results  # Fast fail

        # Layer 2: Toxicity Detection (if enabled)
        if self.use_toxicity:
            try:
                toxicity_result = self.toxicity_detector.analyze(text)
                results["layers"]["toxicity"] = toxicity_result
                if toxicity_result["is_toxic"]:
                    results["safe"] = False
                    results["blocked_by"].append("toxicity")
                    results["overall_risk"] = max(results["overall_risk"], toxicity_result["max_score"])

                    if not check_all_layers:
                        return results
            except Exception as e:
                # NOTE(review): an error here is recorded but does not flag
                # the content, unlike the fail-secure OpenAI wrapper.
                results["layers"]["toxicity"] = {"error": str(e)}

        # Layer 3: OpenAI Moderation (API call)
        try:
            openai_result = self.openai_moderator.moderate(text)
            results["layers"]["openai"] = openai_result
            if openai_result.flagged:
                results["safe"] = False
                results["blocked_by"].append("openai")
                results["overall_risk"] = max(results["overall_risk"], openai_result.highest_risk_score)
        except Exception as e:
            results["layers"]["openai"] = {"error": str(e)}

        return results

    def print_detailed_results(self, text: str, results: Dict):
        """Print comprehensive moderation results.

        Args:
            text: The moderated text.
            results: Dict returned by ``moderate``.
        """
        print("\n" + "=" * 70)
        status = "✅ SAFE" if results["safe"] else "🔴 BLOCKED"
        print(f"{status} | Overall Risk: {results['overall_risk']:.3f}")
        print(f"Text: {text}\n")

        if results["blocked_by"]:
            print(f"Blocked by: {', '.join(results['blocked_by'])}\n")

        print("Layer Results:")
        for layer_name, layer_result in results["layers"].items():
            print(f"\n  {layer_name.upper()}:")
            if isinstance(layer_result, dict):
                if "error" in layer_result:
                    print(f"    Error: {layer_result['error']}")
                elif layer_name == "custom_filter":
                    print(f"    Safe: {layer_result['safe']}")
                    if layer_result.get("reason"):
                        print(f"    Reason: {layer_result['reason']}")
                elif layer_name == "toxicity":
                    print(f"    Toxic: {layer_result.get('is_toxic')}")
                    if layer_result.get("is_toxic"):
                        print(f"    Category: {layer_result.get('max_category')} ({layer_result.get('max_score', 0.0):.3f})")
            elif hasattr(layer_result, 'flagged'):
                # The OpenAI layer stores a result object, not a dict, so
                # this check must sit outside the dict branch to be reached.
                print(f"    Flagged: {layer_result.flagged}")
                if layer_result.flagged:
                    print(f"    Category: {layer_result.highest_risk_category} ({layer_result.highest_risk_score:.3f})")

# Example usage: reuse the custom filter built above as layer 1.
multi_moderator = MultiLayerModerator(
    openai_client=client,
    use_toxicity=False,  # Set to True to use Detoxify
    custom_filter=filter_system
)

print("\n🛡️ Multi-Layer Moderation System")
print("Layers: Custom Filter → Toxicity Detection → OpenAI Moderation")
print("\nThis provides defense-in-depth for content safety")

Moderation Policies and Response Strategies: Beyond Binary Blocking

Effective content moderation is not binary (allow/block) but graduated: different violation types and severities warrant different responses. The ModerationPolicy class maps each category-severity combination to one of five actions: ALLOW (proceed normally), WARN (add a disclaimer but continue), BLOCK (refuse and explain why), ESCALATE (flag for human review and potentially alert safety teams), or REPHRASE (modify the response to remove problematic elements while preserving useful content).

Self-harm is a critical special case: unlike other categories where blocking is appropriate, self-harm content requires an empathetic response that provides crisis resources (988 Suicide & Crisis Lifeline in the US). Even low-severity self-harm signals should trigger resource provision rather than a generic rejection. The policy framework below encodes these nuanced response strategies as configurable rules, allowing different applications to implement the same moderation engine with different policy configurations (a mental health app would have more sensitive self-harm thresholds than a general chatbot).

from enum import Enum
from typing import Callable

class ModerationAction(Enum):
    """Responses a moderation policy can select for a piece of content."""

    # Proceed normally.
    ALLOW = "allow"
    # Continue, but attach a warning.
    WARN = "warn"
    # Refuse the content.
    BLOCK = "block"
    # Flag for follow-up beyond an automatic refusal.
    ESCALATE = "escalate"
    # Respond with modified content instead of refusing outright.
    REPHRASE = "rephrase"

class ModerationPolicy:
    """Define policies for handling different types of content violations.

    ``policies`` maps category -> severity ("low"/"medium"/"high") ->
    ``(ModerationAction, message)``. Subcategories such as
    "self-harm/intent" and underscore spellings such as "self_harm_intent"
    resolve to the policy of their parent category.
    """

    def __init__(self):
        """Install the default policies and response templates."""
        # Define policies by category and severity
        self.policies = {
            "violence": {
                "low": (ModerationAction.WARN, "Please avoid violent language"),
                "medium": (ModerationAction.BLOCK, "Content contains violent themes"),
                "high": (ModerationAction.ESCALATE, "Severe violence detected - reporting")
            },
            "hate": {
                "low": (ModerationAction.WARN, "Please be respectful"),
                "medium": (ModerationAction.BLOCK, "Hateful content not allowed"),
                "high": (ModerationAction.ESCALATE, "Severe hate speech - reporting")
            },
            "self-harm": {
                "low": (ModerationAction.REPHRASE, "If you're struggling, please reach out: 988 Suicide Hotline"),
                "medium": (ModerationAction.ESCALATE, "Self-harm content detected - providing resources"),
                "high": (ModerationAction.ESCALATE, "Immediate self-harm risk - escalating")
            },
            "pii": {
                "low": (ModerationAction.WARN, "Avoid sharing personal information"),
                "medium": (ModerationAction.REPHRASE, "Removing personal information"),
                "high": (ModerationAction.BLOCK, "Cannot process sensitive personal data")
            },
            "spam": {
                "low": (ModerationAction.WARN, "This looks like spam"),
                "medium": (ModerationAction.BLOCK, "Spam detected"),
                "high": (ModerationAction.ESCALATE, "Repeated spam - user may be blocked")
            }
        }

        self.response_templates = {
            ModerationAction.ALLOW: "Content processed normally",
            ModerationAction.WARN: "⚠️ Warning: {message}",
            ModerationAction.BLOCK: "🔴 Blocked: {message}",
            ModerationAction.ESCALATE: "🚨 Escalated: {message}",
            ModerationAction.REPHRASE: "✏️ Rephrased: {message}"
        }

    def get_severity(self, score: float) -> str:
        """Determine severity level from score.

        Returns "low" below 0.5, "medium" below 0.8, otherwise "high".
        """
        if score < 0.5:
            return "low"
        elif score < 0.8:
            return "medium"
        else:
            return "high"

    def _resolve_category(self, category: str) -> Optional[str]:
        """Map a reported category name to a key of ``self.policies``, or None."""
        lowered = str(category).strip().lower()
        hyphenated = lowered.replace("_", "-")
        # Exact spellings first, then the parent before the first "/".
        candidates = (
            category,
            lowered,
            hyphenated,
            lowered.split("/", 1)[0],
            hyphenated.split("/", 1)[0],
        )
        for candidate in candidates:
            if candidate in self.policies:
                return candidate
        # Underscore-style subcategories ("self_harm_intent") carry no "/",
        # so fall back to the longest policy name that prefixes them.
        prefixes = [name for name in self.policies if hyphenated.startswith(f"{name}-")]
        return max(prefixes, key=len) if prefixes else None

    def determine_action(
        self,
        category: str,
        score: float
    ) -> Tuple[ModerationAction, str]:
        """Determine what action to take for a violation.

        Args:
            category: Violation category; subcategories resolve to their
                parent policy (see ``_resolve_category``).
            score: Risk score, converted to a severity by ``get_severity``.

        Returns:
            ``(action, message)`` from the matching policy. Without a match
            the default is BLOCK for high severity and WARN otherwise.
        """
        severity = self.get_severity(score)

        # Get policy for this category and severity
        policy_key = self._resolve_category(category)
        if policy_key is not None:
            rule = self.policies[policy_key].get(severity)
            if rule is not None:
                return rule

        # Default policy
        if severity == "high":
            return (ModerationAction.BLOCK, "Content policy violation")
        else:
            return (ModerationAction.WARN, "Please review content guidelines")

    def format_response(self, action: ModerationAction, message: str) -> str:
        """Format response based on action type.

        Uses the template registered for ``action``; an action without a
        template yields the bare message.
        """
        template = self.response_templates.get(action, "{message}")
        return template.format(message=message)

    def add_custom_policy(
        self,
        category: str,
        severity: str,
        action: ModerationAction,
        message: str
    ):
        """Add a custom moderation policy.

        Creates the category if needed and overwrites any existing entry
        for the same severity.
        """
        if category not in self.policies:
            self.policies[category] = {}
        self.policies[category][severity] = (action, message)

# Example usage
policy = ModerationPolicy()

# (category, score) pairs chosen to hit different severity bands.
test_violations = [
    ("violence", 0.3),   # Low severity
    ("hate", 0.7),       # Medium severity
    ("self-harm", 0.9),  # High severity
    ("pii", 0.6),        # Medium severity
]

print("\n📋 Moderation Policy Examples\n" + "=" * 60)
for category, score in test_violations:
    action, message = policy.determine_action(category, score)
    response = policy.format_response(action, message)
    print(f"\nCategory: {category} | Score: {score:.2f}")
    print(f"Action: {action.value.upper()}")
    print(f"Response: {response}")

Production Implementation: Integrating All Components

The ProductionContentModerator class orchestrates the full moderation pipeline: multi-layer detection feeds into policy-based action selection, which determines both the user-facing response and the internal audit trail. The moderate_and_respond() method is the single entry point for all content, returning a structured result that includes whether the content was allowed, what action was taken, and whether the event should be logged for compliance auditing.

Audit logging for compliance: regulations like GDPR, COPPA, and the EU AI Act require organizations to demonstrate that content moderation decisions are consistent, auditable, and non-discriminatory. The logging system records moderation decisions (not raw user content, which raises privacy concerns) with enough metadata to reconstruct the decision chain: which layers flagged the content, what risk scores were assigned, and what action was taken. The get_moderation_stats() method provides aggregate analytics for compliance reporting: total violations, distribution by action type, and recent high-severity events.

class ProductionContentModerator:
    """Production-ready content moderation system.

    Runs ``MultiLayerModerator`` in fast-fail mode, maps the result to an
    action through ``ModerationPolicy`` and keeps an in-memory log of
    BLOCK/ESCALATE decisions.
    """

    # Custom-filter keyword categories whose policy name differs.
    _POLICY_CATEGORY_ALIASES = {"personal_info": "pii"}
    # Custom-filter pattern types treated as personal data.
    _PII_PATTERN_TYPES = frozenset({"email", "phone", "ssn"})

    def __init__(
        self,
        openai_client: OpenAI,
        enable_logging: bool = True,
        log_text_preview: bool = False
    ):
        """Build the moderation pipeline.

        Args:
            openai_client: Client used by the OpenAI moderation layer.
            enable_logging: When True, BLOCK/ESCALATE decisions are logged.
            log_text_preview: When True, log entries include the first 100
                characters of the text. Off by default because flagged text
                may itself contain personal data.
        """
        self.moderator = MultiLayerModerator(
            openai_client=openai_client,
            use_toxicity=False  # Set True in production if needed
        )
        self.policy = ModerationPolicy()
        self.enable_logging = enable_logging
        self.log_text_preview = log_text_preview
        self.moderation_log = []

    def _highest_risk_category(self, layers: Dict) -> str:
        """Pick the category used for the policy lookup from the layer results."""
        # Prefer the OpenAI layer when it flagged the content.
        openai_result = layers.get("openai")
        if getattr(openai_result, "flagged", False):
            category = getattr(openai_result, "highest_risk_category", None)
            if category:
                return category

        # Fast-fail returns before the OpenAI layer runs, so fall back to
        # what the custom filter found: first keyword hit, then PII patterns.
        custom_result = layers.get("custom_filter")
        if isinstance(custom_result, dict):
            for hit in custom_result.get("blocked_keywords", []):
                category = hit.get("category")
                if category:
                    return self._POLICY_CATEGORY_ALIASES.get(category, category)
            for found in custom_result.get("detected_patterns", []):
                if found.get("type") in self._PII_PATTERN_TYPES:
                    return "pii"

        return "unknown"

    def moderate_and_respond(
        self,
        text: str,
        user_id: Optional[str] = None
    ) -> Dict:
        """Moderate content and return appropriate response.

        Args:
            text: Content to moderate.
            user_id: Optional identifier stored with logged violations.

        Returns:
            Dict with ``allowed``, ``original_text``, ``safe_text`` (the
            text when allowed, else None), ``action``, ``user_message`` and
            ``should_log``.
        """
        # Run moderation
        mod_result = self.moderator.moderate(text, check_all_layers=False)

        response = {
            "allowed": mod_result["safe"],
            "original_text": text,
            "safe_text": text if mod_result["safe"] else None,
            "action": ModerationAction.ALLOW,
            "user_message": None,
            "should_log": False
        }

        if not mod_result["safe"]:
            # Determine action based on highest risk
            highest_risk_category = self._highest_risk_category(mod_result["layers"])
            highest_risk_score = mod_result["overall_risk"]

            action, message = self.policy.determine_action(
                highest_risk_category,
                highest_risk_score
            )

            response["action"] = action
            response["user_message"] = self.policy.format_response(action, message)
            response["should_log"] = action in [ModerationAction.BLOCK, ModerationAction.ESCALATE]

        # Log if enabled
        if self.enable_logging and response["should_log"]:
            self._log_violation(user_id, text, mod_result, response)

        return response

    def _log_violation(self, user_id, text, mod_result, response):
        """Append one violation record to ``self.moderation_log``."""
        from datetime import datetime, timezone

        log_entry = {
            # Timezone-aware UTC so entries are unambiguous across hosts.
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "user_id": user_id,
            # Raw text is kept only when explicitly enabled; the key is
            # always present so readers of the log see a stable shape.
            "text_preview": text[:100] if self.log_text_preview else None,
            "text_length": len(text),
            "risk_score": mod_result["overall_risk"],
            "action": response["action"].value,
            "blocked_by": mod_result["blocked_by"]
        }
        self.moderation_log.append(log_entry)

    def get_moderation_stats(self) -> Dict:
        """Get moderation statistics.

        Returns:
            Dict with ``total_violations``, ``by_action`` (count per action
            value), ``average_risk_score`` and ``recent_violations`` (last
            five entries). All keys are present even when the log is empty.
        """
        total = len(self.moderation_log)

        # Count by action type
        by_action = {}
        for entry in self.moderation_log:
            by_action[entry["action"]] = by_action.get(entry["action"], 0) + 1

        if total:
            average_risk = sum(entry["risk_score"] for entry in self.moderation_log) / total
        else:
            average_risk = 0.0

        return {
            "total_violations": total,
            "by_action": by_action,
            "average_risk_score": average_risk,
            "recent_violations": self.moderation_log[-5:]
        }

# Closing banner summarising what the production moderator provides.
print("\n✅ Production Content Moderation System Ready")
print("Features:")
print("  • Multi-layer detection")
print("  • Policy-based responses")
print("  • Comprehensive logging")
print("  • User-friendly messaging")
print("  • Analytics and reporting")

Summary & Best Practices

Key Takeaways

  1. Multi-Layer Defense: Combine multiple moderation techniques

  2. Context Matters: Different applications need different policies

  3. User Experience: Provide helpful feedback, not just blocking

  4. Continuous Improvement: Monitor and update filters regularly

  5. Privacy First: Be careful with PII in logs

Production Checklist

  • Implement multiple moderation layers

  • Define clear moderation policies

  • Customize keyword lists for your domain

  • Add PII detection and redaction

  • Set up proper logging and monitoring

  • Create user-friendly violation messages

  • Implement escalation procedures

  • Regular review of moderation logs

  • Update filters based on new patterns

  • Compliance with relevant regulations (COPPA, GDPR, etc.)

  • Have human review process for edge cases

  • Regular testing with adversarial examples

Resources