Coverage for src/alprina_cli/agents/web3_auditor/solidity_analyzer.py: 11%
428 statements
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-13 11:15 +0100
« prev ^ index » next coverage.py v7.11.3, created at 2025-11-13 11:15 +0100
1"""
2Solidity Smart Contract Static Analyzer
4Inspired by Slither but enhanced for startup Web3 security needs.
5Focuses on OWASP Smart Contract Top 10 detection with economic context.
6"""
8import re
9import ast
10from pathlib import Path
11from typing import List, Dict, Any, Optional, Tuple
12from dataclasses import dataclass
13from enum import Enum
class VulnerabilityType(Enum):
    """Categories of smart contract findings reported by the analyzer.

    Each member's value is the short string identifier of that category.
    """

    REENTRANCY = "reentrancy"
    ACCESS_CONTROL = "access_control"
    INTEGER_OVERFLOW_UNDERFLOW = "integer_overflow"
    UNCHECKED_LOW_LEVEL_CALL = "unchecked_call"
    LOGIC_ERROR = "logic_error"
    TIMESTAMP_DEPENDENCE = "timestamp_dependence"
    UNINITIALIZED_STORAGE = "uninitialized_storage"
    ORACLE_MANIPULATION = "oracle_manipulation"
    GAS_LIMIT_ISSUE = "gas_limit"
    DENIAL_OF_SERVICE = "denial_of_service"
@dataclass
class SolidityVulnerability:
    """A single finding produced by the Solidity analyzer.

    ``function_name`` and ``contract_name`` carry defaults so a finding can be
    built without them. Several detectors in this module omit
    ``function_name``; while the field was required that raised ``TypeError``
    instead of producing a finding. Field order is unchanged, so existing
    positional and keyword construction keeps working.
    """
    vulnerability_type: VulnerabilityType
    severity: str  # "critical", "high", "medium", "low"
    title: str
    description: str
    file_path: str
    line_number: Optional[int]
    # Defaults start here; every following field must also have one.
    function_name: Optional[str] = None
    contract_name: str = "unknown"
    code_snippet: Optional[str] = None
    remediation: Optional[str] = None
    confidence: int = 100  # 0-100
42class SolidityStaticAnalyzer:
43 """
44 Enhanced Solidity analyzer focused on Web3 startup security needs
45 """
def __init__(self):
    """Load the detection patterns and create empty parse-state holders."""
    # The pattern table is built by a helper defined elsewhere in the class.
    self.vulnerability_patterns = self._initialize_patterns()
    # Stays None until _parse_contract_structure() has run on a contract.
    self.contract_structure = None
    self.functions, self.state_variables = [], []
def analyze_contract(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]:
    """
    Run every detector over a contract and collect the findings.

    Args:
        contract_code: Solidity source code
        file_path: Path to the contract file (copied into each finding)

    Returns:
        List of detected vulnerabilities. If parsing or any detector raises,
        the list holds a single low-severity "Analysis Error" entry instead.
    """
    vulnerabilities = []

    try:
        # Parse contract structure
        self._parse_contract_structure(contract_code)

        detectors = (
            self._detect_reentrancy_vulnerabilities,
            self._detect_access_control_vulnerabilities,
            self._detect_integer_vulnerabilities,
            self._detect_unchecked_calls,
            self._detect_logic_errors,
            self._detect_oracle_manipulation,
            self._detect_input_validation_issues,
        )
        # Every detector runs before any result is kept, so a failure part-way
        # through reports only the analysis error, never partial findings.
        results = [detect(contract_code, file_path) for detect in detectors]
        for found in results:
            vulnerabilities.extend(found)

    except Exception as e:
        # Top-level boundary: surface the failure as a low-confidence finding
        # rather than crashing the caller.
        vulnerabilities.append(SolidityVulnerability(
            vulnerability_type=VulnerabilityType.LOGIC_ERROR,
            severity="low",
            title="Analysis Error",
            description=f"Could not fully analyze contract: {e}",
            file_path=file_path,
            line_number=None,
            # Passed explicitly: omitting it made this fallback itself raise
            # TypeError while function_name was a required field.
            function_name=None,
            contract_name="unknown",
            confidence=20
        ))

    return vulnerabilities
102 def _parse_contract_structure(self, contract_code: str):
103 """Parse contract structure to identify functions and state variables"""
104 lines = contract_code.split('\n')
106 # Find contracts
107 self.contract_structure = {
108 'contracts': [],
109 'functions': [],
110 'state_variables': []
111 }
113 current_contract = None
115 for i, line in enumerate(lines):
116 line = line.strip()
118 # Skip comments and empty lines
119 if not line or line.startswith('//') or line.startswith('/*'):
120 continue
122 # Find contract definitions
123 if line.startswith('contract ') or line.startswith('abstract contract '):
124 contract_match = re.search(r'(abstract )?contract\s+(\w+)', line)
125 if contract_match:
126 current_contract = contract_match.group(2)
127 self.contract_structure['contracts'].append({
128 'name': current_contract,
129 'line': i + 1
130 })
131 continue
133 # Find function definitions
134 if current_contract and ('function ' in line or 'modifier ' in line):
135 func_match = re.search(r'(function|modifier)\s+(\w+)', line)
136 if func_match:
137 func_type = func_match.group(1)
138 func_name = func_match.group(2)
139 function_info = {
140 'name': func_name,
141 'type': func_type,
142 'contract': current_contract,
143 'line': i + 1,
144 'visibility': 'internal' # Default
145 }
147 # Check visibility modifiers
148 if 'public' in line:
149 function_info['visibility'] = 'public'
150 elif 'external' in line:
151 function_info['visibility'] = 'external'
152 elif 'internal' in line:
153 function_info['visibility'] = 'internal'
154 elif 'private' in line:
155 function_info['visibility'] = 'private'
157 # Check for payable
158 if 'payable' in line:
159 function_info['payable'] = True
161 self.contract_structure['functions'].append(function_info)
162 continue
164 # Find state variables
165 if current_contract and ('uint256 ' in line or 'address ' in line or 'mapping(' in line):
166 # Extract variable name
167 var_match = re.search(r'(uint256|address|mapping)\s+(?:\(.*?\))?\s*(\w+)', line)
168 if var_match:
169 var_type = var_match.group(1)
170 var_name = var_match.group(2)
171 self.contract_structure['state_variables'].append({
172 'name': var_name,
173 'type': var_type,
174 'contract': current_contract,
175 'line': i + 1
176 })
178 def _detect_reentrancy_vulnerabilities(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]:
179 """Detect reentrancy attack vulnerabilities"""
180 vulnerabilities = []
181 lines = contract_code.split('\n')
183 for i, line in enumerate(lines):
184 line_content = line.strip()
185 if not line_content or line_content.startswith('//'):
186 continue
188 # Pattern 1: Call to external address before state change
189 if ('.call(' in line_content or '.transfer(' in line_content or '.send(' in line_content):
190 # Check if this happens before state update in same function
191 vuln = SolidityVulnerability(
192 vulnerability_type=VulnerabilityType.REENTRANCY,
193 severity="high",
194 title="Potential Reentrancy",
195 description=f"External call detected: {line_content[:80]}... Reentrancy vulnerability if state changes happen after this call.",
196 file_path=file_path,
197 line_number=i + 1,
198 contract_name=self._get_current_function_contract(i, lines),
199 code_snippet=line_content.strip(),
200 remediation="Implement checks-effects-interactions pattern or use ReentrancyGuard modifier",
201 confidence=75
202 )
203 vulnerabilities.append(vuln)
205 # Pattern 2: Low-level calls without checks
206 if '.call.value(' in line_content and 'return' not in line_content:
207 vuln = SolidityVulnerability(
208 vulnerability_type=VulnerabilityType.UNCHECKED_LOW_LEVEL_CALL,
209 severity="medium",
210 title="Unchecked Low-Level Call",
211 description=f"Unverified low-level call: {line_content[:80]}...",
212 file_path=file_path,
213 line_number=i + 1,
214 contract_name=self._get_current_function_contract(i, lines),
215 code_snippet=line_content.strip(),
216 remediation="Always check return values of low-level calls",
217 confidence=85
218 )
219 vulnerabilities.append(vuln)
221 return vulnerabilities
223 def _detect_access_control_vulnerabilities(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]:
224 """Detect access control vulnerabilities"""
225 vulnerabilities = []
226 lines = contract_code.split('\n')
228 critical_functions = ['withdraw', 'transferOwnership', 'mint', 'burn', 'pause', 'unpause']
230 for i, line in enumerate(lines):
231 line_content = line.strip()
232 if not line_content or line_content.startswith('//'):
233 continue
235 # Check for critical functions without access controls
236 for func_name in critical_functions:
237 if f'function {func_name}' in line_content and 'public' in line_content:
238 # Look for modifiers in the same line
239 if not any(mod in line_content for mod in ['onlyOwner', 'require', 'if', 'modifier']):
240 vuln = SolidityVulnerability(
241 vulnerability_type=VulnerabilityType.ACCESS_CONTROL,
242 severity="critical",
243 title="Missing Access Control",
244 description=f"Critical function {func_name} lacks proper access control modifier",
245 file_path=file_path,
246 line_number=i + 1,
247 contract_name=self._get_current_function_contract(i, lines),
248 function_name=func_name,
249 code_snippet=line_content.strip(),
250 remediation=f"Add access control modifier (e.g., onlyOwner) to {func_name} function",
251 confidence=90
252 )
253 vulnerabilities.append(vuln)
255 # Pattern: owner() function that returns hardcoded address
256 for i, line in enumerate(lines):
257 line_content = line.strip()
258 if 'return' in line_content and '0x' in line_content:
259 if 'owner()' in ''.join(lines[max(0, i-2):i+2]):
260 vuln = SolidityVulnerability(
261 vulnerability_type=VulnerabilityType.ACCESS_CONTROL,
262 severity="medium",
263 title="Hardcoded Owner Address",
264 description="Owner function returns hardcoded address instead of dynamic storage",
265 file_path=file_path,
266 line_number=i + 1,
267 contract_name=self._get_current_function_contract(i, lines),
268 code_snippet=line_content.strip(),
269 remediation="Use address storage variable for owner instead of hardcoded value",
270 confidence=70
271 )
272 vulnerabilities.append(vuln)
274 return vulnerabilities
276 def _detect_integer_vulnerabilities(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]:
277 """Detect integer overflow/underflow vulnerabilities"""
278 vulnerabilities = []
279 lines = contract_code.split('\n')
281 arithmetic_operations = ['+', '-', '*', '/']
283 for i, line in enumerate(lines):
284 line_content = line.strip()
285 if not line_content or line_content.startswith('//'):
286 continue
288 # Look for arithmetic operations without SafeMath
289 for op in arithmetic_operations:
290 if op in line_content and 'SafeMath' not in ''.join(lines[max(0, i-5):i+5]):
291 # Check if this is a critical operation (balance, amount, etc.)
292 context_words = ['balance', 'amount', 'total', 'supply', 'price', 'value']
293 if any(word in line_content.lower() for word in context_words):
294 vuln = SolidityVulnerability(
295 vulnerability_type=VulnerabilityType.INTEGER_OVERFLOW_UNDERFLOW,
296 severity="medium",
297 title="Potential Integer Overflow/Underflow",
298 description=f"Arithmetic operation without overflow protection: {line_content[:80]}...",
299 file_path=file_path,
300 line_number=i + 1,
301 contract_name=self._get_current_function_contract(i, lines),
302 code_snippet=line_content.strip(),
303 remediation="Use SafeMath library or Solidity 0.8+ which has built-in overflow protection",
304 confidence=65
305 )
306 vulnerabilities.append(vuln)
308 return vulnerabilities
310 def _detect_unchecked_calls(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]:
311 """Detect unchecked external calls"""
312 vulnerabilities = []
313 lines = contract_code.split('\n')
315 for i, line in enumerate(lines):
316 line_content = line.strip()
317 if not line_content or line_content.startswith('//'):
318 continue
320 # External call patterns
321 external_calls = ['.call(', '.delegatecall(', '.transfer(', '.send(']
323 for pattern in external_calls:
324 if pattern in line_content:
325 # Check if return value is being used or verified
326 next_lines = lines[i+1:i+3] # Look at next 2-3 lines
327 has_check = any('require(' in next_line or 'if (' in next_line
328 for next_line in next_lines if next_line.strip())
330 if not has_check:
331 vuln = SolidityVulnerability(
332 vulnerability_type=VulnerabilityType.UNCHECKED_LOW_LEVEL_CALL,
333 severity="high",
334 title="Unchecked External Call",
335 description=f"External call {pattern} without return value verification",
336 file_path=file_path,
337 line_number=i + 1,
338 contract_name=self._get_current_function_contract(i, lines),
339 code_snippet=line_content.strip(),
340 remediation="Always verify return values of external calls",
341 confidence=80
342 )
343 vulnerabilities.append(vuln)
345 return vulnerabilities
347 def _detect_logic_errors(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]:
348 """Detect logic errors and bad practices"""
349 vulnerabilities = []
350 lines = contract_code.split('\n')
352 # Pattern 1: Using block.timestamp for critical operations
353 for i, line in enumerate(lines):
354 line_content = line.strip()
355 if 'block.timestamp' in line_content:
356 # Check if timestamp is used for something critical
357 critical_contexts = ['deadline', 'expiration', 'unlock', 'vest']
358 if any(context in ''.join(lines[max(0, i-3):i+3]).lower() for context in critical_contexts):
359 vuln = SolidityVulnerability(
360 vulnerability_type=VulnerabilityType.TIMESTAMP_DEPENDENCE,
361 severity="medium",
362 title="Block Timestamp Manipulation Risk",
363 description="Using block.timestamp for critical logic that miners can manipulate",
364 file_path=file_path,
365 line_number=i + 1,
366 contract_name=self._get_current_function_contract(i, lines),
367 code_snippet=line_content.strip(),
368 remediation="Use block.number or external oracle for time-dependent logic",
369 confidence=75
370 )
371 vulnerabilities.append(vuln)
373 # Pattern 2: Uninitialized storage pointers
374 for i, line in enumerate(lines):
375 line_content = line.strip()
376 if 'Storage(' in line_content and 'new' in line_content:
377 vuln = SolidityVulnerability(
378 vulnerability_type=VulnerabilityType.UNINITIALIZED_STORAGE,
379 severity="medium",
380 title="Potential Uninitialized Storage Pointer",
381 description="Creating struct or array storage without proper initialization",
382 file_path=file_path,
383 line_number=i + 1,
384 contract_name=self._get_current_function_contract(i, lines),
385 code_snippet=line_content.strip(),
386 remediation="Initialize storage variables properly before use",
387 confidence=60
388 )
389 vulnerabilities.append(vuln)
391 return vulnerabilities
def _detect_oracle_manipulation(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]:
    """
    Run the price-oracle detectors and merge their findings.

    The source is split into lines once, per-line function contexts are
    computed once, and both are handed to each detector in this order:

    1. Chainlink staleness / price validation
    2. Single oracle source
    3. Missing price bounds
    4. Uniswap spot price usage
    5. Direct pool reserve usage
    6. Missing oracle failure handling

    Args:
        contract_code: Solidity source code
        file_path: Path copied into each finding

    Returns:
        Findings of all detectors, concatenated in the order listed above.
    """
    source_lines = contract_code.split('\n')

    # Shared per-line function/contract lookup used by every detector below.
    per_line_context = self._extract_function_contexts(source_lines)

    oracle_detectors = (
        self._detect_chainlink_staleness,
        self._detect_single_oracle_usage,
        self._detect_missing_price_bounds,
        self._detect_uniswap_spot_price,
        self._detect_pool_reserve_manipulation,
        self._detect_missing_oracle_failure_handling,
    )

    findings = []
    for run_detector in oracle_detectors:
        findings.extend(run_detector(source_lines, file_path, per_line_context))
    return findings
441 def _extract_function_contexts(self, lines: List[str]) -> Dict[int, Dict[str, Any]]:
442 """Extract function contexts for contextual analysis"""
443 contexts = {}
444 current_function = None
445 current_contract = None
446 brace_count = 0
448 for i, line in enumerate(lines):
449 line_stripped = line.strip()
451 # Track contract
452 if line_stripped.startswith('contract ') or line_stripped.startswith('abstract contract '):
453 match = re.search(r'contract\s+(\w+)', line_stripped)
454 if match:
455 current_contract = match.group(1)
457 # Track function
458 if line_stripped.startswith('function '):
459 match = re.search(r'function\s+(\w+)', line_stripped)
460 if match:
461 current_function = match.group(1)
462 brace_count = 0
464 # Track braces for function scope
465 brace_count += line_stripped.count('{') - line_stripped.count('}')
467 # Store context
468 contexts[i] = {
469 'function': current_function,
470 'contract': current_contract,
471 'in_function': brace_count > 0 and current_function is not None
472 }
474 # Reset function when it ends
475 if brace_count == 0 and current_function is not None:
476 current_function = None
478 return contexts
480 def _detect_chainlink_staleness(
481 self,
482 lines: List[str],
483 file_path: str,
484 contexts: Dict[int, Dict[str, Any]]
485 ) -> List[SolidityVulnerability]:
486 """
487 Detect Chainlink oracle usage without staleness checks
489 CVE Pattern: Missing updatedAt validation
490 Real Exploits: Polter Finance, BonqDAO Protocol
491 """
492 vulnerabilities = []
494 # Pattern: latestRoundData() without updatedAt check
495 chainlink_patterns = [
496 r'latestRoundData\s*\(',
497 r'AggregatorV3Interface',
498 r'getRoundData\s*\(',
499 ]
501 for i, line in enumerate(lines):
502 line_content = line.strip()
504 # Check if line contains Chainlink oracle call
505 if any(re.search(pattern, line_content) for pattern in chainlink_patterns):
506 context = contexts.get(i, {})
507 function_name = context.get('function', 'unknown')
508 contract_name = context.get('contract', 'unknown')
510 # Check next 15 lines for staleness validation
511 check_window = lines[i:i+15]
512 has_staleness_check = any(
513 'updatedAt' in check_line and
514 ('block.timestamp' in check_line or 'now' in check_line)
515 for check_line in check_window
516 )
518 has_price_validation = any(
519 'price' in check_line and
520 ('require' in check_line or 'revert' in check_line) and
521 ('>' in check_line or '<' in check_line)
522 for check_line in check_window
523 )
525 if not has_staleness_check:
526 vulnerabilities.append(SolidityVulnerability(
527 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION,
528 severity="high",
529 title="Chainlink Oracle Staleness Not Checked",
530 description=(
531 f"Function '{function_name}' uses Chainlink oracle without validating "
532 f"data freshness. Stale price data can be exploited for profit. "
533 f"OWASP SC02:2025 - Price Oracle Manipulation. "
534 f"Similar to Polter Finance exploit (2024)."
535 ),
536 file_path=file_path,
537 line_number=i + 1,
538 function_name=function_name,
539 contract_name=contract_name,
540 code_snippet=line_content,
541 remediation=(
542 "Add staleness validation:\n"
543 "require(block.timestamp - updatedAt < 3600, 'Stale price');\n"
544 "Also validate: updatedAt > 0, answeredInRound >= roundId, price > 0"
545 ),
546 confidence=90
547 ))
549 if not has_price_validation:
550 vulnerabilities.append(SolidityVulnerability(
551 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION,
552 severity="medium",
553 title="Missing Chainlink Price Validation",
554 description=(
555 f"Function '{function_name}' doesn't validate price from Chainlink. "
556 f"Price should be checked for: price > 0, within bounds."
557 ),
558 file_path=file_path,
559 line_number=i + 1,
560 function_name=function_name,
561 contract_name=contract_name,
562 code_snippet=line_content,
563 remediation=(
564 "Add price validation:\n"
565 "require(price > 0, 'Invalid price');\n"
566 "require(price >= minPrice && price <= maxPrice, 'Price out of bounds');"
567 ),
568 confidence=85
569 ))
571 return vulnerabilities
573 def _detect_single_oracle_usage(
574 self,
575 lines: List[str],
576 file_path: str,
577 contexts: Dict[int, Dict[str, Any]]
578 ) -> List[SolidityVulnerability]:
579 """
580 Detect single oracle source without aggregation
582 OWASP Recommendation: Use multiple independent oracle sources
583 """
584 vulnerabilities = []
586 # Track oracle sources per function
587 function_oracle_counts = {}
589 oracle_source_patterns = [
590 r'latestRoundData\s*\(', # Chainlink
591 r'getAmountsOut\s*\(', # Uniswap
592 r'consult\s*\(', # TWAP
593 r'\.price\s*\(', # Generic price getter
594 ]
596 for i, line in enumerate(lines):
597 line_content = line.strip()
598 context = contexts.get(i, {})
599 function_name = context.get('function')
601 if not function_name or not context.get('in_function'):
602 continue
604 # Count oracle sources
605 for pattern in oracle_source_patterns:
606 if re.search(pattern, line_content):
607 if function_name not in function_oracle_counts:
608 function_oracle_counts[function_name] = {
609 'count': 0,
610 'line': i + 1,
611 'contract': context.get('contract', 'unknown')
612 }
613 function_oracle_counts[function_name]['count'] += 1
615 # Report functions with single oracle source
616 for func_name, data in function_oracle_counts.items():
617 if data['count'] == 1:
618 vulnerabilities.append(SolidityVulnerability(
619 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION,
620 severity="medium",
621 title="Single Oracle Source - No Aggregation",
622 description=(
623 f"Function '{func_name}' relies on single oracle source. "
624 f"OWASP SC02:2025 recommends multiple independent oracles "
625 f"to prevent single-point manipulation. "
626 f"$70M+ lost to oracle manipulation in 2024."
627 ),
628 file_path=file_path,
629 line_number=data['line'],
630 function_name=func_name,
631 contract_name=data['contract'],
632 code_snippet=None,
633 remediation=(
634 "Implement multi-oracle strategy:\n"
635 "1. Use Chainlink + UniswapV3 TWAP\n"
636 "2. Compare prices and revert if deviation > 10%\n"
637 "3. Take median of 3+ oracle sources"
638 ),
639 confidence=80
640 ))
642 return vulnerabilities
644 def _detect_missing_price_bounds(
645 self,
646 lines: List[str],
647 file_path: str,
648 contexts: Dict[int, Dict[str, Any]]
649 ) -> List[SolidityVulnerability]:
650 """
651 Detect missing MIN_PRICE and MAX_PRICE validation
653 OWASP Mitigation: Implement price boundaries
654 """
655 vulnerabilities = []
657 price_usage_pattern = r'(price|amount|value)\s*[=:]'
659 for i, line in enumerate(lines):
660 line_content = line.strip()
661 context = contexts.get(i, {})
663 if not context.get('in_function'):
664 continue
666 # Check if line assigns/uses price
667 if re.search(price_usage_pattern, line_content):
668 # Look for oracle calls in previous 5 lines
669 prev_lines = lines[max(0, i-5):i+1]
670 has_oracle_call = any(
671 'latestRoundData' in prev or
672 'getAmountOut' in prev or
673 'consult' in prev
674 for prev in prev_lines
675 )
677 if not has_oracle_call:
678 continue
680 # Check for bounds validation in next 10 lines
681 next_lines = lines[i+1:i+10]
682 has_min_check = any('MIN' in next_line.upper() or 'minPrice' in next_line for next_line in next_lines)
683 has_max_check = any('MAX' in next_line.upper() or 'maxPrice' in next_line for next_line in next_lines)
685 if not (has_min_check and has_max_check):
686 vulnerabilities.append(SolidityVulnerability(
687 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION,
688 severity="medium",
689 title="Missing Price Bounds Validation",
690 description=(
691 f"Price usage at line {i+1} lacks MIN/MAX bounds validation. "
692 f"OWASP SC02:2025 recommends price thresholds to detect anomalies. "
693 f"Without bounds, extreme price manipulation goes undetected."
694 ),
695 file_path=file_path,
696 line_number=i + 1,
697 function_name=context.get('function', 'unknown'),
698 contract_name=context.get('contract', 'unknown'),
699 code_snippet=line_content,
700 remediation=(
701 "Add price bounds:\n"
702 "uint256 constant MIN_PRICE = 1e6; // Adjust for token\n"
703 "uint256 constant MAX_PRICE = 1e12;\n"
704 "require(price >= MIN_PRICE && price <= MAX_PRICE, 'Price anomaly');"
705 ),
706 confidence=75
707 ))
709 return vulnerabilities
711 def _detect_uniswap_spot_price(
712 self,
713 lines: List[str],
714 file_path: str,
715 contexts: Dict[int, Dict[str, Any]]
716 ) -> List[SolidityVulnerability]:
717 """
718 Detect UniswapV2 spot price usage (flash loan vulnerable)
720 Critical: Spot prices can be manipulated within single transaction
721 Real Exploits: Moby (Jan 2025), The Vow (Aug 2024)
722 """
723 vulnerabilities = []
725 # Patterns indicating spot price usage
726 spot_price_patterns = [
727 r'getAmountsOut\s*\(',
728 r'getAmountOut\s*\(',
729 r'getReserves\s*\(',
730 r'\.reserves\(',
731 r'pair\.getReserves',
732 ]
734 twap_patterns = [
735 r'consult\s*\(',
736 r'TWAP',
737 r'timeWeighted',
738 r'observe\s*\(', # UniswapV3
739 ]
741 for i, line in enumerate(lines):
742 line_content = line.strip()
743 context = contexts.get(i, {})
745 # Check if using spot price
746 is_spot_price = any(re.search(pattern, line_content) for pattern in spot_price_patterns)
748 if not is_spot_price:
749 continue
751 # Check if TWAP is also used (good)
752 check_window = lines[max(0, i-10):i+10]
753 has_twap = any(
754 any(re.search(twap_pattern, check_line) for twap_pattern in twap_patterns)
755 for check_line in check_window
756 )
758 if not has_twap:
759 vulnerabilities.append(SolidityVulnerability(
760 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION,
761 severity="critical",
762 title="UniswapV2 Spot Price Flash Loan Vulnerability",
763 description=(
764 f"Line {i+1} uses Uniswap spot price without TWAP protection. "
765 f"CRITICAL: Spot prices can be manipulated within single transaction. "
766 f"Recent Exploits: Moby (Jan 2025), The Vow (Aug 2024). "
767 f"Attackers use flash loans to manipulate pool reserves. "
768 f"OWASP SC02:2025 - Most common DeFi exploit pattern (34.3%)."
769 ),
770 file_path=file_path,
771 line_number=i + 1,
772 function_name=context.get('function', 'unknown'),
773 contract_name=context.get('contract', 'unknown'),
774 code_snippet=line_content,
775 remediation=(
776 "CRITICAL FIX REQUIRED:\n"
777 "1. Use UniswapV3 TWAP with observe() for time-weighted prices\n"
778 "2. OR use Chainlink as primary oracle with Uniswap as backup\n"
779 "3. Never rely on spot prices for critical logic\n"
780 "4. Implement price deviation checks between oracles"
781 ),
782 confidence=95
783 ))
785 return vulnerabilities
787 def _detect_pool_reserve_manipulation(
788 self,
789 lines: List[str],
790 file_path: str,
791 contexts: Dict[int, Dict[str, Any]]
792 ) -> List[SolidityVulnerability]:
793 """
794 Detect direct pool reserve usage for pricing
796 Using pool reserves directly is extremely vulnerable to manipulation
797 """
798 vulnerabilities = []
800 reserve_patterns = [
801 r'reserve0',
802 r'reserve1',
803 r'\.reserves\s*\(',
804 r'balanceOf\(address\(this\)\)',
805 r'token\.balanceOf\(pool\)',
806 ]
808 for i, line in enumerate(lines):
809 line_content = line.strip()
810 context = contexts.get(i, {})
812 if not context.get('in_function'):
813 continue
815 # Check if using reserves for calculation
816 uses_reserves = any(re.search(pattern, line_content) for pattern in reserve_patterns)
818 if uses_reserves and ('*' in line_content or '/' in line_content or '=' in line_content):
819 # Check if it's in a price calculation context
820 next_lines = lines[i:i+5]
821 looks_like_price_calc = any(
822 'price' in next_line.lower() or
823 'value' in next_line.lower() or
824 'amount' in next_line.lower()
825 for next_line in next_lines
826 )
828 if looks_like_price_calc:
829 vulnerabilities.append(SolidityVulnerability(
830 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION,
831 severity="critical",
832 title="Pool Reserve Direct Usage - Flash Loan Vulnerability",
833 description=(
834 f"Line {i+1} uses pool reserves directly for pricing. "
835 f"CRITICAL: Reserves can be manipulated within single transaction. "
836 f"This is the #1 DeFi exploit pattern. "
837 f"Using pool balances as price oracle is NEVER safe."
838 ),
839 file_path=file_path,
840 line_number=i + 1,
841 function_name=context.get('function', 'unknown'),
842 contract_name=context.get('contract', 'unknown'),
843 code_snippet=line_content,
844 remediation=(
845 "CRITICAL FIX:\n"
846 "1. Never use pool reserves directly for pricing\n"
847 "2. Use Chainlink Price Feeds for external prices\n"
848 "3. Use UniswapV3 TWAP with sufficient time window (30+ min)\n"
849 "4. Implement multi-oracle aggregation"
850 ),
851 confidence=95
852 ))
854 return vulnerabilities
856 def _detect_missing_oracle_failure_handling(
857 self,
858 lines: List[str],
859 file_path: str,
860 contexts: Dict[int, Dict[str, Any]]
861 ) -> List[SolidityVulnerability]:
862 """
863 Detect oracle calls without try/catch blocks
865 Oracle failures can DOS contracts if not handled properly
866 """
867 vulnerabilities = []
869 oracle_call_patterns = [
870 r'latestRoundData\s*\(',
871 r'getRoundData\s*\(',
872 ]
874 for i, line in enumerate(lines):
875 line_content = line.strip()
876 context = contexts.get(i, {})
878 # Check if making oracle call
879 is_oracle_call = any(re.search(pattern, line_content) for pattern in oracle_call_patterns)
881 if not is_oracle_call:
882 continue
884 # Check if wrapped in try/catch
885 prev_lines = lines[max(0, i-3):i]
886 has_try = any('try' in prev_line for prev_line in prev_lines)
888 next_lines = lines[i+1:i+5]
889 has_catch = any('catch' in next_line for next_line in next_lines)
891 if not (has_try and has_catch):
892 vulnerabilities.append(SolidityVulnerability(
893 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION,
894 severity="medium",
895 title="Missing Oracle Failure Handling",
896 description=(
897 f"Oracle call at line {i+1} lacks try/catch error handling. "
898 f"Oracle failures can cause contract DOS. "
899 f"Best practice: wrap oracle calls in try/catch with fallback logic."
900 ),
901 file_path=file_path,
902 line_number=i + 1,
903 function_name=context.get('function', 'unknown'),
904 contract_name=context.get('contract', 'unknown'),
905 code_snippet=line_content,
906 remediation=(
907 "Add error handling:\n"
908 "try oracle.latestRoundData() returns (...) {\n"
909 " // use data\n"
910 "} catch {\n"
911 " // fallback logic or revert gracefully\n"
912 "}"
913 ),
914 confidence=70
915 ))
917 return vulnerabilities
def _detect_input_validation_issues(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]:
    """
    Run the input-validation detectors and merge their findings.

    The source is split into lines once, per-line function contexts are
    computed once, and both are handed to each detector in this order:

    1. Missing address(0) validation
    2. Missing amount/value validation
    3. Missing array bounds validation
    4. Unchecked external calls
    5. Unsafe type conversions

    Args:
        contract_code: Solidity source code
        file_path: Path copied into each finding

    Returns:
        Findings of all detectors, concatenated in the order listed above.
    """
    source_lines = contract_code.split('\n')

    # Shared per-line function/contract lookup used by every detector below.
    per_line_context = self._extract_function_contexts(source_lines)

    validation_detectors = (
        self._detect_missing_address_validation,
        self._detect_missing_amount_validation,
        self._detect_missing_array_bounds,
        self._detect_unchecked_external_calls,
        self._detect_unsafe_type_conversions,
    )

    findings = []
    for run_detector in validation_detectors:
        findings.extend(run_detector(source_lines, file_path, per_line_context))
    return findings
962 def _detect_missing_address_validation(
963 self,
964 lines: List[str],
965 file_path: str,
966 contexts: Dict[int, Dict[str, Any]]
967 ) -> List[SolidityVulnerability]:
968 """
969 Detect missing address(0) validation
971 OWASP: Lack of Input Validation ($14.6M in losses)
972 Critical Pattern: Sending funds to address(0) = permanent loss
973 """
974 vulnerabilities = []
976 # Track function parameters
977 for i, line in enumerate(lines):
978 line_content = line.strip()
979 context = contexts.get(i, {})
981 # Check if it's a function definition with address parameter
982 if line_content.startswith('function '):
983 # Extract parameters
984 if '(' in line_content and ')' in line_content:
985 params_match = re.search(r'\((.*?)\)', line_content)
986 if params_match:
987 params_str = params_match.group(1)
989 # Find address parameters
990 address_params = re.findall(r'address\s+(\w+)', params_str)
992 if address_params:
993 function_name = context.get('function', 'unknown')
995 # Check next 20 lines for address(0) validation
996 check_window = lines[i+1:i+20]
998 for addr_param in address_params:
999 has_validation = any(
1000 f'{addr_param}' in check_line and
1001 ('address(0)' in check_line or '0x0' in check_line) and
1002 ('require' in check_line or 'revert' in check_line or 'if' in check_line)
1003 for check_line in check_window
1004 )
1006 # Check if it's used in critical operations
1007 is_critical = any(
1008 f'{addr_param}' in check_line and
1009 any(op in check_line for op in ['transfer', 'send', 'call', 'delegatecall', '='])
1010 for check_line in check_window
1011 )
1013 if not has_validation and is_critical:
1014 vulnerabilities.append(SolidityVulnerability(
1015 vulnerability_type=VulnerabilityType.LOGIC_ERROR,
1016 severity="high",
1017 title="Missing Address Zero Validation",
1018 description=(
1019 f"Parameter '{addr_param}' in function '{function_name}' lacks address(0) check. "
1020 f"OWASP 2025: Lack of Input Validation ($14.6M in losses). "
1021 f"Funds sent to address(0) are permanently lost - no private key exists. "
1022 f"This is a common attack vector in 2024-2025."
1023 ),
1024 file_path=file_path,
1025 line_number=i + 1,
1026 function_name=function_name,
1027 contract_name=context.get('contract', 'unknown'),
1028 code_snippet=line_content,
1029 remediation=(
1030 f"Add validation:\n"
1031 f"require({addr_param} != address(0), 'Zero address not allowed');"
1032 ),
1033 confidence=85
1034 ))
1036 return vulnerabilities
1038 def _detect_missing_amount_validation(
1039 self,
1040 lines: List[str],
1041 file_path: str,
1042 contexts: Dict[int, Dict[str, Any]]
1043 ) -> List[SolidityVulnerability]:
1044 """
1045 Detect missing amount/value validation (zero or negative)
1047 Common Pattern: Functions accepting amounts without validation
1048 """
1049 vulnerabilities = []
1051 for i, line in enumerate(lines):
1052 line_content = line.strip()
1053 context = contexts.get(i, {})
1055 # Check if it's a function with amount/value parameter
1056 if line_content.startswith('function '):
1057 if '(' in line_content and ')' in line_content:
1058 params_match = re.search(r'\((.*?)\)', line_content)
1059 if params_match:
1060 params_str = params_match.group(1)
1062 # Find amount/value parameters (uint256, uint, int)
1063 amount_params = re.findall(
1064 r'(?:uint256|uint|int256|int)\s+(\w*(?:amount|value|quantity|balance|size)\w*)',
1065 params_str,
1066 re.IGNORECASE
1067 )
1069 if amount_params:
1070 function_name = context.get('function', 'unknown')
1072 # Check next 15 lines for validation
1073 check_window = lines[i+1:i+15]
1075 for amount_param in amount_params:
1076 has_validation = any(
1077 f'{amount_param}' in check_line and
1078 ('> 0' in check_line or '!= 0' in check_line or '>=' in check_line) and
1079 ('require' in check_line or 'revert' in check_line)
1080 for check_line in check_window
1081 )
1083 if not has_validation:
1084 vulnerabilities.append(SolidityVulnerability(
1085 vulnerability_type=VulnerabilityType.LOGIC_ERROR,
1086 severity="medium",
1087 title="Missing Amount Validation",
1088 description=(
1089 f"Parameter '{amount_param}' in function '{function_name}' lacks validation. "
1090 f"Should check: amount > 0 to prevent zero-value operations. "
1091 f"OWASP 2025: Input Validation ($14.6M in losses)."
1092 ),
1093 file_path=file_path,
1094 line_number=i + 1,
1095 function_name=function_name,
1096 contract_name=context.get('contract', 'unknown'),
1097 code_snippet=line_content,
1098 remediation=(
1099 f"Add validation:\n"
1100 f"require({amount_param} > 0, 'Amount must be greater than zero');"
1101 ),
1102 confidence=75
1103 ))
1105 return vulnerabilities
1107 def _detect_missing_array_bounds(
1108 self,
1109 lines: List[str],
1110 file_path: str,
1111 contexts: Dict[int, Dict[str, Any]]
1112 ) -> List[SolidityVulnerability]:
1113 """
1114 Detect missing array bounds validation
1116 Pattern: Array access without length check
1117 """
1118 vulnerabilities = []
1120 for i, line in enumerate(lines):
1121 line_content = line.strip()
1122 context = contexts.get(i, {})
1124 if not context.get('in_function'):
1125 continue
1127 # Check for array access patterns
1128 array_access_pattern = r'(\w+)\[(\w+|\d+)\]'
1129 matches = re.findall(array_access_pattern, line_content)
1131 for array_name, index in matches:
1132 # Skip if index is a number
1133 if index.isdigit():
1134 continue
1136 # Check if there's a bounds check before this line
1137 prev_lines = lines[max(0, i-5):i]
1138 has_bounds_check = any(
1139 f'{index}' in prev_line and
1140 (f'{array_name}.length' in prev_line or 'length' in prev_line) and
1141 ('require' in prev_line or 'if' in prev_line or '<' in prev_line)
1142 for prev_line in prev_lines
1143 )
1145 if not has_bounds_check:
1146 vulnerabilities.append(SolidityVulnerability(
1147 vulnerability_type=VulnerabilityType.LOGIC_ERROR,
1148 severity="medium",
1149 title="Missing Array Bounds Validation",
1150 description=(
1151 f"Array access '{array_name}[{index}]' lacks bounds checking. "
1152 f"Out-of-bounds access causes revert but wastes gas. "
1153 f"OWASP 2025: Input Validation."
1154 ),
1155 file_path=file_path,
1156 line_number=i + 1,
1157 function_name=context.get('function', 'unknown'),
1158 contract_name=context.get('contract', 'unknown'),
1159 code_snippet=line_content,
1160 remediation=(
1161 f"Add bounds check:\n"
1162 f"require({index} < {array_name}.length, 'Index out of bounds');"
1163 ),
1164 confidence=70
1165 ))
1167 return vulnerabilities
1169 def _detect_unchecked_external_calls(
1170 self,
1171 lines: List[str],
1172 file_path: str,
1173 contexts: Dict[int, Dict[str, Any]]
1174 ) -> List[SolidityVulnerability]:
1175 """
1176 Detect unchecked external calls (enhanced)
1178 OWASP 2025: Unchecked External Calls ($550.7K in losses)
1179 Climbed from #10 to #6 in 2025 rankings
1181 Pattern: Low-level calls (.call, .delegatecall) without return value check
1182 """
1183 vulnerabilities = []
1185 low_level_calls = [
1186 r'\.call\s*\(',
1187 r'\.delegatecall\s*\(',
1188 r'\.staticcall\s*\(',
1189 ]
1191 for i, line in enumerate(lines):
1192 line_content = line.strip()
1193 context = contexts.get(i, {})
1195 if not context.get('in_function'):
1196 continue
1198 # Check for low-level calls
1199 for pattern in low_level_calls:
1200 if re.search(pattern, line_content):
1201 # Check if return value is captured
1202 captures_return = re.search(r'(?:bool\s+\w+|[\(\w]+)\s*=.*\.(?:call|delegatecall|staticcall)', line_content)
1204 if captures_return:
1205 # Check if the captured value is validated
1206 # Extract the variable name
1207 var_match = re.search(r'(?:bool\s+(\w+)|[\(](\w+))', line_content)
1208 if var_match:
1209 var_name = var_match.group(1) or var_match.group(2)
1211 # Check next 5 lines for require/if with this variable
1212 check_window = lines[i+1:i+5]
1213 has_validation = any(
1214 var_name in check_line and
1215 ('require' in check_line or 'if' in check_line or 'assert' in check_line)
1216 for check_line in check_window
1217 )
1219 if not has_validation:
1220 vulnerabilities.append(SolidityVulnerability(
1221 vulnerability_type=VulnerabilityType.UNCHECKED_LOW_LEVEL_CALL,
1222 severity="high",
1223 title="Unchecked External Call Return Value",
1224 description=(
1225 f"Low-level call return value '{var_name}' captured but not validated. "
1226 f"OWASP 2025 #6: Unchecked External Calls ($550.7K in losses). "
1227 f"Climbed from #10 to #6 in 2025 rankings. "
1228 f"Failed external calls can cause unexpected behavior if not handled."
1229 ),
1230 file_path=file_path,
1231 line_number=i + 1,
1232 function_name=context.get('function', 'unknown'),
1233 contract_name=context.get('contract', 'unknown'),
1234 code_snippet=line_content,
1235 remediation=(
1236 f"Add validation:\n"
1237 f"require({var_name}, 'External call failed');\n"
1238 f"// OR use try/catch for better error handling"
1239 ),
1240 confidence=90
1241 ))
1242 else:
1243 # Return value not even captured!
1244 vulnerabilities.append(SolidityVulnerability(
1245 vulnerability_type=VulnerabilityType.UNCHECKED_LOW_LEVEL_CALL,
1246 severity="critical",
1247 title="External Call Return Value Ignored",
1248 description=(
1249 f"Low-level call return value completely ignored. "
1250 f"CRITICAL: OWASP 2025 #6 ($550.7K in losses). "
1251 f"The call may fail silently causing logic errors or fund loss. "
1252 f"Always capture and validate external call results."
1253 ),
1254 file_path=file_path,
1255 line_number=i + 1,
1256 function_name=context.get('function', 'unknown'),
1257 contract_name=context.get('contract', 'unknown'),
1258 code_snippet=line_content,
1259 remediation=(
1260 "Capture and validate return value:\n"
1261 "(bool success, ) = target.call(...);\n"
1262 "require(success, 'External call failed');"
1263 ),
1264 confidence=95
1265 ))
1267 return vulnerabilities
1269 def _detect_unsafe_type_conversions(
1270 self,
1271 lines: List[str],
1272 file_path: str,
1273 contexts: Dict[int, Dict[str, Any]]
1274 ) -> List[SolidityVulnerability]:
1275 """
1276 Detect unsafe type conversions
1278 Pattern: Converting between types without validation
1279 """
1280 vulnerabilities = []
1282 conversion_patterns = [
1283 r'uint256\s*\(\s*int', # int to uint
1284 r'uint\s*\(\s*int',
1285 r'int\s*\(\s*uint', # uint to int
1286 r'address\s*\(\s*uint', # uint to address
1287 ]
1289 for i, line in enumerate(lines):
1290 line_content = line.strip()
1291 context = contexts.get(i, {})
1293 if not context.get('in_function'):
1294 continue
1296 for pattern in conversion_patterns:
1297 if re.search(pattern, line_content):
1298 # Check if there's validation nearby
1299 check_window = lines[max(0, i-2):i+3]
1300 has_validation = any(
1301 'require' in check_line or 'assert' in check_line
1302 for check_line in check_window
1303 )
1305 if not has_validation:
1306 vulnerabilities.append(SolidityVulnerability(
1307 vulnerability_type=VulnerabilityType.LOGIC_ERROR,
1308 severity="medium",
1309 title="Unsafe Type Conversion",
1310 description=(
1311 f"Type conversion without validation at line {i+1}. "
1312 f"Converting between signed/unsigned or numeric/address types can cause unexpected behavior. "
1313 f"OWASP 2025: Input Validation."
1314 ),
1315 file_path=file_path,
1316 line_number=i + 1,
1317 function_name=context.get('function', 'unknown'),
1318 contract_name=context.get('contract', 'unknown'),
1319 code_snippet=line_content,
1320 remediation=(
1321 "Add validation before conversion:\n"
1322 "require(value >= 0, 'Invalid conversion');\n"
1323 "// OR use SafeCast library for safe conversions"
1324 ),
1325 confidence=70
1326 ))
1328 return vulnerabilities
1330 def _get_current_function_contract(self, line_index: int, lines: List[str]) -> str:
1331 """Helper to determine current contract context"""
1332 current_contract = "unknown"
1334 # Look backwards to find most recent contract
1335 for i in range(line_index, -1, -1):
1336 line = lines[i].strip()
1337 if line.startswith('contract ') or line.startswith('abstract contract '):
1338 match = re.search(r'contract\s+(\w+)', line)
1339 if match:
1340 current_contract = match.group(1)
1341 break
1342 # Stop looking if we hit another contract boundary
1343 if line.startswith('contract ') and i < line_index:
1344 break
1346 return current_contract
1348 def _initialize_patterns(self) -> Dict[str, List[str]]:
1349 """Initialize vulnerability pattern detectors"""
1350 return {
1351 'reentrancy': [
1352 r'\.call\(',
1353 r'\.transfer\(',
1354 r'\.send\('
1355 ],
1356 'access_control': [
1357 r'function\s+\w+\s*public',
1358 r'missing.*modifier',
1359 r'no.*access.*control'
1360 ],
1361 'integer_overflow': [
1362 r'[\+\-\*\/]',
1363 r'(balance|amount|total|supply).*[\+\-\*\/]'
1364 ],
1365 'unchecked_call': [
1366 r'\.call\(',
1367 r'\.delegatecall\('
1368 ],
1369 'oracle_manipulation': [
1370 r'uniswap.*router',
1371 r'price.*oracle',
1372 r'getAmountOut'
1373 ]
1374 }