Coverage for formkit_ninja / models.py: 22.80%
506 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-06 04:12 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-03-06 04:12 +0000
1from __future__ import annotations
3import logging
4import uuid
5import warnings
6from keyword import iskeyword, issoftkeyword
7from typing import Any, Iterable, TypedDict, get_args
9import pghistory
10import pgtrigger
11from django.conf import settings
12from django.contrib.contenttypes.models import ContentType
13from django.contrib.postgres.aggregates import ArrayAgg
14from django.core.exceptions import ValidationError
15from django.db import models, transaction
16from django.db.models import Q
17from django.db.models.aggregates import Max
18from django.db.models.functions import Greatest
19from django.utils import timezone
20from rich.console import Console
22from formkit_ninja import formkit_schema, triggers
24# Re export "form_submission" models
25from formkit_ninja.code_generation_config import CodeGenerationConfig # noqa: F401
26from formkit_ninja.form_submission.models import (
27 SeparatedSubmission, # noqa: F401
28 Submission, # noqa: F401
29 SubmissionField, # noqa: F401
30 SubmissionFile, # noqa: F401
31)
32from formkit_ninja.utils import short_uuid
34console = Console()
35log = console.log
37logger = logging.getLogger()
40def check_valid_django_id(key: str):
41 if not key:
42 raise ValidationError("Name cannot be empty")
43 if key[0].isdigit():
44 raise ValidationError(f"{key} is not valid, it cannot start with a digit")
45 if not key.isidentifier() or iskeyword(key) or issoftkeyword(key):
46 raise ValidationError(f"{key} cannot be used as a keyword. Should be a valid python identifier")
47 if key[-1] == "_":
48 raise ValidationError(f"{key} is not valid, it cannot end with an underscore")
51class UuidIdModel(models.Model):
52 """
53 Consistently use fields which will
54 help with syncing data:
55 - UUID field is the ID
56 - Created field
57 - Last Modified field
58 - updated_by (optional)
59 - created_by (optional)
60 """
62 class Meta:
63 abstract = True
65 id = models.UUIDField(primary_key=True, default=uuid.uuid4, editable=False, unique=True)
66 created = models.DateTimeField(default=timezone.now, blank=True, null=True)
67 updated = models.DateTimeField(auto_now=True, blank=True, null=True)
68 created_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT, related_name="+", blank=True, null=True)
69 updated_by = models.ForeignKey(settings.AUTH_USER_MODEL, on_delete=models.PROTECT, related_name="+", blank=True, null=True)
72class OptionDict(TypedDict):
73 value: str
74 label: str
77class OptionGroup(models.Model):
78 """
79 This intended to be a "collection" of choices
80 For instance all the values in a single PNDS zTable
81 Also intended to allow users to add / modify their __own__ 'Options'
82 for idb and formkit to recognize
83 """
85 group = models.CharField(max_length=1024, primary_key=True, help_text="The label to use for these options")
86 content_type = models.ForeignKey(
87 ContentType,
88 on_delete=models.PROTECT,
89 null=True,
90 blank=True,
91 help_text=("This is an optional reference to the original source object for this set of options (typically a table from which we copy options)"),
92 )
94 # If the object is a "Content Type" we expect it to have a similar layout to this
96 def save(self, *args, **kwargs):
97 # Prior to save ensure that content_type, if present, fits suitable schema
98 if self.content_type:
99 klass = self.content_type.model_class()
100 try:
101 if klass._meta.get_field("value") is None or not hasattr(klass, "label_set"):
102 raise ValueError(f"Expected {klass} to have a 'value' field and a 'label_set' attribute")
103 except Exception as E:
104 raise ValueError(f"Expected {klass} to have a 'value' field and a 'label_set' attribute") from E
105 return super().save(*args, **kwargs)
107 def __str__(self):
108 return f"{self.group}"
110 @classmethod
111 def copy_table(cls, model: type[models.Model], field: str, language: str | None = "en", group_name: str | None = None):
112 """
113 Copy an existing table of options into this OptionGroup
114 """
116 with transaction.atomic():
117 group_obj, group_created = cls.objects.get_or_create(group=group_name, content_type=ContentType.objects.get_for_model(model))
118 log(group_obj)
120 from typing import Any, cast
122 for obj in cast(Any, model).objects.values("pk", field):
123 option, option_created = Option.objects.get_or_create(
124 object_id=obj["pk"],
125 group=group_obj,
126 value=obj["pk"],
127 )
128 OptionLabel.objects.get_or_create(option=option, label=obj[field] or "", lang=language)
131class OptionQuerySet(models.Manager):
132 """
133 Prefetched "labels" for performance
134 """
136 def get_queryset(self):
137 """
138 Added a prefetch_related to the queryset
139 """
140 lang_codes = (n[0] for n in settings.LANGUAGES)
142 label_model = OptionLabel
143 annotated_fields = {f"label_{lang}": label_model.objects.filter(lang=lang, option=models.OuterRef("pk")) for lang in lang_codes}
144 annotated_fields_subquery = {field: models.Subquery(query.values("label")[:1], output_field=models.CharField()) for field, query in annotated_fields.items()}
145 return super().get_queryset().annotate(**annotated_fields_subquery)
148class Option(UuidIdModel):
149 """
150 This is a key/value field representing one "option" for a FormKit property
151 The translated values for this option are in the `Translatable` table
152 """
154 object_id = models.IntegerField(
155 null=True,
156 blank=True,
157 help_text=("This is a reference to the primary key of the original source object (typically a PNDS ztable ID) or a user-specified ID for a new group"),
158 )
159 last_updated = models.DateTimeField(auto_now=True)
160 group = models.ForeignKey(OptionGroup, on_delete=models.CASCADE, null=True, blank=True)
161 # is_active = models.BooleanField(default=True)
162 order = models.IntegerField(null=True, blank=True)
164 class Meta:
165 triggers = triggers.update_or_insert_group_trigger("group_id")
166 constraints = [models.UniqueConstraint(fields=["group", "object_id"], name="unique_option_id")]
167 ordering = (
168 "group",
169 "order",
170 )
172 value = models.CharField(max_length=1024)
173 order_with_respect_to = "group"
175 objects = OptionQuerySet()
177 @classmethod
178 def from_pydantic(
179 cls,
180 options: list[str | OptionDict],
181 group: OptionGroup | None = None,
182 ) -> Iterable["Option"]:
183 """
184 Yields "Options" in the database based on the input given
185 """
186 from formkit_ninja.services.schema_import import SchemaImportService
188 yield from SchemaImportService.import_options(options, group=group)
190 def __str__(self) -> str:
191 # Use group_id (stored on row; OptionGroup.pk is the group name) to avoid N+1.
192 if self.group_id:
193 return f"{self.group_id}::{self.value}"
194 return f"No group: {self.value}"
197class OptionLabel(models.Model):
198 option = models.ForeignKey("Option", on_delete=models.CASCADE)
199 label = models.CharField(max_length=1024)
200 lang = models.CharField(max_length=4, default="en", choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese")))
202 def save(self, *args, **kwargs):
203 """
204 When saved, save also my "option" so that its last_updated is set
205 """
206 if self.option is not None:
207 self.option.save()
208 return super().save(*args, **kwargs)
210 class Meta:
211 constraints = [models.UniqueConstraint(fields=["option", "lang"], name="unique_option_label")]
213 def __str__(self) -> str:
214 # Use only local fields to avoid N+1 when listing (e.g. in admin).
215 return f"{self.label} ({self.lang})" if self.label else f"option={self.option_id} lang={self.lang}"
218class FormComponents(UuidIdModel):
219 """
220 A model relating "nodes" of a schema to a schema with model ordering
221 """
223 schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
224 # This is null=True so that a new FormComponent can be added from the admin inline
225 node = models.ForeignKey("FormKitSchemaNode", on_delete=models.CASCADE, null=True, blank=True)
226 label = models.CharField(max_length=1024, help_text="Used as a human-readable label", null=True, blank=True)
227 order = models.IntegerField(null=True, blank=True)
228 order_with_respect_to = "schema"
230 class Meta:
231 triggers = triggers.update_or_insert_group_trigger("schema_id")
232 ordering = ("schema", "order")
234 def __str__(self) -> str:
235 # Use node_id/schema_id (stored on row) to avoid N+1 when listing FormComponents.
236 return f"node={self.node_id}[{self.order}]: schema={self.schema_id}"
239class NodeChildrenManager(models.Manager):
240 """
241 Adds aggregation and filtering for client side data
242 of NodeChildren relations
243 """
245 def aggregate_changes_table(self, latest_change: int | None = None):
246 values = (
247 self.get_queryset()
248 .values("parent_id")
249 .annotate(
250 children=ArrayAgg("child", ordering="order"),
251 )
252 .annotate(Max("child__track_change"))
253 .annotate(latest_change=Greatest("child__track_change__max", "parent__track_change"))
254 )
255 if latest_change:
256 values = values.filter(Q(latest_change__gt=latest_change) | Q(parent__latest_change__gt=latest_change))
257 return values.values_list("parent_id", "latest_change", "children", named=True)
260class NodeChildren(models.Model):
261 """
262 This is an ordered m2m model representing
263 the "children" of an HTML element
264 """
266 parent = models.ForeignKey(
267 "FormKitSchemaNode",
268 on_delete=models.CASCADE,
269 related_name="parent",
270 )
271 child = models.ForeignKey("FormKitSchemaNode", on_delete=models.CASCADE)
272 order = models.IntegerField(null=True, blank=True)
273 track_change = models.BigIntegerField(null=True, blank=True)
274 order_with_respect_to = "parent"
276 class Meta:
277 triggers = [
278 *triggers.update_or_insert_group_trigger("parent_id"),
279 triggers.bump_sequence_value(sequence_name=triggers.NODE_CHILDREN_CHANGE_ID),
280 ]
281 ordering = (
282 "parent_id",
283 "order",
284 )
286 objects = NodeChildrenManager()
288 def __str__(self) -> str:
289 # Use parent_id/child_id (stored on row) to avoid N+1.
290 return f"parent={self.parent_id} → child={self.child_id} order={self.order}"
293class NodeQS(models.QuerySet):
294 def from_change(self, track_change: int = -1):
295 return self.filter(track_change__gt=track_change)
297 def to_response(self, ignore_errors: bool = True, options: bool = True) -> Iterable[tuple[uuid.UUID, int | None, formkit_schema.Node | str | None, bool]]:
298 """
299 Return a set of FormKit nodes
300 """
301 node: FormKitSchemaNode
302 for node in self.all():
303 try:
304 if node.is_active:
305 yield node.id, node.track_change, node.get_node(recursive=False, options=options), node.protected
306 else:
307 yield node.id, node.track_change, None, node.protected
308 except Exception as E:
309 if not ignore_errors:
310 raise
311 warnings.warn(f"An unparseable FormKit node was hit at {node.pk}")
312 warnings.warn(f"{E}")
315@pghistory.track()
316@pgtrigger.register(
317 pgtrigger.Protect(
318 # If the node is protected, delete is not allowed
319 name="protect_node_deletes_and_updates",
320 operation=pgtrigger.Delete,
321 condition=pgtrigger.Q(old__protected=True),
322 ),
323 pgtrigger.Protect(
324 # If both new and old values are "protected", updates are not allowed
325 name="protect_node_updates",
326 operation=pgtrigger.Update,
327 condition=pgtrigger.Q(old__protected=True) & pgtrigger.Q(new__protected=True),
328 ),
329 pgtrigger.SoftDelete(name="soft_delete", field="is_active"),
330 triggers.bump_sequence_value("track_change", triggers.NODE_CHANGE_ID),
331)
332class FormKitSchemaNode(UuidIdModel):
333 """
334 This represents a single "Node" in a FormKit schema.
335 There are several different types of node which may be defined:
336 FormKitSchemaDOMNode
337 | FormKitSchemaComponent
338 | FormKitSchemaTextNode
339 | FormKitSchemaCondition
340 | FormKitSchemaFormKit
341 """
343 objects = NodeQS.as_manager()
345 NODE_TYPE_CHOICES = (
346 ("$cmp", "Component"), # Not yet implemented
347 ("text", "Text"),
348 ("condition", "Condition"), # Not yet implemented
349 ("$formkit", "FormKit"),
350 ("$el", "Element"),
351 ("raw", "Raw JSON"), # Not yet implemented
352 )
353 FORMKIT_CHOICES = [(t, t) for t in get_args(formkit_schema.FORMKIT_TYPE)]
355 ELEMENT_TYPE_CHOICES = [("p", "p"), ("h1", "h1"), ("h2", "h2"), ("span", "span")]
356 node_type = models.CharField(max_length=256, choices=NODE_TYPE_CHOICES, blank=True, help_text="")
357 description = models.CharField(
358 max_length=4000,
359 null=True,
360 blank=True,
361 help_text="Decribe the type of data / reason for this component",
362 )
363 label = models.CharField(max_length=1024, help_text="Used as a human-readable label", null=True, blank=True)
364 option_group = models.ForeignKey(OptionGroup, null=True, blank=True, on_delete=models.PROTECT)
365 children = models.ManyToManyField("self", through=NodeChildren, symmetrical=False, blank=True)
366 is_active = models.BooleanField(default=True)
367 protected = models.BooleanField(default=False)
369 node = models.JSONField(
370 null=True,
371 blank=True,
372 help_text="A JSON representation of select parts of the FormKit schema",
373 )
375 additional_props = models.JSONField(
376 null=True,
377 blank=True,
378 help_text="User space for additional, less used props",
379 )
380 icon = models.CharField(max_length=256, null=True, blank=True)
381 title = models.CharField(max_length=1024, null=True, blank=True)
382 readonly = models.BooleanField(default=False)
383 sections_schema = models.JSONField(null=True, blank=True, help_text="Schema for the sections")
384 min = models.CharField(max_length=256, null=True, blank=True)
385 max = models.CharField(max_length=256, null=True, blank=True)
386 step = models.CharField(max_length=256, null=True, blank=True)
387 add_label = models.CharField(max_length=1024, null=True, blank=True)
388 up_control = models.BooleanField(default=True)
389 down_control = models.BooleanField(default=True)
391 # Code Generation Source of Truth
392 django_field_type = models.CharField(
393 max_length=100,
394 null=True,
395 blank=True,
396 help_text="The Django Model Field class to use (e.g., 'CharField', 'IntegerField', 'ForeignKey'). Providing this makes this field the primary source of truth for code generation.",
397 )
398 django_field_args = models.JSONField(
399 default=dict,
400 blank=True,
401 help_text="Arguments passed to the Django field as a JSON dictionary. "
402 "Example: {'null': true, 'blank': true, 'max_length': 255}. "
403 "For ForeignKeys, include the model name: {'to': 'auth.User', 'on_delete': 'models.CASCADE'}.",
404 )
405 django_field_positional_args = models.JSONField(
406 default=list,
407 blank=True,
408 help_text="Positional arguments passed to the Django field as a JSON list. Example: ['auth.User'].",
409 )
410 pydantic_field_type = models.CharField(
411 max_length=100,
412 null=True,
413 blank=True,
414 help_text="The Python/Pydantic type for this field (e.g., 'str', 'int', 'Decimal', 'UUID', 'date').",
415 )
416 extra_imports = models.JSONField(
417 default=list,
418 blank=True,
419 help_text="A list of additional Python import statements required by this field. Example: ['from decimal import Decimal', 'from django.core.validators import MinValueValidator'].",
420 )
421 validators = models.JSONField(
422 default=list,
423 blank=True,
424 help_text="A list of Django/Pydantic validator strings to be applied to this field. Example: ['MinValueValidator(0)', 'validate_v_date'].",
425 )
426 list_filter = models.BooleanField(
427 default=False,
428 help_text="Include this field in generated ModelAdmin.list_filter.",
429 )
431 text_content = models.TextField(null=True, blank=True, help_text="Content for a text element, for children of an $el type component")
432 track_change = models.BigIntegerField(null=True, blank=True)
434 @property
435 def formkit(self):
436 return self.node.get("$formkit") if isinstance(self.node, dict) else None
438 @property
439 def name(self):
440 return self.node.get("name") if isinstance(self.node, dict) else None
442 def __str__(self):
443 return f"Node: {self.label}" if self.label else f"{self.node_type} {self.id}"
445 def save(self, *args, **kwargs):
446 """
447 On save validate the 'node' field matches the 'FormKitNode'
448 """
449 # rename `formkit` to `$formkit`
450 if isinstance(self.node, dict) and "formkit" in self.node:
451 self.node.update({"$formkit": self.node.pop("formkit")})
452 # We're also going to verify that the 'key' is a valid identifier
453 # Keep in mind that the `key` may be used as part of a model so
454 # should be valid Django fieldname too
455 if isinstance(self.node, dict) and self.node_type in ("$formkit", "$el"):
456 if key := self.node.get("name"):
457 check_valid_django_id(key)
459 # Auto-promote common props from both 'additional_props' and 'node'
460 for source in (self.additional_props, self.node):
461 if not isinstance(source, dict):
462 continue
463 for field in (
464 "icon",
465 "title",
466 "readonly",
467 "sectionsSchema",
468 "min",
469 "max",
470 "step",
471 "addLabel",
472 "upControl",
473 "downControl",
474 ):
475 if field in source:
476 if field == "sectionsSchema":
477 target_field = "sections_schema"
478 elif field == "addLabel":
479 target_field = "add_label"
480 elif field == "upControl":
481 target_field = "up_control"
482 elif field == "downControl":
483 target_field = "down_control"
484 else:
485 target_field = field
487 val = source.get(field)
488 if field in ("min", "max", "step") and val is not None:
489 val = str(val)
490 setattr(self, target_field, val)
492 # Sync promoted columns back into node so stored JSON stays in sync when
493 # admin edits model fields (e.g. add_label, up_control) rather than node.
494 # Only write non-default values (or when key already in node) to avoid
495 # adding keys that weren't in the original schema (round-trip fidelity).
496 if isinstance(self.node, dict):
497 _promoted_to_node = (
498 ("icon", "icon", None),
499 ("title", "title", None),
500 ("readonly", "readonly", False),
501 ("sections_schema", "sectionsSchema", None),
502 ("min", "min", None),
503 ("max", "max", None),
504 ("step", "step", None),
505 ("add_label", "addLabel", None),
506 ("up_control", "upControl", True),
507 ("down_control", "downControl", True),
508 )
509 for attr, key, default in _promoted_to_node:
510 val = getattr(self, attr, None)
511 already_in_node = key in self.node
512 if isinstance(val, bool):
513 if already_in_node or val != default:
514 self.node[key] = val
515 elif key in self.node:
516 self.node.pop(key, None)
517 elif val not in (None, ""):
518 self.node[key] = val
519 elif already_in_node:
520 self.node.pop(key, None)
522 # Resolve code generation defaults if not set
523 self.resolve_code_generation_defaults()
525 return super().save(*args, **kwargs)
527 def resolve_code_generation_defaults(self, force=False):
528 """
529 Populate code generation fields from CodeGenerationConfig and settings
530 if they are not already set.
531 """
532 # We need a node structure to match against
533 node = self.get_node(recursive=False)
534 if isinstance(node, str):
535 # Text nodes don't typically generate fields themselves
536 return
538 from formkit_ninja.parser.database_node_path import DatabaseNodePath
540 # Create a transient DatabaseNodePath to leverage its resolution logic
541 path = DatabaseNodePath(node)
543 if force or not self.django_field_type:
544 self.django_field_type = path.to_django_type()
546 if force or not self.django_field_args or not self.django_field_positional_args:
547 # We want the dict/list, not the string
548 # DatabaseNodePath uses _get_config and _get_from_settings
549 config = path._get_config()
550 if config:
551 if force or not self.django_field_args:
552 self.django_field_args = config.django_args
553 if force or not self.django_field_positional_args:
554 self.django_field_positional_args = config.django_positional_args
555 else:
556 if force or not self.django_field_args:
557 settings_args = path._get_from_settings("django_args")
558 if isinstance(settings_args, dict):
559 self.django_field_args = settings_args
560 if force or not self.django_field_positional_args:
561 settings_pos_args = path._get_from_settings("django_positional_args")
562 if isinstance(settings_pos_args, list):
563 self.django_field_positional_args = settings_pos_args
565 if force or not self.pydantic_field_type:
566 self.pydantic_field_type = path.to_pydantic_type()
568 if force or not self.extra_imports:
569 self.extra_imports = path.get_extra_imports()
571 if force or not self.validators:
572 self.validators = path.get_validators()
574 @property
575 def node_options(self) -> str | list[dict] | None:
576 """
577 Because "options" are translated and
578 separately stored, this step is necessary to
579 reinstate them
580 """
581 if self.node and (opts := self.node.get("options")):
582 return opts
584 if not self.option_group:
585 return None
586 options = self.option_group.option_set.all().prefetch_related("optionlabel_set")
587 # options: Iterable[Option] = self.option_set.all().prefetch_related("optionlabel_set")
588 # TODO: This is horribly slow
589 return [
590 {
591 "value": option.value,
592 "label": f"{label_obj.label if (label_obj := option.optionlabel_set.first()) else ''}",
593 }
594 for option in options
595 ]
597 def get_node_values(self, recursive: bool = True, options: bool = True) -> str | dict:
598 """
599 Reify a 'dict' instance suitable for creating
600 a FormKit Schema node from
601 """
602 # Text element
603 if not self.node:
604 if self.text_content:
605 return self.text_content
606 return {}
607 values = {**self.node}
609 # Options may come from a string in the node, or
610 # may come from an m2m
611 if options and self.node_options:
612 values["options"] = self.node_options
613 if recursive:
614 children = [c.get_node_values() for c in self.children.order_by("nodechildren__order")]
615 if children:
616 values["children"] = children
617 if self.icon:
618 values["icon"] = self.icon
619 if self.title:
620 values["title"] = self.title
621 if self.readonly:
622 values["readonly"] = self.readonly
623 if self.sections_schema:
624 values["sectionsSchema"] = self.sections_schema
625 if self.min:
626 try:
627 values["min"] = int(self.min)
628 except ValueError:
629 values["min"] = self.min
630 if self.max:
631 try:
632 values["max"] = int(self.max)
633 except ValueError:
634 values["max"] = self.max
635 if self.step:
636 try:
637 val = float(self.step)
638 if val.is_integer():
639 values["step"] = int(val)
640 else:
641 values["step"] = str(val) # Keep as string if float to avoid precision issues
642 values["step"] = self.step
643 except ValueError:
644 values["step"] = self.step
645 if self.add_label:
646 values["addLabel"] = self.add_label
647 if not self.up_control: # Only write if false? Or always? Defaults are True.
648 values["upControl"] = self.up_control
649 if not self.down_control:
650 values["downControl"] = self.down_control
652 # Code Generation fields
653 if self.django_field_type:
654 values["django_field_type"] = self.django_field_type
655 if self.django_field_args:
656 values["django_field_args"] = self.django_field_args
657 if self.django_field_positional_args:
658 values["django_field_positional_args"] = self.django_field_positional_args
659 if self.pydantic_field_type:
660 values["pydantic_field_type"] = self.pydantic_field_type
661 if self.extra_imports:
662 values["extra_imports"] = self.extra_imports
663 if self.validators:
664 values["validators"] = self.validators
665 if self.list_filter:
666 values["list_filter"] = self.list_filter
668 # Merge additional_props into the top level and ensure it's removed as a separate key
669 values.pop("additional_props", None)
670 if self.additional_props and len(self.additional_props) > 0:
671 # Handle nested additional_props structure
672 props_to_merge = self.additional_props
673 if "additional_props" in props_to_merge:
674 props_to_merge = props_to_merge["additional_props"]
675 # Filter out None values to prevent Pydantic validation errors
676 clean_props = {k: v for k, v in props_to_merge.items() if v is not None}
677 values.update(clean_props)
679 if self.node_type == "$el" and not values.get("$el"):
680 values["$el"] = "span"
681 elif self.node_type == "$formkit" and not values.get("$formkit"):
682 values["$formkit"] = "text"
684 return {k: v for k, v in values.items() if v != ""}
686 def get_ancestors(self) -> list["FormKitSchemaNode"]:
687 """
688 Return a list of ancestor nodes by following the nodechildren_set relationship upwards.
689 Follows the first parent found for each node.
690 """
691 ancestors: list[FormKitSchemaNode] = []
692 current = self
693 while True:
694 # nodechildren_set contains objects where current is the child
695 nc = current.nodechildren_set.first()
696 if not nc:
697 break
698 current = nc.parent
699 if current in ancestors: # Avoid infinite cycles
700 break
701 ancestors.insert(0, current)
702 if len(ancestors) > 20: # Safety limit
703 break
704 return ancestors
706 def get_node_path(self, recursive=True) -> list[formkit_schema.Node | str]:
707 """
708 Return a list of Pydantic nodes representing the path from the root to this node.
709 """
710 ancestors = self.get_ancestors()
711 return [a.get_node(recursive=False) for a in ancestors] + [self.get_node(recursive=recursive)] # type: ignore[return-value]
713 def get_node(self, recursive=False, options=False, **kwargs) -> formkit_schema.Node | str:
714 """
715 Return a "decorated" node instance
716 with restored options and translated fields
717 """
718 if self.text_content or self.node_type == "text":
719 return self.text_content or ""
720 if self.node == {} or self.node is None:
721 if self.node_type == "$el":
722 node_content_dict: dict[str, Any] = {"$el": "span"}
723 elif self.node_type == "$formkit":
724 node_content_dict = {"$formkit": "text"}
725 else:
726 node_content_dict = {}
727 else:
728 node_content_dict = self.get_node_values(**kwargs, recursive=recursive, options=options) # type: ignore[assignment]
730 formkit_node = formkit_schema.FormKitNode.parse_obj(node_content_dict, recursive=recursive)
731 return formkit_node.__root__
733 @classmethod
734 def from_pydantic( # noqa: C901
735 cls, input_models: formkit_schema.FormKitSchemaProps | Iterable[formkit_schema.FormKitSchemaProps]
736 ) -> Iterable["FormKitSchemaNode"]:
737 if isinstance(input_models, str):
738 yield cls.objects.create(node_type="text", label=input_models, text_content=input_models)
740 elif isinstance(input_models, Iterable) and not isinstance(input_models, formkit_schema.FormKitSchemaProps):
741 for n in input_models:
742 yield from cls.from_pydantic(n)
744 elif isinstance(input_models, formkit_schema.FormKitSchemaProps):
745 input_model = input_models
746 instance = cls()
747 log(f"[green]Creating {instance}")
748 for label_field in ("name", "id", "key", "label"):
749 if label := getattr(input_model, label_field, None):
750 instance.label = label
751 break
753 # Node types
754 if props := getattr(input_model, "additional_props", None):
755 instance.additional_props = props
757 if (icon := getattr(input_model, "icon", None)) is not None:
758 instance.icon = icon
759 if (title := getattr(input_model, "title", None)) is not None:
760 instance.title = title
761 if (readonly := getattr(input_model, "readonly", None)) is not None:
762 instance.readonly = readonly
763 if (sections_schema := getattr(input_model, "sectionsSchema", None)) is not None:
764 instance.sections_schema = sections_schema
765 if (min_val := getattr(input_model, "min", None)) is not None:
766 instance.min = str(min_val)
767 if (max_val := getattr(input_model, "max", None)) is not None:
768 instance.max = str(max_val)
769 if (step := getattr(input_model, "step", None)) is not None:
770 instance.step = str(step)
771 if (add_label := getattr(input_model, "addLabel", None)) is not None:
772 instance.add_label = add_label
773 if (up_control := getattr(input_model, "upControl", None)) is not None:
774 instance.up_control = up_control
775 if (down_control := getattr(input_model, "downControl", None)) is not None:
776 instance.down_control = down_control
778 # Code Generation Fields
779 if (django_field_type := getattr(input_model, "django_field_type", None)) is not None:
780 instance.django_field_type = django_field_type
781 if (django_field_args := getattr(input_model, "django_field_args", None)) is not None:
782 instance.django_field_args = django_field_args
783 if (django_field_positional_args := getattr(input_model, "django_field_positional_args", None)) is not None:
784 instance.django_field_positional_args = django_field_positional_args
785 if (pydantic_field_type := getattr(input_model, "pydantic_field_type", None)) is not None:
786 instance.pydantic_field_type = pydantic_field_type
787 if (extra_imports := getattr(input_model, "extra_imports", None)) is not None:
788 instance.extra_imports = extra_imports
789 if (validators := getattr(input_model, "validators", None)) is not None:
790 instance.validators = validators
791 if (list_filter := getattr(input_model, "list_filter", None)) is not None:
792 instance.list_filter = list_filter
794 # Fields that are valid Pydantic fields but not promoted to columns must be saved in additional_props
795 # otherwise they are lost.
796 extra_fields = [
797 "max",
798 "rows",
799 "cols",
800 "prefixIcon",
801 "classes",
802 "value",
803 "suffixIcon",
804 "validationRules",
805 "maxLength",
806 "itemClass",
807 "itemsClass",
808 "_minDateSource",
809 "_maxDateSource",
810 "disabledDays",
811 ]
812 # Ensure additional_props is a dict
813 if instance.additional_props is None:
814 instance.additional_props = {}
815 elif not isinstance(instance.additional_props, dict):
816 # Should not happen but safety first
817 instance.additional_props = {}
819 for field in extra_fields:
820 if (val := getattr(input_model, field, None)) is not None:
821 # 'max' is now a model field, so don't put it in additional_props
822 if field == "max":
823 continue
824 instance.additional_props[field] = val
826 try:
827 node_type = getattr(input_model, "node_type")
828 except Exception as E:
829 raise E
830 if node_type == "condition":
831 instance.node_type = "condition"
832 elif node_type == "formkit":
833 instance.node_type = "$formkit"
834 elif node_type == "element":
835 instance.node_type = "$el"
836 elif node_type == "component":
837 instance.node_type = "$cmp"
839 log(f"[green]Yielding: {instance}")
841 # Must save the instance before adding "options" or "children"
842 instance.node = input_model.dict(
843 exclude={
844 "options",
845 "children",
846 "additional_props",
847 "node_type",
848 "list_filter",
849 },
850 exclude_none=True,
851 exclude_unset=True,
852 )
853 # Where an alias is used ("el", ) restore it to the expected value
854 # of a FormKit schema node
855 for pydantic_key, db_key in (("el", "$el"), ("formkit", "$formkit")):
856 if db_value := instance.node.pop(pydantic_key, None):
857 instance.node[db_key] = db_value
859 instance.save()
860 # Add the "options" if it is a 'text' type getter
861 options: formkit_schema.OptionsType = getattr(input_model, "options", None)
863 if isinstance(options, str):
864 # Maintain this as it is probably a `$get...` options call
865 # to a Javascript function
866 instance.node["options"] = options
867 instance.save()
869 elif isinstance(options, Iterable):
870 # Create a new "group" to assign these options to
871 # Here we use a random UUID as the group name
872 instance.option_group = OptionGroup.objects.create(group=f"Auto generated group for {str(instance)} {uuid.uuid4().hex[0:8]}")
873 for option in Option.from_pydantic(options, group=instance.option_group): # type: ignore[arg-type]
874 pass
875 instance.save()
877 for c_n in getattr(input_model, "children", []) or []:
878 child_node = next(iter(cls.from_pydantic(c_n)))
879 console.log(f" {child_node}")
880 instance.children.add(child_node)
882 yield instance
884 else:
885 raise TypeError(f"Expected FormKitNode or Iterable[FormKitNode], got {type(input_models)}")
887 def to_pydantic(self, recursive=False, options=False, **kwargs):
888 if self.text_content:
889 return self.text_content
890 return formkit_schema.FormKitNode.parse_obj(self.get_node_values(recursive=recursive, options=options, **kwargs))
893class SchemaManager(models.Manager):
894 """
895 Provides prefetching which we'll almost always want to have
896 """
898 def get_queryset(self):
899 return super().get_queryset().prefetch_related("nodes", "nodes__children")
902class FormKitSchema(UuidIdModel):
903 """
904 This represents a "FormKitSchema" which is an heterogenous
905 collection of items.
906 """
908 label = models.CharField(
909 max_length=1024,
910 null=True,
911 blank=True,
912 help_text="Used as a human-readable label",
913 unique=True,
914 default=uuid.uuid4,
915 )
916 nodes = models.ManyToManyField(FormKitSchemaNode, through=FormComponents)
917 objects = SchemaManager()
919 def get_schema_values(self, recursive=False, options=False, **kwargs):
920 """
921 Return a list of "node" dicts
922 """
923 nodes: Iterable[FormKitSchemaNode] = self.nodes.order_by("formcomponents__order")
924 for node in nodes:
925 yield node.get_node_values(recursive=recursive, options=options, **kwargs)
927 def to_pydantic(self):
928 values = list(self.get_schema_values())
929 return formkit_schema.FormKitSchema.parse_obj(values)
931 def __str__(self) -> str:
932 return self.label or short_uuid(self.id)
934 def save(self, *args, **kwargs):
935 super().save(*args, **kwargs)
937 @classmethod
938 def from_pydantic(cls, input_model: formkit_schema.FormKitSchema, label: str | None = None) -> "FormKitSchema":
939 """
940 Converts a given Pydantic representation of a Schema
941 to Django database fields
942 """
943 from formkit_ninja.services.schema_import import SchemaImportService
945 return SchemaImportService.import_schema(input_model, label=label)
947 @classmethod
948 def from_json(cls, input_file: dict):
949 """
950 Converts a given JSON string to a suitable
951 Django representation
952 """
953 schema_instance = formkit_schema.FormKitSchema.parse_obj(input_file)
954 return cls.from_pydantic(schema_instance)
957class SchemaLabel(models.Model):
958 """
959 This intended to hold translations of Partisipa schema definitions.
960 The title.
961 """
963 schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
964 label = models.CharField(max_length=1024)
965 lang = models.CharField(max_length=4, default="en", choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese")))
967 def __str__(self) -> str:
968 return f"{self.label} ({self.lang})" if self.label else f"schema={self.schema_id} lang={self.lang}"
971class SchemaDescription(models.Model):
972 """
973 This intended to hold translations of Partisipa schema definitions.
974 The description.
975 """
977 schema = models.ForeignKey("FormKitSchema", on_delete=models.CASCADE)
978 label = models.CharField(max_length=1024)
979 lang = models.CharField(max_length=4, default="en", choices=(("en", "English"), ("tet", "Tetum"), ("pt", "Portugese")))
981 def __str__(self) -> str:
982 return f"{self.label} ({self.lang})" if self.label else f"schema={self.schema_id} lang={self.lang}"
985# Import submission models to register them with the app