Coverage for formkit_ninja / models.py: 23.68%

492 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-03 09:21 +0000

1from __future__ import annotations 

2 

3import logging 

4import uuid 

5import warnings 

6from keyword import iskeyword, issoftkeyword 

7from typing import Any, Iterable, TypedDict, get_args 

8 

9import pghistory 

10import pgtrigger 

11from django.conf import settings 

12from django.contrib.contenttypes.models import ContentType 

13from django.contrib.postgres.aggregates import ArrayAgg 

14from django.core.exceptions import ValidationError 

15from django.db import models, transaction 

16from django.db.models import Q 

17from django.db.models.aggregates import Max 

18from django.db.models.functions import Greatest 

19from django.utils import timezone 

20from rich.console import Console 

21 

22from formkit_ninja import formkit_schema, triggers 

23 

24# Re export "form_submission" models 

25from formkit_ninja.code_generation_config import CodeGenerationConfig # noqa: F401 

26from formkit_ninja.form_submission.models import ( 

27 SeparatedSubmission, # noqa: F401 

28 Submission, # noqa: F401 

29 SubmissionField, # noqa: F401 

30 SubmissionFile, # noqa: F401 

31) 

32from formkit_ninja.utils import short_uuid 

33 

# Rich console used for human-readable progress output (e.g. while importing schemas).
console = Console()
log = console.log

# Module-scoped logger: records carry this module's name and still propagate
# to the root logger's handlers, instead of being emitted on the root logger itself.
logger = logging.getLogger(__name__)

38 

39 

def check_valid_django_id(key: str):
    """
    Validate that ``key`` is usable as a Python identifier / field name.

    Raises ``ValidationError`` when the key is empty, starts with a digit,
    is not a valid identifier, is a hard or soft Python keyword, or ends
    with an underscore. Returns ``None`` when the key is acceptable.
    """
    if not key:
        raise ValidationError("Name cannot be empty")

    leading, trailing = key[0], key[-1]
    if leading.isdigit():
        raise ValidationError(f"{key} is not valid, it cannot start with a digit")

    reserved = iskeyword(key) or issoftkeyword(key)
    if reserved or not key.isidentifier():
        raise ValidationError(f"{key} cannot be used as a keyword. Should be a valid python identifier")

    if trailing == "_":
        raise ValidationError(f"{key} is not valid, it cannot end with an underscore")

49 

50 

class UuidIdModel(models.Model):
    """
    Abstract base model providing a consistent set of fields
    which help with syncing data:
     - UUID field is the ID
     - Created field
     - Last Modified field
     - updated_by (optional)
     - created_by (optional)
    """

    class Meta:
        # Abstract: no table is created for this class itself.
        abstract = True

    # Primary key is a random UUID generated in Python, not a database sequence.
    id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, unique=True)
    # Set once from timezone.now when the instance is created (nullable).
    created = models.DateTimeField(default=timezone.now, blank=True, null=True)
    # auto_now: refreshed on every save().
    updated = models.DateTimeField(auto_now=True, blank=True, null=True)
    # related_name="+" disables the reverse accessor on the user model for both audit FKs.
    created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT, related_name="+", blank=True, null=True)
    updated_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT, related_name="+", blank=True, null=True)

70 

71 

class OptionDict(TypedDict):
    """Dictionary form of a single option: a stored ``value`` and its display ``label``."""

    value: str
    label: str

75 

76 

class OptionGroup(models.Model):
    """
    A named "collection" of choices.

    For instance all the values in a single PNDS zTable.
    Also intended to allow users to add / modify their __own__ 'Options'
    for idb and formkit to recognize.
    """

    # The group name doubles as the primary key.
    group = models.CharField(max_length=1024, primary_key=True, help_text="The label to use for these options")
    content_type = models.ForeignKey(
        ContentType,
        on_delete=models.PROTECT,
        null=True,
        blank=True,
        help_text=("This is an optional reference to the original source object for this set of options (typically a table from which we copy options)"),
    )

    # If the object is a "Content Type" we expect it to have a similar layout to this

    def save(self, *args, **kwargs):
        """
        Save the group after checking that ``content_type``, if present,
        points at a model with a ``value`` field and a ``label_set`` attribute.

        Raises:
            ValueError: if the referenced model is missing, or lacks either
                the ``value`` field or the ``label_set`` attribute.
        """
        if self.content_type:
            from django.core.exceptions import FieldDoesNotExist

            klass = self.content_type.model_class()
            message = f"Expected {klass} to have a 'value' field and a 'label_set' attribute"
            # model_class() returns None when the content type's model no longer exists.
            if klass is None:
                raise ValueError(message)
            try:
                # get_field raises (never returns None) when the field is absent.
                klass._meta.get_field("value")
            except FieldDoesNotExist as exc:
                raise ValueError(message) from exc
            if not hasattr(klass, "label_set"):
                raise ValueError(message)
        return super().save(*args, **kwargs)

    def __str__(self):
        return f"{self.group}"

    @classmethod
    def copy_table(cls, model: type[models.Model], field: str, language: str | None = "en", group_name: str | None = None):
        """
        Copy an existing table of options into this OptionGroup.

        One ``Option`` is created per row of ``model`` (keyed by the row's pk)
        and one ``OptionLabel`` per option for ``language``, taking its text
        from ``field``. The whole copy runs in a single transaction.

        Re-running the copy is safe: labels are looked up by (option, lang),
        which is the unique constraint on ``OptionLabel``, and are only
        rewritten when the source text has actually changed.
        """
        from typing import Any, cast

        with transaction.atomic():
            group_obj, _ = cls.objects.get_or_create(group=group_name, content_type=ContentType.objects.get_for_model(model))
            log(group_obj)

            for obj in cast(Any, model).objects.values("pk", field):
                option, _ = Option.objects.get_or_create(
                    object_id=obj["pk"],
                    group=group_obj,
                    value=obj["pk"],
                )
                label_text = obj[field] or ""
                # Putting `label` in the lookup would try to INSERT a second
                # (option, lang) row whenever the source text changed.
                label_obj, label_created = OptionLabel.objects.get_or_create(
                    option=option,
                    lang=language,
                    defaults={"label": label_text},
                )
                if not label_created and label_obj.label != label_text:
                    label_obj.label = label_text
                    label_obj.save()

129 

130 

class OptionQuerySet(models.Manager):
    """
    Manager which annotates each Option with its label in every configured language.
    """

    def get_queryset(self):
        """
        Return the base queryset with one ``label_<lang>`` annotation per
        language code found in ``settings.LANGUAGES``. Each annotation is a
        subquery selecting a single matching ``OptionLabel.label``.
        """
        label_annotations = {}
        for language in settings.LANGUAGES:
            code = language[0]
            labels_for_code = OptionLabel.objects.filter(lang=code, option=models.OuterRef("pk"))
            label_annotations[f"label_{code}"] = models.Subquery(
                labels_for_code.values("label")[:1],
                output_field=models.CharField(),
            )
        return super().get_queryset().annotate(**label_annotations)

146 

147 

class Option(UuidIdModel):
    """
    This is a key/value field representing one "option" for a FormKit property.
    The translated labels for this option are stored as `OptionLabel` rows
    (one per language) which point back to this model.
    """

    object_id = models.IntegerField(
        null=True,
        blank=True,
        help_text=("This is a reference to the primary key of the original source object (typically a PNDS ztable ID) or a user-specified ID for a new group"),
    )
    last_updated = models.DateTimeField(auto_now=True)
    group = models.ForeignKey(OptionGroup, on_delete=models.CASCADE, null=True, blank=True)
    # is_active = models.BooleanField(default=True)
    order = models.IntegerField(null=True, blank=True)

    class Meta:
        # NOTE(review): presumably these triggers maintain `order` within each group — confirm in `triggers`.
        triggers = triggers.update_or_insert_group_trigger("group_id")
        # A source object id may appear only once per group.
        constraints = [models.UniqueConstraint(fields=["group", "object_id"], name="unique_option_id")]
        ordering = (
            "group",
            "order",
        )

    value = models.CharField(max_length=1024)
    # Plain class attribute (not a Meta option); not read anywhere in this class.
    order_with_respect_to = "group"

    # Default manager adds per-language `label_<lang>` annotations.
    objects = OptionQuerySet()

    @classmethod
    def from_pydantic(
        cls,
        options: list[str | OptionDict],
        group: OptionGroup | None = None,
    ) -> Iterable["Option"]:
        """
        Yields "Options" in the database based on the input given.

        Delegates entirely to `SchemaImportService.import_options`; this is a
        generator, so nothing happens until it is iterated.
        """
        # Imported lazily inside the method rather than at module level.
        from formkit_ninja.services.schema_import import SchemaImportService

        yield from SchemaImportService.import_options(options, group=group)

    def __str__(self) -> str:
        # Use group_id (stored on row; OptionGroup.pk is the group name) to avoid N+1.
        if self.group_id:
            return f"{self.group_id}::{self.value}"
        return f"No group: {self.value}"

195 

196 

class OptionLabel(models.Model):
    """
    The display label of one Option in one language.
    """

    option = models.ForeignKey("Option", on_delete=models.CASCADE)
    label = models.CharField(max_length=1024)
    lang = models.CharField(max_length=4, default="en", choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese")))

    class Meta:
        # At most one label per option per language.
        constraints = [models.UniqueConstraint(fields=["option", "lang"], name="unique_option_label")]

    def save(self, *args, **kwargs):
        """
        Save the related "option" first so that its last_updated is refreshed,
        then save this label.
        """
        parent_option = self.option
        if parent_option is not None:
            parent_option.save()
        return super().save(*args, **kwargs)

    def __str__(self) -> str:
        # Only local columns are used here, so listing labels triggers no extra queries.
        if self.label:
            return f"{self.label} ({self.lang})"
        return f"option={self.option_id} lang={self.lang}"

216 

217 

class FormComponents(UuidIdModel):
    """
    A model relating "nodes" of a schema to a schema with model ordering.

    Each row links one FormKitSchemaNode to one FormKitSchema, with an
    optional human-readable label and an `order` position.
    """

    schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
    # This is null=True so that a new FormComponent can be added from the admin inline
    node = models.ForeignKey("FormKitSchemaNode", on_delete=models.CASCADE, null=True, blank=True)
    label = models.CharField(max_length=1024, help_text="Used as a human-readable label", null=True, blank=True)
    order = models.IntegerField(null=True, blank=True)
    # Plain class attribute (not a Meta option); not read anywhere in this class.
    order_with_respect_to = "schema"

    class Meta:
        # NOTE(review): presumably these triggers maintain `order` within each schema — confirm in `triggers`.
        triggers = triggers.update_or_insert_group_trigger("schema_id")
        ordering = ("schema", "order")

    def __str__(self) -> str:
        # Use node_id/schema_id (stored on row) to avoid N+1 when listing FormComponents.
        return f"node={self.node_id}[{self.order}]: schema={self.schema_id}"

237 

238 

class NodeChildrenManager(models.Manager):
    """
    Adds aggregation and filtering for client side data
    of NodeChildren relations
    """

    def aggregate_changes_table(self, latest_change: int | None = None):
        """
        Return one named row per parent: ``(parent_id, latest_change, children)``.

        ``children`` is the array of child ids ordered by ``order``.
        ``latest_change`` is the greatest of the parent's ``track_change`` and
        the maximum ``track_change`` among its children.

        When ``latest_change`` is given (and truthy) only parents changed
        after that value are returned.
        """
        values = (
            self.get_queryset()
            .values("parent_id")
            .annotate(
                children=ArrayAgg("child", ordering="order"),
            )
            .annotate(Max("child__track_change"))
            .annotate(latest_change=Greatest("child__track_change__max", "parent__track_change"))
        )
        if latest_change:
            # The parent node's change counter is `track_change`; the node model
            # has no `latest_change` field, so `parent__latest_change` cannot resolve.
            values = values.filter(Q(latest_change__gt=latest_change) | Q(parent__track_change__gt=latest_change))
        return values.values_list("parent_id", "latest_change", "children", named=True)

258 

259 

class NodeChildren(models.Model):
    """
    This is an ordered m2m model representing
    the "children" of an HTML element.

    It is the `through` table of `FormKitSchemaNode.children`.
    """

    # related_name="parent": the reverse accessor on the node model is `.parent`
    # and returns the NodeChildren rows in which that node is the parent.
    parent = models.ForeignKey(
        "FormKitSchemaNode",
        on_delete=models.CASCADE,
        related_name="parent",
    )
    # No related_name: the reverse accessor is the default `nodechildren_set`
    # (rows in which the node is the child).
    child = models.ForeignKey("FormKitSchemaNode", on_delete=models.CASCADE)
    order = models.IntegerField(null=True, blank=True)
    track_change = models.BigIntegerField(null=True, blank=True)
    # Plain class attribute (not a Meta option); not read anywhere in this class.
    order_with_respect_to = "parent"

    class Meta:
        # NOTE(review): presumably the first trigger set maintains `order` per parent and
        # the second bumps `track_change` from a sequence — confirm in `triggers`.
        triggers = [
            *triggers.update_or_insert_group_trigger("parent_id"),
            triggers.bump_sequence_value(sequence_name=triggers.NODE_CHILDREN_CHANGE_ID),
        ]
        ordering = (
            "parent_id",
            "order",
        )

    objects = NodeChildrenManager()

    def __str__(self) -> str:
        # Use parent_id/child_id (stored on row) to avoid N+1.
        return f"parent={self.parent_id} → child={self.child_id} order={self.order}"

291 

292 

class NodeQS(models.QuerySet):
    """QuerySet helpers for FormKitSchemaNode."""

    def from_change(self, track_change: int = -1):
        """Restrict to nodes whose ``track_change`` is greater than the given value."""
        return self.filter(track_change__gt=track_change)

    def to_response(self, ignore_errors: bool = True, options: bool = True) -> Iterable[tuple[uuid.UUID, int | None, formkit_schema.Node | str | None, bool]]:
        """
        Yield ``(id, track_change, node, protected)`` for each row.

        ``node`` is the non-recursive FormKit node for active rows and ``None``
        for inactive rows. A row which cannot be turned into a node is skipped
        with two warnings, unless ``ignore_errors`` is false, in which case the
        error is re-raised.
        """
        node: FormKitSchemaNode
        for node in self.all():
            try:
                payload = node.get_node(recursive=False, options=options) if node.is_active else None
                yield node.id, node.track_change, payload, node.protected
            except Exception as E:
                if not ignore_errors:
                    raise
                for message in (f"An unparseable FormKit node was hit at {node.pk}", f"{E}"):
                    warnings.warn(message)

313 

314 

# Triggers registered below, in order:
#  - deleting a row whose `protected` is true is blocked
#  - updating a row is blocked when it is `protected` both before and after the update
#  - deletes are turned into soft deletes using the `is_active` field
#  - NOTE(review): presumably bumps `track_change` from a sequence on change — confirm in `triggers`
@pghistory.track()
@pgtrigger.register(
    pgtrigger.Protect(
        # If the node is protected, delete is not allowed
        name="protect_node_deletes_and_updates",
        operation=pgtrigger.Delete,
        condition=pgtrigger.Q(old__protected=True),
    ),
    pgtrigger.Protect(
        # If both new and old values are "protected", updates are not allowed
        name="protect_node_updates",
        operation=pgtrigger.Update,
        condition=pgtrigger.Q(old__protected=True) & pgtrigger.Q(new__protected=True),
    ),
    pgtrigger.SoftDelete(name="soft_delete", field="is_active"),
    triggers.bump_sequence_value("track_change", triggers.NODE_CHANGE_ID),
)
class FormKitSchemaNode(UuidIdModel):
    """
    This represents a single "Node" in a FormKit schema.
    There are several different types of node which may be defined:
    FormKitSchemaDOMNode
    | FormKitSchemaComponent
    | FormKitSchemaTextNode
    | FormKitSchemaCondition
    | FormKitSchemaFormKit
    """

    objects = NodeQS.as_manager()

    NODE_TYPE_CHOICES = (
        ("$cmp", "Component"),  # Not yet implemented
        ("text", "Text"),
        ("condition", "Condition"),  # Not yet implemented
        ("$formkit", "FormKit"),
        ("$el", "Element"),
        ("raw", "Raw JSON"),  # Not yet implemented
    )
    # One (value, label) pair per literal in formkit_schema.FORMKIT_TYPE.
    FORMKIT_CHOICES = [(t, t) for t in get_args(formkit_schema.FORMKIT_TYPE)]

    ELEMENT_TYPE_CHOICES = [("p", "p"), ("h1", "h1"), ("h2", "h2"), ("span", "span")]
    node_type = models.CharField(max_length=256, choices=NODE_TYPE_CHOICES, blank=True, help_text="")
    description = models.CharField(
        max_length=4000,
        null=True,
        blank=True,
        help_text="Decribe the type of data / reason for this component",
    )
    label = models.CharField(max_length=1024, help_text="Used as a human-readable label", null=True, blank=True)
    option_group = models.ForeignKey(OptionGroup, null=True, blank=True, on_delete=models.PROTECT)
    # Directed (non-symmetrical) self relation, ordered through NodeChildren.
    children = models.ManyToManyField("self", through=NodeChildren, symmetrical=False, blank=True)
    # Target field of the "soft_delete" trigger registered on this model.
    is_active = models.BooleanField(default=True)
    # Read by the two Protect triggers registered on this model.
    protected = models.BooleanField(default=False)

    node = models.JSONField(
        null=True,
        blank=True,
        help_text="A JSON representation of select parts of the FormKit schema",
    )

    additional_props = models.JSONField(
        null=True,
        blank=True,
        help_text="User space for additional, less used props",
    )
    # The following columns are filled from matching keys in `additional_props` / `node` on save().
    icon = models.CharField(max_length=256, null=True, blank=True)
    title = models.CharField(max_length=1024, null=True, blank=True)
    readonly = models.BooleanField(default=False)
    sections_schema = models.JSONField(null=True, blank=True, help_text="Schema for the sections")
    # min / max / step are stored as text; save() stringifies incoming values.
    min = models.CharField(max_length=256, null=True, blank=True)
    max = models.CharField(max_length=256, null=True, blank=True)
    step = models.CharField(max_length=256, null=True, blank=True)
    add_label = models.CharField(max_length=1024, null=True, blank=True)
    up_control = models.BooleanField(default=True)
    down_control = models.BooleanField(default=True)

    # Code Generation Source of Truth
    django_field_type = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        help_text="The Django Model Field class to use (e.g., 'CharField', 'IntegerField', 'ForeignKey'). Providing this makes this field the primary source of truth for code generation.",
    )
    django_field_args = models.JSONField(
        default=dict,
        blank=True,
        help_text="Arguments passed to the Django field as a JSON dictionary. "
        "Example: {'null': true, 'blank': true, 'max_length': 255}. "
        "For ForeignKeys, include the model name: {'to': 'auth.User', 'on_delete': 'models.CASCADE'}.",
    )
    django_field_positional_args = models.JSONField(
        default=list,
        blank=True,
        help_text="Positional arguments passed to the Django field as a JSON list. Example: ['auth.User'].",
    )
    pydantic_field_type = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        help_text="The Python/Pydantic type for this field (e.g., 'str', 'int', 'Decimal', 'UUID', 'date').",
    )
    extra_imports = models.JSONField(
        default=list,
        blank=True,
        help_text="A list of additional Python import statements required by this field. Example: ['from decimal import Decimal', 'from django.core.validators import MinValueValidator'].",
    )
    validators = models.JSONField(
        default=list,
        blank=True,
        help_text="A list of Django/Pydantic validator strings to be applied to this field. Example: ['MinValueValidator(0)', 'validate_v_date'].",
    )
    list_filter = models.BooleanField(
        default=False,
        help_text="Include this field in generated ModelAdmin.list_filter.",
    )

    text_content = models.TextField(null=True, blank=True, help_text="Content for a text element, for children of an $el type component")
    # Change counter compared by NodeQS.from_change.
    track_change = models.BigIntegerField(null=True, blank=True)

433 

434 @property 

435 def formkit(self): 

436 return self.node.get("$formkit") if isinstance(self.node, dict) else None 

437 

438 @property 

439 def name(self): 

440 return self.node.get("name") if isinstance(self.node, dict) else None 

441 

442 def __str__(self): 

443 return f"Node: {self.label}" if self.label else f"{self.node_type} {self.id}" 

444 

445 def save(self, *args, **kwargs): 

446 """ 

447 On save validate the 'node' field matches the 'FormKitNode' 

448 """ 

449 # rename `formkit` to `$formkit` 

450 if isinstance(self.node, dict) and "formkit" in self.node: 

451 self.node.update({"$formkit": self.node.pop("formkit")}) 

452 # We're also going to verify that the 'key' is a valid identifier 

453 # Keep in mind that the `key` may be used as part of a model so 

454 # should be valid Django fieldname too 

455 if isinstance(self.node, dict) and self.node_type in ("$formkit", "$el"): 

456 if key := self.node.get("name"): 

457 check_valid_django_id(key) 

458 

459 # Auto-promote common props from both 'additional_props' and 'node' 

460 for source in (self.additional_props, self.node): 

461 if not isinstance(source, dict): 

462 continue 

463 for field in ( 

464 "icon", 

465 "title", 

466 "readonly", 

467 "sectionsSchema", 

468 "min", 

469 "max", 

470 "step", 

471 "addLabel", 

472 "upControl", 

473 "downControl", 

474 ): 

475 if field in source: 

476 if field == "sectionsSchema": 

477 target_field = "sections_schema" 

478 elif field == "addLabel": 

479 target_field = "add_label" 

480 elif field == "upControl": 

481 target_field = "up_control" 

482 elif field == "downControl": 

483 target_field = "down_control" 

484 else: 

485 target_field = field 

486 

487 val = source.get(field) 

488 if field in ("min", "max", "step") and val is not None: 

489 val = str(val) 

490 setattr(self, target_field, val) 

491 

492 # Resolve code generation defaults if not set 

493 self.resolve_code_generation_defaults() 

494 

495 return super().save(*args, **kwargs) 

496 

497 def resolve_code_generation_defaults(self, force=False): 

498 """ 

499 Populate code generation fields from CodeGenerationConfig and settings 

500 if they are not already set. 

501 """ 

502 # We need a node structure to match against 

503 node = self.get_node(recursive=False) 

504 if isinstance(node, str): 

505 # Text nodes don't typically generate fields themselves 

506 return 

507 

508 from formkit_ninja.parser.database_node_path import DatabaseNodePath 

509 

510 # Create a transient DatabaseNodePath to leverage its resolution logic 

511 path = DatabaseNodePath(node) 

512 

513 if force or not self.django_field_type: 

514 self.django_field_type = path.to_django_type() 

515 

516 if force or not self.django_field_args or not self.django_field_positional_args: 

517 # We want the dict/list, not the string 

518 # DatabaseNodePath uses _get_config and _get_from_settings 

519 config = path._get_config() 

520 if config: 

521 if force or not self.django_field_args: 

522 self.django_field_args = config.django_args 

523 if force or not self.django_field_positional_args: 

524 self.django_field_positional_args = config.django_positional_args 

525 else: 

526 if force or not self.django_field_args: 

527 settings_args = path._get_from_settings("django_args") 

528 if isinstance(settings_args, dict): 

529 self.django_field_args = settings_args 

530 if force or not self.django_field_positional_args: 

531 settings_pos_args = path._get_from_settings("django_positional_args") 

532 if isinstance(settings_pos_args, list): 

533 self.django_field_positional_args = settings_pos_args 

534 

535 if force or not self.pydantic_field_type: 

536 self.pydantic_field_type = path.to_pydantic_type() 

537 

538 if force or not self.extra_imports: 

539 self.extra_imports = path.get_extra_imports() 

540 

541 if force or not self.validators: 

542 self.validators = path.get_validators() 

543 

544 @property 

545 def node_options(self) -> str | list[dict] | None: 

546 """ 

547 Because "options" are translated and 

548 separately stored, this step is necessary to 

549 reinstate them 

550 """ 

551 if self.node and (opts := self.node.get("options")): 

552 return opts 

553 

554 if not self.option_group: 

555 return None 

556 options = self.option_group.option_set.all().prefetch_related("optionlabel_set") 

557 # options: Iterable[Option] = self.option_set.all().prefetch_related("optionlabel_set") 

558 # TODO: This is horribly slow 

559 return [ 

560 { 

561 "value": option.value, 

562 "label": f"{label_obj.label if (label_obj := option.optionlabel_set.first()) else ''}", 

563 } 

564 for option in options 

565 ] 

566 

567 def get_node_values(self, recursive: bool = True, options: bool = True) -> str | dict: 

568 """ 

569 Reify a 'dict' instance suitable for creating 

570 a FormKit Schema node from 

571 """ 

572 # Text element 

573 if not self.node: 

574 if self.text_content: 

575 return self.text_content 

576 return {} 

577 values = {**self.node} 

578 

579 # Options may come from a string in the node, or 

580 # may come from an m2m 

581 if options and self.node_options: 

582 values["options"] = self.node_options 

583 if recursive: 

584 children = [c.get_node_values() for c in self.children.order_by("nodechildren__order")] 

585 if children: 

586 values["children"] = children 

587 if self.icon: 

588 values["icon"] = self.icon 

589 if self.title: 

590 values["title"] = self.title 

591 if self.readonly: 

592 values["readonly"] = self.readonly 

593 if self.sections_schema: 

594 values["sectionsSchema"] = self.sections_schema 

595 if self.min: 

596 try: 

597 values["min"] = int(self.min) 

598 except ValueError: 

599 values["min"] = self.min 

600 if self.max: 

601 try: 

602 values["max"] = int(self.max) 

603 except ValueError: 

604 values["max"] = self.max 

605 if self.step: 

606 try: 

607 val = float(self.step) 

608 if val.is_integer(): 

609 values["step"] = int(val) 

610 else: 

611 values["step"] = str(val) # Keep as string if float to avoid precision issues 

612 values["step"] = self.step 

613 except ValueError: 

614 values["step"] = self.step 

615 if self.add_label: 

616 values["addLabel"] = self.add_label 

617 if not self.up_control: # Only write if false? Or always? Defaults are True. 

618 values["upControl"] = self.up_control 

619 if not self.down_control: 

620 values["downControl"] = self.down_control 

621 

622 # Code Generation fields 

623 if self.django_field_type: 

624 values["django_field_type"] = self.django_field_type 

625 if self.django_field_args: 

626 values["django_field_args"] = self.django_field_args 

627 if self.django_field_positional_args: 

628 values["django_field_positional_args"] = self.django_field_positional_args 

629 if self.pydantic_field_type: 

630 values["pydantic_field_type"] = self.pydantic_field_type 

631 if self.extra_imports: 

632 values["extra_imports"] = self.extra_imports 

633 if self.validators: 

634 values["validators"] = self.validators 

635 if self.list_filter: 

636 values["list_filter"] = self.list_filter 

637 

638 # Merge additional_props into the top level and ensure it's removed as a separate key 

639 values.pop("additional_props", None) 

640 if self.additional_props and len(self.additional_props) > 0: 

641 # Handle nested additional_props structure 

642 props_to_merge = self.additional_props 

643 if "additional_props" in props_to_merge: 

644 props_to_merge = props_to_merge["additional_props"] 

645 # Filter out None values to prevent Pydantic validation errors 

646 clean_props = {k: v for k, v in props_to_merge.items() if v is not None} 

647 values.update(clean_props) 

648 

649 if self.node_type == "$el" and not values.get("$el"): 

650 values["$el"] = "span" 

651 elif self.node_type == "$formkit" and not values.get("$formkit"): 

652 values["$formkit"] = "text" 

653 

654 return {k: v for k, v in values.items() if v != ""} 

655 

656 def get_ancestors(self) -> list["FormKitSchemaNode"]: 

657 """ 

658 Return a list of ancestor nodes by following the nodechildren_set relationship upwards. 

659 Follows the first parent found for each node. 

660 """ 

661 ancestors: list[FormKitSchemaNode] = [] 

662 current = self 

663 while True: 

664 # nodechildren_set contains objects where current is the child 

665 nc = current.nodechildren_set.first() 

666 if not nc: 

667 break 

668 current = nc.parent 

669 if current in ancestors: # Avoid infinite cycles 

670 break 

671 ancestors.insert(0, current) 

672 if len(ancestors) > 20: # Safety limit 

673 break 

674 return ancestors 

675 

676 def get_node_path(self, recursive=True) -> list[formkit_schema.Node | str]: 

677 """ 

678 Return a list of Pydantic nodes representing the path from the root to this node. 

679 """ 

680 ancestors = self.get_ancestors() 

681 return [a.get_node(recursive=False) for a in ancestors] + [self.get_node(recursive=recursive)] # type: ignore[return-value] 

682 

683 def get_node(self, recursive=False, options=False, **kwargs) -> formkit_schema.Node | str: 

684 """ 

685 Return a "decorated" node instance 

686 with restored options and translated fields 

687 """ 

688 if self.text_content or self.node_type == "text": 

689 return self.text_content or "" 

690 if self.node == {} or self.node is None: 

691 if self.node_type == "$el": 

692 node_content_dict: dict[str, Any] = {"$el": "span"} 

693 elif self.node_type == "$formkit": 

694 node_content_dict = {"$formkit": "text"} 

695 else: 

696 node_content_dict = {} 

697 else: 

698 node_content_dict = self.get_node_values(**kwargs, recursive=recursive, options=options) # type: ignore[assignment] 

699 

700 formkit_node = formkit_schema.FormKitNode.parse_obj(node_content_dict, recursive=recursive) 

701 return formkit_node.__root__ 

702 

703 @classmethod 

704 def from_pydantic( # noqa: C901 

705 cls, input_models: formkit_schema.FormKitSchemaProps | Iterable[formkit_schema.FormKitSchemaProps] 

706 ) -> Iterable["FormKitSchemaNode"]: 

707 if isinstance(input_models, str): 

708 yield cls.objects.create(node_type="text", label=input_models, text_content=input_models) 

709 

710 elif isinstance(input_models, Iterable) and not isinstance(input_models, formkit_schema.FormKitSchemaProps): 

711 for n in input_models: 

712 yield from cls.from_pydantic(n) 

713 

714 elif isinstance(input_models, formkit_schema.FormKitSchemaProps): 

715 input_model = input_models 

716 instance = cls() 

717 log(f"[green]Creating {instance}") 

718 for label_field in ("name", "id", "key", "label"): 

719 if label := getattr(input_model, label_field, None): 

720 instance.label = label 

721 break 

722 

723 # Node types 

724 if props := getattr(input_model, "additional_props", None): 

725 instance.additional_props = props 

726 

727 if (icon := getattr(input_model, "icon", None)) is not None: 

728 instance.icon = icon 

729 if (title := getattr(input_model, "title", None)) is not None: 

730 instance.title = title 

731 if (readonly := getattr(input_model, "readonly", None)) is not None: 

732 instance.readonly = readonly 

733 if (sections_schema := getattr(input_model, "sectionsSchema", None)) is not None: 

734 instance.sections_schema = sections_schema 

735 if (min_val := getattr(input_model, "min", None)) is not None: 

736 instance.min = str(min_val) 

737 if (max_val := getattr(input_model, "max", None)) is not None: 

738 instance.max = str(max_val) 

739 if (step := getattr(input_model, "step", None)) is not None: 

740 instance.step = str(step) 

741 if (add_label := getattr(input_model, "addLabel", None)) is not None: 

742 instance.add_label = add_label 

743 if (up_control := getattr(input_model, "upControl", None)) is not None: 

744 instance.up_control = up_control 

745 if (down_control := getattr(input_model, "downControl", None)) is not None: 

746 instance.down_control = down_control 

747 

748 # Code Generation Fields 

749 if (django_field_type := getattr(input_model, "django_field_type", None)) is not None: 

750 instance.django_field_type = django_field_type 

751 if (django_field_args := getattr(input_model, "django_field_args", None)) is not None: 

752 instance.django_field_args = django_field_args 

753 if (django_field_positional_args := getattr(input_model, "django_field_positional_args", None)) is not None: 

754 instance.django_field_positional_args = django_field_positional_args 

755 if (pydantic_field_type := getattr(input_model, "pydantic_field_type", None)) is not None: 

756 instance.pydantic_field_type = pydantic_field_type 

757 if (extra_imports := getattr(input_model, "extra_imports", None)) is not None: 

758 instance.extra_imports = extra_imports 

759 if (validators := getattr(input_model, "validators", None)) is not None: 

760 instance.validators = validators 

761 if (list_filter := getattr(input_model, "list_filter", None)) is not None: 

762 instance.list_filter = list_filter 

763 

764 # Fields that are valid Pydantic fields but not promoted to columns must be saved in additional_props 

765 # otherwise they are lost. 

766 extra_fields = [ 

767 "max", 

768 "rows", 

769 "cols", 

770 "prefixIcon", 

771 "classes", 

772 "value", 

773 "suffixIcon", 

774 "validationRules", 

775 "maxLength", 

776 "itemClass", 

777 "itemsClass", 

778 "_minDateSource", 

779 "_maxDateSource", 

780 "disabledDays", 

781 ] 

782 # Ensure additional_props is a dict 

783 if instance.additional_props is None: 

784 instance.additional_props = {} 

785 elif not isinstance(instance.additional_props, dict): 

786 # Should not happen but safety first 

787 instance.additional_props = {} 

788 

789 for field in extra_fields: 

790 if (val := getattr(input_model, field, None)) is not None: 

791 # 'max' is now a model field, so don't put it in additional_props 

792 if field == "max": 

793 continue 

794 instance.additional_props[field] = val 

795 

796 try: 

797 node_type = getattr(input_model, "node_type") 

798 except Exception as E: 

799 raise E 

800 if node_type == "condition": 

801 instance.node_type = "condition" 

802 elif node_type == "formkit": 

803 instance.node_type = "$formkit" 

804 elif node_type == "element": 

805 instance.node_type = "$el" 

806 elif node_type == "component": 

807 instance.node_type = "$cmp" 

808 

809 log(f"[green]Yielding: {instance}") 

810 

811 # Must save the instance before adding "options" or "children" 

812 instance.node = input_model.dict( 

813 exclude={ 

814 "options", 

815 "children", 

816 "additional_props", 

817 "node_type", 

818 "list_filter", 

819 }, 

820 exclude_none=True, 

821 exclude_unset=True, 

822 ) 

823 # Where an alias is used ("el", ) restore it to the expected value 

824 # of a FormKit schema node 

825 for pydantic_key, db_key in (("el", "$el"), ("formkit", "$formkit")): 

826 if db_value := instance.node.pop(pydantic_key, None): 

827 instance.node[db_key] = db_value 

828 

829 instance.save() 

830 # Add the "options" if it is a 'text' type getter 

831 options: formkit_schema.OptionsType = getattr(input_model, "options", None) 

832 

833 if isinstance(options, str): 

834 # Maintain this as it is probably a `$get...` options call 

835 # to a Javascript function 

836 instance.node["options"] = options 

837 instance.save() 

838 

839 elif isinstance(options, Iterable): 

840 # Create a new "group" to assign these options to 

841 # Here we use a random UUID as the group name 

842 instance.option_group = OptionGroup.objects.create(group=f"Auto generated group for {str(instance)} {uuid.uuid4().hex[0:8]}") 

843 for option in Option.from_pydantic(options, group=instance.option_group): # type: ignore[arg-type] 

844 pass 

845 instance.save() 

846 

847 for c_n in getattr(input_model, "children", []) or []: 

848 child_node = next(iter(cls.from_pydantic(c_n))) 

849 console.log(f" {child_node}") 

850 instance.children.add(child_node) 

851 

852 yield instance 

853 

854 else: 

855 raise TypeError(f"Expected FormKitNode or Iterable[FormKitNode], got {type(input_models)}") 

856 

def to_pydantic(self, recursive=False, options=False, **kwargs):
    """
    Convert this node to its Pydantic / plain-text representation.

    If ``text_content`` is truthy it is returned unchanged. Otherwise the
    result of ``get_node_values`` (called with ``recursive``, ``options``
    and any extra keyword arguments) is parsed into a
    ``formkit_schema.FormKitNode``.
    """
    if not self.text_content:
        node_values = self.get_node_values(recursive=recursive, options=options, **kwargs)
        return formkit_schema.FormKitNode.parse_obj(node_values)
    # Text nodes short-circuit: no schema parsing is attempted for them.
    return self.text_content

861 

862 

class SchemaManager(models.Manager):
    """
    Manager whose querysets always prefetch a schema's nodes and the
    children of those nodes, since callers almost always need them.
    """

    def get_queryset(self):
        """Return the default queryset with ``nodes`` and ``nodes__children`` prefetched."""
        base_queryset = super().get_queryset()
        return base_queryset.prefetch_related("nodes", "nodes__children")

870 

871 

class FormKitSchema(UuidIdModel):
    """
    A "FormKitSchema": a heterogeneous collection of schema nodes, linked
    through ``FormComponents``.
    """

    # Human-readable name; a generated UUID is used when none is supplied.
    label = models.CharField(
        help_text="Used as a human-readable label",
        max_length=1024,
        unique=True,
        default=uuid.uuid4,
        null=True,
        blank=True,
    )
    nodes = models.ManyToManyField(FormKitSchemaNode, through=FormComponents)
    objects = SchemaManager()

    def get_schema_values(self, recursive=False, options=False, **kwargs):
        """
        Yield the value of ``get_node_values`` for each node of this schema,
        ordered by ``formcomponents__order``.

        This is a generator; wrap it in ``list()`` when a list is required.
        ``recursive``, ``options`` and any extra keyword arguments are
        forwarded unchanged to every node.
        """
        ordered_nodes: Iterable[FormKitSchemaNode] = self.nodes.order_by("formcomponents__order")
        yield from (
            schema_node.get_node_values(recursive=recursive, options=options, **kwargs)
            for schema_node in ordered_nodes
        )

    def to_pydantic(self):
        """
        Parse this schema's node values (collected with the default
        ``get_schema_values`` arguments) into a ``formkit_schema.FormKitSchema``.
        """
        return formkit_schema.FormKitSchema.parse_obj([*self.get_schema_values()])

    def __str__(self) -> str:
        """Return the label when set, otherwise a shortened form of the id."""
        if self.label:
            return self.label
        return short_uuid(self.id)

    def save(self, *args, **kwargs):
        """Save the schema; currently a plain pass-through to the parent ``save``."""
        super().save(*args, **kwargs)

    @classmethod
    def from_pydantic(cls, input_model: formkit_schema.FormKitSchema, label: str | None = None) -> "FormKitSchema":
        """
        Create the Django representation of a Pydantic schema.

        The work is delegated to ``SchemaImportService.import_schema``, which
        receives ``input_model`` and the optional ``label``.
        """
        # Imported inside the method rather than at module level
        # (presumably to avoid a circular import - confirm before moving it).
        from formkit_ninja.services.schema_import import SchemaImportService

        imported_schema = SchemaImportService.import_schema(input_model, label=label)
        return imported_schema

    @classmethod
    def from_json(cls, input_file: dict):
        """
        Create the Django representation of a schema from decoded JSON.

        Despite its name, ``input_file`` is annotated as a ``dict`` (already
        parsed JSON data, not a file or a raw string); it is validated via
        ``formkit_schema.FormKitSchema.parse_obj`` and then handed to
        ``from_pydantic``.
        """
        return cls.from_pydantic(formkit_schema.FormKitSchema.parse_obj(input_file))

925 

926 

class SchemaLabel(models.Model):
    """
    Holds translations of Partisipa schema definitions: the title.
    """

    schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
    label = models.CharField(max_length=1024)
    # NOTE(review): "Portugese" is a misspelling of "Portuguese"; it is kept
    # verbatim here because it is part of the field's choices definition.
    lang = models.CharField(
        max_length=4,
        default="en",
        choices=(
            ("en", "English"),
            ("tet", "Tetum"),
            ("pt", "Portugese"),
        ),
    )

    def __str__(self) -> str:
        """Return ``"<label> (<lang>)"``, or a schema-id/lang fallback when the label is empty."""
        if self.label:
            return f"{self.label} ({self.lang})"
        return f"schema={self.schema_id} lang={self.lang}"

939 

940 

class SchemaDescription(models.Model):
    """
    Holds translations of Partisipa schema definitions: the description.
    """

    schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
    # The translated description text is stored in the field named ``label``.
    label = models.CharField(max_length=1024)
    # NOTE(review): "Portugese" is a misspelling of "Portuguese"; it is kept
    # verbatim here because it is part of the field's choices definition.
    lang = models.CharField(
        max_length=4,
        default="en",
        choices=(
            ("en", "English"),
            ("tet", "Tetum"),
            ("pt", "Portugese"),
        ),
    )

    def __str__(self) -> str:
        """Return ``"<label> (<lang>)"``, or a schema-id/lang fallback when the label is empty."""
        if self.label:
            return f"{self.label} ({self.lang})"
        return f"schema={self.schema_id} lang={self.lang}"

953 

954 

955# Import submission models to register them with the app