Coverage for agentos/guardrails/rules.py: 21%
47 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2Built-in guardrail rules — PII detection, keyword blocking, length limits, regex,
3toxicity heuristics, and code injection detection.
4"""
6import re
7from typing import Dict, List
9from agentos.guardrails.engine import GuardrailRule, GuardrailAction, GuardrailCategory
def PIIRule(
    name: str = "pii_detector",
    action: GuardrailAction = GuardrailAction.SANITIZE,
    enabled: bool = True,
) -> GuardrailRule:
    """Build a rule that detects and redacts common PII patterns.

    Four regex patterns are used (email, phone, SSN, credit card). The
    rule's ``check`` callable returns True when any pattern is found; its
    ``sanitize`` callable replaces each match with a bracketed placeholder.

    Args:
        name: Name given to the returned rule.
        action: Action stored on the returned rule.
        enabled: Enabled flag stored on the returned rule.

    Returns:
        A ``GuardrailRule`` in the ``PII`` category.
    """
    # (compiled pattern, placeholder) pairs; sanitising applies them in
    # this order, each on the output of the previous substitution.
    redactions = [
        (re.compile(r"\b[\w._%+-]+@[\w.-]+\.[a-zA-Z]{2,}\b"), "[EMAIL]"),
        (re.compile(r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b"), "[PHONE]"),
        (re.compile(r"\b\d{3}-\d{2}-\d{4}\b"), "[SSN]"),
        (re.compile(r"\b(?:\d{4}[- ]?){3}\d{4}\b"), "[CARD]"),
    ]

    def _contains_pii(text: str) -> bool:
        """Return True if any PII pattern occurs in *text*."""
        return any(regex.search(text) for regex, _ in redactions)

    def _redact(text: str) -> str:
        """Return *text* with every PII match replaced by its placeholder."""
        cleaned = text
        for regex, placeholder in redactions:
            cleaned = regex.sub(placeholder, cleaned)
        return cleaned

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.PII,
        action=action,
        check=_contains_pii,
        sanitize=_redact,
        description="Redacts emails, phone numbers, SSNs, and credit card numbers.",
        enabled=enabled,
    )
def KeywordBlockRule(
    keywords: list[str],
    name: str = "keyword_block",
    case_sensitive: bool = False,
    enabled: bool = True,
) -> GuardrailRule:
    """Build a rule that blocks text containing any of the given keywords.

    Matching is plain substring containment. When ``case_sensitive`` is
    False, both the keywords and the checked text are lowercased first.
    Empty keywords are ignored.

    Args:
        keywords: Substrings to look for.
        name: Name given to the returned rule.
        case_sensitive: Compare text and keywords without lowercasing.
        enabled: Enabled flag stored on the returned rule.

    Returns:
        A ``GuardrailRule`` in the ``KEYWORD`` category with action ``BLOCK``.
    """
    # "" is a substring of every string, so a single stray empty keyword
    # would make the rule block all text. Drop empty entries up front.
    active = [kw for kw in keywords if kw]
    needles = active if case_sensitive else [kw.lower() for kw in active]

    def _check(text: str) -> bool:
        """Return True if *text* contains at least one keyword."""
        haystack = text if case_sensitive else text.lower()
        return any(needle in haystack for needle in needles)

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.KEYWORD,
        action=GuardrailAction.BLOCK,
        check=_check,
        # Only the first five keywords are listed in the description.
        description=f"Blocks content containing: {', '.join(active[:5])}",
        enabled=enabled,
    )
def LengthLimitRule(
    max_input: int = 32_000,
    max_output: int = 16_000,
    name: str = "length_limit",
    enabled: bool = True,
) -> GuardrailRule:
    """Build a rule that blocks text longer than the configured limit.

    Args:
        max_input: Character limit quoted for input in the description.
        max_output: Character limit quoted for output in the description.
        name: Name given to the returned rule.
        enabled: Enabled flag stored on the returned rule.

    Returns:
        A ``GuardrailRule`` in the ``LENGTH`` category with action ``BLOCK``.
    """

    def _exceeds_limit(text: str) -> bool:
        """Return True if *text* is longer than the larger of the two limits."""
        # NOTE(review): the check only receives the text, not whether it is
        # input or output, so a single ceiling (the larger limit) is applied.
        # The smaller limit is therefore never enforced on its own.
        ceiling = max(max_input, max_output)
        return len(text) > ceiling

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.LENGTH,
        action=GuardrailAction.BLOCK,
        check=_exceeds_limit,
        description=f"Limits input to {max_input} chars, output to {max_output} chars.",
        enabled=enabled,
    )
def RegexRule(
    pattern: str,
    name: str = "regex_rule",
    action: GuardrailAction = GuardrailAction.FLAG,
    description: str = "",
    enabled: bool = True,
) -> GuardrailRule:
    """Build a rule whose check fires when text matches a custom regex.

    Args:
        pattern: Regular expression source; compiled once when the rule is
            built and searched for anywhere in the text.
        name: Name given to the returned rule.
        action: Action stored on the returned rule.
        description: Optional description. When empty, one is generated
            from the first 40 characters of ``pattern``.
        enabled: Enabled flag stored on the returned rule.

    Returns:
        A ``GuardrailRule`` in the ``CUSTOM`` category.

    Raises:
        re.error: If ``pattern`` is not a valid regular expression.
    """
    compiled = re.compile(pattern)

    def _matches(text: str) -> bool:
        """Return True if the compiled pattern is found in *text*."""
        return compiled.search(text) is not None

    summary = description if description else f"Regex: {pattern[:40]}"

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.CUSTOM,
        action=action,
        check=_matches,
        description=summary,
        enabled=enabled,
    )
def ToxicityRule(
    name: str = "toxicity_check",
    action: GuardrailAction = GuardrailAction.FLAG,
    enabled: bool = True,
) -> GuardrailRule:
    """Build a heuristic toxicity rule based on a fixed keyword list.

    The check lowercases the text and looks for the listed phrases as
    substrings. The two short abbreviations are matched separately with a
    word boundary, because as raw substrings they fire on ordinary words.

    Args:
        name: Name given to the returned rule.
        action: Action stored on the returned rule.
        enabled: Enabled flag stored on the returned rule.

    Returns:
        A ``GuardrailRule`` in the ``TOXICITY`` category.
    """
    # Phrases and longer terms: plain substring match on lowercased text.
    _toxic = [
        "kill yourself", "die in a fire", "i hope you die",
        "nigger", "faggot", "retard", "cunt",
        "terrorist", "bomb making", "how to make a bomb",
        "child porn", "lolicon",
    ]
    # Short abbreviations must start at a word boundary: as raw substrings
    # "kys" matches inside "skyscraper" and "cp " inside "tcp " / "scp ".
    # "cp " keeps its trailing space, as in the original term list.
    _toxic_abbrev = re.compile(r"\b(?:kys\b|cp )")

    def _check(text: str) -> bool:
        """Return True if *text* contains a listed phrase or abbreviation."""
        lowered = text.lower()
        if any(term in lowered for term in _toxic):
            return True
        return _toxic_abbrev.search(lowered) is not None

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.TOXICITY,
        action=action,
        check=_check,
        description="Flags text containing toxic or harmful language.",
        enabled=enabled,
    )
def CodeInjectionRule(
    name: str = "code_injection_detector",
    action: GuardrailAction = GuardrailAction.BLOCK,
    enabled: bool = True,
) -> GuardrailRule:
    """Build a rule that detects prompt-injection and code-injection patterns.

    The check searches the text, case-insensitively, for a fixed list of
    regexes: instruction-override phrases, chat-template control tokens,
    destructive shell/SQL commands, and dynamic-execution calls.

    Args:
        name: Name given to the returned rule.
        action: Action stored on the returned rule.
        enabled: Enabled flag stored on the returned rule.

    Returns:
        A ``GuardrailRule`` in the ``INJECTION`` category.
    """
    _patterns = [
        # Instruction-override / jailbreak phrasing.
        r"ignore (all )?(previous|above|prior) (instructions?|prompts?)",
        r"forget (your|all) (instructions?|rules?|training)",
        r"you are now (DAN|developer mode|jailbroken)",
        r"system:\s*you are",
        # Chat-template control tokens.
        r"<\|im_start\|>",
        r"<\|system\|>",
        # Destructive command after a code fence. The scoped DOTALL group
        # lets the gap span newlines; a plain "." stops at the first line
        # break, so multi-line fenced blocks were never matched.
        r"```(?s:.*?)\b(?:rm\s+-rf|DROP\s+TABLE|DELETE\s+FROM|shutdown)\b",
        # Destructive SQL / shell commands anywhere in the text.
        r"\b(?:DROP\s+TABLE|DELETE\s+FROM|TRUNCATE\s+TABLE|ALTER\s+TABLE)\b",
        r"\brm\s+-rf\s+/",
        # Dynamic code execution / import calls.
        r"\bexec\s*\(.*\)",
        r"\beval\s*\(.*\)",
        r"\b__import__\s*\(.*\)",
        r"\bimportlib\.import_module\b",
    ]

    # Compile once when the rule is built; every pattern is case-insensitive.
    _compiled = [re.compile(p, re.IGNORECASE) for p in _patterns]

    def _check(text: str) -> bool:
        """Return True if any injection pattern is found in *text*."""
        return any(pat.search(text) for pat in _compiled)

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.INJECTION,
        action=action,
        check=_check,
        description="Blocks prompt injection and code injection attempts.",
        enabled=enabled,
    )
def build_default_rules(
    blocked_keywords: list[str] | None = None,
    max_input_length: int = 32_000,
    max_output_length: int = 16_000,
) -> list[GuardrailRule]:
    """Assemble the default rule set.

    The list always contains, in order: the code-injection rule, the PII
    rule, the toxicity rule (with action ``FLAG``) and the length-limit
    rule. A keyword-block rule is appended only when ``blocked_keywords``
    is a non-empty list.

    Args:
        blocked_keywords: Keywords for the optional keyword-block rule.
        max_input_length: Passed to ``LengthLimitRule`` as ``max_input``.
        max_output_length: Passed to ``LengthLimitRule`` as ``max_output``.

    Returns:
        A new list of ``GuardrailRule`` objects.
    """
    defaults: list[GuardrailRule] = [
        CodeInjectionRule(),
        PIIRule(),
        ToxicityRule(action=GuardrailAction.FLAG),
        LengthLimitRule(max_input=max_input_length, max_output=max_output_length),
    ]
    # None and an empty list both mean "no keyword rule".
    if not blocked_keywords:
        return defaults
    defaults.append(KeywordBlockRule(keywords=blocked_keywords))
    return defaults