Coverage for cc_modules/tests/webview_tests.py: 15%
1952 statements
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-31 11:49 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2025-01-31 11:49 +0000
1"""
2camcops_server/cc_modules/tests/webview_tests.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
25"""
27from collections import OrderedDict
28import datetime
29import json
30import logging
31import time
32from typing import cast
33import unittest
34from unittest import mock
35from urllib.parse import urlparse
37from cardinal_pythonlib.classes import class_attribute_names
38from cardinal_pythonlib.httpconst import MimeType
39from pendulum import Duration, local
40import phonenumbers
41import pyotp
42from pyramid.httpexceptions import HTTPBadRequest, HTTPFound
43from webob.multidict import MultiDict
45from camcops_server.cc_modules.cc_constants import (
46 ERA_NOW,
47 MfaMethod,
48 SmsBackendNames,
49)
50from camcops_server.cc_modules.cc_device import Device
51from camcops_server.cc_modules.cc_group import Group
52from camcops_server.cc_modules.cc_patient import Patient
53from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
54from camcops_server.cc_modules.cc_pyramid import (
55 FlashQueue,
56 FormAction,
57 Routes,
58 ViewArg,
59 ViewParam,
60)
61from camcops_server.cc_modules.cc_sms import ConsoleSmsBackend, get_sms_backend
62from camcops_server.cc_modules.cc_taskindex import PatientIdNumIndexEntry
63from camcops_server.cc_modules.cc_taskschedule import (
64 PatientTaskSchedule,
65 TaskSchedule,
66 TaskScheduleItem,
67)
68from camcops_server.cc_modules.cc_testfactories import (
69 AnyIdNumGroupFactory,
70 DeviceFactory,
71 Fake,
72 GroupFactory,
73 NHSIdNumDefinitionFactory,
74 NHSPatientIdNumFactory,
75 PatientFactory,
76 PatientTaskScheduleFactory,
77 RioIdNumDefinitionFactory,
78 ServerCreatedNHSPatientIdNumFactory,
79 ServerCreatedPatientFactory,
80 StudyPatientIdNumFactory,
81 TaskScheduleFactory,
82 TaskScheduleItemFactory,
83 UserFactory,
84 UserGroupMembershipFactory,
85)
86from camcops_server.cc_modules.cc_unittest import (
87 BasicDatabaseTestCase,
88 DemoDatabaseTestCase,
89 DemoRequestTestCase,
90)
91from camcops_server.cc_modules.cc_user import (
92 SecurityAccountLockout,
93 SecurityLoginFailure,
94 User,
95)
96from camcops_server.cc_modules.cc_validators import (
97 validate_alphanum_underscore,
98)
99from camcops_server.cc_modules.cc_view_classes import FormWizardMixin
100from camcops_server.tasks.tests.factories import BmiFactory
101from camcops_server.cc_modules.tests.cc_view_classes_tests import (
102 TestStateMixin,
103)
104from camcops_server.cc_modules.webview import (
105 add_patient,
106 add_user,
107 AddPatientView,
108 AddTaskScheduleItemView,
109 AddTaskScheduleView,
110 any_records_use_group,
111 change_own_password,
112 ChangeOtherPasswordView,
113 ChangeOwnPasswordView,
114 DeleteServerCreatedPatientView,
115 DeleteTaskScheduleItemView,
116 DeleteTaskScheduleView,
117 edit_finalized_patient,
118 edit_group,
119 edit_server_created_patient,
120 edit_user,
121 edit_user_group_membership,
122 EditFinalizedPatientView,
123 EditGroupView,
124 EditOtherUserMfaView,
125 EditOwnUserMfaView,
126 EditServerCreatedPatientView,
127 EditTaskScheduleItemView,
128 EditTaskScheduleView,
129 EditUserGroupAdminView,
130 EraseTaskEntirelyView,
131 EraseTaskLeavingPlaceholderView,
132 forcibly_finalize,
133 LoginView,
134 MfaMixin,
135 SendEmailFromPatientTaskScheduleView,
136)
138log = logging.getLogger(__name__)
141# =============================================================================
142# Unit testing
143# =============================================================================
145UTF8 = "utf-8"
148class WebviewTests(DemoDatabaseTestCase):
149 def test_any_records_use_group_true(self) -> None:
150 # All tasks created in DemoDatabaseTestCase will be in this group
151 self.assertTrue(
152 any_records_use_group(self.req, self.demo_database_group)
153 )
155 def test_any_records_use_group_false(self) -> None:
156 """
157 If this fails with:
158 sqlalchemy.exc.InvalidRequestError: SQL expression, column, or mapped
159 entity expected - got <name of task base class>
160 then the base class probably needs to be declared __abstract__. See
161 DiagnosisItemBase as an example.
162 """
163 group = GroupFactory()
165 self.assertFalse(any_records_use_group(self.req, group))
167 def test_webview_constant_validators(self) -> None:
168 for x in class_attribute_names(ViewArg):
169 try:
170 validate_alphanum_underscore(x, self.req)
171 except ValueError:
172 self.fail(f"Operations.{x} fails validate_alphanum_underscore")
175class AddTaskScheduleViewTests(BasicDatabaseTestCase):
176 def test_schedule_form_displayed(self) -> None:
177 view = AddTaskScheduleView(self.req)
179 response = view.dispatch()
180 self.assertEqual(response.status_code, 200)
181 self.assertEqual(response.body.decode(UTF8).count("<form"), 1)
183 def test_schedule_is_created(self) -> None:
184 multidict = MultiDict(
185 [
186 ("_charset_", UTF8),
187 ("__formid__", "deform"),
188 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
189 (ViewParam.NAME, "MOJO"),
190 (ViewParam.GROUP_ID, self.group.id),
191 (ViewParam.EMAIL_FROM, "server@example.com"),
192 (ViewParam.EMAIL_CC, "cc@example.com"),
193 (ViewParam.EMAIL_BCC, "bcc@example.com"),
194 (ViewParam.EMAIL_SUBJECT, "Subject"),
195 (ViewParam.EMAIL_TEMPLATE, "Email template"),
196 (FormAction.SUBMIT, "submit"),
197 ]
198 )
200 self.req.fake_request_post_from_dict(multidict)
202 view = AddTaskScheduleView(self.req)
204 with self.assertRaises(HTTPFound) as e:
205 view.dispatch()
207 schedule = self.dbsession.query(TaskSchedule).one()
209 self.assertEqual(schedule.name, "MOJO")
210 self.assertEqual(schedule.email_from, "server@example.com")
211 self.assertEqual(schedule.email_bcc, "bcc@example.com")
212 self.assertEqual(schedule.email_subject, "Subject")
213 self.assertEqual(schedule.email_template, "Email template")
215 self.assertEqual(e.exception.status_code, 302)
216 self.assertIn(
217 Routes.VIEW_TASK_SCHEDULES, e.exception.headers["Location"]
218 )
221class EditTaskScheduleViewTests(DemoRequestTestCase):
222 def test_schedule_name_can_be_updated(self) -> None:
223 user = self.req._debugging_user = UserFactory()
224 group = GroupFactory()
225 UserGroupMembershipFactory(
226 group_id=group.id, user_id=user.id, groupadmin=True
227 )
229 schedule = TaskScheduleFactory(group=group)
230 multidict = MultiDict(
231 [
232 ("_charset_", UTF8),
233 ("__formid__", "deform"),
234 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
235 (ViewParam.NAME, "MOJO"),
236 (ViewParam.GROUP_ID, group.id),
237 (FormAction.SUBMIT, "submit"),
238 ]
239 )
241 self.req.fake_request_post_from_dict(multidict)
242 self.req.add_get_params(
243 {ViewParam.SCHEDULE_ID: str(schedule.id)},
244 set_method_get=False,
245 )
247 view = EditTaskScheduleView(self.req)
249 with self.assertRaises(HTTPFound) as e:
250 view.dispatch()
252 schedule = self.dbsession.query(TaskSchedule).one()
254 self.assertEqual(schedule.name, "MOJO")
256 self.assertEqual(e.exception.status_code, 302)
257 self.assertIn(
258 Routes.VIEW_TASK_SCHEDULES, e.exception.headers["Location"]
259 )
261 def test_group_a_schedule_cannot_be_edited_by_group_b_admin(self) -> None:
262 group_a = GroupFactory()
263 group_b = GroupFactory()
265 group_a_schedule = TaskScheduleFactory(group=group_a)
267 group_b_user = UserFactory()
268 UserGroupMembershipFactory(
269 group_id=group_b.id, user_id=group_b_user.id, groupadmin=True
270 )
271 self.req._debugging_user = group_b_user
273 multidict = MultiDict(
274 [
275 ("_charset_", UTF8),
276 ("__formid__", "deform"),
277 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
278 (ViewParam.NAME, "Something else"),
279 (ViewParam.GROUP_ID, group_b.id),
280 (FormAction.SUBMIT, "submit"),
281 ]
282 )
284 self.req.fake_request_post_from_dict(multidict)
285 self.req.add_get_params(
286 {ViewParam.SCHEDULE_ID: str(group_a_schedule.id)},
287 set_method_get=False,
288 )
290 view = EditTaskScheduleView(self.req)
292 with self.assertRaises(HTTPBadRequest) as cm:
293 view.dispatch()
295 self.assertIn("not a group administrator", cm.exception.message)
298class DeleteTaskScheduleViewTests(DemoRequestTestCase):
299 def test_schedule_item_is_deleted(self) -> None:
300 user = self.req._debugging_user = UserFactory()
301 group = GroupFactory()
302 UserGroupMembershipFactory(
303 group_id=group.id, user_id=user.id, groupadmin=True
304 )
305 schedule = TaskScheduleFactory(group=group)
306 self.assertIsNotNone(self.dbsession.query(TaskSchedule).one_or_none())
308 multidict = MultiDict(
309 [
310 ("_charset_", UTF8),
311 ("__formid__", "deform"),
312 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
313 ("confirm_1_t", "true"),
314 ("confirm_2_t", "true"),
315 ("confirm_4_t", "true"),
316 ("__start__", "danger:mapping"),
317 ("target", "7176"),
318 ("user_entry", "7176"),
319 ("__end__", "danger:mapping"),
320 ("delete", "delete"),
321 (FormAction.DELETE, "delete"),
322 ]
323 )
325 self.req.fake_request_post_from_dict(multidict)
327 self.req.add_get_params(
328 {ViewParam.SCHEDULE_ID: str(schedule.id)},
329 set_method_get=False,
330 )
331 view = DeleteTaskScheduleView(self.req)
333 with self.assertRaises(HTTPFound) as e:
334 view.dispatch()
336 self.assertEqual(e.exception.status_code, 302)
337 self.assertIn(
338 Routes.VIEW_TASK_SCHEDULES, e.exception.headers["Location"]
339 )
341 self.assertIsNone(self.dbsession.query(TaskSchedule).one_or_none())
344class AddTaskScheduleItemViewTests(BasicDatabaseTestCase):
345 def setUp(self) -> None:
346 super().setUp()
348 self.schedule = TaskScheduleFactory(group=self.group)
350 def test_schedule_item_form_displayed(self) -> None:
351 view = AddTaskScheduleItemView(self.req)
353 self.req.add_get_params({ViewParam.SCHEDULE_ID: str(self.schedule.id)})
355 response = view.dispatch()
356 self.assertEqual(response.status_code, 200)
357 self.assertEqual(response.body.decode(UTF8).count("<form"), 1)
359 def test_schedule_item_is_created(self) -> None:
360 multidict = MultiDict(
361 [
362 ("_charset_", UTF8),
363 ("__formid__", "deform"),
364 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
365 (ViewParam.SCHEDULE_ID, self.schedule.id),
366 (ViewParam.TABLE_NAME, "ace3"),
367 (ViewParam.CLINICIAN_CONFIRMATION, "true"),
368 ("__start__", "due_from:mapping"),
369 ("months", "1"),
370 ("weeks", "2"),
371 ("days", "3"),
372 ("__end__", "due_from:mapping"),
373 ("__start__", "due_within:mapping"),
374 ("months", "2"), # 60 days
375 ("weeks", "3"), # 21 days
376 ("days", "15"), # 15 days
377 ("__end__", "due_within:mapping"),
378 (FormAction.SUBMIT, "submit"),
379 ]
380 )
382 self.req.fake_request_post_from_dict(multidict)
384 view = AddTaskScheduleItemView(self.req)
386 with self.assertRaises(HTTPFound) as e:
387 view.dispatch()
389 item = self.dbsession.query(TaskScheduleItem).one()
391 self.assertEqual(item.schedule_id, self.schedule.id)
392 self.assertEqual(item.task_table_name, "ace3")
393 self.assertEqual(item.due_from.in_days(), 47)
394 self.assertEqual(item.due_by.in_days(), 143)
396 self.assertEqual(e.exception.status_code, 302)
397 self.assertIn(
398 f"{Routes.VIEW_TASK_SCHEDULE_ITEMS}"
399 f"?{ViewParam.SCHEDULE_ID}={self.schedule.id}",
400 e.exception.headers["Location"],
401 )
403 def test_schedule_item_is_not_created_on_cancel(self) -> None:
404 multidict = MultiDict(
405 [
406 ("_charset_", UTF8),
407 ("__formid__", "deform"),
408 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
409 (ViewParam.SCHEDULE_ID, self.schedule.id),
410 (ViewParam.TABLE_NAME, "ace3"),
411 ("__start__", "due_from:mapping"),
412 ("months", "1"),
413 ("weeks", "2"),
414 ("days", "3"),
415 ("__end__", "due_from:mapping"),
416 ("__start__", "due_within:mapping"),
417 ("months", "4"),
418 ("weeks", "3"),
419 ("days", "2"),
420 ("__end__", "due_within:mapping"),
421 (FormAction.CANCEL, "cancel"),
422 ]
423 )
425 self.req.fake_request_post_from_dict(multidict)
427 view = AddTaskScheduleItemView(self.req)
429 with self.assertRaises(HTTPFound):
430 view.dispatch()
432 item = self.dbsession.query(TaskScheduleItem).one_or_none()
434 self.assertIsNone(item)
436 def test_non_existent_schedule_handled(self) -> None:
437 self.req.add_get_params({ViewParam.SCHEDULE_ID: "99999"})
439 view = AddTaskScheduleItemView(self.req)
441 with self.assertRaises(HTTPBadRequest):
442 view.dispatch()
445class EditTaskScheduleItemViewTests(BasicDatabaseTestCase):
446 def setUp(self) -> None:
447 super().setUp()
448 self.schedule = TaskScheduleFactory(group=self.group)
450 def test_schedule_item_is_updated(self) -> None:
451 item = TaskScheduleItemFactory(
452 task_schedule=self.schedule,
453 task_table_name="ace3",
454 due_from=Duration(days=30),
455 due_by=Duration(days=60),
456 )
458 multidict = MultiDict(
459 [
460 ("_charset_", UTF8),
461 ("__formid__", "deform"),
462 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
463 (ViewParam.SCHEDULE_ID, self.schedule.id),
464 (ViewParam.TABLE_NAME, "bmi"),
465 ("__start__", "due_from:mapping"),
466 ("months", "0"),
467 ("weeks", "0"),
468 ("days", "30"),
469 ("__end__", "due_from:mapping"),
470 ("__start__", "due_within:mapping"),
471 ("months", "0"),
472 ("weeks", "0"),
473 ("days", "60"),
474 ("__end__", "due_within:mapping"),
475 (FormAction.SUBMIT, "submit"),
476 ]
477 )
479 self.req.fake_request_post_from_dict(multidict)
481 self.req.add_get_params(
482 {ViewParam.SCHEDULE_ITEM_ID: str(item.id)},
483 set_method_get=False,
484 )
485 view = EditTaskScheduleItemView(self.req)
487 with self.assertRaises(HTTPFound) as cm:
488 view.dispatch()
490 self.assertEqual(item.task_table_name, "bmi")
491 self.assertEqual(cm.exception.status_code, 302)
492 self.assertIn(
493 f"{Routes.VIEW_TASK_SCHEDULE_ITEMS}"
494 f"?{ViewParam.SCHEDULE_ID}={item.schedule_id}",
495 cm.exception.headers["Location"],
496 )
498 def test_schedule_item_is_not_updated_on_cancel(self) -> None:
499 item = TaskScheduleItemFactory(
500 task_schedule=self.schedule,
501 task_table_name="ace3",
502 due_from=Duration(days=30),
503 due_by=Duration(days=60),
504 )
506 multidict = MultiDict(
507 [
508 ("_charset_", UTF8),
509 ("__formid__", "deform"),
510 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
511 (ViewParam.SCHEDULE_ID, self.schedule.id),
512 (ViewParam.TABLE_NAME, "bmi"),
513 ("__start__", "due_from:mapping"),
514 ("months", "0"),
515 ("weeks", "0"),
516 ("days", "30"),
517 ("__end__", "due_from:mapping"),
518 ("__start__", "due_within:mapping"),
519 ("months", "0"),
520 ("weeks", "0"),
521 ("days", "60"),
522 ("__end__", "due_within:mapping"),
523 (FormAction.CANCEL, "cancel"),
524 ]
525 )
527 self.req.fake_request_post_from_dict(multidict)
529 self.req.add_get_params(
530 {ViewParam.SCHEDULE_ITEM_ID: str(item.id)},
531 set_method_get=False,
532 )
533 view = EditTaskScheduleItemView(self.req)
535 with self.assertRaises(HTTPFound):
536 view.dispatch()
538 self.assertEqual(item.task_table_name, "ace3")
540 def test_non_existent_item_handled(self) -> None:
541 self.req.add_get_params({ViewParam.SCHEDULE_ITEM_ID: "99999"})
543 view = EditTaskScheduleItemView(self.req)
545 with self.assertRaises(HTTPBadRequest):
546 view.dispatch()
548 def test_null_item_handled(self) -> None:
549 view = EditTaskScheduleItemView(self.req)
551 with self.assertRaises(HTTPBadRequest):
552 view.dispatch()
554 def test_get_form_values(self) -> None:
555 item = TaskScheduleItemFactory(
556 task_schedule=self.schedule,
557 task_table_name="ace3",
558 due_from=Duration(days=30),
559 due_by=Duration(days=60),
560 )
561 view = EditTaskScheduleItemView(self.req)
562 view.object = item
564 form_values = view.get_form_values()
566 self.assertEqual(form_values[ViewParam.SCHEDULE_ID], self.schedule.id)
567 self.assertEqual(
568 form_values[ViewParam.TABLE_NAME], item.task_table_name
569 )
570 self.assertEqual(form_values[ViewParam.DUE_FROM], item.due_from)
572 due_within = item.due_by - item.due_from
573 self.assertEqual(form_values[ViewParam.DUE_WITHIN], due_within)
575 def test_group_a_item_cannot_be_edited_by_group_b_admin(self) -> None:
576 group_a = GroupFactory()
577 group_b = GroupFactory()
579 group_b_admin = self.req._debugging_user = UserFactory()
580 UserGroupMembershipFactory(
581 group_id=group_b.id, user_id=group_b_admin.id, groupadmin=True
582 )
584 group_a_schedule = TaskScheduleFactory(group=group_a)
585 group_a_item = TaskScheduleItemFactory(task_schedule=group_a_schedule)
587 view = EditTaskScheduleItemView(self.req)
588 view.object = group_a_item
590 with self.assertRaises(HTTPBadRequest) as cm:
591 view.get_schedule()
593 self.assertIn("not a group administrator", cm.exception.message)
596class DeleteTaskScheduleItemViewTests(BasicDatabaseTestCase):
597 def setUp(self) -> None:
598 super().setUp()
600 self.schedule = TaskScheduleFactory(group=self.group)
602 self.schedule = TaskScheduleFactory()
603 self.item = TaskScheduleItemFactory(
604 task_schedule=self.schedule, task_table_name="ace3"
605 )
607 def test_delete_form_displayed(self) -> None:
608 view = DeleteTaskScheduleItemView(self.req)
610 self.req.add_get_params(
611 {ViewParam.SCHEDULE_ITEM_ID: str(self.item.id)}
612 )
614 response = view.dispatch()
615 self.assertEqual(response.status_code, 200)
616 self.assertEqual(response.body.decode(UTF8).count("<form"), 1)
618 def test_errors_displayed_when_deletion_validation_fails(self) -> None:
619 self.req.fake_request_post_from_dict({FormAction.DELETE: "delete"})
621 self.req.add_get_params(
622 {ViewParam.SCHEDULE_ITEM_ID: str(self.item.id)},
623 set_method_get=False,
624 )
625 view = DeleteTaskScheduleItemView(self.req)
627 response = view.dispatch()
628 self.assertIn(
629 "Errors have been highlighted", response.body.decode(UTF8)
630 )
632 def test_schedule_item_is_deleted(self) -> None:
633 multidict = MultiDict(
634 [
635 ("_charset_", UTF8),
636 ("__formid__", "deform"),
637 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
638 ("confirm_1_t", "true"),
639 ("confirm_2_t", "true"),
640 ("confirm_4_t", "true"),
641 ("__start__", "danger:mapping"),
642 ("target", "7176"),
643 ("user_entry", "7176"),
644 ("__end__", "danger:mapping"),
645 ("delete", "delete"),
646 (FormAction.DELETE, "delete"),
647 ]
648 )
650 self.req.fake_request_post_from_dict(multidict)
652 self.req.add_get_params(
653 {ViewParam.SCHEDULE_ITEM_ID: str(self.item.id)},
654 set_method_get=False,
655 )
656 view = DeleteTaskScheduleItemView(self.req)
658 with self.assertRaises(HTTPFound) as e:
659 view.dispatch()
661 self.assertEqual(e.exception.status_code, 302)
662 self.assertIn(
663 f"{Routes.VIEW_TASK_SCHEDULE_ITEMS}"
664 f"?{ViewParam.SCHEDULE_ID}={self.item.schedule_id}",
665 e.exception.headers["Location"],
666 )
668 item = self.dbsession.query(TaskScheduleItem).one_or_none()
670 self.assertIsNone(item)
672 def test_schedule_item_not_deleted_on_cancel(self) -> None:
673 self.req.fake_request_post_from_dict({FormAction.CANCEL: "cancel"})
675 self.req.add_get_params(
676 {ViewParam.SCHEDULE_ITEM_ID: str(self.item.id)},
677 set_method_get=False,
678 )
679 view = DeleteTaskScheduleItemView(self.req)
681 with self.assertRaises(HTTPFound):
682 view.dispatch()
684 item = self.dbsession.query(TaskScheduleItem).one_or_none()
686 self.assertIsNotNone(item)
689class EditFinalizedPatientViewTests(DemoRequestTestCase):
690 def setUp(self) -> None:
691 super().setUp()
693 self.group = AnyIdNumGroupFactory()
694 user = self.req._debugging_user = UserFactory()
696 UserGroupMembershipFactory(
697 group_id=self.group.id,
698 user_id=user.id,
699 groupadmin=True,
700 view_all_patients_when_unfiltered=True,
701 )
703 def test_raises_when_patient_does_not_exists(self) -> None:
704 with self.assertRaises(HTTPBadRequest) as cm:
705 edit_finalized_patient(self.req)
707 self.assertEqual(
708 str(cm.exception), "Cannot find Patient with _pk:None"
709 )
711 @unittest.skip("Can't save patient in database without group")
712 def test_raises_when_patient_not_in_a_group(self) -> None:
713 patient = PatientFactory(_group=None)
715 self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})
717 with self.assertRaises(HTTPBadRequest) as cm:
718 edit_finalized_patient(self.req)
720 self.assertEqual(str(cm.exception), "Bad patient: not in a group")
722 def test_raises_when_not_authorized(self) -> None:
723 patient = PatientFactory()
725 with mock.patch.object(
726 self.req._debugging_user,
727 "may_administer_group",
728 return_value=False,
729 ):
730 self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})
732 with self.assertRaises(HTTPBadRequest) as cm:
733 edit_finalized_patient(self.req)
735 self.assertEqual(
736 str(cm.exception), "Not authorized to edit this patient"
737 )
739 def test_raises_when_patient_not_finalized(self) -> None:
740 patient = PatientFactory(_era=ERA_NOW, _group=self.group)
742 self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})
744 with self.assertRaises(HTTPBadRequest) as cm:
745 edit_finalized_patient(self.req)
747 self.assertIn("Patient is not editable", str(cm.exception))
749 def test_patient_updated(self) -> None:
750 patient = PatientFactory(_group=self.group)
751 nhs_patient_idnum = NHSPatientIdNumFactory(patient=patient)
753 self.req.add_get_params(
754 {ViewParam.SERVER_PK: str(patient.pk)}, set_method_get=False
755 )
757 new_sex = Fake.en_gb.sex()
758 new_forename = Fake.en_gb.forename(new_sex)
759 new_surname = Fake.en_gb.last_name()
760 new_address = Fake.en_gb.address()
761 new_email = Fake.en_gb.email()
762 new_gp = Fake.en_gb.name()
763 new_other = Fake.en_us.paragraph()
764 new_dob = Fake.en_gb.consistent_date_of_birth()
765 new_nhs_number = Fake.en_gb.nhs_number()
767 multidict = MultiDict(
768 [
769 ("_charset_", UTF8),
770 ("__formid__", "deform"),
771 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
772 (ViewParam.SERVER_PK, str(patient.pk)),
773 (ViewParam.GROUP_ID, str(patient.group.id)),
774 (ViewParam.FORENAME, new_forename),
775 (ViewParam.SURNAME, new_surname),
776 ("__start__", "dob:mapping"),
777 ("date", new_dob),
778 ("__end__", "dob:mapping"),
779 ("__start__", "sex:rename"),
780 ("deformField7", new_sex),
781 ("__end__", "sex:rename"),
782 (ViewParam.ADDRESS, new_address),
783 (ViewParam.EMAIL, new_email),
784 (ViewParam.GP, new_gp),
785 (ViewParam.OTHER, new_other),
786 ("__start__", "id_references:sequence"),
787 ("__start__", "idnum_sequence:mapping"),
788 (ViewParam.WHICH_IDNUM, nhs_patient_idnum.which_idnum),
789 (ViewParam.IDNUM_VALUE, new_nhs_number),
790 ("__end__", "idnum_sequence:mapping"),
791 ("__end__", "id_references:sequence"),
792 ("__start__", "danger:mapping"),
793 ("target", "7836"),
794 ("user_entry", "7836"),
795 ("__end__", "danger:mapping"),
796 (FormAction.SUBMIT, "submit"),
797 ]
798 )
800 self.req.fake_request_post_from_dict(multidict)
802 with self.assertRaises(HTTPFound):
803 edit_finalized_patient(self.req)
805 self.dbsession.commit()
807 self.assertEqual(patient.forename, new_forename)
808 self.assertEqual(patient.surname, new_surname)
809 self.assertEqual(patient.dob, new_dob)
810 self.assertEqual(patient.sex, new_sex)
811 self.assertEqual(patient.address, new_address)
812 self.assertEqual(patient.email, new_email)
813 self.assertEqual(patient.gp, new_gp)
814 self.assertEqual(patient.other, new_other)
816 idnum = patient.get_idnum_objects()[0]
817 self.assertEqual(idnum.patient_id, patient.id)
818 self.assertEqual(idnum.which_idnum, nhs_patient_idnum.which_idnum)
819 self.assertEqual(idnum.idnum_value, new_nhs_number)
821 self.assertEqual(len(patient.special_notes), 1)
822 note = patient.special_notes[0].note
824 self.assertIn("Patient details edited", note)
825 self.assertIn("forename", note)
826 self.assertIn(new_forename, note)
828 self.assertIn("surname", note)
829 self.assertIn(new_surname, note)
831 self.assertIn(f"idnum{nhs_patient_idnum.which_idnum}", note)
832 self.assertIn(str(new_nhs_number), note)
834 messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
836 self.assertIn(
837 f"Amended patient record with server PK {patient.pk}", messages[0]
838 )
839 self.assertIn("forename", messages[0])
840 self.assertIn(new_forename, messages[0])
842 self.assertIn("surname", messages[0])
843 self.assertIn(new_surname, messages[0])
845 self.assertIn("idnum1", messages[0])
846 self.assertIn(str(new_nhs_number), messages[0])
848 def test_message_when_no_changes(self) -> None:
849 patient = PatientFactory(_group=self.group)
851 patient_idnum = NHSPatientIdNumFactory(
852 patient=patient,
853 )
854 self.req.add_get_params(
855 {ViewParam.SERVER_PK: str(patient.pk)}, set_method_get=False
856 )
858 multidict = MultiDict(
859 [
860 ("_charset_", UTF8),
861 ("__formid__", "deform"),
862 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
863 (ViewParam.SERVER_PK, str(patient.pk)),
864 (ViewParam.GROUP_ID, patient.group.id),
865 (ViewParam.FORENAME, patient.forename),
866 (ViewParam.SURNAME, patient.surname),
867 ("__start__", "dob:mapping"),
868 ("date", patient.dob.isoformat()),
869 ("__end__", "dob:mapping"),
870 ("__start__", "sex:rename"),
871 ("deformField7", patient.sex),
872 ("__end__", "sex:rename"),
873 (ViewParam.ADDRESS, patient.address),
874 (ViewParam.EMAIL, patient.email),
875 (ViewParam.GP, patient.gp),
876 (ViewParam.OTHER, patient.other),
877 ("__start__", "id_references:sequence"),
878 ("__start__", "idnum_sequence:mapping"),
879 (ViewParam.WHICH_IDNUM, patient_idnum.which_idnum),
880 (ViewParam.IDNUM_VALUE, patient_idnum.idnum_value),
881 ("__end__", "idnum_sequence:mapping"),
882 ("__end__", "id_references:sequence"),
883 ("__start__", "danger:mapping"),
884 ("target", "7836"),
885 ("user_entry", "7836"),
886 ("__end__", "danger:mapping"),
887 (FormAction.SUBMIT, "submit"),
888 ]
889 )
891 self.req.fake_request_post_from_dict(multidict)
893 with self.assertRaises(HTTPFound):
894 edit_finalized_patient(self.req)
896 messages = self.req.session.peek_flash(FlashQueue.INFO)
898 self.assertIn("No changes required", messages[0])
900 def test_template_rendered_with_values(self) -> None:
901 patient = PatientFactory(_group=self.group)
902 NHSPatientIdNumFactory(patient=patient)
904 task1 = BmiFactory(patient=patient, _current=False)
905 task2 = BmiFactory(patient=patient, _current=False)
907 self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})
909 view = EditFinalizedPatientView(
910 self.req, task_tablename=task1.tablename, task_server_pk=task1.pk
911 )
912 with mock.patch.object(view, "render_to_response") as mock_render:
913 view.dispatch()
915 args, kwargs = mock_render.call_args
917 context = args[0]
919 self.assertIn("form", context)
920 self.assertIn(task1, context["tasks"])
921 self.assertIn(task2, context["tasks"])
923 def test_changes_to_simple_params(self) -> None:
924 view = EditFinalizedPatientView(self.req)
925 patient = PatientFactory()
926 old_forename = patient.forename
927 old_surname = patient.surname
928 old_address = patient.address
929 new_forename = Fake.en_gb.forename(patient.sex)
930 new_surname = Fake.en_gb.last_name()
931 new_address = Fake.en_gb.address()
933 view.object = patient
935 changes = OrderedDict() # type: OrderedDict
937 appstruct = {
938 ViewParam.FORENAME: new_forename,
939 ViewParam.SURNAME: new_surname,
940 ViewParam.DOB: patient.dob,
941 ViewParam.ADDRESS: new_address,
942 ViewParam.OTHER: patient.other,
943 }
945 view._save_simple_params(appstruct, changes)
947 self.assertEqual(
948 changes[ViewParam.FORENAME], (old_forename, new_forename)
949 )
950 self.assertEqual(
951 changes[ViewParam.SURNAME], (old_surname, new_surname)
952 )
953 self.assertNotIn(ViewParam.DOB, changes)
954 self.assertEqual(
955 changes[ViewParam.ADDRESS], (old_address, new_address)
956 )
957 self.assertNotIn(ViewParam.OTHER, changes)
959 def test_changes_to_idrefs(self) -> None:
960 view = EditFinalizedPatientView(self.req)
961 patient = PatientFactory()
962 nhs_patient_idnum = NHSPatientIdNumFactory(patient=patient)
963 study_patient_idnum = StudyPatientIdNumFactory(patient=patient)
964 rio_iddef = RioIdNumDefinitionFactory()
965 new_nhs_number = Fake.en_gb.nhs_number()
966 new_rio_number = 9999 # Below the range the factory would use
968 view.object = patient
970 changes = OrderedDict() # type: OrderedDict
972 appstruct = {
973 ViewParam.ID_REFERENCES: [
974 {
975 ViewParam.WHICH_IDNUM: nhs_patient_idnum.which_idnum,
976 ViewParam.IDNUM_VALUE: new_nhs_number,
977 },
978 {
979 ViewParam.WHICH_IDNUM: rio_iddef.which_idnum,
980 ViewParam.IDNUM_VALUE: new_rio_number,
981 },
982 ]
983 }
985 view._save_idrefs(appstruct, changes)
987 nhs_key = f"idnum{nhs_patient_idnum.which_idnum} (NHS number)"
988 self.assertIn(nhs_key, changes)
990 study_key = f"idnum{study_patient_idnum.which_idnum} (Study number)"
991 self.assertIn(study_key, changes)
993 rio_key = f"idnum{rio_iddef.which_idnum} (RiO number)"
994 self.assertIn(rio_key, changes)
996 self.assertEqual(
997 changes[nhs_key],
998 (nhs_patient_idnum.idnum_value, new_nhs_number),
999 )
1001 self.assertEqual(
1002 changes[study_key], (study_patient_idnum.idnum_value, None)
1003 )
1004 self.assertEqual(changes[rio_key], (None, new_rio_number))
1007class EditServerCreatedPatientViewTests(BasicDatabaseTestCase):
1008 def test_group_updated(self) -> None:
1009 patient = ServerCreatedPatientFactory(_group=self.group)
1010 old_group = patient.group
1011 new_group = GroupFactory()
1013 view = EditServerCreatedPatientView(self.req)
1014 view.object = patient
1016 appstruct = {ViewParam.GROUP_ID: new_group.id}
1018 view.save_object(appstruct)
1020 self.assertEqual(patient.group_id, new_group.id)
1022 messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
1024 self.assertIn(old_group.name, messages[0])
1025 self.assertIn(new_group.name, messages[0])
1026 self.assertIn("group:", messages[0])
1028 def test_raises_when_not_created_on_the_server(self) -> None:
1029 patient = PatientFactory()
1031 view = EditServerCreatedPatientView(self.req)
1033 self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})
1035 with self.assertRaises(HTTPBadRequest) as cm:
1036 view.get_object()
1038 self.assertIn("Patient is not editable", str(cm.exception))
1040 def test_patient_task_schedules_updated(self) -> None:
1041 patient = ServerCreatedPatientFactory()
1042 nhs_patient_idnum = NHSPatientIdNumFactory(patient=patient)
1043 group = patient._group
1045 schedule1 = TaskScheduleFactory(group=group)
1046 schedule2 = TaskScheduleFactory(group=group)
1047 schedule3 = TaskScheduleFactory(group=group)
1049 PatientTaskScheduleFactory(
1050 patient=patient,
1051 task_schedule=schedule1,
1052 start_datetime=local(2020, 6, 12, 9),
1053 settings={
1054 "name 1": "value 1",
1055 "name 2": "value 2",
1056 "name 3": "value 3",
1057 },
1058 )
1060 PatientTaskScheduleFactory(
1061 patient=patient,
1062 task_schedule=schedule3,
1063 )
1064 self.req.add_get_params(
1065 {ViewParam.SERVER_PK: str(patient.pk)}, set_method_get=False
1066 )
1068 changed_schedule_1_settings = {
1069 "name 1": "new value 1",
1070 "name 2": "new value 2",
1071 "name 3": "new value 3",
1072 }
1073 changed_schedule_1_datetime = local(2020, 6, 19, 8, 0, 0)
1074 new_schedule_2_settings = {
1075 "name 4": "value 4",
1076 "name 5": "value 5",
1077 "name 6": "value 6",
1078 }
1079 new_schedule_2_datetime = local(2020, 7, 1, 13, 45, 0)
1080 new_nhs_number = Fake.en_gb.nhs_number()
1081 multidict = MultiDict(
1082 [
1083 ("_charset_", UTF8),
1084 ("__formid__", "deform"),
1085 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
1086 (ViewParam.SERVER_PK, patient.pk),
1087 (ViewParam.GROUP_ID, patient.group.id),
1088 (ViewParam.FORENAME, patient.forename),
1089 (ViewParam.SURNAME, patient.surname),
1090 ("__start__", "dob:mapping"),
1091 ("date", ""),
1092 ("__end__", "dob:mapping"),
1093 ("__start__", "sex:rename"),
1094 ("deformField7", patient.sex),
1095 ("__end__", "sex:rename"),
1096 (ViewParam.ADDRESS, patient.address),
1097 (ViewParam.GP, patient.gp),
1098 (ViewParam.OTHER, patient.other),
1099 ("__start__", "id_references:sequence"),
1100 ("__start__", "idnum_sequence:mapping"),
1101 (ViewParam.WHICH_IDNUM, nhs_patient_idnum.which_idnum),
1102 (ViewParam.IDNUM_VALUE, str(new_nhs_number)),
1103 ("__end__", "idnum_sequence:mapping"),
1104 ("__end__", "id_references:sequence"),
1105 ("__start__", "danger:mapping"),
1106 ("target", "7836"),
1107 ("user_entry", "7836"),
1108 ("__end__", "danger:mapping"),
1109 ("__start__", "task_schedules:sequence"),
1110 ("__start__", "task_schedule_sequence:mapping"),
1111 ("schedule_id", schedule1.id),
1112 ("__start__", "start_datetime:mapping"),
1113 ("date", changed_schedule_1_datetime.to_date_string()),
1114 ("time", changed_schedule_1_datetime.to_time_string()),
1115 ("__end__", "start_datetime:mapping"),
1116 ("settings", json.dumps(changed_schedule_1_settings)),
1117 ("__end__", "task_schedule_sequence:mapping"),
1118 ("__start__", "task_schedule_sequence:mapping"),
1119 ("schedule_id", schedule2.id),
1120 ("__start__", "start_datetime:mapping"),
1121 ("date", new_schedule_2_datetime.to_date_string()),
1122 ("time", new_schedule_2_datetime.to_time_string()),
1123 ("__end__", "start_datetime:mapping"),
1124 ("settings", json.dumps(new_schedule_2_settings)),
1125 ("__end__", "task_schedule_sequence:mapping"),
1126 ("__end__", "task_schedules:sequence"),
1127 (FormAction.SUBMIT, "submit"),
1128 ]
1129 )
1131 self.req.fake_request_post_from_dict(multidict)
1133 with self.assertRaises(HTTPFound):
1134 edit_server_created_patient(self.req)
1136 self.dbsession.commit()
1138 schedules = {
1139 pts.task_schedule.name: pts for pts in patient.task_schedules
1140 }
1141 self.assertIn(schedule1.name, schedules)
1142 self.assertIn(schedule2.name, schedules)
1143 self.assertNotIn(schedule3.name, schedules)
1145 self.assertEqual(
1146 schedules[schedule1.name].start_datetime,
1147 changed_schedule_1_datetime,
1148 )
1149 self.assertEqual(
1150 schedules[schedule1.name].settings, changed_schedule_1_settings
1151 )
1152 self.assertEqual(
1153 schedules[schedule2.name].start_datetime,
1154 new_schedule_2_datetime,
1155 )
1156 self.assertEqual(
1157 schedules[schedule2.name].settings, new_schedule_2_settings
1158 )
1160 messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
1162 self.assertIn(
1163 f"Amended patient record with server PK {patient.pk}", messages[0]
1164 )
1165 self.assertIn("Task schedules", messages[0])
1167 def test_unprivileged_user_cannot_edit_patient(self) -> None:
1168 patient = ServerCreatedPatientFactory()
1170 self.req._debugging_user = UserFactory()
1172 view = EditServerCreatedPatientView(self.req)
1173 view.object = patient
1175 self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})
1177 with self.assertRaises(HTTPBadRequest) as cm:
1178 view.dispatch()
1180 self.assertEqual(
1181 cm.exception.message, "Not authorized to edit this patient"
1182 )
1184 def test_patient_can_be_assigned_the_same_schedule_twice(self) -> None:
1185 patient = ServerCreatedPatientFactory()
1187 schedule1 = TaskScheduleFactory(group=self.group)
1189 pts = PatientTaskScheduleFactory(
1190 patient=patient,
1191 task_schedule=schedule1,
1192 start_datetime=local(2020, 6, 12, 12, 34),
1193 )
1195 appstruct = {
1196 ViewParam.TASK_SCHEDULES: [
1197 {
1198 ViewParam.PATIENT_TASK_SCHEDULE_ID: pts.id,
1199 ViewParam.SCHEDULE_ID: schedule1.id,
1200 ViewParam.START_DATETIME: local(2020, 6, 12, 12, 34),
1201 ViewParam.SETTINGS: {},
1202 },
1203 {
1204 ViewParam.PATIENT_TASK_SCHEDULE_ID: None,
1205 ViewParam.SCHEDULE_ID: schedule1.id,
1206 ViewParam.START_DATETIME: None,
1207 ViewParam.SETTINGS: {},
1208 },
1209 ]
1210 }
1212 view = EditServerCreatedPatientView(self.req)
1213 view.object = patient
1215 changes: OrderedDict = OrderedDict()
1216 view._save_task_schedules(appstruct, changes)
1217 self.req.dbsession.commit()
1219 self.assertEqual(patient.task_schedules[0].task_schedule, schedule1)
1220 self.assertEqual(patient.task_schedules[1].task_schedule, schedule1)
1222 def test_form_values_for_existing_patient(self) -> None:
1223 patient = PatientFactory()
1225 schedule1 = TaskScheduleFactory(
1226 group=self.group,
1227 )
1229 patient_task_schedule = PatientTaskScheduleFactory(
1230 patient=patient,
1231 task_schedule=schedule1,
1232 start_datetime=local(2020, 6, 12),
1233 settings={
1234 "name 1": "value 1",
1235 "name 2": "value 2",
1236 "name 3": "value 3",
1237 },
1238 )
1240 patient_idnum = NHSPatientIdNumFactory(patient=patient)
1241 self.req.add_get_params({ViewParam.SERVER_PK: str(patient.pk)})
1243 view = EditServerCreatedPatientView(self.req)
1244 view.object = patient
1246 form_values = view.get_form_values()
1248 self.assertEqual(form_values[ViewParam.FORENAME], patient.forename)
1249 self.assertEqual(form_values[ViewParam.SURNAME], patient.surname)
1250 self.assertEqual(form_values[ViewParam.DOB], patient.dob)
1251 self.assertEqual(form_values[ViewParam.SEX], patient.sex)
1252 self.assertEqual(form_values[ViewParam.ADDRESS], patient.address)
1253 self.assertEqual(form_values[ViewParam.EMAIL], patient.email)
1254 self.assertEqual(form_values[ViewParam.GP], patient.gp)
1255 self.assertEqual(form_values[ViewParam.OTHER], patient.other)
1257 self.assertEqual(form_values[ViewParam.SERVER_PK], patient.pk)
1258 self.assertEqual(form_values[ViewParam.GROUP_ID], patient.group.id)
1260 idnum = form_values[ViewParam.ID_REFERENCES][0]
1261 self.assertEqual(
1262 idnum[ViewParam.WHICH_IDNUM],
1263 patient_idnum.which_idnum,
1264 )
1265 self.assertEqual(
1266 idnum[ViewParam.IDNUM_VALUE], patient_idnum.idnum_value
1267 )
1269 task_schedule = form_values[ViewParam.TASK_SCHEDULES][0]
1270 self.assertEqual(
1271 task_schedule[ViewParam.PATIENT_TASK_SCHEDULE_ID],
1272 patient_task_schedule.id,
1273 )
1274 self.assertEqual(
1275 task_schedule[ViewParam.SCHEDULE_ID],
1276 patient_task_schedule.schedule_id,
1277 )
1278 self.assertEqual(
1279 task_schedule[ViewParam.START_DATETIME],
1280 patient_task_schedule.start_datetime,
1281 )
1282 self.assertEqual(
1283 task_schedule[ViewParam.SETTINGS], patient_task_schedule.settings
1284 )
1287class AddPatientViewTests(BasicDatabaseTestCase):
1288 def test_patient_created(self) -> None:
1289 view = AddPatientView(self.req)
1291 schedule1 = TaskScheduleFactory()
1292 schedule2 = TaskScheduleFactory()
1294 start_datetime1 = local(2020, 6, 12)
1295 start_datetime2 = local(2020, 7, 1)
1297 settings1 = json.dumps(
1298 {"name 1": "value 1", "name 2": "value 2", "name 3": "value 3"}
1299 )
1301 nhs_iddef = NHSIdNumDefinitionFactory()
1302 nhs_number = Fake.en_gb.nhs_number()
1304 appstruct = {
1305 ViewParam.GROUP_ID: self.group.id,
1306 ViewParam.FORENAME: "Jo",
1307 ViewParam.SURNAME: "Patient",
1308 ViewParam.DOB: datetime.date(1958, 4, 19),
1309 ViewParam.SEX: "F",
1310 ViewParam.ADDRESS: "Address",
1311 ViewParam.EMAIL: "jopatient@example.com",
1312 ViewParam.GP: "GP",
1313 ViewParam.OTHER: "Other",
1314 ViewParam.ID_REFERENCES: [
1315 {
1316 ViewParam.WHICH_IDNUM: nhs_iddef.which_idnum,
1317 ViewParam.IDNUM_VALUE: nhs_number,
1318 }
1319 ],
1320 ViewParam.TASK_SCHEDULES: [
1321 {
1322 ViewParam.SCHEDULE_ID: schedule1.id,
1323 ViewParam.START_DATETIME: start_datetime1,
1324 ViewParam.SETTINGS: settings1,
1325 },
1326 {
1327 ViewParam.SCHEDULE_ID: schedule2.id,
1328 ViewParam.START_DATETIME: start_datetime2,
1329 ViewParam.SETTINGS: {},
1330 },
1331 ],
1332 }
1334 view.save_object(appstruct)
1335 self.dbsession.commit()
1337 patient = cast(Patient, view.object)
1339 server_device = Device.get_server_device(self.req.dbsession)
1341 self.assertEqual(patient.device_id, server_device.id)
1342 self.assertEqual(patient.era, ERA_NOW)
1343 self.assertEqual(patient.group.id, self.group.id)
1345 self.assertEqual(patient.forename, "Jo")
1346 self.assertEqual(patient.surname, "Patient")
1347 self.assertEqual(patient.dob.isoformat(), "1958-04-19")
1348 self.assertEqual(patient.sex, "F")
1349 self.assertEqual(patient.address, "Address")
1350 self.assertEqual(patient.email, "jopatient@example.com")
1351 self.assertEqual(patient.gp, "GP")
1352 self.assertEqual(patient.other, "Other")
1354 idnum = patient.get_idnum_objects()[0]
1355 self.assertEqual(idnum.patient_id, patient.id)
1356 self.assertEqual(idnum.which_idnum, nhs_iddef.which_idnum)
1357 self.assertEqual(idnum.idnum_value, nhs_number)
1359 patient_task_schedules = {
1360 pts.task_schedule.name: pts for pts in patient.task_schedules
1361 }
1363 self.assertIn(schedule1.name, patient_task_schedules)
1364 self.assertIn(schedule2.name, patient_task_schedules)
1366 self.assertEqual(
1367 patient_task_schedules[schedule1.name].start_datetime,
1368 start_datetime1,
1369 )
1370 self.assertEqual(
1371 patient_task_schedules[schedule1.name].settings, settings1
1372 )
1373 self.assertEqual(
1374 patient_task_schedules[schedule2.name].start_datetime,
1375 start_datetime2,
1376 )
1378 def test_patient_takes_next_available_id(self) -> None:
1379 patient = ServerCreatedPatientFactory(id=1234)
1380 nhs_iddef = NHSIdNumDefinitionFactory()
1382 view = AddPatientView(self.req)
1384 appstruct = {
1385 ViewParam.GROUP_ID: self.group.id,
1386 ViewParam.FORENAME: "Jo",
1387 ViewParam.SURNAME: "Patient",
1388 ViewParam.DOB: datetime.date(1958, 4, 19),
1389 ViewParam.SEX: "F",
1390 ViewParam.ADDRESS: "Address",
1391 ViewParam.GP: "GP",
1392 ViewParam.OTHER: "Other",
1393 ViewParam.ID_REFERENCES: [
1394 {
1395 ViewParam.WHICH_IDNUM: nhs_iddef.which_idnum,
1396 ViewParam.IDNUM_VALUE: Fake.en_gb.nhs_number(),
1397 }
1398 ],
1399 ViewParam.TASK_SCHEDULES: [],
1400 }
1402 view.save_object(appstruct)
1404 patient = cast(Patient, view.object)
1406 self.assertEqual(patient.id, 1235)
1408 def test_form_rendered_with_values(self) -> None:
1409 view = AddPatientView(self.req)
1411 with mock.patch.object(view, "render_to_response") as mock_render:
1412 view.dispatch()
1414 args, kwargs = mock_render.call_args
1416 context = args[0]
1418 self.assertIn("form", context)
1420 def test_unprivileged_user_cannot_add_patient(self) -> None:
1421 user = UserFactory(username="testuser")
1423 self.req._debugging_user = user
1425 with self.assertRaises(HTTPBadRequest) as cm:
1426 add_patient(self.req)
1428 self.assertEqual(
1429 cm.exception.message, "Not authorized to manage patients"
1430 )
1432 def test_group_listed_for_privileged_group_member(self) -> None:
1433 user = UserFactory()
1434 group = GroupFactory()
1435 UserGroupMembershipFactory(
1436 user_id=user.id, group_id=group.id, may_manage_patients=True
1437 )
1439 self.req._debugging_user = user
1441 view = AddPatientView(self.req)
1443 with mock.patch.object(view, "render_to_response") as mock_render:
1444 view.dispatch()
1446 args, kwargs = mock_render.call_args
1448 context = args[0]
1450 self.assertIn(group.name, context["form"])
1453class DeleteServerCreatedPatientViewTests(BasicDatabaseTestCase):
1454 def setUp(self) -> None:
1455 super().setUp()
1457 self.patient = ServerCreatedPatientFactory()
1459 idnum = ServerCreatedNHSPatientIdNumFactory(patient=self.patient)
1460 PatientIdNumIndexEntry.index_idnum(idnum, self.dbsession)
1462 self.schedule = TaskScheduleFactory(group=self.group)
1464 PatientTaskScheduleFactory(
1465 patient=self.patient,
1466 task_schedule=self.schedule,
1467 )
1469 self.multidict = MultiDict(
1470 [
1471 ("_charset_", UTF8),
1472 ("__formid__", "deform"),
1473 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
1474 ("confirm_1_t", "true"),
1475 ("confirm_2_t", "true"),
1476 ("confirm_4_t", "true"),
1477 ("__start__", "danger:mapping"),
1478 ("target", "7176"),
1479 ("user_entry", "7176"),
1480 ("__end__", "danger:mapping"),
1481 ("delete", "delete"),
1482 (FormAction.DELETE, "delete"),
1483 ]
1484 )
1486 def test_patient_schedule_and_idnums_deleted(self) -> None:
1487 self.req.fake_request_post_from_dict(self.multidict)
1489 patient_pk = self.patient.pk
1490 self.req.add_get_params(
1491 {ViewParam.SERVER_PK: str(patient_pk)}, set_method_get=False
1492 )
1493 view = DeleteServerCreatedPatientView(self.req)
1495 with self.assertRaises(HTTPFound) as e:
1496 view.dispatch()
1498 self.assertEqual(e.exception.status_code, 302)
1499 self.assertIn(
1500 Routes.VIEW_PATIENT_TASK_SCHEDULES, e.exception.headers["Location"]
1501 )
1503 deleted_patient = (
1504 self.dbsession.query(Patient)
1505 .filter(Patient._pk == patient_pk)
1506 .one_or_none()
1507 )
1509 self.assertIsNone(deleted_patient)
1511 pts = (
1512 self.dbsession.query(PatientTaskSchedule)
1513 .filter(PatientTaskSchedule.patient_pk == patient_pk)
1514 .one_or_none()
1515 )
1517 self.assertIsNone(pts)
1519 idnum = (
1520 self.dbsession.query(PatientIdNum)
1521 .filter(
1522 PatientIdNum.patient_id == self.patient.id,
1523 PatientIdNum._device_id == self.patient.device_id,
1524 PatientIdNum._era == self.patient.era,
1525 PatientIdNum._current == True, # noqa: E712
1526 )
1527 .one_or_none()
1528 )
1530 self.assertIsNone(idnum)
1532 def test_registered_patient_deleted(self) -> None:
1533 from camcops_server.cc_modules.client_api import (
1534 get_or_create_single_user,
1535 )
1537 user1, _ = get_or_create_single_user(self.req, "test", self.patient)
1538 self.assertEqual(user1.single_patient, self.patient)
1540 user2, _ = get_or_create_single_user(self.req, "test", self.patient)
1541 self.assertEqual(user2.single_patient, self.patient)
1543 self.req.fake_request_post_from_dict(self.multidict)
1545 patient_pk = self.patient.pk
1546 self.req.add_get_params(
1547 {ViewParam.SERVER_PK: str(patient_pk)}, set_method_get=False
1548 )
1549 view = DeleteServerCreatedPatientView(self.req)
1551 with self.assertRaises(HTTPFound):
1552 view.dispatch()
1554 self.dbsession.commit()
1556 deleted_patient = (
1557 self.dbsession.query(Patient)
1558 .filter(Patient._pk == patient_pk)
1559 .one_or_none()
1560 )
1562 self.assertIsNone(deleted_patient)
1564 # TODO: We get weird behaviour when all the tests are run together
1565 # (fine for --test_class=DeleteServerCreatedPatientViewTests)
1566 # the assertion below fails with sqlite in spite of the commit()
1567 # above.
1569 # user = self.dbsession.query(User).filter(
1570 # User.id == user1.id).one_or_none()
1571 # self.assertIsNone(user.single_patient_pk)
1573 # user = self.dbsession.query(User).filter(
1574 # User.id == user2.id).one_or_none()
1575 # self.assertIsNone(user.single_patient_pk)
1577 def test_unrelated_patient_unaffected(self) -> None:
1578 other_patient = ServerCreatedPatientFactory()
1579 patient_pk = other_patient._pk
1581 saved_patient = (
1582 self.dbsession.query(Patient)
1583 .filter(Patient._pk == patient_pk)
1584 .one_or_none()
1585 )
1587 self.assertIsNotNone(saved_patient)
1589 idnum = ServerCreatedNHSPatientIdNumFactory(patient=other_patient)
1591 PatientIdNumIndexEntry.index_idnum(idnum, self.dbsession)
1593 saved_idnum = (
1594 self.dbsession.query(PatientIdNum)
1595 .filter(
1596 PatientIdNum.patient_id == other_patient.id,
1597 PatientIdNum._device_id == other_patient.device_id,
1598 PatientIdNum._era == other_patient.era,
1599 PatientIdNum._current == True, # noqa: E712
1600 )
1601 .one_or_none()
1602 )
1604 self.assertIsNotNone(saved_idnum)
1606 PatientTaskScheduleFactory(
1607 patient=other_patient, task_schedule=self.schedule
1608 )
1610 self.req.fake_request_post_from_dict(self.multidict)
1612 self.req.add_get_params(
1613 {ViewParam.SERVER_PK: self.patient._pk}, set_method_get=False
1614 )
1615 view = DeleteServerCreatedPatientView(self.req)
1617 with self.assertRaises(HTTPFound):
1618 view.dispatch()
1620 saved_patient = (
1621 self.dbsession.query(Patient)
1622 .filter(Patient._pk == patient_pk)
1623 .one_or_none()
1624 )
1626 self.assertIsNotNone(saved_patient)
1628 saved_pts = (
1629 self.dbsession.query(PatientTaskSchedule)
1630 .filter(PatientTaskSchedule.patient_pk == patient_pk)
1631 .one_or_none()
1632 )
1634 self.assertIsNotNone(saved_pts)
1636 saved_idnum = (
1637 self.dbsession.query(PatientIdNum)
1638 .filter(
1639 PatientIdNum.patient_id == other_patient.id,
1640 PatientIdNum._device_id == other_patient.device_id,
1641 PatientIdNum._era == other_patient.era,
1642 PatientIdNum._current == True, # noqa: E712
1643 )
1644 .one_or_none()
1645 )
1647 self.assertIsNotNone(saved_idnum)
1649 def test_unprivileged_user_cannot_delete_patient(self) -> None:
1650 self.req.fake_request_post_from_dict(self.multidict)
1652 patient_pk = self.patient.pk
1653 self.req.add_get_params(
1654 {ViewParam.SERVER_PK: str(patient_pk)}, set_method_get=False
1655 )
1656 view = DeleteServerCreatedPatientView(self.req)
1658 user = UserFactory(username="testuser")
1660 self.req._debugging_user = user
1662 with self.assertRaises(HTTPBadRequest) as cm:
1663 view.dispatch()
1665 self.assertEqual(
1666 cm.exception.message, "Not authorized to delete this patient"
1667 )
1669 def test_unprivileged_user_cannot_see_delete_form(self) -> None:
1670 self.req.fake_request_post_from_dict(self.multidict)
1672 patient_pk = self.patient.pk
1673 self.req.add_get_params({ViewParam.SERVER_PK: str(patient_pk)})
1674 view = DeleteServerCreatedPatientView(self.req)
1676 user = UserFactory()
1678 self.req._debugging_user = user
1680 with self.assertRaises(HTTPBadRequest) as cm:
1681 view.dispatch()
1683 self.assertEqual(
1684 cm.exception.message, "Not authorized to delete this patient"
1685 )
1688class EraseTaskTestCase(BasicDatabaseTestCase):
1689 def setUp(self) -> None:
1690 super().setUp()
1692 self.patient = PatientFactory(_group=self.group)
1695class EraseTaskLeavingPlaceholderViewTests(EraseTaskTestCase):
1696 def test_displays_form(self) -> None:
1697 task = BmiFactory(patient=self.patient)
1698 self.req.add_get_params(
1699 {
1700 ViewParam.SERVER_PK: str(task.pk),
1701 ViewParam.TABLE_NAME: task.tablename,
1702 },
1703 set_method_get=False,
1704 )
1705 view = EraseTaskLeavingPlaceholderView(self.req)
1707 with mock.patch.object(view, "render_to_response") as mock_render:
1708 view.dispatch()
1710 args, kwargs = mock_render.call_args
1711 context = args[0]
1713 self.assertIn("form", context)
1715 def test_deletes_task_leaving_placeholder(self) -> None:
1716 task = BmiFactory(patient=self.patient)
1717 multidict = MultiDict(
1718 [
1719 ("_charset_", UTF8),
1720 ("__formid__", "deform"),
1721 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
1722 (ViewParam.SERVER_PK, task.pk),
1723 (ViewParam.TABLE_NAME, task.tablename),
1724 ("confirm_1_t", "true"),
1725 ("confirm_2_t", "true"),
1726 ("confirm_4_t", "true"),
1727 ("__start__", "danger:mapping"),
1728 ("target", "7176"),
1729 ("user_entry", "7176"),
1730 ("__end__", "danger:mapping"),
1731 ("delete", "delete"),
1732 (FormAction.DELETE, "delete"),
1733 ]
1734 )
1736 self.req.fake_request_post_from_dict(multidict)
1738 view = EraseTaskLeavingPlaceholderView(self.req)
1739 with mock.patch.object(task, "manually_erase") as mock_manually_erase:
1741 with self.assertRaises(HTTPFound):
1742 view.dispatch()
1744 mock_manually_erase.assert_called_once()
1745 args, kwargs = mock_manually_erase.call_args
1746 request = args[0]
1748 self.assertEqual(request, self.req)
1750 def test_task_not_deleted_on_cancel(self) -> None:
1751 task = BmiFactory(patient=self.patient)
1752 self.req.fake_request_post_from_dict({FormAction.CANCEL: "cancel"})
1754 self.req.add_get_params(
1755 {
1756 ViewParam.SERVER_PK: str(task.pk),
1757 ViewParam.TABLE_NAME: task.tablename,
1758 },
1759 set_method_get=False,
1760 )
1761 view = EraseTaskLeavingPlaceholderView(self.req)
1763 with self.assertRaises(HTTPFound):
1764 view.dispatch()
1766 task = self.dbsession.query(task.__class__).one_or_none()
1768 self.assertIsNotNone(task)
1770 def test_redirect_on_cancel(self) -> None:
1771 task = BmiFactory(patient=self.patient)
1772 self.req.fake_request_post_from_dict({FormAction.CANCEL: "cancel"})
1774 self.req.add_get_params(
1775 {
1776 ViewParam.SERVER_PK: str(task.pk),
1777 ViewParam.TABLE_NAME: task.tablename,
1778 },
1779 set_method_get=False,
1780 )
1781 view = EraseTaskLeavingPlaceholderView(self.req)
1783 with self.assertRaises(HTTPFound) as cm:
1784 view.dispatch()
1786 self.assertEqual(cm.exception.status_code, 302)
1787 self.assertIn(f"/{Routes.TASK}", cm.exception.headers["Location"])
1788 self.assertIn(
1789 f"{ViewParam.TABLE_NAME}={task.tablename}",
1790 cm.exception.headers["Location"],
1791 )
1792 self.assertIn(
1793 f"{ViewParam.SERVER_PK}={task.pk}",
1794 cm.exception.headers["Location"],
1795 )
1796 self.assertIn(
1797 f"{ViewParam.VIEWTYPE}={ViewArg.HTML}",
1798 cm.exception.headers["Location"],
1799 )
1801 def test_raises_when_task_does_not_exist(self) -> None:
1802 self.req.add_get_params(
1803 {ViewParam.SERVER_PK: "123", ViewParam.TABLE_NAME: "phq9"},
1804 set_method_get=False,
1805 )
1806 view = EraseTaskLeavingPlaceholderView(self.req)
1808 with self.assertRaises(HTTPBadRequest) as cm:
1809 view.dispatch()
1811 self.assertEqual(cm.exception.message, "No such task: phq9, PK=123")
1813 def test_raises_when_task_is_live_on_tablet(self) -> None:
1814 task = BmiFactory(patient=self.patient, _era=ERA_NOW)
1816 self.req.add_get_params(
1817 {
1818 ViewParam.SERVER_PK: str(task.pk),
1819 ViewParam.TABLE_NAME: task.tablename,
1820 },
1821 set_method_get=False,
1822 )
1823 view = EraseTaskLeavingPlaceholderView(self.req)
1825 with self.assertRaises(HTTPBadRequest) as cm:
1826 view.dispatch()
1828 self.assertIn("Task is live on tablet", cm.exception.message)
1830 def test_raises_when_user_not_authorized_to_erase(self) -> None:
1831 task = BmiFactory(patient=self.patient)
1832 user = UserFactory()
1834 self.req._debugging_user = user
1835 UserGroupMembershipFactory(
1836 user_id=user.id, group_id=self.group.id, groupadmin=True
1837 )
1839 with mock.patch.object(
1840 user, "authorized_to_erase_tasks", return_value=False
1841 ):
1842 self.req.add_get_params(
1843 {
1844 ViewParam.SERVER_PK: str(task.pk),
1845 ViewParam.TABLE_NAME: task.tablename,
1846 },
1847 set_method_get=False,
1848 )
1849 view = EraseTaskLeavingPlaceholderView(self.req)
1851 with self.assertRaises(HTTPBadRequest) as cm:
1852 view.dispatch()
1854 self.assertIn("Not authorized to erase tasks", cm.exception.message)
1856 def test_raises_when_task_already_erased(self) -> None:
1857 task = BmiFactory(patient=self.patient, _manually_erased=True)
1859 self.req.add_get_params(
1860 {
1861 ViewParam.SERVER_PK: str(task.pk),
1862 ViewParam.TABLE_NAME: task.tablename,
1863 },
1864 set_method_get=False,
1865 )
1866 view = EraseTaskLeavingPlaceholderView(self.req)
1868 with self.assertRaises(HTTPBadRequest) as cm:
1869 view.dispatch()
1871 self.assertIn("already erased", cm.exception.message)
1874class EraseTaskEntirelyViewTests(EraseTaskTestCase):
1875 def test_deletes_task_entirely(self) -> None:
1876 task = BmiFactory(patient=self.patient)
1877 multidict = MultiDict(
1878 [
1879 ("_charset_", UTF8),
1880 ("__formid__", "deform"),
1881 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
1882 (ViewParam.SERVER_PK, task.pk),
1883 (ViewParam.TABLE_NAME, task.tablename),
1884 ("confirm_1_t", "true"),
1885 ("confirm_2_t", "true"),
1886 ("confirm_4_t", "true"),
1887 ("__start__", "danger:mapping"),
1888 ("target", "7176"),
1889 ("user_entry", "7176"),
1890 ("__end__", "danger:mapping"),
1891 ("delete", "delete"),
1892 (FormAction.DELETE, "delete"),
1893 ]
1894 )
1896 self.req.fake_request_post_from_dict(multidict)
1898 view = EraseTaskEntirelyView(self.req)
1900 with mock.patch.object(
1901 task, "delete_entirely"
1902 ) as mock_delete_entirely:
1904 with self.assertRaises(HTTPFound):
1905 view.dispatch()
1907 mock_delete_entirely.assert_called_once()
1908 args, kwargs = mock_delete_entirely.call_args
1909 request = args[0]
1911 self.assertEqual(request, self.req)
1913 messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
1914 self.assertTrue(len(messages) > 0)
1916 self.assertIn("Task erased", messages[0])
1917 self.assertIn(task.tablename, messages[0])
1918 self.assertIn("server PK {}".format(task.pk), messages[0])
1921class EditGroupViewTests(DemoRequestTestCase):
1922 def test_group_updated(self) -> None:
1923 groupadmin = self.req._debugging_user = UserFactory()
1924 group = GroupFactory()
1925 UserGroupMembershipFactory(
1926 group_id=group.id, user_id=groupadmin.id, groupadmin=True
1927 )
1928 other_group_1 = GroupFactory()
1929 other_group_2 = GroupFactory()
1931 nhs_iddef = NHSIdNumDefinitionFactory()
1933 new_name = "new-name"
1934 new_description = "new description"
1935 new_upload_policy = "anyidnum AND sex"
1936 new_finalize_policy = f"idnum{nhs_iddef.which_idnum} AND sex"
1938 multidict = MultiDict(
1939 [
1940 ("_charset_", UTF8),
1941 ("__formid__", "deform"),
1942 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
1943 (ViewParam.GROUP_ID, group.id),
1944 (ViewParam.NAME, new_name),
1945 (ViewParam.DESCRIPTION, new_description),
1946 (ViewParam.UPLOAD_POLICY, new_upload_policy),
1947 (ViewParam.FINALIZE_POLICY, new_finalize_policy),
1948 ("__start__", "group_ids:sequence"),
1949 ("group_id_sequence", str(other_group_1.id)),
1950 ("group_id_sequence", str(other_group_2.id)),
1951 ("__end__", "group_ids:sequence"),
1952 (FormAction.SUBMIT, "submit"),
1953 ]
1954 )
1955 self.req.fake_request_post_from_dict(multidict)
1957 with self.assertRaises(HTTPFound):
1958 edit_group(self.req)
1960 self.assertEqual(group.name, new_name)
1961 self.assertEqual(group.description, new_description)
1962 self.assertEqual(group.upload_policy, new_upload_policy)
1963 self.assertEqual(group.finalize_policy, new_finalize_policy)
1964 self.assertIn(other_group_1, group.can_see_other_groups)
1965 self.assertIn(other_group_2, group.can_see_other_groups)
1967 def test_ip_use_added(self) -> None:
1968 from camcops_server.cc_modules.cc_ipuse import IpContexts
1970 group = GroupFactory()
1971 groupadmin = self.req._debugging_user = UserFactory()
1972 UserGroupMembershipFactory(
1973 group_id=group.id, user_id=groupadmin.id, groupadmin=True
1974 )
1975 nhs_iddef = NHSIdNumDefinitionFactory()
1977 new_name = "new-name"
1978 new_description = "new description"
1979 new_upload_policy = "anyidnum AND sex"
1980 new_finalize_policy = f"idnum{nhs_iddef.which_idnum} AND sex"
1982 multidict = MultiDict(
1983 [
1984 ("_charset_", UTF8),
1985 ("__formid__", "deform"),
1986 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
1987 (ViewParam.GROUP_ID, group.id),
1988 (ViewParam.NAME, new_name),
1989 (ViewParam.DESCRIPTION, new_description),
1990 (ViewParam.UPLOAD_POLICY, new_upload_policy),
1991 (ViewParam.FINALIZE_POLICY, new_finalize_policy),
1992 ("__start__", "ip_use:mapping"),
1993 (IpContexts.CLINICAL, "true"),
1994 (IpContexts.COMMERCIAL, "true"),
1995 ("__end__", "ip_use:mapping"),
1996 (FormAction.SUBMIT, "submit"),
1997 ]
1998 )
1999 self.req.fake_request_post_from_dict(multidict)
2001 with self.assertRaises(HTTPFound):
2002 edit_group(self.req)
2004 self.assertTrue(group.ip_use.clinical)
2005 self.assertTrue(group.ip_use.commercial)
2006 self.assertFalse(group.ip_use.educational)
2007 self.assertFalse(group.ip_use.research)
2009 def test_ip_use_updated(self) -> None:
2010 from camcops_server.cc_modules.cc_ipuse import IpContexts
2012 group = GroupFactory(ip_use__educational=True, ip_use__research=True)
2013 groupadmin = self.req._debugging_user = UserFactory()
2014 UserGroupMembershipFactory(
2015 group_id=group.id, user_id=groupadmin.id, groupadmin=True
2016 )
2018 old_id = group.ip_use.id
2020 nhs_iddef = NHSIdNumDefinitionFactory()
2022 new_name = "new-name"
2023 new_description = "new description"
2024 new_upload_policy = "anyidnum AND sex"
2025 new_finalize_policy = f"idnum{nhs_iddef.which_idnum} AND sex"
2027 multidict = MultiDict(
2028 [
2029 ("_charset_", UTF8),
2030 ("__formid__", "deform"),
2031 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
2032 (ViewParam.GROUP_ID, group.id),
2033 (ViewParam.NAME, new_name),
2034 (ViewParam.DESCRIPTION, new_description),
2035 (ViewParam.UPLOAD_POLICY, new_upload_policy),
2036 (ViewParam.FINALIZE_POLICY, new_finalize_policy),
2037 ("__start__", "ip_use:mapping"),
2038 (IpContexts.CLINICAL, "true"),
2039 (IpContexts.COMMERCIAL, "true"),
2040 ("__end__", "ip_use:mapping"),
2041 (FormAction.SUBMIT, "submit"),
2042 ]
2043 )
2044 self.req.fake_request_post_from_dict(multidict)
2046 with self.assertRaises(HTTPFound):
2047 edit_group(self.req)
2049 self.assertTrue(group.ip_use.clinical)
2050 self.assertTrue(group.ip_use.commercial)
2051 self.assertFalse(group.ip_use.educational)
2052 self.assertFalse(group.ip_use.research)
2053 self.assertEqual(group.ip_use.id, old_id)
2055 def test_other_groups_displayed_in_form(self) -> None:
2056 z_group = GroupFactory(name="z-group")
2057 a_group = GroupFactory(name="a-group")
2059 other_groups = Group.get_groups_from_id_list(
2060 self.dbsession, [z_group.id, a_group.id]
2061 )
2062 group = GroupFactory(can_see_other_groups=other_groups)
2064 view = EditGroupView(self.req)
2065 view.object = group
2067 form_values = view.get_form_values()
2069 self.assertEqual(
2070 form_values[ViewParam.GROUP_IDS], [a_group.id, z_group.id]
2071 )
2073 def test_group_id_displayed_in_form(self) -> None:
2074 group = GroupFactory()
2075 view = EditGroupView(self.req)
2076 view.object = group
2078 form_values = view.get_form_values()
2080 self.assertEqual(form_values[ViewParam.GROUP_ID], group.id)
2082 def test_ip_use_displayed_in_form(self) -> None:
2083 group = GroupFactory()
2084 view = EditGroupView(self.req)
2085 view.object = group
2087 form_values = view.get_form_values()
2089 self.assertEqual(form_values[ViewParam.IP_USE], group.ip_use)
2092class SendEmailFromPatientTaskScheduleViewTests(BasicDatabaseTestCase):
2093 def setUp(self) -> None:
2094 super().setUp()
2096 self.patient = ServerCreatedPatientFactory()
2097 idnum = ServerCreatedNHSPatientIdNumFactory(patient=self.patient)
2099 PatientIdNumIndexEntry.index_idnum(idnum, self.dbsession)
2101 self.schedule = TaskScheduleFactory(group=self.group)
2103 self.pts = PatientTaskScheduleFactory(
2104 patient=self.patient, task_schedule=self.schedule
2105 )
2107 def test_displays_form(self) -> None:
2108 self.req.add_get_params(
2109 {ViewParam.PATIENT_TASK_SCHEDULE_ID: str(self.pts.id)}
2110 )
2112 view = SendEmailFromPatientTaskScheduleView(self.req)
2113 with mock.patch.object(view, "render_to_response") as mock_render:
2114 view.dispatch()
2116 args, kwargs = mock_render.call_args
2117 context = args[0]
2119 self.assertIn("form", context)
2121 def test_raises_for_missing_pts_id(self) -> None:
2122 view = SendEmailFromPatientTaskScheduleView(self.req)
2123 with self.assertRaises(HTTPBadRequest) as cm:
2124 view.dispatch()
2126 self.assertIn(
2127 "Patient task schedule does not exist", cm.exception.message
2128 )
2130 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
2131 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
2132 def test_sends_email(
2133 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
2134 ) -> None:
2135 self.req.config.email_host = "smtp.example.com"
2136 self.req.config.email_port = 587
2137 self.req.config.email_host_username = "mailuser"
2138 self.req.config.email_host_password = "mailpassword"
2139 self.req.config.email_use_tls = True
2141 multidict = MultiDict(
2142 [
2143 (ViewParam.EMAIL, "patient@example.com"),
2144 (ViewParam.EMAIL_FROM, "server@example.com"),
2145 (ViewParam.EMAIL_SUBJECT, "Subject"),
2146 (ViewParam.EMAIL_BODY, "Email body"),
2147 (FormAction.SUBMIT, "submit"),
2148 ]
2149 )
2151 self.req.fake_request_post_from_dict(multidict)
2152 self.req.add_get_params(
2153 {ViewParam.PATIENT_TASK_SCHEDULE_ID: str(self.pts.id)},
2154 set_method_get=False,
2155 )
2156 view = SendEmailFromPatientTaskScheduleView(self.req)
2158 with self.assertRaises(HTTPFound):
2159 view.dispatch()
2161 args, kwargs = mock_make_email.call_args_list[0]
2162 self.assertEqual(kwargs["from_addr"], "server@example.com")
2163 self.assertEqual(kwargs["to"], "patient@example.com")
2164 self.assertEqual(kwargs["subject"], "Subject")
2165 self.assertEqual(kwargs["body"], "Email body")
2166 self.assertEqual(kwargs["content_type"], MimeType.HTML)
2168 args, kwargs = mock_send_msg.call_args
2169 self.assertEqual(kwargs["host"], "smtp.example.com")
2170 self.assertEqual(kwargs["user"], "mailuser")
2171 self.assertEqual(kwargs["password"], "mailpassword")
2172 self.assertEqual(kwargs["port"], 587)
2173 self.assertTrue(kwargs["use_tls"])
2175 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
2176 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
2177 def test_sends_cc_of_email(
2178 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
2179 ) -> None:
2180 self.req.config.email_host = "smtp.example.com"
2181 self.req.config.email_port = 587
2182 self.req.config.email_host_username = "mailuser"
2183 self.req.config.email_host_password = "mailpassword"
2184 self.req.config.email_use_tls = True
2186 multidict = MultiDict(
2187 [
2188 (ViewParam.EMAIL, "patient@example.com"),
2189 (ViewParam.EMAIL_CC, "cc@example.com"),
2190 (ViewParam.EMAIL_FROM, "server@example.com"),
2191 (ViewParam.EMAIL_SUBJECT, "Subject"),
2192 (ViewParam.EMAIL_BODY, "Email body"),
2193 (FormAction.SUBMIT, "submit"),
2194 ]
2195 )
2197 self.req.fake_request_post_from_dict(multidict)
2198 self.req.add_get_params(
2199 {ViewParam.PATIENT_TASK_SCHEDULE_ID: str(self.pts.id)},
2200 set_method_get=False,
2201 )
2202 view = SendEmailFromPatientTaskScheduleView(self.req)
2204 with self.assertRaises(HTTPFound):
2205 view.dispatch()
2207 args, kwargs = mock_make_email.call_args
2208 self.assertEqual(kwargs["to"], "patient@example.com")
2209 self.assertEqual(kwargs["cc"], "cc@example.com")
2211 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
2212 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
2213 def test_sends_bcc_of_email(
2214 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
2215 ) -> None:
2216 self.req.config.email_host = "smtp.example.com"
2217 self.req.config.email_port = 587
2218 self.req.config.email_host_username = "mailuser"
2219 self.req.config.email_host_password = "mailpassword"
2220 self.req.config.email_use_tls = True
2222 multidict = MultiDict(
2223 [
2224 (ViewParam.EMAIL, "patient@example.com"),
2225 (ViewParam.EMAIL_CC, "cc@example.com"),
2226 (ViewParam.EMAIL_BCC, "bcc@example.com"),
2227 (ViewParam.EMAIL_FROM, "server@example.com"),
2228 (ViewParam.EMAIL_SUBJECT, "Subject"),
2229 (ViewParam.EMAIL_BODY, "Email body"),
2230 (FormAction.SUBMIT, "submit"),
2231 ]
2232 )
2234 self.req.fake_request_post_from_dict(multidict)
2235 self.req.add_get_params(
2236 {ViewParam.PATIENT_TASK_SCHEDULE_ID: str(self.pts.id)},
2237 set_method_get=False,
2238 )
2239 view = SendEmailFromPatientTaskScheduleView(self.req)
2241 with self.assertRaises(HTTPFound):
2242 view.dispatch()
2244 args, kwargs = mock_make_email.call_args
2245 self.assertEqual(kwargs["to"], "patient@example.com")
2246 self.assertEqual(kwargs["bcc"], "bcc@example.com")
2248 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
2249 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
2250 def test_message_on_success(
2251 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
2252 ) -> None:
2253 multidict = MultiDict(
2254 [
2255 (ViewParam.EMAIL, "patient@example.com"),
2256 (ViewParam.EMAIL_FROM, "server@example.com"),
2257 (ViewParam.EMAIL_SUBJECT, "Subject"),
2258 (ViewParam.EMAIL_BODY, "Email body"),
2259 (FormAction.SUBMIT, "submit"),
2260 ]
2261 )
2263 self.req.fake_request_post_from_dict(multidict)
2264 self.req.add_get_params(
2265 {ViewParam.PATIENT_TASK_SCHEDULE_ID: str(self.pts.id)},
2266 set_method_get=False,
2267 )
2268 view = SendEmailFromPatientTaskScheduleView(self.req)
2270 with self.assertRaises(HTTPFound):
2271 view.dispatch()
2273 messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
2274 self.assertTrue(len(messages) > 0)
2276 self.assertIn("Email sent to patient@example.com", messages[0])
2278 @mock.patch(
2279 "camcops_server.cc_modules.cc_email.send_msg",
2280 side_effect=RuntimeError("Something bad happened"),
2281 )
2282 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
2283 def test_message_on_failure(
2284 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
2285 ) -> None:
2286 multidict = MultiDict(
2287 [
2288 (ViewParam.EMAIL, "patient@example.com"),
2289 (ViewParam.EMAIL_FROM, "server@example.com"),
2290 (ViewParam.EMAIL_SUBJECT, "Subject"),
2291 (ViewParam.EMAIL_BODY, "Email body"),
2292 (FormAction.SUBMIT, "submit"),
2293 ]
2294 )
2296 self.req.fake_request_post_from_dict(multidict)
2297 self.req.add_get_params(
2298 {ViewParam.PATIENT_TASK_SCHEDULE_ID: str(self.pts.id)},
2299 set_method_get=False,
2300 )
2301 view = SendEmailFromPatientTaskScheduleView(self.req)
2303 with self.assertRaises(HTTPFound):
2304 view.dispatch()
2306 messages = self.req.session.peek_flash(FlashQueue.DANGER)
2307 self.assertTrue(len(messages) > 0)
2309 self.assertIn(
2310 "Failed to send email to patient@example.com", messages[0]
2311 )
2313 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
2314 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
2315 def test_email_record_created(
2316 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
2317 ) -> None:
2318 multidict = MultiDict(
2319 [
2320 (ViewParam.EMAIL, "patient@example.com"),
2321 (ViewParam.EMAIL_FROM, "server@example.com"),
2322 (ViewParam.EMAIL_SUBJECT, "Subject"),
2323 (ViewParam.EMAIL_BODY, "Email body"),
2324 (FormAction.SUBMIT, "submit"),
2325 ]
2326 )
2328 self.req.fake_request_post_from_dict(multidict)
2329 self.req.add_get_params(
2330 {ViewParam.PATIENT_TASK_SCHEDULE_ID: str(self.pts.id)},
2331 set_method_get=False,
2332 )
2333 view = SendEmailFromPatientTaskScheduleView(self.req)
2335 self.assertEqual(len(self.pts.emails), 0)
2337 with self.assertRaises(HTTPFound):
2338 view.dispatch()
2340 self.assertEqual(len(self.pts.emails), 1)
2341 self.assertEqual(self.pts.emails[0].email.to, "patient@example.com")
2343 def test_unprivileged_user_cannot_email_patient(self) -> None:
2344 user = UserFactory(username="testuser")
2346 self.req._debugging_user = user
2348 multidict = MultiDict(
2349 [
2350 (ViewParam.EMAIL, "patient@example.com"),
2351 (ViewParam.EMAIL_FROM, "server@example.com"),
2352 (ViewParam.EMAIL_SUBJECT, "Subject"),
2353 (ViewParam.EMAIL_BODY, "Email body"),
2354 (FormAction.SUBMIT, "submit"),
2355 ]
2356 )
2358 self.req.fake_request_post_from_dict(multidict)
2359 self.req.add_get_params(
2360 {ViewParam.PATIENT_TASK_SCHEDULE_ID: str(self.pts.id)},
2361 set_method_get=False,
2362 )
2364 with self.assertRaises(HTTPBadRequest) as cm:
2365 view = SendEmailFromPatientTaskScheduleView(self.req)
2366 view.dispatch()
2368 self.assertEqual(
2369 cm.exception.message, "Not authorized to email patients"
2370 )
2373class LoginViewTests(TestStateMixin, BasicDatabaseTestCase):
2374 def setUp(self) -> None:
2375 super().setUp()
2377 self.req.matched_route.name = "login_view"
2379 def test_form_rendered_with_values(self) -> None:
2380 self.req.add_get_params(
2381 {ViewParam.REDIRECT_URL: "https://www.example.com"}
2382 )
2383 view = LoginView(self.req)
2385 with mock.patch.object(view, "render_to_response") as mock_render:
2386 view.dispatch()
2388 args, kwargs = mock_render.call_args
2389 context = args[0]
2391 self.assertIn("form", context)
2392 self.assertIn("https://www.example.com", context["form"])
2394 def test_template_rendered(self) -> None:
2395 view = LoginView(self.req)
2396 response = view.dispatch()
2398 self.assertIn("Log in", response.body.decode(UTF8))
2400 def test_password_autocomplete_read_from_config(self) -> None:
2401 self.req.config.disable_password_autocomplete = False
2403 view = LoginView(self.req)
2405 with mock.patch.object(view, "render_to_response") as mock_render:
2406 view.dispatch()
2408 args, kwargs = mock_render.call_args
2409 context = args[0]
2411 self.assertIn('autocomplete="current-password"', context["form"])
2413 def test_fails_when_user_locked_out(self) -> None:
2414 user = UserFactory(
2415 username="test", password__request=self.req, password="secret"
2416 )
2417 SecurityAccountLockout.lock_user_out(
2418 self.req, user.username, lockout_minutes=1
2419 )
2421 multidict = MultiDict(
2422 [
2423 (ViewParam.USERNAME, user.username),
2424 (ViewParam.PASSWORD, "secret"),
2425 (FormAction.SUBMIT, "submit"),
2426 ]
2427 )
2429 self.req.fake_request_post_from_dict(multidict)
2431 view = LoginView(self.req)
2433 with mock.patch.object(
2434 view, "fail_locked_out", side_effect=HTTPFound
2435 ) as mock_fail_locked_out:
2436 with self.assertRaises(HTTPFound):
2437 view.dispatch()
2439 args, kwargs = mock_fail_locked_out.call_args
2440 locked_out_until = SecurityAccountLockout.user_locked_out_until(
2441 self.req, user.username
2442 )
2443 self.assertEqual(args[0], locked_out_until)
2445 @mock.patch("camcops_server.cc_modules.webview.audit")
2446 def test_user_can_log_in(self, mock_audit: mock.Mock) -> None:
2447 user = UserFactory(
2448 username="test", password__request=self.req, password="secret"
2449 )
2450 UserGroupMembershipFactory(
2451 user_id=user.id, group_id=self.group.id, may_use_webviewer=True
2452 )
2454 multidict = MultiDict(
2455 [
2456 (ViewParam.USERNAME, user.username),
2457 (ViewParam.PASSWORD, "secret"),
2458 (FormAction.SUBMIT, "submit"),
2459 ]
2460 )
2462 self.req.fake_request_post_from_dict(multidict)
2464 view = LoginView(self.req)
2466 with mock.patch.object(user, "login") as mock_user_login:
2467 with mock.patch.object(
2468 self.req.camcops_session, "login"
2469 ) as mock_session_login:
2470 with self.assertRaises(HTTPFound):
2471 view.dispatch()
2473 args, kwargs = mock_user_login.call_args
2474 self.assertEqual(args[0], self.req)
2476 args, kwargs = mock_session_login.call_args
2477 self.assertEqual(args[0], user)
2479 args, kwargs = mock_audit.call_args
2480 self.assertEqual(args[0], self.req)
2481 self.assertEqual(args[1], "Login")
2482 self.assertEqual(kwargs["user_id"], user.id)
2484 def test_user_with_totp_sees_token_form(self) -> None:
2485 user = UserFactory(
2486 username="test",
2487 mfa_secret_key=pyotp.random_base32(),
2488 mfa_method=MfaMethod.TOTP,
2489 password__request=self.req,
2490 password="secret",
2491 )
2492 group = GroupFactory()
2493 UserGroupMembershipFactory(
2494 user_id=user.id, group_id=group.id, may_use_webviewer=True
2495 )
2497 view = LoginView(self.req)
2498 view.state.update(
2499 mfa_user_id=user.id,
2500 step=MfaMixin.STEP_MFA,
2501 mfa_time=int(time.time()),
2502 )
2504 with mock.patch.object(view, "render_to_response") as mock_render:
2505 view.dispatch()
2507 args, kwargs = mock_render.call_args
2508 context = args[0]
2510 self.assertIn("form", context)
2511 self.assertIn("Enter the six-digit code", context["form"])
2512 self.assertIn(
2513 "Enter the code for CamCOPS displayed",
2514 context[MfaMixin.KEY_INSTRUCTIONS],
2515 )
2517 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
2518 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
2519 def test_user_with_hotp_email_sees_token_form(
2520 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
2521 ) -> None:
2522 user = UserFactory(
2523 username="test",
2524 mfa_secret_key=pyotp.random_base32(),
2525 mfa_method=MfaMethod.HOTP_EMAIL,
2526 email="user@example.com",
2527 hotp_counter=0,
2528 password__request=self.req,
2529 password="secret",
2530 )
2531 group = GroupFactory()
2532 UserGroupMembershipFactory(
2533 user_id=user.id, group_id=group.id, may_use_webviewer=True
2534 )
2535 view = LoginView(self.req)
2536 view.state.update(
2537 mfa_user_id=user.id,
2538 step=MfaMixin.STEP_MFA,
2539 mfa_time=int(time.time()),
2540 )
2542 with mock.patch.object(view, "render_to_response") as mock_render:
2543 view.dispatch()
2545 args, kwargs = mock_render.call_args
2546 context = args[0]
2548 self.assertIn("form", context)
2549 self.assertIn("Enter the six-digit code", context["form"])
2550 self.assertIn(
2551 "We've sent a code by email", context[MfaMixin.KEY_INSTRUCTIONS]
2552 )
2554 def test_user_with_hotp_sms_sees_token_form(self) -> None:
2555 self.req.config.sms_backend = get_sms_backend(
2556 SmsBackendNames.CONSOLE, {}
2557 )
2559 phone_number_str = Fake.en_gb.valid_phone_number()
2560 user = UserFactory(
2561 username="test",
2562 mfa_secret_key=pyotp.random_base32(),
2563 mfa_method=MfaMethod.HOTP_SMS,
2564 phone_number=phonenumbers.parse(phone_number_str),
2565 hotp_counter=0,
2566 password__request=self.req,
2567 password="secret",
2568 )
2569 group = GroupFactory()
2570 UserGroupMembershipFactory(
2571 user_id=user.id, group_id=group.id, may_use_webviewer=True
2572 )
2574 view = LoginView(self.req)
2575 view.state.update(
2576 mfa_user_id=user.id,
2577 step=MfaMixin.STEP_MFA,
2578 mfa_time=int(time.time()),
2579 )
2581 with mock.patch.object(view, "render_to_response") as mock_render:
2582 view.dispatch()
2584 args, kwargs = mock_render.call_args
2585 context = args[0]
2587 self.assertIn("form", context)
2588 self.assertIn("Enter the six-digit code", context["form"])
2589 self.assertIn(
2590 "We've sent a code by text message",
2591 context[MfaMixin.KEY_INSTRUCTIONS],
2592 )
2594 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
2595 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
2596 @mock.patch("camcops_server.cc_modules.webview.time")
2597 def test_session_state_set_for_user_with_mfa(
2598 self,
2599 mock_time: mock.Mock,
2600 mock_make_email: mock.Mock,
2601 mock_send_msg: mock.Mock,
2602 ) -> None:
2603 user = UserFactory(
2604 username="test",
2605 mfa_secret_key=pyotp.random_base32(),
2606 mfa_method=MfaMethod.HOTP_EMAIL,
2607 email="user@example.com",
2608 password__request=self.req,
2609 password="secret",
2610 )
2611 group = GroupFactory()
2612 UserGroupMembershipFactory(
2613 user_id=user.id, group_id=group.id, may_use_webviewer=True
2614 )
2616 multidict = MultiDict(
2617 [
2618 (ViewParam.USERNAME, user.username),
2619 (ViewParam.PASSWORD, "secret"),
2620 (FormAction.SUBMIT, "submit"),
2621 ]
2622 )
2624 self.req.fake_request_post_from_dict(multidict)
2626 view = LoginView(self.req)
2628 with mock.patch.object(
2629 mock_time, "time", return_value=1234567890.1234567
2630 ):
2631 view.dispatch()
2633 self.assertEqual(
2634 self.req.camcops_session.form_state[LoginView.KEY_MFA_USER_ID],
2635 user.id,
2636 )
2637 self.assertEqual(
2638 self.req.camcops_session.form_state[MfaMixin.KEY_MFA_TIME],
2639 1234567890,
2640 )
2641 self.assertEqual(
2642 self.req.camcops_session.form_state[FormWizardMixin.PARAM_STEP],
2643 MfaMixin.STEP_MFA,
2644 )
2646 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
2647 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
2648 def test_user_with_hotp_is_sent_email(
2649 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
2650 ) -> None:
2651 self.req.config.email_host = "smtp.example.com"
2652 self.req.config.email_port = 587
2653 self.req.config.email_host_username = "mailuser"
2654 self.req.config.email_host_password = "mailpassword"
2655 self.req.config.email_use_tls = True
2656 self.req.config.email_from = "server@example.com"
2658 user = UserFactory(
2659 username="test",
2660 email="user@example.com",
2661 mfa_secret_key=pyotp.random_base32(),
2662 mfa_method=MfaMethod.HOTP_EMAIL,
2663 hotp_counter=0,
2664 password__request=self.req,
2665 password="secret",
2666 )
2667 group = GroupFactory()
2668 UserGroupMembershipFactory(
2669 user_id=user.id, group_id=group.id, may_use_webviewer=True
2670 )
2672 multidict = MultiDict(
2673 [
2674 (ViewParam.USERNAME, user.username),
2675 (ViewParam.PASSWORD, "secret"),
2676 (FormAction.SUBMIT, "submit"),
2677 ]
2678 )
2680 self.req.fake_request_post_from_dict(multidict)
2682 view = LoginView(self.req)
2683 expected_code = pyotp.HOTP(user.mfa_secret_key).at(1)
2684 view.dispatch()
2686 args, kwargs = mock_make_email.call_args_list[0]
2687 self.assertEqual(kwargs["from_addr"], "server@example.com")
2688 self.assertEqual(kwargs["to"], "user@example.com")
2689 self.assertEqual(kwargs["subject"], "CamCOPS authentication")
2690 self.assertIn(
2691 f"Your CamCOPS verification code is {expected_code}",
2692 kwargs["body"],
2693 )
2694 self.assertEqual(kwargs["content_type"], "text/plain")
2696 args, kwargs = mock_send_msg.call_args
2697 self.assertEqual(kwargs["host"], "smtp.example.com")
2698 self.assertEqual(kwargs["user"], "mailuser")
2699 self.assertEqual(kwargs["password"], "mailpassword")
2700 self.assertEqual(kwargs["port"], 587)
2701 self.assertTrue(kwargs["use_tls"])
2703 def test_user_with_hotp_is_sent_sms(self) -> None:
2704 test_config = {"username": "testuser", "password": "testpass"}
2706 self.req.config.sms_backend = get_sms_backend(
2707 SmsBackendNames.CONSOLE, {}
2708 )
2709 self.req.config.sms_config = test_config
2711 phone_number_str = Fake.en_gb.valid_phone_number()
2712 user = UserFactory(
2713 username="test",
2714 email="user@example.com",
2715 phone_number=phonenumbers.parse(phone_number_str),
2716 mfa_secret_key=pyotp.random_base32(),
2717 mfa_method=MfaMethod.HOTP_SMS,
2718 hotp_counter=0,
2719 password__request=self.req,
2720 password="secret",
2721 )
2722 group = GroupFactory()
2723 UserGroupMembershipFactory(
2724 user_id=user.id, group_id=group.id, may_use_webviewer=True
2725 )
2727 multidict = MultiDict(
2728 [
2729 (ViewParam.USERNAME, user.username),
2730 (ViewParam.PASSWORD, "secret"),
2731 (FormAction.SUBMIT, "submit"),
2732 ]
2733 )
2735 self.req.fake_request_post_from_dict(multidict)
2737 view = LoginView(self.req)
2738 expected_code = pyotp.HOTP(user.mfa_secret_key).at(1)
2740 with self.assertLogs(level=logging.INFO) as logging_cm:
2741 view.dispatch()
2743 expected_message = f"Your CamCOPS verification code is {expected_code}"
2745 self.assertIn(
2746 ConsoleSmsBackend.make_msg(phone_number_str, expected_message),
2747 logging_cm.output[0],
2748 )
2750 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
2751 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
2752 def test_login_with_hotp_increments_counter(
2753 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
2754 ) -> None:
2755 user = UserFactory(
2756 username="test",
2757 email="user@example.com",
2758 mfa_secret_key=pyotp.random_base32(),
2759 mfa_method=MfaMethod.HOTP_EMAIL,
2760 hotp_counter=0,
2761 password__request=self.req,
2762 password="secret",
2763 )
2764 group = GroupFactory()
2765 UserGroupMembershipFactory(
2766 user_id=user.id, group_id=group.id, may_use_webviewer=True
2767 )
2769 multidict = MultiDict(
2770 [
2771 (ViewParam.USERNAME, user.username),
2772 (ViewParam.PASSWORD, "secret"),
2773 (FormAction.SUBMIT, "submit"),
2774 ]
2775 )
2777 self.req.fake_request_post_from_dict(multidict)
2779 view = LoginView(self.req)
2781 view.dispatch()
2783 self.assertEqual(user.hotp_counter, 1)
2785 @mock.patch("camcops_server.cc_modules.webview.audit")
2786 def test_user_with_totp_can_log_in(self, mock_audit: mock.Mock) -> None:
2787 user = UserFactory(
2788 username="test",
2789 mfa_method=MfaMethod.TOTP,
2790 mfa_secret_key=pyotp.random_base32(),
2791 password__request=self.req,
2792 password="secret",
2793 )
2795 group = GroupFactory()
2796 UserGroupMembershipFactory(
2797 user_id=user.id, group_id=group.id, may_use_webviewer=True
2798 )
2800 totp = pyotp.TOTP(user.mfa_secret_key)
2802 multidict = MultiDict(
2803 [
2804 (ViewParam.ONE_TIME_PASSWORD, totp.now()),
2805 (FormAction.SUBMIT, "submit"),
2806 ]
2807 )
2809 self.req.fake_request_post_from_dict(multidict)
2811 view = LoginView(self.req)
2812 view.state.update(mfa_user_id=user.id, step=MfaMixin.STEP_MFA)
2814 with mock.patch.object(user, "login") as mock_user_login:
2815 with mock.patch.object(
2816 self.req.camcops_session, "login"
2817 ) as mock_session_login:
2818 with mock.patch.object(view, "timed_out", return_value=False):
2819 with self.assertRaises(HTTPFound):
2820 view.dispatch()
2822 args, kwargs = mock_user_login.call_args
2823 self.assertEqual(args[0], self.req)
2825 args, kwargs = mock_session_login.call_args
2826 self.assertEqual(args[0], user)
2828 args, kwargs = mock_audit.call_args
2829 self.assertEqual(args[0], self.req)
2830 self.assertEqual(args[1], "Login")
2831 self.assertEqual(kwargs["user_id"], user.id)
2832 self.assert_state_is_finished()
2834 @mock.patch("camcops_server.cc_modules.webview.audit")
2835 def test_user_with_hotp_can_log_in(self, mock_audit: mock.Mock) -> None:
2836 user = UserFactory(
2837 username="test",
2838 mfa_method=MfaMethod.HOTP_EMAIL,
2839 mfa_secret_key=pyotp.random_base32(),
2840 hotp_counter=1,
2841 password__request=self.req,
2842 password="secret",
2843 )
2844 group = GroupFactory()
2845 UserGroupMembershipFactory(
2846 user_id=user.id, group_id=group.id, may_use_webviewer=True
2847 )
2849 hotp = pyotp.HOTP(user.mfa_secret_key)
2850 multidict = MultiDict(
2851 [
2852 (ViewParam.ONE_TIME_PASSWORD, hotp.at(1)),
2853 (FormAction.SUBMIT, "submit"),
2854 ]
2855 )
2857 self.req.fake_request_post_from_dict(multidict)
2859 view = LoginView(self.req)
2860 view.state.update(mfa_user_id=user.id, step=MfaMixin.STEP_MFA)
2862 with mock.patch.object(user, "login") as mock_user_login:
2863 with mock.patch.object(
2864 self.req.camcops_session, "login"
2865 ) as mock_session_login:
2866 with mock.patch.object(view, "timed_out", return_value=False):
2867 with self.assertRaises(HTTPFound):
2868 view.dispatch()
2870 args, kwargs = mock_user_login.call_args
2871 self.assertEqual(args[0], self.req)
2873 args, kwargs = mock_session_login.call_args
2874 self.assertEqual(args[0], user)
2876 args, kwargs = mock_audit.call_args
2877 self.assertEqual(args[0], self.req)
2878 self.assertEqual(args[1], "Login")
2879 self.assertEqual(kwargs["user_id"], user.id)
2880 self.assert_state_is_finished()
2882 def test_form_state_cleared_on_failed_login(self) -> None:
2883 user = UserFactory(
2884 username="test",
2885 mfa_method=MfaMethod.HOTP_EMAIL,
2886 mfa_secret_key=pyotp.random_base32(),
2887 hotp_counter=1,
2888 password__request=self.req,
2889 password="secret",
2890 )
2891 group = GroupFactory()
2892 UserGroupMembershipFactory(
2893 user_id=user.id, group_id=group.id, may_use_webviewer=True
2894 )
2896 hotp = pyotp.HOTP(user.mfa_secret_key)
2898 multidict = MultiDict(
2899 [
2900 (ViewParam.ONE_TIME_PASSWORD, hotp.at(2)),
2901 (FormAction.SUBMIT, "submit"),
2902 ]
2903 )
2905 self.req.fake_request_post_from_dict(multidict)
2907 view = LoginView(self.req)
2908 view.state.update(step=MfaMixin.STEP_MFA, mfa_user_id=user.id)
2910 with mock.patch.object(view, "timed_out", return_value=False):
2911 with self.assertRaises(HTTPFound):
2912 view.dispatch()
2914 messages = self.req.session.peek_flash(FlashQueue.DANGER)
2915 self.assertTrue(len(messages) > 0)
2916 self.assertIn("You entered an invalid code", messages[0])
2918 self.assert_state_is_clean()
2920 def test_user_cannot_log_in_if_timed_out(self) -> None:
2921 self.req.config.mfa_timeout_s = 600
2922 user = UserFactory(
2923 username="test",
2924 mfa_method=MfaMethod.TOTP,
2925 mfa_secret_key=pyotp.random_base32(),
2926 password__request=self.req,
2927 password="secret",
2928 )
2929 group = GroupFactory()
2930 UserGroupMembershipFactory(
2931 user_id=user.id, group_id=group.id, may_use_webviewer=True
2932 )
2934 totp = pyotp.TOTP(user.mfa_secret_key)
2936 multidict = MultiDict(
2937 [
2938 (ViewParam.ONE_TIME_PASSWORD, totp.now()),
2939 (FormAction.SUBMIT, "submit"),
2940 ]
2941 )
2943 self.req.fake_request_post_from_dict(multidict)
2945 view = LoginView(self.req)
2946 view.state.update(
2947 mfa_user=user.id,
2948 mfa_time=int(time.time() - 601),
2949 step=MfaMixin.STEP_MFA,
2950 )
2952 with mock.patch.object(
2953 view, "fail_timed_out", side_effect=HTTPFound
2954 ) as mock_fail_timed_out:
2955 with self.assertRaises(HTTPFound):
2956 view.dispatch()
2958 mock_fail_timed_out.assert_called_once()
2960 def test_unprivileged_user_cannot_log_in(self) -> None:
2961 user = UserFactory(
2962 username="test", password__request=self.req, password="secret"
2963 )
2964 UserGroupMembershipFactory(
2965 user_id=user.id, group_id=self.group.id, may_use_webviewer=False
2966 )
2968 multidict = MultiDict(
2969 [
2970 (ViewParam.USERNAME, user.username),
2971 (ViewParam.PASSWORD, "secret"),
2972 (FormAction.SUBMIT, "submit"),
2973 ]
2974 )
2976 self.req.fake_request_post_from_dict(multidict)
2978 view = LoginView(self.req)
2980 with mock.patch.object(
2981 view, "fail_not_authorized", side_effect=HTTPFound
2982 ) as mock_fail_not_authorized:
2983 # The fail_not_authorized() function raises an exception
2984 # (of type HTTPFound) so the mock must do too. Otherwise
2985 # it will fall through inappropriately (and crash).
2986 with self.assertRaises(HTTPFound):
2987 view.dispatch()
2989 mock_fail_not_authorized.assert_called_once()
2991 def test_unknown_user_cannot_log_in(self) -> None:
2992 multidict = MultiDict(
2993 [
2994 (ViewParam.USERNAME, "unknown"),
2995 (ViewParam.PASSWORD, "secret"),
2996 (FormAction.SUBMIT, "submit"),
2997 ]
2998 )
3000 self.req.fake_request_post_from_dict(multidict)
3002 view = LoginView(self.req)
3004 with mock.patch.object(
3005 SecurityLoginFailure, "act_on_login_failure"
3006 ) as mock_act:
3007 with mock.patch.object(
3008 self.req.camcops_session, "logout"
3009 ) as mock_logout:
3010 with mock.patch.object(
3011 view, "fail_not_authorized", side_effect=HTTPFound
3012 ) as mock_fail_not_authorized:
3013 with self.assertRaises(HTTPFound):
3014 view.dispatch()
3016 args, kwargs = mock_act.call_args
3017 self.assertEqual(args[0], self.req)
3018 self.assertEqual(args[1], "unknown")
3020 mock_logout.assert_called_once()
3021 mock_fail_not_authorized.assert_called_once()
3023 def test_timed_out_false_when_timeout_zero(self) -> None:
3024 self.req.config.mfa_timeout_s = 0
3025 view = LoginView(self.req)
3026 view.state["mfa_time"] = 0
3028 self.assertFalse(view.timed_out())
3030 def test_timed_out_false_when_no_authenticated_user(self) -> None:
3031 view = LoginView(self.req)
3033 self.assertFalse(view.timed_out())
3035 def test_timed_out_false_when_no_authentication_time(self) -> None:
3036 view = LoginView(self.req)
3038 user = UserFactory(username="test")
3039 # Should never be the case that we have a user ID but no
3040 # authentication time
3041 view.state["mfa_user_id"] = user.id
3043 self.assertFalse(view.timed_out())
3046class EditUserViewTests(BasicDatabaseTestCase):
3047 def test_redirect_on_cancel(self) -> None:
3048 regular_user = UserFactory(username="regular_user")
3049 self.req.fake_request_post_from_dict({FormAction.CANCEL: "cancel"})
3050 self.req.add_get_params(
3051 {ViewParam.USER_ID: str(regular_user.id)}, set_method_get=False
3052 )
3054 with self.assertRaises(HTTPFound) as cm:
3055 edit_user(self.req)
3057 self.assertEqual(cm.exception.status_code, 302)
3058 self.assertIn(
3059 f"/{Routes.VIEW_ALL_USERS}", cm.exception.headers["Location"]
3060 )
3062 def test_raises_if_user_may_not_edit_another(self) -> None:
3063 self.req.add_get_params({ViewParam.USER_ID: str(self.system_user.id)})
3065 regular_user = UserFactory(username="regular_user")
3066 self.req._debugging_user = regular_user
3067 with self.assertRaises(HTTPBadRequest) as cm:
3068 edit_user(self.req)
3070 self.assertIn("Nobody may edit the system user", cm.exception.message)
3072 def test_superuser_sees_full_form(self) -> None:
3073 superuser = UserFactory(username="admin", superuser=True)
3074 self.req._debugging_user = superuser
3076 self.req.add_get_params({ViewParam.USER_ID: str(superuser.id)})
3078 response = edit_user(self.req)
3080 self.assertIn("Superuser (CAUTION!)", response.body.decode(UTF8))
3082 def test_groupadmin_sees_groupadmin_form(self) -> None:
3083 groupadmin = UserFactory(username="groupadmin")
3084 regular_user = UserFactory(username="regular_user")
3085 group = GroupFactory()
3086 UserGroupMembershipFactory(
3087 user_id=groupadmin.id, group_id=group.id, groupadmin=True
3088 )
3089 UserGroupMembershipFactory(user_id=regular_user.id, group_id=group.id)
3090 self.req._debugging_user = groupadmin
3092 self.req.add_get_params({ViewParam.USER_ID: str(regular_user.id)})
3094 response = edit_user(self.req)
3095 content = response.body.decode(UTF8)
3097 self.assertIn("Full name", content)
3098 self.assertNotIn("Superuser (CAUTION!)", content)
3100 def test_raises_for_conflicting_user_name(self) -> None:
3101 UserFactory(username="existing_user")
3102 other_user = UserFactory(username="other_user")
3104 multidict = MultiDict(
3105 [
3106 (ViewParam.USERNAME, "existing_user"),
3107 (FormAction.SUBMIT, "submit"),
3108 ]
3109 )
3110 self.req.fake_request_post_from_dict(multidict)
3111 self.req.add_get_params(
3112 {ViewParam.USER_ID: str(other_user.id)}, set_method_get=False
3113 )
3115 with self.assertRaises(HTTPBadRequest) as cm:
3116 edit_user(self.req)
3118 self.assertIn("Can't rename user", cm.exception.message)
3120 def test_user_is_updated(self) -> None:
3121 user = UserFactory(
3122 username="old_username",
3123 fullname="Old Name",
3124 email="old@example.com",
3125 language="da_DK",
3126 )
3128 multidict = MultiDict(
3129 [
3130 (ViewParam.USERNAME, "new_username"),
3131 (ViewParam.FULLNAME, "New Name"),
3132 (ViewParam.EMAIL, "new@example.com"),
3133 (ViewParam.LANGUAGE, "en_GB"),
3134 (FormAction.SUBMIT, "submit"),
3135 ]
3136 )
3137 self.req.fake_request_post_from_dict(multidict)
3138 self.req.add_get_params(
3139 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
3140 )
3142 with self.assertRaises(HTTPFound):
3143 edit_user(self.req)
3145 self.assertEqual(user.username, "new_username")
3146 self.assertEqual(user.fullname, "New Name")
3147 self.assertEqual(user.email, "new@example.com")
3148 self.assertEqual(user.language, "en_GB")
3150 def test_user_is_added_to_group(self) -> None:
3151 user = UserFactory()
3152 group = GroupFactory()
3154 multidict = MultiDict(
3155 [
3156 (ViewParam.USERNAME, user.username),
3157 ("__start__", "group_ids:sequence"),
3158 ("group_id_sequence", str(group.id)),
3159 ("__end__", "group_ids:sequence"),
3160 (FormAction.SUBMIT, "submit"),
3161 ]
3162 )
3163 self.req.fake_request_post_from_dict(multidict)
3164 self.req.add_get_params(
3165 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
3166 )
3168 with mock.patch.object(user, "set_group_ids") as mock_set_group_ids:
3169 with self.assertRaises(HTTPFound):
3170 edit_user(self.req)
3172 mock_set_group_ids.assert_called_once_with([group.id])
3174 def test_user_stays_in_group_the_groupadmin_cannot_edit(self) -> None:
3175 regular_user = UserFactory(username="regular_user")
3176 group_b_admin = UserFactory(username="group_b_admin")
3177 group_a = GroupFactory()
3178 group_b = GroupFactory()
3179 UserGroupMembershipFactory(
3180 user_id=regular_user.id, group_id=group_a.id
3181 )
3182 UserGroupMembershipFactory(
3183 user_id=regular_user.id, group_id=group_b.id
3184 )
3185 UserGroupMembershipFactory(
3186 user_id=group_b_admin.id, group_id=group_b.id, groupadmin=True
3187 )
3188 self.req._debugging_user = group_b_admin
3190 multidict = MultiDict(
3191 [
3192 (ViewParam.USERNAME, regular_user.username),
3193 ("__start__", "group_ids:sequence"),
3194 ("group_id_sequence", str(group_b.id)),
3195 ("__end__", "group_ids:sequence"),
3196 (FormAction.SUBMIT, "submit"),
3197 ]
3198 )
3199 self.req.fake_request_post_from_dict(multidict)
3200 self.req.add_get_params(
3201 {ViewParam.USER_ID: str(regular_user.id)}, set_method_get=False
3202 )
3204 with mock.patch.object(
3205 regular_user, "set_group_ids"
3206 ) as mock_set_group_ids:
3207 with self.assertRaises(HTTPFound):
3208 edit_user(self.req)
3210 [actual_group_ids] = mock_set_group_ids.call_args[0]
3211 self.assertEqual(
3212 sorted(actual_group_ids), sorted([group_a.id, group_b.id])
3213 )
3215 def test_upload_group_id_unset_when_membership_removed(self) -> None:
3216 group_a = GroupFactory()
3217 group_b = GroupFactory()
3218 regular_user = UserFactory(upload_group=group_a)
3219 groupadmin = UserFactory()
3220 UserGroupMembershipFactory(
3221 group_id=group_a.id, user_id=regular_user.id
3222 )
3223 UserGroupMembershipFactory(
3224 group_id=group_b.id, user_id=regular_user.id
3225 )
3226 UserGroupMembershipFactory(
3227 group_id=group_a.id, user_id=groupadmin.id, groupadmin=True
3228 )
3229 UserGroupMembershipFactory(
3230 group_id=group_b.id, user_id=groupadmin.id, groupadmin=True
3231 )
3232 self.req._debugging_user = groupadmin
3234 multidict = MultiDict(
3235 [
3236 (ViewParam.USERNAME, regular_user.username),
3237 ("__start__", "group_ids:sequence"),
3238 ("group_id_sequence", str(group_b.id)),
3239 ("__end__", "group_ids:sequence"),
3240 (FormAction.SUBMIT, "submit"),
3241 ]
3242 )
3243 self.req.fake_request_post_from_dict(multidict)
3244 self.req.add_get_params(
3245 {ViewParam.USER_ID: str(regular_user.id)}, set_method_get=False
3246 )
3248 with self.assertRaises(HTTPFound):
3249 edit_user(self.req)
3251 self.assertIsNone(regular_user.upload_group_id)
3253 def test_get_form_values(self) -> None:
3254 regular_user = UserFactory(
3255 username="regular_user",
3256 fullname="Full Name",
3257 email="user@example.com",
3258 language="da_DK",
3259 )
3260 group_b_admin = UserFactory(username="group_b_admin")
3261 group_a = GroupFactory()
3262 group_b = GroupFactory()
3263 UserGroupMembershipFactory(
3264 user_id=regular_user.id, group_id=group_a.id
3265 )
3266 UserGroupMembershipFactory(
3267 user_id=regular_user.id, group_id=group_b.id
3268 )
3269 UserGroupMembershipFactory(
3270 user_id=group_b_admin.id, group_id=group_b.id, groupadmin=True
3271 )
3272 self.req._debugging_user = group_b_admin
3274 view = EditUserGroupAdminView(self.req)
3275 # Would normally be set when going through dispatch()
3276 view.object = regular_user
3278 form_values = view.get_form_values()
3280 self.assertEqual(
3281 form_values[ViewParam.USERNAME], regular_user.username
3282 )
3283 self.assertEqual(
3284 form_values[ViewParam.FULLNAME], regular_user.fullname
3285 )
3286 self.assertEqual(form_values[ViewParam.EMAIL], regular_user.email)
3287 self.assertEqual(
3288 form_values[ViewParam.LANGUAGE], regular_user.language
3289 )
3290 self.assertEqual(form_values[ViewParam.GROUP_IDS], [group_b.id])
3292 def test_raises_if_email_address_used_for_mfa(self) -> None:
3293 regular_user = UserFactory(
3294 username="regular_user",
3295 mfa_method=MfaMethod.HOTP_EMAIL,
3296 email="user@example.com",
3297 )
3299 multidict = MultiDict(
3300 [
3301 (ViewParam.USERNAME, regular_user.username),
3302 (ViewParam.EMAIL, ""),
3303 (FormAction.SUBMIT, "submit"),
3304 ]
3305 )
3306 self.req.fake_request_post_from_dict(multidict)
3307 self.req.add_get_params(
3308 {ViewParam.USER_ID: str(regular_user.id)}, set_method_get=False
3309 )
3311 with self.assertRaises(HTTPBadRequest) as cm:
3312 edit_user(self.req)
3314 self.assertIn(
3315 "used for multi-factor authentication", cm.exception.message
3316 )
3319class EditOwnUserMfaViewTests(BasicDatabaseTestCase):
3320 def test_get_form_values_mfa_method(self) -> None:
3321 regular_user = UserFactory(
3322 username="regular_user", mfa_method=MfaMethod.HOTP_SMS
3323 )
3324 self.req._debugging_user = regular_user
3325 view = EditOwnUserMfaView(self.req)
3327 # Would normally be set when going through dispatch()
3328 view.object = regular_user
3330 form_values = view.get_form_values()
3332 self.assertEqual(
3333 form_values[ViewParam.MFA_METHOD], regular_user.mfa_method
3334 )
3336 def test_get_form_values_hotp_email(self) -> None:
3337 regular_user = UserFactory(
3338 username="regular_user",
3339 mfa_method=MfaMethod.HOTP_EMAIL,
3340 email="regular_user@example.com",
3341 )
3342 self.req._debugging_user = regular_user
3343 view = EditOwnUserMfaView(self.req)
3345 # Would normally be set when going through dispatch()
3346 view.object = regular_user
3347 view.state.update(step=EditOwnUserMfaView.STEP_HOTP_EMAIL)
3349 mock_secret_key = pyotp.random_base32()
3350 with mock.patch(
3351 "camcops_server.cc_modules.webview.pyotp.random_base32",
3352 return_value=mock_secret_key,
3353 ) as mock_random_base32:
3354 form_values = view.get_form_values()
3356 mock_random_base32.assert_called_once()
3358 self.assertEqual(
3359 form_values[ViewParam.MFA_SECRET_KEY], mock_secret_key
3360 )
3361 self.assertEqual(form_values[ViewParam.EMAIL], regular_user.email)
3363 def test_get_form_values_hotp_sms(self) -> None:
3364 regular_user = UserFactory(
3365 username="regular_user",
3366 mfa_method=MfaMethod.HOTP_SMS,
3367 phone_number=phonenumbers.parse(Fake.en_gb.valid_phone_number()),
3368 )
3369 self.req._debugging_user = regular_user
3370 view = EditOwnUserMfaView(self.req)
3372 # Would normally be set when going through dispatch()
3373 view.object = regular_user
3374 view.state.update(step=EditOwnUserMfaView.STEP_HOTP_SMS)
3376 mock_secret_key = pyotp.random_base32()
3377 with mock.patch(
3378 "camcops_server.cc_modules.webview.pyotp.random_base32",
3379 return_value=mock_secret_key,
3380 ) as mock_random_base32:
3381 form_values = view.get_form_values()
3383 mock_random_base32.assert_called_once()
3385 self.assertEqual(
3386 form_values[ViewParam.MFA_SECRET_KEY], mock_secret_key
3387 )
3388 self.assertEqual(
3389 form_values[ViewParam.PHONE_NUMBER], regular_user.phone_number
3390 )
3392 def test_get_form_values_totp(self) -> None:
3393 regular_user = UserFactory(
3394 username="regular_user", mfa_method=MfaMethod.TOTP
3395 )
3396 self.req._debugging_user = regular_user
3397 view = EditOwnUserMfaView(self.req)
3399 # Would normally be set when going through dispatch()
3400 view.object = regular_user
3401 view.state.update(step=EditOwnUserMfaView.STEP_TOTP)
3403 mock_secret_key = pyotp.random_base32()
3404 with mock.patch(
3405 "camcops_server.cc_modules.webview.pyotp.random_base32",
3406 return_value=mock_secret_key,
3407 ) as mock_random_base32:
3408 form_values = view.get_form_values()
3410 mock_random_base32.assert_called_once()
3412 self.assertEqual(
3413 form_values[ViewParam.MFA_SECRET_KEY], mock_secret_key
3414 )
3416 def test_user_can_set_secret_key(self) -> None:
3417 regular_user = UserFactory(username="regular_user")
3418 regular_user.mfa_method = MfaMethod.TOTP
3419 regular_user.ensure_mfa_info()
3420 # ... otherwise, the absence of e.g. the HOTP counter will cause a
3421 # secret key reset.
3422 mfa_secret_key = pyotp.random_base32()
3424 multidict = MultiDict(
3425 [
3426 (ViewParam.MFA_SECRET_KEY, mfa_secret_key),
3427 (FormAction.SUBMIT, "submit"),
3428 ]
3429 )
3430 self.req._debugging_user = regular_user
3431 self.req.fake_request_post_from_dict(multidict)
3432 self.req.config.mfa_methods = [MfaMethod.TOTP]
3434 view = EditOwnUserMfaView(self.req)
3435 view.state.update(step=EditOwnUserMfaView.STEP_TOTP)
3437 view.dispatch()
3439 self.assertEqual(regular_user.mfa_secret_key, mfa_secret_key)
3441 def test_user_can_set_method_totp(self) -> None:
3442 regular_user = UserFactory(username="regular_user")
3443 multidict = MultiDict(
3444 [
3445 (ViewParam.MFA_METHOD, MfaMethod.TOTP),
3446 (FormAction.SUBMIT, "submit"),
3447 ]
3448 )
3449 self.req._debugging_user = regular_user
3450 self.req.fake_request_post_from_dict(multidict)
3451 self.req.config.mfa_methods = [MfaMethod.TOTP]
3453 view = EditOwnUserMfaView(self.req)
3455 view.dispatch()
3457 self.assertEqual(regular_user.mfa_method, MfaMethod.TOTP)
3459 def test_user_can_set_method_hotp_email(self) -> None:
3460 regular_user = UserFactory(username="regular_user")
3461 multidict = MultiDict(
3462 [
3463 (ViewParam.MFA_METHOD, MfaMethod.HOTP_EMAIL),
3464 (FormAction.SUBMIT, "submit"),
3465 ]
3466 )
3467 self.req._debugging_user = regular_user
3468 self.req.fake_request_post_from_dict(multidict)
3469 self.req.config.mfa_methods = [MfaMethod.HOTP_EMAIL]
3471 view = EditOwnUserMfaView(self.req)
3473 view.dispatch()
3475 self.assertEqual(regular_user.mfa_method, MfaMethod.HOTP_EMAIL)
3476 self.assertEqual(regular_user.hotp_counter, 0)
3478 def test_user_can_set_method_hotp_sms(self) -> None:
3479 regular_user = UserFactory(username="regular_user")
3480 multidict = MultiDict(
3481 [
3482 (ViewParam.MFA_METHOD, MfaMethod.HOTP_SMS),
3483 (FormAction.SUBMIT, "submit"),
3484 ]
3485 )
3486 self.req._debugging_user = regular_user
3487 self.req.fake_request_post_from_dict(multidict)
3488 self.req.config.mfa_methods = [MfaMethod.HOTP_SMS]
3490 view = EditOwnUserMfaView(self.req)
3492 view.dispatch()
3494 self.assertEqual(regular_user.mfa_method, MfaMethod.HOTP_SMS)
3495 self.assertEqual(regular_user.hotp_counter, 0)
3497 def test_user_can_disable_mfa(self) -> None:
3498 regular_user = UserFactory(
3499 username="regular_user", mfa_method=MfaMethod.TOTP
3500 )
3501 multidict = MultiDict(
3502 [
3503 (ViewParam.MFA_METHOD, MfaMethod.NO_MFA),
3504 (FormAction.SUBMIT, "submit"),
3505 ]
3506 )
3507 self.req._debugging_user = regular_user
3508 self.req.fake_request_post_from_dict(multidict)
3509 self.req.config.mfa_methods = [
3510 MfaMethod.TOTP,
3511 MfaMethod.HOTP_SMS,
3512 MfaMethod.HOTP_EMAIL,
3513 MfaMethod.NO_MFA,
3514 ]
3516 view = EditOwnUserMfaView(self.req)
3518 with self.assertRaises(HTTPFound):
3519 view.dispatch()
3521 self.assertEqual(regular_user.mfa_method, MfaMethod.NO_MFA)
3523 def test_user_can_set_phone_number(self) -> None:
3524 regular_user = UserFactory(username="regular_user")
3525 regular_user.mfa_method = MfaMethod.HOTP_SMS
3527 phone_number_str = Fake.en_gb.valid_phone_number()
3529 multidict = MultiDict(
3530 [
3531 (ViewParam.PHONE_NUMBER, phone_number_str),
3532 (FormAction.SUBMIT, "submit"),
3533 ]
3534 )
3535 self.req._debugging_user = regular_user
3536 self.req.fake_request_post_from_dict(multidict)
3537 self.req.config.mfa_methods = [MfaMethod.HOTP_SMS]
3539 view = EditOwnUserMfaView(self.req)
3540 view.state.update(step=EditOwnUserMfaView.STEP_HOTP_SMS)
3542 view.dispatch()
3544 self.assertEqual(
3545 regular_user.phone_number, phonenumbers.parse(phone_number_str)
3546 )
3548 def test_user_can_set_email_address(self) -> None:
3549 regular_user = UserFactory(username="regular_user")
3550 # We're going to force this user to the e-mail verification step, so
3551 # we need to ensure it's set to use e-mail MFA:
3552 regular_user.mfa_method = MfaMethod.HOTP_EMAIL
3553 multidict = MultiDict(
3554 [
3555 (ViewParam.EMAIL, "regular_user@example.com"),
3556 (FormAction.SUBMIT, "submit"),
3557 ]
3558 )
3559 self.req._debugging_user = regular_user
3560 self.req.fake_request_post_from_dict(multidict)
3562 view = EditOwnUserMfaView(self.req)
3563 view.state.update(step=EditOwnUserMfaView.STEP_HOTP_EMAIL)
3565 view.dispatch()
3567 self.assertEqual(regular_user.email, "regular_user@example.com")
3570class ChangeOtherPasswordViewTests(TestStateMixin, BasicDatabaseTestCase):
3571 def setUp(self) -> None:
3572 super().setUp()
3574 self.req.matched_route.name = "change_other_password"
3576 def test_raises_for_invalid_user(self) -> None:
3577 multidict = MultiDict([(FormAction.SUBMIT, "submit")])
3578 self.req.fake_request_post_from_dict(multidict)
3580 self.req.add_get_params(
3581 {ViewParam.USER_ID: "123"}, set_method_get=False
3582 )
3584 view = ChangeOtherPasswordView(self.req)
3585 with self.assertRaises(HTTPBadRequest) as cm:
3586 view.dispatch()
3588 self.assertIn("Cannot find User with id:123", cm.exception.message)
3590 def test_raises_when_user_may_not_edit_other_user(self) -> None:
3591 regular_user = UserFactory(username="regular_user")
3592 multidict = MultiDict(
3593 [
3594 ("__start__", "new_password:mapping"),
3595 (ViewParam.NEW_PASSWORD, "monkeybusiness"),
3596 ("new_password-confirm", "monkeybusiness"),
3597 ("__end__", "new_password:mapping"),
3598 (FormAction.SUBMIT, "submit"),
3599 ]
3600 )
3601 self.req._debugging_user = regular_user
3602 self.req.fake_request_post_from_dict(multidict)
3604 self.req.add_get_params(
3605 {ViewParam.USER_ID: str(self.system_user.id)}, set_method_get=False
3606 )
3608 view = ChangeOtherPasswordView(self.req)
3609 with self.assertRaises(HTTPBadRequest) as cm:
3610 view.dispatch()
3612 self.assertIn("Nobody may edit the system user", cm.exception.message)
3614 def test_password_set(self) -> None:
3615 groupadmin = UserFactory(username="groupadmin")
3616 regular_user = UserFactory(username="regular_user")
3617 group = GroupFactory()
3618 UserGroupMembershipFactory(
3619 user_id=groupadmin.id, group_id=group.id, groupadmin=True
3620 )
3621 UserGroupMembershipFactory(user_id=regular_user.id, group_id=group.id)
3623 self.assertFalse(regular_user.must_change_password)
3625 multidict = MultiDict(
3626 [
3627 ("__start__", "new_password:mapping"),
3628 (ViewParam.NEW_PASSWORD, "monkeybusiness"),
3629 ("new_password-confirm", "monkeybusiness"),
3630 ("__end__", "new_password:mapping"),
3631 (FormAction.SUBMIT, "submit"),
3632 ]
3633 )
3634 self.req._debugging_user = groupadmin
3635 self.req.fake_request_post_from_dict(multidict)
3637 self.req.add_get_params(
3638 {ViewParam.USER_ID: str(regular_user.id)}, set_method_get=False
3639 )
3641 view = ChangeOtherPasswordView(self.req)
3643 with mock.patch.object(
3644 regular_user, "set_password"
3645 ) as mock_set_password:
3646 with self.assertRaises(HTTPFound):
3647 view.dispatch()
3649 mock_set_password.assert_called_once_with(self.req, "monkeybusiness")
3650 self.assertFalse(regular_user.must_change_password)
3652 messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
3653 self.assertTrue(len(messages) > 0)
3654 self.assertIn("Password changed for user 'regular_user'", messages[0])
3656 def test_user_forced_to_change_password(self) -> None:
3657 groupadmin = UserFactory(username="groupadmin")
3658 regular_user = UserFactory(username="regular_user")
3659 group = GroupFactory()
3660 UserGroupMembershipFactory(
3661 user_id=groupadmin.id, group_id=group.id, groupadmin=True
3662 )
3663 UserGroupMembershipFactory(user_id=regular_user.id, group_id=group.id)
3664 multidict = MultiDict(
3665 [
3666 (ViewParam.MUST_CHANGE_PASSWORD, "true"),
3667 ("__start__", "new_password:mapping"),
3668 (ViewParam.NEW_PASSWORD, "monkeybusiness"),
3669 ("new_password-confirm", "monkeybusiness"),
3670 ("__end__", "new_password:mapping"),
3671 (FormAction.SUBMIT, "submit"),
3672 ]
3673 )
3674 self.req._debugging_user = groupadmin
3675 self.req.fake_request_post_from_dict(multidict)
3677 self.req.add_get_params(
3678 {ViewParam.USER_ID: str(regular_user.id)}, set_method_get=False
3679 )
3681 view = ChangeOtherPasswordView(self.req)
3683 with mock.patch.object(
3684 regular_user, "force_password_change"
3685 ) as mock_force_change:
3686 with self.assertRaises(HTTPFound):
3687 view.dispatch()
3689 mock_force_change.assert_called_once()
3691 def test_redirects_if_editing_own_account(self) -> None:
3692 superuser = UserFactory(username="admin", superuser=True)
3693 self.req._debugging_user = superuser
3694 self.req.add_get_params(
3695 {ViewParam.USER_ID: str(superuser.id)}, set_method_get=False
3696 )
3698 view = ChangeOtherPasswordView(self.req)
3699 with self.assertRaises(HTTPFound) as cm:
3700 view.dispatch()
3702 self.assertEqual(cm.exception.status_code, 302)
3703 self.assertIn(
3704 f"/{Routes.CHANGE_OWN_PASSWORD}", cm.exception.headers["Location"]
3705 )
3707 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
3708 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
3709 def test_user_sees_otp_form_if_mfa_setup(
3710 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
3711 ) -> None:
3712 superuser = UserFactory(
3713 username="admin",
3714 superuser=True,
3715 email="admin@example.com",
3716 mfa_method=MfaMethod.HOTP_EMAIL,
3717 mfa_secret_key=pyotp.random_base32(),
3718 hotp_counter=0,
3719 )
3721 user = UserFactory(username="user")
3722 self.req._debugging_user = superuser
3723 self.req.add_get_params(
3724 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
3725 )
3727 view = ChangeOtherPasswordView(self.req)
3729 with mock.patch.object(view, "render_to_response") as mock_render:
3730 view.dispatch()
3732 args, kwargs = mock_render.call_args
3733 context = args[0]
3735 self.assertIn("form", context)
3736 self.assertIn("Enter the six-digit code", context["form"])
3738 def test_code_sent_if_mfa_setup(self) -> None:
3739 self.req.config.sms_backend = get_sms_backend(
3740 SmsBackendNames.CONSOLE, {}
3741 )
3743 phone_number_str = Fake.en_gb.valid_phone_number()
3744 superuser = UserFactory(
3745 username="admin",
3746 superuser=True,
3747 email="admin@example.com",
3748 phone_number=phonenumbers.parse(phone_number_str),
3749 mfa_secret_key=pyotp.random_base32(),
3750 mfa_method=MfaMethod.HOTP_SMS,
3751 hotp_counter=0,
3752 )
3753 user = UserFactory(username="user", email="user@example.com")
3754 self.req._debugging_user = superuser
3755 self.req.add_get_params(
3756 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
3757 )
3759 view = ChangeOtherPasswordView(self.req)
3760 with self.assertLogs(level=logging.INFO) as logging_cm:
3761 view.dispatch()
3763 expected_code = pyotp.HOTP(superuser.mfa_secret_key).at(1)
3764 expected_message = f"Your CamCOPS verification code is {expected_code}"
3766 self.assertIn(
3767 ConsoleSmsBackend.make_msg(phone_number_str, expected_message),
3768 logging_cm.output[0],
3769 )
3771 def test_user_can_enter_token(self) -> None:
3772 superuser = UserFactory(
3773 username="admin",
3774 superuser=True,
3775 mfa_method=MfaMethod.HOTP_EMAIL,
3776 mfa_secret_key=pyotp.random_base32(),
3777 email="user@example.com",
3778 hotp_counter=1,
3779 )
3780 user = UserFactory(username="user", email="user@example.com")
3781 self.req._debugging_user = superuser
3782 self.req.add_get_params(
3783 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
3784 )
3786 hotp = pyotp.HOTP(superuser.mfa_secret_key)
3787 multidict = MultiDict(
3788 [
3789 (ViewParam.ONE_TIME_PASSWORD, hotp.at(1)),
3790 (FormAction.SUBMIT, "submit"),
3791 ]
3792 )
3793 self.req.fake_request_post_from_dict(multidict)
3795 view = ChangeOtherPasswordView(self.req)
3797 response = view.dispatch()
3799 self.assertEqual(
3800 self.req.camcops_session.form_state[FormWizardMixin.PARAM_STEP],
3801 ChangeOtherPasswordView.STEP_CHANGE_PASSWORD,
3802 )
3803 self.assertIn("Change password for user:", response.body.decode(UTF8))
3804 self.assertIn(
3805 "Type the new password and confirm it", response.body.decode(UTF8)
3806 )
3808 def test_form_state_cleared_on_invalid_token(self) -> None:
3809 superuser = UserFactory(
3810 username="superuser",
3811 superuser=True,
3812 mfa_method=MfaMethod.HOTP_EMAIL,
3813 mfa_secret_key=pyotp.random_base32(),
3814 email="user@example.com",
3815 hotp_counter=1,
3816 )
3817 user = UserFactory(username="user", email="user@example.com")
3818 self.req._debugging_user = superuser
3819 self.req.add_get_params(
3820 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
3821 )
3823 hotp = pyotp.HOTP(superuser.mfa_secret_key)
3824 multidict = MultiDict(
3825 [
3826 (ViewParam.ONE_TIME_PASSWORD, hotp.at(2)),
3827 (FormAction.SUBMIT, "submit"),
3828 ]
3829 )
3830 self.req.fake_request_post_from_dict(multidict)
3832 view = ChangeOtherPasswordView(self.req)
3834 with self.assertRaises(HTTPFound):
3835 view.dispatch()
3837 messages = self.req.session.peek_flash(FlashQueue.DANGER)
3838 self.assertTrue(len(messages) > 0)
3839 self.assertIn("You entered an invalid code", messages[0])
3841 self.assert_state_is_clean()
3843 def test_cannot_change_password_if_timed_out(self) -> None:
3844 self.req.config.mfa_timeout_s = 600
3845 superuser = UserFactory(
3846 username="admin",
3847 superuser=True,
3848 mfa_method=MfaMethod.TOTP,
3849 mfa_secret_key=pyotp.random_base32(),
3850 )
3851 user = UserFactory(username="user")
3852 self.req._debugging_user = superuser
3853 self.req.add_get_params(
3854 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
3855 )
3857 totp = pyotp.TOTP(superuser.mfa_secret_key)
3858 multidict = MultiDict(
3859 [
3860 (ViewParam.ONE_TIME_PASSWORD, totp.now()),
3861 (FormAction.SUBMIT, "submit"),
3862 ]
3863 )
3864 self.req.fake_request_post_from_dict(multidict)
3866 view = ChangeOtherPasswordView(self.req)
3867 view.state.update(
3868 mfa_user=superuser.id,
3869 mfa_time=int(time.time() - 601),
3870 step=MfaMixin.STEP_MFA,
3871 )
3873 with mock.patch.object(
3874 view, "fail_timed_out", side_effect=HTTPFound
3875 ) as mock_fail_timed_out:
3876 with self.assertRaises(HTTPFound):
3877 view.dispatch()
3879 mock_fail_timed_out.assert_called_once()
3882class EditOtherUserMfaViewTests(TestStateMixin, BasicDatabaseTestCase):
3883 def setUp(self) -> None:
3884 super().setUp()
3886 self.req.matched_route.name = "edit_other_user_mfa"
3888 def test_raises_for_invalid_user(self) -> None:
3889 multidict = MultiDict([(FormAction.SUBMIT, "submit")])
3890 self.req.fake_request_post_from_dict(multidict)
3892 self.req.add_get_params(
3893 {ViewParam.USER_ID: "123"}, set_method_get=False
3894 )
3896 view = EditOtherUserMfaView(self.req)
3897 with self.assertRaises(HTTPBadRequest) as cm:
3898 view.dispatch()
3900 self.assertIn("Cannot find User with id:123", cm.exception.message)
3902 def test_raises_when_user_may_not_edit_other_user(self) -> None:
3903 regular_user = UserFactory(username="regular_user")
3904 multidict = MultiDict([(FormAction.SUBMIT, "submit")])
3905 self.req._debugging_user = regular_user
3906 self.req.fake_request_post_from_dict(multidict)
3908 self.req.add_get_params(
3909 {ViewParam.USER_ID: str(self.system_user.id)}, set_method_get=False
3910 )
3912 view = EditOtherUserMfaView(self.req)
3913 with self.assertRaises(HTTPBadRequest) as cm:
3914 view.dispatch()
3916 self.assertIn("Nobody may edit the system user", cm.exception.message)
3918 def test_disable_mfa(self) -> None:
3919 groupadmin = UserFactory(username="groupadmin")
3920 regular_user = UserFactory(
3921 username="regular_user", mfa_method=MfaMethod.TOTP
3922 )
3923 group = GroupFactory()
3924 UserGroupMembershipFactory(
3925 user_id=groupadmin.id, group_id=group.id, groupadmin=True
3926 )
3927 UserGroupMembershipFactory(user_id=regular_user.id, group_id=group.id)
3928 self.assertFalse(regular_user.must_change_password)
3930 multidict = MultiDict(
3931 [(ViewParam.DISABLE_MFA, "true"), (FormAction.SUBMIT, "submit")]
3932 )
3933 self.req._debugging_user = groupadmin
3934 self.req.fake_request_post_from_dict(multidict)
3936 self.req.add_get_params(
3937 {ViewParam.USER_ID: str(regular_user.id)}, set_method_get=False
3938 )
3940 view = EditOtherUserMfaView(self.req)
3941 with self.assertRaises(HTTPFound):
3942 view.dispatch()
3944 self.assertEqual(regular_user.mfa_method, MfaMethod.NO_MFA)
3946 messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
3947 self.assertTrue(len(messages) > 0)
3948 self.assertIn(
3949 "Multi-factor authentication disabled for user 'regular_user'",
3950 messages[0],
3951 )
3953 def test_redirects_if_editing_own_account(self) -> None:
3954 superuser = UserFactory(username="admin", superuser=True)
3955 self.req._debugging_user = superuser
3956 self.req.add_get_params({ViewParam.USER_ID: str(superuser.id)})
3958 view = EditOtherUserMfaView(self.req)
3959 with self.assertRaises(HTTPFound) as cm:
3960 view.dispatch()
3962 self.assertEqual(cm.exception.status_code, 302)
3963 self.assertIn(
3964 f"/{Routes.EDIT_OWN_USER_MFA}", cm.exception.headers["Location"]
3965 )
3967 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
3968 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
3969 def test_user_sees_otp_form_if_mfa_setup(
3970 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
3971 ) -> None:
3972 superuser = UserFactory(
3973 username="admin",
3974 superuser=True,
3975 email="admin@example.com",
3976 mfa_method=MfaMethod.HOTP_EMAIL,
3977 mfa_secret_key=pyotp.random_base32(),
3978 hotp_counter=0,
3979 )
3981 user = UserFactory(username="user")
3982 self.req._debugging_user = superuser
3983 self.req.add_get_params(
3984 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
3985 )
3987 view = EditOtherUserMfaView(self.req)
3989 with mock.patch.object(view, "render_to_response") as mock_render:
3990 view.dispatch()
3992 args, kwargs = mock_render.call_args
3993 context = args[0]
3995 self.assertIn("form", context)
3996 self.assertIn("Enter the six-digit code", context["form"])
3998 def test_code_sent_if_mfa_setup(self) -> None:
3999 self.req.config.sms_backend = get_sms_backend(
4000 SmsBackendNames.CONSOLE, {}
4001 )
4003 phone_number_str = Fake.en_gb.valid_phone_number()
4004 superuser = UserFactory(
4005 username="admin",
4006 superuser=True,
4007 email="admin@example.com",
4008 phone_number=phonenumbers.parse(phone_number_str),
4009 mfa_secret_key=pyotp.random_base32(),
4010 mfa_method=MfaMethod.HOTP_SMS,
4011 hotp_counter=0,
4012 )
4013 user = UserFactory(username="user", email="user@example.com")
4014 self.req._debugging_user = superuser
4015 self.req.add_get_params(
4016 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
4017 )
4019 view = EditOtherUserMfaView(self.req)
4020 with self.assertLogs(level=logging.INFO) as logging_cm:
4021 view.dispatch()
4023 expected_code = pyotp.HOTP(superuser.mfa_secret_key).at(1)
4024 expected_message = f"Your CamCOPS verification code is {expected_code}"
4026 self.assertIn(
4027 ConsoleSmsBackend.make_msg(phone_number_str, expected_message),
4028 logging_cm.output[0],
4029 )
4031 def test_user_can_enter_token(self) -> None:
4032 superuser = UserFactory(
4033 username="admin",
4034 superuser=True,
4035 mfa_method=MfaMethod.HOTP_EMAIL,
4036 mfa_secret_key=pyotp.random_base32(),
4037 email="user@example.com",
4038 hotp_counter=1,
4039 )
4040 user = UserFactory(username="user", email="user@example.com")
4041 self.req._debugging_user = superuser
4042 self.req.add_get_params(
4043 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
4044 )
4046 hotp = pyotp.HOTP(superuser.mfa_secret_key)
4047 multidict = MultiDict(
4048 [
4049 (ViewParam.ONE_TIME_PASSWORD, hotp.at(1)),
4050 (FormAction.SUBMIT, "submit"),
4051 ]
4052 )
4053 self.req.fake_request_post_from_dict(multidict)
4055 view = EditOtherUserMfaView(self.req)
4057 response = view.dispatch()
4059 self.assertEqual(
4060 self.req.camcops_session.form_state[FormWizardMixin.PARAM_STEP],
4061 "other_user_mfa",
4062 )
4063 self.assertIn(
4064 "Edit multi-factor authentication for user:",
4065 response.body.decode(UTF8),
4066 )
4068 def test_form_state_cleared_on_invalid_token(self) -> None:
4069 superuser = UserFactory(
4070 username="superuser",
4071 superuser=True,
4072 mfa_method=MfaMethod.HOTP_EMAIL,
4073 mfa_secret_key=pyotp.random_base32(),
4074 email="user@example.com",
4075 hotp_counter=1,
4076 )
4077 user = UserFactory(username="user", email="user@example.com")
4078 self.req._debugging_user = superuser
4079 self.req.add_get_params(
4080 {ViewParam.USER_ID: str(user.id)}, set_method_get=False
4081 )
4083 hotp = pyotp.HOTP(superuser.mfa_secret_key)
4084 multidict = MultiDict(
4085 [
4086 (ViewParam.ONE_TIME_PASSWORD, hotp.at(2)),
4087 (FormAction.SUBMIT, "submit"),
4088 ]
4089 )
4090 self.req.fake_request_post_from_dict(multidict)
4092 view = EditOtherUserMfaView(self.req)
4094 with self.assertRaises(HTTPFound):
4095 view.dispatch()
4097 messages = self.req.session.peek_flash(FlashQueue.DANGER)
4098 self.assertTrue(len(messages) > 0)
4099 self.assertIn("You entered an invalid code", messages[0])
4101 self.assert_state_is_clean()
4104class EditUserGroupMembershipViewTests(DemoRequestTestCase):
4105 def test_superuser_can_update_user_group_membership(self) -> None:
4106 regular_user = UserFactory()
4107 groupadmin = UserFactory()
4108 group = GroupFactory()
4110 UserGroupMembershipFactory(
4111 user_id=groupadmin.id,
4112 group_id=group.id,
4113 groupadmin=True,
4114 )
4116 ugm = UserGroupMembershipFactory(
4117 user_id=regular_user.id, group_id=group.id
4118 )
4120 self.req._debugging_user = groupadmin
4122 self.assertFalse(ugm.may_upload)
4123 self.assertFalse(ugm.may_register_devices)
4124 self.assertFalse(ugm.may_use_webviewer)
4125 self.assertFalse(ugm.view_all_patients_when_unfiltered)
4126 self.assertFalse(ugm.may_dump_data)
4127 self.assertFalse(ugm.may_run_reports)
4128 self.assertFalse(ugm.may_add_notes)
4129 self.assertFalse(ugm.may_manage_patients)
4130 self.assertFalse(ugm.may_email_patients)
4131 self.assertFalse(ugm.groupadmin)
4133 multidict = MultiDict(
4134 [
4135 (ViewParam.MAY_UPLOAD, "true"),
4136 (ViewParam.MAY_REGISTER_DEVICES, "true"),
4137 (ViewParam.MAY_USE_WEBVIEWER, "true"),
4138 (ViewParam.VIEW_ALL_PATIENTS_WHEN_UNFILTERED, "true"),
4139 (ViewParam.MAY_DUMP_DATA, "true"),
4140 (ViewParam.MAY_RUN_REPORTS, "true"),
4141 (ViewParam.MAY_ADD_NOTES, "true"),
4142 (ViewParam.MAY_MANAGE_PATIENTS, "true"),
4143 (ViewParam.MAY_EMAIL_PATIENTS, "true"),
4144 (ViewParam.GROUPADMIN, "true"),
4145 (FormAction.SUBMIT, "submit"),
4146 ]
4147 )
4149 self.req.fake_request_post_from_dict(multidict)
4150 self.req.add_get_params(
4151 {ViewParam.USER_GROUP_MEMBERSHIP_ID: str(ugm.id)},
4152 set_method_get=False,
4153 )
4155 with self.assertRaises(HTTPFound):
4156 edit_user_group_membership(self.req)
4158 self.assertTrue(ugm.may_upload)
4159 self.assertTrue(ugm.may_register_devices)
4160 self.assertTrue(ugm.may_use_webviewer)
4161 self.assertTrue(ugm.view_all_patients_when_unfiltered)
4162 self.assertTrue(ugm.may_dump_data)
4163 self.assertTrue(ugm.may_run_reports)
4164 self.assertTrue(ugm.may_add_notes)
4165 self.assertTrue(ugm.may_manage_patients)
4166 self.assertTrue(ugm.may_email_patients)
4168 def test_groupadmin_can_update_user_group_membership(self) -> None:
4169 regular_user = UserFactory()
4170 groupadmin = UserFactory()
4171 group = GroupFactory()
4173 UserGroupMembershipFactory(
4174 user_id=groupadmin.id,
4175 group_id=group.id,
4176 groupadmin=True,
4177 )
4179 ugm = UserGroupMembershipFactory(
4180 user_id=regular_user.id, group_id=group.id
4181 )
4183 self.req._debugging_user = groupadmin
4185 self.assertFalse(ugm.may_upload)
4186 self.assertFalse(ugm.may_register_devices)
4187 self.assertFalse(ugm.may_use_webviewer)
4188 self.assertFalse(ugm.view_all_patients_when_unfiltered)
4189 self.assertFalse(ugm.may_dump_data)
4190 self.assertFalse(ugm.may_run_reports)
4191 self.assertFalse(ugm.may_add_notes)
4192 self.assertFalse(ugm.may_manage_patients)
4193 self.assertFalse(ugm.may_email_patients)
4195 multidict = MultiDict(
4196 [
4197 (ViewParam.MAY_UPLOAD, "true"),
4198 (ViewParam.MAY_REGISTER_DEVICES, "true"),
4199 (ViewParam.MAY_USE_WEBVIEWER, "true"),
4200 (ViewParam.VIEW_ALL_PATIENTS_WHEN_UNFILTERED, "true"),
4201 (ViewParam.MAY_DUMP_DATA, "true"),
4202 (ViewParam.MAY_RUN_REPORTS, "true"),
4203 (ViewParam.MAY_ADD_NOTES, "true"),
4204 (ViewParam.MAY_MANAGE_PATIENTS, "true"),
4205 (ViewParam.MAY_EMAIL_PATIENTS, "true"),
4206 (FormAction.SUBMIT, "submit"),
4207 ]
4208 )
4210 self.req.fake_request_post_from_dict(multidict)
4211 self.req.add_get_params(
4212 {ViewParam.USER_GROUP_MEMBERSHIP_ID: str(ugm.id)},
4213 set_method_get=False,
4214 )
4216 with self.assertRaises(HTTPFound):
4217 edit_user_group_membership(self.req)
4219 self.assertTrue(ugm.may_upload)
4220 self.assertTrue(ugm.may_register_devices)
4221 self.assertTrue(ugm.may_use_webviewer)
4222 self.assertTrue(ugm.view_all_patients_when_unfiltered)
4223 self.assertTrue(ugm.may_dump_data)
4224 self.assertTrue(ugm.may_run_reports)
4225 self.assertTrue(ugm.may_add_notes)
4226 self.assertTrue(ugm.may_manage_patients)
4227 self.assertTrue(ugm.may_email_patients)
4229 def test_raises_if_cant_edit_user(self) -> None:
4230 system_user = User.get_system_user(self.dbsession)
4231 groupadmin = UserFactory()
4232 group = GroupFactory()
4234 UserGroupMembershipFactory(
4235 user_id=groupadmin.id,
4236 group_id=group.id,
4237 groupadmin=True,
4238 )
4240 system_ugm = UserGroupMembershipFactory(
4241 user_id=system_user.id, group_id=group.id
4242 )
4244 self.req._debugging_user = groupadmin
4246 multidict = MultiDict([(FormAction.SUBMIT, "submit")])
4248 self.req.fake_request_post_from_dict(multidict)
4249 self.req.add_get_params(
4250 {ViewParam.USER_GROUP_MEMBERSHIP_ID: str(system_ugm.id)},
4251 set_method_get=False,
4252 )
4254 with self.assertRaises(HTTPBadRequest) as cm:
4255 edit_user_group_membership(self.req)
4257 self.assertIn("Nobody may edit the system user", cm.exception.message)
4259 def test_raises_if_cant_administer_group(self) -> None:
4260 group_a = GroupFactory()
4261 group_b = GroupFactory()
4263 user1 = UserFactory()
4264 user2 = UserFactory()
4266 # User 1 is a group administrator for group A,
4267 # User 2 is a member if group A
4268 UserGroupMembershipFactory(
4269 user_id=user1.id, group_id=group_a.id, groupadmin=True
4270 )
4271 UserGroupMembershipFactory(user_id=user2.id, group_id=group_a.id),
4273 # User 1 is not an administrator of group B
4274 # User 2 is a member of group B
4275 ugm = UserGroupMembershipFactory(user_id=user2.id, group_id=group_b.id)
4277 multidict = MultiDict([(FormAction.SUBMIT, "submit")])
4279 self.req.fake_request_post_from_dict(multidict)
4280 self.req.add_get_params(
4281 {ViewParam.USER_GROUP_MEMBERSHIP_ID: str(ugm.id)},
4282 set_method_get=False,
4283 )
4285 self.req._debugging_user = user1
4287 with self.assertRaises(HTTPBadRequest) as cm:
4288 edit_user_group_membership(self.req)
4290 self.assertIn(
4291 "You may not administer this group", cm.exception.message
4292 )
4294 def test_cancel_returns_to_users_list(self) -> None:
4295 regular_user = UserFactory()
4296 groupadmin = UserFactory()
4297 group = GroupFactory()
4299 UserGroupMembershipFactory(
4300 user_id=groupadmin.id,
4301 group_id=group.id,
4302 groupadmin=True,
4303 )
4305 ugm = UserGroupMembershipFactory(
4306 user_id=regular_user.id, group_id=group.id
4307 )
4309 self.req._debugging_user = groupadmin
4310 multidict = MultiDict([(FormAction.CANCEL, "cancel")])
4312 self.req.fake_request_post_from_dict(multidict)
4313 self.req.add_get_params(
4314 {ViewParam.USER_GROUP_MEMBERSHIP_ID: str(ugm.id)},
4315 set_method_get=False,
4316 )
4318 with self.assertRaises(HTTPFound) as cm:
4319 edit_user_group_membership(self.req)
4321 self.assertEqual(cm.exception.status_code, 302)
4323 self.assertIn(Routes.VIEW_ALL_USERS, cm.exception.headers["Location"])
4326class ChangeOwnPasswordViewTests(TestStateMixin, BasicDatabaseTestCase):
4327 def setUp(self) -> None:
4328 super().setUp()
4330 self.req.matched_route.name = "change_own_password"
4332 def test_user_can_change_password(self) -> None:
4333 new_password = "monkeybusiness"
4335 user = UserFactory(
4336 username="user",
4337 mfa_method=MfaMethod.NO_MFA,
4338 password__request=self.req,
4339 password="secret",
4340 )
4341 multidict = MultiDict(
4342 [
4343 (ViewParam.OLD_PASSWORD, "secret"),
4344 ("__start__", "new_password:mapping"),
4345 (ViewParam.NEW_PASSWORD, new_password),
4346 ("new_password-confirm", new_password),
4347 ("__end__", "new_password-mapping"),
4348 (FormAction.SUBMIT, "submit"),
4349 ]
4350 )
4352 self.req.fake_request_post_from_dict(multidict)
4353 self.req._debugging_user = user
4355 with mock.patch.object(user, "set_password") as mock_set_password:
4356 with self.assertRaises(HTTPFound):
4357 change_own_password(self.req)
4359 mock_set_password.assert_called_once_with(self.req, new_password)
4361 messages = self.req.session.peek_flash(FlashQueue.SUCCESS)
4362 self.assertTrue(len(messages) > 0)
4363 self.assertIn("You have changed your password", messages[0])
4364 self.assert_state_is_finished()
4366 def test_user_sees_expiry_message(self) -> None:
4367 user = UserFactory(
4368 username="user",
4369 mfa_method=MfaMethod.NO_MFA,
4370 must_change_password=True,
4371 )
4372 self.req._debugging_user = user
4374 with mock.patch.object(self.req.session, "flash") as mock_flash:
4375 change_own_password(self.req)
4377 args, kwargs = mock_flash.call_args
4378 self.assertIn("Your password has expired", args[0])
4379 self.assertEqual(kwargs["queue"], FlashQueue.DANGER)
4381 def test_password_must_differ(self) -> None:
4382 view = ChangeOwnPasswordView(self.req)
4384 form_kwargs = view.get_form_kwargs()
4385 self.assertIn("must_differ", form_kwargs)
4386 self.assertTrue(form_kwargs["must_differ"])
4388 @mock.patch("camcops_server.cc_modules.cc_email.send_msg")
4389 @mock.patch("camcops_server.cc_modules.cc_email.make_email")
4390 def test_user_sees_otp_form_if_mfa_setup(
4391 self, mock_make_email: mock.Mock, mock_send_msg: mock.Mock
4392 ) -> None:
4393 user = UserFactory(
4394 username="user",
4395 email="user@example.com",
4396 mfa_method=MfaMethod.HOTP_EMAIL,
4397 mfa_secret_key=pyotp.random_base32(),
4398 hotp_counter=0,
4399 )
4400 self.req._debugging_user = user
4402 view = ChangeOwnPasswordView(self.req)
4404 with mock.patch.object(view, "render_to_response") as mock_render:
4405 view.dispatch()
4407 args, kwargs = mock_render.call_args
4408 context = args[0]
4410 self.assertIn("form", context)
4411 self.assertIn("Enter the six-digit code", context["form"])
4413 def test_code_sent_if_mfa_setup(self) -> None:
4414 self.req.config.sms_backend = get_sms_backend(
4415 SmsBackendNames.CONSOLE, {}
4416 )
4417 phone_number_str = Fake.en_gb.valid_phone_number()
4418 user = UserFactory(
4419 username="user",
4420 email="user@example.com",
4421 phone_number=phonenumbers.parse(phone_number_str),
4422 mfa_secret_key=pyotp.random_base32(),
4423 mfa_method=MfaMethod.HOTP_SMS,
4424 hotp_counter=0,
4425 )
4427 self.req._debugging_user = user
4428 view = ChangeOwnPasswordView(self.req)
4429 with self.assertLogs(level=logging.INFO) as logging_cm:
4430 view.dispatch()
4432 expected_code = pyotp.HOTP(user.mfa_secret_key).at(1)
4433 expected_message = f"Your CamCOPS verification code is {expected_code}"
4435 self.assertIn(
4436 ConsoleSmsBackend.make_msg(phone_number_str, expected_message),
4437 logging_cm.output[0],
4438 )
4440 def test_user_can_enter_token(self) -> None:
4441 user = UserFactory(
4442 username="user",
4443 mfa_method=MfaMethod.HOTP_EMAIL,
4444 mfa_secret_key=pyotp.random_base32(),
4445 email="user@example.com",
4446 hotp_counter=1,
4447 password__request=self.req,
4448 password="secret",
4449 )
4450 self.req._debugging_user = user
4452 hotp = pyotp.HOTP(user.mfa_secret_key)
4453 multidict = MultiDict(
4454 [
4455 (ViewParam.ONE_TIME_PASSWORD, hotp.at(1)), # the token
4456 (FormAction.SUBMIT, "submit"),
4457 ]
4458 )
4459 self.req.fake_request_post_from_dict(multidict)
4461 view = ChangeOwnPasswordView(self.req)
4463 response = view.dispatch()
4465 self.assertEqual(
4466 self.req.camcops_session.form_state[FormWizardMixin.PARAM_STEP],
4467 ChangeOwnPasswordView.STEP_CHANGE_PASSWORD,
4468 )
4469 self.assertIn("Change your password", response.body.decode(UTF8))
4470 self.assertIn(
4471 "Type the new password and confirm it", response.body.decode(UTF8)
4472 )
4474 def test_form_state_cleared_on_invalid_token(self) -> None:
4475 user = UserFactory(
4476 username="user",
4477 mfa_method=MfaMethod.HOTP_EMAIL,
4478 mfa_secret_key=pyotp.random_base32(),
4479 email="user@example.com",
4480 hotp_counter=1,
4481 password__request=self.req,
4482 password="secret",
4483 )
4484 self.req._debugging_user = user
4486 hotp = pyotp.HOTP(user.mfa_secret_key)
4487 multidict = MultiDict(
4488 [
4489 (ViewParam.ONE_TIME_PASSWORD, hotp.at(2)),
4490 (FormAction.SUBMIT, "submit"),
4491 ]
4492 )
4493 self.req.fake_request_post_from_dict(multidict)
4495 view = ChangeOwnPasswordView(self.req)
4497 with self.assertRaises(HTTPFound):
4498 view.dispatch()
4500 messages = self.req.session.peek_flash(FlashQueue.DANGER)
4501 self.assertTrue(len(messages) > 0)
4502 self.assertIn("You entered an invalid code", messages[0])
4504 self.assert_state_is_clean()
4506 def test_cannot_change_password_if_timed_out(self) -> None:
4507 self.req.config.mfa_timeout_s = 600
4508 user = UserFactory(
4509 username="user",
4510 mfa_method=MfaMethod.TOTP,
4511 mfa_secret_key=pyotp.random_base32(),
4512 password__request=self.req,
4513 password="secret",
4514 )
4515 self.req._debugging_user = user
4517 totp = pyotp.TOTP(user.mfa_secret_key)
4518 multidict = MultiDict(
4519 [
4520 (ViewParam.ONE_TIME_PASSWORD, totp.now()),
4521 (FormAction.SUBMIT, "submit"),
4522 ]
4523 )
4524 self.req.fake_request_post_from_dict(multidict)
4526 view = ChangeOwnPasswordView(self.req)
4527 view.state.update(
4528 mfa_user=user.id,
4529 mfa_time=int(time.time() - 601),
4530 step=MfaMixin.STEP_MFA,
4531 )
4533 with mock.patch.object(
4534 view, "fail_timed_out", side_effect=HTTPFound
4535 ) as mock_fail_timed_out:
4536 with self.assertRaises(HTTPFound):
4537 view.dispatch()
4539 mock_fail_timed_out.assert_called_once()
4542class AddUserTests(DemoRequestTestCase):
4543 def setUp(self) -> None:
4544 super().setUp()
4546 self.groupadmin = self.req._debugging_user = UserFactory()
4548 def test_user_created(self) -> None:
4549 group_1 = GroupFactory()
4550 group_2 = GroupFactory()
4552 UserGroupMembershipFactory(
4553 user_id=self.groupadmin.id, group_id=group_1.id, groupadmin=True
4554 )
4555 UserGroupMembershipFactory(
4556 user_id=self.groupadmin.id, group_id=group_2.id, groupadmin=True
4557 )
4559 multidict = MultiDict(
4560 [
4561 ("_charset_", UTF8),
4562 ("__formid__", "deform"),
4563 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
4564 (ViewParam.USERNAME, "test"),
4565 ("__start__", "new_password:mapping"),
4566 (ViewParam.NEW_PASSWORD, "monkeybusiness"),
4567 ("new_password-confirm", "monkeybusiness"),
4568 ("__end__", "new_password:mapping"),
4569 (ViewParam.MUST_CHANGE_PASSWORD, "true"),
4570 ("__start__", "group_ids:sequence"),
4571 ("group_id_sequence", str(group_1.id)),
4572 ("group_id_sequence", str(group_2.id)),
4573 ("__end__", "group_ids:sequence"),
4574 (FormAction.SUBMIT, "submit"),
4575 ]
4576 )
4577 self.req.fake_request_post_from_dict(multidict)
4579 with self.assertRaises(HTTPFound):
4580 add_user(self.req)
4582 user = (
4583 self.dbsession.query(User)
4584 .filter(
4585 User.username == "test",
4586 )
4587 .one_or_none()
4588 )
4590 self.assertIsNotNone(user)
4592 self.assertTrue(user.must_change_password)
4593 self.assertIn(group_1.id, user.group_ids)
4594 self.assertIn(group_2.id, user.group_ids)
4597class ForciblyFinalizeTests(BasicDatabaseTestCase):
4598 def setUp(self) -> None:
4599 super().setUp()
4601 self.req._debugging_user = self.groupadmin
4603 def test_cancel_returns_to_home(self) -> None:
4604 multidict = MultiDict([(FormAction.CANCEL, "cancel")])
4605 self.req.fake_request_post_from_dict(multidict)
4607 exc = forcibly_finalize(self.req)
4608 self.assertIsInstance(exc, HTTPFound)
4609 self.assertEqual(exc.status_code, 302)
4610 self.assertEqual(urlparse(exc.headers["Location"]).path, "/")
4612 def test_renders_first_form_on_get(self) -> None:
4613 mock_render = mock.Mock()
4614 with mock.patch.multiple(
4615 "camcops_server.cc_modules.webview", render_to_response=mock_render
4616 ):
4617 forcibly_finalize(self.req)
4619 args, kwargs = mock_render.call_args
4620 context = args[1]
4622 self.assertIn("form", context)
4623 self.assertIn("<select", context["form"])
4625 def test_renders_confirm_form_on_submit(self) -> None:
4626 device = DeviceFactory()
4628 multidict = MultiDict(
4629 [
4630 ("_charset_", UTF8),
4631 ("__formid__", "deform"),
4632 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
4633 (ViewParam.DEVICE_ID, device.id),
4634 ("__start__", "danger:mapping"),
4635 ("target", "7176"),
4636 ("user_entry", "7176"),
4637 ("__end__", "danger:mapping"),
4638 (FormAction.SUBMIT, "submit")
4639 ]
4640 )
4641 self.req.fake_request_post_from_dict(multidict)
4643 mock_render = mock.Mock()
4644 with mock.patch.multiple(
4645 "camcops_server.cc_modules.webview", render_to_response=mock_render
4646 ):
4647 forcibly_finalize(self.req)
4649 args, kwargs = mock_render.call_args
4650 context = args[1]
4652 self.assertIn("form", context)
4653 self.assertIn("Forcibly finalize", context["form"])
4655 def test_finalizes_on_submit(self) -> None:
4656 device = DeviceFactory()
4657 patient = PatientFactory(_device=device, _group=self.group)
4658 bmis = BmiFactory.create_batch(3, patient=patient, _era=ERA_NOW)
4660 multidict = MultiDict(
4661 [
4662 ("_charset_", UTF8),
4663 ("__formid__", "deform"),
4664 (ViewParam.CSRF_TOKEN, self.req.session.get_csrf_token()),
4665 (ViewParam.DEVICE_ID, device.id),
4666 ("confirm_1_t", "true"),
4667 ("confirm_2_t", "true"),
4668 ("confirm_4_t", "true"),
4669 ("__start__", "danger:mapping"),
4670 ("target", "7176"),
4671 ("user_entry", "7176"),
4672 ("__end__", "danger:mapping"),
4673 (FormAction.FINALIZE, "Forcibly finalize")
4674 ]
4675 )
4676 self.req.fake_request_post_from_dict(multidict)
4678 with self.assertRaises(HTTPFound) as cm:
4679 forcibly_finalize(self.req)
4681 self.assertEqual(cm.exception.status_code, 302)
4682 self.assertEqual(urlparse(cm.exception.headers["Location"]).path, "/")
4684 for bmi in bmis:
4685 self.assertEqual(bmi._preserving_user_id, self.groupadmin.id)
4686 self.assertTrue(bmi._forcibly_preserved)