Coverage for little_loops / dependency_mapper / operations.py: 90%

135 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Dependency fix and mutation operations. 

2 

3Functions for applying dependency proposals to issue files and 

4auto-repairing broken dependency references. 

5""" 

6 

7from __future__ import annotations 

8 

9import re 

10from pathlib import Path 

11from typing import TYPE_CHECKING 

12 

13from little_loops.dependency_mapper.analysis import validate_dependencies 

14from little_loops.dependency_mapper.models import DependencyProposal, FixResult 

15 

16if TYPE_CHECKING: 

17 from little_loops.config import BRConfig 

18 from little_loops.issue_parser import IssueInfo 

19 

20 

def apply_proposals(
    proposals: list[DependencyProposal],
    issue_files: dict[str, Path],
) -> list[str]:
    """Write approved dependency proposals into the affected issue files.

    The sections touched depend on ``proposal.reason``:

    - ``"depends_on"``: the target is added to the source's ``## Depends On``
      section only.
    - ``"relates_to"``: each side is added to the other's ``## Relates To``
      section.
    - anything else: the target is added to the source's ``## Blocked By``
      section and the source to the target's ``## Blocks`` section.

    A side whose ID is missing from ``issue_files``, or whose file does not
    exist on disk, is silently skipped.

    Args:
        proposals: Approved proposals to apply
        issue_files: Mapping from issue_id to file path

    Returns:
        Sorted list of the file paths that were handed to the section writer
    """
    touched: set[str] = set()

    def _link(path: Path | None, section: str, other_id: str) -> None:
        # Only files that are both known and present on disk are written to.
        if path and path.exists():
            _add_to_section(path, section, other_id)
            touched.add(str(path))

    for proposal in proposals:
        src_file = issue_files.get(proposal.source_id)
        dst_file = issue_files.get(proposal.target_id)
        kind = proposal.reason

        if kind == "depends_on":
            # One-directional: nothing is recorded on the target.
            _link(src_file, "Depends On", proposal.target_id)
        elif kind == "relates_to":
            # Symmetric relationship: same section name on both files.
            _link(src_file, "Relates To", proposal.target_id)
            _link(dst_file, "Relates To", proposal.source_id)
        else:
            # Default: Blocked By / Blocks relationship
            _link(src_file, "Blocked By", proposal.target_id)
            _link(dst_file, "Blocks", proposal.source_id)

    return sorted(touched)

64 

65 

def _add_to_section(file_path: Path, section_name: str, issue_id: str) -> None:
    """Add an issue ID to a markdown section in a file.

    If the section exists and does not already list the ID, a new ``- ID``
    list item is appended after the section's last non-blank line. If the
    ID is already listed (as a whole token, so ``FEAT-1`` is not considered
    present merely because ``FEAT-10`` is), the file is left untouched.

    If the section doesn't exist, it is created before the first of
    ``## Depends On``, ``## Relates To``, ``## Labels`` or ``## Status``
    that is found (checked in that order), or at the end of the file when
    none of those headings exist.

    Args:
        file_path: Path to the issue file
        section_name: Section name (e.g., "Blocked By" or "Blocks")
        issue_id: Issue ID to add (e.g., "FEAT-001")
    """
    content = file_path.read_text(encoding="utf-8")

    # Heading lookup is case-insensitive.
    section_pattern = rf"^##\s+{re.escape(section_name)}\s*$"
    section_match = re.search(section_pattern, content, re.MULTILINE | re.IGNORECASE)

    if section_match:
        # The section body runs up to the next level-2 heading or EOF.
        start = section_match.end()
        next_section = re.search(r"^##\s+", content[start:], re.MULTILINE)
        if next_section:
            section_content = content[start : start + next_section.start()]
        else:
            section_content = content[start:]

        # Whole-token match: a plain substring test would treat "FEAT-1" as
        # already present when only "FEAT-10" is listed and skip the insert.
        id_pattern = rf"(?<!\w){re.escape(issue_id)}(?!\w)"
        if re.search(id_pattern, section_content):
            return  # Already present

        # Insert right after the last non-blank character of the section so
        # the blank line(s) separating it from the next heading are preserved.
        insert_pos = start + len(section_content.rstrip())
        new_entry = f"\n- {issue_id}"
        content = content[:insert_pos] + new_entry + content[insert_pos:]
    else:
        # Section doesn't exist — create it
        new_section = f"\n## {section_name}\n\n- {issue_id}\n"

        # Try to insert before ## Depends On, ## Relates To, ## Labels, or ## Status
        for anchor in ("## Depends On", "## Relates To", "## Labels", "## Status"):
            anchor_match = re.search(rf"^{re.escape(anchor)}\s*$", content, re.MULTILINE)
            if anchor_match:
                insert_pos = anchor_match.start()
                content = content[:insert_pos] + new_section + "\n" + content[insert_pos:]
                break
        else:
            # No anchor heading found: append at end
            content = content.rstrip() + "\n" + new_section

    file_path.write_text(content, encoding="utf-8")

130 

131 

def _remove_from_section(file_path: Path, section_name: str, issue_id: str) -> bool:
    """Delete one issue ID's list item from a markdown section of a file.

    Only the first ``-``/``*`` list item that starts with the ID (optionally
    wrapped in up to two leading ``*``) is removed. When no list items are
    left afterwards, the heading and the whole section body are dropped too.
    The file is rewritten only when an item was actually removed.

    Args:
        file_path: Path to the issue file
        section_name: Section name (e.g., "Blocked By" or "Blocks")
        issue_id: Issue ID to remove (e.g., "FEAT-001")

    Returns:
        True if a change was made, False if the section or the ID was not found.
    """
    text = file_path.read_text(encoding="utf-8")

    heading = re.search(
        rf"^##\s+{re.escape(section_name)}\s*$",
        text,
        re.MULTILINE | re.IGNORECASE,
    )
    if not heading:
        return False

    # Section body spans from the heading to the next level-2 heading or EOF.
    body_start = heading.end()
    following = re.search(r"^##\s+", text[body_start:], re.MULTILINE)
    body_end = body_start + following.start() if following else len(text)
    body = text[body_start:body_end]

    # Locate the list item for this ID (the trailing newline is consumed too).
    entry = re.search(
        rf"^[-*]\s+\*{{0,2}}{re.escape(issue_id)}\b[^\n]*\n?",
        body,
        re.MULTILINE,
    )
    if not entry:
        return False

    trimmed_body = body[: entry.start()] + body[entry.end() :]

    if re.search(r"^[-*]\s+", trimmed_body, re.MULTILINE):
        # Other items remain: keep the heading, swap in the trimmed body.
        rewritten = text[:body_start] + trimmed_body + text[body_end:]
    else:
        # Section is now empty: cut heading + body, and also the single
        # newline immediately before the heading when there is one.
        cut_from = heading.start()
        if cut_from > 0 and text[cut_from - 1] == "\n":
            cut_from -= 1
        rewritten = text[:cut_from] + text[body_end:]

    file_path.write_text(rewritten, encoding="utf-8")
    return True

187 

188 

def fix_dependencies(
    issues: list[IssueInfo],
    completed_ids: set[str] | None = None,
    all_known_ids: set[str] | None = None,
    dry_run: bool = False,
) -> FixResult:
    """Auto-repair broken dependency references.

    Runs ``validate_dependencies`` and acts on three of its findings:

    - Broken refs: the reference is removed from the issue's Blocked By section
    - Stale completed refs: the reference is removed from Blocked By
    - Missing backlinks: the issue is added to the referenced issue's Blocks
      section

    Cycles are not repaired; only their count is reported in
    ``skipped_cycles``. Findings whose file is unknown or missing on disk
    are skipped without being reported.

    A description is appended to ``changes`` for every finding that has a
    file, both in dry-run mode and when a removal turns out to change nothing.

    Args:
        issues: List of parsed issue objects
        completed_ids: Set of completed issue IDs
        all_known_ids: Set of all issue IDs that exist on disk
        dry_run: If True, report what would change without modifying files

    Returns:
        FixResult with changes made and files modified
    """
    report = validate_dependencies(issues, completed_ids, all_known_ids)
    outcome = FixResult()

    # Nothing flagged by validation: hand back the empty result.
    if not report.has_issues:
        return outcome

    # Issue ID -> file path lookup for every issue we were given.
    paths_by_id: dict[str, Path] = {}
    for issue in issues:
        paths_by_id[issue.issue_id] = issue.path

    # Broken refs: drop the dangling ID from the owner's Blocked By section.
    for owner_id, dangling_id in report.broken_refs:
        owner_file = paths_by_id.get(owner_id)
        if owner_file and owner_file.exists():
            outcome.changes.append(f"Removed broken ref {dangling_id} from {owner_id}")
            if not dry_run and _remove_from_section(owner_file, "Blocked By", dangling_id):
                outcome.modified_files.add(str(owner_file))

    # Stale refs: drop IDs of completed issues from the owner's Blocked By.
    for owner_id, done_id in report.stale_completed_refs:
        owner_file = paths_by_id.get(owner_id)
        if owner_file and owner_file.exists():
            outcome.changes.append(f"Removed stale ref {done_id} (completed) from {owner_id}")
            if not dry_run and _remove_from_section(owner_file, "Blocked By", done_id):
                outcome.modified_files.add(str(owner_file))

    # Missing backlinks: record the blocked issue in the blocker's Blocks section.
    for blocked_id, blocker_id in report.missing_backlinks:
        blocker_file = paths_by_id.get(blocker_id)
        if blocker_file and blocker_file.exists():
            outcome.changes.append(
                f"Added backlink: {blocked_id} to {blocker_id}'s Blocks section"
            )
            if not dry_run:
                _add_to_section(blocker_file, "Blocks", blocked_id)
                outcome.modified_files.add(str(blocker_file))

    # Cycles are only counted, never modified.
    outcome.skipped_cycles = len(report.cycles)

    return outcome

259 

260 

def gather_all_issue_ids(issues_dir: Path, config: BRConfig | None = None) -> set[str]:
    """Collect issue IDs from the filenames in the category directories.

    Only filenames are inspected; file contents are never read. Each
    category directory directly under ``issues_dir`` is globbed for
    ``*.md`` files, and the first ``BUG-``, ``FEAT-``, ``ENH-`` or
    ``EPIC-`` ID followed by digits in a filename is recorded.
    Directories that do not exist are skipped.

    Args:
        issues_dir: Path to the issues base directory (e.g., .issues)
        config: Optional project config. When supplied, the directory names
            come from ``config.issue_categories``. When omitted, falls back
            to ``["bugs", "features", "enhancements", "epics"]``.

    Returns:
        Set of all issue IDs found across the scanned category directories.
    """
    categories = (
        config.issue_categories
        if config is not None
        else ["bugs", "features", "enhancements", "epics"]
    )
    id_pattern = re.compile(r"(BUG|FEAT|ENH|EPIC)-(\d+)")

    found: set[str] = set()
    for category in categories:
        category_dir = issues_dir / category
        if category_dir.exists():
            for md_file in category_dir.glob("*.md"):
                hit = id_pattern.search(md_file.name)
                if hit:
                    # The whole match is exactly "<PREFIX>-<digits>".
                    found.add(hit.group(0))
    return found