Coverage for src / apcore_cli / ref_resolver.py: 97%
74 statements
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
« prev ^ index » next coverage.py v7.13.0, created at 2026-04-26 10:23 +0800
1"""$ref resolution and schema composition (FE-02)."""
3from __future__ import annotations
5import copy
6from typing import Any
class RefResolverError(Exception):
    """Common ancestor of every error raised while resolving ``$ref`` entries.

    Catching this type covers all of the more specific resolver failures
    that derive from it.
    """

    pass
class CircularRefError(RefResolverError):
    """Signals that a ``$ref`` chain loops back onto a reference already being resolved."""

    pass
class UnresolvableRefError(RefResolverError):
    """Signals that the target named by a ``$ref`` does not exist among the known definitions."""

    pass
class MaxDepthExceededError(RefResolverError):
    """Signals that ``$ref`` resolution went deeper than the configured maximum depth."""

    pass
def resolve_refs(schema: dict, max_depth: int = 32, module_id: str = "") -> dict:
    """Resolve the ``$ref`` references in a JSON Schema and drop its definition tables.

    The input is deep-copied first, so the caller's schema is never mutated.

    Args:
        schema: JSON Schema to resolve. Reusable sub-schemas are looked up in
            its top-level ``$defs`` and/or ``definitions`` mappings.
        max_depth: Maximum nesting depth at which a ``$ref`` may still be
            followed.
        module_id: Identifier interpolated into error messages.

    Returns:
        The resolved schema with the ``$defs`` and ``definitions`` keys removed.

    Raises:
        CircularRefError: A ``$ref`` chain refers back to itself.
        UnresolvableRefError: A ``$ref`` names a definition that does not exist.
        MaxDepthExceededError: A ``$ref`` occurs at or beyond ``max_depth``.
    """
    schema = copy.deepcopy(schema)

    # A schema may carry both the legacy ``definitions`` table and the newer
    # ``$defs`` table. Use both so a ref into either one resolves; ``$defs`` is
    # applied last so it wins on a name clash, matching the precedence this
    # function has always given it. Non-mapping values (e.g. ``null``) are
    # treated as "no definitions" instead of crashing the lookup later.
    defs: dict = {}
    for container in ("definitions", "$defs"):
        entries = schema.get(container)
        if isinstance(entries, dict):
            defs.update(entries)

    result = _resolve_node(
        schema,
        defs,
        visited=set(),
        depth=0,
        max_depth=max_depth,
        module_id=module_id,
    )

    # The definition tables are only lookup tables for resolution; strip them
    # from the returned schema.
    result.pop("$defs", None)
    result.pop("definitions", None)
    return result
40def _resolve_node(
41 node: Any,
42 defs: dict,
43 visited: set,
44 depth: int,
45 max_depth: int,
46 module_id: str = "",
47) -> Any:
48 """Recursively resolve $ref, allOf, anyOf, oneOf in a schema node."""
49 if not isinstance(node, dict):
50 return node
52 # Handle $ref
53 if "$ref" in node:
54 ref_path = node["$ref"]
56 if depth >= max_depth:
57 raise MaxDepthExceededError(
58 f"$ref resolution depth exceeded maximum of {max_depth} for module '{module_id}'."
59 )
61 if ref_path in visited:
62 raise CircularRefError(f"Circular $ref detected in schema for module '{module_id}' at path '{ref_path}'.")
64 # Parse ref target: extract key from "#/$defs/Address" → "Address"
65 parts = ref_path.split("/")
66 key = parts[-1]
68 if key not in defs:
69 raise UnresolvableRefError(f"Unresolvable $ref '{ref_path}' in schema for module '{module_id}'.")
71 visited = visited | {ref_path}
72 return _resolve_node(defs[key], defs, visited, depth + 1, max_depth, module_id)
74 # Handle allOf
75 if "allOf" in node:
76 merged: dict[str, Any] = {"properties": {}, "required": []}
77 # Merge sibling properties/required from the composing node itself first
78 # so the composition branches can extend (not overwrite) them.
79 if isinstance(node.get("properties"), dict):
80 merged["properties"].update(node["properties"])
81 if isinstance(node.get("required"), list):
82 merged["required"].extend(node["required"])
83 for sub_schema in node["allOf"]:
84 resolved = _resolve_node(sub_schema, defs, visited, depth + 1, max_depth, module_id)
85 if "properties" in resolved:
86 merged["properties"].update(resolved["properties"])
87 if "required" in resolved:
88 merged["required"].extend(resolved["required"])
89 # Copy remaining non-composition keys (skip already-handled ones)
90 for k, v in node.items():
91 if k not in ("allOf", "properties", "required") and k not in merged:
92 merged[k] = v
93 return merged
95 # Handle anyOf / oneOf
96 for keyword in ("anyOf", "oneOf"):
97 if keyword in node:
98 merged = {"properties": {}, "required": []}
99 # Merge sibling properties from the composing node first.
100 if isinstance(node.get("properties"), dict):
101 merged["properties"].update(node["properties"])
102 sibling_required: list[str] = list(node["required"]) if isinstance(node.get("required"), list) else []
103 all_required_sets: list[set[str]] = []
104 for sub_schema in node[keyword]:
105 resolved = _resolve_node(sub_schema, defs, visited, depth + 1, max_depth, module_id)
106 if "properties" in resolved:
107 merged["properties"].update(resolved["properties"])
108 if "required" in resolved:
109 all_required_sets.append(set(resolved["required"]))
110 # Required = sibling required ∪ intersection of all branches
111 branch_required = list(set.intersection(*all_required_sets)) if all_required_sets else []
112 seen: set[str] = set()
113 combined_required: list[str] = []
114 for r in sibling_required + branch_required:
115 if r not in seen:
116 seen.add(r)
117 combined_required.append(r)
118 merged["required"] = combined_required
119 # Copy remaining non-composition keys
120 for k, v in node.items():
121 if k not in (keyword, "properties", "required") and k not in merged:
122 merged[k] = v
123 return merged
125 # Recursively process nested properties
126 if "properties" in node:
127 for prop_name, prop_schema in node["properties"].items():
128 node["properties"][prop_name] = _resolve_node(prop_schema, defs, visited, depth + 1, max_depth, module_id)
130 return node