Coverage for src/dataknobs_fsm/config/builder.py: 10%
299 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 14:11 -0700
« prev ^ index » next coverage.py v7.11.0, created at 2025-11-08 14:11 -0700
1"""FSM builder for constructing FSM instances from configuration.
3This module provides the FSMBuilder class that constructs executable FSM
4instances from configuration objects, including:
5- Resource registration and initialization
6- Function resolution and registration
7- Network and state construction
8- Validation of completeness
9"""
11import importlib
12from typing import Any, Callable, Dict, List, Type
15from dataknobs_fsm.config.schema import (
16 ArcConfig,
17 FSMConfig,
18 FunctionReference,
19 NetworkConfig,
20 PushArcConfig,
21 ResourceConfig,
22 ResourceType,
23 StateConfig,
24)
25from dataknobs_fsm.core.arc import ArcDefinition, PushArc
26from dataknobs_fsm.core.data_modes import DataHandler, DataHandlingMode, get_data_handler
27from dataknobs_fsm.core.network import StateNetwork
28from dataknobs_fsm.core.state import StateDefinition, StateType
29from dataknobs_fsm.core.transactions import (
30 TransactionManager,
31 TransactionStrategy,
32 SingleTransactionManager,
33 BatchTransactionManager,
34 ManualTransactionManager,
35)
36from dataknobs_fsm.core.fsm import FSM as CoreFSMClass # noqa: N811
37from dataknobs_fsm.execution.context import ExecutionContext
38from dataknobs_fsm.functions.base import (
39 IResource,
40 IStateTestFunction,
41 ITransformFunction,
42 IValidationFunction,
43)
44from dataknobs_fsm.resources.manager import ResourceManager
45from dataknobs_fsm.functions.manager import (
46 FunctionManager,
47 FunctionSource
48)
class FSMBuilder:
    """Build executable FSM instances from configuration.

    The builder owns a ``ResourceManager`` and a ``FunctionManager``. Every
    function known to the function manager (built-ins, ``register_function``
    calls, and functions resolved while building states/arcs) is copied into
    the core FSM's function registry by :meth:`build`.
    """

    # Built-in resource type -> dotted path of the implementing class.
    _RESOURCE_CLASSES = {
        "database": "dataknobs_fsm.resources.database.DatabaseResourceAdapter",
        "filesystem": "dataknobs_fsm.resources.filesystem.FileSystemResource",
        "http": "dataknobs_fsm.resources.http.HTTPServiceResource",
        "llm": "dataknobs_fsm.resources.llm.LLMResource",
        "vector_store": "dataknobs_fsm.resources.vector_store.VectorStoreResource",
    }

    def __init__(self):
        """Initialize the FSMBuilder and register the built-in function library."""
        self._resource_manager = ResourceManager()
        self._function_manager = FunctionManager()
        self._networks: Dict[str, StateNetwork] = {}
        self._data_handlers: Dict[DataHandlingMode, DataHandler] = {}
        self._transaction_manager: TransactionManager | None = None

        # Register built-in functions on initialization
        self._register_builtin_functions()

    def build(self, config: FSMConfig) -> CoreFSMClass:
        """Build an FSM instance from configuration.

        Args:
            config: FSM configuration.

        Returns:
            Executable FSM instance.

        Raises:
            ValueError: If configuration is invalid or incomplete.
        """
        # Only the network map is reset between builds; resources and
        # functions registered by earlier builds stay in their managers.
        self._networks.clear()

        # 1. Register resources
        self._register_resources(config.resources)

        # 2. Initialize data handlers
        self._init_data_handlers(config.data_mode)

        # 3. Initialize transaction manager
        self._init_transaction_manager(config.transaction)

        # 4. Build networks
        for network_config in config.networks:
            network = self._build_network(network_config, config)
            self._networks[network.name] = network

        # 5. Validate completeness
        self._validate_completeness(config)

        # 6. Create core FSM instance
        from dataknobs_fsm.core.modes import ProcessingMode as CoreDataMode
        from dataknobs_fsm.core.modes import TransactionMode as CoreTransactionMode

        # NOTE: config.data_mode / config.transaction are not mapped onto the
        # core modes yet; the core FSM is always created as SINGLE / NONE.
        fsm = CoreFSMClass(
            name=config.name,
            data_mode=CoreDataMode.SINGLE,
            transaction_mode=CoreTransactionMode.NONE,
            description=config.description,
            resource_manager=self._resource_manager,
            transaction_manager=self._transaction_manager,
        )

        # Store config in FSM for reference
        fsm.config = config

        # Copy every function into the core FSM's registry. The registry
        # expects plain callables, so FunctionWrapper-like objects are
        # unwrapped to their ``func`` attribute when they have one.
        for func_name in self._function_manager.list_functions():
            wrapper = self._function_manager.get_function(func_name)
            if wrapper:
                fsm.function_registry.register(func_name, getattr(wrapper, "func", wrapper))

        # Add networks to FSM
        for network_name, network in self._networks.items():
            fsm.add_network(network, is_main=(network_name == config.main_network))

        return fsm

    def register_function(self, name: str, func: Callable) -> None:
        """Register a custom function.

        Args:
            name: Function name for reference in configuration.
            func: Function implementation.
        """
        self._function_manager.register_function(name, func, FunctionSource.REGISTERED)

    def _register_builtin_functions(self) -> None:
        """Register public callables from the built-in function library.

        Every public (non-underscore) callable found in
        ``dataknobs_fsm.functions.library.validators`` / ``.transformers`` is
        registered as ``"validators.<name>"`` / ``"transformers.<name>"``.
        If the library cannot be imported, nothing is registered.
        """
        try:
            from dataknobs_fsm.functions.library import transformers, validators
        except ImportError:
            # Built-in library not available; registered, custom and inline
            # functions still work without it.
            return

        for prefix, module in (("validators", validators), ("transformers", transformers)):
            for attr_name in dir(module):
                if attr_name.startswith("_"):
                    continue
                obj = getattr(module, attr_name)
                if callable(obj):
                    self._function_manager.register_function(
                        f"{prefix}.{attr_name}", obj, FunctionSource.BUILTIN
                    )

    def _register_resources(self, resources: List[ResourceConfig]) -> None:
        """Register resources with the resource manager.

        Args:
            resources: Resource configurations.
        """
        for resource_config in resources:
            resource = self._create_resource(resource_config)
            self._resource_manager.register_provider(resource_config.name, resource)

    def _create_resource(self, config: ResourceConfig) -> IResource:
        """Create a resource instance from configuration.

        For ``type == "custom"`` the implementation class is given by the
        ``"class"`` entry of ``config.config`` (a dotted ``module.Class`` path);
        that entry is not passed to the constructor. All remaining entries are
        passed as keyword arguments, plus ``name=config.name`` when the
        constructor accepts a keyword ``name`` parameter.

        Args:
            config: Resource configuration.

        Returns:
            Resource instance.

        Raises:
            ValueError: If resource type is not supported, a custom resource
                has no ``"class"`` entry, or the class path is malformed.
        """
        import inspect

        # ``config.type`` may be a plain string or an Enum member (e.g.
        # ``ResourceType.CUSTOM``); compare and look up by its string value.
        resource_type = getattr(config.type, "value", config.type)
        kwargs = dict(config.config)

        if resource_type == "custom":
            if "class" not in kwargs:
                raise ValueError("Custom resource requires 'class' in configuration")
            # "class" selects the implementation; it is not a constructor argument.
            class_path = kwargs.pop("class")
        else:
            class_path = self._RESOURCE_CLASSES.get(resource_type)
            if not class_path:
                raise ValueError(f"Unsupported resource type: {config.type}")

        # Import and instantiate resource class
        module_path, _, class_name = class_path.rpartition(".")
        if not module_path:
            raise ValueError(f"Invalid resource class path: {class_path!r}")
        module = importlib.import_module(module_path)
        resource_class = getattr(module, class_name)

        # Pass the resource name only if the constructor declares a `name`
        # parameter that can be given by keyword. inspect.signature reports
        # real parameters (unlike __code__.co_varnames, which also lists
        # locals) and works for classes without a Python-level __init__.
        try:
            init_params = inspect.signature(resource_class).parameters
        except (TypeError, ValueError):
            init_params = {}
        name_param = init_params.get("name")
        if name_param is not None and name_param.kind is not inspect.Parameter.POSITIONAL_ONLY:
            kwargs["name"] = config.name
        return resource_class(**kwargs)

    def _init_data_handlers(self, config: Any) -> None:
        """Create a data handler for every ``DataHandlingMode``.

        Args:
            config: Data mode configuration (currently unused; a handler is
                created for every mode regardless).
        """
        for mode in DataHandlingMode:
            self._data_handlers[mode] = get_data_handler(mode)

    def _init_transaction_manager(self, config: Any) -> None:
        """Initialize the transaction manager from ``config.strategy``.

        Args:
            config: Transaction configuration; ``batch_size`` is read only for
                the BATCH strategy.
        """
        strategy = config.strategy
        if strategy == TransactionStrategy.BATCH:
            self._transaction_manager = BatchTransactionManager(batch_size=config.batch_size)
        elif strategy == TransactionStrategy.MANUAL:
            self._transaction_manager = ManualTransactionManager()
        else:
            # SINGLE, and the default for any other strategy value
            self._transaction_manager = SingleTransactionManager()

    def _build_network(self, network_config: NetworkConfig, fsm_config: FSMConfig) -> StateNetwork:
        """Build a state network from configuration.

        All states are created first, then arcs, so arc construction can see
        every state of the network.

        Args:
            network_config: Network configuration.
            fsm_config: Parent FSM configuration.

        Returns:
            StateNetwork instance.
        """
        network = StateNetwork(
            name=network_config.name,
            description=network_config.metadata.get("description", "") if network_config.metadata else None,
        )

        # Create states; START/END/START_END types drive the initial/final flags.
        state_defs: Dict[str, StateDefinition] = {}
        for state_config in network_config.states:
            state_def = self._build_state(state_config, fsm_config)
            state_defs[state_def.name] = state_def
            network.add_state(
                state_def,
                initial=state_def.type in (StateType.START, StateType.START_END),
                final=state_def.type in (StateType.END, StateType.START_END),
            )

        # Create arcs; definition_order counts arcs across the whole network
        # in declaration order.
        arc_definition_order = 0
        for state_config in network_config.states:
            state_def = state_defs[state_config.name]
            for arc_config in state_config.arcs:
                arc = self._build_arc(arc_config, state_def, network, fsm_config, arc_definition_order)
                arc_definition_order += 1
                # Keep the arc on the state definition and register it with
                # the network (by function name) for execution.
                state_def.outgoing_arcs.append(arc)
                network.add_arc(
                    source_state=state_config.name,
                    target_state=arc_config.target,
                    pre_test=self._callable_label(arc.pre_test),
                    transform=self._callable_label(arc.transform),
                    metadata=arc_config.metadata,
                )

        return network

    @staticmethod
    def _callable_label(func: Any) -> str | None:
        """Return ``func.__name__`` (or ``str(func)``) for a truthy value, else None."""
        if not func:
            return None
        return getattr(func, "__name__", str(func))

    def _build_state(self, state_config: StateConfig, fsm_config: FSMConfig) -> StateDefinition:
        """Build a state definition from configuration.

        Args:
            state_config: State configuration.
            fsm_config: Parent FSM configuration.

        Returns:
            StateDefinition instance.
        """
        schema = self._build_schema(state_config.data_schema) if state_config.data_schema else None

        # Resolve functions in declaration order. Resolved transforms are not
        # re-registered: they already live in the function manager.
        pre_validators = [
            self._resolve_function(ref, IValidationFunction) for ref in state_config.pre_validators
        ]
        validators = [self._resolve_function(ref, IValidationFunction) for ref in state_config.validators]
        transforms = [self._resolve_function(ref, ITransformFunction) for ref in state_config.transforms]

        state_def = StateDefinition(name=state_config.name)
        state_def.schema = schema
        state_def.pre_validation_functions = pre_validators
        state_def.validation_functions = validators
        state_def.transform_functions = transforms

        # Map resource names to their configs. Unknown names become
        # placeholder CUSTOM configs; _validate_completeness rejects them.
        resource_map = {res.name: res for res in fsm_config.resources}
        state_def.resource_requirements = [
            resource_map[r] if r in resource_map else ResourceConfig(name=r, type=ResourceType.CUSTOM)
            for r in state_config.resources
        ]
        state_def.data_mode = state_config.data_mode or fsm_config.data_mode.default

        if state_config.is_start and state_config.is_end:
            state_def.type = StateType.START_END
        elif state_config.is_start:
            state_def.type = StateType.START
        elif state_config.is_end:
            state_def.type = StateType.END
        else:
            state_def.type = StateType.NORMAL
        state_def.metadata = state_config.metadata

        return state_def

    def _get_function_name(self, func: Any) -> str | None:
        """Extract a registrable name from a function or wrapped function.

        Checked in order: ``func.name``; ``func.__name__`` (``None`` for the
        collision-prone generic names ``<lambda>`` / ``inline_func``);
        ``func.wrapper.name``; finally a search of the function manager for
        an entry whose ``func`` is ``func``.

        Args:
            func: Function or wrapped function.

        Returns:
            Function name or None.
        """
        if not func:
            return None

        if hasattr(func, "name"):
            return func.name
        if hasattr(func, "__name__"):
            name = func.__name__
            # Generic names would collide between arcs; let the caller generate one.
            return None if name in ("<lambda>", "inline_func") else name
        if hasattr(func, "wrapper") and hasattr(func.wrapper, "name"):
            # InterfaceWrapper case
            return func.wrapper.name

        for fname in self._function_manager.list_functions():
            wrapper = self._function_manager.get_function(fname)
            # Not every managed entry is guaranteed to expose `.func`.
            if wrapper and getattr(wrapper, "func", None) == func:
                return fname
        return None

    def _register_arc_function(
        self,
        kind: str,
        func_ref: FunctionReference,
        resolved: Any,
        source_name: str,
        target: str,
        *,
        prefer_test_method: bool,
    ) -> str:
        """Name an arc's resolved function and register it if the name is new.

        Args:
            kind: Name prefix, ``"condition"`` or ``"transform"``.
            func_ref: The configured function reference.
            resolved: The resolved callable/interface object.
            source_name: Name of the arc's source state.
            target: Name of the arc's target state.
            prefer_test_method: Register ``resolved.test`` instead of
                ``resolved`` when that attribute exists.

        Returns:
            The name under which the function is (or already was) registered.
        """
        import zlib

        name = self._get_function_name(resolved)
        if not name or name == "<lambda>":
            if func_ref.type == "inline" and func_ref.code:
                # crc32, unlike built-in hash(), is stable across interpreter
                # runs, so generated names are reproducible.
                suffix = zlib.crc32(func_ref.code.encode("utf-8"))
            else:
                suffix = id(resolved)
            name = f"{kind}_{source_name}_{target}_{suffix}"

        # Register so build() transfers it to the core FSM.
        if not self._function_manager.has_function(name):
            to_register = resolved.test if prefer_test_method and hasattr(resolved, "test") else resolved
            self._function_manager.register_function(name, to_register, FunctionSource.INLINE)
        return name

    def _build_arc(
        self,
        arc_config: ArcConfig,
        source_state: StateDefinition,
        network: StateNetwork,
        fsm_config: FSMConfig,
        definition_order: int = 0,
    ) -> ArcDefinition:
        """Build an arc definition from configuration.

        The condition and transform are resolved, registered by name with the
        function manager, and referenced from the arc by that name.

        Args:
            arc_config: Arc configuration.
            source_state: Source state definition.
            network: Parent network (currently unused).
            fsm_config: Parent FSM configuration (currently unused).
            definition_order: Order in which this arc was defined.

        Returns:
            ``PushArc`` for a ``PushArcConfig``, otherwise ``ArcDefinition``.
        """
        condition_name = None
        if arc_config.condition:
            condition = self._resolve_function(arc_config.condition, IStateTestFunction)
            condition_name = self._register_arc_function(
                "condition",
                arc_config.condition,
                condition,
                source_state.name,
                arc_config.target,
                prefer_test_method=True,
            )

        transform_name = None
        if arc_config.transform:
            transform = self._resolve_function(arc_config.transform, ITransformFunction)
            transform_name = self._register_arc_function(
                "transform",
                arc_config.transform,
                transform,
                source_state.name,
                arc_config.target,
                prefer_test_method=False,
            )

        common = dict(
            target_state=arc_config.target,  # push arcs carry a target state too
            pre_test=condition_name,
            transform=transform_name,
            priority=arc_config.priority,
            definition_order=definition_order,
            metadata=arc_config.metadata,
        )
        if isinstance(arc_config, PushArcConfig):
            arc = PushArc(
                target_network=arc_config.target_network,
                return_state=arc_config.return_state,
                **common,
            )
        else:
            arc = ArcDefinition(**common)

        # Resources are stored on the arc separately, keyed by name.
        arc.required_resources = {r: r for r in arc_config.resources}
        return arc

    def _build_schema(self, schema_config: Dict[str, Any]) -> Any:
        """Build a minimal JSON-Schema validator from configuration.

        Only ``object`` schemas are checked (``required`` and per-property
        ``type``); other schema types accept any dict/Record.

        Args:
            schema_config: Schema configuration (JSON Schema format).

        Returns:
            Object with ``validate(data)`` returning a result that has
            ``valid`` (bool) and ``errors`` (list of str) attributes.
        """

        class Result:
            """Outcome of a schema validation."""

            def __init__(self, valid, errors):
                self.valid = valid
                self.errors = errors

        class JSONSchemaValidator:
            """Simple JSON Schema validator for FSM data validation.

            Validates data against JSON Schema definitions, checking required
            fields and type constraints of object schemas.
            """

            # JSON Schema type keyword -> accepted Python type(s).
            _TYPE_MAP = {
                'string': str,
                'integer': int,
                'number': (int, float),
                'boolean': bool,
                'array': list,
                'object': dict,
                'null': type(None),
            }

            def __init__(self, schema_def):
                self.schema_def = schema_def

            def validate(self, data):
                """Validate a dict or Record against the JSON schema."""
                try:
                    from dataknobs_data import Record
                except ImportError:
                    # Optional dependency: plain dicts can still be validated.
                    Record = None

                if Record is not None and isinstance(data, Record):
                    data_dict = data.to_dict()
                elif isinstance(data, dict):
                    data_dict = data
                else:
                    return Result(False, [f'Expected object or Record, got {type(data).__name__}'])

                if self.schema_def.get('type') != 'object':
                    # Simple pass-through for non-object schemas
                    return Result(True, [])

                properties = self.schema_def.get('properties', {})
                required = self.schema_def.get('required', [])

                errors = [
                    f"Required field '{field}' is missing"
                    for field in required
                    if field not in data_dict
                ]
                for field, value in data_dict.items():
                    if field in properties:
                        field_type = properties[field].get('type')
                        if field_type and not self._validate_type(value, field_type):
                            errors.append(f"Field '{field}' has wrong type")

                return Result(len(errors) == 0, errors)

            def _validate_type(self, value, expected_type):
                """Return True if ``value`` matches the JSON Schema ``type`` keyword."""
                # A schema may list several allowed types, e.g. ["string", "null"].
                if isinstance(expected_type, (list, tuple)):
                    return any(self._validate_type(value, t) for t in expected_type)
                python_type = self._TYPE_MAP.get(expected_type) if isinstance(expected_type, str) else None
                if python_type is None:
                    # Unknown type keyword: do not reject.
                    return True
                # bool subclasses int in Python, but JSON booleans are not numbers.
                if isinstance(value, bool) and expected_type in ('integer', 'number'):
                    return False
                return isinstance(value, python_type)

        return JSONSchemaValidator(schema_config)

    def _resolve_function(
        self,
        func_ref: FunctionReference,
        expected_type: Type | None = None,
    ) -> Callable:
        """Resolve a function reference to a callable.

        Args:
            func_ref: Function reference.
            expected_type: Expected function interface type.

        Returns:
            Resolved function callable.

        Raises:
            ValueError: If function cannot be resolved.
        """
        kind = func_ref.type

        if kind in ("builtin", "registered"):
            func = self._function_manager.get_function(func_ref.name)
            if not func:
                label = "Built-in" if kind == "builtin" else "Registered"
                raise ValueError(f"{label} function not found: {func_ref.name}")

        elif kind == "custom":
            # Prefer an already-managed function; otherwise import and register it.
            func = self._function_manager.get_function(func_ref.name)
            if not func:
                module = importlib.import_module(func_ref.module)
                func = getattr(module, func_ref.name)
                self._function_manager.register_function(func_ref.name, func, FunctionSource.REGISTERED)

        elif kind == "inline":
            func = self._function_manager.resolve_function(func_ref.code, expected_type)
            if not func:
                raise ValueError(f"Failed to create inline function from: {func_ref.code}")
            # Already wrapped by the function manager; avoid double wrapping below.
            func._is_wrapped = True

        else:
            raise ValueError(f"Unknown function type: {func_ref.type}")

        # Bind configured parameters as keyword arguments.
        if func_ref.params:
            import functools
            func = functools.partial(func, **func_ref.params)

        # Adapt to the expected interface unless already wrapped.
        if expected_type and not isinstance(func, expected_type) and not getattr(func, '_is_wrapped', False):
            wrapped = self._wrap_function(func, expected_type)
            # Preserve the original function name if it exists
            if hasattr(func, 'name'):
                wrapped.name = func.name
            elif kind == "registered" and func_ref.name:
                wrapped.name = func_ref.name
            func = wrapped

        return func

    def _wrap_function(self, func: Callable, interface: Type) -> Any:
        """Wrap a function to match an expected interface using unified manager.

        Args:
            func: Function to wrap.
            interface: Expected interface type.

        Returns:
            Whatever ``FunctionManager.resolve_function`` returns for
            ``(func, interface)``.
        """
        return self._function_manager.resolve_function(func, interface)

    def _validate_completeness(self, config: FSMConfig) -> None:
        """Validate that the FSM configuration is complete and consistent.

        Args:
            config: FSM configuration.

        Raises:
            ValueError: If configuration is incomplete or inconsistent.
        """
        if config.main_network not in self._networks:
            raise ValueError(f"Main network '{config.main_network}' not found")

        # Check arc targets. NOTE(review): this walks the arcs stored in the
        # network (registered via network.add_arc); whether those are PushArc
        # instances for push arcs depends on StateNetwork — confirm.
        for network in self._networks.values():
            state_names = {state.name for state in network.states.values()}
            for state in network.states.values():
                for arc in network.get_arcs_from_state(state.name):
                    if isinstance(arc, PushArc):
                        if arc.target_network not in self._networks:
                            raise ValueError(f"Target network '{arc.target_network}' not found")
                        if arc.return_state and arc.return_state not in state_names:
                            raise ValueError(f"Return state '{arc.return_state}' not found")
                    elif arc.target_state not in state_names:
                        raise ValueError(f"Arc target '{arc.target_state}' not found in network")

        # Check that every required resource is declared in the FSM config.
        resource_names = {res.name for res in config.resources}
        for network in self._networks.values():
            for state in network.states.values():
                for resource_req in state.resource_requirements:
                    if resource_req.name not in resource_names:
                        raise ValueError(f"Resource '{resource_req.name}' not found")

    def _create_execution_context(self, config: FSMConfig) -> ExecutionContext:
        """Create an execution context in SINGLE processing mode.

        Args:
            config: FSM configuration (currently unused).

        Returns:
            ExecutionContext instance with an empty resource map.
        """
        from dataknobs_fsm.core.modes import ProcessingMode as CoreDataMode

        # Only SINGLE mode is mapped for now; batch/stream could be added.
        return ExecutionContext(
            data_mode=CoreDataMode.SINGLE,
            resources={},  # Resources will be populated during execution
        )
740# FSM wrapper class removed - functionality moved to core FSM
741# The FSMBuilder now returns the core FSM directly with all
742# execution capabilities integrated