Coverage for agentos/guardrails/rules.py: 21%

47 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

2Built-in guardrail rules — PII detection, keyword blocking, length limits, regex, 

3toxicity heuristics, and code injection detection. 

4""" 

5 

6import re 

7from typing import Dict, List 

8 

9from agentos.guardrails.engine import GuardrailRule, GuardrailAction, GuardrailCategory 

10 

11 

def PIIRule(
    name: str = "pii_detector",
    action: GuardrailAction = GuardrailAction.SANITIZE,
    enabled: bool = True,
) -> GuardrailRule:
    """Build a rule that spots common PII and can swap it for placeholders.

    Four shapes are recognised: email addresses, 10-digit phone numbers
    (optionally separated by "-" or "."), SSNs written as NNN-NN-NNNN, and
    16-digit card numbers (four groups of four, optionally separated by "-"
    or a space).

    Args:
        name: Identifier stored on the resulting rule.
        action: Action stored on the rule; defaults to ``SANITIZE``.
        enabled: Value passed through as the rule's ``enabled`` flag.

    Returns:
        A ``GuardrailRule`` in the ``PII`` category that carries both a
        ``check`` callable and a ``sanitize`` callable.
    """
    # (regex, placeholder) pairs. Redaction applies them in this order.
    redactions = (
        (r"\b[\w._%+-]+@[\w.-]+\.[a-zA-Z]{2,}\b", "[EMAIL]"),
        (r"\b\d{3}[-.]?\d{3}[-.]?\d{4}\b", "[PHONE]"),
        (r"\b\d{3}-\d{2}-\d{4}\b", "[SSN]"),
        (r"\b(?:\d{4}[- ]?){3}\d{4}\b", "[CARD]"),
    )

    def contains_pii(text: str) -> bool:
        """Return True when any PII pattern occurs anywhere in ``text``."""
        return any(re.search(regex, text) for regex, _ in redactions)

    def redact(text: str) -> str:
        """Return ``text`` with every PII match replaced by its placeholder."""
        cleaned = text
        for regex, placeholder in redactions:
            cleaned = re.sub(regex, placeholder, cleaned)
        return cleaned

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.PII,
        action=action,
        check=contains_pii,
        sanitize=redact,
        description="Redacts emails, phone numbers, SSNs, and credit card numbers.",
        enabled=enabled,
    )

46 

47 

def KeywordBlockRule(
    keywords: list[str],
    name: str = "keyword_block",
    case_sensitive: bool = False,
    enabled: bool = True,
) -> GuardrailRule:
    """Build a rule that blocks text containing any of the given keywords.

    Matching is plain substring containment. Unless ``case_sensitive`` is
    set, both the keywords and the checked text are lower-cased first.

    Args:
        keywords: Substrings to block. Empty strings are ignored.
        name: Identifier stored on the resulting rule.
        case_sensitive: Compare with the original casing when True.
        enabled: Value passed through as the rule's ``enabled`` flag.

    Returns:
        A ``GuardrailRule`` in the ``KEYWORD`` category whose action is
        always ``BLOCK``.
    """
    # "" is a substring of every string, so a single empty entry (for
    # example from splitting "a,b,") would make the rule block all text.
    active = [kw for kw in keywords if kw]
    needles = active if case_sensitive else [kw.lower() for kw in active]

    def _check(text: str) -> bool:
        """Return True when ``text`` contains at least one keyword."""
        haystack = text if case_sensitive else text.lower()
        return any(needle in haystack for needle in needles)

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.KEYWORD,
        action=GuardrailAction.BLOCK,
        check=_check,
        # Only the first five keywords are listed to keep the text short.
        description=f"Blocks content containing: {', '.join(active[:5])}",
        enabled=enabled,
    )

70 

71 

def LengthLimitRule(
    max_input: int = 32_000,
    max_output: int = 16_000,
    name: str = "length_limit",
    enabled: bool = True,
) -> GuardrailRule:
    """Build a rule that blocks over-long text.

    Args:
        max_input: Character limit quoted for input in the description.
        max_output: Character limit quoted for output in the description.
        name: Identifier stored on the resulting rule.
        enabled: Value passed through as the rule's ``enabled`` flag.

    Returns:
        A ``GuardrailRule`` in the ``LENGTH`` category whose action is
        always ``BLOCK``.

    Note:
        The check callable receives only the text, with no indication of
        whether it is input or output, so it compares the length against the
        larger of the two limits. The smaller limit is therefore not enforced
        separately by this rule.
    """

    def _too_long(text: str) -> bool:
        """Return True when ``text`` is longer than the larger limit."""
        ceiling = max(max_input, max_output)
        return len(text) > ceiling

    summary = f"Limits input to {max_input} chars, output to {max_output} chars."
    return GuardrailRule(
        name=name,
        category=GuardrailCategory.LENGTH,
        action=GuardrailAction.BLOCK,
        check=_too_long,
        description=summary,
        enabled=enabled,
    )

91 

92 

def RegexRule(
    pattern: str,
    name: str = "regex_rule",
    action: GuardrailAction = GuardrailAction.FLAG,
    description: str = "",
    enabled: bool = True,
) -> GuardrailRule:
    """Build a rule that fires when text matches a caller-supplied regex.

    Args:
        pattern: Regular expression source. It is compiled once, up front,
            so an invalid pattern raises ``re.error`` here rather than at
            check time.
        name: Identifier stored on the resulting rule.
        action: Action stored on the rule; defaults to ``FLAG``.
        description: Human-readable summary. When empty, one is generated
            from the first 40 characters of ``pattern``.
        enabled: Value passed through as the rule's ``enabled`` flag.

    Returns:
        A ``GuardrailRule`` in the ``CUSTOM`` category.
    """
    compiled = re.compile(pattern)
    summary = description if description else f"Regex: {pattern[:40]}"

    def _matches(text: str) -> bool:
        """Return True when the pattern is found anywhere in ``text``."""
        return compiled.search(text) is not None

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.CUSTOM,
        action=action,
        check=_matches,
        description=summary,
        enabled=enabled,
    )

114 

115 

def ToxicityRule(
    name: str = "toxicity_check",
    action: GuardrailAction = GuardrailAction.FLAG,
    enabled: bool = True,
) -> GuardrailRule:
    """Build an offline, phrase-list based toxicity rule (no API call).

    The checked text is lower-cased and searched for a fixed list of
    phrases. A phrase only counts when it begins at the start of a word, so
    innocent words that merely contain a listed term in the middle (such as
    "skyscraper", "Scunthorpe" or "tcp ") are not flagged. Inflected forms
    that start with a listed term still match.

    Args:
        name: Identifier stored on the resulting rule.
        action: Action stored on the rule; defaults to ``FLAG``.
        enabled: Value passed through as the rule's ``enabled`` flag.

    Returns:
        A ``GuardrailRule`` in the ``TOXICITY`` category.
    """
    phrases = (
        "kill yourself", "kys", "die in a fire", "i hope you die",
        "nigger", "faggot", "retard", "cunt",
        "terrorist", "bomb making", "how to make a bomb",
        "child porn", "cp ", "lolicon",
    )

    # One alternation anchored with a leading \b. Plain substring search
    # produced false positives whenever a term appeared inside a longer,
    # unrelated word. Only the leading edge is anchored so that suffixes
    # (plural, "-ed", ...) keep matching; every match of this pattern was
    # also a match of the old substring test.
    matcher = re.compile(r"\b(?:" + "|".join(map(re.escape, phrases)) + r")")

    def _check(text: str) -> bool:
        """Return True when ``text`` contains a listed phrase at a word start."""
        return matcher.search(text.lower()) is not None

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.TOXICITY,
        action=action,
        check=_check,
        description="Flags text containing toxic or harmful language.",
        enabled=enabled,
    )

142 

143 

def CodeInjectionRule(
    name: str = "code_injection_detector",
    action: GuardrailAction = GuardrailAction.BLOCK,
    enabled: bool = True,
) -> GuardrailRule:
    """Build a rule that detects prompt-injection and code-injection patterns.

    The text is searched, case-insensitively, for a fixed set of regular
    expressions covering instruction-override phrases, chat-template control
    tokens, destructive SQL and shell commands, and dynamic code execution
    calls.

    Args:
        name: Identifier stored on the resulting rule.
        action: Action stored on the rule; defaults to ``BLOCK``.
        enabled: Value passed through as the rule's ``enabled`` flag.

    Returns:
        A ``GuardrailRule`` in the ``INJECTION`` category.
    """
    sources = [
        # Instruction-override / persona-switch phrases.
        r"ignore (all )?(previous|above|prior) (instructions?|prompts?)",
        r"forget (your|all) (instructions?|rules?|training)",
        # Trailing \b: under IGNORECASE "DAN" would otherwise also match the
        # start of ordinary words ("you are now dancing", "... Daniel").
        r"you are now (DAN|developer mode|jailbroken)\b",
        r"system:\s*you are",
        # Chat-template control tokens.
        r"<\|im_start\|>",
        r"<\|system\|>",
        # Destructive command on the same line as a code fence; "." does not
        # cross newlines because DOTALL is not set.
        r"```.*\b(?:rm\s+-rf|DROP\s+TABLE|DELETE\s+FROM|shutdown)\b",
        # Destructive SQL / shell anywhere in the text.
        r"\b(?:DROP\s+TABLE|DELETE\s+FROM|TRUNCATE\s+TABLE|ALTER\s+TABLE)\b",
        r"\brm\s+-rf\s+/",
        # Dynamic code execution / import calls.
        r"\bexec\s*\(.*\)",
        r"\beval\s*\(.*\)",
        r"\b__import__\s*\(.*\)",
        r"\bimportlib\.import_module\b",
    ]

    # Compiled once per rule so each check only performs the searches.
    compiled = [re.compile(src, re.IGNORECASE) for src in sources]

    def _check(text: str) -> bool:
        """Return True when any injection pattern is found in ``text``."""
        return any(regex.search(text) for regex in compiled)

    return GuardrailRule(
        name=name,
        category=GuardrailCategory.INJECTION,
        action=action,
        check=_check,
        description="Blocks prompt injection and code injection attempts.",
        enabled=enabled,
    )

180 

181 

def build_default_rules(
    blocked_keywords: list[str] | None = None,
    max_input_length: int = 32_000,
    max_output_length: int = 16_000,
) -> list[GuardrailRule]:
    """Assemble the default rule set.

    The list always holds, in order: the code-injection rule, the PII rule,
    the toxicity rule (with action ``FLAG``) and the length-limit rule. A
    keyword-block rule is appended last when ``blocked_keywords`` is
    non-empty.

    Args:
        blocked_keywords: Keywords for the optional keyword-block rule.
            ``None`` or an empty list means no such rule is added.
        max_input_length: Forwarded to ``LengthLimitRule`` as ``max_input``.
        max_output_length: Forwarded to ``LengthLimitRule`` as ``max_output``.

    Returns:
        A new list of ``GuardrailRule`` objects.
    """
    defaults: list[GuardrailRule] = [
        CodeInjectionRule(),
        PIIRule(),
        ToxicityRule(action=GuardrailAction.FLAG),
        LengthLimitRule(max_input=max_input_length, max_output=max_output_length),
    ]
    if not blocked_keywords:
        return defaults

    defaults.append(KeywordBlockRule(keywords=blocked_keywords))
    return defaults
196 return rules