Coverage for formkit_ninja / form_submission / models.py: 47.18%

126 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-20 04:40 +0000

1from __future__ import annotations 

2 

3import logging 

4import uuid 

5import warnings 

6from contextlib import contextmanager 

7 

8import pghistory 

9import pgtrigger 

10from django.apps import apps 

11from django.conf import settings 

12from django.core.serializers.json import DjangoJSONEncoder 

13from django.db import models, transaction 

14from django.utils.translation import gettext_lazy as _ 

15 

16from formkit_ninja.form_submission.utils import ( 

17 ensure_repeater_uuid, 

18 flatten, 

19 pre_validation, 

20) 

21 

22logger = logging.getLogger(__name__) 

23 

24 

25@contextmanager 

26def immediate_constraints(): 

27 """ 

28 Context manager for handling atomic transactions in the database 

29 with constraints set to `immediate`. This prevents an error being 

30 raised later, potentially very far from the source and hard 

31 to track down. 

32 """ 

33 with transaction.atomic(): 

34 conn = transaction.get_connection() 

35 with conn.cursor() as c: 

36 # Temporarily set all constraints to 'IMMEDIATE' in order to not have 'Foreign Key' 

37 # errors which we'll catch in outer scope later 

38 c.execute("SET CONSTRAINTS ALL IMMEDIATE") 

39 yield 

40 

41 

42class SubmissionField(models.JSONField): 

43 def pre_save(self, model_instance: models.Model, add): # type: ignore[override] 

44 value = getattr(model_instance, self.attname) 

45 validated = pre_validation(value) 

46 # Ensure that all repeaters have a UUID set on save 

47 try: 

48 from formkit_ninja.form_submission.utils import get_repeaters 

49 

50 for repeater_key in get_repeaters(validated): 

51 try: 

52 validated[repeater_key] = list(ensure_repeater_uuid(validated, repeater_key)) 

53 except TypeError as E: 

54 warnings.warn(f"{E} (received {validated[repeater_key]})") 

55 except LookupError as E: 

56 warnings.warn(f"{E}") 

57 setattr(model_instance, self.attname, validated) 

58 return validated 

59 

60 

61@pghistory.track() 

62class Submission(models.Model): 

63 class Status(models.IntegerChoices): 

64 NEW = 1, _("New Submission") 

65 REJECTED = 2, _("Rejected") 

66 VERIFIED = 3, _("Verified") 

67 CHANGES_REQUESTED = 4, _("Changes Requested") 

68 

69 key = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False) 

70 user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True) 

71 created = models.DateTimeField(auto_now_add=True) 

72 updated = models.DateTimeField(auto_now=True) 

73 status = models.IntegerField( 

74 choices=Status.choices, 

75 default=Status.NEW, 

76 ) 

77 fields = SubmissionField(encoder=DjangoJSONEncoder) 

78 form_type = models.CharField( 

79 max_length=128, 

80 help_text="The type of form used in the submission", 

81 ) 

82 is_active = models.BooleanField(default=True) 

83 

84 def save(self, *args, **kwargs): 

85 # Note: was_created logic removed as SeparatedSubmission handles this 

86 

87 # Determine changed UUIDs to clean up SeparatedSubmission 

88 # (Simplified logic from reference) 

89 

90 super().save(*args, **kwargs) 

91 

92 # Create SeparatedSubmission instances 

93 SeparatedSubmission.objects.from_submission(self) 

94 

95 

96class SeparatedSubmissionManager(models.Manager): 

97 @transaction.atomic() 

98 def from_submission(self, sub: Submission) -> list[tuple[SeparatedSubmission, bool]]: 

99 """ 

100 Create SeparatedSubmission(s) from one Submission 

101 """ 

102 fields = list(flatten(sub.fields, [sub.form_type], parent_uuid=sub.pk)) 

103 

104 # Save the top level first (last in flattening list) 

105 root_data = fields[-1] 

106 

107 main, main_created = self.update_or_create( 

108 pk=sub.pk, 

109 defaults=dict( 

110 submission=sub, 

111 user=sub.user, 

112 created=sub.created, 

113 status=sub.status, 

114 fields=root_data[2], # The dict is the 3rd element 

115 form_type=sub.form_type, 

116 ), 

117 ) 

118 

119 results: list[tuple[SeparatedSubmission, bool]] = [] 

120 

121 # Process repeaters. 'fields[:-1]' are the children. 

122 # We reverse to process top-level children before deeper children (Top-Down) 

123 repeater_data = reversed(fields[:-1]) 

124 

125 for item_data in repeater_data: 

126 res = self._save_repeater_chunk(main, item_data) # type: ignore[arg-type] 

127 if res: 

128 results.append(res) 

129 

130 results.append((main, main_created)) # type: ignore[arg-type] 

131 

132 return results 

133 

134 def _save_repeater_chunk(self, main: SeparatedSubmission, data_tuple: tuple[list[str], uuid.UUID | str | None, dict, int]) -> tuple[SeparatedSubmission, bool] | None: 

135 """ 

136 Helper to save a single repeater item. 

137 """ 

138 form_type_path, parent_uuid_val, form_fields, index = data_tuple 

139 

140 # The repeater name is the last element in the form_type list 

141 repeater_name = form_type_path[-1] 

142 form_type_str = "".join(ft.capitalize() for ft in form_type_path) 

143 submission_key = form_fields.pop("uuid", None) 

144 if not submission_key: 

145 warnings.warn(f"No Submission key (UUID) present in {form_fields} of {main}") 

146 return None 

147 

148 # Resolve parent 

149 parent_obj = None 

150 if parent_uuid_val: 

151 if str(parent_uuid_val) == str(main.pk): 

152 parent_obj = main 

153 else: 

154 try: 

155 parent_obj = SeparatedSubmission.objects.get(pk=parent_uuid_val) 

156 except SeparatedSubmission.DoesNotExist: 

157 warnings.warn(f"Parent {parent_uuid_val} not found for {repeater_name}") 

158 parent_obj = main 

159 else: 

160 parent_obj = main 

161 

162 subnode, created = SeparatedSubmission.objects.update_or_create( 

163 pk=submission_key, 

164 defaults=dict( 

165 status=main.status, 

166 submission=main.submission, 

167 form_type=form_type_str, 

168 user=main.user, 

169 fields=form_fields, 

170 repeater_parent=parent_obj, 

171 repeater_key=repeater_name, 

172 repeater_order=index, 

173 ), 

174 ) 

175 return subnode, created 

176 

177 

178@pghistory.track() 

179class SeparatedSubmission(models.Model): 

180 """ 

181 This represents a Submission broken down into the main 

182 submission instance and separate repeaters 

183 """ 

184 

185 id = models.UUIDField(primary_key=True, default=uuid.uuid4) 

186 user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True) 

187 created = models.DateTimeField(auto_now_add=True) 

188 status = models.IntegerField(choices=Submission.Status.choices, default=Submission.Status.NEW) 

189 fields = models.JSONField(encoder=DjangoJSONEncoder) 

190 form_type = models.CharField( 

191 max_length=256, 

192 help_text="The type of form used in the submission", 

193 ) 

194 

195 # These fields are relevant for 'Repeaters' to reference the parent object 

196 repeater_key = models.CharField( 

197 max_length=256, 

198 help_text="The field name in the original JSON document", 

199 null=True, 

200 blank=True, 

201 ) 

202 repeater_parent = models.ForeignKey("self", on_delete=models.CASCADE, null=True, blank=True, related_name="repeater_set") 

203 repeater_order = models.IntegerField(null=True, blank=True, help_text="The original order of a repeater in the JSON") 

204 submission = models.ForeignKey( 

205 Submission, 

206 help_text="The original submission.", 

207 on_delete=models.CASCADE, 

208 ) 

209 

210 objects = SeparatedSubmissionManager() 

211 

212 @property 

213 def model_type(self) -> type[models.Model] | None: 

214 """ 

215 Return the corresponding Django model. 

216 """ 

217 # Strategy: Search all installed apps for a model with matching name (case insensitive?) 

218 # Or exact match. 

219 # The 'form_type' in SeparatedSubmission is capitalized in the Manager logic. 

220 

221 target_name = self.form_type 

222 

223 # Prioritize 'formkit_ninja' or apps defined in setting? 

224 # For now, search all. 

225 for app_config in apps.get_app_configs(): 

226 try: 

227 model = app_config.get_model(target_name) 

228 if model: 

229 return model 

230 except LookupError: 

231 continue 

232 

233 # Fallback: maybe the capitalization differs? 

234 # TODO: Implement fuzzy matching? 

235 return None 

236 

237 # to_model removed in favor of generated signals 

238 

239 

240class SubmissionFile(models.Model): 

241 submission = models.UUIDField() 

242 file = models.FileField(upload_to="submission_files") 

243 user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT) 

244 comment = models.TextField() 

245 date_uploaded = models.DateTimeField(auto_now_add=True) 

246 deleted = models.BooleanField(default=False) 

247 

248 class Meta: 

249 triggers = [pgtrigger.SoftDelete(name="soft_delete", field="deleted", value=True)] 

250 

251 

252class SeparatedSubmissionImport(models.Model): 

253 """ 

254 Record a success / fail message for a submission import 

255 """ 

256 

257 submission = models.ForeignKey(SeparatedSubmission, on_delete=models.CASCADE) 

258 created = models.DateTimeField(auto_now_add=True) 

259 success = models.BooleanField() 

260 message = models.TextField()