Coverage for little_loops / issue_history / parsing.py: 95%

202 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Issue history parsing and scanning functions. 

2 

3Provides functions to parse completed issue files, extract metadata 

4from frontmatter and content, scan directories for issues, and 

5extract file paths from issue content. 

6""" 

7 

8from __future__ import annotations 

9 

10import logging 

11import re 

12import subprocess 

13from datetime import date, datetime 

14from pathlib import Path 

15from typing import Any 

16 

17from little_loops.frontmatter import parse_frontmatter 

18from little_loops.issue_history.models import CompletedIssue 

19from little_loops.text_utils import extract_file_paths 

20 

21logger = logging.getLogger(__name__) 

22 

23 

def parse_completed_issue(
    file_path: Path, *, batch_dates: dict[str, date] | None = None
) -> CompletedIssue:
    """Build a ``CompletedIssue`` from an issue markdown file.

    Priority, type and ID are taken from the filename
    (``P[0-5]-[TYPE]-[NNN]-description.md``); the remaining fields come from
    the frontmatter and, for the completion date, from
    ``_parse_completion_date``.

    Args:
        file_path: Path to the issue markdown file.
        batch_dates: Optional pre-fetched mapping of filename → date. It is
            forwarded unchanged to ``_parse_completion_date``.

    Returns:
        CompletedIssue with the parsed metadata. Priority falls back to
        ``"P5"`` and type/ID to ``"UNKNOWN"`` when the filename does not
        carry them.
    """
    text = file_path.read_text(encoding="utf-8")
    name = file_path.name

    # Priority: the leading "P<digit>" token of the filename.
    priority_hit = re.match(r"^(P\d)", name)
    priority = priority_hit.group(1) if priority_hit else "P5"

    # Type and ID: the first "<TYPE>-<digits>" token anywhere in the filename.
    type_hit = re.search(r"(BUG|ENH|FEAT|EPIC)-(\d+)", name)
    if type_hit:
        issue_type = type_hit.group(1)
        issue_id = f"{type_hit.group(1)}-{type_hit.group(2)}"
    else:
        issue_type = issue_id = "UNKNOWN"

    # Parse the frontmatter once and share it with every field extractor.
    frontmatter = parse_frontmatter(text)
    discovered_by = _parse_discovered_by(frontmatter)
    captured_at = _parse_captured_at(frontmatter)
    discovered_date = _parse_discovered_date(frontmatter)
    completed_at = _parse_completed_at(frontmatter)
    completed_date = _parse_completion_date(
        text, file_path, batch_dates=batch_dates, fm=frontmatter
    )

    return CompletedIssue(
        path=file_path,
        issue_type=issue_type,
        priority=priority,
        issue_id=issue_id,
        discovered_by=discovered_by,
        discovered_date=discovered_date,
        completed_date=completed_date,
        captured_at=captured_at,
        completed_at=completed_at,
    )

77 

78 

79def _parse_iso_datetime(value: Any) -> datetime | None: 

80 """Parse an ISO 8601 string into a naive datetime, or return None. 

81 

82 Strips a trailing ``Z`` for Python <3.11 compatibility (same convention as 

83 the sibling ``cli/issues/search.py`` implementation). 

84 """ 

85 if not isinstance(value, str) or not value: 

86 return None 

87 try: 

88 return datetime.fromisoformat(value.rstrip("Z")).replace(tzinfo=None) 

89 except ValueError: 

90 return None 

91 

92 

def _parse_captured_at(fm: dict[str, Any]) -> datetime | None:
    """Return the frontmatter ``captured_at`` value parsed as a datetime, or None."""
    raw = fm.get("captured_at")
    return _parse_iso_datetime(raw)

96 

97 

def _parse_completed_at(fm: dict[str, Any]) -> datetime | None:
    """Return the frontmatter ``completed_at`` value parsed as a datetime, or None."""
    raw = fm.get("completed_at")
    return _parse_iso_datetime(raw)

101 

102 

103def _parse_discovered_by(fm: dict[str, Any]) -> str | None: 

104 """Extract discovered_by from parsed frontmatter. 

105 

106 Args: 

107 fm: Parsed frontmatter dictionary 

108 

109 Returns: 

110 discovered_by value or None 

111 """ 

112 value = fm.get("discovered_by") 

113 return value if isinstance(value, str) else None 

114 

115 

def _batch_completion_dates(_issues_dir: Path) -> dict[str, date]:
    """Return an empty mapping; retained only so legacy callers keep working.

    Historically this ran ``git log --diff-filter=A`` against ``completed/``
    to find when issue files were *moved* into completion. Since status was
    decoupled from directory location (ENH-1418), files no longer move on
    completion: ``completed_at:`` frontmatter is the primary source of truth,
    with a per-file ``git log -1`` fallback in ``_parse_completion_date``.
    ENH-1420 will backfill ``completed_at`` for pre-decoupling issues, after
    which the per-file fallback can also be removed.

    Args:
        _issues_dir: Unused (kept for signature stability).

    Returns:
        An empty mapping.
    """
    no_dates: dict[str, date] = {}
    return no_dates

135 

136 

def _parse_completion_date(
    content: str,
    file_path: Path,
    *,
    batch_dates: dict[str, date] | None = None,
    fm: dict[str, Any] | None = None,
) -> date | None:
    """Extract completion date from frontmatter, Resolution section, or git log.

    Sources are tried in order and the first one that yields a date wins:

    1. ``completed_at`` frontmatter (coerced to ``date`` via ``.date()`` to
       preserve the existing return type).
    2. A ``**Completed|Fixed|Closed|Date**: YYYY-MM-DD`` line in the content.
    3. ``batch_dates``, when the file's name is present in the mapping.
    4. A per-file ``git log -1`` call (date of the most recent commit that
       touched the file).

    Args:
        content: File content.
        file_path: Path of the issue file; its name is the ``batch_dates`` key
            and the target of the git log fallback.
        batch_dates: Optional pre-fetched mapping of filename → date. A hit
            skips the per-file subprocess call; a miss (including an empty
            mapping) falls through to git log.
        fm: Optional pre-parsed frontmatter dict. When absent, frontmatter is
            parsed from ``content`` so callers without ``fm`` still benefit
            from the ``completed_at`` check.

    Returns:
        Completion date, or None when no source provides one.
    """
    # 1. completed_at frontmatter (sub-day resolution source of truth).
    if fm is None:
        fm = parse_frontmatter(content)
    completed_at = _parse_completed_at(fm)
    if completed_at is not None:
        return completed_at.date()

    # 2. Resolution section: **Completed/Fixed/Closed/Date**: YYYY-MM-DD
    match = re.search(r"\*\*(?:Completed|Fixed|Closed|Date)\*\*:\s*(\d{4}-\d{2}-\d{2})", content)
    if match:
        try:
            return date.fromisoformat(match.group(1))
        except ValueError:
            # Date-shaped but not a real calendar date; try the next source.
            pass

    # 3. Batch map. Only a hit short-circuits: the legacy batch helper now
    # returns an empty mapping, and treating that as authoritative would
    # disable the git fallback for every caller that still passes it.
    if batch_dates is not None:
        batch_date = batch_dates.get(file_path.name)
        if batch_date is not None:
            return batch_date

    # 4. git log: most recent commit date for this file (typically the
    # close/done commit, since status writes are the latest change).
    # The pathspec must be the bare filename because git runs with
    # cwd=file_path.parent; passing a relative file_path here would be
    # resolved a second time against that directory and match nothing.
    try:
        result = subprocess.run(
            ["git", "log", "--format=%as", "-1", "--", file_path.name],
            capture_output=True,
            text=True,
            cwd=file_path.parent,
            check=False,
        )
    except OSError:
        # git not installed, or the parent directory does not exist.
        return None

    commit_date = result.stdout.strip()
    if result.returncode != 0 or not commit_date:
        return None
    try:
        return date.fromisoformat(commit_date)
    except ValueError:
        return None

196 

197 

def _parse_resolution_action(content: str) -> str:
    """Extract resolution action category from issue content.

    The category is derived from the first ``**Status**:`` line and, when
    that status contains "closed", the first ``**Reason**:`` line:

    - "duplicate": closed, reason mentions "duplicate"
    - "invalid": closed, reason mentions "invalid"
    - "deferred": closed, reason mentions "deferred"
    - "rejected": closed for any other reason (including "rejected",
      "out of scope", or no reason at all)
    - "completed": everything else — the status is missing or not closed,
      whether or not an ``**Action**:`` field is present

    Matching is case-insensitive and the reason keywords are checked in the
    order listed above.

    Args:
        content: Issue file content.

    Returns:
        Resolution category string.
    """
    status_match = re.search(r"\*\*Status\*\*:\s*(.+?)(?:\n|$)", content)
    if status_match is None or "closed" not in status_match.group(1).lower():
        # Not closed: an **Action** field and the absence of any resolution
        # section both mean a normal completion, so no further lookup is needed.
        return "completed"

    reason_match = re.search(r"\*\*Reason\*\*:\s*(.+?)(?:\n|$)", content)
    reason = reason_match.group(1).lower() if reason_match else ""
    for category in ("duplicate", "invalid", "deferred"):
        if category in reason:
            return category

    # Explicit rejection, out of scope, or closed without a specific reason.
    return "rejected"

241 

242 

243def _detect_processing_agent(content: str, discovered_source: str | None = None) -> str: 

244 """Detect which processing agent handled an issue. 

245 

246 Detection strategy (in priority order): 

247 1. Check discovered_source field for 'll-parallel' or 'll-auto' 

248 2. Check content for '**Log Type**:' field 

249 3. Check content for '**Tool**:' field 

250 4. Default to 'manual' 

251 

252 Args: 

253 content: Issue file content 

254 discovered_source: Optional discovered_source frontmatter value 

255 

256 Returns: 

257 Agent name: 'll-auto', 'll-parallel', or 'manual' 

258 """ 

259 # Check discovered_source first 

260 if discovered_source: 

261 source_lower = discovered_source.lower() 

262 if "ll-parallel" in source_lower: 

263 return "ll-parallel" 

264 if "ll-auto" in source_lower: 

265 return "ll-auto" 

266 

267 # Check Log Type field 

268 log_type_match = re.search(r"\*\*Log Type\*\*:\s*(.+?)(?:\n|$)", content) 

269 if log_type_match: 

270 log_type = log_type_match.group(1).strip().lower() 

271 if "ll-parallel" in log_type: 

272 return "ll-parallel" 

273 if "ll-auto" in log_type: 

274 return "ll-auto" 

275 

276 # Check Tool field 

277 tool_match = re.search(r"\*\*Tool\*\*:\s*(.+?)(?:\n|$)", content) 

278 if tool_match: 

279 tool = tool_match.group(1).strip().lower() 

280 if "ll-parallel" in tool: 

281 return "ll-parallel" 

282 if "ll-auto" in tool: 

283 return "ll-auto" 

284 

285 # Default to manual 

286 return "manual" 

287 

288 

def scan_completed_issues(
    issues_dir: Path,
    category_dirs: list[str] | None = None,
) -> list[CompletedIssue]:
    """Collect and parse every issue marked ``status: done``.

    Since ENH-1418 files stay in their type directory (``bugs/``,
    ``features/``, ``enhancements/``, ``epics/``) when completed, so
    completion is detected from ``status: done`` in the YAML frontmatter
    rather than from the file's location.

    For backwards compatibility with pre-decoupling repos, every ``*.md``
    file in a ``completed/`` directory under ``issues_dir`` is included as
    well, regardless of its status.

    Files that cannot be read or parsed are logged as warnings and skipped.

    Args:
        issues_dir: Path to ``.issues/`` (the parent of category dirs).
        category_dirs: Optional override of category subdirectories to scan.
            Defaults to ``["bugs", "features", "enhancements", "epics"]``.

    Returns:
        List of parsed ``CompletedIssue`` objects, sorted by file path.
    """
    if not issues_dir.exists():
        return []

    done_paths: list[Path] = []
    for dir_name in category_dirs or ["bugs", "features", "enhancements", "epics"]:
        type_dir = issues_dir / dir_name
        if not type_dir.exists():
            continue
        for md_file in type_dir.glob("*.md"):
            try:
                frontmatter = parse_frontmatter(md_file.read_text(encoding="utf-8"))
            except Exception as e:
                logger.warning("Failed to read %s: %s", md_file, e)
                continue
            if frontmatter.get("status") == "done":
                done_paths.append(md_file)

    # Legacy completed/ directory (pre-ENH-1418): taken unconditionally so
    # older repos keep working until ENH-1420 backfills.
    legacy_dir = issues_dir / "completed"
    if legacy_dir.exists():
        done_paths.extend(legacy_dir.glob("*.md"))

    parsed: list[CompletedIssue] = []
    for md_file in sorted(done_paths):
        try:
            parsed.append(parse_completed_issue(md_file))
        except Exception as e:
            logger.warning("Failed to parse %s: %s", md_file, e)

    return parsed

349 

350 

def _parse_discovered_date(fm: dict[str, Any]) -> date | None:
    """Work out the discovered date from parsed frontmatter.

    ``captured_at`` (ISO datetime, sub-day resolution) wins when it parses;
    it is reduced with ``.date()`` so the legacy ``date | None`` return type
    is kept and callers in ``summary.py`` / ``analysis.py`` /
    ``cli/history.py`` need no ``.date()`` adjustments. Otherwise the
    ``discovered_date`` string is parsed as an ISO date.

    Args:
        fm: Parsed frontmatter dictionary

    Returns:
        Discovered date, or None when neither field yields one
    """
    captured_at = _parse_captured_at(fm)
    if captured_at is not None:
        return captured_at.date()

    raw = fm.get("discovered_date")
    if isinstance(raw, str):
        try:
            return date.fromisoformat(raw)
        except ValueError:
            pass
    return None

377 

378 

379def _extract_subsystem(content: str) -> str | None: 

380 """Extract primary subsystem/directory from issue content. 

381 

382 Args: 

383 content: Issue file content 

384 

385 Returns: 

386 Directory path (e.g., "scripts/little_loops/") or None 

387 """ 

388 # Look for file paths in Location or common patterns 

389 patterns = [ 

390 r"\*\*File\*\*:\s*`?([^`\n]+/)[^/`\n]+`?", # **File**: path/to/file.py 

391 r"`([a-zA-Z_][\w/.-]+/)[^/`]+\.py`", # `path/to/file.py` 

392 ] 

393 

394 for pattern in patterns: 

395 match = re.search(pattern, content) 

396 if match: 

397 return match.group(1) 

398 

399 return None 

400 

401 

def _extract_paths_from_issue(content: str) -> list[str]:
    """Return the file paths mentioned in issue content, sorted.

    The extraction itself is done by
    :func:`~little_loops.text_utils.extract_file_paths`; this wrapper only
    sorts the result into a list for backward compatibility.

    Args:
        content: Issue file content

    Returns:
        Sorted list of file paths found in content
    """
    found_paths = extract_file_paths(content)
    return sorted(found_paths)

415 

416 

417def _find_test_file(source_path: str, project_root: Path | None = None) -> str | None: 

418 """Find corresponding test file for a source file. 

419 

420 Checks common test file naming patterns: 

421 - tests/test_<name>.py 

422 - tests/<path>/test_<name>.py 

423 - <path>/test_<name>.py 

424 - <path>/<name>_test.py 

425 - <path>/tests/test_<name>.py 

426 

427 Args: 

428 source_path: Path to source file (e.g., "src/core/processor.py") 

429 project_root: Project root for anchoring existence checks. Defaults to CWD. 

430 

431 Returns: 

432 Path to test file if found, None otherwise 

433 """ 

434 if not source_path.endswith(".py"): 

435 return None # Only check Python files for now 

436 

437 path = Path(source_path) 

438 stem = path.stem # filename without extension 

439 parent = str(path.parent) if path.parent != Path(".") else "" 

440 

441 # Generate candidate test file paths 

442 candidates: list[str] = [ 

443 f"tests/test_{stem}.py", 

444 f"{parent}/test_{stem}.py" if parent else f"test_{stem}.py", 

445 f"{parent}/{stem}_test.py" if parent else f"{stem}_test.py", 

446 f"{parent}/tests/test_{stem}.py" if parent else f"tests/test_{stem}.py", 

447 ] 

448 

449 # Add path-aware test locations 

450 if parent: 

451 candidates.append(f"tests/{parent}/test_{stem}.py") 

452 

453 # Project-specific pattern for little-loops 

454 # e.g., scripts/little_loops/foo.py -> scripts/tests/test_foo.py 

455 if source_path.startswith("scripts/little_loops/"): 

456 candidates.append(f"scripts/tests/test_{stem}.py") 

457 

458 for candidate in candidates: 

459 if (project_root / candidate).exists() if project_root else Path(candidate).exists(): 

460 return candidate 

461 

462 return None 

463 

464 

def scan_active_issues(
    issues_dir: Path,
    category_dirs: list[str] | None = None,
) -> list[tuple[Path, str, str, date | None]]:
    """Scan issue category directories for issues that are still active.

    Completed issues are no longer moved out of their category directory;
    they stay in place and are marked ``status: done`` in frontmatter (that
    is how ``scan_completed_issues`` finds them). Files carrying that status
    are therefore skipped here so an issue is never reported as both active
    and completed.

    A file whose content cannot be read or parsed is logged as a warning and
    still reported, with ``discovered_date`` set to None.

    Args:
        issues_dir: Path to .issues/ directory
        category_dirs: List of category subdirectory names to scan. When
            omitted, defaults to ``["bugs", "features", "enhancements"]`` for
            backward compatibility. Pass ``config.issue_categories`` to
            include custom project categories.

    Returns:
        List of (path, issue_type, priority, discovered_date) tuples, in
        category order and sorted by path within each category
    """
    results: list[tuple[Path, str, str, date | None]] = []

    for category_dir in category_dirs or ["bugs", "features", "enhancements"]:
        category_path = issues_dir / category_dir
        if not category_path.exists():
            continue

        # Sorted so the result order does not depend on the filesystem.
        for file_path in sorted(category_path.glob("*.md")):
            filename = file_path.name

            # Priority: leading "P<digit>" token, defaulting to the lowest.
            priority_match = re.match(r"^(P\d)", filename)
            priority = priority_match.group(1) if priority_match else "P5"

            # Type: first known type token anywhere in the filename.
            type_match = re.search(r"(BUG|ENH|FEAT|EPIC)", filename)
            issue_type = type_match.group(1) if type_match else "UNKNOWN"

            # Status and discovered date both come from the frontmatter.
            discovered_date = None
            is_done = False
            try:
                content = file_path.read_text(encoding="utf-8")
                fm = parse_frontmatter(content)
                is_done = fm.get("status") == "done"
                if not is_done:
                    discovered_date = _parse_discovered_date(fm)
            except Exception as e:
                # Best effort: an unreadable file is still listed as active.
                logger.warning("Failed to parse %s: %s", file_path, e)

            if is_done:
                continue

            results.append((file_path, issue_type, priority, discovered_date))

    return results