Coverage for src / tracekit / core / audit.py: 92%

115 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Audit trail with HMAC chain verification for compliance and tamper detection. 

2 

3This module provides tamper-evident audit logging using HMAC signatures 

4to create a verifiable chain of audit entries. 

5 

6 

7Example: 

8 >>> from tracekit.core.audit import AuditTrail 

9 >>> audit = AuditTrail(secret_key=b"my-secret-key") 

10 >>> audit.record_action("load_trace", {"file": "data.bin"}, user="alice") 

11 >>> audit.record_action("compute_fft", {"samples": 1000000}, user="alice") 

12 >>> # Verify integrity 

13 >>> is_valid = audit.verify_integrity() 

14 >>> # Export audit log 

15 >>> audit.export_audit_log("audit.json", format="json") 

16 

17References: 

18 LOG-009: Comprehensive Audit Trail for Compliance 

19 HMAC-SHA256 for tamper detection 

20""" 

21 

22from __future__ import annotations 

23 

24import getpass 

25import hashlib 

26import hmac 

27import json 

28import os 

29import socket 

30from dataclasses import asdict, dataclass, field 

31from datetime import UTC, datetime 

32from pathlib import Path 

33from typing import Any, Literal 

34 

35from tracekit.core.logging import format_timestamp 

36 

37 

@dataclass
class AuditEntry:
    """One signed record in the audit chain.

    An entry describes a single auditable action and carries both the
    signature of the entry that precedes it (``previous_hash``) and its own
    signature (``hmac``), which is what links the entries into a chain.

    Attributes:
        timestamp: Timestamp string of the action.
        action: Action identifier (e.g., "load_trace", "compute_fft").
        details: Free-form details about the action (parameters, results).
        user: Name of the user who performed the action.
        host: Name of the host where the action was performed.
        previous_hash: Signature of the preceding entry in the chain.
        hmac: Signature of this entry; empty string until it is signed.

    References:
        LOG-009: Comprehensive Audit Trail for Compliance
    """

    timestamp: str
    action: str
    details: dict[str, Any]
    user: str
    host: str
    previous_hash: str
    hmac: str = field(default="")

    def to_dict(self) -> dict[str, Any]:
        """Return the entry as a plain dictionary keyed by field name.

        Returns:
            Dictionary with one item per dataclass field.
        """
        # asdict() rebuilds nested containers, so the result does not share
        # the ``details`` dict with this entry.
        as_mapping: dict[str, Any] = asdict(self)
        return as_mapping

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> AuditEntry:
        """Build an entry from a dictionary of field values.

        Args:
            data: Mapping whose keys are the dataclass field names; every key
                is forwarded to the constructor as a keyword argument.

        Returns:
            AuditEntry instance.
        """
        field_values = dict(data)
        return cls(**field_values)

85 

86 

87class AuditTrail: 

88 """Tamper-evident audit trail with HMAC chain verification. 

89 

90 Maintains a chain of audit entries where each entry is cryptographically 

91 linked to the previous entry using HMAC signatures. This allows detection 

92 of any tampering or modification of the audit log. 

93 

94 The HMAC chain works as follows: 

95 1. Each entry contains the HMAC of the previous entry 

96 2. Each entry's HMAC is computed over: timestamp + action + details + user + previous_hash 

97 3. Any modification to any entry breaks the chain and fails verification 

98 

99 Args: 

100 secret_key: Secret key for HMAC computation (required for tamper detection). 

101 hash_algorithm: Hash algorithm to use (default: 'sha256'). 

102 

103 Example: 

104 >>> audit = AuditTrail(secret_key=b"my-secret") 

105 >>> audit.record_action("operation", {"param": "value"}) 

106 >>> assert audit.verify_integrity() 

107 

108 References: 

109 LOG-009: Comprehensive Audit Trail for Compliance 

110 """ 

111 

112 def __init__( 

113 self, 

114 secret_key: bytes | None = None, 

115 hash_algorithm: Literal["sha256", "sha512"] = "sha256", 

116 ): 

117 """Initialize audit trail. 

118 

119 Args: 

120 secret_key: Secret key for HMAC computation. If None, a random key is generated. 

121 hash_algorithm: Hash algorithm to use (sha256 or sha512). 

122 """ 

123 self._entries: list[AuditEntry] = [] 

124 self._secret_key = secret_key or os.urandom(32) 

125 self._hash_algorithm = hash_algorithm 

126 

127 def record_action( 

128 self, 

129 action: str, 

130 details: dict[str, Any], 

131 user: str | None = None, 

132 ) -> AuditEntry: 

133 """Record an auditable action. 

134 

135 Creates a new audit entry with HMAC signature and adds it to the chain. 

136 

137 Args: 

138 action: Action identifier (e.g., "load_trace", "compute_measurement"). 

139 details: Dictionary of action details (parameters, results, etc.). 

140 user: Username who performed the action (defaults to current user). 

141 

142 Returns: 

143 The created AuditEntry. 

144 

145 Example: 

146 >>> audit = AuditTrail(secret_key=b"key") 

147 >>> entry = audit.record_action( 

148 ... "load_trace", 

149 ... {"file": "data.bin", "size_mb": 100}, 

150 ... user="alice" 

151 ... ) 

152 

153 References: 

154 LOG-009: Comprehensive Audit Trail for Compliance 

155 """ 

156 # Get current user and host 

157 if user is None: 

158 try: 

159 user = getpass.getuser() 

160 except Exception: 

161 user = "unknown" 

162 

163 try: 

164 host = socket.gethostname() 

165 except Exception: 

166 host = "unknown" 

167 

168 # Get timestamp 

169 timestamp = format_timestamp(datetime.now(UTC), format="iso8601") 

170 

171 # Get previous hash 

172 previous_hash = self._entries[-1].hmac if self._entries else "GENESIS" 

173 

174 # Create entry (without HMAC initially) 

175 entry = AuditEntry( 

176 timestamp=timestamp, 

177 action=action, 

178 details=details, 

179 user=user, 

180 host=host, 

181 previous_hash=previous_hash, 

182 hmac="", 

183 ) 

184 

185 # Compute HMAC 

186 entry.hmac = self._compute_hmac(entry) 

187 

188 # Add to chain 

189 self._entries.append(entry) 

190 

191 return entry 

192 

193 def verify_integrity(self) -> bool: 

194 """Verify HMAC chain integrity. 

195 

196 Verifies that: 

197 1. Each entry's HMAC is valid 

198 2. Each entry's previous_hash matches the previous entry's HMAC 

199 3. No entries have been tampered with or removed 

200 

201 Returns: 

202 True if the audit trail is intact and untampered, False otherwise. 

203 

204 Example: 

205 >>> audit = AuditTrail(secret_key=b"key") 

206 >>> audit.record_action("action1", {}) 

207 >>> audit.record_action("action2", {}) 

208 >>> assert audit.verify_integrity() # Should be True 

209 >>> # Tampering with an entry would break the chain 

210 >>> audit._entries[0].action = "modified" 

211 >>> assert not audit.verify_integrity() # Should be False 

212 

213 References: 

214 LOG-009: Comprehensive Audit Trail for Compliance 

215 """ 

216 if not self._entries: 

217 return True # Empty trail is valid 

218 

219 for i, entry in enumerate(self._entries): 

220 # Verify HMAC 

221 expected_hmac = self._compute_hmac(entry) 

222 if entry.hmac != expected_hmac: 

223 return False 

224 

225 # Verify previous hash linkage 

226 if i == 0: 

227 if entry.previous_hash != "GENESIS": 227 ↛ 228line 227 didn't jump to line 228 because the condition on line 227 was never true

228 return False 

229 elif entry.previous_hash != self._entries[i - 1].hmac: 229 ↛ 230line 229 didn't jump to line 230 because the condition on line 229 was never true

230 return False 

231 

232 return True 

233 

234 def export_audit_log( 

235 self, 

236 path: str, 

237 format: Literal["json", "csv"] = "json", 

238 ) -> None: 

239 """Export audit trail to file. 

240 

241 Args: 

242 path: Path to export file. 

243 format: Export format (json or csv). 

244 

245 Raises: 

246 ValueError: If format is not supported. 

247 

248 Example: 

249 >>> audit = AuditTrail(secret_key=b"key") 

250 >>> audit.record_action("test", {}) 

251 >>> audit.export_audit_log("audit.json", format="json") 

252 

253 References: 

254 LOG-009: Comprehensive Audit Trail for Compliance 

255 """ 

256 path_obj = Path(path) 

257 path_obj.parent.mkdir(parents=True, exist_ok=True) 

258 

259 if format == "json": 

260 self._export_json(path_obj) 

261 elif format == "csv": 

262 self._export_csv(path_obj) 

263 else: 

264 raise ValueError(f"Unsupported format: {format}") 

265 

266 def get_entries( 

267 self, 

268 since: datetime | None = None, 

269 action_type: str | None = None, 

270 ) -> list[AuditEntry]: 

271 """Query audit entries with optional filtering. 

272 

273 Args: 

274 since: Return only entries after this datetime (UTC). 

275 action_type: Return only entries with this action type. 

276 

277 Returns: 

278 List of matching AuditEntry objects. 

279 

280 Example: 

281 >>> from datetime import datetime, UTC, timedelta 

282 >>> audit = AuditTrail(secret_key=b"key") 

283 >>> audit.record_action("load", {}) 

284 >>> audit.record_action("analyze", {}) 

285 >>> # Get all load actions 

286 >>> loads = audit.get_entries(action_type="load") 

287 >>> # Get entries from last hour 

288 >>> recent = audit.get_entries(since=datetime.now(UTC) - timedelta(hours=1)) 

289 

290 References: 

291 LOG-009: Comprehensive Audit Trail for Compliance 

292 """ 

293 results = self._entries.copy() 

294 

295 # Filter by timestamp 

296 if since is not None: 

297 since_str = format_timestamp(since, format="iso8601") 

298 results = [e for e in results if e.timestamp >= since_str] 

299 

300 # Filter by action type 

301 if action_type is not None: 

302 results = [e for e in results if e.action == action_type] 

303 

304 return results 

305 

306 def _compute_hmac(self, entry: AuditEntry) -> str: 

307 """Compute HMAC signature for an audit entry. 

308 

309 Args: 

310 entry: Audit entry to sign. 

311 

312 Returns: 

313 Hexadecimal HMAC signature. 

314 

315 Raises: 

316 ValueError: If hash algorithm is unsupported. 

317 

318 References: 

319 LOG-009: HMAC-based tamper detection 

320 """ 

321 # Create canonical representation 

322 canonical = ( 

323 f"{entry.timestamp}|{entry.action}|{json.dumps(entry.details, sort_keys=True)}" 

324 f"|{entry.user}|{entry.host}|{entry.previous_hash}" 

325 ) 

326 

327 # Compute HMAC 

328 if self._hash_algorithm == "sha256": 

329 h = hmac.new(self._secret_key, canonical.encode("utf-8"), hashlib.sha256) 

330 elif self._hash_algorithm == "sha512": 330 ↛ 333line 330 didn't jump to line 333 because the condition on line 330 was always true

331 h = hmac.new(self._secret_key, canonical.encode("utf-8"), hashlib.sha512) 

332 else: 

333 raise ValueError(f"Unsupported hash algorithm: {self._hash_algorithm}") 

334 

335 return h.hexdigest() 

336 

337 def _export_json(self, path: Path) -> None: 

338 """Export audit trail as JSON. 

339 

340 Args: 

341 path: Path to JSON file. 

342 """ 

343 data = { 

344 "version": "1.0", 

345 "hash_algorithm": self._hash_algorithm, 

346 "entries": [entry.to_dict() for entry in self._entries], 

347 } 

348 

349 with open(path, "w", encoding="utf-8") as f: 

350 json.dump(data, f, indent=2) 

351 

352 def _export_csv(self, path: Path) -> None: 

353 """Export audit trail as CSV. 

354 

355 Args: 

356 path: Path to CSV file. 

357 """ 

358 import csv 

359 

360 with open(path, "w", newline="", encoding="utf-8") as f: 

361 if not self._entries: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true

362 return 

363 

364 # Get all possible detail keys 

365 detail_keys = set() # type: ignore[var-annotated] 

366 for entry in self._entries: 

367 detail_keys.update(entry.details.keys()) 

368 detail_keys = sorted(detail_keys) # type: ignore[assignment] 

369 

370 # Create CSV writer 

371 fieldnames = [ 

372 "timestamp", 

373 "action", 

374 "user", 

375 "host", 

376 "previous_hash", 

377 "hmac", 

378 ] + [f"detail_{k}" for k in detail_keys] 

379 

380 writer = csv.DictWriter(f, fieldnames=fieldnames) 

381 writer.writeheader() 

382 

383 # Write entries 

384 for entry in self._entries: 

385 row = { 

386 "timestamp": entry.timestamp, 

387 "action": entry.action, 

388 "user": entry.user, 

389 "host": entry.host, 

390 "previous_hash": entry.previous_hash, 

391 "hmac": entry.hmac, 

392 } 

393 # Add details 

394 for key in detail_keys: 

395 value = entry.details.get(key) 

396 row[f"detail_{key}"] = json.dumps(value) if value is not None else "" 

397 

398 writer.writerow(row) 

399 

400 

# Process-wide audit trail shared by the convenience functions below; it is
# created on first use.
_global_audit_trail: AuditTrail | None = None


def get_global_audit_trail(secret_key: bytes | None = None) -> AuditTrail:
    """Return the shared audit trail, creating it on first use.

    Args:
        secret_key: Secret key handed to the AuditTrail constructor when the
            shared trail is created. Ignored once the trail exists.

    Returns:
        The module-wide AuditTrail instance.

    Example:
        >>> from tracekit.core.audit import get_global_audit_trail
        >>> audit = get_global_audit_trail(secret_key=b"my-key")
        >>> audit.record_action("test", {})

    References:
        LOG-009: Comprehensive Audit Trail for Compliance
    """
    global _global_audit_trail

    # Fast path: the shared trail already exists, so the key is not consulted.
    if _global_audit_trail is not None:
        return _global_audit_trail

    _global_audit_trail = AuditTrail(secret_key=secret_key)
    return _global_audit_trail

426 

427 

def record_audit(action: str, details: dict[str, Any], user: str | None = None) -> AuditEntry:
    """Append an action to the shared (global) audit trail.

    Shorthand for fetching the global trail and calling ``record_action`` on
    it. The global trail is fetched without a secret key.

    Args:
        action: Action identifier.
        details: Action details.
        user: Username (defaults to current user).

    Returns:
        The AuditEntry that was created.

    Example:
        >>> from tracekit.core.audit import record_audit
        >>> record_audit("compute_fft", {"samples": 1000000})

    References:
        LOG-009: Comprehensive Audit Trail for Compliance
    """
    return get_global_audit_trail().record_action(action, details, user)

450 

451 

452__all__ = [ 

453 "AuditEntry", 

454 "AuditTrail", 

455 "get_global_audit_trail", 

456 "record_audit", 

457]