Coverage for src / apcore_cli / security / audit.py: 100%

44 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-26 10:23 +0800

1"""Audit logging in JSON Lines format (FE-05).""" 

2 

3from __future__ import annotations 

4 

5import contextlib 

6import hashlib 

7import json 

8import logging 

9import os 

10import secrets 

11from datetime import UTC, datetime 

12from pathlib import Path 

13from typing import Literal 

14 

15logger = logging.getLogger("apcore_cli.security") 

16 

17 

18class AuditLogger: 

19 DEFAULT_PATH = Path.home() / ".apcore-cli" / "audit.jsonl" 

20 

21 def __init__(self, path: Path | None = None) -> None: 

22 self._path = path or self.DEFAULT_PATH 

23 self._ensure_directory() 

24 

25 def _ensure_directory(self) -> None: 

26 self._path.parent.mkdir(parents=True, exist_ok=True) 

27 with contextlib.suppress(OSError): 

28 os.chmod(self._path.parent, 0o700) 

29 

30 def log_execution( 

31 self, 

32 module_id: str, 

33 input_data: dict, 

34 status: Literal["success", "error"], 

35 exit_code: int, 

36 duration_ms: int, 

37 ) -> None: 

38 entry = { 

39 "timestamp": datetime.now(UTC).isoformat(timespec="milliseconds").replace("+00:00", "Z"), 

40 "user": self._get_user(), 

41 "module_id": module_id, 

42 "input_hash": self._hash_input(input_data), 

43 "status": status, 

44 "exit_code": exit_code, 

45 "duration_ms": duration_ms, 

46 } 

47 try: 

48 with open(self._path, "a") as f: 

49 f.write(json.dumps(entry) + "\n") 

50 with contextlib.suppress(OSError): 

51 os.chmod(self._path, 0o600) 

52 except OSError as e: 

53 logger.warning("Could not write audit log: %s", e) 

54 

55 def _hash_input(self, input_data: dict) -> str: 

56 """Hash input with a random salt to prevent correlation across invocations.""" 

57 salt = secrets.token_bytes(16) 

58 payload = json.dumps(input_data, sort_keys=True).encode() 

59 return hashlib.sha256(salt + payload).hexdigest() 

60 

61 def _get_user(self) -> str: 

62 try: 

63 return os.getlogin() 

64 except OSError: 

65 pass 

66 try: 

67 import pwd 

68 

69 return pwd.getpwuid(os.getuid()).pw_name 

70 except (ImportError, KeyError, AttributeError): 

71 pass 

72 return os.getenv("USER", os.getenv("USERNAME", "unknown"))