Coverage for src/configuraptor/helpers.py: 100%

111 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2026-04-22 11:27 +0200

1""" 

2Contains stand-alone helper functions. 

3""" 

4 

5import contextlib 

6import dataclasses as dc 

7import io 

8import math 

9import re 

10import types 

11import typing 

12import warnings 

13from collections import ChainMap 

14from pathlib import Path 

15 

16from expandvars import expand 

17from typeguard import TypeCheckError 

18from typeguard import check_type as _check_type 

19 

20try: 

21 import annotationlib 

22except ImportError: # pragma: no cover 

23 annotationlib = None 

24 

25 

def camel_to_snake(s: str) -> str:
    """
    Turn a CamelCase identifier into its snake_case form.

    Every uppercase character is replaced by an underscore followed by its
    lowercase form; all other characters are kept as they are. Any leading
    underscores of the joined result are stripped afterwards.

    Source:
        https://stackoverflow.com/questions/1175208/elegant-python-function-to-convert-camelcase-to-snake-case
    """
    pieces = []
    for char in s:
        # an uppercase letter starts a new word
        pieces.append("_" + char.lower() if char.isupper() else char)

    return "".join(pieces).lstrip("_")

34 

35 

36# def find_pyproject_toml() -> typing.Optional[str]: 

37# """ 

38# Find the project's config toml, looks up until it finds the project root (black's logic). 

39# """ 

40# return black.files.find_pyproject_toml((os.getcwd(),)) 

41 

42 

43def find_pyproject_toml(start_dir: typing.Optional[Path | str] = None) -> Path | None: 

44 """ 

45 Search for pyproject.toml starting from the current working directory \ 

46 and moving upwards in the directory tree. 

47 

48 Args: 

49 start_dir: Starting directory to begin the search. 

50 If not provided, uses the current working directory. 

51 

52 Returns: 

53 Path or None: Path object to the found pyproject.toml file, or None if not found. 

54 """ 

55 start_dir = Path.cwd() if start_dir is None else Path(start_dir).resolve() 

56 

57 current_dir = start_dir 

58 

59 while str(current_dir) != str(current_dir.root): 

60 pyproject_toml = current_dir / "pyproject.toml" 

61 if pyproject_toml.is_file(): 

62 return pyproject_toml 

63 current_dir = current_dir.parent 

64 

65 # If not found anywhere 

66 return None 

67 

68 

# Shorthand for "a class object of any kind", used in the signatures of the helpers in this module.
Type = typing.Type[typing.Any]

70 

71 

def _cls_annotations(c: type) -> dict[str, type]:  # pragma: no cover
    """
    Return the annotations declared on class `c` itself (inherited ones excluded, see _all_annotations).

    When `annotationlib` could be imported (Python 3.14+) it is used with the VALUE format and
    `eval_str=True`, so forward references are resolved right away. Otherwise the class' own
    `__dict__` is consulted, yielding an empty dict when there are no annotations.
    """
    if not annotationlib:
        # note: `getattr(c, "__annotations__", {})` was observed not to be equivalent here
        # (reason not established), so the class namespace is read directly.
        return c.__dict__.get("__annotations__") or {}

    resolved = annotationlib.get_annotations(c, format=annotationlib.Format.VALUE, eval_str=True)
    return typing.cast(dict[str, type], resolved)

87 

88 

def _all_annotations(cls: type) -> ChainMap[str, type]:
    """
    Collect the annotations of `cls` and all of its superclasses in one dictionary-like ChainMap.

    One mapping per class is passed to the ChainMap in `__mro__` order, so on lookup the
    annotation of the most derived class wins. Objects without `__mro__` give an empty ChainMap.
    """
    per_class = [_cls_annotations(klass) for klass in getattr(cls, "__mro__", [])]
    return ChainMap(*per_class)

97 

98 

def all_annotations(cls: Type, _except: typing.Optional[typing.Iterable[str]] = None) -> dict[str, type[object]]:
    """
    Wrapper around `_all_annotations` that filters away any keys in _except.

    It also flattens the ChainMap to a regular dict.

    Args:
        cls: the class whose (own and inherited) annotations are collected.
        _except: names to leave out of the result. Any iterable is accepted, including
                 one-shot iterators/generators; a single string is treated as one name.

    Returns:
        dict: annotation name -> annotated type, without the excluded names.
    """
    if _except is None:
        excluded: frozenset[str] = frozenset()
    elif isinstance(_except, str):
        # a bare string is one key, not a collection of characters
        excluded = frozenset((_except,))
    else:
        # materialise once: testing `in` against an iterator would consume it,
        # so later keys would no longer be filtered
        excluded = frozenset(_except)

    _all = _all_annotations(cls)
    return {k: v for k, v in _all.items() if k not in excluded}

110 

111 

T = typing.TypeVar("T")


def check_type(value: typing.Any, expected_type: typing.Type[T]) -> typing.TypeGuard[T]:
    """
    Tell whether `value` conforms to `expected_type` (which can be a Union, parameterized generic etc.).

    Thin wrapper around typeguard's `check_type`: instead of handing back the value or
    letting TypeCheckError propagate, the outcome is reported as a boolean.
    """
    try:
        _check_type(value, expected_type)
    except TypeCheckError:
        # typeguard rejected the value
        return False
    else:
        return True

126 

127 

def is_builtin_type(_type: Type) -> bool:
    """
    Tell whether `_type` is defined in the builtins module.

    The decision is based on `_type.__module__` being "builtins" (or "__builtin__").
    """
    builtin_modules = ("__builtin__", "builtins")
    module_name = _type.__module__
    return module_name in builtin_modules

133 

134 

135# def is_builtin_class_instance(obj: typing.Any) -> bool: 

136# return is_builtin_type(obj.__class__) 

137 

138 

def is_from_types_or_typing(_type: Type) -> bool:
    """
    Tell whether `_type` comes from the stdlib `types` or `typing` module.

    e.g. types.UnionType or typing.Union

    The decision is based on `_type.__module__`.
    """
    typing_modules = ("types", "typing")
    module_name = _type.__module__
    return module_name in typing_modules

146 

147 

def is_from_other_toml_supported_module(_type: Type) -> bool:
    """
    Tell whether `_type` comes from the stdlib `datetime` or `math` module.

    Besides builtins, toml also supports types from these two modules;
    the decision is based on `_type.__module__`.
    """
    toml_modules = ("datetime", "math")
    module_name = _type.__module__
    return module_name in toml_modules

154 

155 

def is_parameterized(_type: Type) -> bool:
    """
    Tell whether `_type` is a parameterized type, i.e. whether `typing.get_origin` finds an origin for it.

    Examples:
        list[str]  -> True
        str        -> False
    """
    origin = typing.get_origin(_type)
    return origin is not None

165 

166 

def is_custom_class(_type: Type) -> bool:
    """
    Guess whether `_type` is a custom (user-defined) class rather than a builtin.

    Other logic in this module depends on knowing that.

    A custom class is a plain class (its metaclass is exactly `type`) that is neither a builtin,
    nor from `datetime`/`math`, nor from `types`/`typing`.
    """
    if type(_type) is not type:
        # not a plain class object at all
        return False
    if is_builtin_type(_type):
        return False
    if is_from_other_toml_supported_module(_type):
        return False
    return not is_from_types_or_typing(_type)

179 

180 

def instance_of_custom_class(var: typing.Any) -> bool:
    """
    Apply `is_custom_class` to the class of `var` (read from `var.__class__`).
    """
    klass = var.__class__
    return is_custom_class(klass)

186 

187 

188def is_union(sometype: typing.Type[typing.Any] | typing.Any) -> bool: 

189 """ 

190 Determines if a given type is a Union type. 

191 

192 A Union type in Python is used to represent a type that can be one of multiple 

193 types. This function checks whether the provided type object corresponds to a 

194 Union type as defined in Python's type hints or annotations. 

195 

196 Returns: 

197 bool 

198 True if the provided type is a Union type, False otherwise. 

199 """ 

200 origin = typing.get_origin(sometype) 

201 return origin in (typing.Union, types.UnionType) 

202 

203 

204def is_optional(_type: Type | typing.Any) -> bool: 

205 """ 

206 Tries to guess if _type could be optional. 

207 

208 Examples: 

209 None -> True 

210 NoneType -> True 

211 typing.Union[str, None] -> True 

212 str | None -> True 

213 list[str | None] -> False 

214 list[str] -> False 

215 """ 

216 if _type and (is_parameterized(_type) and typing.get_origin(_type) in (dict, list)) or (_type is math.nan): 

217 # e.g. list[str] 

218 # will crash issubclass to test it first here 

219 return False 

220 

221 try: 

222 return ( 

223 _type is None 

224 or types.NoneType in typing.get_args(_type) # union with Nonetype 

225 or issubclass(types.NoneType, _type) 

226 or issubclass(types.NoneType, type(_type)) # no type # Nonetype 

227 ) 

228 except TypeError: 

229 # probably some weird input that's not a type 

230 return False 

231 

232 

def dataclass_field(cls: Type, key: str) -> typing.Optional[dc.Field[typing.Any]]:
    """
    Look up the dataclass Field called `key` on `cls`.

    Returns None when `cls` has no `__dataclass_fields__` or no field with that name.
    """
    return getattr(cls, "__dataclass_fields__", {}).get(key)

239 

240 

@contextlib.contextmanager
def uncloseable(fd: typing.BinaryIO) -> typing.Generator[typing.BinaryIO, typing.Any, None]:
    """
    Context manager which turns the fd's close operation to no-op for the duration of the context.

    Args:
        fd: the file-like object whose `close` is temporarily disabled.

    Yields:
        The same `fd` object.

    The original `close` is put back when the context ends, also when the body raised.
    """
    close = fd.close
    fd.close = lambda: None  # type: ignore
    try:
        yield fd
    finally:
        # without the finally, an exception in the with-body would leave
        # close() disabled forever and the object could never be closed
        fd.close = close  # type: ignore

250 

251 

def as_binaryio(file: str | Path | typing.BinaryIO | None, mode: typing.Literal["rb", "wb"] = "rb") -> typing.BinaryIO:
    """
    Normalise the supported 'file' descriptions into one object to read from or write to.

    - a str is treated as a path;
    - a Path is opened with `mode`;
    - None becomes a fresh in-memory buffer;
    - anything else is passed through.

    In-memory buffers (io.BytesIO) are rewound and returned wrapped in `uncloseable`,
    so they are meant to be used in a with-statement.
    """
    stream = Path(file) if isinstance(file, str) else file

    if isinstance(stream, Path):
        stream = stream.open(mode)

    if stream is None:
        stream = io.BytesIO()

    if isinstance(stream, io.BytesIO):
        # rewind, so reading works after something was written
        stream.seek(0)
        # keep the with-statement from closing the in-memory file
        stream = uncloseable(stream)  # type: ignore

    return stream

269 

270 

# ${NAME:default} where the default does not start with one of the POSIX operator characters - ? = +
_LEGACY_ENV_DEFAULTS_RE = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*):([^\-?=+][^}]*)}")


def _normalize_legacy_env_defaults(value: str) -> str:
    """
    Convert every legacy ${VAR:default} in `value` to ${VAR:-default}.

    Strings without the legacy form are returned untouched. Otherwise a single
    DeprecationWarning is issued for the call, however many occurrences were rewritten.
    """
    rewritten, occurrences = _LEGACY_ENV_DEFAULTS_RE.subn(r"${\1:-\2}", value)
    if not occurrences:
        return value

    warnings.warn(
        "Legacy ${VAR:default} syntax is deprecated; use ${VAR:-default}. "
        "Support for the legacy form may be removed in a future release.",
        DeprecationWarning,
        stacklevel=3,
    )

    return rewritten

289 

290 

def expand_posix_vars(posix_expr: str, context: dict[str, str]) -> str:
    """
    Substitute POSIX / Docker Compose-like variables in a string with values from `context`.

    Legacy ${VAR:default} defaults are first rewritten to ${VAR:-default}; the actual
    expansion is then done by `expandvars.expand`, with `context` as its environment.

    Args:
        posix_expr (str): The input string that may contain variables.
        context (dict): A dictionary containing variable names and their respective values.

    Returns:
        str: The string with replaced variable values.
    """
    normalized = _normalize_legacy_env_defaults(posix_expr)
    expanded = expand(normalized, environ=context)
    return typing.cast(str, expanded)

304 

305 

def expand_env_vars_into_toml_values(
    toml: dict[str, typing.Any],
    env: dict[str, typing.Any],
    *,
    case_insensitive: bool = True,
) -> None:
    """
    Recursively expands POSIX/Docker Compose-like environment variables in a TOML dictionary.

    This function traverses a TOML dictionary and expands POSIX/Docker Compose-like
    environment variables (${VAR:-default}) using values provided in the 'env' dictionary.
    It performs in-place modification of the 'toml' dictionary.

    Args:
        toml (dict): A TOML dictionary with string values possibly containing environment variables.
        env (dict): A dictionary containing environment variable names and their respective values.
        case_insensitive (bool): If True, treat environment keys as case-insensitive by adding
            upper/lower variants for lookup. Defaults to True.

    Returns:
        None: The function modifies the 'toml' dictionary in place.

    Notes:
        Strings are expanded wherever they occur: directly as a value, inside (nested) lists and
        inside dictionaries, including dictionaries that are list items (TOML arrays of tables).
        Values of any other type are left alone. Nothing happens when 'toml' or 'env' is empty.

    Example:
        toml_data = {
            'key1': 'This has ${ENV_VAR:-default}',
            'key2': ['String with ${ANOTHER_VAR}', 'Another ${YET_ANOTHER_VAR}']
        }
        environment = {
            'ENV_VAR': 'replaced_value',
            'ANOTHER_VAR': 'value_1',
            'YET_ANOTHER_VAR': 'value_2'
        }

        expand_env_vars_into_toml_values(toml_data, environment)
        # 'toml_data' will be modified in place:
        # {
        #     'key1': 'This has replaced_value',
        #     'key2': ['String with value_1', 'Another value_2']
        # }
    """
    if not toml or not env:  # pragma: no cover
        return

    if case_insensitive:
        # exact keys always win; a variant is only added when that spelling is still free
        env_case: dict[str, typing.Any] = dict(env)
        for key, value in env.items():
            env_case.setdefault(key.upper(), value)
            env_case.setdefault(key.lower(), value)
        env = env_case

    def _expand(value: typing.Any) -> typing.Any:
        # Returns the expanded counterpart of a str/list; dicts are updated in place.
        if isinstance(value, str):
            return expand_posix_vars(value, env)
        if isinstance(value, list):
            return [_expand(item) for item in value]
        if isinstance(value, dict):
            # `env` already contains the case variants, no need to build them again
            expand_env_vars_into_toml_values(value, env, case_insensitive=False)
        return value

    for key, var in toml.items():
        if isinstance(var, dict):
            # `env` already contains the case variants, no need to build them again
            expand_env_vars_into_toml_values(var, env, case_insensitive=False)
        elif isinstance(var, (list, str)):
            toml[key] = _expand(var)
        # anything else: nothing to substitute