Coverage for agentos/security/guard.py: 32%

219 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

1""" 

v1.9.9: Security Guardrails — input/output filtering, PII detection, content safety.

Guardrail types:
- InputGuard: validate/filter user input before it reaches the agent
- OutputGuard: validate/filter agent output before it reaches the user
- PIIDetector: detect and redact personally identifiable information
- ContentSafetyFilter: toxicity, prompt injection, jailbreak detection
- GuardPipeline: composable guardrail pipeline (input guard, then output guard) with configurable actions

10""" 

11 

12from __future__ import annotations 

13 

14import re 

15import json 

16import hashlib 

17from dataclasses import dataclass, field 

18from enum import Enum 

19from typing import Any, Callable, Optional 

20 

21 

22# ── Enums & Data Classes ────────────────────────────────────────── 

23 

class GuardAction(str, Enum):
    """Enumerates what a guardrail does with content once it has triggered.

    Members (declaration order is preserved for iteration):

    * ``ALLOW``    - pass the content through unchanged.
    * ``BLOCK``    - reject the content entirely.
    * ``REDACT``   - remove the sensitive parts and pass the rest.
    * ``WARN``     - pass the content through but log a warning.
    * ``SANITIZE`` - replace sensitive content with placeholders.

    The ``str`` mixin makes every member compare equal to its plain
    string value.
    """

    ALLOW = "allow"
    BLOCK = "block"
    REDACT = "redact"
    WARN = "warn"
    SANITIZE = "sanitize"

31 

32 

class Severity(str, Enum):
    """Ranks how serious a guardrail trigger is.

    Members are declared from least to most serious: ``LOW``, ``MEDIUM``,
    ``HIGH``, ``CRITICAL``.  The ``str`` mixin makes every member compare
    equal to its plain string value.
    """

    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

39 

40 

@dataclass
class GuardResult:
    """Outcome reported by one individual guardrail check."""

    # Whether the check passed (the only required field).
    passed: bool
    # What the guardrail decided to do with the content.
    action: GuardAction = GuardAction.ALLOW
    # How serious the trigger is.
    severity: Severity = Severity.LOW
    # Identifier of the rule that produced this result.
    rule_name: str = ""
    # Human-readable explanation of the result.
    message: str = ""
    # Content after guardrail processing; empty when nothing was rewritten.
    modified_content: str = ""
    # Descriptions of what was redacted; a fresh list per instance.
    redacted_items: list[str] = field(default_factory=list)
    # Free-form extra data; a fresh dict per instance.
    metadata: dict[str, Any] = field(default_factory=dict)

52 

53 

@dataclass
class GuardChainResult:
    """Combined verdict produced by running a chain of guardrails."""

    # True when no guardrail blocked the content.
    allowed: bool
    # Content to hand onwards after all guardrails ran.
    final_content: str
    # Individual per-check results, in the order they were produced.
    results: list[GuardResult] = field(default_factory=list)
    # Rule name of the guard that blocked the content ("" if none did).
    blocked_by: str = ""
    # Number of check results recorded for this run.
    total_checks: int = 0
    # Messages of results whose action was a warning.
    warnings: list[str] = field(default_factory=list)

    @property
    def blocked(self) -> bool:
        """Return the logical negation of ``allowed``."""
        return not self.allowed

67 

68 

69# ── PII Patterns ────────────────────────────────────────────────── 

70 

# Regex patterns for common PII types.
# Each entry maps a PII type name to ``(regex_source, placeholder)``.
# An empty placeholder marks the pattern as "flag only": the match is
# reported but is not meant to be replaced automatically.
PII_PATTERNS: dict[str, tuple[str, str]] = {
    "email": (
        # The TLD class used to be ``[A-Z|a-z]``, which also accepted a
        # literal "|" inside the top-level domain.
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
        "[EMAIL]",
    ),
    "phone_cn": (
        r'\b1[3-9]\d{9}\b',
        "[PHONE]",
    ),
    "phone_us": (
        r'\b\d{3}[-.]?\d{3}[-.]?\d{4}\b',
        "[PHONE]",
    ),
    "id_card_cn": (
        r'\b[1-9]\d{5}(?:19|20)\d{2}(?:0[1-9]|1[0-2])(?:0[1-9]|[12]\d|3[01])\d{3}[\dXx]\b',
        "[ID_CARD]",
    ),
    "credit_card": (
        r'\b(?:\d[ -]*?){13,19}\b',
        "[CREDIT_CARD]",
    ),
    "ipv4": (
        r'\b(?:(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\.){3}(?:25[0-5]|2[0-4]\d|[01]?\d\d?)\b',
        "[IP_ADDR]",
    ),
    "ssn_us": (
        r'\b\d{3}-\d{2}-\d{4}\b',
        "[SSN]",
    ),
    "bank_account": (
        r'\b\d{10,20}\b',
        "",  # Only flag, don't auto-redact (false positive risk)
    ),
}

106 

# Common password/key patterns in text.
# Each entry maps a secret type name to ``(regex_source, placeholder)``.
SECRET_PATTERNS: dict[str, tuple[str, str]] = {
    "api_key": (
        r'(?i)(?:api[_-]?key|apikey|api[_-]?secret)\s*[:=]\s*["\']?[A-Za-z0-9_\-\.]{20,}["\']?',
        "[API_KEY_REDACTED]",
    ),
    "aws_key": (
        r'\bAKIA[0-9A-Z]{16}\b',
        "[AWS_KEY_REDACTED]",
    ),
    "github_token": (
        r'\bgh[pousr]_[A-Za-z0-9_]{36,}\b',
        "[GITHUB_TOKEN_REDACTED]",
    ),
    "jwt": (
        r'\beyJ[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\.[A-Za-z0-9_-]{10,}\b',
        "[JWT_REDACTED]",
    ),
    "private_key_header": (
        # Matching only the BEGIN line would replace the header and leave
        # the base64 key material in the text.  The optional tail consumes
        # everything up to and including the matching END line when it is
        # present; a lone header (truncated paste) still matches on its own.
        r'-----BEGIN (?:RSA |EC |DSA |OPENSSH |ENCRYPTED )?PRIVATE KEY-----'
        r'(?:[\s\S]*?-----END (?:RSA |EC |DSA |OPENSSH |ENCRYPTED )?PRIVATE KEY-----)?',
        "[PRIVATE_KEY_REDACTED]",
    ),
    "password_in_url": (
        r'(?i)(?:password|passwd|pwd|secret)\s*[:=]\s*\S+',
        "[PASSWORD_REDACTED]",
    ),
}

134 

# Prompt injection / jailbreak patterns.
# The position of each pattern in this list is part of the reported rule
# name (``injection_pattern_<index>``), so new patterns should be appended
# rather than inserted.
INJECTION_PATTERNS: list[str] = [
    # Direct override attempts
    r'(?i)ignore\s+(?:all\s+)?(?:previous|above|prior)\s+(?:instructions?|prompts?|rules?|commands?)',
    r'(?i)forget\s+(?:everything|all\s+instructions?|your\s+training)',
    r'(?i)(?:you\s+are|act\s+as|pretend\s+to\s+be)\s+(?:now\s+)?(?:DAN|jailbroken|unfiltered|unrestricted)',
    r'(?i)developer\s*mode|god\s*mode|debug\s*mode',
    r'(?i)system\s*prompt\s*(?:leak|reveal|disclose|show|display|print|output)',
    r'(?i)(?:what|tell\s+me|show\s+me)\s+(?:your|the)\s+(?:system\s+)?prompt',
    r'(?i)(?:from\s+now\s+on|starting\s+now)\s+(?:you\s+are|you\'re)\s+',
    r'(?i)new\s+instructions?\s*:',
    # Role-playing jailbreaks
    r'(?i)(?:you\'re|you\s+are)\s+in\s+a\s+(?:simulation|movie|play|game|fantasy)',
    r'(?i)this\s+is\s+a\s+(?:hypothetical|fictional|imaginary)\s+scenario',
    # Encoding tricks
    r'(?i)(?:base64|hex|rot13)\s*(?:encoded|decoded)',
    r'(?i)decode\s+(?:this|the\s+following)',
    # Token smuggling
    r'(?i)concatenate\s+and\s+respond',
    r'(?i)respond\s+with\s+only\s+\w+\s+and\s+nothing\s+else',
    # XML/HTML tag injection.  The earlier form ``[<>].*[<>]`` fired on any
    # two angle brackets on a line (e.g. "a < b and c > d", "1 << 2").  This
    # form requires a tag-shaped token: "<", an optional "/", "!", "?" or
    # "|", a letter directly after it, and a closing ">".
    r'<[/!?|]?[A-Za-z][^<>]*>',
]

157 

# Toxic / harmful content patterns, grouped by category.
# The category name and the position of a pattern inside its list are both
# part of the reported rule name (``toxicity_<category>_<index>``).
TOXICITY_PATTERNS: dict[str, list[str]] = {
    "hate_speech": [
        r'(?i)\b(?:kill\s+(?:all|yourself|them)|hate\s+(?:you|them|all))',
        # A stray space after "(?:" used to require a literal blank right
        # after the word boundary, so "racial slur" at the start of the
        # text or after punctuation was never matched.
        r'(?i)\b(?:racial\s+slur|ethnic\s+cleansing)',
        r'(?i)gas\s+the\s+\w+',
        r'(?i)(?:white|black|asian|jewish|muslim|christian)\s+(?:supremacy|power)',
    ],
    "violence": [
        # NOTE(review): bare "execute" also matches benign phrases such as
        # "execute the script" - confirm whether this is intended.
        r'(?i)\b(?:torture|mutilate|dismember|behead|execute)\b',
        r'(?i)how\s+to\s+(?:build\s+a\s+bomb|make\s+(?:meth|crack|drugs?))',
        r'(?i)\b(?:assassinate|terrorist\s+attack|mass\s+shooting)\b',
    ],
    "self_harm": [
        r'(?i)\b(?:suicide\s+method|how\s+to\s+kill\s+myself|ways\s+to\s+die)\b',
        r'(?i)\b(?:cut\s+myself|hurt\s+myself|self[-\s]?harm)\b',
        r'(?i)want\s+to\s+(?:die|end\s+it\s+all|disappear)',
    ],
    "illegal": [
        # NOTE(review): "cp\b" also matches the Unix "cp" command name -
        # confirm whether this is intended.
        r'(?i)\b(?:child\s+(?:porn|abuse)|cp\b|underage)',
        r'(?i)\b(?:ransomware|phishing\s+kit|carding)',
        r'(?i)how\s+to\s+(?:hack|steal|bypass\s+(?:security|authentication))',
    ],
}

182 

183 

184# ── PII Detector ────────────────────────────────────────────────── 

185 

class PIIDetector:
    """Detect and optionally redact personally identifiable information.

    Supports: email, phone (CN/US), ID card (CN), credit card, SSN,
    IP addresses, API keys, tokens, passwords, private keys, JWTs.

    The built-in patterns come from the module-level ``PII_PATTERNS`` and
    ``SECRET_PATTERNS`` tables; ``custom_patterns`` entries are merged on
    top of them and override built-ins that share a name.
    """

    def __init__(
        self,
        auto_redact: bool = False,
        redact_placeholder: str = "[REDACTED]",
        custom_patterns: dict[str, tuple[str, str]] | None = None,
        enabled_pii_types: list[str] | None = None,
    ):
        """Compile the pattern table.

        Args:
            auto_redact: Stored on the instance as ``self.auto_redact``;
                this class itself never reads it.
            redact_placeholder: Replacement text for patterns whose
                placeholder is ``None``.
            custom_patterns: Extra ``name -> (regex, placeholder)`` entries.
                A placeholder of ``""`` means "flag only": matches are
                reported by ``detect`` but left untouched by ``redact``.
            enabled_pii_types: When non-empty, only the listed pattern
                names are compiled; ``None`` or an empty list enables all.
        """
        self.auto_redact = auto_redact
        self.redact_placeholder = redact_placeholder

        # Compile all patterns
        self._patterns: dict[str, tuple[re.Pattern, str]] = {}
        all_patterns = {**PII_PATTERNS, **SECRET_PATTERNS}
        if custom_patterns:
            all_patterns.update(custom_patterns)

        for name, (pattern, placeholder) in all_patterns.items():
            if enabled_pii_types and name not in enabled_pii_types:
                continue
            # Patterns that carry an inline "(?i)" already ignore case.
            flags = 0 if "(?i)" in pattern else re.IGNORECASE
            # Keep "" as-is: it is the documented "flag only" marker.
            # Falling back with ``placeholder or redact_placeholder`` would
            # turn it into a real placeholder and redact those matches.
            self._patterns[name] = (
                re.compile(pattern, flags),
                redact_placeholder if placeholder is None else placeholder,
            )

    def detect(self, content: str) -> list[dict[str, Any]]:
        """Find all PII instances in content.

        Returns:
            One dict per regex match with the keys ``type``, ``value``,
            ``start``, ``end`` and ``placeholder``, sorted by ``start``.
            Matches from different patterns may overlap.
        """
        findings = [
            {
                "type": pii_type,
                "value": match.group(),
                "start": match.start(),
                "end": match.end(),
                "placeholder": placeholder,
            }
            for pii_type, (pattern, placeholder) in self._patterns.items()
            for match in pattern.finditer(content)
        ]
        return sorted(findings, key=lambda x: x["start"])

    def redact(self, content: str) -> tuple[str, list[str]]:
        """Redact all PII from content. Returns (redacted_content, list_of_redacted).

        Findings with an empty placeholder are skipped.  Overlapping
        findings (the same digits can match several patterns) are merged
        into one span that is replaced once, using the placeholder of the
        finding that starts first (longest first on ties).  Replacing each
        overlapping finding separately would apply stale indices to
        already-shortened text and delete characters after the match.

        The returned item list holds one ``"<type>:<first 20 chars>"``
        entry per replaced span, ordered from the end of the text to the
        start.
        """
        candidates = [f for f in self.detect(content) if f["placeholder"]]
        if not candidates:
            return content, []

        # Earliest start first; on equal start the longest match wins.
        # The sort is stable, so exact ties keep pattern-table order.
        candidates.sort(key=lambda f: (f["start"], -f["end"]))

        # Each span is [start, end, placeholder, label].
        spans: list[list[Any]] = []
        for f in candidates:
            if spans and f["start"] < spans[-1][1]:
                # Overlaps the previous span: widen it so that every
                # detected character is still covered.
                spans[-1][1] = max(spans[-1][1], f["end"])
                continue
            spans.append([
                f["start"],
                f["end"],
                f["placeholder"],
                f"{f['type']}:{f['value'][:20]}",
            ])

        pieces: list[str] = []
        cursor = 0
        for start, end, placeholder, _label in spans:
            pieces.append(content[cursor:start])
            pieces.append(placeholder)
            cursor = end
        pieces.append(content[cursor:])

        redacted_items = [span[3] for span in reversed(spans)]
        return "".join(pieces), redacted_items

    def has_pii(self, content: str) -> bool:
        """Quick check if content contains any PII.

        Stops at the first pattern that matches instead of collecting
        every finding.
        """
        return any(
            pattern.search(content) is not None
            for pattern, _placeholder in self._patterns.values()
        )

252 

253 

254# ── Content Safety Filter ───────────────────────────────────────── 

255 

class ContentSafetyFilter:
    """Filter for toxic content, prompt injection, jailbreak attempts.

    Three-layer defense:
    1. Pattern matching (regex) — fast, deterministic
    2. Keyword blocklist — user-configurable
    3. Hash matching — known-attack fingerprints (optional)
    """

    def __init__(
        self,
        block_injection: bool = True,
        block_toxicity: bool = True,
        custom_blocklist: list[str] | None = None,
        custom_allowlist: list[str] | None = None,
        known_attack_hashes: set[str] | None = None,
    ):
        """Store the configuration and compile the regex tables.

        Args:
            block_injection: Whether ``check_all`` runs the injection checks.
            block_toxicity: Whether ``check_all`` runs the toxicity checks.
            custom_blocklist: Keywords matched case-insensitively as
                substrings of the content.
            custom_allowlist: Keywords exempt from the blocklist
                (compared case-insensitively).
            known_attack_hashes: SHA-256 hex digests of content to reject.
        """
        self.block_injection = block_injection
        self.block_toxicity = block_toxicity
        self.blocklist: set[str] = set(custom_blocklist or [])
        self.allowlist: set[str] = set(custom_allowlist or [])
        self.known_hashes: set[str] = known_attack_hashes or set()

        # Compile injection patterns
        self._injection_re = [
            re.compile(p, re.IGNORECASE) for p in INJECTION_PATTERNS
        ]

        # Compile toxicity patterns
        self._toxicity_re: dict[str, list[re.Pattern]] = {
            category: [re.compile(p, re.IGNORECASE) for p in patterns]
            for category, patterns in TOXICITY_PATTERNS.items()
        }

    def check_injection(self, content: str) -> list[GuardResult]:
        """Check for prompt injection / jailbreak attempts.

        Returns one HIGH-severity BLOCK result per matching pattern, named
        ``injection_pattern_<index>``.
        """
        return [
            GuardResult(
                passed=False,
                action=GuardAction.BLOCK,
                severity=Severity.HIGH,
                rule_name=f"injection_pattern_{i}",
                message=f"Potential prompt injection detected: {pattern.pattern[:80]}",
            )
            for i, pattern in enumerate(self._injection_re)
            if pattern.search(content)
        ]

    def check_toxicity(self, content: str) -> list[GuardResult]:
        """Check for toxic/harmful content.

        Returns one BLOCK result per matching pattern, named
        ``toxicity_<category>_<index>``.  Matches in the ``self_harm`` and
        ``illegal`` categories are CRITICAL; all others are HIGH.
        """
        results = []
        for category, patterns in self._toxicity_re.items():
            severity = (
                Severity.CRITICAL
                if category in ("self_harm", "illegal")
                else Severity.HIGH
            )
            for i, pattern in enumerate(patterns):
                if pattern.search(content):
                    results.append(GuardResult(
                        passed=False,
                        action=GuardAction.BLOCK,
                        severity=severity,
                        rule_name=f"toxicity_{category}_{i}",
                        message=f"Toxic content detected [{category}]: {pattern.pattern[:60]}",
                    ))
        return results

    def check_blocklist(self, content: str) -> list[GuardResult]:
        """Check against custom keyword blocklist.

        A keyword triggers when it occurs in the content as a
        case-insensitive substring, unless the allowlist contains the same
        keyword (also compared case-insensitively).  Results are produced
        in sorted keyword order so the output is deterministic.
        """
        if not self.blocklist:
            return []

        content_lower = content.lower()
        # Lower-case the allowlist as well: the keyword is compared in
        # lower case, so a mixed-case allowlist entry could never match.
        allowed = {entry.lower() for entry in self.allowlist}

        results = []
        for keyword in sorted(self.blocklist):
            needle = keyword.lower()
            if needle not in content_lower:
                continue
            # Skip if in allowlist
            if needle in allowed:
                continue
            results.append(GuardResult(
                passed=False,
                action=GuardAction.BLOCK,
                severity=Severity.MEDIUM,
                rule_name="blocklist",
                message=f"Blocked keyword: {keyword}",
            ))
        return results

    def check_hash(self, content: str) -> list[GuardResult]:
        """Check content hash against known attack fingerprints.

        The fingerprint is the SHA-256 hex digest of the encoded content,
        so only an exact copy of a known payload matches.
        """
        if not self.known_hashes:
            return []

        content_hash = hashlib.sha256(content.encode()).hexdigest()
        if content_hash not in self.known_hashes:
            return []
        return [GuardResult(
            passed=False,
            action=GuardAction.BLOCK,
            severity=Severity.CRITICAL,
            rule_name="known_attack_hash",
            message="Content matches known attack fingerprint",
        )]

    def check_all(self, content: str) -> list[GuardResult]:
        """Run all safety checks on content.

        Injection and toxicity checks honour their ``block_*`` switches;
        the blocklist and hash checks always run (each is a no-op when its
        set is empty).
        """
        results = []

        if self.block_injection:
            results.extend(self.check_injection(content))

        if self.block_toxicity:
            results.extend(self.check_toxicity(content))

        results.extend(self.check_blocklist(content))
        results.extend(self.check_hash(content))

        return results

    def is_safe(self, content: str) -> bool:
        """Quick safety check — True if content passes all filters."""
        return all(r.passed for r in self.check_all(content))

377 

378 

379# ── Input Guardrail ─────────────────────────────────────────────── 

380 

class InputGuard:
    """Guardrail for user input: PII detection, injection, content safety.

    Runs before user input reaches the agent.
    """

    def __init__(
        self,
        pii_detector: PIIDetector | None = None,
        safety_filter: ContentSafetyFilter | None = None,
        max_input_length: int = 0,  # 0 = no limit
        deny_empty: bool = True,
    ):
        """Configure the guard.

        Args:
            pii_detector: Detector used for redaction; defaults to
                ``PIIDetector(auto_redact=True)``.
            safety_filter: Filter used for the safety checks; defaults to
                ``ContentSafetyFilter()``.
            max_input_length: Maximum accepted length in characters;
                ``0`` disables the limit.
            deny_empty: Whether empty or whitespace-only input is blocked.
        """
        self.pii = pii_detector or PIIDetector(auto_redact=True)
        self.safety = safety_filter or ContentSafetyFilter()
        self.max_input_length = max_input_length
        self.deny_empty = deny_empty

    def guard(self, user_input: str, redact_pii: bool = True) -> GuardChainResult:
        """Run all input guardrails.

        Checks run in this order: empty input, length limit, PII redaction,
        safety filter.  The empty and length checks return immediately
        when they block, so rejected input is never handed to the regex
        scans.  ``None`` is treated like an empty string.

        Args:
            user_input: Raw text received from the user.
            redact_pii: When true, PII is replaced by placeholders before
                the safety checks run.

        Returns:
            A ``GuardChainResult``; ``final_content`` is empty when the
            input was blocked, otherwise the (possibly redacted) text.
        """
        results: list[GuardResult] = []
        # Normalise None up front: the regex scans below would raise on it.
        current_content = "" if user_input is None else user_input

        # 1. Empty check
        if self.deny_empty and not current_content.strip():
            results.append(GuardResult(
                passed=False, action=GuardAction.BLOCK,
                severity=Severity.LOW, rule_name="empty_input",
                message="Empty input rejected",
            ))
            return self._finalize(results, current_content)

        # 2. Length check (before any regex work on an oversized payload)
        if self.max_input_length > 0 and len(current_content) > self.max_input_length:
            results.append(GuardResult(
                passed=False, action=GuardAction.BLOCK,
                severity=Severity.LOW, rule_name="input_too_long",
                message=f"Input exceeds max length ({len(current_content)} > {self.max_input_length})",
            ))
            return self._finalize(results, current_content)

        # 3. PII check
        if redact_pii:
            redacted, items = self.pii.redact(current_content)
            if items:
                current_content = redacted
                results.append(GuardResult(
                    passed=True, action=GuardAction.REDACT,
                    severity=Severity.MEDIUM, rule_name="pii_redacted",
                    message=f"Redacted {len(items)} PII items",
                    modified_content=current_content,
                    redacted_items=items,
                ))

        # 4. Safety checks (run on the redacted text)
        results.extend(self.safety.check_all(current_content))

        return self._finalize(results, current_content)

    @staticmethod
    def _finalize(results: list[GuardResult], content: str) -> GuardChainResult:
        """Fold the collected check results into the aggregate verdict."""
        blocked = any(r.action == GuardAction.BLOCK for r in results)
        blocked_by = next(
            (r.rule_name for r in results if r.action == GuardAction.BLOCK), ""
        )
        warnings = [r.message for r in results if r.action == GuardAction.WARN]

        return GuardChainResult(
            allowed=not blocked,
            final_content="" if blocked else content,
            results=results,
            blocked_by=blocked_by,
            total_checks=len(results),
            warnings=warnings,
        )

450 

451 

452# ── Output Guardrail ────────────────────────────────────────────── 

453 

class OutputGuard:
    """Guardrail for agent output: PII leak prevention, sensitive content filtering.

    Runs after agent generates output, before it reaches the user.
    """

    # Phrases treated as signs that the system prompt is being echoed.
    # Compiled once here rather than on every guard() call.
    _LEAK_INDICATORS = tuple(
        re.compile(p) for p in (
            r'(?i)(?:system\s+prompt|you\s+are\s+a\s+helpful|your\s+instructions?\s+are)',
            r'(?i)(?:your\s+rules?\s+are|your\s+guidelines?\s+are|your\s+core\s+directive)',
            r'(?i)(?:my\s+system\s+prompt|my\s+instructions?\s+(?:is|are|tell|say))',
        )
    )

    def __init__(
        self,
        pii_detector: PIIDetector | None = None,
        safety_filter: ContentSafetyFilter | None = None,
        max_output_length: int = 0,
        deny_empty: bool = True,
        block_system_prompt_leak: bool = True,
    ):
        """Configure the guard.

        Args:
            pii_detector: Detector used for redaction; defaults to
                ``PIIDetector(auto_redact=True)``.
            safety_filter: Filter used for the toxicity check; defaults to
                ``ContentSafetyFilter(block_injection=False)``.
            max_output_length: Maximum accepted length in characters;
                ``0`` disables the limit.
            deny_empty: Whether empty or whitespace-only output is blocked.
            block_system_prompt_leak: Whether output matching a leak
                indicator phrase is blocked.
        """
        self.pii = pii_detector or PIIDetector(auto_redact=True)
        self.safety = safety_filter or ContentSafetyFilter(block_injection=False)  # No injection check on output
        self.max_output_length = max_output_length
        self.deny_empty = deny_empty
        self.block_system_prompt_leak = block_system_prompt_leak

    def guard(self, agent_output: str) -> GuardChainResult:
        """Run all output guardrails.

        Checks run in this order: empty output, length limit, PII
        redaction, system prompt leak, toxicity.  The empty and length
        checks return immediately when they block.  ``None`` is treated
        like an empty string.

        Returns:
            A ``GuardChainResult``; ``final_content`` is empty when the
            output was blocked, otherwise the (possibly redacted) text.
        """
        results: list[GuardResult] = []
        # Normalise None up front: the regex scans below would raise on it.
        current_content = "" if agent_output is None else agent_output

        # 1. Empty check
        if self.deny_empty and not current_content.strip():
            results.append(GuardResult(
                passed=False, action=GuardAction.BLOCK,
                severity=Severity.MEDIUM, rule_name="empty_output",
                message="Empty output blocked",
            ))
            return self._finalize(results, current_content)

        # 2. Length check.  ``max_output_length`` was accepted by the
        #    constructor but never enforced; 0 keeps the limit disabled.
        if self.max_output_length > 0 and len(current_content) > self.max_output_length:
            results.append(GuardResult(
                passed=False, action=GuardAction.BLOCK,
                severity=Severity.MEDIUM, rule_name="output_too_long",
                message=f"Output exceeds max length ({len(current_content)} > {self.max_output_length})",
            ))
            return self._finalize(results, current_content)

        # 3. PII leak prevention
        redacted, items = self.pii.redact(current_content)
        if items:
            current_content = redacted
            results.append(GuardResult(
                passed=True, action=GuardAction.REDACT,
                severity=Severity.HIGH, rule_name="pii_leak_prevented",
                message=f"Prevented {len(items)} PII leaks in output",
                modified_content=current_content,
                redacted_items=items,
            ))

        # 4. System prompt leak detection (first matching indicator wins)
        if self.block_system_prompt_leak:
            for i, pattern in enumerate(self._LEAK_INDICATORS):
                if pattern.search(current_content):
                    results.append(GuardResult(
                        passed=False, action=GuardAction.BLOCK,
                        severity=Severity.CRITICAL, rule_name=f"prompt_leak_{i}",
                        message="Potential system prompt leak detected in output",
                    ))
                    break

        # 5. Toxicity check (output should not contain harmful content).
        #    Honour the filter's own switch; a filter object without the
        #    attribute keeps the previous always-on behaviour.
        if getattr(self.safety, "block_toxicity", True):
            results.extend(self.safety.check_toxicity(current_content))

        return self._finalize(results, current_content)

    @staticmethod
    def _finalize(results: list[GuardResult], content: str) -> GuardChainResult:
        """Fold the collected check results into the aggregate verdict."""
        blocked = any(r.action == GuardAction.BLOCK for r in results)
        blocked_by = next(
            (r.rule_name for r in results if r.action == GuardAction.BLOCK), ""
        )
        warnings = [r.message for r in results if r.action == GuardAction.WARN]

        return GuardChainResult(
            allowed=not blocked,
            final_content="" if blocked else content,
            results=results,
            blocked_by=blocked_by,
            total_checks=len(results),
            warnings=warnings,
        )

535 

536 

537# ── Guardrail Pipeline ──────────────────────────────────────────── 

538 

class GuardPipeline:
    """Full guardrail pipeline: Input → Agent → Output.

    Usage:
        pipeline = GuardPipeline()
        result = pipeline.process_input(user_msg)
        if result.allowed:
            agent_output = agent.run(result.final_content)
            final = pipeline.process_output(agent_output)
    """

    def __init__(
        self,
        input_guard: InputGuard | None = None,
        output_guard: OutputGuard | None = None,
    ):
        """Wire up the two guards and reset the counters.

        Args:
            input_guard: Guard applied by ``process_input``; defaults to
                ``InputGuard()``.
            output_guard: Guard applied by ``process_output``; defaults to
                ``OutputGuard()``.
        """
        self.input_guard = input_guard or InputGuard()
        self.output_guard = output_guard or OutputGuard()
        self.total_blocked: int = 0
        self.total_redacted: int = 0
        # One entry per processed message; grows for the pipeline's lifetime.
        self.log: list[dict[str, Any]] = []

    def process_input(self, user_input: str) -> GuardChainResult:
        """Guard user input before it reaches the agent."""
        result = self.input_guard.guard(user_input)
        self._record("input", result)
        return result

    def process_output(self, agent_output: str) -> GuardChainResult:
        """Guard agent output before it reaches the user."""
        result = self.output_guard.guard(agent_output)
        self._record("output", result)
        return result

    def _record(self, stage: str, result: GuardChainResult) -> None:
        """Log one guard run and update the block/redaction counters.

        Both stages share this bookkeeping so that ``total_redacted``
        covers input redactions too, not only output ones.
        """
        self._log(stage, result)
        if result.blocked:
            self.total_blocked += 1
        self.total_redacted += sum(len(r.redacted_items) for r in result.results)

    def _log(self, stage: str, result: GuardChainResult) -> None:
        """Append a summary of ``result`` to ``self.log``.

        Only rule names, verdicts and messages are stored; the content
        itself and the redacted items are left out.
        """
        guard_results = [
            {"rule": r.rule_name, "passed": r.passed, "action": r.action.value,
             "severity": r.severity.value, "message": r.message}
            for r in result.results
        ]
        self.log.append({
            "stage": stage,
            "allowed": result.allowed,
            "total_checks": result.total_checks,
            "results": guard_results,
        })

    def get_stats(self) -> dict[str, Any]:
        """Return aggregate counters.

        ``total_checks`` is the number of processed messages (log
        entries); ``block_rate`` is a percentage string with one decimal,
        computed against that number (0.0% when nothing was processed).
        """
        processed = len(self.log)
        return {
            "total_checks": processed,
            "total_blocked": self.total_blocked,
            "total_redacted": self.total_redacted,
            "block_rate": f"{self.total_blocked / max(processed, 1) * 100:.1f}%",
        }

600 

601 

602# ── Default Guard Configs ───────────────────────────────────────── 

603 

def create_strict_guard() -> GuardPipeline:
    """Create a strict guardrail pipeline (production recommended).

    Both guards share one PII detector and one safety filter with
    injection and toxicity blocking switched on.  Input is capped at
    32768 characters and system prompt leak blocking is enabled on output.
    """
    shared_pii = PIIDetector(auto_redact=True)
    shared_safety = ContentSafetyFilter(block_injection=True, block_toxicity=True)

    strict_input = InputGuard(
        pii_detector=shared_pii,
        safety_filter=shared_safety,
        max_input_length=32768,
    )
    strict_output = OutputGuard(
        pii_detector=shared_pii,
        safety_filter=shared_safety,
        block_system_prompt_leak=True,
    )
    return GuardPipeline(input_guard=strict_input, output_guard=strict_output)

612 

613 

def create_permissive_guard() -> GuardPipeline:
    """Create a permissive guardrail pipeline (dev/debug).

    Both guards share one PII detector and one safety filter with
    injection blocking on and toxicity blocking off.  No input length
    limit is set and system prompt leak blocking is disabled on output.
    """
    shared_pii = PIIDetector(auto_redact=True)
    shared_safety = ContentSafetyFilter(block_injection=True, block_toxicity=False)

    relaxed_input = InputGuard(pii_detector=shared_pii, safety_filter=shared_safety)
    relaxed_output = OutputGuard(
        pii_detector=shared_pii,
        safety_filter=shared_safety,
        block_system_prompt_leak=False,
    )
    return GuardPipeline(input_guard=relaxed_input, output_guard=relaxed_output)