Coverage for src / apcore_cli / ref_resolver.py: 97%

74 statements  

« prev     ^ index     » next       coverage.py v7.13.0, created at 2026-04-26 10:23 +0800

1"""$ref resolution and schema composition (FE-02).""" 

2 

3from __future__ import annotations 

4 

5import copy 

6from typing import Any 

7 

8 

class RefResolverError(Exception):
    """Common ancestor for every failure reported during ``$ref`` resolution."""

11 

12 

class CircularRefError(RefResolverError):
    """Signals that a ``$ref`` chain loops back onto itself."""

15 

16 

class UnresolvableRefError(RefResolverError):
    """Signals that the target named by a ``$ref`` does not exist."""

19 

20 

class MaxDepthExceededError(RefResolverError):
    """Signals that ``$ref`` resolution went deeper than the configured limit."""

23 

24 

def resolve_refs(schema: dict, max_depth: int = 32, module_id: str = "") -> dict:
    """Resolve all $ref references in a JSON Schema.

    The input is deep-copied first, so the caller's schema is never mutated.

    Args:
        schema: JSON Schema whose local references should be inlined.
        max_depth: Nesting depth at which a ``$ref`` is rejected.
        module_id: Module name interpolated into error messages.

    Returns:
        A fully inlined schema with ``$defs``/``definitions`` removed.

    Raises:
        RefResolverError: One of its subclasses, raised by ``_resolve_node``
            for circular, unresolvable or too deeply nested references.
    """
    schema = copy.deepcopy(schema)

    # Collect definitions from BOTH containers. A schema may carry the legacy
    # "definitions" key next to "$defs"; consulting only one of them would make
    # refs into the other unresolvable. "$defs" is applied last so it keeps
    # precedence on a name collision. Non-dict containers hold no definitions.
    defs: dict = {}
    for container in ("definitions", "$defs"):
        entries = schema.get(container)
        if isinstance(entries, dict):
            defs.update(entries)

    result = _resolve_node(schema, defs, visited=set(), depth=0, max_depth=max_depth, module_id=module_id)

    # Remove definition keys: everything they held has been inlined.
    result.pop("$defs", None)
    result.pop("definitions", None)
    return result

38 

39 

def _resolve_node(
    node: Any,
    defs: dict,
    visited: set,
    depth: int,
    max_depth: int,
    module_id: str = "",
) -> Any:
    """Recursively resolve ``$ref``, ``allOf``, ``anyOf`` and ``oneOf`` in a schema node.

    Args:
        node: Schema fragment to resolve. Anything that is not a dict (for
            example a boolean schema) is returned unchanged.
        defs: Definitions, looked up by the last ``/``-separated segment of a
            ``$ref`` path (``"#/$defs/Address"`` -> ``"Address"``).
        visited: ``$ref`` paths already followed on the current chain. It is
            never mutated; a widened copy is passed down for each ``$ref``.
        depth: Current nesting level, incremented on every recursive call.
        max_depth: A ``$ref`` encountered at this depth or deeper is rejected.
        module_id: Module name interpolated into error messages.

    Returns:
        The resolved node. Composition nodes yield a new dict that always has
        ``properties`` and ``required`` keys; plain nodes are updated in place
        and returned.

    Raises:
        MaxDepthExceededError: A ``$ref`` is met at ``depth >= max_depth``.
        CircularRefError: A ``$ref`` path is already on the current chain.
        UnresolvableRefError: A ``$ref`` is not a string or names no entry
            in ``defs``.
    """
    if not isinstance(node, dict):
        return node

    # Handle $ref
    if "$ref" in node:
        ref_path = node["$ref"]

        if depth >= max_depth:
            raise MaxDepthExceededError(
                f"$ref resolution depth exceeded maximum of {max_depth} for module '{module_id}'."
            )

        # A non-string $ref can never name a definition; report it through the
        # resolver's own hierarchy instead of crashing on .split()/hashing.
        if not isinstance(ref_path, str):
            raise UnresolvableRefError(f"Unresolvable $ref '{ref_path}' in schema for module '{module_id}'.")

        if ref_path in visited:
            raise CircularRefError(f"Circular $ref detected in schema for module '{module_id}' at path '{ref_path}'.")

        # Parse ref target: extract key from "#/$defs/Address" -> "Address"
        key = ref_path.split("/")[-1]
        if key not in defs:
            raise UnresolvableRefError(f"Unresolvable $ref '{ref_path}' in schema for module '{module_id}'.")

        return _resolve_node(defs[key], defs, visited | {ref_path}, depth + 1, max_depth, module_id)

    # Handle allOf: every branch applies, so properties and required are unioned.
    if "allOf" in node:
        merged: dict[str, Any] = {
            # Sibling properties go in first so the branches extend them.
            "properties": _resolve_sibling_properties(node, defs, visited, depth, max_depth, module_id),
            "required": [],
        }
        required: list = list(node["required"]) if isinstance(node.get("required"), list) else []
        for sub_schema in node["allOf"]:
            resolved = _resolve_node(sub_schema, defs, visited, depth + 1, max_depth, module_id)
            if not isinstance(resolved, dict):
                # Boolean (or otherwise non-object) schemas contribute nothing.
                continue
            if isinstance(resolved.get("properties"), dict):
                merged["properties"].update(resolved["properties"])
            if isinstance(resolved.get("required"), list):
                required.extend(resolved["required"])
        # The same name may be required by several branches; list it once.
        merged["required"] = _dedupe(required)
        # Copy remaining non-composition keys (skip already-handled ones)
        for k, v in node.items():
            if k != "allOf" and k not in merged:
                merged[k] = v
        return merged

    # Handle anyOf / oneOf: only one branch needs to match, so a name is
    # required only when EVERY branch requires it.
    for keyword in ("anyOf", "oneOf"):
        if keyword not in node:
            continue
        merged = {
            "properties": _resolve_sibling_properties(node, defs, visited, depth, max_depth, module_id),
            "required": [],
        }
        sibling_required: list = list(node["required"]) if isinstance(node.get("required"), list) else []
        branch_required: list = []
        for sub_schema in node[keyword]:
            resolved = _resolve_node(sub_schema, defs, visited, depth + 1, max_depth, module_id)
            branch = resolved if isinstance(resolved, dict) else {}
            if isinstance(branch.get("properties"), dict):
                merged["properties"].update(branch["properties"])
            # A branch without "required" requires nothing; it must still take
            # part in the intersection, otherwise names it does not demand
            # would be reported as required.
            names = branch.get("required")
            branch_required.append(names if isinstance(names, list) else [])
        # Required = sibling required + intersection of all branches
        merged["required"] = _dedupe(sibling_required + _common_required(branch_required))
        # Copy remaining non-composition keys
        for k, v in node.items():
            if k != keyword and k not in merged:
                merged[k] = v
        return merged

    # Recursively process nested properties
    properties = node.get("properties")
    if isinstance(properties, dict):
        for prop_name, prop_schema in properties.items():
            properties[prop_name] = _resolve_node(prop_schema, defs, visited, depth + 1, max_depth, module_id)

    return node


def _resolve_sibling_properties(
    node: dict,
    defs: dict,
    visited: set,
    depth: int,
    max_depth: int,
    module_id: str,
) -> dict:
    """Return the resolved ``properties`` that sit next to a composition keyword."""
    properties = node.get("properties")
    if not isinstance(properties, dict):
        return {}
    return {
        name: _resolve_node(prop_schema, defs, visited, depth + 1, max_depth, module_id)
        for name, prop_schema in properties.items()
    }


def _dedupe(items: list) -> list:
    """Return ``items`` without duplicates, keeping first-seen order."""
    unique: list = []
    for item in items:
        if item not in unique:
            unique.append(item)
    return unique


def _common_required(branches: list) -> list:
    """Return names present in every branch list, in the first branch's order."""
    if not branches:
        return []
    first, *rest = branches
    return [name for name in first if all(name in other for other in rest)]

129 

130 return node