Coverage for formkit_ninja / api.py: 0.00%

318 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-03-03 09:21 +0000

1import json 

2import re 

3from functools import cached_property 

4from http import HTTPStatus 

5from typing import Sequence, cast 

6from uuid import UUID, uuid4 

7 

8from django.contrib.auth import get_user 

9from django.db import transaction 

10from django.db.models import F 

11from django.db.models.aggregates import Max 

12from django.http import HttpRequest, HttpResponse 

13from django.shortcuts import get_object_or_404 

14from django.utils.cache import add_never_cache_headers 

15from ninja import Field, ModelSchema, Router, Schema 

16from pydantic import BaseModel, validator 

17 

18from formkit_ninja import formkit_schema, models 

19from formkit_ninja.notifications import get_default_notifier 

20 

# Module-level notifier, obtained once at import time from
# formkit_ninja.notifications and used by sentry_message() below.
notifier = get_default_notifier()

22 

23 

def sentry_message(message: str) -> None:
    """Forward ``message`` to the module-level ``notifier`` via its ``notify`` method."""
    notifier.notify(message)

26 

27 

# Router on which every endpoint in this module is registered; all of them
# carry the "FormKit" tag.
router = Router(tags=["FormKit"])

29 

30 

def formkit_auth(request: HttpRequest):
    """
    Authentication callable for the write endpoints.

    Resolves the user attached to ``request`` and returns it when it is
    authenticated; returns ``None`` otherwise. Permission checks are left to
    the endpoints themselves so that they can answer with a 403 body.
    """
    current_user = get_user(request)
    if current_user and current_user.is_authenticated:
        return current_user
    return None

40 

41 

# Schema generated from models.FormKitSchema exposing every model field
# (model_fields = "__all__"). No endpoint in the visible part of this module
# references it.
class FormKitSchemaIn(ModelSchema):
    class Config:
        model = models.FormKitSchema
        model_fields = "__all__"

46 

47 

# Serialises a models.SchemaLabel row as its "lang" and "label" fields.
# Used as the item type of FormKitSchemaListOut.schemalabel_set.
class SchemaLabel(ModelSchema):
    class Config:
        model = models.SchemaLabel
        model_fields = ("lang", "label")

52 

53 

# Serialises a schema description row as its "lang" and "label" fields.
# Used as the item type of FormKitSchemaListOut.schemadescription_set, so it is
# bound to the description model rather than to models.SchemaLabel (the
# previous binding looked like a copy-paste of the SchemaLabel schema).
# NOTE(review): assumes models.SchemaDescription exists and exposes "lang" and
# "label", as the `schemadescription_set` accessor name implies - confirm.
class SchemaDescription(ModelSchema):
    class Config:
        model = models.SchemaDescription
        model_fields = ("lang", "label")

58 

59 

# Item type returned by the "list-schemas" endpoint: the schema's "id" and
# "label" model fields plus the label and description rows read from the
# `schemalabel_set` and `schemadescription_set` attributes of each schema.
class FormKitSchemaListOut(ModelSchema):
    schemalabel_set: list[SchemaLabel]
    schemadescription_set: list[SchemaDescription]

    class Config:
        model = models.FormKitSchema
        model_fields = ("id", "label")

67 

68 

# Item type returned by the "list-components" endpoint: the "label" model
# field of models.FormComponents together with the UUIDs read from the
# `node_id` and `schema_id` attributes of each row.
class FormComponentsOut(ModelSchema):
    node_id: UUID
    schema_id: UUID

    class Config:
        model = models.FormComponents
        model_fields = ("label",)

76 

77 

# Item type returned by the "list-related-nodes" endpoint: the "parent" model
# field of models.NodeChildren, a list of child UUIDs (empty by default) and
# an optional `latest_change` integer.
class NodeChildrenOut(ModelSchema):
    children: list[UUID] = []
    latest_change: int | None = None

    class Config:
        model = models.NodeChildren
        model_fields = ("parent",)

85 

86 

# Response item for a node whose value is a structured formkit_schema.Node.
# node_queryset_response() builds it for any row whose node value is neither
# a string nor None.
class NodeReturnType(BaseModel):
    key: UUID
    # -1 is substituted by node_queryset_response() when the row reports None
    last_updated: int
    node: formkit_schema.Node
    protected: bool

92 

93 

# Response item for a row without a node value: node_queryset_response()
# builds it when the node value is None, always with is_active=False.
# Also the declared 200 response of the delete endpoint.
class NodeInactiveType(BaseModel):
    key: UUID
    # -1 is substituted by node_queryset_response() when the row reports None
    last_updated: int
    is_active: bool = False
    protected: bool

99 

100 

class NodeStringType(NodeReturnType):
    """
    str | formkit_schema.FormKitNode causes openapi generator to fail
    """

    # Narrowed to a plain string; node_queryset_response() uses this class for
    # rows whose node value is a str.
    node: str  # type: ignore[assignment]

107 

108 

# Response type shared by the node-listing code: a sequence whose items are
# any of the three node response models defined above.
NodeQSResponse = Sequence[NodeStringType | NodeReturnType | NodeInactiveType]

110 

111 

def node_queryset_response(qs: models.NodeQS) -> NodeQSResponse:
    """
    Convert the rows yielded by ``qs.to_response(ignore_errors=False)`` into
    response models.

    Each row is a ``(key, last_updated, node value, protected)`` tuple. A
    missing ``last_updated`` is reported as ``-1``. The model is chosen from the
    node value: ``None`` gives a NodeInactiveType, a ``str`` gives a
    NodeStringType, anything else gives a NodeReturnType.
    """

    def _to_model(key, last_updated, node_val, protected) -> NodeStringType | NodeInactiveType | NodeReturnType:
        stamp = -1 if last_updated is None else last_updated
        if node_val is None:
            return NodeInactiveType(key=key, last_updated=stamp, protected=protected, is_active=False)
        if isinstance(node_val, str):
            return NodeStringType(key=key, last_updated=stamp, protected=protected, node=node_val)
        return NodeReturnType(key=key, last_updated=stamp, protected=protected, node=node_val)  # type: ignore[arg-type]

    return [
        _to_model(key, last_updated, node_val, protected)
        for key, last_updated, node_val, protected in qs.to_response(ignore_errors=False)
    ]

126 

127 

# Item type returned by the "/options" endpoint: the "value" model field of
# models.Option plus the extra attributes declared below.
class Option(ModelSchema):
    group_name: str  # This is annotation of the model `content_type_model`
    value: str
    # Note: For other projects you may want to extend this with additional languages
    # NOTE(review): list_options() only annotates `group_name`; the label_*
    # attributes are presumably supplied by the model or its manager - confirm.
    label_tet: str | None
    label_en: str | None
    label_pt: str | None
    # This is an optional field used to indicate the last update
    # It's linked to a Django pg trigger instance in Partisipa
    change_id: int | None = None

    class Config:
        model = models.Option
        model_fields = ("value",)

142 

143 

@router.get("list-schemas", response=list[FormKitSchemaListOut])
def get_list_schemas(request):
    # Lists every FormKitSchema. FormKitSchemaListOut reads `schemalabel_set`
    # and `schemadescription_set` from each row, so both reverse relations are
    # prefetched here; otherwise serialisation issues two extra queries per
    # schema. The set of rows returned is unchanged.
    return models.FormKitSchema.objects.prefetch_related("schemalabel_set", "schemadescription_set")

147 

148 

@router.get("list-nodes", response=NodeQSResponse, by_alias=True, exclude_none=True)
def get_formkit_nodes(request: HttpRequest, response: HttpResponse, latest_change: int | None = -1):
    """
    Get all of the FormKit nodes in the database
    """
    # A missing (or zero) latest_change is treated as -1.
    since = latest_change or -1
    node_manager: models.NodeQS = cast(models.NodeQS, models.FormKitSchemaNode.objects)
    changed_nodes = node_manager.from_change(since)
    # Highest track_change among the selected nodes; None when nothing matched,
    # in which case the caller's own marker is echoed back in the header.
    newest_change = changed_nodes.aggregate(_=Max("track_change"))["_"]
    response["latest_change"] = since if newest_change is None else newest_change
    add_never_cache_headers(response)
    return node_queryset_response(changed_nodes)

160 

161 

@router.get(
    "list-related-nodes",
    response=list[NodeChildrenOut],
    exclude_defaults=True,
    exclude_none=True,
)
def get_related_nodes(request, response: HttpResponse, latest_change: int | None = -1):
    """
    Get all of the FormKit node relationships in the database
    """
    # The relationship rows come from the NodeChildren manager, which is handed
    # the caller's latest_change unchanged; the response is marked uncacheable.
    relations: models.NodeChildrenManager = models.NodeChildren.objects
    add_never_cache_headers(response)
    return relations.aggregate_changes_table(latest_change=latest_change)

175 

176 

@router.get(
    "list-components",
    response=list[FormComponentsOut],
    exclude_defaults=True,
    exclude_none=True,
    by_alias=True,
)
def get_components(request):
    # Returns every FormComponents row; serialisation to FormComponentsOut is
    # done by the router.
    return models.FormComponents.objects.all()

187 

188 

@router.get(
    "schema/by-uuid/{schema_id}",
    response=formkit_schema.FormKitSchema,
    exclude_none=True,
    by_alias=True,
)
def get_schemas(request, schema_id: UUID):
    """
    Get a schema based on its UUID
    """
    # get_object_or_404 turns a missing schema into an HTTP 404.
    record: models.FormKitSchema = get_object_or_404(models.FormKitSchema.objects, id=schema_id)
    return record.to_pydantic()

202 

203 

@router.get(
    "schema/all",
    response=list[formkit_schema.FormKitSchema],
    exclude_none=True,
    by_alias=True,
)
def get_all_schemas(request):
    """
    Get all schemas
    """
    # Each stored schema is converted with its own to_pydantic() method.
    return [record.to_pydantic() for record in models.FormKitSchema.objects.all()]

217 

218 

@router.get(
    "schema/by-label/{label}",
    response=formkit_schema.FormKitSchema,
    exclude_none=True,
    by_alias=True,
)
def get_schema_by_label(request, label: str):
    """
    Get a schema based on its label
    """
    # get_object_or_404 turns a missing schema into an HTTP 404.
    record: models.FormKitSchema = get_object_or_404(models.FormKitSchema.objects, label=label)
    return record.to_pydantic()

232 

233 

@router.get(
    "node/{node_id}",
    response=formkit_schema.FormKitNode,
    exclude_none=True,
    by_alias=True,
)
def get_node(request, node_id: UUID):
    """
    Gets a node based on its UUID
    """
    # get_object_or_404 turns a missing node into an HTTP 404; the stored row
    # is then converted by its own get_node() method.
    record: models.FormKitSchemaNode = get_object_or_404(models.FormKitSchemaNode.objects, id=node_id)
    return record.get_node()

247 

248 

@router.get("/options", response=list[Option], exclude_none=True)
def list_options(request: HttpRequest, response: HttpResponse):
    """
    List all available "native" FormKit ninja labels and links
    """
    # `group_name` is not a model field: it is annotated from the related
    # group's `group` column so that the Option schema can read it.
    annotated_options = models.Option.objects.annotate(group_name=F("group__group"))
    return annotated_options

255 

256 

# Error body returned by the write endpoints alongside a non-200 status.
class FormKitErrors(BaseModel):
    # General error messages; the endpoints in this module append to this list
    errors: list[str] = []
    # Per-field messages; nothing in the visible code populates this mapping
    field_errors: dict[str, str] = {}

260 

261 

@router.delete(
    "delete/{node_id}",
    response={
        HTTPStatus.OK: NodeInactiveType,
        HTTPStatus.FORBIDDEN: FormKitErrors,
        HTTPStatus.NOT_FOUND: FormKitErrors,
    },
    exclude_none=True,
    by_alias=True,
    auth=formkit_auth,
)
def delete_node(request, node_id: UUID):
    """
    Delete a node based on its UUID
    """
    # Authentication has already been handled by formkit_auth; the permission
    # is checked here so that a 403 with an error body can be returned.
    if not request.user.has_perm("formkit_ninja.change_formkitschemanode"):
        denied = FormKitErrors()
        denied.errors.append("You do not have permission to delete FormKit schema nodes.")
        return HTTPStatus.FORBIDDEN, denied

    try:
        with transaction.atomic():
            target: models.FormKitSchemaNode = get_object_or_404(models.FormKitSchemaNode.objects, id=node_id)
            target.delete()
            # The row is looked up again after delete() to build the response.
            # NOTE(review): this presumably relies on delete() deactivating the
            # node rather than removing the row - confirm against the model.
            node_manager: models.NodeQS = cast(models.NodeQS, models.FormKitSchemaNode.objects)
            return node_queryset_response(node_manager.filter(pk=node_id))[0]
    except Exception as e:
        # Only failures whose message mentions protection are translated into
        # a 403; everything else (including the 404 raised above) propagates.
        lowered = str(e).lower()
        if "protected" not in lowered and "cannot delete" not in lowered:
            raise
        failure = FormKitErrors()
        failure.errors.append("This node is protected and cannot be deleted.")
        return HTTPStatus.FORBIDDEN, failure

299 

300 

class FormKitNodeIn(Schema):
    """
    Creates a new FormKit text or number node
    We'd like to use `formkit_schema.FormKitSchemaFormKit`
    here but that `discriminated node` stuff makes it hard
    """

    formkit: str = Field(default="text", alias="$formkit")
    label: str | None = None
    key: str | None = None
    name: str | None = None
    placeholder: str | None = None
    help: str | None = None
    value: str | None = None  # Default value for fields (especially hidden fields)

    # Fields from "number"
    max: int | str | None = None
    min: int | str | None = None
    step: str | None = None

    # Field from dropdown/select/autocomplete/radio/checkbox
    options: str | None = None

    # Repeater-specific properties
    addLabel: str | None = None
    itemClass: str | None = None
    itemsClass: str | None = None
    upControl: bool | None = None
    downControl: bool | None = None

    # Conditional logic
    if_condition: str | None = Field(default=None, alias="if")

    # Validation
    validationRules: str | None = None
    validation: str | list[str] | None = None

    # Field Constraints
    maxLength: int | None = None
    # NOTE(review): pydantic does not collect underscore-prefixed annotations
    # as model fields, so these two are probably never parsed from the payload
    # and only reach the node through the "unrecognized fields" path - confirm.
    _minDateSource: str | None = None
    _maxDateSource: str | None = None
    disabledDays: str | None = None

    # Used for Creates
    parent_id: UUID | None = None

    # Used for Updates - optional for creates, required for updates
    uuid: UUID | None = None

    # Used for "Add Group"
    # This should include an `icon`, `title` and `id` for the second level group
    additional_props: dict[str, str | int] | None = None

    @validator("formkit")
    def validate_formkit_type(cls, v):
        """Validate that the formkit type is a valid FormKit type"""
        from typing import get_args

        valid_types = get_args(formkit_schema.FORMKIT_TYPE)
        if v not in valid_types:
            raise ValueError(f"Invalid FormKit type: {v}. Valid types are: {', '.join(valid_types)}")
        return v

    @cached_property
    def parent(self):
        # Resolves `parent_id` to a `(parent, errors)` pair:
        #   (None, None)     when no parent_id was given,
        #   (None, [reason]) when the parent is missing or is not a container,
        #   (parent, None)   when the parent is a group or repeater.
        if self.parent_id is None:
            return None, None
        try:
            parent = models.FormKitSchemaNode.objects.get(pk=self.parent_id)
        except models.FormKitSchemaNode.DoesNotExist:
            return None, ["The parent node given does not exist"]
        # A parent whose stored node is not a dict cannot be a group or
        # repeater; treat it as such instead of failing on `.get`.
        parent_node = parent.node if isinstance(parent.node, dict) else {}
        if parent_node.get("$formkit") not in {"group", "repeater"}:
            return None, ["The parent node given is not a group or repeater"]
        return parent, None

    @cached_property
    def parent_names(self) -> set[str]:
        """
        Return the names of parent nodes' child nodes.
        The saved child node must not use any of these names.
        """
        parent, _ = self.parent
        if not parent:
            return set()
        siblings = parent.children
        if self.child:
            # Ensures that names are not "overwritten": the node being saved
            # is not counted among its own siblings.
            siblings = siblings.exclude(pk=self.child.pk)
        return set(siblings.values_list("node__name", flat=True))

    @cached_property
    def child(self):
        # The uuid may belong to a node or may be a new value
        if self.uuid is None:
            # Create mode - generate new UUID
            return models.FormKitSchemaNode(pk=uuid4(), node={})
        try:
            # Update mode - fetch existing node
            return models.FormKitSchemaNode.objects.get(pk=self.uuid)
        except models.FormKitSchemaNode.DoesNotExist:
            # UUID provided but node doesn't exist - this is an error for updates
            return None

    @cached_property
    def preferred_name(self):
        """
        Fetch a suitable name for the database to use.
        This name must be unique to the 'parent' group, a valid Python id, valid Django id,
        preferably lowercase.
        """
        # If "name" is not provided use the "label" field
        source = self.name if self.name is not None else self.label
        if source is None:
            # Neither name nor label: fall back to a random, clearly marked name
            return make_name_valid_id(f"{uuid4().hex[:8]}_unnamed")
        return disambiguate_name(make_name_valid_id(source), self.parent_names)

    class Config:
        allow_population_by_field_name = True
        # Lets the cached_property members above live on the model class
        keep_untouched = (cached_property,)

423 

424 

def _unrecognized_payload_fields(raw_payload_dict) -> dict:
    """Return the non-None raw payload entries that FormKitNodeIn does not declare."""
    if not isinstance(raw_payload_dict, dict):
        # Nothing to extract from a missing or non-object JSON body
        return {}
    # Fields that are API-specific and should not go to additional_props
    known_keys = {"parent_id", "uuid"}
    # Every declared field counts as recognized under its name and its alias
    for field_name, field_info in FormKitNodeIn.__fields__.items():
        known_keys.add(field_name)
        alias = getattr(field_info, "alias", None)
        if alias:
            known_keys.add(alias)
    return {k: v for k, v in raw_payload_dict.items() if k not in known_keys and v is not None}


def create_or_update_child_node(payload: FormKitNodeIn, raw_payload_dict: dict | None = None):
    """
    Create or update a child node from API payload.

    Args:
        payload: Validated FormKitNodeIn payload (only recognized fields)
        raw_payload_dict: Optional raw payload dict with all fields including unrecognized ones

    Returns:
        A ``(child, errors)`` pair: ``(node, [])`` once the node has been saved,
        or ``(None, [messages])`` when the request was rejected before saving.
    """
    parent, parent_errors = payload.parent
    child = payload.child

    if parent_errors:
        return None, parent_errors

    # If uuid was provided but node doesn't exist, return error
    if payload.uuid is not None and child is None:
        return None, ["Node with the provided UUID does not exist"]

    # If child is None (shouldn't happen after above check, but safety first)
    if child is None:
        return None, ["Failed to create or retrieve node"]

    if child.is_active is False:
        return None, ["This node has already been deleted and cannot be edited"]

    values = payload.dict(
        by_alias=True,
        exclude_none=True,
        exclude={"parent_id", "uuid", "parent", "child", "preferred_name", "parent_names"},
    )
    # Ensure the name is unique and suitable
    # Do not replace existing names though
    # Initialize node dict if it doesn't exist
    if child.node is None:
        child.node = {}
    existing_name = child.node.get("name", None) if isinstance(child.node, dict) else None
    if existing_name is None:
        values["name"] = payload.preferred_name
    child.node.update(values)
    if payload.additional_props is not None:
        child.node.update(payload.additional_props)
        # Also store additional_props in the model field
        if child.additional_props is None:
            child.additional_props = {}
        child.additional_props.update(payload.additional_props)

    # Extract and preserve unrecognized fields from raw payload
    unrecognized_fields = _unrecognized_payload_fields(raw_payload_dict)
    if unrecognized_fields:
        if child.additional_props is None:
            child.additional_props = {}
        # Merge with existing additional_props (don't overwrite if already set)
        for key, value in unrecognized_fields.items():
            child.additional_props.setdefault(key, value)

    child.label = payload.label

    if isinstance(payload.additional_props, dict):
        if label := payload.additional_props.get("label"):
            # groups require a label
            child.label = label

    child.node_type = "formkit"

    # Save the node and its link to the parent together
    with transaction.atomic():
        child.save()
        if parent:
            # Use get_or_create to avoid duplicate relationships
            models.NodeChildren.objects.get_or_create(
                parent=parent,
                child=child,
                defaults={"order": None},  # Order can be set later if needed
            )

    return child, []

519 

520 

def make_name_valid_id(in_: str):
    """
    Turn ``in_`` into a lowercase identifier-like name.

    Every non-word character is replaced with ``_``, a ``_`` is inserted in
    front of a leading digit, trailing underscores are dropped and the result
    is lowercased.

    Returns an empty string when nothing is left after stripping (empty input,
    or input made up only of non-word characters and underscores); such input
    previously raised IndexError.
    """
    return re.sub(r"\W|^(?=\d)", "_", in_).rstrip("_").lower()

529 

530 

def disambiguate_name(name_in: str, used_names: Sequence[str]):
    """
    Return ``name_in`` if it is not in ``used_names``; otherwise return the
    first of ``name_in_1``, ``name_in_2``, ... that is not in ``used_names``.
    """
    if name_in not in used_names:
        return name_in
    counter = 1
    candidate = f"{name_in}_{counter}"
    while candidate in used_names:
        counter += 1
        candidate = f"{name_in}_{counter}"
    return candidate

538 

539 

@router.post(
    "create_or_update_node",
    response={
        HTTPStatus.OK: NodeReturnType,
        HTTPStatus.NOT_FOUND: FormKitErrors,
        HTTPStatus.BAD_REQUEST: FormKitErrors,
        HTTPStatus.FORBIDDEN: FormKitErrors,
        HTTPStatus.INTERNAL_SERVER_ERROR: FormKitErrors,
    },
    exclude_none=True,
    by_alias=True,
    auth=formkit_auth,
)
def create_or_update_node(request, response: HttpResponse, payload: FormKitNodeIn):
    """
    Creates or updates a node in the FormKitSchemaNode model.

    This function takes payload of type FormKitNodeIn.
    It fetches the parent node if it exists and checks if it is a group or repeater.
    If the parent node is not valid or is not a group or repeater, it returns an error response.
    Otherwise, it proceeds to create or update the node.

    Args:
        request: The request object.
        response (HttpResponse): The HttpResponse object.
        payload (FormKitNodeIn): The payload containing the data for the node to be created or updated.

    Returns:
        HTTPStatus: The status of the HTTP response.
        FormKitErrors: The errors encountered during the process, if any.
    """
    # Authentication is checked by formkit_auth
    # Check permissions here to return proper 403 status
    if not request.user.has_perm("formkit_ninja.change_formkitschemanode"):
        error_response = FormKitErrors()
        error_response.errors.append("You do not have permission to create or update FormKit schema nodes.")
        return HTTPStatus.FORBIDDEN, error_response

    error_response = FormKitErrors()

    # Extract unrecognized fields from raw request body
    raw_payload_dict = None
    try:
        if hasattr(request, "body") and request.body:
            raw_payload_dict = json.loads(request.body)
    except (ValueError, AttributeError):
        # If we can't parse the body, continue without extracting unrecognized fields.
        # ValueError covers both json.JSONDecodeError and the UnicodeDecodeError
        # that json.loads raises for bytes it cannot decode.
        pass

    try:
        child, errors = create_or_update_child_node(payload, raw_payload_dict)
    except Exception as E:
        # Any failure while saving is reported to the client as a 500 with the message
        error_response.errors.append(f"{E}")
        return HTTPStatus.INTERNAL_SERVER_ERROR, error_response

    if errors:
        # Flatten errors if it's a list
        if isinstance(errors, list):
            error_response.errors.extend(errors)
        else:
            error_response.errors.append(errors)

        # The status is derived from the message text: anything that mentions a
        # missing object or a UUID is a 404, every other rejection is a 400.
        error_text = " ".join(error_response.errors)
        if "does not exist" in error_text or "UUID" in error_text:
            return HTTPStatus.NOT_FOUND, error_response
        return HTTPStatus.BAD_REQUEST, error_response

    return node_queryset_response(models.FormKitSchemaNode.objects.filter(pk__in=[child.pk]))[0]