Coverage for formkit_ninja / api.py: 46%
260 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-23 06:00 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-01-23 06:00 +0000
1import importlib
2import re
3from functools import cached_property
4from http import HTTPStatus
5from importlib.util import find_spec
6from types import ModuleType
7from typing import Sequence, cast
8from uuid import UUID, uuid4
10from django.db import transaction
11from django.db.models import F
12from django.db.models.aggregates import Max
13from django.http import HttpRequest, HttpResponse
14from django.shortcuts import get_object_or_404
15from django.utils.cache import add_never_cache_headers
16from ninja import Field, ModelSchema, Router, Schema
17from pydantic import BaseModel, validator
19from formkit_ninja import formkit_schema, models
21if find_spec("sentry_sdk"): 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true
22 sentry_sdk: ModuleType | None = importlib.import_module("sentry_sdk")
23else:
24 sentry_sdk = None
def sentry_message(message: str):
    """Forward ``message`` to Sentry when the SDK is available; otherwise do nothing."""
    if not sentry_sdk or not hasattr(sentry_sdk, "capture_message"):
        return
    sentry_sdk.capture_message(f"{message}")
# Router that every endpoint in this module registers on; tagged "FormKit".
router = Router(tags=["FormKit"])
# Input schema exposing every field of ``models.FormKitSchema``.
class FormKitSchemaIn(ModelSchema):
    class Config:
        model = models.FormKitSchema
        model_fields = "__all__"
# Serialises the ``lang`` and ``label`` fields of ``models.SchemaLabel``.
class SchemaLabel(ModelSchema):
    class Config:
        model = models.SchemaLabel
        model_fields = ("lang", "label")
# Serialises ``lang`` and ``label`` for schema descriptions.
# NOTE(review): this is bound to ``models.SchemaLabel``, the same model as the
# ``SchemaLabel`` schema. That looks like a copy-paste; confirm whether a
# dedicated description model was intended.
class SchemaDescription(ModelSchema):
    class Config:
        model = models.SchemaLabel
        model_fields = ("lang", "label")
# List representation of a schema: its ``id`` and ``label`` plus the related
# label and description rows.
class FormKitSchemaListOut(ModelSchema):
    schemalabel_set: list[SchemaLabel]
    schemadescription_set: list[SchemaDescription]

    class Config:
        model = models.FormKitSchema
        model_fields = ("id", "label")
# Output schema for ``models.FormComponents``: the model's ``label`` plus the
# UUIDs of the node and schema it refers to.
class FormComponentsOut(ModelSchema):
    node_id: UUID
    schema_id: UUID

    class Config:
        model = models.FormComponents
        model_fields = ("label",)
# Output schema for ``models.NodeChildren``: the ``parent`` field from the
# model, a list of child UUIDs and an optional change counter.
class NodeChildrenOut(ModelSchema):
    children: list[UUID] = []
    latest_change: int | None = None

    class Config:
        model = models.NodeChildren
        model_fields = ("parent",)
# Response item for a node that carries a full ``formkit_schema.Node`` payload.
class NodeReturnType(BaseModel):
    key: UUID
    last_updated: int
    node: formkit_schema.Node
    protected: bool
# Response item for a node without a payload; ``is_active`` defaults to False.
class NodeInactiveType(BaseModel):
    key: UUID
    last_updated: int
    is_active: bool = False
    protected: bool
class NodeStringType(NodeReturnType):
    """
    str | formkit_schema.FormKitNode causes openapi generator to fail
    """

    # Narrow the inherited ``node`` field to a plain string.
    node: str  # type: ignore[assignment]
# Type of the list returned by ``node_queryset_response``.
NodeQSResponse = Sequence[NodeStringType | NodeReturnType | NodeInactiveType]
def node_queryset_response(qs: models.NodeQS) -> NodeQSResponse:
    """
    Convert the rows yielded by ``qs.to_response(ignore_errors=False)`` into
    response models.

    Each row is ``(key, last_updated, node_val, protected)``. A string node
    becomes a ``NodeStringType``, a ``None`` node becomes a
    ``NodeInactiveType`` and anything else becomes a ``NodeReturnType``.
    A ``None`` timestamp is reported as ``-1``.
    """

    def build(key, last_updated, node_val, protected) -> NodeStringType | NodeInactiveType | NodeReturnType:
        stamp = -1 if last_updated is None else last_updated
        if isinstance(node_val, str):
            return NodeStringType(key=key, last_updated=stamp, protected=protected, node=node_val)
        if node_val is None:
            return NodeInactiveType(key=key, last_updated=stamp, protected=protected, is_active=False)
        return NodeReturnType(key=key, last_updated=stamp, protected=protected, node=node_val)  # type: ignore[arg-type]

    return [
        build(key, last_updated, node_val, protected)
        for key, last_updated, node_val, protected in qs.to_response(ignore_errors=False)
    ]
# Output schema for ``models.Option`` rows.
class Option(ModelSchema):
    # Annotation of the model `content_type_model`
    group_name: str
    value: str
    # Per-language labels. Other projects may want to extend this with
    # additional languages.
    label_tet: str | None
    label_en: str | None
    label_pt: str | None
    # Optional marker of the last update; it is linked to a Django pg trigger
    # instance in Partisipa.
    change_id: int | None = None

    class Config:
        model = models.Option
        model_fields = ("value",)
@router.get("list-schemas", response=list[FormKitSchemaListOut])
def get_list_schemas(request):
    # Return every schema. ``FormKitSchemaListOut`` reads ``schemalabel_set`` and
    # ``schemadescription_set`` for each row, so both reverse relations are
    # prefetched to avoid issuing two extra queries per schema.
    return models.FormKitSchema.objects.prefetch_related("schemalabel_set", "schemadescription_set")
@router.get("list-nodes", response=NodeQSResponse, by_alias=True, exclude_none=True)
def get_formkit_nodes(request: HttpRequest, response: HttpResponse, latest_change: int | None = -1):
    """
    Get all of the FormKit nodes in the database
    """
    # A missing (or zero) ``latest_change`` is treated as -1.
    since = latest_change or -1
    manager: models.NodeQS = cast(models.NodeQS, models.FormKitSchemaNode.objects)
    changed = manager.from_change(since)

    # Report the highest ``track_change`` among the returned nodes in the
    # ``latest_change`` response header, falling back to the requested value
    # when the aggregate is None.
    newest = changed.aggregate(_=Max("track_change"))["_"]
    if newest is None:
        newest = since
    response["latest_change"] = newest

    add_never_cache_headers(response)
    return node_queryset_response(changed)
@router.get(
    "list-related-nodes",
    response=list[NodeChildrenOut],
    exclude_defaults=True,
    exclude_none=True,
)
def get_related_nodes(request, response: HttpResponse, latest_change: int | None = -1):
    """
    Get all of the FormKit node relationships in the database
    """
    # Mark the response as never-cacheable before delegating to the manager.
    add_never_cache_headers(response)
    manager: models.NodeChildrenManager = models.NodeChildren.objects
    return manager.aggregate_changes_table(latest_change=latest_change)
@router.get(
    "list-components",
    response=list[FormComponentsOut],
    exclude_defaults=True,
    exclude_none=True,
    by_alias=True,
)
def get_components(request):
    # Return every ``FormComponents`` row.
    return models.FormComponents.objects.all()
@router.get(
    "schema/by-uuid/{schema_id}",
    response=formkit_schema.FormKitSchema,
    exclude_none=True,
    by_alias=True,
)
def get_schemas(request, schema_id: UUID):
    """
    Get a schema based on its UUID
    """
    # 404 when no schema has this id; otherwise return its pydantic form.
    found: models.FormKitSchema = get_object_or_404(models.FormKitSchema.objects, id=schema_id)
    return found.to_pydantic()
@router.get(
    "schema/all",
    response=list[formkit_schema.FormKitSchema],
    exclude_none=True,
    by_alias=True,
)
def get_all_schemas(request):
    """
    Get all schemas
    """
    # Convert each stored schema to its pydantic form.
    return [stored.to_pydantic() for stored in models.FormKitSchema.objects.all()]
@router.get(
    "schema/by-label/{label}",
    response=formkit_schema.FormKitSchema,
    exclude_none=True,
    by_alias=True,
)
def get_schema_by_label(request, label: str):
    """
    Get a schema based on its label
    """
    # 404 when no schema has this label; otherwise return its pydantic form.
    found: models.FormKitSchema = get_object_or_404(models.FormKitSchema.objects, label=label)
    return found.to_pydantic()
@router.get(
    "node/{node_id}",
    response=formkit_schema.FormKitNode,
    exclude_none=True,
    by_alias=True,
)
def get_node(request, node_id: UUID):
    """
    Gets a node based on its UUID
    """
    # 404 when the node does not exist; otherwise return ``get_node()`` of the row.
    stored: models.FormKitSchemaNode = get_object_or_404(models.FormKitSchemaNode.objects, id=node_id)
    return stored.get_node()
@router.get("/options", response=list[Option], exclude_none=True)
def list_options(request: HttpRequest, response: HttpResponse):
    """
    List all available "native" FormKit ninja labels and links
    """
    # ``group_name`` on the response schema is filled from the related group's
    # ``group`` field.
    with_group_name = models.Option.objects.annotate(group_name=F("group__group"))
    return with_group_name
# Error payload: general error messages plus messages keyed by field name.
class FormKitErrors(BaseModel):
    errors: list[str] = []
    field_errors: dict[str, str] = {}
@router.delete("delete", response=NodeInactiveType, exclude_none=True, by_alias=True)
def delete_node(request, node_id: UUID):
    """
    Delete a node based on its UUID
    """
    with transaction.atomic():
        # 404 when the node does not exist.
        target: models.FormKitSchemaNode = get_object_or_404(models.FormKitSchemaNode.objects, id=node_id)
        target.delete()
        # The row is queried again after ``delete()``.
        # NOTE(review): this presumably relies on ``delete()`` being a soft
        # delete; confirm against the model.
        manager: models.NodeQS = cast(models.NodeQS, models.FormKitSchemaNode.objects)
        remaining = node_queryset_response(manager.filter(pk=node_id))
        return remaining[0]
class FormKitNodeIn(Schema):
    """
    Creates a new FormKit text or number node
    We'd like to use `formkit_schema.FormKitSchemaFormKit`
    here but that `discriminated node` stuff makes it hard
    """

    formkit: str = Field(default="text", alias="$formkit")
    label: str | None = None
    key: str | None = None
    name: str | None = None
    placeholder: str | None = None
    help: str | None = None

    # Fields from "number"
    max: int | str | None = None
    min: int | str | None = None
    step: str | None = None

    # Field from dropdown/select/autocomplete/radio/checkbox
    options: str | None = None

    # Repeater-specific properties
    addLabel: str | None = None
    itemClass: str | None = None
    itemsClass: str | None = None
    upControl: bool | None = None
    downControl: bool | None = None

    # Conditional logic
    if_condition: str | None = Field(default=None, alias="if")

    # Validation
    validationRules: str | None = None
    validation: str | list[str] | None = None

    # Field Constraints
    maxLength: int | None = None
    # NOTE(review): pydantic v1 does not turn underscore-prefixed names into
    # model fields, so these two are presumably never populated from the
    # payload. Confirm, and consider a public name with an alias.
    _minDateSource: str | None = None
    _maxDateSource: str | None = None
    disabledDays: str | None = None

    # Used for Creates
    parent_id: UUID | None = None

    # Used for Updates
    uuid: UUID = Field(default_factory=uuid4)

    # Used for "Add Group"
    # This should include an `icon`, `title` and `id` for the second level group
    additional_props: dict[str, str | int] | None = None

    @validator("formkit")
    def validate_formkit_type(cls, v):
        """Validate that the formkit type is a valid FormKit type"""
        from typing import get_args

        valid_types = get_args(formkit_schema.FORMKIT_TYPE)
        if v not in valid_types:
            raise ValueError(f"Invalid FormKit type: {v}. Valid types are: {', '.join(valid_types)}")
        return v

    @cached_property
    def parent(self):
        """
        Resolve ``parent_id`` to a ``(parent, errors)`` pair.

        Returns ``(None, None)`` when no parent was requested,
        ``(None, [message])`` when the parent does not exist or is not a
        group/repeater, and ``(parent, None)`` otherwise.
        """
        if self.parent_id is None:
            return None, None
        try:
            parent = models.FormKitSchemaNode.objects.get(pk=self.parent_id)
        except models.FormKitSchemaNode.DoesNotExist:
            return None, ["The parent node given does not exist"]
        # A parent whose ``node`` is not a dict cannot be a group or repeater;
        # report that as an error instead of failing on ``.get``.
        parent_node = parent.node if isinstance(parent.node, dict) else {}
        if parent_node.get("$formkit") not in {"group", "repeater"}:
            return None, ["The parent node given is not a group or repeater"]
        return parent, None

    @cached_property
    def parent_names(self) -> set[str]:
        """
        Return the names of parent nodes' child nodes.
        The saved child node must not use any of these names.
        """
        parent, _ = self.parent
        if parent is None:
            return set()
        siblings = parent.children
        if self.child:
            # Ensures that names are not "overwritten": the node being saved
            # does not count as a clash with itself.
            siblings = siblings.exclude(pk=self.child.pk)
        return set(siblings.values_list("node__name", flat=True))

    @cached_property
    def child(self):
        """
        Return the stored node for ``uuid``, or a new unsaved node with that
        primary key and an empty ``node`` dict when none exists.
        """
        # The uuid may belong to a node or may be a new value
        try:
            return models.FormKitSchemaNode.objects.get(pk=self.uuid)
        except models.FormKitSchemaNode.DoesNotExist:
            return models.FormKitSchemaNode(pk=self.uuid, node={})

    @cached_property
    def preferred_name(self):
        """
        Fetch a suitable name for the database to use.
        This name must be unique to the 'parent' group, a valid Python id, valid Django id,
        preferably lowercase.
        """
        # If "name" is not provided use the "label" field
        source = self.name if self.name is not None else self.label
        if source is not None:
            return disambiguate_name(make_name_valid_id(source), self.parent_names)
        # Neither name nor label: fall back to a random "<hex>_unnamed" identifier.
        return make_name_valid_id(f"{uuid4().hex[:8]}_unnamed")

    class Config:
        allow_population_by_field_name = True
        keep_untouched = (cached_property,)
def create_or_update_child_node(payload: FormKitNodeIn):
    """
    Create or update the node described by ``payload``.

    Returns ``(child, [])`` on success, or ``(None, errors)`` when the parent
    is invalid or the node is inactive.
    """
    parent, parent_errors = payload.parent
    child = payload.child

    if parent_errors:
        return None, parent_errors
    if child.is_active is False:
        return None, ["This node has already been deleted and cannot be edited"]

    # Bookkeeping fields and cached properties are left out of the stored node.
    excluded = {"parent_id", "uuid", "parent", "child", "preferred_name", "parent_names"}
    values = payload.dict(by_alias=True, exclude_none=True, exclude=excluded)

    # Ensure the name is unique and suitable, but do not replace an existing name.
    has_name = isinstance(child.node, dict) and child.node.get("name", None) is not None
    if not has_name:
        values["name"] = payload.preferred_name
    child.node.update(values)

    extra = payload.additional_props
    if extra is not None:
        child.node.update(extra)

    child.label = payload.label
    # Groups require a label; a label supplied through additional_props wins.
    if isinstance(extra, dict) and (label := extra.get("label")):
        child.label = label

    child.node_type = "formkit"

    with transaction.atomic():
        child.save()
        if parent:
            # NOTE(review): this also runs when an existing child is updated
            # with a parent_id; confirm that creating the relation again is intended.
            models.NodeChildren.objects.create(parent=parent, child=child)

    return child, []
def make_name_valid_id(in_: str):
    """
    Take a string. Replace any python-invalid characters with '_'

    Every non-word character becomes ``_``, a ``_`` is inserted before a
    leading digit, trailing underscores are dropped and the result is
    lowercased. Input that is empty, or that consists only of invalid
    characters, yields ``""`` rather than raising ``IndexError``.
    """
    subbed = re.sub(r"\W|^(?=\d)", "_", in_)
    # rstrip is safe on an empty string, unlike indexing subbed[-1] in a loop.
    return subbed.rstrip("_").lower()
def disambiguate_name(name_in: str, used_names: Sequence[str]):
    """
    Return ``name_in`` if it is not in ``used_names``; otherwise return
    ``name_in`` with the lowest ``_<n>`` suffix (starting at 1) that is unused.
    """
    candidate = name_in
    suffix = 0
    while candidate in used_names:
        suffix += 1
        candidate = f"{name_in}_{suffix}"
    return candidate
@router.post(
    "create_or_update_node",
    response={
        HTTPStatus.OK: NodeReturnType,
        HTTPStatus.INTERNAL_SERVER_ERROR: FormKitErrors,
    },
    exclude_none=True,
    by_alias=True,
)
def create_or_update_node(request, response: HttpResponse, payload: FormKitNodeIn):
    """
    Creates or updates a node in the FormKitSchemaNode model.

    This function takes payload of type FormKitNodeIn.
    It fetches the parent node if it exists and checks if it is a group or repeater.
    If the parent node is not valid or is not a group or repeater, it returns an error response.
    Otherwise, it proceeds to create or update the node.

    Args:
        request: The request object.
        response (HttpResponse): The HttpResponse object.
        payload (FormKitNodeIn): The payload containing the data for the node to be created or updated.

    Returns:
        HTTPStatus: The status of the HTTP response.
        FormKitErrors: The errors encountered during the process, if any.
    """

    error_response = FormKitErrors()
    child = None

    try:
        child, errors = create_or_update_child_node(payload)
    except Exception as E:
        # Any failure while saving is reported to the client as an error message.
        error_response.errors.append(f"{E}")
    else:
        if errors:
            # ``errors`` is a list of messages: extend so that ``errors`` stays a
            # flat list[str] instead of nesting a list inside it.
            error_response.errors.extend(errors)

    if error_response.errors or error_response.field_errors:
        return HTTPStatus.INTERNAL_SERVER_ERROR, error_response

    return node_queryset_response(models.FormKitSchemaNode.objects.filter(pk__in=[child.pk]))[0]