Coverage for formkit_ninja / models.py: 23.33%

477 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-02-20 04:40 +0000

1from __future__ import annotations 

2 

3import logging 

4import uuid 

5import warnings 

6from keyword import iskeyword, issoftkeyword 

7from typing import Any, Iterable, TypedDict, get_args 

8 

9import pghistory 

10import pgtrigger 

11from django.conf import settings 

12from django.contrib.contenttypes.models import ContentType 

13from django.contrib.postgres.aggregates import ArrayAgg 

14from django.core.exceptions import ValidationError 

15from django.db import models, transaction 

16from django.db.models import Q 

17from django.db.models.aggregates import Max 

18from django.db.models.functions import Greatest 

19from rich.console import Console 

20 

21from formkit_ninja import formkit_schema, triggers 

22 

23# Re export "form_submission" models 

24from formkit_ninja.code_generation_config import CodeGenerationConfig # noqa: F401 

25from formkit_ninja.form_submission.models import ( 

26 SeparatedSubmission, # noqa: F401 

27 Submission, # noqa: F401 

28 SubmissionField, # noqa: F401 

29 SubmissionFile, # noqa: F401 

30) 

31 

# Rich console for human-readable progress output; `log` is the shorthand
# used by the model helpers in this module.
console = Console()
log = console.log

# Use a module-scoped logger rather than the root logger so applications can
# filter or configure this package's output independently.
logger = logging.getLogger(__name__)

36 

37 

def check_valid_django_id(key: str):
    """
    Validate that ``key`` is usable as a Python identifier / Django field name.

    Raises ``ValidationError`` when ``key`` is empty, starts with a digit, is not
    a valid identifier, is a hard or soft Python keyword, or ends with an
    underscore. Returns ``None`` when every check passes.
    """
    if not key:
        raise ValidationError("Name cannot be empty")

    first_char, last_char = key[0], key[-1]

    # Checked separately from `isidentifier()` so the caller gets a more specific message
    if first_char.isdigit():
        raise ValidationError(f"{key} is not valid, it cannot start with a digit")

    is_reserved_word = iskeyword(key) or issoftkeyword(key)
    if is_reserved_word or not key.isidentifier():
        raise ValidationError(f"{key} cannot be used as a keyword. Should be a valid python identifier")

    if last_char == "_":
        raise ValidationError(f"{key} is not valid, it cannot end with an underscore")

47 

48 

class UuidIdModel(models.Model):
    """
    Abstract base model supplying the bookkeeping columns shared by synced tables.

    Every subclass receives:
     - ``id``: UUID primary key generated with ``uuid.uuid4``
     - ``created``: timestamp set when the row is first inserted
     - ``updated``: timestamp refreshed on every save
     - ``created_by`` / ``updated_by``: optional references to a user
    """

    id = models.UUIDField(
        primary_key=True,
        default=uuid.uuid4,
        editable=False,
        unique=True,
    )
    created = models.DateTimeField(auto_now_add=True, blank=True, null=True)
    updated = models.DateTimeField(auto_now=True, blank=True, null=True)
    # related_name="+" disables the reverse accessor on the user model
    created_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.PROTECT,
        related_name="+",
        blank=True,
        null=True,
    )
    updated_by = models.ForeignKey(
        settings.AUTH_USER_MODEL,
        on_delete=models.PROTECT,
        related_name="+",
        blank=True,
        null=True,
    )

    class Meta:
        abstract = True

68 

69 

class OptionDict(TypedDict, total=True):
    """
    Shape of one option supplied as a mapping: the stored ``value``
    and the ``label`` displayed for it.
    """

    value: str
    label: str

73 

74 

class OptionGroup(models.Model):
    """
    A named collection of choices (``Option`` rows).

    For instance all the values in a single PNDS zTable.
    Also intended to allow users to add / modify their own 'Options'
    for idb and formkit to recognize.
    """

    group = models.CharField(max_length=1024, primary_key=True, help_text="The label to use for these options")
    content_type = models.ForeignKey(
        ContentType,
        on_delete=models.PROTECT,
        null=True,
        blank=True,
        help_text=("This is an optional reference to the original source object for this set of options (typically a table from which we copy options)"),
    )

    def save(self, *args, **kwargs):
        """
        Save the group after checking that ``content_type``, if set, points at
        a model with a ``value`` field and a ``label_set`` attribute.

        Raises:
            ValueError: the content type has no model class, or the model
                lacks the ``value`` field or the ``label_set`` attribute.
        """
        if self.content_type:
            from django.core.exceptions import FieldDoesNotExist

            klass = self.content_type.model_class()
            message = f"Expected {klass} to have a 'value' field and a 'label_set' attribute"
            # model_class() returns None when the referenced model no longer exists
            if klass is None:
                raise ValueError(message)
            try:
                # get_field() signals a missing field by raising, never by returning None
                klass._meta.get_field("value")
            except FieldDoesNotExist as exc:
                raise ValueError(message) from exc
            if not hasattr(klass, "label_set"):
                raise ValueError(message)
        return super().save(*args, **kwargs)

    def __str__(self):
        return f"{self.group}"

    @classmethod
    def copy_table(cls, model: type[models.Model], field: str, language: str | None = "en", group_name: str | None = None):
        """
        Copy an existing table of options into this OptionGroup.

        One ``Option`` is created (or reused) per row of ``model``, using the row's
        primary key as both ``object_id`` and ``value``. The content of ``field``
        becomes the option's label in ``language``. Everything runs inside a single
        transaction.

        Args:
            model: the source model whose rows are copied.
            field: name of the source field holding the label text.
            language: language code stored on each ``OptionLabel``.
            group_name: primary key of the target group.
        """
        from typing import Any, cast

        with transaction.atomic():
            group_obj, _group_created = cls.objects.get_or_create(group=group_name, content_type=ContentType.objects.get_for_model(model))
            log(group_obj)

            for obj in cast(Any, model).objects.values("pk", field):
                option, _option_created = Option.objects.get_or_create(
                    object_id=obj["pk"],
                    group=group_obj,
                    value=obj["pk"],
                )
                label_text = obj[field] or ""
                # Look up by (option, lang) only: that pair is unique, so including
                # the label in the lookup would attempt a duplicate insert whenever
                # the source text changed between runs.
                label_obj, label_created = OptionLabel.objects.get_or_create(
                    option=option,
                    lang=language,
                    defaults={"label": label_text},
                )
                if not label_created and label_obj.label != label_text:
                    label_obj.label = label_text
                    label_obj.save()

127 

128 

class OptionQuerySet(models.Manager):
    """
    Manager that annotates every row with its label in each configured language.
    """

    def get_queryset(self):
        """
        Return the base queryset with one ``label_<lang>`` annotation per entry
        in ``settings.LANGUAGES``, each filled by a subquery on ``OptionLabel``.
        """
        label_subqueries = {}
        for language_entry in settings.LANGUAGES:
            code = language_entry[0]
            matching_labels = OptionLabel.objects.filter(lang=code, option=models.OuterRef("pk"))
            # Only the first matching label text is taken for each language
            label_subqueries[f"label_{code}"] = models.Subquery(
                matching_labels.values("label")[:1],
                output_field=models.CharField(),
            )
        base_queryset = super().get_queryset()
        return base_queryset.annotate(**label_subqueries)

144 

145 

class Option(UuidIdModel):
    """
    A single key/value "option" for a FormKit property.

    Translated display text for the option is held by ``OptionLabel`` rows
    that reference it.
    """

    object_id = models.IntegerField(
        null=True,
        blank=True,
        help_text=("This is a reference to the primary key of the original source object (typically a PNDS ztable ID) or a user-specified ID for a new group"),
    )
    last_updated = models.DateTimeField(auto_now=True)
    group = models.ForeignKey(
        OptionGroup,
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )
    order = models.IntegerField(null=True, blank=True)
    value = models.CharField(max_length=1024)

    # Plain class attribute (not a Meta option)
    order_with_respect_to = "group"

    # Default manager: adds per-language label annotations
    objects = OptionQuerySet()

    class Meta:
        triggers = triggers.update_or_insert_group_trigger("group_id")
        constraints = [models.UniqueConstraint(fields=["group", "object_id"], name="unique_option_id")]
        ordering = ("group", "order")

    @classmethod
    def from_pydantic(
        cls,
        options: list[str | OptionDict],
        group: OptionGroup | None = None,
    ) -> Iterable["Option"]:
        """
        Lazily yield the ``Option`` rows produced by
        ``SchemaImportService.import_options`` for the given input.
        """
        from formkit_ninja.services.schema_import import SchemaImportService

        imported = SchemaImportService.import_options(options, group=group)
        yield from imported

    def __str__(self):
        owner = self.group
        if not owner:
            return f"No group: {self.value}"
        return f"{owner.group}::{self.value}"

193 

194 

class OptionLabel(models.Model):
    """
    Display text for an ``Option`` in one language (one label per option and language).
    """

    option = models.ForeignKey("Option", on_delete=models.CASCADE)
    label = models.CharField(max_length=1024)
    lang = models.CharField(
        max_length=4,
        default="en",
        choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese")),
    )

    class Meta:
        constraints = [models.UniqueConstraint(fields=["option", "lang"], name="unique_option_label")]

    def save(self, *args, **kwargs):
        """
        Save the parent option first so that its ``last_updated`` is refreshed,
        then save this label.
        """
        parent_option = self.option
        if parent_option is not None:
            parent_option.save()
        return super().save(*args, **kwargs)

210 

211 

class FormComponents(UuidIdModel):
    """
    Ordered link between a schema and the nodes it contains.
    """

    schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
    # Nullable so that a new FormComponent can be added from the admin inline
    node = models.ForeignKey(
        "FormKitSchemaNode",
        on_delete=models.CASCADE,
        null=True,
        blank=True,
    )
    label = models.CharField(
        max_length=1024,
        help_text="Used as a human-readable label",
        null=True,
        blank=True,
    )
    order = models.IntegerField(null=True, blank=True)

    # Plain class attribute (not a Meta option)
    order_with_respect_to = "schema"

    class Meta:
        triggers = triggers.update_or_insert_group_trigger("schema_id")
        ordering = ("schema", "order")

    def __str__(self):
        return "{}[{}]: {}".format(self.node, self.order, self.schema)

230 

231 

class NodeChildrenManager(models.Manager):
    """
    Adds aggregation and filtering for client side data
    of NodeChildren relations
    """

    def aggregate_changes_table(self, latest_change: int | None = None):
        """
        Return one named tuple ``(parent_id, latest_change, children)`` per parent.

        ``children`` is the array of child ids ordered by ``order``.
        ``latest_change`` is the greatest of the parent's ``track_change`` and the
        highest ``track_change`` among its children.

        Args:
            latest_change: when truthy, only parents changed after this value
                are returned.
        """
        values = (
            self.get_queryset()
            .values("parent_id")
            .annotate(
                children=ArrayAgg("child", ordering="order"),
            )
            .annotate(Max("child__track_change"))
            .annotate(latest_change=Greatest("child__track_change__max", "parent__track_change"))
        )
        if latest_change:
            # The parent node's change counter is `track_change`; the node model
            # has no `latest_change` field to traverse to.
            values = values.filter(Q(latest_change__gt=latest_change) | Q(parent__track_change__gt=latest_change))
        return values.values_list("parent_id", "latest_change", "children", named=True)

251 

252 

class NodeChildren(models.Model):
    """
    Ordered many-to-many "through" model linking a parent node
    to each of its child nodes.
    """

    parent = models.ForeignKey("FormKitSchemaNode", on_delete=models.CASCADE, related_name="parent")
    child = models.ForeignKey(
        "FormKitSchemaNode",
        on_delete=models.CASCADE,
    )
    order = models.IntegerField(null=True, blank=True)
    track_change = models.BigIntegerField(null=True, blank=True)

    # Plain class attribute (not a Meta option)
    order_with_respect_to = "parent"

    objects = NodeChildrenManager()

    class Meta:
        triggers = [
            *triggers.update_or_insert_group_trigger("parent_id"),
            triggers.bump_sequence_value(sequence_name=triggers.NODE_CHILDREN_CHANGE_ID),
        ]
        ordering = ("parent_id", "order")

280 

281 

class NodeQS(models.QuerySet):
    """
    QuerySet helpers for selecting changed nodes and serialising them.
    """

    def from_change(self, track_change: int = -1):
        """Restrict to nodes whose ``track_change`` is greater than the given value."""
        return self.filter(track_change__gt=track_change)

    def to_response(self, ignore_errors: bool = True, options: bool = True) -> Iterable[tuple[uuid.UUID, int | None, formkit_schema.Node | str | None, bool]]:
        """
        Yield ``(id, track_change, node, protected)`` for every row.

        Inactive rows yield ``None`` in place of the node. A row that fails to
        convert is skipped with two warnings, unless ``ignore_errors`` is false,
        in which case the exception propagates.
        """
        node: FormKitSchemaNode
        for node in self.all():
            try:
                payload = node.get_node(recursive=False, options=options) if node.is_active else None
                yield node.id, node.track_change, payload, node.protected
            except Exception as E:
                if ignore_errors:
                    warnings.warn(f"An unparseable FormKit node was hit at {node.pk}")
                    warnings.warn(f"{E}")
                else:
                    raise

302 

303 

@pghistory.track()
@pgtrigger.register(
    pgtrigger.Protect(
        # If the node is protected, delete is not allowed
        name="protect_node_deletes_and_updates",
        operation=pgtrigger.Delete,
        condition=pgtrigger.Q(old__protected=True),
    ),
    pgtrigger.Protect(
        # If both new and old values are "protected", updates are not allowed
        name="protect_node_updates",
        operation=pgtrigger.Update,
        condition=pgtrigger.Q(old__protected=True) & pgtrigger.Q(new__protected=True),
    ),
    pgtrigger.SoftDelete(name="soft_delete", field="is_active"),
    triggers.bump_sequence_value("track_change", triggers.NODE_CHANGE_ID),
)
class FormKitSchemaNode(UuidIdModel):
    """
    This represents a single "Node" in a FormKit schema.
    There are several different types of node which may be defined:
    FormKitSchemaDOMNode
    | FormKitSchemaComponent
    | FormKitSchemaTextNode
    | FormKitSchemaCondition
    | FormKitSchemaFormKit

    The raw schema fragment is kept in ``node`` (JSON). A handful of commonly
    used props are mirrored into dedicated columns on save and merged back in
    by ``get_node_values``.
    """

    objects = NodeQS.as_manager()

    NODE_TYPE_CHOICES = (
        ("$cmp", "Component"),  # Not yet implemented
        ("text", "Text"),
        ("condition", "Condition"),  # Not yet implemented
        ("$formkit", "FormKit"),
        ("$el", "Element"),
        ("raw", "Raw JSON"),  # Not yet implemented
    )
    FORMKIT_CHOICES = [(t, t) for t in get_args(formkit_schema.FORMKIT_TYPE)]

    ELEMENT_TYPE_CHOICES = [("p", "p"), ("h1", "h1"), ("h2", "h2"), ("span", "span")]
    node_type = models.CharField(max_length=256, choices=NODE_TYPE_CHOICES, blank=True, help_text="")
    description = models.CharField(
        max_length=4000,
        null=True,
        blank=True,
        help_text="Decribe the type of data / reason for this component",
    )
    label = models.CharField(max_length=1024, help_text="Used as a human-readable label", null=True, blank=True)
    option_group = models.ForeignKey(OptionGroup, null=True, blank=True, on_delete=models.PROTECT)
    children = models.ManyToManyField("self", through=NodeChildren, symmetrical=False, blank=True)
    is_active = models.BooleanField(default=True)
    protected = models.BooleanField(default=False)

    node = models.JSONField(
        null=True,
        blank=True,
        help_text="A JSON representation of select parts of the FormKit schema",
    )

    additional_props = models.JSONField(
        null=True,
        blank=True,
        help_text="User space for additional, less used props",
    )
    icon = models.CharField(max_length=256, null=True, blank=True)
    title = models.CharField(max_length=1024, null=True, blank=True)
    readonly = models.BooleanField(default=False)
    sections_schema = models.JSONField(null=True, blank=True, help_text="Schema for the sections")
    min = models.CharField(max_length=256, null=True, blank=True)
    max = models.CharField(max_length=256, null=True, blank=True)
    step = models.CharField(max_length=256, null=True, blank=True)
    add_label = models.CharField(max_length=1024, null=True, blank=True)
    up_control = models.BooleanField(default=True)
    down_control = models.BooleanField(default=True)

    # Code Generation Source of Truth
    django_field_type = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        help_text="The Django Model Field class to use (e.g., 'CharField', 'IntegerField', 'ForeignKey'). Providing this makes this field the primary source of truth for code generation.",
    )
    django_field_args = models.JSONField(
        default=dict,
        blank=True,
        help_text="Arguments passed to the Django field as a JSON dictionary. "
        "Example: {'null': true, 'blank': true, 'max_length': 255}. "
        "For ForeignKeys, include the model name: {'to': 'auth.User', 'on_delete': 'models.CASCADE'}.",
    )
    django_field_positional_args = models.JSONField(
        default=list,
        blank=True,
        help_text="Positional arguments passed to the Django field as a JSON list. Example: ['auth.User'].",
    )
    pydantic_field_type = models.CharField(
        max_length=100,
        null=True,
        blank=True,
        help_text="The Python/Pydantic type for this field (e.g., 'str', 'int', 'Decimal', 'UUID', 'date').",
    )
    extra_imports = models.JSONField(
        default=list,
        blank=True,
        help_text="A list of additional Python import statements required by this field. Example: ['from decimal import Decimal', 'from django.core.validators import MinValueValidator'].",
    )
    validators = models.JSONField(
        default=list,
        blank=True,
        help_text="A list of Django/Pydantic validator strings to be applied to this field. Example: ['MinValueValidator(0)', 'validate_v_date'].",
    )

    text_content = models.TextField(null=True, blank=True, help_text="Content for a text element, for children of an $el type component")
    track_change = models.BigIntegerField(null=True, blank=True)

    @property
    def formkit(self):
        """The ``$formkit`` value stored in ``node``, or ``None`` when ``node`` is not a dict."""
        return self.node.get("$formkit") if isinstance(self.node, dict) else None

    @property
    def name(self):
        """The ``name`` value stored in ``node``, or ``None`` when ``node`` is not a dict."""
        return self.node.get("name") if isinstance(self.node, dict) else None

    def __str__(self):
        return f"Node: {self.label}" if self.label else f"{self.node_type} {self.id}"

    def save(self, *args, **kwargs):
        """
        Normalise and validate the node before writing it.

        Steps, in order:
         1. rename a ``formkit`` key in ``node`` to ``$formkit``
         2. for ``$formkit`` / ``$el`` nodes, check that ``name`` is a valid identifier
            (raises ``ValidationError`` otherwise)
         3. copy commonly used props from ``additional_props`` and ``node``
            into their dedicated columns
         4. fill in any unset code generation fields
        """
        # rename `formkit` to `$formkit`
        if isinstance(self.node, dict) and "formkit" in self.node:
            self.node["$formkit"] = self.node.pop("formkit")
        # We're also going to verify that the 'key' is a valid identifier
        # Keep in mind that the `key` may be used as part of a model so
        # should be valid Django fieldname too
        if isinstance(self.node, dict) and self.node_type in ("$formkit", "$el"):
            if key := self.node.get("name"):
                check_valid_django_id(key)

        # Auto-promote common props from both 'additional_props' and 'node'.
        # Maps the schema prop name to the model column that stores it.
        promoted_props = {
            "icon": "icon",
            "title": "title",
            "readonly": "readonly",
            "sectionsSchema": "sections_schema",
            "min": "min",
            "max": "max",
            "step": "step",
            "addLabel": "add_label",
            "upControl": "up_control",
            "downControl": "down_control",
        }
        # `node` is processed last, so its values win over `additional_props`
        for source in (self.additional_props, self.node):
            if not isinstance(source, dict):
                continue
            for prop, column in promoted_props.items():
                if prop not in source:
                    continue
                val = source[prop]
                # min / max / step are stored in character columns
                if prop in ("min", "max", "step") and val is not None:
                    val = str(val)
                setattr(self, column, val)

        # Resolve code generation defaults if not set
        self.resolve_code_generation_defaults()

        return super().save(*args, **kwargs)

    def resolve_code_generation_defaults(self, force=False):
        """
        Populate code generation fields from CodeGenerationConfig and settings
        if they are not already set.

        Args:
            force: when true, overwrite values that are already set.
        """
        # We need a node structure to match against
        node = self.get_node(recursive=False)
        if isinstance(node, str):
            # Text nodes don't typically generate fields themselves
            return

        from formkit_ninja.parser.database_node_path import DatabaseNodePath

        # Create a transient DatabaseNodePath to leverage its resolution logic
        path = DatabaseNodePath(node)

        if force or not self.django_field_type:
            self.django_field_type = path.to_django_type()

        if force or not self.django_field_args or not self.django_field_positional_args:
            # We want the dict/list, not the string
            # DatabaseNodePath uses _get_config and _get_from_settings
            config = path._get_config()
            if config:
                if force or not self.django_field_args:
                    self.django_field_args = config.django_args
                if force or not self.django_field_positional_args:
                    self.django_field_positional_args = config.django_positional_args
            else:
                if force or not self.django_field_args:
                    settings_args = path._get_from_settings("django_args")
                    if isinstance(settings_args, dict):
                        self.django_field_args = settings_args
                if force or not self.django_field_positional_args:
                    settings_pos_args = path._get_from_settings("django_positional_args")
                    if isinstance(settings_pos_args, list):
                        self.django_field_positional_args = settings_pos_args

        if force or not self.pydantic_field_type:
            self.pydantic_field_type = path.to_pydantic_type()

        if force or not self.extra_imports:
            self.extra_imports = path.get_extra_imports()

        if force or not self.validators:
            self.validators = path.get_validators()

    @property
    def node_options(self) -> str | list[dict] | None:
        """
        Because "options" are translated and
        separately stored, this step is necessary to
        reinstate them

        Returns the ``options`` entry of ``node`` when present; otherwise a list
        of ``{"value", "label"}`` dicts built from ``option_group``, or ``None``
        when the node has no option group.
        """
        if isinstance(self.node, dict) and (opts := self.node.get("options")):
            return opts

        if not self.option_group:
            return None
        options = self.option_group.option_set.all().prefetch_related("optionlabel_set")
        option_dicts = []
        for option in options:
            # `.all()` is answered from the prefetch cache, whereas `.first()` on this
            # unordered relation would run a fresh query for every option.
            # Picking the lowest pk keeps the same row `.first()` would have chosen.
            labels = list(option.optionlabel_set.all())
            label_obj = min(labels, key=lambda candidate: candidate.pk, default=None)
            option_dicts.append(
                {
                    "value": option.value,
                    "label": f"{label_obj.label if label_obj else ''}",
                }
            )
        return option_dicts

    def get_node_values(self, recursive: bool = True, options: bool = True) -> str | dict:
        """
        Reify a 'dict' instance suitable for creating
        a FormKit Schema node from

        Args:
            recursive: include child nodes under ``children``.
            options: include the node's options under ``options``.

        Returns:
            The text content for a node without ``node`` data (or ``{}`` when it
            has no text either); otherwise a dict with empty-string values removed.
        """
        # Text element
        if not self.node:
            if self.text_content:
                return self.text_content
            return {}
        values = {**self.node}

        # Options may come from a string in the node, or
        # may come from an m2m
        if options and self.node_options:
            values["options"] = self.node_options
        if recursive:
            # NOTE(review): children are rendered with the default arguments, so the
            # caller's `options` flag is not forwarded - confirm this is intended.
            children = [c.get_node_values() for c in self.children.order_by("nodechildren__order")]
            if children:
                values["children"] = children
        if self.icon:
            values["icon"] = self.icon
        if self.title:
            values["title"] = self.title
        if self.readonly:
            values["readonly"] = self.readonly
        if self.sections_schema:
            values["sectionsSchema"] = self.sections_schema
        if self.min:
            try:
                values["min"] = int(self.min)
            except ValueError:
                values["min"] = self.min
        if self.max:
            try:
                values["max"] = int(self.max)
            except ValueError:
                values["max"] = self.max
        if self.step:
            try:
                step_value = float(self.step)
            except ValueError:
                values["step"] = self.step
            else:
                # Whole numbers become ints; anything else keeps the stored string
                # so that no float precision artefacts are introduced.
                values["step"] = int(step_value) if step_value.is_integer() else self.step
        if self.add_label:
            values["addLabel"] = self.add_label
        # Both controls default to True, so they are only written when switched off
        if not self.up_control:
            values["upControl"] = self.up_control
        if not self.down_control:
            values["downControl"] = self.down_control

        # Code Generation fields
        if self.django_field_type:
            values["django_field_type"] = self.django_field_type
        if self.django_field_args:
            values["django_field_args"] = self.django_field_args
        if self.django_field_positional_args:
            values["django_field_positional_args"] = self.django_field_positional_args
        if self.pydantic_field_type:
            values["pydantic_field_type"] = self.pydantic_field_type
        if self.extra_imports:
            values["extra_imports"] = self.extra_imports
        if self.validators:
            values["validators"] = self.validators

        # Merge additional_props into the top level and ensure it's removed as a separate key
        values.pop("additional_props", None)
        if self.additional_props and len(self.additional_props) > 0:
            # Handle nested additional_props structure
            props_to_merge = self.additional_props
            if "additional_props" in props_to_merge:
                props_to_merge = props_to_merge["additional_props"]
            # Filter out None values to prevent Pydantic validation errors
            clean_props = {k: v for k, v in props_to_merge.items() if v is not None}
            values.update(clean_props)

        if self.node_type == "$el" and not values.get("$el"):
            values["$el"] = "span"
        elif self.node_type == "$formkit" and not values.get("$formkit"):
            values["$formkit"] = "text"

        return {k: v for k, v in values.items() if v != ""}

    def get_ancestors(self) -> list["FormKitSchemaNode"]:
        """
        Return a list of ancestor nodes by following the nodechildren_set relationship upwards.
        Follows the first parent found for each node. The list is ordered from
        the outermost ancestor down to the direct parent.
        """
        ancestors: list[FormKitSchemaNode] = []
        current = self
        while True:
            # nodechildren_set contains objects where current is the child
            nc = current.nodechildren_set.first()
            if not nc:
                break
            current = nc.parent
            # Avoid infinite cycles, including one that leads back to this node
            if current == self or current in ancestors:
                break
            ancestors.insert(0, current)
            if len(ancestors) > 20:  # Safety limit
                break
        return ancestors

    def get_node_path(self, recursive=True) -> list[formkit_schema.Node | str]:
        """
        Return a list of Pydantic nodes representing the path from the root to this node.
        """
        ancestors = self.get_ancestors()
        return [a.get_node(recursive=False) for a in ancestors] + [self.get_node(recursive=recursive)]  # type: ignore[return-value]

    def get_node(self, recursive=False, options=False, **kwargs) -> formkit_schema.Node | str:
        """
        Return a "decorated" node instance
        with restored options and translated fields

        Text nodes are returned as their text content (a string).
        """
        if self.text_content or self.node_type == "text":
            return self.text_content or ""
        if self.node == {} or self.node is None:
            # No stored schema fragment: fall back to a minimal node for the type
            if self.node_type == "$el":
                node_content_dict: dict[str, Any] = {"$el": "span"}
            elif self.node_type == "$formkit":
                node_content_dict = {"$formkit": "text"}
            else:
                node_content_dict = {}
        else:
            node_content_dict = self.get_node_values(**kwargs, recursive=recursive, options=options)  # type: ignore[assignment]

        formkit_node = formkit_schema.FormKitNode.parse_obj(node_content_dict, recursive=recursive)
        return formkit_node.__root__

    @classmethod
    def from_pydantic(  # noqa: C901
        cls, input_models: formkit_schema.FormKitSchemaProps | Iterable[formkit_schema.FormKitSchemaProps]
    ) -> Iterable["FormKitSchemaNode"]:
        """
        Create and yield database nodes for the given Pydantic node(s).

        Accepts a plain string (stored as a text node), a single
        ``FormKitSchemaProps`` instance, or an iterable of either. Options and
        children of a node are created before the node itself is yielded.

        Raises:
            TypeError: ``input_models`` is none of the accepted kinds.
        """
        if isinstance(input_models, str):
            # The label column is bounded; the full text is kept in `text_content`
            label_limit = cls._meta.get_field("label").max_length
            yield cls.objects.create(node_type="text", label=input_models[:label_limit], text_content=input_models)

        elif isinstance(input_models, Iterable) and not isinstance(input_models, formkit_schema.FormKitSchemaProps):
            for n in input_models:
                yield from cls.from_pydantic(n)

        elif isinstance(input_models, formkit_schema.FormKitSchemaProps):
            input_model = input_models
            instance = cls()
            log(f"[green]Creating {instance}")
            for label_field in ("name", "id", "key", "label"):
                if label := getattr(input_model, label_field, None):
                    instance.label = label
                    break

            # Work on a copy so that the caller's model is not mutated below
            props = getattr(input_model, "additional_props", None)
            instance.additional_props = dict(props) if isinstance(props, dict) else {}

            # Props with a dedicated column: (attribute on the input, column name)
            direct_columns = (
                ("icon", "icon"),
                ("title", "title"),
                ("readonly", "readonly"),
                ("sectionsSchema", "sections_schema"),
                ("addLabel", "add_label"),
                ("upControl", "up_control"),
                ("downControl", "down_control"),
                # Code Generation Fields
                ("django_field_type", "django_field_type"),
                ("django_field_args", "django_field_args"),
                ("django_field_positional_args", "django_field_positional_args"),
                ("pydantic_field_type", "pydantic_field_type"),
                ("extra_imports", "extra_imports"),
                ("validators", "validators"),
            )
            for prop, column in direct_columns:
                if (val := getattr(input_model, prop, None)) is not None:
                    setattr(instance, column, val)
            # These are stored in character columns
            for prop in ("min", "max", "step"):
                if (val := getattr(input_model, prop, None)) is not None:
                    setattr(instance, prop, str(val))

            # Fields that are valid Pydantic fields but not promoted to columns must be saved in additional_props
            # otherwise they are lost.
            extra_fields = [
                "rows",
                "cols",
                "prefixIcon",
                "classes",
                "value",
                "suffixIcon",
                "validationRules",
                "maxLength",
                "itemClass",
                "itemsClass",
                "_minDateSource",
                "_maxDateSource",
                "disabledDays",
            ]
            for field in extra_fields:
                if (val := getattr(input_model, field, None)) is not None:
                    instance.additional_props[field] = val

            # Translate the Pydantic node type into the stored choice value;
            # unrecognised types leave `node_type` at its default
            stored_node_types = {
                "condition": "condition",
                "formkit": "$formkit",
                "element": "$el",
                "component": "$cmp",
            }
            node_type = input_model.node_type
            if node_type in stored_node_types:
                instance.node_type = stored_node_types[node_type]

            log(f"[green]Yielding: {instance}")

            # Must save the instance before adding "options" or "children"
            instance.node = input_model.dict(
                exclude={
                    "options",
                    "children",
                    "additional_props",
                    "node_type",
                },
                exclude_none=True,
                exclude_unset=True,
            )
            # Where an alias is used ("el", ) restore it to the expected value
            # of a FormKit schema node
            for pydantic_key, db_key in (("el", "$el"), ("formkit", "$formkit")):
                if db_value := instance.node.pop(pydantic_key, None):
                    instance.node[db_key] = db_value

            instance.save()
            # Add the "options" if it is a 'text' type getter
            options: formkit_schema.OptionsType = getattr(input_model, "options", None)

            if isinstance(options, str):
                # Maintain this as it is probably a `$get...` options call
                # to a Javascript function
                instance.node["options"] = options
                instance.save()

            elif isinstance(options, Iterable):
                # Create a new "group" to assign these options to
                # Here we use a random UUID as the group name
                instance.option_group = OptionGroup.objects.create(group=f"Auto generated group for {str(instance)} {uuid.uuid4().hex[0:8]}")
                # from_pydantic is a generator: iterate it fully so every option is created
                for _option in Option.from_pydantic(options, group=instance.option_group):  # type: ignore[arg-type]
                    pass
                instance.save()

            for c_n in getattr(input_model, "children", []) or []:
                child_node = next(iter(cls.from_pydantic(c_n)))
                console.log(f" {child_node}")
                instance.children.add(child_node)

            yield instance

        else:
            raise TypeError(f"Expected FormKitNode or Iterable[FormKitNode], got {type(input_models)}")

    def to_pydantic(self, recursive=False, options=False, **kwargs):
        """
        Return the text content for a text node, otherwise the parsed
        ``FormKitNode`` built from ``get_node_values``.
        """
        if self.text_content:
            return self.text_content
        return formkit_schema.FormKitNode.parse_obj(self.get_node_values(recursive=recursive, options=options, **kwargs))

841 

842 

class SchemaManager(models.Manager):
    """
    Default manager for schemas: always prefetches the schema's nodes
    and the children of those nodes.
    """

    def get_queryset(self):
        base_queryset = super().get_queryset()
        return base_queryset.prefetch_related("nodes", "nodes__children")

850 

851 

class FormKitSchema(UuidIdModel):
    """
    This represents a "FormKitSchema" which is an heterogenous
    collection of items.
    """

    label = models.CharField(
        max_length=1024,
        null=True,
        blank=True,
        help_text="Used as a human-readable label",
        unique=True,
        default=uuid.uuid4,
    )
    nodes = models.ManyToManyField(FormKitSchemaNode, through=FormComponents)
    objects = SchemaManager()

    def get_schema_values(self, recursive=False, options=False, **kwargs):
        """
        Yield the "node" values of each node in this schema, in component order.
        """
        nodes: Iterable[FormKitSchemaNode] = self.nodes.order_by("formcomponents__order")
        for node in nodes:
            yield node.get_node_values(recursive=recursive, options=options, **kwargs)

    def to_pydantic(self):
        """Parse this schema's node values into a Pydantic ``FormKitSchema``."""
        values = list(self.get_schema_values())
        return formkit_schema.FormKitSchema.parse_obj(values)

    def __str__(self):
        # Test the label itself: formatting it first would turn None into the
        # truthy string "None" and the id fallback would never be used.
        # str() is still applied because the default label is a UUID object.
        if self.label:
            return str(self.label)
        return str(self.id)[:8]

    @classmethod
    def from_pydantic(cls, input_model: formkit_schema.FormKitSchema, label: str | None = None) -> "FormKitSchema":
        """
        Converts a given Pydantic representation of a Schema
        to Django database fields
        """
        from formkit_ninja.services.schema_import import SchemaImportService

        return SchemaImportService.import_schema(input_model, label=label)

    @classmethod
    def from_json(cls, input_file: dict):
        """
        Converts already-decoded JSON data to a suitable
        Django representation
        """
        schema_instance = formkit_schema.FormKitSchema.parse_obj(input_file)
        return cls.from_pydantic(schema_instance)

905 

906 

class SchemaLabel(models.Model):
    """
    Translated title of a Partisipa schema definition,
    one row per schema and language.
    """

    schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
    label = models.CharField(max_length=1024)
    lang = models.CharField(
        max_length=4,
        default="en",
        choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese")),
    )

916 

917 

class SchemaDescription(models.Model):
    """
    Translated description of a Partisipa schema definition,
    one row per schema and language.
    """

    schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
    label = models.CharField(max_length=1024)
    lang = models.CharField(
        max_length=4,
        default="en",
        choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese")),
    )

927 

928 

929# Import submission models to register them with the app