Coverage for formkit_ninja / form_submission / models.py: 51.76%

154 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-06 04:12 +0000

1from __future__ import annotations 

2 

3import logging 

4import uuid 

5import warnings 

6from contextlib import contextmanager 

7 

8import pghistory 

9import pgtrigger 

10from django.apps import apps 

11from django.conf import settings 

12from django.core.serializers.json import DjangoJSONEncoder 

13from django.db import models, transaction 

14from django.utils import timezone 

15from django.utils.translation import gettext_lazy as _ 

16 

17from formkit_ninja.form_submission.querysets import SeparatedSubmissionQuerySet, SubmissionQuerySet 

18from formkit_ninja.form_submission.utils import ( 

19 ensure_repeater_uuid, 

20 flatten, 

21 pre_validation, 

22) 

23 

# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

25 

26 

@contextmanager
def immediate_constraints():
    """
    Run the wrapped block inside an atomic transaction whose constraints
    are set to `immediate`.

    An atomic block is opened, `SET CONSTRAINTS ALL IMMEDIATE` is executed
    on its connection, and control is then handed to the caller. Constraint
    errors (e.g. foreign keys) are meant to surface where they are caused
    rather than later, far from the source and hard to track down.
    """
    with transaction.atomic():
        connection = transaction.get_connection()
        with connection.cursor() as cursor:
            # Switch every constraint to IMMEDIATE so 'Foreign Key' errors are not
            # postponed to a point where only an outer scope could catch them
            cursor.execute("SET CONSTRAINTS ALL IMMEDIATE")
        yield

42 

43 

class SubmissionField(models.JSONField):
    """
    JSON field that cleans submission data immediately before it is saved.

    The value is passed through `pre_validation`, and every repeater reported
    by `get_repeaters` is rebuilt with `ensure_repeater_uuid` so that repeater
    items carry a UUID.
    """

    def pre_save(self, model_instance: models.Model, add):  # type: ignore[override]
        """
        Validate the field value, write it back to the instance and return it.

        A `TypeError` raised while rebuilding one repeater, or a `LookupError`
        raised anywhere in the repeater pass, is reported through
        `warnings.warn` and does not abort the save.
        """
        raw_value = getattr(model_instance, self.attname)
        cleaned = pre_validation(raw_value)

        from formkit_ninja.form_submission.utils import get_repeaters

        # Ensure that all repeaters have a UUID set on save
        try:
            for key in get_repeaters(cleaned):
                try:
                    cleaned[key] = list(ensure_repeater_uuid(cleaned, key))
                except TypeError as exc:
                    # This repeater could not be processed; warn and keep going
                    warnings.warn(f"{exc} (received {cleaned[key]})")
        except LookupError as exc:
            warnings.warn(f"{exc}")

        # The instance keeps the cleaned value, not the one it was given
        setattr(model_instance, self.attname, cleaned)
        return cleaned

61 

62 

@pghistory.track()
class Submission(models.Model):
    """
    A submitted form, stored whole as one JSON document in `fields`.

    Saving a submission also creates or updates the `SeparatedSubmission`
    rows derived from that document (see `save`).
    """

    # Review states a submission can be in
    class Status(models.IntegerChoices):
        NEW = 1, _("New Submission")
        REJECTED = 2, _("Rejected")
        VERIFIED = 3, _("Verified")
        CHANGES_REQUESTED = 4, _("Changes Requested")

    key = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False)
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True)
    created = models.DateTimeField(default=timezone.now)
    # NOTE(review): nothing in this class refreshes `updated` on save; it keeps its
    # default unless a caller assigns it. Confirm that this is intended.
    updated = models.DateTimeField(default=timezone.now)
    status = models.IntegerField(
        choices=Status.choices,
        default=Status.NEW,
    )
    # Cleaned by SubmissionField.pre_save on every save
    fields = SubmissionField(encoder=DjangoJSONEncoder)
    form_type = models.CharField(
        max_length=128,
        help_text="The type of form used in the submission",
    )
    is_active = models.BooleanField(default=True)

    objects = SubmissionQuerySet.as_manager()

    def save(self, *args, **kwargs):
        """
        Save the submission and synchronise its `SeparatedSubmission` rows.

        All arguments are forwarded unchanged to `models.Model.save`.

        Both steps run inside one `transaction.atomic()` block. If building
        the separated rows fails, the write of the submission itself is
        rolled back too, so the database never holds a submission without
        its separated rows. The exception still propagates to the caller.
        """
        with transaction.atomic():
            super().save(*args, **kwargs)

            # Create / update the SeparatedSubmission instances for this document
            SeparatedSubmission.objects.from_submission(self)

    def __str__(self) -> str:
        # Use only local fields to avoid N+1 (e.g. admin list).
        return f"{self.form_type} {self.key} ({self.get_status_display()})"

102 

103 

class _SeparatedSubmissionManagerBase(models.Manager):
    """Manager base holding the routines that build SeparatedSubmission rows."""

    @transaction.atomic()
    def from_submission(self, sub: Submission) -> list[tuple[SeparatedSubmission, bool]]:
        """
        Create or update the SeparatedSubmission(s) belonging to one Submission.

        Returns a list of `(instance, created)` pairs: one per repeater item
        that could be saved, followed by the pair for the top-level row.
        """
        flattened = list(flatten(sub.fields, [sub.form_type], parent_uuid=sub.pk))

        # The top-level document comes last in the flattened list; its field
        # dict is the 3rd element of the tuple.
        root_fields = flattened[-1][2]

        root, root_created = self.update_or_create(
            pk=sub.pk,
            defaults={
                "submission": sub,
                "user": sub.user,
                "created": sub.created,
                "status": sub.status,
                "fields": root_fields,
                "form_type": sub.form_type,
            },
        )

        outcome: list[tuple[SeparatedSubmission, bool]] = []

        # Everything before the last entry is a repeater item. Walk them in
        # reverse so top-level children are handled before deeper ones (top-down).
        for chunk in reversed(flattened[:-1]):
            saved = self._save_repeater_chunk(root, chunk)  # type: ignore[arg-type]
            if saved is not None:
                outcome.append(saved)

        outcome.append((root, root_created))  # type: ignore[arg-type]
        return outcome

    def _save_repeater_chunk(
        self,
        main: SeparatedSubmission,
        data_tuple: tuple[list[str], uuid.UUID | str | None, dict, int],
    ) -> tuple[SeparatedSubmission, bool] | None:
        """
        Create or update the row for a single repeater item.

        `data_tuple` unpacks to `(form type path, parent uuid, fields, index)`.
        The item's own `"uuid"` entry is removed from the fields and used as
        the primary key. Without it a warning is issued and `None` is
        returned. Otherwise the `(instance, created)` pair is returned.
        """
        path, parent_ref, item_fields, position = data_tuple

        # The last path element is the repeater's own name
        repeater_name = path[-1]
        joined_type = "".join(part.capitalize() for part in path)

        item_key = item_fields.pop("uuid", None)
        if not item_key:
            warnings.warn(f"No Submission key (UUID) present in {item_fields} of {main}")
            return None

        # Resolve the parent: the root row unless the item names another row that exists
        parent = main
        if parent_ref and str(parent_ref) != str(main.pk):
            try:
                parent = SeparatedSubmission.objects.get(pk=parent_ref)
            except SeparatedSubmission.DoesNotExist:
                # Unknown parent: warn and keep the root row as parent
                warnings.warn(f"Parent {parent_ref} not found for {repeater_name}")

        return SeparatedSubmission.objects.update_or_create(
            pk=item_key,
            defaults={
                "status": main.status,
                "submission": main.submission,
                "form_type": joined_type,
                "user": main.user,
                "fields": item_fields,
                "repeater_parent": parent,
                "repeater_key": repeater_name,
                "repeater_order": position,
            },
        )

186 

187 

# Manager class used by SeparatedSubmission: the custom creation methods of
# _SeparatedSubmissionManagerBase combined with the SeparatedSubmissionQuerySet methods
SeparatedSubmissionManager = _SeparatedSubmissionManagerBase.from_queryset(SeparatedSubmissionQuerySet)

190 

191 

@pghistory.track()
class SeparatedSubmission(models.Model):
    """
    One piece of a Submission after it has been broken down: either the
    main submission instance or a single repeater item.
    """

    id = models.UUIDField(primary_key=True, default=uuid.uuid4)
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True)
    created = models.DateTimeField(default=timezone.now)
    status = models.IntegerField(choices=Submission.Status.choices, default=Submission.Status.NEW)
    fields = models.JSONField(encoder=DjangoJSONEncoder)
    form_type = models.CharField(max_length=256, help_text="The type of form used in the submission")

    # The three repeater_* fields let a 'Repeater' row point back at its parent object
    repeater_key = models.CharField(
        max_length=256,
        null=True,
        blank=True,
        help_text="The field name in the original JSON document",
    )
    repeater_parent = models.ForeignKey(
        "self",
        on_delete=models.CASCADE,
        null=True,
        blank=True,
        related_name="repeater_set",
    )
    repeater_order = models.IntegerField(
        null=True,
        blank=True,
        help_text="The original order of a repeater in the JSON",
    )
    submission = models.ForeignKey(Submission, on_delete=models.CASCADE, help_text="The original submission.")

    objects = SeparatedSubmissionManager()

    @property
    def model_type(self) -> type[models.Model] | None:
        """
        Return the Django model named by `form_type`, or `None` if there is none.

        Each installed app is asked for a model with that name, in the order
        given by `apps.get_app_configs()`. The first app that has one wins.
        """
        # NOTE(review): Django documents AppConfig.get_model as case-insensitive,
        # so the capitalization fallback once planned here looks unnecessary - confirm.
        wanted = self.form_type

        for config in apps.get_app_configs():
            try:
                return config.get_model(wanted)
            except LookupError:
                # This app has no such model; try the next one
                continue

        return None

    # to_model removed in favor of generated signals

    def __str__(self) -> str:
        # Only local columns are read here, so no extra query is triggered
        # (submission_id would also be local).
        status_label = self.get_status_display
        return f"{self.form_type} {self.id} ({status_label()})"

256 

257 

class SubmissionFile(models.Model):
    """A file uploaded for a submission, with a `deleted` flag driven by a soft-delete trigger."""

    # Plain UUID value, not a ForeignKey
    submission = models.UUIDField()
    file = models.FileField(upload_to="submission_files")
    user = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT)
    comment = models.TextField()
    date_uploaded = models.DateTimeField(default=timezone.now)
    deleted = models.BooleanField(default=False)

    class Meta:
        triggers = [
            pgtrigger.SoftDelete(name="soft_delete", field="deleted", value=True),
        ]

    def __str__(self) -> str:
        if not self.pk:
            return "SubmissionFile (unsaved)"
        return f"File for submission {self.submission}"

271 

272 

class SeparatedSubmissionImport(models.Model):
    """
    Success / fail record, with a message, for the import of one separated submission.
    """

    submission = models.ForeignKey(SeparatedSubmission, on_delete=models.CASCADE)
    created = models.DateTimeField(default=timezone.now)
    success = models.BooleanField()
    message = models.TextField()

    def __str__(self) -> str:
        outcome = "ok" if self.success else "fail"
        text = self.message or ""
        if len(text) > 50:
            # Long messages are cut to their first 50 characters
            return f"{outcome} @ {self.created}: {text[:50]}..."
        # An empty message is shown as a dash
        return f"{outcome} @ {self.created}: {text or '-'}"

287 

288 

class Flag(models.Model):
    """
    Quality-assurance flag attached to a separated submission, visible to
    users and administrators.

    Flags surface data-quality issues (e.g. mismatched worker data between
    forms). A separated submission may carry several flags, one per rule type.
    """

    SEVERITY_CHOICES = [
        ("info", "Info"),
        ("warning", "Warning"),
        ("error", "Error"),
    ]

    separated_submission = models.ForeignKey(SeparatedSubmission, on_delete=models.CASCADE, related_name="quality_flags")
    flag_type = models.CharField(
        max_length=64,
        help_text="Code identifying the rule that created this flag (e.g. workers_project_mismatch)",
        db_index=True,
    )
    message = models.TextField(help_text="User-facing message for this flag")
    severity = models.CharField(max_length=16, choices=SEVERITY_CHOICES, default="warning")
    created = models.DateTimeField(auto_now_add=True)
    # related_name="+" on both user links: no reverse accessor is added to the user model
    created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True, related_name="+")
    resolved_at = models.DateTimeField(null=True, blank=True)
    resolved_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.SET_NULL, null=True, blank=True, related_name="+")

    class Meta:
        # Newest flags first
        ordering = ["-created"]

    def __str__(self) -> str:
        # The raw foreign-key column is used, so the related row is not loaded
        rule = self.flag_type
        target = self.separated_submission_id
        return f"{rule} on separated submission {target}"