litefs.middleware.health_check 源代码

#!/usr/bin/env python
# coding: utf-8

import json
import time
import sys
import os
from typing import Any, Dict, List, Optional, Callable

sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))

from middleware.base import Middleware
from exceptions import HttpError


[文档] class HealthCheck(Middleware): """ 健康检查中间件 提供 /health 和 /health/ready 端点用于健康检查 """ def __init__(self, app, path: str = '/health', ready_path: str = '/health/ready'): """ 初始化健康检查中间件 Args: app: Litefs 应用实例 path: 健康检查端点路径,默认为 /health ready_path: 就绪检查端点路径,默认为 /health/ready """ super().__init__(app) self.path = path self.ready_path = ready_path self._checks: Dict[str, Callable] = {} self._ready_checks: Dict[str, Callable] = {}
[文档] def add_check(self, name: str, check_func: Callable[[], bool]): """ 添加健康检查 Args: name: 检查名称 check_func: 检查函数,返回 True 表示健康,False 表示不健康 """ self._checks[name] = check_func
[文档] def add_ready_check(self, name: str, check_func: Callable[[], bool]): """ 添加就绪检查 Args: name: 检查名称 check_func: 检查函数,返回 True 表示就绪,False 表示未就绪 """ self._ready_checks[name] = check_func
[文档] def process_request(self, request_handler): """ 处理请求,检查是否为健康检查端点 Args: request_handler: 请求处理器实例 Returns: None: 继续处理请求 其他值: 直接返回该值作为响应 """ path = request_handler._environ.get('PATH_INFO', '') method = request_handler._environ.get('REQUEST_METHOD', 'GET') if method == 'GET': if path == self.path: return self._handle_health_check(request_handler) elif path == self.ready_path: return self._handle_ready_check(request_handler) return None
def _handle_health_check(self, request_handler): """ 处理健康检查请求 Args: request_handler: 请求处理器实例 Returns: 健康检查响应 """ status_code = 200 checks = {} for name, check_func in self._checks.items(): try: is_healthy = check_func() checks[name] = { 'status': 'pass' if is_healthy else 'fail', 'timestamp': time.time() } if not is_healthy: status_code = 503 except Exception as e: checks[name] = { 'status': 'error', 'error': str(e), 'timestamp': time.time() } status_code = 503 response = { 'status': 'healthy' if status_code == 200 else 'unhealthy', 'timestamp': time.time(), 'checks': checks } request_handler.start_response(status_code, [('Content-Type', 'application/json')]) return json.dumps(response).encode('utf-8') def _handle_ready_check(self, request_handler): """ 处理就绪检查请求 Args: request_handler: 请求处理器实例 Returns: 就绪检查响应 """ status_code = 200 checks = {} for name, check_func in self._ready_checks.items(): try: is_ready = check_func() checks[name] = { 'status': 'pass' if is_ready else 'fail', 'timestamp': time.time() } if not is_ready: status_code = 503 except Exception as e: checks[name] = { 'status': 'error', 'error': str(e), 'timestamp': time.time() } status_code = 503 response = { 'status': 'ready' if status_code == 200 else 'not_ready', 'timestamp': time.time(), 'checks': checks } request_handler.start_response(status_code, [('Content-Type', 'application/json')]) return json.dumps(response).encode('utf-8')