Coverage for src / apcore_cli / security / audit.py: 100%
44 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
1"""Audit logging in JSON Lines format (FE-05)."""
3from __future__ import annotations
5import contextlib
6import hashlib
7import json
8import logging
9import os
10import secrets
11from datetime import UTC, datetime
12from pathlib import Path
13from typing import Literal
15logger = logging.getLogger("apcore_cli.security")
18class AuditLogger:
19 DEFAULT_PATH = Path.home() / ".apcore-cli" / "audit.jsonl"
21 def __init__(self, path: Path | None = None) -> None:
22 self._path = path or self.DEFAULT_PATH
23 self._ensure_directory()
25 def _ensure_directory(self) -> None:
26 self._path.parent.mkdir(parents=True, exist_ok=True)
27 with contextlib.suppress(OSError):
28 os.chmod(self._path.parent, 0o700)
30 def log_execution(
31 self,
32 module_id: str,
33 input_data: dict,
34 status: Literal["success", "error"],
35 exit_code: int,
36 duration_ms: int,
37 ) -> None:
38 entry = {
39 "timestamp": datetime.now(UTC).isoformat(timespec="milliseconds").replace("+00:00", "Z"),
40 "user": self._get_user(),
41 "module_id": module_id,
42 "input_hash": self._hash_input(input_data),
43 "status": status,
44 "exit_code": exit_code,
45 "duration_ms": duration_ms,
46 }
47 try:
48 with open(self._path, "a") as f:
49 f.write(json.dumps(entry) + "\n")
50 with contextlib.suppress(OSError):
51 os.chmod(self._path, 0o600)
52 except OSError as e:
53 logger.warning("Could not write audit log: %s", e)
55 def _hash_input(self, input_data: dict) -> str:
56 """Hash input with a random salt to prevent correlation across invocations."""
57 salt = secrets.token_bytes(16)
58 payload = json.dumps(input_data, sort_keys=True).encode()
59 return hashlib.sha256(salt + payload).hexdigest()
61 def _get_user(self) -> str:
62 try:
63 return os.getlogin()
64 except OSError:
65 pass
66 try:
67 import pwd
69 return pwd.getpwuid(os.getuid()).pw_name
70 except (ImportError, KeyError, AttributeError):
71 pass
72 return os.getenv("USER", os.getenv("USERNAME", "unknown"))