Coverage for little_loops / workflow_sequence / analysis.py: 98%
332 statements
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
« prev ^ index » next coverage.py v7.12.0, created at 2026-05-22 16:19 -0500
1"""Analysis functions for workflow sequence detection."""
3from __future__ import annotations
5import json
6import re
7import sys
8from datetime import datetime
9from pathlib import Path
10from typing import Any
12import yaml
14from little_loops.workflow_sequence.io import _load_messages, _load_patterns
15from little_loops.workflow_sequence.models import (
16 EntityCluster,
17 SessionLink,
18 Workflow,
19 WorkflowAnalysis,
20 WorkflowBoundary,
21)
# Module-level compiled regex patterns
#
# FILE_PATTERN ends with \b so that a short extension can never match as a
# prefix of a longer one: without it "app.tsx" is captured as "app.ts" and
# "view.jsx" as "view.js", because "ts"/"js" appear earlier in the alternation.
FILE_PATTERN = re.compile(
    r"[\w./-]+\.(?:md|py|json|yaml|yml|js|ts|tsx|jsx|sh|toml)\b", re.IGNORECASE
)
# "phase 1", "phase-1", "phase1" (any casing)
PHASE_PATTERN = re.compile(r"phase[- ]?\d+", re.IGNORECASE)
# "module 2", "module-2", "module2" (any casing)
MODULE_PATTERN = re.compile(r"module[- ]?\d+", re.IGNORECASE)
# A slash followed by word characters, colons or hyphens, e.g. "/ll:handoff"
COMMAND_PATTERN = re.compile(r"/[\w:-]+")
# Issue identifiers such as "BUG-12" or "feat-3" (any casing)
ISSUE_PATTERN = re.compile(r"(?:BUG|FEAT|ENH|EPIC)-\d+", re.IGNORECASE)
# Verb class taxonomy for semantic similarity.
# Each row is (class name, space-separated verbs). Row order is significant:
# lookups that walk this mapping return the first class that matches.
VERB_CLASSES: dict[str, set[str]] = {
    class_name: set(verbs.split())
    for class_name, verbs in (
        ("deletion", "remove delete drop eliminate clear clean"),
        ("modification", "update change modify edit fix adjust revise"),
        ("creation", "create add generate write make build"),
        ("search", "find search locate where what which list"),
        ("verification", "check verify validate confirm review ensure"),
        ("execution", "run execute launch start invoke call"),
    )
}
# Workflow templates: ordered category sequences that indicate common patterns.
# Each row is (template name, space-separated category steps in order).
WORKFLOW_TEMPLATES: dict[str, list[str]] = {
    template_name: steps.split()
    for template_name, steps in (
        ("explore → modify → verify", "file_search code_modification testing"),
        ("create → refine → finalize", "file_write code_modification git_operation"),
        ("review → fix → commit", "code_review code_modification git_operation"),
        ("plan → implement → verify", "planning code_modification testing"),
        ("debug → fix → test", "debugging code_modification testing"),
    )
}
# Maps content keywords to workflow category labels used by WORKFLOW_TEMPLATES.
# Each row is (category label, space-separated keywords).
_CONTENT_CATEGORY_MAP: dict[str, list[str]] = {
    category: keywords.split()
    for category, keywords in (
        ("file_search", "search find glob grep locate"),
        ("code_modification", "edit write fix refactor update implement"),
        ("testing", "test pytest assert verify check"),
        ("git_operation", "commit push branch pr merge pull"),
        ("planning", "plan design architect outline draft"),
        ("debugging", "debug trace breakpoint error exception bug"),
        ("code_review", "review inspect audit read examine"),
        ("file_write", "create generate scaffold write add"),
    )
}
62# -----------------------------------------------------------------------------
63# Core Analysis Functions
64# -----------------------------------------------------------------------------
def extract_entities(content: str) -> set[str]:
    """Collect the entities mentioned in a message.

    Args:
        content: Message text content

    Returns:
        Set of file paths, phase/module references, slash commands and issue
        IDs found in the text. Phase/module references come back lower-cased
        and issue IDs upper-cased; file paths and commands keep their casing.
    """
    lowered = content.lower()

    # Pair every pattern with the text variant it scans. Phase and module
    # references are matched on the lower-cased text so "Phase 1" and
    # "phase 1" collapse into a single entity.
    scans = (
        (FILE_PATTERN, content),
        (PHASE_PATTERN, lowered),
        (MODULE_PATTERN, lowered),
        (COMMAND_PATTERN, content),
    )

    found: set[str] = set()
    for pattern, text in scans:
        found.update(pattern.findall(text))

    # Issue IDs are normalised to upper case
    found |= {issue_id.upper() for issue_id in ISSUE_PATTERN.findall(content)}
    return found
def calculate_boundary_weight(gap_seconds: int) -> float:
    """Map a time gap between two messages to a workflow boundary weight.

    Args:
        gap_seconds: Time gap between messages in seconds

    Returns:
        Boundary weight from 0.0 to 0.95; longer gaps give larger weights
    """
    # (exclusive upper bound in seconds, weight) in ascending order; the first
    # tier whose bound exceeds the gap wins.
    tiers = (
        (30, 0.0),
        (120, 0.1),
        (300, 0.3),
        (900, 0.5),
        (1800, 0.7),
        (7200, 0.85),
    )
    for upper_bound, weight in tiers:
        if gap_seconds < upper_bound:
            return weight
    # Two hours or more
    return 0.95
def entity_overlap(entities_a: set[str], entities_b: set[str]) -> float:
    """Return the Jaccard similarity of two entity sets.

    Args:
        entities_a: First set of entities
        entities_b: Second set of entities

    Returns:
        Size of the intersection divided by size of the union (0.0 to 1.0);
        0.0 when either set is empty
    """
    if not (entities_a and entities_b):
        return 0.0

    shared = entities_a & entities_b
    combined = entities_a | entities_b
    # Both inputs are non-empty here, so the union cannot be empty.
    return len(shared) / len(combined)
def get_verb_class(content: str) -> str | None:
    """Classify a message by the first verb class it mentions.

    Args:
        content: Message text content

    Returns:
        Name of the first entry in VERB_CLASSES (in mapping order) that shares
        at least one word with the message, or None if no class matches
    """
    tokens = set(re.findall(r"\b\w+\b", content.lower()))
    return next(
        (label for label, verbs in VERB_CLASSES.items() if not tokens.isdisjoint(verbs)),
        None,
    )
def semantic_similarity(
    content_a: str,
    content_b: str,
    entities_a: set[str],
    entities_b: set[str],
    category_a: str | None,
    category_b: str | None,
) -> float:
    """Score how similar two messages are.

    The score is a weighted sum of four signals:
    - Keyword overlap (0.3)
    - Verb class match (0.3)
    - Entity overlap (0.3)
    - Category match (0.1)

    Args:
        content_a: First message content
        content_b: Second message content
        entities_a: Entities from first message
        entities_b: Entities from second message
        category_a: Category of first message
        category_b: Category of second message

    Returns:
        Similarity score (0.0 to 1.0)
    """

    def _keywords(text: str) -> set[str]:
        # Lower-case alphabetic words of three or more letters
        return set(re.findall(r"\b[a-z]{3,}\b", text.lower()))

    # Keyword overlap: word-level Jaccard, 0.0 when neither message has keywords
    words_a = _keywords(content_a)
    words_b = _keywords(content_b)
    vocabulary = words_a | words_b
    if vocabulary:
        keyword_sim = len(words_a & words_b) / len(vocabulary)
    else:
        keyword_sim = 0.0

    # Verb class: all-or-nothing, and two unclassified messages do not match
    verb_a = get_verb_class(content_a)
    verb_b = get_verb_class(content_b)
    if verb_a and verb_a == verb_b:
        verb_sim = 1.0
    else:
        verb_sim = 0.0

    entity_sim = entity_overlap(entities_a, entities_b)

    # Category: all-or-nothing, a missing category never matches
    if category_a and category_a == category_b:
        category_sim = 1.0
    else:
        category_sim = 0.0

    return keyword_sim * 0.3 + verb_sim * 0.3 + entity_sim * 0.3 + category_sim * 0.1
201# -----------------------------------------------------------------------------
202# Internal Analysis Functions
203# -----------------------------------------------------------------------------
def _detect_handoff(content: str) -> bool:
    """Return True when the message text contains a session-handoff phrase.

    Matching is a case-insensitive substring search.
    """
    lowered = content.lower()
    for phrase in (
        "/ll:handoff",
        "continue in new session",
        "pick up in next session",
        "resuming from",
        "continuation of",
    ):
        if phrase in lowered:
            return True
    return False
def _parse_timestamps(messages: list[dict[str, Any]]) -> list[datetime]:
    """Parse the valid ISO timestamps found in a list of messages.

    Timestamps that carry a UTC offset are converted to UTC before the
    timezone info is dropped, so values recorded with different offsets stay
    comparable. Timestamps without an offset are returned unchanged.

    Args:
        messages: Message dicts; the "timestamp" key is read from each

    Returns:
        Naive datetimes in input order. Messages with a missing, empty or
        unparsable timestamp are skipped.
    """
    parsed: list[datetime] = []
    for msg in messages:
        raw = msg.get("timestamp", "")
        if not raw:
            continue
        try:
            # Rewrite a trailing "Z" as an explicit offset so older
            # fromisoformat() implementations accept it.
            ts = datetime.fromisoformat(raw.replace("Z", "+00:00"))
        except (ValueError, AttributeError, TypeError):
            # Deliberate best-effort: malformed or non-string values are ignored
            continue
        if ts.tzinfo is not None:
            offset = ts.utcoffset()
            if offset:
                # Shift to UTC wall-clock time; dropping tzinfo alone would
                # silently discard the offset.
                ts = ts - offset
            ts = ts.replace(tzinfo=None)
        parsed.append(ts)
    return parsed
def _group_by_session(messages: list[dict[str, Any]]) -> dict[str, list[dict[str, Any]]]:
    """Bucket messages by their session_id, keeping input order within each bucket.

    Messages without a session_id key are collected under "unknown".
    """
    grouped: dict[str, list[dict[str, Any]]] = {}
    for message in messages:
        grouped.setdefault(message.get("session_id", "unknown"), []).append(message)
    return grouped
def _link_sessions(sessions: dict[str, list[dict[str, Any]]]) -> list[SessionLink]:
    """Identify pairs of sessions that appear to belong to the same workflow.

    Every ordered pair (A, B) of non-empty sessions, with A earlier than B in
    the mapping's iteration order, is scored:

    - +0.4 when A's last message and B's first message share a git branch
    - +0.4 when any message in A contains a handoff marker
    - +0.2 when entity overlap exceeds 0.5, or +0.1 when it exceeds 0.3

    Pairs scoring above 0.3 are returned as links.

    Args:
        sessions: Mapping of session id to that session's messages

    Returns:
        One SessionLink per linked pair, numbered in discovery order
    """
    # Summarise each non-empty session once. Entities and the handoff flag
    # depend only on the session itself, so computing them per pair would
    # repeat the same regex scans for every other session.
    summaries: list[tuple[str, list[dict[str, Any]], set[str], bool]] = []
    for session_id, session_msgs in sessions.items():
        if not session_msgs:
            continue
        entities: set[str] = set()
        for msg in session_msgs:
            entities.update(extract_entities(msg.get("content", "")))
        has_handoff = any(_detect_handoff(msg.get("content", "")) for msg in session_msgs)
        summaries.append((session_id, session_msgs, entities, has_handoff))

    links: list[SessionLink] = []
    link_counter = 0

    for idx, (session_a_id, session_a, entities_a, handoff_a) in enumerate(summaries):
        branch_a = session_a[-1].get("git_branch")

        for session_b_id, session_b, entities_b, _ in summaries[idx + 1 :]:
            branch_b = session_b[0].get("git_branch")

            score = 0.0
            evidence: list[str] = []

            # Same git branch (HIGH weight)
            if branch_a and branch_a == branch_b:
                score += 0.4
                evidence.append("shared_branch")

            # Explicit handoff marker in the earlier session (HIGH weight)
            if handoff_a:
                score += 0.4
                evidence.append("handoff_detected")

            # Shared entities (MEDIUM weight)
            overlap = entity_overlap(entities_a, entities_b)
            if overlap > 0.5:
                score += 0.2
                evidence.append("entity_overlap")
            elif overlap > 0.3:
                score += 0.1
                evidence.append("partial_entity_overlap")

            if score <= 0.3:
                continue

            link_counter += 1

            # Span between the earliest and latest parsable timestamp of the pair
            timestamps = _parse_timestamps(session_a + session_b)
            span_hours = 0.0
            if len(timestamps) >= 2:
                try:
                    span_hours = (max(timestamps) - min(timestamps)).total_seconds() / 3600
                except TypeError:
                    # Defensive: fall back to zero if the values cannot be compared
                    span_hours = 0.0

            links.append(
                SessionLink(
                    link_id=f"link-{link_counter:03d}",
                    sessions=[
                        {
                            "session_id": session_a_id,
                            "position": 1,
                            "link_evidence": evidence[0] if evidence else "score",
                        },
                        {
                            "session_id": session_b_id,
                            "position": 2,
                            "link_evidence": evidence[-1] if evidence else "score",
                        },
                    ],
                    unified_workflow={
                        "name": f"Linked workflow {link_counter}",
                        "total_messages": len(session_a) + len(session_b),
                        "span_hours": round(span_hours, 1),
                        "evidence": evidence,
                    },
                    confidence=min(score, 1.0),
                )
            )

    return links
def _cluster_by_entities(
    messages: list[dict[str, Any]], overlap_threshold: float = 0.3
) -> list[EntityCluster]:
    """Group messages whose entities overlap significantly.

    Messages are processed in input order. Each message with at least one
    entity joins the existing cluster with the highest entity overlap strictly
    above ``overlap_threshold`` (the earliest cluster wins ties); otherwise it
    starts a new cluster. Afterwards every cluster gets a time span (when two
    or more timestamps parse) and an inferred workflow template.

    Args:
        messages: Message dicts; "content", "uuid" and "timestamp" are read
        overlap_threshold: Overlap a message must exceed to join a cluster

    Returns:
        Clusters containing at least two messages
    """

    def _preview(text: str) -> str:
        # Stored content is capped at 80 characters plus an ellipsis
        return text[:80] + "..." if len(text) > 80 else text

    found: list[EntityCluster] = []

    for msg in messages:
        content = msg.get("content", "")
        msg_entities = extract_entities(content)
        if not msg_entities:
            continue

        # Pick the best cluster above the threshold
        target = None
        top_overlap = overlap_threshold
        for candidate in found:
            candidate_overlap = entity_overlap(msg_entities, candidate.all_entities)
            if candidate_overlap > top_overlap:
                target, top_overlap = candidate, candidate_overlap

        if not target:
            # No cluster is close enough: start a new one
            found.append(
                EntityCluster(
                    cluster_id=f"cluster-{len(found) + 1:03d}",
                    primary_entities=sorted(msg_entities)[:3],  # first 3 in sort order
                    all_entities=msg_entities.copy(),
                    messages=[
                        {
                            "uuid": msg.get("uuid", ""),
                            "content": _preview(content),
                            "entities_matched": sorted(msg_entities),
                            "timestamp": msg.get("timestamp"),
                        }
                    ],
                    cohesion_score=1.0,
                )
            )
            continue

        # Record which entities matched before the cluster absorbs the new ones
        shared = sorted(msg_entities & target.all_entities)
        target.all_entities.update(msg_entities)
        target.messages.append(
            {
                "uuid": msg.get("uuid", ""),
                "content": _preview(content),
                "entities_matched": shared,
                "timestamp": msg.get("timestamp"),
            }
        )
        # Running average of the overlap each message had on joining
        member_count = len(target.messages)
        target.cohesion_score = (
            target.cohesion_score * (member_count - 1) + top_overlap
        ) / member_count

    for cluster in found:
        # Span from the earliest to the latest parsable timestamp
        stamps = _parse_timestamps(cluster.messages)
        if len(stamps) >= 2:
            cluster.span = {
                "start": min(stamps).isoformat(),
                "end": max(stamps).isoformat(),
            }

        # Categories whose keywords occur (as substrings) in any stored message text
        texts = [entry.get("content", "").lower() for entry in cluster.messages]
        seen_categories: set[str] = set()
        for text in texts:
            for category, keywords in _CONTENT_CATEGORY_MAP.items():
                if any(keyword in text for keyword in keywords):
                    seen_categories.add(category)

        # Template covering the largest share of its own categories; first wins ties
        winner: str | None = None
        winner_score = 0.0
        for template_name, template_cats in WORKFLOW_TEMPLATES.items():
            wanted = set(template_cats)
            if not wanted:
                continue
            coverage = len(seen_categories & wanted) / len(wanted)
            if coverage > winner_score:
                winner, winner_score = template_name, coverage
        if winner_score >= 0.3:
            cluster.inferred_workflow = winner

    # Single-message clusters are not reported
    return [cluster for cluster in found if len(cluster.messages) >= 2]
def _compute_boundaries(
    messages: list[dict[str, Any]], boundary_threshold: float = 0.6
) -> list[WorkflowBoundary]:
    """Score the transition between each pair of consecutive messages.

    Messages are ordered by their raw "timestamp" value. For each adjacent
    pair the time-gap weight is reduced by 0.3 when entity overlap exceeds
    0.5, or by 0.15 when it exceeds 0.3, never dropping below 0.0.

    Args:
        messages: Message dicts; "timestamp", "content" and "uuid" are read
        boundary_threshold: Score at or above which a pair is a boundary

    Returns:
        One WorkflowBoundary per adjacent pair, in sorted order
    """
    ordered = sorted(messages, key=lambda m: m.get("timestamp", ""))

    # Extract entities once per message rather than once per pair
    entity_sets = [extract_entities(m.get("content", "")) for m in ordered]

    results: list[WorkflowBoundary] = []
    for idx, (earlier, later) in enumerate(zip(ordered, ordered[1:])):
        # The gap is 0 unless both timestamps parse
        stamps = _parse_timestamps([earlier, later])
        if len(stamps) == 2:
            gap_seconds = int((stamps[1] - stamps[0]).total_seconds())
        else:
            gap_seconds = 0

        time_weight = calculate_boundary_weight(gap_seconds)
        overlap = entity_overlap(entity_sets[idx], entity_sets[idx + 1])

        # Shared entities suggest the same topic, so soften the boundary
        if overlap > 0.5:
            final_score = max(0.0, time_weight - 0.3)
        elif overlap > 0.3:
            final_score = max(0.0, time_weight - 0.15)
        else:
            final_score = time_weight

        results.append(
            WorkflowBoundary(
                msg_a=earlier.get("uuid", ""),
                msg_b=later.get("uuid", ""),
                time_gap_seconds=gap_seconds,
                time_gap_weight=time_weight,
                entity_overlap=overlap,
                final_boundary_score=final_score,
                is_boundary=final_score >= boundary_threshold,
            )
        )

    return results
487def _get_message_category(msg_uuid: str, patterns: dict[str, Any]) -> str | None:
488 """Look up message category from Step 1 patterns."""
489 for category_info in patterns.get("category_distribution", []):
490 for example in category_info.get("example_messages", []):
491 if example.get("uuid") == msg_uuid:
492 category = category_info.get("category")
493 return category if isinstance(category, str) else None
494 return None
def _build_category_index(patterns: dict[str, Any]) -> dict[str, str]:
    """Flatten the patterns' category_distribution into a UUID → category mapping.

    Entries whose category is not a string and examples without a uuid are
    skipped. When a uuid appears under several categories the last one wins.
    """
    mapping: dict[str, str] = {}
    for entry in patterns.get("category_distribution", []):
        label = entry.get("category")
        if isinstance(label, str):
            example_uuids = (
                example.get("uuid") for example in entry.get("example_messages", [])
            )
            mapping.update((found, label) for found in example_uuids if found)
    return mapping
def _detect_workflows(
    messages: list[dict[str, Any]],
    boundaries: list[WorkflowBoundary],
    patterns: dict[str, Any],
) -> list[Workflow]:
    """Detect multi-step workflows using template matching.

    Messages are ordered by their raw "timestamp" value and split into
    segments wherever a boundary precedes a message. A segment with at least
    two categorised messages is compared against every WORKFLOW_TEMPLATES
    entry; a template matches when two or more of its steps appear in order
    (gaps allowed). The template with the highest share of matched steps wins.

    Args:
        messages: Message dicts; "timestamp", "uuid" and "session_id" are read
        boundaries: Boundary records for adjacent message pairs
        patterns: Step 1 patterns providing the uuid → category lookup

    Returns:
        One Workflow per segment that matched a template, numbered in order
    """
    workflows: list[Workflow] = []
    workflow_counter = 0

    sorted_msgs = sorted(messages, key=lambda m: m.get("timestamp", ""))

    # uuid of the later message in a pair -> whether a boundary precedes it
    boundary_before: dict[str, bool] = {b.msg_b: b.is_boundary for b in boundaries}

    # uuid -> category, built once for O(1) lookups
    category_index = _build_category_index(patterns)

    # Segment messages by boundaries
    segments: list[list[dict[str, Any]]] = []
    current_segment: list[dict[str, Any]] = []
    for msg in sorted_msgs:
        if boundary_before.get(msg.get("uuid", ""), False) and current_segment:
            segments.append(current_segment)
            current_segment = []
        current_segment.append(msg)
    if current_segment:
        segments.append(current_segment)

    for segment in segments:
        if len(segment) < 2:
            continue

        # Categories of the segment's messages, skipping uncategorised ones
        segment_categories: list[str] = []
        for msg in segment:
            cat = category_index.get(msg.get("uuid", ""))
            if cat:
                segment_categories.append(cat)
        if len(segment_categories) < 2:
            continue

        # Find the best matching template
        best_match: tuple[str, float] | None = None
        for template_name, template_cats in WORKFLOW_TEMPLATES.items():
            # Count how many leading template steps occur in order within the
            # segment; unrelated categories in between are skipped.
            matches = 0
            for cat in segment_categories:
                if matches < len(template_cats) and cat == template_cats[matches]:
                    matches += 1

            if matches >= 2:  # At least 2 template steps matched
                confidence = matches / len(template_cats)
                if best_match is None or confidence > best_match[1]:
                    best_match = (template_name, confidence)

        if best_match is None:
            continue

        workflow_counter += 1

        # Duration between the earliest and latest parsable timestamp
        timestamps = _parse_timestamps(segment)
        duration_minutes = 0
        if len(timestamps) >= 2:
            try:
                duration_minutes = int((max(timestamps) - min(timestamps)).total_seconds() / 60)
            except TypeError:
                # Defensive: fall back to zero if the values cannot be compared
                duration_minutes = 0

        # De-duplicate session ids in first-seen order. Going through a set
        # would make the order depend on string hashing and so differ between
        # runs, producing unstable output files.
        session_ids = list(dict.fromkeys(msg.get("session_id", "") for msg in segment))

        workflows.append(
            Workflow(
                workflow_id=f"wf-{workflow_counter:03d}",
                name=f"Detected: {best_match[0]}",
                pattern=best_match[0],
                pattern_confidence=best_match[1],
                messages=[
                    {
                        "uuid": msg.get("uuid", ""),
                        "category": category_index.get(msg.get("uuid", "")),
                        "step": step,
                    }
                    for step, msg in enumerate(segment, start=1)
                ],
                session_span=session_ids,
                duration_minutes=duration_minutes,
            )
        )

    return workflows
616# -----------------------------------------------------------------------------
617# Main API
618# -----------------------------------------------------------------------------
def analyze_workflows(
    messages_file: Path,
    patterns_file: Path,
    output_file: Path | None = None,
    overlap_threshold: float = 0.3,
    boundary_threshold: float = 0.6,
    verbose: bool = False,
    output_format: str = "yaml",
) -> WorkflowAnalysis:
    """Run the full workflow analysis over a messages file and a patterns file.

    Args:
        messages_file: Path to JSONL file with extracted user messages
        patterns_file: Path to YAML file from Step 1 (workflow-pattern-analyzer)
        output_file: Output path for step2-workflows.yaml (optional)
        overlap_threshold: Minimum entity overlap to cluster messages together (default: 0.3)
        boundary_threshold: Minimum boundary score to split workflow segments (default: 0.6)
        verbose: Emit per-stage progress to stderr (default: False)
        output_format: "json" writes JSON; any other value writes YAML (default: "yaml")

    Returns:
        WorkflowAnalysis with all analysis results
    """

    def _progress(text: str) -> None:
        # Progress goes to stderr, and only in verbose mode
        if verbose:
            print(text, file=sys.stderr)

    # Load inputs
    messages = _load_messages(messages_file)
    patterns = _load_patterns(patterns_file)

    metadata = {
        "source_file": messages_file.name,
        "patterns_file": patterns_file.name,
        "message_count": len(messages),
        "analysis_timestamp": datetime.now().isoformat(),
        "module": "workflow-sequence-analyzer",
        "version": "1.0",
    }

    # Stage 1: session linking
    sessions = _group_by_session(messages)
    _progress(f"[1/4] Linking sessions across {len(sessions)} session(s)...")
    session_links = _link_sessions(sessions)
    _progress(f" → {len(session_links)} link(s) found")

    # Stage 2: entity clustering
    _progress("[2/4] Clustering by entities...")
    entity_clusters = _cluster_by_entities(messages, overlap_threshold=overlap_threshold)
    _progress(f" → {len(entity_clusters)} cluster(s) found")

    # Stage 3: boundary scoring
    _progress("[3/4] Computing workflow boundaries...")
    boundaries = _compute_boundaries(messages, boundary_threshold=boundary_threshold)
    _progress(f" → {len(boundaries)} boundary/boundaries found")

    # Stage 4: template matching
    _progress("[4/4] Detecting workflows...")
    workflows = _detect_workflows(messages, boundaries, patterns)
    _progress(f" → {len(workflows)} workflow(s) detected")

    # Lookup tables for cross-referencing workflows with clusters and content
    uuid_to_cluster: dict[str, str] = {}
    for cluster in entity_clusters:
        for entry in cluster.messages:
            entry_uuid = entry.get("uuid", "")
            if entry_uuid:
                uuid_to_cluster[entry_uuid] = cluster.cluster_id

    uuid_to_content: dict[str, str] = {}
    for message in messages:
        message_uuid = message.get("uuid", "")
        if message_uuid:
            uuid_to_content[message_uuid] = message.get("content", "")

    for workflow in workflows:
        # The cluster holding most of the workflow's messages wins; the
        # first-counted cluster wins ties.
        votes: dict[str, int] = {}
        for step in workflow.messages:
            voted_cluster = uuid_to_cluster.get(step.get("uuid", ""))
            if voted_cluster:
                votes[voted_cluster] = votes.get(voted_cluster, 0) + 1
        if votes:
            workflow.entity_cluster = max(votes, key=votes.__getitem__)

        # Flag every step whose original content carries a handoff marker
        for step in workflow.messages:
            step_uuid = step.get("uuid", "")
            if step_uuid and _detect_handoff(uuid_to_content.get(step_uuid, "")):
                workflow.handoff_points.append({"uuid": step_uuid, "type": "explicit_handoff"})

    # Links without handoff evidence are reported as session timeouts
    explicit_handoffs = 0
    for link in session_links:
        if "handoff_detected" in link.unified_workflow.get("evidence", []):
            explicit_handoffs += 1
    timeout_links = len(session_links) - explicit_handoffs

    handoff_analysis: dict[str, Any] = {
        "total_handoffs": explicit_handoffs,
        "handoff_patterns": [
            {"pattern": "explicit_handoff", "count": explicit_handoffs},
            {"pattern": "session_timeout", "count": timeout_links},
        ],
        "recommendations": [],
    }
    if len(session_links) > explicit_handoffs:
        handoff_analysis["recommendations"].append(
            "Consider using /ll:handoff for cleaner session transitions"
        )

    analysis = WorkflowAnalysis(
        metadata=metadata,
        session_links=session_links,
        entity_clusters=entity_clusters,
        workflow_boundaries=boundaries,
        workflows=workflows,
        handoff_analysis=handoff_analysis,
    )

    # Write output if path provided
    if output_file:
        output_file = Path(output_file)
        output_file.parent.mkdir(parents=True, exist_ok=True)
        payload = analysis.to_dict()
        with open(output_file, "w", encoding="utf-8") as f:
            if output_format == "json":
                json.dump(payload, f, indent=2, default=str)
            else:
                yaml.dump(payload, f, default_flow_style=False, sort_keys=False)

    return analysis