Coverage for src/dataknobs_fsm/core/fsm.py: 15%

248 statements  

« prev     ^ index     » next       coverage.py v7.11.0, created at 2025-11-08 14:11 -0700

1"""Core FSM class for managing state machines. 

2 

3This module provides the core FSM class that serves as the foundation for all 

4dataknobs-fsm functionality. The FSM class manages state networks, function 

5registries, data modes, transactions, and resources. 

6 

7Architecture: 

8 The FSM class is the central orchestrator in the dataknobs-fsm architecture: 

9 

10 **Responsibilities:** 

11 - Network Management: Manages one or more state networks with transitions 

12 - Function Registry: Maintains registered functions for state operations 

13 - Data Mode Control: Configures how data flows through states (COPY/REFERENCE/DIRECT) 

14 - Transaction Management: Coordinates transactional state changes 

15 - Resource Management: Tracks and manages resource requirements 

16 - Execution Engines: Creates and manages sync/async execution engines 

17 

18 **Design Philosophy:** 

19 - Separation of Concerns: FSM defines structure, engines handle execution 

20 - Multiple Networks: Support complex workflows with multiple state graphs 

21 - Configurable Processing: Data modes adapt to different use cases 

22 - Resource Awareness: Track dependencies before execution 

23 

24 **Layer Architecture:** 

25 ``` 

26 API Layer (simple/async_simple/advanced) 

27 

28 Core FSM (this module) ← defines structure 

29 

30 Execution Engines ← handle runtime 

31 

32 State Network ← graph of states/transitions 

33 ``` 

34 

35Processing Modes: 

36 The FSM supports three data processing modes: 

37 

38 **SINGLE Mode (ProcessingMode.SINGLE):** 

39 - Process one data item at a time 

40 - Simplest execution model 

41 - Data flows through state network sequentially 

42 - Best for: Scripts, simple pipelines, prototypes 

43 

44 **BATCH Mode (ProcessingMode.BATCH):** 

45 - Process multiple items as a batch 

46 - All items go through same state sequence 

47 - Can parallelize within batch 

48 - Best for: High-throughput processing, bulk operations 

49 

50 **STREAM Mode (ProcessingMode.STREAM):** 

51 - Process data as streaming chunks 

52 - Memory-efficient for large datasets 

53 - Supports backpressure and flow control 

54 - Best for: Large files, real-time data, memory-constrained environments 

55 

56Transaction Modes: 

57 The FSM supports configurable transaction handling: 

58 

59 **NONE (TransactionMode.NONE):** 

60 - No transactional guarantees 

61 - State changes are immediate and permanent 

62 - Fastest performance 

63 - Best for: Read-only operations, idempotent workflows 

64 

65 **OPTIMISTIC (TransactionMode.OPTIMISTIC):** 

66 - Changes committed at workflow end 

67 - Rollback on failure 

68 - Moderate performance overhead 

69 - Best for: Most production workflows 

70 

71 **PESSIMISTIC (TransactionMode.PESSIMISTIC):** 

72 - Changes committed at each state 

73 - Highest consistency guarantees 

74 - Higher performance overhead 

75 - Best for: Critical workflows, strict consistency requirements 

76 

77State Networks: 

78 An FSM can contain multiple state networks: 

79 

80 **Main Network:** 

81 - Primary execution path 

82 - Default for most operations 

83 - Set via is_main parameter or first network added 

84 

85 **Sub-Networks:** 

86 - Additional state graphs for modular workflows 

87 - Can represent sub-processes or alternate paths 

88 - Accessed by name 

89 

90 **Network Operations:** 

91 - add_network(): Add a network to the FSM 

92 - remove_network(): Remove a network by name 

93 - get_network(): Get network by name (None = main network) 

94 

95Function Registry: 

96 The FSM maintains a registry of functions used in state operations: 

97 

98 **Function Types:** 

99 - Validation Functions: Check data validity before/after state entry 

100 - Transform Functions: Modify data during state processing 

101 - Test Functions: Determine state transitions 

102 - Pre/Post Hooks: Execute before/after state operations 

103 

104 **Registration:** 

105 - Functions registered via function_registry.register() 

106 - Referenced by name in state/arc definitions 

107 - Validated during FSM.validate() 

108 

109Resource Management: 

110 The FSM tracks resource requirements across all networks: 

111 

112 **Resource Types:** 

113 - Database connections 

114 - API clients 

115 - File handles 

116 - External service connections 

117 - Memory allocations 

118 

119 **Resource Tracking:** 

120 - Aggregated from all networks 

121 - Available via resource_requirements dict 

122 - Summary via get_resource_summary() 

123 - Managers set via resource_manager/transaction_manager 

124 

125Execution Engines: 

126 The FSM creates execution engines on demand: 

127 

128 **Synchronous Engine (ExecutionEngine):** 

129 - Created via get_engine() 

130 - Used by SimpleFSM API 

131 - Supports multiple traversal strategies: 

132 - DEPTH_FIRST: Deep exploration before backtracking 

133 - BREADTH_FIRST: Explore all neighbors before going deeper 

134 - RESOURCE_OPTIMIZED: Minimize resource acquisition/release 

135 - STREAM_OPTIMIZED: Optimize for streaming workflows 

136 

137 **Asynchronous Engine (AsyncExecutionEngine):** 

138 - Created via get_async_engine() 

139 - Used by AsyncSimpleFSM API 

140 - Supports concurrent state execution 

141 - Efficient I/O handling 

142 

143Validation: 

144 The FSM can validate its structure before execution: 

145 

146 **Checks Performed:** 

147 - At least one network exists 

148 - Main network exists and is valid 

149 - All networks are internally valid 

150 - All referenced functions are registered 

151 - No dangling references 

152 

153 **Usage:** 

154 ```python 

155 valid, errors = fsm.validate() 

156 if not valid: 

157 for error in errors: 

158 print(f"Validation error: {error}") 

159 ``` 

160 

161Note: 

162 This is an internal API primarily used by the builder and API layers. 

163 Users typically interact with SimpleFSM, AsyncSimpleFSM, or AdvancedFSM 

164 rather than instantiating FSM directly. 

165 

166 The FSM class is designed to be serializable (to_dict/from_dict) for 

167 persistence and transmission. 

168 

169See Also: 

170 - :class:`~dataknobs_fsm.api.simple.SimpleFSM`: Synchronous API wrapper 

171 - :class:`~dataknobs_fsm.api.async_simple.AsyncSimpleFSM`: Async API wrapper 

172 - :class:`~dataknobs_fsm.api.advanced.AdvancedFSM`: Advanced debugging API 

173 - :class:`~dataknobs_fsm.core.network.StateNetwork`: State graph container 

174 - :class:`~dataknobs_fsm.execution.engine.ExecutionEngine`: Sync execution engine 

175 - :class:`~dataknobs_fsm.execution.async_engine.AsyncExecutionEngine`: Async execution engine 

176""" 

177 

178from typing import Any, Dict, List, Set, Tuple, Optional, TYPE_CHECKING 

179 

180from dataknobs_fsm.core.modes import ProcessingMode, TransactionMode 

181 

182if TYPE_CHECKING: 

183 from dataknobs_fsm.execution.engine import ExecutionEngine 

184 from dataknobs_fsm.execution.async_engine import AsyncExecutionEngine 

185from dataknobs_fsm.core.network import StateNetwork 

186from dataknobs_fsm.core.state import StateDefinition, StateInstance, StateType 

187from dataknobs_fsm.functions.base import FunctionRegistry 

188 

189 

190class FSM: 

191 """Finite State Machine core class. 

192 

193 This is the foundational class that defines the structure and configuration 

194 of a finite state machine. It serves as the container for state networks, 

195 functions, and execution configuration. 

196 

197 Architecture: 

198 The FSM class uses a builder pattern approach where: 

199 1. FSM is constructed with configuration 

200 2. Networks are added with states and transitions 

201 3. Functions are registered for state operations 

202 4. FSM is validated before execution 

203 5. Execution engines are created on-demand 

204 

205 Key Components: 

206 **State Networks:** 

207 - One or more state graphs (nodes=states, edges=transitions) 

208 - Main network for primary execution path 

209 - Sub-networks for modular workflows 

210 - Accessed via networks dict or get_network() 

211 

212 **Function Registry:** 

213 - Central registry of all functions used in workflows 

214 - Validation, transform, and test functions 

215 - Referenced by name in state/arc definitions 

216 - Ensures all function references are valid 

217 

218 **Processing Configuration:** 

219 - data_mode: How data flows (SINGLE/BATCH/STREAM) 

220 - transaction_mode: Transaction guarantees (NONE/OPTIMISTIC/PESSIMISTIC) 

221 - resource_manager: Manages external resources 

222 - transaction_manager: Coordinates transactions 

223 

224 **Execution Engines:** 

225 - Created lazily via get_engine() / get_async_engine() 

226 - Sync engine for SimpleFSM 

227 - Async engine for AsyncSimpleFSM 

228 - Cached for reuse 

229 

230 Attributes: 

231 name (str): Unique identifier for this FSM 

232 data_mode (ProcessingMode): Data processing mode (SINGLE/BATCH/STREAM) 

233 transaction_mode (TransactionMode): Transaction handling (NONE/OPTIMISTIC/PESSIMISTIC) 

234 description (str | None): Optional FSM description 

235 networks (Dict[str, StateNetwork]): State networks by name 

236 main_network_name (str | None): Name of the main network 

237 function_registry (FunctionRegistry): Registry of all functions 

238 resource_requirements (Dict[str, Any]): Aggregated resource requirements 

239 config (Dict[str, Any]): Additional configuration 

240 metadata (Dict[str, Any]): FSM metadata 

241 version (str): FSM version for compatibility 

242 created_at (float | None): Creation timestamp 

243 updated_at (float | None): Last update timestamp 

244 resource_manager (Any | None): External resource manager 

245 transaction_manager (Any | None): Transaction coordinator 

246 

247 Design Patterns: 

248 **Separation of Concerns:** 

249 - FSM defines structure (what to execute) 

250 - Engines handle execution (how to execute) 

251 - Clear boundary between definition and runtime 

252 

253 **Multiple Networks:** 

254 - FSMs can contain multiple state graphs 

255 - Enables modular workflow composition 

256 - Sub-networks can be reused across FSMs 

257 

258 **Lazy Initialization:** 

259 - Engines created on first use 

260 - Reduces overhead for validation-only use cases 

261 - Cached for subsequent use 

262 

263 **Validation Before Execution:** 

264 - validate() checks structure before execution 

265 - Catches configuration errors early 

266 - Provides detailed error messages 

267 

268 Resource Management: 

269 The FSM aggregates resource requirements from all networks: 

270 - Database connections needed 

271 - API clients required 

272 - File handles expected 

273 - Memory requirements 

274 

275 This enables: 

276 - Pre-execution resource checks 

277 - Resource pooling optimization 

278 - Clear dependency documentation 

279 

280 Serialization: 

281 FSM supports serialization for persistence and transmission: 

282 - to_dict(): Convert to dictionary 

283 - from_dict(): Reconstruct from dictionary 

284 - Note: Function registry not serialized (functions must be re-registered) 

285 

286 Thread Safety: 

287 The FSM structure itself is read-only after construction and validation. 

288 However, execution engines may or may not be thread-safe depending on 

289 data_mode: 

290 - SINGLE mode: Not thread-safe (by design) 

291 - BATCH mode: Thread-safe if using DataHandlingMode.COPY 

292 - STREAM mode: Thread-safe with proper streaming context 

293 

294 Note: 

295 This is an internal API. Users should use the higher-level APIs: 

296 - SimpleFSM for synchronous workflows 

297 - AsyncSimpleFSM for async/production workflows 

298 - AdvancedFSM for debugging and profiling 

299 

300 Direct FSM instantiation is primarily for builders and internal use. 

301 

302 See Also: 

303 - :class:`~dataknobs_fsm.api.simple.SimpleFSM`: Synchronous API 

304 - :class:`~dataknobs_fsm.api.async_simple.AsyncSimpleFSM`: Async API 

305 - :class:`~dataknobs_fsm.core.network.StateNetwork`: State graph 

306 - :class:`~dataknobs_fsm.functions.base.FunctionRegistry`: Function registry 

307 """ 

308 

309 def __init__( 

310 self, 

311 name: str, 

312 data_mode: ProcessingMode = ProcessingMode.SINGLE, 

313 transaction_mode: TransactionMode = TransactionMode.NONE, 

314 description: str | None = None, 

315 resource_manager: Any | None = None, 

316 transaction_manager: Any | None = None 

317 ): 

318 """Initialize FSM. 

319  

320 Args: 

321 name: Name of the FSM. 

322 data_mode: Data processing mode. 

323 transaction_mode: Transaction handling mode. 

324 description: Optional FSM description. 

325 """ 

326 self.name = name 

327 self.data_mode = data_mode 

328 self.transaction_mode = transaction_mode 

329 self.description = description 

330 

331 # Networks 

332 self.networks: Dict[str, StateNetwork] = {} 

333 self.main_network_name: str | None = None 

334 

335 # Function registry 

336 self.function_registry = FunctionRegistry() 

337 

338 # Resource requirements 

339 self.resource_requirements: Dict[str, Any] = {} 

340 

341 # Configuration 

342 self.config: Dict[str, Any] = {} 

343 

344 # Metadata 

345 self.metadata: Dict[str, Any] = {} 

346 self.version: str = "1.0.0" 

347 self.created_at: float | None = None 

348 self.updated_at: float | None = None 

349 

350 # Execution support (from builder FSM wrapper) 

351 self.resource_manager = resource_manager 

352 self.transaction_manager = transaction_manager 

353 self._engine: Any | None = None # ExecutionEngine 

354 self._async_engine: Any | None = None # AsyncExecutionEngine 

355 

356 def add_network( 

357 self, 

358 network: StateNetwork, 

359 is_main: bool = False 

360 ) -> None: 

361 """Add a network to the FSM. 

362  

363 Args: 

364 network: Network to add. 

365 is_main: Whether this is the main network. 

366 """ 

367 self.networks[network.name] = network 

368 

369 if is_main or self.main_network_name is None: 

370 self.main_network_name = network.name 

371 

372 # Aggregate resource requirements 

373 for resource_type, requirements in network.resource_requirements.items(): 

374 if resource_type not in self.resource_requirements: 

375 self.resource_requirements[resource_type] = set() 

376 self.resource_requirements[resource_type].update(requirements) 

377 

378 def remove_network(self, network_name: str) -> bool: 

379 """Remove a network from the FSM. 

380  

381 Args: 

382 network_name: Name of network to remove. 

383  

384 Returns: 

385 True if removed successfully. 

386 """ 

387 if network_name in self.networks: 

388 del self.networks[network_name] 

389 

390 # Update main network if needed 

391 if self.main_network_name == network_name: 

392 if self.networks: 

393 self.main_network_name = next(iter(self.networks.keys())) 

394 else: 

395 self.main_network_name = None 

396 

397 return True 

398 return False 

399 

400 def get_network(self, network_name: str | None = None) -> StateNetwork | None: 

401 """Get a network by name. 

402  

403 Args: 

404 network_name: Name of network (None for main network). 

405  

406 Returns: 

407 Network or None if not found. 

408 """ 

409 if network_name is None: 

410 network_name = self.main_network_name 

411 

412 if network_name: 

413 return self.networks.get(network_name) 

414 return None 

415 

416 def validate(self) -> Tuple[bool, List[str]]: 

417 """Validate the FSM. 

418  

419 Returns: 

420 Tuple of (valid, list of errors). 

421 """ 

422 errors = [] 

423 

424 # Check for at least one network 

425 if not self.networks: 

426 errors.append("FSM has no networks") 

427 

428 # Check main network exists 

429 if self.main_network_name and self.main_network_name not in self.networks: 

430 errors.append(f"Main network '{self.main_network_name}' not found") 

431 

432 # Validate each network 

433 for network_name, network in self.networks.items(): 

434 valid, network_errors = network.validate() 

435 if not valid: 

436 for error in network_errors: 

437 errors.append(f"Network '{network_name}': {error}") 

438 

439 # Check function references 

440 all_functions = self._get_all_function_references() 

441 for func_name in all_functions: 

442 if not self.function_registry.get_function(func_name): 

443 errors.append(f"Function '{func_name}' not registered") 

444 

445 return len(errors) == 0, errors 

446 

447 def _get_all_function_references(self) -> Set[str]: 

448 """Get all function references from all networks. 

449  

450 Returns: 

451 Set of function names referenced. 

452 """ 

453 functions = set() 

454 

455 for network in self.networks.values(): 

456 for arc in network.arcs.values(): 

457 if hasattr(arc, 'pre_test') and arc.pre_test: 

458 functions.add(arc.pre_test) 

459 if hasattr(arc, 'transform') and arc.transform: 

460 functions.add(arc.transform) 

461 

462 return functions 

463 

464 def get_all_states(self) -> Dict[str, List[str]]: 

465 """Get all states from all networks. 

466  

467 Returns: 

468 Dictionary of network_name -> list of state names. 

469 """ 

470 all_states = {} 

471 

472 for network_name, network in self.networks.items(): 

473 all_states[network_name] = list(network.states.keys()) 

474 

475 return all_states 

476 

477 def get_all_arcs(self) -> Dict[str, List[str]]: 

478 """Get all arcs from all networks. 

479  

480 Returns: 

481 Dictionary of network_name -> list of arc IDs. 

482 """ 

483 all_arcs = {} 

484 

485 for network_name, network in self.networks.items(): 

486 all_arcs[network_name] = list(network.arcs.keys()) 

487 

488 return all_arcs 

489 

490 def supports_streaming(self) -> bool: 

491 """Check if FSM supports streaming. 

492  

493 Returns: 

494 True if any network supports streaming. 

495 """ 

496 return any(network.supports_streaming for network in self.networks.values()) 

497 

498 def get_resource_summary(self) -> Dict[str, Any]: 

499 """Get resource requirements summary. 

500  

501 Returns: 

502 Resource requirements summary. 

503 """ 

504 summary = { 

505 'total_networks': len(self.networks), 

506 'total_states': sum(len(n.states) for n in self.networks.values()), 

507 'total_arcs': sum(len(n.arcs) for n in self.networks.values()), 

508 'resource_types': list(self.resource_requirements.keys()), 

509 'supports_streaming': self.supports_streaming(), 

510 'data_mode': self.data_mode.value, 

511 'transaction_mode': self.transaction_mode.value 

512 } 

513 

514 # Add resource counts 

515 for resource_type, requirements in self.resource_requirements.items(): 

516 summary[f'{resource_type}_count'] = len(requirements) 

517 

518 return summary 

519 

520 def clone(self) -> 'FSM': 

521 """Create a clone of this FSM. 

522  

523 Returns: 

524 Cloned FSM. 

525 """ 

526 clone = FSM( 

527 name=f"{self.name}_clone", 

528 data_mode=self.data_mode, 

529 transaction_mode=self.transaction_mode, 

530 description=self.description 

531 ) 

532 

533 # Clone networks 

534 for network_name, network in self.networks.items(): 

535 # Note: This is a shallow copy - for deep clone would need to implement network.clone() 

536 clone.networks[network_name] = network 

537 

538 clone.main_network_name = self.main_network_name 

539 clone.function_registry = self.function_registry 

540 clone.resource_requirements = self.resource_requirements.copy() 

541 clone.config = self.config.copy() 

542 clone.metadata = self.metadata.copy() 

543 clone.version = self.version 

544 

545 return clone 

546 

547 def to_dict(self) -> Dict[str, Any]: 

548 """Convert FSM to dictionary representation. 

549  

550 Returns: 

551 Dictionary representation. 

552 """ 

553 return { 

554 'name': self.name, 

555 'description': self.description, 

556 'data_mode': self.data_mode.value, 

557 'transaction_mode': self.transaction_mode.value, 

558 'main_network': self.main_network_name, 

559 'networks': list(self.networks.keys()), 

560 'resource_requirements': { 

561 k: list(v) if isinstance(v, set) else v 

562 for k, v in self.resource_requirements.items() 

563 }, 

564 'config': self.config, 

565 'metadata': self.metadata, 

566 'version': self.version 

567 } 

568 

569 @classmethod 

570 def from_dict(cls, data: Dict[str, Any]) -> 'FSM': 

571 """Create FSM from dictionary representation. 

572  

573 Args: 

574 data: Dictionary with FSM data. 

575  

576 Returns: 

577 New FSM instance. 

578 """ 

579 fsm = cls( 

580 name=data['name'], 

581 data_mode=ProcessingMode(data.get('data_mode', 'single')), 

582 transaction_mode=TransactionMode(data.get('transaction_mode', 'none')), 

583 description=data.get('description') 

584 ) 

585 

586 fsm.main_network_name = data.get('main_network') 

587 fsm.config = data.get('config', {}) 

588 fsm.metadata = data.get('metadata', {}) 

589 fsm.version = data.get('version', '1.0.0') 

590 

591 # Resource requirements 

592 for resource_type, requirements in data.get('resource_requirements', {}).items(): 

593 fsm.resource_requirements[resource_type] = set(requirements) 

594 

595 return fsm 

596 

597 def find_state_definition(self, state_name: str, network_name: str | None = None) -> StateDefinition | None: 

598 """Find a state definition by name. 

599  

600 Args: 

601 state_name: Name of the state to find 

602 network_name: Optional specific network to search in 

603  

604 Returns: 

605 StateDefinition if found, None otherwise 

606 """ 

607 if network_name: 

608 # Search specific network 

609 network = self.networks.get(network_name) 

610 if network and hasattr(network, 'states'): 

611 return network.states.get(state_name) 

612 else: 

613 # Search all networks 

614 for network in self.networks.values(): 

615 if hasattr(network, 'states') and state_name in network.states: 

616 return network.states[state_name] 

617 

618 return None 

619 

620 def create_state_instance(self, state_name: str, data: Dict[str, Any] | None = None, network_name: str | None = None) -> StateInstance: 

621 """Create a state instance from a state name. 

622  

623 Args: 

624 state_name: Name of the state 

625 data: Optional initial data for the state 

626 network_name: Optional specific network to search in 

627  

628 Returns: 

629 StateInstance object 

630 """ 

631 # Try to find existing state definition 

632 state_def = self.find_state_definition(state_name, network_name) 

633 

634 if not state_def: 

635 # Create minimal state definition 

636 state_def = StateDefinition( 

637 name=state_name, 

638 type=StateType.START if state_name in ['start', 'Start', 'START'] else StateType.NORMAL 

639 ) 

640 

641 # Create and return state instance 

642 return StateInstance( 

643 definition=state_def, 

644 data=data or {} 

645 ) 

646 

647 def get_state(self, state_name: str, network_name: str | None = None) -> StateDefinition | None: 

648 """Get a state definition by name. 

649  

650 This is an alias for find_state_definition for compatibility. 

651  

652 Args: 

653 state_name: Name of the state 

654 network_name: Optional specific network to search in 

655  

656 Returns: 

657 StateDefinition if found, None otherwise 

658 """ 

659 return self.find_state_definition(state_name, network_name) 

660 

661 def is_start_state(self, state_name: str, network_name: str | None = None) -> bool: 

662 """Check if a state is a start state. 

663  

664 Args: 

665 state_name: Name of the state 

666 network_name: Optional specific network to check in (defaults to main network) 

667  

668 Returns: 

669 True if the state is a start state 

670 """ 

671 network_name = network_name or self.main_network_name 

672 if network_name: 

673 network = self.networks.get(network_name) 

674 if network: 

675 return network.is_initial_state(state_name) 

676 return False 

677 

678 def is_end_state(self, state_name: str, network_name: str | None = None) -> bool: 

679 """Check if a state is an end state. 

680  

681 Args: 

682 state_name: Name of the state 

683 network_name: Optional specific network to check in (defaults to main network) 

684  

685 Returns: 

686 True if the state is an end state 

687 """ 

688 network_name = network_name or self.main_network_name 

689 if network_name: 

690 network = self.networks.get(network_name) 

691 if network: 

692 return network.is_final_state(state_name) 

693 return False 

694 

695 def get_start_state(self, network_name: str | None = None) -> StateDefinition | None: 

696 """Get the start state definition. 

697  

698 Args: 

699 network_name: Optional specific network to search in 

700  

701 Returns: 

702 Start state definition if found, None otherwise 

703 """ 

704 # If network specified, search that network 

705 if network_name: 

706 network = self.networks.get(network_name) 

707 if network and hasattr(network, 'states'): 

708 for state in network.states.values(): 

709 if (hasattr(state, 'is_start_state') and state.is_start_state()) or (hasattr(state, 'type') and state.type == StateType.START): 

710 return state 

711 else: 

712 # Search main network first 

713 if self.main_network_name: 

714 start_state = self.get_start_state(self.main_network_name) 

715 if start_state: 

716 return start_state 

717 

718 # Search all networks 

719 for network in self.networks.values(): 

720 if hasattr(network, 'states'): 

721 for state in network.states.values(): 

722 if (hasattr(state, 'is_start_state') and state.is_start_state()) or (hasattr(state, 'type') and state.type == StateType.START): 

723 return state 

724 

725 # Fallback: look for state named 'start' 

726 return self.find_state_definition('start', network_name) 

727 

728 @property 

729 def main_network(self) -> Optional['StateNetwork']: 

730 """Get the main network object. 

731  

732 Returns: 

733 The main StateNetwork object or None if not set 

734 """ 

735 if self.main_network_name: 

736 return self.networks.get(self.main_network_name) 

737 return None 

738 

739 @property 

740 def states(self) -> Dict[str, StateDefinition]: 

741 """Get all states from the main network. 

742  

743 Returns: 

744 Dictionary of state_name -> state_definition for the main network 

745 """ 

746 if not self.main_network_name: 

747 return {} 

748 

749 network = self.get_network(self.main_network_name) 

750 if network and hasattr(network, 'states'): 

751 return network.states 

752 return {} 

753 

754 def get_all_states_dict(self) -> Dict[str, Dict[str, StateDefinition]]: 

755 """Get all states from all networks. 

756  

757 Returns: 

758 Dictionary of network_name -> {state_name -> state_definition} 

759 """ 

760 all_states = {} 

761 for network_name, network in self.networks.items(): 

762 if hasattr(network, 'states'): 

763 all_states[network_name] = network.states 

764 return all_states 

765 

766 def get_outgoing_arcs(self, state_name: str, network_name: str | None = None) -> List[Any]: 

767 """Get outgoing arcs from a state. 

768  

769 Args: 

770 state_name: Name of the state 

771 network_name: Optional network name (uses main network if None) 

772  

773 Returns: 

774 List of outgoing arcs from the state 

775 """ 

776 network_name = network_name or self.main_network_name 

777 if not network_name: 

778 return [] 

779 

780 network = self.get_network(network_name) 

781 if network: 

782 return network.get_arcs_from_state(state_name) 

783 return [] 

784 

785 def get_engine(self, strategy: str | None = None) -> "ExecutionEngine": 

786 """Get or create the execution engine. 

787 

788 Args: 

789 strategy: Optional execution strategy override 

790 

791 Returns: 

792 ExecutionEngine instance. 

793 """ 

794 if self._engine is None: 

795 from dataknobs_fsm.execution.engine import ExecutionEngine, TraversalStrategy 

796 

797 # Map strategy strings to enum 

798 strategy_map = { 

799 "depth_first": TraversalStrategy.DEPTH_FIRST, 

800 "breadth_first": TraversalStrategy.BREADTH_FIRST, 

801 "resource_optimized": TraversalStrategy.RESOURCE_OPTIMIZED, 

802 "stream_optimized": TraversalStrategy.STREAM_OPTIMIZED, 

803 } 

804 

805 strat = TraversalStrategy.DEPTH_FIRST # Default 

806 if strategy and strategy in strategy_map: 

807 strat = strategy_map[strategy] 

808 

809 self._engine = ExecutionEngine( 

810 fsm=self, 

811 strategy=strat, 

812 ) 

813 

814 return self._engine 

815 

816 def get_async_engine(self, strategy: str | None = None) -> "AsyncExecutionEngine": 

817 """Get or create the async execution engine. 

818 

819 Args: 

820 strategy: Optional execution strategy override 

821 

822 Returns: 

823 AsyncExecutionEngine instance. 

824 """ 

825 if self._async_engine is None: 

826 from dataknobs_fsm.execution.async_engine import AsyncExecutionEngine 

827 

828 self._async_engine = AsyncExecutionEngine(fsm=self) 

829 

830 return self._async_engine 

831 

832 def _prepare_execution_context(self, initial_data: Dict[str, Any] | None = None): 

833 """Prepare execution context for FSM execution. 

834  

835 Args: 

836 initial_data: Initial data for execution. 

837  

838 Returns: 

839 Configured ExecutionContext instance. 

840 """ 

841 from dataknobs_fsm.execution.context import ExecutionContext 

842 from dataknobs_fsm.streaming.core import StreamContext, StreamConfig 

843 

844 # Create execution context 

845 context = ExecutionContext( 

846 data_mode=self.data_mode, 

847 transaction_mode=self.transaction_mode 

848 ) 

849 

850 # Set resource and transaction managers if available 

851 if self.resource_manager: 

852 context.resource_manager = self.resource_manager 

853 if self.transaction_manager: 

854 context.transaction_manager = self.transaction_manager 

855 

856 # Set up context based on data mode 

857 if self.data_mode == ProcessingMode.BATCH: 

858 # For batch mode, treat input as batch data 

859 if initial_data is not None: 

860 # If it's not already a list, make it one 

861 if not isinstance(initial_data, list): # type: ignore[unreachable] 

862 context.batch_data = [initial_data] 

863 else: 

864 context.batch_data = initial_data # type: ignore[unreachable] 

865 else: 

866 context.batch_data = [] 

867 elif self.data_mode == ProcessingMode.STREAM: 

868 # For stream mode, create a stream context 

869 stream_config = StreamConfig() 

870 context.stream_context = StreamContext(config=stream_config) 

871 

872 # Add initial data as a chunk if provided 

873 if initial_data is not None: 

874 # Add the data as a single chunk to the stream 

875 context.stream_context.add_data(initial_data, is_last=True) 

876 # Also set context.data for compatibility 

877 context.data = initial_data 

878 else: 

879 # Single mode - data passed normally 

880 pass 

881 

882 return context 

883 

884 def _format_execution_result(self, success: bool, result: Any, context: Any, 

885 duration: float, initial_data: Any = None, 

886 error: str | None = None) -> Dict[str, Any]: 

887 """Format the execution result in a standard format. 

888  

889 Args: 

890 success: Whether execution succeeded. 

891 result: The execution result data. 

892 context: The execution context. 

893 duration: Time taken for execution. 

894 initial_data: Original input data. 

895 error: Error message if execution failed. 

896  

897 Returns: 

898 Formatted result dictionary. 

899 """ 

900 if error: 

901 return { 

902 "status": "error", 

903 "error": error, 

904 "data": initial_data, 

905 "execution_id": None, 

906 "transitions": 0, 

907 "duration": None 

908 } 

909 

910 return { 

911 "status": "completed" if success else "failed", 

912 "data": result, 

913 "execution_id": getattr(context, 'execution_id', None), 

914 "transitions": getattr(context, 'transition_count', 0), 

915 "duration": duration 

916 } 

917 

918 async def execute_async(self, initial_data: Dict[str, Any] | None = None) -> Any: 

919 """Execute the FSM asynchronously with initial data. 

920  

921 Args: 

922 initial_data: Initial data for execution. 

923  

924 Returns: 

925 Execution result. 

926 """ 

927 import time 

928 

929 try: 

930 # Get the async execution engine 

931 engine = self.get_async_engine() 

932 

933 # Prepare execution context 

934 context = self._prepare_execution_context(initial_data) 

935 

936 # Track execution time 

937 start_time = time.time() 

938 

939 # Execute the FSM 

940 success, result = await engine.execute( 

941 context, 

942 initial_data if self.data_mode == ProcessingMode.SINGLE else None 

943 ) 

944 

945 # Calculate duration 

946 duration = time.time() - start_time 

947 

948 return self._format_execution_result(success, result, context, duration) 

949 

950 except Exception as e: 

951 # Handle any exception that occurs during execution 

952 return self._format_execution_result( 

953 False, None, None, 0.0, initial_data, str(e) 

954 ) 

955 

956 def execute(self, initial_data: Dict[str, Any] | None = None) -> Any: 

957 """Execute the FSM synchronously with initial data. 

958  

959 This is a simplified API for running the FSM. 

960  

961 Args: 

962 initial_data: Initial data for execution. 

963  

964 Returns: 

965 Execution result. 

966 """ 

967 import time 

968 

969 try: 

970 # Get the execution engine 

971 engine = self.get_engine() 

972 

973 # Prepare execution context 

974 context = self._prepare_execution_context(initial_data) 

975 

976 # Track execution time 

977 start_time = time.time() 

978 

979 # Execute the FSM 

980 success, result = engine.execute( 

981 context, 

982 initial_data if self.data_mode == ProcessingMode.SINGLE else None 

983 ) 

984 

985 # Calculate duration 

986 duration = time.time() - start_time 

987 

988 return self._format_execution_result(success, result, context, duration) 

989 

990 except Exception as e: 

991 # Handle any exception that occurs during execution 

992 return self._format_execution_result( 

993 False, None, None, 0.0, initial_data, str(e) 

994 )