Coverage for src/alprina_cli/scanner.py: 8%
409 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-13 11:15 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-13 11:15 +0100
1"""
2Scanner module for Alprina CLI.
3Handles remote and local security scanning using Alprina security agents.
4"""
6from pathlib import Path
7from typing import Optional
8import httpx
9import os
10from rich.console import Console
11from rich.progress import Progress, SpinnerColumn, TextColumn
12from rich.panel import Panel
13from rich.table import Table
15from .auth import is_authenticated, get_auth_headers, get_backend_url
16from .policy import validate_target
17from .security_engine import run_remote_scan, run_local_scan
18from .reporting import write_event
19from .report_generator import generate_security_reports
20from .services.cve_service import enrich_findings
21from .services.container_scanner import get_container_scanner
23console = Console()
def scan_command(
    target: str,
    profile: str = "default",
    safe_only: bool = True,
    output: Optional[Path] = None,
    quick: bool = False,
    container: bool = False,
    agent: Optional[list[str]] = None,
    verbose: bool = False,
    # Week 4: Unified scanner parameters
    all_analyzers: bool = False,
    symbolic: bool = False,
    mev: bool = False,
    cross_contract: bool = False,
    gas: bool = False,  # Week 4 Day 3
    tvl: Optional[float] = None,
    protocol_type: Optional[str] = None,
    output_format: str = "json"
):
    """
    Execute a security scan on a target (remote, local, or container).

    The scan mode is chosen in this order: unified smart-contract scanner
    (any of ``all_analyzers``/``symbolic``/``mev``/``cross_contract``/``gas``),
    container scan, quick scan, and finally the regular local/remote scan.
    Failures of the regular scan are reported on the console, not raised.

    Args:
        target: Target to scan (URL, IP, local path, or Docker image)
        profile: Scan profile to use
        safe_only: Only run safe, non-intrusive scans
        output: Output file path
        quick: Run quick 5-second scan for critical issues only
        container: Scan as Docker container image
        agent: Specific agent(s) to use
        verbose: Show detailed output
        all_analyzers: Run all security analyzers (Week 4)
        symbolic: Run symbolic execution (Week 4)
        mev: Run MEV detection (Week 4)
        cross_contract: Run cross-contract analysis (Week 4)
        gas: Run gas optimization analysis (Week 4 Day 3)
        tvl: Protocol TVL for economic impact (Week 4)
        protocol_type: Protocol type (Week 4)
        output_format: Output format (Week 4)
    """
    # Week 4: Smart contract unified scanner mode
    if all_analyzers or symbolic or mev or cross_contract or gas:
        _run_unified_scanner(
            target, all_analyzers, symbolic, mev, cross_contract, gas,
            tvl, protocol_type, output, output_format, verbose
        )
        return

    # Container scan mode
    if container:
        _run_container_scan(target, output)
        return

    # Quick scan mode (the helper imports the quick scanner itself)
    if quick:
        _run_quick_scan(target)
        return

    # A target that exists on disk is scanned locally; anything else is remote.
    is_local = Path(target).exists()
    scan_mode = "local" if is_local else "remote"

    # Evaluate the authentication state once and reuse it for the whole run.
    authenticated = is_authenticated()

    if not authenticated:
        if not is_local:
            # Only remote scans require auth.
            console.print(Panel(
                "[bold red]🔒 Authentication Required[/bold red]\n\n"
                "To use Alprina CLI, you need to authenticate first.\n\n"
                "[bold cyan]Quick Start:[/bold cyan]\n"
                " 1. Run: [bold]alprina auth login[/bold]\n"
                " 2. Browser opens → Sign in with GitHub\n"
                " 3. Authorize device → Done!\n\n"
                "[dim]Don't have an account? Visit:[/dim]\n"
                "[bold cyan]https://www.alprina.com[/bold cyan]\n\n"
                "[yellow]💡 Tip:[/yellow] Local scans work without authentication!",
                title="Welcome to Alprina CLI",
                border_style="red"
            ))
            return
        console.print("[yellow]⚠️ Running in offline mode (not authenticated)[/yellow]")

    console.print(Panel(
        f"🔍 Starting scan on: [bold]{target}[/bold]\n"
        f"Profile: [cyan]{profile}[/cyan]\n"
        f"Mode: {'[green]Safe only[/green]' if safe_only else '[yellow]Full scan[/yellow]'}",
        title="Alprina Security Scan"
    ))

    scan_id = None
    try:
        # Create scan entry in database (if authenticated)
        if authenticated:
            scan_id = _create_scan_entry(target, scan_mode, profile)
            if scan_id:
                console.print(f"[dim]Scan ID: {scan_id}[/dim]")

        # Execute scan with specific agents if requested
        if agent:
            # NOTE(review): this path does not call validate_target() even when
            # the target is remote - confirm that is intended.
            console.print(f"[cyan]→[/cyan] Using specific agents: {', '.join(agent)}")
            results = _scan_with_agents(target, agent, verbose)
        elif is_local:
            console.print(f"[cyan]→[/cyan] Detected local target: {target}")
            results = _scan_local(target, profile, safe_only)
        else:
            console.print(f"[cyan]→[/cyan] Detected remote target: {target}")
            validate_target(target)  # Check against policy
            results = _scan_remote(target, profile, safe_only)

        # Enrich findings with CVE/CWE/CVSS data
        if results.get("findings"):
            console.print("[dim]→ Enriching findings with CVE/CWE/CVSS data...[/dim]")
            results["findings"] = enrich_findings(results["findings"])

        # Save results to database (if authenticated and scan was created)
        if authenticated and scan_id:
            _save_scan_results(scan_id, results)
            console.print("[dim]✓ Scan saved to your account[/dim]")

        # Log the scan event locally
        write_event({
            "type": "scan",
            "target": target,
            "profile": profile,
            "mode": scan_mode,
            "safe_only": safe_only,
            "findings_count": len(results.get("findings", []))
        })

        # Display results
        _display_results(results)

        # Generate markdown security reports in .alprina/ folder
        if is_local and results.get("findings", []):
            try:
                report_dir = generate_security_reports(results, target)
                console.print(f"\n[green]✓[/green] Security reports generated in: [cyan]{report_dir}[/cyan]")
                console.print("[dim]Files created:[/dim]")
                console.print("[dim] • SECURITY-REPORT.md - Full vulnerability analysis[/dim]")
                console.print("[dim] • FINDINGS.md - Detailed findings with code context[/dim]")
                console.print("[dim] • REMEDIATION.md - Step-by-step fix instructions[/dim]")
                console.print("[dim] • EXECUTIVE-SUMMARY.md - Non-technical overview[/dim]")
            except Exception as report_error:
                # Report generation is best-effort; the scan itself succeeded.
                console.print(f"[yellow]⚠️ Could not generate reports: {report_error}[/yellow]")

        if output:
            _save_results(results, output)

    except Exception as e:
        # Top-level CLI boundary: show the failure instead of a traceback.
        console.print(f"[red]Scan failed: {e}[/red]")
def _scan_local(target: str, profile: str, safe_only: bool) -> dict:
    """Run the local file/directory scan while a console spinner is shown."""
    spinner = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with spinner:
        task_id = spinner.add_task("Scanning local files...", total=None)
        scan_output = run_local_scan(target, profile, safe_only)
        spinner.update(task_id, completed=True)

    return scan_output
def _scan_remote(target: str, profile: str, safe_only: bool) -> dict:
    """Run the remote target scan while a console spinner is shown."""
    spinner = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with spinner:
        task_id = spinner.add_task("Scanning remote target...", total=None)
        scan_output = run_remote_scan(target, profile, safe_only)
        spinner.update(task_id, completed=True)

    return scan_output
def _run_quick_scan(target: str):
    """Announce, run and display the quick scan for ``target``."""
    from .quick_scanner import quick_scan

    banner = Panel(
        f"⚡ Quick Health Check on: [bold]{target}[/bold]\n"
        f"Scanning for top 10 critical patterns...\n"
        f"[dim]This takes ~5 seconds[/dim]",
        title="Alprina Quick Scan",
        style="cyan",
    )
    console.print(banner)

    spinner = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console,
    )
    with spinner:
        task_id = spinner.add_task("Scanning files...", total=None)
        quick_results = quick_scan(target)
        spinner.update(task_id, completed=True)

    _display_quick_results(quick_results)
def _display_quick_results(results: dict):
    """Print the outcome of a quick scan.

    Reads ``duration_ms``, ``summary`` (``total_files_scanned`` and
    ``critical``) and ``findings`` from ``results``. When critical issues
    exist, at most the first five findings are tabulated.
    """
    seconds = results['duration_ms'] / 1000
    summary = results['summary']

    console.print(f"\n⚡ Quick scan completed in [bold cyan]{seconds:.1f}s[/bold cyan]")
    console.print(f" Scanned [bold]{summary['total_files_scanned']}[/bold] files")

    critical = summary['critical']

    # Clean result: suggest the full scan and stop.
    if critical == 0:
        console.print("\n✅ [bold green]No critical issues found![/bold green]")
        console.print("\n💡 [dim]Run full scan for comprehensive analysis:[/dim]")
        console.print(" [bold cyan]alprina scan ./ [/bold cyan]")
        return

    plural = 's' if critical != 1 else ''
    console.print(f"\n🚨 [bold red]Found {critical} critical issue{plural}[/bold red]")

    # Show first 5 findings
    preview_limit = 5
    findings = results['findings']

    table = Table(show_header=True, header_style="bold magenta")
    table.add_column("Issue", style="red", width=30)
    table.add_column("File", style="cyan", width=25)
    table.add_column("Line", justify="right", style="yellow", width=6)

    for entry in findings[:preview_limit]:
        # Only the base name is shown to keep the column narrow.
        table.add_row(entry['title'], Path(entry['file']).name, str(entry['line']))

    console.print(table)

    hidden = len(findings) - preview_limit
    if hidden > 0:
        console.print(f"\n[dim]+ {hidden} more issues...[/dim]")

    console.print("\n⚠️ [yellow]Quick scan only checks critical patterns[/yellow]")
    console.print(" Run full scan to find all vulnerabilities:")
    console.print(" [bold cyan]alprina scan ./[/bold cyan]")
def _format_cvss(score) -> Optional[str]:
    """Return ``score`` formatted to one decimal place, or None when absent.

    A score of 0 is a real value and is formatted as "0.0"; only empty values
    (``None``, ``""``, ...) count as absent.
    """
    if not score and score != 0:
        return None
    return f"{score:.1f}"


def _display_results(results: dict):
    """Display scan results in a formatted table with CVE/CWE/CVSS data.

    Prints a summary table with one row per finding, followed by a detailed
    view of the first three findings. Prints a success message and returns
    early when ``results`` has no findings.

    Args:
        results: Scan result mapping; only its ``"findings"`` list is read.
    """
    findings = results.get("findings", [])

    if not findings:
        console.print("\n[green]✓ No security issues found![/green]")
        return

    console.print(f"\n[yellow]⚠ Found {len(findings)} issues[/yellow]\n")

    table = Table(title="Security Findings", show_header=True, header_style="bold cyan")
    table.add_column("Severity", style="bold", width=10)
    table.add_column("Type", width=25)
    table.add_column("CVSS", justify="right", width=6)
    table.add_column("CWE", width=12)
    table.add_column("Description", width=40)
    table.add_column("Location", width=25)

    severity_colors = {
        "CRITICAL": "bold red",
        "HIGH": "red",
        "MEDIUM": "yellow",
        "LOW": "blue",
        "INFO": "dim"
    }

    def color_for(severity) -> str:
        # Look the colour up case-insensitively so "critical" and "CRITICAL"
        # are styled the same; the severity itself is printed unchanged.
        return severity_colors.get(str(severity).upper(), "white")

    for finding in findings:
        severity = finding.get("severity", "INFO")
        color = color_for(severity)

        # Get CVSS score (0.0 is a valid score, not a missing one)
        cvss_str = _format_cvss(finding.get("cvss_score")) or "N/A"

        # Get CWE: show only the numeric part of identifiers such as "CWE-79"
        cwe = finding.get("cwe", "")
        cwe_num = cwe.split("-")[1] if cwe and "-" in cwe else ""

        table.add_row(
            f"[{color}]{severity}[/{color}]",
            finding.get("type", "Unknown"),
            f"[{color}]{cvss_str}[/{color}]",
            f"[cyan]{cwe_num}[/cyan]" if cwe_num else "[dim]N/A[/dim]",
            finding.get("description", "N/A"),
            finding.get("location", "N/A")
        )

    console.print(table)

    # Show enhanced details for top 3 findings
    console.print("\n[bold cyan]📋 Detailed Analysis (Top 3)[/bold cyan]\n")

    for i, finding in enumerate(findings[:3], 1):
        severity = finding.get("severity", "INFO")
        color = color_for(severity)

        console.print(f"[bold]{i}. [{color}]{severity}[/{color}]: {finding.get('type', 'Issue')}[/bold]")
        console.print(f" 📍 {finding.get('location', 'N/A')}")

        cvss_text = _format_cvss(finding.get("cvss_score"))
        if cvss_text is not None:
            console.print(f" 📊 CVSS: {cvss_text}/10.0 ({finding.get('cvss_severity', 'N/A')})")

        if finding.get("cwe"):
            cwe_name = finding.get("cwe_name", finding["cwe"])
            console.print(f" 🔖 {finding['cwe']}: {cwe_name}")

        if finding.get("owasp"):
            console.print(f" ⚡ OWASP: {finding['owasp']}")

        if finding.get("references"):
            console.print(" 🔗 References:")
            # At most three references per finding are listed.
            for ref in finding["references"][:3]:
                console.print(f" • {ref['name']}: [link={ref['url']}]{ref['url']}[/link]")

        console.print()
def _save_results(results: dict, output: Path):
    """Write ``results`` as indented JSON to ``output``, creating parent directories."""
    import json

    destination_dir = output.parent
    destination_dir.mkdir(parents=True, exist_ok=True)

    with output.open("w") as handle:
        json.dump(results, handle, indent=2)

    console.print(f"\n[green]✓[/green] Results saved to: {output}")
def recon_command(target: str, passive: bool = True):
    """
    Perform reconnaissance on a target.

    Requires authentication. The target is checked with ``validate_target``,
    the "web-recon" agent task is run, the event is logged and the findings
    are displayed. Failures are printed to the console, not raised.
    """
    if not is_authenticated():
        console.print("[red]Please login first: alprina auth login[/red]")
        return

    mode_label = '[green]Passive[/green]' if passive else '[yellow]Active[/yellow]'
    console.print(Panel(
        f"🕵️ Reconnaissance: [bold]{target}[/bold]\n"
        f"Mode: {mode_label}",
        title="Alprina Recon"
    ))

    try:
        validate_target(target)

        spinner = Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console,
        )
        with spinner:
            task_id = spinner.add_task("Gathering intelligence...", total=None)

            # Use Alprina security agent for reconnaissance
            from .security_engine import run_agent

            recon_results = run_agent(
                task="web-recon",
                input_data=target,
                metadata={"passive": passive},
            )

            spinner.update(task_id, completed=True)

        # Log event
        event = {
            "type": "recon",
            "target": target,
            "passive": passive,
            "findings_count": len(recon_results.get("findings", [])),
        }
        write_event(event)

        console.print("\n[green]✓ Reconnaissance complete[/green]")
        _display_results(recon_results)

    except Exception as e:
        console.print(f"[red]Recon failed: {e}[/red]")
def _create_scan_entry(target: str, scan_type: str, profile: str) -> Optional[str]:
    """Register a scan with the backend before it runs.

    Returns the ``scan_id`` from the response body on HTTP 201. On any other
    status, or on any exception, a warning is printed and ``None`` is returned.
    """
    try:
        auth_headers = get_auth_headers()
        base_url = get_backend_url()
        payload = {
            "target": target,
            "scan_type": scan_type,
            "profile": profile,
        }

        reply = httpx.post(
            f"{base_url}/scans",
            headers=auth_headers,
            json=payload,
            timeout=10.0,
        )

        if reply.status_code != 201:
            console.print(f"[yellow]⚠️ Could not create scan entry: {reply.status_code}[/yellow]")
            return None

        return reply.json().get("scan_id")

    except Exception as e:
        console.print(f"[yellow]⚠️ Could not create scan entry: {e}[/yellow]")
        return None
def _save_scan_results(scan_id: str, results: dict):
    """Send the finished scan's results to the backend.

    Best-effort: a non-200 status or any exception only prints a warning.
    """
    try:
        auth_headers = get_auth_headers()
        base_url = get_backend_url()

        reply = httpx.patch(
            f"{base_url}/scans/{scan_id}",
            headers=auth_headers,
            json={"results": results},
            timeout=30.0,
        )

        saved = reply.status_code == 200
        if not saved:
            console.print(f"[yellow]⚠️ Could not save scan results: {reply.status_code}[/yellow]")

    except Exception as e:
        console.print(f"[yellow]⚠️ Could not save scan results: {e}[/yellow]")
def _scan_with_agents(target: str, agents: list[str], verbose: bool = False) -> dict:
    """Run each requested agent against ``target`` and merge their output.

    Agents that cannot be loaded are skipped and agents that raise are
    reported; neither aborts the run. The returned ``findings`` combine the
    ``vulnerabilities`` of every result whose ``status`` is ``'success'``.
    ``agents_used`` echoes the requested names and ``agent_results`` holds
    the raw per-agent results.
    """
    from .utils.agent_loader import get_local_agent

    console.print(Panel(
        f"🔧 Agent-Specific Security Scan\n\n"
        f"Target: [bold]{target}[/bold]\n"
        f"Agents: [cyan]{', '.join(agents)}[/cyan]",
        title="Alprina Agent Scan",
        style="cyan"
    ))

    per_agent_output = []

    for name in agents:
        try:
            # Get the agent instance
            instance = get_local_agent(name)
            if not instance:
                console.print(f"[yellow]⚠️ Agent '{name}' not available, skipping...[/yellow]")
                continue

            console.print(f"[cyan]→[/cyan] Running {name}...")

            # Run the agent
            outcome = instance.analyze(target)
            per_agent_output.append(outcome)

            if verbose:
                found = len(outcome.get('vulnerabilities', []))
                console.print(f"[green]✓[/green] {name} completed: {found} findings")

        except Exception as e:
            console.print(f"[red]✗[/red] {name} failed: {e}")

    # Aggregate findings from all agents that reported success
    merged_findings = [
        vulnerability
        for outcome in per_agent_output
        if outcome.get('status') == 'success'
        for vulnerability in outcome.get('vulnerabilities', [])
    ]

    return {
        "mode": "agent-specific",
        "target": target,
        "agents_used": agents,
        "findings": merged_findings,
        "agent_results": per_agent_output,
    }
def _run_container_scan(image: str, output: Optional[Path] = None):
    """Execute container security scan with Trivy.

    Scans ``image`` with the scanner returned by ``get_container_scanner()``,
    prints a summary and, when ``output`` is given, saves the raw results as
    JSON. If the scan reports failure, the error is printed (with install
    hints when the error says the scanner is not installed) and nothing is
    saved.

    Args:
        image: Container image reference to scan.
        output: Optional path of the JSON file to write the results to.
    """
    console.print(Panel(
        f"🐳 Container Security Scan\n\n"
        f"Image: [bold]{image}[/bold]\n"
        f"Scanner: [cyan]Trivy (Aqua Security)[/cyan]",
        title="Alprina Container Scan",
        style="cyan"
    ))

    scanner = get_container_scanner()

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        console=console
    ) as progress:
        task = progress.add_task("Scanning container image...", total=None)

        # Scan the image
        results = scanner.scan_image(image)

        progress.update(task, completed=True)

    if not results["success"]:
        console.print(f"[red]✗ Container scan failed: {results.get('error')}[/red]")

        # The error may be missing or None; normalise it before the substring
        # test so a failed scan never turns into a TypeError here.
        error_text = str(results.get("error") or "")
        if "not installed" in error_text:
            console.print("\n[yellow]📦 Installation Required:[/yellow]")
            console.print(f" {results.get('install_command', '')}")
            console.print(f"\nDocumentation: {results.get('install_url', '')}")

        return

    console.print("[green]✓ Container scan complete![/green]\n")

    # Display summary
    _display_container_results(results)

    # Save results if output specified (same JSON writer as regular scans)
    if output:
        _save_results(results, output)
def _display_container_results(results: dict):
    """Print the container scan summary, recommendations and a risk verdict.

    Reads ``summary``, ``image`` and ``recommendations`` from ``results``.
    The verdict depends only on the critical, high and medium counts.
    """
    summary = results.get("summary", {})
    image = results.get("image", "unknown")

    # Summary table
    table = Table(title=f"Scan Results: {image}", show_header=False, box=None)
    table.add_row("📦 Image:", f"[bold]{image}[/bold]")
    table.add_row("🔍 Vulnerabilities:", f"[bold]{summary.get('total_vulnerabilities', 0)}[/bold]")

    # Severity breakdown
    counts = summary.get("by_severity", {})
    critical = counts.get("CRITICAL", 0)
    high = counts.get("HIGH", 0)
    medium = counts.get("MEDIUM", 0)
    low = counts.get("LOW", 0)

    severity_rows = (
        (" 🔴 Critical:", "red bold", critical),
        (" 🟠 High:", "red", high),
        (" 🟡 Medium:", "yellow", medium),
        (" 🔵 Low:", "blue", low),
    )
    for label, style, count in severity_rows:
        # Severities with no hits are left out of the table.
        if count > 0:
            table.add_row(label, f"[{style}]{count}[/{style}]")

    if summary.get("secrets_found", 0) > 0:
        table.add_row("🔐 Secrets Found:", f"[red bold]{summary['secrets_found']}[/red bold]")

    if summary.get("packages_scanned", 0) > 0:
        table.add_row("📦 Packages Scanned:", str(summary["packages_scanned"]))

    if summary.get("layers", 0) > 0:
        table.add_row("🗂️ Image Layers:", str(summary["layers"]))

    console.print(table)

    # Recommendations
    recommendations = results.get("recommendations", [])
    if recommendations:
        console.print("\n[bold cyan]💡 Recommendations:[/bold cyan]")
        for recommendation in recommendations:
            console.print(f" {recommendation}")

    # Risk assessment
    if critical > 0 or high > 0:
        headline = "\n[bold red]⚠️ HIGH RISK[/bold red]"
        advice = "This image has critical security issues. Update immediately."
    elif medium > 0:
        headline = "\n[bold yellow]⚠️ MEDIUM RISK[/bold yellow]"
        advice = "Plan security updates within 1-2 weeks."
    else:
        headline = "\n[bold green]✅ LOW RISK[/bold green]"
        advice = "No critical issues found. Continue monitoring."

    console.print(headline)
    console.print(advice)
def _run_unified_scanner(
    target: str,
    all_analyzers: bool,
    symbolic: bool,
    mev: bool,
    cross_contract: bool,
    gas: bool,
    tvl: Optional[float],
    protocol_type: Optional[str],
    output: Optional[Path],
    output_format: str,
    verbose: bool
):
    """
    Run unified scanner for smart contract security analysis (Week 4)

    Three paths are supported:
      * a single ``.sol`` file is scanned on its own;
      * a directory with ``cross_contract`` set and more than one ``.sol``
        file is scanned as one multi-contract unit;
      * any other directory is scanned file by file and only an aggregate
        summary is printed.

    Args:
        target: Path to Solidity contract file or directory
        all_analyzers: Run all analyzers
        symbolic: Run symbolic execution
        mev: Run MEV detection
        cross_contract: Run cross-contract analysis
        gas: Run gas optimization analysis
        tvl: Protocol TVL for economic impact
        protocol_type: Protocol type (dex, lending, etc.)
        output: Output file path
        output_format: Output format (json, markdown, html, text)
        verbose: Show detailed output
    """
    from .unified_scanner import UnifiedScanner, ScanOptions

    def build_options(*, cross: bool, report_path: Optional[str], detailed: bool):
        # Single place where ScanOptions is built, so every scan path passes
        # the same analyzer flags (including gas optimization).
        return ScanOptions(
            run_all=all_analyzers,
            symbolic=symbolic,
            mev=mev,
            cross_contract=cross,
            gas_optimization=gas,
            calculate_economic_impact=(tvl is not None),
            tvl=tvl,
            protocol_type=protocol_type,
            output_file=report_path,
            output_format=output_format,
            verbose=detailed,
            parallel=True
        )

    target_path = Path(target)

    if not target_path.exists():
        console.print(f"[bold red]Error:[/bold red] Target not found: {target}")
        return

    report_path = str(output) if output else None

    # Determine if single file or directory
    if target_path.is_file():
        if target_path.suffix != '.sol':
            console.print("[bold yellow]Warning:[/bold yellow] Target is not a Solidity file (.sol)")
            console.print("Unified scanner is optimized for smart contract analysis.")
            return

        # Single file scan
        contract_code = target_path.read_text()
        file_name = target_path.name

        # Single file can't do cross-contract
        options = build_options(cross=False, report_path=report_path, detailed=verbose)

        # Run scan
        scanner = UnifiedScanner()

        if verbose:
            console.print("\n🔍 [bold]Alprina Unified Security Scanner[/bold]")
            console.print("=" * 60)
            console.print(f"Contract: [cyan]{file_name}[/cyan]")
            if tvl:
                console.print(f"Protocol: [cyan]{protocol_type or 'generic'}[/cyan] (TVL: ${tvl:,.0f})")
            console.print()

        with Progress(
            SpinnerColumn(),
            TextColumn("[progress.description]{task.description}"),
            console=console
        ) as progress:
            task = progress.add_task("Running security analysis...", total=None)

            report = scanner.scan(contract_code, str(target_path), options)

            progress.update(task, completed=True)

        # Display results
        _display_unified_results(report, verbose)

    elif target_path.is_dir():
        # Directory scan - find all .sol files
        sol_files = list(target_path.glob("**/*.sol"))

        if not sol_files:
            console.print(f"[bold yellow]Warning:[/bold yellow] No Solidity files found in {target}")
            return

        console.print(f"[cyan]→[/cyan] Found {len(sol_files)} Solidity files")

        if cross_contract and len(sol_files) > 1:
            # Multi-contract analysis
            # NOTE(review): contracts are keyed by file stem, so two files with
            # the same name in different sub-directories overwrite each other.
            contracts = {sol_file.stem: sol_file.read_text() for sol_file in sol_files}

            options = build_options(cross=True, report_path=report_path, detailed=verbose)

            scanner = UnifiedScanner()

            if verbose:
                console.print("\n🔍 [bold]Alprina Unified Security Scanner[/bold]")
                console.print("=" * 60)
                console.print(f"Contracts: [cyan]{len(contracts)}[/cyan]")
                console.print("Cross-contract analysis: [green]enabled[/green]")
                if tvl:
                    console.print(f"Protocol: [cyan]{protocol_type or 'generic'}[/cyan] (TVL: ${tvl:,.0f})")
                console.print()

            with Progress(
                SpinnerColumn(),
                TextColumn("[progress.description]{task.description}"),
                console=console
            ) as progress:
                task = progress.add_task("Running cross-contract analysis...", total=None)

                report = scanner.scan_multi_contract(contracts, str(target_path), options)

                progress.update(task, completed=True)

            _display_unified_results(report, verbose)

        else:
            # Scan each file individually
            console.print(f"[cyan]→[/cyan] Scanning {len(sol_files)} contracts individually...")

            all_reports = []

            for sol_file in sol_files:
                contract_code = sol_file.read_text()

                # Individual reports are not saved and are scanned quietly.
                # NOTE(review): `output` is therefore unused in this mode - no
                # aggregate report file is written.
                options = build_options(cross=False, report_path=None, detailed=False)

                scanner = UnifiedScanner()
                report = scanner.scan(contract_code, str(sol_file), options)
                all_reports.append(report)

                if verbose:
                    console.print(f"\n[cyan]{sol_file.name}:[/cyan] {report.total_vulnerabilities} findings")

            # Aggregate results
            total_vulns = sum(r.total_vulnerabilities for r in all_reports)
            console.print(f"\n[bold]Total vulnerabilities across all contracts:[/bold] {total_vulns}")

            # Display summary
            if total_vulns > 0 and verbose:
                console.print("\n[bold cyan]Top Vulnerabilities:[/bold cyan]")
                all_vulns = []
                for report in all_reports:
                    all_vulns.extend(report.vulnerabilities)

                # Sort by severity, then by descending risk score
                severity_rank = {'critical': 0, 'high': 1, 'medium': 2, 'low': 3}
                all_vulns.sort(key=lambda v: (
                    severity_rank.get(v.severity, 999),
                    -(v.risk_score or 0)
                ))

                for i, vuln in enumerate(all_vulns[:10], 1):  # Top 10
                    icon = "🔴" if vuln.severity == "critical" else "🟠" if vuln.severity == "high" else "🟡"
                    console.print(f"{i}. {icon} {vuln.title} ({vuln.file_path})")

    else:
        console.print(f"[bold red]Error:[/bold red] Invalid target: {target}")
def _display_unified_results(report, verbose: bool):
    """Print a unified-scanner report: summary, per-analyzer counts,
    individual vulnerabilities, and (when present) timings, warnings and
    the saved report path."""
    from rich.table import Table

    rule = '=' * 60

    console.print(f"\n{rule}")
    console.print("[bold]📊 Scan Results[/bold]")
    console.print(f"{rule}\n")

    # Summary table
    overview = Table(show_header=False, box=None)
    overview.add_column("Metric", style="cyan")
    overview.add_column("Value", style="bold")

    overview.add_row("Scan ID", report.scan_id)
    overview.add_row("Scan Time", f"{report.total_scan_time:.2f}s")
    overview.add_row("Total Vulnerabilities", str(report.total_vulnerabilities))
    for level in ("critical", "high", "medium", "low"):
        overview.add_row(
            f" - {level.capitalize()}",
            str(report.vulnerabilities_by_severity[level]),
        )

    # Economic figures are shown only when a maximum loss was estimated.
    if report.total_max_loss > 0:
        overview.add_row("Estimated Max Loss", f"${report.total_max_loss:,.0f}")
        overview.add_row("Average Risk Score", f"{report.average_risk_score:.1f}/100")

    console.print(overview)

    # Vulnerabilities by analyzer
    if report.vulnerabilities_by_analyzer:
        console.print("\n[bold cyan]Vulnerabilities by Analyzer:[/bold cyan]")
        for analyzer_name, hits in report.vulnerabilities_by_analyzer.items():
            console.print(f" • {analyzer_name}: {hits}")

    # List vulnerabilities
    if report.total_vulnerabilities > 0:
        console.print("\n[bold cyan]Vulnerabilities:[/bold cyan]\n")

        severity_icons = {"critical": "🔴", "high": "🟠", "medium": "🟡"}

        for position, vuln in enumerate(report.vulnerabilities, 1):
            icon = severity_icons.get(vuln.severity, "⚪")

            console.print(f"{position}. {icon} [bold]{vuln.title}[/bold] [{vuln.severity.upper()}]")
            console.print(f" File: {vuln.file_path}:{vuln.line_number or '?'}")
            console.print(f" Analyzer: {vuln.analyzer_type.value}")

            if vuln.estimated_loss_max:
                console.print(f" Financial Impact: ${vuln.estimated_loss_min:,.0f} - ${vuln.estimated_loss_max:,.0f}")

            if verbose:
                console.print(f" {vuln.description}")
                console.print(f" [dim]Recommendation: {vuln.recommendation}[/dim]")

            console.print()

    else:
        console.print("\n[bold green]✅ No vulnerabilities found![/bold green]\n")

    # Performance metrics
    if verbose and report.analyzer_times:
        console.print("[bold cyan]Analyzer Performance:[/bold cyan]")
        for analyzer_name, elapsed in report.analyzer_times.items():
            console.print(f" • {analyzer_name}: {elapsed:.3f}s")

    # Errors
    if report.errors:
        console.print("\n[bold yellow]⚠️ Warnings:[/bold yellow]")
        for message in report.errors:
            console.print(f" • {message}")

    # Output file notification
    saved_to = report.scan_options.output_file
    if saved_to:
        console.print(f"\n[green]✓[/green] Report saved to: [cyan]{saved_to}[/cyan]")

    console.print(f"\n{rule}\n")