"""Append-only audit log.
Every meaningful event in the loop — run start/finish, tool dispatch,
tool result, permission decision — gets a signed entry on the audit
log. Two backends ship:
* :class:`InMemoryAuditLog` — list-backed, fast, used in tests and dev.
* :class:`FileAuditLog` — JSONL append on disk, durable across
process restarts.
Both compute an HMAC-SHA256 ``signature`` over a canonicalised
representation of the entry's content fields, keyed by a per-log
``secret``. The signature lets compliance tooling detect tampering;
:func:`verify_signature` recomputes it and compares.
The log is conceptually monotonic: ``seq`` is per-log and never
re-used. :class:`FileAuditLog` recovers the highest seq from the file
on startup so multiple processes can append in turn.
"""
from __future__ import annotations
import hashlib
import hmac
import json
from collections.abc import AsyncIterator
from datetime import UTC, datetime
from pathlib import Path
from typing import Any, Protocol, runtime_checkable
import anyio
from ..core.types import AuditEntry
@runtime_checkable
class AuditLog(Protocol):
    """The append-only signed log surface.

    Structural protocol satisfied by :class:`InMemoryAuditLog` and
    :class:`FileAuditLog`.  ``runtime_checkable`` permits ``isinstance``
    checks, which verify method *presence* only, not signatures.
    """

    async def append(
        self,
        *,
        session_id: str,
        actor: str,
        action: str,
        payload: dict[str, Any],
    ) -> AuditEntry:
        """Sign and store one entry, returning it with its assigned seq."""
        ...

    async def query(
        self,
        *,
        session_id: str | None = None,
        action: str | None = None,
    ) -> list[AuditEntry]:
        """Return entries, optionally filtered by session and/or action."""
        ...
# ---------------------------------------------------------------------------
# Signing
# ---------------------------------------------------------------------------
def _canonical_payload(
*,
seq: int,
timestamp: datetime,
session_id: str,
actor: str,
action: str,
payload: dict[str, Any],
) -> bytes:
"""Stable byte representation used for HMAC signing.
Sorted-keys JSON keeps the signature stable across processes and
Python releases.
"""
blob = {
"seq": seq,
"timestamp": timestamp.isoformat(),
"session_id": session_id,
"actor": actor,
"action": action,
"payload": payload,
}
return json.dumps(blob, sort_keys=True, default=str).encode("utf-8")
def _sign(secret: str, body: bytes) -> str:
return hmac.new(
secret.encode("utf-8") or b"",
body,
hashlib.sha256,
).hexdigest()
def verify_signature(entry: AuditEntry, secret: str) -> bool:
    """Recompute the HMAC over ``entry``'s content fields and compare.

    Returns ``True`` only when ``entry.signature`` matches the signature
    produced by ``secret``.  ``hmac.compare_digest`` keeps the
    comparison constant-time, so the check doesn't leak signature
    prefixes to a timing attacker.
    """
    expected = _sign(
        secret,
        _canonical_payload(
            seq=entry.seq,
            timestamp=entry.timestamp,
            session_id=entry.session_id,
            actor=entry.actor,
            action=entry.action,
            payload=entry.payload,
        ),
    )
    return hmac.compare_digest(expected, entry.signature)
# ---------------------------------------------------------------------------
# In-memory backend
# ---------------------------------------------------------------------------
class InMemoryAuditLog:
    """List-backed signed audit log.

    Fast, non-durable backend used in tests and development.  An
    ``anyio.Lock`` guards every operation so concurrent tasks observe a
    consistent, gap-free ``seq`` sequence.
    """

    def __init__(self, *, secret: str = "") -> None:
        self._secret = secret  # HMAC key used to sign each entry
        self._entries: list[AuditEntry] = []
        self._seq = 0  # last seq handed out; the next entry gets _seq + 1
        self._lock = anyio.Lock()

    async def append(
        self,
        *,
        session_id: str,
        actor: str,
        action: str,
        payload: dict[str, Any],
    ) -> AuditEntry:
        """Sign and store one entry; return it with its assigned seq."""
        async with self._lock:
            self._seq += 1
            timestamp = datetime.now(UTC)
            body = _canonical_payload(
                seq=self._seq,
                timestamp=timestamp,
                session_id=session_id,
                actor=actor,
                action=action,
                payload=payload,
            )
            entry = AuditEntry(
                seq=self._seq,
                timestamp=timestamp,
                session_id=session_id,
                actor=actor,
                action=action,
                payload=payload,
                signature=_sign(self._secret, body),
            )
            self._entries.append(entry)
            return entry

    async def query(
        self,
        *,
        session_id: str | None = None,
        action: str | None = None,
    ) -> list[AuditEntry]:
        """Return entries matching the filters (all when both are ``None``)."""
        async with self._lock:
            entries = list(self._entries)  # snapshot taken under the lock
        return _filter_entries(entries, session_id=session_id, action=action)

    async def all_entries(self) -> list[AuditEntry]:
        """Copy of every entry, unfiltered, in append order."""
        async with self._lock:
            return list(self._entries)
# ---------------------------------------------------------------------------
# JSONL file backend
# ---------------------------------------------------------------------------
class FileAuditLog:
    """JSONL append-only audit log with HMAC signatures.

    On construction we read any pre-existing entries to recover the
    highest seq, so a process restart picks up where the last one left
    off.  File reads/writes run in worker threads via
    ``anyio.to_thread`` so the event loop is never blocked on disk I/O.
    """

    def __init__(self, path: str | Path, *, secret: str = "") -> None:
        self._path = Path(path)
        self._secret = secret  # HMAC key used to sign each entry
        self._seq = 0  # highest seq seen so far (recovered from disk below)
        self._lock = anyio.Lock()
        # Make sure the parent directory exists before the first append.
        self._path.parent.mkdir(parents=True, exist_ok=True)
        if self._path.exists():
            self._seq = self._scan_max_seq()

    @property
    def path(self) -> Path:
        """Location of the backing JSONL file."""
        return self._path

    def _scan_max_seq(self) -> int:
        """Highest ``seq`` recorded in the file, or 0 when none parse.

        Blank and malformed lines are skipped rather than raising, so a
        partially corrupted log still lets the process start.
        """
        max_seq = 0
        with self._path.open("r", encoding="utf-8") as fh:
            for line in fh:
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                seq = obj.get("seq")
                if isinstance(seq, int) and seq > max_seq:
                    max_seq = seq
        return max_seq

    async def append(
        self,
        *,
        session_id: str,
        actor: str,
        action: str,
        payload: dict[str, Any],
    ) -> AuditEntry:
        """Sign one entry and append it to the JSONL file; return it."""
        async with self._lock:
            self._seq += 1
            timestamp = datetime.now(UTC)
            body = _canonical_payload(
                seq=self._seq,
                timestamp=timestamp,
                session_id=session_id,
                actor=actor,
                action=action,
                payload=payload,
            )
            entry = AuditEntry(
                seq=self._seq,
                timestamp=timestamp,
                session_id=session_id,
                actor=actor,
                action=action,
                payload=payload,
                signature=_sign(self._secret, body),
            )
            # Blocking file append runs off the event loop thread.
            await anyio.to_thread.run_sync(self._write_line, entry)
            return entry

    def _write_line(self, entry: AuditEntry) -> None:
        """Append one entry as a single JSON line (blocking; thread-run)."""
        with self._path.open("a", encoding="utf-8") as fh:
            fh.write(json.dumps(entry.model_dump(mode="json")))
            fh.write("\n")

    async def query(
        self,
        *,
        session_id: str | None = None,
        action: str | None = None,
    ) -> list[AuditEntry]:
        """Re-read the file and return entries matching the filters."""
        entries = await anyio.to_thread.run_sync(self._read_entries)
        return _filter_entries(entries, session_id=session_id, action=action)

    def _read_entries(self) -> list[AuditEntry]:
        """Parse every valid entry from disk (blocking; thread-run).

        Lines that fail JSON parsing or model validation are skipped so
        one corrupt record doesn't make the whole log unreadable.
        """
        if not self._path.exists():
            return []
        out: list[AuditEntry] = []
        with self._path.open("r", encoding="utf-8") as fh:
            for line in fh:
                if not line.strip():
                    continue
                try:
                    obj = json.loads(line)
                except json.JSONDecodeError:
                    continue
                try:
                    out.append(AuditEntry.model_validate(obj))
                except Exception:  # noqa: BLE001 — skip corrupt entries
                    continue
        return out
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _filter_entries(
entries: list[AuditEntry],
*,
session_id: str | None,
action: str | None,
) -> list[AuditEntry]:
if session_id is not None:
entries = [e for e in entries if e.session_id == session_id]
if action is not None:
entries = [e for e in entries if e.action == action]
return entries
async def stream_entries(log: AuditLog) -> AsyncIterator[AuditEntry]:
    """Yield every entry currently in ``log`` in ascending seq order.

    A polling helper for compliance tooling.  Doesn't tail — that comes
    later when we add an on-write notification stream.
    """
    entries = await log.query()
    # query() returns a fresh list on both backends, so in-place sort is safe.
    entries.sort(key=lambda e: e.seq)
    for entry in entries:
        yield entry