Coverage for formkit_ninja / models.py: 22.80%

506 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-06 04:12 +0000

1from __future__ import annotations 

2 

3import logging 

4import uuid 

5import warnings 

6from keyword import iskeyword, issoftkeyword 

7from typing import Any, Iterable, TypedDict, get_args 

8 

9import pghistory 

10import pgtrigger 

11from django.conf import settings 

12from django.contrib.contenttypes.models import ContentType 

13from django.contrib.postgres.aggregates import ArrayAgg 

14from django.core.exceptions import ValidationError 

15from django.db import models, transaction 

16from django.db.models import Q 

17from django.db.models.aggregates import Max 

18from django.db.models.functions import Greatest 

19from django.utils import timezone 

20from rich.console import Console 

21 

22from formkit_ninja import formkit_schema, triggers 

23 

24# Re export "form_submission" models 

25from formkit_ninja.code_generation_config import CodeGenerationConfig # noqa: F401 

26from formkit_ninja.form_submission.models import ( 

27 SeparatedSubmission, # noqa: F401 

28 Submission, # noqa: F401 

29 SubmissionField, # noqa: F401 

30 SubmissionFile, # noqa: F401 

31) 

32from formkit_ninja.utils import short_uuid 

33 

34console = Console() 

35log = console.log 

36 

37logger = logging.getLogger() 

38 

39 

40def check_valid_django_id(key: str): 

41 if not key: 

42 raise ValidationError("Name cannot be empty") 

43 if key[0].isdigit(): 

44 raise ValidationError(f"{key} is not valid, it cannot start with a digit") 

45 if not key.isidentifier() or iskeyword(key) or issoftkeyword(key): 

46 raise ValidationError(f"{key} cannot be used as a keyword. Should be a valid python identifier") 

47 if key[-1] == "_": 

48 raise ValidationError(f"{key} is not valid, it cannot end with an underscore") 

49 

50 

51class UuidIdModel(models.Model): 

52 """ 

53 Consistently use fields which will 

54 help with syncing data: 

55 - UUID field is the ID 

56 - Created field 

57 - Last Modified field 

58 - updated_by (optional) 

59 - created_by (optional) 

60 """ 

61 

62 class Meta: 

63 abstract = True 

64 

65 id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, unique=True) 

66 created = models.DateTimeField(default=timezone.now, blank=True, null=True) 

67 updated = models.DateTimeField(auto_now=True, blank=True, null=True) 

68 created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT, related_name="+", blank=True, null=True) 

69 updated_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT, related_name="+", blank=True, null=True) 

70 

71 

72class OptionDict(TypedDict): 

73 value: str 

74 label: str 

75 

76 

77class OptionGroup(models.Model): 

78 """ 

79 This intended to be a "collection" of choices 

80 For instance all the values in a single PNDS zTable 

81 Also intended to allow users to add / modify their __own__ 'Options' 

82 for idb and formkit to recognize 

83 """ 

84 

85 group = models.CharField(max_length=1024, primary_key=True, help_text="The label to use for these options") 

86 content_type = models.ForeignKey( 

87 ContentType, 

88 on_delete=models.PROTECT, 

89 null=True, 

90 blank=True, 

91 help_text=("This is an optional reference to the original source object for this set of options (typically a table from which we copy options)"), 

92 ) 

93 

94 # If the object is a "Content Type" we expect it to have a similar layout to this 

95 

96 def save(self, *args, **kwargs): 

97 # Prior to save ensure that content_type, if present, fits suitable schema 

98 if self.content_type: 

99 klass = self.content_type.model_class() 

100 try: 

101 if klass._meta.get_field("value") is None or not hasattr(klass, "label_set"): 

102 raise ValueError(f"Expected {klass} to have a 'value' field and a 'label_set' attribute") 

103 except Exception as E: 

104 raise ValueError(f"Expected {klass} to have a 'value' field and a 'label_set' attribute") from E 

105 return super().save(*args, **kwargs) 

106 

107 def __str__(self): 

108 return f"{self.group}" 

109 

110 @classmethod 

111 def copy_table(cls, model: type[models.Model], field: str, language: str | None = "en", group_name: str | None = None): 

112 """ 

113 Copy an existing table of options into this OptionGroup 

114 """ 

115 

116 with transaction.atomic(): 

117 group_obj, group_created = cls.objects.get_or_create(group=group_name, content_type=ContentType.objects.get_for_model(model)) 

118 log(group_obj) 

119 

120 from typing import Any, cast 

121 

122 for obj in cast(Any, model).objects.values("pk", field): 

123 option, option_created = Option.objects.get_or_create( 

124 object_id=obj["pk"], 

125 group=group_obj, 

126 value=obj["pk"], 

127 ) 

128 OptionLabel.objects.get_or_create(option=option, label=obj[field] or "", lang=language) 

129 

130 

131class OptionQuerySet(models.Manager): 

132 """ 

133 Prefetched "labels" for performance 

134 """ 

135 

136 def get_queryset(self): 

137 """ 

138 Added a prefetch_related to the queryset 

139 """ 

140 lang_codes = (n[0] for n in settings.LANGUAGES) 

141 

142 label_model = OptionLabel 

143 annotated_fields = {f"label_{lang}": label_model.objects.filter(lang=lang, option=models.OuterRef("pk")) for lang in lang_codes} 

144 annotated_fields_subquery = {field: models.Subquery(query.values("label")[:1], output_field=models.CharField()) for field, query in annotated_fields.items()} 

145 return super().get_queryset().annotate(**annotated_fields_subquery) 

146 

147 

148class Option(UuidIdModel): 

149 """ 

150 This is a key/value field representing one "option" for a FormKit property 

151 The translated values for this option are in the `Translatable` table 

152 """ 

153 

154 object_id = models.IntegerField( 

155 null=True, 

156 blank=True, 

157 help_text=("This is a reference to the primary key of the original source object (typically a PNDS ztable ID) or a user-specified ID for a new group"), 

158 ) 

159 last_updated = models.DateTimeField(auto_now=True) 

160 group = models.ForeignKey(OptionGroup, on_delete=models.CASCADE, null=True, blank=True) 

161 # is_active = models.BooleanField(default=True) 

162 order = models.IntegerField(null=True, blank=True) 

163 

164 class Meta: 

165 triggers = triggers.update_or_insert_group_trigger("group_id") 

166 constraints = [models.UniqueConstraint(fields=["group", "object_id"], name="unique_option_id")] 

167 ordering = ( 

168 "group", 

169 "order", 

170 ) 

171 

172 value = models.CharField(max_length=1024) 

173 order_with_respect_to = "group" 

174 

175 objects = OptionQuerySet() 

176 

177 @classmethod 

178 def from_pydantic( 

179 cls, 

180 options: list[str | OptionDict], 

181 group: OptionGroup | None = None, 

182 ) -> Iterable["Option"]: 

183 """ 

184 Yields "Options" in the database based on the input given 

185 """ 

186 from formkit_ninja.services.schema_import import SchemaImportService 

187 

188 yield from SchemaImportService.import_options(options, group=group) 

189 

190 def __str__(self) -> str: 

191 # Use group_id (stored on row; OptionGroup.pk is the group name) to avoid N+1. 

192 if self.group_id: 

193 return f"{self.group_id}::{self.value}" 

194 return f"No group: {self.value}" 

195 

196 

197class OptionLabel(models.Model): 

198 option = models.ForeignKey("Option", on_delete=models.CASCADE) 

199 label = models.CharField(max_length=1024) 

200 lang = models.CharField(max_length=4, default="en", choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese"))) 

201 

202 def save(self, *args, **kwargs): 

203 """ 

204 When saved, save also my "option" so that its last_updated is set 

205 """ 

206 if self.option is not None: 

207 self.option.save() 

208 return super().save(*args, **kwargs) 

209 

210 class Meta: 

211 constraints = [models.UniqueConstraint(fields=["option", "lang"], name="unique_option_label")] 

212 

213 def __str__(self) -> str: 

214 # Use only local fields to avoid N+1 when listing (e.g. in admin). 

215 return f"{self.label} ({self.lang})" if self.label else f"option={self.option_id} lang={self.lang}" 

216 

217 

218class FormComponents(UuidIdModel): 

219 """ 

220 A model relating "nodes" of a schema to a schema with model ordering 

221 """ 

222 

223 schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE) 

224 # This is null=True so that a new FormComponent can be added from the admin inline 

225 node = models.ForeignKey("FormKitSchemaNode", on_delete=models.CASCADE, null=True, blank=True) 

226 label = models.CharField(max_length=1024, help_text="Used as a human-readable label", null=True, blank=True) 

227 order = models.IntegerField(null=True, blank=True) 

228 order_with_respect_to = "schema" 

229 

230 class Meta: 

231 triggers = triggers.update_or_insert_group_trigger("schema_id") 

232 ordering = ("schema", "order") 

233 

234 def __str__(self) -> str: 

235 # Use node_id/schema_id (stored on row) to avoid N+1 when listing FormComponents. 

236 return f"node={self.node_id}[{self.order}]: schema={self.schema_id}" 

237 

238 

239class NodeChildrenManager(models.Manager): 

240 """ 

241 Adds aggregation and filtering for client side data 

242 of NodeChildren relations 

243 """ 

244 

245 def aggregate_changes_table(self, latest_change: int | None = None): 

246 values = ( 

247 self.get_queryset() 

248 .values("parent_id") 

249 .annotate( 

250 children=ArrayAgg("child", ordering="order"), 

251 ) 

252 .annotate(Max("child__track_change")) 

253 .annotate(latest_change=Greatest("child__track_change__max", "parent__track_change")) 

254 ) 

255 if latest_change: 

256 values = values.filter(Q(latest_change__gt=latest_change) | Q(parent__latest_change__gt=latest_change)) 

257 return values.values_list("parent_id", "latest_change", "children", named=True) 

258 

259 

260class NodeChildren(models.Model): 

261 """ 

262 This is an ordered m2m model representing 

263 the "children" of an HTML element 

264 """ 

265 

266 parent = models.ForeignKey( 

267 "FormKitSchemaNode", 

268 on_delete=models.CASCADE, 

269 related_name="parent", 

270 ) 

271 child = models.ForeignKey("FormKitSchemaNode", on_delete=models.CASCADE) 

272 order = models.IntegerField(null=True, blank=True) 

273 track_change = models.BigIntegerField(null=True, blank=True) 

274 order_with_respect_to = "parent" 

275 

276 class Meta: 

277 triggers = [ 

278 *triggers.update_or_insert_group_trigger("parent_id"), 

279 triggers.bump_sequence_value(sequence_name=triggers.NODE_CHILDREN_CHANGE_ID), 

280 ] 

281 ordering = ( 

282 "parent_id", 

283 "order", 

284 ) 

285 

286 objects = NodeChildrenManager() 

287 

288 def __str__(self) -> str: 

289 # Use parent_id/child_id (stored on row) to avoid N+1. 

290 return f"parent={self.parent_id} → child={self.child_id} order={self.order}" 

291 

292 

293class NodeQS(models.QuerySet): 

294 def from_change(self, track_change: int = -1): 

295 return self.filter(track_change__gt=track_change) 

296 

297 def to_response(self, ignore_errors: bool = True, options: bool = True) -> Iterable[tuple[uuid.UUID, int | None, formkit_schema.Node | str | None, bool]]: 

298 """ 

299 Return a set of FormKit nodes 

300 """ 

301 node: FormKitSchemaNode 

302 for node in self.all(): 

303 try: 

304 if node.is_active: 

305 yield node.id, node.track_change, node.get_node(recursive=False, options=options), node.protected 

306 else: 

307 yield node.id, node.track_change, None, node.protected 

308 except Exception as E: 

309 if not ignore_errors: 

310 raise 

311 warnings.warn(f"An unparseable FormKit node was hit at {node.pk}") 

312 warnings.warn(f"{E}") 

313 

314 

315@pghistory.track() 

316@pgtrigger.register( 

317 pgtrigger.Protect( 

318 # If the node is protected, delete is not allowed 

319 name="protect_node_deletes_and_updates", 

320 operation=pgtrigger.Delete, 

321 condition=pgtrigger.Q(old__protected=True), 

322 ), 

323 pgtrigger.Protect( 

324 # If both new and old values are "protected", updates are not allowed 

325 name="protect_node_updates", 

326 operation=pgtrigger.Update, 

327 condition=pgtrigger.Q(old__protected=True) & pgtrigger.Q(new__protected=True), 

328 ), 

329 pgtrigger.SoftDelete(name="soft_delete", field="is_active"), 

330 triggers.bump_sequence_value("track_change", triggers.NODE_CHANGE_ID), 

331) 

332class FormKitSchemaNode(UuidIdModel): 

333 """ 

334 This represents a single "Node" in a FormKit schema. 

335 There are several different types of node which may be defined: 

336 FormKitSchemaDOMNode 

337 | FormKitSchemaComponent 

338 | FormKitSchemaTextNode 

339 | FormKitSchemaCondition 

340 | FormKitSchemaFormKit 

341 """ 

342 

343 objects = NodeQS.as_manager() 

344 

345 NODE_TYPE_CHOICES = ( 

346 ("$cmp", "Component"), # Not yet implemented 

347 ("text", "Text"), 

348 ("condition", "Condition"), # Not yet implemented 

349 ("$formkit", "FormKit"), 

350 ("$el", "Element"), 

351 ("raw", "Raw JSON"), # Not yet implemented 

352 ) 

353 FORMKIT_CHOICES = [(t, t) for t in get_args(formkit_schema.FORMKIT_TYPE)] 

354 

355 ELEMENT_TYPE_CHOICES = [("p", "p"), ("h1", "h1"), ("h2", "h2"), ("span", "span")] 

356 node_type = models.CharField(max_length=256, choices=NODE_TYPE_CHOICES, blank=True, help_text="") 

357 description = models.CharField( 

358 max_length=4000, 

359 null=True, 

360 blank=True, 

361 help_text="Decribe the type of data / reason for this component", 

362 ) 

363 label = models.CharField(max_length=1024, help_text="Used as a human-readable label", null=True, blank=True) 

364 option_group = models.ForeignKey(OptionGroup, null=True, blank=True, on_delete=models.PROTECT) 

365 children = models.ManyToManyField("self", through=NodeChildren, symmetrical=False, blank=True) 

366 is_active = models.BooleanField(default=True) 

367 protected = models.BooleanField(default=False) 

368 

369 node = models.JSONField( 

370 null=True, 

371 blank=True, 

372 help_text="A JSON representation of select parts of the FormKit schema", 

373 ) 

374 

375 additional_props = models.JSONField( 

376 null=True, 

377 blank=True, 

378 help_text="User space for additional, less used props", 

379 ) 

380 icon = models.CharField(max_length=256, null=True, blank=True) 

381 title = models.CharField(max_length=1024, null=True, blank=True) 

382 readonly = models.BooleanField(default=False) 

383 sections_schema = models.JSONField(null=True, blank=True, help_text="Schema for the sections") 

384 min = models.CharField(max_length=256, null=True, blank=True) 

385 max = models.CharField(max_length=256, null=True, blank=True) 

386 step = models.CharField(max_length=256, null=True, blank=True) 

387 add_label = models.CharField(max_length=1024, null=True, blank=True) 

388 up_control = models.BooleanField(default=True) 

389 down_control = models.BooleanField(default=True) 

390 

391 # Code Generation Source of Truth 

392 django_field_type = models.CharField( 

393 max_length=100, 

394 null=True, 

395 blank=True, 

396 help_text="The Django Model Field class to use (e.g., 'CharField', 'IntegerField', 'ForeignKey'). Providing this makes this field the primary source of truth for code generation.", 

397 ) 

398 django_field_args = models.JSONField( 

399 default=dict, 

400 blank=True, 

401 help_text="Arguments passed to the Django field as a JSON dictionary. " 

402 "Example: {'null': true, 'blank': true, 'max_length': 255}. " 

403 "For ForeignKeys, include the model name: {'to': 'auth.User', 'on_delete': 'models.CASCADE'}.", 

404 ) 

405 django_field_positional_args = models.JSONField( 

406 default=list, 

407 blank=True, 

408 help_text="Positional arguments passed to the Django field as a JSON list. Example: ['auth.User'].", 

409 ) 

410 pydantic_field_type = models.CharField( 

411 max_length=100, 

412 null=True, 

413 blank=True, 

414 help_text="The Python/Pydantic type for this field (e.g., 'str', 'int', 'Decimal', 'UUID', 'date').", 

415 ) 

416 extra_imports = models.JSONField( 

417 default=list, 

418 blank=True, 

419 help_text="A list of additional Python import statements required by this field. Example: ['from decimal import Decimal', 'from django.core.validators import MinValueValidator'].", 

420 ) 

421 validators = models.JSONField( 

422 default=list, 

423 blank=True, 

424 help_text="A list of Django/Pydantic validator strings to be applied to this field. Example: ['MinValueValidator(0)', 'validate_v_date'].", 

425 ) 

426 list_filter = models.BooleanField( 

427 default=False, 

428 help_text="Include this field in generated ModelAdmin.list_filter.", 

429 ) 

430 

431 text_content = models.TextField(null=True, blank=True, help_text="Content for a text element, for children of an $el type component") 

432 track_change = models.BigIntegerField(null=True, blank=True) 

433 

434 @property 

435 def formkit(self): 

436 return self.node.get("$formkit") if isinstance(self.node, dict) else None 

437 

438 @property 

439 def name(self): 

440 return self.node.get("name") if isinstance(self.node, dict) else None 

441 

442 def __str__(self): 

443 return f"Node: {self.label}" if self.label else f"{self.node_type} {self.id}" 

444 

445 def save(self, *args, **kwargs): 

446 """ 

447 On save validate the 'node' field matches the 'FormKitNode' 

448 """ 

449 # rename `formkit` to `$formkit` 

450 if isinstance(self.node, dict) and "formkit" in self.node: 

451 self.node.update({"$formkit": self.node.pop("formkit")}) 

452 # We're also going to verify that the 'key' is a valid identifier 

453 # Keep in mind that the `key` may be used as part of a model so 

454 # should be valid Django fieldname too 

455 if isinstance(self.node, dict) and self.node_type in ("$formkit", "$el"): 

456 if key := self.node.get("name"): 

457 check_valid_django_id(key) 

458 

459 # Auto-promote common props from both 'additional_props' and 'node' 

460 for source in (self.additional_props, self.node): 

461 if not isinstance(source, dict): 

462 continue 

463 for field in ( 

464 "icon", 

465 "title", 

466 "readonly", 

467 "sectionsSchema", 

468 "min", 

469 "max", 

470 "step", 

471 "addLabel", 

472 "upControl", 

473 "downControl", 

474 ): 

475 if field in source: 

476 if field == "sectionsSchema": 

477 target_field = "sections_schema" 

478 elif field == "addLabel": 

479 target_field = "add_label" 

480 elif field == "upControl": 

481 target_field = "up_control" 

482 elif field == "downControl": 

483 target_field = "down_control" 

484 else: 

485 target_field = field 

486 

487 val = source.get(field) 

488 if field in ("min", "max", "step") and val is not None: 

489 val = str(val) 

490 setattr(self, target_field, val) 

491 

492 # Sync promoted columns back into node so stored JSON stays in sync when 

493 # admin edits model fields (e.g. add_label, up_control) rather than node. 

494 # Only write non-default values (or when key already in node) to avoid 

495 # adding keys that weren't in the original schema (round-trip fidelity). 

496 if isinstance(self.node, dict): 

497 _promoted_to_node = ( 

498 ("icon", "icon", None), 

499 ("title", "title", None), 

500 ("readonly", "readonly", False), 

501 ("sections_schema", "sectionsSchema", None), 

502 ("min", "min", None), 

503 ("max", "max", None), 

504 ("step", "step", None), 

505 ("add_label", "addLabel", None), 

506 ("up_control", "upControl", True), 

507 ("down_control", "downControl", True), 

508 ) 

509 for attr, key, default in _promoted_to_node: 

510 val = getattr(self, attr, None) 

511 already_in_node = key in self.node 

512 if isinstance(val, bool): 

513 if already_in_node or val != default: 

514 self.node[key] = val 

515 elif key in self.node: 

516 self.node.pop(key, None) 

517 elif val not in (None, ""): 

518 self.node[key] = val 

519 elif already_in_node: 

520 self.node.pop(key, None) 

521 

522 # Resolve code generation defaults if not set 

523 self.resolve_code_generation_defaults() 

524 

525 return super().save(*args, **kwargs) 

526 

527 def resolve_code_generation_defaults(self, force=False): 

528 """ 

529 Populate code generation fields from CodeGenerationConfig and settings 

530 if they are not already set. 

531 """ 

532 # We need a node structure to match against 

533 node = self.get_node(recursive=False) 

534 if isinstance(node, str): 

535 # Text nodes don't typically generate fields themselves 

536 return 

537 

538 from formkit_ninja.parser.database_node_path import DatabaseNodePath 

539 

540 # Create a transient DatabaseNodePath to leverage its resolution logic 

541 path = DatabaseNodePath(node) 

542 

543 if force or not self.django_field_type: 

544 self.django_field_type = path.to_django_type() 

545 

546 if force or not self.django_field_args or not self.django_field_positional_args: 

547 # We want the dict/list, not the string 

548 # DatabaseNodePath uses _get_config and _get_from_settings 

549 config = path._get_config() 

550 if config: 

551 if force or not self.django_field_args: 

552 self.django_field_args = config.django_args 

553 if force or not self.django_field_positional_args: 

554 self.django_field_positional_args = config.django_positional_args 

555 else: 

556 if force or not self.django_field_args: 

557 settings_args = path._get_from_settings("django_args") 

558 if isinstance(settings_args, dict): 

559 self.django_field_args = settings_args 

560 if force or not self.django_field_positional_args: 

561 settings_pos_args = path._get_from_settings("django_positional_args") 

562 if isinstance(settings_pos_args, list): 

563 self.django_field_positional_args = settings_pos_args 

564 

565 if force or not self.pydantic_field_type: 

566 self.pydantic_field_type = path.to_pydantic_type() 

567 

568 if force or not self.extra_imports: 

569 self.extra_imports = path.get_extra_imports() 

570 

571 if force or not self.validators: 

572 self.validators = path.get_validators() 

573 

574 @property 

575 def node_options(self) -> str | list[dict] | None: 

576 """ 

577 Because "options" are translated and 

578 separately stored, this step is necessary to 

579 reinstate them 

580 """ 

581 if self.node and (opts := self.node.get("options")): 

582 return opts 

583 

584 if not self.option_group: 

585 return None 

586 options = self.option_group.option_set.all().prefetch_related("optionlabel_set") 

587 # options: Iterable[Option] = self.option_set.all().prefetch_related("optionlabel_set") 

588 # TODO: This is horribly slow 

589 return [ 

590 { 

591 "value": option.value, 

592 "label": f"{label_obj.label if (label_obj := option.optionlabel_set.first()) else ''}", 

593 } 

594 for option in options 

595 ] 

596 

597 def get_node_values(self, recursive: bool = True, options: bool = True) -> str | dict: 

598 """ 

599 Reify a 'dict' instance suitable for creating 

600 a FormKit Schema node from 

601 """ 

602 # Text element 

603 if not self.node: 

604 if self.text_content: 

605 return self.text_content 

606 return {} 

607 values = {**self.node} 

608 

609 # Options may come from a string in the node, or 

610 # may come from an m2m 

611 if options and self.node_options: 

612 values["options"] = self.node_options 

613 if recursive: 

614 children = [c.get_node_values() for c in self.children.order_by("nodechildren__order")] 

615 if children: 

616 values["children"] = children 

617 if self.icon: 

618 values["icon"] = self.icon 

619 if self.title: 

620 values["title"] = self.title 

621 if self.readonly: 

622 values["readonly"] = self.readonly 

623 if self.sections_schema: 

624 values["sectionsSchema"] = self.sections_schema 

625 if self.min: 

626 try: 

627 values["min"] = int(self.min) 

628 except ValueError: 

629 values["min"] = self.min 

630 if self.max: 

631 try: 

632 values["max"] = int(self.max) 

633 except ValueError: 

634 values["max"] = self.max 

635 if self.step: 

636 try: 

637 val = float(self.step) 

638 if val.is_integer(): 

639 values["step"] = int(val) 

640 else: 

641 values["step"] = str(val) # Keep as string if float to avoid precision issues 

642 values["step"] = self.step 

643 except ValueError: 

644 values["step"] = self.step 

645 if self.add_label: 

646 values["addLabel"] = self.add_label 

647 if not self.up_control: # Only write if false? Or always? Defaults are True. 

648 values["upControl"] = self.up_control 

649 if not self.down_control: 

650 values["downControl"] = self.down_control 

651 

652 # Code Generation fields 

653 if self.django_field_type: 

654 values["django_field_type"] = self.django_field_type 

655 if self.django_field_args: 

656 values["django_field_args"] = self.django_field_args 

657 if self.django_field_positional_args: 

658 values["django_field_positional_args"] = self.django_field_positional_args 

659 if self.pydantic_field_type: 

660 values["pydantic_field_type"] = self.pydantic_field_type 

661 if self.extra_imports: 

662 values["extra_imports"] = self.extra_imports 

663 if self.validators: 

664 values["validators"] = self.validators 

665 if self.list_filter: 

666 values["list_filter"] = self.list_filter 

667 

668 # Merge additional_props into the top level and ensure it's removed as a separate key 

669 values.pop("additional_props", None) 

670 if self.additional_props and len(self.additional_props) > 0: 

671 # Handle nested additional_props structure 

672 props_to_merge = self.additional_props 

673 if "additional_props" in props_to_merge: 

674 props_to_merge = props_to_merge["additional_props"] 

675 # Filter out None values to prevent Pydantic validation errors 

676 clean_props = {k: v for k, v in props_to_merge.items() if v is not None} 

677 values.update(clean_props) 

678 

679 if self.node_type == "$el" and not values.get("$el"): 

680 values["$el"] = "span" 

681 elif self.node_type == "$formkit" and not values.get("$formkit"): 

682 values["$formkit"] = "text" 

683 

684 return {k: v for k, v in values.items() if v != ""} 

685 

686 def get_ancestors(self) -> list["FormKitSchemaNode"]: 

687 """ 

688 Return a list of ancestor nodes by following the nodechildren_set relationship upwards. 

689 Follows the first parent found for each node. 

690 """ 

691 ancestors: list[FormKitSchemaNode] = [] 

692 current = self 

693 while True: 

694 # nodechildren_set contains objects where current is the child 

695 nc = current.nodechildren_set.first() 

696 if not nc: 

697 break 

698 current = nc.parent 

699 if current in ancestors: # Avoid infinite cycles 

700 break 

701 ancestors.insert(0, current) 

702 if len(ancestors) > 20: # Safety limit 

703 break 

704 return ancestors 

705 

706 def get_node_path(self, recursive=True) -> list[formkit_schema.Node | str]: 

707 """ 

708 Return a list of Pydantic nodes representing the path from the root to this node. 

709 """ 

710 ancestors = self.get_ancestors() 

711 return [a.get_node(recursive=False) for a in ancestors] + [self.get_node(recursive=recursive)] # type: ignore[return-value] 

712 

713 def get_node(self, recursive=False, options=False, **kwargs) -> formkit_schema.Node | str: 

714 """ 

715 Return a "decorated" node instance 

716 with restored options and translated fields 

717 """ 

718 if self.text_content or self.node_type == "text": 

719 return self.text_content or "" 

720 if self.node == {} or self.node is None: 

721 if self.node_type == "$el": 

722 node_content_dict: dict[str, Any] = {"$el": "span"} 

723 elif self.node_type == "$formkit": 

724 node_content_dict = {"$formkit": "text"} 

725 else: 

726 node_content_dict = {} 

727 else: 

728 node_content_dict = self.get_node_values(**kwargs, recursive=recursive, options=options) # type: ignore[assignment] 

729 

730 formkit_node = formkit_schema.FormKitNode.parse_obj(node_content_dict, recursive=recursive) 

731 return formkit_node.__root__ 

732 

733 @classmethod 

734 def from_pydantic( # noqa: C901 

735 cls, input_models: formkit_schema.FormKitSchemaProps | Iterable[formkit_schema.FormKitSchemaProps] 

736 ) -> Iterable["FormKitSchemaNode"]: 

737 if isinstance(input_models, str): 

738 yield cls.objects.create(node_type="text", label=input_models, text_content=input_models) 

739 

740 elif isinstance(input_models, Iterable) and not isinstance(input_models, formkit_schema.FormKitSchemaProps): 

741 for n in input_models: 

742 yield from cls.from_pydantic(n) 

743 

744 elif isinstance(input_models, formkit_schema.FormKitSchemaProps): 

745 input_model = input_models 

746 instance = cls() 

747 log(f"[green]Creating {instance}") 

748 for label_field in ("name", "id", "key", "label"): 

749 if label := getattr(input_model, label_field, None): 

750 instance.label = label 

751 break 

752 

753 # Node types 

754 if props := getattr(input_model, "additional_props", None): 

755 instance.additional_props = props 

756 

757 if (icon := getattr(input_model, "icon", None)) is not None: 

758 instance.icon = icon 

759 if (title := getattr(input_model, "title", None)) is not None: 

760 instance.title = title 

761 if (readonly := getattr(input_model, "readonly", None)) is not None: 

762 instance.readonly = readonly 

763 if (sections_schema := getattr(input_model, "sectionsSchema", None)) is not None: 

764 instance.sections_schema = sections_schema 

765 if (min_val := getattr(input_model, "min", None)) is not None: 

766 instance.min = str(min_val) 

767 if (max_val := getattr(input_model, "max", None)) is not None: 

768 instance.max = str(max_val) 

769 if (step := getattr(input_model, "step", None)) is not None: 

770 instance.step = str(step) 

771 if (add_label := getattr(input_model, "addLabel", None)) is not None: 

772 instance.add_label = add_label 

773 if (up_control := getattr(input_model, "upControl", None)) is not None: 

774 instance.up_control = up_control 

775 if (down_control := getattr(input_model, "downControl", None)) is not None: 

776 instance.down_control = down_control 

777 

778 # Code Generation Fields 

779 if (django_field_type := getattr(input_model, "django_field_type", None)) is not None: 

780 instance.django_field_type = django_field_type 

781 if (django_field_args := getattr(input_model, "django_field_args", None)) is not None: 

782 instance.django_field_args = django_field_args 

783 if (django_field_positional_args := getattr(input_model, "django_field_positional_args", None)) is not None: 

784 instance.django_field_positional_args = django_field_positional_args 

785 if (pydantic_field_type := getattr(input_model, "pydantic_field_type", None)) is not None: 

786 instance.pydantic_field_type = pydantic_field_type 

787 if (extra_imports := getattr(input_model, "extra_imports", None)) is not None: 

788 instance.extra_imports = extra_imports 

789 if (validators := getattr(input_model, "validators", None)) is not None: 

790 instance.validators = validators 

791 if (list_filter := getattr(input_model, "list_filter", None)) is not None: 

792 instance.list_filter = list_filter 

793 

794 # Fields that are valid Pydantic fields but not promoted to columns must be saved in additional_props 

795 # otherwise they are lost. 

796 extra_fields = [ 

797 "max", 

798 "rows", 

799 "cols", 

800 "prefixIcon", 

801 "classes", 

802 "value", 

803 "suffixIcon", 

804 "validationRules", 

805 "maxLength", 

806 "itemClass", 

807 "itemsClass", 

808 "_minDateSource", 

809 "_maxDateSource", 

810 "disabledDays", 

811 ] 

812 # Ensure additional_props is a dict 

813 if instance.additional_props is None: 

814 instance.additional_props = {} 

815 elif not isinstance(instance.additional_props, dict): 

816 # Should not happen but safety first 

817 instance.additional_props = {} 

818 

819 for field in extra_fields: 

820 if (val := getattr(input_model, field, None)) is not None: 

821 # 'max' is now a model field, so don't put it in additional_props 

822 if field == "max": 

823 continue 

824 instance.additional_props[field] = val 

825 

826 try: 

827 node_type = getattr(input_model, "node_type") 

828 except Exception as E: 

829 raise E 

830 if node_type == "condition": 

831 instance.node_type = "condition" 

832 elif node_type == "formkit": 

833 instance.node_type = "$formkit" 

834 elif node_type == "element": 

835 instance.node_type = "$el" 

836 elif node_type == "component": 

837 instance.node_type = "$cmp" 

838 

839 log(f"[green]Yielding: {instance}") 

840 

841 # Must save the instance before adding "options" or "children" 

842 instance.node = input_model.dict( 

843 exclude={ 

844 "options", 

845 "children", 

846 "additional_props", 

847 "node_type", 

848 "list_filter", 

849 }, 

850 exclude_none=True, 

851 exclude_unset=True, 

852 ) 

853 # Where an alias is used ("el", ) restore it to the expected value 

854 # of a FormKit schema node 

855 for pydantic_key, db_key in (("el", "$el"), ("formkit", "$formkit")): 

856 if db_value := instance.node.pop(pydantic_key, None): 

857 instance.node[db_key] = db_value 

858 

859 instance.save() 

860 # Add the "options" if it is a 'text' type getter 

861 options: formkit_schema.OptionsType = getattr(input_model, "options", None) 

862 

863 if isinstance(options, str): 

864 # Maintain this as it is probably a `$get...` options call 

865 # to a Javascript function 

866 instance.node["options"] = options 

867 instance.save() 

868 

869 elif isinstance(options, Iterable): 

870 # Create a new "group" to assign these options to 

871 # Here we use a random UUID as the group name 

872 instance.option_group = OptionGroup.objects.create(group=f"Auto generated group for {str(instance)} {uuid.uuid4().hex[0:8]}") 

873 for option in Option.from_pydantic(options, group=instance.option_group): # type: ignore[arg-type] 

874 pass 

875 instance.save() 

876 

877 for c_n in getattr(input_model, "children", []) or []: 

878 child_node = next(iter(cls.from_pydantic(c_n))) 

879 console.log(f" {child_node}") 

880 instance.children.add(child_node) 

881 

882 yield instance 

883 

884 else: 

885 raise TypeError(f"Expected FormKitNode or Iterable[FormKitNode], got {type(input_models)}") 

886 

887 def to_pydantic(self, recursive=False, options=False, **kwargs): 

888 if self.text_content: 

889 return self.text_content 

890 return formkit_schema.FormKitNode.parse_obj(self.get_node_values(recursive=recursive, options=options, **kwargs)) 

891 

892 

893class SchemaManager(models.Manager): 

894 """ 

895 Provides prefetching which we'll almost always want to have 

896 """ 

897 

898 def get_queryset(self): 

899 return super().get_queryset().prefetch_related("nodes", "nodes__children") 

900 

901 

902class FormKitSchema(UuidIdModel): 

903 """ 

904 This represents a "FormKitSchema" which is an heterogenous 

905 collection of items. 

906 """ 

907 

908 label = models.CharField( 

909 max_length=1024, 

910 null=True, 

911 blank=True, 

912 help_text="Used as a human-readable label", 

913 unique=True, 

914 default=uuid.uuid4, 

915 ) 

916 nodes = models.ManyToManyField(FormKitSchemaNode, through=FormComponents) 

917 objects = SchemaManager() 

918 

919 def get_schema_values(self, recursive=False, options=False, **kwargs): 

920 """ 

921 Return a list of "node" dicts 

922 """ 

923 nodes: Iterable[FormKitSchemaNode] = self.nodes.order_by("formcomponents__order") 

924 for node in nodes: 

925 yield node.get_node_values(recursive=recursive, options=options, **kwargs) 

926 

927 def to_pydantic(self): 

928 values = list(self.get_schema_values()) 

929 return formkit_schema.FormKitSchema.parse_obj(values) 

930 

931 def __str__(self) -> str: 

932 return self.label or short_uuid(self.id) 

933 

934 def save(self, *args, **kwargs): 

935 super().save(*args, **kwargs) 

936 

937 @classmethod 

938 def from_pydantic(cls, input_model: formkit_schema.FormKitSchema, label: str | None = None) -> "FormKitSchema": 

939 """ 

940 Converts a given Pydantic representation of a Schema 

941 to Django database fields 

942 """ 

943 from formkit_ninja.services.schema_import import SchemaImportService 

944 

945 return SchemaImportService.import_schema(input_model, label=label) 

946 

947 @classmethod 

948 def from_json(cls, input_file: dict): 

949 """ 

950 Converts a given JSON string to a suitable 

951 Django representation 

952 """ 

953 schema_instance = formkit_schema.FormKitSchema.parse_obj(input_file) 

954 return cls.from_pydantic(schema_instance) 

955 

956 

957class SchemaLabel(models.Model): 

958 """ 

959 This intended to hold translations of Partisipa schema definitions. 

960 The title. 

961 """ 

962 

963 schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE) 

964 label = models.CharField(max_length=1024) 

965 lang = models.CharField(max_length=4, default="en", choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese"))) 

966 

967 def __str__(self) -> str: 

968 return f"{self.label} ({self.lang})" if self.label else f"schema={self.schema_id} lang={self.lang}" 

969 

970 

971class SchemaDescription(models.Model): 

972 """ 

973 This intended to hold translations of Partisipa schema definitions. 

974 The description. 

975 """ 

976 

977 schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE) 

978 label = models.CharField(max_length=1024) 

979 lang = models.CharField(max_length=4, default="en", choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese"))) 

980 

981 def __str__(self) -> str: 

982 return f"{self.label} ({self.lang})" if self.label else f"schema={self.schema_id} lang={self.lang}" 

983 

984 

985# Import submission models to register them with the app