Coverage for little_loops / issue_parser.py: 97%
329 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Issue file parsing for little-loops.
3Parses issue markdown files to extract metadata like priority, ID, type, and title.
4"""
6from __future__ import annotations
8import logging
9import re
10from dataclasses import dataclass, field
11from pathlib import Path
12from typing import TYPE_CHECKING, Any
14from little_loops.cli_args import _id_matches
15from little_loops.frontmatter import parse_frontmatter
17if TYPE_CHECKING:
18 from little_loops.config import BRConfig
logger = logging.getLogger(__name__)

# Regex pattern for issue IDs in list items
# Matches: "- FEAT-001", "- BUG-123", "* ENH-005", "- FEAT-001 (some note)"
# Also handles bold markdown: "- **ENH-1000**: description"
ISSUE_ID_PATTERN = re.compile(r"^[-*]\s+\*{0,2}([A-Z]+-\d+)", re.MULTILINE)

# Full-filename shape accepted by is_normalized(): priority P0-P5, a type token,
# a number of at least three digits, a lowercase slug, and the ".md" suffix.
_NORMALIZED_RE = re.compile(r"^P[0-5]-(BUG|FEAT|ENH|EPIC)-[0-9]{3,}-[a-z0-9-]+\.md$")
# Finds the hyphen-delimited type token (e.g. "-BUG-") anywhere in a filename.
_ISSUE_TYPE_RE = re.compile(r"-(BUG|FEAT|ENH|EPIC)-")
def is_normalized(filename: str) -> bool:
    """Check whether an issue filename conforms to naming conventions.

    Args:
        filename: The basename of the issue file (e.g. 'P2-BUG-010-my-issue.md').

    Returns:
        True if the filename matches ``^P[0-5]-(BUG|FEAT|ENH|EPIC)-[0-9]{3,}-[a-z0-9-]+\\.md$``.
    """
    return bool(_NORMALIZED_RE.match(filename))
def is_formatted(issue_path: Path, templates_dir: Path | None = None) -> bool:
    """Check whether an issue file has been formatted.

    An issue is considered formatted if either:
    1. Its ## Session Log contains a ``/ll:format-issue`` entry, OR
    2. It has all required sections per its type template (structural check).

    Args:
        issue_path: Path to the issue markdown file.
        templates_dir: Optional override for the templates directory.

    Returns:
        True if the issue is formatted by either criterion, False otherwise.
        Returns False for files that cannot be read, whose type cannot be
        determined from the filename, or whose template cannot be loaded.
    """
    # Imported lazily inside the function, as in the rest of this module.
    from little_loops.issue_template import load_issue_sections
    from little_loops.session_log import parse_session_log

    try:
        content = issue_path.read_text(encoding="utf-8")
    except Exception:
        # Best-effort check: an unreadable file is simply "not formatted".
        return False

    # Criterion 1: /ll:format-issue appears in the session log
    if "/ll:format-issue" in parse_session_log(content):
        return True

    # Criterion 2: all required sections are present as ## headings
    type_match = _ISSUE_TYPE_RE.search(issue_path.name)
    if not type_match:
        return False

    try:
        sections_data = load_issue_sections(type_match.group(1), templates_dir)
    except Exception:
        return False

    # The two tables flag "required" differently: common sections use a boolean
    # ``required`` key, type sections use ``level == "required"``.
    required: set[str] = {
        name
        for name, defn in sections_data.get("common_sections", {}).items()
        if defn.get("required") is True and not defn.get("deprecated", False)
    }
    required |= {
        name
        for name, defn in sections_data.get("type_sections", {}).items()
        if defn.get("level") == "required" and not defn.get("deprecated", False)
    }

    # A template with no required sections is trivially satisfied.
    if not required:
        return True

    headings = {m.strip() for m in re.findall(r"^##\s+(.+)$", content, re.MULTILINE)}
    return required.issubset(headings)
def slugify(text: str) -> str:
    """Convert text to slug format for filenames.

    Characters other than word characters, whitespace and hyphens are dropped,
    runs of whitespace/hyphens collapse to a single hyphen, and leading or
    trailing hyphens are trimmed. Underscores count as word characters and are
    kept.

    Args:
        text: Text to convert

    Returns:
        Lowercase slug with hyphens
    """
    text = re.sub(r"[^\w\s-]", "", text)
    text = re.sub(r"[-\s]+", "-", text)
    return text.strip("-").lower()
def get_next_issue_number(config: BRConfig, category: str | None = None) -> int:
    """Determine the next globally unique issue number.

    Scans every category directory, plus the legacy ``completed/`` and
    ``deferred/`` directories when they exist, and finds the highest number
    used with any prefix configured in ``config.issues.categories``. Issue
    numbers are globally unique regardless of type.

    Args:
        config: Project configuration
        category: Unused, kept for backwards compatibility

    Returns:
        Next available issue number (globally unique across all types);
        1 when no prefixes are configured or no numbered files are found.
    """
    # Get all known prefixes from configuration
    all_prefixes = [cat_config.prefix for cat_config in config.issues.categories.values()]
    if not all_prefixes:
        return 1

    # Directories to scan: ALL category directories. Status (open/done/deferred)
    # now lives in frontmatter, so all issues — active and inactive — are in
    # their type dir. We still scan the legacy completed/ and deferred/ dirs
    # if they happen to exist (in-flight migration safety); the existence
    # check in the loop below covers them too.
    dirs_to_scan: list[Path] = [
        config.get_issue_dir(cat_name) for cat_name in config.issues.categories
    ]
    issues_root = config.project_root / config.issues.base_dir
    dirs_to_scan.append(issues_root / "completed")
    dirs_to_scan.append(issues_root / "deferred")

    # Pre-compile a single union regex to match any known prefix
    prefix_pattern = re.compile(r"(?:" + "|".join(re.escape(p) for p in all_prefixes) + r")-(\d+)")

    max_num = 0
    for dir_path in dirs_to_scan:
        if not dir_path.exists():
            continue
        for issue_file in dir_path.glob("*.md"):
            match = prefix_pattern.search(issue_file.name)
            if match:
                max_num = max(max_num, int(match.group(1)))

    return max_num + 1
@dataclass
class ProductImpact:
    """Product impact assessment for an issue.

    Attributes:
        goal_alignment: ID of the strategic priority this supports
        persona_impact: ID of the persona affected
        business_value: Business value assessment (high|medium|low)
        user_benefit: Description of how this helps the target user
    """

    goal_alignment: str | None = None
    persona_impact: str | None = None
    business_value: str | None = None  # high|medium|low
    user_benefit: str | None = None

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "goal_alignment": self.goal_alignment,
            "persona_impact": self.persona_impact,
            "business_value": self.business_value,
            "user_benefit": self.user_benefit,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any] | None) -> ProductImpact | None:
        """Create ProductImpact from dictionary.

        Args:
            data: Dictionary with product impact fields, or None

        Returns:
            ProductImpact instance or None if data is None/empty
        """
        if not data:
            return None
        return cls(
            goal_alignment=data.get("goal_alignment"),
            persona_impact=data.get("persona_impact"),
            business_value=data.get("business_value"),
            user_benefit=data.get("user_benefit"),
        )
@dataclass
class IssueInfo:
    """Parsed information from an issue file.

    Attributes:
        path: Path to the issue file
        issue_type: Type of issue (e.g., "bugs", "features")
        priority: Priority level (e.g., "P0", "P1")
        issue_id: Issue identifier (e.g., "BUG-123")
        title: Issue title from markdown header
        blocked_by: List of issue IDs that block this issue
        blocks: List of issue IDs that this issue blocks
        parent: Parent issue ID (e.g., EPIC-123); populated from frontmatter `parent:` or deprecated `parent_issue:`
        depends_on: List of issue IDs this issue depends on (soft prerequisite)
        relates_to: List of related issue IDs; populated from frontmatter `relates_to:` or deprecated `related:`
        duplicate_of: Issue ID that this issue duplicates
        discovered_by: Source command/workflow that created this issue
        epic: Value of the frontmatter `epic:` key, or None
        product_impact: Product impact assessment (optional)
        effort: Effort estimate (1=low, 2=medium, 3=high), inferred from priority if absent
        impact: Impact estimate (1=low, 2=medium, 3=high), inferred from priority if absent
        confidence_score: Readiness score (0-100) written by /ll:confidence-check, or None
        outcome_confidence: Outcome confidence (0-100) written by /ll:confidence-check, or None
        score_complexity: Outcome criterion A – Complexity (0-25), written by /ll:confidence-check, or None
        score_test_coverage: Outcome criterion B – Test Coverage (0-25), written by /ll:confidence-check, or None
        score_ambiguity: Outcome criterion C – Ambiguity (0-25), written by /ll:confidence-check, or None
        score_change_surface: Outcome criterion D – Change Surface (0-25), written by /ll:confidence-check, or None
        size: Value of the frontmatter `size:` key, or None
        testable: Whether TDD phase should be applied; False skips TDD, None treated as testable
        decision_needed: Optional flag read from the frontmatter `decision_needed:` key
        missing_artifacts: Optional flag read from the frontmatter `missing_artifacts:` key
        implementation_order_risk: Optional flag read from the frontmatter `implementation_order_risk:` key
        session_commands: Distinct /ll:* commands found in the ## Session Log section
        session_command_counts: Per-command occurrence counts from the ## Session Log section
        labels: Labels read from the frontmatter `labels:` key
        milestone: Sprint or milestone name this issue is assigned to; None if unassigned
        status: Issue lifecycle status read from frontmatter; defaults to "open"
    """

    path: Path
    issue_type: str
    priority: str
    issue_id: str
    title: str
    blocked_by: list[str] = field(default_factory=list)
    blocks: list[str] = field(default_factory=list)
    parent: str | None = None
    depends_on: list[str] = field(default_factory=list)
    relates_to: list[str] = field(default_factory=list)
    duplicate_of: str | None = None
    discovered_by: str | None = None
    epic: str | None = None
    product_impact: ProductImpact | None = None
    effort: int | None = None
    impact: int | None = None
    confidence_score: int | None = None
    outcome_confidence: int | None = None
    score_complexity: int | None = None
    score_test_coverage: int | None = None
    score_ambiguity: int | None = None
    score_change_surface: int | None = None
    size: str | None = None
    testable: bool | None = None
    decision_needed: bool | None = None
    missing_artifacts: bool | None = None
    implementation_order_risk: bool | None = None
    session_commands: list[str] = field(default_factory=list)
    session_command_counts: dict[str, int] = field(default_factory=dict)
    labels: list[str] = field(default_factory=list)
    milestone: str | None = None
    status: str = "open"

    @property
    def priority_int(self) -> int:
        """Convert priority to integer for comparison (lower = higher priority)."""
        # "P<digits>" maps to its number; anything else is treated as unknown.
        match = re.match(r"^P(\d+)$", self.priority)
        if match:
            return int(match.group(1))
        return 99  # Unknown priority sorts last

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for JSON serialization."""
        return {
            "path": str(self.path),
            "issue_type": self.issue_type,
            "priority": self.priority,
            "issue_id": self.issue_id,
            "title": self.title,
            "blocked_by": self.blocked_by,
            "blocks": self.blocks,
            "parent": self.parent,
            "depends_on": self.depends_on,
            "relates_to": self.relates_to,
            "duplicate_of": self.duplicate_of,
            "discovered_by": self.discovered_by,
            "epic": self.epic,
            "product_impact": (self.product_impact.to_dict() if self.product_impact else None),
            "effort": self.effort,
            "impact": self.impact,
            "confidence_score": self.confidence_score,
            "outcome_confidence": self.outcome_confidence,
            "score_complexity": self.score_complexity,
            "score_test_coverage": self.score_test_coverage,
            "score_ambiguity": self.score_ambiguity,
            "score_change_surface": self.score_change_surface,
            "size": self.size,
            "testable": self.testable,
            "decision_needed": self.decision_needed,
            "missing_artifacts": self.missing_artifacts,
            "implementation_order_risk": self.implementation_order_risk,
            "session_commands": self.session_commands,
            "session_command_counts": self.session_command_counts,
            "labels": self.labels,
            "milestone": self.milestone,
            "status": self.status,
        }

    @classmethod
    def from_dict(cls, data: dict[str, Any]) -> IssueInfo:
        """Create IssueInfo from dictionary.

        The five identity keys (path, issue_type, priority, issue_id, title)
        are required and raise KeyError when absent; every other key falls
        back to the field's default.
        """
        return cls(
            path=Path(data["path"]),
            issue_type=data["issue_type"],
            priority=data["priority"],
            issue_id=data["issue_id"],
            title=data["title"],
            blocked_by=data.get("blocked_by", []),
            blocks=data.get("blocks", []),
            parent=data.get("parent"),
            depends_on=data.get("depends_on", []),
            relates_to=data.get("relates_to", []),
            duplicate_of=data.get("duplicate_of"),
            discovered_by=data.get("discovered_by"),
            epic=data.get("epic"),
            product_impact=ProductImpact.from_dict(data.get("product_impact")),
            effort=data.get("effort"),
            impact=data.get("impact"),
            confidence_score=data.get("confidence_score"),
            outcome_confidence=data.get("outcome_confidence"),
            score_complexity=data.get("score_complexity"),
            score_test_coverage=data.get("score_test_coverage"),
            score_ambiguity=data.get("score_ambiguity"),
            score_change_surface=data.get("score_change_surface"),
            size=data.get("size"),
            testable=data.get("testable"),
            decision_needed=data.get("decision_needed"),
            missing_artifacts=data.get("missing_artifacts"),
            implementation_order_risk=data.get("implementation_order_risk"),
            session_commands=data.get("session_commands", []),
            session_command_counts=data.get("session_command_counts", {}),
            labels=data.get("labels", []),
            milestone=data.get("milestone"),
            status=data.get("status", "open"),
        )
class IssueParser:
    """Parses issue files based on project configuration.

    Uses BRConfig to understand issue categories, prefixes, and priorities.
    """

    def __init__(self, config: BRConfig) -> None:
        """Initialize parser with project configuration.

        Args:
            config: Project configuration
        """
        self.config = config
        self._build_prefix_map()

    def _build_prefix_map(self) -> None:
        """Build mapping from issue prefixes to category names."""
        self._prefix_to_category: dict[str, str] = {}
        for category_name, category in self.config.issues.categories.items():
            self._prefix_to_category[category.prefix] = category_name

    @staticmethod
    def _coerce_int(raw: Any) -> int | None:
        """Return a frontmatter value as an int, or None if it is not one.

        Args:
            raw: Raw frontmatter value (may be None, an int, or a string)

        Returns:
            The integer value when ``str(raw)`` consists only of decimal
            digits, None otherwise (including negatives and floats)
        """
        if raw is None:
            return None
        text = str(raw)
        # isdecimal() rather than isdigit(): isdigit() also accepts characters
        # such as "²" that int() rejects with ValueError.
        return int(text) if text.isdecimal() else None

    @staticmethod
    def _coerce_bool(raw: Any) -> bool | None:
        """Return a frontmatter flag as a bool where it is spelled as text.

        Args:
            raw: Raw frontmatter value

        Returns:
            True/False for the strings "true"/"false" (any case), None for
            any other string. Non-string values are returned unchanged.
        """
        if not isinstance(raw, str):
            return raw
        lowered = raw.lower()
        if lowered == "true":
            return True
        if lowered == "false":
            return False
        return None

    @staticmethod
    def _split_ids(raw: Any) -> list[str]:
        """Normalize a frontmatter ID list to a list of strings.

        Args:
            raw: Either a comma-separated string (optionally wrapped in
                quotes) or an iterable of IDs

        Returns:
            List of IDs; for string input, entries are stripped and empty
            entries dropped
        """
        if isinstance(raw, str):
            return [part.strip() for part in raw.strip("\"'").split(",") if part.strip()]
        return list(raw)

    def parse_file(self, issue_path: Path) -> IssueInfo:
        """Parse an issue file to extract metadata.

        Priority, type and ID come from the filename; everything else comes
        from the file's frontmatter and body. An unreadable file is parsed as
        if it were empty.

        Args:
            issue_path: Path to the issue markdown file

        Returns:
            Parsed IssueInfo
        """
        filename = issue_path.name

        # Parse priority from filename prefix (e.g., P1-BUG-123-...)
        priority = self._parse_priority(filename)

        # Parse issue type and ID from filename
        issue_type, issue_id = self._parse_type_and_id(filename, issue_path)

        # Read content once for all content-based parsing
        content = self._read_content(issue_path)

        # Parse frontmatter for discovered_by, epic, product impact, effort, and impact
        frontmatter = parse_frontmatter(content)
        discovered_by = frontmatter.get("discovered_by")
        epic = frontmatter.get("epic")
        size = frontmatter.get("size")
        product_impact = self._parse_product_impact(frontmatter)

        # Numeric fields: anything that is not a plain non-negative integer is dropped.
        effort = self._coerce_int(frontmatter.get("effort"))
        impact = self._coerce_int(frontmatter.get("impact"))
        confidence_score = self._coerce_int(frontmatter.get("confidence_score"))
        outcome_confidence = self._coerce_int(frontmatter.get("outcome_confidence"))
        score_complexity = self._coerce_int(frontmatter.get("score_complexity"))
        score_test_coverage = self._coerce_int(frontmatter.get("score_test_coverage"))
        score_ambiguity = self._coerce_int(frontmatter.get("score_ambiguity"))
        score_change_surface = self._coerce_int(frontmatter.get("score_change_surface"))

        # Boolean flags: accept "true"/"false" text as well as native values.
        testable_value = self._coerce_bool(frontmatter.get("testable"))
        decision_needed_value = self._coerce_bool(frontmatter.get("decision_needed"))
        missing_artifacts_value = self._coerce_bool(frontmatter.get("missing_artifacts"))
        implementation_order_risk_value = self._coerce_bool(
            frontmatter.get("implementation_order_risk")
        )

        # An issue still marked open but carrying a completion timestamp is done.
        status = frontmatter.get("status", "open")
        if status == "open" and frontmatter.get("completed_at"):
            status = "done"

        parent = frontmatter.get("parent")
        if parent is None and (alias_val := frontmatter.get("parent_issue")):
            logger.warning(
                "%s: deprecated frontmatter key 'parent_issue' — rename to 'parent'",
                issue_path.name,
            )
            parent = alias_val

        duplicate_of = frontmatter.get("duplicate_of")

        # Seed relates_to from the deprecated alias; the canonical key is
        # reconciled against it in the loop below.
        relates_to: list[str] = []
        if alias_val := frontmatter.get("related"):
            logger.warning(
                "%s: deprecated frontmatter key 'related' — rename to 'relates_to'",
                issue_path.name,
            )
            relates_to = self._split_ids(alias_val)

        depends_on: list[str] = []

        # Parse title: prefer frontmatter title: field, then markdown header, then filename stem
        title = frontmatter.get("title") or self._parse_title_from_content(content, issue_path)
        blocked_by = self._parse_blocked_by(content)
        blocks = self._parse_blocks(content)

        # Also read blocked_by/blocks/depends_on/relates_to from frontmatter (canonical format).
        # When both sources provide values and they differ, prefer frontmatter and warn
        # so stale body sections are surfaced rather than silently merged.
        for fm_key, body_ids in (
            ("blocked_by", blocked_by),
            ("blocks", blocks),
            ("depends_on", depends_on),
            ("relates_to", relates_to),
        ):
            fm_val = frontmatter.get(fm_key)
            if not fm_val:
                continue
            fm_ids = self._split_ids(fm_val)
            if body_ids and set(fm_ids) != set(body_ids):
                logger.warning(
                    "%s: frontmatter %s %s conflicts with body section %s; "
                    "preferring frontmatter — update or remove the stale body section",
                    issue_path.name,
                    fm_key,
                    fm_ids,
                    body_ids,
                )
                # Mutate in place so the local lists above see the new values.
                body_ids.clear()
                body_ids.extend(fm_ids)
            elif not body_ids:
                body_ids.extend(fm_ids)

        # Parse labels from frontmatter
        labels: list[str] = []
        fm_labels = frontmatter.get("labels")
        if fm_labels:
            if isinstance(fm_labels, str):
                labels = [lb.strip() for lb in fm_labels.split(",") if lb.strip()]
            else:
                labels = [str(lb) for lb in fm_labels]

        # Parse milestone from frontmatter
        milestone: str | None = frontmatter.get("milestone") or None

        # Parse session commands from ## Session Log section
        from little_loops.session_log import count_session_commands, parse_session_log

        session_commands = parse_session_log(content)
        session_command_counts = count_session_commands(content)

        return IssueInfo(
            path=issue_path,
            issue_type=issue_type,
            priority=priority,
            issue_id=issue_id,
            title=title,
            blocked_by=blocked_by,
            blocks=blocks,
            parent=parent,
            depends_on=depends_on,
            relates_to=relates_to,
            duplicate_of=duplicate_of,
            discovered_by=discovered_by,
            epic=epic,
            product_impact=product_impact,
            effort=effort,
            impact=impact,
            confidence_score=confidence_score,
            outcome_confidence=outcome_confidence,
            score_complexity=score_complexity,
            score_test_coverage=score_test_coverage,
            score_ambiguity=score_ambiguity,
            score_change_surface=score_change_surface,
            size=size,
            testable=testable_value,
            decision_needed=decision_needed_value,
            missing_artifacts=missing_artifacts_value,
            implementation_order_risk=implementation_order_risk_value,
            session_commands=session_commands,
            session_command_counts=session_command_counts,
            labels=labels,
            milestone=milestone,
            status=status,
        )

    def _parse_priority(self, filename: str) -> str:
        """Extract priority from filename.

        Args:
            filename: Issue filename

        Returns:
            Priority string (e.g., "P1") or last priority if not found
        """
        for priority in self.config.issue_priorities:
            if filename.startswith(f"{priority}-"):
                return priority
        # Default to lowest priority if not found
        return self.config.issue_priorities[-1] if self.config.issue_priorities else "P3"

    def _get_category_for_prefix(self, prefix: str) -> str:
        """Get category name from issue prefix.

        Args:
            prefix: Issue prefix (e.g., "BUG", "FEAT")

        Returns:
            Category name (e.g., "bugs", "features"), defaults to "bugs"
        """
        return self._prefix_to_category.get(prefix, "bugs")

    def _parse_type_and_id(self, filename: str, issue_path: Path) -> tuple[str, str]:
        """Extract issue type and ID from filename.

        Args:
            filename: Issue filename
            issue_path: Full path to issue file

        Returns:
            Tuple of (issue_type, issue_id)
        """
        # Try to match known prefixes (BUG, FEAT, ENH, etc.). Prefixes come
        # from configuration, so escape them before building the pattern.
        for prefix, category in self._prefix_to_category.items():
            match = re.search(rf"({re.escape(prefix)})-(\d+)", filename)
            if match:
                issue_id = f"{match.group(1)}-{match.group(2)}"
                return category, issue_id

        # Fall back to inferring category from directory.
        parent_name = issue_path.parent.name
        for category_name, category_config in self.config.issues.categories.items():
            if parent_name == category_config.dir:
                # If the filename uses the standard P[0-5]-NNN-... shape but
                # omits the type token, capture the number directly and pair
                # it with the directory-derived prefix. Without this, generic
                # number scanning would pick up the priority digit instead.
                priority_match = re.match(r"^P\d+-(\d+)(?:[-.]|$)", filename)
                if priority_match:
                    return category_name, f"{category_config.prefix}-{priority_match.group(1)}"
                issue_id = self._generate_id_from_filename(filename, category_config.prefix)
                return category_name, issue_id

        # Last resort: use filename as ID. Only the trailing ".md" is removed,
        # so a ".md" occurring mid-name is left intact.
        fallback_id = filename[: -len(".md")] if filename.endswith(".md") else filename
        return "bugs", fallback_id

    def _generate_id_from_filename(self, filename: str, prefix: str) -> str:
        """Generate an issue ID from filename when not explicitly present.

        Args:
            filename: Issue filename
            prefix: Issue prefix to use

        Returns:
            Generated issue ID
        """
        # Strip a leading priority token (e.g. "P2-") so it does not get
        # picked up as the issue number by the generic digit scan below.
        scan_target = re.sub(r"^P\d+-", "", filename)
        numbers = re.findall(r"\d+", scan_target)
        if numbers:
            return f"{prefix}-{numbers[0]}"
        # No number in the filename: use the next sequential number so the
        # generated ID does not collide with an existing issue.
        category = self._get_category_for_prefix(prefix)
        next_num = get_next_issue_number(self.config, category)
        return f"{prefix}-{next_num:03d}"

    def _read_content(self, issue_path: Path) -> str:
        """Read file content, returning empty string on error.

        Args:
            issue_path: Path to issue file

        Returns:
            File content or empty string on error
        """
        try:
            return issue_path.read_text(encoding="utf-8")
        except Exception as e:
            # Deliberately best-effort: log and carry on with empty content.
            logger.warning("Failed to read %s: %s", issue_path.name, e)
            return ""

    def _parse_title_from_content(self, content: str, issue_path: Path) -> str:
        """Extract title from issue file content.

        Args:
            content: Pre-read file content
            issue_path: Path to issue file (for fallback)

        Returns:
            Issue title or filename stem as fallback
        """
        if content:
            # Look for markdown header: # ISSUE-ID: Title
            match = re.search(r"^#\s+[\w-]+:\s*(.+)$", content, re.MULTILINE)
            if match:
                return match.group(1).strip()
            # Try first header of any format
            match = re.search(r"^#\s+(.+)$", content, re.MULTILINE)
            if match:
                return match.group(1).strip()
        # Fall back to filename
        return issue_path.stem

    def _parse_section_items(self, content: str, section_name: str) -> list[str]:
        """Extract issue IDs from a markdown section.

        Finds section header (## Section Name) and extracts issue IDs
        from list items until the next section or end of file.
        Skips content inside code fences.

        Args:
            content: File content to parse
            section_name: Section name to find (e.g., "Blocked By")

        Returns:
            List of issue IDs found in the section
        """
        if not content:
            return []

        # Strip code fences to avoid matching sections in examples
        content_without_code = self._strip_code_fences(content)

        # Match section header case-insensitively
        section_pattern = rf"^##\s+{re.escape(section_name)}\s*$"
        match = re.search(section_pattern, content_without_code, re.MULTILINE | re.IGNORECASE)
        if not match:
            return []

        # Get content after section header until next ## header or end
        start = match.end()
        next_section = re.search(r"^##\s+", content_without_code[start:], re.MULTILINE)
        if next_section:
            section_content = content_without_code[start : start + next_section.start()]
        else:
            section_content = content_without_code[start:]

        # Extract issue IDs from list items
        return ISSUE_ID_PATTERN.findall(section_content)

    def _strip_code_fences(self, content: str) -> str:
        """Remove code fence blocks from content.

        Replaces content between ``` markers with empty lines to preserve
        line numbers while removing code fence content from parsing.

        Args:
            content: File content

        Returns:
            Content with code fence blocks replaced by empty lines
        """
        # A line starting with ``` toggles fence state; the marker line itself
        # and everything inside the fence are blanked.
        result = []
        in_fence = False
        for line in content.split("\n"):
            if line.startswith("```"):
                in_fence = not in_fence
                result.append("")  # Preserve line count
            elif in_fence:
                result.append("")  # Replace fenced content with empty line
            else:
                result.append(line)
        return "\n".join(result)

    def _parse_blocked_by(self, content: str) -> list[str]:
        """Extract issue IDs from ## Blocked By section.

        Args:
            content: File content to parse

        Returns:
            List of issue IDs that block this issue
        """
        return self._parse_section_items(content, "Blocked By")

    def _parse_blocks(self, content: str) -> list[str]:
        """Extract issue IDs from ## Blocks section.

        Args:
            content: File content to parse

        Returns:
            List of issue IDs that this issue blocks
        """
        return self._parse_section_items(content, "Blocks")

    def _parse_product_impact(self, frontmatter: dict[str, Any]) -> ProductImpact | None:
        """Extract product impact from frontmatter.

        Args:
            frontmatter: Dictionary of frontmatter fields

        Returns:
            ProductImpact instance if any product fields are present, None otherwise
        """
        # Check if any product fields are present
        product_fields = ("goal_alignment", "persona_impact", "business_value", "user_benefit")
        if not any(frontmatter.get(key) for key in product_fields):
            return None

        return ProductImpact(
            goal_alignment=frontmatter.get("goal_alignment"),
            persona_impact=frontmatter.get("persona_impact"),
            business_value=frontmatter.get("business_value"),
            user_benefit=frontmatter.get("user_benefit"),
        )
def find_issues(
    config: BRConfig,
    category: str | None = None,
    skip_ids: set[str] | None = None,
    only_ids: list[str] | set[str] | None = None,
    type_prefixes: set[str] | None = None,
) -> list[IssueInfo]:
    """Find all issues matching criteria.

    Issues whose status is "done", "cancelled" or "deferred" are always
    excluded, whichever directory they live in.

    Args:
        config: Project configuration
        category: Optional category to filter (e.g., "bugs"). A category not
            present in ``config.issue_categories`` yields an empty result.
        skip_ids: Issue IDs to skip
        only_ids: If provided, only include these issue IDs. When a list,
            results are returned in list order (input sequence preserved).
            When a set, results are sorted by priority as usual.
        type_prefixes: If provided, only include issues whose ID starts with
            one of these prefixes (e.g., {"BUG", "ENH"})

    Returns:
        List of IssueInfo sorted by priority, or in only_ids list order when
        only_ids is a list
    """
    skip_ids = skip_ids or set()
    parser = IssueParser(config)
    issues: list[IssueInfo] = []

    # Determine which categories to search
    if category:
        categories = [category] if category in config.issue_categories else []
    else:
        categories = config.issue_categories

    for cat in categories:
        issue_dir = config.get_issue_dir(cat)
        if not issue_dir.exists():
            continue

        for issue_file in issue_dir.glob("*.md"):
            info = parser.parse_file(issue_file)
            # Status-based filter: skip done/cancelled/deferred regardless of dir
            if info.status in ("done", "cancelled", "deferred"):
                continue
            # Apply skip filter
            if info.issue_id in skip_ids:
                continue
            # Apply only filter (if specified)
            if only_ids is not None and not any(_id_matches(info.issue_id, p) for p in only_ids):
                continue
            # Apply type filter (if specified)
            if type_prefixes is not None:
                prefix = info.issue_id.split("-", 1)[0]
                if prefix not in type_prefixes:
                    continue
            issues.append(info)

    # When only_ids is a list, preserve input order; otherwise sort by priority
    if isinstance(only_ids, list):
        # Rank each issue by the position of the first only_ids entry it
        # matches; the len() default is only a safety net since every
        # surviving issue matched at least one entry above.
        issues.sort(
            key=lambda x: next(
                (i for i, p in enumerate(only_ids) if _id_matches(x.issue_id, p)),
                len(only_ids),
            )
        )
    else:
        issues.sort(key=lambda x: (x.priority_int, x.issue_id))
    return issues
def find_highest_priority_issue(
    config: BRConfig,
    category: str | None = None,
    skip_ids: set[str] | None = None,
    only_ids: set[str] | None = None,
    type_prefixes: set[str] | None = None,
) -> IssueInfo | None:
    """Find the highest priority issue.

    Delegates to ``find_issues`` with the same filters and returns the first
    entry of its result.

    Args:
        config: Project configuration
        category: Optional category to filter
        skip_ids: Issue IDs to skip
        only_ids: If provided, only include these issue IDs
        type_prefixes: If provided, only include issues with these type prefixes

    Returns:
        Highest priority IssueInfo or None if no issues found
    """
    issues = find_issues(config, category, skip_ids, only_ids, type_prefixes)
    return issues[0] if issues else None