Coverage for agentos/tools/config_manager.py: 0%

151 statements  

« prev     ^ index     » next       coverage.py v7.14.3, created at 2026-07-03 07:30 +0800

1""" 

2ConfigManager — layered configuration with schema validation, env overlay, and hot reload. 

3 

4Layers (priority low → high): 

5 1. defaults — hardcoded defaults 

6 2. file — YAML/JSON config file(s) 

7 3. env — environment variable overrides (PREFIX_KEY=value) 

8 4. runtime — programmatic overrides via set() 

9 

10Supports: dot-path access, schema validation, file watching for hot reload. 

11""" 

12 

13import json 

14import os 

15import threading 

16import time 

17from copy import deepcopy 

18from pathlib import Path 

19from typing import Any, Callable, Dict, List, Optional, Set, Union 

20 

21 

22# ============================================================================ 

23# Schema Validation 

24# ============================================================================ 

25 

26class ConfigSchemaError(Exception): 

27 """Validation error with path and message.""" 

28 def __init__(self, path: str, message: str): 

29 self.path = path 

30 self.message = message 

31 super().__init__(f"[{path}] {message}") 

32 

33 

34class ConfigSchema: 

35 """Declarative schema for config validation.""" 

36 

37 def __init__(self): 

38 self._fields: Dict[str, Dict[str, Any]] = {} 

39 

40 def field( 

41 self, 

42 name: str, 

43 type_: type = str, 

44 required: bool = False, 

45 default: Any = None, 

46 choices: Optional[List[Any]] = None, 

47 min_val: Optional[float] = None, 

48 max_val: Optional[float] = None, 

49 description: str = "", 

50 ) -> "ConfigSchema": 

51 self._fields[name] = { 

52 "type": type_, 

53 "required": required, 

54 "default": default, 

55 "choices": choices, 

56 "min": min_val, 

57 "max": max_val, 

58 "description": description, 

59 } 

60 return self 

61 

62 def validate(self, config: Dict[str, Any], prefix: str = "") -> List[ConfigSchemaError]: 

63 errors = [] 

64 for name, spec in self._fields.items(): 

65 path = f"{prefix}.{name}" if prefix else name 

66 value = config.get(name) 

67 if value is None: 

68 if spec["required"]: 

69 errors.append(ConfigSchemaError(path, "required field missing")) 

70 continue 

71 if not isinstance(value, spec["type"]): 

72 errors.append(ConfigSchemaError(path, 

73 f"expected {spec['type'].__name__}, got {type(value).__name__}")) 

74 continue 

75 if spec["choices"] and value not in spec["choices"]: 

76 errors.append(ConfigSchemaError(path, 

77 f"invalid choice '{value}', allowed: {spec['choices']}")) 

78 if spec["min"] is not None and value < spec["min"]: 

79 errors.append(ConfigSchemaError(path, f"value {value} below min {spec['min']}")) 

80 if spec["max"] is not None and value > spec["max"]: 

81 errors.append(ConfigSchemaError(path, f"value {value} above max {spec['max']}")) 

82 return errors 

83 

84 

85# ============================================================================ 

86# ConfigManager 

87# ============================================================================ 

88 

89class ConfigManager: 

90 """Layered configuration manager. 

91 

92 Usage: 

93 cm = ConfigManager(defaults={"host": "localhost", "port": 8080}) 

94 cm.load_file("config.yaml") 

95 cm.load_env("APP_") # APP_HOST=0.0.0.0 overrides host 

96 cm.get("host") # returns "0.0.0.0" 

97 """ 

98 

99 def __init__( 

100 self, 

101 defaults: Optional[Dict[str, Any]] = None, 

102 schema: Optional[ConfigSchema] = None, 

103 ): 

104 self._defaults = deepcopy(defaults) if defaults else {} 

105 self._file_layer: Dict[str, Any] = {} 

106 self._env_layer: Dict[str, Any] = {} 

107 self._runtime_layer: Dict[str, Any] = {} 

108 self._schema = schema 

109 self._lock = threading.RLock() 

110 self._watchers: Dict[str, float] = {} # path → mtime 

111 self._on_change: List[Callable[[str, Any, Any], None]] = [] 

112 

113 # ---------- file loading ---------- 

114 

115 def load_file(self, path: Union[str, Path]) -> None: 

116 """Load config from YAML or JSON file.""" 

117 path = Path(path) 

118 if not path.exists(): 

119 raise FileNotFoundError(f"Config file not found: {path}") 

120 content = path.read_text(encoding="utf-8") 

121 if path.suffix in (".yaml", ".yml"): 

122 data = self._parse_yaml(content) 

123 else: 

124 data = json.loads(content) 

125 with self._lock: 

126 self._file_layer = self._deep_merge(self._file_layer, data) 

127 if str(path) not in self._watchers: 

128 self._watchers[str(path)] = path.stat().st_mtime 

129 

130 def _parse_yaml(self, content: str) -> Dict[str, Any]: 

131 try: 

132 import yaml 

133 return yaml.safe_load(content) or {} 

134 except ImportError: 

135 raise ImportError("PyYAML required for YAML config files. pip install pyyaml") 

136 

137 # ---------- env loading ---------- 

138 

139 def load_env(self, prefix: str = "") -> None: 

140 """Overlay environment variables. PREFIX_KEY → config key (lowercase).""" 

141 with self._lock: 

142 for key, value in os.environ.items(): 

143 if not prefix or key.startswith(prefix): 

144 config_key = key[len(prefix):].lower() if prefix else key.lower() 

145 # Try to parse numbers/booleans 

146 parsed = self._parse_value(value) 

147 self._env_layer[config_key] = parsed 

148 

149 @staticmethod 

150 def _parse_value(value: str) -> Any: 

151 if value.lower() in ("true", "false"): 

152 return value.lower() == "true" 

153 if value.lower() in ("null", "none", ""): 

154 return None 

155 try: 

156 if "." in value: 

157 return float(value) 

158 return int(value) 

159 except ValueError: 

160 return value 

161 

162 # ---------- runtime access ---------- 

163 

164 def set(self, key: str, value: Any) -> None: 

165 old = self.get(key) 

166 with self._lock: 

167 self._runtime_layer[key] = value 

168 new_val = value 

169 if old != new_val: 

170 self._notify(key, old, new_val) 

171 

172 def get(self, key: str, default: Any = None) -> Any: 

173 with self._lock: 

174 # Priority: runtime > env > file > defaults 

175 if key in self._runtime_layer: 

176 return self._runtime_layer[key] 

177 if key in self._env_layer: 

178 return self._env_layer[key] 

179 if key in self._file_layer: 

180 return self._file_layer[key] 

181 if key in self._defaults: 

182 return self._defaults[key] 

183 return default 

184 

185 def get_dot(self, path: str, default: Any = None) -> Any: 

186 """Dot-path access: get_dot('server.host') → get('server')['host']""" 

187 keys = path.split(".") 

188 current = None 

189 for i, key in enumerate(keys): 

190 if i == 0: 

191 current = self.get(key) 

192 elif isinstance(current, dict): 

193 current = current.get(key) 

194 else: 

195 return default 

196 if current is None: 

197 return default 

198 return current 

199 

200 def all(self) -> Dict[str, Any]: 

201 """Return merged config dict.""" 

202 with self._lock: 

203 result = deepcopy(self._defaults) 

204 result = self._deep_merge(result, self._file_layer) 

205 result = self._deep_merge(result, self._env_layer) 

206 result = self._deep_merge(result, self._runtime_layer) 

207 return result 

208 

209 def reload(self) -> None: 

210 """Reload file layer from disk (check mtime).""" 

211 with self._lock: 

212 for path_str, mtime in list(self._watchers.items()): 

213 p = Path(path_str) 

214 if p.exists(): 

215 new_mtime = p.stat().st_mtime 

216 if new_mtime > mtime: 

217 self._file_layer = {} 

218 self.load_file(path_str) 

219 self._watchers[path_str] = new_mtime 

220 

221 # ---------- validation ---------- 

222 

223 def validate(self) -> List[ConfigSchemaError]: 

224 if not self._schema: 

225 return [] 

226 return self._schema.validate(self.all()) 

227 

228 # ---------- events ---------- 

229 

230 def on_change(self, callback: Callable[[str, Any, Any], None]) -> None: 

231 self._on_change.append(callback) 

232 

233 def _notify(self, key: str, old: Any, new: Any) -> None: 

234 for cb in self._on_change: 

235 try: 

236 cb(key, old, new) 

237 except Exception: 

238 pass 

239 

240 # ---------- internal ---------- 

241 

242 @staticmethod 

243 def _deep_merge(base: Dict, overlay: Dict) -> Dict: 

244 result = deepcopy(base) 

245 for k, v in overlay.items(): 

246 if k in result and isinstance(result[k], dict) and isinstance(v, dict): 

247 result[k] = ConfigManager._deep_merge(result[k], v) 

248 else: 

249 result[k] = v 

250 return result