Coverage for src / documint_mcp / worker_health.py: 0%

28 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-03-24 23:27 -0400

1"""Minimal health server used by the Railway worker service.""" 

2 

3from __future__ import annotations 

4 

5import json 

6import os 

7from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer 

8 

9 

10class _Handler(BaseHTTPRequestHandler): 

11 def do_GET(self) -> None: # noqa: N802 

12 if self.path != "/health": 

13 self.send_response(404) 

14 self.send_header("Content-Type", "application/json") 

15 self.end_headers() 

16 self.wfile.write(b'{"status":"not_found"}') 

17 return 

18 

19 body = json.dumps( 

20 { 

21 "status": "healthy", 

22 "service": "worker", 

23 "role": os.getenv("DOCUMINT_PROCESS_ROLE", "worker"), 

24 } 

25 ).encode("utf-8") 

26 self.send_response(200) 

27 self.send_header("Content-Type", "application/json") 

28 self.send_header("Content-Length", str(len(body))) 

29 self.end_headers() 

30 self.wfile.write(body) 

31 

32 def log_message(self, format: str, *args: object) -> None: # noqa: A003 

33 del format, args 

34 return 

35 

36 

37def main() -> None: 

38 host = os.getenv("HOST", "127.0.0.1") 

39 port = int(os.getenv("PORT", "8000")) 

40 server = ThreadingHTTPServer((host, port), _Handler) 

41 server.serve_forever() 

42 

43 

44if __name__ == "__main__": 

45 main()