Coverage for agentos/tools/audit_logger.py: 0%
106 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:41 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:41 +0800
1"""
2AuditLogger — ring-buffer audit log with JSON export, level filtering, and callbacks.
4Supports:
5 - Structured audit events (actor, action, resource, outcome, details)
6 - Severity levels (INFO, WARNING, ERROR, CRITICAL)
7 - Ring buffer with configurable capacity
8 - JSON export (to file or string)
9 - Level-based filtering
10 - Subscription callbacks for real-time forwarding
11 - Thread-safe append
12"""
from __future__ import annotations

import calendar
import json
import logging
import threading
import time
from collections import deque
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional
25# ============================================================================
26# Severity
27# ============================================================================
class Severity(Enum):
    """Severity level of an audit event; a larger value means more severe."""

    INFO = 10
    WARNING = 20
    ERROR = 30
    CRITICAL = 40

    @classmethod
    def from_str(cls, s: str) -> "Severity":
        """Return the member named by *s*, falling back to ``INFO``.

        The lookup is case-insensitive and ignores surrounding whitespace.

        Args:
            s: Member name such as ``"warning"``. An existing ``Severity``
                is returned unchanged; any other non-string value (for
                example ``None``) yields ``INFO``.

        Returns:
            The matching member, or ``Severity.INFO`` when nothing matches.
        """
        if isinstance(s, cls):
            return s
        if not isinstance(s, str):
            return cls.INFO
        # Look up in the member table rather than via getattr so that only
        # real enum members can ever be returned.
        return cls.__members__.get(s.strip().upper(), cls.INFO)
40# ============================================================================
41# AuditEvent
42# ============================================================================
@dataclass
class AuditEvent:
    """Single audit log entry."""

    actor: str = ""  # who performed the action
    action: str = ""  # what was done (e.g., "user.delete", "config.update")
    resource: str = ""  # what was acted upon (e.g., "user:123", "/etc/config.yaml")
    outcome: str = ""  # "success", "failure", "denied"
    severity: Severity = Severity.INFO
    details: Dict[str, Any] = field(default_factory=dict)
    timestamp: float = field(default_factory=time.time)  # seconds since the epoch

    # Format used for the exported timestamp. Unannotated on purpose so the
    # dataclass machinery does not treat it as a field.
    _TIMESTAMP_FORMAT = "%Y-%m-%dT%H:%M:%S"

    def to_dict(self) -> Dict[str, Any]:
        """Return a plain-dict representation of the event.

        ``severity`` is exported as the member name and ``timestamp`` as a
        UTC string with one-second resolution (the fractional part of the
        stored timestamp is dropped).
        """
        d = asdict(self)
        d["severity"] = self.severity.name
        d["timestamp"] = time.strftime(
            self._TIMESTAMP_FORMAT, time.gmtime(self.timestamp)
        )
        return d

    @staticmethod
    def _parse_timestamp(raw: Any) -> float:
        """Convert a timestamp in any supported form to epoch seconds.

        Accepts ``None`` (current time), a number, a numeric string, or a
        UTC string in the format written by ``to_dict`` (an optional
        trailing ``"Z"`` is tolerated).

        Raises:
            ValueError: If a string matches neither form.
            TypeError: If *raw* is of an unsupported type.
        """
        if raw is None:
            return time.time()
        if isinstance(raw, (int, float)) and not isinstance(raw, bool):
            return float(raw)
        if isinstance(raw, str):
            text = raw.strip()
            try:
                return float(text)
            except ValueError:
                pass
            if text.endswith("Z"):
                text = text[:-1]
            # timegm is the inverse of gmtime, so this undoes to_dict()
            # independently of the local timezone.
            return float(
                calendar.timegm(time.strptime(text, AuditEvent._TIMESTAMP_FORMAT))
            )
        raise TypeError(f"unsupported timestamp type: {type(raw).__name__}")

    @classmethod
    def from_dict(cls, d: Dict[str, Any]) -> "AuditEvent":
        """Build an event from a dict such as the one produced by ``to_dict``.

        Missing keys fall back to the field defaults. ``severity`` may be a
        member name or a ``Severity``; ``timestamp`` may be epoch seconds or
        the string form written by ``to_dict``. ``details`` is shallow-copied
        so the new event does not share the caller's dict.

        Raises:
            ValueError: If ``timestamp`` is a string in an unknown format.
            TypeError: If ``timestamp`` is of an unsupported type.
        """
        raw_severity = d.get("severity")
        if isinstance(raw_severity, Severity):
            severity = raw_severity
        else:
            severity = Severity.from_str(raw_severity or "INFO")
        return cls(
            actor=d.get("actor", ""),
            action=d.get("action", ""),
            resource=d.get("resource", ""),
            outcome=d.get("outcome", ""),
            severity=severity,
            details=dict(d.get("details") or {}),
            timestamp=cls._parse_timestamp(d.get("timestamp")),
        )
74# ============================================================================
75# AuditLogger
76# ============================================================================
class AuditLogger:
    """Ring-buffer audit logger.

    Keeps at most ``capacity`` events in memory; once full, recording a new
    event discards the oldest one. Buffer and subscriber-list access is
    guarded by a re-entrant lock.

    Usage:
        audit = AuditLogger(capacity=2000)

        # Record an event
        audit.log(
            actor="admin",
            action="user.delete",
            resource="user:42",
            outcome="success",
            severity=Severity.WARNING,
            details={"reason": "GDPR request"},
        )

        # Export as JSON
        audit.export_json("audit_2026.json")

        # Subscribe to events in real-time
        audit.subscribe(lambda event: forward_to_siem(event))

        # Query with filter
        recent_failures = audit.query(
            min_severity=Severity.ERROR,
            limit=50,
        )
    """

    # Used to report subscriber failures instead of dropping them silently.
    _logger = logging.getLogger(__name__)

    def __init__(self, capacity: int = 5000):
        """Create an empty logger.

        Args:
            capacity: Maximum number of events retained.

        Raises:
            ValueError: If ``capacity`` is not positive.
        """
        if capacity <= 0:
            raise ValueError("capacity must be positive")
        self._capacity = capacity
        # A bounded deque drops the oldest entry on append once it is full.
        self._buffer: deque[AuditEvent] = deque(maxlen=capacity)
        self._lock = threading.RLock()
        self._subscribers: List[Callable[[AuditEvent], None]] = []

    # ---------- log ----------

    def log(
        self,
        actor: str = "",
        action: str = "",
        resource: str = "",
        outcome: str = "",
        severity: Severity = Severity.INFO,
        details: Optional[Dict[str, Any]] = None,
    ) -> AuditEvent:
        """Record an audit event and notify subscribers.

        Args:
            actor: Who performed the action.
            action: What was done.
            resource: What was acted upon.
            outcome: Result of the action.
            severity: Severity of the event.
            details: Extra structured data; shallow-copied so later changes
                to the caller's dict do not alter the stored event.

        Returns:
            The event that was stored.
        """
        event = AuditEvent(
            actor=actor,
            action=action,
            resource=resource,
            outcome=outcome,
            severity=severity,
            details=dict(details) if details else {},
        )
        with self._lock:
            self._buffer.append(event)

        # Notify subscribers outside the lock so a slow callback does not
        # block other threads from logging.
        self._notify(event)
        return event

    # ---------- query ----------

    def query(
        self,
        actor: Optional[str] = None,
        action: Optional[str] = None,
        resource: Optional[str] = None,
        outcome: Optional[str] = None,
        min_severity: Optional[Severity] = None,
        max_severity: Optional[Severity] = None,
        since: Optional[float] = None,
        until: Optional[float] = None,
        limit: Optional[int] = None,
    ) -> List[AuditEvent]:
        """Return buffered events matching every given filter, oldest first.

        Args:
            actor, action, resource, outcome: Exact-match filters. ``None``
                or an empty string means "do not filter on this field".
            min_severity, max_severity: Inclusive severity bounds.
            since, until: Inclusive bounds on the event timestamp.
            limit: Keep only the last ``limit`` matches. ``None`` or a
                non-positive value means no limit.
        """
        with self._lock:
            snapshot = list(self._buffer)

        min_value = min_severity.value if min_severity is not None else None
        max_value = max_severity.value if max_severity is not None else None

        def matches(event: AuditEvent) -> bool:
            if actor and event.actor != actor:
                return False
            if action and event.action != action:
                return False
            if resource and event.resource != resource:
                return False
            if outcome and event.outcome != outcome:
                return False
            if min_value is not None and event.severity.value < min_value:
                return False
            if max_value is not None and event.severity.value > max_value:
                return False
            if since is not None and event.timestamp < since:
                return False
            if until is not None and event.timestamp > until:
                return False
            return True

        # Filtering happens on the snapshot, outside the lock.
        results = [event for event in snapshot if matches(event)]

        if limit is not None and limit > 0:
            results = results[-limit:]

        return results

    def recent(self, count: int = 20) -> List[AuditEvent]:
        """Return the most recent ``count`` events, oldest first.

        A non-positive ``count`` yields an empty list.
        """
        if count <= 0:
            # Guard explicitly: a slice of [-0:] would return everything.
            return []
        with self._lock:
            return list(self._buffer)[-count:]

    # ---------- export ----------

    def export_json(self, path: Optional[str] = None) -> str:
        """Export all buffered events as a JSON array.

        Args:
            path: If given (and non-empty), the JSON text is also written
                to this file as UTF-8.

        Returns:
            The JSON document as a string.
        """
        with self._lock:
            data = [event.to_dict() for event in self._buffer]

        json_str = json.dumps(data, indent=2, ensure_ascii=False)

        if path:
            Path(path).write_text(json_str, encoding="utf-8")

        return json_str

    # ---------- subscription ----------

    def subscribe(self, callback: Callable[[AuditEvent], None]) -> None:
        """Register a callback invoked with every event passed to ``log``."""
        with self._lock:
            self._subscribers.append(callback)

    def unsubscribe(self, callback: Callable[[AuditEvent], None]) -> bool:
        """Remove one registration of ``callback``.

        Returns:
            ``True`` if the callback was registered, ``False`` otherwise.
        """
        with self._lock:
            try:
                self._subscribers.remove(callback)
            except ValueError:
                return False
            return True

    def _notify(self, event: AuditEvent) -> None:
        """Call every subscriber with ``event``.

        Delivery is best-effort: a failing subscriber is logged and does not
        prevent the remaining subscribers from being called.
        """
        with self._lock:
            # Copy so callbacks may subscribe/unsubscribe while we iterate.
            subscribers = list(self._subscribers)
        for callback in subscribers:
            try:
                callback(event)
            except Exception:
                self._logger.exception("Audit subscriber %r failed", callback)

    # ---------- info ----------

    @property
    def count(self) -> int:
        """Number of events currently held in the buffer."""
        with self._lock:
            return len(self._buffer)

    @property
    def capacity(self) -> int:
        """Maximum number of events the buffer retains."""
        return self._capacity