Coverage for formkit_ninja / form_submission / utils.py: 11.34%

112 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-27 05:17 +0000

1import uuid 

2import warnings 

3from copy import deepcopy 

4from typing import Any, Iterable, TypeVar 

5 

6from django.db import models 

7 

8 

def one_to_many(model: models.Model):
    """
    Yield each field of ``model`` that is a one-to-many relation.

    Hidden fields are included in the scan. These relations are the ones
    expected to correspond to 'Repeaters'.
    """
    candidates = model._meta.get_fields(include_hidden=True)
    yield from (rel for rel in candidates if rel.one_to_many and rel.is_relation)

16 

17 

def many_to_one(model: models.Model):
    """
    Yield each field of ``model`` that is a many-to-one relation.

    Hidden fields are included in the scan.
    """
    candidates = model._meta.get_fields(include_hidden=True)
    yield from (rel for rel in candidates if rel.many_to_one and rel.is_relation)

22 

23 

def one_to_one(model: models.Model):
    """
    Yield each field of ``model`` that is a one-to-one relation.

    Hidden fields are included in the scan.
    """
    candidates = model._meta.get_fields(include_hidden=True)
    yield from (rel for rel in candidates if rel.one_to_one and rel.is_relation)

28 

29 

def update_foreign_keys(model: models.Model, data: dict):
    """
    Re-key ``data`` so many-to-one values sit under the field's attname.

    For every many-to-one relation on ``model`` whose ``name`` is a key of
    ``data``, the value is moved to the key returned by ``get_attname()``.
    This is intended to rename for instance a JSON field "activity" to a
    Django field "activity_id".

    **``data`` is modified in place**; nothing is returned.
    """
    for relation in many_to_one(model):
        json_key = relation.name
        if json_key not in data:
            continue
        data[relation.get_attname()] = data.pop(json_key)

39 

40 

def _skip_value(v) -> bool:
    """
    Tell whether ``v`` counts as "empty" when importing a submission.

    The following are skipped: ``None``, the empty string, an empty dict or
    list, a dict whose only key is ``"uuid"``, and a list whose elements are
    all themselves skippable (checked recursively).
    """
    # Scalars: only None and the empty string are considered empty
    if v is None or v == "":
        return True
    # Dicts: empty, or carrying nothing except a "uuid" key
    if isinstance(v, dict):
        return len(v) == 0 or set(v.keys()) == {"uuid"}
    # Lists: empty, or made up solely of skippable elements
    if isinstance(v, list):
        return all(_skip_value(item) for item in v)
    return False

57 

58 

def ensure_object_has_uuid(el: dict):
    """
    Return ``el`` with a usable ``"uuid"`` entry.

    When ``"uuid"`` is absent, ``None`` or an empty string, a deep copy of
    ``el`` carrying a freshly generated ``uuid.uuid4()`` is returned and the
    input is left untouched. Otherwise ``el`` itself is returned (no copy).
    """
    current = el.get("uuid")
    if current is None or current == "":
        clone = deepcopy(el)
        return clone | {"uuid": uuid.uuid4()}
    return el

64 

65 

def ensure_repeater_uuid(obj: dict, key: str):
    """
    Yield the elements of the repeater ``obj[key]``, each carrying a "uuid".

    A list of dicts is treated as a repeated element. To track changes, and
    allow relationships, every element gets a "uuid" entry (via
    ``ensure_object_has_uuid``), and repeaters nested inside an element are
    processed the same way recursively.

    This is a generator, so the ``TypeError`` checks run lazily as it is
    consumed: one is raised if ``obj[key]`` is not a list, another if any
    element of it is not a dict.

    Note: an element that already has a uuid is yielded as the same object,
    so its nested repeater lists are replaced on that original element.
    """
    items = obj.get(key)
    if not isinstance(items, list):
        raise TypeError(f"{key} is not a list")
    for position, item in enumerate(items):
        if not isinstance(item, dict):
            raise TypeError(f"{key} element {position} is not a dict")
        tagged = ensure_object_has_uuid(item)
        # Descend into repeaters nested inside this element
        for nested_key in get_repeaters(tagged):
            nested_items = ensure_repeater_uuid(tagged, nested_key)
            tagged[nested_key] = list(nested_items)
        yield tagged

83 

84 

def ensure_object_has_submission(el: dict):
    """
    Return ``el`` with a usable ``"submission"`` entry.

    - If ``"submission"`` is already present and is neither ``None`` nor an
      empty string, ``el`` itself is returned (no copy).
    - Otherwise a deep copy of ``el`` is returned with ``"submission"`` set
      to the element's own ``"uuid"`` when that is usable, or to a freshly
      generated ``uuid.uuid4()`` when it is not.

    A ``"uuid"`` of ``None`` or ``""`` is treated as missing, matching
    ``ensure_object_has_uuid``; reusing it would produce exactly the blank
    submission this function exists to prevent.
    """
    submission = el.get("submission")
    if submission is not None and submission != "":
        return el

    own_uuid = el.get("uuid")
    if own_uuid is not None and own_uuid != "":
        # Special case: reuse the element's existing uuid as its submission
        return deepcopy(el) | {"submission": own_uuid}
    return deepcopy(el) | {"submission": uuid.uuid4()}

92 

93 

def ensure_repeater_submission(obj: dict, key: str):
    """
    Yield the elements of the repeater ``obj[key]``, each with a "submission".

    A list of dicts is treated as a repeated element. Every element is
    passed through ``ensure_object_has_submission`` and the result yielded.
    Unlike ``ensure_repeater_uuid`` there is no recursion into nested
    repeaters.

    This is a generator, so the ``TypeError`` checks run lazily as it is
    consumed: one is raised if ``obj[key]`` is not a list, another if any
    element of it is not a dict.
    """
    entries = obj.get(key)
    if not isinstance(entries, list):
        raise TypeError(f"{key} is not a list")
    for position, entry in enumerate(entries):
        if isinstance(entry, dict):
            yield ensure_object_has_submission(entry)
        else:
            raise TypeError(f"{key} element {position} is not a dict")

107 

108 

def pre_validation(obj: dict):
    """
    Prior to saving, recursively drop any keys whose values are "empty".

    A value is empty when ``_skip_value`` says so (empty string, ``None``,
    empty dict/list, a dict holding only a "uuid", or a list of such
    values). Nested dicts are cleaned recursively and dropped if nothing
    meaningful remains.

    Lists are kept: ``str`` and ``int`` items are preserved as-is (e.g.
    checkbox or multiselect values) and every other item is passed through
    ``pre_validation``. List items are cleaned but never removed.

    If ``obj`` is not a dict a warning is issued and ``obj`` is returned
    unchanged. Otherwise a new dict is returned; ``obj`` is not modified.
    """
    if not isinstance(obj, dict):
        warnings.warn("pre_validation expected a dict")
        return obj

    _obj: dict[str, Any] = {}
    if _skip_value(obj):
        return _obj
    for k, v in obj.items():
        if _skip_value(v):
            continue
        if isinstance(v, dict):
            # Clean the nested dict exactly once and reuse the result;
            # calling pre_validation twice per level doubles the work at
            # every nesting depth and repeats any warnings.
            cleaned = pre_validation(v)
            if not _skip_value(cleaned):
                _obj[k] = cleaned
        elif isinstance(v, list):
            # Preserve "arrays" like checkbox, multiselect
            _obj[k] = [_i if isinstance(_i, (str, int)) else pre_validation(_i) for _i in v]
        else:
            _obj[k] = v
    return _obj

134 

135 

def get_foreignkey(from_model: models.Model, to_model: models.Model) -> models.ForeignKey | None:
    """
    Find a field on ``from_model`` that relates it to ``to_model``.

    Returns the first entry of ``from_model._meta.fields`` whose
    ``related_model`` equals ``to_model``, or ``None`` when there is none.
    """
    matches = (
        candidate
        for candidate in from_model._meta.fields
        if getattr(candidate, "related_model", None) == to_model
    )
    return next(matches, None)  # type: ignore[return-value]

144 

145 

# Type variable bound to Django models, used to tie the return type of
# ``get_foreignkey_value`` to the model passed in.
T = TypeVar("T", bound=models.Model)


def get_foreignkey_value(from_instance: models.Model, to_model: T) -> T | None:
    """
    Return the value of a foreign key where the field name might not be known.

    The field is located with ``get_foreignkey``; when no field relates
    ``from_instance`` to ``to_model`` the result is ``None``.
    """
    field: models.ForeignKey | None = get_foreignkey(from_instance, to_model)  # type: ignore[assignment]
    if not field:
        return None
    return getattr(from_instance, field.name)

155 

156 

157def get_repeaters(obj: dict[str, dict[str, Any] | Any]) -> Iterable[str]: 

158 """ 

159 Return keys which appear to be "list of object" type, aka "repeaters" 

160 """ 

161 for k, v in obj.items(): 

162 if not isinstance(v, (list, tuple)): 

163 continue 

164 if all(isinstance(_i, dict) for _i in v): 

165 yield k 

166 

167 

def get_repeaters_uuids(obj: dict[str, dict[str, list]]) -> Iterable[uuid.UUID]:
    """
    Yield the content of the `uuid` field of every repeater element in ``obj``.

    Only the repeaters directly on ``obj`` are inspected (no recursion).
    Elements without a "uuid" key, or whose "uuid" is ``None``, are skipped.
    Values that are not already ``uuid.UUID`` instances are converted with
    ``uuid.UUID(...)``.
    """
    for repeater_key in get_repeaters(obj):
        for entry in obj[repeater_key]:
            if not isinstance(entry, dict):
                continue
            if "uuid" not in entry or entry["uuid"] is None:
                continue
            raw = entry["uuid"]
            yield raw if isinstance(raw, uuid.UUID) else uuid.UUID(raw)

179 

180 

def flatten(
    obj: dict[str, Any],
    parent_key: list[str] | None = None,
    parent_uuid: uuid.UUID | str | None = None,
    index: int = 0,
) -> Iterable[tuple[list[str], uuid.UUID | str | None, dict, int]]:
    """
    Walk ``obj`` depth-first, yielding nested ('repeater') values first
    and then ``obj`` itself with its repeater keys removed.

    Each yielded tuple is ``(parent_key, parent_uuid, data, index)``:
    the list of repeater keys leading to the item, the "uuid" of the
    enclosing object, the item's data without its repeaters, and the item's
    position within its repeater (``0`` for the top level by default).

    The input is deep-copied, so ``obj`` is never modified.
    """
    path: list[str] = [] if parent_key is None else parent_key
    own_uuid = obj.get("uuid")

    # Work on a copy so that popping repeaters leaves the caller's data intact
    remainder = deepcopy(obj)
    for repeater_name in get_repeaters(obj):
        children = remainder.pop(repeater_name)
        for position, child in enumerate(children):
            yield from flatten(
                child,
                parent_key=[*path, repeater_name],
                parent_uuid=own_uuid,
                index=position,
            )
    yield path, parent_uuid, remainder, index

201 

202 

def igetattr(thing: Any, prop: str):
    """
    A case insensitive 'getattr'.

    Returns the value of the first name in ``dir(thing)`` that equals
    ``prop`` when both are lower-cased.

    Raises:
        AttributeError: if no name in ``dir(thing)`` matches ``prop``.
    """
    # Lower-case the requested name once rather than per candidate
    wanted = prop.lower()
    for name in dir(thing):
        if name.lower() == wanted:
            # Errors raised by the attribute access itself propagate as-is
            return getattr(thing, name)
    raise AttributeError(
        f"{type(thing).__name__!r} object has no attribute matching {prop!r} (case-insensitive)"
    )