Coverage for formkit_ninja / api.py: 46%

260 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-23 06:00 +0000

1import importlib 

2import re 

3from functools import cached_property 

4from http import HTTPStatus 

5from importlib.util import find_spec 

6from types import ModuleType 

7from typing import Sequence, cast 

8from uuid import UUID, uuid4 

9 

10from django.db import transaction 

11from django.db.models import F 

12from django.db.models.aggregates import Max 

13from django.http import HttpRequest, HttpResponse 

14from django.shortcuts import get_object_or_404 

15from django.utils.cache import add_never_cache_headers 

16from ninja import Field, ModelSchema, Router, Schema 

17from pydantic import BaseModel, validator 

18 

19from formkit_ninja import formkit_schema, models 

20 

21if find_spec("sentry_sdk"): 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true

22 sentry_sdk: ModuleType | None = importlib.import_module("sentry_sdk") 

23else: 

24 sentry_sdk = None 

25 

26 

27def sentry_message(message: str): 

28 if sentry_sdk and hasattr(sentry_sdk, "capture_message"): 

29 sentry_sdk.capture_message(f"{message}") 

30 

31 

32router = Router(tags=["FormKit"]) 

33 

34 

35class FormKitSchemaIn(ModelSchema): 

36 class Config: 

37 model = models.FormKitSchema 

38 model_fields = "__all__" 

39 

40 

41class SchemaLabel(ModelSchema): 

42 class Config: 

43 model = models.SchemaLabel 

44 model_fields = ("lang", "label") 

45 

46 

47class SchemaDescription(ModelSchema): 

48 class Config: 

49 model = models.SchemaLabel 

50 model_fields = ("lang", "label") 

51 

52 

53class FormKitSchemaListOut(ModelSchema): 

54 schemalabel_set: list[SchemaLabel] 

55 schemadescription_set: list[SchemaDescription] 

56 

57 class Config: 

58 model = models.FormKitSchema 

59 model_fields = ("id", "label") 

60 

61 

62class FormComponentsOut(ModelSchema): 

63 node_id: UUID 

64 schema_id: UUID 

65 

66 class Config: 

67 model = models.FormComponents 

68 model_fields = ("label",) 

69 

70 

71class NodeChildrenOut(ModelSchema): 

72 children: list[UUID] = [] 

73 latest_change: int | None = None 

74 

75 class Config: 

76 model = models.NodeChildren 

77 model_fields = ("parent",) 

78 

79 

80class NodeReturnType(BaseModel): 

81 key: UUID 

82 last_updated: int 

83 node: formkit_schema.Node 

84 protected: bool 

85 

86 

87class NodeInactiveType(BaseModel): 

88 key: UUID 

89 last_updated: int 

90 is_active: bool = False 

91 protected: bool 

92 

93 

94class NodeStringType(NodeReturnType): 

95 """ 

96 str | formkit_schema.FormKitNode causes openapi generator to fail 

97 """ 

98 

99 node: str # type: ignore[assignment] 

100 

101 

102NodeQSResponse = Sequence[NodeStringType | NodeReturnType | NodeInactiveType] 

103 

104 

105def node_queryset_response(qs: models.NodeQS) -> NodeQSResponse: 

106 responses = [] 

107 n: NodeStringType | NodeInactiveType | NodeReturnType 

108 for key, last_updated, node_val, protected in qs.to_response(ignore_errors=False): 

109 if last_updated is None: 

110 last_updated = -1 

111 if isinstance(node_val, str): 

112 n = NodeStringType(key=key, last_updated=last_updated, protected=protected, node=node_val) 

113 elif node_val is None: 

114 n = NodeInactiveType(key=key, last_updated=last_updated, protected=protected, is_active=False) 

115 else: 

116 n = NodeReturnType(key=key, last_updated=last_updated, protected=protected, node=node_val) # type: ignore[arg-type] 

117 responses.append(n) 

118 return responses 

119 

120 

121class Option(ModelSchema): 

122 group_name: str # This is annotation of the model `content_type_model` 

123 value: str 

124 # Note: For other projects you may want to extend this with additional languages 

125 label_tet: str | None 

126 label_en: str | None 

127 label_pt: str | None 

128 # This is an optional field used to indicate the last update 

129 # It's linked to a Django pg trigger instance in Partisipa 

130 change_id: int | None = None 

131 

132 class Config: 

133 model = models.Option 

134 model_fields = ("value",) 

135 

136 

137@router.get("list-schemas", response=list[FormKitSchemaListOut]) 

138def get_list_schemas(request): 

139 return models.FormKitSchema.objects.all() 

140 

141 

142@router.get("list-nodes", response=NodeQSResponse, by_alias=True, exclude_none=True) 

143def get_formkit_nodes(request: HttpRequest, response: HttpResponse, latest_change: int | None = -1): 

144 """ 

145 Get all of the FormKit nodes in the database 

146 """ 

147 objects: models.NodeQS = cast(models.NodeQS, models.FormKitSchemaNode.objects) 

148 nodes = objects.from_change(latest_change or -1) 

149 lc = nodes.aggregate(_=Max("track_change"))["_"] 

150 response["latest_change"] = lc if lc is not None else (latest_change or -1) 

151 add_never_cache_headers(response) 

152 return node_queryset_response(nodes) 

153 

154 

155@router.get( 

156 "list-related-nodes", 

157 response=list[NodeChildrenOut], 

158 exclude_defaults=True, 

159 exclude_none=True, 

160) 

161def get_related_nodes(request, response: HttpResponse, latest_change: int | None = -1): 

162 """ 

163 Get all of the FormKit node relationships in the database 

164 """ 

165 add_never_cache_headers(response) 

166 objects: models.NodeChildrenManager = models.NodeChildren.objects 

167 return objects.aggregate_changes_table(latest_change=latest_change) 

168 

169 

170@router.get( 

171 "list-components", 

172 response=list[FormComponentsOut], 

173 exclude_defaults=True, 

174 exclude_none=True, 

175 by_alias=True, 

176) 

177def get_components(request): 

178 values = models.FormComponents.objects.all() 

179 return values 

180 

181 

182@router.get( 

183 "schema/by-uuid/{schema_id}", 

184 response=formkit_schema.FormKitSchema, 

185 exclude_none=True, 

186 by_alias=True, 

187) 

188def get_schemas(request, schema_id: UUID): 

189 """ 

190 Get a schema based on its UUID 

191 """ 

192 schema: models.FormKitSchema = get_object_or_404(models.FormKitSchema.objects, id=schema_id) 

193 model = schema.to_pydantic() 

194 return model 

195 

196 

197@router.get( 

198 "schema/all", 

199 response=list[formkit_schema.FormKitSchema], 

200 exclude_none=True, 

201 by_alias=True, 

202) 

203def get_all_schemas(request): 

204 """ 

205 Get all schemas 

206 """ 

207 schemas = models.FormKitSchema.objects.all() 

208 model = [s.to_pydantic() for s in schemas] 

209 return model 

210 

211 

212@router.get( 

213 "schema/by-label/{label}", 

214 response=formkit_schema.FormKitSchema, 

215 exclude_none=True, 

216 by_alias=True, 

217) 

218def get_schema_by_label(request, label: str): 

219 """ 

220 Get a schema based on its label 

221 """ 

222 schema: models.FormKitSchema = get_object_or_404(models.FormKitSchema.objects, label=label) 

223 model = schema.to_pydantic() 

224 return model 

225 

226 

227@router.get( 

228 "node/{node_id}", 

229 response=formkit_schema.FormKitNode, 

230 exclude_none=True, 

231 by_alias=True, 

232) 

233def get_node(request, node_id: UUID): 

234 """ 

235 Gets a node based on its UUID 

236 """ 

237 node: models.FormKitSchemaNode = get_object_or_404(models.FormKitSchemaNode.objects, id=node_id) 

238 instance = node.get_node() 

239 return instance 

240 

241 

242@router.get("/options", response=list[Option], exclude_none=True) 

243def list_options(request: HttpRequest, response: HttpResponse): 

244 """ 

245 List all available "native" FormKit ninja labels and links 

246 """ 

247 return models.Option.objects.annotate(group_name=F("group__group")) 

248 

249 

250class FormKitErrors(BaseModel): 

251 errors: list[str] = [] 

252 field_errors: dict[str, str] = {} 

253 

254 

255@router.delete("delete", response=NodeInactiveType, exclude_none=True, by_alias=True) 

256def delete_node(request, node_id: UUID): 

257 """ 

258 Delete a node based on its UUID 

259 """ 

260 with transaction.atomic(): 

261 node: models.FormKitSchemaNode = get_object_or_404(models.FormKitSchemaNode.objects, id=node_id) 

262 node.delete() 

263 # node.refresh_from_db() 

264 objects: models.NodeQS = cast(models.NodeQS, models.FormKitSchemaNode.objects) 

265 return node_queryset_response(objects.filter(pk=node_id))[0] 

266 

267 

268class FormKitNodeIn(Schema): 

269 """ 

270 Creates a new FormKit text or number node 

271 We'd like to use `formkit_schema.FormKitSchemaFormKit` 

272 here but that `discriminated node` stuff makes it hard 

273 """ 

274 

275 formkit: str = Field(default="text", alias="$formkit") 

276 label: str | None = None 

277 key: str | None = None 

278 name: str | None = None 

279 placeholder: str | None = None 

280 help: str | None = None 

281 

282 # Fields from "number" 

283 max: int | str | None = None 

284 min: int | str | None = None 

285 step: str | None = None 

286 

287 # Field from dropdown/select/autocomplete/radio/checkbox 

288 options: str | None = None 

289 

290 # Repeater-specific properties 

291 addLabel: str | None = None 

292 itemClass: str | None = None 

293 itemsClass: str | None = None 

294 upControl: bool | None = None 

295 downControl: bool | None = None 

296 

297 # Conditional logic 

298 if_condition: str | None = Field(default=None, alias="if") 

299 

300 # Validation 

301 validationRules: str | None = None 

302 validation: str | list[str] | None = None 

303 

304 # Field Constraints 

305 maxLength: int | None = None 

306 _minDateSource: str | None = None 

307 _maxDateSource: str | None = None 

308 disabledDays: str | None = None 

309 

310 # Used for Creates 

311 parent_id: UUID | None = None 

312 

313 # Used for Updates 

314 uuid: UUID = Field(default_factory=uuid4) 

315 

316 # Used for "Add Group" 

317 # This should include an `icon`, `title` and `id` for the second level group 

318 additional_props: dict[str, str | int] | None = None 

319 

320 @validator("formkit") 

321 def validate_formkit_type(cls, v): 

322 """Validate that the formkit type is a valid FormKit type""" 

323 from typing import get_args 

324 

325 valid_types = get_args(formkit_schema.FORMKIT_TYPE) 

326 if v not in valid_types: 

327 raise ValueError(f"Invalid FormKit type: {v}. Valid types are: {', '.join(valid_types)}") 

328 return v 

329 

330 @cached_property 

331 def parent(self): 

332 if self.parent_id is None: 

333 return None, None 

334 try: 

335 parent = models.FormKitSchemaNode.objects.get(pk=self.parent_id) 

336 except models.FormKitSchemaNode.DoesNotExist: 

337 return None, ["The parent node given does not exist"] 

338 if parent.node.get("$formkit") not in {"group", "repeater"}: 

339 return None, ["The parent node given is not a group or repeater"] 

340 if parent is None: 

341 return None, ["The parent node given does not exist"] 

342 return parent, None 

343 

344 @cached_property 

345 def parent_names(self) -> set[str]: 

346 """ 

347 Return the names of parent nodes' child nodes. 

348 The saved child node must not use any of these names. 

349 """ 

350 parent, parent_errors = self.parent 

351 if self.parent[0] and self.child: 

352 # Ensures that names are not "overwritten" 

353 return set(parent.children.exclude(pk=self.child.pk).values_list("node__name", flat=True)) 

354 elif self.parent[0]: 

355 return set(parent.children.values_list("node__name", flat=True)) 

356 else: 

357 return set() 

358 

359 @cached_property 

360 def child(self): 

361 # The uuid may belong to a node or may be a new value 

362 try: 

363 return models.FormKitSchemaNode.objects.get(pk=self.uuid) 

364 except models.FormKitSchemaNode.DoesNotExist: 

365 return models.FormKitSchemaNode(pk=self.uuid, node={}) 

366 

367 @cached_property 

368 def preferred_name(self): 

369 """ 

370 Fetch a suitable name for the database to use. 

371 This name must be unique to the 'parent' group, a valid Python id, valid Django id, 

372 preferably lowercase. 

373 """ 

374 # If "name" is not provided use the "label" field 

375 if self.name is not None: 

376 return disambiguate_name(make_name_valid_id(self.name), self.parent_names) 

377 elif self.label is not None: 

378 return disambiguate_name(make_name_valid_id(self.label), self.parent_names) 

379 return make_name_valid_id(f"{uuid4().hex[:8]}_unnamed") 

380 

381 class Config: 

382 allow_population_by_field_name = True 

383 keep_untouched = (cached_property,) 

384 

385 

386def create_or_update_child_node(payload: FormKitNodeIn): 

387 parent, parent_errors = payload.parent 

388 child = payload.child 

389 

390 if parent_errors: 

391 return None, parent_errors 

392 if child.is_active is False: 

393 return None, ["This node has already been deleted and cannot be edited"] 

394 

395 values = payload.dict( 

396 by_alias=True, 

397 exclude_none=True, 

398 exclude={"parent_id", "uuid"} | {"parent", "child", "preferred_name", "parent_names"}, 

399 ) 

400 # Ensure the name is unique and suitable 

401 # Do not replace existing names though 

402 existing_name = child.node.get("name", None) if isinstance(child.node, dict) else None 

403 if existing_name is None: 

404 values["name"] = payload.preferred_name 

405 child.node.update(values) 

406 if payload.additional_props is not None: 

407 child.node.update(payload.additional_props) 

408 

409 child.label = payload.label 

410 

411 if isinstance(payload.additional_props, dict): 

412 if label := payload.additional_props.get("label"): 

413 # groups require a label 

414 child.label = label 

415 

416 child.node_type = "formkit" 

417 

418 with transaction.atomic(): 

419 child.save() 

420 if parent: 

421 models.NodeChildren.objects.create(parent=parent, child=child) 

422 

423 return child, [] 

424 

425 

426def make_name_valid_id(in_: str): 

427 """ 

428 Take a string. Replace any python-invalid characters with '_' 

429 """ 

430 subbed = re.sub(r"\W|^(?=\d)", "_", in_) 

431 while subbed[-1] == "_": 

432 subbed = subbed[:-1] 

433 return subbed.lower() 

434 

435 

436def disambiguate_name(name_in: str, used_names: Sequence[str]): 

437 suffix = 1 

438 if name_in not in used_names: 

439 return name_in 

440 while f"{name_in}_{suffix}" in used_names: 

441 suffix = suffix + 1 

442 return f"{name_in}_{suffix}" 

443 

444 

445@router.post( 

446 "create_or_update_node", 

447 response={ 

448 HTTPStatus.OK: NodeReturnType, 

449 HTTPStatus.INTERNAL_SERVER_ERROR: FormKitErrors, 

450 }, 

451 exclude_none=True, 

452 by_alias=True, 

453) 

454def create_or_update_node(request, response: HttpResponse, payload: FormKitNodeIn): 

455 """ 

456 Creates or updates a node in the FormKitSchemaNode model. 

457 

458 This function takes payload of type FormKitNodeIn. 

459 It fetches the parent node if it exists and checks if it is a group or repeater. 

460 If the parent node is not valid or is not a group or repeater, it returns an error response. 

461 Otherwise, it proceeds to create or update the node. 

462 

463 Args: 

464 request: The request object. 

465 response (HttpResponse): The HttpResponse object. 

466 payload (FormKitNodeIn): The payload containing the data for the node to be created or updated. 

467 

468 Returns: 

469 HTTPStatus: The status of the HTTP response. 

470 FormKitErrors: The errors encountered during the process, if any. 

471 """ 

472 

473 error_response = FormKitErrors() 

474 # Update the payload "name" 

475 # When label is provided, use the label to generate the name 

476 # Fetch parent node, if it exists, and check that it is a group or repeater 

477 

478 try: 

479 child, errors = create_or_update_child_node(payload) 

480 if errors: 

481 error_response.errors.append(errors) 

482 except Exception as E: 

483 error_response.errors.append(f"{E}") 

484 

485 if error_response.errors or error_response.field_errors: 

486 return HTTPStatus.INTERNAL_SERVER_ERROR, error_response 

487 

488 return node_queryset_response(models.FormKitSchemaNode.objects.filter(pk__in=[child.pk]))[0]