Coverage for agentos/plugins/lifecycle.py: 0%
167 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-02 09:59 +0800
1"""
2AgentOS v0.70 — 插件生命周期管理器。
3基因来源: Kubernetes Pod Lifecycle + Spring Boot Actuator
5生命周期钩子:
6- on_load() → 插件加载完成
7- on_init(config) → 初始化配置
8- on_start() → 开始工作
9- on_stop() → 优雅关闭
10- on_error(e) → 异常处理
11- health_check() → 健康检查
12"""
14from __future__ import annotations
16import asyncio
17import time
18from abc import ABC, abstractmethod
19from dataclasses import dataclass, field
20from enum import Enum
21from typing import Any
23from agentos.plugins.registry import PluginRegistry, RegisteredPlugin, PluginStatus
26# ── Abstract Plugin Base ─────────────────────────
class LifecyclePlugin(ABC):
    """Plugin base class implementing the standard lifecycle hooks.

    Hooks (all coroutines): ``on_load()``, ``on_init(config)``, ``on_start()``,
    ``on_stop()``, ``on_error(error)`` and ``health_check()``. Every hook has a
    default implementation, so subclasses only override what they need.
    """

    def __init__(self, config: dict | None = None):
        self.config = config or {}
        # Wall-clock timestamp recorded by on_start(); 0.0 means "not running".
        self._started_at: float = 0.0
        # Incremented by on_error() each time an error is reported to the plugin.
        self._error_count: int = 0

    @property
    def uptime_seconds(self) -> float:
        """Seconds elapsed since on_start(), or 0.0 when the plugin is not running."""
        if not self._started_at:
            return 0.0
        return time.time() - self._started_at

    async def on_load(self):
        """Called after the plugin has been loaded. Default: no-op."""

    async def on_init(self, config: dict):
        """Called with the plugin's configuration; stores it on ``self.config``.

        ``None`` is normalized to ``{}`` so ``self.config`` is always a dict,
        consistent with ``__init__``.
        """
        self.config = {} if config is None else config

    async def on_start(self):
        """Called when the plugin starts working; starts the uptime clock."""
        self._started_at = time.time()

    async def on_stop(self):
        """Called on graceful shutdown; resets the uptime clock."""
        self._started_at = 0.0

    async def on_error(self, error: Exception) -> bool:
        """Error hook.

        Returns:
            True if the error was handled (recoverable), False if it is fatal.
            The default counts the error and reports it as unhandled.
        """
        self._error_count += 1
        return False

    async def health_check(self) -> HealthStatus:
        """Health check; subclasses may override.

        The default reports the default (healthy) status together with the
        plugin's current uptime and error count.
        """
        # HealthStatus has no HEALTHY member; its default status is "healthy".
        # Fields are assigned after construction so this does not depend on
        # HealthStatus accepting keyword arguments.
        result = HealthStatus()
        result.uptime_seconds = self.uptime_seconds
        result.error_count = self._error_count
        return result
@dataclass
class PluginHealth:
    """Per-plugin health summary record.

    Every field is required; the dataclass defines no defaults.
    """

    # Name identifying the plugin this record describes.
    plugin_name: str
    # Status label stored as a plain string.
    status: str
    # Uptime in seconds (presumably as reported by the plugin; confirm with callers).
    uptime_seconds: float
    # Count of errors recorded for the plugin.
    error_count: int
    # Text of the most recent error (assumed empty when none; verify with callers).
    last_error: str
@dataclass
class HealthStatus:
    """Health status of a plugin.

    Must be a dataclass: callers construct it with keyword arguments
    (``HealthStatus(status=..., details=..., error_count=...)``), and
    ``field(default_factory=dict)`` only gives each instance its own
    ``details`` dict under ``@dataclass``.
    """

    status: str = "healthy"  # healthy | degraded | unhealthy
    # Free-form diagnostic information (e.g. {"error": "..."}).
    details: dict = field(default_factory=dict)
    uptime_seconds: float = 0.0
    error_count: int = 0

    @property
    def is_healthy(self) -> bool:
        """True only when ``status`` is exactly "healthy"."""
        return self.status == "healthy"

    def to_dict(self) -> dict:
        """Return a plain-dict view; ``details`` is copied so callers can't mutate it."""
        return {
            "status": self.status,
            "details": dict(self.details),
            "uptime_seconds": self.uptime_seconds,
            "error_count": self.error_count,
        }
99from dataclasses import dataclass, field
@dataclass
class LifecycleReport:
    """Lifecycle snapshot for a single plugin.

    Only ``plugin_name`` and ``status`` are required; timings default to 0.0,
    while ``health`` and ``error`` default to None.
    """

    # Identifies the plugin (its manifest name).
    plugin_name: str
    # Lifecycle state at the time the report was last updated.
    status: PluginStatus
    # Load duration in milliseconds.
    load_time_ms: float = 0.0
    # Duration of the on_init hook in milliseconds.
    init_time_ms: float = 0.0
    # Uptime snapshot in seconds.
    uptime_seconds: float = 0.0
    # Most recent health result, if one has been recorded.
    health: HealthStatus | None = None
    # Error message, if the plugin failed.
    error: str | None = None
115# ── Lifecycle Manager ────────────────────────────
class LifecycleManager:
    """
    Lifecycle manager: coordinates init/start/stop for all registered plugins.

    Supports batch initialization, background health-check polling and
    graceful degradation: a plugin whose hook raises is marked ERROR and the
    batch carries on with the remaining plugins.
    """

    def __init__(self, registry: PluginRegistry):
        self.registry = registry
        # Latest lifecycle report per plugin, keyed by manifest name.
        self._reports: dict[str, LifecycleReport] = {}
        # Background task created by start_health_polling(), if any.
        self._health_check_task: asyncio.Task | None = None

    @staticmethod
    def _lifecycle_instance(rp: RegisteredPlugin) -> LifecyclePlugin | None:
        """Return ``rp.instance`` if it is a (truthy) LifecyclePlugin, else None."""
        instance = rp.instance
        if instance and isinstance(instance, LifecyclePlugin):
            return instance
        return None

    async def init_all(self, configs: dict[str, dict] | None = None):
        """Initialize every registered plugin that is in LOADED status.

        Args:
            configs: Optional mapping of plugin name -> config dict passed to
                ``on_init``. Plugins without an entry receive ``{}``.

        Only LifecyclePlugin instances are initialized; other plugins stay
        LOADED. On success the plugin becomes INITIALIZED and a report with
        load/init timings is recorded. If ``on_init`` raises, the plugin is
        marked ERROR, the message is stored on ``rp.error`` and ``on_error`` is
        given a chance to handle it. The remaining plugins are still initialized.
        """
        configs = configs or {}
        for rp in self.registry.by_status(PluginStatus.LOADED):
            plugin = self._lifecycle_instance(rp)
            if plugin is None:
                continue
            name = rp.manifest.name
            # perf_counter is monotonic, so durations are immune to clock jumps.
            started = time.perf_counter()
            try:
                await plugin.on_init(configs.get(name, {}))
            except Exception as e:
                rp.status = PluginStatus.ERROR
                rp.error = str(e)
                await self._handle_error(rp, e)
                continue
            rp.status = PluginStatus.INITIALIZED
            self._reports[name] = LifecycleReport(
                plugin_name=name,
                status=PluginStatus.INITIALIZED,
                load_time_ms=rp.load_time_ms,
                init_time_ms=(time.perf_counter() - started) * 1000,
            )

    async def start_all(self):
        """Start every plugin that is in INITIALIZED status."""
        for rp in self.registry.by_status(PluginStatus.INITIALIZED):
            await self.start_one(rp.manifest.name)

    async def start_one(self, name: str) -> LifecycleReport:
        """Start a single plugin by name.

        Args:
            name: Manifest name of the plugin.

        Returns:
            The plugin's tracked LifecycleReport: status ACTIVE on success, or
            status ERROR with ``error`` set if ``on_start`` raised.

        Raises:
            KeyError: No plugin with this name is registered.
            RuntimeError: The plugin is neither INITIALIZED nor STOPPED.
        """
        rp = self.registry.get(name)
        if not rp:
            raise KeyError(f"Plugin '{name}' not found")
        if rp.status not in (PluginStatus.INITIALIZED, PluginStatus.STOPPED):
            raise RuntimeError(f"Cannot start plugin '{name}' in status {rp.status}")

        plugin = self._lifecycle_instance(rp)
        report = self._reports.get(name)
        if report is None:
            report = LifecycleReport(
                plugin_name=name, status=rp.status, load_time_ms=rp.load_time_ms
            )

        try:
            if plugin is not None:
                await plugin.on_start()
        except Exception as e:
            rp.status = PluginStatus.ERROR
            rp.error = str(e)
            # Keep the tracked report in sync; otherwise report()/summary()
            # would keep showing the stale pre-start status.
            report.status = PluginStatus.ERROR
            report.error = str(e)
            report.uptime_seconds = 0.0
            self._reports[name] = report
            # Same error path as init_all: let the plugin react via on_error.
            await self._handle_error(rp, e)
            return report

        rp.status = PluginStatus.ACTIVE
        rp.error = None
        report.status = PluginStatus.ACTIVE
        report.error = None
        report.uptime_seconds = plugin.uptime_seconds if plugin is not None else 0.0
        self._reports[name] = report
        return report

    async def stop_all(self, graceful: bool = True):
        """Stop every plugin that is in ACTIVE status."""
        for rp in self.registry.by_status(PluginStatus.ACTIVE):
            await self.stop_one(rp.manifest.name, graceful)

    async def stop_one(self, name: str, graceful: bool = True):
        """Stop a single plugin by name; unknown names are ignored.

        The plugin passes through STOPPING and always ends up STOPPED. If
        ``on_stop`` raises, the message is stored on ``rp.error``.

        Args:
            name: Manifest name of the plugin.
            graceful: Accepted for API compatibility; not used by this
                implementation.
        """
        rp = self.registry.get(name)
        if not rp:
            return

        rp.status = PluginStatus.STOPPING
        plugin = self._lifecycle_instance(rp)
        try:
            if plugin is not None:
                await plugin.on_stop()
        except Exception as e:
            rp.error = str(e)
        finally:
            rp.status = PluginStatus.STOPPED
            report = self._reports.get(name)
            if report is not None:
                report.status = PluginStatus.STOPPED

    async def health_check_all(self) -> dict[str, HealthStatus]:
        """Run health checks on all ACTIVE LifecyclePlugin instances.

        A check that raises is reported as "unhealthy" with the error message
        in ``details``. Each result carries the plugin's current uptime and is
        also stored on the plugin's tracked report, if one exists.

        Returns:
            Mapping of plugin name -> HealthStatus.
        """
        results: dict[str, HealthStatus] = {}
        for rp in self.registry.by_status(PluginStatus.ACTIVE):
            plugin = self._lifecycle_instance(rp)
            if plugin is None:
                continue
            name = rp.manifest.name
            try:
                health = await plugin.health_check()
            except Exception as e:
                health = HealthStatus(
                    status="unhealthy",
                    details={"error": str(e)},
                    error_count=plugin._error_count,
                )
            health.uptime_seconds = plugin.uptime_seconds
            results[name] = health

            # Persist the latest result so polling has an observable effect.
            report = self._reports.get(name)
            if report is not None:
                report.health = health
                report.uptime_seconds = health.uptime_seconds
        return results

    def start_health_polling(self, interval_seconds: float = 30.0):
        """Start background health-check polling.

        Schedules a loop via ``asyncio.ensure_future`` that calls
        ``health_check_all`` every ``interval_seconds``. Calling this again
        cancels the previous poller instead of leaking it. A failed sweep is
        logged and does not stop the loop.
        """
        import logging

        logger = logging.getLogger(__name__)

        async def _poll():
            while True:
                try:
                    await self.health_check_all()
                except Exception:
                    # Best effort: one failed sweep must not kill the poller.
                    logger.exception("Plugin health polling sweep failed")
                await asyncio.sleep(interval_seconds)

        self.stop_health_polling()
        self._health_check_task = asyncio.ensure_future(_poll())

    def stop_health_polling(self):
        """Cancel the background health-check task, if one is running."""
        if self._health_check_task:
            self._health_check_task.cancel()
            self._health_check_task = None

    def report(self) -> list[LifecycleReport]:
        """Return lifecycle reports for all known plugins.

        Tracked reports come first. Registered plugins without a tracked
        report get one built from their registry entry (status, load time,
        error).
        """
        reports = list(self._reports.values())
        tracked = {r.plugin_name for r in reports}
        reports.extend(
            LifecycleReport(
                plugin_name=rp.manifest.name,
                status=rp.status,
                load_time_ms=rp.load_time_ms,
                error=rp.error,
            )
            for rp in self.registry.list_all()
            if rp.manifest.name not in tracked
        )
        return reports

    def summary(self) -> str:
        """Return a multi-line, human-readable summary of all plugin reports."""
        reports = self.report()
        lines = [f"共 {len(reports)} 个插件"]
        for r in reports:
            lines.append(f" [{r.status.value}] {r.plugin_name} (load:{r.load_time_ms:.0f}ms, init:{r.init_time_ms:.0f}ms)")
            if r.error:
                lines.append(f" error: {r.error}")
        return "\n".join(lines)

    async def _handle_error(self, rp: RegisteredPlugin, error: Exception):
        """Forward ``error`` to the plugin's ``on_error`` hook.

        If the hook returns a truthy value, ``rp.error`` is cleared. The
        plugin's status is left unchanged. A failure inside the hook itself is
        logged and otherwise ignored.
        """
        plugin = self._lifecycle_instance(rp)
        if plugin is None:
            return
        try:
            handled = await plugin.on_error(error)
        except Exception:
            import logging

            logging.getLogger(__name__).exception(
                "on_error hook of plugin %r raised", rp.manifest.name
            )
            return
        if handled:
            rp.error = None
272 pass