Coverage for agentos/enterprise/audit.py: 50%
123 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS Enterprise — Audit Logging.
4功能:
5 - 全量审计事件记录
6 - 事件分类(认证/操作/数据/系统)
7 - 可配置保留策略
8 - 合规报告导出(CSV/JSON)
9 - GDPR/CCPA 数据删除支持
10"""
12from __future__ import annotations
14import csv
15import io
16import json
17import time
18import uuid
19from dataclasses import dataclass, field
20from enum import Enum
21from typing import Optional
class AuditCategory(str, Enum):
    """Classification of an audit event.

    Members are ``str`` subclasses, so they compare equal to their raw
    string values (e.g. ``AuditCategory.AUTH == "auth"``).
    """

    # Login / logout / token activity.
    AUTH = "auth"
    # API key creation / revocation / rotation.
    API_KEY = "api_key"
    # Agent creation / run / deletion.
    AGENT = "agent"
    # Tenant management.
    TENANT = "tenant"
    # Configuration changes.
    CONFIG = "config"
    # System events.
    SYSTEM = "system"
    # Data access / export.
    DATA = "data"
    # Security events (violations / attacks).
    SECURITY = "security"
class AuditSeverity(str, Enum):
    """Severity level attached to an audit event.

    Members are ``str`` subclasses and compare equal to their raw values.
    """

    INFO = "info"
    WARNING = "warning"
    ERROR = "error"
    CRITICAL = "critical"
@dataclass
class AuditEvent:
    """A single audit event record."""

    # Identity and time of the event (timestamp is seconds since the epoch).
    event_id: str
    timestamp: float

    # What happened; ``action`` is a dotted name such as "api_key.created"
    # or "agent.run".
    category: AuditCategory
    action: str
    severity: AuditSeverity

    # Who did it: actor_type is "user" / "agent" / "system" / "api_key",
    # actor_id is the matching user_id / agent_id / key_id.
    actor_type: str
    actor_id: str
    tenant_id: str

    # What it was done to: resource_type is "agent" / "api_key" / "tenant" / ...
    resource_type: str
    resource_id: str

    # Request context.
    ip_address: str
    user_agent: str

    # Outcome: "success" / "failure".
    status: str

    # Free-form payloads; each instance gets its own fresh dict by default.
    details: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)
@dataclass
class RetentionPolicy:
    """Retention policy for the audit log."""

    # Maximum number of events kept.
    max_events: int = 100_000
    # Maximum age of a kept event, in days.
    max_age_days: int = 90
    # Whether expired events are pruned automatically.
    auto_prune: bool = True
class AuditLogger:
    """Audit log engine.

    Features:
      - records every event passed to :meth:`log`
      - configurable retention policy (count- and age-based pruning)
      - filtering / querying / export (JSON and CSV)
      - compliance helpers (per-user record deletion, summary report)

    NOTE(review): earlier documentation advertised a "memory + file" dual
    storage mode, but this class only keeps events in an in-memory list.
    """

    def __init__(self, retention: Optional[RetentionPolicy] = None):
        """Create an empty logger.

        Args:
            retention: Retention policy to apply; a default
                ``RetentionPolicy()`` is used when omitted.
        """
        self._events: list[AuditEvent] = []
        self.retention = retention if retention is not None else RetentionPolicy()

    # ── Recording ──

    def log(
        self,
        category: AuditCategory,
        action: str,
        severity: AuditSeverity = AuditSeverity.INFO,
        actor_type: str = "system",
        actor_id: str = "",
        tenant_id: str = "",
        resource_type: str = "",
        resource_id: str = "",
        ip_address: str = "",
        user_agent: str = "",
        status: str = "success",
        details: Optional[dict] = None,
        metadata: Optional[dict] = None,
    ) -> AuditEvent:
        """Record one audit event and return it.

        The event gets a generated ``evt_<12 hex chars>`` id and the current
        wall-clock time as its timestamp. When the retention policy has
        ``auto_prune`` enabled, pruning runs after the event is appended.
        """
        event = AuditEvent(
            event_id=f"evt_{uuid.uuid4().hex[:12]}",
            timestamp=time.time(),
            category=category,
            action=action,
            severity=severity,
            actor_type=actor_type,
            actor_id=actor_id,
            tenant_id=tenant_id,
            resource_type=resource_type,
            resource_id=resource_id,
            ip_address=ip_address,
            user_agent=user_agent,
            status=status,
            details=details or {},
            metadata=metadata or {},
        )
        self._events.append(event)

        # Automatic cleanup.
        if self.retention.auto_prune:
            self._prune()

        return event

    def log_auth(self, action: str, user_id: str, tenant_id: str, status: str, ip: str = "", ua: str = "", **kwargs):
        """Convenience wrapper: record an authentication event.

        The user is recorded both as the actor and as the ``session``
        resource; extra keyword arguments are stored as ``details``.
        """
        return self.log(
            category=AuditCategory.AUTH,
            action=action,
            actor_type="user",
            actor_id=user_id,
            tenant_id=tenant_id,
            resource_type="session",
            resource_id=user_id,
            ip_address=ip,
            user_agent=ua,
            status=status,
            details=kwargs,
        )

    def log_api_key(self, action: str, key_id: str, tenant_id: str, actor_id: str, **kwargs):
        """Convenience wrapper: record an API key event.

        Actions ending in ``.revoked`` are logged at WARNING severity, all
        others at INFO. Extra keyword arguments are stored as ``details``.
        """
        return self.log(
            category=AuditCategory.API_KEY,
            action=action,
            severity=AuditSeverity.WARNING if action.endswith(".revoked") else AuditSeverity.INFO,
            actor_type="user",
            actor_id=actor_id,
            tenant_id=tenant_id,
            resource_type="api_key",
            resource_id=key_id,
            details=kwargs,
        )

    def log_agent_run(self, agent_id: str, tenant_id: str, status: str, **kwargs):
        """Convenience wrapper: record an ``agent.run`` event.

        The agent is recorded as both actor and resource; extra keyword
        arguments are stored as ``details``.
        """
        return self.log(
            category=AuditCategory.AGENT,
            action="agent.run",
            actor_type="agent",
            actor_id=agent_id,
            tenant_id=tenant_id,
            resource_type="agent",
            resource_id=agent_id,
            status=status,
            details=kwargs,
        )

    def log_security(self, action: str, severity: AuditSeverity, details: dict, **kwargs):
        """Convenience wrapper: record a security event.

        ``actor_type``, ``actor_id``, ``tenant_id``, ``resource_type`` and
        ``resource_id`` are taken from ``kwargs`` when present (with the same
        defaults as :meth:`log`); any remaining keyword arguments are
        forwarded to :meth:`log` unchanged.
        """
        return self.log(
            category=AuditCategory.SECURITY,
            action=action,
            severity=severity,
            actor_type=kwargs.pop("actor_type", "system"),
            actor_id=kwargs.pop("actor_id", ""),
            tenant_id=kwargs.pop("tenant_id", ""),
            resource_type=kwargs.pop("resource_type", ""),
            resource_id=kwargs.pop("resource_id", ""),
            details=details,
            **kwargs,
        )

    # ── Querying ──

    def query(
        self,
        category: Optional[AuditCategory] = None,
        severity: Optional[AuditSeverity] = None,
        tenant_id: Optional[str] = None,
        actor_id: Optional[str] = None,
        status: Optional[str] = None,
        since: Optional[float] = None,
        until: Optional[float] = None,
        limit: int = 100,
    ) -> list[AuditEvent]:
        """Return events matching every supplied filter, newest first.

        ``category``, ``severity``, ``tenant_id``, ``actor_id`` and ``status``
        are skipped when falsy (``None`` or an empty string means "no
        filter"). ``since`` / ``until`` are inclusive timestamp bounds and are
        skipped only when ``None``, so a bound of ``0`` is honoured. At most
        ``limit`` events are returned.
        """
        matched = [
            e
            for e in self._events
            if (not category or e.category == category)
            and (not severity or e.severity == severity)
            and (not tenant_id or e.tenant_id == tenant_id)
            and (not actor_id or e.actor_id == actor_id)
            and (not status or e.status == status)
            # Explicit None checks: 0 / 0.0 is a legitimate timestamp bound.
            and (since is None or e.timestamp >= since)
            and (until is None or e.timestamp <= until)
        ]
        matched.sort(key=lambda e: e.timestamp, reverse=True)
        return matched[:limit]

    def get_recent(self, n: int = 50) -> list[AuditEvent]:
        """Return the ``n`` most recent events, newest first."""
        return sorted(self._events, key=lambda e: e.timestamp, reverse=True)[:n]

    # ── Export ──

    def export_json(self, events: Optional[list[AuditEvent]] = None) -> str:
        """Export events as a JSON string.

        Args:
            events: Events to export. ``None`` exports the whole log; an
                empty list exports an empty JSON array (it does NOT fall back
                to the whole log).
        """
        # ``is None`` rather than truthiness: an empty query result must not
        # be silently replaced by every stored event.
        target = self._events if events is None else events
        return json.dumps(
            [_event_to_dict(e) for e in target],
            ensure_ascii=False,
            indent=2,
            default=str,
        )

    def export_csv(self, events: Optional[list[AuditEvent]] = None) -> str:
        """Export events as a CSV string with a header row.

        ``details`` is embedded as a JSON string; ``user_agent`` and
        ``metadata`` are not part of the CSV columns.

        Args:
            events: Events to export. ``None`` exports the whole log; an
                empty list yields only the header row.
        """
        target = self._events if events is None else events
        output = io.StringIO()
        writer = csv.DictWriter(output, fieldnames=[
            "event_id", "timestamp", "category", "action", "severity",
            "actor_type", "actor_id", "tenant_id", "resource_type", "resource_id",
            "ip_address", "status", "details",
        ])
        writer.writeheader()
        for e in target:
            writer.writerow({
                "event_id": e.event_id,
                "timestamp": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(e.timestamp)),
                "category": e.category.value,
                "action": e.action,
                "severity": e.severity.value,
                "actor_type": e.actor_type,
                "actor_id": e.actor_id,
                "tenant_id": e.tenant_id,
                "resource_type": e.resource_type,
                "resource_id": e.resource_id,
                "ip_address": e.ip_address,
                "status": e.status,
                # default=str mirrors export_json so non-JSON-native values in
                # details are stringified instead of raising TypeError.
                "details": json.dumps(e.details, ensure_ascii=False, default=str),
            })
        return output.getvalue()

    # ── Compliance ──

    def delete_user_data(self, user_id: str) -> int:
        """GDPR / CCPA: delete audit records whose actor is ``user_id``.

        Only ``actor_id`` is matched; events that merely reference the user
        as a resource are kept. Returns the number of deleted events.
        """
        before = len(self._events)
        self._events = [e for e in self._events if e.actor_id != user_id]
        return before - len(self._events)

    def compliance_report(self, tenant_id: str, start: float, end: float) -> dict:
        """Build a compliance summary for a tenant over ``[start, end]``.

        Counts events per category and per severity and reports the share of
        events whose status is ``"failure"``.

        NOTE(review): an empty ``tenant_id`` disables the tenant filter in
        :meth:`query`, so the report then spans all tenants — confirm callers
        never pass ``""``.
        """
        # Use the full log size as the limit so the summary is never silently
        # truncated below the number of retained events.
        events = self.query(
            tenant_id=tenant_id, since=start, until=end, limit=len(self._events)
        )
        by_category = {}
        by_severity = {}
        failure_count = 0
        for e in events:
            by_category[e.category.value] = by_category.get(e.category.value, 0) + 1
            by_severity[e.severity.value] = by_severity.get(e.severity.value, 0) + 1
            if e.status == "failure":
                failure_count += 1
        return {
            "tenant_id": tenant_id,
            "period": {
                "start": time.strftime("%Y-%m-%d", time.gmtime(start)),
                "end": time.strftime("%Y-%m-%d", time.gmtime(end)),
            },
            "total_events": len(events),
            "by_category": by_category,
            "by_severity": by_severity,
            "failure_rate": f"{failure_count / len(events) * 100:.1f}%" if events else "0%",
            "generated_at": time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime()),
        }

    # ── Statistics ──

    def stats(self) -> dict:
        """Return total event count, per-category / per-severity counts and
        the active retention limits."""
        total = len(self._events)
        by_category = {}
        by_severity = {}
        for e in self._events:
            by_category[e.category.value] = by_category.get(e.category.value, 0) + 1
            by_severity[e.severity.value] = by_severity.get(e.severity.value, 0) + 1
        return {
            "total_events": total,
            "by_category": by_category,
            "by_severity": by_severity,
            "retention_policy": {
                "max_events": self.retention.max_events,
                "max_age_days": self.retention.max_age_days,
            },
        }

    # ── Internal ──

    def _prune(self):
        """Drop events that exceed the retention policy.

        First trims the oldest entries so at most ``max_events`` remain, then
        removes events older than ``max_age_days``.
        """
        # By count. Slice from the front by the overflow amount rather than
        # using ``[-max_events:]``: that form keeps everything when
        # max_events is 0 (``[-0:]``) and mis-slices for negative values.
        overflow = len(self._events) - max(self.retention.max_events, 0)
        if overflow > 0:
            self._events = self._events[overflow:]

        # By age.
        cutoff = time.time() - self.retention.max_age_days * 86400
        self._events = [e for e in self._events if e.timestamp > cutoff]
def _event_to_dict(e: AuditEvent) -> dict:
    """Convert an audit event into a plain dict for serialization.

    The timestamp is rendered as a ``YYYY-MM-DDTHH:MM:SSZ`` UTC string and
    the category / severity enums are replaced by their ``.value``. All other
    fields, including ``details`` and ``metadata``, are passed through
    unchanged (not copied).
    """
    iso_timestamp = time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime(e.timestamp))
    record = {
        "event_id": e.event_id,
        "timestamp": iso_timestamp,
        "category": e.category.value,
        "action": e.action,
        "severity": e.severity.value,
    }
    # Remaining fields map one-to-one onto attributes of the same name; the
    # tuple order fixes the key order of the resulting dict.
    passthrough = (
        "actor_type", "actor_id", "tenant_id", "resource_type", "resource_id",
        "ip_address", "user_agent", "status", "details", "metadata",
    )
    for name in passthrough:
        record[name] = getattr(e, name)
    return record