Coverage for little_loops / cli / issues / search.py: 89%

256 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""ll-issues search: Search issues with filters and sorting.""" 

2 

3from __future__ import annotations 

4 

5import argparse 

6import re 

7from collections.abc import Callable 

8from datetime import date, datetime 

9from typing import TYPE_CHECKING 

10 

11from little_loops.cli.output import PRIORITY_COLOR, TYPE_COLOR, colorize, print_json 

12 

13if TYPE_CHECKING: 

14 from pathlib import Path 

15 

16 from little_loops.config import BRConfig 

17 from little_loops.config.features import NextIssueConfig 

18 from little_loops.issue_parser import IssueInfo 

19 

20 

def _parse_discovered_date(content: str) -> datetime | None:
    """Return the creation timestamp recorded in an issue's YAML frontmatter.

    Two sources are tried in order:

    1. ``captured_at`` as returned by ``parse_frontmatter`` — used when it is a
       non-empty string that ``datetime.fromisoformat`` accepts once trailing
       ``Z`` characters are stripped. Any timezone info is discarded (not
       converted), so the result is always naive.
    2. ``discovered_date`` located by regex inside the leading ``---`` block —
       only the first 10 characters (``YYYY-MM-DD``) are parsed and the result
       is normalized to midnight.

    Args:
        content: Full text of the issue file.

    Returns:
        A naive ``datetime``, or ``None`` when neither field yields a valid
        value. A ``datetime`` is always returned (never a bare ``date``) so
        sort comparisons never mix the two types.
    """
    from little_loops.frontmatter import parse_frontmatter

    captured_at = parse_frontmatter(content).get("captured_at")
    if isinstance(captured_at, str) and captured_at:
        try:
            precise = datetime.fromisoformat(captured_at.rstrip("Z"))
        except ValueError:
            # Unparseable captured_at: fall through to discovered_date.
            pass
        else:
            return precise.replace(tzinfo=None)

    frontmatter = re.match(r"^---\n(.*?)\n---", content, re.DOTALL)
    if frontmatter is None:
        return None

    field = re.search(r"discovered_date:\s*(\S+)", frontmatter.group(1))
    if field is None:
        return None

    # Values may be quoted in YAML; strip either quote style before parsing.
    raw_value = field.group(1).strip("\"'")
    try:
        day = date.fromisoformat(raw_value[:10])
    except ValueError:
        return None
    return datetime.combine(day, datetime.min.time())

51 

52 

53def _parse_updated_date(content: str, file_path: Path) -> date | None: 

54 r"""Extract last-activity date from the ## Session Log section, or fall back to file mtime. 

55 

56 Scans for the most recent timestamp entry in the Session Log section. 

57 Entry format: `- \`/ll:cmd\` - YYYY-MM-DDTHH:MM:SS - \`path\`` 

58 Falls back to file mtime when no session log timestamps are found. 

59 """ 

60 import re as _re 

61 

62 _SESSION_LOG_RE = _re.compile( 

63 r"^## Session Log\s*\n+(.*?)(?:\n##|\n---|\Z)", _re.MULTILINE | _re.DOTALL 

64 ) 

65 _TIMESTAMP_RE = _re.compile(r"- `[^`]+` - (\d{4}-\d{2}-\d{2})T\d{2}:\d{2}:\d{2}") 

66 

67 match = _SESSION_LOG_RE.search(content) 

68 if match: 

69 timestamps = _TIMESTAMP_RE.findall(match.group(1)) 

70 if timestamps: 

71 try: 

72 return date.fromisoformat(timestamps[-1]) 

73 except ValueError: 

74 pass 

75 

76 # Fallback to file mtime 

77 try: 

78 return date.fromtimestamp(file_path.stat().st_mtime) 

79 except OSError: 

80 return None 

81 

82 

83def _parse_labels_from_content(content: str) -> list[str]: 

84 """Extract labels from ## Labels section (backtick-wrapped items).""" 

85 match = re.search(r"## Labels\s*\n(.*?)(?:\n##|\Z)", content, re.DOTALL) 

86 if not match: 

87 return [] 

88 return [m.lower() for m in re.findall(r"`([^`]+)`", match.group(1))] 

89 

90 

91def _parse_priority_filter(priority_values: list[str]) -> set[str]: 

92 """Parse priority filter values, supporting ranges like P0-P2.""" 

93 priorities: set[str] = set() 

94 for val in priority_values: 

95 range_match = re.match(r"^(P\d)-(P\d)$", val) 

96 if range_match: 

97 start = int(range_match.group(1)[1:]) 

98 end = int(range_match.group(2)[1:]) 

99 for i in range(start, end + 1): 

100 priorities.add(f"P{i}") 

101 elif re.match(r"^P\d$", val): 

102 priorities.add(val) 

103 return priorities 

104 

105 

def _load_issues_with_status(
    config: BRConfig,
    include_open: bool,
    include_done: bool,
    include_deferred: bool,
) -> list[tuple[IssueInfo, str]]:
    """Collect parsed issues from every category directory, paired with status.

    Each directory returned by ``config.get_issue_dir`` for the categories in
    ``config.issue_categories`` is scanned for ``*.md`` files in sorted order.
    The status comes from the parsed issue's ``status`` attribute, not from the
    directory the file lives in.

    Args:
        config: Project configuration.
        include_open: Keep issues whose status is ``open``, ``in_progress``,
            ``blocked``, or any value not recognised below.
        include_done: Keep issues whose status is ``done`` or ``cancelled``.
        include_deferred: Keep issues whose status is ``deferred``.

    Returns:
        List of ``(IssueInfo, status)`` pairs. Files that fail to parse are
        skipped silently (best-effort scan).
    """
    from little_loops.issue_parser import IssueParser

    def wanted(status: str) -> bool:
        """Map a status value onto the include flag that governs it."""
        if status in ("done", "cancelled"):
            return include_done
        if status == "deferred":
            return include_deferred
        # open / in_progress / blocked, and unknown statuses treated as open
        return include_open

    issue_parser = IssueParser(config)
    loaded: list[tuple[IssueInfo, str]] = []

    for category in config.issue_categories:
        directory = config.get_issue_dir(category)
        if not directory.exists():
            continue
        for md_file in sorted(directory.glob("*.md")):
            try:
                parsed = issue_parser.parse_file(md_file)
                state = parsed.status  # frontmatter field, default "open"
                keep = wanted(state)
            except Exception:
                # One unreadable or malformed file must not abort the scan.
                continue
            if keep:
                loaded.append((parsed, state))

    return loaded

151 

152 

# Named strategy presets: each maps to an ordered tuple of (field, direction)
# pairs consumed by build_sort_key() when no explicit sort_keys are configured.
_STRATEGY_SORT_KEYS: dict[str, tuple[tuple[str, str], ...]] = {
    "confidence_first": (
        ("outcome_confidence", "desc"),
        ("confidence_score", "desc"),
        ("priority", "asc"),
    ),
    "priority_first": (
        ("priority", "asc"),
        ("outcome_confidence", "desc"),
        ("confidence_score", "desc"),
    ),
}


def build_sort_key(config: NextIssueConfig) -> Callable[[IssueInfo], tuple]:
    """Build a sort-key function for ``IssueInfo`` objects from a NextIssueConfig.

    Field selection:
    - ``config.sort_keys``, when not ``None``, wins over ``config.strategy``.
    - Otherwise ``config.strategy`` names a preset in ``_STRATEGY_SORT_KEYS``.

    The returned callable emits one tuple component per configured field, with
    the direction already folded into the value. Sort with a plain
    ``sorted(..., reverse=False)``; do NOT pass ``reverse=True``, because
    ascending and descending fields can be mixed in one key.

    Component rules:
    - ``"desc"`` direction (higher = better): ``-value``, or ``1`` when the
      value is ``None`` (so ``None`` lands after ``0``).
    - any other direction (lower = better): ``value``, or ``9999`` when the
      value is ``None``.

    The field name ``"priority"`` is read from ``IssueInfo.priority_int``; every
    other field name is used as the attribute name directly.

    Raises:
        ValueError: If ``config.sort_keys`` is ``None`` and ``config.strategy``
            is not a known preset.
    """
    entries: tuple[tuple[str, str], ...]
    if config.sort_keys is None:
        if config.strategy not in _STRATEGY_SORT_KEYS:
            raise ValueError(f"Unknown strategy: {config.strategy!r}")
        entries = _STRATEGY_SORT_KEYS[config.strategy]
    else:
        entries = tuple((sk.key, sk.direction) for sk in config.sort_keys)

    # Resolve attribute names and directions once instead of on every call.
    resolved = tuple(
        ("priority_int" if field_name == "priority" else field_name, direction == "desc")
        for field_name, direction in entries
    )

    def key(issue: IssueInfo) -> tuple:
        components: list[int] = []
        for attr, higher_is_better in resolved:
            value = getattr(issue, attr)
            if value is None:
                components.append(1 if higher_is_better else 9999)
            elif higher_is_better:
                components.append(-value)
            else:
                components.append(value)
        return tuple(components)

    return key

213 

214 

215def _sort_issues( 

216 items: list[tuple], 

217 sort_field: str, 

218 descending: bool, 

219) -> list[tuple]: 

220 """Sort issues by the requested field.""" 

221 

222 def key(item: tuple) -> tuple: 

223 issue, _status, disc_date, comp_date, *_rest = item 

224 if sort_field == "priority": 

225 return (issue.priority_int, issue.issue_id) 

226 if sort_field == "id": 

227 m = re.search(r"-(\d+)$", issue.issue_id) 

228 num = int(m.group(1)) if m else 0 

229 return (issue.issue_id.split("-", 1)[0], num) 

230 if sort_field in ("date", "created"): 

231 return (disc_date or datetime.max,) 

232 if sort_field == "completed": 

233 return (comp_date or date.max,) 

234 if sort_field == "type": 

235 return (issue.issue_id.split("-", 1)[0], issue.priority_int, issue.issue_id) 

236 if sort_field == "title": 

237 return (issue.title.lower(),) 

238 if sort_field == "confidence": 

239 score = issue.confidence_score if issue.confidence_score is not None else 9999 

240 return (score,) 

241 if sort_field == "outcome": 

242 score = issue.outcome_confidence if issue.outcome_confidence is not None else 9999 

243 return (score,) 

244 if sort_field == "refinement": 

245 refinement_commands = { 

246 "/ll:verify-issues", 

247 "/ll:refine-issue", 

248 "/ll:tradeoff-review-issues", 

249 "/ll:map-dependencies", 

250 "/ll:ready-issue", 

251 } 

252 counts: dict[str, int] = getattr(issue, "session_command_counts", {}) or {} 

253 total = sum(counts.get(cmd, 0) for cmd in refinement_commands) 

254 return (total,) 

255 return (issue.priority_int, issue.issue_id) 

256 

257 return sorted(items, key=key, reverse=descending) 

258 

259 

def cmd_search(config: BRConfig, args: argparse.Namespace) -> int:
    """Search issues with optional text query, filters, and sorting.

    Every option is read from ``args`` with ``getattr`` and a default, so a
    partially populated namespace is accepted. Attributes consulted:
    ``status``, ``include_completed``, ``query``, ``since``, ``until``,
    ``sort``, ``label``, ``date_field``, ``type``, ``priority``, ``desc``,
    ``asc``, ``limit``, ``json`` and ``format``.

    Issue files are only read from disk when a filter or sort field actually
    needs their content. All per-run invariants (filter lists, the parsed
    priority set, the lower-cased query) are computed once before the loop.

    Args:
        config: Project configuration
        args: Parsed arguments

    Returns:
        Exit code: 0 on success (including "no issues found"), 1 when
        ``--since`` or ``--until`` is not a valid ISO date.
    """
    # Resolve status flags
    status = getattr(args, "status", "open")
    if getattr(args, "include_completed", False):
        status = "all"

    include_open = status in ("open", "in_progress", "blocked", "all")
    include_done = status in ("done", "cancelled", "all")
    include_deferred = status in ("deferred", "all")

    # Load issues
    raw = _load_issues_with_status(config, include_open, include_done, include_deferred)

    # Validate date bounds
    query: str | None = getattr(args, "query", None)
    since_date: date | None = None
    until_date: date | None = None
    raw_since = getattr(args, "since", None)
    raw_until = getattr(args, "until", None)
    if raw_since:
        try:
            since_date = date.fromisoformat(raw_since)
        except ValueError:
            print(f"Invalid --since date: {raw_since!r}. Use YYYY-MM-DD.")
            return 1
    if raw_until:
        try:
            until_date = date.fromisoformat(raw_until)
        except ValueError:
            print(f"Invalid --until date: {raw_until!r}. Use YYYY-MM-DD.")
            return 1

    # Loop invariants, resolved once
    sort_field = getattr(args, "sort", "priority") or "priority"
    date_field = getattr(args, "date_field", "discovered")
    needle = query.lower() if query else None
    type_filters: list[str] = getattr(args, "type", None) or []
    priority_filters: list[str] = getattr(args, "priority", None) or []
    allowed_priorities = _parse_priority_filter(priority_filters) if priority_filters else None
    label_filters = [lf.lower() for lf in (getattr(args, "label", None) or [])]
    has_date_bounds = since_date is not None or until_date is not None

    # Parse additional metadata (dates, labels) only when needed
    need_content = bool(
        query
        or has_date_bounds
        or label_filters
        or sort_field in {"date", "created", "completed"}
        or date_field == "updated"
    )

    # Completion dates are only needed for the "completed" sort; keep the import lazy.
    parse_completion = None
    if sort_field == "completed":
        from little_loops.issue_history.parsing import (
            _parse_completion_date as parse_completion,
        )

    # Build enriched list: (IssueInfo, status, discovered_date, completed_date, labels)
    enriched: list[tuple[IssueInfo, str, datetime | None, date | None, list[str]]] = []
    for issue, stat in raw:
        if need_content:
            try:
                content = issue.path.read_text(encoding="utf-8")
            except Exception:
                # Best-effort: an unreadable file is treated as empty content.
                content = ""
            disc_date = _parse_discovered_date(content)
            labels = _parse_labels_from_content(content)
        else:
            content = ""
            disc_date = None
            labels = []

        # --- Filter: text query ---
        if needle is not None and needle not in (issue.title + "\n" + content).lower():
            continue

        # --- Filter: type ---
        if type_filters and issue.issue_id.split("-", 1)[0] not in type_filters:
            continue

        # --- Filter: priority ---
        if allowed_priorities is not None and issue.priority not in allowed_priorities:
            continue

        # --- Filter: label ---
        if label_filters and not any(lf in labels for lf in label_filters):
            continue

        # --- Filter: date range ---
        # The reference date is only resolved when a bound was given, which
        # avoids a stat() per issue for the "updated" field otherwise.
        if has_date_bounds:
            ref_date: date | None
            if date_field == "discovered":
                # disc_date is datetime for sort precision; normalize to date
                # for day-granular filters
                ref_date = disc_date.date() if disc_date is not None else None
            else:
                ref_date = _parse_updated_date(content, issue.path)

            # An issue without a reference date never satisfies a date bound.
            if ref_date is None:
                continue
            if since_date and ref_date < since_date:
                continue
            if until_date and ref_date > until_date:
                continue

        comp_date: date | None = None
        if parse_completion is not None:
            comp_date = parse_completion(content, issue.path)
        enriched.append((issue, stat, disc_date, comp_date, labels))

    # --- Sort ---
    # Default direction: desc for date/created/completed (newest first), asc for everything else
    if getattr(args, "desc", False):
        descending = True
    elif getattr(args, "asc", False):
        descending = False
    else:
        descending = sort_field in {"date", "created", "completed"}

    enriched = _sort_issues(enriched, sort_field, descending)

    # --- Limit ---
    limit = getattr(args, "limit", None)
    if limit and limit > 0:
        enriched = enriched[:limit]

    if not enriched:
        print("No issues found.")
        return 0

    # --- Output ---
    if getattr(args, "json", False):
        print_json(
            [
                {
                    "id": issue.issue_id,
                    "priority": issue.priority,
                    "type": issue.issue_id.split("-", 1)[0],
                    "title": issue.title,
                    "path": str(issue.path),
                    "status": stat,
                    "discovered_date": d.date().isoformat() if d else None,
                    "labels": lbls,
                }
                for issue, stat, d, _comp, lbls in enriched
            ]
        )
        return 0

    issues_out = [item[0] for item in enriched]
    fmt = getattr(args, "format", "table") or "table"

    if fmt == "ids":
        for issue in issues_out:
            print(issue.issue_id)
        return 0

    if fmt == "list":
        for issue in issues_out:
            print(f"{issue.path.name} {issue.title}")
        return 0

    # Default: table (grouped by type, similar to ll-issues list)
    type_labels = {"BUG": "Bugs", "FEAT": "Features", "ENH": "Enhancements", "EPIC": "Epics"}
    buckets: dict[str, list[tuple[IssueInfo, str]]] = {prefix: [] for prefix in type_labels}
    for issue, stat, *_rest in enriched:
        prefix = issue.issue_id.split("-", 1)[0]
        # Issues with any other prefix are not listed, but still count in the total.
        if prefix in buckets:
            buckets[prefix].append((issue, stat))

    lines: list[str] = []
    for prefix, label in type_labels.items():
        group = buckets[prefix]
        if not group:
            continue
        header = colorize(f"{label} ({len(group)})", f"{TYPE_COLOR.get(prefix, '0')};1")
        lines.append(header)
        for issue, stat in group:
            issue_type = issue.issue_id.split("-", 1)[0]
            colored_id = colorize(issue.issue_id, TYPE_COLOR.get(issue_type, "0"))
            colored_priority = colorize(issue.priority, PRIORITY_COLOR.get(issue.priority, "0"))
            status_tag = f" [{stat}]" if stat not in ("open", "in_progress") else ""
            lines.append(f" {colored_priority} {colored_id} {issue.title}{status_tag}")
        lines.append("")
    lines.append(f"Total: {len(issues_out)} issue(s) found")
    print("\n".join(lines))
    return 0