Coverage for src/alprina_cli/agents/web3_auditor/solidity_analyzer.py: 11%

428 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-13 11:15 +0100

1""" 

2Solidity Smart Contract Static Analyzer 

3 

4Inspired by Slither but enhanced for startup Web3 security needs. 

5Focuses on OWASP Smart Contract Top 10 detection with economic context. 

6""" 

7 

8import re 

9import ast 

10from pathlib import Path 

11from typing import List, Dict, Any, Optional, Tuple 

12from dataclasses import dataclass 

13from enum import Enum 

14 

15class VulnerabilityType(Enum): 

16 REENTRANCY = "reentrancy" 

17 ACCESS_CONTROL = "access_control" 

18 INTEGER_OVERFLOW_UNDERFLOW = "integer_overflow" 

19 UNCHECKED_LOW_LEVEL_CALL = "unchecked_call" 

20 LOGIC_ERROR = "logic_error" 

21 TIMESTAMP_DEPENDENCE = "timestamp_dependence" 

22 UNINITIALIZED_STORAGE = "uninitialized_storage" 

23 ORACLE_MANIPULATION = "oracle_manipulation" 

24 GAS_LIMIT_ISSUE = "gas_limit" 

25 DENIAL_OF_SERVICE = "denial_of_service" 

26 

27@dataclass 

28class SolidityVulnerability: 

29 """Represents a smart contract vulnerability""" 

30 vulnerability_type: VulnerabilityType 

31 severity: str # "critical", "high", "medium", "low" 

32 title: str 

33 description: str 

34 file_path: str 

35 line_number: Optional[int] 

36 function_name: Optional[str] 

37 contract_name: str 

38 code_snippet: Optional[str] = None 

39 remediation: Optional[str] = None 

40 confidence: int = 100 # 0-100 

41 

42class SolidityStaticAnalyzer: 

43 """ 

44 Enhanced Solidity analyzer focused on Web3 startup security needs 

45 """ 

46 

47 def __init__(self): 

48 self.vulnerability_patterns = self._initialize_patterns() 

49 self.contract_structure = None 

50 self.functions = [] 

51 self.state_variables = [] 

52 

53 def analyze_contract(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]: 

54 """ 

55 Comprehensive smart contract vulnerability analysis 

56  

57 Args: 

58 contract_code: Solidity source code 

59 file_path: Path to the contract file 

60  

61 Returns: 

62 List of detected vulnerabilities 

63 """ 

64 vulnerabilities = [] 

65 

66 try: 

67 # Parse contract structure 

68 self._parse_contract_structure(contract_code) 

69 

70 # Run comprehensive vulnerability detection 

71 reentrancy_vulns = self._detect_reentrancy_vulnerabilities(contract_code, file_path) 

72 access_control_vulns = self._detect_access_control_vulnerabilities(contract_code, file_path) 

73 overflow_vulns = self._detect_integer_vulnerabilities(contract_code, file_path) 

74 call_vulns = self._detect_unchecked_calls(contract_code, file_path) 

75 logic_vulns = self._detect_logic_errors(contract_code, file_path) 

76 oracle_vulns = self._detect_oracle_manipulation(contract_code, file_path) 

77 input_vulns = self._detect_input_validation_issues(contract_code, file_path) 

78 

79 vulnerabilities.extend(reentrancy_vulns) 

80 vulnerabilities.extend(access_control_vulns) 

81 vulnerabilities.extend(overflow_vulns) 

82 vulnerabilities.extend(call_vulns) 

83 vulnerabilities.extend(logic_vulns) 

84 vulnerabilities.extend(oracle_vulns) 

85 vulnerabilities.extend(input_vulns) 

86 

87 except Exception as e: 

88 # Add parsing error as info-level vulnerability 

89 vulnerabilities.append(SolidityVulnerability( 

90 vulnerability_type=VulnerabilityType.LOGIC_ERROR, 

91 severity="low", 

92 title="Analysis Error", 

93 description=f"Could not fully analyze contract: {str(e)}", 

94 file_path=file_path, 

95 line_number=None, 

96 contract_name="unknown", 

97 confidence=20 

98 )) 

99 

100 return vulnerabilities 

101 

102 def _parse_contract_structure(self, contract_code: str): 

103 """Parse contract structure to identify functions and state variables""" 

104 lines = contract_code.split('\n') 

105 

106 # Find contracts 

107 self.contract_structure = { 

108 'contracts': [], 

109 'functions': [], 

110 'state_variables': [] 

111 } 

112 

113 current_contract = None 

114 

115 for i, line in enumerate(lines): 

116 line = line.strip() 

117 

118 # Skip comments and empty lines 

119 if not line or line.startswith('//') or line.startswith('/*'): 

120 continue 

121 

122 # Find contract definitions 

123 if line.startswith('contract ') or line.startswith('abstract contract '): 

124 contract_match = re.search(r'(abstract )?contract\s+(\w+)', line) 

125 if contract_match: 

126 current_contract = contract_match.group(2) 

127 self.contract_structure['contracts'].append({ 

128 'name': current_contract, 

129 'line': i + 1 

130 }) 

131 continue 

132 

133 # Find function definitions 

134 if current_contract and ('function ' in line or 'modifier ' in line): 

135 func_match = re.search(r'(function|modifier)\s+(\w+)', line) 

136 if func_match: 

137 func_type = func_match.group(1) 

138 func_name = func_match.group(2) 

139 function_info = { 

140 'name': func_name, 

141 'type': func_type, 

142 'contract': current_contract, 

143 'line': i + 1, 

144 'visibility': 'internal' # Default 

145 } 

146 

147 # Check visibility modifiers 

148 if 'public' in line: 

149 function_info['visibility'] = 'public' 

150 elif 'external' in line: 

151 function_info['visibility'] = 'external' 

152 elif 'internal' in line: 

153 function_info['visibility'] = 'internal' 

154 elif 'private' in line: 

155 function_info['visibility'] = 'private' 

156 

157 # Check for payable 

158 if 'payable' in line: 

159 function_info['payable'] = True 

160 

161 self.contract_structure['functions'].append(function_info) 

162 continue 

163 

164 # Find state variables 

165 if current_contract and ('uint256 ' in line or 'address ' in line or 'mapping(' in line): 

166 # Extract variable name 

167 var_match = re.search(r'(uint256|address|mapping)\s+(?:\(.*?\))?\s*(\w+)', line) 

168 if var_match: 

169 var_type = var_match.group(1) 

170 var_name = var_match.group(2) 

171 self.contract_structure['state_variables'].append({ 

172 'name': var_name, 

173 'type': var_type, 

174 'contract': current_contract, 

175 'line': i + 1 

176 }) 

177 

178 def _detect_reentrancy_vulnerabilities(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]: 

179 """Detect reentrancy attack vulnerabilities""" 

180 vulnerabilities = [] 

181 lines = contract_code.split('\n') 

182 

183 for i, line in enumerate(lines): 

184 line_content = line.strip() 

185 if not line_content or line_content.startswith('//'): 

186 continue 

187 

188 # Pattern 1: Call to external address before state change 

189 if ('.call(' in line_content or '.transfer(' in line_content or '.send(' in line_content): 

190 # Check if this happens before state update in same function 

191 vuln = SolidityVulnerability( 

192 vulnerability_type=VulnerabilityType.REENTRANCY, 

193 severity="high", 

194 title="Potential Reentrancy", 

195 description=f"External call detected: {line_content[:80]}... Reentrancy vulnerability if state changes happen after this call.", 

196 file_path=file_path, 

197 line_number=i + 1, 

198 contract_name=self._get_current_function_contract(i, lines), 

199 code_snippet=line_content.strip(), 

200 remediation="Implement checks-effects-interactions pattern or use ReentrancyGuard modifier", 

201 confidence=75 

202 ) 

203 vulnerabilities.append(vuln) 

204 

205 # Pattern 2: Low-level calls without checks 

206 if '.call.value(' in line_content and 'return' not in line_content: 

207 vuln = SolidityVulnerability( 

208 vulnerability_type=VulnerabilityType.UNCHECKED_LOW_LEVEL_CALL, 

209 severity="medium", 

210 title="Unchecked Low-Level Call", 

211 description=f"Unverified low-level call: {line_content[:80]}...", 

212 file_path=file_path, 

213 line_number=i + 1, 

214 contract_name=self._get_current_function_contract(i, lines), 

215 code_snippet=line_content.strip(), 

216 remediation="Always check return values of low-level calls", 

217 confidence=85 

218 ) 

219 vulnerabilities.append(vuln) 

220 

221 return vulnerabilities 

222 

223 def _detect_access_control_vulnerabilities(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]: 

224 """Detect access control vulnerabilities""" 

225 vulnerabilities = [] 

226 lines = contract_code.split('\n') 

227 

228 critical_functions = ['withdraw', 'transferOwnership', 'mint', 'burn', 'pause', 'unpause'] 

229 

230 for i, line in enumerate(lines): 

231 line_content = line.strip() 

232 if not line_content or line_content.startswith('//'): 

233 continue 

234 

235 # Check for critical functions without access controls 

236 for func_name in critical_functions: 

237 if f'function {func_name}' in line_content and 'public' in line_content: 

238 # Look for modifiers in the same line 

239 if not any(mod in line_content for mod in ['onlyOwner', 'require', 'if', 'modifier']): 

240 vuln = SolidityVulnerability( 

241 vulnerability_type=VulnerabilityType.ACCESS_CONTROL, 

242 severity="critical", 

243 title="Missing Access Control", 

244 description=f"Critical function {func_name} lacks proper access control modifier", 

245 file_path=file_path, 

246 line_number=i + 1, 

247 contract_name=self._get_current_function_contract(i, lines), 

248 function_name=func_name, 

249 code_snippet=line_content.strip(), 

250 remediation=f"Add access control modifier (e.g., onlyOwner) to {func_name} function", 

251 confidence=90 

252 ) 

253 vulnerabilities.append(vuln) 

254 

255 # Pattern: owner() function that returns hardcoded address 

256 for i, line in enumerate(lines): 

257 line_content = line.strip() 

258 if 'return' in line_content and '0x' in line_content: 

259 if 'owner()' in ''.join(lines[max(0, i-2):i+2]): 

260 vuln = SolidityVulnerability( 

261 vulnerability_type=VulnerabilityType.ACCESS_CONTROL, 

262 severity="medium", 

263 title="Hardcoded Owner Address", 

264 description="Owner function returns hardcoded address instead of dynamic storage", 

265 file_path=file_path, 

266 line_number=i + 1, 

267 contract_name=self._get_current_function_contract(i, lines), 

268 code_snippet=line_content.strip(), 

269 remediation="Use address storage variable for owner instead of hardcoded value", 

270 confidence=70 

271 ) 

272 vulnerabilities.append(vuln) 

273 

274 return vulnerabilities 

275 

276 def _detect_integer_vulnerabilities(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]: 

277 """Detect integer overflow/underflow vulnerabilities""" 

278 vulnerabilities = [] 

279 lines = contract_code.split('\n') 

280 

281 arithmetic_operations = ['+', '-', '*', '/'] 

282 

283 for i, line in enumerate(lines): 

284 line_content = line.strip() 

285 if not line_content or line_content.startswith('//'): 

286 continue 

287 

288 # Look for arithmetic operations without SafeMath 

289 for op in arithmetic_operations: 

290 if op in line_content and 'SafeMath' not in ''.join(lines[max(0, i-5):i+5]): 

291 # Check if this is a critical operation (balance, amount, etc.) 

292 context_words = ['balance', 'amount', 'total', 'supply', 'price', 'value'] 

293 if any(word in line_content.lower() for word in context_words): 

294 vuln = SolidityVulnerability( 

295 vulnerability_type=VulnerabilityType.INTEGER_OVERFLOW_UNDERFLOW, 

296 severity="medium", 

297 title="Potential Integer Overflow/Underflow", 

298 description=f"Arithmetic operation without overflow protection: {line_content[:80]}...", 

299 file_path=file_path, 

300 line_number=i + 1, 

301 contract_name=self._get_current_function_contract(i, lines), 

302 code_snippet=line_content.strip(), 

303 remediation="Use SafeMath library or Solidity 0.8+ which has built-in overflow protection", 

304 confidence=65 

305 ) 

306 vulnerabilities.append(vuln) 

307 

308 return vulnerabilities 

309 

310 def _detect_unchecked_calls(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]: 

311 """Detect unchecked external calls""" 

312 vulnerabilities = [] 

313 lines = contract_code.split('\n') 

314 

315 for i, line in enumerate(lines): 

316 line_content = line.strip() 

317 if not line_content or line_content.startswith('//'): 

318 continue 

319 

320 # External call patterns 

321 external_calls = ['.call(', '.delegatecall(', '.transfer(', '.send('] 

322 

323 for pattern in external_calls: 

324 if pattern in line_content: 

325 # Check if return value is being used or verified 

326 next_lines = lines[i+1:i+3] # Look at next 2-3 lines 

327 has_check = any('require(' in next_line or 'if (' in next_line 

328 for next_line in next_lines if next_line.strip()) 

329 

330 if not has_check: 

331 vuln = SolidityVulnerability( 

332 vulnerability_type=VulnerabilityType.UNCHECKED_LOW_LEVEL_CALL, 

333 severity="high", 

334 title="Unchecked External Call", 

335 description=f"External call {pattern} without return value verification", 

336 file_path=file_path, 

337 line_number=i + 1, 

338 contract_name=self._get_current_function_contract(i, lines), 

339 code_snippet=line_content.strip(), 

340 remediation="Always verify return values of external calls", 

341 confidence=80 

342 ) 

343 vulnerabilities.append(vuln) 

344 

345 return vulnerabilities 

346 

347 def _detect_logic_errors(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]: 

348 """Detect logic errors and bad practices""" 

349 vulnerabilities = [] 

350 lines = contract_code.split('\n') 

351 

352 # Pattern 1: Using block.timestamp for critical operations 

353 for i, line in enumerate(lines): 

354 line_content = line.strip() 

355 if 'block.timestamp' in line_content: 

356 # Check if timestamp is used for something critical 

357 critical_contexts = ['deadline', 'expiration', 'unlock', 'vest'] 

358 if any(context in ''.join(lines[max(0, i-3):i+3]).lower() for context in critical_contexts): 

359 vuln = SolidityVulnerability( 

360 vulnerability_type=VulnerabilityType.TIMESTAMP_DEPENDENCE, 

361 severity="medium", 

362 title="Block Timestamp Manipulation Risk", 

363 description="Using block.timestamp for critical logic that miners can manipulate", 

364 file_path=file_path, 

365 line_number=i + 1, 

366 contract_name=self._get_current_function_contract(i, lines), 

367 code_snippet=line_content.strip(), 

368 remediation="Use block.number or external oracle for time-dependent logic", 

369 confidence=75 

370 ) 

371 vulnerabilities.append(vuln) 

372 

373 # Pattern 2: Uninitialized storage pointers 

374 for i, line in enumerate(lines): 

375 line_content = line.strip() 

376 if 'Storage(' in line_content and 'new' in line_content: 

377 vuln = SolidityVulnerability( 

378 vulnerability_type=VulnerabilityType.UNINITIALIZED_STORAGE, 

379 severity="medium", 

380 title="Potential Uninitialized Storage Pointer", 

381 description="Creating struct or array storage without proper initialization", 

382 file_path=file_path, 

383 line_number=i + 1, 

384 contract_name=self._get_current_function_contract(i, lines), 

385 code_snippet=line_content.strip(), 

386 remediation="Initialize storage variables properly before use", 

387 confidence=60 

388 ) 

389 vulnerabilities.append(vuln) 

390 

391 return vulnerabilities 

392 

393 def _detect_oracle_manipulation(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]: 

394 """ 

395 Detect price oracle manipulation vulnerabilities 

396 

397 WEEK 2 DAY 1: Enhanced Oracle Manipulation Detection 

398 Based on OWASP SC02:2025 and 2024-2025 exploit research 

399 

400 Detection Patterns: 

401 1. Chainlink oracle staleness (no updatedAt check) 

402 2. Single oracle source (no aggregation) 

403 3. Missing price bounds validation 

404 4. UniswapV2 spot price usage (flash loan vulnerable) 

405 5. Missing TWAP implementation 

406 6. No oracle failure handling (try/catch) 

407 7. Direct pool reserve usage 

408 """ 

409 vulnerabilities = [] 

410 lines = contract_code.split('\n') 

411 

412 # Track oracle usage per function for contextual analysis 

413 function_contexts = self._extract_function_contexts(lines) 

414 

415 # Pattern 1: Chainlink oracle without staleness checks 

416 chainlink_vulns = self._detect_chainlink_staleness(lines, file_path, function_contexts) 

417 vulnerabilities.extend(chainlink_vulns) 

418 

419 # Pattern 2: Single oracle source without aggregation 

420 single_oracle_vulns = self._detect_single_oracle_usage(lines, file_path, function_contexts) 

421 vulnerabilities.extend(single_oracle_vulns) 

422 

423 # Pattern 3: Missing price bounds validation 

424 bounds_vulns = self._detect_missing_price_bounds(lines, file_path, function_contexts) 

425 vulnerabilities.extend(bounds_vulns) 

426 

427 # Pattern 4: UniswapV2 spot price vulnerability 

428 uniswap_vulns = self._detect_uniswap_spot_price(lines, file_path, function_contexts) 

429 vulnerabilities.extend(uniswap_vulns) 

430 

431 # Pattern 5: Pool reserve manipulation 

432 reserve_vulns = self._detect_pool_reserve_manipulation(lines, file_path, function_contexts) 

433 vulnerabilities.extend(reserve_vulns) 

434 

435 # Pattern 6: Missing oracle failure handling 

436 failure_vulns = self._detect_missing_oracle_failure_handling(lines, file_path, function_contexts) 

437 vulnerabilities.extend(failure_vulns) 

438 

439 return vulnerabilities 

440 

441 def _extract_function_contexts(self, lines: List[str]) -> Dict[int, Dict[str, Any]]: 

442 """Extract function contexts for contextual analysis""" 

443 contexts = {} 

444 current_function = None 

445 current_contract = None 

446 brace_count = 0 

447 

448 for i, line in enumerate(lines): 

449 line_stripped = line.strip() 

450 

451 # Track contract 

452 if line_stripped.startswith('contract ') or line_stripped.startswith('abstract contract '): 

453 match = re.search(r'contract\s+(\w+)', line_stripped) 

454 if match: 

455 current_contract = match.group(1) 

456 

457 # Track function 

458 if line_stripped.startswith('function '): 

459 match = re.search(r'function\s+(\w+)', line_stripped) 

460 if match: 

461 current_function = match.group(1) 

462 brace_count = 0 

463 

464 # Track braces for function scope 

465 brace_count += line_stripped.count('{') - line_stripped.count('}') 

466 

467 # Store context 

468 contexts[i] = { 

469 'function': current_function, 

470 'contract': current_contract, 

471 'in_function': brace_count > 0 and current_function is not None 

472 } 

473 

474 # Reset function when it ends 

475 if brace_count == 0 and current_function is not None: 

476 current_function = None 

477 

478 return contexts 

479 

480 def _detect_chainlink_staleness( 

481 self, 

482 lines: List[str], 

483 file_path: str, 

484 contexts: Dict[int, Dict[str, Any]] 

485 ) -> List[SolidityVulnerability]: 

486 """ 

487 Detect Chainlink oracle usage without staleness checks 

488 

489 CVE Pattern: Missing updatedAt validation 

490 Real Exploits: Polter Finance, BonqDAO Protocol 

491 """ 

492 vulnerabilities = [] 

493 

494 # Pattern: latestRoundData() without updatedAt check 

495 chainlink_patterns = [ 

496 r'latestRoundData\s*\(', 

497 r'AggregatorV3Interface', 

498 r'getRoundData\s*\(', 

499 ] 

500 

501 for i, line in enumerate(lines): 

502 line_content = line.strip() 

503 

504 # Check if line contains Chainlink oracle call 

505 if any(re.search(pattern, line_content) for pattern in chainlink_patterns): 

506 context = contexts.get(i, {}) 

507 function_name = context.get('function', 'unknown') 

508 contract_name = context.get('contract', 'unknown') 

509 

510 # Check next 15 lines for staleness validation 

511 check_window = lines[i:i+15] 

512 has_staleness_check = any( 

513 'updatedAt' in check_line and 

514 ('block.timestamp' in check_line or 'now' in check_line) 

515 for check_line in check_window 

516 ) 

517 

518 has_price_validation = any( 

519 'price' in check_line and 

520 ('require' in check_line or 'revert' in check_line) and 

521 ('>' in check_line or '<' in check_line) 

522 for check_line in check_window 

523 ) 

524 

525 if not has_staleness_check: 

526 vulnerabilities.append(SolidityVulnerability( 

527 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION, 

528 severity="high", 

529 title="Chainlink Oracle Staleness Not Checked", 

530 description=( 

531 f"Function '{function_name}' uses Chainlink oracle without validating " 

532 f"data freshness. Stale price data can be exploited for profit. " 

533 f"OWASP SC02:2025 - Price Oracle Manipulation. " 

534 f"Similar to Polter Finance exploit (2024)." 

535 ), 

536 file_path=file_path, 

537 line_number=i + 1, 

538 function_name=function_name, 

539 contract_name=contract_name, 

540 code_snippet=line_content, 

541 remediation=( 

542 "Add staleness validation:\n" 

543 "require(block.timestamp - updatedAt < 3600, 'Stale price');\n" 

544 "Also validate: updatedAt > 0, answeredInRound >= roundId, price > 0" 

545 ), 

546 confidence=90 

547 )) 

548 

549 if not has_price_validation: 

550 vulnerabilities.append(SolidityVulnerability( 

551 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION, 

552 severity="medium", 

553 title="Missing Chainlink Price Validation", 

554 description=( 

555 f"Function '{function_name}' doesn't validate price from Chainlink. " 

556 f"Price should be checked for: price > 0, within bounds." 

557 ), 

558 file_path=file_path, 

559 line_number=i + 1, 

560 function_name=function_name, 

561 contract_name=contract_name, 

562 code_snippet=line_content, 

563 remediation=( 

564 "Add price validation:\n" 

565 "require(price > 0, 'Invalid price');\n" 

566 "require(price >= minPrice && price <= maxPrice, 'Price out of bounds');" 

567 ), 

568 confidence=85 

569 )) 

570 

571 return vulnerabilities 

572 

573 def _detect_single_oracle_usage( 

574 self, 

575 lines: List[str], 

576 file_path: str, 

577 contexts: Dict[int, Dict[str, Any]] 

578 ) -> List[SolidityVulnerability]: 

579 """ 

580 Detect single oracle source without aggregation 

581 

582 OWASP Recommendation: Use multiple independent oracle sources 

583 """ 

584 vulnerabilities = [] 

585 

586 # Track oracle sources per function 

587 function_oracle_counts = {} 

588 

589 oracle_source_patterns = [ 

590 r'latestRoundData\s*\(', # Chainlink 

591 r'getAmountsOut\s*\(', # Uniswap 

592 r'consult\s*\(', # TWAP 

593 r'\.price\s*\(', # Generic price getter 

594 ] 

595 

596 for i, line in enumerate(lines): 

597 line_content = line.strip() 

598 context = contexts.get(i, {}) 

599 function_name = context.get('function') 

600 

601 if not function_name or not context.get('in_function'): 

602 continue 

603 

604 # Count oracle sources 

605 for pattern in oracle_source_patterns: 

606 if re.search(pattern, line_content): 

607 if function_name not in function_oracle_counts: 

608 function_oracle_counts[function_name] = { 

609 'count': 0, 

610 'line': i + 1, 

611 'contract': context.get('contract', 'unknown') 

612 } 

613 function_oracle_counts[function_name]['count'] += 1 

614 

615 # Report functions with single oracle source 

616 for func_name, data in function_oracle_counts.items(): 

617 if data['count'] == 1: 

618 vulnerabilities.append(SolidityVulnerability( 

619 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION, 

620 severity="medium", 

621 title="Single Oracle Source - No Aggregation", 

622 description=( 

623 f"Function '{func_name}' relies on single oracle source. " 

624 f"OWASP SC02:2025 recommends multiple independent oracles " 

625 f"to prevent single-point manipulation. " 

626 f"$70M+ lost to oracle manipulation in 2024." 

627 ), 

628 file_path=file_path, 

629 line_number=data['line'], 

630 function_name=func_name, 

631 contract_name=data['contract'], 

632 code_snippet=None, 

633 remediation=( 

634 "Implement multi-oracle strategy:\n" 

635 "1. Use Chainlink + UniswapV3 TWAP\n" 

636 "2. Compare prices and revert if deviation > 10%\n" 

637 "3. Take median of 3+ oracle sources" 

638 ), 

639 confidence=80 

640 )) 

641 

642 return vulnerabilities 

643 

644 def _detect_missing_price_bounds( 

645 self, 

646 lines: List[str], 

647 file_path: str, 

648 contexts: Dict[int, Dict[str, Any]] 

649 ) -> List[SolidityVulnerability]: 

650 """ 

651 Detect missing MIN_PRICE and MAX_PRICE validation 

652 

653 OWASP Mitigation: Implement price boundaries 

654 """ 

655 vulnerabilities = [] 

656 

657 price_usage_pattern = r'(price|amount|value)\s*[=:]' 

658 

659 for i, line in enumerate(lines): 

660 line_content = line.strip() 

661 context = contexts.get(i, {}) 

662 

663 if not context.get('in_function'): 

664 continue 

665 

666 # Check if line assigns/uses price 

667 if re.search(price_usage_pattern, line_content): 

668 # Look for oracle calls in previous 5 lines 

669 prev_lines = lines[max(0, i-5):i+1] 

670 has_oracle_call = any( 

671 'latestRoundData' in prev or 

672 'getAmountOut' in prev or 

673 'consult' in prev 

674 for prev in prev_lines 

675 ) 

676 

677 if not has_oracle_call: 

678 continue 

679 

680 # Check for bounds validation in next 10 lines 

681 next_lines = lines[i+1:i+10] 

682 has_min_check = any('MIN' in next_line.upper() or 'minPrice' in next_line for next_line in next_lines) 

683 has_max_check = any('MAX' in next_line.upper() or 'maxPrice' in next_line for next_line in next_lines) 

684 

685 if not (has_min_check and has_max_check): 

686 vulnerabilities.append(SolidityVulnerability( 

687 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION, 

688 severity="medium", 

689 title="Missing Price Bounds Validation", 

690 description=( 

691 f"Price usage at line {i+1} lacks MIN/MAX bounds validation. " 

692 f"OWASP SC02:2025 recommends price thresholds to detect anomalies. " 

693 f"Without bounds, extreme price manipulation goes undetected." 

694 ), 

695 file_path=file_path, 

696 line_number=i + 1, 

697 function_name=context.get('function', 'unknown'), 

698 contract_name=context.get('contract', 'unknown'), 

699 code_snippet=line_content, 

700 remediation=( 

701 "Add price bounds:\n" 

702 "uint256 constant MIN_PRICE = 1e6; // Adjust for token\n" 

703 "uint256 constant MAX_PRICE = 1e12;\n" 

704 "require(price >= MIN_PRICE && price <= MAX_PRICE, 'Price anomaly');" 

705 ), 

706 confidence=75 

707 )) 

708 

709 return vulnerabilities 

710 

711 def _detect_uniswap_spot_price( 

712 self, 

713 lines: List[str], 

714 file_path: str, 

715 contexts: Dict[int, Dict[str, Any]] 

716 ) -> List[SolidityVulnerability]: 

717 """ 

718 Detect UniswapV2 spot price usage (flash loan vulnerable) 

719 

720 Critical: Spot prices can be manipulated within single transaction 

721 Real Exploits: Moby (Jan 2025), The Vow (Aug 2024) 

722 """ 

723 vulnerabilities = [] 

724 

725 # Patterns indicating spot price usage 

726 spot_price_patterns = [ 

727 r'getAmountsOut\s*\(', 

728 r'getAmountOut\s*\(', 

729 r'getReserves\s*\(', 

730 r'\.reserves\(', 

731 r'pair\.getReserves', 

732 ] 

733 

734 twap_patterns = [ 

735 r'consult\s*\(', 

736 r'TWAP', 

737 r'timeWeighted', 

738 r'observe\s*\(', # UniswapV3 

739 ] 

740 

741 for i, line in enumerate(lines): 

742 line_content = line.strip() 

743 context = contexts.get(i, {}) 

744 

745 # Check if using spot price 

746 is_spot_price = any(re.search(pattern, line_content) for pattern in spot_price_patterns) 

747 

748 if not is_spot_price: 

749 continue 

750 

751 # Check if TWAP is also used (good) 

752 check_window = lines[max(0, i-10):i+10] 

753 has_twap = any( 

754 any(re.search(twap_pattern, check_line) for twap_pattern in twap_patterns) 

755 for check_line in check_window 

756 ) 

757 

758 if not has_twap: 

759 vulnerabilities.append(SolidityVulnerability( 

760 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION, 

761 severity="critical", 

762 title="UniswapV2 Spot Price Flash Loan Vulnerability", 

763 description=( 

764 f"Line {i+1} uses Uniswap spot price without TWAP protection. " 

765 f"CRITICAL: Spot prices can be manipulated within single transaction. " 

766 f"Recent Exploits: Moby (Jan 2025), The Vow (Aug 2024). " 

767 f"Attackers use flash loans to manipulate pool reserves. " 

768 f"OWASP SC02:2025 - Most common DeFi exploit pattern (34.3%)." 

769 ), 

770 file_path=file_path, 

771 line_number=i + 1, 

772 function_name=context.get('function', 'unknown'), 

773 contract_name=context.get('contract', 'unknown'), 

774 code_snippet=line_content, 

775 remediation=( 

776 "CRITICAL FIX REQUIRED:\n" 

777 "1. Use UniswapV3 TWAP with observe() for time-weighted prices\n" 

778 "2. OR use Chainlink as primary oracle with Uniswap as backup\n" 

779 "3. Never rely on spot prices for critical logic\n" 

780 "4. Implement price deviation checks between oracles" 

781 ), 

782 confidence=95 

783 )) 

784 

785 return vulnerabilities 

786 

787 def _detect_pool_reserve_manipulation( 

788 self, 

789 lines: List[str], 

790 file_path: str, 

791 contexts: Dict[int, Dict[str, Any]] 

792 ) -> List[SolidityVulnerability]: 

793 """ 

794 Detect direct pool reserve usage for pricing 

795 

796 Using pool reserves directly is extremely vulnerable to manipulation 

797 """ 

798 vulnerabilities = [] 

799 

800 reserve_patterns = [ 

801 r'reserve0', 

802 r'reserve1', 

803 r'\.reserves\s*\(', 

804 r'balanceOf\(address\(this\)\)', 

805 r'token\.balanceOf\(pool\)', 

806 ] 

807 

808 for i, line in enumerate(lines): 

809 line_content = line.strip() 

810 context = contexts.get(i, {}) 

811 

812 if not context.get('in_function'): 

813 continue 

814 

815 # Check if using reserves for calculation 

816 uses_reserves = any(re.search(pattern, line_content) for pattern in reserve_patterns) 

817 

818 if uses_reserves and ('*' in line_content or '/' in line_content or '=' in line_content): 

819 # Check if it's in a price calculation context 

820 next_lines = lines[i:i+5] 

821 looks_like_price_calc = any( 

822 'price' in next_line.lower() or 

823 'value' in next_line.lower() or 

824 'amount' in next_line.lower() 

825 for next_line in next_lines 

826 ) 

827 

828 if looks_like_price_calc: 

829 vulnerabilities.append(SolidityVulnerability( 

830 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION, 

831 severity="critical", 

832 title="Pool Reserve Direct Usage - Flash Loan Vulnerability", 

833 description=( 

834 f"Line {i+1} uses pool reserves directly for pricing. " 

835 f"CRITICAL: Reserves can be manipulated within single transaction. " 

836 f"This is the #1 DeFi exploit pattern. " 

837 f"Using pool balances as price oracle is NEVER safe." 

838 ), 

839 file_path=file_path, 

840 line_number=i + 1, 

841 function_name=context.get('function', 'unknown'), 

842 contract_name=context.get('contract', 'unknown'), 

843 code_snippet=line_content, 

844 remediation=( 

845 "CRITICAL FIX:\n" 

846 "1. Never use pool reserves directly for pricing\n" 

847 "2. Use Chainlink Price Feeds for external prices\n" 

848 "3. Use UniswapV3 TWAP with sufficient time window (30+ min)\n" 

849 "4. Implement multi-oracle aggregation" 

850 ), 

851 confidence=95 

852 )) 

853 

854 return vulnerabilities 

855 

856 def _detect_missing_oracle_failure_handling( 

857 self, 

858 lines: List[str], 

859 file_path: str, 

860 contexts: Dict[int, Dict[str, Any]] 

861 ) -> List[SolidityVulnerability]: 

862 """ 

863 Detect oracle calls without try/catch blocks 

864 

865 Oracle failures can DOS contracts if not handled properly 

866 """ 

867 vulnerabilities = [] 

868 

869 oracle_call_patterns = [ 

870 r'latestRoundData\s*\(', 

871 r'getRoundData\s*\(', 

872 ] 

873 

874 for i, line in enumerate(lines): 

875 line_content = line.strip() 

876 context = contexts.get(i, {}) 

877 

878 # Check if making oracle call 

879 is_oracle_call = any(re.search(pattern, line_content) for pattern in oracle_call_patterns) 

880 

881 if not is_oracle_call: 

882 continue 

883 

884 # Check if wrapped in try/catch 

885 prev_lines = lines[max(0, i-3):i] 

886 has_try = any('try' in prev_line for prev_line in prev_lines) 

887 

888 next_lines = lines[i+1:i+5] 

889 has_catch = any('catch' in next_line for next_line in next_lines) 

890 

891 if not (has_try and has_catch): 

892 vulnerabilities.append(SolidityVulnerability( 

893 vulnerability_type=VulnerabilityType.ORACLE_MANIPULATION, 

894 severity="medium", 

895 title="Missing Oracle Failure Handling", 

896 description=( 

897 f"Oracle call at line {i+1} lacks try/catch error handling. " 

898 f"Oracle failures can cause contract DOS. " 

899 f"Best practice: wrap oracle calls in try/catch with fallback logic." 

900 ), 

901 file_path=file_path, 

902 line_number=i + 1, 

903 function_name=context.get('function', 'unknown'), 

904 contract_name=context.get('contract', 'unknown'), 

905 code_snippet=line_content, 

906 remediation=( 

907 "Add error handling:\n" 

908 "try oracle.latestRoundData() returns (...) {\n" 

909 " // use data\n" 

910 "} catch {\n" 

911 " // fallback logic or revert gracefully\n" 

912 "}" 

913 ), 

914 confidence=70 

915 )) 

916 

917 return vulnerabilities 

918 

919 def _detect_input_validation_issues(self, contract_code: str, file_path: str) -> List[SolidityVulnerability]: 

920 """ 

921 Detect input validation vulnerabilities 

922 

923 WEEK 2 DAY 2: Enhanced Input Validation Detection 

924 Based on OWASP Smart Contract Top 10 2025 - $14.6M in losses 

925 

926 Detection Patterns: 

927 1. Missing address(0) checks for address parameters 

928 2. Missing zero/negative amount checks 

929 3. Missing array bounds validation 

930 4. Unchecked low-level call return values (enhanced) 

931 5. Missing parameter validation in critical functions 

932 6. Unsafe type conversions 

933 """ 

934 vulnerabilities = [] 

935 lines = contract_code.split('\n') 

936 

937 # Extract function contexts for analysis 

938 function_contexts = self._extract_function_contexts(lines) 

939 

940 # Pattern 1: Missing address(0) validation 

941 address_vulns = self._detect_missing_address_validation(lines, file_path, function_contexts) 

942 vulnerabilities.extend(address_vulns) 

943 

944 # Pattern 2: Missing amount/value validation 

945 amount_vulns = self._detect_missing_amount_validation(lines, file_path, function_contexts) 

946 vulnerabilities.extend(amount_vulns) 

947 

948 # Pattern 3: Missing array bounds validation 

949 array_vulns = self._detect_missing_array_bounds(lines, file_path, function_contexts) 

950 vulnerabilities.extend(array_vulns) 

951 

952 # Pattern 4: Enhanced unchecked external calls 

953 external_call_vulns = self._detect_unchecked_external_calls(lines, file_path, function_contexts) 

954 vulnerabilities.extend(external_call_vulns) 

955 

956 # Pattern 5: Unsafe type conversions 

957 conversion_vulns = self._detect_unsafe_type_conversions(lines, file_path, function_contexts) 

958 vulnerabilities.extend(conversion_vulns) 

959 

960 return vulnerabilities 

961 

962 def _detect_missing_address_validation( 

963 self, 

964 lines: List[str], 

965 file_path: str, 

966 contexts: Dict[int, Dict[str, Any]] 

967 ) -> List[SolidityVulnerability]: 

968 """ 

969 Detect missing address(0) validation 

970 

971 OWASP: Lack of Input Validation ($14.6M in losses) 

972 Critical Pattern: Sending funds to address(0) = permanent loss 

973 """ 

974 vulnerabilities = [] 

975 

976 # Track function parameters 

977 for i, line in enumerate(lines): 

978 line_content = line.strip() 

979 context = contexts.get(i, {}) 

980 

981 # Check if it's a function definition with address parameter 

982 if line_content.startswith('function '): 

983 # Extract parameters 

984 if '(' in line_content and ')' in line_content: 

985 params_match = re.search(r'\((.*?)\)', line_content) 

986 if params_match: 

987 params_str = params_match.group(1) 

988 

989 # Find address parameters 

990 address_params = re.findall(r'address\s+(\w+)', params_str) 

991 

992 if address_params: 

993 function_name = context.get('function', 'unknown') 

994 

995 # Check next 20 lines for address(0) validation 

996 check_window = lines[i+1:i+20] 

997 

998 for addr_param in address_params: 

999 has_validation = any( 

1000 f'{addr_param}' in check_line and 

1001 ('address(0)' in check_line or '0x0' in check_line) and 

1002 ('require' in check_line or 'revert' in check_line or 'if' in check_line) 

1003 for check_line in check_window 

1004 ) 

1005 

1006 # Check if it's used in critical operations 

1007 is_critical = any( 

1008 f'{addr_param}' in check_line and 

1009 any(op in check_line for op in ['transfer', 'send', 'call', 'delegatecall', '=']) 

1010 for check_line in check_window 

1011 ) 

1012 

1013 if not has_validation and is_critical: 

1014 vulnerabilities.append(SolidityVulnerability( 

1015 vulnerability_type=VulnerabilityType.LOGIC_ERROR, 

1016 severity="high", 

1017 title="Missing Address Zero Validation", 

1018 description=( 

1019 f"Parameter '{addr_param}' in function '{function_name}' lacks address(0) check. " 

1020 f"OWASP 2025: Lack of Input Validation ($14.6M in losses). " 

1021 f"Funds sent to address(0) are permanently lost - no private key exists. " 

1022 f"This is a common attack vector in 2024-2025." 

1023 ), 

1024 file_path=file_path, 

1025 line_number=i + 1, 

1026 function_name=function_name, 

1027 contract_name=context.get('contract', 'unknown'), 

1028 code_snippet=line_content, 

1029 remediation=( 

1030 f"Add validation:\n" 

1031 f"require({addr_param} != address(0), 'Zero address not allowed');" 

1032 ), 

1033 confidence=85 

1034 )) 

1035 

1036 return vulnerabilities 

1037 

1038 def _detect_missing_amount_validation( 

1039 self, 

1040 lines: List[str], 

1041 file_path: str, 

1042 contexts: Dict[int, Dict[str, Any]] 

1043 ) -> List[SolidityVulnerability]: 

1044 """ 

1045 Detect missing amount/value validation (zero or negative) 

1046 

1047 Common Pattern: Functions accepting amounts without validation 

1048 """ 

1049 vulnerabilities = [] 

1050 

1051 for i, line in enumerate(lines): 

1052 line_content = line.strip() 

1053 context = contexts.get(i, {}) 

1054 

1055 # Check if it's a function with amount/value parameter 

1056 if line_content.startswith('function '): 

1057 if '(' in line_content and ')' in line_content: 

1058 params_match = re.search(r'\((.*?)\)', line_content) 

1059 if params_match: 

1060 params_str = params_match.group(1) 

1061 

1062 # Find amount/value parameters (uint256, uint, int) 

1063 amount_params = re.findall( 

1064 r'(?:uint256|uint|int256|int)\s+(\w*(?:amount|value|quantity|balance|size)\w*)', 

1065 params_str, 

1066 re.IGNORECASE 

1067 ) 

1068 

1069 if amount_params: 

1070 function_name = context.get('function', 'unknown') 

1071 

1072 # Check next 15 lines for validation 

1073 check_window = lines[i+1:i+15] 

1074 

1075 for amount_param in amount_params: 

1076 has_validation = any( 

1077 f'{amount_param}' in check_line and 

1078 ('> 0' in check_line or '!= 0' in check_line or '>=' in check_line) and 

1079 ('require' in check_line or 'revert' in check_line) 

1080 for check_line in check_window 

1081 ) 

1082 

1083 if not has_validation: 

1084 vulnerabilities.append(SolidityVulnerability( 

1085 vulnerability_type=VulnerabilityType.LOGIC_ERROR, 

1086 severity="medium", 

1087 title="Missing Amount Validation", 

1088 description=( 

1089 f"Parameter '{amount_param}' in function '{function_name}' lacks validation. " 

1090 f"Should check: amount > 0 to prevent zero-value operations. " 

1091 f"OWASP 2025: Input Validation ($14.6M in losses)." 

1092 ), 

1093 file_path=file_path, 

1094 line_number=i + 1, 

1095 function_name=function_name, 

1096 contract_name=context.get('contract', 'unknown'), 

1097 code_snippet=line_content, 

1098 remediation=( 

1099 f"Add validation:\n" 

1100 f"require({amount_param} > 0, 'Amount must be greater than zero');" 

1101 ), 

1102 confidence=75 

1103 )) 

1104 

1105 return vulnerabilities 

1106 

1107 def _detect_missing_array_bounds( 

1108 self, 

1109 lines: List[str], 

1110 file_path: str, 

1111 contexts: Dict[int, Dict[str, Any]] 

1112 ) -> List[SolidityVulnerability]: 

1113 """ 

1114 Detect missing array bounds validation 

1115 

1116 Pattern: Array access without length check 

1117 """ 

1118 vulnerabilities = [] 

1119 

1120 for i, line in enumerate(lines): 

1121 line_content = line.strip() 

1122 context = contexts.get(i, {}) 

1123 

1124 if not context.get('in_function'): 

1125 continue 

1126 

1127 # Check for array access patterns 

1128 array_access_pattern = r'(\w+)\[(\w+|\d+)\]' 

1129 matches = re.findall(array_access_pattern, line_content) 

1130 

1131 for array_name, index in matches: 

1132 # Skip if index is a number 

1133 if index.isdigit(): 

1134 continue 

1135 

1136 # Check if there's a bounds check before this line 

1137 prev_lines = lines[max(0, i-5):i] 

1138 has_bounds_check = any( 

1139 f'{index}' in prev_line and 

1140 (f'{array_name}.length' in prev_line or 'length' in prev_line) and 

1141 ('require' in prev_line or 'if' in prev_line or '<' in prev_line) 

1142 for prev_line in prev_lines 

1143 ) 

1144 

1145 if not has_bounds_check: 

1146 vulnerabilities.append(SolidityVulnerability( 

1147 vulnerability_type=VulnerabilityType.LOGIC_ERROR, 

1148 severity="medium", 

1149 title="Missing Array Bounds Validation", 

1150 description=( 

1151 f"Array access '{array_name}[{index}]' lacks bounds checking. " 

1152 f"Out-of-bounds access causes revert but wastes gas. " 

1153 f"OWASP 2025: Input Validation." 

1154 ), 

1155 file_path=file_path, 

1156 line_number=i + 1, 

1157 function_name=context.get('function', 'unknown'), 

1158 contract_name=context.get('contract', 'unknown'), 

1159 code_snippet=line_content, 

1160 remediation=( 

1161 f"Add bounds check:\n" 

1162 f"require({index} < {array_name}.length, 'Index out of bounds');" 

1163 ), 

1164 confidence=70 

1165 )) 

1166 

1167 return vulnerabilities 

1168 

1169 def _detect_unchecked_external_calls( 

1170 self, 

1171 lines: List[str], 

1172 file_path: str, 

1173 contexts: Dict[int, Dict[str, Any]] 

1174 ) -> List[SolidityVulnerability]: 

1175 """ 

1176 Detect unchecked external calls (enhanced) 

1177 

1178 OWASP 2025: Unchecked External Calls ($550.7K in losses) 

1179 Climbed from #10 to #6 in 2025 rankings 

1180 

1181 Pattern: Low-level calls (.call, .delegatecall) without return value check 

1182 """ 

1183 vulnerabilities = [] 

1184 

1185 low_level_calls = [ 

1186 r'\.call\s*\(', 

1187 r'\.delegatecall\s*\(', 

1188 r'\.staticcall\s*\(', 

1189 ] 

1190 

1191 for i, line in enumerate(lines): 

1192 line_content = line.strip() 

1193 context = contexts.get(i, {}) 

1194 

1195 if not context.get('in_function'): 

1196 continue 

1197 

1198 # Check for low-level calls 

1199 for pattern in low_level_calls: 

1200 if re.search(pattern, line_content): 

1201 # Check if return value is captured 

1202 captures_return = re.search(r'(?:bool\s+\w+|[\(\w]+)\s*=.*\.(?:call|delegatecall|staticcall)', line_content) 

1203 

1204 if captures_return: 

1205 # Check if the captured value is validated 

1206 # Extract the variable name 

1207 var_match = re.search(r'(?:bool\s+(\w+)|[\(](\w+))', line_content) 

1208 if var_match: 

1209 var_name = var_match.group(1) or var_match.group(2) 

1210 

1211 # Check next 5 lines for require/if with this variable 

1212 check_window = lines[i+1:i+5] 

1213 has_validation = any( 

1214 var_name in check_line and 

1215 ('require' in check_line or 'if' in check_line or 'assert' in check_line) 

1216 for check_line in check_window 

1217 ) 

1218 

1219 if not has_validation: 

1220 vulnerabilities.append(SolidityVulnerability( 

1221 vulnerability_type=VulnerabilityType.UNCHECKED_LOW_LEVEL_CALL, 

1222 severity="high", 

1223 title="Unchecked External Call Return Value", 

1224 description=( 

1225 f"Low-level call return value '{var_name}' captured but not validated. " 

1226 f"OWASP 2025 #6: Unchecked External Calls ($550.7K in losses). " 

1227 f"Climbed from #10 to #6 in 2025 rankings. " 

1228 f"Failed external calls can cause unexpected behavior if not handled." 

1229 ), 

1230 file_path=file_path, 

1231 line_number=i + 1, 

1232 function_name=context.get('function', 'unknown'), 

1233 contract_name=context.get('contract', 'unknown'), 

1234 code_snippet=line_content, 

1235 remediation=( 

1236 f"Add validation:\n" 

1237 f"require({var_name}, 'External call failed');\n" 

1238 f"// OR use try/catch for better error handling" 

1239 ), 

1240 confidence=90 

1241 )) 

1242 else: 

1243 # Return value not even captured! 

1244 vulnerabilities.append(SolidityVulnerability( 

1245 vulnerability_type=VulnerabilityType.UNCHECKED_LOW_LEVEL_CALL, 

1246 severity="critical", 

1247 title="External Call Return Value Ignored", 

1248 description=( 

1249 f"Low-level call return value completely ignored. " 

1250 f"CRITICAL: OWASP 2025 #6 ($550.7K in losses). " 

1251 f"The call may fail silently causing logic errors or fund loss. " 

1252 f"Always capture and validate external call results." 

1253 ), 

1254 file_path=file_path, 

1255 line_number=i + 1, 

1256 function_name=context.get('function', 'unknown'), 

1257 contract_name=context.get('contract', 'unknown'), 

1258 code_snippet=line_content, 

1259 remediation=( 

1260 "Capture and validate return value:\n" 

1261 "(bool success, ) = target.call(...);\n" 

1262 "require(success, 'External call failed');" 

1263 ), 

1264 confidence=95 

1265 )) 

1266 

1267 return vulnerabilities 

1268 

1269 def _detect_unsafe_type_conversions( 

1270 self, 

1271 lines: List[str], 

1272 file_path: str, 

1273 contexts: Dict[int, Dict[str, Any]] 

1274 ) -> List[SolidityVulnerability]: 

1275 """ 

1276 Detect unsafe type conversions 

1277 

1278 Pattern: Converting between types without validation 

1279 """ 

1280 vulnerabilities = [] 

1281 

1282 conversion_patterns = [ 

1283 r'uint256\s*\(\s*int', # int to uint 

1284 r'uint\s*\(\s*int', 

1285 r'int\s*\(\s*uint', # uint to int 

1286 r'address\s*\(\s*uint', # uint to address 

1287 ] 

1288 

1289 for i, line in enumerate(lines): 

1290 line_content = line.strip() 

1291 context = contexts.get(i, {}) 

1292 

1293 if not context.get('in_function'): 

1294 continue 

1295 

1296 for pattern in conversion_patterns: 

1297 if re.search(pattern, line_content): 

1298 # Check if there's validation nearby 

1299 check_window = lines[max(0, i-2):i+3] 

1300 has_validation = any( 

1301 'require' in check_line or 'assert' in check_line 

1302 for check_line in check_window 

1303 ) 

1304 

1305 if not has_validation: 

1306 vulnerabilities.append(SolidityVulnerability( 

1307 vulnerability_type=VulnerabilityType.LOGIC_ERROR, 

1308 severity="medium", 

1309 title="Unsafe Type Conversion", 

1310 description=( 

1311 f"Type conversion without validation at line {i+1}. " 

1312 f"Converting between signed/unsigned or numeric/address types can cause unexpected behavior. " 

1313 f"OWASP 2025: Input Validation." 

1314 ), 

1315 file_path=file_path, 

1316 line_number=i + 1, 

1317 function_name=context.get('function', 'unknown'), 

1318 contract_name=context.get('contract', 'unknown'), 

1319 code_snippet=line_content, 

1320 remediation=( 

1321 "Add validation before conversion:\n" 

1322 "require(value >= 0, 'Invalid conversion');\n" 

1323 "// OR use SafeCast library for safe conversions" 

1324 ), 

1325 confidence=70 

1326 )) 

1327 

1328 return vulnerabilities 

1329 

1330 def _get_current_function_contract(self, line_index: int, lines: List[str]) -> str: 

1331 """Helper to determine current contract context""" 

1332 current_contract = "unknown" 

1333 

1334 # Look backwards to find most recent contract 

1335 for i in range(line_index, -1, -1): 

1336 line = lines[i].strip() 

1337 if line.startswith('contract ') or line.startswith('abstract contract '): 

1338 match = re.search(r'contract\s+(\w+)', line) 

1339 if match: 

1340 current_contract = match.group(1) 

1341 break 

1342 # Stop looking if we hit another contract boundary 

1343 if line.startswith('contract ') and i < line_index: 

1344 break 

1345 

1346 return current_contract 

1347 

1348 def _initialize_patterns(self) -> Dict[str, List[str]]: 

1349 """Initialize vulnerability pattern detectors""" 

1350 return { 

1351 'reentrancy': [ 

1352 r'\.call\(', 

1353 r'\.transfer\(', 

1354 r'\.send\(' 

1355 ], 

1356 'access_control': [ 

1357 r'function\s+\w+\s*public', 

1358 r'missing.*modifier', 

1359 r'no.*access.*control' 

1360 ], 

1361 'integer_overflow': [ 

1362 r'[\+\-\*\/]', 

1363 r'(balance|amount|total|supply).*[\+\-\*\/]' 

1364 ], 

1365 'unchecked_call': [ 

1366 r'\.call\(', 

1367 r'\.delegatecall\(' 

1368 ], 

1369 'oracle_manipulation': [ 

1370 r'uniswap.*router', 

1371 r'price.*oracle', 

1372 r'getAmountOut' 

1373 ] 

1374 }