Coverage for src/alprina_cli/scanner.py: 8%

409 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:15 +0100

1""" 

2Scanner module for Alprina CLI. 

3Handles remote and local security scanning using Alprina security agents. 

4""" 

5 

6from pathlib import Path 

7from typing import Optional 

8import httpx 

9import os 

10from rich.console import Console 

11from rich.progress import Progress, SpinnerColumn, TextColumn 

12from rich.panel import Panel 

13from rich.table import Table 

14 

15from .auth import is_authenticated, get_auth_headers, get_backend_url 

16from .policy import validate_target 

17from .security_engine import run_remote_scan, run_local_scan 

18from .reporting import write_event 

19from .report_generator import generate_security_reports 

20from .services.cve_service import enrich_findings 

21from .services.container_scanner import get_container_scanner 

22 

23console = Console() 

24 

25 

def scan_command(
    target: str,
    profile: str = "default",
    safe_only: bool = True,
    output: Optional[Path] = None,
    quick: bool = False,
    container: bool = False,
    agent: Optional[list[str]] = None,
    verbose: bool = False,
    # Week 4: Unified scanner parameters
    all_analyzers: bool = False,
    symbolic: bool = False,
    mev: bool = False,
    cross_contract: bool = False,
    gas: bool = False,  # Week 4 Day 3
    tvl: Optional[float] = None,
    protocol_type: Optional[str] = None,
    output_format: str = "json"
):
    """
    Execute a security scan on a target (remote, local, or container).

    Modes are checked in a fixed order and the first match handles the
    request and returns: unified smart-contract scanner (any of
    ``all_analyzers``/``symbolic``/``mev``/``cross_contract``/``gas``),
    container scan, quick scan, and finally the standard scan. The
    standard scan treats ``target`` as local when it exists on disk and
    as remote otherwise; remote scans require authentication.

    Any exception raised during the standard scan is caught and printed
    as "Scan failed"; nothing is re-raised.

    Args:
        target: Target to scan (URL, IP, local path, or Docker image)
        profile: Scan profile to use
        safe_only: Only run safe, non-intrusive scans
        output: Output file path
        quick: Run quick 5-second scan for critical issues only
        container: Scan as Docker container image
        agent: Specific agent(s) to use
        verbose: Show detailed output
        all_analyzers: Run all security analyzers (Week 4)
        symbolic: Run symbolic execution (Week 4)
        mev: Run MEV detection (Week 4)
        cross_contract: Run cross-contract analysis (Week 4)
        gas: Run gas optimization analysis (Week 4 Day 3)
        tvl: Protocol TVL for economic impact (Week 4)
        protocol_type: Protocol type (Week 4)
        output_format: Output format (Week 4)
    """
    # Week 4: Smart contract unified scanner mode
    if all_analyzers or symbolic or mev or cross_contract or gas:
        _run_unified_scanner(
            target, all_analyzers, symbolic, mev, cross_contract, gas,
            tvl, protocol_type, output, output_format, verbose
        )
        return

    # Container scan mode
    if container:
        _run_container_scan(target, output)
        return

    # Quick scan mode (the helper performs its own import of quick_scan)
    if quick:
        _run_quick_scan(target)
        return

    # Check if target is local or remote first
    is_local = Path(target).exists()

    # Read the auth state once so every decision below sees the same answer.
    authenticated = is_authenticated()

    # Only require auth for remote scans
    if not is_local and not authenticated:
        console.print(Panel(
            "[bold red]🔒 Authentication Required[/bold red]\n\n"
            "To use Alprina CLI, you need to authenticate first.\n\n"
            "[bold cyan]Quick Start:[/bold cyan]\n"
            " 1. Run: [bold]alprina auth login[/bold]\n"
            " 2. Browser opens → Sign in with GitHub\n"
            " 3. Authorize device → Done!\n\n"
            "[dim]Don't have an account? Visit:[/dim]\n"
            "[bold cyan]https://www.alprina.com[/bold cyan]\n\n"
            "[yellow]💡 Tip:[/yellow] Local scans work without authentication!",
            title="Welcome to Alprina CLI",
            border_style="red"
        ))
        return

    # Past the guard above, an unauthenticated run is necessarily local.
    if not authenticated:
        console.print("[yellow]⚠️ Running in offline mode (not authenticated)[/yellow]")

    console.print(Panel(
        f"🔍 Starting scan on: [bold]{target}[/bold]\n"
        f"Profile: [cyan]{profile}[/cyan]\n"
        f"Mode: {'[green]Safe only[/green]' if safe_only else '[yellow]Full scan[/yellow]'}",
        title="Alprina Security Scan"
    ))

    scan_mode = "local" if is_local else "remote"
    scan_id = None
    try:
        # Create scan entry in database (if authenticated)
        if authenticated:
            scan_id = _create_scan_entry(target, scan_mode, profile)
            if scan_id:
                console.print(f"[dim]Scan ID: {scan_id}[/dim]")

        # Execute scan with specific agents if requested
        if agent:
            # NOTE(review): this path does not call validate_target(), even
            # for remote targets — confirm that skipping the policy check
            # is intended when agents are selected explicitly.
            console.print(f"[cyan]→[/cyan] Using specific agents: {', '.join(agent)}")
            results = _scan_with_agents(target, agent, verbose)
        elif is_local:
            console.print(f"[cyan]→[/cyan] Detected local target: {target}")
            results = _scan_local(target, profile, safe_only)
        else:
            console.print(f"[cyan]→[/cyan] Detected remote target: {target}")
            validate_target(target)  # Check against policy
            results = _scan_remote(target, profile, safe_only)

        # Enrich findings with CVE/CWE/CVSS data
        if results.get("findings"):
            console.print("[dim]→ Enriching findings with CVE/CWE/CVSS data...[/dim]")
            results["findings"] = enrich_findings(results["findings"])

        # Save results to database (if authenticated and scan was created)
        if authenticated and scan_id:
            # NOTE(review): _save_scan_results reports its own failures but
            # returns nothing, so the confirmation below is printed even
            # when the upload did not succeed.
            _save_scan_results(scan_id, results)
            console.print("[dim]✓ Scan saved to your account[/dim]")

        # Log the scan event locally
        write_event({
            "type": "scan",
            "target": target,
            "profile": profile,
            "mode": scan_mode,
            "safe_only": safe_only,
            "findings_count": len(results.get("findings", []))
        })

        # Display results
        _display_results(results)

        # Generate markdown security reports in .alprina/ folder
        if is_local and results.get("findings", []):
            try:
                report_dir = generate_security_reports(results, target)
                console.print(f"\n[green]✓[/green] Security reports generated in: [cyan]{report_dir}[/cyan]")
                console.print("[dim]Files created:[/dim]")
                console.print("[dim] • SECURITY-REPORT.md - Full vulnerability analysis[/dim]")
                console.print("[dim] • FINDINGS.md - Detailed findings with code context[/dim]")
                console.print("[dim] • REMEDIATION.md - Step-by-step fix instructions[/dim]")
                console.print("[dim] • EXECUTIVE-SUMMARY.md - Non-technical overview[/dim]")
            except Exception as report_error:
                # Report generation is best-effort; the scan itself succeeded.
                console.print(f"[yellow]⚠️ Could not generate reports: {report_error}[/yellow]")

        if output:
            _save_results(results, output)

    except Exception as e:
        console.print(f"[red]Scan failed: {e}[/red]")

177 

178 

def _scan_local(target: str, profile: str, safe_only: bool) -> dict:
    """Run the local file/directory scan behind a spinner and return its results."""
    spinner = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with spinner:
        task_id = spinner.add_task("Scanning local files...", total=None)
        scan_output = run_local_scan(target, profile, safe_only)
        spinner.update(task_id, completed=True)
    return scan_output

193 

194 

def _scan_remote(target: str, profile: str, safe_only: bool) -> dict:
    """Run the remote target scan behind a spinner and return its results."""
    spinner = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with spinner:
        task_id = spinner.add_task("Scanning remote target...", total=None)
        scan_output = run_remote_scan(target, profile, safe_only)
        spinner.update(task_id, completed=True)
    return scan_output

209 

210 

def _run_quick_scan(target: str):
    """Announce, run, and display a quick scan of *target*."""
    from .quick_scanner import quick_scan

    banner_text = (
        f"⚡ Quick Health Check on: [bold]{target}[/bold]\n"
        "Scanning for top 10 critical patterns...\n"
        "[dim]This takes ~5 seconds[/dim]"
    )
    console.print(Panel(banner_text, title="Alprina Quick Scan", style="cyan"))

    spinner = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with spinner:
        task_id = spinner.add_task("Scanning files...", total=None)
        quick_results = quick_scan(target)
        spinner.update(task_id, completed=True)

    _display_quick_results(quick_results)

233 

234 

def _display_quick_results(results: dict):
    """
    Print the outcome of a quick scan.

    Reads ``duration_ms``, ``summary`` (``total_files_scanned`` and
    ``critical``) and, when critical issues exist, ``findings`` from
    *results*. At most the first five findings are tabulated.
    """
    seconds = results['duration_ms'] / 1000
    console.print(f"\n⚡ Quick scan completed in [bold cyan]{seconds:.1f}s[/bold cyan]")

    summary = results['summary']
    console.print(f" Scanned [bold]{summary['total_files_scanned']}[/bold] files")

    critical_count = summary['critical']

    # Clean bill of health: suggest the full scan and stop.
    if critical_count == 0:
        console.print("\n✅ [bold green]No critical issues found![/bold green]")
        console.print("\n💡 [dim]Run full scan for comprehensive analysis:[/dim]")
        console.print(" [bold cyan]alprina scan ./ [/bold cyan]")
        return

    plural = "" if critical_count == 1 else "s"
    console.print(f"\n🚨 [bold red]Found {critical_count} critical issue{plural}[/bold red]")

    # Show first 5 findings
    issues_table = Table(show_header=True, header_style="bold magenta")
    issues_table.add_column("Issue", style="red", width=30)
    issues_table.add_column("File", style="cyan", width=25)
    issues_table.add_column("Line", justify="right", style="yellow", width=6)

    all_findings = results['findings']
    for entry in all_findings[:5]:
        issues_table.add_row(
            entry['title'],
            Path(entry['file']).name,
            str(entry['line']),
        )

    console.print(issues_table)

    hidden = len(all_findings) - 5
    if hidden > 0:
        console.print(f"\n[dim]+ {hidden} more issues...[/dim]")

    console.print("\n⚠️ [yellow]Quick scan only checks critical patterns[/yellow]")
    console.print(" Run full scan to find all vulnerabilities:")
    console.print(" [bold cyan]alprina scan ./[/bold cyan]")

273 

274 

def _format_cvss(score) -> Optional[str]:
    """Return *score* formatted to one decimal place, or None when it is missing or not numeric."""
    if not score:
        return None
    try:
        return f"{float(score):.1f}"
    except (TypeError, ValueError):
        # Malformed score (e.g. a free-text value): treat as unavailable
        # rather than aborting the whole results display.
        return None


def _display_results(results: dict):
    """
    Display scan results in a formatted table with CVE/CWE/CVSS data.

    Prints a summary table of every finding in ``results["findings"]``
    followed by a detailed breakdown of the first three. Malformed
    optional fields (non-numeric CVSS score, non-string CWE, references
    lacking a name or URL) are rendered as unavailable instead of raising,
    so a display problem cannot mask an otherwise successful scan.

    Args:
        results: Scan result mapping; only the ``findings`` key is read.
    """
    findings = results.get("findings", [])

    if not findings:
        console.print("\n[green]✓ No security issues found![/green]")
        return

    console.print(f"\n[yellow]⚠ Found {len(findings)} issues[/yellow]\n")

    table = Table(title="Security Findings", show_header=True, header_style="bold cyan")
    table.add_column("Severity", style="bold", width=10)
    table.add_column("Type", width=25)
    table.add_column("CVSS", justify="right", width=6)
    table.add_column("CWE", width=12)
    table.add_column("Description", width=40)
    table.add_column("Location", width=25)

    severity_colors = {
        "CRITICAL": "bold red",
        "HIGH": "red",
        "MEDIUM": "yellow",
        "LOW": "blue",
        "INFO": "dim"
    }

    for finding in findings:
        severity = finding.get("severity", "INFO")
        color = severity_colors.get(severity, "white")

        cvss_str = _format_cvss(finding.get("cvss_score")) or "N/A"

        # Show only the numeric part of an identifier such as "CWE-79".
        cwe = str(finding.get("cwe") or "")
        cwe_num = cwe.split("-")[1] if "-" in cwe else ""

        table.add_row(
            f"[{color}]{severity}[/{color}]",
            finding.get("type", "Unknown"),
            f"[{color}]{cvss_str}[/{color}]",
            f"[cyan]{cwe_num}[/cyan]" if cwe_num else "[dim]N/A[/dim]",
            finding.get("description", "N/A"),
            finding.get("location", "N/A")
        )

    console.print(table)

    # Show enhanced details for top 3 findings
    console.print("\n[bold cyan]📋 Detailed Analysis (Top 3)[/bold cyan]\n")

    for i, finding in enumerate(findings[:3], 1):
        severity = finding.get("severity", "INFO")
        color = severity_colors.get(severity, "white")

        console.print(f"[bold]{i}. [{color}]{severity}[/{color}]: {finding.get('type', 'Issue')}[/bold]")
        console.print(f" 📍 {finding.get('location', 'N/A')}")

        cvss_text = _format_cvss(finding.get("cvss_score"))
        if cvss_text:
            console.print(f" 📊 CVSS: {cvss_text}/10.0 ({finding.get('cvss_severity', 'N/A')})")

        if finding.get("cwe"):
            cwe_name = finding.get("cwe_name", finding["cwe"])
            console.print(f" 🔖 {finding['cwe']}: {cwe_name}")

        if finding.get("owasp"):
            console.print(f" ⚡ OWASP: {finding['owasp']}")

        references = finding.get("references")
        if references and isinstance(references, (list, tuple)):
            console.print(" 🔗 References:")
            for ref in references[:3]:
                # A reference is normally {"name": ..., "url": ...}; tolerate
                # a bare URL or a mapping with a missing field.
                if isinstance(ref, dict):
                    ref_name, ref_url = ref.get("name"), ref.get("url")
                else:
                    ref_name, ref_url = None, ref
                if not ref_url:
                    continue
                label = f"{ref_name}: " if ref_name else ""
                console.print(f"{label}[link={ref_url}]{ref_url}[/link]")

        console.print()

350 

351 

def _save_results(results: dict, output: Path):
    """
    Save scan results to *output* as indented JSON.

    The results are serialized to a string before the file is opened, so
    a serialization failure (for example a non-JSON-serializable value)
    raises without creating or truncating the output file.

    Args:
        results: JSON-serializable scan results.
        output: Destination file; missing parent directories are created.

    Raises:
        TypeError: If *results* contains a value ``json`` cannot encode.
        OSError: If the directory or file cannot be written.
    """
    import json

    payload = json.dumps(results, indent=2)

    output.parent.mkdir(parents=True, exist_ok=True)
    output.write_text(payload, encoding="utf-8")

    console.print(f"\n[green]✓[/green] Results saved to: {output}")

362 

363 

def recon_command(target: str, passive: bool = True):
    """
    Perform reconnaissance on a target.

    Requires authentication. The target is checked with
    ``validate_target`` and then handed to the "web-recon" agent; the
    event is logged and the findings displayed. Any exception is caught
    and reported as "Recon failed".

    Args:
        target: Target to investigate.
        passive: Forwarded to the agent as ``metadata["passive"]``.
    """
    if not is_authenticated():
        console.print("[red]Please login first: alprina auth login[/red]")
        return

    mode_label = '[green]Passive[/green]' if passive else '[yellow]Active[/yellow]'
    console.print(Panel(
        f"🕵️ Reconnaissance: [bold]{target}[/bold]\n"
        f"Mode: {mode_label}",
        title="Alprina Recon",
    ))

    try:
        validate_target(target)

        spinner = Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        )
        with spinner:
            task_id = spinner.add_task("Gathering intelligence...", total=None)

            # Use Alprina security agent for reconnaissance
            from .security_engine import run_agent

            recon_results = run_agent(
                task="web-recon",
                input_data=target,
                metadata={"passive": passive},
            )

            spinner.update(task_id, completed=True)

        # Log event
        event = {
            "type": "recon",
            "target": target,
            "passive": passive,
            "findings_count": len(recon_results.get("findings", [])),
        }
        write_event(event)

        console.print("\n[green]✓ Reconnaissance complete[/green]")
        _display_results(recon_results)

    except Exception as exc:
        console.print(f"[red]Recon failed: {exc}[/red]")

412 

413 

def _create_scan_entry(target: str, scan_type: str, profile: str) -> Optional[str]:
    """
    Register a scan with the backend before it runs.

    POSTs to ``{backend_url}/scans`` and returns the ``scan_id`` from the
    response on HTTP 201. Any other status, or any exception, prints a
    warning and yields None.
    """
    try:
        auth_headers = get_auth_headers()
        base_url = get_backend_url()

        payload = {
            "target": target,
            "scan_type": scan_type,
            "profile": profile,
        }
        reply = httpx.post(
            f"{base_url}/scans",
            headers=auth_headers,
            json=payload,
            timeout=10.0,
        )

        if reply.status_code != 201:
            console.print(f"[yellow]⚠️ Could not create scan entry: {reply.status_code}[/yellow]")
            return None

        return reply.json().get("scan_id")

    except Exception as exc:
        console.print(f"[yellow]⚠️ Could not create scan entry: {exc}[/yellow]")
        return None

441 

442 

def _save_scan_results(scan_id: str, results: dict):
    """
    Upload finished scan results to the backend.

    PATCHes ``{backend_url}/scans/{scan_id}`` with the results. A non-200
    status or any exception is reported as a warning; nothing is raised
    or returned.
    """
    try:
        auth_headers = get_auth_headers()
        base_url = get_backend_url()

        reply = httpx.patch(
            f"{base_url}/scans/{scan_id}",
            headers=auth_headers,
            json={"results": results},
            timeout=30.0,
        )

        upload_ok = reply.status_code == 200
        if not upload_ok:
            console.print(f"[yellow]⚠️ Could not save scan results: {reply.status_code}[/yellow]")

    except Exception as exc:
        console.print(f"[yellow]⚠️ Could not save scan results: {exc}[/yellow]")

461 

462 

def _scan_with_agents(target: str, agents: list[str], verbose: bool = False) -> dict:
    """
    Run each named agent against *target* and merge what they report.

    Agents that cannot be loaded are skipped with a warning, and an agent
    that raises is reported and skipped. The returned mapping carries the
    raw per-agent results under ``agent_results`` and, under ``findings``,
    the ``vulnerabilities`` of every result whose ``status`` is
    ``"success"``.
    """
    from .utils.agent_loader import get_local_agent

    agent_list = ', '.join(agents)
    console.print(Panel(
        "🔧 Agent-Specific Security Scan\n\n"
        f"Target: [bold]{target}[/bold]\n"
        f"Agents: [cyan]{agent_list}[/cyan]",
        title="Alprina Agent Scan",
        style="cyan",
    ))

    per_agent = []

    for name in agents:
        try:
            # Get the agent instance
            instance = get_local_agent(name)
            if not instance:
                console.print(f"[yellow]⚠️ Agent '{name}' not available, skipping...[/yellow]")
                continue

            console.print(f"[cyan]→[/cyan] Running {name}...")

            # Run the agent
            outcome = instance.analyze(target)
            per_agent.append(outcome)

            if verbose:
                found = len(outcome.get('vulnerabilities', []))
                console.print(f"[green]✓[/green] {name} completed: {found} findings")

        except Exception as exc:
            console.print(f"[red]✗[/red] {name} failed: {exc}")

    # Aggregate findings from every agent that reported success
    merged_findings = [
        vulnerability
        for outcome in per_agent
        if outcome.get('status') == 'success'
        for vulnerability in outcome.get('vulnerabilities', [])
    ]

    return {
        "mode": "agent-specific",
        "target": target,
        "agents_used": agents,
        "findings": merged_findings,
        "agent_results": per_agent,
    }

512 

513 

def _run_container_scan(image: str, output: Optional[Path] = None):
    """
    Execute container security scan with Trivy.

    Scans *image* with the scanner returned by ``get_container_scanner``.
    A result without a truthy ``success`` entry is treated as a failure:
    the error is printed, along with installation hints when the error
    text mentions "not installed". On success the summary is displayed
    and, when *output* is given, the raw results are written as JSON via
    ``_save_results``.

    Args:
        image: Container image reference to scan.
        output: Optional path for the JSON results.
    """
    console.print(Panel(
        f"🐳 Container Security Scan\n\n"
        f"Image: [bold]{image}[/bold]\n"
        f"Scanner: [cyan]Trivy (Aqua Security)[/cyan]",
        title="Alprina Container Scan",
        style="cyan"
    ))

    scanner = get_container_scanner()

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:
        task = progress.add_task("Scanning container image...", total=None)

        # Scan the image
        results = scanner.scan_image(image)

        progress.update(task, completed=True)

    # .get() so a result that lacks "success" is reported as a failure
    # instead of raising KeyError.
    if not results.get("success"):
        console.print(f"[red]✗ Container scan failed: {results.get('error')}[/red]")

        # "error" may be present but None; normalise before the substring test.
        error_text = str(results.get("error") or "")
        if "not installed" in error_text:
            console.print("\n[yellow]📦 Installation Required:[/yellow]")
            console.print(f" {results.get('install_command', '')}")
            console.print(f"\nDocumentation: {results.get('install_url', '')}")

        return

    console.print("[green]✓ Container scan complete![/green]\n")

    # Display summary
    _display_container_results(results)

    # Save results if output specified (same JSON writer as the standard scan)
    if output:
        _save_results(results, output)

560 

561 

def _display_container_results(results: dict):
    """
    Print the summary, recommendations, and risk verdict of a container scan.

    Reads ``summary``, ``image`` and ``recommendations`` from *results*.
    The verdict is HIGH when any critical or high vulnerability is
    counted, MEDIUM when only medium ones are, and LOW otherwise.
    """
    summary = results.get("summary", {})
    image_name = results.get("image", "unknown")

    # Summary table
    overview = Table(title=f"Scan Results: {image_name}", show_header=False, box=None)
    overview.add_row("📦 Image:", f"[bold]{image_name}[/bold]")
    overview.add_row("🔍 Vulnerabilities:", f"[bold]{summary.get('total_vulnerabilities', 0)}[/bold]")

    # Severity breakdown
    counts = summary.get("by_severity", {})
    critical = counts.get("CRITICAL", 0)
    high = counts.get("HIGH", 0)
    medium = counts.get("MEDIUM", 0)
    low = counts.get("LOW", 0)

    severity_rows = (
        (" 🔴 Critical:", critical, "red bold"),
        (" 🟠 High:", high, "red"),
        (" 🟡 Medium:", medium, "yellow"),
        (" 🔵 Low:", low, "blue"),
    )
    for label, count, style in severity_rows:
        if count > 0:
            overview.add_row(label, f"[{style}]{count}[/{style}]")

    if summary.get("secrets_found", 0) > 0:
        overview.add_row("🔐 Secrets Found:", f"[red bold]{summary['secrets_found']}[/red bold]")

    # Plain (unstyled) counters, shown only when positive
    for label, key in (("📦 Packages Scanned:", "packages_scanned"), ("🗂️ Image Layers:", "layers")):
        if summary.get(key, 0) > 0:
            overview.add_row(label, str(summary[key]))

    console.print(overview)

    # Recommendations
    advice = results.get("recommendations", [])
    if advice:
        console.print("\n[bold cyan]💡 Recommendations:[/bold cyan]")
        for item in advice:
            console.print(f" {item}")

    # Risk assessment
    if critical > 0 or high > 0:
        headline = "\n[bold red]⚠️ HIGH RISK[/bold red]"
        guidance = "This image has critical security issues. Update immediately."
    elif medium > 0:
        headline = "\n[bold yellow]⚠️ MEDIUM RISK[/bold yellow]"
        guidance = "Plan security updates within 1-2 weeks."
    else:
        headline = "\n[bold green]✅ LOW RISK[/bold green]"
        guidance = "No critical issues found. Continue monitoring."

    console.print(headline)
    console.print(guidance)

617 

618 

def _run_unified_scanner(
    target: str,
    all_analyzers: bool,
    symbolic: bool,
    mev: bool,
    cross_contract: bool,
    gas: bool,
    tvl: Optional[float],
    protocol_type: Optional[str],
    output: Optional[Path],
    output_format: str,
    verbose: bool
):
    """
    Run unified scanner for smart contract security analysis (Week 4)

    Three shapes of target are handled:

    * a single ``.sol`` file - scanned on its own (cross-contract analysis
      is always disabled for a single file);
    * a directory with ``cross_contract`` set and more than one ``.sol``
      file - all contracts are analysed together;
    * any other directory - each ``.sol`` file is scanned individually and
      the totals are aggregated.

    Every mode builds its scan options through one helper so the same
    analyzer flags (including ``gas``) apply everywhere.

    NOTE(review): in the per-file directory mode no report file is written
    even when *output* is given (``output_file`` is deliberately None for
    the individual scans) - confirm whether an aggregate report is wanted.

    Args:
        target: Path to Solidity contract file or directory
        all_analyzers: Run all analyzers
        symbolic: Run symbolic execution
        mev: Run MEV detection
        cross_contract: Run cross-contract analysis
        gas: Run gas optimization analysis
        tvl: Protocol TVL for economic impact
        protocol_type: Protocol type (dex, lending, etc.)
        output: Output file path
        output_format: Output format (json, markdown, html, text)
        verbose: Show detailed output
    """
    from .unified_scanner import UnifiedScanner, ScanOptions

    def build_options(*, cross: bool, output_file: Optional[str], be_verbose: bool):
        """Build ScanOptions from the shared flags plus the per-mode settings."""
        return ScanOptions(
            run_all=all_analyzers,
            symbolic=symbolic,
            mev=mev,
            cross_contract=cross,
            gas_optimization=gas,
            calculate_economic_impact=(tvl is not None),
            tvl=tvl,
            protocol_type=protocol_type,
            output_file=output_file,
            output_format=output_format,
            verbose=be_verbose,
            parallel=True
        )

    def print_banner(detail_lines):
        """Print the verbose header followed by the mode-specific detail lines."""
        console.print("\n🔍 [bold]Alprina Unified Security Scanner[/bold]")
        console.print("=" * 60)
        for line in detail_lines:
            console.print(line)
        if tvl:
            console.print(f"Protocol: [cyan]{protocol_type or 'generic'}[/cyan] (TVL: ${tvl:,.0f})")
        console.print()

    def run_with_spinner(description: str, job):
        """Call job() while a spinner with *description* is shown; return its result."""
        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console
        ) as progress:
            task = progress.add_task(description, total=None)
            report = job()
            progress.update(task, completed=True)
        return report

    target_path = Path(target)
    report_file = str(output) if output else None

    if not target_path.exists():
        console.print(f"[bold red]Error:[/bold red] Target not found: {target}")
        return

    # Determine if single file or directory
    if target_path.is_file():
        if target_path.suffix != '.sol':
            console.print("[bold yellow]Warning:[/bold yellow] Target is not a Solidity file (.sol)")
            console.print("Unified scanner is optimized for smart contract analysis.")
            return

        # Single file scan
        contract_code = target_path.read_text()

        # Single file can't do cross-contract
        options = build_options(cross=False, output_file=report_file, be_verbose=verbose)
        scanner = UnifiedScanner()

        if verbose:
            print_banner([f"Contract: [cyan]{target_path.name}[/cyan]"])

        report = run_with_spinner(
            "Running security analysis...",
            lambda: scanner.scan(contract_code, str(target_path), options),
        )

        # Display results
        _display_unified_results(report, verbose)

    elif target_path.is_dir():
        # Directory scan - find all .sol files (sorted for a stable order)
        sol_files = sorted(target_path.glob("**/*.sol"))

        if not sol_files:
            console.print(f"[bold yellow]Warning:[/bold yellow] No Solidity files found in {target}")
            return

        console.print(f"[cyan]→[/cyan] Found {len(sol_files)} Solidity files")

        if cross_contract and len(sol_files) > 1:
            # Multi-contract analysis
            contracts = {}
            for sol_file in sol_files:
                contract_name = sol_file.stem
                if contract_name in contracts:
                    # Two files share a stem (e.g. a/Token.sol and b/Token.sol):
                    # qualify the later one with its relative path so it is
                    # analysed too instead of overwriting the first.
                    contract_name = str(sol_file.relative_to(target_path).with_suffix(""))
                contracts[contract_name] = sol_file.read_text()

            options = build_options(cross=True, output_file=report_file, be_verbose=verbose)
            scanner = UnifiedScanner()

            if verbose:
                print_banner([
                    f"Contracts: [cyan]{len(contracts)}[/cyan]",
                    "Cross-contract analysis: [green]enabled[/green]",
                ])

            report = run_with_spinner(
                "Running cross-contract analysis...",
                lambda: scanner.scan_multi_contract(contracts, str(target_path), options),
            )

            _display_unified_results(report, verbose)

        else:
            # Scan each file individually
            console.print(f"[cyan]→[/cyan] Scanning {len(sol_files)} contracts individually...")

            all_reports = []

            for sol_file in sol_files:
                contract_code = sol_file.read_text()

                # Individual reports are neither saved nor printed verbosely.
                options = build_options(cross=False, output_file=None, be_verbose=False)

                scanner = UnifiedScanner()
                report = scanner.scan(contract_code, str(sol_file), options)
                all_reports.append(report)

                if verbose:
                    console.print(f"\n[cyan]{sol_file.name}:[/cyan] {report.total_vulnerabilities} findings")

            # Aggregate results
            total_vulns = sum(r.total_vulnerabilities for r in all_reports)
            console.print(f"\n[bold]Total vulnerabilities across all contracts:[/bold] {total_vulns}")

            # Display summary
            if total_vulns > 0 and verbose:
                console.print("\n[bold cyan]Top Vulnerabilities:[/bold cyan]")
                all_vulns = []
                for report in all_reports:
                    all_vulns.extend(report.vulnerabilities)

                # Sort by severity, then by descending risk score
                severity_rank = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
                all_vulns.sort(key=lambda v: (
                    severity_rank.get(v.severity, 999),
                    -(v.risk_score or 0)
                ))

                # Same icon scheme as _display_unified_results
                severity_icons = {"critical": "🔴", "high": "🟠", "medium": "🟡"}
                for i, vuln in enumerate(all_vulns[:10], 1):  # Top 10
                    icon = severity_icons.get(vuln.severity, "⚪")
                    console.print(f"{i}. {icon} {vuln.title} ({vuln.file_path})")

    else:
        console.print(f"[bold red]Error:[/bold red] Invalid target: {target}")

818 

819 

def _display_unified_results(report, verbose: bool):
    """
    Print a unified-scanner report.

    Emits a summary table, the per-analyzer counts, each vulnerability
    (with description and recommendation when *verbose*), analyzer timings
    (verbose only), any recorded errors, and the saved-report path when
    the scan options named an output file.
    """
    from rich.table import Table

    divider = "=" * 60

    console.print(f"\n{divider}")
    console.print("[bold]📊 Scan Results[/bold]")
    console.print(f"{divider}\n")

    # Summary table
    overview = Table(show_header=False, box=None)
    overview.add_column("Metric", style="cyan")
    overview.add_column("Value", style="bold")

    overview.add_row("Scan ID", report.scan_id)
    overview.add_row("Scan Time", f"{report.total_scan_time:.2f}s")
    overview.add_row("Total Vulnerabilities", str(report.total_vulnerabilities))
    for label, level in (
        (" - Critical", "critical"),
        (" - High", "high"),
        (" - Medium", "medium"),
        (" - Low", "low"),
    ):
        overview.add_row(label, str(report.vulnerabilities_by_severity[level]))

    if report.total_max_loss > 0:
        overview.add_row("Estimated Max Loss", f"${report.total_max_loss:,.0f}")
        overview.add_row("Average Risk Score", f"{report.average_risk_score:.1f}/100")

    console.print(overview)

    # Vulnerabilities by analyzer
    if report.vulnerabilities_by_analyzer:
        console.print("\n[bold cyan]Vulnerabilities by Analyzer:[/bold cyan]")
        for analyzer_name, found in report.vulnerabilities_by_analyzer.items():
            console.print(f"{analyzer_name}: {found}")

    # List vulnerabilities
    if report.total_vulnerabilities > 0:
        console.print("\n[bold cyan]Vulnerabilities:[/bold cyan]\n")

        # Anything that is not critical/high/medium gets the neutral icon.
        icons = {"critical": "🔴", "high": "🟠", "medium": "🟡"}

        for position, item in enumerate(report.vulnerabilities, 1):
            marker = icons.get(item.severity, "⚪")

            console.print(f"{position}. {marker} [bold]{item.title}[/bold] [{item.severity.upper()}]")
            console.print(f" File: {item.file_path}:{item.line_number or '?'}")
            console.print(f" Analyzer: {item.analyzer_type.value}")

            if item.estimated_loss_max:
                console.print(f" Financial Impact: ${item.estimated_loss_min:,.0f} - ${item.estimated_loss_max:,.0f}")

            if verbose:
                console.print(f" {item.description}")
                console.print(f" [dim]Recommendation: {item.recommendation}[/dim]")

            console.print()

    else:
        console.print("\n[bold green]✅ No vulnerabilities found![/bold green]\n")

    # Performance metrics
    if verbose and report.analyzer_times:
        console.print("[bold cyan]Analyzer Performance:[/bold cyan]")
        for analyzer_name, elapsed in report.analyzer_times.items():
            console.print(f"{analyzer_name}: {elapsed:.3f}s")

    # Errors
    if report.errors:
        console.print("\n[bold yellow]⚠️ Warnings:[/bold yellow]")
        for problem in report.errors:
            console.print(f"{problem}")

    # Output file notification
    saved_to = report.scan_options.output_file
    if saved_to:
        console.print(f"\n[green]✓[/green] Report saved to: [cyan]{saved_to}[/cyan]")

    console.print(f"\n{divider}\n")