Coverage for src / documint_mcp / worker_health.py: 0%
28 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-24 23:27 -0400
« prev ^ index » next coverage.py v7.13.5, created at 2026-03-24 23:27 -0400
1"""Minimal health server used by the Railway worker service."""
3from __future__ import annotations
5import json
6import os
7from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
class _Handler(BaseHTTPRequestHandler):
    """Answer ``GET /health`` with a JSON status document; 404 for any other path."""

    def do_GET(self) -> None:  # noqa: N802
        """Reply 200 with the worker status on ``/health``, otherwise a JSON 404."""
        if self.path != "/health":
            self._send_json(404, b'{"status":"not_found"}')
            return

        body = json.dumps(
            {
                "status": "healthy",
                "service": "worker",
                # Read on every request so the reported role follows the environment.
                "role": os.getenv("DOCUMINT_PROCESS_ROLE", "worker"),
            }
        ).encode("utf-8")
        self._send_json(200, body)

    def _send_json(self, status: int, body: bytes) -> None:
        """Write a complete JSON response: status line, headers, then ``body``.

        Args:
            status: HTTP status code for the response line.
            body: Already-encoded JSON payload.
        """
        self.send_response(status)
        self.send_header("Content-Type", "application/json")
        # Declare the length on every response (including 404) so clients can
        # frame the body without relying on the connection being closed.
        self.send_header("Content-Length", str(len(body)))
        self.end_headers()
        self.wfile.write(body)

    def log_message(self, format: str, *args: object) -> None:  # noqa: A003
        """Suppress the base handler's per-request logging; arguments are ignored."""
        del format, args
def main() -> None:
    """Bind the health server and serve requests until the process stops.

    The bind address comes from the ``HOST`` environment variable (default
    ``127.0.0.1``) and the port from ``PORT`` (default ``8000``).

    Raises:
        ValueError: If ``PORT`` is set to something that is not an integer.
        OSError: If the address cannot be bound.
    """
    host = os.getenv("HOST", "127.0.0.1")
    port = int(os.getenv("PORT", "8000"))
    # The context manager closes the listening socket however serve_forever()
    # exits (exception, KeyboardInterrupt, or shutdown()).
    with ThreadingHTTPServer((host, port), _Handler) as server:
        server.serve_forever()
# Start the server only when executed as a script, never on import.
if __name__ == "__main__":
    main()