Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1#!/usr/bin/env python 

2 

3""" 

4camcops_server/cc_modules/tests/webview_tests.py 

5 

6=============================================================================== 

7 

8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com). 

9 

10 This file is part of CamCOPS. 

11 

12 CamCOPS is free software: you can redistribute it and/or modify 

13 it under the terms of the GNU General Public License as published by 

14 the Free Software Foundation, either version 3 of the License, or 

15 (at your option) any later version. 

16 

17 CamCOPS is distributed in the hope that it will be useful, 

18 but WITHOUT ANY WARRANTY; without even the implied warranty of 

19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

20 GNU General Public License for more details. 

21 

22 You should have received a copy of the GNU General Public License 

23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

24 

25=============================================================================== 

26""" 

27 

28from collections import OrderedDict 

29import datetime 

30import json 

31from typing import cast 

32import unittest 

33from unittest import mock 

34 

35from pendulum import local 

36from pyramid.httpexceptions import HTTPBadRequest, HTTPFound 

37from webob.multidict import MultiDict 

38 

39from camcops_server.cc_modules.cc_constants import ERA_NOW 

40from camcops_server.cc_modules.cc_device import Device 

41from camcops_server.cc_modules.cc_group import Group 

42from camcops_server.cc_modules.cc_patient import Patient 

43from camcops_server.cc_modules.cc_patientidnum import PatientIdNum 

44from camcops_server.cc_modules.cc_pyramid import ( 

45 FormAction, 

46 ViewArg, 

47 ViewParam, 

48) 

49from camcops_server.cc_modules.cc_taskindex import PatientIdNumIndexEntry 

50from camcops_server.cc_modules.cc_taskschedule import ( 

51 PatientTaskSchedule, 

52 TaskSchedule, 

53 TaskScheduleItem, 

54) 

55from camcops_server.cc_modules.cc_testhelpers import class_attribute_names 

56from camcops_server.cc_modules.cc_unittest import DemoDatabaseTestCase 

57from camcops_server.cc_modules.cc_user import User 

58from camcops_server.cc_modules.cc_validators import ( 

59 validate_alphanum_underscore, 

60) 

61from camcops_server.cc_modules.webview import ( 

62 AddPatientView, 

63 AddTaskScheduleItemView, 

64 AddTaskScheduleView, 

65 DeleteServerCreatedPatientView, 

66 DeleteTaskScheduleItemView, 

67 DeleteTaskScheduleView, 

68 EditTaskScheduleItemView, 

69 EditTaskScheduleView, 

70 EditFinalizedPatientView, 

71 EditGroupView, 

72 EditServerCreatedPatientView, 

73 EraseTaskEntirelyView, 

74 EraseTaskLeavingPlaceholderView, 

75 FLASH_INFO, 

76 FLASH_SUCCESS, 

77 any_records_use_group, 

78 edit_group, 

79 edit_finalized_patient, 

80 edit_server_created_patient, 

81) 

82 

83 

84# ============================================================================= 

85# Unit testing 

86# ============================================================================= 

87 

88TEST_NHS_NUMBER_1 = 4887211163 # generated at random 

89TEST_NHS_NUMBER_2 = 1381277373 

90 

91 

92class WebviewTests(DemoDatabaseTestCase): 

93 """ 

94 Unit tests. 

95 """ 

96 def test_any_records_use_group_true(self) -> None: 

97 # All tasks created in DemoDatabaseTestCase will be in this group 

98 self.announce("test_any_records_use_group_true") 

99 self.assertTrue(any_records_use_group(self.req, self.group)) 

100 

101 def test_any_records_use_group_false(self) -> None: 

102 """ 

103 If this fails with: 

104 sqlalchemy.exc.InvalidRequestError: SQL expression, column, or mapped 

105 entity expected - got <name of task base class> 

106 then the base class probably needs to be declared __abstract__. See 

107 DiagnosisItemBase as an example. 

108 """ 

109 self.announce("test_any_records_use_group_false") 

110 group = Group() 

111 self.dbsession.add(self.group) 

112 self.dbsession.commit() 

113 

114 self.assertFalse(any_records_use_group(self.req, group)) 

115 

116 def test_webview_constant_validators(self) -> None: 

117 self.announce("test_webview_constant_validators") 

118 for x in class_attribute_names(ViewArg): 

119 try: 

120 validate_alphanum_underscore(x, self.req) 

121 except ValueError: 

122 self.fail(f"Operations.{x} fails validate_alphanum_underscore") 

123 

124 

125class AddTaskScheduleViewTests(DemoDatabaseTestCase): 

126 """ 

127 Unit tests. 

128 """ 

129 def test_schedule_form_displayed(self) -> None: 

130 view = AddTaskScheduleView(self.req) 

131 

132 response = view.dispatch() 

133 self.assertEqual(response.status_code, 200) 

134 self.assertEqual(response.body.decode("utf-8").count("<form"), 1) 

135 

136 def test_schedule_is_created(self) -> None: 

137 multidict = MultiDict([ 

138 ("_charset_", "UTF-8"), 

139 ("__formid__", "deform"), 

140 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

141 (ViewParam.NAME, "MOJO"), 

142 (ViewParam.GROUP_ID, self.group.id), 

143 (ViewParam.EMAIL_SUBJECT, "Subject"), 

144 (ViewParam.EMAIL_TEMPLATE, "Email template"), 

145 (FormAction.SUBMIT, "submit"), 

146 ]) 

147 

148 self.req.fake_request_post_from_dict(multidict) 

149 

150 view = AddTaskScheduleView(self.req) 

151 

152 with self.assertRaises(HTTPFound) as e: 

153 view.dispatch() 

154 

155 schedule = self.dbsession.query(TaskSchedule).one() 

156 

157 self.assertEqual(schedule.name, "MOJO") 

158 self.assertEqual(schedule.email_subject, "Subject") 

159 self.assertEqual(schedule.email_template, "Email template") 

160 

161 self.assertEqual(e.exception.status_code, 302) 

162 self.assertIn( 

163 "view_task_schedules", 

164 e.exception.headers["Location"] 

165 ) 

166 

167 

168class EditTaskScheduleViewTests(DemoDatabaseTestCase): 

169 """ 

170 Unit tests. 

171 """ 

172 def setUp(self) -> None: 

173 super().setUp() 

174 

175 self.schedule = TaskSchedule() 

176 self.schedule.group_id = self.group.id 

177 self.schedule.name = "Test" 

178 self.dbsession.add(self.schedule) 

179 self.dbsession.commit() 

180 

181 def test_schedule_name_can_be_updated(self) -> None: 

182 multidict = MultiDict([ 

183 ("_charset_", "UTF-8"), 

184 ("__formid__", "deform"), 

185 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

186 (ViewParam.NAME, "MOJO"), 

187 (ViewParam.GROUP_ID, self.group.id), 

188 (FormAction.SUBMIT, "submit"), 

189 ]) 

190 

191 self.req.fake_request_post_from_dict(multidict) 

192 self.req.add_get_params({ 

193 ViewParam.SCHEDULE_ID: str(self.schedule.id) 

194 }, set_method_get=False) 

195 

196 view = EditTaskScheduleView(self.req) 

197 

198 with self.assertRaises(HTTPFound) as e: 

199 view.dispatch() 

200 

201 schedule = self.dbsession.query(TaskSchedule).one() 

202 

203 self.assertEqual(schedule.name, "MOJO") 

204 

205 self.assertEqual(e.exception.status_code, 302) 

206 self.assertIn( 

207 "view_task_schedules", 

208 e.exception.headers["Location"] 

209 ) 

210 

211 def test_group_a_schedule_cannot_be_edited_by_group_b_admin(self) -> None: 

212 group_a = Group() 

213 group_a.name = "Group A" 

214 self.dbsession.add(group_a) 

215 

216 group_b = Group() 

217 group_b.name = "Group B" 

218 self.dbsession.add(group_b) 

219 self.dbsession.commit() 

220 

221 group_a_schedule = TaskSchedule() 

222 group_a_schedule.group_id = group_a.id 

223 group_a_schedule.name = "Group A schedule" 

224 self.dbsession.add(group_a_schedule) 

225 self.dbsession.commit() 

226 

227 self.user = User() 

228 self.user.upload_group_id = group_b.id 

229 self.user.username = "group b admin" 

230 self.user.set_password(self.req, "secret123") 

231 self.dbsession.add(self.user) 

232 self.dbsession.commit() 

233 self.req._debugging_user = self.user 

234 

235 multidict = MultiDict([ 

236 ("_charset_", "UTF-8"), 

237 ("__formid__", "deform"), 

238 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

239 (ViewParam.NAME, "Something else"), 

240 (ViewParam.GROUP_ID, self.group.id), 

241 (FormAction.SUBMIT, "submit"), 

242 ]) 

243 

244 self.req.fake_request_post_from_dict(multidict) 

245 self.req.add_get_params({ 

246 ViewParam.SCHEDULE_ID: str(self.schedule.id) 

247 }, set_method_get=False) 

248 

249 view = EditTaskScheduleView(self.req) 

250 

251 with self.assertRaises(HTTPBadRequest) as cm: 

252 view.dispatch() 

253 

254 self.assertIn( 

255 "not a group administrator", 

256 cm.exception.message 

257 ) 

258 

259 

260class DeleteTaskScheduleViewTests(DemoDatabaseTestCase): 

261 """ 

262 Unit tests. 

263 """ 

264 def setUp(self) -> None: 

265 super().setUp() 

266 

267 self.schedule = TaskSchedule() 

268 self.schedule.group_id = self.group.id 

269 self.schedule.name = "Test" 

270 self.dbsession.add(self.schedule) 

271 self.dbsession.commit() 

272 

273 def test_schedule_item_is_deleted(self) -> None: 

274 multidict = MultiDict([ 

275 ("_charset_", "UTF-8"), 

276 ("__formid__", "deform"), 

277 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

278 ("confirm_1_t", "true"), 

279 ("confirm_2_t", "true"), 

280 ("confirm_4_t", "true"), 

281 ("__start__", "danger:mapping"), 

282 ("target", "7176"), 

283 ("user_entry", "7176"), 

284 ("__end__", "danger:mapping"), 

285 ("delete", "delete"), 

286 (FormAction.DELETE, "delete"), 

287 ]) 

288 

289 self.req.fake_request_post_from_dict(multidict) 

290 

291 self.req.add_get_params({ 

292 ViewParam.SCHEDULE_ID: str(self.schedule.id) 

293 }, set_method_get=False) 

294 view = DeleteTaskScheduleView(self.req) 

295 

296 with self.assertRaises(HTTPFound) as e: 

297 view.dispatch() 

298 

299 self.assertEqual(e.exception.status_code, 302) 

300 self.assertIn( 

301 "view_task_schedules", 

302 e.exception.headers["Location"] 

303 ) 

304 

305 item = self.dbsession.query(TaskScheduleItem).one_or_none() 

306 

307 self.assertIsNone(item) 

308 

309 

310class AddTaskScheduleItemViewTests(DemoDatabaseTestCase): 

311 """ 

312 Unit tests. 

313 """ 

314 def setUp(self) -> None: 

315 super().setUp() 

316 

317 self.schedule = TaskSchedule() 

318 self.schedule.group_id = self.group.id 

319 self.schedule.name = "Test" 

320 

321 self.dbsession.add(self.schedule) 

322 self.dbsession.commit() 

323 

324 def test_schedule_item_form_displayed(self) -> None: 

325 view = AddTaskScheduleItemView(self.req) 

326 

327 self.req.add_get_params({ViewParam.SCHEDULE_ID: str(self.schedule.id)}) 

328 

329 response = view.dispatch() 

330 self.assertEqual(response.status_code, 200) 

331 self.assertEqual(response.body.decode("utf-8").count("<form"), 1) 

332 

333 def test_schedule_item_is_created(self) -> None: 

334 multidict = MultiDict([ 

335 ("_charset_", "UTF-8"), 

336 ("__formid__", "deform"), 

337 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

338 (ViewParam.SCHEDULE_ID, self.schedule.id), 

339 (ViewParam.TABLE_NAME, "ace3"), 

340 (ViewParam.CLINICIAN_CONFIRMATION, "true"), 

341 ("__start__", "due_from:mapping"), 

342 ("months", "1"), 

343 ("weeks", "2"), 

344 ("days", "3"), 

345 ("__end__", "due_from:mapping"), 

346 ("__start__", "due_within:mapping"), 

347 ("months", "2"), # 60 days 

348 ("weeks", "3"), # 21 days 

349 ("days", "15"), # 15 days 

350 ("__end__", "due_within:mapping"), 

351 (FormAction.SUBMIT, "submit"), 

352 ]) 

353 

354 self.req.fake_request_post_from_dict(multidict) 

355 

356 view = AddTaskScheduleItemView(self.req) 

357 

358 with self.assertRaises(HTTPFound) as e: 

359 view.dispatch() 

360 

361 item = self.dbsession.query(TaskScheduleItem).one() 

362 

363 self.assertEqual(item.schedule_id, self.schedule.id) 

364 self.assertEqual(item.task_table_name, "ace3") 

365 self.assertEqual(item.due_from.in_days(), 47) 

366 self.assertEqual(item.due_by.in_days(), 143) 

367 

368 self.assertEqual(e.exception.status_code, 302) 

369 self.assertIn( 

370 f"view_task_schedule_items?schedule_id={self.schedule.id}", 

371 e.exception.headers["Location"] 

372 ) 

373 

374 def test_schedule_item_is_not_created_on_cancel(self) -> None: 

375 multidict = MultiDict([ 

376 ("_charset_", "UTF-8"), 

377 ("__formid__", "deform"), 

378 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

379 (ViewParam.SCHEDULE_ID, self.schedule.id), 

380 (ViewParam.TABLE_NAME, "ace3"), 

381 ("__start__", "due_from:mapping"), 

382 ("months", "1"), 

383 ("weeks", "2"), 

384 ("days", "3"), 

385 ("__end__", "due_from:mapping"), 

386 ("__start__", "due_within:mapping"), 

387 ("months", "4"), 

388 ("weeks", "3"), 

389 ("days", "2"), 

390 ("__end__", "due_within:mapping"), 

391 (FormAction.CANCEL, "cancel"), 

392 ]) 

393 

394 self.req.fake_request_post_from_dict(multidict) 

395 

396 view = AddTaskScheduleItemView(self.req) 

397 

398 with self.assertRaises(HTTPFound): 

399 view.dispatch() 

400 

401 item = self.dbsession.query(TaskScheduleItem).one_or_none() 

402 

403 self.assertIsNone(item) 

404 

405 def test_non_existent_schedule_handled(self) -> None: 

406 self.req.add_get_params({ViewParam.SCHEDULE_ID: "99999"}) 

407 

408 view = AddTaskScheduleItemView(self.req) 

409 

410 with self.assertRaises(HTTPBadRequest): 

411 view.dispatch() 

412 

413 

414class EditTaskScheduleItemViewTests(DemoDatabaseTestCase): 

415 """ 

416 Unit tests. 

417 """ 

418 def setUp(self) -> None: 

419 from pendulum import Duration 

420 super().setUp() 

421 

422 self.schedule = TaskSchedule() 

423 self.schedule.group_id = self.group.id 

424 self.schedule.name = "Test" 

425 self.dbsession.add(self.schedule) 

426 self.dbsession.commit() 

427 

428 self.item = TaskScheduleItem() 

429 self.item.schedule_id = self.schedule.id 

430 self.item.task_table_name = "ace3" 

431 self.item.due_from = Duration(days=30) 

432 self.item.due_by = Duration(days=60) 

433 self.dbsession.add(self.item) 

434 self.dbsession.commit() 

435 

436 def test_schedule_item_is_updated(self) -> None: 

437 multidict = MultiDict([ 

438 ("_charset_", "UTF-8"), 

439 ("__formid__", "deform"), 

440 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

441 (ViewParam.SCHEDULE_ID, self.schedule.id), 

442 (ViewParam.TABLE_NAME, "bmi"), 

443 ("__start__", "due_from:mapping"), 

444 ("months", "0"), 

445 ("weeks", "0"), 

446 ("days", "30"), 

447 ("__end__", "due_from:mapping"), 

448 ("__start__", "due_within:mapping"), 

449 ("months", "0"), 

450 ("weeks", "0"), 

451 ("days", "60"), 

452 ("__end__", "due_within:mapping"), 

453 (FormAction.SUBMIT, "submit"), 

454 ]) 

455 

456 self.req.fake_request_post_from_dict(multidict) 

457 

458 self.req.add_get_params({ 

459 ViewParam.SCHEDULE_ITEM_ID: str(self.item.id) 

460 }, set_method_get=False) 

461 view = EditTaskScheduleItemView(self.req) 

462 

463 with self.assertRaises(HTTPFound) as cm: 

464 view.dispatch() 

465 

466 self.assertEqual(self.item.task_table_name, "bmi") 

467 self.assertEqual(cm.exception.status_code, 302) 

468 self.assertIn( 

469 f"view_task_schedule_items?schedule_id={self.item.schedule_id}", 

470 cm.exception.headers["Location"] 

471 ) 

472 

473 def test_schedule_item_is_not_updated_on_cancel(self) -> None: 

474 multidict = MultiDict([ 

475 ("_charset_", "UTF-8"), 

476 ("__formid__", "deform"), 

477 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

478 (ViewParam.SCHEDULE_ID, self.schedule.id), 

479 (ViewParam.TABLE_NAME, "bmi"), 

480 ("__start__", "due_from:mapping"), 

481 ("months", "0"), 

482 ("weeks", "0"), 

483 ("days", "30"), 

484 ("__end__", "due_from:mapping"), 

485 ("__start__", "due_within:mapping"), 

486 ("months", "0"), 

487 ("weeks", "0"), 

488 ("days", "60"), 

489 ("__end__", "due_within:mapping"), 

490 (FormAction.CANCEL, "cancel"), 

491 ]) 

492 

493 self.req.fake_request_post_from_dict(multidict) 

494 

495 self.req.add_get_params({ 

496 ViewParam.SCHEDULE_ITEM_ID: str(self.item.id) 

497 }, set_method_get=False) 

498 view = EditTaskScheduleItemView(self.req) 

499 

500 with self.assertRaises(HTTPFound): 

501 view.dispatch() 

502 

503 self.assertEqual(self.item.task_table_name, "ace3") 

504 

505 def test_non_existent_item_handled(self) -> None: 

506 self.req.add_get_params({ViewParam.SCHEDULE_ITEM_ID: "99999"}) 

507 

508 view = EditTaskScheduleItemView(self.req) 

509 

510 with self.assertRaises(HTTPBadRequest): 

511 view.dispatch() 

512 

513 def test_null_item_handled(self) -> None: 

514 view = EditTaskScheduleItemView(self.req) 

515 

516 with self.assertRaises(HTTPBadRequest): 

517 view.dispatch() 

518 

519 def test_get_form_values(self) -> None: 

520 view = EditTaskScheduleItemView(self.req) 

521 view.object = self.item 

522 

523 form_values = view.get_form_values() 

524 

525 self.assertEqual(form_values[ViewParam.SCHEDULE_ID], self.schedule.id) 

526 self.assertEqual(form_values[ViewParam.TABLE_NAME], 

527 self.item.task_table_name) 

528 self.assertEqual(form_values[ViewParam.DUE_FROM], self.item.due_from) 

529 

530 due_within = self.item.due_by - self.item.due_from 

531 self.assertEqual(form_values[ViewParam.DUE_WITHIN], due_within) 

532 

533 def test_group_a_item_cannot_be_edited_by_group_b_admin(self) -> None: 

534 from pendulum import Duration 

535 

536 group_a = Group() 

537 group_a.name = "Group A" 

538 self.dbsession.add(group_a) 

539 

540 group_b = Group() 

541 group_b.name = "Group B" 

542 self.dbsession.add(group_b) 

543 self.dbsession.commit() 

544 

545 group_a_schedule = TaskSchedule() 

546 group_a_schedule.group_id = group_a.id 

547 group_a_schedule.name = "Group A schedule" 

548 self.dbsession.add(group_a_schedule) 

549 self.dbsession.commit() 

550 

551 group_a_item = TaskScheduleItem() 

552 group_a_item.schedule_id = group_a_schedule.id 

553 group_a_item.task_table_name = "ace3" 

554 group_a_item.due_from = Duration(days=30) 

555 group_a_item.due_by = Duration(days=60) 

556 self.dbsession.add(group_a_item) 

557 self.dbsession.commit() 

558 

559 self.user = User() 

560 self.user.upload_group_id = group_b.id 

561 self.user.username = "group b admin" 

562 self.user.set_password(self.req, "secret123") 

563 self.dbsession.add(self.user) 

564 self.dbsession.commit() 

565 self.req._debugging_user = self.user 

566 

567 view = EditTaskScheduleItemView(self.req) 

568 view.object = group_a_item 

569 

570 with self.assertRaises(HTTPBadRequest) as cm: 

571 view.get_schedule() 

572 

573 self.assertIn( 

574 "not a group administrator", 

575 cm.exception.message 

576 ) 

577 

578 

579class DeleteTaskScheduleItemViewTests(DemoDatabaseTestCase): 

580 """ 

581 Unit tests. 

582 """ 

583 def setUp(self) -> None: 

584 super().setUp() 

585 

586 self.schedule = TaskSchedule() 

587 self.schedule.group_id = self.group.id 

588 self.schedule.name = "Test" 

589 self.dbsession.add(self.schedule) 

590 self.dbsession.commit() 

591 

592 self.item = TaskScheduleItem() 

593 self.item.schedule_id = self.schedule.id 

594 self.item.task_table_name = "ace3" 

595 self.dbsession.add(self.item) 

596 self.dbsession.commit() 

597 

598 def test_delete_form_displayed(self) -> None: 

599 view = DeleteTaskScheduleItemView(self.req) 

600 

601 self.req.add_get_params({ViewParam.SCHEDULE_ITEM_ID: str(self.item.id)}) 

602 

603 response = view.dispatch() 

604 self.assertEqual(response.status_code, 200) 

605 self.assertEqual(response.body.decode("utf-8").count("<form"), 1) 

606 

607 def test_errors_displayed_when_deletion_validation_fails(self) -> None: 

608 self.req.fake_request_post_from_dict({ 

609 FormAction.DELETE: "delete" 

610 }) 

611 

612 self.req.add_get_params({ 

613 ViewParam.SCHEDULE_ITEM_ID: str(self.item.id) 

614 }, set_method_get=False) 

615 view = DeleteTaskScheduleItemView(self.req) 

616 

617 response = view.dispatch() 

618 self.assertIn("Errors have been highlighted", 

619 response.body.decode("utf-8")) 

620 

621 def test_schedule_item_is_deleted(self) -> None: 

622 multidict = MultiDict([ 

623 ("_charset_", "UTF-8"), 

624 ("__formid__", "deform"), 

625 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

626 ("confirm_1_t", "true"), 

627 ("confirm_2_t", "true"), 

628 ("confirm_4_t", "true"), 

629 ("__start__", "danger:mapping"), 

630 ("target", "7176"), 

631 ("user_entry", "7176"), 

632 ("__end__", "danger:mapping"), 

633 ("delete", "delete"), 

634 (FormAction.DELETE, "delete"), 

635 ]) 

636 

637 self.req.fake_request_post_from_dict(multidict) 

638 

639 self.req.add_get_params({ 

640 ViewParam.SCHEDULE_ITEM_ID: str(self.item.id) 

641 }, set_method_get=False) 

642 view = DeleteTaskScheduleItemView(self.req) 

643 

644 with self.assertRaises(HTTPFound) as e: 

645 view.dispatch() 

646 

647 self.assertEqual(e.exception.status_code, 302) 

648 self.assertIn( 

649 f"view_task_schedule_items?schedule_id={self.item.schedule_id}", 

650 e.exception.headers["Location"] 

651 ) 

652 

653 item = self.dbsession.query(TaskScheduleItem).one_or_none() 

654 

655 self.assertIsNone(item) 

656 

657 def test_schedule_item_not_deleted_on_cancel(self) -> None: 

658 self.req.fake_request_post_from_dict({ 

659 FormAction.CANCEL: "cancel" 

660 }) 

661 

662 self.req.add_get_params({ 

663 ViewParam.SCHEDULE_ITEM_ID: str(self.item.id) 

664 }, set_method_get=False) 

665 view = DeleteTaskScheduleItemView(self.req) 

666 

667 with self.assertRaises(HTTPFound): 

668 view.dispatch() 

669 

670 item = self.dbsession.query(TaskScheduleItem).one_or_none() 

671 

672 self.assertIsNotNone(item) 

673 

674 

675class EditFinalizedPatientViewTests(DemoDatabaseTestCase): 

676 """ 

677 Unit tests. 

678 """ 

679 def create_tasks(self) -> None: 

680 # speed things up a bit 

681 pass 

682 

683 def test_raises_when_patient_does_not_exists(self) -> None: 

684 with self.assertRaises(HTTPBadRequest) as cm: 

685 edit_finalized_patient(self.req) 

686 

687 self.assertEqual(str(cm.exception), "Cannot find Patient with _pk:None") 

688 

689 @unittest.skip("Can't save patient in database without group") 

690 def test_raises_when_patient_not_in_a_group(self) -> None: 

691 patient = self.create_patient(_group_id=None) 

692 

693 self.req.add_get_params({ 

694 ViewParam.SERVER_PK: patient.pk 

695 }) 

696 

697 with self.assertRaises(HTTPBadRequest) as cm: 

698 edit_finalized_patient(self.req) 

699 

700 self.assertEqual(str(cm.exception), "Bad patient: not in a group") 

701 

702 def test_raises_when_not_authorized(self) -> None: 

703 patient = self.create_patient() 

704 

705 self.req._debugging_user = User() 

706 

707 with mock.patch.object( 

708 self.req._debugging_user, 

709 "may_administer_group", 

710 return_value=False 

711 ): 

712 self.req.add_get_params({ 

713 ViewParam.SERVER_PK: patient.pk 

714 }) 

715 

716 with self.assertRaises(HTTPBadRequest) as cm: 

717 edit_finalized_patient(self.req) 

718 

719 self.assertEqual(str(cm.exception), 

720 "Not authorized to edit this patient") 

721 

722 def test_raises_when_patient_not_finalized(self) -> None: 

723 device = Device(name="Not the server device") 

724 self.req.dbsession.add(device) 

725 self.req.dbsession.commit() 

726 

727 patient = self.create_patient( 

728 id=1, _device_id=device.id, _era=ERA_NOW 

729 ) 

730 

731 self.req.add_get_params({ 

732 ViewParam.SERVER_PK: patient.pk 

733 }) 

734 

735 with self.assertRaises(HTTPBadRequest) as cm: 

736 edit_finalized_patient(self.req) 

737 

738 self.assertIn("Patient is not editable", str(cm.exception)) 

739 

740 def test_patient_updated(self) -> None: 

741 patient = self.create_patient() 

742 

743 self.req.add_get_params({ 

744 ViewParam.SERVER_PK: patient.pk 

745 }, set_method_get=False) 

746 

747 multidict = MultiDict([ 

748 ("_charset_", "UTF-8"), 

749 ("__formid__", "deform"), 

750 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

751 (ViewParam.SERVER_PK, patient.pk), 

752 (ViewParam.GROUP_ID, patient.group.id), 

753 (ViewParam.FORENAME, "Jo"), 

754 (ViewParam.SURNAME, "Patient"), 

755 ("__start__", "dob:mapping"), 

756 ("date", "1958-04-19"), 

757 ("__end__", "dob:mapping"), 

758 ("__start__", "sex:rename"), 

759 ("deformField7", "X"), 

760 ("__end__", "sex:rename"), 

761 (ViewParam.ADDRESS, "New address"), 

762 (ViewParam.EMAIL, "newjopatient@example.com"), 

763 (ViewParam.GP, "New GP"), 

764 (ViewParam.OTHER, "New other"), 

765 ("__start__", "id_references:sequence"), 

766 ("__start__", "idnum_sequence:mapping"), 

767 (ViewParam.WHICH_IDNUM, self.nhs_iddef.which_idnum), 

768 (ViewParam.IDNUM_VALUE, str(TEST_NHS_NUMBER_1)), 

769 ("__end__", "idnum_sequence:mapping"), 

770 ("__end__", "id_references:sequence"), 

771 ("__start__", "danger:mapping"), 

772 ("target", "7836"), 

773 ("user_entry", "7836"), 

774 ("__end__", "danger:mapping"), 

775 (FormAction.SUBMIT, "submit"), 

776 ]) 

777 

778 self.req.fake_request_post_from_dict(multidict) 

779 

780 with self.assertRaises(HTTPFound): 

781 edit_finalized_patient(self.req) 

782 

783 self.dbsession.commit() 

784 

785 self.assertEqual(patient.forename, "Jo") 

786 self.assertEqual(patient.surname, "Patient") 

787 self.assertEqual(patient.dob.isoformat(), "1958-04-19") 

788 self.assertEqual(patient.sex, "X") 

789 self.assertEqual(patient.address, "New address") 

790 self.assertEqual(patient.email, "newjopatient@example.com") 

791 self.assertEqual(patient.gp, "New GP") 

792 self.assertEqual(patient.other, "New other") 

793 

794 idnum = patient.get_idnum_objects()[0] 

795 self.assertEqual(idnum.patient_id, patient.id) 

796 self.assertEqual(idnum.which_idnum, self.nhs_iddef.which_idnum) 

797 self.assertEqual(idnum.idnum_value, TEST_NHS_NUMBER_1) 

798 

799 self.assertEqual(len(patient.special_notes), 1) 

800 note = patient.special_notes[0].note 

801 

802 self.assertIn("Patient details edited", note) 

803 self.assertIn("forename", note) 

804 self.assertIn("Jo", note) 

805 

806 self.assertIn("surname", note) 

807 self.assertIn("Patient", note) 

808 

809 self.assertIn("idnum1", note) 

810 self.assertIn(str(TEST_NHS_NUMBER_1), note) 

811 

812 messages = self.req.session.peek_flash(FLASH_SUCCESS) 

813 

814 self.assertIn(f"Amended patient record with server PK {patient.pk}", 

815 messages[0]) 

816 self.assertIn("forename", messages[0]) 

817 self.assertIn("Jo", messages[0]) 

818 

819 self.assertIn("surname", messages[0]) 

820 self.assertIn("Patient", messages[0]) 

821 

822 self.assertIn("idnum1", messages[0]) 

823 self.assertIn(str(TEST_NHS_NUMBER_1), messages[0]) 

824 

825 def test_message_when_no_changes(self) -> None: 

826 patient = self.create_patient( 

827 forename="Jo", surname="Patient", dob=datetime.date(1958, 4, 19), 

828 sex="F", address="Address", gp="GP", other="Other" 

829 ) 

830 patient_idnum = self.create_patient_idnum( 

831 patient_id=patient.id, 

832 which_idnum=self.nhs_iddef.which_idnum, 

833 idnum_value=TEST_NHS_NUMBER_1 

834 ) 

835 schedule1 = TaskSchedule() 

836 schedule1.group_id = self.group.id 

837 schedule1.name = "Test 1" 

838 self.dbsession.add(schedule1) 

839 self.dbsession.commit() 

840 

841 patient_task_schedule = PatientTaskSchedule() 

842 patient_task_schedule.patient_pk = patient.pk 

843 patient_task_schedule.schedule_id = schedule1.id 

844 patient_task_schedule.start_datetime = local(2020, 6, 12, 9) 

845 patient_task_schedule.settings = { 

846 "name 1": "value 1", 

847 "name 2": "value 2", 

848 "name 3": "value 3", 

849 } 

850 

851 self.dbsession.add(patient_task_schedule) 

852 self.req.add_get_params({ 

853 ViewParam.SERVER_PK: patient.pk 

854 }, set_method_get=False) 

855 

856 multidict = MultiDict([ 

857 ("_charset_", "UTF-8"), 

858 ("__formid__", "deform"), 

859 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

860 (ViewParam.SERVER_PK, patient.pk), 

861 (ViewParam.GROUP_ID, patient.group.id), 

862 (ViewParam.FORENAME, patient.forename), 

863 (ViewParam.SURNAME, patient.surname), 

864 

865 ("__start__", "dob:mapping"), 

866 ("date", patient.dob.isoformat()), 

867 ("__end__", "dob:mapping"), 

868 

869 ("__start__", "sex:rename"), 

870 ("deformField7", patient.sex), 

871 ("__end__", "sex:rename"), 

872 

873 (ViewParam.ADDRESS, patient.address), 

874 (ViewParam.GP, patient.gp), 

875 (ViewParam.OTHER, patient.other), 

876 

877 ("__start__", "id_references:sequence"), 

878 ("__start__", "idnum_sequence:mapping"), 

879 (ViewParam.WHICH_IDNUM, patient_idnum.which_idnum), 

880 (ViewParam.IDNUM_VALUE, patient_idnum.idnum_value), 

881 ("__end__", "idnum_sequence:mapping"), 

882 ("__end__", "id_references:sequence"), 

883 

884 ("__start__", "danger:mapping"), 

885 ("target", "7836"), 

886 ("user_entry", "7836"), 

887 ("__end__", "danger:mapping"), 

888 

889 ("__start__", "task_schedules:sequence"), 

890 ("__start__", "task_schedule_sequence:mapping"), 

891 ("schedule_id", schedule1.id), 

892 ("__start__", "start_datetime:mapping"), 

893 ("date", "2020-06-12"), 

894 ("time", "09:00:00"), 

895 ("__end__", "start_datetime:mapping"), 

896 ("settings", json.dumps({ 

897 "name 1": "value 1", 

898 "name 2": "value 2", 

899 "name 3": "value 3", 

900 })), 

901 ("__end__", "task_schedule_sequence:mapping"), 

902 ("__end__", "task_schedules:sequence"), 

903 

904 

905 (FormAction.SUBMIT, "submit"), 

906 ]) 

907 

908 self.req.fake_request_post_from_dict(multidict) 

909 

910 with self.assertRaises(HTTPFound): 

911 edit_finalized_patient(self.req) 

912 

913 messages = self.req.session.peek_flash(FLASH_INFO) 

914 

915 self.assertIn("No changes required", messages[0]) 

916 

917 def test_template_rendered_with_values(self) -> None: 

918 patient = self.create_patient( 

919 id=1, forename="Jo", surname="Patient", 

920 dob=datetime.date(1958, 4, 19), 

921 sex="F", address="Address", gp="GP", other="Other" 

922 ) 

923 self.create_patient_idnum( 

924 patient_id=patient.id, 

925 which_idnum=self.nhs_iddef.which_idnum, 

926 idnum_value=TEST_NHS_NUMBER_1 

927 ) 

928 

929 from camcops_server.tasks import Bmi 

930 

931 task1 = Bmi() 

932 task1.id = 1 

933 task1._device_id = patient.device_id 

934 task1._group_id = patient.group_id 

935 task1._era = patient.era 

936 task1.patient_id = patient.id 

937 task1.when_created = self.era_time 

938 task1._current = False 

939 self.dbsession.add(task1) 

940 

941 task2 = Bmi() 

942 task2.id = 2 

943 task2._device_id = patient.device_id 

944 task2._group_id = patient.group_id 

945 task2._era = patient.era 

946 task2.patient_id = patient.id 

947 task2.when_created = self.era_time 

948 task2._current = False 

949 self.dbsession.add(task2) 

950 self.dbsession.commit() 

951 

952 self.req.add_get_params({ 

953 ViewParam.SERVER_PK: patient.pk 

954 }) 

955 

956 view = EditFinalizedPatientView(self.req) 

957 with mock.patch.object(view, "render_to_response") as mock_render: 

958 view.dispatch() 

959 

960 args, kwargs = mock_render.call_args 

961 

962 context = args[0] 

963 

964 self.assertIn("form", context) 

965 self.assertIn(task1, context["tasks"]) 

966 self.assertIn(task2, context["tasks"]) 

967 

968 def test_form_values_for_existing_patient(self) -> None: 

969 patient = self.create_patient( 

970 id=1, forename="Jo", surname="Patient", 

971 dob=datetime.date(1958, 4, 19), 

972 sex="F", address="Address", email="jopatient@example.com", 

973 gp="GP", other="Other" 

974 ) 

975 

976 schedule1 = TaskSchedule() 

977 schedule1.group_id = self.group.id 

978 schedule1.name = "Test 1" 

979 self.dbsession.add(schedule1) 

980 self.dbsession.commit() 

981 

982 patient_task_schedule = PatientTaskSchedule() 

983 patient_task_schedule.patient_pk = patient.pk 

984 patient_task_schedule.schedule_id = schedule1.id 

985 patient_task_schedule.start_datetime = local(2020, 6, 12) 

986 patient_task_schedule.settings = { 

987 "name 1": "value 1", 

988 "name 2": "value 2", 

989 "name 3": "value 3", 

990 } 

991 

992 self.dbsession.add(patient_task_schedule) 

993 self.dbsession.commit() 

994 

995 self.create_patient_idnum( 

996 patient_id=patient.id, 

997 which_idnum=self.nhs_iddef.which_idnum, 

998 idnum_value=TEST_NHS_NUMBER_1 

999 ) 

1000 

1001 self.req.add_get_params({ 

1002 ViewParam.SERVER_PK: patient.pk 

1003 }) 

1004 

1005 view = EditFinalizedPatientView(self.req) 

1006 view.object = patient 

1007 

1008 form_values = view.get_form_values() 

1009 

1010 self.assertEqual(form_values[ViewParam.FORENAME], "Jo") 

1011 self.assertEqual(form_values[ViewParam.SURNAME], "Patient") 

1012 self.assertEqual(form_values[ViewParam.DOB], datetime.date(1958, 4, 19)) 

1013 self.assertEqual(form_values[ViewParam.SEX], "F") 

1014 self.assertEqual(form_values[ViewParam.ADDRESS], "Address") 

1015 self.assertEqual(form_values[ViewParam.EMAIL], "jopatient@example.com") 

1016 self.assertEqual(form_values[ViewParam.GP], "GP") 

1017 self.assertEqual(form_values[ViewParam.OTHER], "Other") 

1018 

1019 self.assertEqual(form_values[ViewParam.SERVER_PK], patient.pk) 

1020 self.assertEqual(form_values[ViewParam.GROUP_ID], patient.group.id) 

1021 

1022 idnum = form_values[ViewParam.ID_REFERENCES][0] 

1023 self.assertEqual(idnum[ViewParam.WHICH_IDNUM], 

1024 self.nhs_iddef.which_idnum) 

1025 self.assertEqual(idnum[ViewParam.IDNUM_VALUE], TEST_NHS_NUMBER_1) 

1026 

1027 task_schedule = form_values[ViewParam.TASK_SCHEDULES][0] 

1028 self.assertEqual(task_schedule[ViewParam.SCHEDULE_ID], 

1029 patient_task_schedule.schedule_id) 

1030 self.assertEqual(task_schedule[ViewParam.START_DATETIME], 

1031 patient_task_schedule.start_datetime) 

1032 self.assertEqual(task_schedule[ViewParam.SETTINGS], 

1033 patient_task_schedule.settings) 

1034 

1035 def test_changes_to_simple_params(self) -> None: 

1036 view = EditFinalizedPatientView(self.req) 

1037 patient = self.create_patient( 

1038 id=1, forename="Jo", surname="Patient", 

1039 dob=datetime.date(1958, 4, 19), 

1040 sex="F", address="Address", email="jopatient@example.com", 

1041 gp="GP", other=None, 

1042 ) 

1043 view.object = patient 

1044 

1045 changes = OrderedDict() # type: OrderedDict 

1046 

1047 appstruct = { 

1048 ViewParam.FORENAME: "Joanna", 

1049 ViewParam.SURNAME: "Patient-Patient", 

1050 ViewParam.DOB: datetime.date(1958, 4, 19), 

1051 ViewParam.ADDRESS: "New address", 

1052 ViewParam.OTHER: "", 

1053 } 

1054 

1055 view._save_simple_params(appstruct, changes) 

1056 

1057 self.assertEqual(changes[ViewParam.FORENAME], ("Jo", "Joanna")) 

1058 self.assertEqual(changes[ViewParam.SURNAME], 

1059 ("Patient", "Patient-Patient")) 

1060 self.assertNotIn(ViewParam.DOB, changes) 

1061 self.assertEqual(changes[ViewParam.ADDRESS], ("Address", "New address")) 

1062 self.assertNotIn(ViewParam.OTHER, changes) 

1063 

1064 def test_changes_to_idrefs(self) -> None: 

1065 view = EditFinalizedPatientView(self.req) 

1066 patient = self.create_patient(id=1) 

1067 self.create_patient_idnum( 

1068 patient_id=patient.id, 

1069 which_idnum=self.nhs_iddef.which_idnum, 

1070 idnum_value=TEST_NHS_NUMBER_1 

1071 ) 

1072 self.create_patient_idnum( 

1073 patient_id=patient.id, 

1074 which_idnum=self.study_iddef.which_idnum, 

1075 idnum_value=123 

1076 ) 

1077 

1078 view.object = patient 

1079 

1080 changes = OrderedDict() # type: OrderedDict 

1081 

1082 appstruct = { 

1083 ViewParam.ID_REFERENCES: [ 

1084 { 

1085 ViewParam.WHICH_IDNUM: self.nhs_iddef.which_idnum, 

1086 ViewParam.IDNUM_VALUE: TEST_NHS_NUMBER_2, 

1087 }, 

1088 { 

1089 ViewParam.WHICH_IDNUM: self.rio_iddef.which_idnum, 

1090 ViewParam.IDNUM_VALUE: 456, 

1091 } 

1092 ] 

1093 } 

1094 

1095 view._save_idrefs(appstruct, changes) 

1096 

1097 self.assertEqual(changes["idnum1 (NHS number)"], 

1098 (TEST_NHS_NUMBER_1, TEST_NHS_NUMBER_2)) 

1099 self.assertEqual(changes["idnum3 (Study number)"], 

1100 (123, None)) 

1101 self.assertEqual(changes["idnum2 (RiO number)"], 

1102 (None, 456)) 

1103 

1104 

1105class EditServerCreatedPatientViewTests(DemoDatabaseTestCase): 

1106 """ 

1107 Unit tests. 

1108 """ 

1109 def create_tasks(self) -> None: 

1110 # speed things up a bit 

1111 pass 

1112 

1113 def test_group_updated(self) -> None: 

1114 patient = self.create_patient(sex="F", as_server_patient=True) 

1115 new_group = Group() 

1116 new_group.name = "newgroup" 

1117 new_group.description = "New group" 

1118 new_group.upload_policy = "sex AND anyidnum" 

1119 new_group.finalize_policy = "sex AND idnum1" 

1120 self.dbsession.add(new_group) 

1121 self.dbsession.commit() 

1122 

1123 view = EditServerCreatedPatientView(self.req) 

1124 view.object = patient 

1125 

1126 appstruct = { 

1127 ViewParam.GROUP_ID: new_group.id, 

1128 } 

1129 

1130 view.save_object(appstruct) 

1131 

1132 self.assertEqual(patient.group_id, new_group.id) 

1133 

1134 messages = self.req.session.peek_flash(FLASH_SUCCESS) 

1135 

1136 self.assertIn("testgroup", messages[0]) 

1137 self.assertIn("newgroup", messages[0]) 

1138 self.assertIn("group:", messages[0]) 

1139 

1140 def test_raises_when_not_created_on_the_server(self) -> None: 

1141 patient = self.create_patient( 

1142 id=1, _device_id=self.other_device.id, 

1143 ) 

1144 

1145 view = EditServerCreatedPatientView(self.req) 

1146 

1147 self.req.add_get_params({ 

1148 ViewParam.SERVER_PK: patient.pk 

1149 }) 

1150 

1151 with self.assertRaises(HTTPBadRequest) as cm: 

1152 view.get_object() 

1153 

1154 self.assertIn("Patient is not editable", str(cm.exception)) 

1155 

1156 def test_patient_task_schedules_updated(self) -> None: 

1157 patient = self.create_patient(sex="F", as_server_patient=True) 

1158 

1159 schedule1 = TaskSchedule() 

1160 schedule1.group_id = self.group.id 

1161 schedule1.name = "Test 1" 

1162 self.dbsession.add(schedule1) 

1163 schedule2 = TaskSchedule() 

1164 schedule2.group_id = self.group.id 

1165 schedule2.name = "Test 2" 

1166 self.dbsession.add(schedule2) 

1167 schedule3 = TaskSchedule() 

1168 schedule3.group_id = self.group.id 

1169 schedule3.name = "Test 3" 

1170 self.dbsession.add(schedule3) 

1171 self.dbsession.commit() 

1172 

1173 patient_task_schedule = PatientTaskSchedule() 

1174 patient_task_schedule.patient_pk = patient.pk 

1175 patient_task_schedule.schedule_id = schedule1.id 

1176 patient_task_schedule.start_datetime = local(2020, 6, 12, 9) 

1177 patient_task_schedule.settings = { 

1178 "name 1": "value 1", 

1179 "name 2": "value 2", 

1180 "name 3": "value 3", 

1181 } 

1182 

1183 self.dbsession.add(patient_task_schedule) 

1184 

1185 patient_task_schedule = PatientTaskSchedule() 

1186 patient_task_schedule.patient_pk = patient.pk 

1187 patient_task_schedule.schedule_id = schedule3.id 

1188 

1189 self.dbsession.add(patient_task_schedule) 

1190 self.dbsession.commit() 

1191 

1192 self.req.add_get_params({ 

1193 ViewParam.SERVER_PK: patient.pk 

1194 }, set_method_get=False) 

1195 

1196 changed_schedule_1_settings = { 

1197 "name 1": "new value 1", 

1198 "name 2": "new value 2", 

1199 "name 3": "new value 3", 

1200 } 

1201 new_schedule_2_settings = { 

1202 "name 4": "value 4", 

1203 "name 5": "value 5", 

1204 "name 6": "value 6", 

1205 } 

1206 multidict = MultiDict([ 

1207 ("_charset_", "UTF-8"), 

1208 ("__formid__", "deform"), 

1209 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

1210 (ViewParam.SERVER_PK, patient.pk), 

1211 (ViewParam.GROUP_ID, patient.group.id), 

1212 (ViewParam.FORENAME, patient.forename), 

1213 (ViewParam.SURNAME, patient.surname), 

1214 ("__start__", "dob:mapping"), 

1215 ("date", ""), 

1216 ("__end__", "dob:mapping"), 

1217 ("__start__", "sex:rename"), 

1218 ("deformField7", patient.sex), 

1219 ("__end__", "sex:rename"), 

1220 (ViewParam.ADDRESS, patient.address), 

1221 (ViewParam.GP, patient.gp), 

1222 (ViewParam.OTHER, patient.other), 

1223 ("__start__", "id_references:sequence"), 

1224 ("__start__", "idnum_sequence:mapping"), 

1225 (ViewParam.WHICH_IDNUM, self.nhs_iddef.which_idnum), 

1226 (ViewParam.IDNUM_VALUE, str(TEST_NHS_NUMBER_1)), 

1227 ("__end__", "idnum_sequence:mapping"), 

1228 ("__end__", "id_references:sequence"), 

1229 ("__start__", "danger:mapping"), 

1230 ("target", "7836"), 

1231 ("user_entry", "7836"), 

1232 ("__end__", "danger:mapping"), 

1233 ("__start__", "task_schedules:sequence"), 

1234 ("__start__", "task_schedule_sequence:mapping"), 

1235 ("schedule_id", schedule1.id), 

1236 ("__start__", "start_datetime:mapping"), 

1237 ("date", "2020-06-19"), 

1238 ("time", "08:00:00"), 

1239 ("__end__", "start_datetime:mapping"), 

1240 ("settings", json.dumps(changed_schedule_1_settings)), 

1241 ("__end__", "task_schedule_sequence:mapping"), 

1242 ("__start__", "task_schedule_sequence:mapping"), 

1243 ("schedule_id", schedule2.id), 

1244 ("__start__", "start_datetime:mapping"), 

1245 ("date", "2020-07-01"), 

1246 ("time", "13:45:00"), 

1247 ("__end__", "start_datetime:mapping"), 

1248 ("settings", json.dumps(new_schedule_2_settings)), 

1249 ("__end__", "task_schedule_sequence:mapping"), 

1250 ("__end__", "task_schedules:sequence"), 

1251 

1252 (FormAction.SUBMIT, "submit"), 

1253 ]) 

1254 

1255 self.req.fake_request_post_from_dict(multidict) 

1256 

1257 with self.assertRaises(HTTPFound): 

1258 edit_server_created_patient(self.req) 

1259 

1260 self.dbsession.commit() 

1261 

1262 schedules = {pts.task_schedule.name: pts 

1263 for pts in patient.task_schedules} 

1264 self.assertIn("Test 1", schedules) 

1265 self.assertIn("Test 2", schedules) 

1266 self.assertNotIn("Test 3", schedules) 

1267 

1268 self.assertEqual( 

1269 schedules["Test 1"].start_datetime, local(2020, 6, 19, 8) 

1270 ) 

1271 self.assertEqual( 

1272 schedules["Test 1"].settings, changed_schedule_1_settings, 

1273 ) 

1274 self.assertEqual( 

1275 schedules["Test 2"].start_datetime, 

1276 local(2020, 7, 1, 13, 45) 

1277 ) 

1278 self.assertEqual( 

1279 schedules["Test 2"].settings, new_schedule_2_settings, 

1280 ) 

1281 

1282 messages = self.req.session.peek_flash(FLASH_SUCCESS) 

1283 

1284 self.assertIn(f"Amended patient record with server PK {patient.pk}", 

1285 messages[0]) 

1286 self.assertIn("Test 2", messages[0]) 

1287 

1288 def test_changes_to_task_schedules(self) -> None: 

1289 patient = self.create_patient(sex="F", as_server_patient=True) 

1290 

1291 schedule1 = TaskSchedule() 

1292 schedule1.group_id = self.group.id 

1293 schedule1.name = "Test 1" 

1294 self.dbsession.add(schedule1) 

1295 schedule2 = TaskSchedule() 

1296 schedule2.group_id = self.group.id 

1297 schedule2.name = "Test 2" 

1298 self.dbsession.add(schedule2) 

1299 schedule3 = TaskSchedule() 

1300 schedule3.group_id = self.group.id 

1301 schedule3.name = "Test 3" 

1302 self.dbsession.add(schedule3) 

1303 self.dbsession.commit() 

1304 

1305 patient_task_schedule = PatientTaskSchedule() 

1306 patient_task_schedule.patient_pk = patient.pk 

1307 patient_task_schedule.schedule_id = schedule1.id 

1308 patient_task_schedule.start_datetime = local(2020, 6, 12, 12, 34) 

1309 

1310 schedule_1_settings = { 

1311 "name 1": "value 1", 

1312 "name 2": "value 2", 

1313 "name 3": "value 3", 

1314 } 

1315 

1316 patient_task_schedule.settings = schedule_1_settings 

1317 

1318 self.dbsession.add(patient_task_schedule) 

1319 

1320 patient_task_schedule = PatientTaskSchedule() 

1321 patient_task_schedule.patient_pk = patient.pk 

1322 schedule_3_settings = { 

1323 "name 1": "value 1", 

1324 } 

1325 patient_task_schedule.schedule_id = schedule3.id 

1326 patient_task_schedule.settings = schedule_3_settings 

1327 patient_task_schedule.start_datetime = local(2020, 7, 31, 13, 45) 

1328 

1329 self.dbsession.add(patient_task_schedule) 

1330 self.dbsession.commit() 

1331 

1332 # The patient starts on schedule 1 and schedule 3 

1333 view = EditServerCreatedPatientView(self.req) 

1334 view.object = patient 

1335 

1336 changes = OrderedDict() # type: OrderedDict 

1337 

1338 changed_schedule_1_settings = { 

1339 "name 1": "new value 1", 

1340 "name 2": "new value 2", 

1341 "name 3": "new value 3", 

1342 } 

1343 new_schedule_2_settings = { 

1344 "name 4": "value 4", 

1345 "name 5": "value 5", 

1346 "name 6": "value 6", 

1347 } 

1348 

1349 # We update schedule 1, add schedule 2 and (by its absence) delete 

1350 # schedule 3 

1351 appstruct = { 

1352 ViewParam.TASK_SCHEDULES: [ 

1353 { 

1354 ViewParam.SCHEDULE_ID: schedule1.id, 

1355 ViewParam.START_DATETIME: local( 

1356 2020, 6, 19, 0, 1 

1357 ), 

1358 ViewParam.SETTINGS: changed_schedule_1_settings, 

1359 }, 

1360 { 

1361 ViewParam.SCHEDULE_ID: schedule2.id, 

1362 ViewParam.START_DATETIME: local( 

1363 2020, 7, 1, 19, 2), 

1364 ViewParam.SETTINGS: new_schedule_2_settings, 

1365 } 

1366 ] 

1367 } 

1368 

1369 view._save_task_schedules(appstruct, changes) 

1370 

1371 expected_old_1 = (local(2020, 6, 12, 12, 34), 

1372 schedule_1_settings) 

1373 expected_new_1 = (local(2020, 6, 19, 0, 1), 

1374 changed_schedule_1_settings) 

1375 

1376 expected_old_2 = (None, None) 

1377 expected_new_2 = (local(2020, 7, 1, 19, 2), 

1378 new_schedule_2_settings) 

1379 

1380 expected_old_3 = (local(2020, 7, 31, 13, 45), 

1381 schedule_3_settings) 

1382 expected_new_3 = (None, None) 

1383 

1384 self.assertEqual(changes[f"schedule{schedule1.id} (Test 1)"], 

1385 (expected_old_1, expected_new_1)) 

1386 self.assertEqual(changes[f"schedule{schedule2.id} (Test 2)"], 

1387 (expected_old_2, expected_new_2)) 

1388 self.assertEqual(changes[f"schedule{schedule3.id} (Test 3)"], 

1389 (expected_old_3, expected_new_3)) 

1390 

1391 

1392class AddPatientViewTests(DemoDatabaseTestCase): 

1393 """ 

1394 Unit tests. 

1395 """ 

1396 def test_patient_created(self) -> None: 

1397 view = AddPatientView(self.req) 

1398 

1399 schedule1 = TaskSchedule() 

1400 schedule1.group_id = self.group.id 

1401 schedule1.name = "Test 1" 

1402 self.dbsession.add(schedule1) 

1403 

1404 schedule2 = TaskSchedule() 

1405 schedule2.group_id = self.group.id 

1406 schedule2.name = "Test 2" 

1407 self.dbsession.add(schedule2) 

1408 self.dbsession.commit() 

1409 

1410 start_datetime1 = local(2020, 6, 12) 

1411 start_datetime2 = local(2020, 7, 1) 

1412 

1413 settings1 = json.dumps({ 

1414 "name 1": "value 1", 

1415 "name 2": "value 2", 

1416 "name 3": "value 3", 

1417 }) 

1418 

1419 appstruct = { 

1420 ViewParam.GROUP_ID: self.group.id, 

1421 ViewParam.FORENAME: "Jo", 

1422 ViewParam.SURNAME: "Patient", 

1423 ViewParam.DOB: datetime.date(1958, 4, 19), 

1424 ViewParam.SEX: "F", 

1425 ViewParam.ADDRESS: "Address", 

1426 ViewParam.EMAIL: "jopatient@example.com", 

1427 ViewParam.GP: "GP", 

1428 ViewParam.OTHER: "Other", 

1429 ViewParam.ID_REFERENCES: [{ 

1430 ViewParam.WHICH_IDNUM: self.nhs_iddef.which_idnum, 

1431 ViewParam.IDNUM_VALUE: 1192220552, 

1432 }], 

1433 ViewParam.TASK_SCHEDULES: [ 

1434 { 

1435 ViewParam.SCHEDULE_ID: schedule1.id, 

1436 ViewParam.START_DATETIME: start_datetime1, 

1437 ViewParam.SETTINGS: settings1, 

1438 }, 

1439 { 

1440 ViewParam.SCHEDULE_ID: schedule2.id, 

1441 ViewParam.START_DATETIME: start_datetime2, 

1442 ViewParam.SETTINGS: {}, 

1443 }, 

1444 ], 

1445 } 

1446 

1447 view.save_object(appstruct) 

1448 

1449 patient = cast(Patient, view.object) 

1450 

1451 server_device = Device.get_server_device( 

1452 self.req.dbsession 

1453 ) 

1454 

1455 self.assertEqual(patient.id, 1) 

1456 self.assertEqual(patient.device_id, server_device.id) 

1457 self.assertEqual(patient.era, ERA_NOW) 

1458 self.assertEqual(patient.group.id, self.group.id) 

1459 

1460 self.assertEqual(patient.forename, "Jo") 

1461 self.assertEqual(patient.surname, "Patient") 

1462 self.assertEqual(patient.dob.isoformat(), "1958-04-19") 

1463 self.assertEqual(patient.sex, "F") 

1464 self.assertEqual(patient.address, "Address") 

1465 self.assertEqual(patient.email, "jopatient@example.com") 

1466 self.assertEqual(patient.gp, "GP") 

1467 self.assertEqual(patient.other, "Other") 

1468 

1469 idnum = patient.get_idnum_objects()[0] 

1470 self.assertEqual(idnum.patient_id, 1) 

1471 self.assertEqual(idnum.which_idnum, self.nhs_iddef.which_idnum) 

1472 self.assertEqual(idnum.idnum_value, 1192220552) 

1473 

1474 patient_task_schedules = { 

1475 pts.task_schedule.name: pts for pts in patient.task_schedules 

1476 } 

1477 

1478 self.assertIn("Test 1", patient_task_schedules) 

1479 self.assertIn("Test 2", patient_task_schedules) 

1480 

1481 self.assertEqual( 

1482 patient_task_schedules["Test 1"].start_datetime, 

1483 start_datetime1 

1484 ) 

1485 self.assertEqual( 

1486 patient_task_schedules["Test 1"].settings, 

1487 settings1 

1488 ) 

1489 self.assertEqual( 

1490 patient_task_schedules["Test 2"].start_datetime, 

1491 start_datetime2 

1492 ) 

1493 

1494 def test_patient_takes_next_available_id(self) -> None: 

1495 self.create_patient(id=1234, as_server_patient=True) 

1496 

1497 view = AddPatientView(self.req) 

1498 

1499 appstruct = { 

1500 ViewParam.GROUP_ID: self.group.id, 

1501 ViewParam.FORENAME: "Jo", 

1502 ViewParam.SURNAME: "Patient", 

1503 ViewParam.DOB: datetime.date(1958, 4, 19), 

1504 ViewParam.SEX: "F", 

1505 ViewParam.ADDRESS: "Address", 

1506 ViewParam.GP: "GP", 

1507 ViewParam.OTHER: "Other", 

1508 ViewParam.ID_REFERENCES: [{ 

1509 ViewParam.WHICH_IDNUM: self.nhs_iddef.which_idnum, 

1510 ViewParam.IDNUM_VALUE: 1192220552, 

1511 }], 

1512 ViewParam.TASK_SCHEDULES: [ 

1513 ], 

1514 } 

1515 

1516 view.save_object(appstruct) 

1517 

1518 patient = cast(Patient, view.object) 

1519 

1520 self.assertEqual(patient.id, 1235) 

1521 

1522 def test_form_rendered_with_values(self) -> None: 

1523 view = AddPatientView(self.req) 

1524 

1525 with mock.patch.object(view, "render_to_response") as mock_render: 

1526 view.dispatch() 

1527 

1528 args, kwargs = mock_render.call_args 

1529 

1530 context = args[0] 

1531 

1532 self.assertIn("form", context) 

1533 

1534 

1535class DeleteServerCreatedPatientViewTests(DemoDatabaseTestCase): 

1536 """ 

1537 Unit tests. 

1538 """ 

1539 def setUp(self) -> None: 

1540 super().setUp() 

1541 

1542 self.patient = self.create_patient( 

1543 as_server_patient=True, 

1544 forename="Jo", surname="Patient", 

1545 dob=datetime.date(1958, 4, 19), 

1546 sex="F", address="Address", gp="GP", other="Other" 

1547 ) 

1548 

1549 patient_pk = self.patient.pk 

1550 

1551 idnum = self.create_patient_idnum( 

1552 as_server_patient=True, 

1553 patient_id=self.patient.id, 

1554 which_idnum=self.nhs_iddef.which_idnum, 

1555 idnum_value=TEST_NHS_NUMBER_1 

1556 ) 

1557 

1558 PatientIdNumIndexEntry.index_idnum(idnum, self.dbsession) 

1559 

1560 self.schedule = TaskSchedule() 

1561 self.schedule.group_id = self.group.id 

1562 self.schedule.name = "Test 1" 

1563 self.dbsession.add(self.schedule) 

1564 self.dbsession.commit() 

1565 

1566 pts = PatientTaskSchedule() 

1567 pts.patient_pk = patient_pk 

1568 pts.schedule_id = self.schedule.id 

1569 self.dbsession.add(pts) 

1570 self.dbsession.commit() 

1571 

1572 self.multidict = MultiDict([ 

1573 ("_charset_", "UTF-8"), 

1574 ("__formid__", "deform"), 

1575 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

1576 ("confirm_1_t", "true"), 

1577 ("confirm_2_t", "true"), 

1578 ("confirm_4_t", "true"), 

1579 ("__start__", "danger:mapping"), 

1580 ("target", "7176"), 

1581 ("user_entry", "7176"), 

1582 ("__end__", "danger:mapping"), 

1583 ("delete", "delete"), 

1584 (FormAction.DELETE, "delete"), 

1585 ]) 

1586 

1587 def create_tasks(self) -> None: 

1588 # speed things up a bit 

1589 pass 

1590 

1591 def test_patient_schedule_and_idnums_deleted(self) -> None: 

1592 self.req.fake_request_post_from_dict(self.multidict) 

1593 

1594 patient_pk = self.patient.pk 

1595 self.req.add_get_params({ 

1596 ViewParam.SERVER_PK: patient_pk 

1597 }, set_method_get=False) 

1598 view = DeleteServerCreatedPatientView(self.req) 

1599 

1600 with self.assertRaises(HTTPFound) as e: 

1601 view.dispatch() 

1602 

1603 self.assertEqual(e.exception.status_code, 302) 

1604 self.assertIn( 

1605 "view_patient_task_schedules", 

1606 e.exception.headers["Location"] 

1607 ) 

1608 

1609 deleted_patient = self.dbsession.query(Patient).filter( 

1610 Patient._pk == patient_pk).one_or_none() 

1611 

1612 self.assertIsNone(deleted_patient) 

1613 

1614 pts = self.dbsession.query(PatientTaskSchedule).filter( 

1615 PatientTaskSchedule.patient_pk == patient_pk).one_or_none() 

1616 

1617 self.assertIsNone(pts) 

1618 

1619 idnum = self.dbsession.query(PatientIdNum).filter( 

1620 PatientIdNum.patient_id == self.patient.id, 

1621 PatientIdNum._device_id == self.patient.device_id, 

1622 PatientIdNum._era == self.patient.era, 

1623 PatientIdNum._current == True # noqa: E712 

1624 ).one_or_none() 

1625 

1626 self.assertIsNone(idnum) 

1627 

1628 def test_registered_patient_deleted(self) -> None: 

1629 from camcops_server.cc_modules.client_api import ( 

1630 get_or_create_single_user, 

1631 ) 

1632 user1, _ = get_or_create_single_user(self.req, "test", self.patient) 

1633 self.assertEqual(user1.single_patient, self.patient) 

1634 

1635 user2, _ = get_or_create_single_user(self.req, "test", self.patient) 

1636 self.assertEqual(user2.single_patient, self.patient) 

1637 

1638 self.req.fake_request_post_from_dict(self.multidict) 

1639 

1640 patient_pk = self.patient.pk 

1641 self.req.add_get_params({ 

1642 ViewParam.SERVER_PK: patient_pk 

1643 }, set_method_get=False) 

1644 view = DeleteServerCreatedPatientView(self.req) 

1645 

1646 with self.assertRaises(HTTPFound): 

1647 view.dispatch() 

1648 

1649 self.dbsession.commit() 

1650 

1651 deleted_patient = self.dbsession.query(Patient).filter( 

1652 Patient._pk == patient_pk).one_or_none() 

1653 

1654 self.assertIsNone(deleted_patient) 

1655 

1656 # TODO: We get weird behaviour when all the tests are run together 

1657 # (fine for --test_class=DeleteServerCreatedPatientViewTests) 

1658 # the assertion below fails with sqlite in spite of the commit() 

1659 # above. 

1660 

1661 # user = self.dbsession.query(User).filter( 

1662 # User.id == user1.id).one_or_none() 

1663 # self.assertIsNone(user.single_patient_pk) 

1664 

1665 # user = self.dbsession.query(User).filter( 

1666 # User.id == user2.id).one_or_none() 

1667 # self.assertIsNone(user.single_patient_pk) 

1668 

1669 def test_unrelated_patient_unaffected(self) -> None: 

1670 other_patient = self.create_patient( 

1671 as_server_patient=True, 

1672 forename="Mo", surname="Patient", 

1673 dob=datetime.date(1968, 11, 30), 

1674 sex="M", address="Address", gp="GP", other="Other" 

1675 ) 

1676 patient_pk = other_patient._pk 

1677 

1678 saved_patient = self.dbsession.query(Patient).filter( 

1679 Patient._pk == patient_pk).one_or_none() 

1680 

1681 self.assertIsNotNone(saved_patient) 

1682 

1683 idnum = self.create_patient_idnum( 

1684 as_server_patient=True, 

1685 patient_id=other_patient.id, 

1686 which_idnum=self.nhs_iddef.which_idnum, 

1687 idnum_value=TEST_NHS_NUMBER_2 

1688 ) 

1689 

1690 PatientIdNumIndexEntry.index_idnum(idnum, self.dbsession) 

1691 

1692 saved_idnum = self.dbsession.query(PatientIdNum).filter( 

1693 PatientIdNum.patient_id == other_patient.id, 

1694 PatientIdNum._device_id == other_patient.device_id, 

1695 PatientIdNum._era == other_patient.era, 

1696 PatientIdNum._current == True # noqa: E712 

1697 ).one_or_none() 

1698 

1699 self.assertIsNotNone(saved_idnum) 

1700 

1701 pts = PatientTaskSchedule() 

1702 pts.patient_pk = patient_pk 

1703 pts.schedule_id = self.schedule.id 

1704 self.dbsession.add(pts) 

1705 self.dbsession.commit() 

1706 

1707 self.req.fake_request_post_from_dict(self.multidict) 

1708 

1709 self.req.add_get_params({ 

1710 ViewParam.SERVER_PK: self.patient._pk 

1711 }, set_method_get=False) 

1712 view = DeleteServerCreatedPatientView(self.req) 

1713 

1714 with self.assertRaises(HTTPFound): 

1715 view.dispatch() 

1716 

1717 saved_patient = self.dbsession.query(Patient).filter( 

1718 Patient._pk == patient_pk).one_or_none() 

1719 

1720 self.assertIsNotNone(saved_patient) 

1721 

1722 saved_pts = self.dbsession.query(PatientTaskSchedule).filter( 

1723 PatientTaskSchedule.patient_pk == patient_pk).one_or_none() 

1724 

1725 self.assertIsNotNone(saved_pts) 

1726 

1727 saved_idnum = self.dbsession.query(PatientIdNum).filter( 

1728 PatientIdNum.patient_id == other_patient.id, 

1729 PatientIdNum._device_id == other_patient.device_id, 

1730 PatientIdNum._era == other_patient.era, 

1731 PatientIdNum._current == True # noqa: E712 

1732 ).one_or_none() 

1733 

1734 self.assertIsNotNone(saved_idnum) 

1735 

1736 

1737class EraseTaskTestCase(DemoDatabaseTestCase): 

1738 """ 

1739 Unit tests. 

1740 """ 

1741 def create_tasks(self) -> None: 

1742 from camcops_server.tasks.bmi import Bmi 

1743 

1744 self.task = Bmi() 

1745 self.task.id = 1 

1746 self.apply_standard_task_fields(self.task) 

1747 patient = self.create_patient_with_one_idnum() 

1748 self.task.patient_id = patient.id 

1749 

1750 self.dbsession.add(self.task) 

1751 self.dbsession.commit() 

1752 

1753 

1754class EraseTaskLeavingPlaceholderViewTests(EraseTaskTestCase): 

1755 """ 

1756 Unit tests. 

1757 """ 

1758 def test_displays_form(self) -> None: 

1759 self.req.add_get_params({ 

1760 ViewParam.SERVER_PK: self.task.pk, 

1761 ViewParam.TABLE_NAME: self.task.tablename, 

1762 }, set_method_get=False) 

1763 view = EraseTaskLeavingPlaceholderView(self.req) 

1764 

1765 with mock.patch.object(view, "render_to_response") as mock_render: 

1766 view.dispatch() 

1767 

1768 args, kwargs = mock_render.call_args 

1769 context = args[0] 

1770 

1771 self.assertIn("form", context) 

1772 

1773 def test_deletes_task_leaving_placeholder(self) -> None: 

1774 multidict = MultiDict([ 

1775 ("_charset_", "UTF-8"), 

1776 ("__formid__", "deform"), 

1777 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

1778 (ViewParam.SERVER_PK, self.task.pk), 

1779 (ViewParam.TABLE_NAME, self.task.tablename), 

1780 ("confirm_1_t", "true"), 

1781 ("confirm_2_t", "true"), 

1782 ("confirm_4_t", "true"), 

1783 ("__start__", "danger:mapping"), 

1784 ("target", "7176"), 

1785 ("user_entry", "7176"), 

1786 ("__end__", "danger:mapping"), 

1787 ("delete", "delete"), 

1788 (FormAction.DELETE, "delete"), 

1789 ]) 

1790 

1791 self.req.fake_request_post_from_dict(multidict) 

1792 

1793 view = EraseTaskLeavingPlaceholderView(self.req) 

1794 with mock.patch.object(self.task, 

1795 "manually_erase") as mock_manually_erase: 

1796 

1797 with self.assertRaises(HTTPFound): 

1798 view.dispatch() 

1799 

1800 mock_manually_erase.assert_called_once() 

1801 args, kwargs = mock_manually_erase.call_args 

1802 request = args[0] 

1803 

1804 self.assertEqual(request, self.req) 

1805 

1806 def test_task_not_deleted_on_cancel(self) -> None: 

1807 self.req.fake_request_post_from_dict({ 

1808 FormAction.CANCEL: "cancel" 

1809 }) 

1810 

1811 self.req.add_get_params({ 

1812 ViewParam.SERVER_PK: self.task.pk, 

1813 ViewParam.TABLE_NAME: self.task.tablename, 

1814 }, set_method_get=False) 

1815 view = EraseTaskLeavingPlaceholderView(self.req) 

1816 

1817 with self.assertRaises(HTTPFound): 

1818 view.dispatch() 

1819 

1820 task = self.dbsession.query(self.task.__class__).one_or_none() 

1821 

1822 self.assertIsNotNone(task) 

1823 

1824 def test_redirect_on_cancel(self) -> None: 

1825 self.req.fake_request_post_from_dict({ 

1826 FormAction.CANCEL: "cancel" 

1827 }) 

1828 

1829 self.req.add_get_params({ 

1830 ViewParam.SERVER_PK: self.task.pk, 

1831 ViewParam.TABLE_NAME: self.task.tablename, 

1832 }, set_method_get=False) 

1833 view = EraseTaskLeavingPlaceholderView(self.req) 

1834 

1835 with self.assertRaises(HTTPFound) as cm: 

1836 view.dispatch() 

1837 

1838 self.assertEqual(cm.exception.status_code, 302) 

1839 self.assertIn( 

1840 "/task", cm.exception.headers["Location"] 

1841 ) 

1842 self.assertIn( 

1843 "table_name={}".format(self.task.tablename), 

1844 cm.exception.headers["Location"] 

1845 ) 

1846 self.assertIn( 

1847 "server_pk={}".format(self.task.pk), 

1848 cm.exception.headers["Location"] 

1849 ) 

1850 self.assertIn("viewtype=html", cm.exception.headers["Location"]) 

1851 

1852 def test_raises_when_task_does_not_exist(self) -> None: 

1853 self.req.add_get_params({ 

1854 ViewParam.SERVER_PK: "123", 

1855 ViewParam.TABLE_NAME: "phq9", 

1856 }, set_method_get=False) 

1857 view = EraseTaskLeavingPlaceholderView(self.req) 

1858 

1859 with self.assertRaises(HTTPBadRequest) as cm: 

1860 view.dispatch() 

1861 

1862 self.assertEqual( 

1863 cm.exception.message, 

1864 "No such task: phq9, PK=123" 

1865 ) 

1866 

1867 def test_raises_when_task_is_live_on_tablet(self) -> None: 

1868 self.task._era = ERA_NOW 

1869 self.dbsession.add(self.task) 

1870 self.dbsession.commit() 

1871 

1872 self.req.add_get_params({ 

1873 ViewParam.SERVER_PK: self.task.pk, 

1874 ViewParam.TABLE_NAME: self.task.tablename, 

1875 }, set_method_get=False) 

1876 view = EraseTaskLeavingPlaceholderView(self.req) 

1877 

1878 with self.assertRaises(HTTPBadRequest) as cm: 

1879 view.dispatch() 

1880 

1881 self.assertIn( 

1882 "Task is live on tablet", 

1883 cm.exception.message 

1884 ) 

1885 

1886 def test_raises_when_user_not_authorized_to_erase(self) -> None: 

1887 with mock.patch.object(self.user, "authorized_to_erase_tasks", 

1888 return_value=False): 

1889 

1890 self.req.add_get_params({ 

1891 ViewParam.SERVER_PK: self.task.pk, 

1892 ViewParam.TABLE_NAME: self.task.tablename, 

1893 }, set_method_get=False) 

1894 view = EraseTaskLeavingPlaceholderView(self.req) 

1895 

1896 with self.assertRaises(HTTPBadRequest) as cm: 

1897 view.dispatch() 

1898 

1899 self.assertIn( 

1900 "Not authorized to erase tasks", 

1901 cm.exception.message 

1902 ) 

1903 

1904 def test_raises_when_task_already_erased(self) -> None: 

1905 self.task._manually_erased = True 

1906 self.dbsession.add(self.task) 

1907 self.dbsession.commit() 

1908 

1909 self.req.add_get_params({ 

1910 ViewParam.SERVER_PK: self.task.pk, 

1911 ViewParam.TABLE_NAME: self.task.tablename, 

1912 }, set_method_get=False) 

1913 view = EraseTaskLeavingPlaceholderView(self.req) 

1914 

1915 with self.assertRaises(HTTPBadRequest) as cm: 

1916 view.dispatch() 

1917 

1918 self.assertIn( 

1919 "already erased", 

1920 cm.exception.message 

1921 ) 

1922 

1923 

1924class EraseTaskEntirelyViewTests(EraseTaskTestCase): 

1925 """ 

1926 Unit tests. 

1927 """ 

1928 def test_deletes_task_entirely(self) -> None: 

1929 multidict = MultiDict([ 

1930 ("_charset_", "UTF-8"), 

1931 ("__formid__", "deform"), 

1932 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

1933 (ViewParam.SERVER_PK, self.task.pk), 

1934 (ViewParam.TABLE_NAME, self.task.tablename), 

1935 ("confirm_1_t", "true"), 

1936 ("confirm_2_t", "true"), 

1937 ("confirm_4_t", "true"), 

1938 ("__start__", "danger:mapping"), 

1939 ("target", "7176"), 

1940 ("user_entry", "7176"), 

1941 ("__end__", "danger:mapping"), 

1942 ("delete", "delete"), 

1943 (FormAction.DELETE, "delete"), 

1944 ]) 

1945 

1946 self.req.fake_request_post_from_dict(multidict) 

1947 

1948 view = EraseTaskEntirelyView(self.req) 

1949 

1950 with mock.patch.object(self.task, 

1951 "delete_entirely") as mock_delete_entirely: 

1952 

1953 with self.assertRaises(HTTPFound): 

1954 view.dispatch() 

1955 

1956 mock_delete_entirely.assert_called_once() 

1957 args, kwargs = mock_delete_entirely.call_args 

1958 request = args[0] 

1959 

1960 self.assertEqual(request, self.req) 

1961 

1962 messages = self.req.session.peek_flash(FLASH_SUCCESS) 

1963 self.assertTrue(len(messages) > 0) 

1964 

1965 self.assertIn("Task erased", messages[0]) 

1966 self.assertIn(self.task.tablename, messages[0]) 

1967 self.assertIn("server PK {}".format(self.task.pk), messages[0]) 

1968 

1969 

1970class EditGroupViewTests(DemoDatabaseTestCase): 

1971 """ 

1972 Unit tests. 

1973 """ 

1974 def test_group_updated(self) -> None: 

1975 other_group_1 = Group() 

1976 other_group_1.name = "other-group-1" 

1977 self.dbsession.add(other_group_1) 

1978 

1979 other_group_2 = Group() 

1980 other_group_2.name = "other-group-2" 

1981 self.dbsession.add(other_group_2) 

1982 

1983 self.dbsession.commit() 

1984 

1985 multidict = MultiDict([ 

1986 ("_charset_", "UTF-8"), 

1987 ("__formid__", "deform"), 

1988 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

1989 (ViewParam.GROUP_ID, self.group.id), 

1990 (ViewParam.NAME, "new-name"), 

1991 (ViewParam.DESCRIPTION, "new description"), 

1992 (ViewParam.UPLOAD_POLICY, "anyidnum AND sex"), # reversed 

1993 (ViewParam.FINALIZE_POLICY, "idnum1 AND sex"), # reversed 

1994 ("__start__", "group_ids:sequence"), 

1995 ("group_id_sequence", str(other_group_1.id)), 

1996 ("group_id_sequence", str(other_group_2.id)), 

1997 ("__end__", "group_ids:sequence"), 

1998 (FormAction.SUBMIT, "submit"), 

1999 ]) 

2000 self.req.fake_request_post_from_dict(multidict) 

2001 

2002 with self.assertRaises(HTTPFound): 

2003 edit_group(self.req) 

2004 

2005 self.assertEqual(self.group.name, "new-name") 

2006 self.assertEqual(self.group.description, "new description") 

2007 self.assertEqual(self.group.upload_policy, "anyidnum AND sex") 

2008 self.assertEqual(self.group.finalize_policy, "idnum1 AND sex") 

2009 self.assertIn(other_group_1, self.group.can_see_other_groups) 

2010 self.assertIn(other_group_2, self.group.can_see_other_groups) 

2011 

2012 def test_ip_use_added(self) -> None: 

2013 from camcops_server.cc_modules.cc_ipuse import IpContexts 

2014 multidict = MultiDict([ 

2015 ("_charset_", "UTF-8"), 

2016 ("__formid__", "deform"), 

2017 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

2018 (ViewParam.GROUP_ID, self.group.id), 

2019 (ViewParam.NAME, "new-name"), 

2020 (ViewParam.DESCRIPTION, "new description"), 

2021 (ViewParam.UPLOAD_POLICY, "anyidnum AND sex"), 

2022 (ViewParam.FINALIZE_POLICY, "idnum1 AND sex"), 

2023 ("__start__", "ip_use:mapping"), 

2024 (IpContexts.CLINICAL, "true"), 

2025 (IpContexts.COMMERCIAL, "true"), 

2026 ("__end__", "ip_use:mapping"), 

2027 (FormAction.SUBMIT, "submit"), 

2028 ]) 

2029 self.req.fake_request_post_from_dict(multidict) 

2030 

2031 with self.assertRaises(HTTPFound): 

2032 edit_group(self.req) 

2033 

2034 self.assertTrue(self.group.ip_use.clinical) 

2035 self.assertTrue(self.group.ip_use.commercial) 

2036 self.assertFalse(self.group.ip_use.educational) 

2037 self.assertFalse(self.group.ip_use.research) 

2038 

2039 def test_ip_use_updated(self) -> None: 

2040 from camcops_server.cc_modules.cc_ipuse import IpContexts 

2041 self.group.ip_use.educational = True 

2042 self.group.ip_use.research = True 

2043 self.dbsession.add(self.group.ip_use) 

2044 self.dbsession.commit() 

2045 

2046 old_id = self.group.ip_use.id 

2047 

2048 multidict = MultiDict([ 

2049 ("_charset_", "UTF-8"), 

2050 ("__formid__", "deform"), 

2051 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

2052 (ViewParam.GROUP_ID, self.group.id), 

2053 (ViewParam.NAME, "new-name"), 

2054 (ViewParam.DESCRIPTION, "new description"), 

2055 (ViewParam.UPLOAD_POLICY, "anyidnum AND sex"), 

2056 (ViewParam.FINALIZE_POLICY, "idnum1 AND sex"), 

2057 ("__start__", "ip_use:mapping"), 

2058 (IpContexts.CLINICAL, "true"), 

2059 (IpContexts.COMMERCIAL, "true"), 

2060 ("__end__", "ip_use:mapping"), 

2061 (FormAction.SUBMIT, "submit"), 

2062 ]) 

2063 self.req.fake_request_post_from_dict(multidict) 

2064 

2065 with self.assertRaises(HTTPFound): 

2066 edit_group(self.req) 

2067 

2068 self.assertTrue(self.group.ip_use.clinical) 

2069 self.assertTrue(self.group.ip_use.commercial) 

2070 self.assertFalse(self.group.ip_use.educational) 

2071 self.assertFalse(self.group.ip_use.research) 

2072 self.assertEqual(self.group.ip_use.id, old_id) 

2073 

2074 def test_other_groups_displayed_in_form(self) -> None: 

2075 z_group = Group() 

2076 z_group.name = "z-group" 

2077 self.dbsession.add(z_group) 

2078 

2079 a_group = Group() 

2080 a_group.name = "a-group" 

2081 self.dbsession.add(a_group) 

2082 self.dbsession.commit() 

2083 

2084 other_groups = Group.get_groups_from_id_list( 

2085 self.dbsession, [z_group.id, a_group.id] 

2086 ) 

2087 self.group.can_see_other_groups = other_groups 

2088 

2089 self.dbsession.add(self.group) 

2090 self.dbsession.commit() 

2091 

2092 view = EditGroupView(self.req) 

2093 view.object = self.group 

2094 

2095 form_values = view.get_form_values() 

2096 

2097 self.assertEqual( 

2098 form_values[ViewParam.GROUP_IDS], [a_group.id, z_group.id] 

2099 ) 

2100 

2101 def test_group_id_displayed_in_form(self) -> None: 

2102 view = EditGroupView(self.req) 

2103 view.object = self.group 

2104 

2105 form_values = view.get_form_values() 

2106 

2107 self.assertEqual( 

2108 form_values[ViewParam.GROUP_ID], self.group.id 

2109 ) 

2110 

2111 def test_ip_use_displayed_in_form(self) -> None: 

2112 view = EditGroupView(self.req) 

2113 view.object = self.group 

2114 

2115 form_values = view.get_form_values() 

2116 

2117 self.assertEqual( 

2118 form_values[ViewParam.IP_USE], self.group.ip_use 

2119 )