Coverage for little_loops / cli / issues / search.py: 89%
256 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""ll-issues search: Search issues with filters and sorting."""
3from __future__ import annotations
5import argparse
6import re
7from collections.abc import Callable
8from datetime import date, datetime
9from typing import TYPE_CHECKING
11from little_loops.cli.output import PRIORITY_COLOR, TYPE_COLOR, colorize, print_json
13if TYPE_CHECKING:
14 from pathlib import Path
16 from little_loops.config import BRConfig
17 from little_loops.config.features import NextIssueConfig
18 from little_loops.issue_parser import IssueInfo
21def _parse_discovered_date(content: str) -> datetime | None:
22 """Extract creation datetime from YAML frontmatter.
24 Prefers ``captured_at`` (ISO datetime, sub-day resolution) when present,
25 falling back to ``discovered_date`` (date-granular). Always returns a
26 ``datetime`` — regex-only results are normalized to midnight — so sort
27 comparisons never mix ``date`` and ``datetime``.
28 """
29 from little_loops.frontmatter import parse_frontmatter
31 fm = parse_frontmatter(content)
32 captured = fm.get("captured_at")
33 if isinstance(captured, str) and captured:
34 try:
35 return datetime.fromisoformat(captured.rstrip("Z")).replace(tzinfo=None)
36 except ValueError:
37 pass
39 match = re.match(r"^---\n(.*?)\n---", content, re.DOTALL)
40 if not match:
41 return None
42 date_match = re.search(r"discovered_date:\s*(\S+)", match.group(1))
43 if not date_match:
44 return None
45 date_str = date_match.group(1).strip("\"'")
46 try:
47 d = date.fromisoformat(date_str[:10])
48 except ValueError:
49 return None
50 return datetime.combine(d, datetime.min.time())
53def _parse_updated_date(content: str, file_path: Path) -> date | None:
54 r"""Extract last-activity date from the ## Session Log section, or fall back to file mtime.
56 Scans for the most recent timestamp entry in the Session Log section.
57 Entry format: `- \`/ll:cmd\` - YYYY-MM-DDTHH:MM:SS - \`path\``
58 Falls back to file mtime when no session log timestamps are found.
59 """
60 import re as _re
62 _SESSION_LOG_RE = _re.compile(
63 r"^## Session Log\s*\n+(.*?)(?:\n##|\n---|\Z)", _re.MULTILINE | _re.DOTALL
64 )
65 _TIMESTAMP_RE = _re.compile(r"- `[^`]+` - (\d{4}-\d{2}-\d{2})T\d{2}:\d{2}:\d{2}")
67 match = _SESSION_LOG_RE.search(content)
68 if match:
69 timestamps = _TIMESTAMP_RE.findall(match.group(1))
70 if timestamps:
71 try:
72 return date.fromisoformat(timestamps[-1])
73 except ValueError:
74 pass
76 # Fallback to file mtime
77 try:
78 return date.fromtimestamp(file_path.stat().st_mtime)
79 except OSError:
80 return None
83def _parse_labels_from_content(content: str) -> list[str]:
84 """Extract labels from ## Labels section (backtick-wrapped items)."""
85 match = re.search(r"## Labels\s*\n(.*?)(?:\n##|\Z)", content, re.DOTALL)
86 if not match:
87 return []
88 return [m.lower() for m in re.findall(r"`([^`]+)`", match.group(1))]
91def _parse_priority_filter(priority_values: list[str]) -> set[str]:
92 """Parse priority filter values, supporting ranges like P0-P2."""
93 priorities: set[str] = set()
94 for val in priority_values:
95 range_match = re.match(r"^(P\d)-(P\d)$", val)
96 if range_match:
97 start = int(range_match.group(1)[1:])
98 end = int(range_match.group(2)[1:])
99 for i in range(start, end + 1):
100 priorities.add(f"P{i}")
101 elif re.match(r"^P\d$", val):
102 priorities.add(val)
103 return priorities
106def _load_issues_with_status(
107 config: BRConfig,
108 include_open: bool,
109 include_done: bool,
110 include_deferred: bool,
111) -> list[tuple[IssueInfo, str]]:
112 """Load issues from type directories, tagged with their frontmatter status.
114 Scans only type-scoped directories (bugs/, features/, etc.) and reads
115 ``IssueInfo.status`` from frontmatter instead of inferring status from the
116 directory name.
118 Returns:
119 List of (IssueInfo, status) where status is the frontmatter value
120 (e.g. 'open', 'in_progress', 'blocked', 'done', 'cancelled', 'deferred').
121 """
122 from little_loops.issue_parser import IssueParser
124 parser = IssueParser(config)
125 results: list[tuple[IssueInfo, str]] = []
127 for category in config.issue_categories:
128 issue_dir = config.get_issue_dir(category)
129 if not issue_dir.exists():
130 continue
131 for f in sorted(issue_dir.glob("*.md")):
132 try:
133 issue = parser.parse_file(f)
134 status = issue.status # frontmatter field, default "open"
135 if status in ("open", "in_progress", "blocked"):
136 if include_open:
137 results.append((issue, status))
138 elif status in ("done", "cancelled"):
139 if include_done:
140 results.append((issue, status))
141 elif status == "deferred":
142 if include_deferred:
143 results.append((issue, status))
144 elif include_open:
145 # Unknown status: treat as open
146 results.append((issue, status))
147 except Exception:
148 continue
150 return results
153_STRATEGY_SORT_KEYS: dict[str, tuple[tuple[str, str], ...]] = {
154 "confidence_first": (
155 ("outcome_confidence", "desc"),
156 ("confidence_score", "desc"),
157 ("priority", "asc"),
158 ),
159 "priority_first": (
160 ("priority", "asc"),
161 ("outcome_confidence", "desc"),
162 ("confidence_score", "desc"),
163 ),
164}
167def build_sort_key(config: NextIssueConfig) -> Callable[[IssueInfo], tuple]:
168 """Return a sort-key callable for an IssueInfo list based on NextIssueConfig.
170 Resolution order:
171 - If ``config.sort_keys`` is set, it takes precedence over ``config.strategy``.
172 - Otherwise, the named strategy preset is used.
174 The returned callable produces a multi-field tuple where each component is
175 transformed per-field so that a plain ``sorted(..., reverse=False)`` call
176 yields the desired order. Do NOT pass ``reverse=True`` — mixed directions
177 are baked into each tuple component.
179 None-handling (per-field sentinel):
180 - ``direction="desc"`` (higher = better): component = ``-value`` when set, ``1`` when None.
181 This mirrors ``-(value or -1)`` — values > 0 sort first, None sorts after 0.
182 - ``direction="asc"`` (lower = better): component = ``value`` when set, ``9999`` when None.
184 The schema key ``"priority"`` maps to ``IssueInfo.priority_int``; all other
185 keys are read directly off the dataclass.
187 Raises:
188 ValueError: If ``config.strategy`` is unknown (``from_dict`` normally
189 guards this, but direct instantiation can bypass validation).
190 """
191 if config.sort_keys is not None:
192 entries: tuple[tuple[str, str], ...] = tuple(
193 (sk.key, sk.direction) for sk in config.sort_keys
194 )
195 else:
196 preset = _STRATEGY_SORT_KEYS.get(config.strategy)
197 if preset is None:
198 raise ValueError(f"Unknown strategy: {config.strategy!r}")
199 entries = preset
201 def key(issue: IssueInfo) -> tuple:
202 parts: list[int] = []
203 for field_name, direction in entries:
204 attr = "priority_int" if field_name == "priority" else field_name
205 value = getattr(issue, attr)
206 if direction == "desc":
207 parts.append(-value if value is not None else 1)
208 else:
209 parts.append(value if value is not None else 9999)
210 return tuple(parts)
212 return key
215def _sort_issues(
216 items: list[tuple],
217 sort_field: str,
218 descending: bool,
219) -> list[tuple]:
220 """Sort issues by the requested field."""
222 def key(item: tuple) -> tuple:
223 issue, _status, disc_date, comp_date, *_rest = item
224 if sort_field == "priority":
225 return (issue.priority_int, issue.issue_id)
226 if sort_field == "id":
227 m = re.search(r"-(\d+)$", issue.issue_id)
228 num = int(m.group(1)) if m else 0
229 return (issue.issue_id.split("-", 1)[0], num)
230 if sort_field in ("date", "created"):
231 return (disc_date or datetime.max,)
232 if sort_field == "completed":
233 return (comp_date or date.max,)
234 if sort_field == "type":
235 return (issue.issue_id.split("-", 1)[0], issue.priority_int, issue.issue_id)
236 if sort_field == "title":
237 return (issue.title.lower(),)
238 if sort_field == "confidence":
239 score = issue.confidence_score if issue.confidence_score is not None else 9999
240 return (score,)
241 if sort_field == "outcome":
242 score = issue.outcome_confidence if issue.outcome_confidence is not None else 9999
243 return (score,)
244 if sort_field == "refinement":
245 refinement_commands = {
246 "/ll:verify-issues",
247 "/ll:refine-issue",
248 "/ll:tradeoff-review-issues",
249 "/ll:map-dependencies",
250 "/ll:ready-issue",
251 }
252 counts: dict[str, int] = getattr(issue, "session_command_counts", {}) or {}
253 total = sum(counts.get(cmd, 0) for cmd in refinement_commands)
254 return (total,)
255 return (issue.priority_int, issue.issue_id)
257 return sorted(items, key=key, reverse=descending)
260def cmd_search(config: BRConfig, args: argparse.Namespace) -> int:
261 """Search issues with optional text query, filters, and sorting.
263 Args:
264 config: Project configuration
265 args: Parsed arguments
267 Returns:
268 Exit code (0 = success)
269 """
270 # Resolve status flags
271 status = getattr(args, "status", "open")
272 if getattr(args, "include_completed", False):
273 status = "all"
275 include_open = status in ("open", "in_progress", "blocked", "all")
276 include_done = status in ("done", "cancelled", "all")
277 include_deferred = status in ("deferred", "all")
279 # Load issues
280 raw = _load_issues_with_status(config, include_open, include_done, include_deferred)
282 # Parse additional metadata (dates, labels) only when needed
283 query: str | None = getattr(args, "query", None)
284 since_date: date | None = None
285 until_date: date | None = None
286 raw_since = getattr(args, "since", None)
287 raw_until = getattr(args, "until", None)
288 if raw_since:
289 try:
290 since_date = date.fromisoformat(raw_since)
291 except ValueError:
292 print(f"Invalid --since date: {raw_since!r}. Use YYYY-MM-DD.")
293 return 1
294 if raw_until:
295 try:
296 until_date = date.fromisoformat(raw_until)
297 except ValueError:
298 print(f"Invalid --until date: {raw_until!r}. Use YYYY-MM-DD.")
299 return 1
301 sort_field = getattr(args, "sort", "priority") or "priority"
302 need_content = bool(
303 query
304 or since_date
305 or until_date
306 or getattr(args, "label", None)
307 or sort_field in {"date", "created", "completed"}
308 or getattr(args, "date_field", "discovered") == "updated"
309 )
311 # Build enriched list: (IssueInfo, status, discovered_date, completed_date, labels)
312 enriched: list[tuple[IssueInfo, str, datetime | None, date | None, list[str]]] = []
313 for issue, stat in raw:
314 if need_content:
315 try:
316 content = issue.path.read_text(encoding="utf-8")
317 except Exception:
318 content = ""
319 disc_date = _parse_discovered_date(content)
320 labels = _parse_labels_from_content(content)
321 else:
322 content = ""
323 disc_date = None
324 labels = []
326 # --- Filter: text query ---
327 if query:
328 haystack = (issue.title + "\n" + content).lower()
329 if query.lower() not in haystack:
330 continue
332 # --- Filter: type ---
333 type_filters: list[str] = getattr(args, "type", None) or []
334 if type_filters:
335 issue_type = issue.issue_id.split("-", 1)[0]
336 if issue_type not in type_filters:
337 continue
339 # --- Filter: priority ---
340 priority_filters: list[str] = getattr(args, "priority", None) or []
341 if priority_filters:
342 allowed = _parse_priority_filter(priority_filters)
343 if issue.priority not in allowed:
344 continue
346 # --- Filter: label ---
347 label_filters: list[str] = getattr(args, "label", None) or []
348 if label_filters:
349 if not any(lf.lower() in labels for lf in label_filters):
350 continue
352 # --- Filter: date range ---
353 date_field = getattr(args, "date_field", "discovered")
354 ref_date: date | None
355 if date_field == "discovered":
356 # disc_date is datetime for sort precision; normalize to date for day-granular filters
357 ref_date = disc_date.date() if disc_date is not None else None
358 else:
359 ref_date = _parse_updated_date(content, issue.path)
361 if since_date and (ref_date is None or ref_date < since_date):
362 continue
363 if until_date and (ref_date is None or ref_date > until_date):
364 continue
366 comp_date: date | None = None
367 if sort_field == "completed" and need_content:
368 from little_loops.issue_history.parsing import _parse_completion_date
370 comp_date = _parse_completion_date(content, issue.path)
371 enriched.append((issue, stat, disc_date, comp_date, labels))
373 # --- Sort ---
374 # Default direction: desc for date/created/completed (newest first), asc for everything else
375 if getattr(args, "desc", False):
376 descending = True
377 elif getattr(args, "asc", False):
378 descending = False
379 else:
380 descending = sort_field in {"date", "created", "completed"}
382 enriched = _sort_issues(enriched, sort_field, descending)
384 # --- Limit ---
385 limit = getattr(args, "limit", None)
386 if limit and limit > 0:
387 enriched = enriched[:limit]
389 if not enriched:
390 print("No issues found.")
391 return 0
393 # --- Output ---
394 issues_out = [item[0] for item in enriched]
395 statuses_out = [item[1] for item in enriched]
396 dates_out = [item[2] for item in enriched]
397 labels_out = [item[4] for item in enriched]
399 if getattr(args, "json", False):
400 print_json(
401 [
402 {
403 "id": issue.issue_id,
404 "priority": issue.priority,
405 "type": issue.issue_id.split("-", 1)[0],
406 "title": issue.title,
407 "path": str(issue.path),
408 "status": stat,
409 "discovered_date": d.date().isoformat() if d else None,
410 "labels": lbls,
411 }
412 for issue, stat, d, lbls in zip(
413 issues_out, statuses_out, dates_out, labels_out, strict=True
414 )
415 ]
416 )
417 return 0
419 fmt = getattr(args, "format", "table") or "table"
421 if fmt == "ids":
422 for issue in issues_out:
423 print(issue.issue_id)
424 return 0
426 if fmt == "list":
427 for issue in issues_out:
428 print(f"{issue.path.name} {issue.title}")
429 return 0
431 # Default: table (grouped by type, similar to ll-issues list)
432 buckets: dict[str, list[tuple[IssueInfo, str]]] = {"BUG": [], "FEAT": [], "ENH": [], "EPIC": []}
433 for issue, stat in zip(issues_out, statuses_out, strict=True):
434 prefix = issue.issue_id.split("-", 1)[0]
435 if prefix in buckets:
436 buckets[prefix].append((issue, stat))
438 type_labels = {"BUG": "Bugs", "FEAT": "Features", "ENH": "Enhancements", "EPIC": "Epics"}
439 lines: list[str] = []
440 for prefix, label in type_labels.items():
441 group = buckets[prefix]
442 if not group:
443 continue
444 header = colorize(f"{label} ({len(group)})", f"{TYPE_COLOR.get(prefix, '0')};1")
445 lines.append(header)
446 for issue, stat in group:
447 issue_type = issue.issue_id.split("-", 1)[0]
448 colored_id = colorize(issue.issue_id, TYPE_COLOR.get(issue_type, "0"))
449 colored_priority = colorize(issue.priority, PRIORITY_COLOR.get(issue.priority, "0"))
450 status_tag = f" [{stat}]" if stat not in ("open", "in_progress") else ""
451 lines.append(f" {colored_priority} {colored_id} {issue.title}{status_tag}")
452 lines.append("")
453 lines.append(f"Total: {len(issues_out)} issue(s) found")
454 print("\n".join(lines))
455 return 0