Coverage for smartmdao / executor.py: 100%
58 statements
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 20:01 +0200
« prev ^ index » next coverage.py v7.13.5, created at 2026-05-02 20:01 +0200
1import inspect
2import logging
3from dataclasses import is_dataclass, asdict
4from typing import Dict, Any
5from .models import Step
7# Initialize module-level logger
8logger = logging.getLogger(__name__)
10class StepExecutor:
11 """
12 Static helper responsible for binding arguments from memory
13 and updating memory with results.
14 """
15 @staticmethod
16 def run_step(step: Step, memory: Dict[str, Any]):
17 logger.debug(f"Preparing to execute step '{step.name}'")
19 # Use the robust unwrapped signature to find expected parameters
20 sig = step.get_signature()
22 # 1. Bind Arguments
23 params = {}
24 missing_required = []
26 for name, param in sig.parameters.items():
27 if name in memory:
28 params[name] = memory[name]
29 elif param.default == inspect.Parameter.empty:
30 missing_required.append(name)
32 if missing_required:
33 error_msg = (f"Step '{step.name}' cannot run. Missing inputs: {missing_required}. "
34 f"Available in memory: {list(memory.keys())}")
35 logger.error(error_msg)
36 raise KeyError(error_msg)
38 # 2. Execute
39 try:
40 logger.debug(f"Invoking '{step.name}' with inputs: {list(params.keys())}")
41 result = step.fn(**params)
42 except Exception as e:
43 logger.error(f"Error executing step '{step.name}': {e}", exc_info=True)
44 raise RuntimeError(f"Error executing step '{step.name}': {e}") from e
46 # 3. Store Result
47 StepExecutor._update_memory(step, result, memory)
48 logger.debug(f"Finished step '{step.name}'.")
50 @staticmethod
51 def _update_memory(step: Step, result: Any, memory: Dict[str, Any]):
52 output_keys = step.resolve_output_names()
54 if result is None:
55 logger.debug(f"Step '{step.name}' returned None. No outputs stored.")
56 return
58 # Case A: Explicit Manual Outputs (e.g. outputs=['a', 'b'])
59 if step.manual_outputs:
60 if len(output_keys) == 1:
61 memory[output_keys[0]] = result
62 return
64 # Handle Dictionary Return with Manual Outputs
65 if isinstance(result, dict):
66 for k in output_keys:
67 if k not in result:
68 logger.error(f"Step '{step.name}' missing output key '{k}' in returned dict.")
69 raise KeyError(f"Step '{step.name}' expected output key '{k}' but it was missing in returned dict.")
70 memory[k] = result[k]
71 return
73 # Handle Tuple/List Return with Manual Outputs
74 if not isinstance(result, (list, tuple)):
75 raise TypeError(f"Step '{step.name}' expected iterable (or dict) output for keys {output_keys}, got {type(result)}")
77 if len(result) != len(output_keys):
78 raise ValueError(f"Step '{step.name}' returned {len(result)} items, expected {len(output_keys)}")
80 for k, v in zip(output_keys, result):
81 memory[k] = v
82 return
84 # Case B: Dataclass Expansion (Auto-unpacking based on type hint/runtime check)
85 if is_dataclass(result):
86 memory.update(asdict(result))
87 return
89 # Case C: Single Default Output
90 memory[output_keys[0]] = result