Coverage for little_loops / dependency_graph.py: 98%
200 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Dependency graph for issue management.
3Constructs a directed acyclic graph (DAG) from issue dependencies,
4providing topological sorting and cycle detection.
5"""
7from __future__ import annotations
9import logging
10from collections import deque
11from dataclasses import dataclass, field
12from typing import TYPE_CHECKING
14if TYPE_CHECKING:
15 from little_loops.config import DependencyMappingConfig
16 from little_loops.issue_parser import IssueInfo
# Module-level logger named after this module, so its verbosity can be tuned
# through the standard logging hierarchy.
logger: logging.Logger = logging.getLogger(__name__)
@dataclass
class WaveContentionNote:
    """Describe one sub-wave carved out of a wave because of shared files.

    Instances are attached to sub-waves whose parent wave had to be split
    so that issues touching the same paths do not run side by side.
    """

    # Paths reported as overlapping between issues of the parent wave
    # (refine_waves_for_contention passes them in sorted order).
    contended_paths: list[str]
    # Zero-based position of this sub-wave among its siblings.
    sub_wave_index: int
    # Number of sub-waves the parent wave was split into.
    total_sub_waves: int
    # Position of the parent wave in the original, unsplit wave list.
    parent_wave_index: int = field(default=0)
@dataclass
class DependencyGraph:
    """Directed acyclic graph of issue dependencies.

    Builds a graph from issue dependencies where edges represent
    "blocked by" relationships. Provides methods for:
    - Topological sorting (dependency order)
    - Cycle detection
    - Ready issue queries (blockers resolved)
    - Blocking issue queries

    Attributes:
        issues: Mapping from issue_id to IssueInfo
        blocked_by: Mapping from issue_id to set of blocking issue_ids
        blocks: Mapping from issue_id to set of blocked issue_ids
        depends_on_edges: Mapping from issue_id to set of soft-prerequisite
            issue_ids. from_issues only creates an entry for an issue that
            has at least one in-graph, not-completed soft prerequisite.
    """

    issues: dict[str, IssueInfo] = field(default_factory=dict)
    blocked_by: dict[str, set[str]] = field(default_factory=dict)
    blocks: dict[str, set[str]] = field(default_factory=dict)
    depends_on_edges: dict[str, set[str]] = field(default_factory=dict)

    @classmethod
    def from_issues(
        cls,
        issues: list[IssueInfo],
        completed_ids: set[str] | None = None,
        all_known_ids: set[str] | None = None,
    ) -> DependencyGraph:
        """Build graph from list of issues.

        Constructs a dependency graph where:
        - Each issue is a node
        - blocked_by relationships create edges from blockers to blocked issues
        - blocks relationships create the same edges when only the blocker
          side declares them
        - Completed issues are treated as satisfied: they never act as
          blockers, whichever side of the relationship declares the edge
        - Missing issues are logged as warnings but don't block

        Args:
            issues: List of IssueInfo objects with blocked_by/blocks/depends_on fields
            completed_ids: Set of completed issue IDs (treated as resolved)
            all_known_ids: Set of all issue IDs that exist on disk. When
                provided, references to issues in this set are silently
                skipped (not warned) even if they are not in the graph.

        Returns:
            Constructed DependencyGraph

        Example:
            >>> issues = [issue_a, issue_b, issue_c]
            >>> completed = {"FEAT-001"}  # Already done
            >>> graph = DependencyGraph.from_issues(issues, completed)
        """
        completed = completed_ids or set()
        graph = cls()

        # Index all issues by ID
        for issue in issues:
            graph.issues[issue.issue_id] = issue
            graph.blocked_by[issue.issue_id] = set()
            graph.blocks[issue.issue_id] = set()

        all_issue_ids = set(graph.issues)

        # First pass: blocked_by declarations (blocker -> this issue).
        for issue in issues:
            for blocker_id in issue.blocked_by:
                # Skip completed blockers (already satisfied)
                if blocker_id in completed:
                    continue
                # Skip blockers not in the graph
                if blocker_id not in all_issue_ids:
                    # Only warn if the issue truly doesn't exist on disk
                    if all_known_ids is None or blocker_id not in all_known_ids:
                        logger.warning(
                            f"Issue {issue.issue_id} blocked by unknown issue {blocker_id}"
                        )
                    continue
                # Add bidirectional edge
                graph.blocked_by[issue.issue_id].add(blocker_id)
                graph.blocks[blocker_id].add(issue.issue_id)

        # Second pass: honour one-sided blocks: declarations.
        # If A declares blocks: [B] without a matching blocked_by: [A] on B,
        # the A->B edge was never added above; add it now. Set insertion makes
        # symmetric declarations harmless.
        for issue in issues:
            # A completed issue is satisfied and must not block anything --
            # mirrors the first pass, which drops edges from completed blockers.
            if issue.issue_id in completed:
                continue
            for blocked_id in issue.blocks:
                if blocked_id in completed:
                    continue
                if blocked_id not in all_issue_ids:
                    if all_known_ids is None or blocked_id not in all_known_ids:
                        logger.warning(f"Issue {issue.issue_id} blocks unknown issue {blocked_id}")
                    continue
                graph.blocked_by[blocked_id].add(issue.issue_id)
                graph.blocks[issue.issue_id].add(blocked_id)

        # Third pass: populate depends_on_edges (soft prerequisite relationships).
        # depends_on is one-directional -- no reverse edge is built here.
        for issue in issues:
            for target_id in issue.depends_on:
                if target_id in completed:
                    continue
                if target_id not in all_issue_ids:
                    if all_known_ids is None or target_id not in all_known_ids:
                        logger.warning(
                            f"Issue {issue.issue_id} has depends_on unknown issue {target_id}"
                        )
                    continue
                graph.depends_on_edges.setdefault(issue.issue_id, set()).add(target_id)

        return graph

    def get_ready_issues(self, completed: set[str] | None = None) -> list[IssueInfo]:
        """Return issues whose blockers are all completed.

        An issue is "ready" if:
        - It is not already completed
        - All its blockers are either completed or not in the graph

        Args:
            completed: Set of completed issue IDs

        Returns:
            List of IssueInfo for issues with no active blockers,
            sorted by priority (highest first) then issue_id
        """
        completed = completed or set()
        ready = [
            issue
            for issue_id, issue in self.issues.items()
            if issue_id not in completed and not self.get_blocking_issues(issue_id, completed)
        ]
        # Sort by priority then issue_id for consistent ordering
        ready.sort(key=lambda x: (x.priority_int, x.issue_id))
        return ready

    def get_execution_waves(self, completed: set[str] | None = None) -> list[list[IssueInfo]]:
        """Return issues grouped into parallel execution waves.

        Wave 1: All issues with no blockers (or blockers already completed)
        Wave 2: Issues whose blockers are all in wave 1
        Wave N: Issues whose blockers are all in waves 1..N-1

        After a wave is formed, soft (depends_on) targets of its members may
        be appended to that same wave (see the nudge comment below).

        This is similar to topological_sort but groups issues by "level"
        rather than returning a flat list.

        Args:
            completed: Set of already-completed issue IDs

        Returns:
            List of waves, each wave is a list of issues that can run in parallel.
            Empty list if graph is empty or all issues are completed.

        Raises:
            ValueError: If graph contains cycles (not a DAG)

        Example:
            If A blocks B and C, and B and C block D:
            - Wave 1: [A]
            - Wave 2: [B, C]
            - Wave 3: [D]
        """
        completed = completed or set()
        waves: list[list[IssueInfo]] = []
        processed: set[str] = set(completed)

        while True:
            # Get issues ready to run (all hard blockers in processed set)
            wave = self.get_ready_issues(completed=processed)
            if not wave:
                break

            # Soft ordering: nudge depends_on targets into this wave when their
            # hard blockers are all satisfied by processed | current wave.
            # NOTE(review): every issue whose hard blockers are all in
            # `processed` is already in `wave`, so a target is only ever nudged
            # when one of its hard blockers sits in this same wave. Confirm
            # that sharing a wave with a hard blocker is intended.
            after_wave: set[str] = processed | {i.issue_id for i in wave}
            nudged: list[IssueInfo] = []
            for issue in wave:
                # sorted() keeps the nudged order independent of set hash order
                for target_id in sorted(self.depends_on_edges.get(issue.issue_id, ())):
                    if target_id in after_wave or target_id not in self.issues:
                        continue
                    if not self.get_blocking_issues(target_id, after_wave):
                        nudged.append(self.issues[target_id])
                        after_wave.add(target_id)
            wave.extend(nudged)

            waves.append(wave)
            # Mark this wave (including nudged) as processed for next iteration
            processed.update(issue.issue_id for issue in wave)

        # Unprocessed issues left over can only be stuck behind a cycle
        if set(self.issues) - processed:
            raise self._cycle_error()

        return waves

    def is_blocked(self, issue_id: str, completed: set[str] | None = None) -> bool:
        """Check if an issue is still blocked.

        Args:
            issue_id: Issue ID to check
            completed: Set of completed issue IDs

        Returns:
            True if issue has unresolved blockers, False otherwise
        """
        return bool(self.get_blocking_issues(issue_id, completed))

    def get_blocking_issues(self, issue_id: str, completed: set[str] | None = None) -> set[str]:
        """Return incomplete issues blocking this one.

        Args:
            issue_id: Issue ID to check
            completed: Set of completed issue IDs

        Returns:
            New set of issue IDs still blocking this issue (empty for
            unknown issue IDs)
        """
        completed = completed or set()
        blockers = self.blocked_by.get(issue_id, set())
        return blockers - completed

    def get_blocked_by_issue(self, issue_id: str) -> set[str]:
        """Return issues that this issue blocks.

        Args:
            issue_id: Issue ID to check

        Returns:
            Copy of the set of issue IDs that are blocked by this issue
        """
        return self.blocks.get(issue_id, set()).copy()

    def topological_sort(self) -> list[IssueInfo]:
        """Return issues in dependency order (Kahn's algorithm).

        Issues with no dependencies come first, followed by issues whose
        dependencies have been satisfied. The initially-unblocked issues,
        and each batch of issues released by processing a single issue,
        are sorted by priority then issue_id.

        Returns:
            List of IssueInfo in topological order

        Raises:
            ValueError: If graph contains cycles (not a DAG)

        Example:
            If A blocks B, and B blocks C, returns [A, B, C]
        """
        # Calculate in-degree for each node (number of blockers)
        in_degree: dict[str, int] = {
            issue_id: len(blockers) for issue_id, blockers in self.blocked_by.items()
        }

        # Start with nodes that have no blockers, sorted by priority
        zero_degree = [
            self.issues[issue_id] for issue_id, degree in in_degree.items() if degree == 0
        ]
        zero_degree.sort(key=lambda x: (x.priority_int, x.issue_id))
        queue: deque[str] = deque(issue.issue_id for issue in zero_degree)

        result: list[IssueInfo] = []
        while queue:
            issue_id = queue.popleft()
            result.append(self.issues[issue_id])

            # Reduce in-degree for nodes this one blocks; collect the newly
            # ready ones and sort them before queueing for stable ordering
            newly_ready: list[IssueInfo] = []
            for blocked_id in self.blocks.get(issue_id, set()):
                in_degree[blocked_id] -= 1
                if in_degree[blocked_id] == 0:
                    newly_ready.append(self.issues[blocked_id])
            newly_ready.sort(key=lambda x: (x.priority_int, x.issue_id))
            queue.extend(issue.issue_id for issue in newly_ready)

        # If we didn't emit every node, some are stuck behind a cycle
        if len(result) != len(self.issues):
            raise self._cycle_error()

        return result

    def detect_cycles(self) -> list[list[str]]:
        """Detect and return any dependency cycles.

        Uses an iterative DFS with coloring to find back edges indicating
        cycles. A cycle exists when we encounter a node that is currently
        being visited (GRAY state). An explicit stack replaces recursion, so
        very long dependency chains cannot exceed Python's recursion limit.
        Neighbours are visited in sorted order so results are deterministic.

        Returns:
            List of cycles, each cycle is a list of issue IDs forming
            a path from the cycle start back to itself.
            Empty list if no cycles exist.

        Example:
            If A -> B -> C -> A (circular), returns [["A", "B", "C", "A"]]
        """
        WHITE, GRAY, BLACK = 0, 1, 2
        color: dict[str, int] = dict.fromkeys(self.issues, WHITE)
        cycles: list[list[str]] = []
        path: list[str] = []
        # One pending-neighbour iterator per node on `path` (kept in lockstep)
        pending: list = []

        for root in self.issues:
            if color[root] != WHITE:
                continue
            color[root] = GRAY
            path.append(root)
            # Traverse blockers (edges point from blocked to blocker)
            pending.append(iter(sorted(self.blocked_by.get(root, ()))))

            while pending:
                for neighbor in pending[-1]:
                    state = color.get(neighbor)
                    if state is None:
                        continue  # edge to an issue outside the graph
                    if state == GRAY:
                        # Found cycle - extract from path
                        cycle_start = path.index(neighbor)
                        cycles.append(path[cycle_start:] + [neighbor])
                    elif state == WHITE:
                        color[neighbor] = GRAY
                        path.append(neighbor)
                        pending.append(iter(sorted(self.blocked_by.get(neighbor, ()))))
                        break  # descend into neighbor
                else:
                    # All neighbours of the top node explored: finish it
                    pending.pop()
                    color[path.pop()] = BLACK

        return cycles

    def has_cycles(self) -> bool:
        """Check if the graph contains any cycles.

        Returns:
            True if cycles exist, False otherwise
        """
        return len(self.detect_cycles()) > 0

    def _cycle_error(self) -> ValueError:
        """Build the ValueError raised when ordering fails due to cycles."""
        cycles = self.detect_cycles()
        cycle_str = ", ".join(" -> ".join(cycle) for cycle in cycles)
        return ValueError(f"Dependency graph contains cycles: {cycle_str}")

    def __len__(self) -> int:
        """Return number of issues in the graph."""
        return len(self.issues)

    def __contains__(self, issue_id: str) -> bool:
        """Check if an issue is in the graph."""
        return issue_id in self.issues
def refine_waves_for_contention(
    waves: list[list[IssueInfo]],
    *,
    config: DependencyMappingConfig | None = None,
) -> tuple[list[list[IssueInfo]], list[WaveContentionNote | None]]:
    """Refine execution waves by splitting on file overlap.

    For each wave with multiple issues, extracts file hints from issue
    content and checks for pairwise overlaps. Overlapping issues are
    split into sub-waves using greedy graph coloring so no two issues
    in the same sub-wave modify the same files.

    Args:
        waves: Execution waves from get_execution_waves()
        config: Optional dependency-mapping configuration, passed unchanged
            to ``FileHints.get_overlapping_paths`` for every issue pair.

    Returns:
        Tuple of (refined_waves, contention_notes).
        refined_waves: Wave list with contention-free sub-waves.
        contention_notes: Parallel list (same length as refined_waves).
            None for waves that weren't split, WaveContentionNote for sub-waves.
    """
    refined: list[list[IssueInfo]] = []
    annotations: list[WaveContentionNote | None] = []

    for orig_idx, wave in enumerate(waves):
        if len(wave) <= 1:
            refined.append(wave)
            annotations.append(None)
            continue

        # Deferred import: only needed once a wave has two or more issues.
        from little_loops.parallel.file_hints import FileHints, extract_file_hints

        # Extract file hints for each issue in the wave
        hints: dict[str, FileHints] = {}
        for issue in wave:
            # EAFP instead of exists()+read_text(): a file removed between the
            # two calls no longer raises. Explicit UTF-8 (with replacement for
            # undecodable bytes) so decoding doesn't depend on the OS locale.
            try:
                content = issue.path.read_text(encoding="utf-8", errors="replace")
            except FileNotFoundError:
                content = ""
            hints[issue.issue_id] = extract_file_hints(content, issue.issue_id)

        # Single pass -- build conflict adjacency and collect contended paths together
        conflicts: dict[str, set[str]] = {issue.issue_id: set() for issue in wave}
        contended: set[str] = set()
        for i, a in enumerate(wave):
            for b in wave[i + 1 :]:
                paths = hints[a.issue_id].get_overlapping_paths(hints[b.issue_id], config=config)
                if paths:
                    conflicts[a.issue_id].add(b.issue_id)
                    conflicts[b.issue_id].add(a.issue_id)
                    contended.update(paths)

        # If no conflicts, keep wave as-is
        if not any(conflicts.values()):
            refined.append(wave)
            annotations.append(None)
            continue

        contended_paths = sorted(contended)

        # Greedy graph coloring -- assign each issue the lowest color
        # not used by any conflicting neighbor
        color: dict[str, int] = {}
        for issue in wave:  # iterate in wave order (priority order from get_ready_issues)
            used_colors = {color[c] for c in conflicts[issue.issue_id] if c in color}
            c = 0
            while c in used_colors:
                c += 1
            color[issue.issue_id] = c

        # Group issues by color into sub-waves, preserving wave order
        total_sub_waves = max(color.values()) + 1
        for c in range(total_sub_waves):
            sub_wave = [issue for issue in wave if color[issue.issue_id] == c]
            if sub_wave:
                refined.append(sub_wave)
                annotations.append(
                    WaveContentionNote(
                        # Each note gets its own list so mutating one note's
                        # paths cannot leak into its sibling sub-waves.
                        contended_paths=list(contended_paths),
                        sub_wave_index=c,
                        total_sub_waves=total_sub_waves,
                        parent_wave_index=orig_idx,
                    )
                )

        logger.info(f"  Wave split into {total_sub_waves} sub-waves due to file overlap")

    return refined, annotations