Coverage for little_loops / issue_parser.py: 97%

329 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Issue file parsing for little-loops. 

2 

3Parses issue markdown files to extract metadata like priority, ID, type, and title. 

4""" 

5 

6from __future__ import annotations 

7 

8import logging 

9import re 

10from dataclasses import dataclass, field 

11from pathlib import Path 

12from typing import TYPE_CHECKING, Any 

13 

14from little_loops.cli_args import _id_matches 

15from little_loops.frontmatter import parse_frontmatter 

16 

17if TYPE_CHECKING: 

18 from little_loops.config import BRConfig 

19 

20 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Regex pattern for issue IDs in list items
# Matches: "- FEAT-001", "- BUG-123", "* ENH-005", "- FEAT-001 (some note)"
# Also handles bold markdown: "- **ENH-1000**: description"
ISSUE_ID_PATTERN = re.compile(r"^[-*]\s+\*{0,2}([A-Z]+-\d+)", re.MULTILINE)


# Whole-filename pattern for a normalized issue file: priority P0-P5, a type
# token, a numeric ID of at least three digits, a lowercase slug, then ".md".
_NORMALIZED_RE = re.compile(r"^P[0-5]-(BUG|FEAT|ENH|EPIC)-[0-9]{3,}-[a-z0-9-]+\.md$")
# Finds a hyphen-delimited type token anywhere in a filename; group 1 is the token.
_ISSUE_TYPE_RE = re.compile(r"-(BUG|FEAT|ENH|EPIC)-")

31 

32 

def is_normalized(filename: str) -> bool:
    """Report whether an issue filename follows the naming convention.

    Args:
        filename: Basename of the issue file (e.g. 'P2-BUG-010-my-issue.md').

    Returns:
        True when the name matches
        ``^P[0-5]-(BUG|FEAT|ENH|EPIC)-[0-9]{3,}-[a-z0-9-]+\\.md$``, False otherwise.
    """
    return _NORMALIZED_RE.match(filename) is not None

43 

44 

def is_formatted(issue_path: Path, templates_dir: Path | None = None) -> bool:
    """Decide whether an issue file counts as formatted.

    Either of two criteria is sufficient:
      1. the session log parsed from the file contains ``/ll:format-issue``, or
      2. every required, non-deprecated section of the issue type's template
         appears as a ``##`` heading in the file (structural check).

    Args:
        issue_path: Path to the issue markdown file.
        templates_dir: Optional override for the templates directory.

    Returns:
        True if either criterion holds (including the case where the template
        declares no required sections). False when the file cannot be read,
        no type token is found in the filename, or the template cannot be
        loaded.
    """
    from little_loops.issue_template import load_issue_sections
    from little_loops.session_log import parse_session_log

    try:
        text = issue_path.read_text(encoding="utf-8")
    except Exception:
        return False

    # Criterion 1: the formatter command shows up in the session log.
    if "/ll:format-issue" in parse_session_log(text):
        return True

    # Criterion 2 needs the issue type, which is taken from the filename.
    type_token = _ISSUE_TYPE_RE.search(issue_path.name)
    if type_token is None:
        return False

    try:
        template = load_issue_sections(type_token.group(1), templates_dir)
    except Exception:
        return False

    # Common sections flag requirement with ``required: True``; type-specific
    # sections use ``level: "required"``. Deprecated sections never count.
    required_common = {
        section
        for section, spec in template.get("common_sections", {}).items()
        if spec.get("required") is True and not spec.get("deprecated", False)
    }
    required_typed = {
        section
        for section, spec in template.get("type_sections", {}).items()
        if spec.get("level") == "required" and not spec.get("deprecated", False)
    }
    needed = required_common | required_typed
    if not needed:
        return True

    present = {heading.strip() for heading in re.findall(r"^##\s+(.+)$", text, re.MULTILINE)}
    return needed <= present

97 

98 

def slugify(text: str) -> str:
    """Turn arbitrary text into a filename-friendly slug.

    Characters other than word characters, whitespace and hyphens are dropped,
    each run of whitespace/hyphens becomes a single hyphen, and leading or
    trailing hyphens are removed.

    Args:
        text: Text to convert

    Returns:
        Lowercase slug with hyphens
    """
    without_punctuation = re.sub(r"[^\w\s-]", "", text)
    hyphenated = re.sub(r"[-\s]+", "-", without_punctuation)
    return hyphenated.strip("-").lower()

111 

112 

def get_next_issue_number(config: BRConfig, category: str | None = None) -> int:
    """Compute the next globally unique issue number.

    Looks at the ``*.md`` filenames in every configured category directory,
    plus the legacy ``completed/`` and ``deferred/`` directories under the
    issues base directory when they exist, and finds the largest number that
    follows any configured prefix (``<PREFIX>-<digits>``). Numbers are shared
    across all issue types.

    Args:
        config: Project configuration
        category: Unused, kept for backwards compatibility

    Returns:
        One more than the highest number found; 1 when nothing matches or no
        prefixes are configured.
    """
    categories = config.issues.categories
    known_prefixes = [entry.prefix for entry in categories.values()]

    # Status lives in frontmatter, so each type directory holds both active and
    # inactive issues. The legacy directories are only scanned if still present
    # (in-flight migration safety).
    scan_dirs: list[Path] = [config.get_issue_dir(name) for name in categories]
    issues_root = config.project_root / config.issues.base_dir
    for legacy_dir in (issues_root / "completed", issues_root / "deferred"):
        if legacy_dir.exists():
            scan_dirs.append(legacy_dir)

    if not known_prefixes:
        return 1

    # One alternation covering every known prefix, compiled once.
    numbered_id = re.compile(r"(?:" + "|".join(map(re.escape, known_prefixes)) + r")-(\d+)")

    highest = 0
    for directory in scan_dirs:
        if not directory.exists():
            continue
        for issue_file in directory.glob("*.md"):
            found = numbered_id.search(issue_file.name)
            if found:
                highest = max(highest, int(found.group(1)))

    return highest + 1

163 

164 

@dataclass
class ProductImpact:
    """Product-level impact assessment attached to an issue.

    Attributes:
        goal_alignment: ID of the strategic priority this supports
        persona_impact: ID of the persona affected
        business_value: Business value assessment (high|medium|low)
        user_benefit: Description of how this helps the target user
    """

    goal_alignment: str | None = None
    persona_impact: str | None = None
    business_value: str | None = None  # high|medium|low
    user_benefit: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Return the four fields as a plain dict suitable for JSON."""
        names = ("goal_alignment", "persona_impact", "business_value", "user_benefit")
        return {name: getattr(self, name) for name in names}

    @classmethod
    def from_dict(cls, data: dict[str, Any] | None) -> ProductImpact | None:
        """Build a ProductImpact from a dictionary.

        Args:
            data: Dictionary with product impact fields, or None

        Returns:
            ProductImpact instance, or None if data is None/empty. Keys missing
            from ``data`` become None; unknown keys are ignored.
        """
        if data:
            names = ("goal_alignment", "persona_impact", "business_value", "user_benefit")
            return cls(**{name: data.get(name) for name in names})
        return None

208 

209 

@dataclass
class IssueInfo:
    """Metadata parsed from a single issue file.

    Attributes:
        path: Path to the issue file
        issue_type: Type of issue (e.g., "bugs", "features")
        priority: Priority level (e.g., "P0", "P1")
        issue_id: Issue identifier (e.g., "BUG-123")
        title: Issue title from markdown header
        blocked_by: List of issue IDs that block this issue
        blocks: List of issue IDs that this issue blocks
        parent: Parent issue ID (e.g., EPIC-123); populated from frontmatter `parent:` or deprecated `parent_issue:`
        depends_on: List of issue IDs this issue depends on (soft prerequisite)
        relates_to: List of related issue IDs; populated from frontmatter `relates_to:` or deprecated `related:`
        duplicate_of: Issue ID that this issue duplicates
        discovered_by: Source command/workflow that created this issue
        epic: Value of the frontmatter `epic:` key, or None
        product_impact: Product impact assessment (optional)
        effort: Effort estimate (1=low, 2=medium, 3=high), inferred from priority if absent
        impact: Impact estimate (1=low, 2=medium, 3=high), inferred from priority if absent
        confidence_score: Readiness score (0-100) written by /ll:confidence-check, or None
        outcome_confidence: Outcome confidence (0-100) written by /ll:confidence-check, or None
        score_complexity: Outcome criterion A – Complexity (0-25), written by /ll:confidence-check, or None
        score_test_coverage: Outcome criterion B – Test Coverage (0-25), written by /ll:confidence-check, or None
        score_ambiguity: Outcome criterion C – Ambiguity (0-25), written by /ll:confidence-check, or None
        score_change_surface: Outcome criterion D – Change Surface (0-25), written by /ll:confidence-check, or None
        size: Value of the frontmatter `size:` key, or None
        testable: Whether TDD phase should be applied; False skips TDD, None treated as testable
        decision_needed: Value of the frontmatter `decision_needed:` flag, or None
        missing_artifacts: Value of the frontmatter `missing_artifacts:` flag, or None
        implementation_order_risk: Value of the frontmatter `implementation_order_risk:` flag, or None
        session_commands: Distinct /ll:* commands found in the ## Session Log section
        session_command_counts: Per-command occurrence counts from the ## Session Log section
        labels: Labels for the issue
        milestone: Sprint or milestone name this issue is assigned to; None if unassigned
        status: Issue lifecycle status read from frontmatter; defaults to "open"
    """

    path: Path
    issue_type: str
    priority: str
    issue_id: str
    title: str
    blocked_by: list[str] = field(default_factory=list)
    blocks: list[str] = field(default_factory=list)
    parent: str | None = None
    depends_on: list[str] = field(default_factory=list)
    relates_to: list[str] = field(default_factory=list)
    duplicate_of: str | None = None
    discovered_by: str | None = None
    epic: str | None = None
    product_impact: ProductImpact | None = None
    effort: int | None = None
    impact: int | None = None
    confidence_score: int | None = None
    outcome_confidence: int | None = None
    score_complexity: int | None = None
    score_test_coverage: int | None = None
    score_ambiguity: int | None = None
    score_change_surface: int | None = None
    size: str | None = None
    testable: bool | None = None
    decision_needed: bool | None = None
    missing_artifacts: bool | None = None
    implementation_order_risk: bool | None = None
    session_commands: list[str] = field(default_factory=list)
    session_command_counts: dict[str, int] = field(default_factory=dict)
    labels: list[str] = field(default_factory=list)
    milestone: str | None = None
    status: str = "open"

    @property
    def priority_int(self) -> int:
        """Numeric priority for comparisons (lower = higher priority).

        Returns the digits of a ``P<digits>`` priority, or 99 for anything
        else so that unknown priorities sort last.
        """
        parsed = re.match(r"^P(\d+)$", self.priority)
        return int(parsed.group(1)) if parsed else 99

    def to_dict(self) -> dict[str, Any]:
        """Return all fields as a JSON-serializable dictionary.

        ``path`` is rendered as a string and ``product_impact`` as its own
        dictionary (or None); every other field is passed through as-is.
        """
        impact_payload = self.product_impact.to_dict() if self.product_impact else None
        payload: dict[str, Any] = {
            "path": str(self.path),
            "issue_type": self.issue_type,
            "priority": self.priority,
            "issue_id": self.issue_id,
            "title": self.title,
            "blocked_by": self.blocked_by,
            "blocks": self.blocks,
            "parent": self.parent,
            "depends_on": self.depends_on,
            "relates_to": self.relates_to,
            "duplicate_of": self.duplicate_of,
            "discovered_by": self.discovered_by,
            "epic": self.epic,
            "product_impact": impact_payload,
            "effort": self.effort,
            "impact": self.impact,
            "confidence_score": self.confidence_score,
            "outcome_confidence": self.outcome_confidence,
            "score_complexity": self.score_complexity,
            "score_test_coverage": self.score_test_coverage,
            "score_ambiguity": self.score_ambiguity,
            "score_change_surface": self.score_change_surface,
            "size": self.size,
            "testable": self.testable,
            "decision_needed": self.decision_needed,
            "missing_artifacts": self.missing_artifacts,
            "implementation_order_risk": self.implementation_order_risk,
            "session_commands": self.session_commands,
            "session_command_counts": self.session_command_counts,
            "labels": self.labels,
            "milestone": self.milestone,
            "status": self.status,
        }
        return payload

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> IssueInfo:
        """Rebuild an IssueInfo from a dictionary such as ``to_dict`` produces.

        The five core keys (path, issue_type, priority, issue_id, title) are
        required and raise KeyError when missing; all other keys fall back to
        the field defaults.
        """
        get = data.get
        return cls(
            path=Path(data["path"]),
            issue_type=data["issue_type"],
            priority=data["priority"],
            issue_id=data["issue_id"],
            title=data["title"],
            blocked_by=get("blocked_by", []),
            blocks=get("blocks", []),
            parent=get("parent"),
            depends_on=get("depends_on", []),
            relates_to=get("relates_to", []),
            duplicate_of=get("duplicate_of"),
            discovered_by=get("discovered_by"),
            epic=get("epic"),
            product_impact=ProductImpact.from_dict(get("product_impact")),
            effort=get("effort"),
            impact=get("impact"),
            confidence_score=get("confidence_score"),
            outcome_confidence=get("outcome_confidence"),
            score_complexity=get("score_complexity"),
            score_test_coverage=get("score_test_coverage"),
            score_ambiguity=get("score_ambiguity"),
            score_change_surface=get("score_change_surface"),
            size=get("size"),
            testable=get("testable"),
            decision_needed=get("decision_needed"),
            missing_artifacts=get("missing_artifacts"),
            implementation_order_risk=get("implementation_order_risk"),
            session_commands=get("session_commands", []),
            session_command_counts=get("session_command_counts", {}),
            labels=get("labels", []),
            milestone=get("milestone"),
            status=get("status", "open"),
        )

360 

361 

class IssueParser:
    """Parses issue files based on project configuration.

    Uses BRConfig to understand issue categories, prefixes, and priorities.
    """

    def __init__(self, config: BRConfig) -> None:
        """Initialize parser with project configuration.

        Args:
            config: Project configuration
        """
        self.config = config
        self._build_prefix_map()

    def _build_prefix_map(self) -> None:
        """Build mapping from issue prefixes to category names."""
        self._prefix_to_category: dict[str, str] = {
            category.prefix: category_name
            for category_name, category in self.config.issues.categories.items()
        }

    @staticmethod
    def _coerce_int(raw: Any) -> int | None:
        """Convert a frontmatter value to a non-negative int.

        Args:
            raw: Raw frontmatter value (typically a str, an int, or None)

        Returns:
            The integer value when ``str(raw)`` consists solely of decimal
            digits, otherwise None (this includes None, negatives and floats).
        """
        if raw is None:
            return None
        text = str(raw)
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() cannot parse, which would raise ValueError.
        return int(text) if text.isdecimal() else None

    @staticmethod
    def _coerce_bool(raw: Any) -> bool | None:
        """Convert a frontmatter flag to a boolean.

        Args:
            raw: Raw frontmatter value

        Returns:
            True/False for the strings "true"/"false" (any letter case), None
            for any other string. Non-string values (None and real booleans
            included) are returned unchanged.
        """
        if not isinstance(raw, str):
            return raw
        lowered = raw.lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False
        return None

    @staticmethod
    def _split_ids(raw: Any) -> list[str]:
        """Normalize a frontmatter ID list to a list of values.

        Args:
            raw: Either a comma-separated string (optionally wrapped in quote
                characters) or an iterable of IDs

        Returns:
            For a string, the stripped, non-empty comma-separated parts; for
            anything else, ``list(raw)``.
        """
        if isinstance(raw, str):
            return [part.strip() for part in raw.strip("\"'").split(",") if part.strip()]
        return list(raw)

    def parse_file(self, issue_path: Path) -> IssueInfo:
        """Parse an issue file to extract metadata.

        Priority, type and ID come from the filename (with the parent
        directory as a fallback for the type); everything else comes from the
        file's frontmatter and markdown body. An unreadable file is treated as
        empty content rather than raising.

        Args:
            issue_path: Path to the issue markdown file

        Returns:
            Parsed IssueInfo
        """
        filename = issue_path.name

        # Parse priority from filename prefix (e.g., P1-BUG-123-...)
        priority = self._parse_priority(filename)

        # Parse issue type and ID from filename
        issue_type, issue_id = self._parse_type_and_id(filename, issue_path)

        # Read content once for all content-based parsing
        content = self._read_content(issue_path)
        frontmatter = parse_frontmatter(content)

        # An issue still marked "open" but carrying a completion timestamp is
        # treated as done.
        status = frontmatter.get("status", "open")
        if status == "open" and frontmatter.get("completed_at"):
            status = "done"

        parent = frontmatter.get("parent")
        if parent is None and (alias_val := frontmatter.get("parent_issue")):
            logger.warning(
                "%s: deprecated frontmatter key 'parent_issue' — rename to 'parent'",
                issue_path.name,
            )
            parent = alias_val

        relates_to: list[str] = []
        if alias_val := frontmatter.get("related"):
            logger.warning(
                "%s: deprecated frontmatter key 'related' — rename to 'relates_to'",
                issue_path.name,
            )
            relates_to = self._split_ids(alias_val)

        depends_on: list[str] = []

        # Parse title: prefer frontmatter title: field, then markdown header, then filename stem
        title = frontmatter.get("title") or self._parse_title_from_content(content, issue_path)
        blocked_by = self._parse_blocked_by(content)
        blocks = self._parse_blocks(content)

        # Also read blocked_by/blocks/depends_on/relates_to from frontmatter (canonical format).
        # When both sources provide values and they differ, prefer frontmatter and warn
        # so stale body sections are surfaced rather than silently merged.
        # The lists are updated in place, so the local names above see the result.
        for fm_key, body_ids in (
            ("blocked_by", blocked_by),
            ("blocks", blocks),
            ("depends_on", depends_on),
            ("relates_to", relates_to),
        ):
            fm_val = frontmatter.get(fm_key)
            if not fm_val:
                continue
            fm_ids = self._split_ids(fm_val)
            if body_ids and set(fm_ids) != set(body_ids):
                logger.warning(
                    "%s: frontmatter %s %s conflicts with body section %s; "
                    "preferring frontmatter — update or remove the stale body section",
                    issue_path.name,
                    fm_key,
                    fm_ids,
                    body_ids,
                )
                body_ids.clear()
                body_ids.extend(fm_ids)
            elif not body_ids:
                body_ids.extend(fm_ids)

        # Parse labels from frontmatter
        labels: list[str] = []
        fm_labels = frontmatter.get("labels")
        if fm_labels:
            if isinstance(fm_labels, str):
                labels = [lb.strip() for lb in fm_labels.split(",") if lb.strip()]
            else:
                labels = [str(lb) for lb in fm_labels]

        # Parse milestone from frontmatter (empty values collapse to None)
        milestone: str | None = frontmatter.get("milestone") or None

        # Parse session commands from ## Session Log section
        from little_loops.session_log import count_session_commands, parse_session_log

        return IssueInfo(
            path=issue_path,
            issue_type=issue_type,
            priority=priority,
            issue_id=issue_id,
            title=title,
            blocked_by=blocked_by,
            blocks=blocks,
            parent=parent,
            depends_on=depends_on,
            relates_to=relates_to,
            duplicate_of=frontmatter.get("duplicate_of"),
            discovered_by=frontmatter.get("discovered_by"),
            epic=frontmatter.get("epic"),
            product_impact=self._parse_product_impact(frontmatter),
            effort=self._coerce_int(frontmatter.get("effort")),
            impact=self._coerce_int(frontmatter.get("impact")),
            confidence_score=self._coerce_int(frontmatter.get("confidence_score")),
            outcome_confidence=self._coerce_int(frontmatter.get("outcome_confidence")),
            score_complexity=self._coerce_int(frontmatter.get("score_complexity")),
            score_test_coverage=self._coerce_int(frontmatter.get("score_test_coverage")),
            score_ambiguity=self._coerce_int(frontmatter.get("score_ambiguity")),
            score_change_surface=self._coerce_int(frontmatter.get("score_change_surface")),
            size=frontmatter.get("size"),
            testable=self._coerce_bool(frontmatter.get("testable")),
            decision_needed=self._coerce_bool(frontmatter.get("decision_needed")),
            missing_artifacts=self._coerce_bool(frontmatter.get("missing_artifacts")),
            implementation_order_risk=self._coerce_bool(
                frontmatter.get("implementation_order_risk")
            ),
            session_commands=parse_session_log(content),
            session_command_counts=count_session_commands(content),
            labels=labels,
            milestone=milestone,
            status=status,
        )

    def _parse_priority(self, filename: str) -> str:
        """Extract priority from filename.

        Args:
            filename: Issue filename

        Returns:
            The first configured priority that prefixes the filename (as
            ``<priority>-``); otherwise the last configured priority, or "P3"
            when none are configured.
        """
        for priority in self.config.issue_priorities:
            if filename.startswith(f"{priority}-"):
                return priority
        # Default to lowest priority if not found
        return self.config.issue_priorities[-1] if self.config.issue_priorities else "P3"

    def _get_category_for_prefix(self, prefix: str) -> str:
        """Get category name from issue prefix.

        Args:
            prefix: Issue prefix (e.g., "BUG", "FEAT")

        Returns:
            Category name (e.g., "bugs", "features"), defaults to "bugs"
        """
        return self._prefix_to_category.get(prefix, "bugs")

    def _parse_type_and_id(self, filename: str, issue_path: Path) -> tuple[str, str]:
        """Extract issue type and ID from filename.

        Args:
            filename: Issue filename
            issue_path: Full path to issue file

        Returns:
            Tuple of (issue_type, issue_id)
        """
        # Try to match known prefixes (BUG, FEAT, ENH, etc.). The prefix is
        # escaped so it is always matched literally.
        for prefix, category in self._prefix_to_category.items():
            match = re.search(rf"({re.escape(prefix)})-(\d+)", filename)
            if match:
                return category, f"{match.group(1)}-{match.group(2)}"

        # Fall back to inferring category from directory.
        parent_name = issue_path.parent.name
        for category_name, category_config in self.config.issues.categories.items():
            if parent_name == category_config.dir:
                # If the filename uses the standard P[0-5]-NNN-... shape but
                # omits the type token, capture the number directly and pair
                # it with the directory-derived prefix. Without this, generic
                # number scanning would pick up the priority digit instead.
                priority_match = re.match(r"^P\d+-(\d+)(?:[-.]|$)", filename)
                if priority_match:
                    return category_name, f"{category_config.prefix}-{priority_match.group(1)}"
                issue_id = self._generate_id_from_filename(filename, category_config.prefix)
                return category_name, issue_id

        # Last resort: use the filename without its ".md" suffix as the ID.
        # Only the trailing suffix is removed; ".md" elsewhere in the name stays.
        suffix = ".md"
        fallback_id = filename[: -len(suffix)] if filename.endswith(suffix) else filename
        return "bugs", fallback_id

    def _generate_id_from_filename(self, filename: str, prefix: str) -> str:
        """Generate an issue ID from filename when not explicitly present.

        Args:
            filename: Issue filename
            prefix: Issue prefix to use

        Returns:
            ``<prefix>-<first number in the filename>`` when the filename
            (minus any leading priority token) contains digits; otherwise
            ``<prefix>-<next free issue number, zero-padded to 3 digits>``.
        """
        # Strip a leading priority token (e.g. "P2-") so it does not get
        # picked up as the issue number by the generic digit scan below.
        scan_target = re.sub(r"^P\d+-", "", filename)
        numbers = re.findall(r"\d+", scan_target)
        if numbers:
            return f"{prefix}-{numbers[0]}"
        # Use the next sequential number so the generated ID does not collide
        # with an existing issue.
        category = self._get_category_for_prefix(prefix)
        next_num = get_next_issue_number(self.config, category)
        return f"{prefix}-{next_num:03d}"

    def _read_content(self, issue_path: Path) -> str:
        """Read file content, returning empty string on error.

        Args:
            issue_path: Path to issue file

        Returns:
            File content or empty string on error (the error is logged as a warning)
        """
        try:
            return issue_path.read_text(encoding="utf-8")
        except Exception as e:
            # Deliberately best-effort: an unreadable issue is parsed as empty.
            logger.warning("Failed to read %s: %s", issue_path.name, e)
            return ""

    def _parse_title_from_content(self, content: str, issue_path: Path) -> str:
        """Extract title from issue file content.

        Args:
            content: Pre-read file content
            issue_path: Path to issue file (for fallback)

        Returns:
            Issue title or filename stem as fallback
        """
        if content:
            # Look for markdown header: # ISSUE-ID: Title
            match = re.search(r"^#\s+[\w-]+:\s*(.+)$", content, re.MULTILINE)
            if match:
                return match.group(1).strip()
            # Try first header of any format
            match = re.search(r"^#\s+(.+)$", content, re.MULTILINE)
            if match:
                return match.group(1).strip()
        # Fall back to filename
        return issue_path.stem

    def _parse_section_items(self, content: str, section_name: str) -> list[str]:
        """Extract issue IDs from a markdown section.

        Finds section header (## Section Name) and extracts issue IDs
        from list items until the next section or end of file.
        Skips content inside code fences.

        Args:
            content: File content to parse
            section_name: Section name to find (e.g., "Blocked By")

        Returns:
            List of issue IDs found in the section
        """
        if not content:
            return []

        # Strip code fences to avoid matching sections in examples
        content_without_code = self._strip_code_fences(content)

        # Match section header case-insensitively
        section_pattern = rf"^##\s+{re.escape(section_name)}\s*$"
        match = re.search(section_pattern, content_without_code, re.MULTILINE | re.IGNORECASE)
        if not match:
            return []

        # Get content after section header until next ## header or end
        start = match.end()
        next_section = re.search(r"^##\s+", content_without_code[start:], re.MULTILINE)
        if next_section:
            section_content = content_without_code[start : start + next_section.start()]
        else:
            section_content = content_without_code[start:]

        # Extract issue IDs from list items
        return ISSUE_ID_PATTERN.findall(section_content)

    def _strip_code_fences(self, content: str) -> str:
        """Remove code fence blocks from content.

        Replaces content between ``` markers with empty lines to preserve
        line numbers while removing code fence content from parsing.

        Args:
            content: File content

        Returns:
            Content with code fence blocks replaced by empty lines
        """
        # Match code fences: ``` or ```language through closing ```
        result = []
        in_fence = False
        for line in content.split("\n"):
            if line.startswith("```"):
                in_fence = not in_fence
                result.append("")  # Preserve line count
            elif in_fence:
                result.append("")  # Replace fenced content with empty line
            else:
                result.append(line)
        return "\n".join(result)

    def _parse_blocked_by(self, content: str) -> list[str]:
        """Extract issue IDs from ## Blocked By section.

        Args:
            content: File content to parse

        Returns:
            List of issue IDs that block this issue
        """
        return self._parse_section_items(content, "Blocked By")

    def _parse_blocks(self, content: str) -> list[str]:
        """Extract issue IDs from ## Blocks section.

        Args:
            content: File content to parse

        Returns:
            List of issue IDs that this issue blocks
        """
        return self._parse_section_items(content, "Blocks")

    def _parse_product_impact(self, frontmatter: dict[str, Any]) -> ProductImpact | None:
        """Extract product impact from frontmatter.

        Args:
            frontmatter: Dictionary of frontmatter fields

        Returns:
            ProductImpact instance if any product field has a truthy value, None otherwise
        """
        product_fields = ("goal_alignment", "persona_impact", "business_value", "user_benefit")
        if not any(frontmatter.get(key) for key in product_fields):
            return None

        return ProductImpact(
            goal_alignment=frontmatter.get("goal_alignment"),
            persona_impact=frontmatter.get("persona_impact"),
            business_value=frontmatter.get("business_value"),
            user_benefit=frontmatter.get("user_benefit"),
        )

829 

830 

def find_issues(
    config: BRConfig,
    category: str | None = None,
    skip_ids: set[str] | None = None,
    only_ids: list[str] | set[str] | None = None,
    type_prefixes: set[str] | None = None,
) -> list[IssueInfo]:
    """Collect the active issues that satisfy the given filters.

    Every ``*.md`` file in the selected category directories is parsed; issues
    whose status is done, cancelled or deferred are always excluded.

    Args:
        config: Project configuration
        category: Optional category to filter (e.g., "bugs"). A category that
            is not configured yields no results.
        skip_ids: Issue IDs to skip
        only_ids: If provided, only include these issue IDs. When a list,
            results are returned in list order (input sequence preserved).
            When a set, results are sorted by priority as usual.
        type_prefixes: If provided, only include issues whose ID starts with
            one of these prefixes (e.g., {"BUG", "ENH"})

    Returns:
        List of IssueInfo sorted by priority (then issue ID), or in only_ids
        list order when only_ids is a list
    """
    excluded = skip_ids or set()
    parser = IssueParser(config)

    # Determine which categories to search
    if not category:
        search_categories = config.issue_categories
    elif category in config.issue_categories:
        search_categories = [category]
    else:
        search_categories = []

    def _wanted(info: IssueInfo) -> bool:
        """Apply the status, skip, only and type filters in that order."""
        if info.status in ("done", "cancelled", "deferred"):
            return False
        if info.issue_id in excluded:
            return False
        if only_ids is not None and not any(_id_matches(info.issue_id, p) for p in only_ids):
            return False
        if type_prefixes is not None and info.issue_id.split("-", 1)[0] not in type_prefixes:
            return False
        return True

    found: list[IssueInfo] = []
    for cat in search_categories:
        issue_dir = config.get_issue_dir(cat)
        if not issue_dir.exists():
            continue
        for issue_file in issue_dir.glob("*.md"):
            parsed = parser.parse_file(issue_file)
            if _wanted(parsed):
                found.append(parsed)

    if isinstance(only_ids, list):
        # Preserve the caller's ordering: rank each issue by the position of
        # the first only_ids entry it matches.
        def _requested_position(info: IssueInfo) -> int:
            for position, wanted_id in enumerate(only_ids):
                if _id_matches(info.issue_id, wanted_id):
                    return position
            return len(only_ids)

        found.sort(key=_requested_position)
    else:
        found.sort(key=lambda info: (info.priority_int, info.issue_id))
    return found

898 

899 

def find_highest_priority_issue(
    config: BRConfig,
    category: str | None = None,
    skip_ids: set[str] | None = None,
    only_ids: set[str] | None = None,
    type_prefixes: set[str] | None = None,
) -> IssueInfo | None:
    """Return the first issue that ``find_issues`` yields for these filters.

    Args:
        config: Project configuration
        category: Optional category to filter
        skip_ids: Issue IDs to skip
        only_ids: If provided, only include these issue IDs
        type_prefixes: If provided, only include issues with these type prefixes

    Returns:
        Highest priority IssueInfo or None if no issues found
    """
    candidates = find_issues(config, category, skip_ids, only_ids, type_prefixes)
    if not candidates:
        return None
    return candidates[0]

920 return issues[0] if issues else None