Coverage for formkit_ninja / api.py: 35.22%
318 statements
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-20 04:40 +0000
« prev ^ index » next coverage.py v7.13.1, created at 2026-02-20 04:40 +0000
1import json
2import re
3from functools import cached_property
4from http import HTTPStatus
5from typing import Sequence, cast
6from uuid import UUID, uuid4
8from django.contrib.auth import get_user
9from django.db import transaction
10from django.db.models import F
11from django.db.models.aggregates import Max
12from django.http import HttpRequest, HttpResponse
13from django.shortcuts import get_object_or_404
14from django.utils.cache import add_never_cache_headers
15from ninja import Field, ModelSchema, Router, Schema
16from pydantic import BaseModel, validator
18from formkit_ninja import formkit_schema, models
19from formkit_ninja.notifications import get_default_notifier
21notifier = get_default_notifier()
24def sentry_message(message: str) -> None:
25 notifier.notify(message)
28router = Router(tags=["FormKit"])
31def formkit_auth(request: HttpRequest):
32 """
33 Custom authentication function that checks if user is authenticated.
34 Permissions are checked in the endpoint itself to return proper 403 status.
35 """
36 user = get_user(request)
37 if not user or not user.is_authenticated:
38 return None
39 return user
42class FormKitSchemaIn(ModelSchema):
43 class Config:
44 model = models.FormKitSchema
45 model_fields = "__all__"
48class SchemaLabel(ModelSchema):
49 class Config:
50 model = models.SchemaLabel
51 model_fields = ("lang", "label")
54class SchemaDescription(ModelSchema):
55 class Config:
56 model = models.SchemaLabel
57 model_fields = ("lang", "label")
60class FormKitSchemaListOut(ModelSchema):
61 schemalabel_set: list[SchemaLabel]
62 schemadescription_set: list[SchemaDescription]
64 class Config:
65 model = models.FormKitSchema
66 model_fields = ("id", "label")
69class FormComponentsOut(ModelSchema):
70 node_id: UUID
71 schema_id: UUID
73 class Config:
74 model = models.FormComponents
75 model_fields = ("label",)
78class NodeChildrenOut(ModelSchema):
79 children: list[UUID] = []
80 latest_change: int | None = None
82 class Config:
83 model = models.NodeChildren
84 model_fields = ("parent",)
87class NodeReturnType(BaseModel):
88 key: UUID
89 last_updated: int
90 node: formkit_schema.Node
91 protected: bool
94class NodeInactiveType(BaseModel):
95 key: UUID
96 last_updated: int
97 is_active: bool = False
98 protected: bool
101class NodeStringType(NodeReturnType):
102 """
103 str | formkit_schema.FormKitNode causes openapi generator to fail
104 """
106 node: str # type: ignore[assignment]
109NodeQSResponse = Sequence[NodeStringType | NodeReturnType | NodeInactiveType]
112def node_queryset_response(qs: models.NodeQS) -> NodeQSResponse:
113 responses = []
114 n: NodeStringType | NodeInactiveType | NodeReturnType
115 for key, last_updated, node_val, protected in qs.to_response(ignore_errors=False):
116 if last_updated is None:
117 last_updated = -1
118 if isinstance(node_val, str):
119 n = NodeStringType(key=key, last_updated=last_updated, protected=protected, node=node_val)
120 elif node_val is None:
121 n = NodeInactiveType(key=key, last_updated=last_updated, protected=protected, is_active=False)
122 else:
123 n = NodeReturnType(key=key, last_updated=last_updated, protected=protected, node=node_val) # type: ignore[arg-type]
124 responses.append(n)
125 return responses
128class Option(ModelSchema):
129 group_name: str # This is annotation of the model `content_type_model`
130 value: str
131 # Note: For other projects you may want to extend this with additional languages
132 label_tet: str | None
133 label_en: str | None
134 label_pt: str | None
135 # This is an optional field used to indicate the last update
136 # It's linked to a Django pg trigger instance in Partisipa
137 change_id: int | None = None
139 class Config:
140 model = models.Option
141 model_fields = ("value",)
144@router.get("list-schemas", response=list[FormKitSchemaListOut])
145def get_list_schemas(request):
146 return models.FormKitSchema.objects.all()
149@router.get("list-nodes", response=NodeQSResponse, by_alias=True, exclude_none=True)
150def get_formkit_nodes(request: HttpRequest, response: HttpResponse, latest_change: int | None = -1):
151 """
152 Get all of the FormKit nodes in the database
153 """
154 objects: models.NodeQS = cast(models.NodeQS, models.FormKitSchemaNode.objects)
155 nodes = objects.from_change(latest_change or -1)
156 lc = nodes.aggregate(_=Max("track_change"))["_"]
157 response["latest_change"] = lc if lc is not None else (latest_change or -1)
158 add_never_cache_headers(response)
159 return node_queryset_response(nodes)
162@router.get(
163 "list-related-nodes",
164 response=list[NodeChildrenOut],
165 exclude_defaults=True,
166 exclude_none=True,
167)
168def get_related_nodes(request, response: HttpResponse, latest_change: int | None = -1):
169 """
170 Get all of the FormKit node relationships in the database
171 """
172 add_never_cache_headers(response)
173 objects: models.NodeChildrenManager = models.NodeChildren.objects
174 return objects.aggregate_changes_table(latest_change=latest_change)
177@router.get(
178 "list-components",
179 response=list[FormComponentsOut],
180 exclude_defaults=True,
181 exclude_none=True,
182 by_alias=True,
183)
184def get_components(request):
185 values = models.FormComponents.objects.all()
186 return values
189@router.get(
190 "schema/by-uuid/{schema_id}",
191 response=formkit_schema.FormKitSchema,
192 exclude_none=True,
193 by_alias=True,
194)
195def get_schemas(request, schema_id: UUID):
196 """
197 Get a schema based on its UUID
198 """
199 schema: models.FormKitSchema = get_object_or_404(models.FormKitSchema.objects, id=schema_id)
200 model = schema.to_pydantic()
201 return model
204@router.get(
205 "schema/all",
206 response=list[formkit_schema.FormKitSchema],
207 exclude_none=True,
208 by_alias=True,
209)
210def get_all_schemas(request):
211 """
212 Get all schemas
213 """
214 schemas = models.FormKitSchema.objects.all()
215 model = [s.to_pydantic() for s in schemas]
216 return model
219@router.get(
220 "schema/by-label/{label}",
221 response=formkit_schema.FormKitSchema,
222 exclude_none=True,
223 by_alias=True,
224)
225def get_schema_by_label(request, label: str):
226 """
227 Get a schema based on its label
228 """
229 schema: models.FormKitSchema = get_object_or_404(models.FormKitSchema.objects, label=label)
230 model = schema.to_pydantic()
231 return model
234@router.get(
235 "node/{node_id}",
236 response=formkit_schema.FormKitNode,
237 exclude_none=True,
238 by_alias=True,
239)
240def get_node(request, node_id: UUID):
241 """
242 Gets a node based on its UUID
243 """
244 node: models.FormKitSchemaNode = get_object_or_404(models.FormKitSchemaNode.objects, id=node_id)
245 instance = node.get_node()
246 return instance
249@router.get("/options", response=list[Option], exclude_none=True)
250def list_options(request: HttpRequest, response: HttpResponse):
251 """
252 List all available "native" FormKit ninja labels and links
253 """
254 return models.Option.objects.annotate(group_name=F("group__group"))
257class FormKitErrors(BaseModel):
258 errors: list[str] = []
259 field_errors: dict[str, str] = {}
262@router.delete(
263 "delete/{node_id}",
264 response={
265 HTTPStatus.OK: NodeInactiveType,
266 HTTPStatus.FORBIDDEN: FormKitErrors,
267 HTTPStatus.NOT_FOUND: FormKitErrors,
268 },
269 exclude_none=True,
270 by_alias=True,
271 auth=formkit_auth,
272)
273def delete_node(request, node_id: UUID):
274 """
275 Delete a node based on its UUID
276 """
277 # Authentication is checked by formkit_auth
278 # Check permissions here to return proper 403 status
279 if not request.user.has_perm("formkit_ninja.change_formkitschemanode"):
280 error_response = FormKitErrors()
281 error_response.errors.append("You do not have permission to delete FormKit schema nodes.")
282 return HTTPStatus.FORBIDDEN, error_response
283 try:
284 with transaction.atomic():
285 node: models.FormKitSchemaNode = get_object_or_404(models.FormKitSchemaNode.objects, id=node_id)
286 node.delete()
287 # node.refresh_from_db()
288 objects: models.NodeQS = cast(models.NodeQS, models.FormKitSchemaNode.objects)
289 return node_queryset_response(objects.filter(pk=node_id))[0]
290 except Exception as e:
291 # Handle protected node deletion or other database errors
292 error_response = FormKitErrors()
293 error_msg = str(e)
294 if "protected" in error_msg.lower() or "cannot delete" in error_msg.lower():
295 error_response.errors.append("This node is protected and cannot be deleted.")
296 return HTTPStatus.FORBIDDEN, error_response
297 # Re-raise other exceptions
298 raise
301class FormKitNodeIn(Schema):
302 """
303 Creates a new FormKit text or number node
304 We'd like to use `formkit_schema.FormKitSchemaFormKit`
305 here but that `discriminated node` stuff makes it hard
306 """
308 formkit: str = Field(default="text", alias="$formkit")
309 label: str | None = None
310 key: str | None = None
311 name: str | None = None
312 placeholder: str | None = None
313 help: str | None = None
314 value: str | None = None # Default value for fields (especially hidden fields)
316 # Fields from "number"
317 max: int | str | None = None
318 min: int | str | None = None
319 step: str | None = None
321 # Field from dropdown/select/autocomplete/radio/checkbox
322 options: str | None = None
324 # Repeater-specific properties
325 addLabel: str | None = None
326 itemClass: str | None = None
327 itemsClass: str | None = None
328 upControl: bool | None = None
329 downControl: bool | None = None
331 # Conditional logic
332 if_condition: str | None = Field(default=None, alias="if")
334 # Validation
335 validationRules: str | None = None
336 validation: str | list[str] | None = None
338 # Field Constraints
339 maxLength: int | None = None
340 _minDateSource: str | None = None
341 _maxDateSource: str | None = None
342 disabledDays: str | None = None
344 # Used for Creates
345 parent_id: UUID | None = None
347 # Used for Updates - optional for creates, required for updates
348 uuid: UUID | None = None
350 # Used for "Add Group"
351 # This should include an `icon`, `title` and `id` for the second level group
352 additional_props: dict[str, str | int] | None = None
354 @validator("formkit")
355 def validate_formkit_type(cls, v):
356 """Validate that the formkit type is a valid FormKit type"""
357 from typing import get_args
359 valid_types = get_args(formkit_schema.FORMKIT_TYPE)
360 if v not in valid_types:
361 raise ValueError(f"Invalid FormKit type: {v}. Valid types are: {', '.join(valid_types)}")
362 return v
364 @cached_property
365 def parent(self):
366 if self.parent_id is None:
367 return None, None
368 try:
369 parent = models.FormKitSchemaNode.objects.get(pk=self.parent_id)
370 except models.FormKitSchemaNode.DoesNotExist:
371 return None, ["The parent node given does not exist"]
372 if parent.node.get("$formkit") not in {"group", "repeater"}:
373 return None, ["The parent node given is not a group or repeater"]
374 if parent is None:
375 return None, ["The parent node given does not exist"]
376 return parent, None
378 @cached_property
379 def parent_names(self) -> set[str]:
380 """
381 Return the names of parent nodes' child nodes.
382 The saved child node must not use any of these names.
383 """
384 parent, parent_errors = self.parent
385 if self.parent[0] and self.child:
386 # Ensures that names are not "overwritten"
387 return set(parent.children.exclude(pk=self.child.pk).values_list("node__name", flat=True))
388 elif self.parent[0]:
389 return set(parent.children.values_list("node__name", flat=True))
390 else:
391 return set()
393 @cached_property
394 def child(self):
395 # The uuid may belong to a node or may be a new value
396 if self.uuid is None:
397 # Create mode - generate new UUID
398 return models.FormKitSchemaNode(pk=uuid4(), node={})
399 try:
400 # Update mode - fetch existing node
401 return models.FormKitSchemaNode.objects.get(pk=self.uuid)
402 except models.FormKitSchemaNode.DoesNotExist:
403 # UUID provided but node doesn't exist - this is an error for updates
404 return None
406 @cached_property
407 def preferred_name(self):
408 """
409 Fetch a suitable name for the database to use.
410 This name must be unique to the 'parent' group, a valid Python id, valid Django id,
411 preferably lowercase.
412 """
413 # If "name" is not provided use the "label" field
414 if self.name is not None:
415 return disambiguate_name(make_name_valid_id(self.name), self.parent_names)
416 elif self.label is not None:
417 return disambiguate_name(make_name_valid_id(self.label), self.parent_names)
418 return make_name_valid_id(f"{uuid4().hex[:8]}_unnamed")
420 class Config:
421 allow_population_by_field_name = True
422 keep_untouched = (cached_property,)
425def create_or_update_child_node(payload: FormKitNodeIn, raw_payload_dict: dict | None = None):
426 """
427 Create or update a child node from API payload.
429 Args:
430 payload: Validated FormKitNodeIn payload (only recognized fields)
431 raw_payload_dict: Optional raw payload dict with all fields including unrecognized ones
432 """
433 parent, parent_errors = payload.parent
434 child = payload.child
436 if parent_errors:
437 return None, parent_errors
439 # If uuid was provided but node doesn't exist, return error
440 if payload.uuid is not None and child is None:
441 return None, ["Node with the provided UUID does not exist"]
443 # If child is None (shouldn't happen after above check, but safety first)
444 if child is None:
445 return None, ["Failed to create or retrieve node"]
447 if child.is_active is False:
448 return None, ["This node has already been deleted and cannot be edited"]
450 values = payload.dict(
451 by_alias=True,
452 exclude_none=True,
453 exclude={"parent_id", "uuid"} | {"parent", "child", "preferred_name", "parent_names"},
454 )
455 # Ensure the name is unique and suitable
456 # Do not replace existing names though
457 # Initialize node dict if it doesn't exist
458 if child.node is None:
459 child.node = {}
460 existing_name = child.node.get("name", None) if isinstance(child.node, dict) else None
461 if existing_name is None:
462 values["name"] = payload.preferred_name
463 child.node.update(values)
464 if payload.additional_props is not None:
465 child.node.update(payload.additional_props)
466 # Also store additional_props in the model field
467 if child.additional_props is None:
468 child.additional_props = {}
469 child.additional_props.update(payload.additional_props)
471 # Extract and preserve unrecognized fields from raw payload
472 if raw_payload_dict is not None:
473 # Get set of recognized fields from FormKitNodeIn schema
474 recognized_fields = set(FormKitNodeIn.__fields__.keys())
475 # Also include alias names
476 for field_name, field_info in FormKitNodeIn.__fields__.items():
477 if hasattr(field_info, "alias") and field_info.alias:
478 recognized_fields.add(field_info.alias)
480 # Fields that are API-specific and should not go to additional_props
481 api_only_fields = {"parent_id", "uuid"}
483 # Extract unrecognized fields (not in schema, not API-only)
484 unrecognized_fields = {
485 k: v
486 for k, v in raw_payload_dict.items()
487 if k not in recognized_fields | api_only_fields and v is not None # Exclude None values
488 }
490 # Store unrecognized fields in additional_props
491 if unrecognized_fields:
492 if child.additional_props is None:
493 child.additional_props = {}
494 # Merge with existing additional_props (don't overwrite if already set)
495 for key, value in unrecognized_fields.items():
496 if key not in child.additional_props:
497 child.additional_props[key] = value
499 child.label = payload.label
501 if isinstance(payload.additional_props, dict):
502 if label := payload.additional_props.get("label"):
503 # groups require a label
504 child.label = label
506 child.node_type = "formkit"
508 with transaction.atomic():
509 child.save()
510 if parent:
511 # Use get_or_create to avoid duplicate relationships
512 models.NodeChildren.objects.get_or_create(
513 parent=parent,
514 child=child,
515 defaults={"order": None}, # Order can be set later if needed
516 )
518 return child, []
521def make_name_valid_id(in_: str):
522 """
523 Take a string. Replace any python-invalid characters with '_'
524 """
525 subbed = re.sub(r"\W|^(?=\d)", "_", in_)
526 while subbed[-1] == "_":
527 subbed = subbed[:-1]
528 return subbed.lower()
531def disambiguate_name(name_in: str, used_names: Sequence[str]):
532 suffix = 1
533 if name_in not in used_names:
534 return name_in
535 while f"{name_in}_{suffix}" in used_names:
536 suffix = suffix + 1
537 return f"{name_in}_{suffix}"
540@router.post(
541 "create_or_update_node",
542 response={
543 HTTPStatus.OK: NodeReturnType,
544 HTTPStatus.NOT_FOUND: FormKitErrors,
545 HTTPStatus.BAD_REQUEST: FormKitErrors,
546 HTTPStatus.FORBIDDEN: FormKitErrors,
547 HTTPStatus.INTERNAL_SERVER_ERROR: FormKitErrors,
548 },
549 exclude_none=True,
550 by_alias=True,
551 auth=formkit_auth,
552)
553def create_or_update_node(request, response: HttpResponse, payload: FormKitNodeIn):
554 """
555 Creates or updates a node in the FormKitSchemaNode model.
557 This function takes payload of type FormKitNodeIn.
558 It fetches the parent node if it exists and checks if it is a group or repeater.
559 If the parent node is not valid or is not a group or repeater, it returns an error response.
560 Otherwise, it proceeds to create or update the node.
562 Args:
563 request: The request object.
564 response (HttpResponse): The HttpResponse object.
565 payload (FormKitNodeIn): The payload containing the data for the node to be created or updated.
567 Returns:
568 HTTPStatus: The status of the HTTP response.
569 FormKitErrors: The errors encountered during the process, if any.
570 """
571 # Authentication is checked by formkit_auth
572 # Check permissions here to return proper 403 status
573 if not request.user.has_perm("formkit_ninja.change_formkitschemanode"):
574 error_response = FormKitErrors()
575 error_response.errors.append("You do not have permission to create or update FormKit schema nodes.")
576 return HTTPStatus.FORBIDDEN, error_response
578 error_response = FormKitErrors()
579 # Update the payload "name"
580 # When label is provided, use the label to generate the name
581 # Fetch parent node, if it exists, and check that it is a group or repeater
583 # Extract unrecognized fields from raw request body
584 raw_payload_dict = None
585 try:
586 if hasattr(request, "body") and request.body:
587 raw_payload_dict = json.loads(request.body)
588 except (json.JSONDecodeError, AttributeError):
589 # If we can't parse the body, continue without extracting unrecognized fields
590 pass
592 try:
593 child, errors = create_or_update_child_node(payload, raw_payload_dict)
594 if errors:
595 # Flatten errors if it's a list
596 if isinstance(errors, list):
597 error_response.errors.extend(errors)
598 else:
599 error_response.errors.append(errors)
601 # Determine appropriate status code based on error type
602 error_text = " ".join(error_response.errors) if error_response.errors else ""
603 if "does not exist" in error_text or "UUID" in error_text:
604 return HTTPStatus.NOT_FOUND, error_response
605 elif "deleted" in error_text or "cannot be edited" in error_text:
606 return HTTPStatus.BAD_REQUEST, error_response
607 else:
608 return HTTPStatus.BAD_REQUEST, error_response
609 except Exception as E:
610 error_response.errors.append(f"{E}")
611 return HTTPStatus.INTERNAL_SERVER_ERROR, error_response
613 if error_response.errors or error_response.field_errors:
614 return HTTPStatus.INTERNAL_SERVER_ERROR, error_response
616 return node_queryset_response(models.FormKitSchemaNode.objects.filter(pk__in=[child.pk]))[0]