Coverage for little_loops / issue_discovery / search.py: 71%

187 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Issue file search and main discovery functions.""" 

2 

3from __future__ import annotations 

4 

5import re 

6import subprocess 

7from datetime import datetime 

8from pathlib import Path 

9from typing import TYPE_CHECKING 

10 

11from little_loops.issue_discovery.extraction import ( 

12 _build_reopen_section, 

13 detect_regression_or_duplicate, 

14) 

15from little_loops.issue_discovery.matching import ( 

16 FindingMatch, 

17 MatchClassification, 

18 RegressionEvidence, 

19 _calculate_word_overlap, 

20 _extract_words, 

21 _matches_issue_type, 

22) 

23 

24if TYPE_CHECKING: 

25 from little_loops.config import BRConfig 

26 from little_loops.logger import Logger 

27 

28 

29# ============================================================================= 

30# Issue Search Functions 

31# ============================================================================= 

32 

33 

def _get_all_issue_files(
    config: BRConfig,
    include_completed: bool = True,
    include_deferred: bool = False,
) -> list[tuple[Path, bool]]:
    """Collect issue files together with a "non-active" flag.

    Each ``*.md`` file found in a configured category directory is classified
    by the ``status:`` value of its frontmatter (ENH-1418):

    * ``done`` / ``cancelled`` -> reported as completed, only when
      ``include_completed`` is set.
    * ``deferred`` -> reported as completed, only when ``include_deferred``
      is set.
    * anything else (including a missing ``status``, which defaults to
      ``"open"``) -> always reported as active.

    A file whose content cannot be read or parsed is still returned and is
    treated as active.

    For backwards compatibility the legacy ``completed/`` and ``deferred/``
    sibling directories are also scanned when the matching ``include_*`` flag
    is set; every file found there is flagged as completed.

    Args:
        config: Project configuration
        include_completed: Whether to include completed/cancelled issues
        include_deferred: Whether to include deferred issues

    Returns:
        List of ``(path, is_completed)`` tuples.
    """
    from little_loops.frontmatter import parse_frontmatter

    collected: list[tuple[Path, bool]] = []

    for category in config.issue_categories:
        category_dir = config.get_issue_dir(category)
        if not category_dir.exists():
            continue

        for md_file in category_dir.glob("*.md"):
            try:
                frontmatter = parse_frontmatter(md_file.read_text(encoding="utf-8"))
            except Exception:
                # Unreadable/unparseable file: surface it as an active issue.
                collected.append((md_file, False))
                continue

            status = frontmatter.get("status", "open")
            if status in ("done", "cancelled"):
                wanted, non_active = include_completed, True
            elif status == "deferred":
                wanted, non_active = include_deferred, True
            else:
                wanted, non_active = True, False

            if wanted:
                collected.append((md_file, non_active))

    # Legacy completed/ and deferred/ sibling dirs (pre-ENH-1418)
    if include_completed:
        completed_dir = config.get_completed_dir()
        if completed_dir.exists():
            collected.extend((md_file, True) for md_file in completed_dir.glob("*.md"))

    if include_deferred:
        deferred_dir = config.get_deferred_dir()
        if deferred_dir.exists():
            collected.extend((md_file, True) for md_file in deferred_dir.glob("*.md"))

    return collected

96 

97 

def search_issues_by_content(
    config: BRConfig,
    search_terms: list[str],
    include_completed: bool = True,
) -> list[tuple[Path, float, bool]]:
    """Rank issue files by word overlap with the given search terms.

    The words of all ``search_terms`` are pooled into one query set. Every
    issue file is scored against that set, and files scoring above 0.1 are
    kept. Files that cannot be read or scored are skipped.

    Args:
        config: Project configuration
        search_terms: Terms to search for
        include_completed: Whether to include completed issues

    Returns:
        List of (path, score, is_completed) sorted by score descending.
        Empty when the search terms yield no words.
    """
    query_words: set[str] = set().union(*(_extract_words(term) for term in search_terms))
    if not query_words:
        return []

    scored: list[tuple[Path, float, bool]] = []
    for issue_path, is_completed in _get_all_issue_files(config, include_completed):
        try:
            text = issue_path.read_text(encoding="utf-8")
            relevance = _calculate_word_overlap(query_words, _extract_words(text))
            if relevance > 0.1:  # Minimum threshold
                scored.append((issue_path, relevance, is_completed))
        except Exception:
            continue

    return sorted(scored, key=lambda entry: entry[1], reverse=True)

133 

134 

def search_issues_by_file_path(
    config: BRConfig,
    file_path: str,
    include_completed: bool = True,
) -> list[tuple[Path, bool]]:
    """Search for issues mentioning a specific file path.

    Matching is case-insensitive. An issue matches when its text contains the
    final path component (e.g. ``"module.py"`` for ``"src/module.py"``), which
    also covers any mention of the full path. When the path ends with ``/``
    and therefore has no final component, the whole path is searched instead.

    Args:
        config: Project configuration
        file_path: File path to search for
        include_completed: Whether to include completed issues

    Returns:
        List of (issue_path, is_completed) tuples. Empty when ``file_path``
        is blank, because an empty search string would match every issue.
    """
    normalized_path = file_path.strip().lower()
    if not normalized_path:
        # "" is a substring of every document; refuse to "match" everything.
        return []

    # The filename is a suffix of the full path, so a document containing the
    # full path always contains the filename too; one needle is sufficient.
    # A trailing "/" yields an empty filename, so fall back to the full path
    # rather than searching for "".
    filename = normalized_path.rsplit("/", 1)[-1]
    needle = filename or normalized_path

    results: list[tuple[Path, bool]] = []
    for issue_path, is_completed in _get_all_issue_files(config, include_completed):
        try:
            content = issue_path.read_text(encoding="utf-8").lower()
        except (OSError, UnicodeError):
            # Unreadable or non-UTF-8 issue file: skip it, keep searching.
            continue
        if needle in content:
            results.append((issue_path, is_completed))

    return results

167 

168 

169# ============================================================================= 

170# Main Discovery Functions 

171# ============================================================================= 

172 

173 

def find_existing_issue(
    config: BRConfig,
    finding_type: str,
    file_path: str | None,
    finding_title: str,
    finding_content: str,
) -> FindingMatch:
    """Search for an existing issue matching this finding.

    Uses a multi-pass approach:
    1. File path match: the first issue that mentions ``file_path`` and has
       the same issue type is returned immediately with a score of 0.85.
    2. Title word overlap (>70% = likely duplicate); the highest-overlap
       issue wins.
    3. Content overlap analysis, only when the best score so far is below
       the configured ``similar_threshold``. Content scores are scaled by
       0.8 and only the top five content matches are considered.

    For matches to completed issues, performs regression analysis to determine
    if the match is a regression (fix broke) or invalid fix (never worked).

    Args:
        config: Project configuration
        finding_type: Issue type ("BUG", "ENH", "FEAT")
        file_path: File path from finding (if any)
        finding_title: Title of the finding
        finding_content: Full content/description of finding

    Returns:
        FindingMatch with best match details, including classification and
        regression evidence for completed issue matches
    """
    exact_threshold = config.issues.duplicate_detection.exact_threshold
    similar_threshold = config.issues.duplicate_detection.similar_threshold

    best_match = FindingMatch(
        issue_path=None,
        match_type="none",
        match_score=0.0,
        exact_threshold=exact_threshold,
        similar_threshold=similar_threshold,
    )

    # Pass 1: Exact file path match
    if file_path:
        for issue_path, is_completed in search_issues_by_file_path(config, file_path):
            try:
                # Check if same type of finding (uses configured categories)
                if not _matches_issue_type(finding_type, issue_path, config, is_completed):
                    continue
                classification, evidence = _classify_match(config, issue_path, is_completed)

                # High confidence if same file + same type
                return FindingMatch(
                    issue_path=issue_path,
                    match_type="exact",
                    match_score=0.85,
                    is_completed=is_completed,
                    matched_terms=[file_path],
                    classification=classification,
                    regression_evidence=evidence,
                    exact_threshold=exact_threshold,
                    similar_threshold=similar_threshold,
                )
            except Exception:
                # Best effort: a candidate that cannot be evaluated is skipped.
                continue

    # Pass 2: Title similarity
    title_words = _extract_words(finding_title)
    if title_words:
        best_pass2: tuple[Path, bool, float, list[str]] | None = None
        best_pass2_score = best_match.match_score
        for issue_path, is_completed in _get_all_issue_files(config):
            try:
                # Extract title from the issue's "# <ID>: <title>" heading
                content = issue_path.read_text(encoding="utf-8")
                title_match = re.search(r"^#\s+[\w-]+:\s*(.+)$", content, re.MULTILINE)
                if title_match:
                    issue_words = _extract_words(title_match.group(1))
                    overlap = _calculate_word_overlap(title_words, issue_words)
                    if overlap > 0.7 and overlap > best_pass2_score:
                        best_pass2_score = overlap
                        best_pass2 = (
                            issue_path,
                            is_completed,
                            overlap,
                            # Sorted so the reported terms do not depend on
                            # set iteration order.
                            sorted(title_words & issue_words),
                        )
            except Exception:
                continue

        # Determine classification once for the single best Pass 2 match
        if best_pass2 is not None:
            issue_path, is_completed, overlap, matched_terms = best_pass2
            classification, evidence = _classify_match(config, issue_path, is_completed)
            best_match = FindingMatch(
                issue_path=issue_path,
                match_type="similar",
                match_score=overlap,
                is_completed=is_completed,
                matched_terms=matched_terms,
                classification=classification,
                regression_evidence=evidence,
                exact_threshold=exact_threshold,
                similar_threshold=similar_threshold,
            )

    # Pass 3: Content analysis
    if best_match.match_score < similar_threshold:
        content_matches = search_issues_by_content(
            config,
            [finding_title, finding_content],
        )
        best_pass3: tuple[Path, bool, float] | None = None
        best_pass3_score = best_match.match_score
        for issue_path, score, is_completed in content_matches[:5]:  # Top 5
            adjusted_score = score * 0.8  # Content matches are less precise
            if adjusted_score > best_pass3_score:
                best_pass3_score = adjusted_score
                best_pass3 = (issue_path, is_completed, adjusted_score)

        # Determine classification once for the single best Pass 3 match
        if best_pass3 is not None:
            issue_path, is_completed, adjusted_score = best_pass3
            classification, evidence = _classify_match(config, issue_path, is_completed)
            best_match = FindingMatch(
                issue_path=issue_path,
                match_type="content",
                match_score=adjusted_score,
                is_completed=is_completed,
                classification=classification,
                regression_evidence=evidence,
                exact_threshold=exact_threshold,
                similar_threshold=similar_threshold,
            )

    # If no match found, classification is NEW_ISSUE (the default)
    return best_match


def _classify_match(
    config: BRConfig,
    issue_path: Path,
    is_completed: bool,
) -> tuple[MatchClassification, RegressionEvidence | None]:
    """Classify a matched issue: regression analysis if completed, else duplicate.

    Args:
        config: Project configuration
        issue_path: Path to the matched issue file
        is_completed: Whether the matched issue is completed (non-active)

    Returns:
        ``(classification, evidence)``. For a completed issue this is whatever
        ``detect_regression_or_duplicate`` returns; for an active issue it is
        ``(MatchClassification.DUPLICATE, None)``.
    """
    if is_completed:
        return detect_regression_or_duplicate(config, issue_path)
    return MatchClassification.DUPLICATE, None

327 

328 

329# ============================================================================= 

330# Issue Reopening and Updating 

331# ============================================================================= 

332 

333 

def _get_category_from_issue_path(issue_path: Path, config: BRConfig) -> str:
    """Determine the category for an issue based on its filename.

    The uppercased filename is split into alphanumeric tokens and the first
    token equal to a configured category prefix decides the category. Matching
    whole tokens in filename order keeps a word in the title from hijacking
    the result: ``P2-FEAT-010-debug-output.md`` is a feature even though
    "DEBUG" contains "BUG".

    If no token matches, the original substring test is used as a fallback so
    names without delimiters (e.g. ``ENH123.md``) still resolve.

    Args:
        issue_path: Path to issue file
        config: Project configuration

    Returns:
        Category name (e.g., "bugs", "enhancements", "features"); ``"bugs"``
        when no configured prefix is found in the filename.
    """
    filename = issue_path.name.upper()
    categories = config.issues.categories

    # First configured category wins if two categories share a prefix.
    prefix_to_category: dict[str, str] = {}
    for category_name, category_config in categories.items():
        prefix_to_category.setdefault(category_config.prefix.upper(), category_name)

    for token in re.split(r"[^A-Z0-9]+", filename):
        matched = prefix_to_category.get(token)
        if matched is not None:
            return matched

    # Fallback: legacy substring match for filenames without delimiters.
    for category_name, category_config in categories.items():
        if category_config.prefix in filename:
            return category_name
    return "bugs"  # Default

349 

350 

def reopen_issue(
    config: BRConfig,
    completed_issue_path: Path,
    reopen_reason: str,
    new_context: str,
    source_command: str,
    logger: Logger,
    classification: MatchClassification | None = None,
    regression_evidence: RegressionEvidence | None = None,
) -> Path | None:
    """Reopen a completed issue: append a Reopened section and set status to open.

    Two layouts are handled:

    * Post-ENH-1418: the file already lives in its type directory, so it is
      rewritten in place.
    * Legacy: the file lives elsewhere (e.g. ``completed/``) and is moved to
      its type directory with ``git mv``. If ``git mv`` fails or ``git``
      cannot be launched, the file is moved manually (write new, delete old).

    Args:
        config: Project configuration
        completed_issue_path: Path to the completed issue file
        reopen_reason: Reason for reopening
        new_context: New context/findings to add
        source_command: Command triggering the reopen
        logger: Logger for output
        classification: How this issue was classified (regression, invalid_fix, etc.)
        regression_evidence: Evidence supporting the classification

    Returns:
        Path of the reopened issue, or None if the source file is missing, a
        different active issue already occupies the target path, or an error
        occurs while rewriting/moving the file.
    """
    if not completed_issue_path.exists():
        logger.error(f"Completed issue not found: {completed_issue_path}")
        return None

    # Determine target category directory
    category = _get_category_from_issue_path(completed_issue_path, config)
    target_dir = config.get_issue_dir(category)
    target_dir.mkdir(parents=True, exist_ok=True)

    target_path = target_dir / completed_issue_path.name

    # Post-ENH-1418 the completed file may already sit at target_path; in that
    # case we rewrite it in place instead of moving it.
    same_file = completed_issue_path.resolve() == target_path.resolve()

    # Safety check - don't overwrite a *different* active issue at the target.
    if target_path.exists() and not same_file:
        logger.warning(f"Active issue already exists: {target_path}")
        return None

    # Log with classification info if available
    if classification == MatchClassification.REGRESSION:
        logger.info(f"Reopening {completed_issue_path.name} as REGRESSION -> {category}/")
    elif classification == MatchClassification.INVALID_FIX:
        logger.info(f"Reopening {completed_issue_path.name} as INVALID_FIX -> {category}/")
    else:
        logger.info(f"Reopening {completed_issue_path.name} -> {category}/")

    try:
        from little_loops.frontmatter import update_frontmatter

        content = completed_issue_path.read_text(encoding="utf-8")
        content += _build_reopen_section(
            reopen_reason,
            new_context,
            source_command,
            classification,
            regression_evidence,
        )
        content = update_frontmatter(content, {"status": "open"})

        if same_file:
            # Post-ENH-1418: file already in its type dir; just rewrite content.
            completed_issue_path.write_text(content, encoding="utf-8")
        else:
            # Legacy path: move the file back to the type dir, preferring git
            # so the rename is tracked.
            try:
                result = subprocess.run(
                    ["git", "mv", str(completed_issue_path), str(target_path)],
                    capture_output=True,
                    text=True,
                )
                moved_by_git = result.returncode == 0
                failure_detail = result.stderr
            except OSError as exc:
                # git could not be launched at all (e.g. not installed); use
                # the same manual fallback as a failed `git mv`.
                moved_by_git = False
                failure_detail = str(exc)

            if not moved_by_git:
                logger.warning(f"git mv failed, using manual copy: {failure_detail}")

            target_path.write_text(content, encoding="utf-8")
            if not moved_by_git:
                # Manual move: remove the original only after the new file
                # has been written.
                completed_issue_path.unlink()

        logger.success(f"Reopened: {target_path.name}")
        return target_path

    except Exception as e:
        logger.error(f"Failed to reopen issue: {e}")
        return None

442 

443 

def update_existing_issue(
    config: BRConfig,
    issue_path: Path,
    update_section_name: str,
    update_content: str,
    source_command: str,
    logger: Logger,
) -> bool:
    """Add new findings to an existing issue.

    Appends a ``## <update_section_name>`` section (with the current date and
    the source command) to the end of the issue file. If the file already has
    a heading line that is exactly ``## <update_section_name>``, nothing is
    written. Deeper headings (``### ...``) or longer headings that merely
    start with the same text do not count as the section already existing.

    Args:
        config: Project configuration (not used by this function)
        issue_path: Path to issue file
        update_section_name: Name for the update section
        update_content: Content to add
        source_command: Command triggering the update
        logger: Logger for output

    Returns:
        True if the section was added or was already present; False if the
        issue file is missing or reading/writing it failed.
    """
    if not issue_path.exists():
        logger.error(f"Issue not found: {issue_path}")
        return False

    try:
        content = issue_path.read_text(encoding="utf-8")

        # Compare whole lines so that "### Name" or "## Name (old)" is not
        # mistaken for the "## Name" section.
        heading = f"## {update_section_name}".strip()
        section_exists = any(line.strip() == heading for line in content.splitlines())

        if section_exists:
            logger.info(f"Section already exists in {issue_path.name}, skipping")
            return True

        # Build update section
        update_section = f"""

---

## {update_section_name}

- **Date**: {datetime.now().strftime("%Y-%m-%d")}
- **Source**: {source_command}

{update_content}
"""
        issue_path.write_text(content + update_section, encoding="utf-8")
        logger.success(f"Updated: {issue_path.name}")
        return True

    except Exception as e:
        logger.error(f"Failed to update issue: {e}")
        return False