Coverage for src / mysingle / core / audit / middleware.py: 0%
40 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-02 00:58 +0900
1"""Audit logging middleware for FastAPI apps.
3This middleware captures minimal request/response metadata and stores it in the
4AuditLog collection using Beanie. It's designed to be added via the shared
5app factory so all microservices can use it consistently.
6"""
8from __future__ import annotations
10import time
11from typing import Awaitable, Callable
13from fastapi import Request
14from fastapi.responses import Response
15from starlette.middleware.base import BaseHTTPMiddleware
17from ..config import settings
18from ..logging import get_structured_logger
19from .models import AuditLog
# Module-level structured logger; used below to report audit-insert failures.
21logger = get_structured_logger(__name__)
class AuditLoggingMiddleware(BaseHTTPMiddleware):
    """Middleware that writes one audit log record per HTTP request.

    Parameters
    - service_name: Name of the service, stored with each audit record.
    - enabled: Toggle to enable/disable logging (default True). This is
      AND-ed with an environment check, so nothing is written when
      ``settings.ENVIRONMENT`` is "test" (case-insensitive).

    Audit persistence is best-effort: a failure to insert the record is
    logged as a warning and never affects the response returned to the
    client.
    """

    def __init__(self, app, service_name: str, enabled: bool = True):  # type: ignore[no-untyped-def]
        super().__init__(app)
        self.service_name = service_name
        self.enabled = enabled

    @staticmethod
    def _parse_content_length(raw: str | None) -> int:
        """Return a Content-Length header value as an int.

        A missing or non-numeric header yields 0 rather than raising.
        """
        if raw is None:
            return 0
        try:
            return int(raw)
        except ValueError:
            return 0

    def _should_log(self) -> bool:
        """Return True when auditing is enabled and not in the test environment."""
        # ``or ""`` guards against ENVIRONMENT being present but set to None,
        # which would otherwise raise AttributeError on every request.
        environment = getattr(settings, "ENVIRONMENT", "") or ""
        return bool(self.enabled) and environment.lower() != "test"

    async def dispatch(
        self, request: Request, call_next: Callable[[Request], Awaitable[Response]]
    ) -> Response:
        """Forward the request and, when enabled, persist an audit record.

        Returns the downstream response unchanged. Exceptions raised by
        ``call_next`` propagate to the caller; no audit record is written
        in that case.
        """
        # Skip all metadata collection when disabled or in test environment.
        if not self._should_log():
            return await call_next(request)

        start = time.monotonic()

        # Request metadata
        headers = request.headers
        method = request.method
        path = request.url.path
        req_id = headers.get("x-request-id")
        trace_id = headers.get("x-trace-id") or headers.get("traceparent")
        user_agent = headers.get("user-agent")
        ip = request.client.host if request.client else None
        req_bytes = self._parse_content_length(headers.get("content-length"))

        response: Response = await call_next(request)

        # Response metadata
        resp_bytes = self._parse_content_length(
            response.headers.get("content-length")
        )
        latency_ms = int((time.monotonic() - start) * 1000)

        try:
            # Best-effort user context (avoid importing heavy auth deps here)
            user_id = None
            audit = AuditLog(
                user_id=user_id,
                service=self.service_name,
                request_id=req_id,
                trace_id=trace_id,
                method=method,
                path=path,
                ip=ip,
                user_agent=user_agent,
                req_bytes=req_bytes,
                status_code=response.status_code,
                resp_bytes=resp_bytes,
                latency_ms=latency_ms,
            )
            await audit.insert()
        except Exception as e:  # pragma: no cover - best-effort
            # Auditing must never break the request path; report and move on.
            logger.warning("audit log insert failed: %s", e)

        return response