Coverage for src / tracekit / core / audit.py: 92%
115 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Audit trail with HMAC chain verification for compliance and tamper detection.

This module provides tamper-evident audit logging using HMAC signatures
to create a verifiable chain of audit entries.

Example:
    >>> from tracekit.core.audit import AuditTrail
    >>> audit = AuditTrail(secret_key=b"my-secret-key")
    >>> audit.record_action("load_trace", {"file": "data.bin"}, user="alice")
    >>> audit.record_action("compute_fft", {"samples": 1000000}, user="alice")
    >>> # Verify integrity
    >>> is_valid = audit.verify_integrity()
    >>> # Export audit log
    >>> audit.export_audit_log("audit.json", format="json")

References:
    LOG-009: Comprehensive Audit Trail for Compliance
    HMAC-SHA256 for tamper detection
"""
from __future__ import annotations

import copy
import getpass
import hashlib
import hmac
import json
import os
import socket
from dataclasses import asdict, dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Literal

from tracekit.core.logging import format_timestamp
@dataclass
class AuditEntry:
    """Single audit trail entry with HMAC signature.

    Each entry records an auditable action and is linked to the previous
    entry via HMAC chaining for tamper detection.

    Attributes:
        timestamp: Timestamp string of the action.
        action: Action identifier (e.g., "load_trace", "compute_fft").
        details: Additional details about the action (parameters, results).
        user: Username who performed the action.
        host: Hostname where the action was performed.
        previous_hash: HMAC of the previous entry (for chain verification).
        hmac: HMAC signature of this entry; empty until the entry is signed.

    References:
        LOG-009: Comprehensive Audit Trail for Compliance
    """

    timestamp: str
    action: str
    details: dict[str, Any]
    user: str
    host: str
    previous_hash: str
    hmac: str = field(default="")

    def to_dict(self) -> dict[str, Any]:
        """Convert the audit entry to a dictionary.

        Returns:
            Dictionary with one key per dataclass field. ``asdict`` copies
            nested containers, so the result does not alias ``details``.
        """
        return asdict(self)

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> AuditEntry:
        """Create an audit entry from a dictionary.

        Args:
            data: Mapping of field names to values, e.g. the output of
                :meth:`to_dict`. The ``hmac`` key is optional.

        Returns:
            AuditEntry instance that owns a private copy of ``details``.

        Raises:
            TypeError: If a required field is missing or an unknown key is
                present (raised by the dataclass constructor).
        """
        values = dict(data)
        # Copy the nested details so later mutation of the caller's mapping
        # cannot silently change a signed entry; this mirrors to_dict(),
        # which also hands out a deep copy.
        if "details" in values:
            values["details"] = copy.deepcopy(values["details"])
        return cls(**values)
87class AuditTrail:
88 """Tamper-evident audit trail with HMAC chain verification.
90 Maintains a chain of audit entries where each entry is cryptographically
91 linked to the previous entry using HMAC signatures. This allows detection
92 of any tampering or modification of the audit log.
94 The HMAC chain works as follows:
95 1. Each entry contains the HMAC of the previous entry
96 2. Each entry's HMAC is computed over: timestamp + action + details + user + previous_hash
97 3. Any modification to any entry breaks the chain and fails verification
99 Args:
100 secret_key: Secret key for HMAC computation (required for tamper detection).
101 hash_algorithm: Hash algorithm to use (default: 'sha256').
103 Example:
104 >>> audit = AuditTrail(secret_key=b"my-secret")
105 >>> audit.record_action("operation", {"param": "value"})
106 >>> assert audit.verify_integrity()
108 References:
109 LOG-009: Comprehensive Audit Trail for Compliance
110 """
112 def __init__(
113 self,
114 secret_key: bytes | None = None,
115 hash_algorithm: Literal["sha256", "sha512"] = "sha256",
116 ):
117 """Initialize audit trail.
119 Args:
120 secret_key: Secret key for HMAC computation. If None, a random key is generated.
121 hash_algorithm: Hash algorithm to use (sha256 or sha512).
122 """
123 self._entries: list[AuditEntry] = []
124 self._secret_key = secret_key or os.urandom(32)
125 self._hash_algorithm = hash_algorithm
127 def record_action(
128 self,
129 action: str,
130 details: dict[str, Any],
131 user: str | None = None,
132 ) -> AuditEntry:
133 """Record an auditable action.
135 Creates a new audit entry with HMAC signature and adds it to the chain.
137 Args:
138 action: Action identifier (e.g., "load_trace", "compute_measurement").
139 details: Dictionary of action details (parameters, results, etc.).
140 user: Username who performed the action (defaults to current user).
142 Returns:
143 The created AuditEntry.
145 Example:
146 >>> audit = AuditTrail(secret_key=b"key")
147 >>> entry = audit.record_action(
148 ... "load_trace",
149 ... {"file": "data.bin", "size_mb": 100},
150 ... user="alice"
151 ... )
153 References:
154 LOG-009: Comprehensive Audit Trail for Compliance
155 """
156 # Get current user and host
157 if user is None:
158 try:
159 user = getpass.getuser()
160 except Exception:
161 user = "unknown"
163 try:
164 host = socket.gethostname()
165 except Exception:
166 host = "unknown"
168 # Get timestamp
169 timestamp = format_timestamp(datetime.now(UTC), format="iso8601")
171 # Get previous hash
172 previous_hash = self._entries[-1].hmac if self._entries else "GENESIS"
174 # Create entry (without HMAC initially)
175 entry = AuditEntry(
176 timestamp=timestamp,
177 action=action,
178 details=details,
179 user=user,
180 host=host,
181 previous_hash=previous_hash,
182 hmac="",
183 )
185 # Compute HMAC
186 entry.hmac = self._compute_hmac(entry)
188 # Add to chain
189 self._entries.append(entry)
191 return entry
193 def verify_integrity(self) -> bool:
194 """Verify HMAC chain integrity.
196 Verifies that:
197 1. Each entry's HMAC is valid
198 2. Each entry's previous_hash matches the previous entry's HMAC
199 3. No entries have been tampered with or removed
201 Returns:
202 True if the audit trail is intact and untampered, False otherwise.
204 Example:
205 >>> audit = AuditTrail(secret_key=b"key")
206 >>> audit.record_action("action1", {})
207 >>> audit.record_action("action2", {})
208 >>> assert audit.verify_integrity() # Should be True
209 >>> # Tampering with an entry would break the chain
210 >>> audit._entries[0].action = "modified"
211 >>> assert not audit.verify_integrity() # Should be False
213 References:
214 LOG-009: Comprehensive Audit Trail for Compliance
215 """
216 if not self._entries:
217 return True # Empty trail is valid
219 for i, entry in enumerate(self._entries):
220 # Verify HMAC
221 expected_hmac = self._compute_hmac(entry)
222 if entry.hmac != expected_hmac:
223 return False
225 # Verify previous hash linkage
226 if i == 0:
227 if entry.previous_hash != "GENESIS": 227 ↛ 228line 227 didn't jump to line 228 because the condition on line 227 was never true
228 return False
229 elif entry.previous_hash != self._entries[i - 1].hmac: 229 ↛ 230line 229 didn't jump to line 230 because the condition on line 229 was never true
230 return False
232 return True
234 def export_audit_log(
235 self,
236 path: str,
237 format: Literal["json", "csv"] = "json",
238 ) -> None:
239 """Export audit trail to file.
241 Args:
242 path: Path to export file.
243 format: Export format (json or csv).
245 Raises:
246 ValueError: If format is not supported.
248 Example:
249 >>> audit = AuditTrail(secret_key=b"key")
250 >>> audit.record_action("test", {})
251 >>> audit.export_audit_log("audit.json", format="json")
253 References:
254 LOG-009: Comprehensive Audit Trail for Compliance
255 """
256 path_obj = Path(path)
257 path_obj.parent.mkdir(parents=True, exist_ok=True)
259 if format == "json":
260 self._export_json(path_obj)
261 elif format == "csv":
262 self._export_csv(path_obj)
263 else:
264 raise ValueError(f"Unsupported format: {format}")
266 def get_entries(
267 self,
268 since: datetime | None = None,
269 action_type: str | None = None,
270 ) -> list[AuditEntry]:
271 """Query audit entries with optional filtering.
273 Args:
274 since: Return only entries after this datetime (UTC).
275 action_type: Return only entries with this action type.
277 Returns:
278 List of matching AuditEntry objects.
280 Example:
281 >>> from datetime import datetime, UTC, timedelta
282 >>> audit = AuditTrail(secret_key=b"key")
283 >>> audit.record_action("load", {})
284 >>> audit.record_action("analyze", {})
285 >>> # Get all load actions
286 >>> loads = audit.get_entries(action_type="load")
287 >>> # Get entries from last hour
288 >>> recent = audit.get_entries(since=datetime.now(UTC) - timedelta(hours=1))
290 References:
291 LOG-009: Comprehensive Audit Trail for Compliance
292 """
293 results = self._entries.copy()
295 # Filter by timestamp
296 if since is not None:
297 since_str = format_timestamp(since, format="iso8601")
298 results = [e for e in results if e.timestamp >= since_str]
300 # Filter by action type
301 if action_type is not None:
302 results = [e for e in results if e.action == action_type]
304 return results
306 def _compute_hmac(self, entry: AuditEntry) -> str:
307 """Compute HMAC signature for an audit entry.
309 Args:
310 entry: Audit entry to sign.
312 Returns:
313 Hexadecimal HMAC signature.
315 Raises:
316 ValueError: If hash algorithm is unsupported.
318 References:
319 LOG-009: HMAC-based tamper detection
320 """
321 # Create canonical representation
322 canonical = (
323 f"{entry.timestamp}|{entry.action}|{json.dumps(entry.details, sort_keys=True)}"
324 f"|{entry.user}|{entry.host}|{entry.previous_hash}"
325 )
327 # Compute HMAC
328 if self._hash_algorithm == "sha256":
329 h = hmac.new(self._secret_key, canonical.encode("utf-8"), hashlib.sha256)
330 elif self._hash_algorithm == "sha512": 330 ↛ 333line 330 didn't jump to line 333 because the condition on line 330 was always true
331 h = hmac.new(self._secret_key, canonical.encode("utf-8"), hashlib.sha512)
332 else:
333 raise ValueError(f"Unsupported hash algorithm: {self._hash_algorithm}")
335 return h.hexdigest()
337 def _export_json(self, path: Path) -> None:
338 """Export audit trail as JSON.
340 Args:
341 path: Path to JSON file.
342 """
343 data = {
344 "version": "1.0",
345 "hash_algorithm": self._hash_algorithm,
346 "entries": [entry.to_dict() for entry in self._entries],
347 }
349 with open(path, "w", encoding="utf-8") as f:
350 json.dump(data, f, indent=2)
352 def _export_csv(self, path: Path) -> None:
353 """Export audit trail as CSV.
355 Args:
356 path: Path to CSV file.
357 """
358 import csv
360 with open(path, "w", newline="", encoding="utf-8") as f:
361 if not self._entries: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true
362 return
364 # Get all possible detail keys
365 detail_keys = set() # type: ignore[var-annotated]
366 for entry in self._entries:
367 detail_keys.update(entry.details.keys())
368 detail_keys = sorted(detail_keys) # type: ignore[assignment]
370 # Create CSV writer
371 fieldnames = [
372 "timestamp",
373 "action",
374 "user",
375 "host",
376 "previous_hash",
377 "hmac",
378 ] + [f"detail_{k}" for k in detail_keys]
380 writer = csv.DictWriter(f, fieldnames=fieldnames)
381 writer.writeheader()
383 # Write entries
384 for entry in self._entries:
385 row = {
386 "timestamp": entry.timestamp,
387 "action": entry.action,
388 "user": entry.user,
389 "host": entry.host,
390 "previous_hash": entry.previous_hash,
391 "hmac": entry.hmac,
392 }
393 # Add details
394 for key in detail_keys:
395 value = entry.details.get(key)
396 row[f"detail_{key}"] = json.dumps(value) if value is not None else ""
398 writer.writerow(row)
# Module-wide audit trail shared by the convenience functions below;
# created lazily on the first call to get_global_audit_trail().
_global_audit_trail: AuditTrail | None = None


def get_global_audit_trail(secret_key: bytes | None = None) -> AuditTrail:
    """Get or create the global audit trail.

    Args:
        secret_key: Secret key for HMAC computation. Only used by the call
            that creates the trail; once the trail exists the argument is
            ignored.

    Returns:
        Global AuditTrail instance.

    Example:
        >>> from tracekit.core.audit import get_global_audit_trail
        >>> audit = get_global_audit_trail(secret_key=b"my-key")
        >>> audit.record_action("test", {})

    References:
        LOG-009: Comprehensive Audit Trail for Compliance
    """
    global _global_audit_trail
    if _global_audit_trail is None:
        _global_audit_trail = AuditTrail(secret_key=secret_key)
    return _global_audit_trail
def record_audit(action: str, details: dict[str, Any], user: str | None = None) -> AuditEntry:
    """Record an action to the global audit trail.

    Convenience function that fetches the global audit trail (creating it
    without an explicit key if it does not exist yet) and records the
    action on it.

    Args:
        action: Action identifier.
        details: Action details.
        user: Username (defaults to current user).

    Returns:
        Created AuditEntry.

    Example:
        >>> from tracekit.core.audit import record_audit
        >>> record_audit("compute_fft", {"samples": 1000000})

    References:
        LOG-009: Comprehensive Audit Trail for Compliance
    """
    return get_global_audit_trail().record_action(action, details, user)
# Public API of this module.
__all__ = [
    "AuditEntry",
    "AuditTrail",
    "get_global_audit_trail",
    "record_audit",
]