Coverage for cc_modules/tests/webview_tests.py: 15%

1952 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2025-01-31 11:49 +0000

1""" 

2camcops_server/cc_modules/tests/webview_tests.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25""" 

26 

27from collections import OrderedDict 

28import datetime 

29import json 

30import logging 

31import time 

32from typing import cast 

33import unittest 

34from unittest import mock 

35from urllib.parse import urlparse 

36 

37from cardinal_pythonlib.classes import class_attribute_names 

38from cardinal_pythonlib.httpconst import MimeType 

39from pendulum import Duration, local 

40import phonenumbers 

41import pyotp 

42from pyramid.httpexceptions import HTTPBadRequest, HTTPFound 

43from webob.multidict import MultiDict 

44 

45from camcops_server.cc_modules.cc_constants import ( 

46 ERA_NOW, 

47 MfaMethod, 

48 SmsBackendNames, 

49) 

50from camcops_server.cc_modules.cc_device import Device 

51from camcops_server.cc_modules.cc_group import Group 

52from camcops_server.cc_modules.cc_patient import Patient 

53from camcops_server.cc_modules.cc_patientidnum import PatientIdNum 

54from camcops_server.cc_modules.cc_pyramid import ( 

55 FlashQueue, 

56 FormAction, 

57 Routes, 

58 ViewArg, 

59 ViewParam, 

60) 

61from camcops_server.cc_modules.cc_sms import ConsoleSmsBackend, get_sms_backend 

62from camcops_server.cc_modules.cc_taskindex import PatientIdNumIndexEntry 

63from camcops_server.cc_modules.cc_taskschedule import ( 

64 PatientTaskSchedule, 

65 TaskSchedule, 

66 TaskScheduleItem, 

67) 

68from camcops_server.cc_modules.cc_testfactories import ( 

69 AnyIdNumGroupFactory, 

70 DeviceFactory, 

71 Fake, 

72 GroupFactory, 

73 NHSIdNumDefinitionFactory, 

74 NHSPatientIdNumFactory, 

75 PatientFactory, 

76 PatientTaskScheduleFactory, 

77 RioIdNumDefinitionFactory, 

78 ServerCreatedNHSPatientIdNumFactory, 

79 ServerCreatedPatientFactory, 

80 StudyPatientIdNumFactory, 

81 TaskScheduleFactory, 

82 TaskScheduleItemFactory, 

83 UserFactory, 

84 UserGroupMembershipFactory, 

85) 

86from camcops_server.cc_modules.cc_unittest import ( 

87 BasicDatabaseTestCase, 

88 DemoDatabaseTestCase, 

89 DemoRequestTestCase, 

90) 

91from camcops_server.cc_modules.cc_user import ( 

92 SecurityAccountLockout, 

93 SecurityLoginFailure, 

94 User, 

95) 

96from camcops_server.cc_modules.cc_validators import ( 

97 validate_alphanum_underscore, 

98) 

99from camcops_server.cc_modules.cc_view_classes import FormWizardMixin 

100from camcops_server.tasks.tests.factories import BmiFactory 

101from camcops_server.cc_modules.tests.cc_view_classes_tests import ( 

102 TestStateMixin, 

103) 

104from camcops_server.cc_modules.webview import ( 

105 add_patient, 

106 add_user, 

107 AddPatientView, 

108 AddTaskScheduleItemView, 

109 AddTaskScheduleView, 

110 any_records_use_group, 

111 change_own_password, 

112 ChangeOtherPasswordView, 

113 ChangeOwnPasswordView, 

114 DeleteServerCreatedPatientView, 

115 DeleteTaskScheduleItemView, 

116 DeleteTaskScheduleView, 

117 edit_finalized_patient, 

118 edit_group, 

119 edit_server_created_patient, 

120 edit_user, 

121 edit_user_group_membership, 

122 EditFinalizedPatientView, 

123 EditGroupView, 

124 EditOtherUserMfaView, 

125 EditOwnUserMfaView, 

126 EditServerCreatedPatientView, 

127 EditTaskScheduleItemView, 

128 EditTaskScheduleView, 

129 EditUserGroupAdminView, 

130 EraseTaskEntirelyView, 

131 EraseTaskLeavingPlaceholderView, 

132 forcibly_finalize, 

133 LoginView, 

134 MfaMixin, 

135 SendEmailFromPatientTaskScheduleView, 

136) 

137 

138log = logging.getLogger(__name__) 

139 

140 

141# ============================================================================= 

142# Unit testing 

143# ============================================================================= 

144 

145UTF8 = "utf-8" 

146 

147 

class WebviewTests(DemoDatabaseTestCase):
    """
    Tests for module-level helpers used by the web views.
    """

    def test_any_records_use_group_true(self) -> None:
        # All tasks created in DemoDatabaseTestCase will be in this group
        self.assertTrue(
            any_records_use_group(self.req, self.demo_database_group)
        )

    def test_any_records_use_group_false(self) -> None:
        """
        If this fails with:
        sqlalchemy.exc.InvalidRequestError: SQL expression, column, or mapped
        entity expected - got <name of task base class>
        then the base class probably needs to be declared __abstract__. See
        DiagnosisItemBase as an example.
        """
        group = GroupFactory()

        self.assertFalse(any_records_use_group(self.req, group))

    def test_webview_constant_validators(self) -> None:
        """
        Every attribute name reported by class_attribute_names(ViewArg) must
        be accepted by validate_alphanum_underscore.
        """
        for x in class_attribute_names(ViewArg):
            try:
                validate_alphanum_underscore(x, self.req)
            except ValueError:
                # Report the class that is actually being iterated (ViewArg),
                # so that a failure points at the right constant.
                self.fail(f"ViewArg.{x} fails validate_alphanum_underscore")

173 

174 

class AddTaskScheduleViewTests(BasicDatabaseTestCase):
    """
    Checks that AddTaskScheduleView renders its form and stores a schedule.
    """

    def test_schedule_form_displayed(self) -> None:
        """Dispatching without POST data gives a 200 page with one form."""
        response = AddTaskScheduleView(self.req).dispatch()

        self.assertEqual(response.status_code, 200)
        page = response.body.decode(UTF8)
        self.assertEqual(page.count("<form"), 1)

    def test_schedule_is_created(self) -> None:
        """
        Submitting the form saves a TaskSchedule and raises a redirect whose
        Location contains Routes.VIEW_TASK_SCHEDULES.
        """
        form_fields = [
            ("_charset_", UTF8),
            ("__formid__", "deform"),
            (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
            (ViewParam.NAME, "MOJO"),
            (ViewParam.GROUP_ID, self.group.id),
            (ViewParam.EMAIL_FROM, "server@example.com"),
            (ViewParam.EMAIL_CC, "cc@example.com"),
            (ViewParam.EMAIL_BCC, "bcc@example.com"),
            (ViewParam.EMAIL_SUBJECT, "Subject"),
            (ViewParam.EMAIL_TEMPLATE, "Email template"),
            (FormAction.SUBMIT, "submit"),
        ]
        self.req.fake_request_post_from_dict(MultiDict(form_fields))

        view = AddTaskScheduleView(self.req)
        with self.assertRaises(HTTPFound) as redirect:
            view.dispatch()

        # Exactly one schedule must now exist, carrying the posted values.
        created = self.dbsession.query(TaskSchedule).one()
        self.assertEqual(created.name, "MOJO")
        self.assertEqual(created.email_from, "server@example.com")
        self.assertEqual(created.email_bcc, "bcc@example.com")
        self.assertEqual(created.email_subject, "Subject")
        self.assertEqual(created.email_template, "Email template")

        found = redirect.exception
        self.assertEqual(found.status_code, 302)
        self.assertIn(Routes.VIEW_TASK_SCHEDULES, found.headers["Location"])

219 

220 

class EditTaskScheduleViewTests(DemoRequestTestCase):
    """
    Checks renaming a schedule through EditTaskScheduleView and the group
    administrator restriction on doing so.
    """

    def test_schedule_name_can_be_updated(self) -> None:
        """A group administrator can rename a schedule in their own group."""
        user = UserFactory()
        self.req._debugging_user = user
        group = GroupFactory()
        UserGroupMembershipFactory(
            group_id=group.id, user_id=user.id, groupadmin=True
        )

        schedule = TaskScheduleFactory(group=group)
        form_fields = [
            ("_charset_", UTF8),
            ("__formid__", "deform"),
            (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
            (ViewParam.NAME, "MOJO"),
            (ViewParam.GROUP_ID, group.id),
            (FormAction.SUBMIT, "submit"),
        ]
        self.req.fake_request_post_from_dict(MultiDict(form_fields))

        # The schedule to edit is identified by a GET parameter; the request
        # method is left as set by the fake POST above.
        get_params = {ViewParam.SCHEDULE_ID: str(schedule.id)}
        self.req.add_get_params(get_params, set_method_get=False)

        view = EditTaskScheduleView(self.req)
        with self.assertRaises(HTTPFound) as redirect:
            view.dispatch()

        schedule = self.dbsession.query(TaskSchedule).one()
        self.assertEqual(schedule.name, "MOJO")

        found = redirect.exception
        self.assertEqual(found.status_code, 302)
        self.assertIn(Routes.VIEW_TASK_SCHEDULES, found.headers["Location"])

    def test_group_a_schedule_cannot_be_edited_by_group_b_admin(self) -> None:
        """An administrator of another group gets HTTPBadRequest."""
        group_a = GroupFactory()
        group_b = GroupFactory()

        group_a_schedule = TaskScheduleFactory(group=group_a)

        group_b_user = UserFactory()
        UserGroupMembershipFactory(
            group_id=group_b.id, user_id=group_b_user.id, groupadmin=True
        )
        self.req._debugging_user = group_b_user

        form_fields = [
            ("_charset_", UTF8),
            ("__formid__", "deform"),
            (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
            (ViewParam.NAME, "Something else"),
            (ViewParam.GROUP_ID, group_b.id),
            (FormAction.SUBMIT, "submit"),
        ]
        self.req.fake_request_post_from_dict(MultiDict(form_fields))

        get_params = {ViewParam.SCHEDULE_ID: str(group_a_schedule.id)}
        self.req.add_get_params(get_params, set_method_get=False)

        view = EditTaskScheduleView(self.req)
        with self.assertRaises(HTTPBadRequest) as rejected:
            view.dispatch()

        self.assertIn(
            "not a group administrator", rejected.exception.message
        )

296 

297 

class DeleteTaskScheduleViewTests(DemoRequestTestCase):
    """
    Checks that DeleteTaskScheduleView removes a schedule.
    """

    def test_schedule_item_is_deleted(self) -> None:
        """
        A confirmed delete removes the only TaskSchedule and raises a
        redirect whose Location contains Routes.VIEW_TASK_SCHEDULES.
        """
        user = UserFactory()
        self.req._debugging_user = user
        group = GroupFactory()
        UserGroupMembershipFactory(
            group_id=group.id, user_id=user.id, groupadmin=True
        )
        schedule = TaskScheduleFactory(group=group)

        # Precondition: the schedule exists before the view runs.
        before = self.dbsession.query(TaskSchedule).one_or_none()
        self.assertIsNotNone(before)

        form_fields = [
            ("_charset_", UTF8),
            ("__formid__", "deform"),
            (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
            ("confirm_1_t", "true"),
            ("confirm_2_t", "true"),
            ("confirm_4_t", "true"),
            ("__start__", "danger:mapping"),
            ("target", "7176"),
            ("user_entry", "7176"),
            ("__end__", "danger:mapping"),
            ("delete", "delete"),
            (FormAction.DELETE, "delete"),
        ]
        self.req.fake_request_post_from_dict(MultiDict(form_fields))

        get_params = {ViewParam.SCHEDULE_ID: str(schedule.id)}
        self.req.add_get_params(get_params, set_method_get=False)

        view = DeleteTaskScheduleView(self.req)
        with self.assertRaises(HTTPFound) as redirect:
            view.dispatch()

        found = redirect.exception
        self.assertEqual(found.status_code, 302)
        self.assertIn(Routes.VIEW_TASK_SCHEDULES, found.headers["Location"])

        after = self.dbsession.query(TaskSchedule).one_or_none()
        self.assertIsNone(after)

342 

343 

class AddTaskScheduleItemViewTests(BasicDatabaseTestCase):
    """
    Checks form display, creation, cancellation and bad-schedule handling
    in AddTaskScheduleItemView.
    """

    def setUp(self) -> None:
        """Create the schedule that new items are added to."""
        super().setUp()

        self.schedule = TaskScheduleFactory(group=self.group)

    def test_schedule_item_form_displayed(self) -> None:
        """A GET naming an existing schedule gives a 200 page with one form."""
        view = AddTaskScheduleItemView(self.req)

        get_params = {ViewParam.SCHEDULE_ID: str(self.schedule.id)}
        self.req.add_get_params(get_params)

        response = view.dispatch()
        self.assertEqual(response.status_code, 200)
        page = response.body.decode(UTF8)
        self.assertEqual(page.count("<form"), 1)

    def test_schedule_item_is_created(self) -> None:
        """
        Submitting the form stores one TaskScheduleItem and raises a redirect
        to the item list of the schedule.
        """
        form_fields = [
            ("_charset_", UTF8),
            ("__formid__", "deform"),
            (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
            (ViewParam.SCHEDULE_ID, self.schedule.id),
            (ViewParam.TABLE_NAME, "ace3"),
            (ViewParam.CLINICIAN_CONFIRMATION, "true"),
            ("__start__", "due_from:mapping"),
            ("months", "1"),
            ("weeks", "2"),
            ("days", "3"),
            ("__end__", "due_from:mapping"),
            ("__start__", "due_within:mapping"),
            ("months", "2"),  # 60 days
            ("weeks", "3"),  # 21 days
            ("days", "15"),  # 15 days
            ("__end__", "due_within:mapping"),
            (FormAction.SUBMIT, "submit"),
        ]
        self.req.fake_request_post_from_dict(MultiDict(form_fields))

        view = AddTaskScheduleItemView(self.req)
        with self.assertRaises(HTTPFound) as redirect:
            view.dispatch()

        created = self.dbsession.query(TaskScheduleItem).one()

        self.assertEqual(created.schedule_id, self.schedule.id)
        self.assertEqual(created.task_table_name, "ace3")
        self.assertEqual(created.due_from.in_days(), 47)
        self.assertEqual(created.due_by.in_days(), 143)

        found = redirect.exception
        self.assertEqual(found.status_code, 302)
        expected_url_part = (
            f"{Routes.VIEW_TASK_SCHEDULE_ITEMS}"
            f"?{ViewParam.SCHEDULE_ID}={self.schedule.id}"
        )
        self.assertIn(expected_url_part, found.headers["Location"])

    def test_schedule_item_is_not_created_on_cancel(self) -> None:
        """Cancelling the form redirects without storing any item."""
        form_fields = [
            ("_charset_", UTF8),
            ("__formid__", "deform"),
            (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
            (ViewParam.SCHEDULE_ID, self.schedule.id),
            (ViewParam.TABLE_NAME, "ace3"),
            ("__start__", "due_from:mapping"),
            ("months", "1"),
            ("weeks", "2"),
            ("days", "3"),
            ("__end__", "due_from:mapping"),
            ("__start__", "due_within:mapping"),
            ("months", "4"),
            ("weeks", "3"),
            ("days", "2"),
            ("__end__", "due_within:mapping"),
            (FormAction.CANCEL, "cancel"),
        ]
        self.req.fake_request_post_from_dict(MultiDict(form_fields))

        view = AddTaskScheduleItemView(self.req)
        with self.assertRaises(HTTPFound):
            view.dispatch()

        stored = self.dbsession.query(TaskScheduleItem).one_or_none()
        self.assertIsNone(stored)

    def test_non_existent_schedule_handled(self) -> None:
        """An unknown schedule ID makes dispatch raise HTTPBadRequest."""
        self.req.add_get_params({ViewParam.SCHEDULE_ID: "99999"})

        view = AddTaskScheduleItemView(self.req)
        with self.assertRaises(HTTPBadRequest):
            view.dispatch()

443 

444 

class EditTaskScheduleItemViewTests(BasicDatabaseTestCase):
    """
    Checks updating, cancelling, error handling and form values in
    EditTaskScheduleItemView.
    """

    def setUp(self) -> None:
        """Create the schedule that the items under test belong to."""
        super().setUp()
        self.schedule = TaskScheduleFactory(group=self.group)

    def _make_ace3_item(self) -> TaskScheduleItem:
        """Create an "ace3" item on self.schedule, due from day 30 to 60."""
        return TaskScheduleItemFactory(
            task_schedule=self.schedule,
            task_table_name="ace3",
            due_from=Duration(days=30),
            due_by=Duration(days=60),
        )

    def _post_bmi_form(
        self, item: TaskScheduleItem, action: str, action_value: str
    ) -> EditTaskScheduleItemView:
        """
        Fake a POST that changes the table name to "bmi" and ends with the
        given form action, then return a view bound to the request.
        """
        form_fields = [
            ("_charset_", UTF8),
            ("__formid__", "deform"),
            (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
            (ViewParam.SCHEDULE_ID, self.schedule.id),
            (ViewParam.TABLE_NAME, "bmi"),
            ("__start__", "due_from:mapping"),
            ("months", "0"),
            ("weeks", "0"),
            ("days", "30"),
            ("__end__", "due_from:mapping"),
            ("__start__", "due_within:mapping"),
            ("months", "0"),
            ("weeks", "0"),
            ("days", "60"),
            ("__end__", "due_within:mapping"),
            (action, action_value),
        ]
        self.req.fake_request_post_from_dict(MultiDict(form_fields))

        get_params = {ViewParam.SCHEDULE_ITEM_ID: str(item.id)}
        self.req.add_get_params(get_params, set_method_get=False)

        return EditTaskScheduleItemView(self.req)

    def test_schedule_item_is_updated(self) -> None:
        """Submitting the form changes the item and redirects to the list."""
        item = self._make_ace3_item()
        view = self._post_bmi_form(item, FormAction.SUBMIT, "submit")

        with self.assertRaises(HTTPFound) as redirect:
            view.dispatch()

        self.assertEqual(item.task_table_name, "bmi")

        found = redirect.exception
        self.assertEqual(found.status_code, 302)
        expected_url_part = (
            f"{Routes.VIEW_TASK_SCHEDULE_ITEMS}"
            f"?{ViewParam.SCHEDULE_ID}={item.schedule_id}"
        )
        self.assertIn(expected_url_part, found.headers["Location"])

    def test_schedule_item_is_not_updated_on_cancel(self) -> None:
        """Cancelling the form redirects and leaves the item unchanged."""
        item = self._make_ace3_item()
        view = self._post_bmi_form(item, FormAction.CANCEL, "cancel")

        with self.assertRaises(HTTPFound):
            view.dispatch()

        self.assertEqual(item.task_table_name, "ace3")

    def test_non_existent_item_handled(self) -> None:
        """An unknown item ID makes dispatch raise HTTPBadRequest."""
        self.req.add_get_params({ViewParam.SCHEDULE_ITEM_ID: "99999"})

        view = EditTaskScheduleItemView(self.req)
        with self.assertRaises(HTTPBadRequest):
            view.dispatch()

    def test_null_item_handled(self) -> None:
        """Dispatching with no item ID at all raises HTTPBadRequest."""
        view = EditTaskScheduleItemView(self.req)
        with self.assertRaises(HTTPBadRequest):
            view.dispatch()

    def test_get_form_values(self) -> None:
        """get_form_values() reflects the item assigned to view.object."""
        item = self._make_ace3_item()
        view = EditTaskScheduleItemView(self.req)
        view.object = item

        values = view.get_form_values()

        self.assertEqual(values[ViewParam.SCHEDULE_ID], self.schedule.id)
        self.assertEqual(values[ViewParam.TABLE_NAME], item.task_table_name)
        self.assertEqual(values[ViewParam.DUE_FROM], item.due_from)

        # The form holds the window length, not the absolute due-by time.
        expected_due_within = item.due_by - item.due_from
        self.assertEqual(values[ViewParam.DUE_WITHIN], expected_due_within)

    def test_group_a_item_cannot_be_edited_by_group_b_admin(self) -> None:
        """get_schedule() rejects an administrator of a different group."""
        group_a = GroupFactory()
        group_b = GroupFactory()

        group_b_admin = UserFactory()
        self.req._debugging_user = group_b_admin
        UserGroupMembershipFactory(
            group_id=group_b.id, user_id=group_b_admin.id, groupadmin=True
        )

        group_a_schedule = TaskScheduleFactory(group=group_a)
        group_a_item = TaskScheduleItemFactory(task_schedule=group_a_schedule)

        view = EditTaskScheduleItemView(self.req)
        view.object = group_a_item

        with self.assertRaises(HTTPBadRequest) as rejected:
            view.get_schedule()

        self.assertIn(
            "not a group administrator", rejected.exception.message
        )

594 

595 

class DeleteTaskScheduleItemViewTests(BasicDatabaseTestCase):
    """
    Checks form display, validation errors, deletion and cancellation in
    DeleteTaskScheduleItemView.
    """

    def setUp(self) -> None:
        """Create one schedule holding a single "ace3" item."""
        super().setUp()

        # A schedule used to be created twice here, with the first one
        # (group=self.group) immediately overwritten and never used. Only the
        # schedule the tests actually exercised is kept.
        self.schedule = TaskScheduleFactory()
        self.item = TaskScheduleItemFactory(
            task_schedule=self.schedule, task_table_name="ace3"
        )

    def test_delete_form_displayed(self) -> None:
        """A GET naming the item gives a 200 page containing one form."""
        view = DeleteTaskScheduleItemView(self.req)

        self.req.add_get_params(
            {ViewParam.SCHEDULE_ITEM_ID: str(self.item.id)}
        )

        response = view.dispatch()
        self.assertEqual(response.status_code, 200)
        self.assertEqual(response.body.decode(UTF8).count("<form"), 1)

    def test_errors_displayed_when_deletion_validation_fails(self) -> None:
        """A delete POST without the confirmation fields re-renders errors."""
        self.req.fake_request_post_from_dict({FormAction.DELETE: "delete"})

        self.req.add_get_params(
            {ViewParam.SCHEDULE_ITEM_ID: str(self.item.id)},
            set_method_get=False,
        )
        view = DeleteTaskScheduleItemView(self.req)

        response = view.dispatch()
        self.assertIn(
            "Errors have been highlighted", response.body.decode(UTF8)
        )

    def test_schedule_item_is_deleted(self) -> None:
        """
        A fully confirmed delete removes the item and raises a redirect to
        the item list of its schedule.
        """
        multidict = MultiDict(
            [
                ("_charset_", UTF8),
                ("__formid__", "deform"),
                (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
                ("confirm_1_t", "true"),
                ("confirm_2_t", "true"),
                ("confirm_4_t", "true"),
                ("__start__", "danger:mapping"),
                ("target", "7176"),
                ("user_entry", "7176"),
                ("__end__", "danger:mapping"),
                ("delete", "delete"),
                (FormAction.DELETE, "delete"),
            ]
        )

        self.req.fake_request_post_from_dict(multidict)

        self.req.add_get_params(
            {ViewParam.SCHEDULE_ITEM_ID: str(self.item.id)},
            set_method_get=False,
        )
        view = DeleteTaskScheduleItemView(self.req)

        with self.assertRaises(HTTPFound) as e:
            view.dispatch()

        self.assertEqual(e.exception.status_code, 302)
        self.assertIn(
            f"{Routes.VIEW_TASK_SCHEDULE_ITEMS}"
            f"?{ViewParam.SCHEDULE_ID}={self.item.schedule_id}",
            e.exception.headers["Location"],
        )

        item = self.dbsession.query(TaskScheduleItem).one_or_none()

        self.assertIsNone(item)

    def test_schedule_item_not_deleted_on_cancel(self) -> None:
        """Cancelling redirects and leaves the item in the database."""
        self.req.fake_request_post_from_dict({FormAction.CANCEL: "cancel"})

        self.req.add_get_params(
            {ViewParam.SCHEDULE_ITEM_ID: str(self.item.id)},
            set_method_get=False,
        )
        view = DeleteTaskScheduleItemView(self.req)

        with self.assertRaises(HTTPFound):
            view.dispatch()

        item = self.dbsession.query(TaskScheduleItem).one_or_none()

        self.assertIsNotNone(item)

687 

688 

class EditFinalizedPatientViewTests(DemoRequestTestCase):
    """
    Tests for edit_finalized_patient() and EditFinalizedPatientView.
    """

    def setUp(self) -> None:
        """
        Make the requesting user an administrator of a fresh group who may
        view all patients when unfiltered.
        """
        super().setUp()

        self.group = AnyIdNumGroupFactory()
        user = self.req._debugging_user = UserFactory()

        UserGroupMembershipFactory(
            group_id=self.group.id,
            user_id=user.id,
            groupadmin=True,
            view_all_patients_when_unfiltered=True,
        )

    def test_raises_when_patient_does_not_exists(self) -> None:
        """With no server PK in the request, the view raises HTTPBadRequest."""
        with self.assertRaises(HTTPBadRequest) as cm:
            edit_finalized_patient(self.req)

        self.assertEqual(
            str(cm.exception), "Cannot find Patient with _pk:None"
        )

    @unittest.skip("Can't save patient in database without group")
    def test_raises_when_patient_not_in_a_group(self) -> None:
        patient = PatientFactory(_group=None)

        self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})

        with self.assertRaises(HTTPBadRequest) as cm:
            edit_finalized_patient(self.req)

        self.assertEqual(str(cm.exception), "Bad patient: not in a group")

    def test_raises_when_not_authorized(self) -> None:
        """A user who may not administer the group gets HTTPBadRequest."""
        patient = PatientFactory()

        # Force the permission check to fail regardless of memberships.
        with mock.patch.object(
            self.req._debugging_user,
            "may_administer_group",
            return_value=False,
        ):
            self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})

            with self.assertRaises(HTTPBadRequest) as cm:
                edit_finalized_patient(self.req)

        self.assertEqual(
            str(cm.exception), "Not authorized to edit this patient"
        )

    def test_raises_when_patient_not_finalized(self) -> None:
        """A patient whose era is ERA_NOW is rejected as not editable."""
        patient = PatientFactory(_era=ERA_NOW, _group=self.group)

        self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})

        with self.assertRaises(HTTPBadRequest) as cm:
            edit_finalized_patient(self.req)

        self.assertIn("Patient is not editable", str(cm.exception))

    def test_patient_updated(self) -> None:
        """
        Submitting new details updates the patient and its ID number, adds
        one special note and queues a success flash message.
        """
        patient = PatientFactory(_group=self.group)
        nhs_patient_idnum = NHSPatientIdNumFactory(patient=patient)

        self.req.add_get_params(
            {ViewParam.SERVER_PK: str(patient.pk)}, set_method_get=False
        )

        new_sex = Fake.en_gb.sex()
        new_forename = Fake.en_gb.forename(new_sex)
        new_surname = Fake.en_gb.last_name()
        new_address = Fake.en_gb.address()
        new_email = Fake.en_gb.email()
        new_gp = Fake.en_gb.name()
        new_other = Fake.en_us.paragraph()
        new_dob = Fake.en_gb.consistent_date_of_birth()
        new_nhs_number = Fake.en_gb.nhs_number()

        multidict = MultiDict(
            [
                ("_charset_", UTF8),
                ("__formid__", "deform"),
                (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
                (ViewParam.SERVER_PK, str(patient.pk)),
                (ViewParam.GROUP_ID, str(patient.group.id)),
                (ViewParam.FORENAME, new_forename),
                (ViewParam.SURNAME, new_surname),
                ("__start__", "dob:mapping"),
                ("date", new_dob),
                ("__end__", "dob:mapping"),
                ("__start__", "sex:rename"),
                ("deformField7", new_sex),
                ("__end__", "sex:rename"),
                (ViewParam.ADDRESS, new_address),
                (ViewParam.EMAIL, new_email),
                (ViewParam.GP, new_gp),
                (ViewParam.OTHER, new_other),
                ("__start__", "id_references:sequence"),
                ("__start__", "idnum_sequence:mapping"),
                (ViewParam.WHICH_IDNUM, nhs_patient_idnum.which_idnum),
                (ViewParam.IDNUM_VALUE, new_nhs_number),
                ("__end__", "idnum_sequence:mapping"),
                ("__end__", "id_references:sequence"),
                ("__start__", "danger:mapping"),
                ("target", "7836"),
                ("user_entry", "7836"),
                ("__end__", "danger:mapping"),
                (FormAction.SUBMIT, "submit"),
            ]
        )

        self.req.fake_request_post_from_dict(multidict)

        with self.assertRaises(HTTPFound):
            edit_finalized_patient(self.req)

        self.dbsession.commit()

        self.assertEqual(patient.forename, new_forename)
        self.assertEqual(patient.surname, new_surname)
        self.assertEqual(patient.dob, new_dob)
        self.assertEqual(patient.sex, new_sex)
        self.assertEqual(patient.address, new_address)
        self.assertEqual(patient.email, new_email)
        self.assertEqual(patient.gp, new_gp)
        self.assertEqual(patient.other, new_other)

        idnum = patient.get_idnum_objects()[0]
        self.assertEqual(idnum.patient_id, patient.id)
        self.assertEqual(idnum.which_idnum, nhs_patient_idnum.which_idnum)
        self.assertEqual(idnum.idnum_value, new_nhs_number)

        self.assertEqual(len(patient.special_notes), 1)
        note = patient.special_notes[0].note

        # Derive the key from the fixture rather than assuming which_idnum
        # is 1, so that the note and flash checks agree.
        idnum_key = f"idnum{nhs_patient_idnum.which_idnum}"

        self.assertIn("Patient details edited", note)
        self.assertIn("forename", note)
        self.assertIn(new_forename, note)

        self.assertIn("surname", note)
        self.assertIn(new_surname, note)

        self.assertIn(idnum_key, note)
        self.assertIn(str(new_nhs_number), note)

        messages = self.req.session.peek_flash(FlashQueue.SUCCESS)

        self.assertIn(
            f"Amended patient record with server PK {patient.pk}", messages[0]
        )
        self.assertIn("forename", messages[0])
        self.assertIn(new_forename, messages[0])

        self.assertIn("surname", messages[0])
        self.assertIn(new_surname, messages[0])

        self.assertIn(idnum_key, messages[0])
        self.assertIn(str(new_nhs_number), messages[0])

    def test_message_when_no_changes(self) -> None:
        """Re-submitting the existing values queues a "no changes" message."""
        patient = PatientFactory(_group=self.group)

        patient_idnum = NHSPatientIdNumFactory(
            patient=patient,
        )
        self.req.add_get_params(
            {ViewParam.SERVER_PK: str(patient.pk)}, set_method_get=False
        )

        multidict = MultiDict(
            [
                ("_charset_", UTF8),
                ("__formid__", "deform"),
                (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
                (ViewParam.SERVER_PK, str(patient.pk)),
                (ViewParam.GROUP_ID, patient.group.id),
                (ViewParam.FORENAME, patient.forename),
                (ViewParam.SURNAME, patient.surname),
                ("__start__", "dob:mapping"),
                ("date", patient.dob.isoformat()),
                ("__end__", "dob:mapping"),
                ("__start__", "sex:rename"),
                ("deformField7", patient.sex),
                ("__end__", "sex:rename"),
                (ViewParam.ADDRESS, patient.address),
                (ViewParam.EMAIL, patient.email),
                (ViewParam.GP, patient.gp),
                (ViewParam.OTHER, patient.other),
                ("__start__", "id_references:sequence"),
                ("__start__", "idnum_sequence:mapping"),
                (ViewParam.WHICH_IDNUM, patient_idnum.which_idnum),
                (ViewParam.IDNUM_VALUE, patient_idnum.idnum_value),
                ("__end__", "idnum_sequence:mapping"),
                ("__end__", "id_references:sequence"),
                ("__start__", "danger:mapping"),
                ("target", "7836"),
                ("user_entry", "7836"),
                ("__end__", "danger:mapping"),
                (FormAction.SUBMIT, "submit"),
            ]
        )

        self.req.fake_request_post_from_dict(multidict)

        with self.assertRaises(HTTPFound):
            edit_finalized_patient(self.req)

        messages = self.req.session.peek_flash(FlashQueue.INFO)

        self.assertIn("No changes required", messages[0])

    def test_template_rendered_with_values(self) -> None:
        """The render context contains the form and the patient's tasks."""
        patient = PatientFactory(_group=self.group)
        NHSPatientIdNumFactory(patient=patient)

        task1 = BmiFactory(patient=patient, _current=False)
        task2 = BmiFactory(patient=patient, _current=False)

        self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})

        view = EditFinalizedPatientView(
            self.req, task_tablename=task1.tablename, task_server_pk=task1.pk
        )
        with mock.patch.object(view, "render_to_response") as mock_render:
            view.dispatch()

        # Only the positional arguments are of interest here.
        args, _kwargs = mock_render.call_args

        context = args[0]

        self.assertIn("form", context)
        self.assertIn(task1, context["tasks"])
        self.assertIn(task2, context["tasks"])

    def test_changes_to_simple_params(self) -> None:
        """_save_simple_params() records only the fields that differ."""
        view = EditFinalizedPatientView(self.req)
        patient = PatientFactory()
        old_forename = patient.forename
        old_surname = patient.surname
        old_address = patient.address
        new_forename = Fake.en_gb.forename(patient.sex)
        new_surname = Fake.en_gb.last_name()
        new_address = Fake.en_gb.address()

        view.object = patient

        changes = OrderedDict()  # type: OrderedDict

        appstruct = {
            ViewParam.FORENAME: new_forename,
            ViewParam.SURNAME: new_surname,
            ViewParam.DOB: patient.dob,
            ViewParam.ADDRESS: new_address,
            ViewParam.OTHER: patient.other,
        }

        view._save_simple_params(appstruct, changes)

        self.assertEqual(
            changes[ViewParam.FORENAME], (old_forename, new_forename)
        )
        self.assertEqual(
            changes[ViewParam.SURNAME], (old_surname, new_surname)
        )
        self.assertNotIn(ViewParam.DOB, changes)
        self.assertEqual(
            changes[ViewParam.ADDRESS], (old_address, new_address)
        )
        self.assertNotIn(ViewParam.OTHER, changes)

    def test_changes_to_idrefs(self) -> None:
        """
        _save_idrefs() records a changed, a removed and an added ID number
        as (old, new) pairs.
        """
        view = EditFinalizedPatientView(self.req)
        patient = PatientFactory()
        nhs_patient_idnum = NHSPatientIdNumFactory(patient=patient)
        study_patient_idnum = StudyPatientIdNumFactory(patient=patient)
        rio_iddef = RioIdNumDefinitionFactory()
        new_nhs_number = Fake.en_gb.nhs_number()
        new_rio_number = 9999  # Below the range the factory would use

        view.object = patient

        changes = OrderedDict()  # type: OrderedDict

        # NHS number changed, RiO number added, study number omitted.
        appstruct = {
            ViewParam.ID_REFERENCES: [
                {
                    ViewParam.WHICH_IDNUM: nhs_patient_idnum.which_idnum,
                    ViewParam.IDNUM_VALUE: new_nhs_number,
                },
                {
                    ViewParam.WHICH_IDNUM: rio_iddef.which_idnum,
                    ViewParam.IDNUM_VALUE: new_rio_number,
                },
            ]
        }

        view._save_idrefs(appstruct, changes)

        nhs_key = f"idnum{nhs_patient_idnum.which_idnum} (NHS number)"
        self.assertIn(nhs_key, changes)

        study_key = f"idnum{study_patient_idnum.which_idnum} (Study number)"
        self.assertIn(study_key, changes)

        rio_key = f"idnum{rio_iddef.which_idnum} (RiO number)"
        self.assertIn(rio_key, changes)

        self.assertEqual(
            changes[nhs_key],
            (nhs_patient_idnum.idnum_value, new_nhs_number),
        )

        self.assertEqual(
            changes[study_key], (study_patient_idnum.idnum_value, None)
        )
        self.assertEqual(changes[rio_key], (None, new_rio_number))

1005 

1006 

1007class EditServerCreatedPatientViewTests(BasicDatabaseTestCase): 

1008 def test_group_updated(self) -> None: 

1009 patient = ServerCreatedPatientFactory(_group=self.group) 

1010 old_group = patient.group 

1011 new_group = GroupFactory() 

1012 

1013 view = EditServerCreatedPatientView(self.req) 

1014 view.object = patient 

1015 

1016 appstruct = {ViewParam.GROUP_ID: new_group.id} 

1017 

1018 view.save_object(appstruct) 

1019 

1020 self.assertEqual(patient.group_id, new_group.id) 

1021 

1022 messages = self.req.session.peek_flash(FlashQueue.SUCCESS) 

1023 

1024 self.assertIn(old_group.name, messages[0]) 

1025 self.assertIn(new_group.name, messages[0]) 

1026 self.assertIn("group:", messages[0]) 

1027 

1028 def test_raises_when_not_created_on_the_server(self) -> None: 

1029 patient = PatientFactory() 

1030 

1031 view = EditServerCreatedPatientView(self.req) 

1032 

1033 self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)}) 

1034 

1035 with self.assertRaises(HTTPBadRequest) as cm: 

1036 view.get_object() 

1037 

1038 self.assertIn("Patient is not editable", str(cm.exception)) 

1039 

1040 def test_patient_task_schedules_updated(self) -> None: 

1041 patient = ServerCreatedPatientFactory() 

1042 nhs_patient_idnum = NHSPatientIdNumFactory(patient=patient) 

1043 group = patient._group 

1044 

1045 schedule1 = TaskScheduleFactory(group=group) 

1046 schedule2 = TaskScheduleFactory(group=group) 

1047 schedule3 = TaskScheduleFactory(group=group) 

1048 

1049 PatientTaskScheduleFactory( 

1050 patient=patient, 

1051 task_schedule=schedule1, 

1052 start_datetime=local(2020, 6, 12, 9), 

1053 settings={ 

1054 "name 1": "value 1", 

1055 "name 2": "value 2", 

1056 "name 3": "value 3", 

1057 }, 

1058 ) 

1059 

1060 PatientTaskScheduleFactory( 

1061 patient=patient, 

1062 task_schedule=schedule3, 

1063 ) 

1064 self.req.add_get_params( 

1065 {ViewParam.SERVER_PK: str(patient.pk)}, set_method_get=False 

1066 ) 

1067 

1068 changed_schedule_1_settings = { 

1069 "name 1": "new value 1", 

1070 "name 2": "new value 2", 

1071 "name 3": "new value 3", 

1072 } 

1073 changed_schedule_1_datetime = local(2020, 6, 19, 8, 0, 0) 

1074 new_schedule_2_settings = { 

1075 "name 4": "value 4", 

1076 "name 5": "value 5", 

1077 "name 6": "value 6", 

1078 } 

1079 new_schedule_2_datetime = local(2020, 7, 1, 13, 45, 0) 

1080 new_nhs_number = Fake.en_gb.nhs_number() 

1081 multidict = MultiDict( 

1082 [ 

1083 ("_charset_", UTF8), 

1084 ("__formid__", "deform"), 

1085 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

1086 (ViewParam.SERVER_PK, patient.pk), 

1087 (ViewParam.GROUP_ID, patient.group.id), 

1088 (ViewParam.FORENAME, patient.forename), 

1089 (ViewParam.SURNAME, patient.surname), 

1090 ("__start__", "dob:mapping"), 

1091 ("date", ""), 

1092 ("__end__", "dob:mapping"), 

1093 ("__start__", "sex:rename"), 

1094 ("deformField7", patient.sex), 

1095 ("__end__", "sex:rename"), 

1096 (ViewParam.ADDRESS, patient.address), 

1097 (ViewParam.GP, patient.gp), 

1098 (ViewParam.OTHER, patient.other), 

1099 ("__start__", "id_references:sequence"), 

1100 ("__start__", "idnum_sequence:mapping"), 

1101 (ViewParam.WHICH_IDNUM, nhs_patient_idnum.which_idnum), 

1102 (ViewParam.IDNUM_VALUE, str(new_nhs_number)), 

1103 ("__end__", "idnum_sequence:mapping"), 

1104 ("__end__", "id_references:sequence"), 

1105 ("__start__", "danger:mapping"), 

1106 ("target", "7836"), 

1107 ("user_entry", "7836"), 

1108 ("__end__", "danger:mapping"), 

1109 ("__start__", "task_schedules:sequence"), 

1110 ("__start__", "task_schedule_sequence:mapping"), 

1111 ("schedule_id", schedule1.id), 

1112 ("__start__", "start_datetime:mapping"), 

1113 ("date", changed_schedule_1_datetime.to_date_string()), 

1114 ("time", changed_schedule_1_datetime.to_time_string()), 

1115 ("__end__", "start_datetime:mapping"), 

1116 ("settings", json.dumps(changed_schedule_1_settings)), 

1117 ("__end__", "task_schedule_sequence:mapping"), 

1118 ("__start__", "task_schedule_sequence:mapping"), 

1119 ("schedule_id", schedule2.id), 

1120 ("__start__", "start_datetime:mapping"), 

1121 ("date", new_schedule_2_datetime.to_date_string()), 

1122 ("time", new_schedule_2_datetime.to_time_string()), 

1123 ("__end__", "start_datetime:mapping"), 

1124 ("settings", json.dumps(new_schedule_2_settings)), 

1125 ("__end__", "task_schedule_sequence:mapping"), 

1126 ("__end__", "task_schedules:sequence"), 

1127 (FormAction.SUBMIT, "submit"), 

1128 ] 

1129 ) 

1130 

1131 self.req.fake_request_post_from_dict(multidict) 

1132 

1133 with self.assertRaises(HTTPFound): 

1134 edit_server_created_patient(self.req) 

1135 

1136 self.dbsession.commit() 

1137 

1138 schedules = { 

1139 pts.task_schedule.name: pts for pts in patient.task_schedules 

1140 } 

1141 self.assertIn(schedule1.name, schedules) 

1142 self.assertIn(schedule2.name, schedules) 

1143 self.assertNotIn(schedule3.name, schedules) 

1144 

1145 self.assertEqual( 

1146 schedules[schedule1.name].start_datetime, 

1147 changed_schedule_1_datetime, 

1148 ) 

1149 self.assertEqual( 

1150 schedules[schedule1.name].settings, changed_schedule_1_settings 

1151 ) 

1152 self.assertEqual( 

1153 schedules[schedule2.name].start_datetime, 

1154 new_schedule_2_datetime, 

1155 ) 

1156 self.assertEqual( 

1157 schedules[schedule2.name].settings, new_schedule_2_settings 

1158 ) 

1159 

1160 messages = self.req.session.peek_flash(FlashQueue.SUCCESS) 

1161 

1162 self.assertIn( 

1163 f"Amended patient record with server PK {patient.pk}", messages[0] 

1164 ) 

1165 self.assertIn("Task schedules", messages[0]) 

1166 

1167 def test_unprivileged_user_cannot_edit_patient(self) -> None: 

1168 patient = ServerCreatedPatientFactory() 

1169 

1170 self.req._debugging_user = UserFactory() 

1171 

1172 view = EditServerCreatedPatientView(self.req) 

1173 view.object = patient 

1174 

1175 self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)}) 

1176 

1177 with self.assertRaises(HTTPBadRequest) as cm: 

1178 view.dispatch() 

1179 

1180 self.assertEqual( 

1181 cm.exception.message, "Not authorized to edit this patient" 

1182 ) 

1183 

1184 def test_patient_can_be_assigned_the_same_schedule_twice(self) -> None: 

1185 patient = ServerCreatedPatientFactory() 

1186 

1187 schedule1 = TaskScheduleFactory(group=self.group) 

1188 

1189 pts = PatientTaskScheduleFactory( 

1190 patient=patient, 

1191 task_schedule=schedule1, 

1192 start_datetime=local(2020, 6, 12, 12, 34), 

1193 ) 

1194 

1195 appstruct = { 

1196 ViewParam.TASK_SCHEDULES: [ 

1197 { 

1198 ViewParam.PATIENT_TASK_SCHEDULE_ID: pts.id, 

1199 ViewParam.SCHEDULE_ID: schedule1.id, 

1200 ViewParam.START_DATETIME: local(2020, 6, 12, 12, 34), 

1201 ViewParam.SETTINGS: {}, 

1202 }, 

1203 { 

1204 ViewParam.PATIENT_TASK_SCHEDULE_ID: None, 

1205 ViewParam.SCHEDULE_ID: schedule1.id, 

1206 ViewParam.START_DATETIME: None, 

1207 ViewParam.SETTINGS: {}, 

1208 }, 

1209 ] 

1210 } 

1211 

1212 view = EditServerCreatedPatientView(self.req) 

1213 view.object = patient 

1214 

1215 changes: OrderedDict = OrderedDict() 

1216 view._save_task_schedules(appstruct, changes) 

1217 self.req.dbsession.commit() 

1218 

1219 self.assertEqual(patient.task_schedules[0].task_schedule, schedule1) 

1220 self.assertEqual(patient.task_schedules[1].task_schedule, schedule1) 

1221 

1222 def test_form_values_for_existing_patient(self) -> None: 

1223 patient = PatientFactory() 

1224 

1225 schedule1 = TaskScheduleFactory( 

1226 group=self.group, 

1227 ) 

1228 

1229 patient_task_schedule = PatientTaskScheduleFactory( 

1230 patient=patient, 

1231 task_schedule=schedule1, 

1232 start_datetime=local(2020, 6, 12), 

1233 settings={ 

1234 "name 1": "value 1", 

1235 "name 2": "value 2", 

1236 "name 3": "value 3", 

1237 }, 

1238 ) 

1239 

1240 patient_idnum = NHSPatientIdNumFactory(patient=patient) 

1241 self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)}) 

1242 

1243 view = EditServerCreatedPatientView(self.req) 

1244 view.object = patient 

1245 

1246 form_values = view.get_form_values() 

1247 

1248 self.assertEqual(form_values[ViewParam.FORENAME], patient.forename) 

1249 self.assertEqual(form_values[ViewParam.SURNAME], patient.surname) 

1250 self.assertEqual(form_values[ViewParam.DOB], patient.dob) 

1251 self.assertEqual(form_values[ViewParam.SEX], patient.sex) 

1252 self.assertEqual(form_values[ViewParam.ADDRESS], patient.address) 

1253 self.assertEqual(form_values[ViewParam.EMAIL], patient.email) 

1254 self.assertEqual(form_values[ViewParam.GP], patient.gp) 

1255 self.assertEqual(form_values[ViewParam.OTHER], patient.other) 

1256 

1257 self.assertEqual(form_values[ViewParam.SERVER_PK], patient.pk) 

1258 self.assertEqual(form_values[ViewParam.GROUP_ID], patient.group.id) 

1259 

1260 idnum = form_values[ViewParam.ID_REFERENCES][0] 

1261 self.assertEqual( 

1262 idnum[ViewParam.WHICH_IDNUM], 

1263 patient_idnum.which_idnum, 

1264 ) 

1265 self.assertEqual( 

1266 idnum[ViewParam.IDNUM_VALUE], patient_idnum.idnum_value 

1267 ) 

1268 

1269 task_schedule = form_values[ViewParam.TASK_SCHEDULES][0] 

1270 self.assertEqual( 

1271 task_schedule[ViewParam.PATIENT_TASK_SCHEDULE_ID], 

1272 patient_task_schedule.id, 

1273 ) 

1274 self.assertEqual( 

1275 task_schedule[ViewParam.SCHEDULE_ID], 

1276 patient_task_schedule.schedule_id, 

1277 ) 

1278 self.assertEqual( 

1279 task_schedule[ViewParam.START_DATETIME], 

1280 patient_task_schedule.start_datetime, 

1281 ) 

1282 self.assertEqual( 

1283 task_schedule[ViewParam.SETTINGS], patient_task_schedule.settings 

1284 ) 

1285 

1286 

1287class AddPatientViewTests(BasicDatabaseTestCase): 

1288 def test_patient_created(self) -> None: 

1289 view = AddPatientView(self.req) 

1290 

1291 schedule1 = TaskScheduleFactory() 

1292 schedule2 = TaskScheduleFactory() 

1293 

1294 start_datetime1 = local(2020, 6, 12) 

1295 start_datetime2 = local(2020, 7, 1) 

1296 

1297 settings1 = json.dumps( 

1298 {"name 1": "value 1", "name 2": "value 2", "name 3": "value 3"} 

1299 ) 

1300 

1301 nhs_iddef = NHSIdNumDefinitionFactory() 

1302 nhs_number = Fake.en_gb.nhs_number() 

1303 

1304 appstruct = { 

1305 ViewParam.GROUP_ID: self.group.id, 

1306 ViewParam.FORENAME: "Jo", 

1307 ViewParam.SURNAME: "Patient", 

1308 ViewParam.DOB: datetime.date(1958, 4, 19), 

1309 ViewParam.SEX: "F", 

1310 ViewParam.ADDRESS: "Address", 

1311 ViewParam.EMAIL: "jopatient@example.com", 

1312 ViewParam.GP: "GP", 

1313 ViewParam.OTHER: "Other", 

1314 ViewParam.ID_REFERENCES: [ 

1315 { 

1316 ViewParam.WHICH_IDNUM: nhs_iddef.which_idnum, 

1317 ViewParam.IDNUM_VALUE: nhs_number, 

1318 } 

1319 ], 

1320 ViewParam.TASK_SCHEDULES: [ 

1321 { 

1322 ViewParam.SCHEDULE_ID: schedule1.id, 

1323 ViewParam.START_DATETIME: start_datetime1, 

1324 ViewParam.SETTINGS: settings1, 

1325 }, 

1326 { 

1327 ViewParam.SCHEDULE_ID: schedule2.id, 

1328 ViewParam.START_DATETIME: start_datetime2, 

1329 ViewParam.SETTINGS: {}, 

1330 }, 

1331 ], 

1332 } 

1333 

1334 view.save_object(appstruct) 

1335 self.dbsession.commit() 

1336 

1337 patient = cast(Patient, view.object) 

1338 

1339 server_device = Device.get_server_device(self.req.dbsession) 

1340 

1341 self.assertEqual(patient.device_id, server_device.id) 

1342 self.assertEqual(patient.era, ERA_NOW) 

1343 self.assertEqual(patient.group.id, self.group.id) 

1344 

1345 self.assertEqual(patient.forename, "Jo") 

1346 self.assertEqual(patient.surname, "Patient") 

1347 self.assertEqual(patient.dob.isoformat(), "1958-04-19") 

1348 self.assertEqual(patient.sex, "F") 

1349 self.assertEqual(patient.address, "Address") 

1350 self.assertEqual(patient.email, "jopatient@example.com") 

1351 self.assertEqual(patient.gp, "GP") 

1352 self.assertEqual(patient.other, "Other") 

1353 

1354 idnum = patient.get_idnum_objects()[0] 

1355 self.assertEqual(idnum.patient_id, patient.id) 

1356 self.assertEqual(idnum.which_idnum, nhs_iddef.which_idnum) 

1357 self.assertEqual(idnum.idnum_value, nhs_number) 

1358 

1359 patient_task_schedules = { 

1360 pts.task_schedule.name: pts for pts in patient.task_schedules 

1361 } 

1362 

1363 self.assertIn(schedule1.name, patient_task_schedules) 

1364 self.assertIn(schedule2.name, patient_task_schedules) 

1365 

1366 self.assertEqual( 

1367 patient_task_schedules[schedule1.name].start_datetime, 

1368 start_datetime1, 

1369 ) 

1370 self.assertEqual( 

1371 patient_task_schedules[schedule1.name].settings, settings1 

1372 ) 

1373 self.assertEqual( 

1374 patient_task_schedules[schedule2.name].start_datetime, 

1375 start_datetime2, 

1376 ) 

1377 

1378 def test_patient_takes_next_available_id(self) -> None: 

1379 patient = ServerCreatedPatientFactory(id=1234) 

1380 nhs_iddef = NHSIdNumDefinitionFactory() 

1381 

1382 view = AddPatientView(self.req) 

1383 

1384 appstruct = { 

1385 ViewParam.GROUP_ID: self.group.id, 

1386 ViewParam.FORENAME: "Jo", 

1387 ViewParam.SURNAME: "Patient", 

1388 ViewParam.DOB: datetime.date(1958, 4, 19), 

1389 ViewParam.SEX: "F", 

1390 ViewParam.ADDRESS: "Address", 

1391 ViewParam.GP: "GP", 

1392 ViewParam.OTHER: "Other", 

1393 ViewParam.ID_REFERENCES: [ 

1394 { 

1395 ViewParam.WHICH_IDNUM: nhs_iddef.which_idnum, 

1396 ViewParam.IDNUM_VALUE: Fake.en_gb.nhs_number(), 

1397 } 

1398 ], 

1399 ViewParam.TASK_SCHEDULES: [], 

1400 } 

1401 

1402 view.save_object(appstruct) 

1403 

1404 patient = cast(Patient, view.object) 

1405 

1406 self.assertEqual(patient.id, 1235) 

1407 

1408 def test_form_rendered_with_values(self) -> None: 

1409 view = AddPatientView(self.req) 

1410 

1411 with mock.patch.object(view, "render_to_response") as mock_render: 

1412 view.dispatch() 

1413 

1414 args, kwargs = mock_render.call_args 

1415 

1416 context = args[0] 

1417 

1418 self.assertIn("form", context) 

1419 

1420 def test_unprivileged_user_cannot_add_patient(self) -> None: 

1421 user = UserFactory(username="testuser") 

1422 

1423 self.req._debugging_user = user 

1424 

1425 with self.assertRaises(HTTPBadRequest) as cm: 

1426 add_patient(self.req) 

1427 

1428 self.assertEqual( 

1429 cm.exception.message, "Not authorized to manage patients" 

1430 ) 

1431 

1432 def test_group_listed_for_privileged_group_member(self) -> None: 

1433 user = UserFactory() 

1434 group = GroupFactory() 

1435 UserGroupMembershipFactory( 

1436 user_id=user.id, group_id=group.id, may_manage_patients=True 

1437 ) 

1438 

1439 self.req._debugging_user = user 

1440 

1441 view = AddPatientView(self.req) 

1442 

1443 with mock.patch.object(view, "render_to_response") as mock_render: 

1444 view.dispatch() 

1445 

1446 args, kwargs = mock_render.call_args 

1447 

1448 context = args[0] 

1449 

1450 self.assertIn(group.name, context["form"]) 

1451 

1452 

1453class DeleteServerCreatedPatientViewTests(BasicDatabaseTestCase): 

1454 def setUp(self) -> None: 

1455 super().setUp() 

1456 

1457 self.patient = ServerCreatedPatientFactory() 

1458 

1459 idnum = ServerCreatedNHSPatientIdNumFactory(patient=self.patient) 

1460 PatientIdNumIndexEntry.index_idnum(idnum, self.dbsession) 

1461 

1462 self.schedule = TaskScheduleFactory(group=self.group) 

1463 

1464 PatientTaskScheduleFactory( 

1465 patient=self.patient, 

1466 task_schedule=self.schedule, 

1467 ) 

1468 

1469 self.multidict = MultiDict( 

1470 [ 

1471 ("_charset_", UTF8), 

1472 ("__formid__", "deform"), 

1473 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

1474 ("confirm_1_t", "true"), 

1475 ("confirm_2_t", "true"), 

1476 ("confirm_4_t", "true"), 

1477 ("__start__", "danger:mapping"), 

1478 ("target", "7176"), 

1479 ("user_entry", "7176"), 

1480 ("__end__", "danger:mapping"), 

1481 ("delete", "delete"), 

1482 (FormAction.DELETE, "delete"), 

1483 ] 

1484 ) 

1485 

1486 def test_patient_schedule_and_idnums_deleted(self) -> None: 

1487 self.req.fake_request_post_from_dict(self.multidict) 

1488 

1489 patient_pk = self.patient.pk 

1490 self.req.add_get_params( 

1491 {ViewParam.SERVER_PK: str(patient_pk)}, set_method_get=False 

1492 ) 

1493 view = DeleteServerCreatedPatientView(self.req) 

1494 

1495 with self.assertRaises(HTTPFound) as e: 

1496 view.dispatch() 

1497 

1498 self.assertEqual(e.exception.status_code, 302) 

1499 self.assertIn( 

1500 Routes.VIEW_PATIENT_TASK_SCHEDULES, e.exception.headers["Location"] 

1501 ) 

1502 

1503 deleted_patient = ( 

1504 self.dbsession.query(Patient) 

1505 .filter(Patient._pk == patient_pk) 

1506 .one_or_none() 

1507 ) 

1508 

1509 self.assertIsNone(deleted_patient) 

1510 

1511 pts = ( 

1512 self.dbsession.query(PatientTaskSchedule) 

1513 .filter(PatientTaskSchedule.patient_pk == patient_pk) 

1514 .one_or_none() 

1515 ) 

1516 

1517 self.assertIsNone(pts) 

1518 

1519 idnum = ( 

1520 self.dbsession.query(PatientIdNum) 

1521 .filter( 

1522 PatientIdNum.patient_id == self.patient.id, 

1523 PatientIdNum._device_id == self.patient.device_id, 

1524 PatientIdNum._era == self.patient.era, 

1525 PatientIdNum._current == True, # noqa: E712 

1526 ) 

1527 .one_or_none() 

1528 ) 

1529 

1530 self.assertIsNone(idnum) 

1531 

1532 def test_registered_patient_deleted(self) -> None: 

1533 from camcops_server.cc_modules.client_api import ( 

1534 get_or_create_single_user, 

1535 ) 

1536 

1537 user1, _ = get_or_create_single_user(self.req, "test", self.patient) 

1538 self.assertEqual(user1.single_patient, self.patient) 

1539 

1540 user2, _ = get_or_create_single_user(self.req, "test", self.patient) 

1541 self.assertEqual(user2.single_patient, self.patient) 

1542 

1543 self.req.fake_request_post_from_dict(self.multidict) 

1544 

1545 patient_pk = self.patient.pk 

1546 self.req.add_get_params( 

1547 {ViewParam.SERVER_PK: str(patient_pk)}, set_method_get=False 

1548 ) 

1549 view = DeleteServerCreatedPatientView(self.req) 

1550 

1551 with self.assertRaises(HTTPFound): 

1552 view.dispatch() 

1553 

1554 self.dbsession.commit() 

1555 

1556 deleted_patient = ( 

1557 self.dbsession.query(Patient) 

1558 .filter(Patient._pk == patient_pk) 

1559 .one_or_none() 

1560 ) 

1561 

1562 self.assertIsNone(deleted_patient) 

1563 

1564 # TODO: We get weird behaviour when all the tests are run together 

1565 # (fine for --test_class=DeleteServerCreatedPatientViewTests) 

1566 # the assertion below fails with sqlite in spite of the commit() 

1567 # above. 

1568 

1569 # user = self.dbsession.query(User).filter( 

1570 # User.id == user1.id).one_or_none() 

1571 # self.assertIsNone(user.single_patient_pk) 

1572 

1573 # user = self.dbsession.query(User).filter( 

1574 # User.id == user2.id).one_or_none() 

1575 # self.assertIsNone(user.single_patient_pk) 

1576 

1577 def test_unrelated_patient_unaffected(self) -> None: 

1578 other_patient = ServerCreatedPatientFactory() 

1579 patient_pk = other_patient._pk 

1580 

1581 saved_patient = ( 

1582 self.dbsession.query(Patient) 

1583 .filter(Patient._pk == patient_pk) 

1584 .one_or_none() 

1585 ) 

1586 

1587 self.assertIsNotNone(saved_patient) 

1588 

1589 idnum = ServerCreatedNHSPatientIdNumFactory(patient=other_patient) 

1590 

1591 PatientIdNumIndexEntry.index_idnum(idnum, self.dbsession) 

1592 

1593 saved_idnum = ( 

1594 self.dbsession.query(PatientIdNum) 

1595 .filter( 

1596 PatientIdNum.patient_id == other_patient.id, 

1597 PatientIdNum._device_id == other_patient.device_id, 

1598 PatientIdNum._era == other_patient.era, 

1599 PatientIdNum._current == True, # noqa: E712 

1600 ) 

1601 .one_or_none() 

1602 ) 

1603 

1604 self.assertIsNotNone(saved_idnum) 

1605 

1606 PatientTaskScheduleFactory( 

1607 patient=other_patient, task_schedule=self.schedule 

1608 ) 

1609 

1610 self.req.fake_request_post_from_dict(self.multidict) 

1611 

1612 self.req.add_get_params( 

1613 {ViewParam.SERVER_PK: self.patient._pk}, set_method_get=False 

1614 ) 

1615 view = DeleteServerCreatedPatientView(self.req) 

1616 

1617 with self.assertRaises(HTTPFound): 

1618 view.dispatch() 

1619 

1620 saved_patient = ( 

1621 self.dbsession.query(Patient) 

1622 .filter(Patient._pk == patient_pk) 

1623 .one_or_none() 

1624 ) 

1625 

1626 self.assertIsNotNone(saved_patient) 

1627 

1628 saved_pts = ( 

1629 self.dbsession.query(PatientTaskSchedule) 

1630 .filter(PatientTaskSchedule.patient_pk == patient_pk) 

1631 .one_or_none() 

1632 ) 

1633 

1634 self.assertIsNotNone(saved_pts) 

1635 

1636 saved_idnum = ( 

1637 self.dbsession.query(PatientIdNum) 

1638 .filter( 

1639 PatientIdNum.patient_id == other_patient.id, 

1640 PatientIdNum._device_id == other_patient.device_id, 

1641 PatientIdNum._era == other_patient.era, 

1642 PatientIdNum._current == True, # noqa: E712 

1643 ) 

1644 .one_or_none() 

1645 ) 

1646 

1647 self.assertIsNotNone(saved_idnum) 

1648 

1649 def test_unprivileged_user_cannot_delete_patient(self) -> None: 

1650 self.req.fake_request_post_from_dict(self.multidict) 

1651 

1652 patient_pk = self.patient.pk 

1653 self.req.add_get_params( 

1654 {ViewParam.SERVER_PK: str(patient_pk)}, set_method_get=False 

1655 ) 

1656 view = DeleteServerCreatedPatientView(self.req) 

1657 

1658 user = UserFactory(username="testuser") 

1659 

1660 self.req._debugging_user = user 

1661 

1662 with self.assertRaises(HTTPBadRequest) as cm: 

1663 view.dispatch() 

1664 

1665 self.assertEqual( 

1666 cm.exception.message, "Not authorized to delete this patient" 

1667 ) 

1668 

1669 def test_unprivileged_user_cannot_see_delete_form(self) -> None: 

1670 self.req.fake_request_post_from_dict(self.multidict) 

1671 

1672 patient_pk = self.patient.pk 

1673 self.req.add_get_params({ViewParam.SERVER_PK: str(patient_pk)}) 

1674 view = DeleteServerCreatedPatientView(self.req) 

1675 

1676 user = UserFactory() 

1677 

1678 self.req._debugging_user = user 

1679 

1680 with self.assertRaises(HTTPBadRequest) as cm: 

1681 view.dispatch() 

1682 

1683 self.assertEqual( 

1684 cm.exception.message, "Not authorized to delete this patient" 

1685 ) 

1686 

1687 

1688class EraseTaskTestCase(BasicDatabaseTestCase): 

1689 def setUp(self) -> None: 

1690 super().setUp() 

1691 

1692 self.patient = PatientFactory(_group=self.group) 

1693 

1694 

1695class EraseTaskLeavingPlaceholderViewTests(EraseTaskTestCase): 

1696 def test_displays_form(self) -> None: 

1697 task = BmiFactory(patient=self.patient) 

1698 self.req.add_get_params( 

1699 { 

1700 ViewParam.SERVER_PK: str(task.pk), 

1701 ViewParam.TABLE_NAME: task.tablename, 

1702 }, 

1703 set_method_get=False, 

1704 ) 

1705 view = EraseTaskLeavingPlaceholderView(self.req) 

1706 

1707 with mock.patch.object(view, "render_to_response") as mock_render: 

1708 view.dispatch() 

1709 

1710 args, kwargs = mock_render.call_args 

1711 context = args[0] 

1712 

1713 self.assertIn("form", context) 

1714 

1715 def test_deletes_task_leaving_placeholder(self) -> None: 

1716 task = BmiFactory(patient=self.patient) 

1717 multidict = MultiDict( 

1718 [ 

1719 ("_charset_", UTF8), 

1720 ("__formid__", "deform"), 

1721 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

1722 (ViewParam.SERVER_PK, task.pk), 

1723 (ViewParam.TABLE_NAME, task.tablename), 

1724 ("confirm_1_t", "true"), 

1725 ("confirm_2_t", "true"), 

1726 ("confirm_4_t", "true"), 

1727 ("__start__", "danger:mapping"), 

1728 ("target", "7176"), 

1729 ("user_entry", "7176"), 

1730 ("__end__", "danger:mapping"), 

1731 ("delete", "delete"), 

1732 (FormAction.DELETE, "delete"), 

1733 ] 

1734 ) 

1735 

1736 self.req.fake_request_post_from_dict(multidict) 

1737 

1738 view = EraseTaskLeavingPlaceholderView(self.req) 

1739 with mock.patch.object(task, "manually_erase") as mock_manually_erase: 

1740 

1741 with self.assertRaises(HTTPFound): 

1742 view.dispatch() 

1743 

1744 mock_manually_erase.assert_called_once() 

1745 args, kwargs = mock_manually_erase.call_args 

1746 request = args[0] 

1747 

1748 self.assertEqual(request, self.req) 

1749 

1750 def test_task_not_deleted_on_cancel(self) -> None: 

1751 task = BmiFactory(patient=self.patient) 

1752 self.req.fake_request_post_from_dict({FormAction.CANCEL: "cancel"}) 

1753 

1754 self.req.add_get_params( 

1755 { 

1756 ViewParam.SERVER_PK: str(task.pk), 

1757 ViewParam.TABLE_NAME: task.tablename, 

1758 }, 

1759 set_method_get=False, 

1760 ) 

1761 view = EraseTaskLeavingPlaceholderView(self.req) 

1762 

1763 with self.assertRaises(HTTPFound): 

1764 view.dispatch() 

1765 

1766 task = self.dbsession.query(task.__class__).one_or_none() 

1767 

1768 self.assertIsNotNone(task) 

1769 

1770 def test_redirect_on_cancel(self) -> None: 

1771 task = BmiFactory(patient=self.patient) 

1772 self.req.fake_request_post_from_dict({FormAction.CANCEL: "cancel"}) 

1773 

1774 self.req.add_get_params( 

1775 { 

1776 ViewParam.SERVER_PK: str(task.pk), 

1777 ViewParam.TABLE_NAME: task.tablename, 

1778 }, 

1779 set_method_get=False, 

1780 ) 

1781 view = EraseTaskLeavingPlaceholderView(self.req) 

1782 

1783 with self.assertRaises(HTTPFound) as cm: 

1784 view.dispatch() 

1785 

1786 self.assertEqual(cm.exception.status_code, 302) 

1787 self.assertIn(f"/{Routes.TASK}", cm.exception.headers["Location"]) 

1788 self.assertIn( 

1789 f"{ViewParam.TABLE_NAME}={task.tablename}", 

1790 cm.exception.headers["Location"], 

1791 ) 

1792 self.assertIn( 

1793 f"{ViewParam.SERVER_PK}={task.pk}", 

1794 cm.exception.headers["Location"], 

1795 ) 

1796 self.assertIn( 

1797 f"{ViewParam.VIEWTYPE}={ViewArg.HTML}", 

1798 cm.exception.headers["Location"], 

1799 ) 

1800 

1801 def test_raises_when_task_does_not_exist(self) -> None: 

1802 self.req.add_get_params( 

1803 {ViewParam.SERVER_PK: "123", ViewParam.TABLE_NAME: "phq9"}, 

1804 set_method_get=False, 

1805 ) 

1806 view = EraseTaskLeavingPlaceholderView(self.req) 

1807 

1808 with self.assertRaises(HTTPBadRequest) as cm: 

1809 view.dispatch() 

1810 

1811 self.assertEqual(cm.exception.message, "No such task: phq9, PK=123") 

1812 

1813 def test_raises_when_task_is_live_on_tablet(self) -> None: 

1814 task = BmiFactory(patient=self.patient, _era=ERA_NOW) 

1815 

1816 self.req.add_get_params( 

1817 { 

1818 ViewParam.SERVER_PK: str(task.pk), 

1819 ViewParam.TABLE_NAME: task.tablename, 

1820 }, 

1821 set_method_get=False, 

1822 ) 

1823 view = EraseTaskLeavingPlaceholderView(self.req) 

1824 

1825 with self.assertRaises(HTTPBadRequest) as cm: 

1826 view.dispatch() 

1827 

1828 self.assertIn("Task is live on tablet", cm.exception.message) 

1829 

1830 def test_raises_when_user_not_authorized_to_erase(self) -> None: 

1831 task = BmiFactory(patient=self.patient) 

1832 user = UserFactory() 

1833 

1834 self.req._debugging_user = user 

1835 UserGroupMembershipFactory( 

1836 user_id=user.id, group_id=self.group.id, groupadmin=True 

1837 ) 

1838 

1839 with mock.patch.object( 

1840 user, "authorized_to_erase_tasks", return_value=False 

1841 ): 

1842 self.req.add_get_params( 

1843 { 

1844 ViewParam.SERVER_PK: str(task.pk), 

1845 ViewParam.TABLE_NAME: task.tablename, 

1846 }, 

1847 set_method_get=False, 

1848 ) 

1849 view = EraseTaskLeavingPlaceholderView(self.req) 

1850 

1851 with self.assertRaises(HTTPBadRequest) as cm: 

1852 view.dispatch() 

1853 

1854 self.assertIn("Not authorized to erase tasks", cm.exception.message) 

1855 

1856 def test_raises_when_task_already_erased(self) -> None: 

1857 task = BmiFactory(patient=self.patient, _manually_erased=True) 

1858 

1859 self.req.add_get_params( 

1860 { 

1861 ViewParam.SERVER_PK: str(task.pk), 

1862 ViewParam.TABLE_NAME: task.tablename, 

1863 }, 

1864 set_method_get=False, 

1865 ) 

1866 view = EraseTaskLeavingPlaceholderView(self.req) 

1867 

1868 with self.assertRaises(HTTPBadRequest) as cm: 

1869 view.dispatch() 

1870 

1871 self.assertIn("already erased", cm.exception.message) 

1872 

1873 

1874class EraseTaskEntirelyViewTests(EraseTaskTestCase): 

1875 def test_deletes_task_entirely(self) -> None: 

1876 task = BmiFactory(patient=self.patient) 

1877 multidict = MultiDict( 

1878 [ 

1879 ("_charset_", UTF8), 

1880 ("__formid__", "deform"), 

1881 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

1882 (ViewParam.SERVER_PK, task.pk), 

1883 (ViewParam.TABLE_NAME, task.tablename), 

1884 ("confirm_1_t", "true"), 

1885 ("confirm_2_t", "true"), 

1886 ("confirm_4_t", "true"), 

1887 ("__start__", "danger:mapping"), 

1888 ("target", "7176"), 

1889 ("user_entry", "7176"), 

1890 ("__end__", "danger:mapping"), 

1891 ("delete", "delete"), 

1892 (FormAction.DELETE, "delete"), 

1893 ] 

1894 ) 

1895 

1896 self.req.fake_request_post_from_dict(multidict) 

1897 

1898 view = EraseTaskEntirelyView(self.req) 

1899 

1900 with mock.patch.object( 

1901 task, "delete_entirely" 

1902 ) as mock_delete_entirely: 

1903 

1904 with self.assertRaises(HTTPFound): 

1905 view.dispatch() 

1906 

1907 mock_delete_entirely.assert_called_once() 

1908 args, kwargs = mock_delete_entirely.call_args 

1909 request = args[0] 

1910 

1911 self.assertEqual(request, self.req) 

1912 

1913 messages = self.req.session.peek_flash(FlashQueue.SUCCESS) 

1914 self.assertTrue(len(messages) > 0) 

1915 

1916 self.assertIn("Task erased", messages[0]) 

1917 self.assertIn(task.tablename, messages[0]) 

1918 self.assertIn("server PK {}".format(task.pk), messages[0]) 

1919 

1920 

class EditGroupViewTests(DemoRequestTestCase):
    """
    Tests for editing a group: submitting the form through ``edit_group()``
    and the initial form values produced by ``EditGroupView``.
    """

    def test_group_updated(self) -> None:
        """
        A submitted form changes the group's name, description and policies,
        and records the other groups that it can see.
        """
        admin = UserFactory()
        self.req._debugging_user = admin
        group = GroupFactory()
        UserGroupMembershipFactory(
            group_id=group.id, user_id=admin.id, groupadmin=True
        )
        visible_groups = [GroupFactory(), GroupFactory()]

        iddef = NHSIdNumDefinitionFactory()

        # Attribute name on Group -> value we submit and expect to be stored.
        expected = {
            "name": "new-name",
            "description": "new description",
            "upload_policy": "anyidnum AND sex",
            "finalize_policy": f"idnum{iddef.which_idnum} AND sex",
        }

        fields = [
            ("_charset_", UTF8),
            ("__formid__", "deform"),
            (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
            (ViewParam.GROUP_ID, group.id),
            (ViewParam.NAME, expected["name"]),
            (ViewParam.DESCRIPTION, expected["description"]),
            (ViewParam.UPLOAD_POLICY, expected["upload_policy"]),
            (ViewParam.FINALIZE_POLICY, expected["finalize_policy"]),
            ("__start__", "group_ids:sequence"),
        ]
        fields.extend(
            ("group_id_sequence", str(other.id)) for other in visible_groups
        )
        fields.append(("__end__", "group_ids:sequence"))
        fields.append((FormAction.SUBMIT, "submit"))
        self.req.fake_request_post_from_dict(MultiDict(fields))

        # A successful edit redirects.
        with self.assertRaises(HTTPFound):
            edit_group(self.req)

        for attr_name, value in expected.items():
            self.assertEqual(getattr(group, attr_name), value)
        for other in visible_groups:
            self.assertIn(other, group.can_see_other_groups)

    def test_ip_use_added(self) -> None:
        """
        Ticking the clinical and commercial contexts on the form sets exactly
        those two flags on the group's ``ip_use``.
        """
        from camcops_server.cc_modules.cc_ipuse import IpContexts

        group = GroupFactory()
        admin = UserFactory()
        self.req._debugging_user = admin
        UserGroupMembershipFactory(
            group_id=group.id, user_id=admin.id, groupadmin=True
        )
        iddef = NHSIdNumDefinitionFactory()

        post_data = MultiDict(
            [
                ("_charset_", UTF8),
                ("__formid__", "deform"),
                (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
                (ViewParam.GROUP_ID, group.id),
                (ViewParam.NAME, "new-name"),
                (ViewParam.DESCRIPTION, "new description"),
                (ViewParam.UPLOAD_POLICY, "anyidnum AND sex"),
                (
                    ViewParam.FINALIZE_POLICY,
                    f"idnum{iddef.which_idnum} AND sex",
                ),
                ("__start__", "ip_use:mapping"),
                (IpContexts.CLINICAL, "true"),
                (IpContexts.COMMERCIAL, "true"),
                ("__end__", "ip_use:mapping"),
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self.req.fake_request_post_from_dict(post_data)

        with self.assertRaises(HTTPFound):
            edit_group(self.req)

        ip_use = group.ip_use
        self.assertTrue(ip_use.clinical)
        self.assertTrue(ip_use.commercial)
        self.assertFalse(ip_use.educational)
        self.assertFalse(ip_use.research)

    def test_ip_use_updated(self) -> None:
        """
        Submitting different contexts for a group that already has an
        ``ip_use`` record changes the flags but keeps the same record ID.
        """
        from camcops_server.cc_modules.cc_ipuse import IpContexts

        group = GroupFactory(ip_use__educational=True, ip_use__research=True)
        admin = UserFactory()
        self.req._debugging_user = admin
        UserGroupMembershipFactory(
            group_id=group.id, user_id=admin.id, groupadmin=True
        )

        original_ip_use_id = group.ip_use.id

        iddef = NHSIdNumDefinitionFactory()

        post_data = MultiDict(
            [
                ("_charset_", UTF8),
                ("__formid__", "deform"),
                (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
                (ViewParam.GROUP_ID, group.id),
                (ViewParam.NAME, "new-name"),
                (ViewParam.DESCRIPTION, "new description"),
                (ViewParam.UPLOAD_POLICY, "anyidnum AND sex"),
                (
                    ViewParam.FINALIZE_POLICY,
                    f"idnum{iddef.which_idnum} AND sex",
                ),
                ("__start__", "ip_use:mapping"),
                (IpContexts.CLINICAL, "true"),
                (IpContexts.COMMERCIAL, "true"),
                ("__end__", "ip_use:mapping"),
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self.req.fake_request_post_from_dict(post_data)

        with self.assertRaises(HTTPFound):
            edit_group(self.req)

        ip_use = group.ip_use
        self.assertTrue(ip_use.clinical)
        self.assertTrue(ip_use.commercial)
        self.assertFalse(ip_use.educational)
        self.assertFalse(ip_use.research)
        self.assertEqual(ip_use.id, original_ip_use_id)

    def test_other_groups_displayed_in_form(self) -> None:
        """
        The form's group ID list holds the IDs of the visible groups, with
        "a-group" before "z-group" although "z-group" was created first.
        """
        z_group = GroupFactory(name="z-group")
        a_group = GroupFactory(name="a-group")

        visible = Group.get_groups_from_id_list(
            self.dbsession, [z_group.id, a_group.id]
        )
        group = GroupFactory(can_see_other_groups=visible)

        view = EditGroupView(self.req)
        view.object = group

        initial_values = view.get_form_values()

        self.assertEqual(
            initial_values[ViewParam.GROUP_IDS], [a_group.id, z_group.id]
        )

    def test_group_id_displayed_in_form(self) -> None:
        """
        The initial form values include the ID of the group being edited.
        """
        group = GroupFactory()
        view = EditGroupView(self.req)
        view.object = group

        self.assertEqual(
            view.get_form_values()[ViewParam.GROUP_ID], group.id
        )

    def test_ip_use_displayed_in_form(self) -> None:
        """
        The initial form values include the group's ``ip_use`` object.
        """
        group = GroupFactory()
        view = EditGroupView(self.req)
        view.object = group

        self.assertEqual(
            view.get_form_values()[ViewParam.IP_USE], group.ip_use
        )

2090 

2091 

class SendEmailFromPatientTaskScheduleViewTests(BasicDatabaseTestCase):
    """
    Tests for ``SendEmailFromPatientTaskScheduleView``: displaying the form,
    sending the email (with optional CC/BCC), user feedback, record-keeping
    and authorization.
    """

    def setUp(self) -> None:
        """
        Create a server-side patient with an indexed NHS ID number, a task
        schedule in ``self.group``, and the patient/schedule link under test.
        """
        super().setUp()

        self.patient = ServerCreatedPatientFactory()
        PatientIdNumIndexEntry.index_idnum(
            ServerCreatedNHSPatientIdNumFactory(patient=self.patient),
            self.dbsession,
        )

        self.schedule = TaskScheduleFactory(group=self.group)

        self.pts = PatientTaskScheduleFactory(
            patient=self.patient, task_schedule=self.schedule
        )

    def _configure_mail_server(self) -> None:
        """
        Give the request's config the SMTP settings used by these tests.
        """
        config = self.req.config
        config.email_host = "smtp.example.com"
        config.email_port = 587
        config.email_host_username = "mailuser"
        config.email_host_password = "mailpassword"
        config.email_use_tls = True

    def _post_email_form(self, *extra_fields: tuple) -> None:
        """
        Fake a POST of the email form for ``self.pts``.

        Args:
            extra_fields:
                additional ``(name, value)`` pairs (e.g. CC/BCC), placed
                directly after the recipient field
        """
        fields = [
            (ViewParam.EMAIL, "patient@example.com"),
            *extra_fields,
            (ViewParam.EMAIL_FROM, "server@example.com"),
            (ViewParam.EMAIL_SUBJECT, "Subject"),
            (ViewParam.EMAIL_BODY, "Email body"),
            (FormAction.SUBMIT, "submit"),
        ]
        self.req.fake_request_post_from_dict(MultiDict(fields))
        # The schedule ID travels in the query string; keep the method POST.
        self.req.add_get_params(
            {ViewParam.PATIENT_TASK_SCHEDULE_ID: str(self.pts.id)},
            set_method_get=False,
        )

    def test_displays_form(self) -> None:
        """
        A GET with a valid schedule ID renders a context containing a form.
        """
        self.req.add_get_params(
            {ViewParam.PATIENT_TASK_SCHEDULE_ID: str(self.pts.id)}
        )

        view = SendEmailFromPatientTaskScheduleView(self.req)
        with mock.patch.object(view, "render_to_response") as mock_render:
            view.dispatch()

        rendered_context = mock_render.call_args[0][0]

        self.assertIn("form", rendered_context)

    def test_raises_for_missing_pts_id(self) -> None:
        """
        Without a schedule ID in the request, the view raises a bad request.
        """
        view = SendEmailFromPatientTaskScheduleView(self.req)
        with self.assertRaises(HTTPBadRequest) as cm:
            view.dispatch()

        self.assertIn(
            "Patient task schedule does not exist", cm.exception.message
        )

    @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
    @mock.patch("camcops_server.cc_modules.cc_email.make_email")
    def test_sends_email(
        self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
    ) -> None:
        """
        The email is built from the form fields and sent using the
        configured SMTP settings.
        """
        self._configure_mail_server()
        self._post_email_form()
        view = SendEmailFromPatientTaskScheduleView(self.req)

        with self.assertRaises(HTTPFound):
            view.dispatch()

        email_kwargs = mock_make_email.call_args_list[0][1]
        self.assertEqual(email_kwargs["from_addr"], "server@example.com")
        self.assertEqual(email_kwargs["to"], "patient@example.com")
        self.assertEqual(email_kwargs["subject"], "Subject")
        self.assertEqual(email_kwargs["body"], "Email body")
        self.assertEqual(email_kwargs["content_type"], MimeType.HTML)

        smtp_kwargs = mock_send_msg.call_args[1]
        self.assertEqual(smtp_kwargs["host"], "smtp.example.com")
        self.assertEqual(smtp_kwargs["user"], "mailuser")
        self.assertEqual(smtp_kwargs["password"], "mailpassword")
        self.assertEqual(smtp_kwargs["port"], 587)
        self.assertTrue(smtp_kwargs["use_tls"])

    @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
    @mock.patch("camcops_server.cc_modules.cc_email.make_email")
    def test_sends_cc_of_email(
        self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
    ) -> None:
        """
        A CC address on the form is passed through when building the email.
        """
        self._configure_mail_server()
        self._post_email_form((ViewParam.EMAIL_CC, "cc@example.com"))
        view = SendEmailFromPatientTaskScheduleView(self.req)

        with self.assertRaises(HTTPFound):
            view.dispatch()

        email_kwargs = mock_make_email.call_args[1]
        self.assertEqual(email_kwargs["to"], "patient@example.com")
        self.assertEqual(email_kwargs["cc"], "cc@example.com")

    @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
    @mock.patch("camcops_server.cc_modules.cc_email.make_email")
    def test_sends_bcc_of_email(
        self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
    ) -> None:
        """
        A BCC address on the form is passed through when building the email.
        """
        self._configure_mail_server()
        self._post_email_form(
            (ViewParam.EMAIL_CC, "cc@example.com"),
            (ViewParam.EMAIL_BCC, "bcc@example.com"),
        )
        view = SendEmailFromPatientTaskScheduleView(self.req)

        with self.assertRaises(HTTPFound):
            view.dispatch()

        email_kwargs = mock_make_email.call_args[1]
        self.assertEqual(email_kwargs["to"], "patient@example.com")
        self.assertEqual(email_kwargs["bcc"], "bcc@example.com")

    @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
    @mock.patch("camcops_server.cc_modules.cc_email.make_email")
    def test_message_on_success(
        self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
    ) -> None:
        """
        When sending succeeds, a success flash message names the recipient.
        """
        self._post_email_form()
        view = SendEmailFromPatientTaskScheduleView(self.req)

        with self.assertRaises(HTTPFound):
            view.dispatch()

        flashed = self.req.session.peek_flash(FlashQueue.SUCCESS)
        self.assertGreater(len(flashed), 0)

        self.assertIn("Email sent to patient@example.com", flashed[0])

    @mock.patch(
        "camcops_server.cc_modules.cc_email.send_msg",
        side_effect=RuntimeError("Something bad happened"),
    )
    @mock.patch("camcops_server.cc_modules.cc_email.make_email")
    def test_message_on_failure(
        self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
    ) -> None:
        """
        When sending raises, the view still redirects and flashes a danger
        message naming the recipient.
        """
        self._post_email_form()
        view = SendEmailFromPatientTaskScheduleView(self.req)

        with self.assertRaises(HTTPFound):
            view.dispatch()

        flashed = self.req.session.peek_flash(FlashQueue.DANGER)
        self.assertGreater(len(flashed), 0)

        self.assertIn(
            "Failed to send email to patient@example.com", flashed[0]
        )

    @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
    @mock.patch("camcops_server.cc_modules.cc_email.make_email")
    def test_email_record_created(
        self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
    ) -> None:
        """
        Sending an email attaches one email record to the patient's task
        schedule, addressed to the patient.
        """
        self._post_email_form()
        view = SendEmailFromPatientTaskScheduleView(self.req)

        self.assertEqual(len(self.pts.emails), 0)

        with self.assertRaises(HTTPFound):
            view.dispatch()

        self.assertEqual(len(self.pts.emails), 1)
        self.assertEqual(self.pts.emails[0].email.to, "patient@example.com")

    def test_unprivileged_user_cannot_email_patient(self) -> None:
        """
        A freshly created user with no group memberships is refused with a
        bad request.
        """
        unprivileged_user = UserFactory(username="testuser")

        self.req._debugging_user = unprivileged_user

        self._post_email_form()

        with self.assertRaises(HTTPBadRequest) as cm:
            view = SendEmailFromPatientTaskScheduleView(self.req)
            view.dispatch()

        self.assertEqual(
            cm.exception.message, "Not authorized to email patients"
        )

2371 

2372 

2373class LoginViewTests(TestStateMixin, BasicDatabaseTestCase): 

def setUp(self) -> None:
    """
    Run the inherited set-up, then name the request's matched route
    "login_view".
    """
    super().setUp()

    matched_route = self.req.matched_route
    matched_route.name = "login_view"

2378 

def test_form_rendered_with_values(self) -> None:
    """
    The redirect URL supplied as a GET parameter appears in the rendered
    login form.
    """
    redirect_url = "https://www.example.com"
    self.req.add_get_params({ViewParam.REDIRECT_URL: redirect_url})
    view = LoginView(self.req)

    with mock.patch.object(view, "render_to_response") as mock_render:
        view.dispatch()

    # First positional argument of render_to_response() is the context.
    rendered_context = mock_render.call_args[0][0]

    self.assertIn("form", rendered_context)
    self.assertIn(redirect_url, rendered_context["form"])

2393 

def test_template_rendered(self) -> None:
    """
    Dispatching the view without mocks yields a page containing "Log in".
    """
    response = LoginView(self.req).dispatch()
    page_text = response.body.decode(UTF8)

    self.assertIn("Log in", page_text)

2399 

def test_password_autocomplete_read_from_config(self) -> None:
    """
    With password autocomplete not disabled in the config, the rendered
    form carries ``autocomplete="current-password"``.
    """
    self.req.config.disable_password_autocomplete = False

    view = LoginView(self.req)

    with mock.patch.object(view, "render_to_response") as mock_render:
        view.dispatch()

    rendered_form = mock_render.call_args[0][0]["form"]

    self.assertIn('autocomplete="current-password"', rendered_form)

2412 

def test_fails_when_user_locked_out(self) -> None:
    """
    A locked-out user submitting valid credentials triggers
    ``fail_locked_out()`` with the time at which the lockout ends.
    """
    user = UserFactory(
        username="test", password__request=self.req, password="secret"
    )
    SecurityAccountLockout.lock_user_out(
        self.req, user.username, lockout_minutes=1
    )

    credentials = MultiDict(
        [
            (ViewParam.USERNAME, user.username),
            (ViewParam.PASSWORD, "secret"),
            (FormAction.SUBMIT, "submit"),
        ]
    )
    self.req.fake_request_post_from_dict(credentials)

    view = LoginView(self.req)

    # The mocked failure handler raises, as the test expects of the real one.
    with mock.patch.object(
        view, "fail_locked_out", side_effect=HTTPFound
    ) as mock_fail_locked_out, self.assertRaises(HTTPFound):
        view.dispatch()

    reported_until = mock_fail_locked_out.call_args[0][0]
    expected_until = SecurityAccountLockout.user_locked_out_until(
        self.req, user.username
    )
    self.assertEqual(reported_until, expected_until)

2444 

2445 @mock.patch("camcops_server.cc_modules.webview.audit") 

def test_user_can_log_in(self, mock_audit: mock.Mock) -> None:
    """
    A user allowed to use the web viewer who submits the right password is
    logged in (user and session), audited, and redirected.
    """
    user = UserFactory(
        username="test", password__request=self.req, password="secret"
    )
    UserGroupMembershipFactory(
        user_id=user.id, group_id=self.group.id, may_use_webviewer=True
    )

    credentials = MultiDict(
        [
            (ViewParam.USERNAME, user.username),
            (ViewParam.PASSWORD, "secret"),
            (FormAction.SUBMIT, "submit"),
        ]
    )
    self.req.fake_request_post_from_dict(credentials)

    view = LoginView(self.req)

    user_login_patch = mock.patch.object(user, "login")
    session_login_patch = mock.patch.object(self.req.camcops_session, "login")
    with user_login_patch as mock_user_login:
        with session_login_patch as mock_session_login:
            with self.assertRaises(HTTPFound):
                view.dispatch()

    self.assertEqual(mock_user_login.call_args[0][0], self.req)
    self.assertEqual(mock_session_login.call_args[0][0], user)

    audit_args, audit_kwargs = mock_audit.call_args
    self.assertEqual(audit_args[0], self.req)
    self.assertEqual(audit_args[1], "Login")
    self.assertEqual(audit_kwargs["user_id"], user.id)

2483 

def test_user_with_totp_sees_token_form(self) -> None:
    """
    At the MFA step, a TOTP user is shown the code-entry form with
    instructions for an authenticator app.
    """
    user = UserFactory(
        username="test",
        mfa_secret_key=pyotp.random_base32(),
        mfa_method=MfaMethod.TOTP,
        password__request=self.req,
        password="secret",
    )
    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )

    view = LoginView(self.req)
    # Put the wizard directly at the MFA step for this user.
    view.state.update(
        mfa_user_id=user.id,
        step=MfaMixin.STEP_MFA,
        mfa_time=int(time.time()),
    )

    with mock.patch.object(view, "render_to_response") as mock_render:
        view.dispatch()

    rendered_context = mock_render.call_args[0][0]

    self.assertIn("form", rendered_context)
    self.assertIn("Enter the six-digit code", rendered_context["form"])
    self.assertIn(
        "Enter the code for CamCOPS displayed",
        rendered_context[MfaMixin.KEY_INSTRUCTIONS],
    )

2516 

2517 @mock.patch("camcops_server.cc_modules.cc_email.send_msg") 

2518 @mock.patch("camcops_server.cc_modules.cc_email.make_email") 

def test_user_with_hotp_email_sees_token_form(
    self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
) -> None:
    """
    At the MFA step, an HOTP-by-email user is shown the code-entry form
    and told that a code was sent by email.
    """
    user = UserFactory(
        username="test",
        mfa_secret_key=pyotp.random_base32(),
        mfa_method=MfaMethod.HOTP_EMAIL,
        email="user@example.com",
        hotp_counter=0,
        password__request=self.req,
        password="secret",
    )
    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )
    view = LoginView(self.req)
    # Put the wizard directly at the MFA step for this user.
    view.state.update(
        mfa_user_id=user.id,
        step=MfaMixin.STEP_MFA,
        mfa_time=int(time.time()),
    )

    with mock.patch.object(view, "render_to_response") as mock_render:
        view.dispatch()

    rendered_context = mock_render.call_args[0][0]

    self.assertIn("form", rendered_context)
    self.assertIn("Enter the six-digit code", rendered_context["form"])
    self.assertIn(
        "We've sent a code by email",
        rendered_context[MfaMixin.KEY_INSTRUCTIONS],
    )

2553 

def test_user_with_hotp_sms_sees_token_form(self) -> None:
    """
    At the MFA step, an HOTP-by-SMS user is shown the code-entry form and
    told that a code was sent by text message.
    """
    self.req.config.sms_backend = get_sms_backend(
        SmsBackendNames.CONSOLE, {}
    )

    phone_number = phonenumbers.parse(Fake.en_gb.valid_phone_number())
    user = UserFactory(
        username="test",
        mfa_secret_key=pyotp.random_base32(),
        mfa_method=MfaMethod.HOTP_SMS,
        phone_number=phone_number,
        hotp_counter=0,
        password__request=self.req,
        password="secret",
    )
    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )

    view = LoginView(self.req)
    # Put the wizard directly at the MFA step for this user.
    view.state.update(
        mfa_user_id=user.id,
        step=MfaMixin.STEP_MFA,
        mfa_time=int(time.time()),
    )

    with mock.patch.object(view, "render_to_response") as mock_render:
        view.dispatch()

    rendered_context = mock_render.call_args[0][0]

    self.assertIn("form", rendered_context)
    self.assertIn("Enter the six-digit code", rendered_context["form"])
    self.assertIn(
        "We've sent a code by text message",
        rendered_context[MfaMixin.KEY_INSTRUCTIONS],
    )

2593 

2594 @mock.patch("camcops_server.cc_modules.cc_email.send_msg") 

2595 @mock.patch("camcops_server.cc_modules.cc_email.make_email") 

2596 @mock.patch("camcops_server.cc_modules.webview.time") 

def test_session_state_set_for_user_with_mfa(
    self,
    mock_time: mock.Mock,
    mock_make_email: mock.Mock,
    mock_send_msg: mock.Mock,
) -> None:
    """
    After a correct password from an MFA user, the session's form state
    holds the user ID, the (whole-second) time and the MFA step.
    """
    user = UserFactory(
        username="test",
        mfa_secret_key=pyotp.random_base32(),
        mfa_method=MfaMethod.HOTP_EMAIL,
        email="user@example.com",
        password__request=self.req,
        password="secret",
    )
    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )

    credentials = MultiDict(
        [
            (ViewParam.USERNAME, user.username),
            (ViewParam.PASSWORD, "secret"),
            (FormAction.SUBMIT, "submit"),
        ]
    )
    self.req.fake_request_post_from_dict(credentials)

    view = LoginView(self.req)

    # Freeze the clock seen by the view at a value with a fractional part.
    frozen_clock = mock.patch.object(
        mock_time, "time", return_value=1234567890.1234567
    )
    with frozen_clock:
        view.dispatch()

    form_state = self.req.camcops_session.form_state
    self.assertEqual(form_state[LoginView.KEY_MFA_USER_ID], user.id)
    self.assertEqual(form_state[MfaMixin.KEY_MFA_TIME], 1234567890)
    self.assertEqual(
        form_state[FormWizardMixin.PARAM_STEP], MfaMixin.STEP_MFA
    )

2645 

2646 @mock.patch("camcops_server.cc_modules.cc_email.send_msg") 

2647 @mock.patch("camcops_server.cc_modules.cc_email.make_email") 

def test_user_with_hotp_is_sent_email(
    self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
) -> None:
    """
    After a correct password, an HOTP-by-email user is emailed the code
    for counter value 1, via the configured SMTP settings.
    """
    config = self.req.config
    config.email_host = "smtp.example.com"
    config.email_port = 587
    config.email_host_username = "mailuser"
    config.email_host_password = "mailpassword"
    config.email_use_tls = True
    config.email_from = "server@example.com"

    user = UserFactory(
        username="test",
        email="user@example.com",
        mfa_secret_key=pyotp.random_base32(),
        mfa_method=MfaMethod.HOTP_EMAIL,
        hotp_counter=0,
        password__request=self.req,
        password="secret",
    )
    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )

    credentials = MultiDict(
        [
            (ViewParam.USERNAME, user.username),
            (ViewParam.PASSWORD, "secret"),
            (FormAction.SUBMIT, "submit"),
        ]
    )
    self.req.fake_request_post_from_dict(credentials)

    view = LoginView(self.req)
    hotp = pyotp.HOTP(user.mfa_secret_key)
    expected_code = hotp.at(1)
    view.dispatch()

    email_kwargs = mock_make_email.call_args_list[0][1]
    self.assertEqual(email_kwargs["from_addr"], "server@example.com")
    self.assertEqual(email_kwargs["to"], "user@example.com")
    self.assertEqual(email_kwargs["subject"], "CamCOPS authentication")
    self.assertIn(
        f"Your CamCOPS verification code is {expected_code}",
        email_kwargs["body"],
    )
    self.assertEqual(email_kwargs["content_type"], "text/plain")

    smtp_kwargs = mock_send_msg.call_args[1]
    self.assertEqual(smtp_kwargs["host"], "smtp.example.com")
    self.assertEqual(smtp_kwargs["user"], "mailuser")
    self.assertEqual(smtp_kwargs["password"], "mailpassword")
    self.assertEqual(smtp_kwargs["port"], 587)
    self.assertTrue(smtp_kwargs["use_tls"])

2702 

def test_user_with_hotp_is_sent_sms(self) -> None:
    """
    After a correct password, an HOTP-by-SMS user is sent the code for
    counter value 1; with the console backend the message is logged.
    """
    self.req.config.sms_backend = get_sms_backend(
        SmsBackendNames.CONSOLE, {}
    )
    self.req.config.sms_config = {
        "username": "testuser",
        "password": "testpass",
    }

    phone_number_str = Fake.en_gb.valid_phone_number()
    user = UserFactory(
        username="test",
        email="user@example.com",
        phone_number=phonenumbers.parse(phone_number_str),
        mfa_secret_key=pyotp.random_base32(),
        mfa_method=MfaMethod.HOTP_SMS,
        hotp_counter=0,
        password__request=self.req,
        password="secret",
    )
    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )

    credentials = MultiDict(
        [
            (ViewParam.USERNAME, user.username),
            (ViewParam.PASSWORD, "secret"),
            (FormAction.SUBMIT, "submit"),
        ]
    )
    self.req.fake_request_post_from_dict(credentials)

    view = LoginView(self.req)
    expected_code = pyotp.HOTP(user.mfa_secret_key).at(1)

    with self.assertLogs(level=logging.INFO) as captured_logs:
        view.dispatch()

    expected_sms = ConsoleSmsBackend.make_msg(
        phone_number_str,
        f"Your CamCOPS verification code is {expected_code}",
    )

    self.assertIn(expected_sms, captured_logs.output[0])

2749 

2750 @mock.patch("camcops_server.cc_modules.cc_email.send_msg") 

2751 @mock.patch("camcops_server.cc_modules.cc_email.make_email") 

def test_login_with_hotp_increments_counter(
    self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
) -> None:
    """
    Submitting a correct password as an HOTP user moves the user's HOTP
    counter from 0 to 1.
    """
    user = UserFactory(
        username="test",
        email="user@example.com",
        mfa_secret_key=pyotp.random_base32(),
        mfa_method=MfaMethod.HOTP_EMAIL,
        hotp_counter=0,
        password__request=self.req,
        password="secret",
    )
    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )

    credentials = MultiDict(
        [
            (ViewParam.USERNAME, user.username),
            (ViewParam.PASSWORD, "secret"),
            (FormAction.SUBMIT, "submit"),
        ]
    )
    self.req.fake_request_post_from_dict(credentials)

    LoginView(self.req).dispatch()

    self.assertEqual(user.hotp_counter, 1)

2784 

2785 @mock.patch("camcops_server.cc_modules.webview.audit") 

def test_user_with_totp_can_log_in(self, mock_audit: mock.Mock) -> None:
    """
    At the MFA step, a TOTP user who submits the current code is logged
    in, audited and redirected, and the wizard state is finished.
    """
    user = UserFactory(
        username="test",
        mfa_method=MfaMethod.TOTP,
        mfa_secret_key=pyotp.random_base32(),
        password__request=self.req,
        password="secret",
    )

    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )

    totp = pyotp.TOTP(user.mfa_secret_key)

    token_submission = MultiDict(
        [
            (ViewParam.ONE_TIME_PASSWORD, totp.now()),
            (FormAction.SUBMIT, "submit"),
        ]
    )
    self.req.fake_request_post_from_dict(token_submission)

    view = LoginView(self.req)
    view.state.update(mfa_user_id=user.id, step=MfaMixin.STEP_MFA)

    user_login_patch = mock.patch.object(user, "login")
    session_login_patch = mock.patch.object(self.req.camcops_session, "login")
    # No mfa_time is seeded above, so the timeout check is bypassed.
    not_timed_out = mock.patch.object(view, "timed_out", return_value=False)
    with user_login_patch as mock_user_login:
        with session_login_patch as mock_session_login:
            with not_timed_out, self.assertRaises(HTTPFound):
                view.dispatch()

    self.assertEqual(mock_user_login.call_args[0][0], self.req)
    self.assertEqual(mock_session_login.call_args[0][0], user)

    audit_args, audit_kwargs = mock_audit.call_args
    self.assertEqual(audit_args[0], self.req)
    self.assertEqual(audit_args[1], "Login")
    self.assertEqual(audit_kwargs["user_id"], user.id)
    self.assert_state_is_finished()

2833 

2834 @mock.patch("camcops_server.cc_modules.webview.audit") 

def test_user_with_hotp_can_log_in(self, mock_audit: mock.Mock) -> None:
    """
    At the MFA step, an HOTP user who submits the code matching their
    counter is logged in, audited and redirected, and the wizard state is
    finished.
    """
    user = UserFactory(
        username="test",
        mfa_method=MfaMethod.HOTP_EMAIL,
        mfa_secret_key=pyotp.random_base32(),
        hotp_counter=1,
        password__request=self.req,
        password="secret",
    )
    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )

    hotp = pyotp.HOTP(user.mfa_secret_key)
    token_submission = MultiDict(
        [
            # Code for counter value 1, matching the user's hotp_counter.
            (ViewParam.ONE_TIME_PASSWORD, hotp.at(1)),
            (FormAction.SUBMIT, "submit"),
        ]
    )
    self.req.fake_request_post_from_dict(token_submission)

    view = LoginView(self.req)
    view.state.update(mfa_user_id=user.id, step=MfaMixin.STEP_MFA)

    user_login_patch = mock.patch.object(user, "login")
    session_login_patch = mock.patch.object(self.req.camcops_session, "login")
    # No mfa_time is seeded above, so the timeout check is bypassed.
    not_timed_out = mock.patch.object(view, "timed_out", return_value=False)
    with user_login_patch as mock_user_login:
        with session_login_patch as mock_session_login:
            with not_timed_out, self.assertRaises(HTTPFound):
                view.dispatch()

    self.assertEqual(mock_user_login.call_args[0][0], self.req)
    self.assertEqual(mock_session_login.call_args[0][0], user)

    audit_args, audit_kwargs = mock_audit.call_args
    self.assertEqual(audit_args[0], self.req)
    self.assertEqual(audit_args[1], "Login")
    self.assertEqual(audit_kwargs["user_id"], user.id)
    self.assert_state_is_finished()

2881 

def test_form_state_cleared_on_failed_login(self) -> None:
    """
    Submitting a code for the wrong counter value redirects with an
    "invalid code" danger message and leaves the wizard state clean.
    """
    user = UserFactory(
        username="test",
        mfa_method=MfaMethod.HOTP_EMAIL,
        mfa_secret_key=pyotp.random_base32(),
        hotp_counter=1,
        password__request=self.req,
        password="secret",
    )
    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )

    hotp = pyotp.HOTP(user.mfa_secret_key)

    token_submission = MultiDict(
        [
            # Counter value 2, whereas the user's hotp_counter is 1.
            (ViewParam.ONE_TIME_PASSWORD, hotp.at(2)),
            (FormAction.SUBMIT, "submit"),
        ]
    )
    self.req.fake_request_post_from_dict(token_submission)

    view = LoginView(self.req)
    view.state.update(step=MfaMixin.STEP_MFA, mfa_user_id=user.id)

    not_timed_out = mock.patch.object(view, "timed_out", return_value=False)
    with not_timed_out, self.assertRaises(HTTPFound):
        view.dispatch()

    flashed = self.req.session.peek_flash(FlashQueue.DANGER)
    self.assertGreater(len(flashed), 0)
    self.assertIn("You entered an invalid code", flashed[0])

    self.assert_state_is_clean()

2919 

def test_user_cannot_log_in_if_timed_out(self) -> None:
    """
    Check that a stale MFA step ends in ``fail_timed_out()``.

    The MFA timeout is configured as 600 s and the stored authentication
    time is 601 s in the past. Dispatching the view is therefore expected
    to call ``fail_timed_out()`` exactly once, even though the submitted
    TOTP code is the current one.
    """
    self.req.config.mfa_timeout_s = 600
    user = UserFactory(
        username="test",
        mfa_method=MfaMethod.TOTP,
        mfa_secret_key=pyotp.random_base32(),
        password__request=self.req,
        password="secret",
    )
    group = GroupFactory()
    UserGroupMembershipFactory(
        user_id=user.id, group_id=group.id, may_use_webviewer=True
    )

    totp = pyotp.TOTP(user.mfa_secret_key)

    multidict = MultiDict(
        [
            (ViewParam.ONE_TIME_PASSWORD, totp.now()),
            (FormAction.SUBMIT, "submit"),
        ]
    )

    self.req.fake_request_post_from_dict(multidict)

    view = LoginView(self.req)
    # The user ID goes under "mfa_user_id", the key that the other
    # LoginView MFA tests in this file store it under. (This test used to
    # write it to "mfa_user", which nothing else here refers to.)
    view.state.update(
        mfa_user_id=user.id,
        mfa_time=int(time.time() - 601),
        step=MfaMixin.STEP_MFA,
    )

    # The replacement for fail_timed_out() must raise HTTPFound itself so
    # that dispatch() stops there, which is what assertRaises expects.
    with mock.patch.object(
        view, "fail_timed_out", side_effect=HTTPFound
    ) as mock_fail_timed_out:
        with self.assertRaises(HTTPFound):
            view.dispatch()

    mock_fail_timed_out.assert_called_once()

2959 

def test_unprivileged_user_cannot_log_in(self) -> None:
    """
    Check that a user without web viewer permission is turned away.

    The group membership created here has ``may_use_webviewer=False``, so
    submitting that user's username and password is expected to end in
    exactly one call to ``fail_not_authorized()``.
    """
    user = UserFactory(
        username="test", password__request=self.req, password="secret"
    )
    UserGroupMembershipFactory(
        user_id=user.id, group_id=self.group.id, may_use_webviewer=False
    )

    credentials = [
        (ViewParam.USERNAME, user.username),
        (ViewParam.PASSWORD, "secret"),
        (FormAction.SUBMIT, "submit"),
    ]
    self.req.fake_request_post_from_dict(MultiDict(credentials))

    view = LoginView(self.req)

    # The fail_not_authorized() function raises an exception (of type
    # HTTPFound) so the mock must do too. Otherwise it will fall through
    # inappropriately (and crash).
    raising_fail = mock.patch.object(
        view, "fail_not_authorized", side_effect=HTTPFound
    )
    with raising_fail as mock_fail_not_authorized:
        with self.assertRaises(HTTPFound):
            view.dispatch()

    mock_fail_not_authorized.assert_called_once()

2990 

def test_unknown_user_cannot_log_in(self) -> None:
    """
    Check how a login attempt with an unrecognised username is handled.

    This test creates no user called "unknown". The view is expected to
    call ``SecurityLoginFailure.act_on_login_failure()`` with the request
    and that username, log the session out once, and call
    ``fail_not_authorized()`` once.
    """
    credentials = MultiDict(
        [
            (ViewParam.USERNAME, "unknown"),
            (ViewParam.PASSWORD, "secret"),
            (FormAction.SUBMIT, "submit"),
        ]
    )
    self.req.fake_request_post_from_dict(credentials)

    view = LoginView(self.req)

    # A single "with" statement whose items are entered left to right:
    # the three patches first, then the expectation that dispatch() raises
    # HTTPFound (which the patched fail_not_authorized() is set to raise).
    with mock.patch.object(
        SecurityLoginFailure, "act_on_login_failure"
    ) as mock_act, mock.patch.object(
        self.req.camcops_session, "logout"
    ) as mock_logout, mock.patch.object(
        view, "fail_not_authorized", side_effect=HTTPFound
    ) as mock_fail_not_authorized, self.assertRaises(
        HTTPFound
    ):
        view.dispatch()

    failure_args = mock_act.call_args[0]
    self.assertEqual(failure_args[0], self.req)
    self.assertEqual(failure_args[1], "unknown")

    mock_logout.assert_called_once()
    mock_fail_not_authorized.assert_called_once()

3022 

def test_timed_out_false_when_timeout_zero(self) -> None:
    """
    Check that ``timed_out()`` is falsy when ``mfa_timeout_s`` is 0, even
    with a stored authentication time of 0.
    """
    self.req.config.mfa_timeout_s = 0
    view = LoginView(self.req)
    view.state.update(mfa_time=0)

    has_timed_out = view.timed_out()

    self.assertFalse(has_timed_out)

3029 

def test_timed_out_false_when_no_authenticated_user(self) -> None:
    """
    Check that ``timed_out()`` is falsy for a freshly created view whose
    state has not been given a user ID or an authentication time.
    """
    fresh_view = LoginView(self.req)

    has_timed_out = fresh_view.timed_out()

    self.assertFalse(has_timed_out)

3034 

def test_timed_out_false_when_no_authentication_time(self) -> None:
    """
    Check that ``timed_out()`` is falsy when the state holds a user ID
    but no authentication time.
    """
    view = LoginView(self.req)

    user = UserFactory(username="test")
    # Should never be the case that we have a user ID but no
    # authentication time
    view.state.update(mfa_user_id=user.id)

    has_timed_out = view.timed_out()

    self.assertFalse(has_timed_out)

3044 

3045 

class EditUserViewTests(BasicDatabaseTestCase):
    """
    Tests for the ``edit_user`` view function and for
    ``EditUserGroupAdminView.get_form_values()``.
    """

    @staticmethod
    def _group_ids_fields(*group_ids: int) -> list:
        """
        Return the POST (key, value) pairs that submit ``group_ids`` as a
        "group_ids" form sequence.
        """
        fields = [("__start__", "group_ids:sequence")]
        fields.extend(("group_id_sequence", str(gid)) for gid in group_ids)
        fields.append(("__end__", "group_ids:sequence"))
        return fields

    def _fake_post_for_user(self, user_id: int, post_data: MultiDict) -> None:
        """
        Fake a POST of ``post_data`` on ``self.req``, then add ``user_id``
        as the user ID GET parameter without switching the method to GET.
        """
        self.req.fake_request_post_from_dict(post_data)
        self.req.add_get_params(
            {ViewParam.USER_ID: str(user_id)}, set_method_get=False
        )

    def test_redirect_on_cancel(self) -> None:
        """
        Cancelling the form raises a 302 redirect whose location contains
        the "view all users" route.
        """
        regular_user = UserFactory(username="regular_user")
        self.req.fake_request_post_from_dict({FormAction.CANCEL: "cancel"})
        self.req.add_get_params(
            {ViewParam.USER_ID: str(regular_user.id)}, set_method_get=False
        )

        with self.assertRaises(HTTPFound) as raised:
            edit_user(self.req)

        redirect = raised.exception
        self.assertEqual(redirect.status_code, 302)
        self.assertIn(
            f"/{Routes.VIEW_ALL_USERS}", redirect.headers["Location"]
        )

    def test_raises_if_user_may_not_edit_another(self) -> None:
        """
        A regular user asking to edit the system user gets HTTPBadRequest.
        """
        system_user_id = str(self.system_user.id)
        self.req.add_get_params({ViewParam.USER_ID: system_user_id})

        self.req._debugging_user = UserFactory(username="regular_user")
        with self.assertRaises(HTTPBadRequest) as raised:
            edit_user(self.req)

        self.assertIn(
            "Nobody may edit the system user", raised.exception.message
        )

    def test_superuser_sees_full_form(self) -> None:
        """
        The page rendered for a superuser includes the superuser option.
        """
        superuser = UserFactory(username="admin", superuser=True)
        self.req._debugging_user = superuser

        self.req.add_get_params({ViewParam.USER_ID: str(superuser.id)})

        page = edit_user(self.req).body.decode(UTF8)

        self.assertIn("Superuser (CAUTION!)", page)

    def test_groupadmin_sees_groupadmin_form(self) -> None:
        """
        The page rendered for a group administrator editing a member of
        their group has the full name field but no superuser option.
        """
        groupadmin = UserFactory(username="groupadmin")
        regular_user = UserFactory(username="regular_user")
        group = GroupFactory()
        UserGroupMembershipFactory(
            user_id=groupadmin.id, group_id=group.id, groupadmin=True
        )
        UserGroupMembershipFactory(user_id=regular_user.id, group_id=group.id)
        self.req._debugging_user = groupadmin

        self.req.add_get_params({ViewParam.USER_ID: str(regular_user.id)})

        page = edit_user(self.req).body.decode(UTF8)

        self.assertIn("Full name", page)
        self.assertNotIn("Superuser (CAUTION!)", page)

    def test_raises_for_conflicting_user_name(self) -> None:
        """
        Renaming a user to a username that another user already has
        raises HTTPBadRequest.
        """
        UserFactory(username="existing_user")
        other_user = UserFactory(username="other_user")

        post_data = MultiDict(
            [
                (ViewParam.USERNAME, "existing_user"),
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self._fake_post_for_user(other_user.id, post_data)

        with self.assertRaises(HTTPBadRequest) as raised:
            edit_user(self.req)

        self.assertIn("Can't rename user", raised.exception.message)

    def test_user_is_updated(self) -> None:
        """
        Submitting new username, full name, e-mail and language values
        redirects and stores all four on the user.
        """
        user = UserFactory(
            username="old_username",
            fullname="Old Name",
            email="old@example.com",
            language="da_DK",
        )

        post_data = MultiDict(
            [
                (ViewParam.USERNAME, "new_username"),
                (ViewParam.FULLNAME, "New Name"),
                (ViewParam.EMAIL, "new@example.com"),
                (ViewParam.LANGUAGE, "en_GB"),
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self._fake_post_for_user(user.id, post_data)

        with self.assertRaises(HTTPFound):
            edit_user(self.req)

        expected_attributes = (
            ("username", "new_username"),
            ("fullname", "New Name"),
            ("email", "new@example.com"),
            ("language", "en_GB"),
        )
        for attribute, expected_value in expected_attributes:
            self.assertEqual(getattr(user, attribute), expected_value)

    def test_user_is_added_to_group(self) -> None:
        """
        Submitting a single group ID calls ``set_group_ids()`` on the user
        once, with a list holding just that ID.
        """
        user = UserFactory()
        group = GroupFactory()

        post_data = MultiDict(
            [(ViewParam.USERNAME, user.username)]
            + self._group_ids_fields(group.id)
            + [(FormAction.SUBMIT, "submit")]
        )
        self._fake_post_for_user(user.id, post_data)

        with mock.patch.object(user, "set_group_ids") as mock_set_group_ids:
            with self.assertRaises(HTTPFound):
                edit_user(self.req)

        mock_set_group_ids.assert_called_once_with([group.id])

    def test_user_stays_in_group_the_groupadmin_cannot_edit(self) -> None:
        """
        A user in groups A and B is edited by an administrator of group B
        only, who submits just group B. ``set_group_ids()`` is expected to
        receive both group IDs, so the membership of A is kept.
        """
        regular_user = UserFactory(username="regular_user")
        group_b_admin = UserFactory(username="group_b_admin")
        group_a = GroupFactory()
        group_b = GroupFactory()
        UserGroupMembershipFactory(
            user_id=regular_user.id, group_id=group_a.id
        )
        UserGroupMembershipFactory(
            user_id=regular_user.id, group_id=group_b.id
        )
        UserGroupMembershipFactory(
            user_id=group_b_admin.id, group_id=group_b.id, groupadmin=True
        )
        self.req._debugging_user = group_b_admin

        post_data = MultiDict(
            [(ViewParam.USERNAME, regular_user.username)]
            + self._group_ids_fields(group_b.id)
            + [(FormAction.SUBMIT, "submit")]
        )
        self._fake_post_for_user(regular_user.id, post_data)

        with mock.patch.object(
            regular_user, "set_group_ids"
        ) as mock_set_group_ids:
            with self.assertRaises(HTTPFound):
                edit_user(self.req)

        # Exactly one positional argument is expected: the group ID list.
        positional_args = mock_set_group_ids.call_args[0]
        (actual_group_ids,) = positional_args
        self.assertEqual(
            sorted(actual_group_ids), sorted([group_a.id, group_b.id])
        )

    def test_upload_group_id_unset_when_membership_removed(self) -> None:
        """
        A user whose upload group is A is edited by an administrator of
        both A and B, who submits only group B. The user's
        ``upload_group_id`` is expected to be None afterwards.
        """
        group_a = GroupFactory()
        group_b = GroupFactory()
        regular_user = UserFactory(upload_group=group_a)
        groupadmin = UserFactory()
        UserGroupMembershipFactory(
            group_id=group_a.id, user_id=regular_user.id
        )
        UserGroupMembershipFactory(
            group_id=group_b.id, user_id=regular_user.id
        )
        UserGroupMembershipFactory(
            group_id=group_a.id, user_id=groupadmin.id, groupadmin=True
        )
        UserGroupMembershipFactory(
            group_id=group_b.id, user_id=groupadmin.id, groupadmin=True
        )
        self.req._debugging_user = groupadmin

        post_data = MultiDict(
            [(ViewParam.USERNAME, regular_user.username)]
            + self._group_ids_fields(group_b.id)
            + [(FormAction.SUBMIT, "submit")]
        )
        self._fake_post_for_user(regular_user.id, post_data)

        with self.assertRaises(HTTPFound):
            edit_user(self.req)

        self.assertIsNone(regular_user.upload_group_id)

    def test_get_form_values(self) -> None:
        """
        ``EditUserGroupAdminView.get_form_values()`` returns the edited
        user's details, and only the group that the requesting group
        administrator administers (B, not A).
        """
        regular_user = UserFactory(
            username="regular_user",
            fullname="Full Name",
            email="user@example.com",
            language="da_DK",
        )
        group_b_admin = UserFactory(username="group_b_admin")
        group_a = GroupFactory()
        group_b = GroupFactory()
        UserGroupMembershipFactory(
            user_id=regular_user.id, group_id=group_a.id
        )
        UserGroupMembershipFactory(
            user_id=regular_user.id, group_id=group_b.id
        )
        UserGroupMembershipFactory(
            user_id=group_b_admin.id, group_id=group_b.id, groupadmin=True
        )
        self.req._debugging_user = group_b_admin

        view = EditUserGroupAdminView(self.req)
        # Would normally be set when going through dispatch()
        view.object = regular_user

        form_values = view.get_form_values()

        param_to_attribute = (
            (ViewParam.USERNAME, "username"),
            (ViewParam.FULLNAME, "fullname"),
            (ViewParam.EMAIL, "email"),
            (ViewParam.LANGUAGE, "language"),
        )
        for param, attribute in param_to_attribute:
            self.assertEqual(
                form_values[param], getattr(regular_user, attribute)
            )
        self.assertEqual(form_values[ViewParam.GROUP_IDS], [group_b.id])

    def test_raises_if_email_address_used_for_mfa(self) -> None:
        """
        Blanking the e-mail address of a user whose MFA method is HOTP by
        e-mail raises HTTPBadRequest.
        """
        regular_user = UserFactory(
            username="regular_user",
            mfa_method=MfaMethod.HOTP_EMAIL,
            email="user@example.com",
        )

        post_data = MultiDict(
            [
                (ViewParam.USERNAME, regular_user.username),
                (ViewParam.EMAIL, ""),
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self._fake_post_for_user(regular_user.id, post_data)

        with self.assertRaises(HTTPBadRequest) as raised:
            edit_user(self.req)

        self.assertIn(
            "used for multi-factor authentication", raised.exception.message
        )

3317 

3318 

class EditOwnUserMfaViewTests(BasicDatabaseTestCase):
    """
    Tests for ``EditOwnUserMfaView``: the initial form values offered at
    each step, and the effect of submitting each step's form.
    """

    def _form_values_with_fixed_secret(
        self, view: EditOwnUserMfaView
    ) -> tuple:
        """
        Call ``view.get_form_values()`` while ``pyotp.random_base32``, as
        reached through the webview module, is patched to return a known
        key. Asserts that the patched function was called exactly once.

        Returns:
            tuple: ``(form_values, secret_key)``
        """
        secret_key = pyotp.random_base32()
        with mock.patch(
            "camcops_server.cc_modules.webview.pyotp.random_base32",
            return_value=secret_key,
        ) as mock_random_base32:
            form_values = view.get_form_values()

        mock_random_base32.assert_called_once()
        return form_values, secret_key

    def test_get_form_values_mfa_method(self) -> None:
        """
        With no step set in the view state, the form values contain the
        user's current MFA method.
        """
        regular_user = UserFactory(
            username="regular_user", mfa_method=MfaMethod.HOTP_SMS
        )
        self.req._debugging_user = regular_user
        view = EditOwnUserMfaView(self.req)

        # Would normally be set when going through dispatch()
        view.object = regular_user

        form_values = view.get_form_values()

        self.assertEqual(
            form_values[ViewParam.MFA_METHOD], regular_user.mfa_method
        )

    def test_get_form_values_hotp_email(self) -> None:
        """
        At the HOTP e-mail step, the form values contain a newly generated
        secret key and the user's e-mail address.
        """
        regular_user = UserFactory(
            username="regular_user",
            mfa_method=MfaMethod.HOTP_EMAIL,
            email="regular_user@example.com",
        )
        self.req._debugging_user = regular_user
        view = EditOwnUserMfaView(self.req)

        # Would normally be set when going through dispatch()
        view.object = regular_user
        view.state.update(step=EditOwnUserMfaView.STEP_HOTP_EMAIL)

        form_values, secret_key = self._form_values_with_fixed_secret(view)

        self.assertEqual(form_values[ViewParam.MFA_SECRET_KEY], secret_key)
        self.assertEqual(form_values[ViewParam.EMAIL], regular_user.email)

    def test_get_form_values_hotp_sms(self) -> None:
        """
        At the HOTP SMS step, the form values contain a newly generated
        secret key and the user's phone number.
        """
        regular_user = UserFactory(
            username="regular_user",
            mfa_method=MfaMethod.HOTP_SMS,
            phone_number=phonenumbers.parse(Fake.en_gb.valid_phone_number()),
        )
        self.req._debugging_user = regular_user
        view = EditOwnUserMfaView(self.req)

        # Would normally be set when going through dispatch()
        view.object = regular_user
        view.state.update(step=EditOwnUserMfaView.STEP_HOTP_SMS)

        form_values, secret_key = self._form_values_with_fixed_secret(view)

        self.assertEqual(form_values[ViewParam.MFA_SECRET_KEY], secret_key)
        self.assertEqual(
            form_values[ViewParam.PHONE_NUMBER], regular_user.phone_number
        )

    def test_get_form_values_totp(self) -> None:
        """
        At the TOTP step, the form values contain a newly generated secret
        key.
        """
        regular_user = UserFactory(
            username="regular_user", mfa_method=MfaMethod.TOTP
        )
        self.req._debugging_user = regular_user
        view = EditOwnUserMfaView(self.req)

        # Would normally be set when going through dispatch()
        view.object = regular_user
        view.state.update(step=EditOwnUserMfaView.STEP_TOTP)

        form_values, secret_key = self._form_values_with_fixed_secret(view)

        self.assertEqual(form_values[ViewParam.MFA_SECRET_KEY], secret_key)

    def test_user_can_set_secret_key(self) -> None:
        """
        Submitting a secret key at the TOTP step stores it on the user.
        """
        regular_user = UserFactory(username="regular_user")
        regular_user.mfa_method = MfaMethod.TOTP
        regular_user.ensure_mfa_info()
        # ... otherwise, the absence of e.g. the HOTP counter will cause a
        # secret key reset.
        new_secret_key = pyotp.random_base32()

        self.req._debugging_user = regular_user
        self.req.fake_request_post_from_dict(
            MultiDict(
                [
                    (ViewParam.MFA_SECRET_KEY, new_secret_key),
                    (FormAction.SUBMIT, "submit"),
                ]
            )
        )
        self.req.config.mfa_methods = [MfaMethod.TOTP]

        view = EditOwnUserMfaView(self.req)
        view.state.update(step=EditOwnUserMfaView.STEP_TOTP)

        view.dispatch()

        self.assertEqual(regular_user.mfa_secret_key, new_secret_key)

    def test_user_can_set_method_totp(self) -> None:
        """
        Choosing TOTP, when the server allows it, sets the user's MFA
        method to TOTP.
        """
        regular_user = UserFactory(username="regular_user")

        self.req._debugging_user = regular_user
        self.req.fake_request_post_from_dict(
            MultiDict(
                [
                    (ViewParam.MFA_METHOD, MfaMethod.TOTP),
                    (FormAction.SUBMIT, "submit"),
                ]
            )
        )
        self.req.config.mfa_methods = [MfaMethod.TOTP]

        view = EditOwnUserMfaView(self.req)

        view.dispatch()

        self.assertEqual(regular_user.mfa_method, MfaMethod.TOTP)

    def test_user_can_set_method_hotp_email(self) -> None:
        """
        Choosing HOTP by e-mail, when the server allows it, sets the
        user's MFA method accordingly and leaves the HOTP counter at 0.
        """
        regular_user = UserFactory(username="regular_user")

        self.req._debugging_user = regular_user
        self.req.fake_request_post_from_dict(
            MultiDict(
                [
                    (ViewParam.MFA_METHOD, MfaMethod.HOTP_EMAIL),
                    (FormAction.SUBMIT, "submit"),
                ]
            )
        )
        self.req.config.mfa_methods = [MfaMethod.HOTP_EMAIL]

        view = EditOwnUserMfaView(self.req)

        view.dispatch()

        self.assertEqual(regular_user.mfa_method, MfaMethod.HOTP_EMAIL)
        self.assertEqual(regular_user.hotp_counter, 0)

    def test_user_can_set_method_hotp_sms(self) -> None:
        """
        Choosing HOTP by SMS, when the server allows it, sets the user's
        MFA method accordingly and leaves the HOTP counter at 0.
        """
        regular_user = UserFactory(username="regular_user")

        self.req._debugging_user = regular_user
        self.req.fake_request_post_from_dict(
            MultiDict(
                [
                    (ViewParam.MFA_METHOD, MfaMethod.HOTP_SMS),
                    (FormAction.SUBMIT, "submit"),
                ]
            )
        )
        self.req.config.mfa_methods = [MfaMethod.HOTP_SMS]

        view = EditOwnUserMfaView(self.req)

        view.dispatch()

        self.assertEqual(regular_user.mfa_method, MfaMethod.HOTP_SMS)
        self.assertEqual(regular_user.hotp_counter, 0)

    def test_user_can_disable_mfa(self) -> None:
        """
        A TOTP user choosing "no MFA", when the server allows that, is
        redirected and ends up with the no-MFA method.
        """
        regular_user = UserFactory(
            username="regular_user", mfa_method=MfaMethod.TOTP
        )

        self.req._debugging_user = regular_user
        self.req.fake_request_post_from_dict(
            MultiDict(
                [
                    (ViewParam.MFA_METHOD, MfaMethod.NO_MFA),
                    (FormAction.SUBMIT, "submit"),
                ]
            )
        )
        self.req.config.mfa_methods = [
            MfaMethod.TOTP,
            MfaMethod.HOTP_SMS,
            MfaMethod.HOTP_EMAIL,
            MfaMethod.NO_MFA,
        ]

        view = EditOwnUserMfaView(self.req)

        with self.assertRaises(HTTPFound):
            view.dispatch()

        self.assertEqual(regular_user.mfa_method, MfaMethod.NO_MFA)

    def test_user_can_set_phone_number(self) -> None:
        """
        Submitting a phone number at the HOTP SMS step stores the parsed
        number on the user.
        """
        regular_user = UserFactory(username="regular_user")
        regular_user.mfa_method = MfaMethod.HOTP_SMS

        phone_number_str = Fake.en_gb.valid_phone_number()

        self.req._debugging_user = regular_user
        self.req.fake_request_post_from_dict(
            MultiDict(
                [
                    (ViewParam.PHONE_NUMBER, phone_number_str),
                    (FormAction.SUBMIT, "submit"),
                ]
            )
        )
        self.req.config.mfa_methods = [MfaMethod.HOTP_SMS]

        view = EditOwnUserMfaView(self.req)
        view.state.update(step=EditOwnUserMfaView.STEP_HOTP_SMS)

        view.dispatch()

        expected_number = phonenumbers.parse(phone_number_str)
        self.assertEqual(regular_user.phone_number, expected_number)

    def test_user_can_set_email_address(self) -> None:
        """
        Submitting an e-mail address at the HOTP e-mail step stores it on
        the user.
        """
        regular_user = UserFactory(username="regular_user")
        # We're going to force this user to the e-mail verification step, so
        # we need to ensure it's set to use e-mail MFA:
        regular_user.mfa_method = MfaMethod.HOTP_EMAIL

        self.req._debugging_user = regular_user
        self.req.fake_request_post_from_dict(
            MultiDict(
                [
                    (ViewParam.EMAIL, "regular_user@example.com"),
                    (FormAction.SUBMIT, "submit"),
                ]
            )
        )

        view = EditOwnUserMfaView(self.req)
        view.state.update(step=EditOwnUserMfaView.STEP_HOTP_EMAIL)

        view.dispatch()

        self.assertEqual(regular_user.email, "regular_user@example.com")

3568 

3569 

class ChangeOtherPasswordViewTests(TestStateMixin, BasicDatabaseTestCase):
    """
    Tests for ``ChangeOtherPasswordView``: permission checks, changing
    another user's password, and the multi-factor authentication (MFA)
    step shown first when the requesting user has MFA set up.
    """

    def setUp(self) -> None:
        """
        Name the fake request's matched route "change_other_password".
        """
        super().setUp()

        self.req.matched_route.name = "change_other_password"

    @staticmethod
    def _new_password_fields() -> list:
        """
        Return the POST (key, value) pairs for a new password together
        with its matching confirmation.
        """
        return [
            ("__start__", "new_password:mapping"),
            (ViewParam.NEW_PASSWORD, "monkeybusiness"),
            ("new_password-confirm", "monkeybusiness"),
            ("__end__", "new_password:mapping"),
        ]

    def _set_target_user_id(self, user_id: int) -> None:
        """
        Add ``user_id`` to the request as the user ID GET parameter,
        without switching the request method to GET.
        """
        self.req.add_get_params(
            {ViewParam.USER_ID: str(user_id)}, set_method_get=False
        )

    def _create_groupadmin_and_member(self) -> tuple:
        """
        Create a group with one administrator and one ordinary member.

        Returns:
            tuple: ``(groupadmin, regular_user)``
        """
        groupadmin = UserFactory(username="groupadmin")
        regular_user = UserFactory(username="regular_user")
        group = GroupFactory()
        UserGroupMembershipFactory(
            user_id=groupadmin.id, group_id=group.id, groupadmin=True
        )
        UserGroupMembershipFactory(user_id=regular_user.id, group_id=group.id)
        return groupadmin, regular_user

    def test_raises_for_invalid_user(self) -> None:
        """
        A user ID that matches no user raises HTTPBadRequest.
        """
        multidict = MultiDict([(FormAction.SUBMIT, "submit")])
        self.req.fake_request_post_from_dict(multidict)
        self._set_target_user_id(123)

        view = ChangeOtherPasswordView(self.req)
        with self.assertRaises(HTTPBadRequest) as cm:
            view.dispatch()

        self.assertIn("Cannot find User with id:123", cm.exception.message)

    def test_raises_when_user_may_not_edit_other_user(self) -> None:
        """
        A regular user trying to change the system user's password gets
        HTTPBadRequest.
        """
        regular_user = UserFactory(username="regular_user")
        multidict = MultiDict(
            self._new_password_fields() + [(FormAction.SUBMIT, "submit")]
        )
        self.req._debugging_user = regular_user
        self.req.fake_request_post_from_dict(multidict)
        self._set_target_user_id(self.system_user.id)

        view = ChangeOtherPasswordView(self.req)
        with self.assertRaises(HTTPBadRequest) as cm:
            view.dispatch()

        self.assertIn("Nobody may edit the system user", cm.exception.message)

    def test_password_set(self) -> None:
        """
        A group administrator changing a group member's password leads to
        one ``set_password()`` call with the new password, a redirect and
        a success flash message; ``must_change_password`` stays false.
        """
        groupadmin, regular_user = self._create_groupadmin_and_member()

        self.assertFalse(regular_user.must_change_password)

        multidict = MultiDict(
            self._new_password_fields() + [(FormAction.SUBMIT, "submit")]
        )
        self.req._debugging_user = groupadmin
        self.req.fake_request_post_from_dict(multidict)
        self._set_target_user_id(regular_user.id)

        view = ChangeOtherPasswordView(self.req)

        with mock.patch.object(
            regular_user, "set_password"
        ) as mock_set_password:
            with self.assertRaises(HTTPFound):
                view.dispatch()

        mock_set_password.assert_called_once_with(self.req, "monkeybusiness")
        self.assertFalse(regular_user.must_change_password)

        messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
        self.assertTrue(len(messages) > 0)
        self.assertIn("Password changed for user 'regular_user'", messages[0])

    def test_user_forced_to_change_password(self) -> None:
        """
        Submitting the "must change password" flag along with the new
        password calls ``force_password_change()`` on the edited user once.
        """
        groupadmin, regular_user = self._create_groupadmin_and_member()
        multidict = MultiDict(
            [(ViewParam.MUST_CHANGE_PASSWORD, "true")]
            + self._new_password_fields()
            + [(FormAction.SUBMIT, "submit")]
        )
        self.req._debugging_user = groupadmin
        self.req.fake_request_post_from_dict(multidict)
        self._set_target_user_id(regular_user.id)

        view = ChangeOtherPasswordView(self.req)

        with mock.patch.object(
            regular_user, "force_password_change"
        ) as mock_force_change:
            with self.assertRaises(HTTPFound):
                view.dispatch()

        mock_force_change.assert_called_once()

    def test_redirects_if_editing_own_account(self) -> None:
        """
        A user who targets their own account is redirected (302) to the
        "change own password" route.
        """
        superuser = UserFactory(username="admin", superuser=True)
        self.req._debugging_user = superuser
        self._set_target_user_id(superuser.id)

        view = ChangeOtherPasswordView(self.req)
        with self.assertRaises(HTTPFound) as cm:
            view.dispatch()

        self.assertEqual(cm.exception.status_code, 302)
        self.assertIn(
            f"/{Routes.CHANGE_OWN_PASSWORD}", cm.exception.headers["Location"]
        )

    # Decorators are applied bottom-up, so the make_email mock is the
    # first extra argument and the send_msg mock the second.
    @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
    @mock.patch("camcops_server.cc_modules.cc_email.make_email")
    def test_user_sees_otp_form_if_mfa_setup(
        self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
    ) -> None:
        """
        A superuser with e-mail HOTP set up is shown the one-time code
        form before anything else. E-mail creation and sending are
        patched out.
        """
        superuser = UserFactory(
            username="admin",
            superuser=True,
            email="admin@example.com",
            mfa_method=MfaMethod.HOTP_EMAIL,
            mfa_secret_key=pyotp.random_base32(),
            hotp_counter=0,
        )

        user = UserFactory(username="user")
        self.req._debugging_user = superuser
        self._set_target_user_id(user.id)

        view = ChangeOtherPasswordView(self.req)

        with mock.patch.object(view, "render_to_response") as mock_render:
            view.dispatch()

        args, kwargs = mock_render.call_args
        context = args[0]

        self.assertIn("form", context)
        self.assertIn("Enter the six-digit code", context["form"])

    def test_code_sent_if_mfa_setup(self) -> None:
        """
        With the console SMS backend and a superuser using SMS HOTP (HOTP
        counter 0), the first log record contains the message for the
        code at counter value 1, addressed to the superuser's number.
        """
        self.req.config.sms_backend = get_sms_backend(
            SmsBackendNames.CONSOLE, {}
        )

        phone_number_str = Fake.en_gb.valid_phone_number()
        superuser = UserFactory(
            username="admin",
            superuser=True,
            email="admin@example.com",
            phone_number=phonenumbers.parse(phone_number_str),
            mfa_secret_key=pyotp.random_base32(),
            mfa_method=MfaMethod.HOTP_SMS,
            hotp_counter=0,
        )
        user = UserFactory(username="user", email="user@example.com")
        self.req._debugging_user = superuser
        self._set_target_user_id(user.id)

        view = ChangeOtherPasswordView(self.req)
        with self.assertLogs(level=logging.INFO) as logging_cm:
            view.dispatch()

        expected_code = pyotp.HOTP(superuser.mfa_secret_key).at(1)
        expected_message = f"Your CamCOPS verification code is {expected_code}"

        self.assertIn(
            ConsoleSmsBackend.make_msg(phone_number_str, expected_message),
            logging_cm.output[0],
        )

    def test_user_can_enter_token(self) -> None:
        """
        Submitting the HOTP code that matches the superuser's counter (1)
        moves the wizard on to the change-password step and renders the
        password form.
        """
        superuser = UserFactory(
            username="admin",
            superuser=True,
            mfa_method=MfaMethod.HOTP_EMAIL,
            mfa_secret_key=pyotp.random_base32(),
            email="user@example.com",
            hotp_counter=1,
        )
        user = UserFactory(username="user", email="user@example.com")
        self.req._debugging_user = superuser
        self._set_target_user_id(user.id)

        hotp = pyotp.HOTP(superuser.mfa_secret_key)
        multidict = MultiDict(
            [
                (ViewParam.ONE_TIME_PASSWORD, hotp.at(1)),
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self.req.fake_request_post_from_dict(multidict)

        view = ChangeOtherPasswordView(self.req)

        response = view.dispatch()

        self.assertEqual(
            self.req.camcops_session.form_state[FormWizardMixin.PARAM_STEP],
            ChangeOtherPasswordView.STEP_CHANGE_PASSWORD,
        )
        # Decode the page once and check both fragments against it.
        content = response.body.decode(UTF8)
        self.assertIn("Change password for user:", content)
        self.assertIn("Type the new password and confirm it", content)

    def test_form_state_cleared_on_invalid_token(self) -> None:
        """
        Submitting the HOTP code for counter value 2 when the superuser's
        counter is 1 redirects, queues a "danger" flash message and passes
        ``assert_state_is_clean()``.
        """
        superuser = UserFactory(
            username="superuser",
            superuser=True,
            mfa_method=MfaMethod.HOTP_EMAIL,
            mfa_secret_key=pyotp.random_base32(),
            email="user@example.com",
            hotp_counter=1,
        )
        user = UserFactory(username="user", email="user@example.com")
        self.req._debugging_user = superuser
        self._set_target_user_id(user.id)

        hotp = pyotp.HOTP(superuser.mfa_secret_key)
        multidict = MultiDict(
            [
                (ViewParam.ONE_TIME_PASSWORD, hotp.at(2)),
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self.req.fake_request_post_from_dict(multidict)

        view = ChangeOtherPasswordView(self.req)

        with self.assertRaises(HTTPFound):
            view.dispatch()

        messages = self.req.session.peek_flash(FlashQueue.DANGER)
        self.assertTrue(len(messages) > 0)
        self.assertIn("You entered an invalid code", messages[0])

        self.assert_state_is_clean()

    def test_cannot_change_password_if_timed_out(self) -> None:
        """
        With a 600 s MFA timeout and a stored authentication time 601 s in
        the past, dispatching the view calls ``fail_timed_out()`` exactly
        once, even though the submitted TOTP code is the current one.
        """
        self.req.config.mfa_timeout_s = 600
        superuser = UserFactory(
            username="admin",
            superuser=True,
            mfa_method=MfaMethod.TOTP,
            mfa_secret_key=pyotp.random_base32(),
        )
        user = UserFactory(username="user")
        self.req._debugging_user = superuser
        self._set_target_user_id(user.id)

        totp = pyotp.TOTP(superuser.mfa_secret_key)
        multidict = MultiDict(
            [
                (ViewParam.ONE_TIME_PASSWORD, totp.now()),
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self.req.fake_request_post_from_dict(multidict)

        view = ChangeOtherPasswordView(self.req)
        # The user ID goes under "mfa_user_id", the key that the LoginView
        # MFA tests in this file store it under. (This test used to write
        # it to "mfa_user", which nothing else here refers to.)
        view.state.update(
            mfa_user_id=superuser.id,
            mfa_time=int(time.time() - 601),
            step=MfaMixin.STEP_MFA,
        )

        # The replacement for fail_timed_out() must raise HTTPFound itself
        # so that dispatch() stops there.
        with mock.patch.object(
            view, "fail_timed_out", side_effect=HTTPFound
        ) as mock_fail_timed_out:
            with self.assertRaises(HTTPFound):
                view.dispatch()

        mock_fail_timed_out.assert_called_once()

3880 

3881 

class EditOtherUserMfaViewTests(TestStateMixin, BasicDatabaseTestCase):
    """
    Exercise ``EditOtherUserMfaView`` by calling ``dispatch()`` on a fake
    request whose matched route is named ``edit_other_user_mfa``.
    """

    def setUp(self) -> None:
        """Run the base fixtures, then name the route under test."""
        super().setUp()

        self.req.matched_route.name = "edit_other_user_mfa"

    def _select_user(self, user_id: str) -> None:
        """
        Put ``user_id`` into the request's GET parameters under
        ``ViewParam.USER_ID``, passing ``set_method_get=False``.
        """
        self.req.add_get_params(
            {ViewParam.USER_ID: user_id}, set_method_get=False
        )

    def _post(self, fields) -> None:
        """Fake a POST built from a list of ``(key, value)`` pairs."""
        self.req.fake_request_post_from_dict(MultiDict(fields))

    def test_raises_for_invalid_user(self) -> None:
        """User ID 123 is expected to produce an ``HTTPBadRequest``."""
        self._post([(FormAction.SUBMIT, "submit")])
        self._select_user("123")

        view = EditOtherUserMfaView(self.req)
        with self.assertRaises(HTTPBadRequest) as raised:
            view.dispatch()

        self.assertIn(
            "Cannot find User with id:123", raised.exception.message
        )

    def test_raises_when_user_may_not_edit_other_user(self) -> None:
        """Targeting the system user is expected to be refused."""
        ordinary = UserFactory(username="regular_user")
        self.req._debugging_user = ordinary
        self._post([(FormAction.SUBMIT, "submit")])
        self._select_user(str(self.system_user.id))

        view = EditOtherUserMfaView(self.req)
        with self.assertRaises(HTTPBadRequest) as raised:
            view.dispatch()

        self.assertIn(
            "Nobody may edit the system user", raised.exception.message
        )

    def test_disable_mfa(self) -> None:
        """
        A group administrator posts ``DISABLE_MFA`` for a member of the
        same group; the member's method should become ``NO_MFA`` and a
        success message should be flashed.
        """
        admin = UserFactory(username="groupadmin")
        member = UserFactory(
            username="regular_user", mfa_method=MfaMethod.TOTP
        )
        shared_group = GroupFactory()
        UserGroupMembershipFactory(
            user_id=admin.id, group_id=shared_group.id, groupadmin=True
        )
        UserGroupMembershipFactory(
            user_id=member.id, group_id=shared_group.id
        )
        self.assertFalse(member.must_change_password)

        self.req._debugging_user = admin
        self._post(
            [(ViewParam.DISABLE_MFA, "true"), (FormAction.SUBMIT, "submit")]
        )
        self._select_user(str(member.id))

        view = EditOtherUserMfaView(self.req)
        with self.assertRaises(HTTPFound):
            view.dispatch()

        self.assertEqual(member.mfa_method, MfaMethod.NO_MFA)

        success_messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
        self.assertTrue(len(success_messages) > 0)
        self.assertIn(
            "Multi-factor authentication disabled for user 'regular_user'",
            success_messages[0],
        )

    def test_redirects_if_editing_own_account(self) -> None:
        """
        Selecting one's own user ID should give a 302 whose Location
        contains the ``EDIT_OWN_USER_MFA`` route.
        """
        admin = UserFactory(username="admin", superuser=True)
        self.req._debugging_user = admin
        # Unlike the other tests, set_method_get is left at its default.
        self.req.add_get_params({ViewParam.USER_ID: str(admin.id)})

        view = EditOtherUserMfaView(self.req)
        with self.assertRaises(HTTPFound) as raised:
            view.dispatch()

        redirect = raised.exception
        self.assertEqual(redirect.status_code, 302)
        self.assertIn(
            f"/{Routes.EDIT_OWN_USER_MFA}", redirect.headers["Location"]
        )

    @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
    @mock.patch("camcops_server.cc_modules.cc_email.make_email")
    def test_user_sees_otp_form_if_mfa_setup(
        self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
    ) -> None:
        """
        With e-mail HOTP configured for the superuser, the rendered
        context should hold a form asking for the six-digit code.
        The e-mail functions are patched out by the decorators.
        """
        admin = UserFactory(
            username="admin",
            superuser=True,
            email="admin@example.com",
            mfa_method=MfaMethod.HOTP_EMAIL,
            mfa_secret_key=pyotp.random_base32(),
            hotp_counter=0,
        )
        target = UserFactory(username="user")
        self.req._debugging_user = admin
        self._select_user(str(target.id))

        view = EditOtherUserMfaView(self.req)
        with mock.patch.object(view, "render_to_response") as render:
            view.dispatch()

        # First positional argument of the render call.
        context = render.call_args[0][0]

        self.assertIn("form", context)
        self.assertIn("Enter the six-digit code", context["form"])

    def test_code_sent_if_mfa_setup(self) -> None:
        """
        With SMS HOTP and the console SMS backend, the first log record
        should contain the message carrying the code for counter 1.
        """
        self.req.config.sms_backend = get_sms_backend(
            SmsBackendNames.CONSOLE, {}
        )

        phone_number_str = Fake.en_gb.valid_phone_number()
        admin = UserFactory(
            username="admin",
            superuser=True,
            email="admin@example.com",
            phone_number=phonenumbers.parse(phone_number_str),
            mfa_secret_key=pyotp.random_base32(),
            mfa_method=MfaMethod.HOTP_SMS,
            hotp_counter=0,
        )
        target = UserFactory(username="user", email="user@example.com")
        self.req._debugging_user = admin
        self._select_user(str(target.id))

        view = EditOtherUserMfaView(self.req)
        with self.assertLogs(level=logging.INFO) as captured:
            view.dispatch()

        expected_code = pyotp.HOTP(admin.mfa_secret_key).at(1)
        expected_message = f"Your CamCOPS verification code is {expected_code}"
        expected_log = ConsoleSmsBackend.make_msg(
            phone_number_str, expected_message
        )

        self.assertIn(expected_log, captured.output[0])

    def test_user_can_enter_token(self) -> None:
        """
        Posting the code for the stored counter should move the wizard to
        the ``other_user_mfa`` step and render the edit page.
        """
        admin = UserFactory(
            username="admin",
            superuser=True,
            mfa_method=MfaMethod.HOTP_EMAIL,
            mfa_secret_key=pyotp.random_base32(),
            email="user@example.com",
            hotp_counter=1,
        )
        target = UserFactory(username="user", email="user@example.com")
        self.req._debugging_user = admin
        self._select_user(str(target.id))

        matching_code = pyotp.HOTP(admin.mfa_secret_key).at(1)
        self._post(
            [
                (ViewParam.ONE_TIME_PASSWORD, matching_code),
                (FormAction.SUBMIT, "submit"),
            ]
        )

        view = EditOtherUserMfaView(self.req)
        response = view.dispatch()

        current_step = self.req.camcops_session.form_state[
            FormWizardMixin.PARAM_STEP
        ]
        self.assertEqual(current_step, "other_user_mfa")
        self.assertIn(
            "Edit multi-factor authentication for user:",
            response.body.decode(UTF8),
        )

    def test_form_state_cleared_on_invalid_token(self) -> None:
        """
        Posting the code for counter 2 against a stored counter of 1
        should redirect, flash an "invalid code" danger message and pass
        ``assert_state_is_clean()``.
        """
        admin = UserFactory(
            username="superuser",
            superuser=True,
            mfa_method=MfaMethod.HOTP_EMAIL,
            mfa_secret_key=pyotp.random_base32(),
            email="user@example.com",
            hotp_counter=1,
        )
        target = UserFactory(username="user", email="user@example.com")
        self.req._debugging_user = admin
        self._select_user(str(target.id))

        wrong_code = pyotp.HOTP(admin.mfa_secret_key).at(2)
        self._post(
            [
                (ViewParam.ONE_TIME_PASSWORD, wrong_code),
                (FormAction.SUBMIT, "submit"),
            ]
        )

        view = EditOtherUserMfaView(self.req)
        with self.assertRaises(HTTPFound):
            view.dispatch()

        danger_messages = self.req.session.peek_flash(FlashQueue.DANGER)
        self.assertTrue(len(danger_messages) > 0)
        self.assertIn("You entered an invalid code", danger_messages[0])

        self.assert_state_is_clean()

4102 

4103 

class EditUserGroupMembershipViewTests(DemoRequestTestCase):
    """
    Tests for the ``edit_user_group_membership`` view function.
    """

    # Membership attributes that the update tests expect to be false
    # before the POST and true after it.
    _PERMISSION_ATTRS = (
        "may_upload",
        "may_register_devices",
        "may_use_webviewer",
        "view_all_patients_when_unfiltered",
        "may_dump_data",
        "may_run_reports",
        "may_add_notes",
        "may_manage_patients",
        "may_email_patients",
    )

    # Form fields that the update tests submit with the value "true".
    _PERMISSION_PARAMS = (
        ViewParam.MAY_UPLOAD,
        ViewParam.MAY_REGISTER_DEVICES,
        ViewParam.MAY_USE_WEBVIEWER,
        ViewParam.VIEW_ALL_PATIENTS_WHEN_UNFILTERED,
        ViewParam.MAY_DUMP_DATA,
        ViewParam.MAY_RUN_REPORTS,
        ViewParam.MAY_ADD_NOTES,
        ViewParam.MAY_MANAGE_PATIENTS,
        ViewParam.MAY_EMAIL_PATIENTS,
    )

    def _create_admin_and_member(self):
        """
        Create a new group containing a group administrator and an
        ordinary member.

        Returns:
            tuple ``(groupadmin, ugm)``: the administrator user and the
            ordinary member's membership record.
        """
        regular_user = UserFactory()
        groupadmin = UserFactory()
        group = GroupFactory()

        UserGroupMembershipFactory(
            user_id=groupadmin.id,
            group_id=group.id,
            groupadmin=True,
        )
        ugm = UserGroupMembershipFactory(
            user_id=regular_user.id, group_id=group.id
        )

        return groupadmin, ugm

    def _post_for_membership(self, fields, ugm) -> None:
        """
        Fake a POST of ``fields`` (a list of ``(key, value)`` pairs) and
        add the ID of membership ``ugm`` to the GET parameters.
        """
        self.req.fake_request_post_from_dict(MultiDict(fields))
        self.req.add_get_params(
            {ViewParam.USER_GROUP_MEMBERSHIP_ID: str(ugm.id)},
            set_method_get=False,
        )

    def _assert_permissions(self, ugm, expected: bool) -> None:
        """
        Assert that every attribute in ``_PERMISSION_ATTRS`` is truthy
        (``expected=True``) or falsy (``expected=False``) on ``ugm``.
        The attribute name is used as the failure message.
        """
        check = self.assertTrue if expected else self.assertFalse
        for attr in self._PERMISSION_ATTRS:
            check(getattr(ugm, attr), msg=attr)

    def test_superuser_can_update_user_group_membership(self) -> None:
        """
        Post every permission flag plus GROUPADMIN as "true" and check
        that the permission flags on the membership become true.
        """
        groupadmin, ugm = self._create_admin_and_member()

        # NOTE(review): despite the test name, the acting user is a group
        # administrator created without superuser=True, and the posted
        # GROUPADMIN value is never asserted after the request - confirm
        # whether a superuser and a groupadmin assertion were intended.
        self.req._debugging_user = groupadmin

        self._assert_permissions(ugm, False)
        self.assertFalse(ugm.groupadmin)

        fields = [(param, "true") for param in self._PERMISSION_PARAMS]
        fields.append((ViewParam.GROUPADMIN, "true"))
        fields.append((FormAction.SUBMIT, "submit"))
        self._post_for_membership(fields, ugm)

        with self.assertRaises(HTTPFound):
            edit_user_group_membership(self.req)

        self._assert_permissions(ugm, True)

    def test_groupadmin_can_update_user_group_membership(self) -> None:
        """
        As a group administrator, post every permission flag as "true"
        and check that the flags on the membership become true.
        """
        groupadmin, ugm = self._create_admin_and_member()

        self.req._debugging_user = groupadmin

        self._assert_permissions(ugm, False)

        fields = [(param, "true") for param in self._PERMISSION_PARAMS]
        fields.append((FormAction.SUBMIT, "submit"))
        self._post_for_membership(fields, ugm)

        with self.assertRaises(HTTPFound):
            edit_user_group_membership(self.req)

        self._assert_permissions(ugm, True)

    def test_raises_if_cant_edit_user(self) -> None:
        """
        Editing a membership that belongs to the system user should raise
        ``HTTPBadRequest`` with the "Nobody may edit" message.
        """
        system_user = User.get_system_user(self.dbsession)
        groupadmin = UserFactory()
        group = GroupFactory()

        UserGroupMembershipFactory(
            user_id=groupadmin.id,
            group_id=group.id,
            groupadmin=True,
        )

        system_ugm = UserGroupMembershipFactory(
            user_id=system_user.id, group_id=group.id
        )

        self.req._debugging_user = groupadmin

        self._post_for_membership(
            [(FormAction.SUBMIT, "submit")], system_ugm
        )

        with self.assertRaises(HTTPBadRequest) as cm:
            edit_user_group_membership(self.req)

        self.assertIn("Nobody may edit the system user", cm.exception.message)

    def test_raises_if_cant_administer_group(self) -> None:
        """
        A user who administers group A only should not be able to edit
        another user's membership of group B.
        """
        group_a = GroupFactory()
        group_b = GroupFactory()

        user1 = UserFactory()
        user2 = UserFactory()

        # User 1 is a group administrator for group A,
        # User 2 is a member of group A
        UserGroupMembershipFactory(
            user_id=user1.id, group_id=group_a.id, groupadmin=True
        )
        # (The original statement ended in a stray comma, which made it a
        # discarded one-element tuple expression.)
        UserGroupMembershipFactory(user_id=user2.id, group_id=group_a.id)

        # User 1 is not an administrator of group B
        # User 2 is a member of group B
        ugm = UserGroupMembershipFactory(user_id=user2.id, group_id=group_b.id)

        self._post_for_membership([(FormAction.SUBMIT, "submit")], ugm)

        self.req._debugging_user = user1

        with self.assertRaises(HTTPBadRequest) as cm:
            edit_user_group_membership(self.req)

        self.assertIn(
            "You may not administer this group", cm.exception.message
        )

    def test_cancel_returns_to_users_list(self) -> None:
        """
        Posting CANCEL should give a 302 whose Location contains the
        ``VIEW_ALL_USERS`` route.
        """
        groupadmin, ugm = self._create_admin_and_member()

        self.req._debugging_user = groupadmin

        self._post_for_membership([(FormAction.CANCEL, "cancel")], ugm)

        with self.assertRaises(HTTPFound) as cm:
            edit_user_group_membership(self.req)

        self.assertEqual(cm.exception.status_code, 302)

        self.assertIn(Routes.VIEW_ALL_USERS, cm.exception.headers["Location"])

4324 

4325 

class ChangeOwnPasswordViewTests(TestStateMixin, BasicDatabaseTestCase):
    """
    Tests for ``ChangeOwnPasswordView`` and the ``change_own_password``
    view function, with the matched route named ``change_own_password``.
    """

    def setUp(self) -> None:
        """Run the base fixtures, then name the route under test."""
        super().setUp()

        self.req.matched_route.name = "change_own_password"

    def test_user_can_change_password(self) -> None:
        """
        A user without MFA posts the old and new passwords; expect a
        redirect, one ``set_password`` call, a success flash message and
        ``assert_state_is_finished()`` to pass.
        """
        new_password = "monkeybusiness"

        user = UserFactory(
            username="user",
            mfa_method=MfaMethod.NO_MFA,
            password__request=self.req,
            password="secret",
        )
        multidict = MultiDict(
            [
                (ViewParam.OLD_PASSWORD, "secret"),
                ("__start__", "new_password:mapping"),
                (ViewParam.NEW_PASSWORD, new_password),
                ("new_password-confirm", new_password),
                # Fixed: this marker previously read "new_password-mapping",
                # which did not match the __start__ marker above.
                ("__end__", "new_password:mapping"),
                (FormAction.SUBMIT, "submit"),
            ]
        )

        self.req.fake_request_post_from_dict(multidict)
        self.req._debugging_user = user

        # set_password is patched, so only the call itself is checked.
        with mock.patch.object(user, "set_password") as mock_set_password:
            with self.assertRaises(HTTPFound):
                change_own_password(self.req)

        mock_set_password.assert_called_once_with(self.req, new_password)

        messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
        self.assertGreater(len(messages), 0)
        self.assertIn("You have changed your password", messages[0])
        self.assert_state_is_finished()

    def test_user_sees_expiry_message(self) -> None:
        """
        A user flagged ``must_change_password`` should get an expiry
        message flashed on the danger queue.
        """
        user = UserFactory(
            username="user",
            mfa_method=MfaMethod.NO_MFA,
            must_change_password=True,
        )
        self.req._debugging_user = user

        with mock.patch.object(self.req.session, "flash") as mock_flash:
            change_own_password(self.req)

        args, kwargs = mock_flash.call_args
        self.assertIn("Your password has expired", args[0])
        self.assertEqual(kwargs["queue"], FlashQueue.DANGER)

    def test_password_must_differ(self) -> None:
        """The view's form kwargs should carry a true ``must_differ``."""
        view = ChangeOwnPasswordView(self.req)

        form_kwargs = view.get_form_kwargs()
        self.assertIn("must_differ", form_kwargs)
        self.assertTrue(form_kwargs["must_differ"])

    @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
    @mock.patch("camcops_server.cc_modules.cc_email.make_email")
    def test_user_sees_otp_form_if_mfa_setup(
        self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
    ) -> None:
        """
        With e-mail HOTP configured, the rendered context should hold a
        form asking for the six-digit code. The e-mail functions are
        patched out by the decorators.
        """
        user = UserFactory(
            username="user",
            email="user@example.com",
            mfa_method=MfaMethod.HOTP_EMAIL,
            mfa_secret_key=pyotp.random_base32(),
            hotp_counter=0,
        )
        self.req._debugging_user = user

        view = ChangeOwnPasswordView(self.req)

        with mock.patch.object(view, "render_to_response") as mock_render:
            view.dispatch()

        # Only the positional arguments of the render call are needed.
        args = mock_render.call_args[0]
        context = args[0]

        self.assertIn("form", context)
        self.assertIn("Enter the six-digit code", context["form"])

    def test_code_sent_if_mfa_setup(self) -> None:
        """
        With SMS HOTP and the console SMS backend, the first log record
        should contain the message carrying the code for counter 1.
        """
        self.req.config.sms_backend = get_sms_backend(
            SmsBackendNames.CONSOLE, {}
        )
        phone_number_str = Fake.en_gb.valid_phone_number()
        user = UserFactory(
            username="user",
            email="user@example.com",
            phone_number=phonenumbers.parse(phone_number_str),
            mfa_secret_key=pyotp.random_base32(),
            mfa_method=MfaMethod.HOTP_SMS,
            hotp_counter=0,
        )

        self.req._debugging_user = user
        view = ChangeOwnPasswordView(self.req)
        with self.assertLogs(level=logging.INFO) as logging_cm:
            view.dispatch()

        expected_code = pyotp.HOTP(user.mfa_secret_key).at(1)
        expected_message = f"Your CamCOPS verification code is {expected_code}"

        self.assertIn(
            ConsoleSmsBackend.make_msg(phone_number_str, expected_message),
            logging_cm.output[0],
        )

    def test_user_can_enter_token(self) -> None:
        """
        Posting the code for the stored counter should move the wizard to
        ``STEP_CHANGE_PASSWORD`` and render the password form.
        """
        user = UserFactory(
            username="user",
            mfa_method=MfaMethod.HOTP_EMAIL,
            mfa_secret_key=pyotp.random_base32(),
            email="user@example.com",
            hotp_counter=1,
            password__request=self.req,
            password="secret",
        )
        self.req._debugging_user = user

        hotp = pyotp.HOTP(user.mfa_secret_key)
        multidict = MultiDict(
            [
                (ViewParam.ONE_TIME_PASSWORD, hotp.at(1)),  # the token
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self.req.fake_request_post_from_dict(multidict)

        view = ChangeOwnPasswordView(self.req)

        response = view.dispatch()
        body = response.body.decode(UTF8)

        self.assertEqual(
            self.req.camcops_session.form_state[FormWizardMixin.PARAM_STEP],
            ChangeOwnPasswordView.STEP_CHANGE_PASSWORD,
        )
        self.assertIn("Change your password", body)
        self.assertIn("Type the new password and confirm it", body)

    def test_form_state_cleared_on_invalid_token(self) -> None:
        """
        Posting the code for counter 2 against a stored counter of 1
        should redirect, flash an "invalid code" danger message and pass
        ``assert_state_is_clean()``.
        """
        user = UserFactory(
            username="user",
            mfa_method=MfaMethod.HOTP_EMAIL,
            mfa_secret_key=pyotp.random_base32(),
            email="user@example.com",
            hotp_counter=1,
            password__request=self.req,
            password="secret",
        )
        self.req._debugging_user = user

        hotp = pyotp.HOTP(user.mfa_secret_key)
        multidict = MultiDict(
            [
                (ViewParam.ONE_TIME_PASSWORD, hotp.at(2)),
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self.req.fake_request_post_from_dict(multidict)

        view = ChangeOwnPasswordView(self.req)

        with self.assertRaises(HTTPFound):
            view.dispatch()

        messages = self.req.session.peek_flash(FlashQueue.DANGER)
        self.assertGreater(len(messages), 0)
        self.assertIn("You entered an invalid code", messages[0])

        self.assert_state_is_clean()

    def test_cannot_change_password_if_timed_out(self) -> None:
        """
        Submit a current TOTP code when the recorded MFA start time is
        older than the configured timeout; the patched ``fail_timed_out``
        should be called exactly once.
        """
        timeout_s = 600
        self.req.config.mfa_timeout_s = timeout_s
        user = UserFactory(
            username="user",
            mfa_method=MfaMethod.TOTP,
            mfa_secret_key=pyotp.random_base32(),
            password__request=self.req,
            password="secret",
        )
        self.req._debugging_user = user

        totp = pyotp.TOTP(user.mfa_secret_key)
        multidict = MultiDict(
            [
                (ViewParam.ONE_TIME_PASSWORD, totp.now()),
                (FormAction.SUBMIT, "submit"),
            ]
        )
        self.req.fake_request_post_from_dict(multidict)

        view = ChangeOwnPasswordView(self.req)
        view.state.update(
            mfa_user=user.id,
            # One second further back than the timeout configured above.
            mfa_time=int(time.time() - (timeout_s + 1)),
            step=MfaMixin.STEP_MFA,
        )

        with mock.patch.object(
            view, "fail_timed_out", side_effect=HTTPFound
        ) as mock_fail_timed_out:
            with self.assertRaises(HTTPFound):
                view.dispatch()

        mock_fail_timed_out.assert_called_once()

4540 

4541 

class AddUserTests(DemoRequestTestCase):
    """
    Tests for the ``add_user`` view function, run as a user who is made
    group administrator of the groups involved.
    """

    def setUp(self) -> None:
        """Create one user and make it the request's acting user."""
        super().setUp()

        admin = UserFactory()
        self.groupadmin = admin
        self.req._debugging_user = admin

    def test_user_created(self) -> None:
        """
        Post a complete "add user" form naming two groups; expect a
        redirect and a stored user called "test" who must change password
        and belongs to both groups.
        """
        group_1 = GroupFactory()
        group_2 = GroupFactory()
        groups = (group_1, group_2)

        # The acting user administers both groups.
        for group in groups:
            UserGroupMembershipFactory(
                user_id=self.groupadmin.id,
                group_id=group.id,
                groupadmin=True,
            )

        fields = [
            ("_charset_", UTF8),
            ("__formid__", "deform"),
            (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
            (ViewParam.USERNAME, "test"),
            ("__start__", "new_password:mapping"),
            (ViewParam.NEW_PASSWORD, "monkeybusiness"),
            ("new_password-confirm", "monkeybusiness"),
            ("__end__", "new_password:mapping"),
            (ViewParam.MUST_CHANGE_PASSWORD, "true"),
            ("__start__", "group_ids:sequence"),
        ]
        fields.extend(("group_id_sequence", str(g.id)) for g in groups)
        fields.append(("__end__", "group_ids:sequence"))
        fields.append((FormAction.SUBMIT, "submit"))
        self.req.fake_request_post_from_dict(MultiDict(fields))

        with self.assertRaises(HTTPFound):
            add_user(self.req)

        by_username = self.dbsession.query(User).filter(
            User.username == "test",
        )
        created = by_username.one_or_none()

        self.assertIsNotNone(created)

        self.assertTrue(created.must_change_password)
        for group in groups:
            self.assertIn(group.id, created.group_ids)

4595 

4596 

4597class ForciblyFinalizeTests(BasicDatabaseTestCase): 

def setUp(self) -> None:
    """Run the base fixtures, then act as ``self.groupadmin``."""
    super().setUp()

    # Every test in this class issues its request as the group admin.
    self.req._debugging_user = self.groupadmin

4602 

def test_cancel_returns_to_home(self) -> None:
    """
    Posting CANCEL should return (not raise) an ``HTTPFound`` with
    status 302 whose Location path is ``/``.
    """
    cancel_fields = [(FormAction.CANCEL, "cancel")]
    self.req.fake_request_post_from_dict(MultiDict(cancel_fields))

    result = forcibly_finalize(self.req)

    self.assertIsInstance(result, HTTPFound)
    self.assertEqual(result.status_code, 302)
    location = urlparse(result.headers["Location"])
    self.assertEqual(location.path, "/")

4611 

def test_renders_first_form_on_get(self) -> None:
    """
    With no POST data faked, the view should render a context whose
    form markup contains a ``<select`` element.
    """
    render = mock.Mock()
    with mock.patch.multiple(
        "camcops_server.cc_modules.webview", render_to_response=render
    ):
        forcibly_finalize(self.req)

    # The context is the second positional argument of the render call.
    context = render.call_args[0][1]

    self.assertIn("form", context)
    self.assertIn("<select", context["form"])

4624 

def test_renders_confirm_form_on_submit(self) -> None:
    """
    Submitting the first form for a device should render a context whose
    form markup contains "Forcibly finalize".
    """
    device = DeviceFactory()

    fields = [
        ("_charset_", UTF8),
        ("__formid__", "deform"),
        (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
        (ViewParam.DEVICE_ID, device.id),
        ("__start__", "danger:mapping"),
        ("target", "7176"),
        ("user_entry", "7176"),
        ("__end__", "danger:mapping"),
        (FormAction.SUBMIT, "submit"),
    ]
    self.req.fake_request_post_from_dict(MultiDict(fields))

    render = mock.Mock()
    with mock.patch.multiple(
        "camcops_server.cc_modules.webview", render_to_response=render
    ):
        forcibly_finalize(self.req)

    # The context is the second positional argument of the render call.
    context = render.call_args[0][1]

    self.assertIn("form", context)
    self.assertIn("Forcibly finalize", context["form"])

4654 

4655 def test_finalizes_on_submit(self) -> None: 

4656 device = DeviceFactory() 

4657 patient = PatientFactory(_device=device, _group=self.group) 

4658 bmis = BmiFactory.create_batch(3, patient=patient, _era=ERA_NOW) 

4659 

4660 multidict = MultiDict( 

4661 [ 

4662 ("_charset_", UTF8), 

4663 ("__formid__", "deform"), 

4664 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()), 

4665 (ViewParam.DEVICE_ID, device.id), 

4666 ("confirm_1_t", "true"), 

4667 ("confirm_2_t", "true"), 

4668 ("confirm_4_t", "true"), 

4669 ("__start__", "danger:mapping"), 

4670 ("target", "7176"), 

4671 ("user_entry", "7176"), 

4672 ("__end__", "danger:mapping"), 

4673 (FormAction.FINALIZE, "Forcibly finalize") 

4674 ] 

4675 ) 

4676 self.req.fake_request_post_from_dict(multidict) 

4677 

4678 with self.assertRaises(HTTPFound) as cm: 

4679 forcibly_finalize(self.req) 

4680 

4681 self.assertEqual(cm.exception.status_code, 302) 

4682 self.assertEqual(urlparse(cm.exception.headers["Location"]).path, "/") 

4683 

4684 for bmi in bmis: 

4685 self.assertEqual(bmi._preserving_user_id, self.groupadmin.id) 

4686 self.assertTrue(bmi._forcibly_preserved)