Coverage for src/configuraptor/helpers.py: 100%
111 statements
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-22 11:27 +0200
« prev ^ index » next coverage.py v7.2.7, created at 2026-04-22 11:27 +0200
1"""
2Contains stand-alone helper functions.
3"""
5import contextlib
6import dataclasses as dc
7import io
8import math
9import re
10import types
11import typing
12import warnings
13from collections import ChainMap
14from pathlib import Path
16from expandvars import expand
17from typeguard import TypeCheckError
18from typeguard import check_type as _check_type
20try:
21 import annotationlib
22except ImportError: # pragma: no cover
23 annotationlib = None
def camel_to_snake(s: str) -> str:
    """
    Turn a CamelCase identifier into its snake_case form.

    Every uppercase character is lowercased and prefixed with an underscore; afterwards
    all leading underscores of the result are stripped.

    Source:
        https://stackoverflow.com/questions/1175208/elegant-python-function-to-convert-camelcase-to-snake-case
    """
    pieces = []
    for char in s:
        pieces.append(f"_{char.lower()}" if char.isupper() else char)
    return "".join(pieces).lstrip("_")
36# def find_pyproject_toml() -> typing.Optional[str]:
37# """
38# Find the project's config toml, looks up until it finds the project root (black's logic).
39# """
40# return black.files.find_pyproject_toml((os.getcwd(),))
43def find_pyproject_toml(start_dir: typing.Optional[Path | str] = None) -> Path | None:
44 """
45 Search for pyproject.toml starting from the current working directory \
46 and moving upwards in the directory tree.
48 Args:
49 start_dir: Starting directory to begin the search.
50 If not provided, uses the current working directory.
52 Returns:
53 Path or None: Path object to the found pyproject.toml file, or None if not found.
54 """
55 start_dir = Path.cwd() if start_dir is None else Path(start_dir).resolve()
57 current_dir = start_dir
59 while str(current_dir) != str(current_dir.root):
60 pyproject_toml = current_dir / "pyproject.toml"
61 if pyproject_toml.is_file():
62 return pyproject_toml
63 current_dir = current_dir.parent
65 # If not found anywhere
66 return None
# Shorthand for "any class object"; used to annotate the `cls` / `_type` parameters in this module.
Type = typing.Type[typing.Any]
def _cls_annotations(c: type) -> dict[str, type]:  # pragma: no cover
    """
    Return the annotations declared directly on class `c` (inherited ones are left to `_all_annotations`).

    When `annotationlib` could be imported (Python 3.14+), annotations are requested in VALUE format
    with `eval_str=True`, so forward references are resolved immediately.
    """
    if not annotationlib:
        # Deliberately read the class's own __dict__: per the original author's note,
        # `getattr(c, "__annotations__", {})` is not equivalent and does not work well here.
        own = c.__dict__.get("__annotations__")
        return own or {}

    resolved = annotationlib.get_annotations(c, format=annotationlib.Format.VALUE, eval_str=True)
    return typing.cast(dict[str, type], resolved)
def _all_annotations(cls: type) -> ChainMap[str, type]:
    """
    Build a dictionary-like ChainMap with the annotations of every attribute \
    defined on cls or inherited from one of its superclasses.
    """
    # One mapping per class, in MRO order: ChainMap looks keys up front-to-back,
    # so an annotation on a subclass shadows the same name on a base class.
    mro = getattr(cls, "__mro__", [])
    per_class = [_cls_annotations(klass) for klass in mro]
    return ChainMap(*per_class)
def all_annotations(
    cls: Type, _except: typing.Optional[typing.Iterable[str]] = None
) -> dict[str, type[object]]:
    """
    Wrapper around `_all_annotations` that filters away any keys in _except.

    It also flattens the ChainMap to a regular dict.

    Args:
        cls: the class whose (own and inherited) annotations are collected.
        _except: names to leave out of the result. Any iterable is accepted, including
                 one-shot iterators; a single string is treated as one name.

    Returns:
        dict: annotation name -> annotated type, without the excluded names.
    """
    if _except is None:
        excluded: frozenset[str] = frozenset()
    elif isinstance(_except, str):
        # a bare string is one name, not an iterable of single characters
        excluded = frozenset((_except,))
    else:
        # materialize once: repeated `in` tests would exhaust a generator after the first key
        excluded = frozenset(_except)

    return {name: annotation for name, annotation in _all_annotations(cls).items() if name not in excluded}
T = typing.TypeVar("T")


def check_type(value: typing.Any, expected_type: typing.Type[T]) -> typing.TypeGuard[T]:
    """
    Tell whether `value` matches `expected_type` (which can be a Union, parameterized generic etc.).

    Thin wrapper around typeguard's check_type: instead of handing back the value or letting
    a TypeCheckError propagate, the outcome is reported as a boolean.
    """
    try:
        _check_type(value, expected_type)
    except TypeCheckError:
        return False
    return True
def is_builtin_type(_type: Type) -> bool:
    """
    Tell whether _type comes from the builtins module (i.e. is one of the builtin types).
    """
    module_name = _type.__module__
    return module_name == "__builtin__" or module_name == "builtins"
135# def is_builtin_class_instance(obj: typing.Any) -> bool:
136# return is_builtin_type(obj.__class__)
def is_from_types_or_typing(_type: Type) -> bool:
    """
    Tell whether _type is defined in the stdlib `types` or `typing` module.

    e.g. types.UnionType or typing.Union
    """
    module_name = _type.__module__
    return module_name == "types" or module_name == "typing"
def is_from_other_toml_supported_module(_type: Type) -> bool:
    """
    Besides builtins, toml also supports 'datetime' and 'math' types, \
    so this tells whether _type is defined in one of these stdlib modules.
    """
    module_name = _type.__module__
    return module_name == "datetime" or module_name == "math"
def is_parameterized(_type: Type) -> bool:
    """
    Tell whether _type is a parameterized type, i.e. whether `typing.get_origin` finds an origin for it.

    Examples:
        list[str] -> True
        str -> False
    """
    origin = typing.get_origin(_type)
    return origin is not None
def is_custom_class(_type: Type) -> bool:
    """
    Best-effort guess whether _type is a custom (user-defined) class rather than a builtin.

    Other logic in this module depends on knowing that.
    """
    # only plain classes qualify: the type of _type must be exactly `type`
    if type(_type) is not type:
        return False

    # anything from builtins, datetime/math or types/typing is not considered custom
    return not (
        is_builtin_type(_type)
        or is_from_other_toml_supported_module(_type)
        or is_from_types_or_typing(_type)
    )
def instance_of_custom_class(var: typing.Any) -> bool:
    """
    Apply `is_custom_class` to the class of `var`, an instance of a (possibly custom) class.
    """
    klass = var.__class__
    return is_custom_class(klass)
188def is_union(sometype: typing.Type[typing.Any] | typing.Any) -> bool:
189 """
190 Determines if a given type is a Union type.
192 A Union type in Python is used to represent a type that can be one of multiple
193 types. This function checks whether the provided type object corresponds to a
194 Union type as defined in Python's type hints or annotations.
196 Returns:
197 bool
198 True if the provided type is a Union type, False otherwise.
199 """
200 origin = typing.get_origin(sometype)
201 return origin in (typing.Union, types.UnionType)
204def is_optional(_type: Type | typing.Any) -> bool:
205 """
206 Tries to guess if _type could be optional.
208 Examples:
209 None -> True
210 NoneType -> True
211 typing.Union[str, None] -> True
212 str | None -> True
213 list[str | None] -> False
214 list[str] -> False
215 """
216 if _type and (is_parameterized(_type) and typing.get_origin(_type) in (dict, list)) or (_type is math.nan):
217 # e.g. list[str]
218 # will crash issubclass to test it first here
219 return False
221 try:
222 return (
223 _type is None
224 or types.NoneType in typing.get_args(_type) # union with Nonetype
225 or issubclass(types.NoneType, _type)
226 or issubclass(types.NoneType, type(_type)) # no type # Nonetype
227 )
228 except TypeError:
229 # probably some weird input that's not a type
230 return False
def dataclass_field(cls: Type, key: str) -> typing.Optional[dc.Field[typing.Any]]:
    """
    Look up the dataclass Field named `key` on cls.

    Returns None when cls has no `__dataclass_fields__` or no field with that name.
    """
    return getattr(cls, "__dataclass_fields__", {}).get(key)
@contextlib.contextmanager
def uncloseable(fd: typing.BinaryIO) -> typing.Generator[typing.BinaryIO, typing.Any, None]:
    """
    Context manager which turns the fd's close operation to no-op for the duration of the context.

    The original `close` is put back when the context exits, also when the body raises.

    Args:
        fd: the file-like object whose `close` attribute is temporarily replaced.

    Yields:
        The same `fd` object.
    """
    original_close = fd.close
    fd.close = lambda: None  # type: ignore
    try:
        yield fd
    finally:
        # restore even on error, otherwise fd could never be closed again
        fd.close = original_close  # type: ignore
252def as_binaryio(file: str | Path | typing.BinaryIO | None, mode: typing.Literal["rb", "wb"] = "rb") -> typing.BinaryIO:
253 """
254 Convert a number of possible 'file' descriptions into a single BinaryIO interface.
255 """
256 if isinstance(file, str):
257 file = Path(file)
258 if isinstance(file, Path):
259 file = file.open(mode)
260 if file is None:
261 file = io.BytesIO()
262 if isinstance(file, io.BytesIO):
263 # so .read() works after .write():
264 file.seek(0)
265 # so the with-statement doesn't close the in-memory file:
266 file = uncloseable(file) # type: ignore
268 return file
# Matches the legacy `${VAR:default}` form: a variable name, a colon, then a default whose
# first character is none of `-`, `?`, `=`, `+` (i.e. not already one of the `:-` style operators).
_LEGACY_ENV_DEFAULTS_RE = re.compile(r"\$\{([A-Za-z_][A-Za-z0-9_]*):([^\-?=+][^}]*)}")


def _normalize_legacy_env_defaults(value: str) -> str:
    """
    Rewrite every legacy ${VAR:default} in `value` to ${VAR:-default}.

    A single DeprecationWarning is issued per call when at least one rewrite was needed;
    strings without the legacy form are returned unchanged and without a warning.
    """
    rewritten, replacements = _LEGACY_ENV_DEFAULTS_RE.subn(r"${\1:-\2}", value)
    if not replacements:
        return value

    warnings.warn(
        "Legacy ${VAR:default} syntax is deprecated; use ${VAR:-default}. "
        "Support for the legacy form may be removed in a future release.",
        DeprecationWarning,
        stacklevel=3,
    )

    return rewritten
def expand_posix_vars(posix_expr: str, context: dict[str, str]) -> str:
    """
    Substitute POSIX and Docker Compose-like environment variables in a string with their values.

    Legacy `${VAR:default}` defaults are first rewritten to `${VAR:-default}`; the actual
    substitution is then delegated to `expandvars.expand`, with `context` as its environment.
    The keys of `context` are passed on as-is, so any case-insensitive matching has to be
    prepared by the caller.

    Args:
        posix_expr (str): The input string containing POSIX or Docker Compose-like variables.
        context (dict): A dictionary containing variable names and their respective values.

    Returns:
        str: The string with replaced variable values.
    """
    normalized = _normalize_legacy_env_defaults(posix_expr)
    expanded = expand(normalized, environ=context)
    return typing.cast(str, expanded)
def _env_with_case_variants(env: dict[str, typing.Any]) -> dict[str, typing.Any]:
    """
    Copy env and add an upper- and lower-case alias for every key; keys already present are never overwritten.
    """
    variants: dict[str, typing.Any] = dict(env)
    for key, value in env.items():
        variants.setdefault(key.upper(), value)
        variants.setdefault(key.lower(), value)
    return variants


def _expand_toml_node(node: typing.Any, env: dict[str, typing.Any]) -> typing.Any:
    """
    Expand variables in one TOML value: strings are expanded, lists are rebuilt, dicts are updated in place.
    """
    if isinstance(node, str):
        return expand_posix_vars(node, env)
    if isinstance(node, list):
        # items may themselves be tables (array of tables) or nested arrays
        return [_expand_toml_node(item, env) for item in node]
    if isinstance(node, dict):
        # only existing keys are reassigned, so the dict's size does not change while iterating
        for key, item in node.items():
            node[key] = _expand_toml_node(item, env)
    # anything else (numbers, booleans, dates, ...) has nothing to substitute
    return node


def expand_env_vars_into_toml_values(
    toml: dict[str, typing.Any],
    env: dict[str, typing.Any],
    *,
    case_insensitive: bool = True,
) -> None:
    """
    Recursively expands POSIX/Docker Compose-like environment variables in a TOML dictionary.

    This function traverses a TOML dictionary and expands POSIX/Docker Compose-like
    environment variables (${VAR:-default}) using values provided in the 'env' dictionary.
    It performs in-place modification of the 'toml' dictionary.

    Args:
        toml (dict): A TOML dictionary with string values possibly containing environment variables.
        env (dict): A dictionary containing environment variable names and their respective values.
        case_insensitive (bool): If True, treat environment keys as case-insensitive by adding
            upper/lower variants for lookup. Defaults to True.

    Returns:
        None: The function modifies the 'toml' dictionary in place.

    Notes:
        Strings are expanded wherever they appear as values: directly in a table, in a nested table,
        in an array, and in tables or arrays nested inside an array (e.g. arrays of tables).
        Arrays are replaced by new lists; tables are updated in place. Non-string values are left alone.
        Nothing happens when either 'toml' or 'env' is empty.

    Example:
        toml_data = {
            'key1': 'This has ${ENV_VAR:-default}',
            'key2': ['String with ${ANOTHER_VAR}', 'Another ${YET_ANOTHER_VAR}'],
            'servers': [{'host': '${ENV_VAR}'}],
        }
        environment = {
            'ENV_VAR': 'replaced_value',
            'ANOTHER_VAR': 'value_1',
            'YET_ANOTHER_VAR': 'value_2'
        }

        expand_env_vars_into_toml_values(toml_data, environment)
        # 'toml_data' will be modified in place:
        # {
        #     'key1': 'This has replaced_value',
        #     'key2': ['String with value_1', 'Another value_2'],
        #     'servers': [{'host': 'replaced_value'}],
        # }
    """
    if not toml or not env:  # pragma: no cover
        return

    if case_insensitive:
        # built once here instead of again for every nested table
        env = _env_with_case_variants(env)

    _expand_toml_node(toml, env)