import asyncio
import json
import time
import threading
from collections import deque
from datetime import datetime
from typing import Dict, Any, Optional
from queue import Queue
import traceback
[docs]
class ApiWatcher:
"""
Async API monitoring with zero-blocking fire-and-forget pattern.
Auto-starts dashboard if not running (like RabbitMQ with pika)
"""
def __init__(self,
service_name='main-app',
max_history:int=1000,
dashboard_host:str='localhost',
dashboard_port:int=22222,
dashboard_username:str='admin',
dashboard_password:str='admin',
auto_start_dashboard:bool=True
):
"""
Initialize ApiWatcher
Args:
service_name: Name of this microservice
max_history: Maximum number of requests to keep in memory (local cache)
dashboard_host: Host where dashboard server is running
dashboard_port: Port where dashboard server is running
auto_start_dashboard: Auto-start dashboard if not running (default: True)
"""
self.service_name = service_name
self.max_history = max_history
self.dashboard_host = dashboard_host
self.dashboard_port = dashboard_port
self.request_queue = Queue()
self.history = deque(maxlen=max_history)
self.loop = None
self.worker_thread = None
self._running = False
print(f'[ApiWatchdog] Service: {service_name}')
# Auto-start dashboard if enabled
if auto_start_dashboard:
from .server import start_dashboard_server
start_dashboard_server(
host=dashboard_host,
port=dashboard_port,
username=dashboard_username,
password=dashboard_password
)
# Start background worker
self._start_worker()
def _start_worker(self):
"""Start background async worker thread"""
self._running = True
self.worker_thread = threading.Thread(target=self._run_async_worker, daemon=True)
self.worker_thread.start()
def _run_async_worker(self):
"""Run async event loop in background thread"""
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.loop.run_until_complete(self._process_queue())
async def _process_queue(self):
"""Process request queue asynchronously"""
while self._running:
try:
if not self.request_queue.empty():
data = self.request_queue.get_nowait()
data['service'] = self.service_name
self.history.append(data)
# Send to dashboard via HTTP POST
await self._send_to_dashboard(data)
else:
await asyncio.sleep(0.01)
except Exception as e:
print(f'[ApiWatchdog] Error processing queue: {e}')
traceback.print_exc()
async def _send_to_dashboard(self, data: Dict[str, Any]):
"""Send data to dashboard server via HTTP POST"""
try:
import aiohttp
url = f"http://{self.dashboard_host}:{self.dashboard_port}/api/publish"
async with aiohttp.ClientSession() as session:
async with session.post(url, json=data, timeout=aiohttp.ClientTimeout(total=1)) as resp:
if resp.status != 200:
print(f"[ApiWatchdog] Failed to publish: {resp.status}")
except aiohttp.ClientConnectorError:
# Dashboard not running - silently fail
pass
except Exception:
# Don't crash the app if dashboard is down
pass
[docs]
def log_request(self,
method: str,
path: str,
status_code: Optional[int] = None,
request_data: Optional[Dict] = None,
response_data: Optional[Any] = None,
duration_ms: Optional[float] = None,
headers: Optional[Dict] = None,
query_params: Optional[Dict] = None
):
"""Fire-and-forget request logging (non-blocking)"""
log_entry = {
"id": f"{int(time.time() * 1000)}",
"timestamp": datetime.utcnow().isoformat(),
"method": method,
"path": path,
"status_code": status_code,
"request_data": request_data,
"response_data": response_data,
"duration_ms": duration_ms,
"headers": headers,
"query_params": query_params
}
try:
self.request_queue.put_nowait(log_entry)
except Exception as e:
print(f'[ApiWatchdog] Failed to queue request: {e}')
[docs]
def get_history(self):
"""Get local cached requests"""
return list(self.history)
[docs]
def clear_history(self):
"""Clear local request history"""
self.history.clear()
[docs]
def shutdown(self):
"""Gracefully shutdown watcher"""
self._running = False
if self.worker_thread:
self.worker_thread.join(timeout=2)