Coverage for smartmdao / executor.py: 100%

58 statements  

« prev     ^ index     » next       coverage.py v7.13.5, created at 2026-05-02 20:01 +0200

1import inspect 

2import logging 

3from dataclasses import is_dataclass, asdict 

4from typing import Dict, Any 

5from .models import Step 

6 

7# Initialize module-level logger 

8logger = logging.getLogger(__name__) 

9 

10class StepExecutor: 

11 """ 

12 Static helper responsible for binding arguments from memory  

13 and updating memory with results. 

14 """ 

15 @staticmethod 

16 def run_step(step: Step, memory: Dict[str, Any]): 

17 logger.debug(f"Preparing to execute step '{step.name}'") 

18 

19 # Use the robust unwrapped signature to find expected parameters 

20 sig = step.get_signature() 

21 

22 # 1. Bind Arguments 

23 params = {} 

24 missing_required = [] 

25 

26 for name, param in sig.parameters.items(): 

27 if name in memory: 

28 params[name] = memory[name] 

29 elif param.default == inspect.Parameter.empty: 

30 missing_required.append(name) 

31 

32 if missing_required: 

33 error_msg = (f"Step '{step.name}' cannot run. Missing inputs: {missing_required}. " 

34 f"Available in memory: {list(memory.keys())}") 

35 logger.error(error_msg) 

36 raise KeyError(error_msg) 

37 

38 # 2. Execute 

39 try: 

40 logger.debug(f"Invoking '{step.name}' with inputs: {list(params.keys())}") 

41 result = step.fn(**params) 

42 except Exception as e: 

43 logger.error(f"Error executing step '{step.name}': {e}", exc_info=True) 

44 raise RuntimeError(f"Error executing step '{step.name}': {e}") from e 

45 

46 # 3. Store Result 

47 StepExecutor._update_memory(step, result, memory) 

48 logger.debug(f"Finished step '{step.name}'.") 

49 

50 @staticmethod 

51 def _update_memory(step: Step, result: Any, memory: Dict[str, Any]): 

52 output_keys = step.resolve_output_names() 

53 

54 if result is None: 

55 logger.debug(f"Step '{step.name}' returned None. No outputs stored.") 

56 return 

57 

58 # Case A: Explicit Manual Outputs (e.g. outputs=['a', 'b']) 

59 if step.manual_outputs: 

60 if len(output_keys) == 1: 

61 memory[output_keys[0]] = result 

62 return 

63 

64 # Handle Dictionary Return with Manual Outputs 

65 if isinstance(result, dict): 

66 for k in output_keys: 

67 if k not in result: 

68 logger.error(f"Step '{step.name}' missing output key '{k}' in returned dict.") 

69 raise KeyError(f"Step '{step.name}' expected output key '{k}' but it was missing in returned dict.") 

70 memory[k] = result[k] 

71 return 

72 

73 # Handle Tuple/List Return with Manual Outputs 

74 if not isinstance(result, (list, tuple)): 

75 raise TypeError(f"Step '{step.name}' expected iterable (or dict) output for keys {output_keys}, got {type(result)}") 

76 

77 if len(result) != len(output_keys): 

78 raise ValueError(f"Step '{step.name}' returned {len(result)} items, expected {len(output_keys)}") 

79 

80 for k, v in zip(output_keys, result): 

81 memory[k] = v 

82 return 

83 

84 # Case B: Dataclass Expansion (Auto-unpacking based on type hint/runtime check) 

85 if is_dataclass(result): 

86 memory.update(asdict(result)) 

87 return 

88 

89 # Case C: Single Default Output 

90 memory[output_keys[0]] = result