Coverage for src/dataknobs_fsm/api/advanced.py: 22%

544 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 14:11 -0700

1r"""Advanced API for debugging and profiling FSM workflows. 

2 

3This module provides advanced interfaces for users who need fine-grained 

4control over FSM execution, debugging capabilities, performance profiling, 

5and detailed execution monitoring. Use this API when building complex 

6workflows or troubleshooting execution issues. 

7 

8Architecture: 

9 The AdvancedFSM API provides three levels of control beyond SimpleFSM/AsyncSimpleFSM: 

10 

11 **1. Step-by-Step Execution:** 

12 Execute FSM transitions one at a time with full inspection of state 

13 between each step. Essential for debugging complex state machines. 

14 

15 **2. Breakpoint Debugging:** 

16 Set breakpoints at specific states and inspect execution context when 

17 reached. Similar to debugger breakpoints in code. 

18 

19 **3. Performance Profiling:** 

20 Measure execution time for each state and transition, identify bottlenecks, 

21 and optimize workflow performance. 

22 

23 **When to Use AdvancedFSM:** 

24 - Debugging complex workflows with unexpected behavior 

25 - Profiling performance to identify slow states 

26 - Building custom execution strategies 

27 - Implementing sophisticated error recovery 

28 - Monitoring production workflows in detail 

29 - Testing and validating FSM configurations 

30 

31 **When NOT to Use AdvancedFSM:** 

32 - Simple production workflows (use AsyncSimpleFSM) 

33 - High-throughput batch processing (use SimpleFSM.process_batch) 

34 - Prototyping and scripts (use SimpleFSM) 

35 

36Execution Modes: 

37 AdvancedFSM supports five execution modes: 

38 

39 **STEP_BY_STEP:** 

40 Execute one transition at a time. After each step, execution pauses and 

41 returns control to caller with current state and data. Call step() to 

42 continue to next transition. 

43 

44 **BREAKPOINT:** 

45 Set breakpoints at specific states. Execution runs normally until reaching 

46 a breakpoint state, then pauses. Inspect state and data, then continue. 

47 

48 **TRACE:** 

49 Record detailed execution trace including all state transitions, data 

50 transformations, and timing information. Useful for auditing and debugging. 

51 

52 **PROFILE:** 

53 Measure and record performance metrics for each state and transition. 

54 Identify bottlenecks and optimize slow operations. 

55 

56 **DEBUG:** 

57 Enable verbose logging and detailed error messages. All execution events 

58 are logged with full context for debugging. 

59 

60Common Debugging Patterns: 

61 **Step-by-Step Debugging:** 

62 ```python 

63 from dataknobs_fsm.api.advanced import AdvancedFSM, ExecutionMode 

64 from dataknobs_fsm.config.loader import ConfigLoader 

65 from dataknobs_fsm.config.builder import FSMBuilder 

66 

67 # Load and build FSM 

68 config = ConfigLoader().load_from_file('pipeline.yaml') 

69 fsm = FSMBuilder().build(config) 

70 

71 # Create advanced FSM in step mode 

72 advanced = AdvancedFSM(fsm, execution_mode=ExecutionMode.STEP_BY_STEP) 

73 

74 # Execute one step at a time 

75 data = {'input': 'test data'} 

76 while True: 

77 result = advanced.step(data) 

78 

79 print(f"Step: {result.from_state} -> {result.to_state}") 

80 print(f"Data before: {result.data_before}") 

81 print(f"Data after: {result.data_after}") 

82 print(f"Duration: {result.duration:.3f}s") 

83 

84 if result.is_complete: 

85 break 

86 if not result.success: 

87 print(f"Error: {result.error}") 

88 break 

89 

90 data = result.data_after 

91 ``` 

92 

93 **Breakpoint Debugging:** 

94 ```python 

95 # Set breakpoints at specific states 

96 advanced = AdvancedFSM(fsm, execution_mode=ExecutionMode.BREAKPOINT) 

97 advanced.add_breakpoint('transform_state') 

98 advanced.add_breakpoint('validate_state') 

99 

100 # Run until breakpoint 

101 result = advanced.run_until_breakpoint({'input': 'data'}) 

102 

103 if result.at_breakpoint: 

104 print(f"Stopped at: {result.to_state}") 

105 print(f"Current data: {result.data_after}") 

106 

107 # Inspect, modify data, then continue 

108 result.data_after['debug_flag'] = True 

109 final = advanced.run_until_breakpoint(result.data_after) 

110 ``` 

111 

112 **Performance Profiling:** 

113 ```python 

114 # Profile execution to find bottlenecks 

115 advanced = AdvancedFSM(fsm, execution_mode=ExecutionMode.PROFILE) 

116 

117 # Execute with profiling 

118 profile_data = advanced.profile_execution({'input': 'data'}) 

119 

120 # Analyze results 

121 print("Performance Profile:") 

122 for state, metrics in profile_data['states'].items(): 

123 print(f"{state}:") 

124 print(f" Time: {metrics['duration']:.3f}s") 

125 print(f" Calls: {metrics['call_count']}") 

126 print(f" Avg: {metrics['avg_duration']:.3f}s") 

127 

128 # Find slowest state 

129 slowest = max(profile_data['states'].items(), 

130 key=lambda x: x[1]['duration']) 

131 print(f"\nBottleneck: {slowest[0]} ({slowest[1]['duration']:.2f}s)") 

132 ``` 

133 

134 **Execution Tracing:** 

135 ```python 

136 # Record full execution trace 

137 advanced = AdvancedFSM(fsm, execution_mode=ExecutionMode.TRACE) 

138 trace = advanced.trace_execution({'input': 'data'}) 

139 

140 # Analyze trace 

141 print(f"Total steps: {len(trace['steps'])}") 

142 print(f"Total time: {trace['total_duration']:.2f}s") 

143 print(f"\nExecution path:") 

144 for step in trace['steps']: 

145 print(f" {step['timestamp']}: {step['from']} -> {step['to']}") 

146 if step.get('error'): 

147 print(f" ERROR: {step['error']}") 

148 ``` 

149 

150Execution Hooks: 

151 Monitor execution events in real-time with hooks: 

152 

153 ```python 

154 from dataknobs_fsm.api.advanced import ExecutionHook 

155 

156 # Define hook functions 

157 def on_state_enter(state_name, data): 

158 print(f"Entering: {state_name}") 

159 

160 def on_state_exit(state_name, data, duration): 

161 print(f"Exiting: {state_name} ({duration:.3f}s)") 

162 

163 def on_error(state_name, error): 

164 print(f"Error in {state_name}: {error}") 

165 

166 # Create hooks 

167 hooks = ExecutionHook( 

168 on_state_enter=on_state_enter, 

169 on_state_exit=on_state_exit, 

170 on_error=on_error 

171 ) 

172 

173 # Set hooks 

174 advanced = AdvancedFSM(fsm) 

175 advanced.set_hooks(hooks) 

176 

177 # Hooks will be called during execution 

178 result = advanced.execute({'input': 'data'}) 

179 ``` 

180 

181Advanced Use Cases: 

182 **Custom Execution Strategies:** 

183 Implement custom traversal strategies for special workflow patterns: 

184 

185 ```python 

186 from dataknobs_fsm.execution.engine import TraversalStrategy 

187 

188 class PriorityTraversal(TraversalStrategy): 

189 \"\"\"Execute high-priority states first.\"\"\" 

190 

191 def select_next_arc(self, state, arcs): 

192 # Custom logic to select next transition 

193 return max(arcs, key=lambda a: a.priority) 

194 

195 advanced = AdvancedFSM(fsm) 

196 advanced.set_execution_strategy(PriorityTraversal()) 

197 ``` 

198 

199 **Transaction Management:** 

200 Configure transactional execution with rollback: 

201 

202 ```python 

203 from dataknobs_fsm.core.transactions import TransactionStrategy 

204 

205 # Configure transactions 

206 advanced.configure_transactions( 

207 strategy=TransactionStrategy.TWO_PHASE_COMMIT, 

208 isolation_level='READ_COMMITTED' 

209 ) 

210 

211 # Execution will use transactions 

212 try: 

213 result = advanced.execute(data) 

214 except Exception: 

215 # Automatic rollback on error 

216 pass 

217 ``` 

218 

219 **Resource Monitoring:** 

220 Monitor resource acquisition and release: 

221 

222 ```python 

223 def on_resource_acquire(name, resource): 

224 print(f"Acquired: {name}") 

225 

226 def on_resource_release(name, resource): 

227 print(f"Released: {name}") 

228 

229 hooks = ExecutionHook( 

230 on_resource_acquire=on_resource_acquire, 

231 on_resource_release=on_resource_release 

232 ) 

233 advanced.set_hooks(hooks) 

234 ``` 

235 

236Example: 

237 Complete debugging workflow: 

238 

239 ```python 

240 from dataknobs_fsm.api.advanced import ( 

241 AdvancedFSM, ExecutionMode, ExecutionHook 

242 ) 

243 from dataknobs_fsm.config.loader import ConfigLoader 

244 from dataknobs_fsm.config.builder import FSMBuilder 

245 

246 # Load FSM 

247 config = ConfigLoader().load_from_file('complex_pipeline.yaml') 

248 fsm = FSMBuilder().build(config) 

249 

250 # Create advanced FSM with profiling 

251 advanced = AdvancedFSM(fsm, execution_mode=ExecutionMode.PROFILE) 

252 

253 # Set up monitoring hooks 

254 errors = [] 

255 

256 def on_error(state, error): 

257 errors.append({'state': state, 'error': str(error)}) 

258 

259 hooks = ExecutionHook(on_error=on_error) 

260 advanced.set_hooks(hooks) 

261 

262 # Add breakpoints at critical states 

263 advanced.add_breakpoint('data_validation') 

264 advanced.add_breakpoint('external_api_call') 

265 

266 # Execute with monitoring 

267 try: 

268 profile_data = advanced.profile_execution({ 

269 'input_file': 'data.json', 

270 'config': {'mode': 'strict'} 

271 }) 

272 

273 # Analyze performance 

274 print("Performance Analysis:") 

275 for state, metrics in profile_data['states'].items(): 

276 if metrics['duration'] > 1.0: # Slow states 

277 print(f"⚠️ {state}: {metrics['duration']:.2f}s") 

278 

279 # Check for errors 

280 if errors: 

281 print("\nErrors encountered:") 

282 for err in errors: 

283 print(f" {err['state']}: {err['error']}") 

284 

285 except Exception as e: 

286 print(f"Fatal error: {e}") 

287 # Get trace for debugging 

288 trace = advanced.get_trace() 

289 print(f"Failed at: {trace['steps'][-1]}") 

290 ``` 

291 

292See Also: 

293 - :class:`SimpleFSM`: Simple synchronous API 

294 - :class:`AsyncSimpleFSM`: Async production API 

295 - :class:`ExecutionMode`: Available execution modes 

296 - :class:`ExecutionHook`: Hook system for monitoring 

297 - :mod:`dataknobs_fsm.execution.engine`: Execution engine 

298 - :mod:`dataknobs_fsm.execution.history`: Execution history tracking 

299""" 

300 

301import time 

302from collections.abc import AsyncGenerator, Callable 

303from contextlib import asynccontextmanager 

304from dataclasses import dataclass, field 

305from enum import Enum 

306from pathlib import Path 

307from typing import Any 

308 

309from dataknobs_data import Record 

310 

311from ..core.context_factory import ContextFactory 

312from ..core.data_modes import DataHandler, DataHandlingMode 

313from ..core.fsm import FSM 

314from ..core.modes import ProcessingMode 

315from ..core.state import StateInstance 

316from ..core.transactions import TransactionManager, TransactionStrategy 

317from ..execution.async_engine import AsyncExecutionEngine 

318from ..execution.context import ExecutionContext 

319from ..execution.engine import ExecutionEngine, TraversalStrategy 

320from ..execution.history import ExecutionHistory 

321from ..resources.base import IResourceProvider 

322from ..resources.manager import ResourceManager 

323from ..storage.base import IHistoryStorage 

324 

325 

326class ExecutionMode(Enum): 

327 """Advanced execution modes.""" 

328 STEP_BY_STEP = "step" # Execute one transition at a time 

329 BREAKPOINT = "breakpoint" # Stop at specific states 

330 TRACE = "trace" # Full execution tracing 

331 PROFILE = "profile" # Performance profiling 

332 DEBUG = "debug" # Debug mode with detailed logging 

333 

334 

335@dataclass 

336class ExecutionHook: 

337 """Hook for monitoring execution events.""" 

338 on_state_enter: Callable | None = None 

339 on_state_exit: Callable | None = None 

340 on_arc_execute: Callable | None = None 

341 on_error: Callable | None = None 

342 on_resource_acquire: Callable | None = None 

343 on_resource_release: Callable | None = None 

344 on_transaction_begin: Callable | None = None 

345 on_transaction_commit: Callable | None = None 

346 on_transaction_rollback: Callable | None = None 

347 

348 

349@dataclass 

350class StepResult: 

351 """Result from a single step execution.""" 

352 from_state: str 

353 to_state: str 

354 transition: str 

355 data_before: dict[str, Any] = field(default_factory=dict) 

356 data_after: dict[str, Any] = field(default_factory=dict) 

357 duration: float = 0.0 

358 success: bool = True 

359 error: str | None = None 

360 at_breakpoint: bool = False 

361 is_complete: bool = False 

362 

363 

364class AdvancedFSM: 

365 """Advanced FSM interface with full control capabilities.""" 

366 

367 def __init__( 

368 self, 

369 fsm: FSM, 

370 execution_mode: ExecutionMode = ExecutionMode.STEP_BY_STEP, 

371 custom_functions: dict[str, Callable] | None = None 

372 ): 

373 """Initialize AdvancedFSM. 

374  

375 Args: 

376 fsm: Core FSM instance 

377 execution_mode: Execution mode for advanced control 

378 custom_functions: Optional custom functions to register 

379 """ 

380 self.fsm = fsm 

381 self.execution_mode = execution_mode 

382 self._engine = ExecutionEngine(fsm) 

383 self._async_engine = AsyncExecutionEngine(fsm) 

384 self._resource_manager = ResourceManager() 

385 self._transaction_manager = None 

386 self._history = None 

387 self._storage = None 

388 self._hooks = ExecutionHook() 

389 self._breakpoints = set() 

390 self._trace_buffer = [] 

391 self._profile_data = {} 

392 self._custom_functions = custom_functions or {} 

393 

394 def set_execution_strategy(self, strategy: TraversalStrategy) -> None: 

395 """Set custom execution strategy. 

396  

397 Args: 

398 strategy: Execution strategy to use 

399 """ 

400 self._engine.strategy = strategy 

401 

402 def set_data_handler(self, handler: DataHandler) -> None: 

403 """Set custom data handler. 

404  

405 Args: 

406 handler: Data handler implementation 

407 """ 

408 self._engine.data_handler = handler 

409 

410 def configure_transactions( 

411 self, 

412 strategy: TransactionStrategy, 

413 **config: Any 

414 ) -> None: 

415 """Configure transaction management. 

416  

417 Args: 

418 strategy: Transaction strategy to use 

419 **config: Strategy-specific configuration 

420 """ 

421 self._transaction_manager = TransactionManager.create(strategy, **config) 

422 

423 def register_resource( 

424 self, 

425 name: str, 

426 resource: IResourceProvider | dict[str, Any] 

427 ) -> None: 

428 """Register a custom resource. 

429  

430 Args: 

431 name: Resource name 

432 resource: Resource instance or configuration 

433 """ 

434 if isinstance(resource, dict): 

435 # Use ResourceManager factory method 

436 self._resource_manager.register_from_dict(name, resource) 

437 else: 

438 # Assume it's already a provider 

439 self._resource_manager.register_provider(name, resource) 

440 

441 def set_hooks(self, hooks: ExecutionHook) -> None: 

442 """Set execution hooks for monitoring. 

443  

444 Args: 

445 hooks: Execution hooks configuration 

446 """ 

447 self._hooks = hooks 

448 

449 def add_breakpoint(self, state_name: str) -> None: 

450 """Add a breakpoint at a specific state. 

451  

452 Args: 

453 state_name: Name of state to break at 

454 """ 

455 self._breakpoints.add(state_name) 

456 

457 def remove_breakpoint(self, state_name: str) -> None: 

458 """Remove a breakpoint. 

459  

460 Args: 

461 state_name: Name of state to remove breakpoint from 

462 """ 

463 self._breakpoints.discard(state_name) 

464 

465 def clear_breakpoints(self) -> None: 

466 """Clear all breakpoints.""" 

467 self._breakpoints.clear() 

468 

469 @property 

470 def breakpoints(self) -> set: 

471 """Get the current breakpoints.""" 

472 return self._breakpoints.copy() 

473 

474 @property 

475 def hooks(self) -> ExecutionHook: 

476 """Get the current execution hooks.""" 

477 return self._hooks 

478 

479 @property 

480 def history_enabled(self) -> bool: 

481 """Check if history tracking is enabled.""" 

482 return self._history is not None 

483 

484 @property 

485 def max_history_depth(self) -> int: 

486 """Get the maximum history depth.""" 

487 return self._history.max_depth if self._history else 0 

488 

489 @property 

490 def execution_history(self) -> list: 

491 """Get the execution history steps.""" 

492 return self._history.steps if self._history else [] 

493 

494 def enable_history( 

495 self, 

496 storage: IHistoryStorage | None = None, 

497 max_depth: int = 100 

498 ) -> None: 

499 """Enable execution history tracking. 

500  

501 Args: 

502 storage: Optional storage backend for history 

503 max_depth: Maximum history depth to track 

504 """ 

505 import uuid 

506 

507 from dataknobs_fsm.core.data_modes import DataHandlingMode 

508 

509 # Get FSM name from the FSM object 

510 fsm_name = getattr(self.fsm, 'name', 'unnamed_fsm') 

511 

512 # Generate a unique execution ID 

513 execution_id = str(uuid.uuid4()) 

514 

515 self._history = ExecutionHistory( 

516 fsm_name=fsm_name, 

517 execution_id=execution_id, 

518 data_mode=DataHandlingMode.COPY, # Default data mode 

519 max_depth=max_depth 

520 ) 

521 self._storage = storage 

522 

523 def disable_history(self) -> None: 

524 """Disable history tracking.""" 

525 self._history = None 

526 self._storage = None 

527 

528 def create_context( 

529 self, 

530 data: dict[str, Any] | Record, 

531 data_mode: DataHandlingMode = DataHandlingMode.COPY, 

532 initial_state: str | None = None 

533 ) -> ExecutionContext: 

534 """Create an execution context for manual control (synchronous). 

535 

536 Args: 

537 data: Initial data 

538 data_mode: Data handling mode 

539 initial_state: Starting state name 

540 

541 Returns: 

542 ExecutionContext for manual execution 

543 """ 

544 # Create context with appropriate data handling 

545 # Use SINGLE processing mode as default 

546 processing_mode = ProcessingMode.SINGLE 

547 

548 context = ContextFactory.create_context( 

549 self.fsm, 

550 data, 

551 data_mode=processing_mode 

552 ) 

553 

554 # Set initial state if provided 

555 if initial_state: 

556 context.set_state(initial_state) 

557 else: 

558 # Find and set initial state using shared helper 

559 initial_state = self._find_initial_state() 

560 if initial_state: 

561 context.set_state(initial_state) 

562 

563 # Update state instance using shared helper 

564 if context.current_state: 

565 self._update_state_instance(context, context.current_state) 

566 

567 # Register custom functions if any 

568 if self._custom_functions: 

569 if not hasattr(self.fsm, 'function_registry'): 

570 self.fsm.function_registry = {} 

571 self.fsm.function_registry.update(self._custom_functions) 

572 

573 return context 

574 

575 @asynccontextmanager 

576 async def execution_context( 

577 self, 

578 data: dict[str, Any] | Record, 

579 data_mode: DataHandlingMode = DataHandlingMode.COPY, 

580 initial_state: str | None = None 

581 ) -> AsyncGenerator[ExecutionContext, None]: 

582 """Create an execution context for manual control. 

583 

584 Args: 

585 data: Initial data 

586 data_mode: Data handling mode 

587 initial_state: Starting state name 

588 

589 Yields: 

590 ExecutionContext for manual execution 

591 """ 

592 # Create context using factory 

593 context = ContextFactory.create_context( 

594 fsm=self.fsm, 

595 data=data, 

596 initial_state=initial_state, 

597 data_mode=ProcessingMode.SINGLE, 

598 resource_manager=self._resource_manager 

599 ) 

600 

601 # Set transaction manager if configured 

602 if self._transaction_manager: 

603 context.transaction_manager = self._transaction_manager # type: ignore[unreachable] 

604 

605 # Get the state instance for the hook 

606 state_instance = context.current_state_instance 

607 if not state_instance: 

608 # Create state instance if not set by factory 

609 state_instance = self.fsm.create_state_instance( 

610 context.current_state, # type: ignore 

611 context.data.copy() if isinstance(context.data, dict) else {} 

612 ) 

613 context.current_state_instance = state_instance 

614 

615 # Call hook with StateInstance 

616 if self._hooks.on_state_enter: 

617 await self._hooks.on_state_enter(state_instance) 

618 

619 try: 

620 yield context 

621 finally: 

622 # Cleanup 

623 if self._hooks.on_state_exit: 

624 await self._hooks.on_state_exit(state_instance) 

625 await self._resource_manager.cleanup() 

626 

627 async def step( 

628 self, 

629 context: ExecutionContext, 

630 arc_name: str | None = None 

631 ) -> StateInstance | None: 

632 """Execute a single transition step. 

633 

634 Args: 

635 context: Execution context 

636 arc_name: Optional specific arc to follow 

637 

638 Returns: 

639 New state instance or None if no transition 

640 """ 

641 # Store the current state before the transition 

642 state_before = context.current_state 

643 

644 # Use the async execution engine to execute one step 

645 # This ensures consistent execution logic across all FSM types 

646 _success, _result = await self._async_engine.execute( 

647 context=context, 

648 data=None, # Don't override context data 

649 max_transitions=1, # Execute exactly one transition 

650 arc_name=arc_name # Pass arc_name for filtering 

651 ) 

652 

653 # Check if we actually transitioned to a new state 

654 if context.current_state != state_before and context.current_state is not None: 

655 # Update state instance using shared helper 

656 self._update_state_instance(context, context.current_state) 

657 new_state = context.current_state_instance 

658 

659 # Track in history using shared helper 

660 if context.current_state is not None: 

661 self._record_history_step(context.current_state, arc_name, context) 

662 

663 # Add to trace using shared helper (we need to adjust the helper slightly) 

664 if self.execution_mode == ExecutionMode.TRACE: 

665 self._trace_buffer.append({ 

666 'from': state_before, 

667 'to': context.current_state, 

668 'arc': arc_name or 'transition', 

669 'data': context.data 

670 }) 

671 

672 # Call state enter hook (async version) 

673 if self._hooks.on_state_enter: 

674 await self._hooks.on_state_enter(new_state) 

675 

676 return new_state 

677 

678 # No transition occurred 

679 return None 

680 

681 async def run_until_breakpoint( 

682 self, 

683 context: ExecutionContext, 

684 max_steps: int = 1000 

685 ) -> StateInstance | None: 

686 """Run execution until a breakpoint is hit. 

687 

688 Args: 

689 context: Execution context 

690 max_steps: Maximum steps to execute (safety limit) 

691 

692 Returns: 

693 State instance where execution stopped 

694 """ 

695 for _ in range(max_steps): 

696 # Check if current state is a breakpoint 

697 if context.current_state in self._breakpoints: 

698 return context.current_state_instance 

699 

700 # Step to next state 

701 new_state = await self.step(context) 

702 

703 # Check if we reached an end state or no transition occurred 

704 if not new_state or self._is_at_end_state(context): 

705 return context.current_state_instance 

706 

707 # Hit max steps limit 

708 return context.current_state_instance 

709 

710 async def trace_execution( 

711 self, 

712 data: dict[str, Any] | Record, 

713 initial_state: str | None = None 

714 ) -> list[dict[str, Any]]: 

715 """Execute with full tracing enabled. 

716  

717 Args: 

718 data: Input data 

719 initial_state: Optional starting state 

720  

721 Returns: 

722 List of trace entries 

723 """ 

724 self.execution_mode = ExecutionMode.TRACE 

725 self._trace_buffer.clear() 

726 

727 async with self.execution_context(data, initial_state=initial_state) as context: 

728 # Run to completion 

729 while True: 

730 new_state = await self.step(context) 

731 if not new_state or new_state.definition.is_end: 

732 break 

733 

734 return self._trace_buffer 

735 

736 async def profile_execution( 

737 self, 

738 data: dict[str, Any] | Record, 

739 initial_state: str | None = None 

740 ) -> dict[str, Any]: 

741 """Execute with performance profiling. 

742  

743 Args: 

744 data: Input data 

745 initial_state: Optional starting state 

746  

747 Returns: 

748 Profiling data 

749 """ 

750 import time 

751 

752 self.execution_mode = ExecutionMode.PROFILE 

753 self._profile_data.clear() 

754 

755 async with self.execution_context(data, initial_state=initial_state) as context: 

756 start_time = time.time() 

757 transitions = 0 

758 

759 # Track per-state timing 

760 state_times = {} 

761 state_start = time.time() 

762 

763 while True: 

764 # Get current state name 

765 if isinstance(context.current_state, str): 

766 current_state_name = context.current_state 

767 else: 

768 current_state_name = context.current_state if context.current_state else "unknown" 

769 

770 # Step 

771 new_state = await self.step(context) 

772 

773 # Record state timing 

774 state_duration = time.time() - state_start 

775 if current_state_name not in state_times: 

776 state_times[current_state_name] = [] 

777 state_times[current_state_name].append(state_duration) 

778 

779 if not new_state or (hasattr(new_state, 'definition') and new_state.definition.is_end): 

780 break 

781 

782 transitions += 1 

783 state_start = time.time() 

784 

785 total_time = time.time() - start_time 

786 

787 # Compute statistics 

788 self._profile_data = { 

789 'total_time': total_time, 

790 'transitions': transitions, 

791 'avg_transition_time': total_time / transitions if transitions > 0 else 0, 

792 'state_times': { 

793 state: { 

794 'count': len(times), 

795 'total': sum(times), 

796 'avg': sum(times) / len(times), 

797 'min': min(times), 

798 'max': max(times) 

799 } 

800 for state, times in state_times.items() 

801 } 

802 } 

803 

804 return self._profile_data 

805 

806 def get_available_transitions( 

807 self, 

808 state_name: str 

809 ) -> list[dict[str, Any]]: 

810 """Get available transitions from a state. 

811  

812 Args: 

813 state_name: Name of state 

814  

815 Returns: 

816 List of available transition information 

817 """ 

818 arcs = self.fsm.get_outgoing_arcs(state_name) 

819 return [ 

820 { 

821 'name': arc.name, 

822 'target': arc.target_state, 

823 'has_pre_test': arc.pre_test is not None, 

824 'has_transform': arc.transform is not None 

825 } 

826 for arc in arcs 

827 ] 

828 

829 def inspect_state(self, state_name: str) -> dict[str, Any]: 

830 """Inspect a state's configuration. 

831  

832 Args: 

833 state_name: Name of state to inspect 

834  

835 Returns: 

836 State configuration details 

837 """ 

838 state = self.fsm.get_state(state_name) 

839 if not state: 

840 return {'error': f'State {state_name} not found'} 

841 

842 return { 

843 'name': state.name, 

844 'is_start': self.fsm.is_start_state(state_name), 

845 'is_end': self.fsm.is_end_state(state_name), 

846 'has_transform': len(state.transform_functions) > 0, 

847 'has_validator': len(state.validation_functions) > 0, 

848 'resources': [r.name for r in state.resource_requirements] if state.resource_requirements else [], 

849 'metadata': state.metadata, 

850 'arcs': state.arcs 

851 } 

852 

853 def visualize_fsm(self) -> str: 

854 """Generate a visual representation of the FSM. 

855  

856 Returns: 

857 GraphViz DOT format string 

858 """ 

859 lines = ['digraph FSM {'] 

860 lines.append(' rankdir=LR;') 

861 lines.append(' node [shape=circle];') 

862 

863 # Add states 

864 for state in self.fsm.states.values(): 

865 attrs = [] 

866 if state.is_start: 

867 attrs.append('style=filled') 

868 attrs.append('fillcolor=green') 

869 elif state.is_end: 

870 attrs.append('shape=doublecircle') 

871 attrs.append('style=filled') 

872 attrs.append('fillcolor=red') 

873 

874 if attrs: 

875 lines.append(f' {state.name} [{",".join(attrs)}];') 

876 else: 

877 lines.append(f' {state.name};') 

878 

879 # Add arcs 

880 for state_name in self.fsm.states: 

881 for arc in self.fsm.get_outgoing_arcs(state_name): 

882 label = arc.name if arc.name else "" 

883 lines.append(f' {state_name} -> {arc.target_state} [label="{label}"];') 

884 

885 lines.append('}') 

886 return '\n'.join(lines) 

887 

888 async def validate_network(self) -> dict[str, Any]: 

889 """Validate the FSM network for consistency. 

890  

891 Returns: 

892 Validation results 

893 """ 

894 issues = [] 

895 

896 # Check for unreachable states 

897 reachable = set() 

898 to_visit = [s.name for s in self.fsm.states.values() if s.is_start] 

899 

900 while to_visit: 

901 state = to_visit.pop(0) 

902 if state in reachable: 

903 continue 

904 reachable.add(state) 

905 

906 arcs = self.fsm.get_outgoing_arcs(state) 

907 for arc in arcs: 

908 if arc.target_state not in reachable: 

909 to_visit.append(arc.target_state) 

910 

911 unreachable = set(self.fsm.states.keys()) - reachable 

912 if unreachable: 

913 issues.append({ 

914 'type': 'unreachable_states', 

915 'states': list(unreachable) 

916 }) 

917 

918 # Check for dead ends (non-end states with no outgoing arcs) 

919 for state_name, state in self.fsm.states.items(): 

920 if not state.is_end: 

921 arcs = self.fsm.get_outgoing_arcs(state_name) 

922 if not arcs: 

923 issues.append({ 

924 'type': 'dead_end', 

925 'state': state_name 

926 }) 

927 

928 return { 

929 'valid': len(issues) == 0, 

930 'issues': issues, 

931 'stats': { 

932 'total_states': len(self.fsm.states), 

933 'reachable_states': len(reachable), 

934 'unreachable_states': len(unreachable), 

935 'start_states': sum(1 for s in self.fsm.states.values() if s.is_start), # type: ignore 

936 'end_states': sum(1 for s in self.fsm.states.values() if s.is_end) # type: ignore 

937 } 

938 } 

939 

940 def get_history(self) -> ExecutionHistory | None: 

941 """Get execution history if enabled. 

942  

943 Returns: 

944 Execution history or None 

945 """ 

946 return self._history 

947 

948 async def save_history(self) -> bool: 

949 """Save execution history to storage. 

950  

951 Returns: 

952 True if saved successfully 

953 """ 

954 if self._history and self._storage: # type: ignore[unreachable] 

955 return await self._storage.save(self._history) # type: ignore[unreachable] 

956 return False 

957 

958 async def load_history(self, history_id: str) -> bool: 

959 """Load execution history from storage. 

960  

961 Args: 

962 history_id: History identifier 

963  

964 Returns: 

965 True if loaded successfully 

966 """ 

967 if self._storage: 

968 history = await self._storage.load(history_id) # type: ignore[unreachable] 

969 if history: 

970 self._history = history 

971 return True 

972 return False 

973 

974 # ========== Shared Helper Methods ========== 

975 # These methods contain logic shared between sync and async implementations 

976 

977 def _get_available_transitions( 

978 self, 

979 context: ExecutionContext, 

980 arc_name: str | None = None 

981 ) -> list: 

982 """Get available transitions from current state (shared logic). 

983 

984 Args: 

985 context: Execution context 

986 arc_name: Optional specific arc to filter for 

987 

988 Returns: 

989 List of available arcs 

990 """ 

991 transitions = [] 

992 if not context.current_state: 

993 return transitions 

994 

995 # Get arcs from current state 

996 arcs = self.fsm.get_outgoing_arcs(context.current_state) 

997 

998 for arc in arcs: 

999 # Filter by arc name if specified 

1000 if arc_name and arc.name != arc_name: 

1001 continue 

1002 

1003 # Check arc condition 

1004 if arc.pre_test: 

1005 # Get function registry 

1006 registry = getattr(self.fsm, 'function_registry', {}) 

1007 if hasattr(registry, 'functions'): 

1008 functions = registry.functions 

1009 else: 

1010 functions = registry 

1011 

1012 # Check in registry and custom functions 

1013 test_func = functions.get(arc.pre_test) or self._custom_functions.get(arc.pre_test) 

1014 if test_func: 

1015 try: 

1016 if test_func(context.data, context): 

1017 transitions.append(arc) 

1018 except Exception: 

1019 pass 

1020 else: 

1021 # No condition, arc is always available 

1022 transitions.append(arc) 

1023 

1024 return transitions 

1025 

1026 def _execute_arc_transform( 

1027 self, 

1028 arc, 

1029 context: ExecutionContext 

1030 ) -> tuple[bool, Any]: 

1031 """Execute arc transform function (shared logic). 

1032 

1033 Args: 

1034 arc: Arc with potential transform 

1035 context: Execution context 

1036 

1037 Returns: 

1038 Tuple of (success, result_or_error) 

1039 """ 

1040 if not arc.transform: 

1041 return True, context.data 

1042 

1043 # Get function registry 

1044 registry = getattr(self.fsm, 'function_registry', {}) 

1045 if hasattr(registry, 'functions'): 

1046 functions = registry.functions 

1047 else: 

1048 functions = registry 

1049 

1050 # Look for transform in registry or custom functions 

1051 transform_func = functions.get(arc.transform) or self._custom_functions.get(arc.transform) 

1052 

1053 if transform_func: 

1054 try: 

1055 result = transform_func(context.data, context) 

1056 return True, result 

1057 except Exception as e: 

1058 return False, str(e) 

1059 

1060 return True, context.data 

1061 

1062 def _update_state_instance( 

1063 self, 

1064 context: ExecutionContext, 

1065 state_name: str 

1066 ) -> None: 

1067 """Update the current state instance in context (shared logic). 

1068 

1069 Args: 

1070 context: Execution context 

1071 state_name: Name of the new state 

1072 """ 

1073 state_def = self.fsm.states.get(state_name) 

1074 if state_def: 

1075 context.current_state_instance = StateInstance( 

1076 definition=state_def, 

1077 data=context.data 

1078 ) 

1079 # Mark if it's an end state 

1080 context.metadata['is_end_state'] = state_def.is_end 

1081 

1082 def _is_at_end_state(self, context: ExecutionContext) -> bool: 

1083 """Check if context is at an end state (shared logic). 

1084 

1085 Args: 

1086 context: Execution context 

1087 

1088 Returns: 

1089 True if at an end state 

1090 """ 

1091 if not context.current_state: 

1092 return False 

1093 

1094 state = self.fsm.states.get(context.current_state) 

1095 if state: 

1096 return state.is_end 

1097 

1098 return context.metadata.get('is_end_state', False) 

1099 

1100 def _record_trace_entry( 

1101 self, 

1102 from_state: str, 

1103 to_state: str, 

1104 arc_name: str | None, 

1105 context: ExecutionContext 

1106 ) -> None: 

1107 """Record a trace entry if in trace mode (shared logic). 

1108 

1109 Args: 

1110 from_state: State transitioning from 

1111 to_state: State transitioning to 

1112 arc_name: Name of arc taken 

1113 context: Execution context 

1114 """ 

1115 if self.execution_mode == ExecutionMode.TRACE: 

1116 self._trace_buffer.append({ 

1117 'from_state': from_state, 

1118 'to_state': to_state, 

1119 'transition': arc_name or f"{from_state}->{to_state}", 

1120 'data': context.get_data_snapshot(), 

1121 'timestamp': time.time() 

1122 }) 

1123 

1124 def _record_history_step( 

1125 self, 

1126 state_name: str, 

1127 arc_name: str | None, 

1128 context: ExecutionContext 

1129 ) -> None: 

1130 """Record a history step if history is enabled (shared logic). 

1131 

1132 Args: 

1133 state_name: Current state name 

1134 arc_name: Arc taken 

1135 context: Execution context 

1136 """ 

1137 if self._history: 

1138 step = self._history.add_step( # type: ignore[unreachable] 

1139 state_name=state_name, 

1140 network_name=getattr(context, 'network_name', 'main'), 

1141 data=context.data 

1142 ) 

1143 step.complete(arc_taken=arc_name or 'transition') 

1144 

1145 def _call_hook_sync( 

1146 self, 

1147 hook_name: str, 

1148 *args: Any 

1149 ) -> None: 

1150 """Call a hook synchronously if it exists (shared logic). 

1151 

1152 Args: 

1153 hook_name: Name of hook attribute 

1154 args: Arguments to pass to hook 

1155 """ 

1156 hook = getattr(self._hooks, hook_name, None) 

1157 if hook: 

1158 try: 

1159 hook(*args) 

1160 except Exception: 

1161 pass # Silently ignore hook errors 

1162 

1163 def _find_initial_state(self) -> str | None: 

1164 """Find the initial state in the FSM (shared logic). 

1165 

1166 Returns: 

1167 Name of initial state or None 

1168 """ 

1169 for state_name, state in self.fsm.states.items(): 

1170 if state.is_start: 

1171 return state_name 

1172 return None 

1173 

1174 # ========== Synchronous Execution Methods ========== 

1175 

1176 def execute_step_sync( 

1177 self, 

1178 context: ExecutionContext, 

1179 arc_name: str | None = None 

1180 ) -> StepResult: 

1181 """Execute a single transition step synchronously. 

1182 

1183 Args: 

1184 context: Execution context 

1185 arc_name: Optional specific arc to follow 

1186 

1187 Returns: 

1188 StepResult with transition details 

1189 """ 

1190 start_time = time.time() 

1191 from_state = context.current_state or "initial" 

1192 data_before = context.get_data_snapshot() 

1193 

1194 try: 

1195 # Initialize state if needed 

1196 if not context.current_state: 

1197 initial_state = self._find_initial_state() 

1198 if initial_state: 

1199 context.set_state(initial_state) 

1200 self._update_state_instance(context, initial_state) 

1201 # Execute transforms for initial state 

1202 if hasattr(self._engine, '_execute_state_transforms'): 

1203 self._engine._execute_state_transforms(context, initial_state) 

1204 else: 

1205 return StepResult( 

1206 from_state=from_state, 

1207 to_state=from_state, 

1208 transition="error", 

1209 data_before=data_before, 

1210 data_after=context.get_data_snapshot(), 

1211 duration=time.time() - start_time, 

1212 success=False, 

1213 error="No initial state found" 

1214 ) 

1215 

1216 # Use shared logic to get transitions 

1217 transitions = self._get_available_transitions(context, arc_name) 

1218 

1219 if not transitions: 

1220 # No transitions available 

1221 return StepResult( 

1222 from_state=from_state, 

1223 to_state=from_state, 

1224 transition="none", 

1225 data_before=data_before, 

1226 data_after=context.get_data_snapshot(), 

1227 duration=time.time() - start_time, 

1228 success=True, 

1229 is_complete=self._is_at_end_state(context) 

1230 ) 

1231 

1232 # Take first valid transition (could be enhanced with strategy selection) 

1233 arc = transitions[0] 

1234 

1235 # Execute arc transform using shared logic 

1236 success, result = self._execute_arc_transform(arc, context) 

1237 if success: 

1238 context.data = result 

1239 else: 

1240 return StepResult( 

1241 from_state=from_state, 

1242 to_state=from_state, 

1243 transition=arc.name or "error", 

1244 data_before=data_before, 

1245 data_after=context.get_data_snapshot(), 

1246 duration=time.time() - start_time, 

1247 success=False, 

1248 error=result 

1249 ) 

1250 

1251 # Update state 

1252 context.set_state(arc.target_state) 

1253 self._update_state_instance(context, arc.target_state) 

1254 

1255 # Execute state transforms when entering the new state 

1256 # This is critical for sync execution to match async behavior 

1257 if hasattr(self._engine, '_execute_state_transforms'): 

1258 self._engine._execute_state_transforms(context, arc.target_state) 

1259 

1260 # Check if we hit a breakpoint 

1261 at_breakpoint = arc.target_state in self._breakpoints 

1262 

1263 # Record in trace buffer if in trace mode 

1264 self._record_trace_entry(from_state, arc.target_state, arc.name, context) 

1265 

1266 # Record in history if enabled 

1267 self._record_history_step(arc.target_state, arc.name, context) 

1268 

1269 # Call hooks if configured 

1270 self._call_hook_sync('on_state_exit', from_state) 

1271 self._call_hook_sync('on_state_enter', arc.target_state) 

1272 

1273 return StepResult( 

1274 from_state=from_state, 

1275 to_state=arc.target_state, 

1276 transition=arc.name or f"{from_state}->{arc.target_state}", 

1277 data_before=data_before, 

1278 data_after=context.get_data_snapshot(), 

1279 duration=time.time() - start_time, 

1280 success=True, 

1281 at_breakpoint=at_breakpoint, 

1282 is_complete=self._is_at_end_state(context) 

1283 ) 

1284 

1285 except Exception as e: 

1286 self._call_hook_sync('on_error', e) 

1287 

1288 return StepResult( 

1289 from_state=from_state, 

1290 to_state=from_state, 

1291 transition="error", 

1292 data_before=data_before, 

1293 data_after=context.get_data_snapshot(), 

1294 duration=time.time() - start_time, 

1295 success=False, 

1296 error=str(e) 

1297 ) 

1298 

1299 def run_until_breakpoint_sync( 

1300 self, 

1301 context: ExecutionContext, 

1302 max_steps: int = 1000 

1303 ) -> StateInstance | None: 

1304 """Run execution until a breakpoint is hit (synchronous). 

1305 

1306 Args: 

1307 context: Execution context 

1308 max_steps: Maximum steps to execute 

1309 

1310 Returns: 

1311 State instance where execution stopped 

1312 """ 

1313 for _ in range(max_steps): 

1314 # Check if at breakpoint 

1315 if context.current_state in self._breakpoints: 

1316 return context.current_state_instance 

1317 

1318 # Execute step 

1319 result = self.execute_step_sync(context) 

1320 

1321 # Check for completion or error 

1322 if not result.success or result.is_complete: 

1323 return context.current_state_instance 

1324 

1325 # Check if stuck 

1326 if result.from_state == result.to_state and result.transition == "none": 

1327 return context.current_state_instance 

1328 

1329 return context.current_state_instance 

1330 

1331 def trace_execution_sync( 

1332 self, 

1333 data: dict[str, Any] | Record, 

1334 initial_state: str | None = None, 

1335 max_steps: int = 1000 

1336 ) -> list[dict[str, Any]]: 

1337 """Execute with full tracing enabled (synchronous). 

1338 

1339 Args: 

1340 data: Input data 

1341 initial_state: Optional starting state 

1342 max_steps: Maximum steps to execute 

1343 

1344 Returns: 

1345 List of trace entries 

1346 """ 

1347 self.execution_mode = ExecutionMode.TRACE 

1348 self._trace_buffer.clear() 

1349 

1350 context = self.create_context(data, initial_state=initial_state) 

1351 

1352 for _ in range(max_steps): 

1353 # Execute step (trace recording happens inside execute_step_sync) 

1354 result = self.execute_step_sync(context) 

1355 

1356 # Check termination conditions 

1357 if not result.success or result.is_complete: 

1358 break 

1359 

1360 if result.from_state == result.to_state and result.transition == "none": 

1361 break 

1362 

1363 return self._trace_buffer 

1364 

1365 def profile_execution_sync( 

1366 self, 

1367 data: dict[str, Any] | Record, 

1368 initial_state: str | None = None, 

1369 max_steps: int = 1000 

1370 ) -> dict[str, Any]: 

1371 """Execute with performance profiling (synchronous). 

1372 

1373 Args: 

1374 data: Input data 

1375 initial_state: Optional starting state 

1376 max_steps: Maximum steps to execute 

1377 

1378 Returns: 

1379 Profiling data 

1380 """ 

1381 self.execution_mode = ExecutionMode.PROFILE 

1382 self._profile_data.clear() 

1383 

1384 context = self.create_context(data, initial_state=initial_state) 

1385 

1386 start_time = time.time() 

1387 transitions = 0 

1388 state_times = {} 

1389 transition_times = [] 

1390 

1391 for _ in range(max_steps): 

1392 state_start = time.time() 

1393 current_state = context.current_state 

1394 

1395 # Execute step 

1396 result = self.execute_step_sync(context) 

1397 

1398 # Record timings 

1399 if current_state: 

1400 if current_state not in state_times: 

1401 state_times[current_state] = [] 

1402 state_times[current_state].append(time.time() - state_start) 

1403 

1404 if result.success and result.from_state != result.to_state: 

1405 transition_times.append(result.duration) 

1406 transitions += 1 

1407 

1408 # Check termination 

1409 if not result.success or result.is_complete: 

1410 break 

1411 

1412 if result.from_state == result.to_state and result.transition == "none": 

1413 break 

1414 

1415 # Calculate statistics 

1416 self._profile_data = { 

1417 'total_time': time.time() - start_time, 

1418 'transitions': transitions, 

1419 'states_visited': len(state_times), 

1420 'avg_transition_time': sum(transition_times) / len(transition_times) if transition_times else 0, 

1421 'state_times': { 

1422 state: { 

1423 'count': len(times), 

1424 'total': sum(times), 

1425 'avg': sum(times) / len(times), 

1426 'min': min(times), 

1427 'max': max(times) 

1428 } 

1429 for state, times in state_times.items() 

1430 }, 

1431 'final_state': context.current_state, 

1432 'final_data': context.get_data_snapshot() 

1433 } 

1434 

1435 return self._profile_data 

1436 

1437 

1438class FSMDebugger: 

1439 """Interactive debugger for FSM execution (fully synchronous).""" 

1440 

1441 def __init__(self, fsm: AdvancedFSM): 

1442 """Initialize debugger. 

1443 

1444 Args: 

1445 fsm: Advanced FSM instance to debug 

1446 """ 

1447 self.fsm = fsm 

1448 self.context: ExecutionContext | None = None 

1449 self.watch_vars: dict[str, Any] = {} 

1450 self.command_history: list[str] = [] 

1451 self.step_count: int = 0 

1452 self.execution_history: list[StepResult] = [] 

1453 

1454 @property 

1455 def current_state(self) -> str | None: 

1456 """Get the current state name.""" 

1457 if not self.context: 

1458 return None 

1459 return self.context.get_current_state() 

1460 

1461 @property 

1462 def watches(self) -> dict[str, Any]: 

1463 """Get current watch variable values.""" 

1464 return self.watch_vars.copy() 

1465 

1466 def start( 

1467 self, 

1468 data: dict[str, Any] | Record, 

1469 initial_state: str | None = None 

1470 ) -> None: 

1471 """Start debugging session (synchronous). 

1472 

1473 Args: 

1474 data: Initial data 

1475 initial_state: Optional starting state 

1476 """ 

1477 self.context = self.fsm.create_context(data, initial_state=initial_state) 

1478 self.step_count = 0 

1479 self.execution_history.clear() 

1480 

1481 print(f"Debugger started at state: {self.context.current_state or 'initial'}") 

1482 print(f"Data: {self.context.get_data_snapshot()}") 

1483 

1484 def step(self) -> StepResult: 

1485 """Execute single step and return detailed result. 

1486 

1487 Returns: 

1488 StepResult with transition details 

1489 """ 

1490 if not self.context: 

1491 print("No active debugging session. Call start() first.") 

1492 return StepResult( 

1493 from_state="none", 

1494 to_state="none", 

1495 transition="error", 

1496 success=False, 

1497 error="No active debugging session" 

1498 ) 

1499 

1500 result = self.fsm.execute_step_sync(self.context) 

1501 self.step_count += 1 

1502 self.execution_history.append(result) 

1503 

1504 # Print step information 

1505 if result.success: 

1506 if result.from_state == result.to_state and result.transition == "none": 

1507 print(f"Step {self.step_count}: No transition available from '{result.from_state}'") 

1508 else: 

1509 print(f"Step {self.step_count}: {result.from_state} -> {result.to_state} via '{result.transition}'") 

1510 

1511 if result.at_breakpoint: 

1512 print("*** Hit breakpoint ***") 

1513 

1514 if result.is_complete: 

1515 print("*** Reached end state ***") 

1516 else: 

1517 print(f"Step {self.step_count}: Error - {result.error}") 

1518 

1519 # Check watches 

1520 self._check_watches() 

1521 

1522 return result 

1523 

1524 def continue_to_breakpoint(self) -> StateInstance | None: 

1525 """Continue execution until a breakpoint is hit. 

1526 

1527 Returns: 

1528 State instance where execution stopped 

1529 """ 

1530 if not self.context: 

1531 print("No active debugging session") 

1532 return None 

1533 

1534 print(f"Continuing from state: {self.context.current_state}") 

1535 final_state = self.fsm.run_until_breakpoint_sync(self.context) 

1536 

1537 if final_state: 

1538 print(f"Stopped at: {self.context.current_state}") 

1539 if self.context.current_state in self.fsm._breakpoints: 

1540 print("*** At breakpoint ***") 

1541 if self.context.is_complete(): 

1542 print("*** Execution complete ***") 

1543 

1544 return final_state 

1545 

1546 def inspect(self, path: str = "") -> Any: 

1547 """Inspect data at path. 

1548 

1549 Args: 

1550 path: Dot-separated path to data field (empty for all data) 

1551 

1552 Returns: 

1553 Value at path 

1554 """ 

1555 if not self.context: 

1556 print("No active debugging session") 

1557 return None 

1558 

1559 data = self.context.data 

1560 

1561 if not path: 

1562 return data 

1563 

1564 # Navigate path 

1565 for key in path.split('.'): 

1566 if isinstance(data, dict): 

1567 data = data.get(key) 

1568 elif hasattr(data, key): 

1569 data = getattr(data, key) 

1570 else: 

1571 return None 

1572 return data 

1573 

1574 def watch(self, name: str, path: str) -> None: 

1575 """Add a watch expression. 

1576 

1577 Args: 

1578 name: Watch name 

1579 path: Data path to watch 

1580 """ 

1581 self.watch_vars[name] = path 

1582 value = self.inspect(path) 

1583 print(f"Watch '{name}' added: {path} = {value}") 

1584 

1585 def unwatch(self, name: str) -> None: 

1586 """Remove a watch expression. 

1587 

1588 Args: 

1589 name: Watch name to remove 

1590 """ 

1591 if name in self.watch_vars: 

1592 del self.watch_vars[name] 

1593 print(f"Watch '{name}' removed") 

1594 

1595 def _check_watches(self) -> None: 

1596 """Check and print changed watch values.""" 

1597 if not self.watch_vars: 

1598 return 

1599 

1600 for name, path in self.watch_vars.items(): 

1601 value = self.inspect(path) 

1602 print(f" Watch '{name}': {path} = {value}") 

1603 

1604 def print_watches(self) -> None: 

1605 """Print all watch values.""" 

1606 if not self.watch_vars: 

1607 print("No watches set") 

1608 return 

1609 

1610 for name, path in self.watch_vars.items(): 

1611 value = self.inspect(path) 

1612 print(f"{name}: {path} = {value}") 

1613 

1614 def print_state(self) -> None: 

1615 """Print current state information.""" 

1616 if not self.context: 

1617 print("No active debugging session") 

1618 return 

1619 

1620 print("\n=== State Information ===") 

1621 print(f"Current State: {self.context.current_state}") 

1622 print(f"Previous State: {self.context.previous_state}") 

1623 print(f"Is Complete: {self.context.is_complete()}") 

1624 print("\nData:") 

1625 data = self.context.get_data_snapshot() 

1626 for key, value in data.items(): 

1627 print(f" {key}: {value}") 

1628 

1629 # Print available transitions 

1630 transitions = self.fsm._get_available_transitions(self.context) 

1631 if transitions: 

1632 print("\nAvailable Transitions:") 

1633 for arc in transitions: 

1634 print(f" - {arc.name or 'unnamed'} -> {arc.target_state}") 

1635 else: 

1636 if self.context.is_complete(): 

1637 print("\nNo transitions (end state)") 

1638 else: 

1639 print("\nNo available transitions") 

1640 

1641 def inspect_current_state(self) -> dict[str, Any]: 

1642 """Get detailed information about current state. 

1643 

1644 Returns: 

1645 Dictionary with state details 

1646 """ 

1647 if not self.context: 

1648 return {"error": "No active debugging session"} 

1649 

1650 return { 

1651 'state': self.context.current_state, 

1652 'previous_state': self.context.previous_state, 

1653 'data': self.context.get_data_snapshot(), 

1654 'is_complete': self.context.is_complete(), 

1655 'step_count': self.step_count, 

1656 'at_breakpoint': self.context.current_state in self.fsm._breakpoints, 

1657 'available_transitions': [ 

1658 {'name': arc.name, 'target': arc.target_state} 

1659 for arc in self.fsm._get_available_transitions(self.context) 

1660 ] 

1661 } 

1662 

1663 def get_history(self, limit: int = 10) -> list[StepResult]: 

1664 """Get recent execution history. 

1665 

1666 Args: 

1667 limit: Maximum number of steps to return 

1668 

1669 Returns: 

1670 List of recent step results 

1671 """ 

1672 return self.execution_history[-limit:] 

1673 

1674 def reset(self, data: dict[str, Any] | Record | None = None) -> None: 

1675 """Reset debugger with new data. 

1676 

1677 Args: 

1678 data: New data (uses current data if None) 

1679 """ 

1680 if data is None and self.context: 

1681 data = self.context.data 

1682 

1683 if data is None: 

1684 print("No data available for reset") 

1685 return 

1686 

1687 self.start(data) 

1688 

1689 

1690def create_advanced_fsm( 

1691 config: str | Path | dict[str, Any] | FSM, 

1692 custom_functions: dict[str, Callable] | None = None, 

1693 **kwargs: Any 

1694) -> AdvancedFSM: 

1695 """Factory function to create an AdvancedFSM instance. 

1696  

1697 Args: 

1698 config: Configuration, FSM instance, or path 

1699 custom_functions: Optional custom functions to register 

1700 **kwargs: Additional arguments 

1701  

1702 Returns: 

1703 Configured AdvancedFSM instance 

1704 """ 

1705 if isinstance(config, FSM): 

1706 fsm = config 

1707 else: 

1708 from ..config.builder import FSMBuilder 

1709 from ..config.loader import ConfigLoader 

1710 

1711 loader = ConfigLoader() 

1712 

1713 if isinstance(config, (str, Path)): 

1714 config_obj = loader.load_from_file(str(config)) 

1715 else: 

1716 # Load from dict 

1717 config_obj = loader.load_from_dict(config) 

1718 

1719 builder = FSMBuilder() 

1720 

1721 # Register custom functions if provided 

1722 if custom_functions: 

1723 for name, func in custom_functions.items(): 

1724 builder.register_function(name, func) 

1725 

1726 fsm = builder.build(config_obj) 

1727 

1728 return AdvancedFSM(fsm, **kwargs)