Coverage for src / tracekit / plugins / cli.py: 97%

194 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-11 23:04 +0000

1"""Plugin management CLI. 

2 

3This module provides a command-line interface for plugin management, including 

4list, info, enable/disable, install, and validate operations. 

5""" 

6 

7from __future__ import annotations 

8 

9import hashlib 

10import logging 

11import shutil 

12import subprocess 

13import sys 

14import tempfile 

15from pathlib import Path 

16from urllib.parse import urlparse 

17 

18from tracekit.plugins.discovery import discover_plugins, get_plugin_paths 

19from tracekit.plugins.lifecycle import get_lifecycle_manager 

20from tracekit.plugins.registry import get_plugin_registry 

21 

# Module-level logger named after this module; used by PluginInstaller for
# install progress messages and warnings.
logger = logging.getLogger(__name__)

23 

24 

class PluginInstaller:
    """Plugin installer with integrity validation.

    Supports installing plugins from:
    - Repository URLs (git)
    - Archive files (.tar.gz, .zip)

    NOTE(review): earlier documentation also listed "local directories", but
    no method of this class implements that source type.

    References:
        PLUG-007: Plugin CLI - install from repository URL, integrity validation
    """

    # Archive suffixes routed to the archive installer. The same suffix is
    # reused for the downloaded temp file because shutil.unpack_archive
    # selects the unpacker from the file name extension.
    _ARCHIVE_SUFFIXES = (".tar.gz", ".zip")

    def __init__(self, install_dir: Path | None = None) -> None:
        """Initialize installer.

        Args:
            install_dir: Directory to install plugins into. When omitted, the
                first entry returned by ``get_plugin_paths()`` is used, falling
                back to ``~/.tracekit/plugins`` if that list is empty.
        """
        if install_dir is None:
            paths = get_plugin_paths()
            # Use first configured plugin path
            install_dir = paths[0] if paths else Path.home() / ".tracekit" / "plugins"

        self.install_dir = install_dir
        self.install_dir.mkdir(parents=True, exist_ok=True)

    def install_from_url(
        self,
        url: str,
        *,
        checksum: str | None = None,
        checksum_algo: str = "sha256",
    ) -> Path:
        """Install plugin from URL.

        Args:
            url: Plugin repository URL or archive URL
            checksum: Expected checksum for integrity validation
            checksum_algo: Hash algorithm name accepted by ``hashlib.new``
                (e.g. sha256, sha512, md5)

        Returns:
            Path to installed plugin

        Raises:
            ValueError: If checksum verification fails, the plugin name cannot
                be derived from the URL, or the URL type is unsupported.
            RuntimeError: If the clone or download fails.

        References:
            PLUG-007: Plugin CLI - install from repository URL, integrity validation

        Example:
            >>> installer = PluginInstaller()
            >>> path = installer.install_from_url(
            ...     "https://github.com/user/plugin.git",
            ...     checksum="abc123...",
            ... )
        """
        logger.info(f"Installing plugin from URL: {url}")

        parsed = urlparse(url)

        # The archive suffix is tested first: a GitHub-hosted archive download
        # (".../archive/main.zip") also matches the "github.com" host test and
        # would otherwise be handed to `git clone`, which cannot fetch it.
        if parsed.path.endswith(self._ARCHIVE_SUFFIXES):
            return self._install_from_archive(url, checksum, checksum_algo)
        if parsed.path.endswith(".git") or "github.com" in parsed.netloc:
            return self._install_from_git(url, checksum, checksum_algo)
        raise ValueError(f"Unsupported URL type: {url}")

    @staticmethod
    def _plugin_name_from_url(url: str) -> str:
        """Derive the plugin directory name from the last URL path component.

        A trailing ``.git`` is removed; other dots are kept so that a
        repository called ``my.plugin`` is not truncated to ``my``.

        Raises:
            ValueError: If the resulting name is empty, ``.`` or ``..``. Such
                a name would make the install target the install directory
                itself (or its parent), which is deleted before copying.
        """
        name = Path(urlparse(url).path).name
        if name.endswith(".git"):
            name = name[:-4]
        if name in ("", ".", ".."):
            raise ValueError(f"Cannot determine plugin name from URL: {url}")
        return name

    @classmethod
    def _archive_suffix(cls, url: str) -> str:
        """Return the supported archive suffix of *url*'s path.

        Falls back to ``.archive`` (the historical temp-file suffix) when the
        path has no supported suffix; unpacking such a file is then rejected
        as an unknown archive format.
        """
        url_path = urlparse(url).path
        return next(
            (suffix for suffix in cls._ARCHIVE_SUFFIXES if url_path.endswith(suffix)),
            ".archive",
        )

    @staticmethod
    def _checksums_match(actual: str, expected: str) -> bool:
        """Compare a computed hex digest with a user-supplied checksum.

        ``hexdigest()`` is lowercase, so the expected value is lowercased and
        stripped of surrounding whitespace before comparing.
        """
        return actual == expected.strip().lower()

    def _replace_plugin_dir(self, source_dir: Path, target_dir: Path) -> None:
        """Copy *source_dir* to *target_dir*, removing an existing target first."""
        if target_dir.exists():
            logger.warning(f"Removing existing plugin at {target_dir}")
            shutil.rmtree(target_dir)

        shutil.copytree(source_dir, target_dir)

    def _install_from_git(
        self,
        url: str,
        checksum: str | None,
        checksum_algo: str,
    ) -> Path:
        """Install plugin from git repository.

        The repository is cloned into a temporary directory, optionally
        verified, and only then copied over the final location.

        Args:
            url: Git repository URL
            checksum: Expected checksum
            checksum_algo: Hash algorithm

        Returns:
            Path to installed plugin

        Raises:
            RuntimeError: If git clone fails or git cannot be executed.
            ValueError: If checksum verification fails or no usable plugin
                name can be derived from the URL.

        References:
            PLUG-007: Plugin CLI - install from repository URL
        """
        plugin_name = self._plugin_name_from_url(url)
        target_dir = self.install_dir / plugin_name

        # Clone repository to temp directory first
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir) / plugin_name

            try:
                subprocess.run(
                    # "--" ends option parsing so a URL beginning with "-"
                    # cannot be interpreted as a git option.
                    ["git", "clone", "--", url, str(temp_path)],
                    check=True,
                    capture_output=True,
                    text=True,
                )
            except subprocess.CalledProcessError as e:
                raise RuntimeError(f"Git clone failed: {e.stderr}") from e
            except OSError as e:
                # e.g. the git executable is not installed
                raise RuntimeError(f"Git clone failed: {e}") from e

            # Validate checksum if provided
            # NOTE(review): the clone's .git directory is hashed as well;
            # presumably its contents are not reproducible between clones -
            # confirm whether it should be excluded from the digest.
            if checksum:
                actual = self._compute_directory_checksum(temp_path, checksum_algo)
                if not self._checksums_match(actual, checksum):
                    raise ValueError(f"Checksum mismatch: expected {checksum}, got {actual}")

            # Move to final location
            self._replace_plugin_dir(temp_path, target_dir)

        logger.info(f"Installed plugin '{plugin_name}' to {target_dir}")
        return target_dir

    def _install_from_archive(
        self,
        url: str,
        checksum: str | None,
        checksum_algo: str,
    ) -> Path:
        """Install plugin from archive file.

        The archive is downloaded to a temporary directory, optionally
        verified, unpacked, and its single top-level directory is copied to
        the install directory under that directory's name.

        Args:
            url: Archive URL
            checksum: Expected checksum of the archive file
            checksum_algo: Hash algorithm

        Returns:
            Path to installed plugin

        Raises:
            RuntimeError: If download fails.
            ValueError: If checksum verification fails or archive format is invalid.

        References:
            PLUG-007: Plugin CLI - integrity validation
        """
        import urllib.request

        with tempfile.TemporaryDirectory() as temp_dir:
            # Keep the real archive suffix: shutil.unpack_archive picks the
            # format from the file name, so a generic name cannot be unpacked.
            archive_path = Path(temp_dir) / f"plugin{self._archive_suffix(url)}"

            # Download archive
            try:
                urllib.request.urlretrieve(url, archive_path)
            except Exception as e:
                raise RuntimeError(f"Download failed: {e}") from e

            # Validate checksum before touching the archive contents
            if checksum:
                actual = self._compute_file_checksum(archive_path, checksum_algo)
                if not self._checksums_match(actual, checksum):
                    raise ValueError(f"Checksum mismatch: expected {checksum}, got {actual}")

            # Extract archive
            # NOTE(review): no extraction filter is applied; consider the tar
            # "data" filter for untrusted archives on Python versions that
            # support it.
            extract_dir = Path(temp_dir) / "extracted"
            try:
                shutil.unpack_archive(archive_path, extract_dir)
            except shutil.ReadError as e:
                raise ValueError(f"Invalid archive: {e}") from e

            # Find plugin directory (should be single top-level dir)
            plugin_dirs = [d for d in extract_dir.iterdir() if d.is_dir()]
            if len(plugin_dirs) != 1:
                raise ValueError(
                    f"Archive should contain single plugin directory, found {len(plugin_dirs)}"
                )

            plugin_dir = plugin_dirs[0]
            plugin_name = plugin_dir.name

            target_dir = self.install_dir / plugin_name

            # Move to final location
            self._replace_plugin_dir(plugin_dir, target_dir)

        logger.info(f"Installed plugin '{plugin_name}' to {target_dir}")
        return target_dir

    def _compute_file_checksum(self, path: Path, algo: str) -> str:
        """Compute checksum of a file.

        Args:
            path: File path
            algo: Hash algorithm name passed to ``hashlib.new``

        Returns:
            Hexadecimal checksum

        References:
            PLUG-007: Plugin CLI - integrity validation (checksum verification)
        """
        hasher = hashlib.new(algo)

        # Read in fixed-size chunks so large files are never fully in memory
        with open(path, "rb") as f:
            while chunk := f.read(8192):
                hasher.update(chunk)

        return hasher.hexdigest()

    def _compute_directory_checksum(self, path: Path, algo: str) -> str:
        """Compute checksum of a directory (all files).

        Files are visited in sorted path order; for each file the relative
        path string and then the file content are fed into one hasher.

        Args:
            path: Directory path
            algo: Hash algorithm name passed to ``hashlib.new``

        Returns:
            Hexadecimal checksum

        References:
            PLUG-007: Plugin CLI - integrity validation (checksum verification)
        """
        hasher = hashlib.new(algo)

        # Sort files for consistent ordering
        files = sorted(path.rglob("*"))

        for file_path in files:
            if file_path.is_file():
                # Include relative path in hash
                # NOTE(review): str() uses the platform path separator, so the
                # digest presumably differs between POSIX and Windows.
                rel_path = file_path.relative_to(path)
                hasher.update(str(rel_path).encode())

                # Include file content
                with open(file_path, "rb") as f:
                    while chunk := f.read(8192):
                        hasher.update(chunk)

        return hasher.hexdigest()

    def validate_integrity(
        self,
        plugin_path: Path,
        expected_checksum: str,
        algo: str = "sha256",
    ) -> bool:
        """Validate plugin integrity.

        Args:
            plugin_path: Path to plugin; a file is hashed directly, anything
                else is hashed as a directory tree
            expected_checksum: Expected hexadecimal checksum (case-insensitive)
            algo: Hash algorithm

        Returns:
            True if checksum matches

        References:
            PLUG-007: Plugin CLI - integrity validation (checksum verification)
        """
        if plugin_path.is_file():
            actual = self._compute_file_checksum(plugin_path, algo)
        else:
            actual = self._compute_directory_checksum(plugin_path, algo)

        return self._checksums_match(actual, expected_checksum)

296 

297 

def cli_list_plugins() -> None:
    """Print a summary of every discovered plugin (CLI command).

    Discovery is run with ``compatible_only=False``. For each plugin the
    name, version, enabled state, path (if set), API version with its
    compatibility flag, provided capabilities and any load error are printed.

    References:
        PLUG-007: Plugin CLI
    """
    discovered = discover_plugins(compatible_only=False)

    if not discovered:
        print("No plugins found")
        return

    print(f"Found {len(discovered)} plugins:\n")

    # NOTE(review): the reviewed listing had collapsed whitespace; confirm the
    # leading indentation inside the output strings below.
    for entry in discovered:
        meta = entry.metadata
        status = "enabled" if meta.enabled else "disabled"
        compat = "compatible" if entry.compatible else "incompatible"

        print(f" {meta.name} v{meta.version} [{status}]")
        if entry.path:
            print(f" Path: {entry.path}")
        print(f" API: {meta.api_version} ({compat})")

        if meta.provides:
            # Flatten {kind: [names]} into "kind:name" labels
            labels = [
                f"{kind}:{item}"
                for kind, items in meta.provides.items()
                for item in items
            ]
            print(f" Provides: {', '.join(labels)}")

        if entry.load_error:
            print(f" Error: {entry.load_error}")

        print()

331 

332 

def cli_plugin_info(name: str) -> None:
    """Print the registry metadata of a single plugin (CLI command).

    Exits with status 1 when the registry has no metadata for *name*.

    Args:
        name: Plugin name

    References:
        PLUG-007: Plugin CLI
    """
    metadata = get_plugin_registry().get_metadata(name)

    if metadata is None:
        print(f"Plugin '{name}' not found")
        sys.exit(1)

    print(f"Name: {metadata.name}")
    print(f"Version: {metadata.version}")
    print(f"API Version: {metadata.api_version}")

    # Optional fields are shown only when they hold a truthy value
    optional_fields = (
        ("Author", metadata.author),
        ("Description", metadata.description),
        ("Homepage", metadata.homepage),
        ("License", metadata.license),
        ("Path", metadata.path),
    )
    for label, value in optional_fields:
        if value:
            print(f"{label}: {value}")

    state = "enabled" if metadata.enabled else "disabled"
    print(f"Status: {state}")

    # NOTE(review): the reviewed listing had collapsed whitespace; confirm the
    # leading indentation inside the bullet strings below.
    if metadata.dependencies:
        print("\nDependencies:")
        for dep_name, dep_version in metadata.dependencies.items():
            print(f" - {dep_name} {dep_version}")

    if metadata.provides:
        print("\nProvides:")
        for kind, items in metadata.provides.items():
            for item in items:
                print(f" - {kind}: {item}")

380 

381 

def cli_enable_plugin(name: str) -> None:
    """Enable the named plugin through the lifecycle manager (CLI command).

    Prints a confirmation on success. Any exception raised while enabling is
    reported and the process exits with status 1.

    Args:
        name: Plugin name

    References:
        PLUG-007: Plugin CLI
    """
    lifecycle = get_lifecycle_manager()

    try:
        lifecycle.enable_plugin(name)
        print(f"Plugin '{name}' enabled")
    except Exception as exc:
        # Top-level CLI boundary: report the failure and signal it via exit code
        print(f"Failed to enable plugin: {exc}")
        sys.exit(1)

399 

400 

def cli_disable_plugin(name: str) -> None:
    """Disable the named plugin through the lifecycle manager (CLI command).

    Prints a confirmation on success. Any exception raised while disabling is
    reported and the process exits with status 1.

    Args:
        name: Plugin name

    References:
        PLUG-007: Plugin CLI
    """
    lifecycle = get_lifecycle_manager()

    try:
        lifecycle.disable_plugin(name)
        print(f"Plugin '{name}' disabled")
    except Exception as exc:
        # Top-level CLI boundary: report the failure and signal it via exit code
        print(f"Failed to disable plugin: {exc}")
        sys.exit(1)

418 

419 

def cli_validate_plugin(name: str) -> None:
    """Run the validation checks for one discovered plugin (CLI command).

    Checks, in order: the plugin is discoverable by name, its metadata has a
    name, its dependency declaration (reported only), API compatibility, and
    the absence of a load error. Each failing check exits with status 1.

    Args:
        name: Plugin name

    References:
        PLUG-007: Plugin CLI - integrity validation
    """
    # Locate the first discovered plugin whose metadata name matches
    plugin = None
    for candidate in discover_plugins(compatible_only=False):
        if candidate.metadata.name == name:
            plugin = candidate
            break

    if plugin is None:
        print(f"Plugin '{name}' not found")
        sys.exit(1)

    print(f"Validating {name}...")

    # NOTE(review): the reviewed listing had collapsed whitespace; confirm the
    # leading indentation inside the check-result strings below.

    # Metadata
    if not plugin.metadata.name:
        print(" ✗ Missing name")
        sys.exit(1)
    print(" ✓ Metadata valid")

    # Dependencies (informational only)
    declared = plugin.metadata.dependencies
    if not declared:
        print(" ✓ No dependencies")
    else:
        print(f" ✓ Dependencies declared: {len(plugin.metadata.dependencies)}")

    # API compatibility
    if not plugin.compatible:
        print(f" ✗ API version incompatible: {plugin.metadata.api_version}")
        sys.exit(1)
    else:
        print(" ✓ API version compatible")

    # Load errors
    if plugin.load_error:
        print(f" ✗ Load error: {plugin.load_error}")
        sys.exit(1)

    print("\nPlugin is valid")

463 

464 

def cli_install_plugin(
    url: str,
    *,
    checksum: str | None = None,
) -> None:
    """Install a plugin from a URL using a default PluginInstaller (CLI command).

    Prints the install location on success. Any exception raised during the
    installation is reported and the process exits with status 1.

    Args:
        url: Plugin repository or archive URL
        checksum: Expected checksum for validation

    References:
        PLUG-007: Plugin CLI - install from repository URL, integrity validation
    """
    plugin_installer = PluginInstaller()

    try:
        installed_at = plugin_installer.install_from_url(url, checksum=checksum)
        print(f"Successfully installed plugin to {installed_at}")
    except Exception as exc:
        # Top-level CLI boundary: report the failure and signal it via exit code
        print(f"Installation failed: {exc}")
        sys.exit(1)

487 

488 

# Public API of this module: the installer class and the CLI command functions.
__all__ = [
    "PluginInstaller",
    "cli_disable_plugin",
    "cli_enable_plugin",
    "cli_install_plugin",
    "cli_list_plugins",
    "cli_plugin_info",
    "cli_validate_plugin",
]