Coverage for agentos/tools/policy_engine.py: 0%
210 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:34 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:34 +0800
1"""
2PolicyEngine — declarative rule engine with conditions, actions, and priorities.
4Supports:
5 - Rule DSL: WHEN {condition} THEN {action} WITH priority {int}
6 - Conditions: equals, contains, regex, gt/lt/gte/lte, in_set, exists, custom callable
7 - Actions: set, log, reject, allow, call, chain
8 - Priority ordering + first-match / all-match modes
9 - Dynamic rule add/remove at runtime
10 - JSON serialization for persistence
11"""
13from __future__ import annotations
15import json
16import logging
17import re
18from dataclasses import dataclass, field
19from enum import Enum
20from typing import Any, Callable, Dict, List, Optional, Set, Union
22logger = logging.getLogger(__name__)
25# ============================================================================
26# Condition Operators
27# ============================================================================
class Op(Enum):
    """Comparison operators that a condition can apply to a context value.

    Each member's value is the operator's string form used in serialized
    rules (``Op("gt")`` is ``Op.GT``).
    """

    EQ = "eq"
    NE = "ne"
    GT = "gt"
    GTE = "gte"
    LT = "lt"
    LTE = "lte"
    CONTAINS = "contains"
    REGEX = "regex"
    IN_SET = "in_set"
    EXISTS = "exists"
    CUSTOM = "custom"

    def evaluate(self, actual: Any, expected: Any) -> bool:
        """Apply this operator to ``actual`` and ``expected``.

        Args:
            actual: The value under test; may be ``None`` when the looked-up
                field is absent.
            expected: The configured operand: a regex pattern for ``REGEX``,
                a list/set/tuple for ``IN_SET``, a one-argument callable for
                ``CUSTOM``. ``EXISTS`` ignores it.

        Returns:
            ``True`` when the comparison holds. Ordering, ``CONTAINS`` and
            ``IN_SET`` comparisons between incompatible operands (for
            example ``None`` versus a number) yield ``False`` instead of
            raising ``TypeError``.
        """
        if self is Op.EQ:
            return actual == expected
        if self is Op.NE:
            return actual != expected
        if self in (Op.GT, Op.GTE, Op.LT, Op.LTE):
            # A missing field arrives here as None; unorderable operands mean
            # the condition simply does not hold rather than aborting the run.
            try:
                if self is Op.GT:
                    return actual > expected
                if self is Op.GTE:
                    return actual >= expected
                if self is Op.LT:
                    return actual < expected
                return actual <= expected
            except TypeError:
                return False
        if self is Op.CONTAINS:
            if not isinstance(actual, (str, list, set, tuple)):
                return False
            try:
                return expected in actual
            except TypeError:
                # e.g. a non-string needle in a string, or an unhashable
                # needle looked up in a set.
                return False
        if self is Op.REGEX:
            return bool(re.search(expected, actual)) if isinstance(actual, str) else False
        if self is Op.IN_SET:
            if not isinstance(expected, (list, set, tuple)):
                return False
            try:
                return actual in expected
            except TypeError:
                # An unhashable value can never be a member of a set.
                return False
        if self is Op.EXISTS:
            return actual is not None
        if self is Op.CUSTOM:
            # ``expected`` is the user-supplied predicate.
            return bool(expected(actual))
        return False
69# ============================================================================
70# Condition
71# ============================================================================
@dataclass
class Condition:
    """A single predicate of the form ``field OP value``."""

    field: str  # key looked up in the evaluation context
    op: Op  # operator applied to the looked-up value
    value: Any  # operand handed to the operator as ``expected``

    def evaluate(self, context: Dict[str, Any]) -> bool:
        """Return whether ``context[field]`` satisfies the operator.

        A field missing from ``context`` is passed to the operator as
        ``None``.
        """
        return self.op.evaluate(context.get(self.field), self.value)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly dict describing this condition.

        ``EXISTS`` conditions omit ``"value"``. ``CUSTOM`` conditions store
        the placeholder ``"<callable>"`` because a callable cannot be
        persisted. Set operands are emitted as a list, since ``json`` cannot
        encode sets.
        """
        result: Dict[str, Any] = {"field": self.field, "op": self.op.value}
        if self.op is Op.EXISTS:
            return result
        if self.op is Op.CUSTOM:
            result["value"] = "<callable>"
        elif isinstance(self.value, (set, frozenset)):
            # Sort by repr so the persisted form is stable between runs even
            # when the members are not mutually orderable.
            result["value"] = sorted(self.value, key=repr)
        else:
            result["value"] = self.value
        return result

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "Condition":
        """Build a condition from the mapping produced by :meth:`to_dict`.

        Raises:
            KeyError: if ``"field"`` or ``"op"`` is missing.
            ValueError: if ``"op"`` is not a known operator string.

        Note:
            A ``CUSTOM`` condition comes back with the ``"<callable>"``
            placeholder as its value; the real callable has to be
            re-attached by the caller before the condition is evaluated.
        """
        return cls(field=d["field"], op=Op(d["op"]), value=d.get("value"))
95# ============================================================================
96# Actions
97# ============================================================================
class ActionType(Enum):
    """Kinds of action a rule can run once its conditions match."""

    SET = "set"
    LOG = "log"
    REJECT = "reject"
    ALLOW = "allow"
    CALL = "call"
    CHAIN = "chain"

    def execute(
        self,
        context: Dict[str, Any],
        params: Any,
    ) -> Optional[Dict[str, Any]]:
        """Run this kind of action against ``context``.

        Args:
            context: Mutable evaluation context; ``SET`` writes into it and
                ``CALL`` / ``CHAIN`` pass it on.
            params: Action payload. A dict of key/value pairs for ``SET``,
                the message for ``LOG``, the reason for ``REJECT``, a
                callable taking the context for ``CALL``, and a list of
                :class:`Action` objects for ``CHAIN``. Payloads of the wrong
                shape are ignored.

        Returns:
            ``{"action": "reject", "reason": str(params)}`` for ``REJECT``,
            ``{"action": "allow"}`` for ``ALLOW``, the last truthy result
            produced by the sub-actions for ``CHAIN``, and ``None``
            otherwise.
        """
        if self is ActionType.SET:
            if isinstance(params, dict):
                context.update(params)
            return None
        if self is ActionType.LOG:
            logger.info("policy_engine: %s", params)
            return None
        if self is ActionType.REJECT:
            return {"action": "reject", "reason": str(params)}
        if self is ActionType.ALLOW:
            return {"action": "allow"}
        if self is ActionType.CALL:
            if callable(params):
                params(context)
            return None
        if self is ActionType.CHAIN:
            # Propagate the verdict of the chained actions so that a chained
            # REJECT/ALLOW is not silently lost; the last truthy result wins.
            outcome: Optional[Dict[str, Any]] = None
            if isinstance(params, list):
                for sub_action in params:
                    if isinstance(sub_action, Action):
                        sub_result = sub_action.execute(context)
                        if sub_result:
                            outcome = sub_result
            return outcome
        return None

    @classmethod
    def from_str(cls, s: str) -> "ActionType":
        """Return the member whose value equals ``s``, ignoring case.

        Raises:
            ValueError: if ``s`` does not name a known action type.
        """
        return ActionType(s.lower())


# ============================================================================
# Action
# ============================================================================

@dataclass
class Action:
    """An action type together with the payload it operates on."""

    type: ActionType
    params: Any = None

    def execute(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Run the action against ``context`` and return its result, if any."""
        return self.type.execute(context, self.params)

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-friendly dict describing this action.

        ``"params"`` is omitted when it is ``None``. A callable payload is
        stored as the placeholder ``"<callable>"``. The sub-actions of a
        ``CHAIN`` are serialized recursively so the result stays encodable.
        """
        result: Dict[str, Any] = {"type": self.type.value}
        if self.params is None:
            return result
        if callable(self.params):
            result["params"] = "<callable>"
        elif self.type is ActionType.CHAIN and isinstance(self.params, list):
            result["params"] = [
                sub.to_dict() if isinstance(sub, Action) else sub
                for sub in self.params
            ]
        else:
            result["params"] = self.params
        return result

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "Action":
        """Build an action from the mapping produced by :meth:`to_dict`.

        Serialized sub-actions of a ``CHAIN`` are rebuilt into
        :class:`Action` objects so the restored chain actually executes.

        Raises:
            KeyError: if ``"type"`` is missing.
            ValueError: if ``"type"`` is not a known action type.
        """
        action_type = ActionType.from_str(d["type"])
        params = d.get("params")
        if action_type is ActionType.CHAIN and isinstance(params, list):
            params = [
                cls.from_dict(item)
                if isinstance(item, dict) and "type" in item
                else item
                for item in params
            ]
        return cls(type=action_type, params=params)
162# ============================================================================
163# Rule
164# ============================================================================
@dataclass
class Rule:
    """Named set of conditions plus the actions to run when all of them hold."""

    name: str
    conditions: List[Condition] = field(default_factory=list)
    actions: List[Action] = field(default_factory=list)
    priority: int = 0
    enabled: bool = True
    description: str = ""

    def matches(self, context: Dict[str, Any]) -> bool:
        """Return ``True`` when the rule is enabled and every condition holds.

        A rule without conditions matches any context.
        """
        if not self.enabled:
            return False
        for cond in self.conditions:
            if not cond.evaluate(context):
                return False
        return True

    def evaluate(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Run the actions if the rule matches ``context``.

        Returns:
            The last truthy result produced by the actions, or ``None`` when
            the rule does not match or no action produced a result.
        """
        if not self.matches(context):
            return None
        outcome: Optional[Dict[str, Any]] = None
        for act in self.actions:
            produced = act.execute(context)
            if produced:
                outcome = produced
        return outcome

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict representation of the rule."""
        serialized: Dict[str, Any] = {"name": self.name}
        serialized["conditions"] = [cond.to_dict() for cond in self.conditions]
        serialized["actions"] = [act.to_dict() for act in self.actions]
        serialized["priority"] = self.priority
        serialized["enabled"] = self.enabled
        serialized["description"] = self.description
        return serialized

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "Rule":
        """Build a rule from a mapping; only ``"name"`` is required."""
        rule_name = d["name"]
        parsed_conditions = [Condition.from_dict(raw) for raw in d.get("conditions", [])]
        parsed_actions = [Action.from_dict(raw) for raw in d.get("actions", [])]
        return cls(
            name=rule_name,
            conditions=parsed_conditions,
            actions=parsed_actions,
            priority=d.get("priority", 0),
            enabled=d.get("enabled", True),
            description=d.get("description", ""),
        )
214# ============================================================================
215# Match Mode
216# ============================================================================
class MatchMode(Enum):
    """How evaluation proceeds once a rule has matched."""

    # Stop after the first matching rule.
    FIRST = "first"
    # Evaluate all rules and collect their results.
    ALL = "all"
223# ============================================================================
224# PolicyEngine
225# ============================================================================
class PolicyEngine:
    """Declarative rule engine.

    Rules are kept ordered by descending priority; rules with equal priority
    keep the order in which they were added.

    Usage:
        pe = PolicyEngine()

        pe.add_rule(
            name="admin-access",
            conditions=[
                Condition(field="role", op=Op.EQ, value="admin"),
            ],
            actions=[Action(type=ActionType.ALLOW)],
            priority=100,
        )

        pe.add_rule(
            name="rate-limit",
            conditions=[
                Condition(field="requests_per_min", op=Op.GT, value=100),
            ],
            actions=[Action(type=ActionType.REJECT, params="rate limit exceeded")],
            priority=50,
        )

        result = pe.evaluate({"role": "admin", "requests_per_min": 150})
        # → {"rule": "admin-access", "result": {"action": "allow"}}
        #   (the higher-priority rule matches first)
    """

    def __init__(self, mode: MatchMode = MatchMode.FIRST):
        """Create an empty engine that evaluates in the given match mode."""
        self._rules: List[Rule] = []
        self._mode = mode
        self._rule_names: Set[str] = set()

    # ---------- CRUD ----------

    def add_rule(
        self,
        name: str,
        conditions: Optional[List[Condition]] = None,
        actions: Optional[List[Action]] = None,
        priority: int = 0,
        enabled: bool = True,
        description: str = "",
    ) -> Rule:
        """Create a rule, register it and return it.

        Raises:
            ValueError: if a rule called ``name`` is already registered.
        """
        if name in self._rule_names:
            raise ValueError(f"Rule '{name}' already exists")
        new_rule = Rule(
            name=name,
            conditions=conditions or [],
            actions=actions or [],
            priority=priority,
            enabled=enabled,
            description=description,
        )
        self._rule_names.add(name)
        self._rules.append(new_rule)
        self._sort()
        return new_rule

    def remove_rule(self, name: str) -> bool:
        """Drop the rule called ``name``; return whether anything was removed."""
        kept = [rule for rule in self._rules if rule.name != name]
        removed = len(kept) < len(self._rules)
        self._rules = kept
        self._rule_names.discard(name)
        return removed

    def get_rule(self, name: str) -> Optional[Rule]:
        """Return the rule called ``name``, or ``None`` if there is none."""
        return next((rule for rule in self._rules if rule.name == name), None)

    def enable_rule(self, name: str) -> bool:
        """Enable the named rule; return ``False`` if it does not exist."""
        target = self.get_rule(name)
        if not target:
            return False
        target.enabled = True
        return True

    def disable_rule(self, name: str) -> bool:
        """Disable the named rule; return ``False`` if it does not exist."""
        target = self.get_rule(name)
        if not target:
            return False
        target.enabled = False
        return True

    def _sort(self) -> None:
        """Order rules in place so the highest priority comes first."""
        self._rules.sort(key=lambda rule: -rule.priority)

    # ---------- evaluation ----------

    def evaluate(self, context: Dict[str, Any]) -> Optional[Dict[str, Any]]:
        """Evaluate the rules against ``context`` according to the match mode.

        Returns:
            ``None`` when no rule matched. In ``FIRST`` mode, a dict
            ``{"rule": name, "result": action_result}`` for the first
            matching rule. In ``ALL`` mode, ``{"matches": [...]}`` holding
            one such dict per matching rule.
        """
        first_only = self._mode == MatchMode.FIRST
        hits = []
        for rule in self._rules:
            if not rule.enabled or not rule.matches(context):
                continue
            outcome = rule.evaluate(context)
            hits.append({"rule": rule.name, "result": outcome})
            if first_only:
                break
        if not hits:
            return None
        return hits[0] if first_only else {"matches": hits}

    def evaluate_all(self, context: Dict[str, Any]) -> List[Dict[str, Any]]:
        """Evaluate every rule regardless of the configured mode.

        Only rules whose actions produced a truthy result are included in
        the returned list.
        """
        matches = []
        for rule in self._rules:
            if not rule.matches(context):
                continue
            outcome = rule.evaluate(context)
            if outcome:
                matches.append({"rule": rule.name, "result": outcome})
        return matches

    # ---------- serialization ----------

    def to_dict(self) -> Dict[str, Any]:
        """Return the rules (in priority order) and the mode as a plain dict."""
        serialized_rules = [rule.to_dict() for rule in self._rules]
        return {"rules": serialized_rules, "mode": self._mode.value}

    def to_json(self) -> str:
        """Return :meth:`to_dict` encoded as indented, non-ASCII-escaped JSON."""
        return json.dumps(self.to_dict(), indent=2, ensure_ascii=False)

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "PolicyEngine":
        """Build an engine from the mapping produced by :meth:`to_dict`.

        A missing ``"mode"`` defaults to ``"first"`` and missing ``"rules"``
        to no rules.
        """
        engine = cls(mode=MatchMode(d.get("mode", "first")))
        for raw_rule in d.get("rules", []):
            engine.add_rule(
                name=raw_rule["name"],
                conditions=[Condition.from_dict(c) for c in raw_rule.get("conditions", [])],
                actions=[Action.from_dict(a) for a in raw_rule.get("actions", [])],
                priority=raw_rule.get("priority", 0),
                enabled=raw_rule.get("enabled", True),
                description=raw_rule.get("description", ""),
            )
        return engine

    @classmethod
    def from_json(cls, json_str: str) -> "PolicyEngine":
        """Build an engine from a JSON string produced by :meth:`to_json`."""
        return cls.from_dict(json.loads(json_str))

    @property
    def rules(self) -> List[Rule]:
        """A copy of the rule list, in evaluation order."""
        return list(self._rules)