Coverage for agentos/tools/startup_accelerator.py: 0%
181 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 01:06 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 01:06 +0800
1"""
2Startup Acceleration Tools for AgentOS.
3Lazy loading, module pre-compilation, and startup sequence optimization.
4"""
6import importlib
7import threading
8import time
9import types
10from collections import OrderedDict
11from dataclasses import dataclass, field
12from typing import Any, Callable, Dict, List, Optional, Tuple, Union
15# ============================================================================
16# LazyLoader
17# ============================================================================
19class _LazyModule:
20 """Proxy that defers module import until an attribute is accessed."""
22 def __init__(self, module_name: str):
23 self._module_name = module_name
24 self._module: Optional[types.ModuleType] = None
26 def _load(self) -> types.ModuleType:
27 if self._module is None:
28 self._module = importlib.import_module(self._module_name)
29 return self._module
31 def __getattr__(self, name: str) -> Any:
32 return getattr(self._load(), name)
34 def __repr__(self) -> str:
35 if self._module is None:
36 return f"<LazyModule: {self._module_name} (unloaded)>"
37 return repr(self._module)
class LazyLoader:
    """Registry of lazily imported modules with eager and batch loading.

    Each registered module name maps to one ``_LazyModule`` proxy. The
    registry also keeps the measured import time (seconds) per module.
    ``_lock`` guards the two dictionaries only; the imports themselves run
    outside the lock so other threads can keep using the registry meanwhile.
    """

    def __init__(self):
        self._proxies: Dict[str, "_LazyModule"] = {}
        self._load_times: Dict[str, float] = {}
        self._lock = threading.Lock()

    def register(self, module_name: str) -> "_LazyModule":
        """Return the proxy for ``module_name``, creating it on first use."""
        with self._lock:
            proxy = self._proxies.get(module_name)
            if proxy is None:
                proxy = self._proxies[module_name] = _LazyModule(module_name)
            return proxy

    def _timed_load(
        self, module_name: str, proxy: "_LazyModule"
    ) -> Tuple[types.ModuleType, float]:
        """Load ``proxy`` and return ``(module, seconds spent in this call)``.

        The stored load time is written only when this call performed the
        import, or when nothing was recorded for the module yet. Re-loading
        an already loaded proxy is a near-instant no-op and must not replace
        the genuine measurement.
        """
        was_unloaded = proxy._module is None
        started = time.perf_counter()
        module = proxy._load()
        elapsed = time.perf_counter() - started
        with self._lock:
            if was_unloaded or module_name not in self._load_times:
                self._load_times[module_name] = elapsed
        return module, elapsed

    def load_now(self, module_name: str) -> types.ModuleType:
        """Register ``module_name`` if needed, import it now and return it."""
        module, _ = self._timed_load(module_name, self.register(module_name))
        return module

    def load_all(self) -> List[Tuple[str, float]]:
        """Import every registered module.

        Returns:
            ``(name, seconds)`` pairs in registration order, where seconds is
            the time spent loading during this call.
        """
        # Snapshot under the lock so concurrent register() calls cannot
        # change the dictionary while it is being iterated.
        with self._lock:
            pending = list(self._proxies.items())
        return [
            (name, self._timed_load(name, proxy)[1]) for name, proxy in pending
        ]

    def preload(self, module_names: List[str]) -> List[Tuple[str, float]]:
        """Register all ``module_names`` first, then import them in order.

        Returns:
            ``(name, seconds)`` pairs in the order given.
        """
        registered = [(name, self.register(name)) for name in module_names]
        return [
            (name, self._timed_load(name, proxy)[1])
            for name, proxy in registered
        ]

    @property
    def stats(self) -> Dict[str, Any]:
        """Counts of registered/loaded/unloaded modules and load timings."""
        with self._lock:
            total = len(self._proxies)
            loaded = sum(
                1 for proxy in self._proxies.values()
                if proxy._module is not None
            )
            return {
                "registered": total,
                "loaded": loaded,
                "unloaded": total - loaded,
                "load_times": dict(self._load_times),
                "total_load_time": sum(self._load_times.values()),
            }

    def __getitem__(self, module_name: str) -> "_LazyModule":
        """``loader[name]`` is shorthand for ``loader.register(name)``."""
        return self.register(module_name)
105# ============================================================================
106# ModulePreloader
107# ============================================================================
class ModulePreloader:
    """Import frequently used modules ahead of time and keep them in a cache.

    ``_lock`` guards the cache, the recorded timings and the recorded errors.
    Imports run outside the lock.

    Error handling differs by entry point and is kept as callers know it:
    the serial path of ``precompile`` propagates import errors, while the
    parallel path and ``warm_cache`` are best-effort. In every case the last
    failure per module is recorded and exposed through ``stats["errors"]``.
    """

    def __init__(self, max_concurrent: int = 4):
        self._cache: Dict[str, types.ModuleType] = {}
        # At least one worker per batch: a step of 0 makes range() raise and
        # a negative step would silently load nothing.
        self._max_concurrent = max(1, max_concurrent)
        self._lock = threading.Lock()
        self._preload_times: Dict[str, float] = {}
        self._errors: Dict[str, str] = {}

    def _import_and_cache(self, name: str) -> float:
        """Import ``name``, cache it and return the import time in seconds.

        A failure is recorded in ``_errors`` and then re-raised, so each
        caller decides whether it is fatal.
        """
        started = time.perf_counter()
        try:
            module = importlib.import_module(name)
        except Exception as exc:
            with self._lock:
                self._errors[name] = f"{type(exc).__name__}: {exc}"
            raise
        elapsed = time.perf_counter() - started
        with self._lock:
            self._cache[name] = module
            self._errors.pop(name, None)  # a later success clears the error
        return elapsed

    def precompile(self, module_names: List[str], parallel: bool = True) -> Dict[str, float]:
        """Import ``module_names`` and return ``{name: seconds}``.

        With ``parallel`` true and more than one name, imports run in
        threads and modules that fail are left out of the result. Otherwise
        imports run one by one and the first failure is raised.
        """
        if parallel and len(module_names) > 1:
            results = self._precompile_parallel(module_names)
        else:
            results = {
                name: self._import_and_cache(name) for name in module_names
            }

        with self._lock:
            self._preload_times.update(results)
        return results

    def _precompile_parallel(self, module_names: List[str]) -> Dict[str, float]:
        """Import in batches of ``_max_concurrent`` threads (best-effort)."""
        results: Dict[str, float] = {}

        def _worker(name: str) -> None:
            try:
                elapsed = self._import_and_cache(name)
            except Exception:
                # Best-effort: the failure is already recorded in _errors.
                return
            with self._lock:
                results[name] = elapsed

        step = self._max_concurrent
        for offset in range(0, len(module_names), step):
            batch = module_names[offset:offset + step]
            threads = [
                threading.Thread(target=_worker, args=(name,)) for name in batch
            ]
            for thread in threads:
                thread.start()
            # Finish the whole batch before starting the next one.
            for thread in threads:
                thread.join()

        return results

    def get(self, module_name: str) -> Optional[types.ModuleType]:
        """Return the cached module, or ``None`` when it is not cached."""
        with self._lock:
            return self._cache.get(module_name)

    def warm_cache(self, hot_modules: List[str]) -> int:
        """Import any of ``hot_modules`` not yet cached (best-effort).

        Returns:
            The number of modules newly added to the cache.
        """
        added = 0
        for name in hot_modules:
            with self._lock:
                if name in self._cache:
                    continue
            try:
                self._import_and_cache(name)
            except Exception:
                # Best-effort: the failure is already recorded in _errors.
                continue
            added += 1
        return added

    def clear(self) -> None:
        """Drop every cached module reference."""
        with self._lock:
            self._cache.clear()

    @property
    def stats(self) -> Dict[str, Any]:
        """Cache size, recorded preload timings and recorded failures."""
        with self._lock:
            return {
                "cached_modules": len(self._cache),
                "total_preload_time": sum(self._preload_times.values()),
                "module_times": dict(self._preload_times),
                "errors": dict(self._errors),
            }
189# ============================================================================
190# StartupOptimizer
191# ============================================================================
193@dataclass
194class _StartupPhase:
195 name: str
196 start_time: float = 0.0
197 end_time: float = 0.0
198 metadata: Dict[str, Any] = field(default_factory=dict)
200 @property
201 def duration(self) -> float:
202 return self.end_time - self.start_time
class StartupOptimizer:
    """Profile an application startup sequence as a set of named phases.

    Call ``start()``/``end()`` around the whole sequence and
    ``begin_phase()``/``end_phase()`` around each step, then ``report()``.
    ``_lock`` guards the phase table.
    """

    def __init__(self):
        self._phases: OrderedDict[str, _StartupPhase] = OrderedDict()
        self._lock = threading.Lock()
        self._total_start: float = 0.0
        self._total_end: float = 0.0

    def start(self) -> None:
        """Mark the beginning of the startup sequence."""
        self._total_start = time.perf_counter()

    def begin_phase(self, name: str, **metadata) -> None:
        """Begin timing phase ``name``; an existing phase of that name is replaced.

        Extra keyword arguments are stored as phase metadata and copied into
        the phase's entry in ``report()``.
        """
        with self._lock:
            self._phases[name] = _StartupPhase(
                name=name, start_time=time.perf_counter(), metadata=metadata
            )

    def end_phase(self, name: str) -> Optional[float]:
        """Stop timing phase ``name``.

        Returns:
            The phase duration in seconds, or ``None`` if no such phase was
            begun.
        """
        with self._lock:
            phase = self._phases.get(name)
            if phase is None:
                return None
            phase.end_time = time.perf_counter()
            return phase.duration

    def end(self) -> None:
        """Mark the end of the startup sequence."""
        self._total_end = time.perf_counter()

    def report(self) -> Dict[str, Any]:
        """Build a startup performance report.

        Returns:
            A dict with ``total_duration_ms``, ``phase_count``, ``phases``
            (one dict per phase, slowest first) and ``bottleneck`` (name of
            the slowest phase, or ``None`` when there are no phases).
        """
        # Snapshot under the lock so a concurrent begin_phase() cannot change
        # the table while it is being iterated.
        with self._lock:
            snapshot = list(self._phases.items())

        total_ms = self.total_duration_ms()
        phases = []
        for name, phase in snapshot:
            # A phase that never ended has no meaningful duration; report 0
            # rather than a negative number.
            duration_ms = round(max(phase.duration, 0.0) * 1000, 2)
            share = round(duration_ms / total_ms * 100, 1) if total_ms > 0 else 0.0
            # Metadata goes first so the computed fields always win if a
            # caller used one of the same keys.
            phases.append({
                **phase.metadata,
                "name": name,
                "duration_ms": duration_ms,
                "pct_of_total": share,
            })

        phases.sort(key=lambda entry: entry["duration_ms"], reverse=True)
        return {
            "total_duration_ms": total_ms,
            "phase_count": len(phases),
            "phases": phases,
            "bottleneck": phases[0]["name"] if phases else None,
        }

    def total_duration_ms(self) -> float:
        """Milliseconds between ``start()`` and ``end()``, rounded to 2 places.

        Returns 0.0 instead of a negative value when ``end()`` has not been
        called after ``start()``.
        """
        elapsed = max(self._total_end - self._total_start, 0.0)
        return round(elapsed * 1000, 2)
267# ============================================================================
268# Convenience Functions
269# ============================================================================
def create_lazy_loader() -> LazyLoader:
    """Return a new, empty ``LazyLoader`` registry."""
    loader = LazyLoader()
    return loader
def create_module_preloader(max_concurrent: int = 4) -> ModulePreloader:
    """Return a new ``ModulePreloader``.

    Args:
        max_concurrent: passed through unchanged to ``ModulePreloader``.
    """
    preloader = ModulePreloader(max_concurrent=max_concurrent)
    return preloader
def create_startup_optimizer() -> StartupOptimizer:
    """Return a new ``StartupOptimizer`` with no phases recorded."""
    optimizer = StartupOptimizer()
    return optimizer
def quick_start(
    essential_modules: List[str],
    lazy_modules: List[str],
    hot_modules: Optional[List[str]] = None,
) -> Dict[str, Any]:
    """Run a one-shot startup pass and return a summary of it.

    Steps, in order: import ``essential_modules`` through a fresh
    ``ModulePreloader``, register ``lazy_modules`` with a fresh
    ``LazyLoader``, then warm the preloader cache with ``hot_modules`` when
    any are given.

    Returns:
        A dict with the per-module import times of the essentials
        (``essential``), the number of lazy module names (``lazy_count``),
        the preloader cache size (``cached_total``) and the summed essential
        import time in milliseconds (``total_essential_ms``).
    """
    preloader = ModulePreloader()
    loader = LazyLoader()

    # Step 1: essentials are imported right away.
    essential_times = preloader.precompile(essential_modules, parallel=True)

    # Step 2: everything else is only registered, not imported.
    # NOTE(review): `loader` is local and is not part of the returned
    # summary, so these registrations are not reachable by the caller.
    for module_name in lazy_modules:
        loader.register(module_name)

    # Step 3: optional extras go straight into the preloader cache.
    if hot_modules:
        preloader.warm_cache(hot_modules)

    essential_seconds = sum(essential_times.values())
    summary: Dict[str, Any] = {}
    summary["essential"] = essential_times
    summary["lazy_count"] = len(lazy_modules)
    summary["cached_total"] = len(preloader._cache)
    summary["total_essential_ms"] = round(essential_seconds * 1000, 2)
    return summary