Coverage for src / tracekit / plugins / cli.py: 97%
194 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-11 23:04 +0000
"""Plugin management CLI.

This module provides a command-line interface for plugin management, including
list, info, enable/disable, install, and validate operations.
"""
7from __future__ import annotations
9import hashlib
10import logging
11import shutil
12import subprocess
13import sys
14import tempfile
15from pathlib import Path
16from urllib.parse import urlparse
18from tracekit.plugins.discovery import discover_plugins, get_plugin_paths
19from tracekit.plugins.lifecycle import get_lifecycle_manager
20from tracekit.plugins.registry import get_plugin_registry
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
class PluginInstaller:
    """Plugin installer with integrity validation.

    Supports installing plugins from:
    - Repository URLs (git)
    - Archive files (.tar.gz, .zip)

    NOTE(review): the requirement also lists local directories as a source,
    but this class exposes no local-directory entry point.

    References:
        PLUG-007: Plugin CLI - install from repository URL, integrity validation
    """

    # Archive suffixes accepted by the installer. The downloaded file keeps
    # the matching suffix so shutil.unpack_archive can detect the format.
    _ARCHIVE_SUFFIXES = (".tar.gz", ".zip")

    def __init__(self, install_dir: Path | None = None) -> None:
        """Initialize installer and make sure the install directory exists.

        Args:
            install_dir: Directory to install plugins into. When None, the
                first entry of ``get_plugin_paths()`` is used, falling back to
                ``~/.tracekit/plugins`` if that list is empty.
        """
        if install_dir is None:
            paths = get_plugin_paths()
            install_dir = paths[0] if paths else Path.home() / ".tracekit" / "plugins"

        # Path() is a no-op for Path inputs and also tolerates plain strings.
        self.install_dir = Path(install_dir)
        self.install_dir.mkdir(parents=True, exist_ok=True)

    def install_from_url(
        self,
        url: str,
        *,
        checksum: str | None = None,
        checksum_algo: str = "sha256",
    ) -> Path:
        """Install plugin from URL.

        Routing rules, in order:

        1. path ends with ``.git``            -> git clone
        2. path ends with ``.tar.gz``/``.zip`` -> archive download
        3. host contains ``github.com``        -> git clone

        The archive check runs before the github host check so that GitHub
        archive downloads are not handed to ``git clone``.

        Args:
            url: Plugin repository URL or archive URL
            checksum: Expected checksum for integrity validation
            checksum_algo: Hash algorithm name accepted by ``hashlib.new``
                (e.g. sha256, sha512, md5)

        Returns:
            Path to installed plugin

        Raises:
            ValueError: If checksum verification fails, the URL type is
                unsupported, or no plugin name can be derived from the URL.

        References:
            PLUG-007: Plugin CLI - install from repository URL, integrity validation

        Example:
            >>> installer = PluginInstaller()
            >>> path = installer.install_from_url(
            ...     "https://github.com/user/plugin.git",
            ...     checksum="abc123...",
            ... )
        """
        logger.info(f"Installing plugin from URL: {url}")

        parsed = urlparse(url)

        if parsed.path.endswith(".git"):
            return self._install_from_git(url, checksum, checksum_algo)
        if parsed.path.endswith(self._ARCHIVE_SUFFIXES):
            return self._install_from_archive(url, checksum, checksum_algo)
        if "github.com" in parsed.netloc:
            return self._install_from_git(url, checksum, checksum_algo)
        raise ValueError(f"Unsupported URL type: {url}")

    @staticmethod
    def _plugin_name_from_url(url: str) -> str:
        """Derive the plugin directory name from a repository URL.

        Takes the last path component and strips one trailing ``.git``.
        Other dots are preserved, so ``my.plugin`` stays ``my.plugin``.

        Raises:
            ValueError: If the URL yields an empty or relative name. Such a
                name would make the install target the install directory
                itself (or its parent), which is later removed with rmtree.
        """
        name = Path(urlparse(url).path.rstrip("/")).name
        if name.endswith(".git"):
            name = name[:-4]
        if not name or name in {".", ".."}:
            raise ValueError(f"Cannot determine plugin name from URL: {url}")
        return name

    @classmethod
    def _archive_suffix(cls, url: str) -> str:
        """Return the supported archive suffix that the URL path ends with.

        Raises:
            ValueError: If the URL path does not end with a supported suffix.
        """
        path = urlparse(url).path
        for suffix in cls._ARCHIVE_SUFFIXES:
            if path.endswith(suffix):
                return suffix
        raise ValueError(f"Unsupported archive format: {url}")

    @staticmethod
    def _checksums_match(actual: str, expected: str) -> bool:
        """Compare two hex digests, ignoring case and surrounding whitespace."""
        return actual.strip().lower() == expected.strip().lower()

    @staticmethod
    def _feed_file(hasher, path: Path) -> None:
        """Stream the content of ``path`` into ``hasher`` in 8 KiB chunks."""
        with open(path, "rb") as f:
            while chunk := f.read(8192):
                hasher.update(chunk)

    def _install_from_git(
        self,
        url: str,
        checksum: str | None,
        checksum_algo: str,
    ) -> Path:
        """Install plugin from git repository.

        The repository is cloned into a temporary directory, optionally
        verified, and then copied into ``install_dir``. An existing plugin
        directory with the same name is replaced.

        Args:
            url: Git repository URL
            checksum: Expected checksum (skipped when None or empty)
            checksum_algo: Hash algorithm

        Returns:
            Path to installed plugin

        Raises:
            RuntimeError: If git clone fails.
            ValueError: If checksum verification fails or no plugin name can
                be derived from the URL.

        References:
            PLUG-007: Plugin CLI - install from repository URL
        """
        plugin_name = self._plugin_name_from_url(url)
        target_dir = self.install_dir / plugin_name

        # Clone repository to temp directory first
        with tempfile.TemporaryDirectory() as temp_dir:
            temp_path = Path(temp_dir) / plugin_name

            try:
                subprocess.run(
                    # "--" ends option parsing so a URL beginning with "-"
                    # cannot be interpreted as a git option.
                    ["git", "clone", "--", url, str(temp_path)],
                    check=True,
                    capture_output=True,
                    text=True,
                )
            except subprocess.CalledProcessError as e:
                raise RuntimeError(f"Git clone failed: {e.stderr}") from e

            # Validate checksum if provided.
            # NOTE(review): the hash covers everything under the clone,
            # including .git/ metadata, which is probably not reproducible
            # between clones - confirm whether .git should be excluded.
            if checksum:
                actual = self._compute_directory_checksum(temp_path, checksum_algo)
                if not self._checksums_match(actual, checksum):
                    raise ValueError(f"Checksum mismatch: expected {checksum}, got {actual}")

            # Move to final location
            if target_dir.exists():
                logger.warning(f"Removing existing plugin at {target_dir}")
                shutil.rmtree(target_dir)

            shutil.copytree(temp_path, target_dir)

        logger.info(f"Installed plugin '{plugin_name}' to {target_dir}")
        return target_dir

    def _install_from_archive(
        self,
        url: str,
        checksum: str | None,
        checksum_algo: str,
    ) -> Path:
        """Install plugin from archive file.

        The archive is downloaded into a temporary directory, optionally
        verified, extracted, and its single top-level directory is copied
        into ``install_dir`` under that directory's name. An existing plugin
        directory with the same name is replaced.

        Args:
            url: Archive URL (path must end with .tar.gz or .zip)
            checksum: Expected checksum (skipped when None or empty)
            checksum_algo: Hash algorithm

        Returns:
            Path to installed plugin

        Raises:
            RuntimeError: If download fails.
            ValueError: If checksum verification fails or archive format is invalid.

        References:
            PLUG-007: Plugin CLI - integrity validation
        """
        import urllib.request

        suffix = self._archive_suffix(url)

        # Download archive
        with tempfile.TemporaryDirectory() as temp_dir:
            # shutil.unpack_archive picks the format from the file extension,
            # so the temporary file must keep the real archive suffix.
            archive_path = Path(temp_dir) / f"plugin{suffix}"

            try:
                urllib.request.urlretrieve(url, archive_path)
            except Exception as e:
                raise RuntimeError(f"Download failed: {e}") from e

            # Validate checksum
            if checksum:
                actual = self._compute_file_checksum(archive_path, checksum_algo)
                if not self._checksums_match(actual, checksum):
                    raise ValueError(f"Checksum mismatch: expected {checksum}, got {actual}")

            # Extract archive.
            # NOTE(review): members are extracted without an explicit
            # extraction filter although the archive comes from a remote URL;
            # consider filter="data" on Python versions that support it.
            extract_dir = Path(temp_dir) / "extracted"
            shutil.unpack_archive(archive_path, extract_dir)

            # Find plugin directory (should be single top-level dir)
            plugin_dirs = [d for d in extract_dir.iterdir() if d.is_dir()]
            if len(plugin_dirs) != 1:
                raise ValueError(
                    f"Archive should contain single plugin directory, found {len(plugin_dirs)}"
                )

            plugin_dir = plugin_dirs[0]
            plugin_name = plugin_dir.name

            target_dir = self.install_dir / plugin_name

            # Move to final location
            if target_dir.exists():
                logger.warning(f"Removing existing plugin at {target_dir}")
                shutil.rmtree(target_dir)

            shutil.copytree(plugin_dir, target_dir)

        logger.info(f"Installed plugin '{plugin_name}' to {target_dir}")
        return target_dir

    def _compute_file_checksum(self, path: Path, algo: str) -> str:
        """Compute checksum of a file.

        Args:
            path: File path
            algo: Hash algorithm name passed to ``hashlib.new``

        Returns:
            Hexadecimal checksum

        References:
            PLUG-007: Plugin CLI - integrity validation (checksum verification)
        """
        hasher = hashlib.new(algo)
        self._feed_file(hasher, path)
        return hasher.hexdigest()

    def _compute_directory_checksum(self, path: Path, algo: str) -> str:
        """Compute checksum of a directory (all files).

        Every regular file below ``path`` contributes its relative path
        followed by its content, visited in sorted path order.

        Args:
            path: Directory path
            algo: Hash algorithm name passed to ``hashlib.new``

        Returns:
            Hexadecimal checksum

        References:
            PLUG-007: Plugin CLI - integrity validation (checksum verification)
        """
        hasher = hashlib.new(algo)

        # Sort files for consistent ordering
        for file_path in sorted(path.rglob("*")):
            if not file_path.is_file():
                continue

            # Hash the relative path with forward slashes so the digest does
            # not depend on the platform's path separator.
            rel_path = file_path.relative_to(path)
            hasher.update(rel_path.as_posix().encode())

            # Include file content
            self._feed_file(hasher, file_path)

        return hasher.hexdigest()

    def validate_integrity(
        self,
        plugin_path: Path,
        expected_checksum: str,
        algo: str = "sha256",
    ) -> bool:
        """Validate plugin integrity.

        Uses the file checksum when ``plugin_path`` is a file and the
        directory checksum otherwise. The comparison ignores hex-digit case
        and surrounding whitespace.

        Args:
            plugin_path: Path to plugin
            expected_checksum: Expected checksum
            algo: Hash algorithm

        Returns:
            True if checksum matches

        References:
            PLUG-007: Plugin CLI - integrity validation (checksum verification)
        """
        if plugin_path.is_file():
            actual = self._compute_file_checksum(plugin_path, algo)
        else:
            actual = self._compute_directory_checksum(plugin_path, algo)

        return self._checksums_match(actual, expected_checksum)
def cli_list_plugins() -> None:
    """Print a summary of every discovered plugin (CLI command).

    Discovery is run with ``compatible_only=False``, so incompatible plugins
    are listed as well. For each plugin the name, version, enabled state,
    optional path, API version with compatibility flag, provided items and
    any load error are printed, followed by a blank line.

    References:
        PLUG-007: Plugin CLI
    """
    found = discover_plugins(compatible_only=False)

    if not found:
        print("No plugins found")
        return

    print(f"Found {len(found)} plugins:\n")

    for entry in found:
        meta = entry.metadata
        state = "enabled" if meta.enabled else "disabled"
        compat_label = "compatible" if entry.compatible else "incompatible"

        print(f" {meta.name} v{meta.version} [{state}]")
        if entry.path:
            print(f" Path: {entry.path}")
        print(f" API: {meta.api_version} ({compat_label})")

        if meta.provides:
            # Flatten {kind: [names]} into "kind:name" strings.
            offered: list[str] = [
                f"{kind}:{item}" for kind, items in meta.provides.items() for item in items
            ]
            print(f" Provides: {', '.join(offered)}")

        if entry.load_error:
            print(f" Error: {entry.load_error}")

        print()
def cli_plugin_info(name: str) -> None:
    """Print detailed information about one plugin (CLI command).

    Looks the plugin up in the plugin registry. If no metadata is found, a
    message is printed and the process exits with status 1. Optional fields
    (author, description, homepage, license, path) are printed only when
    they are truthy.

    Args:
        name: Plugin name

    References:
        PLUG-007: Plugin CLI
    """
    info = get_plugin_registry().get_metadata(name)

    if info is None:
        print(f"Plugin '{name}' not found")
        sys.exit(1)

    print(f"Name: {info.name}")
    print(f"Version: {info.version}")
    print(f"API Version: {info.api_version}")

    # (label, attribute) pairs, in display order.
    optional_fields = (
        ("Author", "author"),
        ("Description", "description"),
        ("Homepage", "homepage"),
        ("License", "license"),
        ("Path", "path"),
    )
    for label, attr in optional_fields:
        value = getattr(info, attr)
        if value:
            print(f"{label}: {value}")

    state = "enabled" if info.enabled else "disabled"
    print(f"Status: {state}")

    if info.dependencies:
        print("\nDependencies:")
        for dep_name, dep_version in info.dependencies.items():
            print(f" - {dep_name} {dep_version}")

    if info.provides:
        print("\nProvides:")
        for kind, items in info.provides.items():
            for item in items:
                print(f" - {kind}: {item}")
def cli_enable_plugin(name: str) -> None:
    """Enable a plugin through the lifecycle manager (CLI command).

    Prints a confirmation on success. If enabling raises, the error is
    printed and the process exits with status 1.

    Args:
        name: Plugin name

    References:
        PLUG-007: Plugin CLI
    """
    lifecycle = get_lifecycle_manager()

    try:
        lifecycle.enable_plugin(name)
        print(f"Plugin '{name}' enabled")
    except Exception as exc:
        print(f"Failed to enable plugin: {exc}")
        sys.exit(1)
def cli_disable_plugin(name: str) -> None:
    """Disable a plugin through the lifecycle manager (CLI command).

    Prints a confirmation on success. If disabling raises, the error is
    printed and the process exits with status 1.

    Args:
        name: Plugin name

    References:
        PLUG-007: Plugin CLI
    """
    lifecycle = get_lifecycle_manager()

    try:
        lifecycle.disable_plugin(name)
        print(f"Plugin '{name}' disabled")
    except Exception as exc:
        print(f"Failed to disable plugin: {exc}")
        sys.exit(1)
def cli_validate_plugin(name: str) -> None:
    """Run basic validation checks on a discovered plugin (CLI command).

    The plugin is looked up by metadata name among all discovered plugins
    (compatible or not). Checks run in order: metadata name present,
    dependencies reported, API compatibility, load error. The first failed
    check prints a message and exits with status 1.

    Args:
        name: Plugin name

    References:
        PLUG-007: Plugin CLI - integrity validation
    """
    target = None
    for candidate in discover_plugins(compatible_only=False):
        if candidate.metadata.name == name:
            target = candidate
            break

    if target is None:
        print(f"Plugin '{name}' not found")
        sys.exit(1)

    print(f"Validating {name}...")

    # Metadata check
    if not target.metadata.name:
        print(" ✗ Missing name")
        sys.exit(1)
    print(" ✓ Metadata valid")

    # Dependency report (informational, never fails)
    declared = target.metadata.dependencies
    if declared:
        print(f" ✓ Dependencies declared: {len(declared)}")
    else:
        print(" ✓ No dependencies")

    # API compatibility check
    if not target.compatible:
        print(f" ✗ API version incompatible: {target.metadata.api_version}")
        sys.exit(1)
    print(" ✓ API version compatible")

    # Load error check
    if target.load_error:
        print(f" ✗ Load error: {target.load_error}")
        sys.exit(1)

    print("\nPlugin is valid")
def cli_install_plugin(
    url: str,
    *,
    checksum: str | None = None,
    checksum_algo: str = "sha256",
) -> None:
    """Install a plugin from URL (CLI command).

    Creates a ``PluginInstaller`` with its default install directory and
    delegates to ``install_from_url``. On any failure the error is printed
    and the process exits with status 1.

    Args:
        url: Plugin repository or archive URL
        checksum: Expected checksum for validation
        checksum_algo: Hash algorithm used for ``checksum``. Defaults to
            "sha256", which matches the installer's own default, so existing
            callers are unaffected.

    References:
        PLUG-007: Plugin CLI - install from repository URL, integrity validation
    """
    installer = PluginInstaller()

    try:
        path = installer.install_from_url(
            url,
            checksum=checksum,
            checksum_algo=checksum_algo,
        )
        print(f"Successfully installed plugin to {path}")
    except Exception as e:
        print(f"Installation failed: {e}")
        sys.exit(1)
# Public API of this module: the installer class and the CLI command functions.
__all__ = [
    "PluginInstaller",
    "cli_disable_plugin",
    "cli_enable_plugin",
    "cli_install_plugin",
    "cli_list_plugins",
    "cli_plugin_info",
    "cli_validate_plugin",
]