Coverage for little_loops / dependency_mapper / operations.py: 90%
135 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Dependency fix and mutation operations.
3Functions for applying dependency proposals to issue files and
4auto-repairing broken dependency references.
5"""
7from __future__ import annotations
9import re
10from pathlib import Path
11from typing import TYPE_CHECKING
13from little_loops.dependency_mapper.analysis import validate_dependencies
14from little_loops.dependency_mapper.models import DependencyProposal, FixResult
16if TYPE_CHECKING:
17 from little_loops.config import BRConfig
18 from little_loops.issue_parser import IssueInfo
21def apply_proposals(
22 proposals: list[DependencyProposal],
23 issue_files: dict[str, Path],
24) -> list[str]:
25 """Write approved dependency proposals to issue files.
27 For each proposal, adds the target to the source's ``## Blocked By``
28 section and the source to the target's ``## Blocks`` section.
30 Args:
31 proposals: Approved proposals to apply
32 issue_files: Mapping from issue_id to file path
34 Returns:
35 List of modified file paths
36 """
37 modified: set[str] = set()
39 for proposal in proposals:
40 source_path = issue_files.get(proposal.source_id)
41 target_path = issue_files.get(proposal.target_id)
43 if proposal.reason == "depends_on":
44 if source_path and source_path.exists():
45 _add_to_section(source_path, "Depends On", proposal.target_id)
46 modified.add(str(source_path))
47 elif proposal.reason == "relates_to":
48 if source_path and source_path.exists():
49 _add_to_section(source_path, "Relates To", proposal.target_id)
50 modified.add(str(source_path))
51 if target_path and target_path.exists():
52 _add_to_section(target_path, "Relates To", proposal.source_id)
53 modified.add(str(target_path))
54 else:
55 # Default: Blocked By / Blocks relationship
56 if source_path and source_path.exists():
57 _add_to_section(source_path, "Blocked By", proposal.target_id)
58 modified.add(str(source_path))
59 if target_path and target_path.exists():
60 _add_to_section(target_path, "Blocks", proposal.source_id)
61 modified.add(str(target_path))
63 return sorted(modified)
66def _add_to_section(file_path: Path, section_name: str, issue_id: str) -> None:
67 """Add an issue ID to a markdown section in a file.
69 If the section exists, appends a new list item.
70 If the section doesn't exist, creates it before the
71 ``## Labels`` or ``## Status`` section, or at the end of the file.
73 Args:
74 file_path: Path to the issue file
75 section_name: Section name (e.g., "Blocked By" or "Blocks")
76 issue_id: Issue ID to add (e.g., "FEAT-001")
77 """
78 content = file_path.read_text(encoding="utf-8")
80 # Check if the ID is already in the section
81 section_pattern = rf"^##\s+{re.escape(section_name)}\s*$"
82 section_match = re.search(section_pattern, content, re.MULTILINE | re.IGNORECASE)
84 if section_match:
85 # Section exists — check if ID already present
86 start = section_match.end()
87 next_section = re.search(r"^##\s+", content[start:], re.MULTILINE)
88 if next_section:
89 section_content = content[start : start + next_section.start()]
90 else:
91 section_content = content[start:]
93 if issue_id in section_content:
94 return # Already present
96 # Find insertion point: end of section content (before next section or EOF)
97 insert_pos = (
98 start
99 + len(section_content.rstrip())
100 + (len(section_content) - len(section_content.rstrip()))
101 )
102 # Actually, insert at end of the last list item in the section
103 # Find the last non-blank line in the section
104 section_lines = section_content.rstrip().split("\n")
105 last_content_line_offset = 0
106 for line in reversed(section_lines):
107 if line.strip():
108 break
109 last_content_line_offset += len(line) + 1
111 insert_pos = start + len(section_content.rstrip())
112 new_entry = f"\n- {issue_id}"
113 content = content[:insert_pos] + new_entry + content[insert_pos:]
114 else:
115 # Section doesn't exist — create it
116 new_section = f"\n## {section_name}\n\n- {issue_id}\n"
118 # Try to insert before ## Depends On, ## Relates To, ## Labels, or ## Status
119 for anchor in ("## Depends On", "## Relates To", "## Labels", "## Status"):
120 anchor_match = re.search(rf"^{re.escape(anchor)}\s*$", content, re.MULTILINE)
121 if anchor_match:
122 insert_pos = anchor_match.start()
123 content = content[:insert_pos] + new_section + "\n" + content[insert_pos:]
124 break
125 else:
126 # Append at end
127 content = content.rstrip() + "\n" + new_section
129 file_path.write_text(content, encoding="utf-8")
132def _remove_from_section(file_path: Path, section_name: str, issue_id: str) -> bool:
133 """Remove an issue ID from a markdown section in a file.
135 If the section becomes empty after removal, the entire section is removed.
137 Args:
138 file_path: Path to the issue file
139 section_name: Section name (e.g., "Blocked By" or "Blocks")
140 issue_id: Issue ID to remove (e.g., "FEAT-001")
142 Returns:
143 True if a change was made, False if the ID was not found.
144 """
145 content = file_path.read_text(encoding="utf-8")
147 section_pattern = rf"^##\s+{re.escape(section_name)}\s*$"
148 section_match = re.search(section_pattern, content, re.MULTILINE | re.IGNORECASE)
150 if not section_match:
151 return False
153 start = section_match.end()
154 next_section = re.search(r"^##\s+", content[start:], re.MULTILINE)
155 if next_section:
156 section_end = start + next_section.start()
157 else:
158 section_end = len(content)
160 section_content = content[start:section_end]
162 # Find the line containing this issue ID
163 line_pattern = rf"^[-*]\s+\*{{0,2}}{re.escape(issue_id)}\b[^\n]*\n?"
164 line_match = re.search(line_pattern, section_content, re.MULTILINE)
165 if not line_match:
166 return False
168 # Remove the line
169 new_section_content = (
170 section_content[: line_match.start()] + section_content[line_match.end() :]
171 )
173 # Check if the section is now empty (no list items remaining)
174 remaining_items = re.search(r"^[-*]\s+", new_section_content, re.MULTILINE)
175 if not remaining_items:
176 # Remove entire section (header + content)
177 # Include leading newline if present
178 remove_start = section_match.start()
179 if remove_start > 0 and content[remove_start - 1] == "\n":
180 remove_start -= 1
181 content = content[:remove_start] + content[section_end:]
182 else:
183 content = content[:start] + new_section_content + content[section_end:]
185 file_path.write_text(content, encoding="utf-8")
186 return True
189def fix_dependencies(
190 issues: list[IssueInfo],
191 completed_ids: set[str] | None = None,
192 all_known_ids: set[str] | None = None,
193 dry_run: bool = False,
194) -> FixResult:
195 """Auto-repair broken dependency references.
197 Fixes three types of validation issues:
198 - Broken refs: removes references to non-existent issues from Blocked By
199 - Stale completed refs: removes references to completed issues from Blocked By
200 - Missing backlinks: adds missing Blocks entries for bidirectional consistency
202 Cycles are explicitly out of scope and are skipped with a count.
204 Args:
205 issues: List of parsed issue objects
206 completed_ids: Set of completed issue IDs
207 all_known_ids: Set of all issue IDs that exist on disk
208 dry_run: If True, report what would change without modifying files
210 Returns:
211 FixResult with changes made and files modified
212 """
213 validation = validate_dependencies(issues, completed_ids, all_known_ids)
214 result = FixResult()
216 if not validation.has_issues:
217 return result
219 # Build issue path map
220 issue_path_map: dict[str, Path] = {issue.issue_id: issue.path for issue in issues}
222 # Fix broken refs: remove from Blocked By
223 for issue_id, ref_id in validation.broken_refs:
224 path = issue_path_map.get(issue_id)
225 if not path or not path.exists():
226 continue
227 desc = f"Removed broken ref {ref_id} from {issue_id}"
228 result.changes.append(desc)
229 if not dry_run:
230 if _remove_from_section(path, "Blocked By", ref_id):
231 result.modified_files.add(str(path))
233 # Fix stale completed refs: remove from Blocked By
234 for issue_id, ref_id in validation.stale_completed_refs:
235 path = issue_path_map.get(issue_id)
236 if not path or not path.exists():
237 continue
238 desc = f"Removed stale ref {ref_id} (completed) from {issue_id}"
239 result.changes.append(desc)
240 if not dry_run:
241 if _remove_from_section(path, "Blocked By", ref_id):
242 result.modified_files.add(str(path))
244 # Fix missing backlinks: add to Blocks
245 for issue_id, ref_id in validation.missing_backlinks:
246 target_path = issue_path_map.get(ref_id)
247 if not target_path or not target_path.exists():
248 continue
249 desc = f"Added backlink: {issue_id} to {ref_id}'s Blocks section"
250 result.changes.append(desc)
251 if not dry_run:
252 _add_to_section(target_path, "Blocks", issue_id)
253 result.modified_files.add(str(target_path))
255 # Report skipped cycles
256 result.skipped_cycles = len(validation.cycles)
258 return result
261def gather_all_issue_ids(issues_dir: Path, config: BRConfig | None = None) -> set[str]:
262 """Scan all issue directories for issue IDs (lightweight, filename-only).
264 Scans type-scoped category directories (bugs/, features/, enhancements/,
265 epics/) for markdown files with issue ID patterns in their filenames.
266 Done and deferred issues remain in type dirs with status frontmatter, so
267 scanning only type dirs finds all known IDs.
269 Args:
270 issues_dir: Path to the issues base directory (e.g., .issues)
271 config: Optional project config. When supplied, active category names
272 are read from config so that custom categories are included.
273 When omitted, falls back to
274 ``["bugs", "features", "enhancements", "epics"]``.
276 Returns:
277 Set of all issue IDs found across all type-scoped category directories.
278 """
279 if config is not None:
280 subdirs = config.issue_categories
281 else:
282 subdirs = ["bugs", "features", "enhancements", "epics"]
284 ids: set[str] = set()
285 for subdir in subdirs:
286 d = issues_dir / subdir
287 if not d.exists():
288 continue
289 for f in d.glob("*.md"):
290 match = re.search(r"(BUG|FEAT|ENH|EPIC)-(\d+)", f.name)
291 if match:
292 ids.add(f"{match.group(1)}-{match.group(2)}")
293 return ids