Coverage for formkit_ninja / models.py: 23.68%
492 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-03 09:21 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-03 09:21 +0000
1from __future__ import annotations
3import logging
4import uuid
5import warnings
6from keyword import iskeyword, issoftkeyword
7from typing import Any, Iterable, TypedDict, get_args
9import pghistory
10import pgtrigger
11from django.conf import settings
12from django.contrib.contenttypes.models import ContentType
13from django.contrib.postgres.aggregates import ArrayAgg
14from django.core.exceptions import ValidationError
15from django.db import models, transaction
16from django.db.models import Q
17from django.db.models.aggregates import Max
18from django.db.models.functions import Greatest
19from django.utils import timezone
20from rich.console import Console
22from formkit_ninja import formkit_schema, triggers
24# Re export "form_submission" models
25from formkit_ninja.code_generation_config import CodeGenerationConfig # noqa: F401
26from formkit_ninja.form_submission.models import (
27 SeparatedSubmission, # noqa: F401
28 Submission, # noqa: F401
29 SubmissionField, # noqa: F401
30 SubmissionFile, # noqa: F401
31)
32from formkit_ninja.utils import short_uuid
34console = Console()
35log = console.log
37logger = logging.getLogger()
40def check_valid_django_id(key: str):
41 if not key:
42 raise ValidationError("Name cannot be empty")
43 if key[0].isdigit():
44 raise ValidationError(f"{key} is not valid, it cannot start with a digit")
45 if not key.isidentifier() or iskeyword(key) or issoftkeyword(key):
46 raise ValidationError(f"{key} cannot be used as a keyword. Should be a valid python identifier")
47 if key[-1] == "_":
48 raise ValidationError(f"{key} is not valid, it cannot end with an underscore")
51class UuidIdModel(models.Model):
52 """
53 Consistently use fields which will
54 help with syncing data:
55 - UUID field is the ID
56 - Created field
57 - Last Modified field
58 - updated_by (optional)
59 - created_by (optional)
60 """
62 class Meta:
63 abstract = True
65 id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, unique=True)
66 created = models.DateTimeField(default=timezone.now, blank=True, null=True)
67 updated = models.DateTimeField(auto_now=True, blank=True, null=True)
68 created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT, related_name="+", blank=True, null=True)
69 updated_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT, related_name="+", blank=True, null=True)
72class OptionDict(TypedDict):
73 value: str
74 label: str
77class OptionGroup(models.Model):
78 """
79 This intended to be a "collection" of choices
80 For instance all the values in a single PNDS zTable
81 Also intended to allow users to add / modify their __own__ 'Options'
82 for idb and formkit to recognize
83 """
85 group = models.CharField(max_length=1024, primary_key=True, help_text="The label to use for these options")
86 content_type = models.ForeignKey(
87 ContentType,
88 on_delete=models.PROTECT,
89 null=True,
90 blank=True,
91 help_text=("This is an optional reference to the original source object for this set of options (typically a table from which we copy options)"),
92 )
94 # If the object is a "Content Type" we expect it to have a similar layout to this
96 def save(self, *args, **kwargs):
97 # Prior to save ensure that content_type, if present, fits suitable schema
98 if self.content_type:
99 klass = self.content_type.model_class()
100 try:
101 if klass._meta.get_field("value") is None or not hasattr(klass, "label_set"):
102 raise ValueError(f"Expected {klass} to have a 'value' field and a 'label_set' attribute")
103 except Exception as E:
104 raise ValueError(f"Expected {klass} to have a 'value' field and a 'label_set' attribute") from E
105 return super().save(*args, **kwargs)
107 def __str__(self):
108 return f"{self.group}"
110 @classmethod
111 def copy_table(cls, model: type[models.Model], field: str, language: str | None = "en", group_name: str | None = None):
112 """
113 Copy an existing table of options into this OptionGroup
114 """
116 with transaction.atomic():
117 group_obj, group_created = cls.objects.get_or_create(group=group_name, content_type=ContentType.objects.get_for_model(model))
118 log(group_obj)
120 from typing import Any, cast
122 for obj in cast(Any, model).objects.values("pk", field):
123 option, option_created = Option.objects.get_or_create(
124 object_id=obj["pk"],
125 group=group_obj,
126 value=obj["pk"],
127 )
128 OptionLabel.objects.get_or_create(option=option, label=obj[field] or "", lang=language)
131class OptionQuerySet(models.Manager):
132 """
133 Prefetched "labels" for performance
134 """
136 def get_queryset(self):
137 """
138 Added a prefetch_related to the queryset
139 """
140 lang_codes = (n[0] for n in settings.LANGUAGES)
142 label_model = OptionLabel
143 annotated_fields = {f"label_{lang}": label_model.objects.filter(lang=lang, option=models.OuterRef("pk")) for lang in lang_codes}
144 annotated_fields_subquery = {field: models.Subquery(query.values("label")[:1], output_field=models.CharField()) for field, query in annotated_fields.items()}
145 return super().get_queryset().annotate(**annotated_fields_subquery)
148class Option(UuidIdModel):
149 """
150 This is a key/value field representing one "option" for a FormKit property
151 The translated values for this option are in the `Translatable` table
152 """
154 object_id = models.IntegerField(
155 null=True,
156 blank=True,
157 help_text=("This is a reference to the primary key of the original source object (typically a PNDS ztable ID) or a user-specified ID for a new group"),
158 )
159 last_updated = models.DateTimeField(auto_now=True)
160 group = models.ForeignKey(OptionGroup, on_delete=models.CASCADE, null=True, blank=True)
161 # is_active = models.BooleanField(default=True)
162 order = models.IntegerField(null=True, blank=True)
164 class Meta:
165 triggers = triggers.update_or_insert_group_trigger("group_id")
166 constraints = [models.UniqueConstraint(fields=["group", "object_id"], name="unique_option_id")]
167 ordering = (
168 "group",
169 "order",
170 )
172 value = models.CharField(max_length=1024)
173 order_with_respect_to = "group"
175 objects = OptionQuerySet()
177 @classmethod
178 def from_pydantic(
179 cls,
180 options: list[str | OptionDict],
181 group: OptionGroup | None = None,
182 ) -> Iterable["Option"]:
183 """
184 Yields "Options" in the database based on the input given
185 """
186 from formkit_ninja.services.schema_import import SchemaImportService
188 yield from SchemaImportService.import_options(options, group=group)
190 def __str__(self) -> str:
191 # Use group_id (stored on row; OptionGroup.pk is the group name) to avoid N+1.
192 if self.group_id:
193 return f"{self.group_id}::{self.value}"
194 return f"No group: {self.value}"
197class OptionLabel(models.Model):
198 option = models.ForeignKey("Option", on_delete=models.CASCADE)
199 label = models.CharField(max_length=1024)
200 lang = models.CharField(max_length=4, default="en", choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese")))
202 def save(self, *args, **kwargs):
203 """
204 When saved, save also my "option" so that its last_updated is set
205 """
206 if self.option is not None:
207 self.option.save()
208 return super().save(*args, **kwargs)
210 class Meta:
211 constraints = [models.UniqueConstraint(fields=["option", "lang"], name="unique_option_label")]
213 def __str__(self) -> str:
214 # Use only local fields to avoid N+1 when listing (e.g. in admin).
215 return f"{self.label} ({self.lang})" if self.label else f"option={self.option_id} lang={self.lang}"
218class FormComponents(UuidIdModel):
219 """
220 A model relating "nodes" of a schema to a schema with model ordering
221 """
223 schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
224 # This is null=True so that a new FormComponent can be added from the admin inline
225 node = models.ForeignKey("FormKitSchemaNode", on_delete=models.CASCADE, null=True, blank=True)
226 label = models.CharField(max_length=1024, help_text="Used as a human-readable label", null=True, blank=True)
227 order = models.IntegerField(null=True, blank=True)
228 order_with_respect_to = "schema"
230 class Meta:
231 triggers = triggers.update_or_insert_group_trigger("schema_id")
232 ordering = ("schema", "order")
234 def __str__(self) -> str:
235 # Use node_id/schema_id (stored on row) to avoid N+1 when listing FormComponents.
236 return f"node={self.node_id}[{self.order}]: schema={self.schema_id}"
239class NodeChildrenManager(models.Manager):
240 """
241 Adds aggregation and filtering for client side data
242 of NodeChildren relations
243 """
245 def aggregate_changes_table(self, latest_change: int | None = None):
246 values = (
247 self.get_queryset()
248 .values("parent_id")
249 .annotate(
250 children=ArrayAgg("child", ordering="order"),
251 )
252 .annotate(Max("child__track_change"))
253 .annotate(latest_change=Greatest("child__track_change__max", "parent__track_change"))
254 )
255 if latest_change:
256 values = values.filter(Q(latest_change__gt=latest_change) | Q(parent__latest_change__gt=latest_change))
257 return values.values_list("parent_id", "latest_change", "children", named=True)
260class NodeChildren(models.Model):
261 """
262 This is an ordered m2m model representing
263 the "children" of an HTML element
264 """
266 parent = models.ForeignKey(
267 "FormKitSchemaNode",
268 on_delete=models.CASCADE,
269 related_name="parent",
270 )
271 child = models.ForeignKey("FormKitSchemaNode", on_delete=models.CASCADE)
272 order = models.IntegerField(null=True, blank=True)
273 track_change = models.BigIntegerField(null=True, blank=True)
274 order_with_respect_to = "parent"
276 class Meta:
277 triggers = [
278 *triggers.update_or_insert_group_trigger("parent_id"),
279 triggers.bump_sequence_value(sequence_name=triggers.NODE_CHILDREN_CHANGE_ID),
280 ]
281 ordering = (
282 "parent_id",
283 "order",
284 )
286 objects = NodeChildrenManager()
288 def __str__(self) -> str:
289 # Use parent_id/child_id (stored on row) to avoid N+1.
290 return f"parent={self.parent_id} → child={self.child_id} order={self.order}"
293class NodeQS(models.QuerySet):
294 def from_change(self, track_change: int = -1):
295 return self.filter(track_change__gt=track_change)
297 def to_response(self, ignore_errors: bool = True, options: bool = True) -> Iterable[tuple[uuid.UUID, int | None, formkit_schema.Node | str | None, bool]]:
298 """
299 Return a set of FormKit nodes
300 """
301 node: FormKitSchemaNode
302 for node in self.all():
303 try:
304 if node.is_active:
305 yield node.id, node.track_change, node.get_node(recursive=False, options=options), node.protected
306 else:
307 yield node.id, node.track_change, None, node.protected
308 except Exception as E:
309 if not ignore_errors:
310 raise
311 warnings.warn(f"An unparseable FormKit node was hit at {node.pk}")
312 warnings.warn(f"{E}")
315@pghistory.track()
316@pgtrigger.register(
317 pgtrigger.Protect(
318 # If the node is protected, delete is not allowed
319 name="protect_node_deletes_and_updates",
320 operation=pgtrigger.Delete,
321 condition=pgtrigger.Q(old__protected=True),
322 ),
323 pgtrigger.Protect(
324 # If both new and old values are "protected", updates are not allowed
325 name="protect_node_updates",
326 operation=pgtrigger.Update,
327 condition=pgtrigger.Q(old__protected=True) & pgtrigger.Q(new__protected=True),
328 ),
329 pgtrigger.SoftDelete(name="soft_delete", field="is_active"),
330 triggers.bump_sequence_value("track_change", triggers.NODE_CHANGE_ID),
331)
332class FormKitSchemaNode(UuidIdModel):
333 """
334 This represents a single "Node" in a FormKit schema.
335 There are several different types of node which may be defined:
336 FormKitSchemaDOMNode
337 | FormKitSchemaComponent
338 | FormKitSchemaTextNode
339 | FormKitSchemaCondition
340 | FormKitSchemaFormKit
341 """
343 objects = NodeQS.as_manager()
345 NODE_TYPE_CHOICES = (
346 ("$cmp", "Component"), # Not yet implemented
347 ("text", "Text"),
348 ("condition", "Condition"), # Not yet implemented
349 ("$formkit", "FormKit"),
350 ("$el", "Element"),
351 ("raw", "Raw JSON"), # Not yet implemented
352 )
353 FORMKIT_CHOICES = [(t, t) for t in get_args(formkit_schema.FORMKIT_TYPE)]
355 ELEMENT_TYPE_CHOICES = [("p", "p"), ("h1", "h1"), ("h2", "h2"), ("span", "span")]
356 node_type = models.CharField(max_length=256, choices=NODE_TYPE_CHOICES, blank=True, help_text="")
357 description = models.CharField(
358 max_length=4000,
359 null=True,
360 blank=True,
361 help_text="Decribe the type of data / reason for this component",
362 )
363 label = models.CharField(max_length=1024, help_text="Used as a human-readable label", null=True, blank=True)
364 option_group = models.ForeignKey(OptionGroup, null=True, blank=True, on_delete=models.PROTECT)
365 children = models.ManyToManyField("self", through=NodeChildren, symmetrical=False, blank=True)
366 is_active = models.BooleanField(default=True)
367 protected = models.BooleanField(default=False)
369 node = models.JSONField(
370 null=True,
371 blank=True,
372 help_text="A JSON representation of select parts of the FormKit schema",
373 )
375 additional_props = models.JSONField(
376 null=True,
377 blank=True,
378 help_text="User space for additional, less used props",
379 )
380 icon = models.CharField(max_length=256, null=True, blank=True)
381 title = models.CharField(max_length=1024, null=True, blank=True)
382 readonly = models.BooleanField(default=False)
383 sections_schema = models.JSONField(null=True, blank=True, help_text="Schema for the sections")
384 min = models.CharField(max_length=256, null=True, blank=True)
385 max = models.CharField(max_length=256, null=True, blank=True)
386 step = models.CharField(max_length=256, null=True, blank=True)
387 add_label = models.CharField(max_length=1024, null=True, blank=True)
388 up_control = models.BooleanField(default=True)
389 down_control = models.BooleanField(default=True)
391 # Code Generation Source of Truth
392 django_field_type = models.CharField(
393 max_length=100,
394 null=True,
395 blank=True,
396 help_text="The Django Model Field class to use (e.g., 'CharField', 'IntegerField', 'ForeignKey'). Providing this makes this field the primary source of truth for code generation.",
397 )
398 django_field_args = models.JSONField(
399 default=dict,
400 blank=True,
401 help_text="Arguments passed to the Django field as a JSON dictionary. "
402 "Example: {'null': true, 'blank': true, 'max_length': 255}. "
403 "For ForeignKeys, include the model name: {'to': 'auth.User', 'on_delete': 'models.CASCADE'}.",
404 )
405 django_field_positional_args = models.JSONField(
406 default=list,
407 blank=True,
408 help_text="Positional arguments passed to the Django field as a JSON list. Example: ['auth.User'].",
409 )
410 pydantic_field_type = models.CharField(
411 max_length=100,
412 null=True,
413 blank=True,
414 help_text="The Python/Pydantic type for this field (e.g., 'str', 'int', 'Decimal', 'UUID', 'date').",
415 )
416 extra_imports = models.JSONField(
417 default=list,
418 blank=True,
419 help_text="A list of additional Python import statements required by this field. Example: ['from decimal import Decimal', 'from django.core.validators import MinValueValidator'].",
420 )
421 validators = models.JSONField(
422 default=list,
423 blank=True,
424 help_text="A list of Django/Pydantic validator strings to be applied to this field. Example: ['MinValueValidator(0)', 'validate_v_date'].",
425 )
426 list_filter = models.BooleanField(
427 default=False,
428 help_text="Include this field in generated ModelAdmin.list_filter.",
429 )
431 text_content = models.TextField(null=True, blank=True, help_text="Content for a text element, for children of an $el type component")
432 track_change = models.BigIntegerField(null=True, blank=True)
434 @property
435 def formkit(self):
436 return self.node.get("$formkit") if isinstance(self.node, dict) else None
438 @property
439 def name(self):
440 return self.node.get("name") if isinstance(self.node, dict) else None
442 def __str__(self):
443 return f"Node: {self.label}" if self.label else f"{self.node_type} {self.id}"
445 def save(self, *args, **kwargs):
446 """
447 On save validate the 'node' field matches the 'FormKitNode'
448 """
449 # rename `formkit` to `$formkit`
450 if isinstance(self.node, dict) and "formkit" in self.node:
451 self.node.update({"$formkit": self.node.pop("formkit")})
452 # We're also going to verify that the 'key' is a valid identifier
453 # Keep in mind that the `key` may be used as part of a model so
454 # should be valid Django fieldname too
455 if isinstance(self.node, dict) and self.node_type in ("$formkit", "$el"):
456 if key := self.node.get("name"):
457 check_valid_django_id(key)
459 # Auto-promote common props from both 'additional_props' and 'node'
460 for source in (self.additional_props, self.node):
461 if not isinstance(source, dict):
462 continue
463 for field in (
464 "icon",
465 "title",
466 "readonly",
467 "sectionsSchema",
468 "min",
469 "max",
470 "step",
471 "addLabel",
472 "upControl",
473 "downControl",
474 ):
475 if field in source:
476 if field == "sectionsSchema":
477 target_field = "sections_schema"
478 elif field == "addLabel":
479 target_field = "add_label"
480 elif field == "upControl":
481 target_field = "up_control"
482 elif field == "downControl":
483 target_field = "down_control"
484 else:
485 target_field = field
487 val = source.get(field)
488 if field in ("min", "max", "step") and val is not None:
489 val = str(val)
490 setattr(self, target_field, val)
492 # Resolve code generation defaults if not set
493 self.resolve_code_generation_defaults()
495 return super().save(*args, **kwargs)
497 def resolve_code_generation_defaults(self, force=False):
498 """
499 Populate code generation fields from CodeGenerationConfig and settings
500 if they are not already set.
501 """
502 # We need a node structure to match against
503 node = self.get_node(recursive=False)
504 if isinstance(node, str):
505 # Text nodes don't typically generate fields themselves
506 return
508 from formkit_ninja.parser.database_node_path import DatabaseNodePath
510 # Create a transient DatabaseNodePath to leverage its resolution logic
511 path = DatabaseNodePath(node)
513 if force or not self.django_field_type:
514 self.django_field_type = path.to_django_type()
516 if force or not self.django_field_args or not self.django_field_positional_args:
517 # We want the dict/list, not the string
518 # DatabaseNodePath uses _get_config and _get_from_settings
519 config = path._get_config()
520 if config:
521 if force or not self.django_field_args:
522 self.django_field_args = config.django_args
523 if force or not self.django_field_positional_args:
524 self.django_field_positional_args = config.django_positional_args
525 else:
526 if force or not self.django_field_args:
527 settings_args = path._get_from_settings("django_args")
528 if isinstance(settings_args, dict):
529 self.django_field_args = settings_args
530 if force or not self.django_field_positional_args:
531 settings_pos_args = path._get_from_settings("django_positional_args")
532 if isinstance(settings_pos_args, list):
533 self.django_field_positional_args = settings_pos_args
535 if force or not self.pydantic_field_type:
536 self.pydantic_field_type = path.to_pydantic_type()
538 if force or not self.extra_imports:
539 self.extra_imports = path.get_extra_imports()
541 if force or not self.validators:
542 self.validators = path.get_validators()
544 @property
545 def node_options(self) -> str | list[dict] | None:
546 """
547 Because "options" are translated and
548 separately stored, this step is necessary to
549 reinstate them
550 """
551 if self.node and (opts := self.node.get("options")):
552 return opts
554 if not self.option_group:
555 return None
556 options = self.option_group.option_set.all().prefetch_related("optionlabel_set")
557 # options: Iterable[Option] = self.option_set.all().prefetch_related("optionlabel_set")
558 # TODO: This is horribly slow
559 return [
560 {
561 "value": option.value,
562 "label": f"{label_obj.label if (label_obj := option.optionlabel_set.first()) else ''}",
563 }
564 for option in options
565 ]
567 def get_node_values(self, recursive: bool = True, options: bool = True) -> str | dict:
568 """
569 Reify a 'dict' instance suitable for creating
570 a FormKit Schema node from
571 """
572 # Text element
573 if not self.node:
574 if self.text_content:
575 return self.text_content
576 return {}
577 values = {**self.node}
579 # Options may come from a string in the node, or
580 # may come from an m2m
581 if options and self.node_options:
582 values["options"] = self.node_options
583 if recursive:
584 children = [c.get_node_values() for c in self.children.order_by("nodechildren__order")]
585 if children:
586 values["children"] = children
587 if self.icon:
588 values["icon"] = self.icon
589 if self.title:
590 values["title"] = self.title
591 if self.readonly:
592 values["readonly"] = self.readonly
593 if self.sections_schema:
594 values["sectionsSchema"] = self.sections_schema
595 if self.min:
596 try:
597 values["min"] = int(self.min)
598 except ValueError:
599 values["min"] = self.min
600 if self.max:
601 try:
602 values["max"] = int(self.max)
603 except ValueError:
604 values["max"] = self.max
605 if self.step:
606 try:
607 val = float(self.step)
608 if val.is_integer():
609 values["step"] = int(val)
610 else:
611 values["step"] = str(val) # Keep as string if float to avoid precision issues
612 values["step"] = self.step
613 except ValueError:
614 values["step"] = self.step
615 if self.add_label:
616 values["addLabel"] = self.add_label
617 if not self.up_control: # Only write if false? Or always? Defaults are True.
618 values["upControl"] = self.up_control
619 if not self.down_control:
620 values["downControl"] = self.down_control
622 # Code Generation fields
623 if self.django_field_type:
624 values["django_field_type"] = self.django_field_type
625 if self.django_field_args:
626 values["django_field_args"] = self.django_field_args
627 if self.django_field_positional_args:
628 values["django_field_positional_args"] = self.django_field_positional_args
629 if self.pydantic_field_type:
630 values["pydantic_field_type"] = self.pydantic_field_type
631 if self.extra_imports:
632 values["extra_imports"] = self.extra_imports
633 if self.validators:
634 values["validators"] = self.validators
635 if self.list_filter:
636 values["list_filter"] = self.list_filter
638 # Merge additional_props into the top level and ensure it's removed as a separate key
639 values.pop("additional_props", None)
640 if self.additional_props and len(self.additional_props) > 0:
641 # Handle nested additional_props structure
642 props_to_merge = self.additional_props
643 if "additional_props" in props_to_merge:
644 props_to_merge = props_to_merge["additional_props"]
645 # Filter out None values to prevent Pydantic validation errors
646 clean_props = {k: v for k, v in props_to_merge.items() if v is not None}
647 values.update(clean_props)
649 if self.node_type == "$el" and not values.get("$el"):
650 values["$el"] = "span"
651 elif self.node_type == "$formkit" and not values.get("$formkit"):
652 values["$formkit"] = "text"
654 return {k: v for k, v in values.items() if v != ""}
656 def get_ancestors(self) -> list["FormKitSchemaNode"]:
657 """
658 Return a list of ancestor nodes by following the nodechildren_set relationship upwards.
659 Follows the first parent found for each node.
660 """
661 ancestors: list[FormKitSchemaNode] = []
662 current = self
663 while True:
664 # nodechildren_set contains objects where current is the child
665 nc = current.nodechildren_set.first()
666 if not nc:
667 break
668 current = nc.parent
669 if current in ancestors: # Avoid infinite cycles
670 break
671 ancestors.insert(0, current)
672 if len(ancestors) > 20: # Safety limit
673 break
674 return ancestors
676 def get_node_path(self, recursive=True) -> list[formkit_schema.Node | str]:
677 """
678 Return a list of Pydantic nodes representing the path from the root to this node.
679 """
680 ancestors = self.get_ancestors()
681 return [a.get_node(recursive=False) for a in ancestors] + [self.get_node(recursive=recursive)] # type: ignore[return-value]
683 def get_node(self, recursive=False, options=False, **kwargs) -> formkit_schema.Node | str:
684 """
685 Return a "decorated" node instance
686 with restored options and translated fields
687 """
688 if self.text_content or self.node_type == "text":
689 return self.text_content or ""
690 if self.node == {} or self.node is None:
691 if self.node_type == "$el":
692 node_content_dict: dict[str, Any] = {"$el": "span"}
693 elif self.node_type == "$formkit":
694 node_content_dict = {"$formkit": "text"}
695 else:
696 node_content_dict = {}
697 else:
698 node_content_dict = self.get_node_values(**kwargs, recursive=recursive, options=options) # type: ignore[assignment]
700 formkit_node = formkit_schema.FormKitNode.parse_obj(node_content_dict, recursive=recursive)
701 return formkit_node.__root__
703 @classmethod
704 def from_pydantic( # noqa: C901
705 cls, input_models: formkit_schema.FormKitSchemaProps | Iterable[formkit_schema.FormKitSchemaProps]
706 ) -> Iterable["FormKitSchemaNode"]:
707 if isinstance(input_models, str):
708 yield cls.objects.create(node_type="text", label=input_models, text_content=input_models)
710 elif isinstance(input_models, Iterable) and not isinstance(input_models, formkit_schema.FormKitSchemaProps):
711 for n in input_models:
712 yield from cls.from_pydantic(n)
714 elif isinstance(input_models, formkit_schema.FormKitSchemaProps):
715 input_model = input_models
716 instance = cls()
717 log(f"[green]Creating {instance}")
718 for label_field in ("name", "id", "key", "label"):
719 if label := getattr(input_model, label_field, None):
720 instance.label = label
721 break
723 # Node types
724 if props := getattr(input_model, "additional_props", None):
725 instance.additional_props = props
727 if (icon := getattr(input_model, "icon", None)) is not None:
728 instance.icon = icon
729 if (title := getattr(input_model, "title", None)) is not None:
730 instance.title = title
731 if (readonly := getattr(input_model, "readonly", None)) is not None:
732 instance.readonly = readonly
733 if (sections_schema := getattr(input_model, "sectionsSchema", None)) is not None:
734 instance.sections_schema = sections_schema
735 if (min_val := getattr(input_model, "min", None)) is not None:
736 instance.min = str(min_val)
737 if (max_val := getattr(input_model, "max", None)) is not None:
738 instance.max = str(max_val)
739 if (step := getattr(input_model, "step", None)) is not None:
740 instance.step = str(step)
741 if (add_label := getattr(input_model, "addLabel", None)) is not None:
742 instance.add_label = add_label
743 if (up_control := getattr(input_model, "upControl", None)) is not None:
744 instance.up_control = up_control
745 if (down_control := getattr(input_model, "downControl", None)) is not None:
746 instance.down_control = down_control
748 # Code Generation Fields
749 if (django_field_type := getattr(input_model, "django_field_type", None)) is not None:
750 instance.django_field_type = django_field_type
751 if (django_field_args := getattr(input_model, "django_field_args", None)) is not None:
752 instance.django_field_args = django_field_args
753 if (django_field_positional_args := getattr(input_model, "django_field_positional_args", None)) is not None:
754 instance.django_field_positional_args = django_field_positional_args
755 if (pydantic_field_type := getattr(input_model, "pydantic_field_type", None)) is not None:
756 instance.pydantic_field_type = pydantic_field_type
757 if (extra_imports := getattr(input_model, "extra_imports", None)) is not None:
758 instance.extra_imports = extra_imports
759 if (validators := getattr(input_model, "validators", None)) is not None:
760 instance.validators = validators
761 if (list_filter := getattr(input_model, "list_filter", None)) is not None:
762 instance.list_filter = list_filter
764 # Fields that are valid Pydantic fields but not promoted to columns must be saved in additional_props
765 # otherwise they are lost.
766 extra_fields = [
767 "max",
768 "rows",
769 "cols",
770 "prefixIcon",
771 "classes",
772 "value",
773 "suffixIcon",
774 "validationRules",
775 "maxLength",
776 "itemClass",
777 "itemsClass",
778 "_minDateSource",
779 "_maxDateSource",
780 "disabledDays",
781 ]
782 # Ensure additional_props is a dict
783 if instance.additional_props is None:
784 instance.additional_props = {}
785 elif not isinstance(instance.additional_props, dict):
786 # Should not happen but safety first
787 instance.additional_props = {}
789 for field in extra_fields:
790 if (val := getattr(input_model, field, None)) is not None:
791 # 'max' is now a model field, so don't put it in additional_props
792 if field == "max":
793 continue
794 instance.additional_props[field] = val
796 try:
797 node_type = getattr(input_model, "node_type")
798 except Exception as E:
799 raise E
800 if node_type == "condition":
801 instance.node_type = "condition"
802 elif node_type == "formkit":
803 instance.node_type = "$formkit"
804 elif node_type == "element":
805 instance.node_type = "$el"
806 elif node_type == "component":
807 instance.node_type = "$cmp"
809 log(f"[green]Yielding: {instance}")
811 # Must save the instance before adding "options" or "children"
812 instance.node = input_model.dict(
813 exclude={
814 "options",
815 "children",
816 "additional_props",
817 "node_type",
818 "list_filter",
819 },
820 exclude_none=True,
821 exclude_unset=True,
822 )
823 # Where an alias is used ("el", ) restore it to the expected value
824 # of a FormKit schema node
825 for pydantic_key, db_key in (("el", "$el"), ("formkit", "$formkit")):
826 if db_value := instance.node.pop(pydantic_key, None):
827 instance.node[db_key] = db_value
829 instance.save()
830 # Add the "options" if it is a 'text' type getter
831 options: formkit_schema.OptionsType = getattr(input_model, "options", None)
833 if isinstance(options, str):
834 # Maintain this as it is probably a `$get...` options call
835 # to a Javascript function
836 instance.node["options"] = options
837 instance.save()
839 elif isinstance(options, Iterable):
840 # Create a new "group" to assign these options to
841 # Here we use a random UUID as the group name
842 instance.option_group = OptionGroup.objects.create(group=f"Auto generated group for {str(instance)} {uuid.uuid4().hex[0:8]}")
843 for option in Option.from_pydantic(options, group=instance.option_group): # type: ignore[arg-type]
844 pass
845 instance.save()
847 for c_n in getattr(input_model, "children", []) or []:
848 child_node = next(iter(cls.from_pydantic(c_n)))
849 console.log(f" {child_node}")
850 instance.children.add(child_node)
852 yield instance
854 else:
855 raise TypeError(f"Expected FormKitNode or Iterable[FormKitNode], got {type(input_models)}")
857 def to_pydantic(self, recursive=False, options=False, **kwargs):
858 if self.text_content:
859 return self.text_content
860 return formkit_schema.FormKitNode.parse_obj(self.get_node_values(recursive=recursive, options=options, **kwargs))
863class SchemaManager(models.Manager):
864 """
865 Provides prefetching which we'll almost always want to have
866 """
868 def get_queryset(self):
869 return super().get_queryset().prefetch_related("nodes", "nodes__children")
872class FormKitSchema(UuidIdModel):
873 """
874 This represents a "FormKitSchema" which is an heterogenous
875 collection of items.
876 """
878 label = models.CharField(
879 max_length=1024,
880 null=True,
881 blank=True,
882 help_text="Used as a human-readable label",
883 unique=True,
884 default=uuid.uuid4,
885 )
886 nodes = models.ManyToManyField(FormKitSchemaNode, through=FormComponents)
887 objects = SchemaManager()
889 def get_schema_values(self, recursive=False, options=False, **kwargs):
890 """
891 Return a list of "node" dicts
892 """
893 nodes: Iterable[FormKitSchemaNode] = self.nodes.order_by("formcomponents__order")
894 for node in nodes:
895 yield node.get_node_values(recursive=recursive, options=options, **kwargs)
897 def to_pydantic(self):
898 values = list(self.get_schema_values())
899 return formkit_schema.FormKitSchema.parse_obj(values)
901 def __str__(self) -> str:
902 return self.label or short_uuid(self.id)
904 def save(self, *args, **kwargs):
905 super().save(*args, **kwargs)
907 @classmethod
908 def from_pydantic(cls, input_model: formkit_schema.FormKitSchema, label: str | None = None) -> "FormKitSchema":
909 """
910 Converts a given Pydantic representation of a Schema
911 to Django database fields
912 """
913 from formkit_ninja.services.schema_import import SchemaImportService
915 return SchemaImportService.import_schema(input_model, label=label)
917 @classmethod
918 def from_json(cls, input_file: dict):
919 """
920 Converts a given JSON string to a suitable
921 Django representation
922 """
923 schema_instance = formkit_schema.FormKitSchema.parse_obj(input_file)
924 return cls.from_pydantic(schema_instance)
927class SchemaLabel(models.Model):
928 """
929 This intended to hold translations of Partisipa schema definitions.
930 The title.
931 """
933 schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
934 label = models.CharField(max_length=1024)
935 lang = models.CharField(max_length=4, default="en", choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese")))
937 def __str__(self) -> str:
938 return f"{self.label} ({self.lang})" if self.label else f"schema={self.schema_id} lang={self.lang}"
941class SchemaDescription(models.Model):
942 """
943 This intended to hold translations of Partisipa schema definitions.
944 The description.
945 """
947 schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
948 label = models.CharField(max_length=1024)
949 lang = models.CharField(max_length=4, default="en", choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese")))
951 def __str__(self) -> str:
952 return f"{self.label} ({self.lang})" if self.label else f"schema={self.schema_id} lang={self.lang}"
955# Import submission models to register them with the app