Coverage for agentos/tools/startup_accelerator.py: 0%

181 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 01:06 +0800

1""" 

2Startup Acceleration Tools for AgentOS. 

3Lazy loading, module pre-compilation, and startup sequence optimization. 

4""" 

5 

6import importlib 

7import threading 

8import time 

9import types 

10from collections import OrderedDict 

11from dataclasses import dataclass, field 

12from typing import Any, Callable, Dict, List, Optional, Tuple, Union 

13 

14 

15# ============================================================================ 

16# LazyLoader 

17# ============================================================================ 

18 

19class _LazyModule: 

20 """Proxy that defers module import until an attribute is accessed.""" 

21 

22 def __init__(self, module_name: str): 

23 self._module_name = module_name 

24 self._module: Optional[types.ModuleType] = None 

25 

26 def _load(self) -> types.ModuleType: 

27 if self._module is None: 

28 self._module = importlib.import_module(self._module_name) 

29 return self._module 

30 

31 def __getattr__(self, name: str) -> Any: 

32 return getattr(self._load(), name) 

33 

34 def __repr__(self) -> str: 

35 if self._module is None: 

36 return f"<LazyModule: {self._module_name} (unloaded)>" 

37 return repr(self._module) 

38 

39 

40class LazyLoader: 

41 """Registry for lazy-loaded modules with batch loading and dependency tracking.""" 

42 

43 def __init__(self): 

44 self._proxies: Dict[str, _LazyModule] = {} 

45 self._load_times: Dict[str, float] = {} 

46 self._lock = threading.Lock() 

47 

48 def register(self, module_name: str) -> _LazyModule: 

49 """Register a module for lazy loading.""" 

50 with self._lock: 

51 if module_name not in self._proxies: 

52 self._proxies[module_name] = _LazyModule(module_name) 

53 return self._proxies[module_name] 

54 

55 def load_now(self, module_name: str) -> types.ModuleType: 

56 """Eagerly load a registered module.""" 

57 proxy = self.register(module_name) 

58 with self._lock: 

59 start = time.perf_counter() 

60 result = proxy._load() 

61 elapsed = time.perf_counter() - start 

62 self._load_times[module_name] = elapsed 

63 return result 

64 

65 def load_all(self) -> List[Tuple[str, float]]: 

66 """Eagerly load all registered modules. Returns load times.""" 

67 results: List[Tuple[str, float]] = [] 

68 for name in list(self._proxies.keys()): 

69 start = time.perf_counter() 

70 self._proxies[name]._load() 

71 elapsed = time.perf_counter() - start 

72 self._load_times[name] = elapsed 

73 results.append((name, elapsed)) 

74 return results 

75 

76 def preload(self, module_names: List[str]) -> List[Tuple[str, float]]: 

77 """Register and load a batch of modules.""" 

78 for name in module_names: 

79 self.register(name) 

80 results: List[Tuple[str, float]] = [] 

81 for name in module_names: 

82 start = time.perf_counter() 

83 self._proxies[name]._load() 

84 elapsed = time.perf_counter() - start 

85 self._load_times[name] = elapsed 

86 results.append((name, elapsed)) 

87 return results 

88 

89 @property 

90 def stats(self) -> Dict[str, Any]: 

91 with self._lock: 

92 loaded = {k: v for k, v in self._proxies.items() if v._module is not None} 

93 return { 

94 "registered": len(self._proxies), 

95 "loaded": len(loaded), 

96 "unloaded": len(self._proxies) - len(loaded), 

97 "load_times": dict(self._load_times), 

98 "total_load_time": sum(self._load_times.values()), 

99 } 

100 

101 def __getitem__(self, module_name: str) -> _LazyModule: 

102 return self.register(module_name) 

103 

104 

105# ============================================================================ 

106# ModulePreloader 

107# ============================================================================ 

108 

109class ModulePreloader: 

110 """Pre-compile and cache frequently used modules for fast startup.""" 

111 

112 def __init__(self, max_concurrent: int = 4): 

113 self._cache: Dict[str, types.ModuleType] = {} 

114 self._max_concurrent = max_concurrent 

115 self._lock = threading.Lock() 

116 self._preload_times: Dict[str, float] = {} 

117 

118 def precompile(self, module_names: List[str], parallel: bool = True) -> Dict[str, float]: 

119 """Pre-compile a list of modules, optionally in parallel.""" 

120 results: Dict[str, float] = {} 

121 

122 if parallel and len(module_names) > 1: 

123 results = self._precompile_parallel(module_names) 

124 else: 

125 for name in module_names: 

126 start = time.perf_counter() 

127 self._cache[name] = importlib.import_module(name) 

128 elapsed = time.perf_counter() - start 

129 results[name] = elapsed 

130 

131 self._preload_times.update(results) 

132 return results 

133 

134 def _precompile_parallel(self, module_names: List[str]) -> Dict[str, float]: 

135 results: Dict[str, float] = {} 

136 errors: List[str] = [] 

137 

138 def _worker(name: str) -> None: 

139 try: 

140 start = time.perf_counter() 

141 module = importlib.import_module(name) 

142 elapsed = time.perf_counter() - start 

143 with self._lock: 

144 self._cache[name] = module 

145 results[name] = elapsed 

146 except Exception as e: 

147 errors.append(f"{name}: {e}") 

148 

149 # Batch into groups 

150 for i in range(0, len(module_names), self._max_concurrent): 

151 batch = module_names[i:i + self._max_concurrent] 

152 threads = [threading.Thread(target=_worker, args=(name,)) for name in batch] 

153 for t in threads: 

154 t.start() 

155 for t in threads: 

156 t.join() 

157 

158 return results 

159 

160 def get(self, module_name: str) -> Optional[types.ModuleType]: 

161 return self._cache.get(module_name) 

162 

163 def warm_cache(self, hot_modules: List[str]) -> int: 

164 """Preload hot modules into cache. Returns number newly cached.""" 

165 count = 0 

166 for name in hot_modules: 

167 if name not in self._cache: 

168 try: 

169 self._cache[name] = importlib.import_module(name) 

170 count += 1 

171 except Exception: 

172 pass 

173 return count 

174 

175 def clear(self) -> None: 

176 with self._lock: 

177 self._cache.clear() 

178 

179 @property 

180 def stats(self) -> Dict[str, Any]: 

181 with self._lock: 

182 return { 

183 "cached_modules": len(self._cache), 

184 "total_preload_time": sum(self._preload_times.values()), 

185 "module_times": dict(self._preload_times), 

186 } 

187 

188 

189# ============================================================================ 

190# StartupOptimizer 

191# ============================================================================ 

192 

193@dataclass 

194class _StartupPhase: 

195 name: str 

196 start_time: float = 0.0 

197 end_time: float = 0.0 

198 metadata: Dict[str, Any] = field(default_factory=dict) 

199 

200 @property 

201 def duration(self) -> float: 

202 return self.end_time - self.start_time 

203 

204 

205class StartupOptimizer: 

206 """Profile and optimize application startup sequence.""" 

207 

208 def __init__(self): 

209 self._phases: OrderedDict[str, _StartupPhase] = OrderedDict() 

210 self._lock = threading.Lock() 

211 self._total_start: float = 0.0 

212 self._total_end: float = 0.0 

213 

214 def start(self) -> None: 

215 """Mark the beginning of the startup sequence.""" 

216 self._total_start = time.perf_counter() 

217 

218 def begin_phase(self, name: str, **metadata) -> None: 

219 """Begin profiling a startup phase.""" 

220 with self._lock: 

221 phase = _StartupPhase(name=name, start_time=time.perf_counter(), metadata=metadata) 

222 self._phases[name] = phase 

223 

224 def end_phase(self, name: str) -> Optional[float]: 

225 """End profiling a startup phase. Returns duration.""" 

226 with self._lock: 

227 phase = self._phases.get(name) 

228 if phase: 

229 phase.end_time = time.perf_counter() 

230 return phase.duration 

231 return None 

232 

233 def end(self) -> None: 

234 """Mark the end of the startup sequence.""" 

235 self._total_end = time.perf_counter() 

236 

237 def report(self) -> Dict[str, Any]: 

238 """Generate a startup performance report.""" 

239 phases = [] 

240 for name, phase in self._phases.items(): 

241 phases.append({ 

242 "name": name, 

243 "duration_ms": round(phase.duration * 1000, 2), 

244 "pct_of_total": 0.0, 

245 **phase.metadata, 

246 }) 

247 

248 total_duration = self._total_end - self._total_start 

249 total_ms = round(total_duration * 1000, 2) 

250 

251 for p in phases: 

252 if total_ms > 0: 

253 p["pct_of_total"] = round(p["duration_ms"] / total_ms * 100, 1) 

254 

255 sorted_phases = sorted(phases, key=lambda x: x["duration_ms"], reverse=True) 

256 return { 

257 "total_duration_ms": total_ms, 

258 "phase_count": len(phases), 

259 "phases": sorted_phases, 

260 "bottleneck": sorted_phases[0]["name"] if sorted_phases else None, 

261 } 

262 

263 def total_duration_ms(self) -> float: 

264 return round((self._total_end - self._total_start) * 1000, 2) 

265 

266 

267# ============================================================================ 

268# Convenience Functions 

269# ============================================================================ 

270 

271def create_lazy_loader() -> LazyLoader: 

272 """Create a lazy module loader.""" 

273 return LazyLoader() 

274 

275 

276def create_module_preloader(max_concurrent: int = 4) -> ModulePreloader: 

277 """Create a module preloader for startup acceleration.""" 

278 return ModulePreloader(max_concurrent=max_concurrent) 

279 

280 

281def create_startup_optimizer() -> StartupOptimizer: 

282 """Create a startup sequence profiler and optimizer.""" 

283 return StartupOptimizer() 

284 

285 

286def quick_start( 

287 essential_modules: List[str], 

288 lazy_modules: List[str], 

289 hot_modules: Optional[List[str]] = None, 

290) -> Dict[str, Any]: 

291 """One-shot startup optimization: preload essentials, lazy-load the rest.""" 

292 preloader = ModulePreloader() 

293 loader = LazyLoader() 

294 

295 # Preload essential modules 

296 essential_times = preloader.precompile(essential_modules, parallel=True) 

297 

298 # Register lazy modules 

299 for name in lazy_modules: 

300 loader.register(name) 

301 

302 # Warm cache with hot modules (extras) 

303 if hot_modules: 

304 preloader.warm_cache(hot_modules) 

305 

306 return { 

307 "essential": essential_times, 

308 "lazy_count": len(lazy_modules), 

309 "cached_total": len(preloader._cache), 

310 "total_essential_ms": round(sum(essential_times.values()) * 1000, 2), 

311 }