Coverage for api.py: 0%

253 statements  

« prev     ^ index     » next       coverage.py v7.13.1, created at 2026-01-22 07:15 +0000

1import importlib 

2import re 

3from functools import cached_property 

4from http import HTTPStatus 

5from importlib.util import find_spec 

6from types import ModuleType 

7from typing import Sequence, cast 

8from uuid import UUID, uuid4 

9 

10from django.db import transaction 

11from django.db.models import F 

12from django.db.models.aggregates import Max 

13from django.http import HttpRequest, HttpResponse 

14from django.shortcuts import get_object_or_404 

15from django.utils.cache import add_never_cache_headers 

16from ninja import Field, ModelSchema, Router, Schema 

17from pydantic import BaseModel 

18 

19from formkit_ninja import formkit_schema, models 

20 

21if find_spec("sentry_sdk"): 

22 sentry_sdk: ModuleType | None = importlib.import_module("sentry_sdk") 

23else: 

24 sentry_sdk = None 

25 

26 

def sentry_message(message: str):
    """Forward *message* to Sentry's ``capture_message`` when it is available.

    Does nothing when the module-level ``sentry_sdk`` is falsy or has no
    ``capture_message`` attribute.
    """
    if not sentry_sdk or not hasattr(sentry_sdk, "capture_message"):
        return
    sentry_sdk.capture_message(f"{message}")

30 

31 

# Router that every endpoint in this module registers on; tagged "FormKit".
router = Router(
    tags=["FormKit"],
)

33 

34 

class FormKitSchemaIn(ModelSchema):
    """Schema generated from every field of ``models.FormKitSchema``."""

    class Config:
        model = models.FormKitSchema
        model_fields = "__all__"

39 

40 

class SchemaLabel(ModelSchema):
    """Schema exposing the ``lang`` and ``label`` fields of ``models.SchemaLabel``."""

    class Config:
        model = models.SchemaLabel
        model_fields = ("lang", "label")

45 

46 

class SchemaDescription(ModelSchema):
    """Schema exposing ``lang`` and ``label``, used for schema descriptions."""

    class Config:
        # NOTE(review): this is bound to models.SchemaLabel, not a description
        # model — looks like copy-paste from SchemaLabel; confirm whether a
        # dedicated description model should be referenced here.
        model = models.SchemaLabel
        model_fields = ("lang", "label")

51 

52 

class FormKitSchemaListOut(ModelSchema):
    """List entry for a schema: its ``id`` and ``label`` plus related labels and descriptions."""

    # Related rows, serialised with the schemas declared for them.
    schemalabel_set: list[SchemaLabel]
    schemadescription_set: list[SchemaDescription]

    class Config:
        model = models.FormKitSchema
        model_fields = ("id", "label")

60 

61 

class FormComponentsOut(ModelSchema):
    """Output schema for ``models.FormComponents``: ``label`` plus node and schema ids."""

    node_id: UUID
    schema_id: UUID

    class Config:
        model = models.FormComponents
        model_fields = ("label",)

69 

70 

class NodeChildrenOut(ModelSchema):
    """Output schema for ``models.NodeChildren``: ``parent`` with child ids and a change marker."""

    # Both extra fields are optional in the payload.
    children: list[UUID] = []
    latest_change: int | None = None

    class Config:
        model = models.NodeChildren
        model_fields = ("parent",)

78 

79 

class NodeReturnType(BaseModel):
    """Response entry for a node carrying its full ``formkit_schema.Node`` payload."""

    key: UUID
    last_updated: int
    node: formkit_schema.Node
    protected: bool

85 

86 

class NodeInactiveType(BaseModel):
    """Response entry for a node without a payload; ``is_active`` defaults to False."""

    key: UUID
    last_updated: int
    is_active: bool = False
    protected: bool

92 

93 

class NodeStringType(NodeReturnType):
    """
    Variant of NodeReturnType whose ``node`` is a plain string.

    Declared as its own class because the union
    ``str | formkit_schema.FormKitNode`` causes the openapi generator to fail.
    """

    node: str  # type: ignore[assignment]

100 

101 

# Response type for node listings: a sequence mixing the three entry kinds.
NodeQSResponse = Sequence[
    NodeStringType | NodeReturnType | NodeInactiveType
]

103 

104 

def node_queryset_response(qs: models.NodeQS) -> NodeQSResponse:
    """Convert the rows of ``qs.to_response(ignore_errors=False)`` into response models.

    Each row is unpacked as ``(key, last_updated, node_val, protected)``:

    * a ``None`` ``last_updated`` is reported as ``-1``;
    * a string ``node_val`` yields a ``NodeStringType``;
    * a ``None`` ``node_val`` yields a ``NodeInactiveType`` with ``is_active=False``;
    * anything else yields a ``NodeReturnType``.
    """

    def _to_entry(key, last_updated, node_val, protected):
        # Fields shared by all three response models.
        shared = {
            "key": key,
            "last_updated": -1 if last_updated is None else last_updated,
            "protected": protected,
        }
        if node_val is None:
            return NodeInactiveType(is_active=False, **shared)
        if isinstance(node_val, str):
            return NodeStringType(node=node_val, **shared)
        return NodeReturnType(node=node_val, **shared)  # type: ignore[arg-type]

    return [
        _to_entry(key, last_updated, node_val, protected)
        for key, last_updated, node_val, protected in qs.to_response(ignore_errors=False)
    ]

119 

120 

class Option(ModelSchema):
    """Output schema for ``models.Option``: ``value`` plus group name and per-language labels."""

    # Annotated onto the queryset by the endpoint, not a model field.
    # This is annotation of the model `content_type_model`
    group_name: str
    value: str
    # Note: For other projects you may want to extend this with additional languages
    label_tet: str | None
    label_en: str | None
    label_pt: str | None
    # Optional marker of the last update.
    # It's linked to a Django pg trigger instance in Partisipa
    change_id: int | None = None

    class Config:
        model = models.Option
        model_fields = ("value",)

135 

136 

@router.get("list-schemas", response=list[FormKitSchemaListOut])
def get_list_schemas(request):
    """Return every ``FormKitSchema`` row for serialisation as ``FormKitSchemaListOut``."""
    all_schemas = models.FormKitSchema.objects.all()
    return all_schemas

140 

141 

@router.get("list-nodes", response=NodeQSResponse, by_alias=True, exclude_none=True)
def get_formkit_nodes(request: HttpRequest, response: HttpResponse, latest_change: int | None = -1):
    """
    Get the FormKit nodes selected by ``from_change(latest_change)``.

    A falsy ``latest_change`` (``None`` or ``0``) is treated as ``-1``.
    The ``latest_change`` response header is set to the maximum
    ``track_change`` of the selected nodes, or to the requested value when
    that maximum is ``None``. The response is marked as never-cacheable.
    """
    since = latest_change or -1
    manager: models.NodeQS = cast(models.NodeQS, models.FormKitSchemaNode.objects)
    nodes = manager.from_change(since)
    newest = nodes.aggregate(_=Max("track_change"))["_"]
    response["latest_change"] = since if newest is None else newest
    add_never_cache_headers(response)
    return node_queryset_response(nodes)

153 

154 

@router.get(
    "list-related-nodes",
    response=list[NodeChildrenOut],
    exclude_defaults=True,
    exclude_none=True,
)
def get_related_nodes(request, response: HttpResponse, latest_change: int | None = -1):
    """
    Get the FormKit node relationships.

    Marks the response as never-cacheable and returns whatever
    ``NodeChildren.objects.aggregate_changes_table`` yields for the given
    ``latest_change``.
    """
    add_never_cache_headers(response)
    manager: models.NodeChildrenManager = models.NodeChildren.objects
    relationships = manager.aggregate_changes_table(latest_change=latest_change)
    return relationships

168 

169 

@router.get(
    "list-components",
    response=list[FormComponentsOut],
    exclude_defaults=True,
    exclude_none=True,
    by_alias=True,
)
def get_components(request):
    """Return every ``FormComponents`` row."""
    return models.FormComponents.objects.all()

180 

181 

@router.get(
    "schema/by-uuid/{schema_id}",
    response=formkit_schema.FormKitSchema,
    exclude_none=True,
    by_alias=True,
)
def get_schemas(request, schema_id: UUID):
    """
    Get a schema based on its UUID.

    Responds with 404 when no schema has that id; otherwise returns the
    schema's ``to_pydantic()`` representation.
    """
    found: models.FormKitSchema = get_object_or_404(models.FormKitSchema.objects, id=schema_id)
    return found.to_pydantic()

195 

196 

@router.get(
    "schema/all",
    response=list[formkit_schema.FormKitSchema],
    exclude_none=True,
    by_alias=True,
)
def get_all_schemas(request):
    """
    Get all schemas, each converted with ``to_pydantic()``.
    """
    return [stored.to_pydantic() for stored in models.FormKitSchema.objects.all()]

210 

211 

@router.get(
    "schema/by-label/{label}",
    response=formkit_schema.FormKitSchema,
    exclude_none=True,
    by_alias=True,
)
def get_schema_by_label(request, label: str):
    """
    Get a schema based on its label.

    Responds with 404 when no schema has that label; otherwise returns the
    schema's ``to_pydantic()`` representation.
    """
    found: models.FormKitSchema = get_object_or_404(models.FormKitSchema.objects, label=label)
    return found.to_pydantic()

225 

226 

@router.get(
    "node/{node_id}",
    response=formkit_schema.FormKitNode,
    exclude_none=True,
    by_alias=True,
)
def get_node(request, node_id: UUID):
    """
    Gets a node based on its UUID.

    Responds with 404 when no node has that id; otherwise returns the
    result of the stored node's ``get_node()``.
    """
    stored: models.FormKitSchemaNode = get_object_or_404(models.FormKitSchemaNode.objects, id=node_id)
    return stored.get_node()

240 

241 

@router.get("/options", response=list[Option], exclude_none=True)
def list_options(request: HttpRequest, response: HttpResponse):
    """
    List all available "native" FormKit ninja labels and links.

    Each option is annotated with ``group_name``, read from ``group__group``.
    """
    annotated = models.Option.objects.annotate(group_name=F("group__group"))
    return annotated

248 

249 

class FormKitErrors(BaseModel):
    """Error payload: general messages plus messages keyed by field name."""

    errors: list[str] = []
    field_errors: dict[str, str] = {}

253 

254 

@router.delete("delete", response=NodeInactiveType, exclude_none=True, by_alias=True)
def delete_node(request, node_id: UUID):
    """
    Delete a node based on its UUID.

    Responds with 404 when the node does not exist. After calling
    ``delete()`` the node is looked up again by primary key and the first
    entry of the resulting response list is returned.
    """
    with transaction.atomic():
        target: models.FormKitSchemaNode = get_object_or_404(models.FormKitSchemaNode.objects, id=node_id)
        target.delete()
        # NOTE(review): the lookup below only finds a row if delete() keeps it
        # (presumably a soft delete) — confirm against the model.
        manager: models.NodeQS = cast(models.NodeQS, models.FormKitSchemaNode.objects)
        entries = node_queryset_response(manager.filter(pk=node_id))
        return entries[0]

266 

267 

class FormKitNodeIn(Schema):
    """
    Creates a new FormKit text or number node
    We'd like to use `formkit_schema.FormKitSchemaFormKit`
    here but that `discriminated node` stuff makes it hard

    Besides the payload fields, the class exposes cached helpers
    (``parent``, ``child``, ``parent_names``, ``preferred_name``) that
    resolve database state needed when saving the node.
    """

    formkit: str = Field(default="text", alias="$formkit")
    label: str | None = None
    key: str | None = None
    name: str | None = None
    placeholder: str | None = None
    help: str | None = None

    # Fields from "number"
    max: int | str | None = None
    min: int | str | None = None
    step: str | None = None

    # Field from dropdown/select/autocomplete/radio/checkbox
    options: str | None = None

    # Repeater-specific properties
    addLabel: str | None = None
    itemClass: str | None = None
    itemsClass: str | None = None
    upControl: bool | None = None
    downControl: bool | None = None

    # Conditional logic
    if_condition: str | None = Field(default=None, alias="if")

    # Validation
    validationRules: str | None = None
    validation: str | list[str] | None = None

    # Field Constraints
    maxLength: int | None = None
    # NOTE(review): pydantic does not treat underscore-prefixed names as
    # regular fields, so these two are probably never populated from the
    # payload — confirm and consider an aliased public field instead.
    _minDateSource: str | None = None
    _maxDateSource: str | None = None
    disabledDays: str | None = None

    # Used for Creates
    parent_id: UUID | None = None

    # Used for Updates
    uuid: UUID = Field(default_factory=uuid4)

    # Used for "Add Group"
    # This should include an `icon`, `title` and `id` for the second level group
    additional_props: dict[str, str | int] | None = None

    @cached_property
    def parent(self):
        """
        Resolve ``parent_id`` to a ``(parent, errors)`` pair.

        Returns ``(None, None)`` when no parent was requested,
        ``(None, [message])`` when the parent does not exist or is not a
        group/repeater, and ``(parent, None)`` otherwise.
        """
        if self.parent_id is None:
            return None, None
        try:
            parent = models.FormKitSchemaNode.objects.get(pk=self.parent_id)
        except models.FormKitSchemaNode.DoesNotExist:
            return None, ["The parent node given does not exist"]
        if parent.node.get("$formkit") not in {"group", "repeater"}:
            return None, ["The parent node given is not a group or repeater"]
        # `.get()` either returns an instance or raises, so no None check is
        # needed at this point.
        return parent, None

    @cached_property
    def parent_names(self) -> set[str]:
        """
        Return the names of parent nodes' child nodes.
        The saved child node must not use any of these names.
        """
        parent, _ = self.parent
        if not parent:
            return set()
        siblings = parent.children
        if self.child:
            # Ensures that names are not "overwritten"
            siblings = siblings.exclude(pk=self.child.pk)
        return set(siblings.values_list("node__name", flat=True))

    @cached_property
    def child(self):
        """
        Return the node identified by ``uuid``.

        The uuid may belong to a stored node or may be a new value; in the
        latter case an unsaved node with an empty ``node`` dict is returned.
        """
        try:
            return models.FormKitSchemaNode.objects.get(pk=self.uuid)
        except models.FormKitSchemaNode.DoesNotExist:
            return models.FormKitSchemaNode(pk=self.uuid, node={})

    @cached_property
    def preferred_name(self):
        """
        Fetch a suitable name for the database to use.
        This name must be unique to the 'parent' group, a valid Python id, valid Django id,
        preferably lowercase.
        """
        # Prefer "name"; if it is not provided fall back to the "label" field
        if self.name is not None:
            return disambiguate_name(make_name_valid_id(self.name), self.parent_names)
        if self.label is not None:
            return disambiguate_name(make_name_valid_id(self.label), self.parent_names)
        # Neither given: generate a name from a random hex prefix.
        return make_name_valid_id(f"{uuid4().hex[:8]}_unnamed")

    class Config:
        allow_population_by_field_name = True
        keep_untouched = (cached_property,)

374 

375 

def create_or_update_child_node(payload: FormKitNodeIn):
    """Apply *payload* to its target node and save it.

    Returns ``(child, [])`` on success, or ``(None, errors)`` when the
    parent lookup reported errors or the target node has ``is_active``
    set to False.
    """
    parent, parent_errors = payload.parent
    child = payload.child

    if parent_errors:
        return None, parent_errors
    if child.is_active is False:
        return None, ["This node has already been deleted and cannot be edited"]

    # Bookkeeping attributes that must not end up in the stored node.
    not_node_data = {"parent_id", "uuid", "parent", "child", "preferred_name", "parent_names"}
    node_values = payload.dict(by_alias=True, exclude_none=True, exclude=not_node_data)

    # Ensure the name is unique and suitable, but only generate one when the
    # stored node has none.
    # NOTE(review): a "name" supplied in the payload is still part of
    # node_values and overwrites a stored name — confirm this is intended.
    stored_name = None
    if isinstance(child.node, dict):
        stored_name = child.node.get("name", None)
    if stored_name is None:
        node_values["name"] = payload.preferred_name
    child.node.update(node_values)

    extra_props = payload.additional_props
    if extra_props is not None:
        child.node.update(extra_props)

    # groups require a label: a truthy "label" in additional_props wins over
    # the payload label.
    label_override = extra_props.get("label") if isinstance(extra_props, dict) else None
    child.label = label_override if label_override else payload.label

    child.node_type = "formkit"

    with transaction.atomic():
        child.save()
        if parent:
            models.NodeChildren.objects.create(parent=parent, child=child)

    return child, []

414 

415 

def make_name_valid_id(in_: str):
    """
    Take a string. Replace any python-invalid characters with '_'

    A leading digit gets an underscore prefix, trailing underscores are
    removed and the result is lowercased. The result is an empty string
    when nothing but underscores would remain (including empty input).
    """
    subbed = re.sub(r"\W|^(?=\d)", "_", in_)
    # rstrip rather than indexing [-1] in a loop: it does not raise on an
    # empty string or on a string made up solely of underscores.
    return subbed.rstrip("_").lower()

424 

425 

def disambiguate_name(name_in: str, used_names: Sequence[str]):
    """Return *name_in*, or the first ``name_in_<n>`` (n = 1, 2, ...) not in *used_names*."""
    candidate = name_in
    counter = 0
    while candidate in used_names:
        counter += 1
        candidate = f"{name_in}_{counter}"
    return candidate

433 

434 

@router.post(
    "create_or_update_node",
    response={
        HTTPStatus.OK: NodeReturnType,
        HTTPStatus.INTERNAL_SERVER_ERROR: FormKitErrors,
    },
    exclude_none=True,
    by_alias=True,
)
def create_or_update_node(request, response: HttpResponse, payload: FormKitNodeIn):
    """
    Creates or updates a node in the FormKitSchemaNode model.

    This function takes payload of type FormKitNodeIn and delegates to
    ``create_or_update_child_node``, which fetches the parent node if one
    was given and checks that it is a group or repeater.
    Errors reported by that call, and any exception it raises, are
    collected into a FormKitErrors response.

    Args:
        request: The request object.
        response (HttpResponse): The HttpResponse object.
        payload (FormKitNodeIn): The payload containing the data for the node to be created or updated.

    Returns:
        On success, the response entry for the saved node.
        On failure, a ``(HTTPStatus.INTERNAL_SERVER_ERROR, FormKitErrors)`` pair.
    """

    error_response = FormKitErrors()
    child = None

    try:
        child, errors = create_or_update_child_node(payload)
        if errors:
            # `errors` is a list of messages: extend so that `errors` stays
            # a flat list of strings instead of gaining a nested list.
            error_response.errors.extend(errors)
    except Exception as exc:
        error_response.errors.append(f"{exc}")

    if error_response.errors or error_response.field_errors:
        return HTTPStatus.INTERNAL_SERVER_ERROR, error_response

    return node_queryset_response(models.FormKitSchemaNode.objects.filter(pk__in=[child.pk]))[0]