Coverage for agentos/plugins/lifecycle.py: 0%

167 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-02 09:59 +0800

"""
AgentOS v0.70 — plugin lifecycle manager.
Design lineage: Kubernetes Pod Lifecycle + Spring Boot Actuator

Lifecycle hooks:
- on_load()       → plugin finished loading
- on_init(config) → apply configuration
- on_start()      → begin work
- on_stop()       → graceful shutdown
- on_error(e)     → exception handling
- health_check()  → health check
"""

13 

from __future__ import annotations

import asyncio
import logging
import time
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any

from agentos.plugins.registry import PluginRegistry, RegisteredPlugin, PluginStatus

24 

25 

26# ── Abstract Plugin Base ───────────────────────── 

27 

28class LifecyclePlugin(ABC): 

29 """插件基类 — 实现标准生命周期钩子。""" 

30 

31 def __init__(self, config: dict | None = None): 

32 self.config = config or {} 

33 self._started_at: float = 0.0 

34 self._error_count: int = 0 

35 

36 @property 

37 def uptime_seconds(self) -> float: 

38 if self._started_at == 0: 

39 return 0.0 

40 return time.time() - self._started_at 

41 

42 async def on_load(self): 

43 """插件加载后调用。""" 

44 

45 async def on_init(self, config: dict): 

46 """初始化配置后调用。""" 

47 self.config = config 

48 

49 async def on_start(self): 

50 """开始工作时调用。""" 

51 self._started_at = time.time() 

52 

53 async def on_stop(self): 

54 """优雅关闭时调用。""" 

55 self._started_at = 0.0 

56 

57 async def on_error(self, error: Exception) -> bool: 

58 """ 

59 异常处理钩子。返回True表示已处理(可恢复),False表示致命错误。 

60 """ 

61 self._error_count += 1 

62 return False 

63 

64 async def health_check(self) -> HealthStatus: 

65 """健康检查 — 子类可覆盖。""" 

66 return HealthStatus.HEALTHY 

67 

68 

69@dataclass 

70class PluginHealth: 

71 """插件健康状态摘要。""" 

72 plugin_name: str 

73 status: str 

74 uptime_seconds: float 

75 error_count: int 

76 last_error: str 

77 

78 

79class HealthStatus: 

80 """插件健康状态。""" 

81 status: str = "healthy" # healthy | degraded | unhealthy 

82 details: dict = field(default_factory=dict) 

83 uptime_seconds: float = 0.0 

84 error_count: int = 0 

85 

86 @property 

87 def is_healthy(self) -> bool: 

88 return self.status == "healthy" 

89 

90 def to_dict(self) -> dict: 

91 return { 

92 "status": self.status, 

93 "details": self.details, 

94 "uptime_seconds": self.uptime_seconds, 

95 "error_count": self.error_count, 

96 } 

97 

98 

99from dataclasses import dataclass, field 

100 

101 

@dataclass
class LifecycleReport:
    """Per-plugin lifecycle snapshot kept by the lifecycle manager.

    Only ``plugin_name`` and ``status`` are required; every other field
    defaults to "nothing recorded yet".
    """

    plugin_name: str  # the plugin's manifest name
    status: PluginStatus  # lifecycle status at the last update
    load_time_ms: float = 0.0  # load duration in milliseconds, copied from the registry entry
    init_time_ms: float = 0.0  # duration of on_init, in milliseconds
    uptime_seconds: float = 0.0  # uptime sampled when the report was last updated
    health: HealthStatus | None = None  # optional health result; not set by the code visible here
    error: str | None = None  # last recorded error message, if any

113 

114 

115# ── Lifecycle Manager ──────────────────────────── 

116 

class LifecycleManager:
    """Lifecycle manager: coordinates init/start/stop across all plugins.

    Supports batch initialization, background health-check polling and
    graceful degradation: a plugin whose hook raises is marked ERROR and
    processing continues with the remaining plugins.
    """

    def __init__(self, registry: PluginRegistry):
        self.registry = registry
        self._reports: dict[str, LifecycleReport] = {}
        self._health_check_task: asyncio.Task | None = None

    @staticmethod
    def _lifecycle_instance(rp: RegisteredPlugin) -> LifecyclePlugin | None:
        """Return the plugin's instance if it is a LifecyclePlugin, else None."""
        instance = rp.instance
        return instance if isinstance(instance, LifecyclePlugin) else None

    async def init_all(self, configs: dict[str, dict] | None = None):
        """Initialize every registered plugin that is in LOADED status.

        Args:
            configs: optional mapping of plugin name -> config dict passed to
                ``on_init``; plugins without an entry receive ``{}``.

        A plugin whose ``on_init`` raises is marked ERROR, its ``on_error``
        hook runs, and initialization continues with the other plugins.
        """
        configs = configs or {}
        for rp in self.registry.by_status(PluginStatus.LOADED):
            plugin = self._lifecycle_instance(rp)
            if plugin is None:
                # NOTE(review): non-LifecyclePlugin instances stay LOADED here,
                # so start_all never picks them up — confirm this is intended.
                continue
            name = rp.manifest.name
            # perf_counter is monotonic, so wall-clock jumps can't skew the timing.
            started = time.perf_counter()
            try:
                await plugin.on_init(configs.get(name, {}))
            except Exception as exc:
                rp.status = PluginStatus.ERROR
                rp.error = str(exc)
                await self._handle_error(rp, exc)
                continue
            rp.status = PluginStatus.INITIALIZED
            self._reports[name] = LifecycleReport(
                plugin_name=name,
                status=PluginStatus.INITIALIZED,
                load_time_ms=rp.load_time_ms,
                init_time_ms=(time.perf_counter() - started) * 1000,
            )

    async def start_all(self):
        """Start every plugin that is in INITIALIZED status."""
        for rp in self.registry.by_status(PluginStatus.INITIALIZED):
            await self.start_one(rp.manifest.name)

    async def start_one(self, name: str) -> LifecycleReport:
        """Start one plugin that is INITIALIZED or STOPPED.

        Args:
            name: the plugin's registry name.

        Returns:
            The plugin's lifecycle report, which is also stored for ``report()``.

        Raises:
            KeyError: no plugin named ``name`` is registered.
            RuntimeError: the plugin is not INITIALIZED or STOPPED.

        An exception from ``on_start`` is not propagated. Instead, the plugin
        and its stored report are marked ERROR, ``on_error`` runs, and the
        ERROR report is returned.
        """
        rp = self.registry.get(name)
        if not rp:
            raise KeyError(f"Plugin '{name}' not found")
        if rp.status not in (PluginStatus.INITIALIZED, PluginStatus.STOPPED):
            raise RuntimeError(f"Cannot start plugin '{name}' in status {rp.status}")

        plugin = self._lifecycle_instance(rp)
        report = self._reports.get(name)
        if report is None:
            report = LifecycleReport(plugin_name=name, status=PluginStatus.ACTIVE)

        try:
            if plugin is not None:
                await plugin.on_start()
        except Exception as exc:
            rp.status = PluginStatus.ERROR
            rp.error = str(exc)
            # Update the stored report too; otherwise report()/summary() keep
            # showing the stale pre-start status and hide the failure.
            report.status = PluginStatus.ERROR
            report.error = str(exc)
            self._reports[name] = report
            await self._handle_error(rp, exc)
            return report

        rp.status = PluginStatus.ACTIVE
        rp.error = None
        report.status = PluginStatus.ACTIVE
        report.error = None
        report.uptime_seconds = plugin.uptime_seconds if plugin is not None else 0.0
        self._reports[name] = report
        return report

    async def stop_all(self, graceful: bool = True):
        """Stop every plugin that is in ACTIVE status."""
        for rp in self.registry.by_status(PluginStatus.ACTIVE):
            await self.stop_one(rp.manifest.name, graceful)

    async def stop_one(self, name: str, graceful: bool = True):
        """Stop one plugin; unknown names are ignored.

        The plugin always ends in STOPPED status. If ``on_stop`` raises, the
        message is recorded on the plugin and its report instead of
        propagating. ``graceful`` is accepted for API compatibility but is not
        used by this implementation.
        """
        rp = self.registry.get(name)
        if not rp:
            return

        rp.status = PluginStatus.STOPPING
        plugin = self._lifecycle_instance(rp)
        stop_error: str | None = None
        try:
            if plugin is not None:
                await plugin.on_stop()
        except Exception as exc:
            stop_error = str(exc)
            rp.error = stop_error
        finally:
            rp.status = PluginStatus.STOPPED
            report = self._reports.get(name)
            if report is not None:
                report.status = PluginStatus.STOPPED
                if stop_error is not None:
                    report.error = stop_error

    async def health_check_all(self) -> dict[str, HealthStatus]:
        """Run ``health_check`` on every ACTIVE LifecyclePlugin.

        Returns:
            Mapping of plugin name -> HealthStatus. If a plugin's
            ``health_check`` raises, it is reported as "unhealthy" with the
            error message in ``details``. ``uptime_seconds`` is always
            overwritten with the plugin's current uptime.
        """
        results: dict[str, HealthStatus] = {}
        for rp in self.registry.by_status(PluginStatus.ACTIVE):
            plugin = self._lifecycle_instance(rp)
            if plugin is None:
                continue
            try:
                health = await plugin.health_check()
            except Exception as exc:
                health = HealthStatus(
                    status="unhealthy",
                    details={"error": str(exc)},
                    error_count=plugin._error_count,
                )
            health.uptime_seconds = plugin.uptime_seconds
            results[rp.manifest.name] = health
        return results

    def start_health_polling(self, interval_seconds: float = 30.0):
        """Start a background task that runs health_check_all() periodically.

        Any polling task started earlier is cancelled first, so repeated calls
        never leave an orphaned poller that stop_health_polling() can't reach.
        The task is scheduled with ``asyncio.ensure_future``. Presumably this
        should be called from inside a running event loop; confirm with callers.
        """
        self.stop_health_polling()

        async def _poll():
            while True:
                try:
                    await self.health_check_all()
                except Exception:
                    # Best-effort: a failed round must not kill the poller, but
                    # it should not disappear silently either.
                    logging.getLogger(__name__).exception(
                        "Plugin health-check polling round failed"
                    )
                await asyncio.sleep(interval_seconds)

        self._health_check_task = asyncio.ensure_future(_poll())

    def stop_health_polling(self):
        """Cancel the background health-check task, if one is running."""
        if self._health_check_task:
            self._health_check_task.cancel()
            self._health_check_task = None

    def report(self) -> list[LifecycleReport]:
        """Return lifecycle reports for every registered plugin.

        Plugins the manager has tracked return their stored report. Any other
        registered plugin gets a report built from its registry entry.
        """
        reports = list(self._reports.values())
        tracked = {r.plugin_name for r in reports}
        reports.extend(
            LifecycleReport(
                plugin_name=rp.manifest.name,
                status=rp.status,
                load_time_ms=rp.load_time_ms,
                error=rp.error,
            )
            for rp in self.registry.list_all()
            if rp.manifest.name not in tracked
        )
        return reports

    def summary(self) -> str:
        """Return a human-readable, multi-line summary of all plugin reports."""
        reports = self.report()
        lines = [f"共 {len(reports)} 个插件"]
        for r in reports:
            lines.append(f" [{r.status.value}] {r.plugin_name} (load:{r.load_time_ms:.0f}ms, init:{r.init_time_ms:.0f}ms)")
            if r.error:
                lines.append(f" error: {r.error}")
        return "\n".join(lines)

    async def _handle_error(self, rp: RegisteredPlugin, error: Exception):
        """Let the plugin's ``on_error`` hook handle ``error``.

        If the hook returns True, the error is treated as recovered and
        ``rp.error`` is cleared; ``rp.status`` is left unchanged. If the hook
        itself raises, the exception is logged and suppressed, so one
        misbehaving plugin can't abort a batch operation.
        """
        plugin = self._lifecycle_instance(rp)
        if plugin is None:
            return
        try:
            handled = await plugin.on_error(error)
        except Exception:
            logging.getLogger(__name__).exception(
                "on_error hook of plugin %r raised", rp.manifest.name
            )
            return
        if handled:
            rp.error = None