Coverage for src / mysingle / core / audit / middleware.py: 0%

40 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-02 00:58 +0900

1"""Audit logging middleware for FastAPI apps. 

2 

3This middleware captures minimal request/response metadata and stores it in the 

4AuditLog collection using Beanie. It's designed to be added via the shared 

5app factory so all microservices can use it consistently. 

6""" 

7 

8from __future__ import annotations 

9 

10import time 

11from typing import Awaitable, Callable 

12 

13from fastapi import Request 

14from fastapi.responses import Response 

15from starlette.middleware.base import BaseHTTPMiddleware 

16 

17from ..config import settings 

18from ..logging import get_structured_logger 

19from .models import AuditLog 

20 

21logger = get_structured_logger(__name__) 

22 

23 

24class AuditLoggingMiddleware(BaseHTTPMiddleware): 

25 """Middleware that writes an audit log per HTTP request. 

26 

27 Parameters 

28 - service_name: Name of the service, stored with each audit record. 

29 - enabled: Toggle to enable/disable logging (default True). This will be 

30 AND-ed with environment check to skip in test. 

31 """ 

32 

33 def __init__(self, app, service_name: str, enabled: bool = True): # type: ignore[no-untyped-def] 

34 super().__init__(app) 

35 self.service_name = service_name 

36 self.enabled = enabled 

37 

38 async def dispatch( 

39 self, request: Request, call_next: Callable[[Request], Awaitable[Response]] 

40 ) -> Response: 

41 # Skip when disabled or in test environment 

42 should_log = bool(self.enabled) and ( 

43 getattr(settings, "ENVIRONMENT", "").lower() != "test" 

44 ) 

45 

46 start = time.monotonic() 

47 

48 # Request metadata 

49 method = request.method 

50 path = request.url.path 

51 req_id = request.headers.get("x-request-id") 

52 trace_id = request.headers.get("x-trace-id") or request.headers.get( 

53 "traceparent" 

54 ) 

55 user_agent = request.headers.get("user-agent") 

56 ip = request.client.host if request.client else None 

57 try: 

58 req_bytes = int(request.headers.get("content-length", "0")) 

59 except Exception: 

60 req_bytes = 0 

61 

62 response: Response = await call_next(request) 

63 

64 # Response metadata 

65 try: 

66 resp_bytes = int(response.headers.get("content-length", "0")) 

67 except Exception: 

68 resp_bytes = 0 

69 latency_ms = int((time.monotonic() - start) * 1000) 

70 

71 if should_log: 

72 try: 

73 # Best-effort user context (avoid importing heavy auth deps here) 

74 user_id = None 

75 audit = AuditLog( 

76 user_id=user_id, 

77 service=self.service_name, 

78 request_id=req_id, 

79 trace_id=trace_id, 

80 method=method, 

81 path=path, 

82 ip=ip, 

83 user_agent=user_agent, 

84 req_bytes=req_bytes, 

85 status_code=response.status_code, 

86 resp_bytes=resp_bytes, 

87 latency_ms=latency_ms, 

88 ) 

89 await audit.insert() 

90 except Exception as e: # pragma: no cover - best-effort 

91 logger.warning("audit log insert failed: %s", e) 

92 

93 return response