Coverage for agentos/security/guard.py: 32%
219 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
v1.9.9: Security Guardrails — input/output filtering, PII detection, content safety.

Guardrail types:
- InputGuard: validate/filter user input before it reaches the agent
- OutputGuard: validate/filter agent output before it reaches the user
- PIIDetector: detect and redact personally identifiable information
- ContentSafetyFilter: toxicity, prompt injection, jailbreak detection
- GuardPipeline: composable guardrail pipeline with configurable actions
10"""
12from __future__ import annotations
14import re
15import json
16import hashlib
17from dataclasses import dataclass, field
18from enum import Enum
19from typing import Any, Callable, Optional
22# ── Enums & Data Classes ──────────────────────────────────────────
class GuardAction(str, Enum):
    """Action to take when a guardrail is triggered.

    Members also subclass ``str``, so each one compares equal to its plain
    string value (e.g. ``GuardAction.BLOCK == "block"``).
    """
    ALLOW = "allow"  # Pass through unchanged
    BLOCK = "block"  # Reject the content entirely
    REDACT = "redact"  # Remove sensitive parts, pass the rest
    WARN = "warn"  # Pass through but log a warning
    SANITIZE = "sanitize"  # Replace sensitive content with placeholders
class Severity(str, Enum):
    """Severity level for guardrail triggers.

    Members also subclass ``str``, so each one compares equal to its plain
    string value. Listed from least to most severe.
    """
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"
@dataclass
class GuardResult:
    """Result from a single guardrail check."""
    # False when the rule rejected the content; redaction results produced in
    # this module are recorded with passed=True.
    passed: bool
    # What the guard decided to do about the content.
    action: GuardAction = GuardAction.ALLOW
    severity: Severity = Severity.LOW
    # Identifier of the rule that produced this result (e.g. "empty_input").
    rule_name: str = ""
    # Human-readable explanation of the outcome.
    message: str = ""
    modified_content: str = ""  # Content after guardrail processing
    redacted_items: list[str] = field(default_factory=list)  # What was redacted
    # Free-form extra data; nothing in this module populates it.
    metadata: dict[str, Any] = field(default_factory=dict)
@dataclass
class GuardChainResult:
    """Aggregate result from a chain of guardrails."""
    # True when no individual result requested GuardAction.BLOCK.
    allowed: bool
    # Content to forward; the guards in this module set it to "" when blocked.
    final_content: str
    # Individual results that were recorded while running the chain.
    results: list[GuardResult] = field(default_factory=list)
    blocked_by: str = ""  # Which guard blocked it
    # The guards in this module set this to len(results), i.e. the number of
    # recorded results rather than the number of rules evaluated.
    total_checks: int = 0
    # Messages of results whose action was GuardAction.WARN.
    warnings: list[str] = field(default_factory=list)

    @property
    def blocked(self) -> bool:
        """Return True when the content was not allowed (inverse of ``allowed``)."""
        return not self.allowed
69# ── PII Patterns ──────────────────────────────────────────────────
# Regex patterns for common PII types.
# Each entry maps a PII type name to (regex source, replacement placeholder).
PII_PATTERNS: dict[str, tuple[str, str]] = {
    "email": (
        # The TLD class is letters only. Inside [...] a "|" is a literal pipe,
        # not alternation, so it must not be part of the class.
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        "[EMAIL]",
    ),
    "phone_cn": (
        # 11-digit mainland China mobile number: leading 1, second digit 3-9.
        r'\b1[3-9]\d{9}\b',
        "[PHONE]",
    ),
    "phone_us": (
        # 3-3-4 digit groups, optionally separated by "-" or ".".
        r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        "[PHONE]",
    ),
    "id_card_cn": (
        # 18 characters: 6-digit prefix, birth date (19xx/20xx, MM, DD),
        # 3 digits, then a final digit or X.
        r'\b[1-9]\d{5}(?:19|20)\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\d|3[01])\d{3}[\dXx]\b',
        "[ID_CARD]",
    ),
    "credit_card": (
        # 13-19 digits, each optionally followed by spaces or dashes.
        r'\b(?:\d[ -]*?){13,19}\b',
        "[CREDIT_CARD]",
    ),
    "ipv4": (
        # Four dot-separated octets, each in the range 0-255.
        r'\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b',
        "[IP_ADDR]",
    ),
    "ssn_us": (
        r'\b\d{3}-\d{2}-\d{4}\b',
        "[SSN]",
    ),
    "bank_account": (
        # Any bare run of 10-20 digits, so false positives are likely.
        r'\b\d{10,20}\b',
        # No dedicated placeholder. PIIDetector substitutes its
        # ``redact_placeholder`` for empty entries, so matches are only left
        # unredacted when that fallback is itself empty.
        "",
    ),
}
# Common password/key patterns in text.
# Each entry maps a secret type name to (regex source, replacement placeholder).
SECRET_PATTERNS: dict[str, tuple[str, str]] = {
    "api_key": (
        # "api_key" / "apikey" / "api_secret" followed by ":" or "=" and a
        # value of at least 20 key-like characters, optionally quoted.
        r'(?i)(?:api[_-]?key|apikey|api[_-]?secret)\s*[:=]\s*["\']?[A-Za-z0-9_\-\.]{20,}["\']?',
        "[API_KEY_REDACTED]",
    ),
    "aws_key": (
        # "AKIA" prefix followed by 16 uppercase letters or digits.
        r'\bAKIA[0-9A-Z]{16}\b',
        "[AWS_KEY_REDACTED]",
    ),
    "github_token": (
        # "gh" + one of p/o/u/s/r + "_" + at least 36 token characters.
        r'\bgh[pousr]_[A-Za-z0-9_]{36,}\b',
        "[GITHUB_TOKEN_REDACTED]",
    ),
    "jwt": (
        # Three dot-separated base64url-style segments, the first starting "eyJ".
        r'\beyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b',
        "[JWT_REDACTED]",
    ),
    "private_key_header": (
        # Only the PEM header line is matched, not the key body that follows.
        r'-----BEGIN (?:RSA |EC |DSA |OPENSSH )?PRIVATE KEY-----',
        "[PRIVATE_KEY_REDACTED]",
    ),
    "password_in_url": (
        # Despite the name, this matches any "password|passwd|pwd|secret"
        # key followed by ":" or "=" and a run of non-space characters,
        # whether or not it appears inside a URL.
        r'(?i)(?:password|passwd|pwd|secret)\s*[:=]\s*\S+',
        "[PASSWORD_REDACTED]",
    ),
}
# Prompt injection / jailbreak patterns.
# Order matters: ContentSafetyFilter names each rule after the pattern's index
# in this list ("injection_pattern_<i>").
INJECTION_PATTERNS: list[str] = [
    # Direct override attempts
    r'(?i)ignore\s+(?:all\s+)?(?:previous|above|prior)\s+(?:instructions?|prompts?|rules?|commands?)',
    r'(?i)forget\s+(?:everything|all\s+instructions?|your\s+training)',
    r'(?i)(?:you\s+are|act\s+as|pretend\s+to\s+be)\s+(?:now\s+)?(?:DAN|jailbroken|unfiltered|unrestricted)',
    r'(?i)developer\s*mode|god\s*mode|debug\s*mode',
    r'(?i)system\s*prompt\s*(?:leak|reveal|disclose|show|display|print|output)',
    r'(?i)(?:what|tell\s+me|show\s+me)\s+(?:your|the)\s+(?:system\s+)?prompt',
    r'(?i)(?:from\s+now\s+on|starting\s+now)\s+(?:you\s+are|you\'re)\s+',
    r'(?i)new\s+instructions?\s*:',
    # Role-playing jailbreaks
    r'(?i)(?:you\'re|you\s+are)\s+in\s+a\s+(?:simulation|movie|play|game|fantasy)',
    r'(?i)this\s+is\s+a\s+(?:hypothetical|fictional|imaginary)\s+scenario',
    # Encoding tricks
    r'(?i)(?:base64|hex|rot13)\s*(?:encoded|decoded)',
    r'(?i)decode\s+(?:this|the\s+following)',
    # Token smuggling
    r'(?i)concatenate\s+and\s+respond',
    r'(?i)respond\s+with\s+only\s+\w+\s+and\s+nothing\s+else',
    # XML/HTML tag injection.
    # NOTE(review): this matches ANY two angle brackets on the same line
    # (e.g. "a < b and c > d"), not only well-formed tags — confirm that this
    # breadth is intended before relying on it for ordinary text or code.
    r'[<>].*[<>]',
]
# Toxic / harmful content patterns, grouped by category.
# Order within each list matters: ContentSafetyFilter names each rule after
# the category and the pattern's index ("toxicity_<category>_<i>").
TOXICITY_PATTERNS: dict[str, list[str]] = {
    "hate_speech": [
        r'(?i)\b(?:kill\s+(?:all|yourself|them)|hate\s+(?:you|them|all))',
        # No space after "(?:" — a leading literal space combined with \b only
        # matched when a word character came directly before the space, so the
        # phrase was missed at the start of the text or after punctuation.
        r'(?i)\b(?:racial\s+slur|ethnic\s+cleansing)',
        r'(?i)gas\s+the\s+\w+',
        r'(?i)(?:white|black|asian|jewish|muslim|christian)\s+(?:supremacy|power)',
    ],
    "violence": [
        # NOTE(review): bare verbs such as "execute" also occur in benign
        # technical text ("execute the script") — confirm this is acceptable.
        r'(?i)\b(?:torture|mutilate|dismember|behead|execute)\b',
        r'(?i)how\s+to\s+(?:build\s+a\s+bomb|make\s+(?:meth|crack|drugs?))',
        r'(?i)\b(?:assassinate|terrorist\s+attack|mass\s+shooting)\b',
    ],
    "self_harm": [
        r'(?i)\b(?:suicide\s+method|how\s+to\s+kill\s+myself|ways\s+to\s+die)\b',
        r'(?i)\b(?:cut\s+myself|hurt\s+myself|self[-\s]?harm)\b',
        r'(?i)want\s+to\s+(?:die|end\s+it\s+all|disappear)',
    ],
    "illegal": [
        # NOTE(review): the standalone word "cp" also matches the Unix copy
        # command — confirm the false-positive rate is acceptable.
        r'(?i)\b(?:child\s+(?:porn|abuse)|cp\b|underage)',
        r'(?i)\b(?:ransomware|phishing\s+kit|carding)',
        r'(?i)how\s+to\s+(?:hack|steal|bypass\s+(?:security|authentication))',
    ],
}
184# ── PII Detector ──────────────────────────────────────────────────
class PIIDetector:
    """Detect and optionally redact personally identifiable information.

    Supports: email, phone (CN/US), ID card (CN), credit card, SSN,
    IP addresses, API keys, tokens, passwords, private keys, JWTs.
    """

    def __init__(
        self,
        auto_redact: bool = False,
        redact_placeholder: str = "[REDACTED]",
        custom_patterns: dict[str, tuple[str, str]] | None = None,
        enabled_pii_types: list[str] | None = None,
    ):
        """Compile the built-in and custom patterns.

        Args:
            auto_redact: Stored on the instance; no method of this class
                consults it.
            redact_placeholder: Replacement text used for any pattern whose
                own placeholder is empty.
            custom_patterns: Extra ``name -> (regex, placeholder)`` entries.
                An entry with the same name as a built-in pattern replaces it.
            enabled_pii_types: When non-empty, only patterns whose name is in
                this list are compiled; otherwise all patterns are enabled.
        """
        self.auto_redact = auto_redact
        self.redact_placeholder = redact_placeholder

        all_patterns = {**PII_PATTERNS, **SECRET_PATTERNS}
        if custom_patterns:
            all_patterns.update(custom_patterns)

        # name -> (compiled regex, effective placeholder)
        self._patterns: dict[str, tuple[re.Pattern, str]] = {}
        for name, (pattern, placeholder) in all_patterns.items():
            if enabled_pii_types and name not in enabled_pii_types:
                continue
            self._patterns[name] = (
                # Patterns that already carry an inline (?i) get no extra flag.
                re.compile(pattern, re.IGNORECASE if "(?i)" not in pattern else 0),
                placeholder or redact_placeholder,
            )

    def detect(self, content: str) -> list[dict[str, Any]]:
        """Find all PII instances in content.

        Returns:
            One dict per regex match with the keys ``type``, ``value``,
            ``start``, ``end`` and ``placeholder``, sorted by ``start``.
            Matches from different patterns may overlap; all are reported.
        """
        findings = []
        for pii_type, (pattern, placeholder) in self._patterns.items():
            for match in pattern.finditer(content):
                findings.append({
                    "type": pii_type,
                    "value": match.group(),
                    "start": match.start(),
                    "end": match.end(),
                    "placeholder": placeholder,
                })
        return sorted(findings, key=lambda x: x["start"])

    def redact(self, content: str) -> tuple[str, list[str]]:
        """Redact all PII from content.

        Findings whose placeholder is empty are reported by ``detect`` but
        left untouched here. Overlapping findings (for example an 11-digit
        number that matches both a phone and a bank-account pattern) are
        merged into a single span that is replaced once, using the placeholder
        of the first finding in ``detect`` order. Replacing each overlapping
        finding separately would apply offsets computed on the original text
        to an already-modified buffer and corrupt neighbouring characters.

        Returns:
            ``(redacted_content, redacted_items)`` where each item is
            ``"<type>:<first 20 characters of the matched value>"``, listed
            from the end of the text towards the start.
        """
        findings = [f for f in self.detect(content) if f["placeholder"]]
        if not findings:
            return content, []

        # Merge overlapping findings into disjoint [start, end, placeholder]
        # spans; findings arrive sorted by start offset.
        spans: list[list[Any]] = []
        for f in findings:
            if spans and f["start"] < spans[-1][1]:
                spans[-1][1] = max(spans[-1][1], f["end"])
            else:
                spans.append([f["start"], f["end"], f["placeholder"]])

        # Rebuild the text from untouched slices and placeholders.
        pieces: list[str] = []
        cursor = 0
        for start, end, placeholder in spans:
            pieces.append(content[cursor:start])
            pieces.append(placeholder)
            cursor = end
        pieces.append(content[cursor:])

        redacted_items = [
            f"{f['type']}:{f['value'][:20]}" for f in reversed(findings)
        ]
        return "".join(pieces), redacted_items

    def has_pii(self, content: str) -> bool:
        """Quick check if content contains any PII (stops at the first match)."""
        return any(
            pattern.search(content) is not None
            for pattern, _ in self._patterns.values()
        )
254# ── Content Safety Filter ─────────────────────────────────────────
class ContentSafetyFilter:
    """Filter for toxic content, prompt injection, jailbreak attempts.

    Three-layer defense:
    1. Pattern matching (regex) — fast, deterministic
    2. Keyword blocklist — user-configurable
    3. Hash matching — known-attack fingerprints (optional)
    """

    def __init__(
        self,
        block_injection: bool = True,
        block_toxicity: bool = True,
        custom_blocklist: list[str] | None = None,
        custom_allowlist: list[str] | None = None,
        known_attack_hashes: set[str] | None = None,
    ):
        """Store the configuration and compile the module-level patterns.

        Args:
            block_injection: Whether ``check_all`` runs ``check_injection``.
            block_toxicity: Whether ``check_all`` runs ``check_toxicity``.
            custom_blocklist: Keywords matched as case-insensitive substrings.
            custom_allowlist: Keywords exempted from the blocklist
                (compared case-insensitively).
            known_attack_hashes: SHA-256 hex digests of content to reject.
        """
        self.block_injection = block_injection
        self.block_toxicity = block_toxicity
        self.blocklist: set[str] = set(custom_blocklist or [])
        self.allowlist: set[str] = set(custom_allowlist or [])
        self.known_hashes: set[str] = known_attack_hashes or set()

        # Compile injection patterns
        self._injection_re = [
            re.compile(p, re.IGNORECASE) for p in INJECTION_PATTERNS
        ]

        # Compile toxicity patterns, keeping the per-category grouping
        self._toxicity_re: dict[str, list[re.Pattern]] = {
            category: [re.compile(p, re.IGNORECASE) for p in patterns]
            for category, patterns in TOXICITY_PATTERNS.items()
        }

    def check_injection(self, content: str) -> list[GuardResult]:
        """Check for prompt injection / jailbreak attempts.

        Returns one BLOCK result (severity HIGH) per matching pattern; the
        rule name carries the pattern's index in the compiled list.
        """
        results = []
        for i, pattern in enumerate(self._injection_re):
            if pattern.search(content):
                results.append(GuardResult(
                    passed=False,
                    action=GuardAction.BLOCK,
                    severity=Severity.HIGH,
                    rule_name=f"injection_pattern_{i}",
                    message=f"Potential prompt injection detected: {pattern.pattern[:80]}",
                ))
        return results

    def check_toxicity(self, content: str) -> list[GuardResult]:
        """Check for toxic/harmful content.

        Returns one BLOCK result per matching pattern. Matches in the
        ``self_harm`` and ``illegal`` categories are CRITICAL, all others HIGH.
        """
        results = []
        for category, patterns in self._toxicity_re.items():
            severity = Severity.CRITICAL if category in ("self_harm", "illegal") else Severity.HIGH
            for i, pattern in enumerate(patterns):
                if pattern.search(content):
                    results.append(GuardResult(
                        passed=False,
                        action=GuardAction.BLOCK,
                        severity=severity,
                        rule_name=f"toxicity_{category}_{i}",
                        message=f"Toxic content detected [{category}]: {pattern.pattern[:60]}",
                    ))
        return results

    def check_blocklist(self, content: str) -> list[GuardResult]:
        """Check against custom keyword blocklist.

        Keywords are matched as case-insensitive substrings of ``content``.
        A keyword that also appears in the allowlist is skipped; that lookup
        is case-insensitive too, so an allowlist entry written in a different
        case than the blocklist keyword still applies. Keywords are visited in
        sorted order so the result order does not depend on set iteration.
        """
        if not self.blocklist:
            return []

        content_lower = content.lower()
        allowed = {entry.lower() for entry in self.allowlist}
        results = []
        for keyword in sorted(self.blocklist):
            needle = keyword.lower()
            if needle not in content_lower:
                continue
            # Skip if in allowlist
            if needle in allowed:
                continue
            results.append(GuardResult(
                passed=False,
                action=GuardAction.BLOCK,
                severity=Severity.MEDIUM,
                rule_name="blocklist",
                message=f"Blocked keyword: {keyword}",
            ))
        return results

    def check_hash(self, content: str) -> list[GuardResult]:
        """Check content hash against known attack fingerprints.

        The SHA-256 hex digest of the whole encoded content is looked up in
        ``known_hashes``; only an exact match of the full text triggers.
        """
        if not self.known_hashes:
            return []

        content_hash = hashlib.sha256(content.encode()).hexdigest()
        if content_hash in self.known_hashes:
            return [GuardResult(
                passed=False,
                action=GuardAction.BLOCK,
                severity=Severity.CRITICAL,
                rule_name="known_attack_hash",
                message="Content matches known attack fingerprint",
            )]
        return []

    def check_all(self, content: str) -> list[GuardResult]:
        """Run all safety checks on content.

        Injection and toxicity checks honour their ``block_*`` flags; the
        blocklist and hash checks always run.
        """
        results = []

        if self.block_injection:
            results.extend(self.check_injection(content))

        if self.block_toxicity:
            results.extend(self.check_toxicity(content))

        results.extend(self.check_blocklist(content))
        results.extend(self.check_hash(content))

        return results

    def is_safe(self, content: str) -> bool:
        """Quick safety check — True if content passes all filters."""
        results = self.check_all(content)
        return all(r.passed for r in results)
379# ── Input Guardrail ───────────────────────────────────────────────
class InputGuard:
    """Guardrail applied to user input before it is handed to the agent.

    Combines an emptiness check, an optional length limit, PII redaction and
    the content-safety filter into a single ``guard`` call.
    """

    def __init__(
        self,
        pii_detector: PIIDetector | None = None,
        safety_filter: ContentSafetyFilter | None = None,
        max_input_length: int = 0,  # 0 = no limit
        deny_empty: bool = True,
    ):
        """Store the collaborators, creating defaults for any that are missing."""
        self.deny_empty = deny_empty
        self.max_input_length = max_input_length
        self.pii = pii_detector if pii_detector else PIIDetector(auto_redact=True)
        self.safety = safety_filter if safety_filter else ContentSafetyFilter()

    def guard(self, user_input: str, redact_pii: bool = True) -> GuardChainResult:
        """Run every input guardrail and summarise the outcome.

        All stages run even after an earlier one has produced a BLOCK result,
        so ``results`` lists everything that triggered. The safety filter sees
        the text after PII redaction.

        Args:
            user_input: Raw text supplied by the user.
            redact_pii: When False, the PII redaction stage is skipped.

        Returns:
            A GuardChainResult; ``final_content`` is "" when blocked, otherwise
            the (possibly redacted) input.
        """
        outcomes: list[GuardResult] = []
        text = user_input

        # Stage 1: reject missing or whitespace-only input.
        if self.deny_empty and not (user_input and user_input.strip()):
            outcomes.append(GuardResult(
                passed=False,
                action=GuardAction.BLOCK,
                severity=Severity.LOW,
                rule_name="empty_input",
                message="Empty input rejected",
            ))

        # Stage 2: enforce the length limit (a limit of 0 disables it).
        limit = self.max_input_length
        if 0 < limit < len(user_input):
            outcomes.append(GuardResult(
                passed=False,
                action=GuardAction.BLOCK,
                severity=Severity.LOW,
                rule_name="input_too_long",
                message=f"Input exceeds max length ({len(user_input)} > {self.max_input_length})",
            ))

        # Stage 3: redact PII; later stages operate on the redacted text.
        if redact_pii:
            scrubbed, removed = self.pii.redact(text)
            if removed:
                text = scrubbed
                outcomes.append(GuardResult(
                    passed=True,
                    action=GuardAction.REDACT,
                    severity=Severity.MEDIUM,
                    rule_name="pii_redacted",
                    message=f"Redacted {len(removed)} PII items",
                    modified_content=text,
                    redacted_items=removed,
                ))

        # Stage 4: content-safety checks.
        outcomes += self.safety.check_all(text)

        # Summarise: the first BLOCK result names the blocking rule.
        block_rules = [o.rule_name for o in outcomes if o.action == GuardAction.BLOCK]
        is_blocked = bool(block_rules)

        return GuardChainResult(
            allowed=not is_blocked,
            final_content="" if is_blocked else text,
            results=outcomes,
            blocked_by=block_rules[0] if is_blocked else "",
            total_checks=len(outcomes),
            warnings=[o.message for o in outcomes if o.action == GuardAction.WARN],
        )
452# ── Output Guardrail ──────────────────────────────────────────────
class OutputGuard:
    """Guardrail for agent output: PII leak prevention, sensitive content filtering.

    Runs after agent generates output, before it reaches the user.
    """

    # Phrases that suggest the agent is quoting its own instructions.
    # Compiled once instead of on every guard() call; the index of the
    # matching pattern is part of the reported rule name.
    _LEAK_INDICATORS = tuple(
        re.compile(p) for p in (
            r'(?i)(?:system\s+prompt|you\s+are\s+a\s+helpful|your\s+instructions?\s+are)',
            r'(?i)(?:your\s+rules?\s+are|your\s+guidelines?\s+are|your\s+core\s+directive)',
            r'(?i)(?:my\s+system\s+prompt|my\s+instructions?\s+(?:is|are|tell|say))',
        )
    )

    def __init__(
        self,
        pii_detector: PIIDetector | None = None,
        safety_filter: ContentSafetyFilter | None = None,
        max_output_length: int = 0,
        deny_empty: bool = True,
        block_system_prompt_leak: bool = True,
    ):
        """Store the collaborators, creating defaults for any that are missing.

        Args:
            pii_detector: Detector used to redact PII from the output.
            safety_filter: Filter whose toxicity check is applied to the output.
            max_output_length: Stored on the instance; ``guard`` does not
                enforce it.
            deny_empty: Block empty or whitespace-only output.
            block_system_prompt_leak: Block output that looks like it reveals
                the agent's instructions.
        """
        self.pii = pii_detector or PIIDetector(auto_redact=True)
        self.safety = safety_filter or ContentSafetyFilter(block_injection=False)  # No injection check on output
        self.max_output_length = max_output_length
        self.deny_empty = deny_empty
        self.block_system_prompt_leak = block_system_prompt_leak

    def guard(self, agent_output: str) -> GuardChainResult:
        """Run all output guardrails.

        Of the safety filter's checks only the toxicity check is applied to
        output, and only when the filter's ``block_toxicity`` flag is on (a
        filter object without that attribute is treated as enabled).
        Injection, blocklist and hash checks are not run here.

        Returns:
            A GuardChainResult; ``final_content`` is "" when blocked, otherwise
            the (possibly redacted) output.
        """
        results: list[GuardResult] = []
        current_content = agent_output

        # 1. Empty check
        if self.deny_empty and (not agent_output or not agent_output.strip()):
            results.append(GuardResult(
                passed=False, action=GuardAction.BLOCK,
                severity=Severity.MEDIUM, rule_name="empty_output",
                message="Empty output blocked",
            ))

        # 2. PII leak prevention
        redacted, items = self.pii.redact(current_content)
        if items:
            current_content = redacted
            results.append(GuardResult(
                passed=True, action=GuardAction.REDACT,
                severity=Severity.HIGH, rule_name="pii_leak_prevented",
                message=f"Prevented {len(items)} PII leaks in output",
                modified_content=current_content,
                redacted_items=items,
            ))

        # 3. System prompt leak detection (only the first matching indicator
        #    is reported)
        if self.block_system_prompt_leak:
            for i, pattern in enumerate(self._LEAK_INDICATORS):
                if pattern.search(current_content):
                    results.append(GuardResult(
                        passed=False, action=GuardAction.BLOCK,
                        severity=Severity.CRITICAL, rule_name=f"prompt_leak_{i}",
                        message="Potential system prompt leak detected in output",
                    ))
                    break

        # 4. Toxicity check (output should not contain harmful content).
        #    Respect the filter's own switch so a filter configured with
        #    block_toxicity=False is not silently overridden on the output side.
        if getattr(self.safety, "block_toxicity", True):
            results.extend(self.safety.check_toxicity(current_content))

        # Determine final outcome
        blocked = any(r.action == GuardAction.BLOCK for r in results)
        blocked_by = next((r.rule_name for r in results if r.action == GuardAction.BLOCK), "")
        warnings = [r.message for r in results if r.action == GuardAction.WARN]

        # Apply the last modification that changed content
        for r in results:
            if r.modified_content:
                current_content = r.modified_content

        return GuardChainResult(
            allowed=not blocked,
            final_content="" if blocked else current_content,
            results=results,
            blocked_by=blocked_by,
            total_checks=len(results),
            warnings=warnings,
        )
537# ── Guardrail Pipeline ────────────────────────────────────────────
class GuardPipeline:
    """Full guardrail pipeline: Input → Agent → Output.

    Usage:
        pipeline = GuardPipeline()
        result = pipeline.process_input(user_msg)
        if result.allowed:
            agent_output = agent.run(result.final_content)
            final = pipeline.process_output(agent_output)
    """

    def __init__(
        self,
        input_guard: InputGuard | None = None,
        output_guard: OutputGuard | None = None,
    ):
        """Store the two guards (defaults are created when omitted) and reset counters."""
        self.input_guard = input_guard or InputGuard()
        self.output_guard = output_guard or OutputGuard()
        self.total_blocked: int = 0
        self.total_redacted: int = 0
        # One entry per processed message; grows for the lifetime of the pipeline.
        self.log: list[dict[str, Any]] = []

    def process_input(self, user_input: str) -> GuardChainResult:
        """Guard user input before it reaches the agent."""
        result = self.input_guard.guard(user_input)
        self._record("input", result)
        return result

    def process_output(self, agent_output: str) -> GuardChainResult:
        """Guard agent output before it reaches the user."""
        result = self.output_guard.guard(agent_output)
        self._record("output", result)
        return result

    def _record(self, stage: str, result: GuardChainResult) -> None:
        """Log a guard result and update the block/redaction counters.

        Shared by both stages so that redactions performed on input are
        counted in ``total_redacted`` exactly like those performed on output.
        """
        self._log(stage, result)
        if result.blocked:
            self.total_blocked += 1
        self.total_redacted += sum(len(r.redacted_items) for r in result.results)

    def _log(self, stage: str, result: GuardChainResult) -> None:
        """Append a summary of ``result`` to ``self.log``.

        Redacted values and content are not copied into the log entry; only
        rule names, flags, severities and messages are.
        """
        guard_results = [
            {"rule": r.rule_name, "passed": r.passed, "action": r.action.value,
             "severity": r.severity.value, "message": r.message}
            for r in result.results
        ]
        self.log.append({
            "stage": stage,
            "allowed": result.allowed,
            "total_checks": result.total_checks,
            "results": guard_results,
        })

    def get_stats(self) -> dict[str, Any]:
        """Return aggregate counters.

        ``total_checks`` is the number of processed messages (log entries);
        ``block_rate`` is the blocked share of those, formatted as a
        percentage string with one decimal place.
        """
        processed = len(self.log)
        return {
            "total_checks": processed,
            "total_blocked": self.total_blocked,
            "total_redacted": self.total_redacted,
            "block_rate": f"{self.total_blocked / max(processed, 1) * 100:.1f}%",
        }
602# ── Default Guard Configs ─────────────────────────────────────────
def create_strict_guard() -> GuardPipeline:
    """Build the strict pipeline recommended for production.

    One PII detector and one safety filter (injection and toxicity blocking
    both enabled) are shared by the input and output guards. Input is limited
    to 32768 characters and output prompt-leak detection is switched on.
    """
    shared_pii = PIIDetector(auto_redact=True)
    shared_safety = ContentSafetyFilter(block_injection=True, block_toxicity=True)

    inbound = InputGuard(
        pii_detector=shared_pii,
        safety_filter=shared_safety,
        max_input_length=32768,
    )
    outbound = OutputGuard(
        pii_detector=shared_pii,
        safety_filter=shared_safety,
        block_system_prompt_leak=True,
    )
    return GuardPipeline(input_guard=inbound, output_guard=outbound)
def create_permissive_guard() -> GuardPipeline:
    """Build a permissive pipeline intended for development and debugging.

    One PII detector and one safety filter are shared by both guards. The
    filter is created with injection blocking on and ``block_toxicity=False``;
    no input length limit is set and output prompt-leak detection is off.
    """
    shared_pii = PIIDetector(auto_redact=True)
    shared_safety = ContentSafetyFilter(block_injection=True, block_toxicity=False)

    inbound = InputGuard(pii_detector=shared_pii, safety_filter=shared_safety)
    outbound = OutputGuard(
        pii_detector=shared_pii,
        safety_filter=shared_safety,
        block_system_prompt_leak=False,
    )
    return GuardPipeline(input_guard=inbound, output_guard=outbound)