Coverage for little_loops / dependency_mapper / analysis.py: 96%
197 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Dependency analysis functions.
3Functions for computing conflict scores, finding file overlaps,
4validating dependency references, and orchestrating full dependency analysis.
5"""
7from __future__ import annotations
9import logging
10import re
11from typing import TYPE_CHECKING
13from little_loops.dependency_graph import DependencyGraph
14from little_loops.dependency_mapper.models import (
15 DependencyProposal,
16 DependencyReport,
17 ParallelSafePair,
18 ValidationResult,
19)
20from little_loops.text_utils import extract_file_paths
22if TYPE_CHECKING:
23 from little_loops.config import DependencyMappingConfig
24 from little_loops.issue_parser import IssueInfo
26logger = logging.getLogger(__name__)
# Fenced code block delimited by triple backticks; the non-greedy body may
# span newlines. _extract_semantic_targets strips these spans before it
# scans the remaining prose for names.
# NOTE(review): re.MULTILINE only changes ^/$ and this pattern has neither,
# so the flag looks redundant — confirm before removing.
_CODE_FENCE = re.compile(r"```[\s\S]*?```", re.MULTILINE)

# Semantic target extraction patterns
# Two or more consecutive segments, each one uppercase letter followed by
# lowercase letters (e.g. "DependencyGraph"). Names containing digits or
# runs of capitals (e.g. "HTTPServer") do not match.
_PASCAL_CASE = re.compile(r"\b([A-Z][a-z]+(?:[A-Z][a-z]+)+)\b")
# Backtick-quoted call with empty parentheses; captures the bare name.
_FUNCTION_REF = re.compile(r"`(\w+)\(\)`")
# A scope keyword, then ":" and/or whitespace, an optional quote or backtick,
# and a name of at least three characters; captures the name. Case-insensitive.
_COMPONENT_SCOPE = re.compile(
    r"(?:component|module|class|widget|section)[:\s]+[`\"']?([a-zA-Z0-9_./\-]{3,})[`\"']?",
    re.IGNORECASE,
)
# UI region / section keywords mapped to canonical names
# Keys are the canonical section names; each value holds the lowercase
# keywords or phrases that count as a mention of that section.
# _extract_section_mentions matches single words on word boundaries and
# phrases containing a space as plain substrings of the lowercased content.
# NOTE(review): "body" is triggered only by "droppable" — confirm intended.
_SECTION_KEYWORDS: dict[str, frozenset[str]] = {
    "header": frozenset({"header", "navbar", "toolbar", "top bar"}),
    "body": frozenset({"droppable"}),
    "footer": frozenset({"footer", "status bar", "action bar"}),
    "sidebar": frozenset({"sidebar", "side panel", "drawer"}),
    "card": frozenset({"card", "tile"}),
    "modal": frozenset({"modal", "dialog", "popup", "overlay"}),
    "form": frozenset({"form"}),
}
# Modification type classification keywords
# Keys are the modification type labels; values are lowercase keywords or
# phrases. _classify_modification_type tests them as plain substrings of the
# lowercased content (so "drop" also matches inside "dropdown") and checks
# the types in the fixed order structural, infrastructure, enhancement,
# returning the first type with any hit.
_MODIFICATION_TYPES: dict[str, frozenset[str]] = {
    "structural": frozenset(
        {
            "extract",
            "split",
            "refactor",
            "restructure",
            "reorganize",
            "create new component",
            "break out",
            "separate",
            "decompose",
        }
    ),
    "infrastructure": frozenset(
        {
            "listener",
            "provider",
            "state management",
            "routing",
            "middleware",
            "dragging",
            "drag",
            "drop",
            "dnd",
        }
    ),
    "enhancement": frozenset(
        {
            "add button",
            "add field",
            "add column",
            "add stats",
            "add icon",
            "add toggle",
            "empty state",
            "placeholder",
            "tooltip",
            "badge",
        }
    ),
}
94def _basename(path: str) -> str:
95 """Extract the basename from a file path."""
96 return path.rsplit("/", 1)[-1] if "/" in path else path
99def _extract_semantic_targets(content: str) -> set[str]:
100 """Extract component and function references from issue content.
102 Identifies PascalCase component names, function references,
103 and explicitly mentioned component/module scopes.
105 Args:
106 content: Issue file content
108 Returns:
109 Set of normalized semantic target names
110 """
111 if not content:
112 return set()
114 stripped = _CODE_FENCE.sub("", content)
115 targets: set[str] = set()
117 for match in _PASCAL_CASE.finditer(stripped):
118 targets.add(match.group(1).lower())
120 for match in _FUNCTION_REF.finditer(stripped):
121 targets.add(match.group(1).lower())
123 for match in _COMPONENT_SCOPE.finditer(stripped):
124 targets.add(match.group(1).lower())
126 return targets
129def _extract_section_mentions(content: str) -> set[str]:
130 """Extract UI region/section references from issue content.
132 Maps keywords like "header", "body", "sidebar" to canonical
133 section names using word-boundary matching.
135 Args:
136 content: Issue file content
138 Returns:
139 Set of canonical section names mentioned
140 """
141 if not content:
142 return set()
144 content_lower = content.lower()
145 sections: set[str] = set()
147 for section_name, keywords in _SECTION_KEYWORDS.items():
148 for keyword in keywords:
149 # Use word boundary for single words, substring for multi-word phrases
150 if " " in keyword:
151 if keyword in content_lower:
152 sections.add(section_name)
153 break
154 else:
155 if re.search(rf"\b{re.escape(keyword)}\b", content_lower):
156 sections.add(section_name)
157 break
159 return sections
162def _classify_modification_type(content: str) -> str:
163 """Classify the modification type of an issue.
165 Returns one of: "structural", "infrastructure", "enhancement".
166 Falls back to "enhancement" if no clear match.
168 Args:
169 content: Issue file content
171 Returns:
172 Modification type classification string
173 """
174 if not content:
175 return "enhancement"
177 content_lower = content.lower()
179 for mod_type in ("structural", "infrastructure", "enhancement"):
180 keywords = _MODIFICATION_TYPES[mod_type]
181 for keyword in keywords:
182 if keyword in content_lower:
183 return mod_type
185 return "enhancement"
def compute_conflict_score(
    content_a: str,
    content_b: str,
    *,
    config: DependencyMappingConfig | None = None,
) -> float:
    """Score how likely two issues are to conflict semantically.

    The score is a weighted sum of three signals:
    - overlap of semantic targets (component/function names), as the
      intersection size divided by the union size
    - whether the issues share any mentioned UI section (0 or 1)
    - whether both issues have the same modification type (0 or 1)

    Args:
        content_a: First issue's file content
        content_b: Second issue's file content
        config: Optional dependency mapping config supplying the weights.
            Without it the weights are 0.5 / 0.3 / 0.2.

    Returns:
        Weighted score rounded to two decimals; 0.0 means parallel-safe,
        1.0 means definite conflict (with the default weights)
    """
    targets_a = _extract_semantic_targets(content_a)
    targets_b = _extract_semantic_targets(content_b)
    sections_a = _extract_section_mentions(content_a)
    sections_b = _extract_section_mentions(content_b)
    type_a = _classify_modification_type(content_a)
    type_b = _classify_modification_type(content_b)

    if config:
        w_semantic = config.scoring_weights.semantic
        w_section = config.scoring_weights.section
        w_type = config.scoring_weights.type
    else:
        w_semantic, w_section, w_type = 0.5, 0.3, 0.2

    # Target overlap: a side with no targets is treated as "unknown",
    # which counts as no conflict.
    target_score = 0.0
    if targets_a and targets_b:
        target_score = len(targets_a & targets_b) / len(targets_a | targets_b)

    # Section overlap: an empty side can never intersect, so it scores 0.
    section_score = 0.0 if sections_a.isdisjoint(sections_b) else 1.0

    # Modification type: all-or-nothing match.
    type_score = 1.0 if type_a == type_b else 0.0

    weighted = target_score * w_semantic + section_score * w_section + type_score * w_type
    return round(weighted, 2)
def find_file_overlaps(
    issues: list[IssueInfo],
    issue_contents: dict[str, str],
    *,
    config: DependencyMappingConfig | None = None,
) -> tuple[list[DependencyProposal], list[ParallelSafePair]]:
    """Find issues that reference overlapping files and propose dependencies.

    For each pair of issues whose referenced files overlap enough, computes
    a semantic conflict score. Pairs scoring below the conflict threshold
    are reported as parallel-safe; the rest get a dependency proposal.

    Direction of a proposal (``target_id`` is completed first) is chosen by,
    in order: lower ``priority_int``; then modification type (structural
    before infrastructure before enhancement); then lower issue ID, in which
    case the confidence is scaled down by the confidence modifier.

    Pairs that already have a ``blocked_by`` / ``depends_on`` relationship
    in either direction are skipped.

    Args:
        issues: List of parsed issue objects
        issue_contents: Mapping from issue_id to file content
        config: Optional dependency mapping config for custom thresholds.
            Falls back to hardcoded defaults when not provided.

    Returns:
        Tuple of (proposed dependencies sorted by confidence descending,
        parallel-safe pairs)
    """
    # Existing edges as (issue, dependency) pairs; checked both ways below.
    existing_deps: set[tuple[str, str]] = set()
    for issue in issues:
        for blocker_id in issue.blocked_by:
            existing_deps.add((issue.issue_id, blocker_id))
        for dep_id in issue.depends_on:
            existing_deps.add((issue.issue_id, dep_id))

    # Resolve thresholds once instead of once per pair.
    min_files = config.overlap_min_files if config else 2
    min_ratio = config.overlap_min_ratio if config else 0.25
    conflict_threshold = config.conflict_threshold if config else 0.4
    exclude_files = (
        frozenset(config.exclude_common_files)
        if config
        else frozenset(
            {
                "__init__.py",
                "pyproject.toml",
                "setup.py",
                "setup.cfg",
                "CHANGELOG.md",
                "README.md",
                "conftest.py",
            }
        )
    )

    # Extract file paths per issue, filtering common infrastructure files.
    # Only issues with at least one remaining path are kept, so every set
    # stored here is non-empty.
    issue_paths: dict[str, set[str]] = {}
    for issue in issues:
        content = issue_contents.get(issue.issue_id, "")
        paths = extract_file_paths(content)
        if paths:
            filtered = {p for p in paths if _basename(p) not in exclude_files}
            if filtered:
                issue_paths[issue.issue_id] = filtered

    # One-time index for the per-pair lookups; setdefault keeps the first
    # issue for a repeated ID, like a front-to-back search would.
    issue_by_id: dict[str, IssueInfo] = {}
    for issue in issues:
        issue_by_id.setdefault(issue.issue_id, issue)

    proposals: list[DependencyProposal] = []
    parallel_safe: list[ParallelSafePair] = []
    issue_ids = sorted(issue_paths.keys())

    _type_order = {"structural": 0, "infrastructure": 1, "enhancement": 2}

    for i, id_a in enumerate(issue_ids):
        paths_a = issue_paths[id_a]
        for id_b in issue_ids[i + 1 :]:
            paths_b = issue_paths[id_b]
            overlap = paths_a & paths_b
            if not overlap:
                continue

            # Minimum overlap guards. Both sets are non-empty (see above),
            # so the divisor is at least 1.
            ratio = len(overlap) / min(len(paths_a), len(paths_b))
            if len(overlap) < min_files or ratio < min_ratio:
                continue

            # Skip if dependency already exists (in either direction)
            if (id_a, id_b) in existing_deps or (id_b, id_a) in existing_deps:
                continue

            content_a = issue_contents.get(id_a, "")
            content_b = issue_contents.get(id_b, "")
            conflict = compute_conflict_score(content_a, content_b, config=config)
            overlap_list = sorted(overlap)

            # Low-conflict pairs are parallel-safe
            if conflict < conflict_threshold:
                sections_a = _extract_section_mentions(content_a)
                sections_b = _extract_section_mentions(content_b)
                # Only claim "different sections" when that is actually
                # true; a low score can also occur with shared sections.
                if sections_a and sections_b and sections_a.isdisjoint(sections_b):
                    reason = (
                        f"Different sections ({', '.join(sorted(sections_a))}"
                        f" vs {', '.join(sorted(sections_b))})"
                    )
                else:
                    reason = "Low semantic conflict score"
                parallel_safe.append(
                    ParallelSafePair(
                        issue_a=id_a,
                        issue_b=id_b,
                        shared_files=overlap_list,
                        conflict_score=conflict,
                        reason=reason,
                    )
                )
                continue

            # Determine direction for high-conflict pairs
            issue_a = issue_by_id[id_a]
            issue_b = issue_by_id[id_b]
            confidence_modifier = 1.0

            if issue_a.priority_int != issue_b.priority_int:
                # Different priorities: higher priority (lower int) blocks lower
                if issue_a.priority_int < issue_b.priority_int:
                    target_id, source_id = id_a, id_b
                else:
                    target_id, source_id = id_b, id_a
                ordering_note = f"{target_id} has higher priority and should be completed first."
            else:
                # Same priority: use modification type ordering
                type_a = _classify_modification_type(content_a)
                type_b = _classify_modification_type(content_b)
                order_a = _type_order.get(type_a, 2)
                order_b = _type_order.get(type_b, 2)

                if order_a != order_b:
                    if order_a < order_b:
                        target_id, source_id, target_type = id_a, id_b, type_a
                    else:
                        target_id, source_id, target_type = id_b, id_a, type_b
                    ordering_note = (
                        f"{target_id} is the more foundational change "
                        f"({target_type}) and should be completed first."
                    )
                else:
                    # Fall back to ID ordering with reduced confidence.
                    # issue_ids is sorted and id_b comes later in it, so
                    # id_a is always the smaller ID here.
                    target_id, source_id = id_a, id_b
                    confidence_modifier = config.confidence_modifier if config else 0.5
                    ordering_note = (
                        f"Priority and modification type are tied, so {target_id} "
                        f"is ordered first by issue ID."
                    )

            # Confidence is the overlap ratio computed for the guard above,
            # scaled down when the direction was only an ID tie-break.
            confidence = ratio * confidence_modifier

            rationale = (
                f"{source_id} and {target_id} both reference "
                f"{', '.join(overlap_list[:3])}"
                f"{' and more' if len(overlap_list) > 3 else ''}. "
                f"{ordering_note}"
            )

            proposals.append(
                DependencyProposal(
                    source_id=source_id,
                    target_id=target_id,
                    reason="file_overlap",
                    confidence=round(confidence, 2),
                    rationale=rationale,
                    overlapping_files=overlap_list,
                    conflict_score=conflict,
                )
            )

    # Sort by confidence descending
    proposals.sort(key=lambda p: -p.confidence)
    return proposals, parallel_safe
def validate_dependencies(
    issues: list[IssueInfo],
    completed_ids: set[str] | None = None,
    all_known_ids: set[str] | None = None,
) -> ValidationResult:
    """Validate existing dependency references for integrity.

    Checks:
    - Broken refs: ``blocked_by`` or ``duplicate_of`` entries referencing
      unknown issues
    - Stale completed refs: ``blocked_by`` entries referencing completed issues
    - Missing backlinks: A is blocked by active issue B but B does not list
      A in ``blocks``
    - Broken ``depends_on`` / ``relates_to`` refs: entries referencing
      unknown issues (recorded in their own result lists)
    - Cycles: circular dependency chains, via ``DependencyGraph``

    Args:
        issues: List of parsed issue objects
        completed_ids: Set of completed issue IDs
        all_known_ids: Set of all issue IDs that exist on disk (across all
            categories and completed). When provided, references to issues
            in this set are not flagged as broken even if they are not in
            the working ``issues`` list.

    Returns:
        ValidationResult with all detected problems
    """
    completed = completed_ids or set()
    result = ValidationResult()

    active_ids = {issue.issue_id for issue in issues}
    all_known = active_ids | completed
    if all_known_ids:
        all_known = all_known | all_known_ids

    # issue_id -> IDs that issue declares it blocks (for the backlink check).
    blocks_map: dict[str, set[str]] = {issue.issue_id: set(issue.blocks) for issue in issues}

    for issue in issues:
        for ref_id in issue.blocked_by:
            if ref_id not in all_known:
                result.broken_refs.append((issue.issue_id, ref_id))
            elif ref_id in completed:
                result.stale_completed_refs.append((issue.issue_id, ref_id))

            # Backlink: if A.blocked_by contains B, then B.blocks should
            # contain A. Only checkable when B is in the working set.
            if ref_id in active_ids and issue.issue_id not in blocks_map[ref_id]:
                result.missing_backlinks.append((issue.issue_id, ref_id))

        for ref_id in issue.depends_on:
            if ref_id not in all_known:
                result.broken_depends_on_refs.append((issue.issue_id, ref_id))

        for ref_id in issue.relates_to:
            if ref_id not in all_known:
                result.broken_relates_to_refs.append((issue.issue_id, ref_id))

        if issue.duplicate_of and issue.duplicate_of not in all_known:
            result.broken_refs.append((issue.issue_id, issue.duplicate_of))

    # Cycle detection using DependencyGraph
    graph = DependencyGraph.from_issues(issues, completed, all_known_ids=all_known_ids)
    result.cycles = graph.detect_cycles()

    return result
_DEPRECATED_RELATIONSHIP_KEYS = frozenset({"parent_issue", "related"})
_FRONTMATTER_RE = re.compile(r"^---\n(.*?)\n---", re.DOTALL)


def validate_frontmatter_fields(issues: list[IssueInfo]) -> None:
    """Log a warning for each deprecated relationship key still on disk.

    Every issue whose file exists is read as UTF-8. If the file starts
    with a ``---`` delimited frontmatter block, that block is searched for
    lines beginning with a deprecated key (``parent_issue:``, ``related:``)
    left over from pre-ENH-1434 migration, and one ``logger.warning()`` is
    emitted per key found.

    Args:
        issues: List of parsed issue objects (must have a valid .path attribute)
    """
    for issue in issues:
        if not issue.path.exists():
            continue

        raw_text = issue.path.read_text(encoding="utf-8")
        frontmatter = _FRONTMATTER_RE.match(raw_text)
        if frontmatter is None:
            continue

        header = frontmatter.group(1)
        leftover_keys = (
            name
            for name in _DEPRECATED_RELATIONSHIP_KEYS
            if re.search(rf"^{re.escape(name)}\s*:", header, re.MULTILINE)
        )
        for key in leftover_keys:
            logger.warning(
                "%s: deprecated frontmatter key '%s' — rename to the canonical equivalent",
                issue.issue_id,
                key,
            )
def analyze_dependencies(
    issues: list[IssueInfo],
    issue_contents: dict[str, str],
    completed_ids: set[str] | None = None,
    all_known_ids: set[str] | None = None,
    *,
    config: DependencyMappingConfig | None = None,
) -> DependencyReport:
    """Run discovery and validation and bundle the results into one report.

    Discovery is ``find_file_overlaps``; validation is
    ``validate_dependencies``. The report also records the number of
    issues and the total number of ``blocked_by`` entries across them.

    Args:
        issues: List of parsed issue objects
        issue_contents: Mapping from issue_id to file content
        completed_ids: Set of completed issue IDs
        all_known_ids: Set of all issue IDs that exist on disk
        config: Optional dependency mapping config for custom thresholds.

    Returns:
        Comprehensive dependency report
    """
    discovered = find_file_overlaps(issues, issue_contents, config=config)
    proposed, safe_pairs = discovered
    checked = validate_dependencies(issues, completed_ids, all_known_ids)

    # Only blocked_by entries are counted as existing dependencies.
    blocked_by_total = 0
    for issue in issues:
        blocked_by_total += len(issue.blocked_by)

    return DependencyReport(
        proposals=proposed,
        parallel_safe=safe_pairs,
        validation=checked,
        issue_count=len(issues),
        existing_dep_count=blocked_by_total,
    )