Coverage for src/dataknobs_fsm/api/advanced.py: 22%
544 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 14:11 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 14:11 -0700
1r"""Advanced API for debugging and profiling FSM workflows.
3This module provides advanced interfaces for users who need fine-grained
4control over FSM execution, debugging capabilities, performance profiling,
5and detailed execution monitoring. Use this API when building complex
6workflows or troubleshooting execution issues.
8Architecture:
9 The AdvancedFSM API provides three levels of control beyond SimpleFSM/AsyncSimpleFSM:
11 **1. Step-by-Step Execution:**
12 Execute FSM transitions one at a time with full inspection of state
13 between each step. Essential for debugging complex state machines.
15 **2. Breakpoint Debugging:**
16 Set breakpoints at specific states and inspect execution context when
17 reached. Similar to debugger breakpoints in code.
19 **3. Performance Profiling:**
20 Measure execution time for each state and transition, identify bottlenecks,
21 and optimize workflow performance.
23 **When to Use AdvancedFSM:**
24 - Debugging complex workflows with unexpected behavior
25 - Profiling performance to identify slow states
26 - Building custom execution strategies
27 - Implementing sophisticated error recovery
28 - Monitoring production workflows in detail
29 - Testing and validating FSM configurations
31 **When NOT to Use AdvancedFSM:**
32 - Simple production workflows (use AsyncSimpleFSM)
33 - High-throughput batch processing (use SimpleFSM.process_batch)
34 - Prototyping and scripts (use SimpleFSM)
36Execution Modes:
37 AdvancedFSM supports five execution modes:
39 **STEP_BY_STEP:**
40 Execute one transition at a time. After each step, execution pauses and
41 returns control to caller with current state and data. Call step() to
42 continue to next transition.
44 **BREAKPOINT:**
45 Set breakpoints at specific states. Execution runs normally until reaching
46 a breakpoint state, then pauses. Inspect state and data, then continue.
48 **TRACE:**
49 Record detailed execution trace including all state transitions, data
50 transformations, and timing information. Useful for auditing and debugging.
52 **PROFILE:**
53 Measure and record performance metrics for each state and transition.
54 Identify bottlenecks and optimize slow operations.
56 **DEBUG:**
57 Enable verbose logging and detailed error messages. All execution events
58 are logged with full context for debugging.
60Common Debugging Patterns:
    **Step-by-Step Debugging:**
    ```python
    from dataknobs_fsm.api.advanced import AdvancedFSM, ExecutionMode
    from dataknobs_fsm.config.loader import ConfigLoader
    from dataknobs_fsm.config.builder import FSMBuilder

    # Load and build FSM
    config = ConfigLoader().load_from_file('pipeline.yaml')
    fsm = FSMBuilder().build(config)

    # Create advanced FSM in step mode
    advanced = AdvancedFSM(fsm, execution_mode=ExecutionMode.STEP_BY_STEP)

    # Create an execution context, then execute one step at a time.
    # execute_step_sync() takes the context and returns a StepResult.
    context = advanced.create_context({'input': 'test data'})
    while True:
        result = advanced.execute_step_sync(context)

        print(f"Step: {result.from_state} -> {result.to_state}")
        print(f"Data before: {result.data_before}")
        print(f"Data after: {result.data_after}")
        print(f"Duration: {result.duration:.3f}s")

        if result.is_complete:
            break
        if not result.success:
            print(f"Error: {result.error}")
            break
    ```
    **Breakpoint Debugging:**
    ```python
    # Set breakpoints at specific states
    advanced = AdvancedFSM(fsm, execution_mode=ExecutionMode.BREAKPOINT)
    advanced.add_breakpoint('transform_state')
    advanced.add_breakpoint('validate_state')

    # run_until_breakpoint() is async and operates on an execution context
    async with advanced.execution_context({'input': 'data'}) as context:
        await advanced.run_until_breakpoint(context)

        if context.current_state in advanced.breakpoints:
            print(f"Stopped at: {context.current_state}")
            print(f"Current data: {context.data}")

            # Inspect/modify data, step past the breakpoint, then continue.
            # (run_until_breakpoint returns immediately while the context is
            # still sitting on a breakpoint state.)
            context.data['debug_flag'] = True
            await advanced.step(context)
            final = await advanced.run_until_breakpoint(context)
    ```
    **Performance Profiling:**
    ```python
    # Profile execution to find bottlenecks
    advanced = AdvancedFSM(fsm, execution_mode=ExecutionMode.PROFILE)

    # Execute with profiling (profile_execution is async)
    profile_data = await advanced.profile_execution({'input': 'data'})

    # Analyze results: per-state timings live under 'state_times'
    print("Performance Profile:")
    print(f"Total: {profile_data['total_time']:.3f}s, "
          f"transitions: {profile_data['transitions']}")
    for state, metrics in profile_data['state_times'].items():
        print(f"{state}:")
        print(f"  Time:  {metrics['total']:.3f}s")
        print(f"  Calls: {metrics['count']}")
        print(f"  Avg:   {metrics['avg']:.3f}s")

    # Find slowest state
    slowest = max(profile_data['state_times'].items(),
                  key=lambda x: x[1]['total'])
    print(f"\nBottleneck: {slowest[0]} ({slowest[1]['total']:.2f}s)")
    ```
    **Execution Tracing:**
    ```python
    # Record full execution trace (trace_execution is async)
    advanced = AdvancedFSM(fsm, execution_mode=ExecutionMode.TRACE)
    trace = await advanced.trace_execution({'input': 'data'})

    # The trace is a list of entries with 'from', 'to', 'arc' and 'data' keys
    print(f"Total steps: {len(trace)}")
    print(f"\nExecution path:")
    for entry in trace:
        print(f"  {entry['from']} -> {entry['to']} (via {entry['arc']})")
    ```
150Execution Hooks:
151 Monitor execution events in real-time with hooks:
153 ```python
154 from dataknobs_fsm.api.advanced import ExecutionHook
156 # Define hook functions
157 def on_state_enter(state_name, data):
158 print(f"Entering: {state_name}")
160 def on_state_exit(state_name, data, duration):
161 print(f"Exiting: {state_name} ({duration:.3f}s)")
163 def on_error(state_name, error):
164 print(f"Error in {state_name}: {error}")
166 # Create hooks
167 hooks = ExecutionHook(
168 on_state_enter=on_state_enter,
169 on_state_exit=on_state_exit,
170 on_error=on_error
171 )
173 # Set hooks
174 advanced = AdvancedFSM(fsm)
175 advanced.set_hooks(hooks)
177 # Hooks will be called during execution
178 result = advanced.execute({'input': 'data'})
179 ```
181Advanced Use Cases:
182 **Custom Execution Strategies:**
183 Implement custom traversal strategies for special workflow patterns:
185 ```python
186 from dataknobs_fsm.execution.engine import TraversalStrategy
188 class PriorityTraversal(TraversalStrategy):
189 \"\"\"Execute high-priority states first.\"\"\"
191 def select_next_arc(self, state, arcs):
192 # Custom logic to select next transition
193 return max(arcs, key=lambda a: a.priority)
195 advanced = AdvancedFSM(fsm)
196 advanced.set_execution_strategy(PriorityTraversal())
197 ```
199 **Transaction Management:**
200 Configure transactional execution with rollback:
202 ```python
203 from dataknobs_fsm.core.transactions import TransactionStrategy
205 # Configure transactions
206 advanced.configure_transactions(
207 strategy=TransactionStrategy.TWO_PHASE_COMMIT,
208 isolation_level='READ_COMMITTED'
209 )
211 # Execution will use transactions
212 try:
213 result = advanced.execute(data)
214 except Exception:
215 # Automatic rollback on error
216 pass
217 ```
219 **Resource Monitoring:**
220 Monitor resource acquisition and release:
222 ```python
223 def on_resource_acquire(name, resource):
224 print(f"Acquired: {name}")
226 def on_resource_release(name, resource):
227 print(f"Released: {name}")
229 hooks = ExecutionHook(
230 on_resource_acquire=on_resource_acquire,
231 on_resource_release=on_resource_release
232 )
233 advanced.set_hooks(hooks)
234 ```
236Example:
237 Complete debugging workflow:
239 ```python
240 from dataknobs_fsm.api.advanced import (
241 AdvancedFSM, ExecutionMode, ExecutionHook
242 )
243 from dataknobs_fsm.config.loader import ConfigLoader
244 from dataknobs_fsm.config.builder import FSMBuilder
246 # Load FSM
247 config = ConfigLoader().load_from_file('complex_pipeline.yaml')
248 fsm = FSMBuilder().build(config)
250 # Create advanced FSM with profiling
251 advanced = AdvancedFSM(fsm, execution_mode=ExecutionMode.PROFILE)
253 # Set up monitoring hooks
254 errors = []
256 def on_error(state, error):
257 errors.append({'state': state, 'error': str(error)})
259 hooks = ExecutionHook(on_error=on_error)
260 advanced.set_hooks(hooks)
262 # Add breakpoints at critical states
263 advanced.add_breakpoint('data_validation')
264 advanced.add_breakpoint('external_api_call')
    # Execute with monitoring (inside an async function; profile_execution is async)
    try:
        profile_data = await advanced.profile_execution({
            'input_file': 'data.json',
            'config': {'mode': 'strict'}
        })

        # Analyze performance (per-state timings live under 'state_times')
        print("Performance Analysis:")
        for state, metrics in profile_data['state_times'].items():
            if metrics['total'] > 1.0:  # Slow states
                print(f"⚠️ {state}: {metrics['total']:.2f}s")
279 # Check for errors
280 if errors:
281 print("\nErrors encountered:")
282 for err in errors:
283 print(f" {err['state']}: {err['error']}")
285 except Exception as e:
286 print(f"Fatal error: {e}")
287 # Get trace for debugging
288 trace = advanced.get_trace()
289 print(f"Failed at: {trace['steps'][-1]}")
290 ```
292See Also:
293 - :class:`SimpleFSM`: Simple synchronous API
294 - :class:`AsyncSimpleFSM`: Async production API
295 - :class:`ExecutionMode`: Available execution modes
296 - :class:`ExecutionHook`: Hook system for monitoring
297 - :mod:`dataknobs_fsm.execution.engine`: Execution engine
298 - :mod:`dataknobs_fsm.execution.history`: Execution history tracking
299"""
301import time
302from collections.abc import AsyncGenerator, Callable
303from contextlib import asynccontextmanager
304from dataclasses import dataclass, field
305from enum import Enum
306from pathlib import Path
307from typing import Any
309from dataknobs_data import Record
311from ..core.context_factory import ContextFactory
312from ..core.data_modes import DataHandler, DataHandlingMode
313from ..core.fsm import FSM
314from ..core.modes import ProcessingMode
315from ..core.state import StateInstance
316from ..core.transactions import TransactionManager, TransactionStrategy
317from ..execution.async_engine import AsyncExecutionEngine
318from ..execution.context import ExecutionContext
319from ..execution.engine import ExecutionEngine, TraversalStrategy
320from ..execution.history import ExecutionHistory
321from ..resources.base import IResourceProvider
322from ..resources.manager import ResourceManager
323from ..storage.base import IHistoryStorage
class ExecutionMode(Enum):
    """Advanced execution modes.

    Members:
        STEP_BY_STEP: execute one transition at a time.
        BREAKPOINT: stop when a breakpoint state is reached.
        TRACE: record a full execution trace.
        PROFILE: collect performance profiling data.
        DEBUG: debug mode with detailed logging.
    """

    STEP_BY_STEP = "step"
    BREAKPOINT = "breakpoint"
    TRACE = "trace"
    PROFILE = "profile"
    DEBUG = "debug"
335@dataclass
336class ExecutionHook:
337 """Hook for monitoring execution events."""
338 on_state_enter: Callable | None = None
339 on_state_exit: Callable | None = None
340 on_arc_execute: Callable | None = None
341 on_error: Callable | None = None
342 on_resource_acquire: Callable | None = None
343 on_resource_release: Callable | None = None
344 on_transaction_begin: Callable | None = None
345 on_transaction_commit: Callable | None = None
346 on_transaction_rollback: Callable | None = None
349@dataclass
350class StepResult:
351 """Result from a single step execution."""
352 from_state: str
353 to_state: str
354 transition: str
355 data_before: dict[str, Any] = field(default_factory=dict)
356 data_after: dict[str, Any] = field(default_factory=dict)
357 duration: float = 0.0
358 success: bool = True
359 error: str | None = None
360 at_breakpoint: bool = False
361 is_complete: bool = False
364class AdvancedFSM:
365 """Advanced FSM interface with full control capabilities."""
def __init__(
    self,
    fsm: FSM,
    execution_mode: ExecutionMode = ExecutionMode.STEP_BY_STEP,
    custom_functions: dict[str, Callable] | None = None
):
    """Wrap a core FSM with advanced execution controls.

    Args:
        fsm: Core FSM instance to control.
        execution_mode: Initial advanced execution mode.
        custom_functions: Optional mapping of function name to callable,
            consulted in addition to the FSM's own function registry.
    """
    self.fsm = fsm
    self.execution_mode = execution_mode

    # Sync and async engines built over the same FSM definition.
    self._engine = ExecutionEngine(fsm)
    self._async_engine = AsyncExecutionEngine(fsm)
    self._resource_manager = ResourceManager()

    # Optional subsystems, populated by configure_transactions() and
    # enable_history() respectively.
    self._transaction_manager = None
    self._history = None
    self._storage = None

    # Debugging / monitoring state.
    self._hooks = ExecutionHook()
    self._breakpoints = set()
    self._trace_buffer = []
    self._profile_data = {}

    # A falsy argument (None or empty) yields a fresh private dict.
    self._custom_functions = custom_functions or {}
def set_execution_strategy(self, strategy: TraversalStrategy) -> None:
    """Install a custom traversal strategy on the synchronous engine.

    NOTE(review): only ``self._engine`` is updated; ``step()`` runs on the
    async engine, which does not receive this strategy — confirm intent.

    Args:
        strategy: Traversal strategy for the sync execution engine.
    """
    engine = self._engine
    engine.strategy = strategy
def set_data_handler(self, handler: DataHandler) -> None:
    """Install a custom data handler on the synchronous engine.

    NOTE(review): like set_execution_strategy(), this does not touch the
    async engine used by ``step()`` — confirm intent.

    Args:
        handler: Data handler implementation.
    """
    engine = self._engine
    engine.data_handler = handler
def configure_transactions(
    self,
    strategy: TransactionStrategy,
    **config: Any
) -> None:
    """Build and install a transaction manager.

    The manager is attached to contexts created by ``execution_context()``.

    Args:
        strategy: Transaction strategy to use.
        **config: Strategy-specific options, forwarded unchanged to
            ``TransactionManager.create``.
    """
    manager = TransactionManager.create(strategy, **config)
    self._transaction_manager = manager
def register_resource(
    self,
    name: str,
    resource: IResourceProvider | dict[str, Any]
) -> None:
    """Register a resource with this FSM's resource manager.

    Args:
        name: Name under which the resource is registered.
        resource: A configuration dict (built via
            ``ResourceManager.register_from_dict``) or, for anything that is
            not a dict, an existing provider registered as-is.
    """
    manager = self._resource_manager
    if not isinstance(resource, dict):
        manager.register_provider(name, resource)
        return
    manager.register_from_dict(name, resource)
def set_hooks(self, hooks: ExecutionHook) -> None:
    """Replace the active execution hooks.

    Args:
        hooks: Hook set to use from now on; it replaces, rather than merges
            with, any previously configured hooks.
    """
    self._hooks = hooks
def add_breakpoint(self, state_name: str) -> None:
    """Register a state as a breakpoint (adding it twice is harmless).

    Args:
        state_name: State at which breakpoint execution should pause.
    """
    registered = self._breakpoints
    registered.add(state_name)
def remove_breakpoint(self, state_name: str) -> None:
    """Unregister a breakpoint; names that are not registered are ignored.

    Args:
        state_name: State whose breakpoint should be removed.
    """
    if state_name in self._breakpoints:
        self._breakpoints.remove(state_name)
def clear_breakpoints(self) -> None:
    """Drop every registered breakpoint (the underlying set is emptied in place)."""
    registered = self._breakpoints
    registered.clear()
@property
def breakpoints(self) -> set:
    """Snapshot of the registered breakpoint state names.

    A copy is returned, so mutating it does not change the registered
    breakpoints; use add_breakpoint()/remove_breakpoint() for that.
    """
    return set(self._breakpoints)
@property
def hooks(self) -> ExecutionHook:
    """The execution hooks currently in effect (the live object, not a copy)."""
    current = self._hooks
    return current
@property
def history_enabled(self) -> bool:
    """True while a history object is attached (see enable_history())."""
    return not (self._history is None)
@property
def max_history_depth(self) -> int:
    """Maximum history depth, or 0 when history tracking is disabled.

    Uses an explicit ``is not None`` check, matching ``history_enabled``:
    a truthiness test would misreport an enabled history as disabled if the
    history object evaluates falsy (e.g. defines ``__len__`` and is empty).
    """
    history = self._history
    return history.max_depth if history is not None else 0
@property
def execution_history(self) -> list:
    """Recorded history steps, or an empty list when history is disabled.

    Uses ``is not None`` (as ``history_enabled`` does) so a history object
    that happens to evaluate falsy is still treated as enabled.
    """
    history = self._history
    return history.steps if history is not None else []
def enable_history(
    self,
    storage: IHistoryStorage | None = None,
    max_depth: int = 100
) -> None:
    """Start tracking execution history in a fresh ExecutionHistory.

    Args:
        storage: Optional backend later used by save_history()/load_history().
        max_depth: Maximum history depth to track.
    """
    import uuid

    history = ExecutionHistory(
        fsm_name=getattr(self.fsm, 'name', 'unnamed_fsm'),
        # Every call starts a new, uniquely identified execution record.
        execution_id=str(uuid.uuid4()),
        data_mode=DataHandlingMode.COPY,  # default data mode
        max_depth=max_depth,
    )
    self._history = history
    self._storage = storage
def disable_history(self) -> None:
    """Stop tracking history and detach any history storage backend."""
    self._history, self._storage = None, None
def create_context(
    self,
    data: dict[str, Any] | Record,
    data_mode: DataHandlingMode = DataHandlingMode.COPY,
    initial_state: str | None = None
) -> ExecutionContext:
    """Create an execution context for manual (synchronous) control.

    The context shares this FSM's resource manager and (when configured)
    transaction manager, exactly like contexts from ``execution_context()``.
    It is positioned at ``initial_state``, or at the FSM's first start
    state when none is given. Custom functions are merged into the FSM's
    function registry.

    Args:
        data: Initial data.
        data_mode: Data handling mode. NOTE(review): not forwarded to the
            context factory, whose ``data_mode`` keyword receives a
            ProcessingMode; kept for API compatibility — confirm intent.
        initial_state: Starting state name.

    Returns:
        ExecutionContext for manual execution.
    """
    context = ContextFactory.create_context(
        self.fsm,
        data,
        # The factory's ``data_mode`` keyword takes a ProcessingMode.
        data_mode=ProcessingMode.SINGLE,
        # Same manager as execution_context(), so resources registered via
        # register_resource() are also available in sync execution.
        resource_manager=self._resource_manager
    )

    if self._transaction_manager:
        context.transaction_manager = self._transaction_manager  # type: ignore[unreachable]

    start_state = initial_state or self._find_initial_state()
    if start_state:
        context.set_state(start_state)

    if context.current_state:
        self._update_state_instance(context, context.current_state)

    if self._custom_functions:
        registry = getattr(self.fsm, 'function_registry', None)
        if registry is None:
            self.fsm.function_registry = dict(self._custom_functions)
        elif hasattr(registry, 'update'):
            registry.update(self._custom_functions)
        else:
            # Registry objects expose their mapping as ``.functions`` (the
            # shape _get_available_transitions also handles); calling
            # ``.update`` on them unconditionally raised AttributeError.
            registry.functions.update(self._custom_functions)

    return context
@asynccontextmanager
async def execution_context(
    self,
    data: dict[str, Any] | Record,
    data_mode: DataHandlingMode = DataHandlingMode.COPY,
    initial_state: str | None = None
) -> AsyncGenerator[ExecutionContext, None]:
    """Create an execution context for manual (async) control.

    On entry, ``on_state_enter`` is called with the starting StateInstance.
    On exit, ``on_state_exit`` is called with that same instance, and the
    resource manager is always cleaned up, even if the exit hook raises.
    Hooks may be plain functions or coroutine functions.

    Args:
        data: Initial data.
        data_mode: Data handling mode. NOTE(review): not forwarded to the
            context factory; kept for API compatibility.
        initial_state: Starting state name, passed to the context factory.

    Yields:
        ExecutionContext for manual execution.
    """
    import inspect

    context = ContextFactory.create_context(
        fsm=self.fsm,
        data=data,
        initial_state=initial_state,
        data_mode=ProcessingMode.SINGLE,
        resource_manager=self._resource_manager
    )

    if self._transaction_manager:
        context.transaction_manager = self._transaction_manager  # type: ignore[unreachable]

    # Ensure a StateInstance exists for the hooks.
    state_instance = context.current_state_instance
    if not state_instance:
        state_instance = self.fsm.create_state_instance(
            context.current_state,  # type: ignore
            context.data.copy() if isinstance(context.data, dict) else {}
        )
        context.current_state_instance = state_instance

    async def _fire(hook) -> None:
        # Sync callbacks (as shown in the module docs) return a plain value;
        # awaiting that raised TypeError. Only await real awaitables.
        if hook:
            outcome = hook(state_instance)
            if inspect.isawaitable(outcome):
                await outcome

    await _fire(self._hooks.on_state_enter)
    try:
        yield context
    finally:
        try:
            await _fire(self._hooks.on_state_exit)
        finally:
            # Release resources even when the exit hook fails.
            await self._resource_manager.cleanup()
async def step(
    self,
    context: ExecutionContext,
    arc_name: str | None = None
) -> StateInstance | None:
    """Execute a single transition step.

    Delegates to the async engine with ``max_transitions=1``. When the
    state changes, this method does four things: it refreshes the state
    instance, records history, appends a trace entry (TRACE mode only) and
    fires ``on_state_enter``, which may be sync or async.

    Args:
        context: Execution context to advance.
        arc_name: Optional specific arc to follow (forwarded to the engine).

    Returns:
        The new StateInstance, or None if no transition occurred.
    """
    import inspect

    state_before = context.current_state

    # Reuse the engine so step-wise runs follow the same logic as full runs.
    await self._async_engine.execute(
        context=context,
        data=None,  # don't override the context's data
        max_transitions=1,
        arc_name=arc_name
    )

    state_after = context.current_state
    if state_after is None or state_after == state_before:
        return None  # no transition occurred

    self._update_state_instance(context, state_after)
    new_state = context.current_state_instance

    self._record_history_step(state_after, arc_name, context)

    if self.execution_mode == ExecutionMode.TRACE:
        self._trace_buffer.append({
            'from': state_before,
            'to': state_after,
            'arc': arc_name or 'transition',
            # Snapshot (as _record_trace_entry does) so later in-place edits
            # of context.data don't rewrite earlier trace entries.
            'data': context.get_data_snapshot()
        })

    hook = self._hooks.on_state_enter
    if hook:
        # Support sync and async hooks alike.
        outcome = hook(new_state)
        if inspect.isawaitable(outcome):
            await outcome

    return new_state
async def run_until_breakpoint(
    self,
    context: ExecutionContext,
    max_steps: int = 1000
) -> StateInstance | None:
    """Advance until a breakpoint, an end state, a stall, or the step cap.

    The breakpoint test runs before each step, so a context that already
    sits on a breakpoint state is returned without advancing.

    Args:
        context: Execution context to advance.
        max_steps: Safety cap on the number of iterations.

    Returns:
        The context's current StateInstance at the point execution stopped.
    """
    steps_taken = 0
    while steps_taken < max_steps:
        if context.current_state in self._breakpoints:
            break
        moved_to = await self.step(context)
        if not moved_to or self._is_at_end_state(context):
            break
        steps_taken += 1
    return context.current_state_instance
async def trace_execution(
    self,
    data: dict[str, Any] | Record,
    initial_state: str | None = None
) -> list[dict[str, Any]]:
    """Run to completion with tracing enabled and return the trace.

    Side effect: switches ``execution_mode`` to TRACE.

    Args:
        data: Input data.
        initial_state: Optional starting state.

    Returns:
        A new list of trace entries (keys ``from``, ``to``, ``arc``,
        ``data``) owned by the caller; later traces or steps don't alter it.
    """
    self.execution_mode = ExecutionMode.TRACE
    # Rebind instead of clear(): the list returned by a previous call must
    # not be emptied and refilled behind its owner's back.
    self._trace_buffer = []

    async with self.execution_context(data, initial_state=initial_state) as context:
        while True:
            new_state = await self.step(context)
            if not new_state or new_state.definition.is_end:
                break

    # Copy, since step() keeps appending to the buffer while in TRACE mode.
    return list(self._trace_buffer)
async def profile_execution(
    self,
    data: dict[str, Any] | Record,
    initial_state: str | None = None
) -> dict[str, Any]:
    """Run to completion while timing every visited state.

    Side effect: switches ``execution_mode`` to PROFILE.

    Args:
        data: Input data.
        initial_state: Optional starting state.

    Returns:
        A new dict (not shared with earlier calls) with ``total_time``,
        ``transitions``, ``avg_transition_time`` and ``state_times``. The
        ``state_times`` value maps each state name to its ``count``,
        ``total``, ``avg``, ``min`` and ``max`` durations in seconds.
    """
    self.execution_mode = ExecutionMode.PROFILE
    # Rebind rather than clear(): the previous result dict was handed to a
    # caller and must not be emptied behind their back.
    self._profile_data = {}

    async with self.execution_context(data, initial_state=initial_state) as context:
        # perf_counter() is monotonic and high-resolution; time.time() can
        # jump with wall-clock adjustments and yield bogus durations.
        start_time = time.perf_counter()
        state_start = start_time
        transitions = 0
        state_times: dict[str, list[float]] = {}

        while True:
            current = context.current_state
            state_name = current if current is not None else "unknown"

            new_state = await self.step(context)

            state_times.setdefault(state_name, []).append(
                time.perf_counter() - state_start
            )

            if not new_state or (
                hasattr(new_state, 'definition') and new_state.definition.is_end
            ):
                break

            transitions += 1
            state_start = time.perf_counter()

        total_time = time.perf_counter() - start_time

    self._profile_data = {
        'total_time': total_time,
        'transitions': transitions,
        'avg_transition_time': total_time / transitions if transitions > 0 else 0,
        'state_times': {
            name: {
                'count': len(times),
                'total': sum(times),
                'avg': sum(times) / len(times),
                'min': min(times),
                'max': max(times),
            }
            for name, times in state_times.items()
        },
    }
    return self._profile_data
def get_available_transitions(
    self,
    state_name: str
) -> list[dict[str, Any]]:
    """Describe every outgoing arc of a state (conditions are not evaluated).

    Args:
        state_name: Name of the state to inspect.

    Returns:
        One dict per outgoing arc with its ``name``, ``target`` state, and
        whether it defines a ``pre_test`` / ``transform``.
    """
    described = []
    for arc in self.fsm.get_outgoing_arcs(state_name):
        described.append({
            'name': arc.name,
            'target': arc.target_state,
            'has_pre_test': arc.pre_test is not None,
            'has_transform': arc.transform is not None,
        })
    return described
def inspect_state(self, state_name: str) -> dict[str, Any]:
    """Summarize a state's configuration.

    Args:
        state_name: Name of the state to inspect.

    Returns:
        A dict of state details, or ``{'error': ...}`` if the FSM has no
        such state.
    """
    state = self.fsm.get_state(state_name)
    if not state:
        return {'error': f'State {state_name} not found'}

    fsm = self.fsm
    requirements = state.resource_requirements
    return {
        'name': state.name,
        'is_start': fsm.is_start_state(state_name),
        'is_end': fsm.is_end_state(state_name),
        'has_transform': len(state.transform_functions) > 0,
        'has_validator': len(state.validation_functions) > 0,
        'resources': [req.name for req in requirements] if requirements else [],
        'metadata': state.metadata,
        'arcs': state.arcs,
    }
def visualize_fsm(self) -> str:
    """Render the FSM as a GraphViz DOT digraph.

    Start states are filled green. End states that are not also start
    states are drawn as red double circles. A state name is emitted
    unquoted only when it is a plain DOT identifier or numeral and not a
    DOT keyword; otherwise it is double-quoted. Arc labels are escaped, so
    names with dashes, spaces or quotes no longer break the output.

    Returns:
        GraphViz DOT format string.
    """
    import re

    # DOT IDs that may appear unquoted: identifiers and numerals.
    plain_id = re.compile(r'[A-Za-z_][A-Za-z_0-9]*|-?(\.[0-9]+|[0-9]+(\.[0-9]*)?)')
    dot_keywords = {'node', 'edge', 'graph', 'digraph', 'subgraph', 'strict'}

    def node_id(name) -> str:
        text = str(name)
        if plain_id.fullmatch(text) and text.lower() not in dot_keywords:
            return text
        # Inside a quoted DOT ID only the double quote needs escaping.
        return '"' + text.replace('"', '\\"') + '"'

    def label_text(text: str) -> str:
        # Labels are escStrings: escape backslashes too so they render literally.
        return text.replace('\\', '\\\\').replace('"', '\\"')

    lines = ['digraph FSM {', ' rankdir=LR;', ' node [shape=circle];']

    # States
    for state in self.fsm.states.values():
        if state.is_start:
            attrs = ['style=filled', 'fillcolor=green']
        elif state.is_end:
            attrs = ['shape=doublecircle', 'style=filled', 'fillcolor=red']
        else:
            attrs = []
        node = node_id(state.name)
        if attrs:
            lines.append(f' {node} [{",".join(attrs)}];')
        else:
            lines.append(f' {node};')

    # Arcs
    for state_name in self.fsm.states:
        for arc in self.fsm.get_outgoing_arcs(state_name):
            label = label_text(str(arc.name)) if arc.name else ""
            lines.append(
                f' {node_id(state_name)} -> {node_id(arc.target_state)} [label="{label}"];'
            )

    lines.append('}')
    return '\n'.join(lines)
async def validate_network(self) -> dict[str, Any]:
    """Validate the FSM network for structural consistency.

    Reported issue types:

    * ``unreachable_states``: states not reachable from any start state.
    * ``dead_end``: non-end states with no outgoing arcs.

    Returns:
        Dict with three keys:

        * ``valid``: True when there are no issues.
        * ``issues``: the list of issues.
        * ``stats``: counts of total, reachable, unreachable, start and
          end states.
    """
    from collections import deque

    issues = []

    # Breadth-first search from every start state; deque.popleft() keeps each
    # dequeue O(1), whereas list.pop(0) is O(n) and made the walk quadratic.
    reachable = set()
    to_visit = deque(s.name for s in self.fsm.states.values() if s.is_start)
    while to_visit:
        state = to_visit.popleft()
        if state in reachable:
            continue
        reachable.add(state)
        for arc in self.fsm.get_outgoing_arcs(state):
            if arc.target_state not in reachable:
                to_visit.append(arc.target_state)

    unreachable = set(self.fsm.states.keys()) - reachable
    if unreachable:
        issues.append({
            'type': 'unreachable_states',
            'states': list(unreachable)
        })

    for state_name, state in self.fsm.states.items():
        if not state.is_end and not self.fsm.get_outgoing_arcs(state_name):
            issues.append({
                'type': 'dead_end',
                'state': state_name
            })

    return {
        'valid': len(issues) == 0,
        'issues': issues,
        'stats': {
            'total_states': len(self.fsm.states),
            'reachable_states': len(reachable),
            'unreachable_states': len(unreachable),
            'start_states': sum(1 for s in self.fsm.states.values() if s.is_start),  # type: ignore
            'end_states': sum(1 for s in self.fsm.states.values() if s.is_end)  # type: ignore
        }
    }
def get_history(self) -> ExecutionHistory | None:
    """Return the attached execution history.

    Returns:
        The live ExecutionHistory, or None when history is not enabled.
    """
    current = self._history
    return current
async def save_history(self) -> bool:
    """Persist the current execution history through the storage backend.

    Explicit ``is None`` checks are used (as in ``history_enabled``). A
    truthiness test would skip saving a just-enabled history whenever the
    history object evaluates falsy.

    Returns:
        The storage backend's ``save`` result when both a history and a
        storage backend are configured; otherwise False.
    """
    if self._history is None or self._storage is None:
        return False
    return await self._storage.save(self._history)  # type: ignore[unreachable]
async def load_history(self, history_id: str) -> bool:
    """Replace the current history with one loaded from storage.

    NOTE(review): both the storage and the loaded value are tested by
    truthiness; a backend returning an empty-but-valid history object
    would be treated as "not found" — confirm against IHistoryStorage.

    Args:
        history_id: Identifier passed to the storage backend's ``load``.

    Returns:
        True if a history was loaded and attached, False otherwise.
    """
    storage = self._storage
    if not storage:
        return False
    loaded = await storage.load(history_id)  # type: ignore[unreachable]
    if not loaded:
        return False
    self._history = loaded
    return True
974 # ========== Shared Helper Methods ==========
975 # These methods contain logic shared between sync and async implementations
def _get_available_transitions(
    self,
    context: ExecutionContext,
    arc_name: str | None = None
) -> list:
    """Return the arcs out of the current state that may be taken now.

    Arcs without a ``pre_test`` are always available. A pre-test is looked
    up by name, first in the FSM's function registry (a dict, or an object
    exposing ``.functions``) and then in the custom functions. It is called
    as ``test(context.data, context)``. The arc is excluded if the pre-test
    cannot be found, returns a falsy value, or raises. Such exceptions are
    logged at DEBUG level instead of being silently discarded.

    Args:
        context: Execution context (current state and data are used).
        arc_name: If given, only arcs with this name are considered.

    Returns:
        List of available arcs (empty when there is no current state).
    """
    import logging

    if not context.current_state:
        return []

    # Resolve the registry mapping once instead of once per arc.
    registry = getattr(self.fsm, 'function_registry', {})
    functions = registry.functions if hasattr(registry, 'functions') else registry
    logger = logging.getLogger(__name__)

    available = []
    for arc in self.fsm.get_outgoing_arcs(context.current_state):
        if arc_name and arc.name != arc_name:
            continue

        if not arc.pre_test:
            available.append(arc)  # unconditional arc
            continue

        test_func = functions.get(arc.pre_test) or self._custom_functions.get(arc.pre_test)
        if not test_func:
            continue

        try:
            passed = bool(test_func(context.data, context))
        except Exception:
            # Best effort: a failing guard disables the arc, but leave a
            # trace for debugging rather than hiding the error entirely.
            logger.debug(
                "Pre-test %r for arc %r raised; arc skipped",
                arc.pre_test, arc.name, exc_info=True
            )
            continue

        if passed:
            available.append(arc)

    return available
def _execute_arc_transform(
    self,
    arc,
    context: ExecutionContext
) -> tuple[bool, Any]:
    """Run an arc's transform function, if one is configured and resolvable.

    Args:
        arc: Arc whose ``transform`` key is looked up in the FSM's function
            registry and then in the custom functions.
        context: Execution context; the transform is called as
            ``transform(context.data, context)``.

    Returns:
        ``(True, result)`` when the transform runs. ``(True, context.data)``
        when the arc has no transform or it cannot be resolved.
        ``(False, str(exception))`` when the transform raises.
    """
    key = arc.transform
    if not key:
        return True, context.data

    registry = getattr(self.fsm, 'function_registry', {})
    lookup = registry.functions if hasattr(registry, 'functions') else registry
    func = lookup.get(key) or self._custom_functions.get(key)
    if not func:
        # Unresolvable transform: pass the data through untouched.
        return True, context.data

    try:
        return True, func(context.data, context)
    except Exception as exc:
        return False, str(exc)
def _update_state_instance(
    self,
    context: ExecutionContext,
    state_name: str
) -> None:
    """Point ``context.current_state_instance`` at a fresh StateInstance.

    Does nothing when ``state_name`` is not a defined state. Otherwise it
    also stores in ``context.metadata['is_end_state']`` whether that state
    is an end state.

    Args:
        context: Execution context to update.
        state_name: Name of the new state.
    """
    definition = self.fsm.states.get(state_name)
    if not definition:
        return
    context.current_state_instance = StateInstance(
        definition=definition,
        data=context.data
    )
    context.metadata['is_end_state'] = definition.is_end
def _is_at_end_state(self, context: ExecutionContext) -> bool:
    """Tell whether the context currently sits in an end state.

    The FSM's state definition is authoritative. The ``is_end_state``
    metadata flag (written by _update_state_instance) is only consulted for
    state names the FSM does not define.

    Args:
        context: Execution context.

    Returns:
        True if at an end state.
    """
    current = context.current_state
    if not current:
        return False
    definition = self.fsm.states.get(current)
    return definition.is_end if definition else context.metadata.get('is_end_state', False)
def _record_trace_entry(
    self,
    from_state: str,
    to_state: str,
    arc_name: str | None,
    context: ExecutionContext
) -> None:
    """Append a timestamped trace entry; a no-op outside TRACE mode.

    Args:
        from_state: State transitioned from.
        to_state: State transitioned to.
        arc_name: Arc taken; when empty, ``"<from>-><to>"`` is recorded.
        context: Execution context whose data snapshot is stored.
    """
    if self.execution_mode != ExecutionMode.TRACE:
        return
    entry = {
        'from_state': from_state,
        'to_state': to_state,
        'transition': arc_name or f"{from_state}->{to_state}",
        'data': context.get_data_snapshot(),
        'timestamp': time.time(),
    }
    self._trace_buffer.append(entry)
1124 def _record_history_step(
1125 self,
1126 state_name: str,
1127 arc_name: str | None,
1128 context: ExecutionContext
1129 ) -> None:
1130 """Record a history step if history is enabled (shared logic).
1132 Args:
1133 state_name: Current state name
1134 arc_name: Arc taken
1135 context: Execution context
1136 """
1137 if self._history:
1138 step = self._history.add_step( # type: ignore[unreachable]
1139 state_name=state_name,
1140 network_name=getattr(context, 'network_name', 'main'),
1141 data=context.data
1142 )
1143 step.complete(arc_taken=arc_name or 'transition')
1145 def _call_hook_sync(
1146 self,
1147 hook_name: str,
1148 *args: Any
1149 ) -> None:
1150 """Call a hook synchronously if it exists (shared logic).
1152 Args:
1153 hook_name: Name of hook attribute
1154 args: Arguments to pass to hook
1155 """
1156 hook = getattr(self._hooks, hook_name, None)
1157 if hook:
1158 try:
1159 hook(*args)
1160 except Exception:
1161 pass # Silently ignore hook errors
1163 def _find_initial_state(self) -> str | None:
1164 """Find the initial state in the FSM (shared logic).
1166 Returns:
1167 Name of initial state or None
1168 """
1169 for state_name, state in self.fsm.states.items():
1170 if state.is_start:
1171 return state_name
1172 return None
1174 # ========== Synchronous Execution Methods ==========
1176 def execute_step_sync(
1177 self,
1178 context: ExecutionContext,
1179 arc_name: str | None = None
1180 ) -> StepResult:
1181 """Execute a single transition step synchronously.
1183 Args:
1184 context: Execution context
1185 arc_name: Optional specific arc to follow
1187 Returns:
1188 StepResult with transition details
1189 """
1190 start_time = time.time()
1191 from_state = context.current_state or "initial"
1192 data_before = context.get_data_snapshot()
1194 try:
1195 # Initialize state if needed
1196 if not context.current_state:
1197 initial_state = self._find_initial_state()
1198 if initial_state:
1199 context.set_state(initial_state)
1200 self._update_state_instance(context, initial_state)
1201 # Execute transforms for initial state
1202 if hasattr(self._engine, '_execute_state_transforms'):
1203 self._engine._execute_state_transforms(context, initial_state)
1204 else:
1205 return StepResult(
1206 from_state=from_state,
1207 to_state=from_state,
1208 transition="error",
1209 data_before=data_before,
1210 data_after=context.get_data_snapshot(),
1211 duration=time.time() - start_time,
1212 success=False,
1213 error="No initial state found"
1214 )
1216 # Use shared logic to get transitions
1217 transitions = self._get_available_transitions(context, arc_name)
1219 if not transitions:
1220 # No transitions available
1221 return StepResult(
1222 from_state=from_state,
1223 to_state=from_state,
1224 transition="none",
1225 data_before=data_before,
1226 data_after=context.get_data_snapshot(),
1227 duration=time.time() - start_time,
1228 success=True,
1229 is_complete=self._is_at_end_state(context)
1230 )
1232 # Take first valid transition (could be enhanced with strategy selection)
1233 arc = transitions[0]
1235 # Execute arc transform using shared logic
1236 success, result = self._execute_arc_transform(arc, context)
1237 if success:
1238 context.data = result
1239 else:
1240 return StepResult(
1241 from_state=from_state,
1242 to_state=from_state,
1243 transition=arc.name or "error",
1244 data_before=data_before,
1245 data_after=context.get_data_snapshot(),
1246 duration=time.time() - start_time,
1247 success=False,
1248 error=result
1249 )
1251 # Update state
1252 context.set_state(arc.target_state)
1253 self._update_state_instance(context, arc.target_state)
1255 # Execute state transforms when entering the new state
1256 # This is critical for sync execution to match async behavior
1257 if hasattr(self._engine, '_execute_state_transforms'):
1258 self._engine._execute_state_transforms(context, arc.target_state)
1260 # Check if we hit a breakpoint
1261 at_breakpoint = arc.target_state in self._breakpoints
1263 # Record in trace buffer if in trace mode
1264 self._record_trace_entry(from_state, arc.target_state, arc.name, context)
1266 # Record in history if enabled
1267 self._record_history_step(arc.target_state, arc.name, context)
1269 # Call hooks if configured
1270 self._call_hook_sync('on_state_exit', from_state)
1271 self._call_hook_sync('on_state_enter', arc.target_state)
1273 return StepResult(
1274 from_state=from_state,
1275 to_state=arc.target_state,
1276 transition=arc.name or f"{from_state}->{arc.target_state}",
1277 data_before=data_before,
1278 data_after=context.get_data_snapshot(),
1279 duration=time.time() - start_time,
1280 success=True,
1281 at_breakpoint=at_breakpoint,
1282 is_complete=self._is_at_end_state(context)
1283 )
1285 except Exception as e:
1286 self._call_hook_sync('on_error', e)
1288 return StepResult(
1289 from_state=from_state,
1290 to_state=from_state,
1291 transition="error",
1292 data_before=data_before,
1293 data_after=context.get_data_snapshot(),
1294 duration=time.time() - start_time,
1295 success=False,
1296 error=str(e)
1297 )
1299 def run_until_breakpoint_sync(
1300 self,
1301 context: ExecutionContext,
1302 max_steps: int = 1000
1303 ) -> StateInstance | None:
1304 """Run execution until a breakpoint is hit (synchronous).
1306 Args:
1307 context: Execution context
1308 max_steps: Maximum steps to execute
1310 Returns:
1311 State instance where execution stopped
1312 """
1313 for _ in range(max_steps):
1314 # Check if at breakpoint
1315 if context.current_state in self._breakpoints:
1316 return context.current_state_instance
1318 # Execute step
1319 result = self.execute_step_sync(context)
1321 # Check for completion or error
1322 if not result.success or result.is_complete:
1323 return context.current_state_instance
1325 # Check if stuck
1326 if result.from_state == result.to_state and result.transition == "none":
1327 return context.current_state_instance
1329 return context.current_state_instance
1331 def trace_execution_sync(
1332 self,
1333 data: dict[str, Any] | Record,
1334 initial_state: str | None = None,
1335 max_steps: int = 1000
1336 ) -> list[dict[str, Any]]:
1337 """Execute with full tracing enabled (synchronous).
1339 Args:
1340 data: Input data
1341 initial_state: Optional starting state
1342 max_steps: Maximum steps to execute
1344 Returns:
1345 List of trace entries
1346 """
1347 self.execution_mode = ExecutionMode.TRACE
1348 self._trace_buffer.clear()
1350 context = self.create_context(data, initial_state=initial_state)
1352 for _ in range(max_steps):
1353 # Execute step (trace recording happens inside execute_step_sync)
1354 result = self.execute_step_sync(context)
1356 # Check termination conditions
1357 if not result.success or result.is_complete:
1358 break
1360 if result.from_state == result.to_state and result.transition == "none":
1361 break
1363 return self._trace_buffer
1365 def profile_execution_sync(
1366 self,
1367 data: dict[str, Any] | Record,
1368 initial_state: str | None = None,
1369 max_steps: int = 1000
1370 ) -> dict[str, Any]:
1371 """Execute with performance profiling (synchronous).
1373 Args:
1374 data: Input data
1375 initial_state: Optional starting state
1376 max_steps: Maximum steps to execute
1378 Returns:
1379 Profiling data
1380 """
1381 self.execution_mode = ExecutionMode.PROFILE
1382 self._profile_data.clear()
1384 context = self.create_context(data, initial_state=initial_state)
1386 start_time = time.time()
1387 transitions = 0
1388 state_times = {}
1389 transition_times = []
1391 for _ in range(max_steps):
1392 state_start = time.time()
1393 current_state = context.current_state
1395 # Execute step
1396 result = self.execute_step_sync(context)
1398 # Record timings
1399 if current_state:
1400 if current_state not in state_times:
1401 state_times[current_state] = []
1402 state_times[current_state].append(time.time() - state_start)
1404 if result.success and result.from_state != result.to_state:
1405 transition_times.append(result.duration)
1406 transitions += 1
1408 # Check termination
1409 if not result.success or result.is_complete:
1410 break
1412 if result.from_state == result.to_state and result.transition == "none":
1413 break
1415 # Calculate statistics
1416 self._profile_data = {
1417 'total_time': time.time() - start_time,
1418 'transitions': transitions,
1419 'states_visited': len(state_times),
1420 'avg_transition_time': sum(transition_times) / len(transition_times) if transition_times else 0,
1421 'state_times': {
1422 state: {
1423 'count': len(times),
1424 'total': sum(times),
1425 'avg': sum(times) / len(times),
1426 'min': min(times),
1427 'max': max(times)
1428 }
1429 for state, times in state_times.items()
1430 },
1431 'final_state': context.current_state,
1432 'final_data': context.get_data_snapshot()
1433 }
1435 return self._profile_data
class FSMDebugger:
    """Interactive debugger for FSM execution (fully synchronous).

    Drives an AdvancedFSM through its synchronous API (``create_context``,
    ``execute_step_sync``, ``run_until_breakpoint_sync``), prints progress to
    stdout, and keeps every StepResult produced by :meth:`step` in
    ``execution_history``.
    """

    def __init__(self, fsm: AdvancedFSM):
        """Initialize debugger.

        Args:
            fsm: Advanced FSM instance to debug
        """
        self.fsm = fsm
        self.context: ExecutionContext | None = None
        # Watch name -> dot-separated data path; values are resolved on demand.
        self.watch_vars: dict[str, str] = {}
        self.command_history: list[str] = []
        self.step_count: int = 0
        self.execution_history: list[StepResult] = []

    @property
    def current_state(self) -> str | None:
        """Get the current state name (None without an active session)."""
        if not self.context:
            return None
        return self.context.get_current_state()

    @property
    def watches(self) -> dict[str, str]:
        """Get a copy of the watch mapping (watch name -> data path).

        Values are not resolved here; use print_watches() or inspect() to see
        the current values.
        """
        return self.watch_vars.copy()

    def start(
        self,
        data: dict[str, Any] | Record,
        initial_state: str | None = None
    ) -> None:
        """Start debugging session (synchronous).

        Creates a fresh context and resets the step counter and history.

        Args:
            data: Initial data
            initial_state: Optional starting state
        """
        self.context = self.fsm.create_context(data, initial_state=initial_state)
        self.step_count = 0
        self.execution_history.clear()

        print(f"Debugger started at state: {self.context.current_state or 'initial'}")
        print(f"Data: {self.context.get_data_snapshot()}")

    def step(self) -> StepResult:
        """Execute single step and return detailed result.

        The result is appended to ``execution_history``, ``step_count`` is
        incremented, a summary is printed and all watches are re-printed.

        Returns:
            StepResult with transition details (an error result when no
            session is active)
        """
        if not self.context:
            print("No active debugging session. Call start() first.")
            return StepResult(
                from_state="none",
                to_state="none",
                transition="error",
                success=False,
                error="No active debugging session"
            )

        result = self.fsm.execute_step_sync(self.context)
        self.step_count += 1
        self.execution_history.append(result)

        # Print step information
        if result.success:
            if result.from_state == result.to_state and result.transition == "none":
                print(f"Step {self.step_count}: No transition available from '{result.from_state}'")
            else:
                print(f"Step {self.step_count}: {result.from_state} -> {result.to_state} via '{result.transition}'")

            if result.at_breakpoint:
                print("*** Hit breakpoint ***")

            if result.is_complete:
                print("*** Reached end state ***")
        else:
            print(f"Step {self.step_count}: Error - {result.error}")

        # Check watches
        self._check_watches()

        return result

    def continue_to_breakpoint(self) -> StateInstance | None:
        """Continue execution until a breakpoint is hit.

        When the session is currently stopped on a breakpoint state, one step
        is executed first to move off it. Otherwise run_until_breakpoint_sync
        would stop immediately on the same breakpoint, and "continue" could
        never make progress. Steps taken here are not added to
        ``execution_history`` and do not change ``step_count``.

        Returns:
            State instance where execution stopped (None without a session)
        """
        if not self.context:
            print("No active debugging session")
            return None

        print(f"Continuing from state: {self.context.current_state}")

        if self.context.current_state in self.fsm._breakpoints:
            first = self.fsm.execute_step_sync(self.context)
            stalled = first.from_state == first.to_state and first.transition == "none"
            if not first.success or first.is_complete or stalled:
                # Nothing more to run: stay where that single step left us.
                final_state = self.context.current_state_instance
            else:
                final_state = self.fsm.run_until_breakpoint_sync(self.context)
        else:
            final_state = self.fsm.run_until_breakpoint_sync(self.context)

        if final_state:
            print(f"Stopped at: {self.context.current_state}")
            if self.context.current_state in self.fsm._breakpoints:
                print("*** At breakpoint ***")
            if self.context.is_complete():
                print("*** Execution complete ***")

        return final_state

    def inspect(self, path: str = "") -> Any:
        """Inspect data at path.

        Each path segment is resolved as a dict key, as a list/tuple index
        when it is a non-negative decimal integer, or as an attribute.

        Args:
            path: Dot-separated path to data field (empty for all data)

        Returns:
            Value at path, or None when any segment cannot be resolved
        """
        if not self.context:
            print("No active debugging session")
            return None

        data = self.context.data

        if not path:
            return data

        # Navigate path
        for key in path.split('.'):
            if isinstance(data, dict):
                data = data.get(key)
            elif isinstance(data, (list, tuple)) and key.isdecimal():
                index = int(key)
                if index >= len(data):
                    return None
                data = data[index]
            elif hasattr(data, key):
                data = getattr(data, key)
            else:
                return None
        return data

    def watch(self, name: str, path: str) -> None:
        """Add a watch expression and print its current value.

        Args:
            name: Watch name
            path: Data path to watch
        """
        self.watch_vars[name] = path
        value = self.inspect(path)
        print(f"Watch '{name}' added: {path} = {value}")

    def unwatch(self, name: str) -> None:
        """Remove a watch expression (no-op for unknown names).

        Args:
            name: Watch name to remove
        """
        if name in self.watch_vars:
            del self.watch_vars[name]
            print(f"Watch '{name}' removed")

    def _check_watches(self) -> None:
        """Print the current value of every watch (called after each step)."""
        if not self.watch_vars:
            return

        for name, path in self.watch_vars.items():
            value = self.inspect(path)
            print(f"  Watch '{name}': {path} = {value}")

    def print_watches(self) -> None:
        """Print all watch values."""
        if not self.watch_vars:
            print("No watches set")
            return

        for name, path in self.watch_vars.items():
            value = self.inspect(path)
            print(f"{name}: {path} = {value}")

    def print_state(self) -> None:
        """Print current state information."""
        if not self.context:
            print("No active debugging session")
            return

        print("\n=== State Information ===")
        print(f"Current State: {self.context.current_state}")
        print(f"Previous State: {self.context.previous_state}")
        print(f"Is Complete: {self.context.is_complete()}")
        print("\nData:")
        data = self.context.get_data_snapshot()
        for key, value in data.items():
            print(f"  {key}: {value}")

        # Print available transitions
        transitions = self.fsm._get_available_transitions(self.context)
        if transitions:
            print("\nAvailable Transitions:")
            for arc in transitions:
                print(f"  - {arc.name or 'unnamed'} -> {arc.target_state}")
        elif self.context.is_complete():
            print("\nNo transitions (end state)")
        else:
            print("\nNo available transitions")

    def inspect_current_state(self) -> dict[str, Any]:
        """Get detailed information about current state.

        Returns:
            Dictionary with state details, or ``{"error": ...}`` without a
            session
        """
        if not self.context:
            return {"error": "No active debugging session"}

        return {
            'state': self.context.current_state,
            'previous_state': self.context.previous_state,
            'data': self.context.get_data_snapshot(),
            'is_complete': self.context.is_complete(),
            'step_count': self.step_count,
            'at_breakpoint': self.context.current_state in self.fsm._breakpoints,
            'available_transitions': [
                {'name': arc.name, 'target': arc.target_state}
                for arc in self.fsm._get_available_transitions(self.context)
            ]
        }

    def get_history(self, limit: int = 10) -> list[StepResult]:
        """Get recent execution history.

        Args:
            limit: Maximum number of steps to return; zero or negative values
                yield an empty list

        Returns:
            List of recent step results, oldest first
        """
        # Guard needed: history[-0:] is the whole list, not an empty one.
        if limit <= 0:
            return []
        return self.execution_history[-limit:]

    def reset(self, data: dict[str, Any] | Record | None = None) -> None:
        """Reset debugger with new data.

        Restarts the session via start() (without an explicit initial state).

        Args:
            data: New data (uses the current context data if None)
        """
        if data is None and self.context:
            data = self.context.data

        if data is None:
            print("No data available for reset")
            return

        self.start(data)
def create_advanced_fsm(
    config: str | Path | dict[str, Any] | FSM,
    custom_functions: dict[str, Callable] | None = None,
    **kwargs: Any
) -> AdvancedFSM:
    """Build an AdvancedFSM from a configuration, or wrap an existing FSM.

    A str/Path is loaded as a configuration file and a dict as an in-memory
    configuration; the result is built with FSMBuilder. An FSM instance is
    wrapped as-is.

    Args:
        config: Configuration file path, configuration dict, or FSM instance
        custom_functions: Optional name -> callable mapping registered on the
            builder before building. Only used when *config* is not already
            an FSM instance; it is ignored for FSM instances.
        **kwargs: Forwarded unchanged to the AdvancedFSM constructor

    Returns:
        Configured AdvancedFSM instance
    """
    if isinstance(config, FSM):
        return AdvancedFSM(config, **kwargs)

    from ..config.builder import FSMBuilder
    from ..config.loader import ConfigLoader

    loader = ConfigLoader()
    config_obj = (
        loader.load_from_file(str(config))
        if isinstance(config, (str, Path))
        else loader.load_from_dict(config)
    )

    builder = FSMBuilder()
    for name, func in (custom_functions or {}).items():
        builder.register_function(name, func)

    return AdvancedFSM(builder.build(config_obj), **kwargs)