Coverage for src/dataknobs_fsm/core/fsm.py: 15%
248 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 14:11 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 14:11 -0700
1"""Core FSM class for managing state machines.
3This module provides the core FSM class that serves as the foundation for all
4dataknobs-fsm functionality. The FSM class manages state networks, function
5registries, data modes, transactions, and resources.
7Architecture:
8 The FSM class is the central orchestrator in the dataknobs-fsm architecture:
10 **Responsibilities:**
11 - Network Management: Manages one or more state networks with transitions
12 - Function Registry: Maintains registered functions for state operations
13 - Data Mode Control: Configures how data flows through states (COPY/REFERENCE/DIRECT)
14 - Transaction Management: Coordinates transactional state changes
15 - Resource Management: Tracks and manages resource requirements
16 - Execution Engines: Creates and manages sync/async execution engines
18 **Design Philosophy:**
19 - Separation of Concerns: FSM defines structure, engines handle execution
20 - Multiple Networks: Support complex workflows with multiple state graphs
21 - Configurable Processing: Data modes adapt to different use cases
22 - Resource Awareness: Track dependencies before execution
24 **Layer Architecture:**
25 ```
26 API Layer (simple/async_simple/advanced)
27 ↓
28 Core FSM (this module) ← defines structure
29 ↓
30 Execution Engines ← handle runtime
31 ↓
32 State Network ← graph of states/transitions
33 ```
35Processing Modes:
36 The FSM supports three data processing modes:
38 **SINGLE Mode (ProcessingMode.SINGLE):**
39 - Process one data item at a time
40 - Simplest execution model
41 - Data flows through state network sequentially
42 - Best for: Scripts, simple pipelines, prototypes
44 **BATCH Mode (ProcessingMode.BATCH):**
45 - Process multiple items as a batch
46 - All items go through same state sequence
47 - Can parallelize within batch
48 - Best for: High-throughput processing, bulk operations
50 **STREAM Mode (ProcessingMode.STREAM):**
51 - Process data as streaming chunks
52 - Memory-efficient for large datasets
53 - Supports backpressure and flow control
54 - Best for: Large files, real-time data, memory-constrained environments
56Transaction Modes:
57 The FSM supports configurable transaction handling:
59 **NONE (TransactionMode.NONE):**
60 - No transactional guarantees
61 - State changes are immediate and permanent
62 - Fastest performance
63 - Best for: Read-only operations, idempotent workflows
65 **OPTIMISTIC (TransactionMode.OPTIMISTIC):**
66 - Changes committed at workflow end
67 - Rollback on failure
68 - Moderate performance overhead
69 - Best for: Most production workflows
71 **PESSIMISTIC (TransactionMode.PESSIMISTIC):**
72 - Changes committed at each state
73 - Highest consistency guarantees
74 - Higher performance overhead
75 - Best for: Critical workflows, strict consistency requirements
77State Networks:
78 An FSM can contain multiple state networks:
80 **Main Network:**
81 - Primary execution path
82 - Default for most operations
83 - Set via is_main parameter or first network added
85 **Sub-Networks:**
86 - Additional state graphs for modular workflows
87 - Can represent sub-processes or alternate paths
88 - Accessed by name
90 **Network Operations:**
91 - add_network(): Add a network to the FSM
92 - remove_network(): Remove a network by name
93 - get_network(): Get network by name (None = main network)
95Function Registry:
96 The FSM maintains a registry of functions used in state operations:
98 **Function Types:**
99 - Validation Functions: Check data validity before/after state entry
100 - Transform Functions: Modify data during state processing
101 - Test Functions: Determine state transitions
102 - Pre/Post Hooks: Execute before/after state operations
104 **Registration:**
105 - Functions registered via function_registry.register()
106 - Referenced by name in state/arc definitions
107 - Validated during FSM.validate()
109Resource Management:
110 The FSM tracks resource requirements across all networks:
112 **Resource Types:**
113 - Database connections
114 - API clients
115 - File handles
116 - External service connections
117 - Memory allocations
119 **Resource Tracking:**
120 - Aggregated from all networks
121 - Available via resource_requirements dict
122 - Summary via get_resource_summary()
123 - Managers set via resource_manager/transaction_manager
125Execution Engines:
126 The FSM creates execution engines on demand:
128 **Synchronous Engine (ExecutionEngine):**
129 - Created via get_engine()
130 - Used by SimpleFSM API
131 - Supports multiple traversal strategies:
132 - DEPTH_FIRST: Deep exploration before backtracking
133 - BREADTH_FIRST: Explore all neighbors before going deeper
134 - RESOURCE_OPTIMIZED: Minimize resource acquisition/release
135 - STREAM_OPTIMIZED: Optimize for streaming workflows
137 **Asynchronous Engine (AsyncExecutionEngine):**
138 - Created via get_async_engine()
139 - Used by AsyncSimpleFSM API
140 - Supports concurrent state execution
141 - Efficient I/O handling
143Validation:
144 The FSM can validate its structure before execution:
146 **Checks Performed:**
147 - At least one network exists
148 - Main network exists and is valid
149 - All networks are internally valid
150 - All referenced functions are registered
151 - No dangling references
153 **Usage:**
154 ```python
155 valid, errors = fsm.validate()
156 if not valid:
157 for error in errors:
158 print(f"Validation error: {error}")
159 ```
161Note:
162 This is an internal API primarily used by the builder and API layers.
163 Users typically interact with SimpleFSM, AsyncSimpleFSM, or AdvancedFSM
164 rather than instantiating FSM directly.
166 The FSM class is designed to be serializable (to_dict/from_dict) for
167 persistence and transmission.
169See Also:
170 - :class:`~dataknobs_fsm.api.simple.SimpleFSM`: Synchronous API wrapper
171 - :class:`~dataknobs_fsm.api.async_simple.AsyncSimpleFSM`: Async API wrapper
172 - :class:`~dataknobs_fsm.api.advanced.AdvancedFSM`: Advanced debugging API
173 - :class:`~dataknobs_fsm.core.network.StateNetwork`: State graph container
174 - :class:`~dataknobs_fsm.execution.engine.ExecutionEngine`: Sync execution engine
175 - :class:`~dataknobs_fsm.execution.async_engine.AsyncExecutionEngine`: Async execution engine
176"""
178from typing import Any, Dict, List, Set, Tuple, Optional, TYPE_CHECKING
180from dataknobs_fsm.core.modes import ProcessingMode, TransactionMode
182if TYPE_CHECKING:
183 from dataknobs_fsm.execution.engine import ExecutionEngine
184 from dataknobs_fsm.execution.async_engine import AsyncExecutionEngine
185from dataknobs_fsm.core.network import StateNetwork
186from dataknobs_fsm.core.state import StateDefinition, StateInstance, StateType
187from dataknobs_fsm.functions.base import FunctionRegistry
190class FSM:
191 """Finite State Machine core class.
193 This is the foundational class that defines the structure and configuration
194 of a finite state machine. It serves as the container for state networks,
195 functions, and execution configuration.
197 Architecture:
198 The FSM class uses a builder pattern approach where:
199 1. FSM is constructed with configuration
200 2. Networks are added with states and transitions
201 3. Functions are registered for state operations
202 4. FSM is validated before execution
203 5. Execution engines are created on-demand
205 Key Components:
206 **State Networks:**
207 - One or more state graphs (nodes=states, edges=transitions)
208 - Main network for primary execution path
209 - Sub-networks for modular workflows
210 - Accessed via networks dict or get_network()
212 **Function Registry:**
213 - Central registry of all functions used in workflows
214 - Validation, transform, and test functions
215 - Referenced by name in state/arc definitions
216 - Ensures all function references are valid
218 **Processing Configuration:**
219 - data_mode: How data flows (SINGLE/BATCH/STREAM)
220 - transaction_mode: Transaction guarantees (NONE/OPTIMISTIC/PESSIMISTIC)
221 - resource_manager: Manages external resources
222 - transaction_manager: Coordinates transactions
224 **Execution Engines:**
225 - Created lazily via get_engine() / get_async_engine()
226 - Sync engine for SimpleFSM
227 - Async engine for AsyncSimpleFSM
228 - Cached for reuse
230 Attributes:
231 name (str): Unique identifier for this FSM
232 data_mode (ProcessingMode): Data processing mode (SINGLE/BATCH/STREAM)
233 transaction_mode (TransactionMode): Transaction handling (NONE/OPTIMISTIC/PESSIMISTIC)
234 description (str | None): Optional FSM description
235 networks (Dict[str, StateNetwork]): State networks by name
236 main_network_name (str | None): Name of the main network
237 function_registry (FunctionRegistry): Registry of all functions
238 resource_requirements (Dict[str, Any]): Aggregated resource requirements
239 config (Dict[str, Any]): Additional configuration
240 metadata (Dict[str, Any]): FSM metadata
241 version (str): FSM version for compatibility
242 created_at (float | None): Creation timestamp
243 updated_at (float | None): Last update timestamp
244 resource_manager (Any | None): External resource manager
245 transaction_manager (Any | None): Transaction coordinator
247 Design Patterns:
248 **Separation of Concerns:**
249 - FSM defines structure (what to execute)
250 - Engines handle execution (how to execute)
251 - Clear boundary between definition and runtime
253 **Multiple Networks:**
254 - FSMs can contain multiple state graphs
255 - Enables modular workflow composition
256 - Sub-networks can be reused across FSMs
258 **Lazy Initialization:**
259 - Engines created on first use
260 - Reduces overhead for validation-only use cases
261 - Cached for subsequent use
263 **Validation Before Execution:**
264 - validate() checks structure before execution
265 - Catches configuration errors early
266 - Provides detailed error messages
268 Resource Management:
269 The FSM aggregates resource requirements from all networks:
270 - Database connections needed
271 - API clients required
272 - File handles expected
273 - Memory requirements
275 This enables:
276 - Pre-execution resource checks
277 - Resource pooling optimization
278 - Clear dependency documentation
280 Serialization:
281 FSM supports serialization for persistence and transmission:
282 - to_dict(): Convert to dictionary
283 - from_dict(): Reconstruct from dictionary
284 - Note: Function registry not serialized (functions must be re-registered)
286 Thread Safety:
287 The FSM structure itself is read-only after construction and validation.
288 However, execution engines may or may not be thread-safe depending on
289 data_mode:
290 - SINGLE mode: Not thread-safe (by design)
291 - BATCH mode: Thread-safe if using DataHandlingMode.COPY
292 - STREAM mode: Thread-safe with proper streaming context
294 Note:
295 This is an internal API. Users should use the higher-level APIs:
296 - SimpleFSM for synchronous workflows
297 - AsyncSimpleFSM for async/production workflows
298 - AdvancedFSM for debugging and profiling
300 Direct FSM instantiation is primarily for builders and internal use.
302 See Also:
303 - :class:`~dataknobs_fsm.api.simple.SimpleFSM`: Synchronous API
304 - :class:`~dataknobs_fsm.api.async_simple.AsyncSimpleFSM`: Async API
305 - :class:`~dataknobs_fsm.core.network.StateNetwork`: State graph
306 - :class:`~dataknobs_fsm.functions.base.FunctionRegistry`: Function registry
307 """
309 def __init__(
310 self,
311 name: str,
312 data_mode: ProcessingMode = ProcessingMode.SINGLE,
313 transaction_mode: TransactionMode = TransactionMode.NONE,
314 description: str | None = None,
315 resource_manager: Any | None = None,
316 transaction_manager: Any | None = None
317 ):
318 """Initialize FSM.
320 Args:
321 name: Name of the FSM.
322 data_mode: Data processing mode.
323 transaction_mode: Transaction handling mode.
324 description: Optional FSM description.
325 """
326 self.name = name
327 self.data_mode = data_mode
328 self.transaction_mode = transaction_mode
329 self.description = description
331 # Networks
332 self.networks: Dict[str, StateNetwork] = {}
333 self.main_network_name: str | None = None
335 # Function registry
336 self.function_registry = FunctionRegistry()
338 # Resource requirements
339 self.resource_requirements: Dict[str, Any] = {}
341 # Configuration
342 self.config: Dict[str, Any] = {}
344 # Metadata
345 self.metadata: Dict[str, Any] = {}
346 self.version: str = "1.0.0"
347 self.created_at: float | None = None
348 self.updated_at: float | None = None
350 # Execution support (from builder FSM wrapper)
351 self.resource_manager = resource_manager
352 self.transaction_manager = transaction_manager
353 self._engine: Any | None = None # ExecutionEngine
354 self._async_engine: Any | None = None # AsyncExecutionEngine
356 def add_network(
357 self,
358 network: StateNetwork,
359 is_main: bool = False
360 ) -> None:
361 """Add a network to the FSM.
363 Args:
364 network: Network to add.
365 is_main: Whether this is the main network.
366 """
367 self.networks[network.name] = network
369 if is_main or self.main_network_name is None:
370 self.main_network_name = network.name
372 # Aggregate resource requirements
373 for resource_type, requirements in network.resource_requirements.items():
374 if resource_type not in self.resource_requirements:
375 self.resource_requirements[resource_type] = set()
376 self.resource_requirements[resource_type].update(requirements)
378 def remove_network(self, network_name: str) -> bool:
379 """Remove a network from the FSM.
381 Args:
382 network_name: Name of network to remove.
384 Returns:
385 True if removed successfully.
386 """
387 if network_name in self.networks:
388 del self.networks[network_name]
390 # Update main network if needed
391 if self.main_network_name == network_name:
392 if self.networks:
393 self.main_network_name = next(iter(self.networks.keys()))
394 else:
395 self.main_network_name = None
397 return True
398 return False
400 def get_network(self, network_name: str | None = None) -> StateNetwork | None:
401 """Get a network by name.
403 Args:
404 network_name: Name of network (None for main network).
406 Returns:
407 Network or None if not found.
408 """
409 if network_name is None:
410 network_name = self.main_network_name
412 if network_name:
413 return self.networks.get(network_name)
414 return None
416 def validate(self) -> Tuple[bool, List[str]]:
417 """Validate the FSM.
419 Returns:
420 Tuple of (valid, list of errors).
421 """
422 errors = []
424 # Check for at least one network
425 if not self.networks:
426 errors.append("FSM has no networks")
428 # Check main network exists
429 if self.main_network_name and self.main_network_name not in self.networks:
430 errors.append(f"Main network '{self.main_network_name}' not found")
432 # Validate each network
433 for network_name, network in self.networks.items():
434 valid, network_errors = network.validate()
435 if not valid:
436 for error in network_errors:
437 errors.append(f"Network '{network_name}': {error}")
439 # Check function references
440 all_functions = self._get_all_function_references()
441 for func_name in all_functions:
442 if not self.function_registry.get_function(func_name):
443 errors.append(f"Function '{func_name}' not registered")
445 return len(errors) == 0, errors
447 def _get_all_function_references(self) -> Set[str]:
448 """Get all function references from all networks.
450 Returns:
451 Set of function names referenced.
452 """
453 functions = set()
455 for network in self.networks.values():
456 for arc in network.arcs.values():
457 if hasattr(arc, 'pre_test') and arc.pre_test:
458 functions.add(arc.pre_test)
459 if hasattr(arc, 'transform') and arc.transform:
460 functions.add(arc.transform)
462 return functions
464 def get_all_states(self) -> Dict[str, List[str]]:
465 """Get all states from all networks.
467 Returns:
468 Dictionary of network_name -> list of state names.
469 """
470 all_states = {}
472 for network_name, network in self.networks.items():
473 all_states[network_name] = list(network.states.keys())
475 return all_states
477 def get_all_arcs(self) -> Dict[str, List[str]]:
478 """Get all arcs from all networks.
480 Returns:
481 Dictionary of network_name -> list of arc IDs.
482 """
483 all_arcs = {}
485 for network_name, network in self.networks.items():
486 all_arcs[network_name] = list(network.arcs.keys())
488 return all_arcs
490 def supports_streaming(self) -> bool:
491 """Check if FSM supports streaming.
493 Returns:
494 True if any network supports streaming.
495 """
496 return any(network.supports_streaming for network in self.networks.values())
498 def get_resource_summary(self) -> Dict[str, Any]:
499 """Get resource requirements summary.
501 Returns:
502 Resource requirements summary.
503 """
504 summary = {
505 'total_networks': len(self.networks),
506 'total_states': sum(len(n.states) for n in self.networks.values()),
507 'total_arcs': sum(len(n.arcs) for n in self.networks.values()),
508 'resource_types': list(self.resource_requirements.keys()),
509 'supports_streaming': self.supports_streaming(),
510 'data_mode': self.data_mode.value,
511 'transaction_mode': self.transaction_mode.value
512 }
514 # Add resource counts
515 for resource_type, requirements in self.resource_requirements.items():
516 summary[f'{resource_type}_count'] = len(requirements)
518 return summary
520 def clone(self) -> 'FSM':
521 """Create a clone of this FSM.
523 Returns:
524 Cloned FSM.
525 """
526 clone = FSM(
527 name=f"{self.name}_clone",
528 data_mode=self.data_mode,
529 transaction_mode=self.transaction_mode,
530 description=self.description
531 )
533 # Clone networks
534 for network_name, network in self.networks.items():
535 # Note: This is a shallow copy - for deep clone would need to implement network.clone()
536 clone.networks[network_name] = network
538 clone.main_network_name = self.main_network_name
539 clone.function_registry = self.function_registry
540 clone.resource_requirements = self.resource_requirements.copy()
541 clone.config = self.config.copy()
542 clone.metadata = self.metadata.copy()
543 clone.version = self.version
545 return clone
547 def to_dict(self) -> Dict[str, Any]:
548 """Convert FSM to dictionary representation.
550 Returns:
551 Dictionary representation.
552 """
553 return {
554 'name': self.name,
555 'description': self.description,
556 'data_mode': self.data_mode.value,
557 'transaction_mode': self.transaction_mode.value,
558 'main_network': self.main_network_name,
559 'networks': list(self.networks.keys()),
560 'resource_requirements': {
561 k: list(v) if isinstance(v, set) else v
562 for k, v in self.resource_requirements.items()
563 },
564 'config': self.config,
565 'metadata': self.metadata,
566 'version': self.version
567 }
569 @classmethod
570 def from_dict(cls, data: Dict[str, Any]) -> 'FSM':
571 """Create FSM from dictionary representation.
573 Args:
574 data: Dictionary with FSM data.
576 Returns:
577 New FSM instance.
578 """
579 fsm = cls(
580 name=data['name'],
581 data_mode=ProcessingMode(data.get('data_mode', 'single')),
582 transaction_mode=TransactionMode(data.get('transaction_mode', 'none')),
583 description=data.get('description')
584 )
586 fsm.main_network_name = data.get('main_network')
587 fsm.config = data.get('config', {})
588 fsm.metadata = data.get('metadata', {})
589 fsm.version = data.get('version', '1.0.0')
591 # Resource requirements
592 for resource_type, requirements in data.get('resource_requirements', {}).items():
593 fsm.resource_requirements[resource_type] = set(requirements)
595 return fsm
597 def find_state_definition(self, state_name: str, network_name: str | None = None) -> StateDefinition | None:
598 """Find a state definition by name.
600 Args:
601 state_name: Name of the state to find
602 network_name: Optional specific network to search in
604 Returns:
605 StateDefinition if found, None otherwise
606 """
607 if network_name:
608 # Search specific network
609 network = self.networks.get(network_name)
610 if network and hasattr(network, 'states'):
611 return network.states.get(state_name)
612 else:
613 # Search all networks
614 for network in self.networks.values():
615 if hasattr(network, 'states') and state_name in network.states:
616 return network.states[state_name]
618 return None
620 def create_state_instance(self, state_name: str, data: Dict[str, Any] | None = None, network_name: str | None = None) -> StateInstance:
621 """Create a state instance from a state name.
623 Args:
624 state_name: Name of the state
625 data: Optional initial data for the state
626 network_name: Optional specific network to search in
628 Returns:
629 StateInstance object
630 """
631 # Try to find existing state definition
632 state_def = self.find_state_definition(state_name, network_name)
634 if not state_def:
635 # Create minimal state definition
636 state_def = StateDefinition(
637 name=state_name,
638 type=StateType.START if state_name in ['start', 'Start', 'START'] else StateType.NORMAL
639 )
641 # Create and return state instance
642 return StateInstance(
643 definition=state_def,
644 data=data or {}
645 )
647 def get_state(self, state_name: str, network_name: str | None = None) -> StateDefinition | None:
648 """Get a state definition by name.
650 This is an alias for find_state_definition for compatibility.
652 Args:
653 state_name: Name of the state
654 network_name: Optional specific network to search in
656 Returns:
657 StateDefinition if found, None otherwise
658 """
659 return self.find_state_definition(state_name, network_name)
661 def is_start_state(self, state_name: str, network_name: str | None = None) -> bool:
662 """Check if a state is a start state.
664 Args:
665 state_name: Name of the state
666 network_name: Optional specific network to check in (defaults to main network)
668 Returns:
669 True if the state is a start state
670 """
671 network_name = network_name or self.main_network_name
672 if network_name:
673 network = self.networks.get(network_name)
674 if network:
675 return network.is_initial_state(state_name)
676 return False
678 def is_end_state(self, state_name: str, network_name: str | None = None) -> bool:
679 """Check if a state is an end state.
681 Args:
682 state_name: Name of the state
683 network_name: Optional specific network to check in (defaults to main network)
685 Returns:
686 True if the state is an end state
687 """
688 network_name = network_name or self.main_network_name
689 if network_name:
690 network = self.networks.get(network_name)
691 if network:
692 return network.is_final_state(state_name)
693 return False
695 def get_start_state(self, network_name: str | None = None) -> StateDefinition | None:
696 """Get the start state definition.
698 Args:
699 network_name: Optional specific network to search in
701 Returns:
702 Start state definition if found, None otherwise
703 """
704 # If network specified, search that network
705 if network_name:
706 network = self.networks.get(network_name)
707 if network and hasattr(network, 'states'):
708 for state in network.states.values():
709 if (hasattr(state, 'is_start_state') and state.is_start_state()) or (hasattr(state, 'type') and state.type == StateType.START):
710 return state
711 else:
712 # Search main network first
713 if self.main_network_name:
714 start_state = self.get_start_state(self.main_network_name)
715 if start_state:
716 return start_state
718 # Search all networks
719 for network in self.networks.values():
720 if hasattr(network, 'states'):
721 for state in network.states.values():
722 if (hasattr(state, 'is_start_state') and state.is_start_state()) or (hasattr(state, 'type') and state.type == StateType.START):
723 return state
725 # Fallback: look for state named 'start'
726 return self.find_state_definition('start', network_name)
728 @property
729 def main_network(self) -> Optional['StateNetwork']:
730 """Get the main network object.
732 Returns:
733 The main StateNetwork object or None if not set
734 """
735 if self.main_network_name:
736 return self.networks.get(self.main_network_name)
737 return None
739 @property
740 def states(self) -> Dict[str, StateDefinition]:
741 """Get all states from the main network.
743 Returns:
744 Dictionary of state_name -> state_definition for the main network
745 """
746 if not self.main_network_name:
747 return {}
749 network = self.get_network(self.main_network_name)
750 if network and hasattr(network, 'states'):
751 return network.states
752 return {}
754 def get_all_states_dict(self) -> Dict[str, Dict[str, StateDefinition]]:
755 """Get all states from all networks.
757 Returns:
758 Dictionary of network_name -> {state_name -> state_definition}
759 """
760 all_states = {}
761 for network_name, network in self.networks.items():
762 if hasattr(network, 'states'):
763 all_states[network_name] = network.states
764 return all_states
766 def get_outgoing_arcs(self, state_name: str, network_name: str | None = None) -> List[Any]:
767 """Get outgoing arcs from a state.
769 Args:
770 state_name: Name of the state
771 network_name: Optional network name (uses main network if None)
773 Returns:
774 List of outgoing arcs from the state
775 """
776 network_name = network_name or self.main_network_name
777 if not network_name:
778 return []
780 network = self.get_network(network_name)
781 if network:
782 return network.get_arcs_from_state(state_name)
783 return []
785 def get_engine(self, strategy: str | None = None) -> "ExecutionEngine":
786 """Get or create the execution engine.
788 Args:
789 strategy: Optional execution strategy override
791 Returns:
792 ExecutionEngine instance.
793 """
794 if self._engine is None:
795 from dataknobs_fsm.execution.engine import ExecutionEngine, TraversalStrategy
797 # Map strategy strings to enum
798 strategy_map = {
799 "depth_first": TraversalStrategy.DEPTH_FIRST,
800 "breadth_first": TraversalStrategy.BREADTH_FIRST,
801 "resource_optimized": TraversalStrategy.RESOURCE_OPTIMIZED,
802 "stream_optimized": TraversalStrategy.STREAM_OPTIMIZED,
803 }
805 strat = TraversalStrategy.DEPTH_FIRST # Default
806 if strategy and strategy in strategy_map:
807 strat = strategy_map[strategy]
809 self._engine = ExecutionEngine(
810 fsm=self,
811 strategy=strat,
812 )
814 return self._engine
816 def get_async_engine(self, strategy: str | None = None) -> "AsyncExecutionEngine":
817 """Get or create the async execution engine.
819 Args:
820 strategy: Optional execution strategy override
822 Returns:
823 AsyncExecutionEngine instance.
824 """
825 if self._async_engine is None:
826 from dataknobs_fsm.execution.async_engine import AsyncExecutionEngine
828 self._async_engine = AsyncExecutionEngine(fsm=self)
830 return self._async_engine
832 def _prepare_execution_context(self, initial_data: Dict[str, Any] | None = None):
833 """Prepare execution context for FSM execution.
835 Args:
836 initial_data: Initial data for execution.
838 Returns:
839 Configured ExecutionContext instance.
840 """
841 from dataknobs_fsm.execution.context import ExecutionContext
842 from dataknobs_fsm.streaming.core import StreamContext, StreamConfig
844 # Create execution context
845 context = ExecutionContext(
846 data_mode=self.data_mode,
847 transaction_mode=self.transaction_mode
848 )
850 # Set resource and transaction managers if available
851 if self.resource_manager:
852 context.resource_manager = self.resource_manager
853 if self.transaction_manager:
854 context.transaction_manager = self.transaction_manager
856 # Set up context based on data mode
857 if self.data_mode == ProcessingMode.BATCH:
858 # For batch mode, treat input as batch data
859 if initial_data is not None:
860 # If it's not already a list, make it one
861 if not isinstance(initial_data, list): # type: ignore[unreachable]
862 context.batch_data = [initial_data]
863 else:
864 context.batch_data = initial_data # type: ignore[unreachable]
865 else:
866 context.batch_data = []
867 elif self.data_mode == ProcessingMode.STREAM:
868 # For stream mode, create a stream context
869 stream_config = StreamConfig()
870 context.stream_context = StreamContext(config=stream_config)
872 # Add initial data as a chunk if provided
873 if initial_data is not None:
874 # Add the data as a single chunk to the stream
875 context.stream_context.add_data(initial_data, is_last=True)
876 # Also set context.data for compatibility
877 context.data = initial_data
878 else:
879 # Single mode - data passed normally
880 pass
882 return context
884 def _format_execution_result(self, success: bool, result: Any, context: Any,
885 duration: float, initial_data: Any = None,
886 error: str | None = None) -> Dict[str, Any]:
887 """Format the execution result in a standard format.
889 Args:
890 success: Whether execution succeeded.
891 result: The execution result data.
892 context: The execution context.
893 duration: Time taken for execution.
894 initial_data: Original input data.
895 error: Error message if execution failed.
897 Returns:
898 Formatted result dictionary.
899 """
900 if error:
901 return {
902 "status": "error",
903 "error": error,
904 "data": initial_data,
905 "execution_id": None,
906 "transitions": 0,
907 "duration": None
908 }
910 return {
911 "status": "completed" if success else "failed",
912 "data": result,
913 "execution_id": getattr(context, 'execution_id', None),
914 "transitions": getattr(context, 'transition_count', 0),
915 "duration": duration
916 }
918 async def execute_async(self, initial_data: Dict[str, Any] | None = None) -> Any:
919 """Execute the FSM asynchronously with initial data.
921 Args:
922 initial_data: Initial data for execution.
924 Returns:
925 Execution result.
926 """
927 import time
929 try:
930 # Get the async execution engine
931 engine = self.get_async_engine()
933 # Prepare execution context
934 context = self._prepare_execution_context(initial_data)
936 # Track execution time
937 start_time = time.time()
939 # Execute the FSM
940 success, result = await engine.execute(
941 context,
942 initial_data if self.data_mode == ProcessingMode.SINGLE else None
943 )
945 # Calculate duration
946 duration = time.time() - start_time
948 return self._format_execution_result(success, result, context, duration)
950 except Exception as e:
951 # Handle any exception that occurs during execution
952 return self._format_execution_result(
953 False, None, None, 0.0, initial_data, str(e)
954 )
956 def execute(self, initial_data: Dict[str, Any] | None = None) -> Any:
957 """Execute the FSM synchronously with initial data.
959 This is a simplified API for running the FSM.
961 Args:
962 initial_data: Initial data for execution.
964 Returns:
965 Execution result.
966 """
967 import time
969 try:
970 # Get the execution engine
971 engine = self.get_engine()
973 # Prepare execution context
974 context = self._prepare_execution_context(initial_data)
976 # Track execution time
977 start_time = time.time()
979 # Execute the FSM
980 success, result = engine.execute(
981 context,
982 initial_data if self.data_mode == ProcessingMode.SINGLE else None
983 )
985 # Calculate duration
986 duration = time.time() - start_time
988 return self._format_execution_result(success, result, context, duration)
990 except Exception as e:
991 # Handle any exception that occurs during execution
992 return self._format_execution_result(
993 False, None, None, 0.0, initial_data, str(e)
994 )