Coverage for agentos/tools/policy_engine.py: 0%

210 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 07:34 +0800

1""" 

2PolicyEngine — declarative rule engine with conditions, actions, and priorities. 

3 

4Supports: 

5 - Rule DSL: WHEN {condition} THEN {action} WITH priority {int} 

6 - Conditions: equals, contains, regex, gt/lt/gte/lte, in_set, exists, custom callable 

7 - Actions: set, log, reject, allow, call, chain 

8 - Priority ordering + first-match / all-match modes 

9 - Dynamic rule add/remove at runtime 

10 - JSON serialization for persistence 

11""" 

12 

13from __future__ import annotations 

14 

15import json 

16import logging 

17import re 

18from dataclasses import dataclass, field 

19from enum import Enum 

20from typing import Any, Callable, Dict, List, Optional, Set, Union 

21 

22logger = logging.getLogger(__name__) 

23 

24 

25# ============================================================================ 

26# Condition Operators 

27# ============================================================================ 

28 

class Op(Enum):
    """Comparison operators that a condition can apply to a context value.

    Each member's value is the short string used when rules are serialized.
    """

    EQ = "eq"
    NE = "ne"
    GT = "gt"
    GTE = "gte"
    LT = "lt"
    LTE = "lte"
    CONTAINS = "contains"
    REGEX = "regex"
    IN_SET = "in_set"
    EXISTS = "exists"
    CUSTOM = "custom"

    def evaluate(self, actual: Any, expected: Any) -> bool:
        """Return whether ``actual`` satisfies this operator against ``expected``.

        Args:
            actual: Value under test. ``None`` counts as "absent" for
                ``EXISTS``.
            expected: Operand to compare against: a pattern for ``REGEX``,
                a list/set/frozenset/tuple for ``IN_SET``, a one-argument
                callable for ``CUSTOM``. Ignored by ``EXISTS``.

        Returns:
            The comparison result. Operands whose types cannot be compared
            (for example ``None > 100`` or ``1 in "abc"``) yield ``False``
            instead of raising ``TypeError``.

        Raises:
            re.error: If a ``REGEX`` pattern is not a valid regular expression.
        """
        if self is Op.EXISTS:
            return actual is not None

        if self is Op.CUSTOM:
            # A non-callable operand (e.g. a placeholder left behind by
            # serialization) can never match. The call is deliberately kept
            # outside the try block so errors raised by the predicate itself
            # still propagate to the caller.
            return bool(expected(actual)) if callable(expected) else False

        try:
            if self is Op.EQ:
                return actual == expected
            if self is Op.NE:
                return actual != expected
            if self is Op.GT:
                return actual > expected
            if self is Op.GTE:
                return actual >= expected
            if self is Op.LT:
                return actual < expected
            if self is Op.LTE:
                return actual <= expected
            if self is Op.CONTAINS:
                if isinstance(actual, (str, list, set, frozenset, tuple)):
                    return expected in actual
                return False
            if self is Op.REGEX:
                if isinstance(actual, str):
                    return bool(re.search(expected, actual))
                return False
            if self is Op.IN_SET:
                if isinstance(expected, (list, set, frozenset, tuple)):
                    return actual in expected
                return False
        except TypeError:
            # Incomparable operands (missing field -> None, str vs int,
            # unhashable member test, non-pattern regex): treat as "no match".
            return False
        return False

67 

68 

69# ============================================================================ 

70# Condition 

71# ============================================================================ 

72 

@dataclass
class Condition:
    """A single predicate of the form ``<field> <op> <value>``."""

    # Key looked up in the evaluation context.
    field: str
    # Operator applied to the looked-up value.
    op: Op
    # Right-hand operand handed to the operator.
    value: Any

    def evaluate(self, context: Dict[str, Any]) -> bool:
        """Apply the operator to ``context[field]``.

        A field that is absent from ``context`` is passed to the operator
        as ``None``.
        """
        return self.op.evaluate(context.get(self.field), self.value)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly description of this condition.

        ``EXISTS`` conditions carry no ``"value"`` key. ``CUSTOM`` conditions
        store the placeholder ``"<callable>"`` because the predicate itself
        is not serialized. Set operands are emitted as lists so the result
        can be passed to ``json.dumps``.
        """
        result: Dict[str, Any] = {"field": self.field, "op": self.op.value}
        if self.op is Op.EXISTS:
            return result

        if self.op is Op.CUSTOM:
            result["value"] = "<callable>"
        elif isinstance(self.value, (set, frozenset)):
            # json cannot encode sets; prefer a sorted list for stable output
            # and fall back to arbitrary order for unorderable members.
            try:
                result["value"] = sorted(self.value)
            except TypeError:
                result["value"] = list(self.value)
        else:
            result["value"] = self.value
        return result

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "Condition":
        """Build a condition from the mapping produced by :meth:`to_dict`.

        Raises:
            KeyError: If ``"field"`` or ``"op"`` is missing.
            ValueError: If ``"op"`` is not a known operator string.
        """
        return cls(field=d["field"], op=Op(d["op"]), value=d.get("value"))

93 

94 

95# ============================================================================ 

96# Actions 

97# ============================================================================ 

98 

class ActionType(Enum):
    """Kinds of action a rule can perform once its conditions match.

    Each member's value is the string used when rules are serialized.
    """

    SET = "set"
    LOG = "log"
    REJECT = "reject"
    ALLOW = "allow"
    CALL = "call"
    CHAIN = "chain"

    def execute(
        self,
        context: Dict[str, Any],
        params: Any,
    ) -> Optional[Dict[str, Any]]:
        """Run this kind of action against ``context``.

        Args:
            context: Evaluation context; ``SET`` writes into it, and it is
                handed to ``CALL`` callables and ``CHAIN`` sub-actions.
            params: Action payload: a dict of keys to assign for ``SET``, the
                message for ``LOG``, the reason for ``REJECT``, a callable
                taking the context for ``CALL``, a list of ``Action`` objects
                for ``CHAIN``. Payloads of the wrong type are ignored.

        Returns:
            ``{"action": "reject", "reason": ...}`` for ``REJECT``,
            ``{"action": "allow"}`` for ``ALLOW``, the last non-empty result
            of the sub-actions for ``CHAIN``, otherwise ``None``.
        """
        if self is ActionType.SET:
            if isinstance(params, dict):
                context.update(params)
            return None
        if self is ActionType.LOG:
            logger.info("policy_engine: %s", params)
            return None
        if self is ActionType.REJECT:
            return {"action": "reject", "reason": str(params)}
        if self is ActionType.ALLOW:
            return {"action": "allow"}
        if self is ActionType.CALL:
            if callable(params):
                params(context)
            return None
        if self is ActionType.CHAIN:
            # Propagate the verdict of the chain instead of dropping it:
            # the last sub-action that yields a result wins.
            verdict: Optional[Dict[str, Any]] = None
            if isinstance(params, list):
                for sub_action in params:
                    if isinstance(sub_action, Action):
                        outcome = sub_action.execute(context)
                        if outcome:
                            verdict = outcome
            return verdict
        return None

    @classmethod
    def from_str(cls, s: str) -> "ActionType":
        """Look up a member by its value, ignoring case.

        Raises:
            ValueError: If ``s`` does not name a known action type.
        """
        return cls(s.lower())


# ============================================================================
# Action
# ============================================================================

@dataclass
class Action:
    """An action type paired with the payload it operates on."""

    # Which kind of action to perform.
    type: ActionType
    # Payload interpreted by ActionType.execute; its meaning depends on `type`.
    params: Any = None

    def execute(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Run the action against ``context`` and return its result, if any."""
        return self.type.execute(context, self.params)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly description of this action.

        Callable payloads are replaced by the placeholder ``"<callable>"``.
        For ``CHAIN`` actions every nested ``Action`` is serialized
        recursively so the result can be passed to ``json.dumps``.
        """
        result: Dict[str, Any] = {"type": self.type.value}
        if self.params is None:
            return result

        if self.type is ActionType.CHAIN and isinstance(self.params, list):
            result["params"] = [
                sub.to_dict() if isinstance(sub, Action) else sub
                for sub in self.params
            ]
        elif callable(self.params):
            result["params"] = "<callable>"
        else:
            result["params"] = self.params
        return result

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "Action":
        """Build an action from the mapping produced by :meth:`to_dict`.

        Nested action mappings inside a ``CHAIN`` payload are turned back
        into ``Action`` objects so the chain remains executable.

        Raises:
            KeyError: If ``"type"`` is missing.
            ValueError: If ``"type"`` is not a known action type.
        """
        action_type = ActionType.from_str(d["type"])
        params = d.get("params")
        if action_type is ActionType.CHAIN and isinstance(params, list):
            params = [
                cls.from_dict(item)
                if isinstance(item, dict) and "type" in item
                else item
                for item in params
            ]
        return cls(type=action_type, params=params)

160 

161 

162# ============================================================================ 

163# Rule 

164# ============================================================================ 

165 

@dataclass
class Rule:
    """A named set of conditions plus the actions to run when all of them hold."""

    name: str
    conditions: List[Condition] = field(default_factory=list)
    actions: List[Action] = field(default_factory=list)
    priority: int = 0
    enabled: bool = True
    description: str = ""

    def matches(self, context: Dict[str, Any]) -> bool:
        """Return True when the rule is enabled and every condition passes.

        A rule without conditions matches any context.
        """
        if not self.enabled:
            return False
        for condition in self.conditions:
            if not condition.evaluate(context):
                return False
        return True

    def evaluate(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Run the actions if the rule matches.

        Returns:
            The last non-empty action result, or ``None`` when the rule does
            not match or no action produced a result.
        """
        if not self.matches(context):
            return None
        last_outcome: Optional[Dict[str, Any]] = None
        for action in self.actions:
            outcome = action.execute(context)
            if outcome:
                last_outcome = outcome
        return last_outcome

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict description of the rule."""
        serialized: Dict[str, Any] = {"name": self.name}
        serialized["conditions"] = [cond.to_dict() for cond in self.conditions]
        serialized["actions"] = [act.to_dict() for act in self.actions]
        serialized["priority"] = self.priority
        serialized["enabled"] = self.enabled
        serialized["description"] = self.description
        return serialized

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "Rule":
        """Build a rule from a mapping; only ``"name"`` is required."""
        rule_name = d["name"]
        parsed_conditions = [
            Condition.from_dict(raw) for raw in d.get("conditions", [])
        ]
        parsed_actions = [Action.from_dict(raw) for raw in d.get("actions", [])]
        return cls(
            name=rule_name,
            conditions=parsed_conditions,
            actions=parsed_actions,
            priority=d.get("priority", 0),
            enabled=d.get("enabled", True),
            description=d.get("description", ""),
        )

212 

213 

214# ============================================================================ 

215# Match Mode 

216# ============================================================================ 

217 

class MatchMode(Enum):
    """How many matching rules the engine acts on per evaluation."""

    # Stop after the first matching rule.
    FIRST = "first"
    # Evaluate all rules and collect their results.
    ALL = "all"

221 

222 

223# ============================================================================ 

224# PolicyEngine 

225# ============================================================================ 

226 

class PolicyEngine:
    """Declarative rule engine.

    Rules are kept sorted by descending priority; rules with equal priority
    stay in the order they were added.

    Usage:
        pe = PolicyEngine()

        pe.add_rule(
            name="admin-access",
            conditions=[
                Condition(field="role", op=Op.EQ, value="admin"),
            ],
            actions=[Action(type=ActionType.ALLOW)],
            priority=100,
        )

        pe.add_rule(
            name="rate-limit",
            conditions=[
                Condition(field="requests_per_min", op=Op.GT, value=100),
            ],
            actions=[Action(type=ActionType.REJECT, params="rate limit exceeded")],
            priority=50,
        )

        result = pe.evaluate({"role": "admin", "requests_per_min": 150})
        # → {"rule": "admin-access", "result": {"action": "allow"}}
        #   (the higher-priority rule matches first)
    """

    def __init__(self, mode: MatchMode = MatchMode.FIRST):
        """Create an empty engine that evaluates in the given match mode."""
        self._rules: List[Rule] = []
        self._mode = mode
        # Mirror of the rule names for O(1) duplicate detection in add_rule.
        self._rule_names: Set[str] = set()

    # ---------- CRUD ----------

    def add_rule(
        self,
        name: str,
        conditions: Optional[List[Condition]] = None,
        actions: Optional[List[Action]] = None,
        priority: int = 0,
        enabled: bool = True,
        description: str = "",
    ) -> Rule:
        """Create a rule, register it, and return it.

        Raises:
            ValueError: If a rule called ``name`` is already registered.
        """
        if name in self._rule_names:
            raise ValueError(f"Rule '{name}' already exists")
        rule = Rule(
            name=name,
            conditions=conditions or [],
            actions=actions or [],
            priority=priority,
            enabled=enabled,
            description=description,
        )
        self._rules.append(rule)
        self._rule_names.add(name)
        self._sort()
        return rule

    def remove_rule(self, name: str) -> bool:
        """Remove the rule called ``name``; return True if one was removed."""
        before = len(self._rules)
        self._rules = [r for r in self._rules if r.name != name]
        self._rule_names.discard(name)
        return len(self._rules) < before

    def get_rule(self, name: str) -> Optional[Rule]:
        """Return the rule called ``name``, or ``None`` if there is none."""
        for rule in self._rules:
            if rule.name == name:
                return rule
        return None

    def enable_rule(self, name: str) -> bool:
        """Enable the named rule; return False if no such rule exists."""
        return self._set_enabled(name, True)

    def disable_rule(self, name: str) -> bool:
        """Disable the named rule; return False if no such rule exists."""
        return self._set_enabled(name, False)

    def _set_enabled(self, name: str, enabled: bool) -> bool:
        """Set the ``enabled`` flag of the named rule, reporting whether it exists."""
        rule = self.get_rule(name)
        if rule is None:
            return False
        rule.enabled = enabled
        return True

    def _sort(self) -> None:
        """Order rules by descending priority, keeping ties in insertion order."""
        # list.sort is stable, also with reverse=True.
        self._rules.sort(key=lambda rule: rule.priority, reverse=True)

    # ---------- evaluation ----------

    def evaluate(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Evaluate rules against ``context`` according to the engine's mode.

        Returns:
            ``None`` when no enabled rule matches. In ``FIRST`` mode,
            ``{"rule": <name>, "result": <action result or None>}`` for the
            highest-priority match. In ``ALL`` mode,
            ``{"matches": [<one such entry per matching rule>]}``.
        """
        results = []
        for rule in self._rules:
            if not rule.enabled:
                continue
            # NOTE: Rule.evaluate re-checks the conditions itself, so each
            # matching rule has its conditions evaluated twice.
            if rule.matches(context):
                result = rule.evaluate(context)
                results.append({"rule": rule.name, "result": result})
                if self._mode == MatchMode.FIRST:
                    break
        if not results:
            return None
        if self._mode == MatchMode.FIRST:
            return results[0]
        return {"matches": results}

    def evaluate_all(self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Evaluate every rule regardless of the engine's mode.

        Returns:
            One ``{"rule": <name>, "result": <action result>}`` entry per
            matching rule whose actions produced a result; matching rules
            without a result are omitted.
        """
        matches = []
        for rule in self._rules:
            if rule.matches(context):
                result = rule.evaluate(context)
                if result:
                    matches.append({"rule": rule.name, "result": result})
        return matches

    # ---------- serialization ----------

    def to_dict(self) -> Dict[str, Any]:
        """Return the rules (in priority order) and the match mode as a dict."""
        return {"rules": [r.to_dict() for r in self._rules], "mode": self._mode.value}

    def to_json(self) -> str:
        """Return :meth:`to_dict` rendered as indented JSON text."""
        return json.dumps(self.to_dict(), indent=2, ensure_ascii=False)

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "PolicyEngine":
        """Build an engine from the mapping produced by :meth:`to_dict`.

        A missing ``"mode"`` defaults to ``"first"``; a missing ``"rules"``
        yields an empty engine.

        Raises:
            ValueError: If the mode is unknown or two rules share a name.
            KeyError: If a rule entry lacks a required key.
        """
        pe = cls(mode=MatchMode(d.get("mode", "first")))
        for rule_d in d.get("rules", []):
            # Parse through Rule.from_dict so both loaders share one schema,
            # then register via add_rule to keep the duplicate-name check.
            parsed = Rule.from_dict(rule_d)
            pe.add_rule(
                name=parsed.name,
                conditions=parsed.conditions,
                actions=parsed.actions,
                priority=parsed.priority,
                enabled=parsed.enabled,
                description=parsed.description,
            )
        return pe

    @classmethod
    def from_json(cls, json_str: str) -> "PolicyEngine":
        """Build an engine from JSON text produced by :meth:`to_json`."""
        return cls.from_dict(json.loads(json_str))

    @property
    def rules(self) -> List[Rule]:
        """A copy of the rule list in priority order (the rules themselves are shared)."""
        return list(self._rules)