Coverage for agentos/tools/config_manager.py: 0%
151 statements
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:30 +0800
« prev ^ index » next coverage.py v7.14.3, created at 2026-07-03 07:30 +0800
1"""
2ConfigManager — layered configuration with schema validation, env overlay, and hot reload.
4Layers (priority low → high):
5 1. defaults — hardcoded defaults
6 2. file — YAML/JSON config file(s)
7 3. env — environment variable overrides (PREFIX_KEY=value)
8 4. runtime — programmatic overrides via set()
10Supports: dot-path access, schema validation, file watching for hot reload.
11"""
13import json
14import os
15import threading
16import time
17from copy import deepcopy
18from pathlib import Path
19from typing import Any, Callable, Dict, List, Optional, Set, Union
22# ============================================================================
23# Schema Validation
24# ============================================================================
class ConfigSchemaError(Exception):
    """Validation error carrying the offending config path and a message.

    Attributes:
        path: Dot-separated path of the field that failed validation.
        message: Human-readable description of the failure.

    The string form of the exception is ``"[<path>] <message>"``.
    """

    def __init__(self, path: str, message: str):
        self.path = path
        self.message = message
        super().__init__(f"[{path}] {message}")

    def __reduce__(self):
        """Rebuild from ``(path, message)`` when copied or pickled.

        ``Exception``'s default reduction replays ``self.args`` (the single
        formatted string) into ``__init__``, which requires two arguments and
        would therefore raise ``TypeError`` on copy/deepcopy/unpickle.
        """
        return (type(self), (self.path, self.message), self.__dict__)
class ConfigSchema:
    """Declarative schema for config validation.

    Fields are registered with :meth:`field` (which returns ``self`` so calls
    can be chained) and checked against a config dict with :meth:`validate`.
    """

    def __init__(self):
        # field name → spec dict with keys:
        # type, required, default, choices, min, max, description
        self._fields: Dict[str, Dict[str, Any]] = {}

    def field(
        self,
        name: str,
        type_: type = str,
        required: bool = False,
        default: Any = None,
        choices: Optional[List[Any]] = None,
        min_val: Optional[float] = None,
        max_val: Optional[float] = None,
        description: str = "",
    ) -> "ConfigSchema":
        """Register (or replace) the spec for field *name* and return ``self``.

        Args:
            name: Key looked up in the config dict passed to ``validate``.
            type_: Expected Python type of the value.
            required: When true, a missing/``None`` value is an error.
            default: Stored with the spec; ``validate`` does not apply it.
            choices: Allowed values; ``None`` or empty means unrestricted.
            min_val: Inclusive lower bound, checked with ``<``.
            max_val: Inclusive upper bound, checked with ``>``.
            description: Free-form text stored with the spec.
        """
        self._fields[name] = {
            "type": type_,
            "required": required,
            "default": default,
            "choices": choices,
            "min": min_val,
            "max": max_val,
            "description": description,
        }
        return self

    @staticmethod
    def _matches_type(value: Any, expected: Any) -> bool:
        """Return whether *value* is acceptable for a field typed *expected*.

        Differs from a bare ``isinstance`` in two ways:
        * ``bool`` is rejected for ``int``/``float`` fields (``bool`` is a
          subclass of ``int``, so ``isinstance(True, int)`` is true);
        * ``int`` is accepted for ``float`` fields, since whole numbers
          arrive as ``int`` from JSON/YAML and environment parsing.
        """
        if isinstance(value, bool) and expected in (int, float):
            return False
        if expected is float and isinstance(value, int):
            return True
        return isinstance(value, expected)

    def validate(self, config: Dict[str, Any], prefix: str = "") -> "List[ConfigSchemaError]":
        """Check *config* against every registered field.

        Args:
            config: Mapping of field name → value.
            prefix: Optional path prefix prepended (dot-joined) to field
                names in the reported error paths.

        Returns:
            A list of ``ConfigSchemaError``; empty when the config is valid.
            A value of ``None`` is treated the same as a missing key. A
            type mismatch suppresses the choice/range checks for that field.
        """
        errors = []
        for name, spec in self._fields.items():
            path = f"{prefix}.{name}" if prefix else name
            value = config.get(name)
            if value is None:
                if spec["required"]:
                    errors.append(ConfigSchemaError(path, "required field missing"))
                continue
            if not self._matches_type(value, spec["type"]):
                errors.append(ConfigSchemaError(
                    path,
                    f"expected {spec['type'].__name__}, got {type(value).__name__}"))
                # Range/choice checks are meaningless on a wrongly typed value.
                continue
            if spec["choices"] and value not in spec["choices"]:
                errors.append(ConfigSchemaError(
                    path,
                    f"invalid choice '{value}', allowed: {spec['choices']}"))
            if spec["min"] is not None and value < spec["min"]:
                errors.append(ConfigSchemaError(path, f"value {value} below min {spec['min']}"))
            if spec["max"] is not None and value > spec["max"]:
                errors.append(ConfigSchemaError(path, f"value {value} above max {spec['max']}"))
        return errors
85# ============================================================================
86# ConfigManager
87# ============================================================================
class ConfigManager:
    """Layered configuration manager.

    Layers, lowest to highest priority: defaults, file, env, runtime.
    Nested dicts present in several layers are merged key by key; any
    non-dict value in a higher layer replaces the lower one outright.

    Usage:
        cm = ConfigManager(defaults={"host": "localhost", "port": 8080})
        cm.load_file("config.yaml")
        cm.load_env("APP_")  # APP_HOST=0.0.0.0 overrides host
        cm.get("host")  # returns "0.0.0.0"
    """

    def __init__(
        self,
        defaults: Optional[Dict[str, Any]] = None,
        schema: "Optional[ConfigSchema]" = None,
    ):
        """Create a manager.

        Args:
            defaults: Lowest-priority values; deep-copied so later mutation
                of the caller's dict does not leak in.
            schema: Optional schema used by :meth:`validate`.
        """
        self._defaults = deepcopy(defaults) if defaults else {}
        self._file_layer: Dict[str, Any] = {}
        # path → last successfully parsed content, in load order. Lets
        # reload() rebuild the file layer from *all* files, not just the
        # one that changed.
        self._file_sources: Dict[str, Dict[str, Any]] = {}
        self._env_layer: Dict[str, Any] = {}
        self._runtime_layer: Dict[str, Any] = {}
        self._schema = schema
        self._lock = threading.RLock()
        self._watchers: Dict[str, float] = {}  # path → mtime
        self._on_change: List[Callable[[str, Any, Any], None]] = []

    # ---------- file loading ----------

    def load_file(self, path: Union[str, Path]) -> None:
        """Load config from a YAML or JSON file and merge it into the file layer.

        Files with a ``.yaml``/``.yml`` suffix (any case) are parsed as YAML;
        everything else is parsed as JSON. The file is also registered for
        :meth:`reload`.

        Raises:
            FileNotFoundError: If *path* does not exist.
            ImportError: If a YAML file is given but PyYAML is not installed.
            ValueError: If the document's top level is not a mapping.
        """
        path = Path(path)
        if not path.exists():
            raise FileNotFoundError(f"Config file not found: {path}")
        data, mtime = self._read_config_file(path)
        key = str(path)
        with self._lock:
            self._file_layer = self._deep_merge(self._file_layer, data)
            # Re-loading a path moves it to the end so that a rebuild in
            # reload() keeps the same "latest load wins" precedence.
            self._file_sources.pop(key, None)
            self._file_sources[key] = data
            self._watchers[key] = mtime

    def _read_config_file(self, path: Path):
        """Parse *path* and return ``(data, mtime)``; *data* is always a dict."""
        # Stat before reading: a write that lands between the two calls then
        # shows up as a newer mtime on the next reload() instead of being missed.
        mtime = path.stat().st_mtime
        content = path.read_text(encoding="utf-8")
        if path.suffix.lower() in (".yaml", ".yml"):
            data = self._parse_yaml(content)
        else:
            data = json.loads(content)
        if not isinstance(data, dict):
            raise ValueError(
                f"Config file {path} must contain a mapping at the top level, "
                f"got {type(data).__name__}")
        return data, mtime

    def _parse_yaml(self, content: str) -> Dict[str, Any]:
        """Parse YAML text with ``yaml.safe_load``; an empty document yields ``{}``."""
        try:
            import yaml
        except ImportError as exc:
            raise ImportError(
                "PyYAML required for YAML config files. pip install pyyaml") from exc
        return yaml.safe_load(content) or {}

    # ---------- env loading ----------

    def load_env(self, prefix: str = "") -> None:
        """Overlay environment variables. PREFIX_KEY → config key (lowercase).

        With an empty *prefix* every environment variable is imported.
        Values are converted by :meth:`_parse_value`.
        """
        with self._lock:
            for key, value in os.environ.items():
                if prefix and not key.startswith(prefix):
                    continue
                config_key = key[len(prefix):].lower()
                if not config_key:
                    # Variable named exactly like the prefix: no key to set.
                    continue
                # Try to parse numbers/booleans
                self._env_layer[config_key] = self._parse_value(value)

    @staticmethod
    def _parse_value(value: str) -> Any:
        """Convert an environment string to bool, ``None``, float, int, or str.

        ``true``/``false`` (any case) → bool; ``null``/``none``/empty → None;
        text containing ``.`` is tried as float, otherwise as int; anything
        that fails numeric conversion is returned unchanged.
        """
        lowered = value.lower()
        if lowered in ("true", "false"):
            return lowered == "true"
        if lowered in ("null", "none", ""):
            return None
        try:
            if "." in value:
                return float(value)
            return int(value)
        except ValueError:
            return value

    # ---------- runtime access ----------

    def set(self, key: str, value: Any) -> None:
        """Set a runtime override and notify listeners if the effective value changed.

        Callbacks receive ``(key, old, new)`` where both values are what
        :meth:`get` returned before and after the write.
        """
        with self._lock:
            # Read-modify-read under one lock so a concurrent set() cannot
            # slip in between and produce a bogus old/new pair.
            old = self.get(key)
            self._runtime_layer[key] = value
            new_val = self.get(key)
        # Callbacks run outside the lock so they may call back into the manager
        # from other threads without deadlocking.
        if old != new_val:
            self._notify(key, old, new_val)

    def get(self, key: str, default: Any = None) -> Any:
        """Return the effective value for top-level *key*, or *default* if absent.

        Priority: runtime > env > file > defaults. When the winning value is
        a dict, dict values for the same key in the layers directly beneath
        it are merged in (yielding the same result as ``all()[key]``) and a
        fresh dict is returned. Non-dict values are returned as stored.
        """
        with self._lock:
            layers = (
                self._runtime_layer,
                self._env_layer,
                self._file_layer,
                self._defaults,
            )
            hits = [layer[key] for layer in layers if key in layer]
            if not hits:
                return default
            if not isinstance(hits[0], dict):
                return hits[0]
            # Only the unbroken run of dicts from the top participates: a
            # scalar in a lower layer is fully replaced by the dict above it.
            dict_run = []
            for candidate in hits:
                if not isinstance(candidate, dict):
                    break
                dict_run.append(candidate)
            merged: Dict[str, Any] = {}
            for layer_value in reversed(dict_run):
                merged = self._deep_merge(merged, layer_value)
            return merged

    def get_dot(self, path: str, default: Any = None) -> Any:
        """Dot-path access: get_dot('server.host') → get('server')['host']

        Returns *default* when any segment is missing, ``None``, or when an
        intermediate value is not a dict.
        """
        keys = path.split(".")
        current = None
        for i, key in enumerate(keys):
            if i == 0:
                current = self.get(key)
            elif isinstance(current, dict):
                current = current.get(key)
            else:
                return default
            if current is None:
                return default
        return current

    def all(self) -> Dict[str, Any]:
        """Return merged config dict (an independent deep copy of all layers)."""
        with self._lock:
            result: Dict[str, Any] = {}
            for layer in (
                self._defaults,
                self._file_layer,
                self._env_layer,
                self._runtime_layer,
            ):
                result = self._deep_merge(result, layer)
            return result

    def reload(self) -> None:
        """Reload file layer from disk (check mtime).

        Every watched file whose mtime differs from the recorded one is
        re-parsed; the file layer is then rebuilt from the latest content of
        *all* loaded files, in load order. Files that have disappeared keep
        their last loaded content. If a changed file fails to parse, the
        exception propagates and the current file layer is left untouched.
        """
        with self._lock:
            changed = False
            for path_str, mtime in list(self._watchers.items()):
                p = Path(path_str)
                try:
                    current_mtime = p.stat().st_mtime
                except OSError:
                    # Missing or unreadable right now; keep what we have.
                    continue
                # "!=" rather than ">" so a file replaced by an older copy
                # is still picked up.
                if current_mtime == mtime:
                    continue
                data, new_mtime = self._read_config_file(p)
                self._file_sources[path_str] = data
                self._watchers[path_str] = new_mtime
                changed = True
            if changed:
                rebuilt: Dict[str, Any] = {}
                for data in self._file_sources.values():
                    rebuilt = self._deep_merge(rebuilt, data)
                self._file_layer = rebuilt

    # ---------- validation ----------

    def validate(self) -> "List[ConfigSchemaError]":
        """Validate the merged config against the schema; ``[]`` if no schema is set."""
        if not self._schema:
            return []
        return self._schema.validate(self.all())

    # ---------- events ----------

    def on_change(self, callback: Callable[[str, Any, Any], None]) -> None:
        """Register *callback(key, old, new)* to be called by :meth:`set` on change."""
        self._on_change.append(callback)

    def _notify(self, key: str, old: Any, new: Any) -> None:
        """Invoke every registered callback; a failing callback never stops the rest."""
        # Function-scope import keeps this block self-contained.
        import logging

        for cb in tuple(self._on_change):
            try:
                cb(key, old, new)
            except Exception:
                # Best-effort delivery: record the failure instead of hiding it.
                logging.getLogger(__name__).exception(
                    "Config change callback %r failed for key %r", cb, key)

    # ---------- internal ----------

    @staticmethod
    def _deep_merge(base: Dict, overlay: Dict) -> Dict:
        """Return a new dict: a deep copy of *base* with *overlay* merged on top.

        Dicts present on both sides are merged recursively; any other
        overlay value replaces the base value. Neither input is modified,
        and the result shares no mutable objects with either of them.
        """
        result = deepcopy(base)
        ConfigManager._merge_into(result, overlay)
        return result

    @staticmethod
    def _merge_into(target: Dict, overlay: Dict) -> None:
        """Merge *overlay* into *target* in place, copying overlay values."""
        for k, v in overlay.items():
            existing = target.get(k)
            if isinstance(existing, dict) and isinstance(v, dict):
                ConfigManager._merge_into(existing, v)
            else:
                # Copy so callers mutating the merged result cannot reach
                # back into a stored layer.
                target[k] = deepcopy(v)