Coverage for C: \ Users \ peaco \ OneDrive \ Documents \ GitHub \ mt_metadata \ mt_metadata \ base \ pydantic_helpers.py: 66%
253 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-10 00:11 -0800
1"""
2Field introspection utilities for Pydantic BaseModel classes with
3lazy in-memory caching and optional on-disk caching.
5This module builds a JSON-serializable nested "field tree" for any
6Pydantic BaseModel, avoiding instantiation and guarding against
7infinite recursion.
9Leaf nodes are serializable summaries and include:
10 - type
11 - default
12 - deprecated
13 - description
14 - title
15 - default_factory (if present)
16 - enum (for Enum/Literal types)
17 - enum_names (for Enum subclasses)
18 - examples (from Field(..., json_schema_extra={'examples': [...]}))
19 - required (from Field(..., json_schema_extra={'required': True/False}))
20 - units (from Field(..., json_schema_extra={'units': '...'}))
21 - has_validators (True if any field validators are present)
22 - constraints:
23 * ge, le, gt, lt
24 * multiple_of
25 * min_length, max_length, pattern
26 * min_items, max_items, unique_items
27 * const, format
28 * nullable
30Nested nodes represent BaseModel-typed fields and contain further trees.
32Notes
33-----
34- List, Dict, and Union types are treated as simple fields (non-expanded),
35 unless the Union directly contains a BaseModel, in which case the first
36 BaseModel type is expanded.
37- A special-case hook (`SPECIAL_CASE_MODEL_NAMES`) lets you treat certain
38 BaseModel types (e.g., "MTime") as simple fields.
39- Constraints are derived from Pydantic's JSON Schema via `TypeAdapter(annotation).json_schema()`.
40"""
42from __future__ import annotations
44import enum
45import hashlib
46import json
47import os
48import sys
49from pathlib import Path
50from threading import RLock
51from typing import Annotated, Any, Dict, get_args, get_origin, Literal, Optional, Union
53from pydantic import __version__ as _PYDANTIC_VERSION
54from pydantic import TypeAdapter # Pydantic v2
55from pydantic import BaseModel
58# try:
59# # Optional dependency for platform-aware cache directory
60# from platformdirs import user_cache_dir
61# except Exception: # pragma: no cover - optional
62# user_cache_dir = None # Fallback handled below
65# -------------------------------
66# Configuration & Globals
67# -------------------------------
# Application name; used as the final component of the on-disk cache directory.
APP_NAME = "mt_metadata"

# Treat these BaseModel names as simple fields (no expansion)
SPECIAL_CASE_MODEL_NAMES = {"MTime"}

# Thread-safe in-memory cache of computed field trees (per class).
# The re-entrant lock lets the recursive tree builder re-acquire it on the
# same thread while a parent model is still being computed.
_FIELDS_TREE_CACHE: Dict[type[BaseModel], Dict[str, Any]] = {}
_CACHE_LOCK = RLock()

# Environment flag to disable disk caching (e.g., for tests).
# The value is compared case-insensitively and with surrounding whitespace
# removed, so "1", "true", "True", "TRUE" and " true " all disable the cache.
_DISABLE_DISK_CACHE = os.environ.get(
    "MT_METADATA_DISABLE_DISK_CACHE", "0"
).strip().lower() in {"1", "true"}
86# -------------------------------
87# Public API
88# -------------------------------
def get_all_fields_serializable(
    model_or_cls: Union[type[BaseModel], BaseModel],
) -> Dict[str, Any]:
    """
    Build a JSON-serializable nested dictionary of fields for a Pydantic BaseModel.

    This function avoids instantiating models, caches results in memory,
    and (optionally) persists/retrieves the serialized tree to/from disk.

    Parameters
    ----------
    model_or_cls : type[BaseModel] or BaseModel
        The BaseModel class (preferred) or an instance. If an instance is provided,
        its class will be used.

    Returns
    -------
    Dict[str, Any]
        A nested, JSON-serializable dictionary describing the model's fields.
        Leaf nodes are field summaries; nested nodes correspond to BaseModel-typed fields.

    Raises
    ------
    Exception
        Whatever `_compute_fields_tree` raises is propagated unchanged; the
        cycle-breaking sentinel is removed first so a later call can retry.

    Notes
    -----
    - Uses a sentinel write to the cache prior to recursion to break cycles.
      A model that is reached again while its own tree is still being built
      therefore contributes the empty sentinel dict at that position.
    - The on-disk cache is skipped entirely when `_DISABLE_DISK_CACHE` is set.
    """
    model_cls: type[BaseModel] = (
        model_or_cls if isinstance(model_or_cls, type) else type(model_or_cls)
    )

    with _CACHE_LOCK:
        # In-memory hit (this also returns the sentinel during a recursive visit)
        if model_cls in _FIELDS_TREE_CACHE:
            return _FIELDS_TREE_CACHE[model_cls]

        # Try disk cache
        if not _DISABLE_DISK_CACHE:
            disk = _load_fields_from_disk(model_cls)
            if disk is not None:
                _FIELDS_TREE_CACHE[model_cls] = disk
                return disk

        # Sentinel to break cycles
        _FIELDS_TREE_CACHE[model_cls] = {}

        try:
            tree = _compute_fields_tree(model_cls)
        except BaseException:
            # Do not leave the empty sentinel behind: it would otherwise be
            # served as the "real" tree for this class on every later call.
            _FIELDS_TREE_CACHE.pop(model_cls, None)
            raise

        _FIELDS_TREE_CACHE[model_cls] = tree

        if not _DISABLE_DISK_CACHE:
            _save_fields_to_disk(model_cls, tree)

        return tree
def flatten_field_tree_map(
    tree: Dict[str, Any], prefix: str = ""
) -> Dict[str, Dict[str, Any]]:
    """
    Collapse a nested field tree into a flat mapping keyed by dotted paths.

    Parameters
    ----------
    tree : Dict[str, Any]
        A nested field tree such as the one returned by
        `get_all_fields_serializable`. A dict value carrying
        `"__field__": True` is a leaf; any other dict value is descended into.
    prefix : str, optional
        Path placed in front of every key, joined with a dot. By default "".

    Returns
    -------
    Dict[str, Dict[str, Any]]
        Dotted path (e.g., "inner.a") mapped to the leaf summary dict found at
        that position. The leaf dicts are the same objects as in `tree`.

    Notes
    -----
    - Values that are not dictionaries are ignored.
    - Entries appear in depth-first order, following each dict's own ordering.
    """

    def _leaves(subtree: Dict[str, Any], base: str):
        # Depth-first walk yielding (dotted_path, leaf_summary) pairs.
        for key, child in subtree.items():
            if not isinstance(child, dict):
                continue
            dotted = f"{base}.{key}" if base else key
            if child.get("__field__") is True:
                yield dotted, child
            else:
                yield from _leaves(child, dotted)

    return dict(_leaves(tree, prefix))
def clear_field_caches() -> None:
    """
    Drop every entry from the in-memory field tree cache.

    The cache is emptied while holding the module's cache lock.
    Cache files that were already written to disk are left in place.
    """
    with _CACHE_LOCK:
        _FIELDS_TREE_CACHE.clear()
203# -------------------------------
204# Internal helpers
205# -------------------------------
def _compute_fields_tree(model_cls: type[BaseModel]) -> Dict[str, Any]:
    """
    Compute the nested, serializable field tree for a BaseModel class.

    Parameters
    ----------
    model_cls : type[BaseModel]
        The Pydantic BaseModel subclass to introspect.

    Returns
    -------
    Dict[str, Any]
        Nested dict of fields; leaf nodes are serializable summaries.

    Notes
    -----
    - Reads `model_fields` when it is present and non-empty; otherwise falls
      back to `__pydantic_fields__`.
    - Skips fields marked as deprecated. A field whose `deprecated` attribute
      is `None` or `False` is NOT considered deprecated and is kept.
    - Fields whose resolved base type is a BaseModel subclass (and whose class
      name is not in `SPECIAL_CASE_MODEL_NAMES`) are expanded recursively via
      `get_all_fields_serializable`; everything else becomes a leaf summary.
    """
    validators_map = _collect_field_validator_map(model_cls)
    field_map = getattr(model_cls, "model_fields", None) or getattr(
        model_cls, "__pydantic_fields__", {}
    )
    out: Dict[str, Any] = {}

    for field_name, field_info in field_map.items():
        deprecated = getattr(field_info, "deprecated", None)
        # `deprecated=False` is an explicit "not deprecated"; only skip when a
        # deprecation marker (True, a message, or a marker object) is set.
        if deprecated is not None and deprecated is not False:
            continue

        annotation = getattr(field_info, "annotation", None)
        base_type = _extract_base_type(annotation)

        if (
            base_type
            and _is_basemodel_subclass(base_type)
            and base_type.__name__ not in SPECIAL_CASE_MODEL_NAMES
        ):
            out[field_name] = get_all_fields_serializable(base_type)
        else:
            out[field_name] = _to_serializable_field(
                field_info, model_cls, field_name, validators_map
            )

    return out
def _extract_base_type(annotation: Any) -> Any:
    """
    Reduce a (possibly wrapped) type annotation to a single concrete class.

    Parameters
    ----------
    annotation : Any
        The annotation to inspect.

    Returns
    -------
    Any or None
        The resolved class, or None when the annotation does not reduce to one.

    Notes
    -----
    - Annotated[T, ...] is unwrapped and T is resolved recursively.
    - list and dict generics resolve to None (treated as simple fields).
    - For a union, the first member that is a BaseModel subclass wins;
      failing that, the first member other than NoneType is resolved
      recursively (and its result is returned even if that is None).
    - A bare class other than NoneType is returned unchanged.
    """
    if annotation is None:
        return None

    origin = get_origin(annotation)
    params = get_args(annotation)
    origin_name = getattr(origin, "__name__", None)
    none_type = type(None)

    if origin is Annotated:
        if not params:
            return None
        return _extract_base_type(params[0])

    # Containers are never expanded.
    if origin is list or origin_name == "list":
        return None
    if origin is dict or origin_name in {"dict", "Dict"}:
        return None

    is_union = bool(origin) and (
        origin is Union or origin_name in {"Union", "UnionType"}
    )
    if is_union:
        missing = object()
        model_member = next(
            (member for member in params if _is_basemodel_subclass(member)), missing
        )
        if model_member is not missing:
            return model_member
        first_real = next(
            (member for member in params if member is not none_type), missing
        )
        if first_real is missing:
            return None
        return _extract_base_type(first_real)

    if isinstance(annotation, type) and annotation is not none_type:
        return annotation
    return None
def _is_basemodel_subclass(cls: Any) -> bool:
    """
    Tell whether `cls` is a Pydantic BaseModel subclass exposing field metadata.

    Parameters
    ----------
    cls : Any
        The candidate object; anything that is not a class yields False.

    Returns
    -------
    bool
        True only if `cls` is a class, is a subclass of BaseModel, and has a
        `model_fields` or `__pydantic_fields__` attribute. Any exception raised
        while checking is treated as False.
    """
    try:
        if not isinstance(cls, type):
            return False
        if not issubclass(cls, BaseModel):
            return False
        has_public = hasattr(cls, "model_fields")
        return has_public or hasattr(cls, "__pydantic_fields__")
    except Exception:
        return False
def _to_serializable_field(
    field_info: Any,
    model_cls: type[BaseModel],
    field_name: str,
    validators_map: Dict[str, bool],
) -> Dict[str, Any]:
    """
    Convert a Pydantic FieldInfo into a JSON-serializable summary dict, enriched with
    enum values/names, examples, required, units, validators presence, and constraints.

    Parameters
    ----------
    field_info : Any
        The Pydantic FieldInfo-like object. Attributes are read with `getattr`
        and default to None when absent.
    model_cls : type[BaseModel]
        The BaseModel class owning the field. Currently not read by this
        function; kept for interface stability.
    field_name : str
        The name of the field on the model; used to look up `validators_map`.
    validators_map : Dict[str, bool]
        A mapping from field name to a boolean indicating if any validators target that field.

    Returns
    -------
    Dict[str, Any]
        A summary with the keys `__field__` (always True), `type`, `default`,
        `deprecated`, `description`, `title`, `enum`, `enum_names`, `examples`,
        `required`, `units`, `has_validators`, `constraints`, and, only when
        the field declares one, `default_factory`.
    """
    ann = getattr(field_info, "annotation", None)

    enum_values, enum_names = _extract_enum_info(ann)
    extras = _extract_json_schema_extras(field_info)  # examples, required, units
    # NOTE(review): constraints are derived from the bare annotation only.
    # Presumably constraints declared through Field(...) are stored elsewhere on
    # the FieldInfo (e.g. its metadata) and are not seen here - TODO confirm.
    constraints = _extract_constraints(ann)

    summary = {
        "__field__": True,
        "type": repr(ann),
        "default": _safe_repr(getattr(field_info, "default", None)),
        "deprecated": _safe_repr(getattr(field_info, "deprecated", None)),
        "description": getattr(field_info, "description", None),
        # `title` is part of the documented leaf-node contract of this module.
        "title": getattr(field_info, "title", None),
        "enum": enum_values,
        "enum_names": enum_names,
        "examples": extras.get("examples"),
        "required": extras.get("required"),
        "units": extras.get("units"),
        "has_validators": bool(validators_map.get(field_name, False)),
        "constraints": constraints or {},
    }

    default_factory = getattr(field_info, "default_factory", None)
    if default_factory is not None:
        summary["default_factory"] = repr(default_factory)

    return summary
def _extract_json_schema_extras(field_info: Any) -> Dict[str, Any]:
    """
    Pull `examples`, `required` and `units` out of a FieldInfo's `json_schema_extra`.

    Parameters
    ----------
    field_info : Any
        The Pydantic FieldInfo-like object.

    Returns
    -------
    Dict[str, Any]
        Always contains the three keys "examples", "required" and "units".
        Each is None when `json_schema_extra` is not a dict or lacks the key.

    Notes
    -----
    - `examples` and `units` are passed through when `json.dumps` accepts them.
      Otherwise `units` becomes its `repr`, and `examples` becomes a list of
      per-item `repr` strings (for list/tuple/set) or a single `repr` string.
    - `required` booleans pass through. Other non-None values are compared as
      lower-cased strings: "true"/"1" give True, "false"/"0" give False, and
      anything else is stored as its `repr`.
    """
    summary: Dict[str, Any] = dict.fromkeys(("examples", "required", "units"))

    extra = getattr(field_info, "json_schema_extra", None)
    if not isinstance(extra, dict):
        return summary

    def _json_ready(value: Any) -> bool:
        # True when the value can be dumped to JSON as-is.
        try:
            json.dumps(value)
        except Exception:
            return False
        return True

    examples = extra.get("examples")
    if examples is not None:
        if _json_ready(examples):
            summary["examples"] = examples
        elif isinstance(examples, (list, tuple, set)):
            summary["examples"] = [repr(item) for item in examples]
        else:
            summary["examples"] = repr(examples)

    # Requiredness is surfaced only if the project signals it via json_schema_extra.
    required = extra.get("required")
    if isinstance(required, bool):
        summary["required"] = required
    elif required is not None:
        lowered = str(required).lower()
        if lowered in {"true", "1"}:
            summary["required"] = True
        elif lowered in {"false", "0"}:
            summary["required"] = False
        else:
            # Unrecognised value: keep it visible, but in serializable form.
            summary["required"] = repr(required)

    units = extra.get("units")
    if units is not None:
        summary["units"] = units if _json_ready(units) else repr(units)

    return summary
def _extract_enum_info(annotation: Any) -> tuple[list[Any] | None, list[str] | None]:
    """
    Extract enum values and names from annotations that are Enum subclasses or Literal[...] types.

    Parameters
    ----------
    annotation : Any
        The type annotation to inspect.

    Returns
    -------
    tuple
        (enum_values, enum_names)
        - enum_values : list or None
            The allowed values. Each value is kept as-is when `json.dumps`
            accepts it and replaced by its `repr` string otherwise, for both
            Enum members and Literal arguments.
        - enum_names : list of str or None
            Enum member names if the annotation is an Enum subclass; otherwise None.
        Both entries are None when the annotation is neither an Enum subclass
        nor a Literal.
    """
    if annotation is None:
        return None, None

    def _json_safe(value: Any) -> Any:
        # Keep JSON-compatible values untouched; fall back to repr() otherwise
        # so the resulting field tree can always be dumped to JSON.
        try:
            json.dumps(value)
        except Exception:
            return repr(value)
        return value

    try:
        if isinstance(annotation, type) and issubclass(annotation, enum.Enum):
            members = list(annotation)
            values = [_json_safe(member.value) for member in members]
            names = [member.name for member in members]
            return values, names
    except Exception:
        # Not usable as an Enum; fall through to the Literal check.
        pass

    if get_origin(annotation) is Literal:
        return [_json_safe(value) for value in get_args(annotation)], None

    return None, None
def _extract_constraints(annotation: Any) -> Dict[str, Any] | None:
    """
    Derive field constraints from the JSON Schema Pydantic builds for an annotation.

    Parameters
    ----------
    annotation : Any
        The type annotation to inspect.

    Returns
    -------
    Dict[str, Any] or None
        None when `annotation` is None or when building the schema raises.
        Otherwise a dict that always holds a boolean `nullable` and, for each
        keyword found at the top level of the schema, one of: ge, le, gt, lt,
        multiple_of, min_length, max_length, pattern, min_items, max_items,
        unique_items, const, format.

    Notes
    -----
    - The schema comes from `TypeAdapter(annotation).json_schema()`.
    - Only top-level schema keywords are read; keywords nested inside
      anyOf/oneOf/allOf members are consulted for nullability only.
    """
    if annotation is None:
        return None

    try:
        schema = TypeAdapter(annotation).json_schema()
    except Exception:
        return None

    # (summary key, JSON Schema keyword), in output order.
    keyword_table = (
        ("ge", "minimum"),
        ("le", "maximum"),
        ("gt", "exclusiveMinimum"),
        ("lt", "exclusiveMaximum"),
        ("multiple_of", "multipleOf"),
        ("min_length", "minLength"),
        ("max_length", "maxLength"),
        ("pattern", "pattern"),
        ("min_items", "minItems"),
        ("max_items", "maxItems"),
        ("unique_items", "uniqueItems"),
        ("const", "const"),
        ("format", "format"),
    )

    found: Dict[str, Any] = {}
    for summary_key, schema_key in keyword_table:
        value = schema.get(schema_key)
        if value is not None:
            found[summary_key] = value

    # Nullability: explicit flag, "null" in a type list, or a null branch
    # in one of the composition keywords.
    nullable = schema.get("nullable") is True
    if not nullable:
        declared = schema.get("type")
        nullable = isinstance(declared, list) and "null" in declared
    if not nullable:
        nullable = any(
            isinstance(branch, dict) and branch.get("type") == "null"
            for keyword in ("anyOf", "oneOf", "allOf")
            for branch in (schema.get(keyword, []) or [])
        )
    found["nullable"] = nullable

    return found
def _safe_repr(obj: Any) -> Any:
    """
    Return `repr(obj)`, or None when that is not possible.

    Parameters
    ----------
    obj : Any
        The object to represent.

    Returns
    -------
    Any
        None if `obj` is None or if calling `repr` on it raises an exception;
        otherwise the repr string.
    """
    if obj is None:
        return None
    try:
        text = repr(obj)
    except Exception:
        return None
    return text
def _collect_field_validator_map(model_cls: type[BaseModel]) -> Dict[str, bool]:
    """
    Collect a mapping of field names to a boolean indicating presence of field validators.

    Parameters
    ----------
    model_cls : type[BaseModel]
        The BaseModel class to inspect.

    Returns
    -------
    Dict[str, bool]
        Field name mapped to True for every field targeted by at least one
        validator. Fields without validators are absent from the mapping.
        Empty when the class has no `__pydantic_decorators__` attribute.

    Notes
    -----
    - Best-effort for Pydantic v2 by introspecting `__pydantic_decorators__`.
    - The `field_validators` and `validators` registries are treated as
      mappings from the *decorated function name* to a decorator record; the
      targeted field names are read from the record's `info.fields` (falling
      back to `fields` / `field` on the record itself). The registry keys are
      therefore never used as field names.
    - A target of "*" marks every field listed in `model_fields`
      (or `__pydantic_fields__`).
    """
    result: Dict[str, bool] = {}
    decs = getattr(model_cls, "__pydantic_decorators__", None)
    if decs is None:
        return result

    def _target_fields(decorator: Any) -> tuple:
        # Return the field names one decorator record applies to.
        for holder in (getattr(decorator, "info", None), decorator):
            if holder is None:
                continue
            fields = getattr(holder, "fields", None) or getattr(holder, "field", None)
            if isinstance(fields, str):
                return (fields,)
            if isinstance(fields, (list, tuple, set, frozenset)):
                return tuple(fields)
        return ()

    all_fields = getattr(model_cls, "model_fields", None) or getattr(
        model_cls, "__pydantic_fields__", {}
    )

    for registry_name in ("field_validators", "validators"):
        registry = getattr(decs, registry_name, None)
        if not registry:
            continue
        records = registry.values() if isinstance(registry, dict) else registry
        for record in records:
            for fname in _target_fields(record):
                if fname == "*":
                    # Wildcard validator: applies to every declared field.
                    for declared in all_fields:
                        result[declared] = True
                else:
                    result[fname] = True

    return result
621# -------------------------------
622# Disk cache utilities
623# -------------------------------
def _cache_dir() -> str:
    """
    Resolve a user-specific cache directory for the application using only the stdlib.

    Priority
    --------
    1. Environment variable override: MT_METADATA_CACHE_DIR
    2. OS-specific conventional cache directories:
        - Linux: $XDG_CACHE_HOME/<APP_NAME> or ~/.cache/<APP_NAME>
        - macOS: ~/Library/Caches/<APP_NAME>
        - Windows: %LOCALAPPDATA%\\<APP_NAME> or ~/AppData/Local/<APP_NAME>
        - Anything else: ~/.cache/<APP_NAME>

    Returns
    -------
    str
        Path to the cache directory. The directory is created if it does not exist.

    Raises
    ------
    OSError
        If the directory cannot be created.

    Notes
    -----
    - Uses only Python's standard library (no external dependencies).
    - On Linux, an empty or relative $XDG_CACHE_HOME is ignored and the
      ~/.cache default is used instead, so the cache never lands in the
      current working directory by accident.
    - The override path is user-expanded and resolved; the platform defaults
      are used as built.
    """
    # 1) Explicit override
    override = os.environ.get("MT_METADATA_CACHE_DIR")
    if override:
        path = Path(override).expanduser().resolve()
        path.mkdir(parents=True, exist_ok=True)
        return str(path)

    # 2) Platform-specific default
    plat = sys.platform
    home = Path.home()

    if plat.startswith("linux"):
        xdg = os.environ.get("XDG_CACHE_HOME", "")
        # Only an absolute, non-empty value is a valid XDG base directory.
        if xdg and Path(xdg).is_absolute():
            base = Path(xdg)
        else:
            base = home / ".cache"
        path = base / APP_NAME

    elif plat == "darwin":
        # macOS: ~/Library/Caches/<APP_NAME>
        path = home / "Library" / "Caches" / APP_NAME

    elif plat.startswith("win"):
        # Windows: %LOCALAPPDATA% preferred, else fallback
        local_appdata = os.environ.get("LOCALAPPDATA")
        if local_appdata:
            path = Path(local_appdata) / APP_NAME
        else:
            path = home / "AppData" / "Local" / APP_NAME

    else:
        # Fallback for unknown platforms
        path = home / ".cache" / APP_NAME

    path.mkdir(parents=True, exist_ok=True)
    return str(path)
683# def _cache_dir() -> str:
684# """
685# Resolve a user-specific cache directory for the application.
687# Returns
688# -------
689# str
690# The path to the cache directory.
692# Notes
693# -----
694# - Uses `platformdirs.user_cache_dir(APP_NAME)` if available; otherwise
695# falls back to `~/.cache/<APP_NAME>`.
696# """
697# if user_cache_dir is not None:
698# path = user_cache_dir(APP_NAME)
699# else:
700# path = os.path.join(os.path.expanduser("~"), ".cache", APP_NAME)
701# os.makedirs(path, exist_ok=True)
702# return path
def _model_fingerprint(model_cls: type[BaseModel]) -> str:
    """
    Compute a fingerprint for a model class's fields.

    Parameters
    ----------
    model_cls : type[BaseModel]
        The model class to fingerprint.

    Returns
    -------
    str
        A SHA-256 hex digest representing the schema shape.

    Notes
    -----
    - Uses a JSON dump of per-field tuples, sorted by field name:
      (name, deprecated repr, annotation repr, default repr,
      json_schema_extra snapshot, description)
    - Change in any of these will produce a different fingerprint, refreshing the disk cache.
    - A `json_schema_extra` dict that cannot be dumped to JSON is snapshotted
      as a list of (str(key), repr(value)) pairs instead; nothing is printed.
    - Only the fields of `model_cls` itself are inspected; nested models are
      not followed.
    - NOTE(review): values are captured with `repr`, so a default (or extra)
      whose repr embeds a memory address presumably changes the fingerprint
      between processes - TODO confirm whether any model relies on such values.
    """
    field_map = getattr(model_cls, "model_fields", None) or getattr(
        model_cls, "__pydantic_fields__", {}
    )
    parts = []
    for name, info in field_map.items():
        extra = getattr(info, "json_schema_extra", None)
        if isinstance(extra, dict):
            try:
                extra_snapshot = json.dumps(
                    extra, sort_keys=True, separators=(",", ":"), ensure_ascii=False
                )
            except Exception:
                # Keys are stringified so that sorting and the final JSON dump
                # cannot fail on mixed or non-string key types.
                extra_snapshot = sorted(
                    (str(key), _safe_repr(value)) for key, value in extra.items()
                )
        else:
            extra_snapshot = _safe_repr(extra)
        parts.append(
            (
                name,
                _safe_repr(getattr(info, "deprecated", None)),
                repr(getattr(info, "annotation", None)),
                _safe_repr(getattr(info, "default", None)),
                extra_snapshot,
                # The description is part of the cached summary, so editing it
                # must invalidate the cached file as well.
                _safe_repr(getattr(info, "description", None)),
            )
        )
    # Field names are unique, so ordering by name alone is total and never
    # compares the heterogeneous remaining tuple members.
    ordered = sorted(parts, key=lambda part: part[0])
    raw = json.dumps(ordered, separators=(",", ":"), ensure_ascii=False)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def _disk_cache_path(model_cls: type[BaseModel]) -> str:
    """
    Construct the on-disk cache path for a given model class.

    Parameters
    ----------
    model_cls : type[BaseModel]
        The model class.

    Returns
    -------
    str
        Path to the cache JSON file inside the directory returned by `_cache_dir()`.

    Notes
    -----
    - The file name is built from the class's module and qualified name, the
      Pydantic version, and the fingerprint from `_model_fingerprint`.
    - Characters of the qualified name other than letters, digits, ".", "_"
      and "-" are replaced by "_" (e.g. the "<" and ">" of "<locals>"), so the
      name is usable as a file name on every platform. Plain dotted names are
      left unchanged.
    """
    fqname = f"{model_cls.__module__}.{model_cls.__qualname__}"
    safe_name = "".join(
        ch if (ch.isalnum() or ch in "._-") else "_" for ch in fqname
    )
    fp = _model_fingerprint(model_cls)
    fname = f"{safe_name}__pyd{_PYDANTIC_VERSION}__{fp}.json"
    return os.path.join(_cache_dir(), fname)
def _load_fields_from_disk(model_cls: type[BaseModel]) -> Dict[str, Any] | None:
    """
    Load a serialized field tree from disk cache if present.

    Parameters
    ----------
    model_cls : type[BaseModel]
        The model class.

    Returns
    -------
    Dict[str, Any] or None
        The field tree if a cache file exists and holds a JSON object,
        otherwise None.

    Notes
    -----
    - Returns None on any error, including failures while resolving the cache
      path (e.g. the cache directory cannot be created), a missing file, and
      read/parse errors. The disk cache is optional and must never make the
      caller fail.
    - Returns None if the file parses to anything other than a dict.
    """
    try:
        path = _disk_cache_path(model_cls)
        with open(path, "r", encoding="utf-8") as f:
            data = json.load(f)
    except Exception:
        return None
    return data if isinstance(data, dict) else None
def _save_fields_to_disk(model_cls: type[BaseModel], tree: Dict[str, Any]) -> None:
    """
    Persist a serialized field tree to disk cache.

    Parameters
    ----------
    model_cls : type[BaseModel]
        The model class.
    tree : Dict[str, Any]
        The serialized field tree.

    Returns
    -------
    None

    Notes
    -----
    - Overwrites any existing file for the same model fingerprint.
    - Best-effort: every error (path resolution, serialization, writing) is
      swallowed, because the disk cache is optional.
    - The tree is serialized before any file is opened and written to a
      temporary sibling file that is then moved over the target with
      `os.replace`, so a failed or interrupted write does not leave a
      truncated cache file at the target path.
    """
    tmp_path = None
    try:
        path = _disk_cache_path(model_cls)
        payload = json.dumps(tree, indent=2, ensure_ascii=False)
        tmp_path = f"{path}.{os.getpid()}.tmp"
        with open(tmp_path, "w", encoding="utf-8") as f:
            f.write(payload)
        os.replace(tmp_path, path)
    except Exception:
        # Best-effort caching; ignore write errors, but do not leave the
        # temporary file behind.
        if tmp_path is not None:
            try:
                os.remove(tmp_path)
            except OSError:
                pass