Coverage for little_loops / issue_discovery / search.py: 71%
187 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Issue file search and main discovery functions."""
3from __future__ import annotations
5import re
6import subprocess
7from datetime import datetime
8from pathlib import Path
9from typing import TYPE_CHECKING
11from little_loops.issue_discovery.extraction import (
12 _build_reopen_section,
13 detect_regression_or_duplicate,
14)
15from little_loops.issue_discovery.matching import (
16 FindingMatch,
17 MatchClassification,
18 RegressionEvidence,
19 _calculate_word_overlap,
20 _extract_words,
21 _matches_issue_type,
22)
24if TYPE_CHECKING:
25 from little_loops.config import BRConfig
26 from little_loops.logger import Logger
# =============================================================================
# Issue Search Functions
# =============================================================================
def _get_all_issue_files(
    config: BRConfig,
    include_completed: bool = True,
    include_deferred: bool = False,
) -> list[tuple[Path, bool]]:
    """Collect every issue file together with a "non-active" flag.

    Each ``*.md`` file in the configured category directories is classified
    by the ``status:`` key of its YAML frontmatter (ENH-1418); a missing key
    is treated as ``"open"``. Files whose frontmatter cannot be read or
    parsed are reported as active so they are never silently dropped.

    Legacy ``completed/`` and ``deferred/`` sibling directories (the
    pre-ENH-1418 layout) are also scanned when the matching ``include_*``
    flag is set; everything found there is flagged as non-active.

    Args:
        config: Project configuration
        include_completed: Whether to include done/cancelled issues
        include_deferred: Whether to include deferred issues

    Returns:
        List of ``(path, is_completed)`` tuples, where ``is_completed`` is
        ``True`` for done, cancelled and deferred issues.
    """
    from little_loops.frontmatter import parse_frontmatter

    collected: list[tuple[Path, bool]] = []

    for category in config.issue_categories:
        category_dir = config.get_issue_dir(category)
        if not category_dir.exists():
            continue
        for md_file in category_dir.glob("*.md"):
            try:
                frontmatter = parse_frontmatter(md_file.read_text(encoding="utf-8"))
            except Exception:
                # Unreadable/unparseable file: surface it as an active issue.
                collected.append((md_file, False))
                continue

            status = frontmatter.get("status", "open")
            is_closed = status in ("done", "cancelled")
            is_deferred = status == "deferred"
            if not (is_closed or is_deferred):
                collected.append((md_file, False))
            elif (is_closed and include_completed) or (is_deferred and include_deferred):
                collected.append((md_file, True))

    # Legacy completed/ and deferred/ sibling dirs (pre-ENH-1418)
    if include_completed:
        completed_dir = config.get_completed_dir()
        if completed_dir.exists():
            collected.extend((md_file, True) for md_file in completed_dir.glob("*.md"))

    if include_deferred:
        deferred_dir = config.get_deferred_dir()
        if deferred_dir.exists():
            collected.extend((md_file, True) for md_file in deferred_dir.glob("*.md"))

    return collected
def search_issues_by_content(
    config: BRConfig,
    search_terms: list[str],
    include_completed: bool = True,
) -> list[tuple[Path, float, bool]]:
    """Rank issue files by word overlap with the given search terms.

    Args:
        config: Project configuration
        search_terms: Terms to search for
        include_completed: Whether to include completed issues

    Returns:
        List of (path, score, is_completed) sorted by score descending.
        Only issues scoring above 0.1 are returned; the list is empty when
        the search terms yield no words.
    """
    ranked: list[tuple[Path, float, bool]] = []

    # Pool the words of every term into one query set.
    query_words = set().union(*(_extract_words(term) for term in search_terms))
    if not query_words:
        return ranked

    for candidate, done in _get_all_issue_files(config, include_completed):
        try:
            text = candidate.read_text(encoding="utf-8")
            relevance = _calculate_word_overlap(query_words, _extract_words(text))
            if relevance > 0.1:  # Minimum threshold
                ranked.append((candidate, relevance, done))
        except Exception:
            # Best effort: an unreadable issue file is simply skipped.
            continue

    return sorted(ranked, key=lambda entry: entry[1], reverse=True)
def search_issues_by_file_path(
    config: BRConfig,
    file_path: str,
    include_completed: bool = True,
) -> list[tuple[Path, bool]]:
    """Search for issues mentioning a specific file path.

    Matching is a case-insensitive substring test against the issue text.
    The final path component is used as the search needle so that partial
    paths match too (e.g. ``"module.py"`` matches ``"src/module.py"``).

    Args:
        config: Project configuration
        file_path: File path to search for
        include_completed: Whether to include completed issues

    Returns:
        List of (issue_path, is_completed) tuples. Empty when ``file_path``
        is empty or whitespace-only.
    """
    normalized_path = file_path.strip().lower()
    if not normalized_path:
        # An empty needle is a substring of every string, so without this
        # guard a blank path would "match" every issue file.
        return []

    filename = normalized_path.rsplit("/", 1)[-1]
    # ``filename`` is a suffix of ``normalized_path``, so any text containing
    # the full path also contains the filename: testing the filename alone
    # covers both. A trailing "/" leaves the filename empty; fall back to the
    # full path rather than searching for "" (which matches everything).
    needle = filename or normalized_path

    results: list[tuple[Path, bool]] = []
    for issue_path, is_completed in _get_all_issue_files(config, include_completed):
        try:
            content = issue_path.read_text(encoding="utf-8").lower()
        except (OSError, UnicodeError):
            # Best effort: skip issue files that cannot be read or decoded.
            continue
        if needle in content:
            results.append((issue_path, is_completed))

    return results
# =============================================================================
# Main Discovery Functions
# =============================================================================
def _classify_existing_match(
    config: BRConfig,
    issue_path: Path,
    is_completed: bool,
) -> tuple[MatchClassification, RegressionEvidence | None]:
    """Classify a match: active issues are duplicates, completed ones get regression analysis."""
    if is_completed:
        return detect_regression_or_duplicate(config, issue_path)
    return MatchClassification.DUPLICATE, None


def find_existing_issue(
    config: BRConfig,
    finding_type: str,
    file_path: str | None,
    finding_title: str,
    finding_content: str,
) -> FindingMatch:
    """Search for an existing issue matching this finding.

    Uses a multi-pass approach:
    1. File path match: the first issue that mentions ``file_path`` and has
       the same issue type is returned immediately as an "exact" match
       with a fixed score of 0.85.
    2. Title word overlap (>70% = likely duplicate); the best-scoring title
       wins.
    3. Content overlap analysis over the top 5 content hits, scaled by 0.8.
       Runs only while the best score so far is below the configured
       similar threshold.

    For matches to completed issues, performs regression analysis to
    determine if the match is a regression (fix broke) or invalid fix
    (never worked).

    Args:
        config: Project configuration
        finding_type: Issue type ("BUG", "ENH", "FEAT")
        file_path: File path from finding (if any). ``None``, empty and
            whitespace-only values skip the file path pass.
        finding_title: Title of the finding
        finding_content: Full content/description of finding

    Returns:
        FindingMatch with best match details, including classification and
        regression evidence for completed issue matches
    """
    exact_threshold = config.issues.duplicate_detection.exact_threshold
    similar_threshold = config.issues.duplicate_detection.similar_threshold

    best_match = FindingMatch(
        issue_path=None,
        match_type="none",
        match_score=0.0,
        exact_threshold=exact_threshold,
        similar_threshold=similar_threshold,
    )

    # Pass 1: Exact file path match.
    # A whitespace-only path is truthy but carries no information; searching
    # for it would match every issue file, so it is treated like "no path".
    if file_path and file_path.strip():
        for issue_path, is_completed in search_issues_by_file_path(config, file_path):
            try:
                # Check if same type of finding (uses configured categories)
                if not _matches_issue_type(finding_type, issue_path, config, is_completed):
                    continue

                classification, evidence = _classify_existing_match(
                    config, issue_path, is_completed
                )

                # High confidence if same file + same type
                return FindingMatch(
                    issue_path=issue_path,
                    match_type="exact",
                    match_score=0.85,
                    is_completed=is_completed,
                    matched_terms=[file_path],
                    classification=classification,
                    regression_evidence=evidence,
                    exact_threshold=exact_threshold,
                    similar_threshold=similar_threshold,
                )
            except Exception:
                # Best effort: a candidate that cannot be evaluated is skipped.
                continue

    # Pass 2: Title similarity
    title_words = _extract_words(finding_title)
    if title_words:
        # Matches the first-level heading "# <ID>: <title>" of an issue file.
        title_pattern = re.compile(r"^#\s+[\w-]+:\s*(.+)$", re.MULTILINE)
        best_pass2: tuple[Path, bool, float, list[str]] | None = None
        best_pass2_score = best_match.match_score
        for issue_path, is_completed in _get_all_issue_files(config):
            try:
                content = issue_path.read_text(encoding="utf-8")
                title_match = title_pattern.search(content)
                if title_match:
                    issue_words = _extract_words(title_match.group(1))
                    overlap = _calculate_word_overlap(title_words, issue_words)
                    if overlap > 0.7 and overlap > best_pass2_score:
                        best_pass2_score = overlap
                        best_pass2 = (
                            issue_path,
                            is_completed,
                            overlap,
                            list(title_words & issue_words),
                        )
            except Exception:
                continue

        # Determine classification once for the single best Pass 2 match
        if best_pass2 is not None:
            issue_path, is_completed, overlap, matched_terms = best_pass2
            classification, evidence = _classify_existing_match(
                config, issue_path, is_completed
            )
            best_match = FindingMatch(
                issue_path=issue_path,
                match_type="similar",
                match_score=overlap,
                is_completed=is_completed,
                matched_terms=matched_terms,
                classification=classification,
                regression_evidence=evidence,
                exact_threshold=exact_threshold,
                similar_threshold=similar_threshold,
            )

    # Pass 3: Content analysis
    if best_match.match_score < similar_threshold:
        content_matches = search_issues_by_content(
            config,
            [finding_title, finding_content],
        )
        best_pass3: tuple[Path, bool, float] | None = None
        best_pass3_score = best_match.match_score
        for issue_path, score, is_completed in content_matches[:5]:  # Top 5
            adjusted_score = score * 0.8  # Content matches are less precise
            if adjusted_score > best_pass3_score:
                best_pass3_score = adjusted_score
                best_pass3 = (issue_path, is_completed, adjusted_score)

        # Determine classification once for the single best Pass 3 match
        if best_pass3 is not None:
            issue_path, is_completed, adjusted_score = best_pass3
            classification, evidence = _classify_existing_match(
                config, issue_path, is_completed
            )
            best_match = FindingMatch(
                issue_path=issue_path,
                match_type="content",
                match_score=adjusted_score,
                is_completed=is_completed,
                classification=classification,
                regression_evidence=evidence,
                exact_threshold=exact_threshold,
                similar_threshold=similar_threshold,
            )

    # If no match found, classification is NEW_ISSUE (the default)
    return best_match
# =============================================================================
# Issue Reopening and Updating
# =============================================================================
334def _get_category_from_issue_path(issue_path: Path, config: BRConfig) -> str:
335 """Determine the category for an issue based on its filename.
337 Args:
338 issue_path: Path to issue file
339 config: Project configuration
341 Returns:
342 Category name (e.g., "bugs", "enhancements", "features")
343 """
344 filename = issue_path.name.upper()
345 for category_name, category_config in config.issues.categories.items():
346 if category_config.prefix in filename:
347 return category_name
348 return "bugs" # Default
def reopen_issue(
    config: BRConfig,
    completed_issue_path: Path,
    reopen_reason: str,
    new_context: str,
    source_command: str,
    logger: Logger,
    classification: MatchClassification | None = None,
    regression_evidence: RegressionEvidence | None = None,
) -> Path | None:
    """Reopen a completed issue: append a Reopened section and set status to open.

    The target directory is the issue's category directory (derived from the
    filename). If the file already lives there (post-ENH-1418 layout) it is
    rewritten in place; otherwise (legacy ``completed/`` layout) it is moved
    with ``git mv``, falling back to a manual write-and-delete when git
    fails or cannot be run at all.

    Args:
        config: Project configuration
        completed_issue_path: Path to the completed issue file
        reopen_reason: Reason for reopening
        new_context: New context/findings to add
        source_command: Command triggering the reopen
        logger: Logger for output
        classification: How this issue was classified (regression, invalid_fix, etc.)
        regression_evidence: Evidence supporting the classification

    Returns:
        New path to reopened issue, or None if failed (source missing, a
        different active issue already occupies the target, or an error
        occurred while rewriting/moving the file)
    """
    if not completed_issue_path.exists():
        logger.error(f"Completed issue not found: {completed_issue_path}")
        return None

    # Determine target category directory
    category = _get_category_from_issue_path(completed_issue_path, config)
    target_dir = config.get_issue_dir(category)
    target_dir.mkdir(parents=True, exist_ok=True)

    target_path = target_dir / completed_issue_path.name

    # Safety check - don't overwrite a *different* active issue at the
    # target. If the completed_issue_path is already at target_path
    # (post-ENH-1418: file lives in its type dir), this is the same file
    # and we just rewrite frontmatter in place.
    if target_path.exists() and target_path.resolve() != completed_issue_path.resolve():
        logger.warning(f"Active issue already exists: {target_path}")
        return None

    # Log with classification info if available
    if classification == MatchClassification.REGRESSION:
        logger.info(f"Reopening {completed_issue_path.name} as REGRESSION -> {category}/")
    elif classification == MatchClassification.INVALID_FIX:
        logger.info(f"Reopening {completed_issue_path.name} as INVALID_FIX -> {category}/")
    else:
        logger.info(f"Reopening {completed_issue_path.name} -> {category}/")

    try:
        from little_loops.frontmatter import update_frontmatter

        content = completed_issue_path.read_text(encoding="utf-8")

        reopen_section = _build_reopen_section(
            reopen_reason,
            new_context,
            source_command,
            classification,
            regression_evidence,
        )
        content += reopen_section
        content = update_frontmatter(content, {"status": "open"})

        same_file = completed_issue_path.resolve() == target_path.resolve()
        if same_file:
            # Post-ENH-1418: file already in its type dir; just rewrite content.
            completed_issue_path.write_text(content, encoding="utf-8")
        else:
            # Legacy path: file lives in completed/ — move it back to the type dir.
            try:
                result = subprocess.run(
                    ["git", "mv", str(completed_issue_path), str(target_path)],
                    capture_output=True,
                    text=True,
                )
                moved = result.returncode == 0
                failure_detail = result.stderr
            except OSError as exc:
                # git is not installed / not executable: treat it exactly
                # like a failed ``git mv`` instead of aborting the reopen.
                moved = False
                failure_detail = str(exc)

            if not moved:
                logger.warning(f"git mv failed, using manual copy: {failure_detail}")

            # Write the updated content at the target in both cases; after a
            # failed move the stale original must be removed by hand.
            target_path.write_text(content, encoding="utf-8")
            if not moved:
                completed_issue_path.unlink()

        logger.success(f"Reopened: {target_path.name}")
        return target_path

    except Exception as e:
        logger.error(f"Failed to reopen issue: {e}")
        return None
def update_existing_issue(
    config: BRConfig,
    issue_path: Path,
    update_section_name: str,
    update_content: str,
    source_command: str,
    logger: Logger,
) -> bool:
    """Add new findings to an existing issue.

    Appends a ``## <update_section_name>`` section (with the current date
    and the source command) to the end of the issue file. The section is
    only added when the file has no heading line that is exactly
    ``## <update_section_name>``; deeper headings (``### Name``) or
    headings that merely start with the name do not count as "already
    present".

    Args:
        config: Project configuration (not used by this function)
        issue_path: Path to issue file
        update_section_name: Name for the update section
        update_content: Content to add
        source_command: Command triggering the update
        logger: Logger for output

    Returns:
        True if the section was added or was already present; False if the
        file does not exist or could not be read/written
    """
    if not issue_path.exists():
        logger.error(f"Issue not found: {issue_path}")
        return False

    try:
        content = issue_path.read_text(encoding="utf-8")

        # Match the heading as a whole line; a plain substring test would
        # also fire on "### <name>" or "## <name> (something else)".
        heading = re.compile(
            rf"^##[ \t]+{re.escape(update_section_name)}[ \t]*$",
            re.MULTILINE,
        )
        if heading.search(content):
            logger.info(f"Section already exists in {issue_path.name}, skipping")
            return True

        # Build update section
        update_section = f"""

---

## {update_section_name}

- **Date**: {datetime.now().strftime("%Y-%m-%d")}
- **Source**: {source_command}

{update_content}
"""
        issue_path.write_text(content + update_section, encoding="utf-8")
        logger.success(f"Updated: {issue_path.name}")
        return True

    except Exception as e:
        logger.error(f"Failed to update issue: {e}")
        return False