Source code for apiwatch.middleware_fastapi

import time
import json

# FastAPI/Starlette are optional extras. Probe for them once, record the
# outcome in FASTAPI_AVAILABLE, and refuse to load this module without them.
try:
    from fastapi import Request, Response
    from starlette.middleware.base import BaseHTTPMiddleware
    from starlette.datastructures import Headers
except ImportError:
    FASTAPI_AVAILABLE = False
else:
    # Only reached when every import above succeeded.
    FASTAPI_AVAILABLE = True


# Fail at import time with an actionable installation hint rather than
# letting a NameError surface later when the middleware class is defined.
if not FASTAPI_AVAILABLE:
    raise ImportError(
        "FastAPI is not installed. Install with: pip install apiwatchdog[fastapi]"
    )


class FastAPIWatchMiddleware(BaseHTTPMiddleware):
    """FastAPI middleware that reports every request to an ApiWatcher.

    For each request the middleware collects the method, path, query
    parameters, non-sensitive headers, optional JSON request body, optional
    response body, status code and elapsed time, and hands them to
    ``watcher.log_request``. Failures inside the watcher are caught and
    printed so that monitoring never changes the outcome of a request.
    """

    # Header names (lower-case) that carry credentials and are therefore
    # never forwarded to the watcher.
    _SENSITIVE_HEADERS = frozenset(
        {"authorization", "proxy-authorization", "cookie", "x-api-key"}
    )

    # Maximum number of characters kept from a non-JSON response body.
    _MAX_TEXT_BODY_CHARS = 1000

    def __init__(self, app, watcher, capture_request_body=True,
                 capture_response_body=True):
        """Initialize the middleware.

        Args:
            app: FastAPI application instance.
            watcher: ApiWatcher instance; its ``log_request`` method is
                called once per request.
            capture_request_body: Whether to capture JSON request bodies.
            capture_response_body: Whether to capture response bodies.
        """
        super().__init__(app)
        self.watcher = watcher
        self.capture_request_body = capture_request_body
        self.capture_response_body = capture_response_body

    async def dispatch(self, request: Request, call_next):
        """Run the downstream handler and log the request/response pair.

        Args:
            request: Incoming request.
            call_next: Callable that forwards the request down the stack and
                returns the response.

        Returns:
            The response produced by ``call_next``, unmodified.

        Raises:
            Exception: Whatever ``call_next`` raises is re-raised unchanged
                after being logged with status code 500.
        """
        # perf_counter is monotonic, so the measured duration cannot be
        # skewed (or go negative) when the system clock is adjusted.
        started = time.perf_counter()

        request_data = None
        if self.capture_request_body:
            request_data = await self._read_request_json(request)

        # NOTE: dict() keeps a single value per repeated query key.
        query_params = dict(request.query_params)
        headers = {
            name: value
            for name, value in request.headers.items()
            if name.lower() not in self._SENSITIVE_HEADERS
        }

        try:
            response = await call_next(request)
        except Exception as exc:
            # Logging is guarded so that a watcher failure cannot replace
            # the application's own exception, which is re-raised below.
            self._safe_log(
                request=request,
                status_code=500,
                request_data=request_data,
                response_data={"error": str(exc)},
                duration_ms=self._elapsed_ms(started),
                headers=headers,
                query_params=query_params,
            )
            raise

        # Measure before inspecting the response body so that the capture
        # work is not attributed to the handler.
        duration_ms = self._elapsed_ms(started)

        response_data = None
        if self.capture_response_body:
            response_data = self._read_response_body(response)

        self._safe_log(
            request=request,
            status_code=response.status_code,
            request_data=request_data,
            response_data=response_data,
            duration_ms=duration_ms,
            headers=headers,
            query_params=query_params,
        )
        return response

    @staticmethod
    def _elapsed_ms(started):
        """Return milliseconds elapsed since ``started``, rounded to 2 places."""
        return round((time.perf_counter() - started) * 1000, 2)

    @staticmethod
    async def _read_request_json(request):
        """Return the decoded JSON request body, or None if unavailable."""
        try:
            content_type = request.headers.get("content-type", "")
            if "application/json" not in content_type:
                return None
            # NOTE(review): relies on the body remaining readable by the
            # downstream handler after being consumed here — confirm for the
            # Starlette version in use.
            body = await request.body()
            if not body:
                return None
            return json.loads(body.decode('utf-8'))
        except Exception:
            # Best effort: an unreadable or malformed body is simply not
            # captured; the request itself must still be processed.
            return None

    def _read_response_body(self, response):
        """Return the response body (JSON-decoded or truncated text), or None."""
        try:
            # NOTE(review): responses handed back by BaseHTTPMiddleware's
            # call_next look like streaming wrappers that expose no `body`
            # attribute, in which case nothing is captured here — confirm.
            if not hasattr(response, 'body'):
                return None
            body = response.body
            if not body:
                return None
            content_type = response.headers.get("content-type", "")
            text = body.decode('utf-8')
            if "application/json" in content_type:
                return json.loads(text)
            return text[:self._MAX_TEXT_BODY_CHARS]
        except Exception:
            # Best effort: undecodable bodies are not captured.
            return None

    def _safe_log(self, *, request, status_code, request_data, response_data,
                  duration_ms, headers, query_params):
        """Forward one record to the watcher, swallowing any watcher error."""
        try:
            self.watcher.log_request(
                method=request.method,
                path=request.url.path,
                status_code=status_code,
                request_data=request_data,
                response_data=response_data,
                duration_ms=duration_ms,
                headers=headers,
                query_params=query_params
            )
        except Exception as e:
            # Never let monitoring crash the app
            print(f"[ApiWatchdog] Middleware error: {e}")