Coverage for little_loops/dependency_graph.py: 98%

200 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2026-05-22 16:19 -0500

1"""Dependency graph for issue management. 

2 

3Constructs a directed acyclic graph (DAG) from issue dependencies, 

4providing topological sorting and cycle detection. 

5""" 

6 

7from __future__ import annotations 

8 

9import logging 

10from collections import deque 

11from dataclasses import dataclass, field 

12from typing import TYPE_CHECKING 

13 

14if TYPE_CHECKING: 

15 from little_loops.config import DependencyMappingConfig 

16 from little_loops.issue_parser import IssueInfo 

17 

18logger = logging.getLogger(__name__) 

19 

20 

@dataclass
class WaveContentionNote:
    """Annotation attached to a sub-wave produced by splitting a wave on file overlap.

    Attributes:
        contended_paths: Paths reported as overlapping between issues of the
            parent wave.
        sub_wave_index: Zero-based position of this sub-wave within the split.
        total_sub_waves: Number of sub-waves the parent wave was split into.
        parent_wave_index: Index of the wave (before splitting) that this
            sub-wave came from. Defaults to 0.
    """

    # Required fields come first; only parent_wave_index carries a default.
    contended_paths: list[str]
    sub_wave_index: int
    total_sub_waves: int
    parent_wave_index: int = 0

29 

30 

@dataclass
class DependencyGraph:
    """Directed acyclic graph of issue dependencies.

    Builds a graph from issue dependencies where edges represent
    "blocked by" relationships. Provides methods for:
    - Topological sorting (dependency order)
    - Cycle detection
    - Ready issue queries (blockers resolved)
    - Blocking issue queries

    Attributes:
        issues: Mapping from issue_id to IssueInfo
        blocked_by: Mapping from issue_id to set of blocking issue_ids
        blocks: Mapping from issue_id to set of blocked issue_ids
        depends_on_edges: Mapping from issue_id to set of soft-prerequisite issue_ids
    """

    issues: dict[str, IssueInfo] = field(default_factory=dict)
    blocked_by: dict[str, set[str]] = field(default_factory=dict)
    blocks: dict[str, set[str]] = field(default_factory=dict)
    depends_on_edges: dict[str, set[str]] = field(default_factory=dict)

    @classmethod
    def from_issues(
        cls,
        issues: list[IssueInfo],
        completed_ids: set[str] | None = None,
        all_known_ids: set[str] | None = None,
    ) -> DependencyGraph:
        """Build graph from list of issues.

        Constructs a dependency graph where:
        - Each issue is a node
        - blocked_by relationships create edges from blockers to blocked issues
        - one-sided ``blocks`` declarations create the same kind of edge
        - Completed issues are treated as satisfied (not blocking)
        - Missing issues are logged as warnings but don't block

        Args:
            issues: List of IssueInfo objects with blocked_by/blocks/depends_on
                fields
            completed_ids: Set of completed issue IDs (treated as resolved)
            all_known_ids: Set of all issue IDs that exist on disk. When
                provided, references to issues in this set are silently
                skipped (not warned) even if they are not in the graph.

        Returns:
            Constructed DependencyGraph

        Example:
            >>> issues = [issue_a, issue_b, issue_c]
            >>> completed = {"FEAT-001"}  # Already done
            >>> graph = DependencyGraph.from_issues(issues, completed)
        """
        completed = completed_ids or set()
        graph = cls()

        # Index all issues by ID so every node has (possibly empty) edge sets.
        for issue in issues:
            graph.issues[issue.issue_id] = issue
            graph.blocked_by[issue.issue_id] = set()
            graph.blocks[issue.issue_id] = set()

        def is_active_reference(owner_id: str, relation: str, ref_id: str) -> bool:
            """Return True when ref_id should produce an edge for owner_id.

            Completed references are satisfied and ignored. References outside
            the graph are ignored too, with a warning unless all_known_ids
            says the issue exists on disk.
            """
            if ref_id in completed:
                return False
            if ref_id in graph.issues:
                return True
            if all_known_ids is None or ref_id not in all_known_ids:
                logger.warning(f"Issue {owner_id} {relation} unknown issue {ref_id}")
            return False

        # First pass: edges declared on the blocked side (blocked_by: [...]).
        for issue in issues:
            for blocker_id in issue.blocked_by:
                if is_active_reference(issue.issue_id, "blocked by", blocker_id):
                    graph.blocked_by[issue.issue_id].add(blocker_id)
                    graph.blocks[blocker_id].add(issue.issue_id)

        # Second pass: honour one-sided blocks: declarations.
        # If A declares blocks: [B] without a matching blocked_by: [A] on B,
        # the A->B edge was never added above; add it now. The edge sets make
        # symmetric declarations idempotent.
        for issue in issues:
            # A completed blocker is already satisfied. The first pass skips
            # such edges, so this pass must not re-introduce them from the
            # blocker's own declaration.
            blocker_is_completed = issue.issue_id in completed
            for blocked_id in issue.blocks:
                # Evaluate the reference first so unknown-issue warnings are
                # still emitted for completed declarers.
                if not is_active_reference(issue.issue_id, "blocks", blocked_id):
                    continue
                if blocker_is_completed:
                    continue
                graph.blocked_by[blocked_id].add(issue.issue_id)
                graph.blocks[issue.issue_id].add(blocked_id)

        # Third pass: populate depends_on_edges (soft prerequisite relationships).
        # depends_on is one-directional - no reverse edge is built here.
        for issue in issues:
            for target_id in issue.depends_on:
                if is_active_reference(issue.issue_id, "has depends_on", target_id):
                    graph.depends_on_edges.setdefault(issue.issue_id, set()).add(target_id)

        return graph

    @staticmethod
    def _sort_key(issue: IssueInfo) -> tuple[int, str]:
        """Ordering key shared by all listings: priority_int, then issue_id."""
        return (issue.priority_int, issue.issue_id)

    def _cycle_error(self) -> ValueError:
        """Build the ValueError raised when the graph cannot be fully ordered."""
        cycle_str = ", ".join(" -> ".join(cycle) for cycle in self.detect_cycles())
        return ValueError(f"Dependency graph contains cycles: {cycle_str}")

    def get_ready_issues(self, completed: set[str] | None = None) -> list[IssueInfo]:
        """Return issues whose blockers are all completed.

        An issue is "ready" if:
        - It is not already completed
        - All its blockers are in ``completed``

        Args:
            completed: Set of completed issue IDs

        Returns:
            List of IssueInfo for issues with no active blockers, sorted by
            ascending priority_int then issue_id.
        """
        done = completed or set()
        ready = [
            issue
            for issue_id, issue in self.issues.items()
            if issue_id not in done and not self.get_blocking_issues(issue_id, done)
        ]
        # Sort by priority then issue_id for consistent ordering
        ready.sort(key=self._sort_key)
        return ready

    def get_execution_waves(self, completed: set[str] | None = None) -> list[list[IssueInfo]]:
        """Return issues grouped into parallel execution waves.

        Wave 1: All issues with no blockers (or blockers already completed)
        Wave 2: Issues whose blockers are all in wave 1
        Wave N: Issues whose blockers are all in waves 1..N-1

        This is similar to topological_sort but groups issues by "level"
        rather than returning a flat list. In addition, a soft depends_on
        target of an issue in the current wave is appended to that wave when
        all of its hard blockers are processed or in the current wave.

        Args:
            completed: Set of already-completed issue IDs

        Returns:
            List of waves, each wave is a list of issues that can run in parallel.
            Empty list if graph is empty or all issues are completed.

        Raises:
            ValueError: If issues remain that can never become ready
                (reported as a cycle)

        Example:
            If A blocks B and C, and B and C block D:
            - Wave 1: [A]
            - Wave 2: [B, C]
            - Wave 3: [D]
        """
        waves: list[list[IssueInfo]] = []
        processed: set[str] = set(completed or ())

        while True:
            # Get issues ready to run (all hard blockers in processed set)
            wave = self.get_ready_issues(completed=processed)
            if not wave:
                break

            # Soft ordering: nudge depends_on targets into this wave when their
            # hard blockers are all satisfied by processed plus the current wave.
            # NOTE(review): get_ready_issues already returned every issue whose
            # blockers are all processed, so a nudged issue always has at least
            # one hard blocker inside this same wave - confirm that is intended.
            after_wave: set[str] = processed | {item.issue_id for item in wave}
            nudged: list[IssueInfo] = []
            for issue in wave:
                # Sorted so the nudge order (and, since after_wave grows while
                # nudging, the nudged set) does not depend on set iteration order.
                for target_id in sorted(self.depends_on_edges.get(issue.issue_id, ())):
                    if target_id in after_wave or target_id not in self.issues:
                        continue
                    if not self.get_blocking_issues(target_id, after_wave):
                        nudged.append(self.issues[target_id])
                        after_wave.add(target_id)
            wave.extend(nudged)

            waves.append(wave)
            # Mark this wave (including nudged) as processed for next iteration
            processed.update(item.issue_id for item in wave)

        # Anything still unprocessed can never become ready: report as a cycle.
        if set(self.issues) - processed:
            raise self._cycle_error()

        return waves

    def is_blocked(self, issue_id: str, completed: set[str] | None = None) -> bool:
        """Check if an issue is still blocked.

        Args:
            issue_id: Issue ID to check
            completed: Set of completed issue IDs

        Returns:
            True if issue has unresolved blockers, False otherwise
        """
        return bool(self.get_blocking_issues(issue_id, completed))

    def get_blocking_issues(self, issue_id: str, completed: set[str] | None = None) -> set[str]:
        """Return incomplete issues blocking this one.

        Args:
            issue_id: Issue ID to check
            completed: Set of completed issue IDs

        Returns:
            New set of issue IDs still blocking this issue (empty for an
            unknown issue_id)
        """
        # Set difference always builds a new set, so callers never alias the
        # graph's internal edge set.
        return self.blocked_by.get(issue_id, set()) - (completed or set())

    def get_blocked_by_issue(self, issue_id: str) -> set[str]:
        """Return issues that this issue blocks.

        Args:
            issue_id: Issue ID to check

        Returns:
            Copy of the set of issue IDs that are blocked by this issue
        """
        return set(self.blocks.get(issue_id, ()))

    def topological_sort(self) -> list[IssueInfo]:
        """Return issues in dependency order (Kahn's algorithm).

        Issues with no blockers come first, ordered by priority then
        issue_id. Each time an issue is emitted, the issues it unblocks are
        appended in the same priority/issue_id order.

        Returns:
            List of IssueInfo in topological order

        Raises:
            ValueError: If graph contains cycles (not a DAG)

        Example:
            If A blocks B, and B blocks C, returns [A, B, C]
        """
        # In-degree of a node is its number of blockers.
        in_degree: dict[str, int] = {
            issue_id: len(blockers) for issue_id, blockers in self.blocked_by.items()
        }

        # Start with nodes that have no blockers, sorted by priority
        roots = sorted(
            (self.issues[issue_id] for issue_id, degree in in_degree.items() if degree == 0),
            key=self._sort_key,
        )
        queue: deque[str] = deque(issue.issue_id for issue in roots)

        result: list[IssueInfo] = []
        while queue:
            issue_id = queue.popleft()
            result.append(self.issues[issue_id])

            # Reduce in-degree for nodes this one blocks; collect the newly
            # ready ones and sort them before queueing so the output does not
            # depend on set iteration order.
            newly_ready: list[IssueInfo] = []
            for blocked_id in self.blocks.get(issue_id, ()):
                in_degree[blocked_id] -= 1
                if in_degree[blocked_id] == 0:
                    newly_ready.append(self.issues[blocked_id])
            newly_ready.sort(key=self._sort_key)
            queue.extend(issue.issue_id for issue in newly_ready)

        # Check for cycles - if we didn't process all nodes, there's a cycle
        if len(result) != len(self.issues):
            raise self._cycle_error()

        return result

    def detect_cycles(self) -> list[list[str]]:
        """Detect and return any dependency cycles.

        Uses DFS with coloring to find back edges indicating cycles.
        A cycle exists when we encounter a node that is currently being
        visited (GRAY state) in the DFS traversal. The traversal keeps an
        explicit stack instead of recursing, so long dependency chains cannot
        hit the interpreter recursion limit.

        Returns:
            List of cycles, each cycle is a list of issue IDs forming
            a path from the cycle start back to itself.
            Empty list if no cycles exist.

        Example:
            If A -> B -> C -> A (circular), returns [["A", "B", "C", "A"]]
        """
        WHITE, GRAY, BLACK = 0, 1, 2
        color: dict[str, int] = dict.fromkeys(self.issues, WHITE)
        cycles: list[list[str]] = []

        for root in self.issues:
            if color[root] != WHITE:
                continue

            # path holds the GRAY nodes of the current DFS branch; stack holds
            # one partially consumed neighbor iterator per node on path.
            color[root] = GRAY
            path: list[str] = [root]
            stack = [iter(self.blocked_by.get(root, ()))]

            while stack:
                # Traverse blockers (edges point from blocked to blocker)
                for neighbor in stack[-1]:
                    state = color.get(neighbor)
                    if state is None:
                        # Blocker is not a node of this graph.
                        continue
                    if state == GRAY:
                        # Back edge: the cycle is the path suffix from neighbor.
                        cycles.append(path[path.index(neighbor):] + [neighbor])
                    elif state == WHITE:
                        # Descend; the parent's iterator resumes afterwards.
                        color[neighbor] = GRAY
                        path.append(neighbor)
                        stack.append(iter(self.blocked_by.get(neighbor, ())))
                        break
                else:
                    # All neighbors handled: finish the node on top of the path.
                    stack.pop()
                    color[path.pop()] = BLACK

        return cycles

    def has_cycles(self) -> bool:
        """Check if the graph contains any cycles.

        Returns:
            True if cycles exist, False otherwise
        """
        return bool(self.detect_cycles())

    def __len__(self) -> int:
        """Return number of issues in the graph."""
        return len(self.issues)

    def __contains__(self, issue_id: str) -> bool:
        """Check if an issue is in the graph."""
        return issue_id in self.issues

386 

387 

def refine_waves_for_contention(
    waves: list[list[IssueInfo]],
    *,
    config: DependencyMappingConfig | None = None,
) -> tuple[list[list[IssueInfo]], list[WaveContentionNote | None]]:
    """Refine execution waves by splitting on file overlap.

    For each wave with multiple issues, extracts file hints from issue
    content and checks for pairwise overlaps. Overlapping issues are
    split into sub-waves using greedy graph coloring so no two issues
    that were found to overlap end up in the same sub-wave.

    Args:
        waves: Execution waves from get_execution_waves()
        config: Optional dependency-mapping configuration, forwarded
            unchanged to the overlap check (get_overlapping_paths).

    Returns:
        Tuple of (refined_waves, contention_notes).
        refined_waves: Wave list with contention-free sub-waves.
        contention_notes: Parallel list (same length as refined_waves).
            None for waves that weren't split, WaveContentionNote for sub-waves.
    """
    # Fast path: waves of at most one issue are never split, so the result is
    # known without loading the file-hint machinery.
    if all(len(wave) <= 1 for wave in waves):
        return list(waves), [None] * len(waves)

    from little_loops.parallel.file_hints import FileHints, extract_file_hints

    refined: list[list[IssueInfo]] = []
    annotations: list[WaveContentionNote | None] = []

    for orig_idx, wave in enumerate(waves):
        if len(wave) <= 1:
            refined.append(wave)
            annotations.append(None)
            continue

        # Extract file hints for each issue in the wave
        hints: dict[str, FileHints] = {}
        for issue in wave:
            # Read first and treat a missing file as empty content. Checking
            # exists() beforehand would crash if the file were moved or
            # deleted between the check and the read.
            try:
                content = issue.path.read_text()
            except (FileNotFoundError, NotADirectoryError):
                content = ""
            hints[issue.issue_id] = extract_file_hints(content, issue.issue_id)

        # Single pass - build conflict adjacency and collect contended paths together
        conflicts: dict[str, set[str]] = {issue.issue_id: set() for issue in wave}
        contended: set[str] = set()
        for position, first in enumerate(wave):
            for second in wave[position + 1 :]:
                paths = hints[first.issue_id].get_overlapping_paths(
                    hints[second.issue_id], config=config
                )
                if paths:
                    conflicts[first.issue_id].add(second.issue_id)
                    conflicts[second.issue_id].add(first.issue_id)
                    contended |= paths

        # If no conflicts, keep wave as-is
        if not any(conflicts.values()):
            refined.append(wave)
            annotations.append(None)
            continue

        contended_paths = sorted(contended)

        # Greedy graph coloring - assign each issue the lowest color not used
        # by any already-colored conflicting neighbor. Issues are visited in
        # the order they appear in the wave.
        color: dict[str, int] = {}
        for issue in wave:
            used_colors = {color[other] for other in conflicts[issue.issue_id] if other in color}
            candidate = 0
            while candidate in used_colors:
                candidate += 1
            color[issue.issue_id] = candidate

        # Group issues by color into sub-waves, preserving wave order
        total_sub_waves = max(color.values()) + 1
        for sub_index in range(total_sub_waves):
            sub_wave = [issue for issue in wave if color[issue.issue_id] == sub_index]
            if sub_wave:
                refined.append(sub_wave)
                annotations.append(
                    WaveContentionNote(
                        contended_paths=contended_paths,
                        sub_wave_index=sub_index,
                        total_sub_waves=total_sub_waves,
                        parent_wave_index=orig_idx,
                    )
                )

        logger.info(f" Wave split into {total_sub_waves} sub-waves due to file overlap")

    return refined, annotations