Coverage for agentos/tools/audit_logger.py: 0%

106 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 07:41 +0800

1""" 

2AuditLogger — ring-buffer audit log with JSON export, level filtering, and callbacks. 

3 

4Supports: 

5 - Structured audit events (actor, action, resource, outcome, details) 

6 - Severity levels (INFO, WARNING, ERROR, CRITICAL) 

7 - Ring buffer with configurable capacity 

8 - JSON export (to file or string) 

9 - Level-based filtering 

10 - Subscription callbacks for real-time forwarding 

11 - Thread-safe append 

12""" 

13 

14from __future__ import annotations 

15 

16import json 

17import threading 

18import time 

19from dataclasses import dataclass, field, asdict 

20from enum import Enum 

21from pathlib import Path 

22from typing import Any, Callable, Dict, List, Optional 

23 

24 

25# ============================================================================ 

26# Severity 

27# ============================================================================ 

28 

29class Severity(Enum): 

30 INFO = 10 

31 WARNING = 20 

32 ERROR = 30 

33 CRITICAL = 40 

34 

35 @classmethod 

36 def from_str(cls, s: str) -> "Severity": 

37 return getattr(cls, s.upper(), cls.INFO) 

38 

39 

40# ============================================================================ 

41# AuditEvent 

42# ============================================================================ 

43 

44@dataclass 

45class AuditEvent: 

46 """Single audit log entry.""" 

47 actor: str = "" # who performed the action 

48 action: str = "" # what was done (e.g., "user.delete", "config.update") 

49 resource: str = "" # what was acted upon (e.g., "user:123", "/etc/config.yaml") 

50 outcome: str = "" # "success", "failure", "denied" 

51 severity: Severity = Severity.INFO 

52 details: Dict[str, Any] = field(default_factory=dict) 

53 timestamp: float = field(default_factory=time.time) 

54 

55 def to_dict(self) -> Dict[str, Any]: 

56 d = asdict(self) 

57 d["severity"] = self.severity.name 

58 d["timestamp"] = time.strftime("%Y-%m-%dT%H:%M:%S", time.gmtime(self.timestamp)) 

59 return d 

60 

61 @classmethod 

62 def from_dict(cls, d: Dict[str, Any]) -> "AuditEvent": 

63 return cls( 

64 actor=d.get("actor", ""), 

65 action=d.get("action", ""), 

66 resource=d.get("resource", ""), 

67 outcome=d.get("outcome", ""), 

68 severity=Severity.from_str(d.get("severity", "INFO")), 

69 details=d.get("details", {}), 

70 timestamp=d.get("timestamp", time.time()), 

71 ) 

72 

73 

74# ============================================================================ 

75# AuditLogger 

76# ============================================================================ 

77 

78class AuditLogger: 

79 """Ring-buffer audit logger. 

80 

81 Usage: 

82 audit = AuditLogger(capacity=2000) 

83 

84 # Record an event 

85 audit.log( 

86 actor="admin", 

87 action="user.delete", 

88 resource="user:42", 

89 outcome="success", 

90 severity=Severity.WARNING, 

91 details={"reason": "GDPR request"}, 

92 ) 

93 

94 # Export as JSON 

95 audit.export_json("audit_2026.json") 

96 

97 # Subscribe to events in real-time 

98 audit.subscribe(lambda event: forward_to_siem(event)) 

99 

100 # Query with filter 

101 recent_failures = audit.query( 

102 min_severity=Severity.ERROR, 

103 limit=50, 

104 ) 

105 """ 

106 

107 def __init__(self, capacity: int = 5000): 

108 if capacity <= 0: 

109 raise ValueError("capacity must be positive") 

110 self._capacity = capacity 

111 self._buffer: List[AuditEvent] = [] 

112 self._lock = threading.RLock() 

113 self._subscribers: List[Callable[[AuditEvent], None]] = [] 

114 

115 # ---------- log ---------- 

116 

117 def log( 

118 self, 

119 actor: str = "", 

120 action: str = "", 

121 resource: str = "", 

122 outcome: str = "", 

123 severity: Severity = Severity.INFO, 

124 details: Optional[Dict[str, Any]] = None, 

125 ) -> AuditEvent: 

126 """Record an audit event.""" 

127 event = AuditEvent( 

128 actor=actor, 

129 action=action, 

130 resource=resource, 

131 outcome=outcome, 

132 severity=severity, 

133 details=details or {}, 

134 ) 

135 with self._lock: 

136 self._buffer.append(event) 

137 # Ring buffer eviction 

138 excess = len(self._buffer) - self._capacity 

139 if excess > 0: 

140 self._buffer = self._buffer[excess:] 

141 

142 # Notify subscribers outside lock 

143 self._notify(event) 

144 return event 

145 

146 # ---------- query ---------- 

147 

148 def query( 

149 self, 

150 actor: Optional[str] = None, 

151 action: Optional[str] = None, 

152 resource: Optional[str] = None, 

153 outcome: Optional[str] = None, 

154 min_severity: Optional[Severity] = None, 

155 max_severity: Optional[Severity] = None, 

156 since: Optional[float] = None, 

157 until: Optional[float] = None, 

158 limit: Optional[int] = None, 

159 ) -> List[AuditEvent]: 

160 """Query audit events with optional filters.""" 

161 with self._lock: 

162 results = list(self._buffer) 

163 

164 if actor: 

165 results = [e for e in results if e.actor == actor] 

166 if action: 

167 results = [e for e in results if e.action == action] 

168 if resource: 

169 results = [e for e in results if e.resource == resource] 

170 if outcome: 

171 results = [e for e in results if e.outcome == outcome] 

172 if min_severity: 

173 results = [e for e in results if e.severity.value >= min_severity.value] 

174 if max_severity: 

175 results = [e for e in results if e.severity.value <= max_severity.value] 

176 if since is not None: 

177 results = [e for e in results if e.timestamp >= since] 

178 if until is not None: 

179 results = [e for e in results if e.timestamp <= until] 

180 

181 if limit is not None and limit > 0: 

182 results = results[-limit:] 

183 

184 return results 

185 

186 def recent(self, count: int = 20) -> List[AuditEvent]: 

187 """Return the most recent N events.""" 

188 with self._lock: 

189 return self._buffer[-count:] if count < len(self._buffer) else list(self._buffer) 

190 

191 # ---------- export ---------- 

192 

193 def export_json(self, path: Optional[str] = None) -> str: 

194 """Export all events as JSON. If path given, writes to file.""" 

195 with self._lock: 

196 data = [e.to_dict() for e in self._buffer] 

197 

198 json_str = json.dumps(data, indent=2, ensure_ascii=False) 

199 

200 if path: 

201 Path(path).write_text(json_str, encoding="utf-8") 

202 

203 return json_str 

204 

205 # ---------- subscription ---------- 

206 

207 def subscribe(self, callback: Callable[[AuditEvent], None]) -> None: 

208 """Register a callback for real-time event forwarding.""" 

209 with self._lock: 

210 self._subscribers.append(callback) 

211 

212 def unsubscribe(self, callback: Callable[[AuditEvent], None]) -> bool: 

213 with self._lock: 

214 if callback in self._subscribers: 

215 self._subscribers.remove(callback) 

216 return True 

217 return False 

218 

219 def _notify(self, event: AuditEvent) -> None: 

220 with self._lock: 

221 subs = list(self._subscribers) 

222 for cb in subs: 

223 try: 

224 cb(event) 

225 except Exception: 

226 pass 

227 

228 # ---------- info ---------- 

229 

230 @property 

231 def count(self) -> int: 

232 with self._lock: 

233 return len(self._buffer) 

234 

235 @property 

236 def capacity(self) -> int: 

237 return self._capacity