Coverage for cc_modules/webview.py: 25%
2277 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
1"""
2camcops_server/cc_modules/webview.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Implements the CamCOPS web front end.**
28Quick tutorial on Pyramid views:
30- The configurator registers routes, and routes have URLs associated with
31 them. Those URLs can be templatized, e.g. to accept numerical parameters.
32 The configurator associates view callables ("views" for short) with routes,
33 and one method for doing that is an automatic scan via Venusian for views
34 decorated with @view_config().
36- All views take a Request object and return a Response or raise an exception
37 that Pyramid will translate into a Response.
39- Having matched a route, Pyramid uses its "view lookup" process to choose
40 one from potentially several views. For example, a single route might be
41 associated with:
43 .. code-block:: python
45 @view_config(route_name="myroute")
46 def myroute_default(req: Request) -> Response:
47 pass
49 @view_config(route_name="myroute", request_method="POST")
50 def myroute_post(req: Request) -> Response:
51 pass
53 In this example, POST requests will go to the second; everything else will
54 go to the first. Pyramid's view lookup rule is essentially: if multiple
55 views match, choose the one with the most specifiers.
57- Specifiers include:
59 .. code-block:: none
61 route_name=ROUTENAME
63 the route
65 request_method="POST"
67 requires HTTP GET, POST, etc.
69 request_param="XXX"
71 ... requires the presence of a GET/POST variable with this name in
72 the request.params dictionary
74 request_param="XXX=YYY"
76 ... requires the presence of a GET/POST variable called XXX whose
77 value is YYY, in the request.params dictionary
79 match_param="XXX=YYY"
81 .. requires the presence of this key/value pair in
82 request.matchdict, which contains parameters from the URL
84 https://docs.pylonsproject.org/projects/pyramid/en/latest/api/config.html#pyramid.config.Configurator.add_view # noqa
86- Getting parameters
88 .. code-block:: none
90 request.params
92 ... parameters from HTTP GET or POST, including both the query
93 string (as in https://somewhere/path?key=value) and the body (e.g.
94 POST).
96 request.matchdict
98 ... parameters from the URL, via URL dispatch; see
99 https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/urldispatch.html#urldispatch-chapter # noqa
101- Regarding rendering:
103 There might be some simplicity benefits from converting to a template
104 system like Mako. On the downside, that would entail a bit more work;
105 likely a minor performance hit (relative to plain Python string rendering);
106 and a loss of type checking. The type checking is also why we prefer:
108 .. code-block:: python
110 html = " ... {param_blah} ...".format(param_blah=PARAM.BLAH)
112 to
114 .. code-block:: python
116 html = " ... {PARAM.BLAH} ...".format(PARAM=PARAM)
118 as in the first situation, PyCharm will check that "BLAH" is present in
119 "PARAM", and in the second it won't. Automatic checking is worth a lot.
121"""
123from collections import OrderedDict
124import json
125import logging
126import os
128# from pprint import pformat
129import time
130from typing import (
131 Any,
132 cast,
133 Dict,
134 List,
135 NoReturn,
136 Optional,
137 Tuple,
138 Type,
139 TYPE_CHECKING,
140)
142from cardinal_pythonlib.datetimefunc import format_datetime
143from cardinal_pythonlib.deform_utils import get_head_form_html
144from cardinal_pythonlib.httpconst import HttpMethod, MimeType
145from cardinal_pythonlib.logs import BraceStyleAdapter
146from cardinal_pythonlib.pyramid.responses import (
147 BinaryResponse,
148 JsonResponse,
149 PdfResponse,
150 XmlResponse,
151)
152from cardinal_pythonlib.sqlalchemy.dialect import (
153 get_dialect_name,
154 SqlaDialectName,
155)
156from cardinal_pythonlib.sizeformatter import bytes2human
157from cardinal_pythonlib.sqlalchemy.orm_inspect import gen_orm_classes_from_base
158from cardinal_pythonlib.sqlalchemy.orm_query import CountStarSpecializedQuery
159from cardinal_pythonlib.sqlalchemy.session import get_engine_from_session
160from deform.exception import ValidationFailure
161from pendulum import DateTime as Pendulum
162import pyotp
163from pyramid.httpexceptions import HTTPBadRequest, HTTPFound, HTTPNotFound
164from pyramid.view import (
165 forbidden_view_config,
166 notfound_view_config,
167 view_config,
168)
169from pyramid.renderers import render_to_response
170from pyramid.response import Response
171from pyramid.security import Authenticated, NO_PERMISSION_REQUIRED
172import pygments
173import pygments.lexers
174import pygments.lexers.sql
175import pygments.lexers.web
176import pygments.formatters
177from sqlalchemy.orm import joinedload, Query
178from sqlalchemy.sql.functions import func
179from sqlalchemy.sql.expression import desc, or_, select, update
181from camcops_server.cc_modules.cc_audit import audit, AuditEntry
182from camcops_server.cc_modules.cc_all_models import CLIENT_TABLE_MAP
183from camcops_server.cc_modules.cc_client_api_core import (
184 BatchDetails,
185 get_server_live_records,
186 UploadTableChanges,
187 values_preserve_now,
188)
189from camcops_server.cc_modules.cc_client_api_helpers import (
190 upload_commit_order_sorter,
191)
192from camcops_server.cc_modules.cc_constants import (
193 CAMCOPS_URL,
194 DateFormat,
195 ERA_NOW,
196 GITHUB_RELEASES_URL,
197 JSON_INDENT,
198 MfaMethod,
199)
200from camcops_server.cc_modules.cc_db import (
201 GenericTabletRecordMixin,
202 FN_DEVICE_ID,
203 FN_ERA,
204 FN_GROUP_ID,
205 FN_PK,
206)
207from camcops_server.cc_modules.cc_device import Device
208from camcops_server.cc_modules.cc_email import Email
209from camcops_server.cc_modules.cc_export import (
210 DownloadOptions,
211 make_exporter,
212 UserDownloadFile,
213)
214from camcops_server.cc_modules.cc_exportmodels import (
215 ExportedTask,
216 ExportedTaskEmail,
217 ExportedTaskFhir,
218 ExportedTaskFhirEntry,
219 ExportedTaskFileGroup,
220 ExportedTaskHL7Message,
221 ExportedTaskRedcap,
222)
223from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
224from camcops_server.cc_modules.cc_forms import (
225 AddGroupForm,
226 AddIdDefinitionForm,
227 AddSpecialNoteForm,
228 AddUserGroupadminForm,
229 AddUserSuperuserForm,
230 AuditTrailForm,
231 ChangeOtherPasswordForm,
232 ChangeOwnPasswordForm,
233 ChooseTrackerForm,
234 DEFORM_ACCORDION_BUG,
235 DEFAULT_ROWS_PER_PAGE,
236 DeleteGroupForm,
237 DeleteIdDefinitionForm,
238 DeletePatientChooseForm,
239 DeletePatientConfirmForm,
240 DeleteServerCreatedPatientForm,
241 DeleteSpecialNoteForm,
242 DeleteTaskScheduleForm,
243 DeleteTaskScheduleItemForm,
244 DeleteUserForm,
245 EDIT_PATIENT_SIMPLE_PARAMS,
246 EditFinalizedPatientForm,
247 EditGroupForm,
248 EditIdDefinitionForm,
249 EditOtherUserMfaForm,
250 EditServerCreatedPatientForm,
251 EditServerSettingsForm,
252 EditTaskFilterForm,
253 EditTaskScheduleForm,
254 EditTaskScheduleItemForm,
255 EditUserFullForm,
256 EditUserGroupAdminForm,
257 EditUserGroupMembershipGroupAdminForm,
258 EditUserGroupPermissionsFullForm,
259 EraseTaskForm,
260 ExportedTaskListForm,
261 ForciblyFinalizeChooseDeviceForm,
262 ForciblyFinalizeConfirmForm,
263 get_sql_dialect_choices,
264 LoginForm,
265 MfaHotpEmailForm,
266 MfaHotpSmsForm,
267 MfaMethodForm,
268 MfaTotpForm,
269 OfferBasicDumpForm,
270 OfferSqlDumpForm,
271 OfferTermsForm,
272 OtpTokenForm,
273 RefreshTasksForm,
274 SendEmailForm,
275 SetUserUploadGroupForm,
276 TasksPerPageForm,
277 UserDownloadDeleteForm,
278 UserFilterForm,
279 ViewDdlForm,
280)
281from camcops_server.cc_modules.cc_group import Group
282from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition
283from camcops_server.cc_modules.cc_membership import UserGroupMembership
284from camcops_server.cc_modules.cc_patient import Patient
285from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
287# noinspection PyUnresolvedReferences
288import camcops_server.cc_modules.cc_plot # import side effects (configure matplotlib) # noqa
289from camcops_server.cc_modules.cc_pyramid import (
290 CamcopsPage,
291 FlashQueue,
292 FormAction,
293 HTTPFoundDebugVersion,
294 Icons,
295 PageUrl,
296 Permission,
297 Routes,
298 SqlalchemyOrmPage,
299 ViewArg,
300 ViewParam,
301)
302from camcops_server.cc_modules.cc_report import get_report_instance
303from camcops_server.cc_modules.cc_request import CamcopsRequest
304from camcops_server.cc_modules.cc_simpleobjects import (
305 IdNumReference,
306 TaskExportOptions,
307)
308from camcops_server.cc_modules.cc_specialnote import SpecialNote
309from camcops_server.cc_modules.cc_session import CamcopsSession
310from camcops_server.cc_modules.cc_sqlalchemy import get_all_ddl
311from camcops_server.cc_modules.cc_task import (
312 tablename_to_task_class_dict,
313 Task,
314)
315from camcops_server.cc_modules.cc_taskcollection import (
316 TaskFilter,
317 TaskCollection,
318 TaskSortMethod,
319)
320from camcops_server.cc_modules.cc_taskfactory import task_factory
321from camcops_server.cc_modules.cc_taskfilter import (
322 task_classes_from_table_names,
323 TaskClassSortMethod,
324)
325from camcops_server.cc_modules.cc_taskindex import (
326 PatientIdNumIndexEntry,
327 TaskIndexEntry,
328 update_indexes_and_push_exports,
329)
330from camcops_server.cc_modules.cc_taskschedule import (
331 PatientTaskSchedule,
332 PatientTaskScheduleEmail,
333 TaskSchedule,
334 TaskScheduleItem,
335 task_schedule_item_sort_order,
336)
337from camcops_server.cc_modules.cc_text import SS
338from camcops_server.cc_modules.cc_tracker import ClinicalTextView, Tracker
339from camcops_server.cc_modules.cc_user import (
340 SecurityAccountLockout,
341 SecurityLoginFailure,
342 User,
343)
344from camcops_server.cc_modules.cc_validators import (
345 validate_download_filename,
346 validate_export_recipient_name,
347 validate_ip_address,
348 validate_task_tablename,
349 validate_username,
350)
351from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION
352from camcops_server.cc_modules.cc_view_classes import (
353 CreateView,
354 DeleteView,
355 FormView,
356 FormWizardMixin,
357 UpdateView,
358)
360if TYPE_CHECKING:
361 # noinspection PyUnresolvedReferences
362 from deform.form import Form
364 # noinspection PyUnresolvedReferences
365 from camcops_server.cc_modules.cc_sqlalchemy import Base
367log = BraceStyleAdapter(logging.getLogger(__name__))
370# =============================================================================
371# Debugging options
372# =============================================================================
374DEBUG_REDIRECT = False
376if DEBUG_REDIRECT:
377 log.warning("Debugging options enabled!")
379if DEBUG_REDIRECT:
380 HTTPFound = HTTPFoundDebugVersion # noqa: F811
383# =============================================================================
384# Cache control, for the http_cache parameter of view_config etc.
385# =============================================================================
387NEVER_CACHE = 0
390# =============================================================================
391# Constants -- for Mako templates
392# =============================================================================
393# Keys that will be added to a context dictionary that is passed to a Mako
394# template. For example, a key of "title" can be rendered within the template
395# as ${title}. Some are used frequently, so we have them here as constants.
397MAKO_VAR_TITLE = "title"
398TEMPLATE_GENERIC_FORM = "generic_form.mako"
401# =============================================================================
402# Constants -- mutated into translated phrases
403# =============================================================================
406def errormsg_cannot_dump(req: "CamcopsRequest") -> str:
407 _ = req.gettext
408 return _("User not authorized to dump data (for any group).")
411def errormsg_cannot_report(req: "CamcopsRequest") -> str:
412 _ = req.gettext
413 return _("User not authorized to run reports (for any group).")
416def errormsg_task_live(req: "CamcopsRequest") -> str:
417 _ = req.gettext
418 return _("Task is live on tablet; finalize (or force-finalize) first.")
421# =============================================================================
422# Unused
423# =============================================================================
425# def query_result_html_core(req: "CamcopsRequest",
426# descriptions: Sequence[str],
427# rows: Sequence[Sequence[Any]],
428# null_html: str = "<i>NULL</i>") -> str:
429# return render("query_result_core.mako",
430# dict(descriptions=descriptions,
431# rows=rows,
432# null_html=null_html),
433# request=req)
436# def query_result_html_orm(req: "CamcopsRequest",
437# attrnames: List[str],
438# descriptions: List[str],
439# orm_objects: Sequence[Sequence[Any]],
440# null_html: str = "<i>NULL</i>") -> str:
441# return render("query_result_orm.mako",
442# dict(attrnames=attrnames,
443# descriptions=descriptions,
444# orm_objects=orm_objects,
445# null_html=null_html),
446# request=req)
449# =============================================================================
450# Error views
451# =============================================================================
454# noinspection PyUnusedLocal
455@notfound_view_config(renderer="not_found.mako", http_cache=NEVER_CACHE)
456def not_found(req: "CamcopsRequest") -> Dict[str, Any]:
457 """
458 "Page not found" view.
459 """
460 return {"msg": "", "extra_html": ""}
463# noinspection PyUnusedLocal
464@view_config(
465 context=HTTPBadRequest, renderer="bad_request.mako", http_cache=NEVER_CACHE
466)
467def bad_request(req: "CamcopsRequest") -> Dict[str, Any]:
468 """
469 "Bad request" view.
471 NOTE that this view only gets used from
473 .. code-block:: python
475 raise HTTPBadRequest("message")
477 and not
479 .. code-block:: python
481 return HTTPBadRequest("message")
483 ... so always raise it.
484 """
485 return {"msg": "", "extra_html": ""}
488# =============================================================================
489# Test pages
490# =============================================================================
493# noinspection PyUnusedLocal
494@view_config(
495 route_name=Routes.TESTPAGE_PUBLIC_1,
496 permission=NO_PERMISSION_REQUIRED,
497 http_cache=NEVER_CACHE,
498)
499def test_page_1(req: "CamcopsRequest") -> Response:
500 """
501 A public test page with no content.
502 """
503 _ = req.gettext
504 return Response(_("Hello! This is a public CamCOPS test page."))
507# noinspection PyUnusedLocal
508@view_config(
509 route_name=Routes.TEST_NHS_NUMBERS,
510 permission=NO_PERMISSION_REQUIRED,
511 renderer="test_nhs_numbers.mako",
512 http_cache=NEVER_CACHE,
513)
514def test_nhs_numbers(req: "CamcopsRequest") -> Response:
515 """
516 Random Test NHS numbers for testing
517 """
518 from cardinal_pythonlib.nhs import generate_random_nhs_number
520 nhs_numbers = [generate_random_nhs_number() for _ in range(10)]
521 return dict(test_nhs_numbers=nhs_numbers)
524# noinspection PyUnusedLocal
525@view_config(route_name=Routes.TESTPAGE_PRIVATE_1, http_cache=NEVER_CACHE)
526def test_page_private_1(req: "CamcopsRequest") -> Response:
527 """
528 A private test page with no informative content, but which should only
529 be accessible to authenticated users.
530 """
531 _ = req.gettext
532 return Response(_("Private test page."))
535# noinspection PyUnusedLocal
536@view_config(
537 route_name=Routes.TESTPAGE_PRIVATE_2,
538 permission=Permission.SUPERUSER,
539 renderer="testpage.mako",
540 http_cache=NEVER_CACHE,
541)
542def test_page_2(req: "CamcopsRequest") -> Dict[str, Any]:
543 """
544 A private test page containing POTENTIALLY SENSITIVE test information,
545 including environment variables, that should only be accessible to
546 superusers.
547 """
548 return dict(param1="world")
551# noinspection PyUnusedLocal
552@view_config(
553 route_name=Routes.TESTPAGE_PRIVATE_3,
554 permission=Permission.SUPERUSER,
555 renderer="inherit_cache_test_child.mako",
556 http_cache=NEVER_CACHE,
557)
558def test_page_3(req: "CamcopsRequest") -> Dict[str, Any]:
559 """
560 A private test page that tests template inheritance.
561 """
562 return {}
565# noinspection PyUnusedLocal
566@view_config(
567 route_name=Routes.TESTPAGE_PRIVATE_4,
568 permission=Permission.SUPERUSER,
569 renderer="test_template_filters.mako",
570 http_cache=NEVER_CACHE,
571)
572def test_page_4(req: "CamcopsRequest") -> Dict[str, Any]:
573 """
574 A private test page that tests Mako filtering.
575 """
576 return dict(test_strings=["plain", "normal <b>bold</b> normal"])
579# noinspection PyUnusedLocal,PyTypeChecker
580@view_config(
581 route_name=Routes.CRASH,
582 permission=Permission.SUPERUSER,
583 http_cache=NEVER_CACHE,
584)
585def crash(req: "CamcopsRequest") -> Response:
586 """
587 A view that deliberately raises an exception.
588 """
589 _ = req.gettext
590 raise RuntimeError(
591 _("Deliberately crashed. Should not affect other processes.")
592 )
595# noinspection PyUnusedLocal
596@view_config(
597 route_name=Routes.DEVELOPER,
598 permission=Permission.SUPERUSER,
599 renderer="developer.mako",
600 http_cache=NEVER_CACHE,
601)
602def developer_page(req: "CamcopsRequest") -> Dict[str, Any]:
603 """
604 Shows the developer menu.
605 """
606 return {}
609# noinspection PyUnusedLocal
610@view_config(
611 route_name=Routes.AUDIT_MENU,
612 permission=Permission.SUPERUSER,
613 renderer="audit_menu.mako",
614 http_cache=NEVER_CACHE,
615)
616def audit_menu(req: "CamcopsRequest") -> Dict[str, Any]:
617 """
618 Shows the auditing menu.
619 """
620 return {}
623# =============================================================================
624# Authorization: login, logout, login failures, terms/conditions
625# =============================================================================
627# Do NOT use extra parameters for functions decorated with @view_config;
628# @view_config can take functions like "def view(request)" but also
629# "def view(context, request)", so if you add additional parameters, it thinks
630# you're doing the latter and sends parameters accordingly.
633class MfaMixin(FormWizardMixin):
634 """
635 Enhances FormWizardMixin to include a multi-factor authentication step.
636 This must be named "mfa" in the subclass, via the ``SELF_MFA`` variable.
638 This handles:
640 - Timing out
641 - Generating, sending and checking the six-digit code used for
642 authentication
644 The subclass should:
646 - Set ``mfa_user`` on the class to be an instance of the User to be
647 authenticated.
648 - Call ``handle_authentication_type()`` in the appropriate step.
649 - Call ``otp_is_valid()`` and ``fail_bad_mfa_code()`` in the appropriate
650 step.
652 See ``LoginView`` for an example that works with the yet-to-be-logged-in
653 user.
654 See ``ChangeOwnPasswordView`` for an example with the logged-in user.
655 """
657 STEP_PASSWORD = "password"
658 STEP_MFA = "mfa"
660 KEY_TITLE_HTML = "title_html"
661 KEY_INSTRUCTIONS = "instructions"
662 KEY_MFA_TIME = "mfa_time"
664 def __init__(self, *args: Any, **kwargs: Any) -> None:
665 self._mfa_user: Optional[User] = None
666 super().__init__(*args, **kwargs)
668 # -------------------------------------------------------------------------
669 # mfa_user
670 # -------------------------------------------------------------------------
671 # Set during __init__ by LoggedInUserMfaMixin, or via a more complex
672 # process by LoginView.
674 @property
675 def mfa_user(self) -> Optional[User]:
676 """
677 The user undergoing authentication.
678 """
679 return self._mfa_user
681 @mfa_user.setter
682 def mfa_user(self, user: Optional[User]) -> None:
683 """
684 Sets the current user being authenticated.
685 """
686 self._mfa_user = user
688 # -------------------------------------------------------------------------
689 # Dispatch and timeouts
690 # -------------------------------------------------------------------------
692 def dispatch(self) -> Response:
693 # Docstring in superclass.
694 if self.timed_out():
695 self.fail_timed_out() # will raise
697 return super().dispatch()
699 def timed_out(self) -> bool:
700 """
701 Has authentication timed out?
702 """
703 if self.step != self.STEP_MFA:
704 return False
706 timeout = self.request.config.mfa_timeout_s
707 if timeout == 0:
708 return False
710 login_time = self.state.get(self.KEY_MFA_TIME)
711 if login_time is None:
712 return False
714 return int(time.time()) > login_time + timeout
716 # -------------------------------------------------------------------------
717 # Extra context for templates
718 # -------------------------------------------------------------------------
720 def get_extra_context(self) -> Dict[str, Any]:
721 # Docstring in superclass.
722 if self.step == self.STEP_MFA:
723 context = {
724 self.KEY_TITLE_HTML: self.request.icon_text(
725 icon=self.get_mfa_icon(), text=self.get_mfa_title()
726 ),
727 self.KEY_INSTRUCTIONS: self.get_mfa_instructions(),
728 }
729 return context
730 else:
731 return {}
733 def get_mfa_icon(self) -> str:
734 """
735 Returns an icon to let the user know which MFA method is being used.
736 """
737 method = self.mfa_user.mfa_method
739 if method == MfaMethod.TOTP:
740 return "shield-shaded"
742 elif method == MfaMethod.HOTP_EMAIL:
743 return "envelope"
745 elif method == MfaMethod.HOTP_SMS:
746 return "chat-left-dots"
748 else:
749 return "Error: get_mfa_icon() called for invalid MFA method"
751 def get_mfa_title(self) -> str:
752 """
753 Returns a title for the page that requests the code itself.
754 """
755 _ = self.request.gettext
756 method = self.mfa_user.mfa_method
758 if method == MfaMethod.TOTP:
759 return _("Authenticate via your authentication app")
761 elif method == MfaMethod.HOTP_EMAIL:
762 return _("Authenticate via e-mail")
764 elif method == MfaMethod.HOTP_SMS:
765 return _("Authenticate via SMS")
767 else:
768 return "Error: get_mfa_title() called for invalid MFA method"
770 def get_mfa_instructions(self) -> str:
771 """
772 Return user instructions for the relevant MFA method.
773 """
774 _ = self.request.gettext
775 method = self.mfa_user.mfa_method
777 if method == MfaMethod.TOTP:
778 return _(
779 "Enter the code for CamCOPS displayed on your "
780 "authentication app."
781 )
783 elif method == MfaMethod.HOTP_EMAIL:
784 return _("We've sent a code by email to {}.").format(
785 self.mfa_user.partial_email
786 )
788 elif method == MfaMethod.HOTP_SMS:
789 return _("We've sent a code by text message to {}").format(
790 self.mfa_user.partial_phone_number
791 )
793 else:
794 return "Error: get_mfa_instruction() called for invalid MFA method"
796 # -------------------------------------------------------------------------
797 # MFA handling
798 # -------------------------------------------------------------------------
800 def handle_authentication_type(self) -> None:
801 """
802 Function to be called when we want an MFA code to be created.
803 """
804 mfa_user = self.mfa_user
805 mfa_user.ensure_mfa_info()
806 mfa_method = mfa_user.mfa_method
808 if mfa_method == MfaMethod.TOTP:
809 # Nothing to do. The app generates the code.
810 return
812 # Record the time of code creation:
813 self.state[self.KEY_MFA_TIME] = int(time.time())
815 if mfa_method == MfaMethod.HOTP_EMAIL:
816 self.send_authentication_email()
817 elif mfa_method == MfaMethod.HOTP_SMS:
818 self.send_authentication_sms()
819 else:
820 raise ValueError(
821 f"MfaMixin.handle_authentication_type: "
822 f"unexpected mfa_method {mfa_method!r}"
823 )
825 def send_authentication_email(self) -> None:
826 """
827 E-mail the code to the user.
828 """
829 _ = self.request.gettext
830 config = self.request.config
831 kwargs = dict(
832 from_addr=config.email_from,
833 to=self.mfa_user.email,
834 subject=_("CamCOPS authentication"),
835 body=self.get_hotp_message(),
836 content_type=MimeType.TEXT,
837 )
839 email = Email(**kwargs)
840 success = email.send(
841 host=config.email_host,
842 username=config.email_host_username,
843 password=config.email_host_password,
844 port=config.email_port,
845 use_tls=config.email_use_tls,
846 )
847 if success:
848 msg = _("E-mail sent")
849 queue = FlashQueue.SUCCESS
850 else:
851 msg = _(
852 "Failed to send e-mail! "
853 "Please try again or contact your administrator."
854 )
855 queue = FlashQueue.DANGER
856 self.request.session.flash(msg, queue=queue)
858 def send_authentication_sms(self) -> None:
859 """
860 Send a code to the user via SMS (text message).
861 """
862 backend = self.request.config.sms_backend
863 backend.send_sms(
864 self.mfa_user.raw_phone_number, self.get_hotp_message()
865 )
867 def get_hotp_message(self) -> str:
868 """
869 Return a human-readable message containing an HOTP (HMAC-Based One-Time
870 Password).
871 """
872 self.mfa_user.hotp_counter += 1
873 self.request.dbsession.add(self.mfa_user)
874 _ = self.request.gettext
875 key = self.mfa_user.mfa_secret_key
876 assert key, f"Bug: self.mfa_user.mfa_secret_key = {key!r}"
877 handler = pyotp.HOTP(key)
878 code = handler.at(self.mfa_user.hotp_counter)
879 return _("Your CamCOPS verification code is {}").format(code)
881 def otp_is_valid(self, appstruct: Dict[str, Any]) -> bool:
882 """
883 Is the code being offered by the user the right one?
884 """
885 otp = appstruct.get(ViewParam.ONE_TIME_PASSWORD)
886 return self.mfa_user.verify_one_time_password(otp)
888 # -------------------------------------------------------------------------
889 # Ways to fail
890 # -------------------------------------------------------------------------
892 def fail_bad_mfa_code(self) -> NoReturn:
893 """
894 Fail because the code was wrong.
895 """
896 _ = self.request.gettext
897 self.fail(_("You entered an invalid code. Please try again."))
899 def fail_timed_out(self) -> NoReturn:
900 """
901 Fail because the process timed out.
902 """
903 _ = self.request.gettext
904 self.fail(_("Your code expired. Please try again."))
907class LoggedInUserMfaMixin(MfaMixin):
908 """
909 Handles multi-factor authentication for the currently logged in user
910 (everything except :class:`LoginView`).
911 """
913 def __init__(self, *args: Any, **kwargs: Any) -> None:
914 super().__init__(*args, **kwargs)
915 self.mfa_user = self.request.user
918class LoginView(MfaMixin, FormView):
919 """
920 Multi-factor authentication for the login process.
921 Sequences is: (1) password; (2) MFA, if enabled.
923 Inheritance (as of 2021-10-06):
925 - webview.LoginView
927 - webview.MfaMixin
929 - cc_view_classes.FormWizardMixin
931 - cc_view_classes.FormView
933 - cc_view_classes.TemplateResponseMixin
935 - cc_view_classes.BaseFormView
937 - cc_view_classes.FormMixin
939 - cc_view_classes.ContextMixin
941 - cc_view_classes.ProcessFormView -- provides ``get()``, ``post()``
943 - cc_view_classes.View -- owns ``request``, provides ``dispatch()``
944 """
946 KEY_MFA_USER_ID = "mfa_user_id"
948 _mfa_user: Optional[User]
949 wizard_first_step = MfaMixin.STEP_PASSWORD
950 wizard_forms = {
951 MfaMixin.STEP_PASSWORD: LoginForm, # 1. enter username/password
952 MfaMixin.STEP_MFA: OtpTokenForm, # 2. enter one-time code
953 }
954 wizard_templates = {
955 MfaMixin.STEP_PASSWORD: "login.mako",
956 MfaMixin.STEP_MFA: "login_token.mako",
957 }
959 def __init__(self, *args: Any, **kwargs: Any) -> None:
960 super().__init__(*args, **kwargs)
962 # -------------------------------------------------------------------------
963 # mfa_user
964 # -------------------------------------------------------------------------
965 # Slightly more complex here, since our user isn't logged in properly yet.
967 @property
968 def mfa_user(self) -> Optional[User]:
969 # Docstring in superclass.
970 if self._mfa_user is None:
971 try:
972 user_id = self.state[self.KEY_MFA_USER_ID]
973 self._mfa_user = (
974 self.request.dbsession.query(User)
975 .filter(User.id == user_id)
976 .one_or_none()
977 )
978 except KeyError:
979 pass
981 return self._mfa_user
983 @mfa_user.setter
984 def mfa_user(self, user: Optional[User]) -> None:
985 # Docstring in superclass.
986 self._mfa_user = user
987 if user is None:
988 self.state[self.KEY_MFA_USER_ID] = None
989 return
991 self.state[self.KEY_MFA_USER_ID] = user.id
993 # -------------------------------------------------------------------------
994 # Content for forms
995 # -------------------------------------------------------------------------
997 def get_form_values(self) -> Dict:
998 # Docstring in superclass.
999 return {ViewParam.REDIRECT_URL: self.get_redirect_url()}
1001 def get_form_kwargs(self) -> Dict[str, Any]:
1002 # Docstring in superclass.
1003 kwargs = super().get_form_kwargs()
1005 cfg = self.request.config
1006 autocomplete_password = not cfg.disable_password_autocomplete
1007 kwargs["autocomplete_password"] = autocomplete_password
1009 return kwargs
1011 # -------------------------------------------------------------------------
1012 # Form validation, and sequence handling
1013 # -------------------------------------------------------------------------
1015 def form_valid_process_data(
1016 self, form: "Form", appstruct: Dict[str, Any]
1017 ) -> None:
1018 # Docstring in superclass.
1019 if self.step == self.STEP_PASSWORD:
1020 self._form_valid_password(appstruct)
1021 else:
1022 self._form_valid_mfa(appstruct)
1024 super().form_valid_process_data(form, appstruct)
1026 def _form_valid_password(self, appstruct: Dict[str, Any]) -> None:
1027 """
1028 Called when the user has entered a username/password (via a validated
1029 form).
1030 """
1031 username = appstruct.get(ViewParam.USERNAME)
1033 # Is the user locked?
1034 locked_out_until = SecurityAccountLockout.user_locked_out_until(
1035 self.request, username
1036 )
1037 if locked_out_until is not None:
1038 self.fail_locked_out(locked_out_until) # will raise
1040 password = appstruct.get(ViewParam.PASSWORD)
1042 # Is the username/password combination correct?
1043 user = User.get_user_from_username_password(
1044 self.request, username, password
1045 ) # checks password
1047 # Some trade-off between usability and security here.
1048 # For failed attempts, the user has some idea as to what the problem
1049 # is.
1050 if user is None:
1051 # Unsuccessful. Note that the username may/may not be genuine.
1052 SecurityLoginFailure.act_on_login_failure(self.request, username)
1053 # ... may lock the account
1054 # Now, call audit() before session.logout(), as the latter
1055 # will wipe the session IP address:
1056 self.request.camcops_session.logout()
1057 self.fail_not_authorized() # will raise
1059 if not user.may_use_webviewer:
1060 # This means a user who can upload from tablet but who cannot
1061 # log in via the web front end.
1062 self.fail_not_authorized() # will raise
1064 self.mfa_user = user
1065 self._password_next_step()
1066 self._form_valid_success()
1068 def _password_next_step(self) -> None:
1069 """
1070 The user has entered a password correctly; what's the next step?
1071 """
1072 method = self.mfa_user.mfa_method
1073 if MfaMethod.requires_second_step(method):
1074 self.step = self.STEP_MFA
1075 self.handle_authentication_type()
1076 else:
1077 self.finish()
1078 # Guaranteed to be valid; see constructor.
1080 def _form_valid_mfa(self, appstruct: Dict[str, Any]) -> None:
1081 """
1082 Called when the user has entered an MFA code (via a validated form).
1083 """
1084 if not self.otp_is_valid(appstruct):
1085 self.fail_bad_mfa_code() # will raise
1087 self.finish()
1088 self._form_valid_success()
1090 def _form_valid_success(self) -> None:
1091 """
1092 Called when the next step has been determined. One possible outcome is
1093 a successful login.
1094 """
1095 if self.finished():
1096 # Successful login.
1097 self.mfa_user.login(
1098 self.request
1099 ) # will clear login failure record
1100 self.request.camcops_session.login(self.mfa_user)
1101 audit(self.request, "Login", user_id=self.mfa_user.id)
1103 # OK, logged in.
1104 # Redirect to the main menu, or wherever the user was heading.
1105 # HOWEVER, that may lead us to a "change password" or "agree terms"
1106 # page, via the permissions system (Permission.HAPPY or not).
1108 # -------------------------------------------------------------------------
1109 # Next destinations
1110 # -------------------------------------------------------------------------
1112 def get_success_url(self) -> str:
1113 # Docstring in superclass.
1114 if self.finished():
1115 return self.get_redirect_url()
1117 return self.request.route_url(
1118 Routes.LOGIN,
1119 _query={ViewParam.REDIRECT_URL: self.get_redirect_url()},
1120 )
1122 def get_failure_url(self) -> None:
1123 # Docstring in superclass.
1124 return self.request.route_url(
1125 Routes.LOGIN,
1126 _query={ViewParam.REDIRECT_URL: self.get_redirect_url()},
1127 )
1129 def get_redirect_url(self) -> str:
1130 """
1131 We may be logging in after a timeout, in which case we can redirect the
1132 user back to where they were before. Otherwise, they go to the main
1133 page.
1134 """
1135 return self.request.get_redirect_url_param(
1136 ViewParam.REDIRECT_URL, default=self.request.route_url(Routes.HOME)
1137 )
1139 # -------------------------------------------------------------------------
1140 # Ways to fail
1141 # -------------------------------------------------------------------------
1143 def fail_not_authorized(self) -> NoReturn:
1144 """
1145 Fail because the user has not logged in correctly or is not authorized
1146 to log in.
1148 Pretends to the type checker that it returns a response, so callers can
1149 use ``return`` for code safety.
1150 """
1151 _ = self.request.gettext
1152 self.fail(
1153 _("Invalid username/password (or user not authorized).")
1154 ) # will raise
1155 # assert False, "Bug: LoginView.fail_not_authorized() falling through"
1157 def fail_locked_out(self, locked_until: Pendulum) -> NoReturn:
1158 """
1159 Raises a failure because the user is locked out.
1161 Pretends to the type checker that it returns a response, so callers can
1162 use ``return`` for code safety.
1163 """
1164 _ = self.request.gettext
1165 locked_until = format_datetime(
1166 locked_until, DateFormat.LONG_DATETIME_WITH_DAY, _("(never)")
1167 )
1168 message = _(
1169 "Account locked until {} due to multiple login failures. "
1170 "Try again later or contact your administrator."
1171 ).format(locked_until)
1172 self.fail(message) # will raise
1173 # assert False, "Bug: LoginView.fail_locked_out() falling through"
1176@view_config(
1177 route_name=Routes.LOGIN,
1178 permission=NO_PERMISSION_REQUIRED,
1179 http_cache=NEVER_CACHE,
1180)
1181def login_view(req: "CamcopsRequest") -> Response:
1182 """
1183 Login view.
1185 - GET: presents the login screen
1186 - POST/submit: attempts to log in (with optional multi-factor
1187 authentication);
1189 - failure: returns a login failure view or an account lockout view
1190 - success:
1192 - redirects to the redirection view if one was specified;
1193 - redirects to the home view if not.
1194 """
1195 return LoginView(req).dispatch()
1198@view_config(
1199 route_name=Routes.LOGOUT,
1200 permission=Authenticated,
1201 renderer="logged_out.mako",
1202 http_cache=NEVER_CACHE,
1203)
1204def logout(req: "CamcopsRequest") -> Dict[str, Any]:
1205 """
1206 Logs a session out, and returns the "logged out" view.
1207 """
1208 audit(req, "Logout")
1209 ccsession = req.camcops_session
1210 ccsession.logout()
1211 return dict()
1214@view_config(
1215 route_name=Routes.OFFER_TERMS,
1216 permission=Authenticated,
1217 renderer="offer_terms.mako",
1218 http_cache=NEVER_CACHE,
1219)
1220def offer_terms(req: "CamcopsRequest") -> Response:
1221 """
1222 - GET: show terms/conditions and request acknowledgement
1223 - POST/submit: note the user's agreement; redirect to the home view.
1224 """
1225 form = OfferTermsForm(
1226 request=req, agree_button_text=req.wsstring(SS.DISCLAIMER_AGREE)
1227 )
1229 if FormAction.SUBMIT in req.POST:
1230 req.user.agree_terms(req)
1231 return HTTPFound(req.route_url(Routes.HOME)) # redirect
1233 return render_to_response(
1234 "offer_terms.mako",
1235 dict(
1236 title=req.wsstring(SS.DISCLAIMER_TITLE),
1237 subtitle=req.wsstring(SS.DISCLAIMER_SUBTITLE),
1238 content=req.wsstring(SS.DISCLAIMER_CONTENT),
1239 form=form.render(),
1240 head_form_html=get_head_form_html(req, [form]),
1241 ),
1242 request=req,
1243 )
1246@forbidden_view_config(http_cache=NEVER_CACHE)
1247def forbidden(req: "CamcopsRequest") -> Response:
1248 """
1249 Generic place that Pyramid comes when permission is denied for a view.
1251 We will offer one of these:
1253 - Must change password? Redirect to "change own password" view.
1254 - Must agree terms? Redirect to "offer terms" view.
1255 - Otherwise: a generic "forbidden" view.
1256 """
1257 # I was doing this:
1258 if req.has_permission(Authenticated):
1259 user = req.user
1260 assert user, "Bug! Authenticated but no user...!?"
1261 if user.must_change_password:
1262 return HTTPFound(req.route_url(Routes.CHANGE_OWN_PASSWORD))
1263 if user.must_agree_terms:
1264 return HTTPFound(req.route_url(Routes.OFFER_TERMS))
1265 if user.must_set_mfa_method(req):
1266 return HTTPFound(req.route_url(Routes.EDIT_OWN_USER_MFA))
1267 # ... but with "raise HTTPFound" instead.
1268 # BUT there is only one level of exception handling in Pyramid, i.e. you
1269 # can't raise exceptions from exceptions:
1270 # https://github.com/Pylons/pyramid/issues/436
1271 # The simplest way round is to use "return", not "raise".
1273 redirect_url = req.url
1274 # Redirects to login page, with onwards redirection to requested
1275 # destination once logged in:
1276 querydict = {ViewParam.REDIRECT_URL: redirect_url}
1277 return render_to_response(
1278 "forbidden.mako", dict(querydict=querydict), request=req
1279 )
1282# =============================================================================
1283# Changing passwords
1284# =============================================================================
1287class ChangeOwnPasswordView(LoggedInUserMfaMixin, UpdateView):
1288 """
1289 View to change one's own password.
1291 If MFA is enabled, you need to (re-)authenticate via MFA to do so.
1292 Then, you need to supply your own password to change it (regardless).
1293 Sequence is therefore (1) MFA, optionally; (2) change password.
1295 Most documentation in superclass.
1296 """
1298 model_form_dict: Dict[str, "Form"] = {}
1299 STEP_CHANGE_PASSWORD = "change_password"
1301 wizard_forms = {
1302 MfaMixin.STEP_MFA: OtpTokenForm,
1303 STEP_CHANGE_PASSWORD: ChangeOwnPasswordForm,
1304 }
1306 wizard_templates = {
1307 MfaMixin.STEP_MFA: "login_token.mako",
1308 STEP_CHANGE_PASSWORD: "change_own_password.mako",
1309 }
1311 wizard_extra_contexts: Dict[str, Dict[str, Any]] = {
1312 MfaMixin.STEP_MFA: {},
1313 STEP_CHANGE_PASSWORD: {},
1314 }
1316 def get_first_step(self) -> str:
1317 if self.request.user.mfa_method == MfaMethod.NO_MFA:
1318 return self.STEP_CHANGE_PASSWORD
1320 return self.STEP_MFA
1322 def get(self) -> Response:
1323 if self.step == self.STEP_MFA:
1324 self.handle_authentication_type()
1326 _ = self.request.gettext
1328 if self.request.user.must_change_password:
1329 self.request.session.flash(
1330 _("Your password has expired and must be changed."),
1331 queue=FlashQueue.DANGER,
1332 )
1333 return super().get()
1335 def get_object(self) -> User:
1336 return self.request.user
1338 def get_form_kwargs(self) -> Dict[str, Any]:
1339 kwargs = super().get_form_kwargs()
1340 kwargs.update(must_differ=True)
1341 return kwargs
1343 def get_success_url(self) -> str:
1344 if self.finished():
1345 return self.request.route_url(Routes.HOME)
1347 return self.request.route_url(Routes.CHANGE_OWN_PASSWORD)
1349 def get_failure_url(self) -> str:
1350 return self.request.route_url(Routes.HOME)
1352 def form_valid_process_data(
1353 self, form: "Form", appstruct: Dict[str, Any]
1354 ) -> None:
1355 if self.step == self.STEP_MFA:
1356 if not self.otp_is_valid(appstruct):
1357 self.fail_bad_mfa_code() # will raise
1359 super().form_valid_process_data(form, appstruct)
1361 def set_object_properties(self, appstruct: Dict[str, Any]) -> None:
1362 # Superclass method overridden, not called.
1363 if self.step == self.STEP_MFA:
1364 self.step = self.STEP_CHANGE_PASSWORD
1365 elif self.step == self.STEP_CHANGE_PASSWORD:
1366 self.set_password(appstruct)
1367 self.finish()
1368 else:
1369 assert f"ChangeOwnPasswordView: bad step {self.step!r}"
1371 def set_password(self, appstruct: Dict[str, Any]) -> None:
1372 """
1373 Success; change the user's password.
1374 """
1375 user = cast(User, self.object)
1376 # ... form has validated old password, etc.
1377 new_password = appstruct[ViewParam.NEW_PASSWORD]
1378 user.set_password(self.request, new_password)
1380 _ = self.request.gettext
1381 self.request.session.flash(
1382 _(
1383 "You have changed your password. "
1384 "If you store your password in your CamCOPS tablet "
1385 "application, remember to change it there as well."
1386 ),
1387 queue=FlashQueue.SUCCESS,
1388 )
1391@view_config(
1392 route_name=Routes.CHANGE_OWN_PASSWORD,
1393 permission=Authenticated,
1394 http_cache=NEVER_CACHE,
1395)
1396def change_own_password(req: "CamcopsRequest") -> Response:
1397 """
1398 For any user: to change their own password.
1400 - GET: offer "change own password" view
1401 - POST/submit: change the password and display success message.
1402 """
1403 view = ChangeOwnPasswordView(req)
1405 return view.dispatch()
1408class EditUserAuthenticationView(LoggedInUserMfaMixin, UpdateView):
1409 """
1410 View to edit aspects of another user.
1411 """
1413 model_form_dict: Dict[str, "Form"] = {}
1414 object_class = User
1415 pk_param = ViewParam.USER_ID
1416 server_pk_name = "id"
1418 def get(self) -> Response:
1419 if self.step == self.STEP_MFA:
1420 self.handle_authentication_type()
1422 return super().get()
1424 def get_object(self) -> User:
1425 user = cast(User, super().get_object())
1426 assert_may_edit_user(self.request, user)
1428 return user
1430 def get_extra_context(self) -> Dict[str, Any]:
1431 if self.step == self.STEP_MFA:
1432 return super().get_extra_context()
1434 user = cast(User, self.object)
1436 return {"username": user.username}
1438 def form_valid_process_data(
1439 self, form: "Form", appstruct: Dict[str, Any]
1440 ) -> None:
1441 if self.step == self.STEP_MFA:
1442 if not self.otp_is_valid(appstruct):
1443 self.fail_bad_mfa_code() # will raise
1445 super().form_valid_process_data(form, appstruct)
1447 def get_failure_url(self) -> str:
1448 return self.request.route_url(Routes.VIEW_ALL_USERS)
1451class ChangeOtherPasswordView(EditUserAuthenticationView):
1452 """
1453 View to change the password for another user.
1454 """
1456 STEP_CHANGE_PASSWORD = "change_password"
1458 wizard_forms = {
1459 MfaMixin.STEP_MFA: OtpTokenForm,
1460 STEP_CHANGE_PASSWORD: ChangeOtherPasswordForm,
1461 }
1463 wizard_templates = {
1464 MfaMixin.STEP_MFA: "login_token.mako",
1465 STEP_CHANGE_PASSWORD: "change_other_password.mako",
1466 }
1468 def get(self) -> Response:
1469 if self.get_pk_value() == self.request.user_id:
1470 raise HTTPFound(self.request.route_url(Routes.CHANGE_OWN_PASSWORD))
1472 return super().get()
1474 def get_first_step(self) -> str:
1475 if self.request.user.mfa_method != MfaMethod.NO_MFA:
1476 return self.STEP_MFA
1478 return self.STEP_CHANGE_PASSWORD
1480 def set_object_properties(self, appstruct: Dict[str, Any]) -> None:
1481 # Superclass method overridden, not called.
1482 if self.step == self.STEP_CHANGE_PASSWORD:
1483 self.set_password(appstruct)
1484 self.finish()
1485 return
1487 if self.step == self.STEP_MFA:
1488 self.step = self.STEP_CHANGE_PASSWORD
1490 def set_password(self, appstruct: Dict[str, Any]) -> None:
1491 """
1492 Success; change the password for the other user.
1493 """
1494 user = cast(User, self.object)
1495 _ = self.request.gettext
1496 new_password = appstruct[ViewParam.NEW_PASSWORD]
1497 user.set_password(self.request, new_password)
1498 must_change_pw = appstruct.get(ViewParam.MUST_CHANGE_PASSWORD)
1499 if must_change_pw:
1500 user.force_password_change()
1501 self.request.session.flash(
1502 _("Password changed for user '{username}'").format(
1503 username=user.username
1504 ),
1505 queue=FlashQueue.SUCCESS,
1506 )
1508 def get_success_url(self) -> str:
1509 if self.finished():
1510 return self.request.route_url(Routes.VIEW_ALL_USERS)
1512 user = cast(User, self.object)
1514 return self.request.route_url(
1515 Routes.CHANGE_OTHER_PASSWORD, _query={ViewParam.USER_ID: user.id}
1516 )
1519@view_config(
1520 route_name=Routes.CHANGE_OTHER_PASSWORD,
1521 permission=Permission.GROUPADMIN,
1522 http_cache=NEVER_CACHE,
1523)
1524def change_other_password(req: "CamcopsRequest") -> Response:
1525 """
1526 For administrators, to change another's password.
1528 - GET: offer "change another's password" view (except that if you're
1529 changing your own password, return :func:`change_own_password`.
1530 - POST/submit: change the password and display success message.
1531 """
1532 view = ChangeOtherPasswordView(req)
1533 return view.dispatch()
1536class EditOtherUserMfaView(EditUserAuthenticationView):
1537 """
1538 View to edit the MFA method for another user. Only permits disabling of
1539 MFA. (If MFA is mandatory, that will require the other user to set their
1540 MFA method at next logon.)
1541 """
1543 STEP_OTHER_USER_MFA = "other_user_mfa"
1545 wizard_forms = {
1546 MfaMixin.STEP_MFA: OtpTokenForm,
1547 STEP_OTHER_USER_MFA: EditOtherUserMfaForm,
1548 }
1550 wizard_templates = {
1551 MfaMixin.STEP_MFA: "login_token.mako",
1552 STEP_OTHER_USER_MFA: "edit_other_user_mfa.mako",
1553 }
1555 def get(self) -> Response:
1556 if self.get_pk_value() == self.request.user_id:
1557 raise HTTPFound(self.request.route_url(Routes.EDIT_OWN_USER_MFA))
1559 return super().get()
1561 def get_first_step(self) -> str:
1562 if self.request.user.mfa_method != MfaMethod.NO_MFA:
1563 return self.STEP_MFA
1565 return self.STEP_OTHER_USER_MFA
1567 def set_object_properties(self, appstruct: Dict[str, Any]) -> None:
1568 # Superclass method overridden, not called.
1569 if self.step == self.STEP_OTHER_USER_MFA:
1570 self.maybe_disable_mfa(appstruct)
1571 self.finish()
1572 return
1574 if self.step == self.STEP_MFA:
1575 self.step = self.STEP_OTHER_USER_MFA
1577 def maybe_disable_mfa(self, appstruct: Dict[str, Any]) -> None:
1578 """
1579 If our user asked for it, disable MFA for the user being edited.
1580 """
1581 if appstruct.get(ViewParam.DISABLE_MFA):
1582 user = cast(User, self.object)
1583 _ = self.request.gettext
1585 user.mfa_method = MfaMethod.NO_MFA
1586 self.request.session.flash(
1587 _(
1588 "Multi-factor authentication disabled for user "
1589 "'{username}'"
1590 ).format(username=user.username),
1591 queue=FlashQueue.SUCCESS,
1592 )
1594 def get_success_url(self) -> str:
1595 if self.finished():
1596 return self.request.route_url(Routes.VIEW_ALL_USERS)
1598 user = cast(User, self.object)
1600 return self.request.route_url(
1601 Routes.EDIT_OTHER_USER_MFA, _query={ViewParam.USER_ID: user.id}
1602 )
1605@view_config(
1606 route_name=Routes.EDIT_OTHER_USER_MFA,
1607 permission=Permission.GROUPADMIN,
1608 http_cache=NEVER_CACHE,
1609)
1610def edit_other_user_mfa(req: "CamcopsRequest") -> Response:
1611 """
1612 For administrators, to change another users's Multi-factor Authentication.
1613 Currently it is only possible to disable Multi-factor authentication for
1614 a user.
1616 - GET: offer "edit another's MFA" view (except that if you're
1617 changing your own MFA, return :func:`edit_own_user_mfa`.
1618 - POST/submit: edit MFA and display success message.
1619 """
1620 view = EditOtherUserMfaView(req)
1621 return view.dispatch()
1624class EditOwnUserMfaView(LoggedInUserMfaMixin, UpdateView):
1625 """
1626 View to edit your own MFA method.
1628 The inheritance (as of 2021-10-06) illustrates a typical situation:
1630 SPECIMEN VIEW CLASS:
1632 - webview.EditOwnUserMfaView
1634 - webview.LoggedInUserMfaMixin
1636 - webview.MfaMixin
1638 - cc_view_classes.FormWizardMixin -- with typehint for FormMixin --
1639 implements ``state``.
1641 - cc_view_classes.UpdateView
1643 - cc_view_classes.TemplateResponseMixin
1645 - cc_view_classes.BaseUpdateView
1647 - cc_view_classes.ModelFormMixin -- implements ``form_valid()`` -->
1648 ``save_object()`` > ``set_object_properties()``
1650 - cc_view_classes.FormMixin -- implements ``form_valid()``,
1651 ``get_context_data()``, etc.
1653 - cc_view_classes.ContextMixin
1655 - cc_view_classes.SingleObjectMixin -- implements ``get_object()``
1656 etc.
1658 - cc_view_classes.ContextMixin
1660 - cc_view_classes.ProcessFormView -- implements ``get()``, ``post()``
1662 - cc_view_classes.View -- owns ``request``, implements
1663 ``dispatch()`` (which calls ``get()``, ``post()``).
1665 SPECIMEN FORM WITHIN THAT VIEW:
1667 - cc_forms.MfaMethodForm
1669 - cc_forms.InformativeNonceForm
1671 - cc_forms.InformativeForm
1673 - deform.Form
1675 If you subclass A(B, C), then B's superclass methods are called before C's:
1676 https://www.python.org/download/releases/2.3/mro/;
1677 https://makina-corpus.com/blog/metier/2014/python-tutorial-understanding-python-mro-class-search-path;
1678 """
1680 STEP_MFA_METHOD = "mfa_method"
1681 STEP_TOTP = MfaMethod.TOTP
1682 STEP_HOTP_EMAIL = MfaMethod.HOTP_EMAIL
1683 STEP_HOTP_SMS = MfaMethod.HOTP_SMS
1684 wizard_first_step = STEP_MFA_METHOD
1686 wizard_forms = {
1687 STEP_MFA_METHOD: MfaMethodForm, # 1. choose your MFA method
1688 STEP_TOTP: MfaTotpForm, # 2a. show TOTP (auth app) QR/alphanumeric code # noqa: E501
1689 STEP_HOTP_EMAIL: MfaHotpEmailForm, # 2b. choose e-mail address
1690 STEP_HOTP_SMS: MfaHotpSmsForm, # 2c. choose phone number for SMS
1691 MfaMixin.STEP_MFA: OtpTokenForm, # 4. request code from user
1692 }
1694 FORM_WITH_TITLE_TEMPLATE = "form_with_title.mako"
1696 wizard_templates = {
1697 STEP_MFA_METHOD: FORM_WITH_TITLE_TEMPLATE,
1698 STEP_TOTP: FORM_WITH_TITLE_TEMPLATE,
1699 STEP_HOTP_EMAIL: FORM_WITH_TITLE_TEMPLATE,
1700 STEP_HOTP_SMS: FORM_WITH_TITLE_TEMPLATE,
1701 MfaMixin.STEP_MFA: "login_token.mako",
1702 }
1704 hotp_steps = (STEP_HOTP_EMAIL, STEP_HOTP_SMS)
1705 secret_key_steps = (STEP_TOTP, STEP_HOTP_EMAIL, STEP_HOTP_SMS)
1707 def get(self) -> Response:
1708 if self.step == self.STEP_MFA:
1709 self.handle_authentication_type()
1711 return super().get()
1713 def get_model_form_dict(self) -> Dict[str, Any]:
1714 model_form_dict = {}
1716 # Dictionary keys here are attribute names of the User object.
1717 # Values are form attributes.
1719 if self.step == self.STEP_MFA_METHOD:
1720 model_form_dict["mfa_method"] = ViewParam.MFA_METHOD
1722 elif self.step == self.STEP_HOTP_EMAIL:
1723 model_form_dict["email"] = ViewParam.EMAIL
1725 elif self.step == self.STEP_HOTP_SMS:
1726 model_form_dict["phone_number"] = ViewParam.PHONE_NUMBER
1728 if self.step in self.secret_key_steps:
1729 model_form_dict["mfa_secret_key"] = ViewParam.MFA_SECRET_KEY
1731 return model_form_dict
1733 def get_object(self) -> User:
1734 return self.request.user
1736 def get_form_values(self) -> Dict[str, Any]:
1737 # Will call get_model_form_dict()
1738 form_values = super().get_form_values()
1740 if self.step in self.secret_key_steps:
1741 # Always create a new secret key. This will be written to the
1742 # user object at the next step, via set_object_properties.
1743 form_values[ViewParam.MFA_SECRET_KEY] = pyotp.random_base32()
1745 return form_values
1747 def get_extra_context(self) -> Dict[str, Any]:
1748 req = self.request
1749 _ = req.gettext
1750 if self.step == self.STEP_MFA:
1751 test_msg = _("Let's test it!") + " "
1752 context = super().get_extra_context()
1753 context[self.KEY_INSTRUCTIONS] = (
1754 test_msg + self.get_mfa_instructions()
1755 )
1756 return context
1758 titles = {
1759 self.STEP_MFA_METHOD: req.icon_text(
1760 icon=Icons.MFA,
1761 text=_("Configure multi-factor authentication settings"),
1762 ),
1763 self.STEP_TOTP: req.icon_text(
1764 icon=Icons.APP_AUTHENTICATOR,
1765 text=_("Configure authentication with app"),
1766 ),
1767 self.STEP_HOTP_EMAIL: req.icon_text(
1768 icon=Icons.EMAIL_SEND,
1769 text=_("Configure authentication by email"),
1770 ),
1771 self.STEP_HOTP_SMS: req.icon_text(
1772 icon=Icons.SMS,
1773 text=_("Configure authentication by text message"),
1774 ),
1775 }
1776 return {MAKO_VAR_TITLE: titles[self.step]}
1778 def get_success_url(self) -> str:
1779 if self.finished():
1780 return self.request.route_url(Routes.HOME)
1782 return self.request.route_url(Routes.EDIT_OWN_USER_MFA)
1784 def get_failure_url(self) -> str:
1785 # We get here because the user, who has already logged in successfully,
1786 # has changed their MFA method. Failure doesn't mean they should be
1787 # logged out instantly -- they may have (for example) misconfigured
1788 # their phone number, and if they are forcibly logged out now, they are
1789 # stuffed and require administrator assistance. Instead, we return them
1790 # to the home screen.
1791 return self.request.route_url(Routes.HOME)
1793 def set_object_properties(self, appstruct: Dict[str, Any]) -> None:
1794 # Called by ModelFormMixin.form_valid_process_data() ->
1795 # ModelFormMixin.save_object().
1797 super().set_object_properties(appstruct)
1799 if self.step == self.STEP_MFA_METHOD:
1800 # We are setting the MFA method, including secret key etc.
1801 user = cast(User, self.object)
1802 user.set_mfa_method(appstruct.get(ViewParam.MFA_METHOD))
1804 elif self.step == self.STEP_MFA:
1805 # Code entered.
1806 if self.otp_is_valid(appstruct):
1807 _ = self.request.gettext
1808 self.request.session.flash(
1809 _("Multi-factor authentication: success!"),
1810 queue=FlashQueue.SUCCESS,
1811 )
1812 # ... and continue as below
1813 else:
1814 return self.fail_bad_mfa_code() # type: ignore[return-value]
1816 self._next_step(appstruct)
1818 def _next_step(self, appstruct: Dict[str, Any]) -> None:
1819 if self.step == self.STEP_MFA_METHOD:
1820 # The user has just chosen their method.
1821 # 2. Offer them method-specific options
1822 mfa_method = appstruct.get(ViewParam.MFA_METHOD)
1823 if mfa_method == MfaMethod.NO_MFA:
1824 self.finish()
1825 else:
1826 self.step = mfa_method
1828 elif self.step in (
1829 self.STEP_TOTP,
1830 self.STEP_HOTP_EMAIL,
1831 self.STEP_HOTP_SMS,
1832 ):
1833 # Coming from one of the method-specific steps.
1834 # 3. Ask for the authentication code.
1835 self.step = self.STEP_MFA
1837 elif self.step == self.STEP_MFA:
1838 # Authentication code provided. End.
1839 self.finish()
1841 else:
1842 raise AssertionError(
1843 f"EditOwnUserMfaView.next_step(): " f"Bad step {self.step!r}"
1844 )
1847@view_config(
1848 route_name=Routes.EDIT_OWN_USER_MFA,
1849 permission=Authenticated,
1850 http_cache=NEVER_CACHE,
1851)
1852def edit_own_user_mfa(request: "CamcopsRequest") -> Response:
1853 """
1854 Edit your own MFA method.
1855 """
1856 view = EditOwnUserMfaView(request)
1857 return view.dispatch()
1860# =============================================================================
1861# Main menu; simple information things
1862# =============================================================================
1865@view_config(
1866 route_name=Routes.HOME, renderer="main_menu.mako", http_cache=NEVER_CACHE
1867)
1868def main_menu(req: "CamcopsRequest") -> Dict[str, Any]:
1869 """
1870 Main CamCOPS menu view.
1871 """
1872 user = req.user
1873 result = dict(
1874 authorized_as_groupadmin=user.authorized_as_groupadmin,
1875 authorized_as_superuser=user.superuser,
1876 authorized_for_reports=user.authorized_for_reports,
1877 authorized_to_dump=user.authorized_to_dump,
1878 authorized_to_manage_patients=user.authorized_to_manage_patients,
1879 camcops_url=CAMCOPS_URL,
1880 now=format_datetime(req.now, DateFormat.SHORT_DATETIME_SECONDS),
1881 server_version=CAMCOPS_SERVER_VERSION,
1882 )
1883 return result
1886# =============================================================================
1887# Tasks
1888# =============================================================================
1891def edit_filter(
1892 req: "CamcopsRequest", task_filter: TaskFilter, redirect_url: str
1893) -> Response:
1894 """
1895 Edit the task filter for the current user.
1897 Args:
1898 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
1899 task_filter: the user's
1900 :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`
1901 redirect_url: URL to redirect (back) to upon success
1902 """
1903 if FormAction.SET_FILTERS in req.POST:
1904 form = EditTaskFilterForm(request=req)
1905 try:
1906 controls = list(req.POST.items())
1907 fa = form.validate(controls)
1908 # -----------------------------------------------------------------
1909 # Apply the changes
1910 # -----------------------------------------------------------------
1911 who = fa.get(ViewParam.WHO)
1912 what = fa.get(ViewParam.WHAT)
1913 when = fa.get(ViewParam.WHEN)
1914 admin = fa.get(ViewParam.ADMIN)
1915 task_filter.surname = who.get(ViewParam.SURNAME)
1916 task_filter.forename = who.get(ViewParam.FORENAME)
1917 task_filter.dob = who.get(ViewParam.DOB)
1918 task_filter.sex = who.get(ViewParam.SEX)
1919 task_filter.idnum_criteria = [
1920 IdNumReference(
1921 which_idnum=x[ViewParam.WHICH_IDNUM],
1922 idnum_value=x[ViewParam.IDNUM_VALUE],
1923 )
1924 for x in who.get(ViewParam.ID_REFERENCES)
1925 ]
1926 task_filter.task_types = what.get(ViewParam.TASKS)
1927 task_filter.text_contents = what.get(ViewParam.TEXT_CONTENTS)
1928 task_filter.complete_only = what.get(ViewParam.COMPLETE_ONLY)
1929 task_filter.start_datetime = when.get(ViewParam.START_DATETIME)
1930 task_filter.end_datetime = when.get(ViewParam.END_DATETIME)
1931 task_filter.device_ids = admin.get(ViewParam.DEVICE_IDS)
1932 task_filter.adding_user_ids = admin.get(ViewParam.USER_IDS)
1933 task_filter.group_ids = admin.get(ViewParam.GROUP_IDS)
1935 return HTTPFound(redirect_url)
1936 except ValidationFailure as e:
1937 rendered_form = e.render()
1938 else:
1939 if FormAction.CLEAR_FILTERS in req.POST:
1940 # skip validation
1941 task_filter.clear()
1942 who = {
1943 ViewParam.SURNAME: task_filter.surname,
1944 ViewParam.FORENAME: task_filter.forename,
1945 ViewParam.DOB: task_filter.dob,
1946 ViewParam.SEX: task_filter.sex or "",
1947 ViewParam.ID_REFERENCES: [
1948 {
1949 ViewParam.WHICH_IDNUM: x.which_idnum,
1950 ViewParam.IDNUM_VALUE: x.idnum_value,
1951 }
1952 for x in task_filter.idnum_criteria
1953 ],
1954 }
1955 what = {
1956 ViewParam.TASKS: task_filter.task_types,
1957 ViewParam.TEXT_CONTENTS: task_filter.text_contents,
1958 ViewParam.COMPLETE_ONLY: task_filter.complete_only,
1959 }
1960 when = {
1961 ViewParam.START_DATETIME: task_filter.start_datetime,
1962 ViewParam.END_DATETIME: task_filter.end_datetime,
1963 }
1964 admin = {
1965 ViewParam.DEVICE_IDS: task_filter.device_ids,
1966 ViewParam.USER_IDS: task_filter.adding_user_ids,
1967 ViewParam.GROUP_IDS: task_filter.group_ids,
1968 }
1969 open_who = any(i for i in who.values())
1970 open_what = any(i for i in what.values())
1971 open_when = any(i for i in when.values())
1972 open_admin = any(i for i in admin.values())
1973 fa = {
1974 ViewParam.WHO: who,
1975 ViewParam.WHAT: what,
1976 ViewParam.WHEN: when,
1977 ViewParam.ADMIN: admin,
1978 }
1979 form = EditTaskFilterForm(
1980 request=req,
1981 open_admin=open_admin,
1982 open_what=open_what,
1983 open_when=open_when,
1984 open_who=open_who,
1985 )
1986 rendered_form = form.render(fa)
1988 return render_to_response(
1989 "filter_edit.mako",
1990 dict(
1991 form=rendered_form, head_form_html=get_head_form_html(req, [form])
1992 ),
1993 request=req,
1994 )
1997@view_config(route_name=Routes.SET_FILTERS, http_cache=NEVER_CACHE)
1998def set_filters(req: "CamcopsRequest") -> Response:
1999 """
2000 View to set the task filters for the current user.
2001 """
2002 redirect_url = req.get_redirect_url_param(
2003 ViewParam.REDIRECT_URL, req.route_url(Routes.VIEW_TASKS)
2004 )
2005 task_filter = req.camcops_session.get_task_filter()
2006 return edit_filter(req, task_filter=task_filter, redirect_url=redirect_url)
2009@view_config(
2010 route_name=Routes.VIEW_TASKS,
2011 renderer="view_tasks.mako",
2012 http_cache=NEVER_CACHE,
2013)
2014def view_tasks(req: "CamcopsRequest") -> Dict[str, Any]:
2015 """
2016 Main view displaying tasks and applicable filters.
2017 """
2018 ccsession = req.camcops_session
2019 user = req.user
2020 taskfilter = ccsession.get_task_filter()
2022 # Read from the GET parameters (or in some cases potentially POST but those
2023 # will be re-read).
2024 rows_per_page = req.get_int_param(
2025 ViewParam.ROWS_PER_PAGE,
2026 ccsession.number_to_view or DEFAULT_ROWS_PER_PAGE,
2027 )
2028 page_num = req.get_int_param(ViewParam.PAGE, 1)
2029 via_index = req.get_bool_param(ViewParam.VIA_INDEX, True)
2031 errors = False
2033 # "Number of tasks per page" form
2034 tpp_form = TasksPerPageForm(request=req)
2035 if FormAction.SUBMIT_TASKS_PER_PAGE in req.POST:
2036 try:
2037 controls = list(req.POST.items())
2038 tpp_appstruct = tpp_form.validate(controls)
2039 rows_per_page = tpp_appstruct.get(ViewParam.ROWS_PER_PAGE)
2040 ccsession.number_to_view = rows_per_page
2041 except ValidationFailure:
2042 errors = True
2043 rendered_tpp_form = tpp_form.render()
2044 else:
2045 tpp_appstruct = {ViewParam.ROWS_PER_PAGE: rows_per_page}
2046 rendered_tpp_form = tpp_form.render(tpp_appstruct)
2048 # Refresh tasks. Slightly pointless. Doesn't need validating. The user
2049 # could just press the browser's refresh button, but this improves the UI
2050 # slightly.
2051 refresh_form = RefreshTasksForm(request=req)
2052 rendered_refresh_form = refresh_form.render()
2054 # Get tasks, unless there have been form errors.
2055 # In principle, for some filter settings (single task, no "complete"
2056 # preference...) we could produce an ORM query and use SqlalchemyOrmPage,
2057 # which would apply LIMIT/OFFSET (or equivalent) to the query, and be
2058 # very nippy. In practice, this is probably an unusual setting, so we'll
2059 # simplify things here with a Python list regardless of the settings.
2060 if errors:
2061 collection = [] # type: ignore[var-annotated]
2062 else:
2063 collection = (
2064 # SECURITY APPLIED HERE
2065 TaskCollection( # type: ignore[assignment]
2066 req=req,
2067 taskfilter=taskfilter,
2068 sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
2069 via_index=via_index,
2070 ).all_tasks_or_indexes_or_query
2071 or []
2072 )
2073 paginator = (
2074 SqlalchemyOrmPage if isinstance(collection, Query) else CamcopsPage
2075 )
2076 page = paginator(
2077 collection,
2078 page=page_num,
2079 items_per_page=rows_per_page,
2080 url_maker=PageUrl(req),
2081 request=req,
2082 )
2083 return dict(
2084 page=page,
2085 head_form_html=get_head_form_html(req, [tpp_form, refresh_form]),
2086 tpp_form=rendered_tpp_form,
2087 refresh_form=rendered_refresh_form,
2088 no_patient_selected_and_user_restricted=(
2089 not user.may_view_all_patients_when_unfiltered
2090 and not taskfilter.any_specific_patient_filtering()
2091 ),
2092 user=user,
2093 )
2096@view_config(route_name=Routes.TASK, http_cache=NEVER_CACHE)
2097def serve_task(req: "CamcopsRequest") -> Response:
2098 """
2099 View that serves an individual task, in a variety of possible formats
2100 (e.g. HTML, PDF, XML).
2101 """
2102 _ = req.gettext
2103 viewtype = req.get_str_param(ViewParam.VIEWTYPE, ViewArg.HTML, lower=True)
2104 tablename = req.get_str_param(
2105 ViewParam.TABLE_NAME, validator=validate_task_tablename
2106 )
2107 server_pk = req.get_int_param(ViewParam.SERVER_PK)
2108 anonymise = req.get_bool_param(ViewParam.ANONYMISE, False)
2110 task = task_factory(req, tablename, server_pk) # SECURITY APPLIED HERE
2112 if task is None:
2113 raise HTTPNotFound( # raise, don't return
2114 f"{_('Task not found or not permitted:')} "
2115 f"tablename={tablename!r}, server_pk={server_pk!r}"
2116 )
2118 task.audit(req, "Viewed " + viewtype.upper())
2120 if viewtype == ViewArg.HTML:
2121 return Response(task.get_html(req=req, anonymise=anonymise))
2122 elif viewtype == ViewArg.PDF:
2123 return PdfResponse(
2124 body=task.get_pdf(req, anonymise=anonymise),
2125 filename=task.suggested_pdf_filename(req, anonymise=anonymise),
2126 )
2127 elif viewtype == ViewArg.PDFHTML: # debugging option; no direct hyperlink
2128 return Response(task.get_pdf_html(req, anonymise=anonymise))
2129 elif viewtype == ViewArg.XML:
2130 options = TaskExportOptions(
2131 xml_include_ancillary=True,
2132 include_blobs=req.get_bool_param(ViewParam.INCLUDE_BLOBS, True),
2133 xml_include_comments=req.get_bool_param(
2134 ViewParam.INCLUDE_COMMENTS, True
2135 ),
2136 xml_include_calculated=req.get_bool_param(
2137 ViewParam.INCLUDE_CALCULATED, True
2138 ),
2139 xml_include_patient=req.get_bool_param(
2140 ViewParam.INCLUDE_PATIENT, True
2141 ),
2142 xml_include_plain_columns=True,
2143 xml_include_snomed=req.get_bool_param(
2144 ViewParam.INCLUDE_SNOMED, True
2145 ),
2146 xml_with_header_comments=True,
2147 )
2148 return XmlResponse(task.get_xml(req=req, options=options))
2149 elif viewtype == ViewArg.FHIRJSON: # debugging option
2150 dummy_recipient = ExportRecipient()
2151 bundle = task.get_fhir_bundle(
2152 req, dummy_recipient, skip_docs_if_other_content=True
2153 )
2154 return JsonResponse(json.dumps(bundle.as_json(), indent=JSON_INDENT))
2155 else:
2156 permissible = (
2157 ViewArg.FHIRJSON,
2158 ViewArg.HTML,
2159 ViewArg.PDF,
2160 ViewArg.PDFHTML,
2161 ViewArg.XML,
2162 )
2163 raise HTTPBadRequest(
2164 f"{_('Bad output type:')} {viewtype!r} "
2165 f"({_('permissible:')} {permissible!r})"
2166 )
2169def view_patient(req: "CamcopsRequest", patient_server_pk: int) -> Response:
2170 """
2171 Primarily for FHIR views: show just a patient's details.
2172 Must check security carefully for this one.
2173 """
2174 user = req.user
2175 patient = Patient.get_patient_by_pk(req.dbsession, patient_server_pk)
2176 if not patient or not patient.user_may_view(user):
2177 _ = req.gettext
2178 raise HTTPBadRequest(_("No such patient or not authorized"))
2179 return render_to_response(
2180 "patient.mako",
2181 dict(patient=patient, viewtype=ViewArg.HTML),
2182 request=req,
2183 )
2186# =============================================================================
2187# Trackers, CTVs
2188# =============================================================================
2191def choose_tracker_or_ctv(
2192 req: "CamcopsRequest", as_ctv: bool
2193) -> Dict[str, Any]:
2194 """
2195 Returns a dictionary for a Mako template to configure a
2196 :class:`camcops_server.cc_modules.cc_tracker.Tracker` or
2197 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`.
2199 Upon success, it redirects to the tracker or CTV view itself, with the
2200 tracker's parameters embedded as URL parameters.
2202 Args:
2203 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
2204 as_ctv: CTV, rather than tracker?
2205 """
2207 form = ChooseTrackerForm(req, as_ctv=as_ctv) # , css_class="form-inline")
2209 if FormAction.SUBMIT in req.POST:
2210 try:
2211 controls = list(req.POST.items())
2212 appstruct = form.validate(controls)
2213 keys = [
2214 ViewParam.WHICH_IDNUM,
2215 ViewParam.IDNUM_VALUE,
2216 ViewParam.START_DATETIME,
2217 ViewParam.END_DATETIME,
2218 ViewParam.TASKS,
2219 ViewParam.ALL_TASKS,
2220 ViewParam.VIA_INDEX,
2221 ViewParam.VIEWTYPE,
2222 ]
2223 querydict = {k: appstruct.get(k) for k in keys}
2224 # Not so obvious this can be redirected cleanly via POST.
2225 # It is possible by returning a form that then autosubmits: see
2226 # https://stackoverflow.com/questions/46582/response-redirect-with-post-instead-of-get # noqa
2227 # However, since everything's on this server, we could just return
2228 # an appropriate Response directly. But the request information is
2229 # not sensitive, so we lose nothing by using a GET redirect:
2230 raise HTTPFound(
2231 req.route_url(
2232 Routes.CTV if as_ctv else Routes.TRACKER, _query=querydict
2233 )
2234 )
2235 except ValidationFailure as e:
2236 rendered_form = e.render()
2237 else:
2238 rendered_form = form.render()
2239 return dict(
2240 form=rendered_form, head_form_html=get_head_form_html(req, [form])
2241 )
2244@view_config(
2245 route_name=Routes.CHOOSE_TRACKER,
2246 renderer="choose_tracker.mako",
2247 http_cache=NEVER_CACHE,
2248)
2249def choose_tracker(req: "CamcopsRequest") -> Dict[str, Any]:
2250 """
2251 View to choose/configure a
2252 :class:`camcops_server.cc_modules.cc_tracker.Tracker`.
2253 """
2254 return choose_tracker_or_ctv(req, as_ctv=False)
2257@view_config(
2258 route_name=Routes.CHOOSE_CTV,
2259 renderer="choose_ctv.mako",
2260 http_cache=NEVER_CACHE,
2261)
2262def choose_ctv(req: "CamcopsRequest") -> Dict[str, Any]:
2263 """
2264 View to choose/configure a
2265 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`.
2266 """
2267 return choose_tracker_or_ctv(req, as_ctv=True)
2270def serve_tracker_or_ctv(req: "CamcopsRequest", as_ctv: bool) -> Response:
2271 """
2272 Returns a response to show a
2273 :class:`camcops_server.cc_modules.cc_tracker.Tracker` or
2274 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`, in a
2275 variety of formats (e.g. HTML, PDF, XML).
2277 Args:
2278 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
2279 as_ctv: CTV, rather than tracker?
2280 """
2281 as_tracker = not as_ctv
2282 _ = req.gettext
2283 which_idnum = req.get_int_param(ViewParam.WHICH_IDNUM)
2284 idnum_value = req.get_int_param(ViewParam.IDNUM_VALUE)
2285 start_datetime = req.get_datetime_param(ViewParam.START_DATETIME)
2286 end_datetime = req.get_datetime_param(ViewParam.END_DATETIME)
2287 tasks = req.get_str_list_param(
2288 ViewParam.TASKS, validator=validate_task_tablename
2289 )
2290 all_tasks = req.get_bool_param(ViewParam.ALL_TASKS, True)
2291 viewtype = req.get_str_param(ViewParam.VIEWTYPE, ViewArg.HTML)
2292 via_index = req.get_bool_param(ViewParam.VIA_INDEX, True)
2294 if all_tasks:
2295 task_classes = [] # type: List[Type[Task]]
2296 else:
2297 try:
2298 task_classes = task_classes_from_table_names(
2299 tasks, sortmethod=TaskClassSortMethod.SHORTNAME
2300 )
2301 except KeyError:
2302 raise HTTPBadRequest(_("Invalid tasks specified"))
2303 if as_tracker and not all(c.provides_trackers for c in task_classes):
2304 raise HTTPBadRequest(_("Not all tasks specified provide trackers"))
2306 iddefs = [IdNumReference(which_idnum, idnum_value)]
2308 taskfilter = TaskFilter()
2309 taskfilter.task_types = [
2310 tc.__tablename__ for tc in task_classes
2311 ] # a bit silly...
2312 taskfilter.idnum_criteria = iddefs
2313 taskfilter.start_datetime = start_datetime
2314 taskfilter.end_datetime = end_datetime
2315 taskfilter.complete_only = True # trackers require complete tasks
2316 taskfilter.set_sort_method(TaskClassSortMethod.SHORTNAME)
2317 taskfilter.tasks_offering_trackers_only = as_tracker
2318 taskfilter.tasks_with_patient_only = True
2320 tracker_ctv_class = ClinicalTextView if as_ctv else Tracker
2321 tracker = tracker_ctv_class(
2322 req=req, taskfilter=taskfilter, via_index=via_index
2323 )
2325 if viewtype == ViewArg.HTML:
2326 return Response(tracker.get_html())
2327 elif viewtype == ViewArg.PDF:
2328 return PdfResponse(
2329 body=tracker.get_pdf(), filename=tracker.suggested_pdf_filename()
2330 )
2331 elif viewtype == ViewArg.PDFHTML: # debugging option
2332 return Response(tracker.get_pdf_html())
2333 elif viewtype == ViewArg.XML:
2334 include_comments = req.get_bool_param(ViewParam.INCLUDE_COMMENTS, True)
2335 return XmlResponse(tracker.get_xml(include_comments=include_comments))
2336 else:
2337 permissible = [ViewArg.HTML, ViewArg.PDF, ViewArg.PDFHTML, ViewArg.XML]
2338 raise HTTPBadRequest(
2339 f"{_('Invalid view type:')} {viewtype!r} "
2340 f"({_('permissible:')} {permissible!r})"
2341 )
2344@view_config(route_name=Routes.TRACKER, http_cache=NEVER_CACHE)
2345def serve_tracker(req: "CamcopsRequest") -> Response:
2346 """
2347 View to serve a :class:`camcops_server.cc_modules.cc_tracker.Tracker`; see
2348 :func:`serve_tracker_or_ctv`.
2349 """
2350 return serve_tracker_or_ctv(req, as_ctv=False)
2353@view_config(route_name=Routes.CTV, http_cache=NEVER_CACHE)
2354def serve_ctv(req: "CamcopsRequest") -> Response:
2355 """
2356 View to serve a
2357 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`; see
2358 :func:`serve_tracker_or_ctv`.
2359 """
2360 return serve_tracker_or_ctv(req, as_ctv=True)
2363# =============================================================================
2364# Reports
2365# =============================================================================
2368@view_config(
2369 route_name=Routes.REPORTS_MENU,
2370 renderer="reports_menu.mako",
2371 http_cache=NEVER_CACHE,
2372)
2373def reports_menu(req: "CamcopsRequest") -> Dict[str, Any]:
2374 """
2375 Offer a menu of reports.
2377 Note: Reports are not group-specific.
2378 If you're authorized to see any, you'll see the whole menu.
2379 (The *data* you get will be restricted to the group's you're authorized
2380 to run reports for.)
2381 """
2382 if not req.user.authorized_for_reports:
2383 raise HTTPBadRequest(errormsg_cannot_report(req))
2384 return {}
2387@view_config(route_name=Routes.OFFER_REPORT, http_cache=NEVER_CACHE)
2388def offer_report(req: "CamcopsRequest") -> Response:
2389 """
2390 Offer configuration options for a single report, or (following submission)
2391 redirect to serve that report (with configuration parameters in the URL).
2392 """
2393 if not req.user.authorized_for_reports:
2394 raise HTTPBadRequest(errormsg_cannot_report(req))
2395 report_id = req.get_str_param(ViewParam.REPORT_ID)
2396 report = get_report_instance(report_id)
2397 _ = req.gettext
2398 if not report:
2399 raise HTTPBadRequest(f"{_('No such report ID:')} {report_id!r}")
2400 if report.superuser_only and not req.user.superuser:
2401 raise HTTPBadRequest(
2402 f"{_('Report is restricted to the superuser:')} {report_id!r}"
2403 )
2404 form = report.get_form(req)
2405 if FormAction.SUBMIT in req.POST:
2406 try:
2407 controls = list(req.POST.items())
2408 appstruct = form.validate(controls) # may raise
2409 keys = report.get_http_query_keys()
2410 querydict = {k: appstruct.get(k) for k in keys}
2411 querydict[ViewParam.REPORT_ID] = report_id
2412 querydict[ViewParam.PAGE] = 1
2413 # Send the user to the actual data using GET: this allows page
2414 # navigation whilst maintaining any report-specific parameters.
2415 raise HTTPFound(req.route_url(Routes.REPORT, _query=querydict))
2416 except ValidationFailure as e:
2417 rendered_form = e.render()
2418 else:
2419 rendered_form = form.render({ViewParam.REPORT_ID: report_id})
2420 return render_to_response(
2421 "report_offer.mako",
2422 dict(
2423 report=report,
2424 form=rendered_form,
2425 head_form_html=get_head_form_html(req, [form]),
2426 ),
2427 request=req,
2428 )
2431@view_config(route_name=Routes.REPORT, http_cache=NEVER_CACHE)
2432def serve_report(req: "CamcopsRequest") -> Response:
2433 """
2434 Serve a configured report.
2435 """
2436 if not req.user.authorized_for_reports:
2437 raise HTTPBadRequest(errormsg_cannot_report(req))
2438 report_id = req.get_str_param(ViewParam.REPORT_ID)
2439 report = get_report_instance(report_id)
2440 _ = req.gettext
2441 if not report:
2442 raise HTTPBadRequest(f"{_('No such report ID:')} {report_id!r}")
2443 if report.superuser_only and not req.user.superuser:
2444 raise HTTPBadRequest(
2445 f"{_('Report is restricted to the superuser:')} {report_id!r}"
2446 )
2448 return report.get_response(req)
2451# =============================================================================
2452# Research downloads
2453# =============================================================================
2456@view_config(route_name=Routes.OFFER_BASIC_DUMP, http_cache=NEVER_CACHE)
2457def offer_basic_dump(req: "CamcopsRequest") -> Response:
2458 """
2459 View to configure a basic research dump.
2460 Following submission success, it redirects to a view serving a TSV/ZIP
2461 dump.
2462 """
2463 if not req.user.authorized_to_dump:
2464 raise HTTPBadRequest(errormsg_cannot_dump(req))
2465 form = OfferBasicDumpForm(request=req)
2466 if FormAction.SUBMIT in req.POST:
2467 try:
2468 controls = list(req.POST.items())
2469 appstruct = form.validate(controls)
2470 manual = appstruct.get(ViewParam.MANUAL)
2471 querydict = {
2472 ViewParam.DUMP_METHOD: appstruct.get(ViewParam.DUMP_METHOD),
2473 ViewParam.SORT: appstruct.get(ViewParam.SORT),
2474 ViewParam.GROUP_IDS: manual.get(ViewParam.GROUP_IDS),
2475 ViewParam.TASKS: manual.get(ViewParam.TASKS),
2476 ViewParam.VIEWTYPE: appstruct.get(ViewParam.VIEWTYPE),
2477 ViewParam.DELIVERY_MODE: appstruct.get(
2478 ViewParam.DELIVERY_MODE
2479 ),
2480 ViewParam.INCLUDE_SCHEMA: appstruct.get(
2481 ViewParam.INCLUDE_SCHEMA
2482 ),
2483 ViewParam.SIMPLIFIED: appstruct.get(ViewParam.SIMPLIFIED),
2484 }
2485 # We could return a response, or redirect via GET.
2486 # The request is not sensitive, so let's redirect.
2487 return HTTPFound(
2488 req.route_url(Routes.BASIC_DUMP, _query=querydict)
2489 )
2490 except ValidationFailure as e:
2491 rendered_form = e.render()
2492 else:
2493 rendered_form = form.render()
2494 return render_to_response(
2495 "dump_basic_offer.mako",
2496 dict(
2497 form=rendered_form, head_form_html=get_head_form_html(req, [form])
2498 ),
2499 request=req,
2500 )
2503def get_dump_collection(req: "CamcopsRequest") -> TaskCollection:
2504 """
2505 Returns the collection of tasks being requested for a dump operation.
2506 Raises an error if the request is bad.
2507 """
2508 if not req.user.authorized_to_dump:
2509 raise HTTPBadRequest(errormsg_cannot_dump(req))
2510 # -------------------------------------------------------------------------
2511 # Get parameters
2512 # -------------------------------------------------------------------------
2513 dump_method = req.get_str_param(ViewParam.DUMP_METHOD)
2514 group_ids = req.get_int_list_param(ViewParam.GROUP_IDS)
2515 task_names = req.get_str_list_param(
2516 ViewParam.TASKS, validator=validate_task_tablename
2517 )
2519 # -------------------------------------------------------------------------
2520 # Select tasks
2521 # -------------------------------------------------------------------------
2522 if dump_method == ViewArg.EVERYTHING:
2523 taskfilter = TaskFilter()
2524 elif dump_method == ViewArg.USE_SESSION_FILTER:
2525 taskfilter = req.camcops_session.get_task_filter()
2526 elif dump_method == ViewArg.SPECIFIC_TASKS_GROUPS:
2527 taskfilter = TaskFilter()
2528 taskfilter.task_types = task_names
2529 taskfilter.group_ids = group_ids
2530 else:
2531 _ = req.gettext
2532 raise HTTPBadRequest(
2533 f"{_('Bad parameter:')} "
2534 f"{ViewParam.DUMP_METHOD}={dump_method!r}"
2535 )
2536 return TaskCollection(
2537 req=req,
2538 taskfilter=taskfilter,
2539 as_dump=True,
2540 sort_method_by_class=TaskSortMethod.CREATION_DATE_ASC,
2541 )
2544@view_config(route_name=Routes.BASIC_DUMP, http_cache=NEVER_CACHE)
2545def serve_basic_dump(req: "CamcopsRequest") -> Response:
2546 """
2547 View serving a spreadsheet-style basic research dump.
2548 """
2549 # Get view-specific parameters
2550 simplified = req.get_bool_param(ViewParam.SIMPLIFIED, False)
2551 sort_by_heading = req.get_bool_param(ViewParam.SORT, False)
2552 viewtype = req.get_str_param(ViewParam.VIEWTYPE, ViewArg.XLSX, lower=True)
2553 delivery_mode = req.get_str_param(
2554 ViewParam.DELIVERY_MODE, ViewArg.EMAIL, lower=True
2555 )
2556 include_schema = req.get_bool_param(ViewParam.INCLUDE_SCHEMA, False)
2558 # Get tasks (and perform checks)
2559 collection = get_dump_collection(req)
2560 # Create object that knows how to export
2561 exporter = make_exporter(
2562 req=req,
2563 collection=collection,
2564 options=DownloadOptions(
2565 # Exporting to spreadsheets
2566 user_id=req.user_id,
2567 viewtype=viewtype,
2568 delivery_mode=delivery_mode,
2569 spreadsheet_simplified=simplified,
2570 spreadsheet_sort_by_heading=sort_by_heading,
2571 include_information_schema_columns=include_schema,
2572 include_summary_schema=True,
2573 ),
2574 ) # may raise
2575 # Export, or schedule an email/download
2576 return exporter.immediate_response(req)
2579@view_config(route_name=Routes.OFFER_SQL_DUMP, http_cache=NEVER_CACHE)
2580def offer_sql_dump(req: "CamcopsRequest") -> Response:
2581 """
2582 View to configure a SQL research dump.
2583 Following submission success, it redirects to a view serving the SQL dump.
2584 """
2585 if not req.user.authorized_to_dump:
2586 raise HTTPBadRequest(errormsg_cannot_dump(req))
2587 form = OfferSqlDumpForm(request=req)
2588 if FormAction.SUBMIT in req.POST:
2589 try:
2590 controls = list(req.POST.items())
2591 appstruct = form.validate(controls)
2592 manual = appstruct.get(ViewParam.MANUAL)
2593 querydict = {
2594 ViewParam.DUMP_METHOD: appstruct.get(ViewParam.DUMP_METHOD),
2595 ViewParam.SQLITE_METHOD: appstruct.get(
2596 ViewParam.SQLITE_METHOD
2597 ),
2598 ViewParam.INCLUDE_BLOBS: appstruct.get(
2599 ViewParam.INCLUDE_BLOBS
2600 ),
2601 ViewParam.PATIENT_ID_PER_ROW: appstruct.get(
2602 ViewParam.PATIENT_ID_PER_ROW
2603 ),
2604 ViewParam.GROUP_IDS: manual.get(ViewParam.GROUP_IDS),
2605 ViewParam.TASKS: manual.get(ViewParam.TASKS),
2606 ViewParam.DELIVERY_MODE: appstruct.get(
2607 ViewParam.DELIVERY_MODE
2608 ),
2609 ViewParam.INCLUDE_SCHEMA: appstruct.get(
2610 ViewParam.INCLUDE_SCHEMA
2611 ),
2612 }
2613 # We could return a response, or redirect via GET.
2614 # The request is not sensitive, so let's redirect.
2615 return HTTPFound(req.route_url(Routes.SQL_DUMP, _query=querydict))
2616 except ValidationFailure as e:
2617 rendered_form = e.render()
2618 else:
2619 rendered_form = form.render()
2620 return render_to_response(
2621 "dump_sql_offer.mako",
2622 dict(
2623 form=rendered_form, head_form_html=get_head_form_html(req, [form])
2624 ),
2625 request=req,
2626 )
2629@view_config(route_name=Routes.SQL_DUMP, http_cache=NEVER_CACHE)
2630def sql_dump(req: "CamcopsRequest") -> Response:
2631 """
2632 View serving an SQL dump in the chosen format (e.g. SQLite binary, SQL).
2633 """
2634 # Get view-specific parameters
2635 sqlite_method = req.get_str_param(ViewParam.SQLITE_METHOD)
2636 include_blobs = req.get_bool_param(ViewParam.INCLUDE_BLOBS, False)
2637 patient_id_per_row = req.get_bool_param(ViewParam.PATIENT_ID_PER_ROW, True)
2638 delivery_mode = req.get_str_param(
2639 ViewParam.DELIVERY_MODE, ViewArg.EMAIL, lower=True
2640 )
2641 include_schema = req.get_bool_param(ViewParam.INCLUDE_SCHEMA, False)
2643 # Get tasks (and perform checks)
2644 collection = get_dump_collection(req)
2645 # Create object that knows how to export
2646 exporter = make_exporter(
2647 req=req,
2648 collection=collection,
2649 options=DownloadOptions(
2650 # Exporting to SQL
2651 user_id=req.user_id,
2652 viewtype=sqlite_method,
2653 delivery_mode=delivery_mode,
2654 db_include_blobs=include_blobs,
2655 db_patient_id_per_row=patient_id_per_row,
2656 include_information_schema_columns=include_schema,
2657 include_summary_schema=include_schema, # doesn't do much for SQL export at present # noqa
2658 ),
2659 ) # may raise
2660 # Export, or schedule an email/download
2661 return exporter.immediate_response(req)
2664# noinspection PyUnusedLocal
2665@view_config(
2666 route_name=Routes.DOWNLOAD_AREA,
2667 renderer="download_area.mako",
2668 http_cache=NEVER_CACHE,
2669)
2670def download_area(req: "CamcopsRequest") -> Dict[str, Any]:
2671 """
2672 Shows the user download area.
2673 """
2674 userdir = req.user_download_dir
2675 if userdir:
2676 files = UserDownloadFile.from_directory_scan(
2677 directory=userdir,
2678 permitted_lifespan_min=req.config.user_download_file_lifetime_min,
2679 req=req,
2680 )
2681 else:
2682 files = [] # type: ignore[no-redef] # type: List[UserDownloadFile]
2683 return dict(
2684 files=files,
2685 available=bytes2human(req.user_download_bytes_available),
2686 permitted=bytes2human(req.user_download_bytes_permitted),
2687 used=bytes2human(req.user_download_bytes_used),
2688 lifetime_min=req.config.user_download_file_lifetime_min,
2689 )
2692@view_config(route_name=Routes.DOWNLOAD_FILE, http_cache=NEVER_CACHE)
2693def download_file(req: "CamcopsRequest") -> Response:
2694 """
2695 Downloads a file.
2696 """
2697 _ = req.gettext
2698 filename = req.get_str_param(
2699 ViewParam.FILENAME, "", validator=validate_download_filename
2700 )
2701 # Security comes here: we do NOT permit any path information in the
2702 # filename. It MUST be relative to and within the user download directory.
2703 # We cannot trust the input.
2704 filename = os.path.basename(filename)
2705 udf = UserDownloadFile(directory=req.user_download_dir, filename=filename)
2706 if not udf.exists:
2707 raise HTTPBadRequest(f'{_("No such file:")} {filename}')
2708 try:
2709 return BinaryResponse(
2710 body=udf.contents,
2711 filename=udf.filename,
2712 content_type=MimeType.BINARY,
2713 as_inline=False,
2714 )
2715 except OSError:
2716 raise HTTPBadRequest(f'{_("Error reading file:")} {filename}')
2719@view_config(
2720 route_name=Routes.DELETE_FILE,
2721 request_method=HttpMethod.POST,
2722 http_cache=NEVER_CACHE,
2723)
2724def delete_file(req: "CamcopsRequest") -> Response:
2725 """
2726 Deletes a file.
2727 """
2728 form = UserDownloadDeleteForm(request=req)
2729 controls = list(req.POST.items())
2730 appstruct = form.validate(controls) # CSRF; may raise ValidationError
2731 filename = appstruct.get(ViewParam.FILENAME, "")
2732 # Security comes here: we do NOT permit any path information in the
2733 # filename. It MUST be relative to and within the user download directory.
2734 # We cannot trust the input.
2735 filename = os.path.basename(filename)
2736 udf = UserDownloadFile(directory=req.user_download_dir, filename=filename)
2737 if not udf.exists:
2738 _ = req.gettext
2739 raise HTTPBadRequest(f'{_("No such file:")} {filename}')
2740 udf.delete()
2741 return HTTPFound(req.route_url(Routes.DOWNLOAD_AREA)) # redirect
2744# =============================================================================
2745# View DDL (table definitions)
2746# =============================================================================
2748LEXERMAP = {
2749 SqlaDialectName.MYSQL: pygments.lexers.sql.MySqlLexer,
2750 SqlaDialectName.MSSQL: pygments.lexers.sql.SqlLexer, # generic
2751 SqlaDialectName.ORACLE: pygments.lexers.sql.SqlLexer, # generic
2752 SqlaDialectName.FIREBIRD: pygments.lexers.sql.SqlLexer, # generic
2753 SqlaDialectName.POSTGRES: pygments.lexers.sql.PostgresLexer,
2754 SqlaDialectName.SQLITE: pygments.lexers.sql.SqlLexer, # generic; SqliteConsoleLexer is wrong # noqa
2755 SqlaDialectName.SYBASE: pygments.lexers.sql.SqlLexer, # generic
2756}
2759def format_sql_as_html(
2760 sql: str, dialect: str = SqlaDialectName.MYSQL
2761) -> Tuple[str, str]:
2762 """
2763 Formats SQL as HTML with CSS.
2764 """
2765 lexer = LEXERMAP[dialect]()
2766 # noinspection PyUnresolvedReferences
2767 formatter = pygments.formatters.HtmlFormatter()
2768 html = pygments.highlight(sql, lexer, formatter)
2769 css = formatter.get_style_defs(".highlight")
2770 return html, css
2773@view_config(route_name=Routes.VIEW_DDL, http_cache=NEVER_CACHE)
2774def view_ddl(req: "CamcopsRequest") -> Response:
2775 """
2776 Inspect table definitions (data definition language, DDL) with field
2777 comments.
2779 2021-04-30: restricted to users with "dump" authority -- not because this
2780 is a vulnerability, as the penetration testers suggested, but just to make
2781 it consistent with the menu item for this.
2782 """
2783 if not req.user.authorized_to_dump:
2784 raise HTTPBadRequest(errormsg_cannot_dump(req))
2785 form = ViewDdlForm(request=req)
2786 if FormAction.SUBMIT in req.POST:
2787 try:
2788 controls = list(req.POST.items())
2789 appstruct = form.validate(controls)
2790 dialect = appstruct.get(ViewParam.DIALECT)
2791 ddl = get_all_ddl(dialect_name=dialect)
2792 html, css = format_sql_as_html(ddl, dialect)
2793 return render_to_response(
2794 "introspect_file.mako",
2795 dict(css=css, code_html=html),
2796 request=req,
2797 )
2798 except ValidationFailure as e:
2799 rendered_form = e.render()
2800 else:
2801 rendered_form = form.render()
2802 current_dialect = get_dialect_name(get_engine_from_session(req.dbsession))
2803 sql_dialect_choices = get_sql_dialect_choices(req)
2804 current_dialect_description = {k: v for k, v in sql_dialect_choices}.get(
2805 current_dialect, "?"
2806 )
2807 return render_to_response(
2808 "view_ddl_choose_dialect.mako",
2809 dict(
2810 current_dialect=current_dialect,
2811 current_dialect_description=current_dialect_description,
2812 form=rendered_form,
2813 head_form_html=get_head_form_html(req, [form]),
2814 ),
2815 request=req,
2816 )
2819# =============================================================================
2820# View audit trail
2821# =============================================================================
2824@view_config(
2825 route_name=Routes.OFFER_AUDIT_TRAIL,
2826 permission=Permission.SUPERUSER,
2827 http_cache=NEVER_CACHE,
2828)
2829def offer_audit_trail(req: "CamcopsRequest") -> Response:
2830 """
2831 View to configure how we'll view the audit trail. Once configured, it
2832 redirects to a view that shows the audit trail (with query parameters in
2833 the URL).
2834 """
2835 form = AuditTrailForm(request=req)
2836 if FormAction.SUBMIT in req.POST:
2837 try:
2838 controls = list(req.POST.items())
2839 appstruct = form.validate(controls)
2840 keys = [
2841 ViewParam.ROWS_PER_PAGE,
2842 ViewParam.START_DATETIME,
2843 ViewParam.END_DATETIME,
2844 ViewParam.SOURCE,
2845 ViewParam.REMOTE_IP_ADDR,
2846 ViewParam.USERNAME,
2847 ViewParam.TABLE_NAME,
2848 ViewParam.SERVER_PK,
2849 ViewParam.TRUNCATE,
2850 ]
2851 querydict = {k: appstruct.get(k) for k in keys}
2852 querydict[ViewParam.PAGE] = 1
2853 # Send the user to the actual data using GET:
2854 # (the parameters are NOT sensitive)
2855 raise HTTPFound(
2856 req.route_url(Routes.VIEW_AUDIT_TRAIL, _query=querydict)
2857 )
2858 except ValidationFailure as e:
2859 rendered_form = e.render()
2860 else:
2861 rendered_form = form.render()
2862 return render_to_response(
2863 "audit_trail_choices.mako",
2864 dict(
2865 form=rendered_form, head_form_html=get_head_form_html(req, [form])
2866 ),
2867 request=req,
2868 )
2871AUDIT_TRUNCATE_AT = 100
2874@view_config(
2875 route_name=Routes.VIEW_AUDIT_TRAIL,
2876 permission=Permission.SUPERUSER,
2877 http_cache=NEVER_CACHE,
2878)
2879def view_audit_trail(req: "CamcopsRequest") -> Response:
2880 """
2881 View to serve the audit trail.
2882 """
2883 rows_per_page = req.get_int_param(
2884 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE
2885 )
2886 start_datetime = req.get_datetime_param(ViewParam.START_DATETIME)
2887 end_datetime = req.get_datetime_param(ViewParam.END_DATETIME)
2888 source = req.get_str_param(ViewParam.SOURCE, None)
2889 remote_addr = req.get_str_param(
2890 ViewParam.REMOTE_IP_ADDR, None, validator=validate_ip_address
2891 )
2892 username = req.get_str_param(
2893 ViewParam.USERNAME, None, validator=validate_username
2894 )
2895 table_name = req.get_str_param(
2896 ViewParam.TABLE_NAME, None, validator=validate_task_tablename
2897 )
2898 server_pk = req.get_int_param(ViewParam.SERVER_PK, None)
2899 truncate = req.get_bool_param(ViewParam.TRUNCATE, True)
2900 page_num = req.get_int_param(ViewParam.PAGE, 1)
2902 conditions = [] # type: List[str]
2904 def add_condition(key: str, value: Any) -> None:
2905 conditions.append(f"{key} = {value}")
2907 dbsession = req.dbsession
2908 q = dbsession.query(AuditEntry)
2909 if start_datetime:
2910 q = q.filter(AuditEntry.when_access_utc >= start_datetime)
2911 add_condition(ViewParam.START_DATETIME, start_datetime)
2912 if end_datetime:
2913 q = q.filter(AuditEntry.when_access_utc < end_datetime)
2914 add_condition(ViewParam.END_DATETIME, end_datetime)
2915 if source:
2916 q = q.filter(AuditEntry.source == source)
2917 add_condition(ViewParam.SOURCE, source)
2918 if remote_addr:
2919 q = q.filter(AuditEntry.remote_addr == remote_addr)
2920 add_condition(ViewParam.REMOTE_IP_ADDR, remote_addr)
2921 if username:
2922 # https://stackoverflow.com/questions/8561470/sqlalchemy-filtering-by-relationship-attribute # noqa
2923 q = q.join(User).filter(User.username == username)
2924 add_condition(ViewParam.USERNAME, username)
2925 if table_name:
2926 q = q.filter(AuditEntry.table_name == table_name)
2927 add_condition(ViewParam.TABLE_NAME, table_name)
2928 if server_pk is not None:
2929 q = q.filter(AuditEntry.server_pk == server_pk)
2930 add_condition(ViewParam.SERVER_PK, server_pk)
2932 q = q.order_by(desc(AuditEntry.id))
2934 # audit_entries = dbsession.execute(q).fetchall()
2935 # ... no! That executes to give you row-type results.
2936 # audit_entries = q.all()
2937 # ... yes! But let's paginate, too:
2938 page = SqlalchemyOrmPage(
2939 query=q,
2940 page=page_num,
2941 items_per_page=rows_per_page,
2942 url_maker=PageUrl(req),
2943 request=req,
2944 )
2945 return render_to_response(
2946 "audit_trail_view.mako",
2947 dict(
2948 conditions="; ".join(conditions),
2949 page=page,
2950 truncate=truncate,
2951 truncate_at=AUDIT_TRUNCATE_AT,
2952 ),
2953 request=req,
2954 )
2957# =============================================================================
2958# View export logs
2959# =============================================================================
2960# Overview:
2961# - View exported tasks (ExportedTask) collectively
2962# ... option to filter by recipient_name
2963# ... option to filter by date/etc.
2964# - View exported tasks (ExportedTask) individually
2965# ... hyperlinks to individual views of:
2966# Email (not necessary: ExportedTaskEmail)
2967# ExportRecipient
2968# ExportedTaskFileGroup
2969# ExportedTaskHL7Message
2972@view_config(
2973 route_name=Routes.OFFER_EXPORTED_TASK_LIST,
2974 permission=Permission.SUPERUSER,
2975 http_cache=NEVER_CACHE,
2976)
2977def offer_exported_task_list(req: "CamcopsRequest") -> Response:
2978 """
2979 View to choose how we'll view the exported task log.
2980 """
2981 form = ExportedTaskListForm(request=req)
2982 if FormAction.SUBMIT in req.POST:
2983 try:
2984 controls = list(req.POST.items())
2985 appstruct = form.validate(controls)
2986 keys = [
2987 ViewParam.ROWS_PER_PAGE,
2988 ViewParam.RECIPIENT_NAME,
2989 ViewParam.TABLE_NAME,
2990 ViewParam.SERVER_PK,
2991 ViewParam.ID,
2992 ViewParam.START_DATETIME,
2993 ViewParam.END_DATETIME,
2994 ]
2995 querydict = {k: appstruct.get(k) for k in keys}
2996 querydict[ViewParam.PAGE] = 1
2997 # Send the user to the actual data using GET
2998 # (the parameters are NOT sensitive)
2999 return HTTPFound(
3000 req.route_url(Routes.VIEW_EXPORTED_TASK_LIST, _query=querydict)
3001 )
3002 except ValidationFailure as e:
3003 rendered_form = e.render()
3004 else:
3005 rendered_form = form.render()
3006 return render_to_response(
3007 "exported_task_choose.mako",
3008 dict(
3009 form=rendered_form, head_form_html=get_head_form_html(req, [form])
3010 ),
3011 request=req,
3012 )
3015@view_config(
3016 route_name=Routes.VIEW_EXPORTED_TASK_LIST,
3017 permission=Permission.SUPERUSER,
3018 http_cache=NEVER_CACHE,
3019)
3020def view_exported_task_list(req: "CamcopsRequest") -> Response:
3021 """
3022 View to serve the exported task log.
3023 """
3024 rows_per_page = req.get_int_param(
3025 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE
3026 )
3027 recipient_name = req.get_str_param(
3028 ViewParam.RECIPIENT_NAME,
3029 None,
3030 validator=validate_export_recipient_name,
3031 )
3032 table_name = req.get_str_param(
3033 ViewParam.TABLE_NAME, None, validator=validate_task_tablename
3034 )
3035 server_pk = req.get_int_param(ViewParam.SERVER_PK, None)
3036 et_id = req.get_int_param(ViewParam.ID, None)
3037 start_datetime = req.get_datetime_param(ViewParam.START_DATETIME)
3038 end_datetime = req.get_datetime_param(ViewParam.END_DATETIME)
3039 page_num = req.get_int_param(ViewParam.PAGE, 1)
3041 conditions = [] # type: List[str]
3043 def add_condition(key: str, value: Any) -> None:
3044 conditions.append(f"{key} = {value}")
3046 dbsession = req.dbsession
3047 q = dbsession.query(ExportedTask)
3049 if recipient_name:
3050 q = q.join(ExportRecipient).filter(
3051 ExportRecipient.recipient_name == recipient_name
3052 )
3053 add_condition(ViewParam.RECIPIENT_NAME, recipient_name)
3054 if table_name:
3055 q = q.filter(ExportedTask.basetable == table_name)
3056 add_condition(ViewParam.TABLE_NAME, table_name)
3057 if server_pk is not None:
3058 q = q.filter(ExportedTask.task_server_pk == server_pk)
3059 add_condition(ViewParam.SERVER_PK, server_pk)
3060 if et_id is not None:
3061 q = q.filter(ExportedTask.id == et_id)
3062 add_condition(ViewParam.ID, et_id)
3063 if start_datetime:
3064 q = q.filter(ExportedTask.start_at_utc >= start_datetime)
3065 add_condition(ViewParam.START_DATETIME, start_datetime)
3066 if end_datetime:
3067 q = q.filter(ExportedTask.start_at_utc < end_datetime)
3068 add_condition(ViewParam.END_DATETIME, end_datetime)
3070 q = q.order_by(desc(ExportedTask.id))
3072 page = SqlalchemyOrmPage(
3073 query=q,
3074 page=page_num,
3075 items_per_page=rows_per_page,
3076 url_maker=PageUrl(req),
3077 request=req,
3078 )
3079 return render_to_response(
3080 "exported_task_list.mako",
3081 dict(conditions="; ".join(conditions), page=page),
3082 request=req,
3083 )
3086# =============================================================================
3087# View helpers for ORM objects
3088# =============================================================================
3091def _view_generic_object_by_id(
3092 req: "CamcopsRequest",
3093 cls: Type,
3094 instance_name_for_mako: str,
3095 mako_template: str,
3096) -> Response:
3097 """
3098 Boilerplate code to view an individual SQLAlchemy ORM object. The object
3099 must have an integer ``id`` field as its primary key, and the ID value must
3100 be present in the ``ViewParam.ID`` field of the request.
3102 Args:
3103 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
3104 cls: the SQLAlchemy ORM class
3105 instance_name_for_mako: what will the object be called when it's
3106 mako_template: Mako template filename
3108 Returns:
3109 :class:`pyramid.response.Response`
3110 """
3111 item_id = req.get_int_param(ViewParam.ID, None)
3112 dbsession = req.dbsession
3113 # noinspection PyUnresolvedReferences
3114 obj = dbsession.query(cls).filter(cls.id == item_id).first()
3115 if obj is None:
3116 _ = req.gettext
3117 raise HTTPBadRequest(
3118 f"{_('Bad ID for object type')} " f"{cls.__name__}: {item_id}"
3119 )
3120 d = {instance_name_for_mako: obj}
3121 return render_to_response(mako_template, d, request=req)
3124# =============================================================================
3125# Specialized views for ORM objects
3126# =============================================================================
3129@view_config(
3130 route_name=Routes.VIEW_EMAIL,
3131 permission=Permission.SUPERUSER,
3132 http_cache=NEVER_CACHE,
3133)
3134def view_email(req: "CamcopsRequest") -> Response:
3135 """
3136 View on an individual :class:`camcops_server.cc_modules.cc_email.Email`.
3137 """
3138 return _view_generic_object_by_id(
3139 req=req,
3140 cls=Email,
3141 instance_name_for_mako="email",
3142 mako_template="view_email.mako",
3143 )
3146@view_config(
3147 route_name=Routes.VIEW_EXPORT_RECIPIENT,
3148 permission=Permission.SUPERUSER,
3149 http_cache=NEVER_CACHE,
3150)
3151def view_export_recipient(req: "CamcopsRequest") -> Response:
3152 """
3153 View on an individual
3154 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTask`.
3155 """
3156 return _view_generic_object_by_id(
3157 req=req,
3158 cls=ExportRecipient,
3159 instance_name_for_mako="recipient",
3160 mako_template="export_recipient.mako",
3161 )
3164@view_config(
3165 route_name=Routes.VIEW_EXPORTED_TASK,
3166 permission=Permission.SUPERUSER,
3167 http_cache=NEVER_CACHE,
3168)
3169def view_exported_task(req: "CamcopsRequest") -> Response:
3170 """
3171 View on an individual
3172 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTask`.
3173 """
3174 return _view_generic_object_by_id(
3175 req=req,
3176 cls=ExportedTask,
3177 instance_name_for_mako="et",
3178 mako_template="exported_task.mako",
3179 )
3182@view_config(
3183 route_name=Routes.VIEW_EXPORTED_TASK_EMAIL,
3184 permission=Permission.SUPERUSER,
3185 http_cache=NEVER_CACHE,
3186)
3187def view_exported_task_email(req: "CamcopsRequest") -> Response:
3188 """
3189 View on an individual
3190 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskEmail`.
3191 """
3192 return _view_generic_object_by_id(
3193 req=req,
3194 cls=ExportedTaskEmail,
3195 instance_name_for_mako="ete",
3196 mako_template="exported_task_email.mako",
3197 )
3200@view_config(
3201 route_name=Routes.VIEW_EXPORTED_TASK_FILE_GROUP,
3202 permission=Permission.SUPERUSER,
3203 http_cache=NEVER_CACHE,
3204)
3205def view_exported_task_file_group(req: "CamcopsRequest") -> Response:
3206 """
3207 View on an individual
3208 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup`.
3209 """
3210 return _view_generic_object_by_id(
3211 req=req,
3212 cls=ExportedTaskFileGroup,
3213 instance_name_for_mako="fg",
3214 mako_template="exported_task_file_group.mako",
3215 )
3218@view_config(
3219 route_name=Routes.VIEW_EXPORTED_TASK_HL7_MESSAGE,
3220 permission=Permission.SUPERUSER,
3221 http_cache=NEVER_CACHE,
3222)
3223def view_exported_task_hl7_message(req: "CamcopsRequest") -> Response:
3224 """
3225 View on an individual
3226 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskHL7Message`.
3227 """
3228 return _view_generic_object_by_id(
3229 req=req,
3230 cls=ExportedTaskHL7Message,
3231 instance_name_for_mako="msg",
3232 mako_template="exported_task_hl7_message.mako",
3233 )
3236@view_config(
3237 route_name=Routes.VIEW_EXPORTED_TASK_REDCAP,
3238 permission=Permission.SUPERUSER,
3239 http_cache=NEVER_CACHE,
3240)
3241def view_exported_task_redcap(req: "CamcopsRequest") -> Response:
3242 """
3243 View on an individual
3244 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskRedcap`.
3245 """
3246 return _view_generic_object_by_id(
3247 req=req,
3248 cls=ExportedTaskRedcap,
3249 instance_name_for_mako="etr",
3250 mako_template="exported_task_redcap.mako",
3251 )
3254@view_config(
3255 route_name=Routes.VIEW_EXPORTED_TASK_FHIR,
3256 permission=Permission.SUPERUSER,
3257 http_cache=NEVER_CACHE,
3258)
3259def view_exported_task_fhir(req: "CamcopsRequest") -> Response:
3260 """
3261 View on an individual
3262 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskRedcap`.
3263 """
3264 return _view_generic_object_by_id(
3265 req=req,
3266 cls=ExportedTaskFhir,
3267 instance_name_for_mako="etf",
3268 mako_template="exported_task_fhir.mako",
3269 )
3272@view_config(
3273 route_name=Routes.VIEW_EXPORTED_TASK_FHIR_ENTRY,
3274 permission=Permission.SUPERUSER,
3275 http_cache=NEVER_CACHE,
3276)
3277def view_exported_task_fhir_entry(req: "CamcopsRequest") -> Response:
3278 """
3279 View on an individual
3280 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskRedcap`.
3281 """
3282 return _view_generic_object_by_id(
3283 req=req,
3284 cls=ExportedTaskFhirEntry,
3285 instance_name_for_mako="etfe",
3286 mako_template="exported_task_fhir_entry.mako",
3287 )
3290# =============================================================================
3291# User/server info views
3292# =============================================================================
3295@view_config(
3296 route_name=Routes.VIEW_OWN_USER_INFO,
3297 renderer="view_own_user_info.mako",
3298 http_cache=NEVER_CACHE,
3299)
3300def view_own_user_info(req: "CamcopsRequest") -> Dict[str, Any]:
3301 """
3302 View to provide information about your own user.
3303 """
3304 groups_page = CamcopsPage(
3305 req.user.groups, url_maker=PageUrl(req), request=req
3306 )
3307 return dict(
3308 user=req.user,
3309 groups_page=groups_page,
3310 valid_which_idnums=req.valid_which_idnums,
3311 )
3314@view_config(
3315 route_name=Routes.VIEW_SERVER_INFO,
3316 renderer="view_server_info.mako",
3317 http_cache=NEVER_CACHE,
3318)
3319def view_server_info(req: "CamcopsRequest") -> Dict[str, Any]:
3320 """
3321 View to show the server's ID policies, etc.
3322 """
3323 _ = req.gettext
3324 now = req.now
3325 recent_activity = OrderedDict(
3326 [
3327 (
3328 _("Last 1 minute"),
3329 CamcopsSession.n_sessions_active_since(
3330 req, now.subtract(minutes=1)
3331 ),
3332 ),
3333 (
3334 _("Last 5 minutes"),
3335 CamcopsSession.n_sessions_active_since(
3336 req, now.subtract(minutes=5)
3337 ),
3338 ),
3339 (
3340 _("Last 10 minutes"),
3341 CamcopsSession.n_sessions_active_since(
3342 req, now.subtract(minutes=10)
3343 ),
3344 ),
3345 (
3346 _("Last 1 hour"),
3347 CamcopsSession.n_sessions_active_since(
3348 req, now.subtract(hours=1)
3349 ),
3350 ),
3351 ]
3352 )
3353 return dict(
3354 idnum_definitions=req.idnum_definitions,
3355 string_families=req.extrastring_families(),
3356 recent_activity=recent_activity,
3357 session_timeout_minutes=req.config.session_timeout_minutes,
3358 restricted_tasks=req.config.restricted_tasks,
3359 )
3362# =============================================================================
3363# User management
3364# =============================================================================
3367def get_user_from_request_user_id_or_raise(req: "CamcopsRequest") -> User:
3368 """
3369 Returns the :class:`camcops_server.cc_modules.cc_user.User` represented by
3370 the request's ``ViewParam.USER_ID`` parameter, or raise
3371 :exc:`HTTPBadRequest`.
3372 """
3373 user_id = req.get_int_param(ViewParam.USER_ID)
3374 user = User.get_user_by_id(req.dbsession, user_id)
3375 if not user:
3376 _ = req.gettext
3377 raise HTTPBadRequest(f"{_('No such user ID:')} {user_id!r}")
3378 return user
3381def query_users_that_i_manage(req: "CamcopsRequest") -> Query:
3382 me = req.user
3383 return me.managed_users()
3386@view_config(
3387 route_name=Routes.VIEW_ALL_USERS,
3388 permission=Permission.GROUPADMIN,
3389 renderer="users_view.mako",
3390 http_cache=NEVER_CACHE,
3391)
3392def view_all_users(req: "CamcopsRequest") -> Dict[str, Any]:
3393 """
3394 View all users that the current user administers. The view has hyperlinks
3395 to edit those users too.
3396 """
3397 include_auto_generated = req.get_bool_param(
3398 ViewParam.INCLUDE_AUTO_GENERATED, False
3399 )
3400 rows_per_page = req.get_int_param(
3401 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE
3402 )
3403 page_num = req.get_int_param(ViewParam.PAGE, 1)
3404 q = query_users_that_i_manage(req)
3405 if not include_auto_generated:
3406 q = q.filter(User.auto_generated == False) # noqa: E712
3407 page = SqlalchemyOrmPage(
3408 query=q,
3409 page=page_num,
3410 items_per_page=rows_per_page,
3411 url_maker=PageUrl(req),
3412 request=req,
3413 )
3415 form = UserFilterForm(request=req)
3416 appstruct = {ViewParam.INCLUDE_AUTO_GENERATED: include_auto_generated}
3417 rendered_form = form.render(appstruct)
3419 return dict(
3420 page=page,
3421 head_form_html=get_head_form_html(req, [form]),
3422 form=rendered_form,
3423 )
3426@view_config(
3427 route_name=Routes.VIEW_USER_EMAIL_ADDRESSES,
3428 permission=Permission.GROUPADMIN,
3429 renderer="view_user_email_addresses.mako",
3430 http_cache=NEVER_CACHE,
3431)
3432def view_user_email_addresses(req: "CamcopsRequest") -> Dict[str, Any]:
3433 """
3434 View e-mail addresses of all users that the requesting user is authorized
3435 to manage.
3436 """
3437 q = query_users_that_i_manage(req).filter(
3438 User.auto_generated == False # noqa: E712
3439 )
3440 return dict(query=q)
3443def assert_may_edit_user(req: "CamcopsRequest", user: User) -> None:
3444 """
3445 Checks that the requesting user (``req.user``) is allowed to edit the other
3446 user (``user``). Raises :exc:`HTTPBadRequest` otherwise.
3447 """
3448 may_edit, why_not = req.user.may_edit_user(req, user)
3449 if not may_edit:
3450 raise HTTPBadRequest(why_not)
3453def assert_may_administer_group(req: "CamcopsRequest", group_id: int) -> None:
3454 """
3455 Checks that the requesting user (``req.user``) is allowed to adminster the
3456 specified group (specified by ``group_id``). Raises :exc:`HTTPBadRequest`
3457 otherwise.
3458 """
3459 if not req.user.may_administer_group(group_id):
3460 _ = req.gettext
3461 raise HTTPBadRequest(_("You may not administer this group"))
3464@view_config(
3465 route_name=Routes.VIEW_USER,
3466 permission=Permission.GROUPADMIN,
3467 renderer="view_other_user_info.mako",
3468 http_cache=NEVER_CACHE,
3469)
3470def view_user(req: "CamcopsRequest") -> Dict[str, Any]:
3471 """
3472 View to show details of another user, for administrators.
3473 """
3474 user = get_user_from_request_user_id_or_raise(req)
3475 assert_may_edit_user(req, user)
3476 return dict(user=user)
3477 # Groupadmins may see some information regarding groups that aren't theirs
3478 # here, but can't alter it.
3481class EditUserBaseView(UpdateView):
3482 """
3483 Django-style view to edit a user and their groups
3484 """
3486 model_form_dict = {
3487 "username": ViewParam.USERNAME,
3488 "fullname": ViewParam.FULLNAME,
3489 "email": ViewParam.EMAIL,
3490 "must_change_password": ViewParam.MUST_CHANGE_PASSWORD,
3491 "language": ViewParam.LANGUAGE,
3492 }
3493 object_class = User
3494 pk_param = ViewParam.USER_ID
3495 server_pk_name = "id"
3496 template_name = "user_edit.mako"
3498 def get_success_url(self) -> str:
3499 return self.request.route_url(Routes.VIEW_ALL_USERS)
3501 def get_object(self) -> Any:
3502 user = cast(User, super().get_object())
3504 assert_may_edit_user(self.request, user)
3506 return user
3508 def set_object_properties(self, appstruct: Dict[str, Any]) -> None:
3509 user = cast(User, self.object)
3510 _ = self.request.gettext
3512 new_user_name = appstruct.get(ViewParam.USERNAME)
3513 existing_user = User.get_user_by_name(
3514 self.request.dbsession, new_user_name
3515 )
3516 if existing_user and existing_user.id != user.id:
3517 # noinspection PyUnresolvedReferences
3518 cant_rename_user = _("Can't rename user")
3519 conflicts = _("that conflicts with an existing user with ID")
3520 raise HTTPBadRequest(
3521 f"{cant_rename_user} {user.username!r} (#{user.id!r}) → "
3522 f"{new_user_name!r}; {conflicts} {existing_user.id!r}"
3523 )
3525 email = appstruct.get(ViewParam.EMAIL)
3526 if not email and user.mfa_method == MfaMethod.HOTP_EMAIL:
3527 message = _(
3528 "This user's email address is used for multi-factor "
3529 "authentication. If you want to remove their email "
3530 "address, you must first disable multi-factor "
3531 "authentication"
3532 )
3534 raise HTTPBadRequest(message)
3536 super().set_object_properties(appstruct)
3538 # Groups that we might change memberships for:
3539 all_fluid_groups = self.request.user.ids_of_groups_user_is_admin_for
3540 # All groups that the user is currently in:
3541 user_group_ids = user.group_ids
3542 # Group membership we won't touch:
3543 user_frozen_group_ids = list(
3544 set(user_group_ids) - set(all_fluid_groups)
3545 )
3546 group_ids = appstruct.get(ViewParam.GROUP_IDS)
3547 # Add back in the groups we're not going to alter:
3548 final_group_ids = list(set(group_ids) | set(user_frozen_group_ids))
3549 user.set_group_ids(final_group_ids)
3550 # Also, if the user was uploading to a group that they are now no
3551 # longer a member of, we need to fix that
3552 if user.upload_group_id not in final_group_ids:
3553 user.upload_group_id = None
3555 def get_form_values(self) -> Dict[str, Any]:
3556 # will populate with model_form_dict
3557 form_values = super().get_form_values()
3559 user = cast(User, self.object)
3561 # Superusers can do everything, of course.
3562 # Groupadmins can change group memberships only for groups they control
3563 # (here: "fluid"). That means that there may be a subset of group
3564 # memberships for this user that they will neither see nor be able to
3565 # alter (here: "frozen"). They can also edit only a restricted set of
3566 # permissions.
3568 # Groups that we might change memberships for:
3569 all_fluid_groups = self.request.user.ids_of_groups_user_is_admin_for
3570 # All groups that the user is currently in:
3571 user_group_ids = user.group_ids
3572 # Group memberships we might alter:
3573 user_fluid_group_ids = list(
3574 set(user_group_ids) & set(all_fluid_groups)
3575 )
3576 form_values.update(
3577 {
3578 ViewParam.USER_ID: user.id,
3579 ViewParam.GROUP_IDS: user_fluid_group_ids,
3580 }
3581 )
3583 return form_values
3586class EditUserGroupAdminView(EditUserBaseView):
3587 """
3588 For group administrators to edit a user.
3589 """
3591 form_class = EditUserGroupAdminForm
3594class EditUserSuperUserView(EditUserBaseView):
3595 """
3596 For superusers to edit a user.
3597 """
3599 form_class = EditUserFullForm
3601 def get_model_form_dict(self) -> Dict[str, Any]:
3602 model_form_dict = super().get_model_form_dict()
3603 model_form_dict["superuser"] = ViewParam.SUPERUSER
3605 return model_form_dict
3608@view_config(
3609 route_name=Routes.EDIT_USER,
3610 permission=Permission.GROUPADMIN,
3611 http_cache=NEVER_CACHE,
3612)
3613def edit_user(req: "CamcopsRequest") -> Response:
3614 """
3615 View to edit a user (for administrators).
3616 """
3617 view: EditUserBaseView
3619 if req.user.superuser:
3620 view = EditUserSuperUserView(req)
3621 else:
3622 view = EditUserGroupAdminView(req)
3624 return view.dispatch()
3627class EditUserGroupMembershipBaseView(UpdateView):
3628 """
3629 Django-style view to edit a user's group membership permissions.
3630 """
3632 model_form_dict = {
3633 "may_upload": ViewParam.MAY_UPLOAD,
3634 "may_register_devices": ViewParam.MAY_REGISTER_DEVICES,
3635 "may_use_webviewer": ViewParam.MAY_USE_WEBVIEWER,
3636 "view_all_patients_when_unfiltered": ViewParam.VIEW_ALL_PATIENTS_WHEN_UNFILTERED, # noqa: E501
3637 "may_dump_data": ViewParam.MAY_DUMP_DATA,
3638 "may_run_reports": ViewParam.MAY_RUN_REPORTS,
3639 "may_add_notes": ViewParam.MAY_ADD_NOTES,
3640 "may_manage_patients": ViewParam.MAY_MANAGE_PATIENTS,
3641 "may_email_patients": ViewParam.MAY_EMAIL_PATIENTS,
3642 }
3644 object_class = UserGroupMembership
3645 pk_param = ViewParam.USER_GROUP_MEMBERSHIP_ID
3646 server_pk_name = "id"
3647 template_name = "user_edit_group_membership.mako"
3649 def get_success_url(self) -> str:
3650 return self.request.route_url(Routes.VIEW_ALL_USERS)
3652 def get_object(self) -> Any:
3653 # noinspection PyUnresolvedReferences
3654 ugm = cast(UserGroupMembership, super().get_object())
3655 user = ugm.user
3656 assert_may_edit_user(self.request, user)
3657 assert_may_administer_group(self.request, ugm.group_id)
3659 return ugm
3662class EditUserGroupMembershipSuperUserView(EditUserGroupMembershipBaseView):
3663 """
3664 For superusers to edit a user's group memberships.
3665 """
3667 form_class = EditUserGroupPermissionsFullForm
3669 def get_model_form_dict(self) -> Dict[str, str]:
3670 model_form_dict = super().get_model_form_dict()
3671 model_form_dict["groupadmin"] = ViewParam.GROUPADMIN
3673 return model_form_dict
3676class EditUserGroupMembershipGroupAdminView(EditUserGroupMembershipBaseView):
3677 """
3678 For group administrators to edit a user's group memberships.
3679 """
3681 form_class = EditUserGroupMembershipGroupAdminForm
3684@view_config(
3685 route_name=Routes.EDIT_USER_GROUP_MEMBERSHIP,
3686 permission=Permission.GROUPADMIN,
3687 http_cache=NEVER_CACHE,
3688)
3689def edit_user_group_membership(req: "CamcopsRequest") -> Response:
3690 """
3691 View to edit the group memberships of a user (for administrators).
3692 """
3693 if req.user.superuser:
3694 view = EditUserGroupMembershipSuperUserView(req)
3695 else:
3696 view = EditUserGroupMembershipGroupAdminView(req)
3698 return view.dispatch()
3701def set_user_upload_group(
3702 req: "CamcopsRequest", user: User, by_another: bool
3703) -> Response:
3704 """
3705 Provides a view to choose which group a user uploads into.
3707 TRUSTS ITS CALLER that this is permitted.
3709 Args:
3710 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
3711 user: the :class:`camcops_server.cc_modules.cc_user.User` to edit
3712 by_another: is the current user a superuser/group administrator, i.e.
3713 another user? Determines the screen we return to afterwards.
3714 """
3715 route_back = Routes.VIEW_ALL_USERS if by_another else Routes.HOME
3716 if FormAction.CANCEL in req.POST:
3717 return HTTPFound(req.route_url(route_back))
3718 form = SetUserUploadGroupForm(request=req, user=user)
3719 # ... need to show the groups permitted to THAT user, not OUR user
3720 if FormAction.SUBMIT in req.POST:
3721 try:
3722 controls = list(req.POST.items())
3723 appstruct = form.validate(controls)
3724 # -----------------------------------------------------------------
3725 # Apply the changes
3726 # -----------------------------------------------------------------
3727 user.upload_group_id = appstruct.get(ViewParam.UPLOAD_GROUP_ID)
3728 return HTTPFound(req.route_url(route_back))
3729 except ValidationFailure as e:
3730 rendered_form = e.render()
3731 else:
3732 appstruct = {
3733 ViewParam.USER_ID: user.id,
3734 ViewParam.UPLOAD_GROUP_ID: user.upload_group_id,
3735 }
3736 rendered_form = form.render(appstruct)
3737 return render_to_response(
3738 "set_user_upload_group.mako",
3739 dict(
3740 user=user,
3741 form=rendered_form,
3742 head_form_html=get_head_form_html(req, [form]),
3743 ),
3744 request=req,
3745 )
3748@view_config(
3749 route_name=Routes.SET_OWN_USER_UPLOAD_GROUP, http_cache=NEVER_CACHE
3750)
3751def set_own_user_upload_group(req: "CamcopsRequest") -> Response:
3752 """
3753 View to set the upload group for your own user.
3754 """
3755 return set_user_upload_group(req, req.user, False)
3758@view_config(
3759 route_name=Routes.SET_OTHER_USER_UPLOAD_GROUP,
3760 permission=Permission.GROUPADMIN,
3761 http_cache=NEVER_CACHE,
3762)
3763def set_other_user_upload_group(req: "CamcopsRequest") -> Response:
3764 """
3765 View to set the upload group for another user.
3766 """
3767 user = get_user_from_request_user_id_or_raise(req)
3768 if user.id != req.user.id:
3769 assert_may_edit_user(req, user)
3770 # ... but always OK to edit this for your own user; no such check required
3771 return set_user_upload_group(req, user, True)
3774# noinspection PyTypeChecker
3775@view_config(
3776 route_name=Routes.UNLOCK_USER,
3777 permission=Permission.GROUPADMIN,
3778 http_cache=NEVER_CACHE,
3779)
3780def unlock_user(req: "CamcopsRequest") -> Response:
3781 """
3782 View to unlock a locked user account.
3783 """
3784 user = get_user_from_request_user_id_or_raise(req)
3785 assert_may_edit_user(req, user)
3786 user.enable(req)
3787 _ = req.gettext
3789 req.session.flash(
3790 _("User {username} enabled").format(username=user.username),
3791 queue=FlashQueue.SUCCESS,
3792 )
3793 raise HTTPFound(req.route_url(Routes.VIEW_ALL_USERS))
3796@view_config(
3797 route_name=Routes.ADD_USER,
3798 permission=Permission.GROUPADMIN,
3799 renderer="user_add.mako",
3800 http_cache=NEVER_CACHE,
3801)
3802def add_user(req: "CamcopsRequest") -> Dict[str, Any]:
3803 """
3804 View to add a user.
3805 """
3806 route_back = Routes.VIEW_ALL_USERS
3807 if FormAction.CANCEL in req.POST:
3808 raise HTTPFound(req.route_url(route_back))
3809 if req.user.superuser:
3810 form = AddUserSuperuserForm(request=req)
3811 else:
3812 form = AddUserGroupadminForm(request=req)
3813 dbsession = req.dbsession
3814 if FormAction.SUBMIT in req.POST:
3815 try:
3816 controls = list(req.POST.items())
3817 appstruct = form.validate(controls)
3818 # -----------------------------------------------------------------
3819 # Add the user
3820 # -----------------------------------------------------------------
3821 user = User()
3822 user.username = appstruct.get(ViewParam.USERNAME)
3823 user.set_password(req, appstruct.get(ViewParam.NEW_PASSWORD))
3824 user.must_change_password = appstruct.get(
3825 ViewParam.MUST_CHANGE_PASSWORD
3826 )
3827 # We don't ask for language initially; that can be configured
3828 # later. But is is a reasonable guess that it should be the same
3829 # language as used by the person creating the new user.
3830 user.language = req.language
3831 if User.get_user_by_name(dbsession, user.username):
3832 raise HTTPBadRequest(
3833 f"User with username {user.username!r} already exists!"
3834 )
3835 dbsession.add(user)
3836 group_ids = appstruct.get(ViewParam.GROUP_IDS)
3837 for gid in group_ids:
3838 # noinspection PyUnresolvedReferences
3839 user.user_group_memberships.append(
3840 UserGroupMembership(user_id=user.id, group_id=gid)
3841 )
3842 raise HTTPFound(req.route_url(route_back))
3843 except ValidationFailure as e:
3844 rendered_form = e.render()
3845 else:
3846 rendered_form = form.render()
3847 return dict(
3848 form=rendered_form, head_form_html=get_head_form_html(req, [form])
3849 )
3852def any_records_use_user(req: "CamcopsRequest", user: User) -> bool:
3853 """
3854 Do any records in the database refer to the specified user?
3856 (Used when we're thinking about deleting a user; would it leave broken
3857 references? If so, we will prevent deletion; see :func:`delete_user`.)
3858 """
3859 dbsession = req.dbsession
3860 user_id = user.id
3861 # Device?
3862 q = CountStarSpecializedQuery(Device, session=dbsession).filter( # type: ignore[arg-type] # noqa: E501
3863 or_(
3864 Device.registered_by_user_id == user_id,
3865 Device.uploading_user_id == user_id,
3866 )
3867 )
3868 if q.count_star() > 0:
3869 return True
3870 # SpecialNote?
3871 q = CountStarSpecializedQuery(SpecialNote, session=dbsession).filter( # type: ignore[arg-type] # noqa: E501
3872 SpecialNote.user_id == user_id
3873 )
3874 if q.count_star() > 0:
3875 return True
3876 # Audit trail?
3877 q = CountStarSpecializedQuery(AuditEntry, session=dbsession).filter( # type: ignore[arg-type] # noqa: E501
3878 AuditEntry.user_id == user_id
3879 )
3880 if q.count_star() > 0:
3881 return True
3882 # Uploaded records?
3883 for cls in gen_orm_classes_from_base(
3884 GenericTabletRecordMixin
3885 ): # type: Type[GenericTabletRecordMixin]
3886 # noinspection PyProtectedMember
3887 q = CountStarSpecializedQuery(cls, session=dbsession).filter( # type: ignore[arg-type] # noqa: E501
3888 or_(
3889 cls._adding_user_id == user_id,
3890 cls._removing_user_id == user_id,
3891 cls._preserving_user_id == user_id,
3892 cls._manually_erasing_user_id == user_id,
3893 )
3894 )
3895 if q.count_star() > 0:
3896 return True
3897 # No; all clean.
3898 return False
3901@view_config(
3902 route_name=Routes.DELETE_USER,
3903 permission=Permission.GROUPADMIN,
3904 renderer="user_delete.mako",
3905 http_cache=NEVER_CACHE,
3906)
3907def delete_user(req: "CamcopsRequest") -> Dict[str, Any]:
3908 """
3909 View to delete a user (and make it hard work).
3910 """
3911 if FormAction.CANCEL in req.POST:
3912 raise HTTPFound(req.route_url(Routes.VIEW_ALL_USERS))
3913 user = get_user_from_request_user_id_or_raise(req)
3914 assert_may_edit_user(req, user)
3915 form = DeleteUserForm(request=req)
3916 rendered_form = ""
3917 error = ""
3918 _ = req.gettext
3919 if user.id == req.user.id:
3920 error = _("Can't delete your own user!")
3921 elif user.may_use_webviewer or user.may_upload:
3922 error = _(
3923 "Unable to delete user: user still has webviewer login "
3924 "and/or tablet upload permission"
3925 )
3926 elif user.superuser and (not req.user.superuser):
3927 error = _(
3928 "Unable to delete user: " "they are a superuser and you are not"
3929 )
3930 elif (not req.user.superuser) and bool(
3931 set(user.group_ids) - set(req.user.ids_of_groups_user_is_admin_for)
3932 ):
3933 error = _(
3934 "Unable to delete user: "
3935 "user belongs to groups that you do not administer"
3936 )
3937 else:
3938 if any_records_use_user(req, user):
3939 error = _(
3940 "Unable to delete user; records (or audit trails) refer to "
3941 "that user. Disable login and upload permissions instead."
3942 )
3943 else:
3944 if FormAction.DELETE in req.POST:
3945 try:
3946 controls = list(req.POST.items())
3947 appstruct = form.validate(controls)
3948 assert appstruct.get(ViewParam.USER_ID) == user.id
3949 # ---------------------------------------------------------
3950 # Delete the user and associated objects
3951 # ---------------------------------------------------------
3952 # (*) Sessions belonging to this user
3953 # ... done by modifying its ForeignKey to use "ondelete"
3954 # (*) user_group_table mapping
3955 # https://docs.sqlalchemy.org/en/latest/orm/basic_relationships.html#relationships-many-to-many-deletion # noqa
3956 # Simplest way:
3957 user.groups = [] # will delete the mapping entries
3958 # (*) User itself
3959 req.dbsession.delete(user)
3960 # Done
3961 raise HTTPFound(req.route_url(Routes.VIEW_ALL_USERS))
3962 except ValidationFailure as e:
3963 rendered_form = e.render()
3964 else:
3965 appstruct = {ViewParam.USER_ID: user.id}
3966 rendered_form = form.render(appstruct)
3968 return dict(
3969 user=user,
3970 error=error,
3971 form=rendered_form,
3972 head_form_html=get_head_form_html(req, [form]),
3973 )
3976# =============================================================================
3977# Group management
3978# =============================================================================
3981@view_config(
3982 route_name=Routes.VIEW_GROUPS,
3983 permission=Permission.SUPERUSER,
3984 renderer="groups_view.mako",
3985 http_cache=NEVER_CACHE,
3986)
3987def view_groups(req: "CamcopsRequest") -> Dict[str, Any]:
3988 """
3989 View to show all groups (with hyperlinks to edit them).
3990 Superusers only.
3991 """
3992 rows_per_page = req.get_int_param(
3993 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE
3994 )
3995 page_num = req.get_int_param(ViewParam.PAGE, 1)
3996 dbsession = req.dbsession
3997 groups = (
3998 dbsession.query(Group).order_by(Group.name).all()
3999 ) # type: List[Group]
4000 page = CamcopsPage(
4001 collection=groups,
4002 page=page_num,
4003 items_per_page=rows_per_page,
4004 url_maker=PageUrl(req),
4005 request=req,
4006 )
4008 valid_which_idnums = req.valid_which_idnums
4010 return dict(groups_page=page, valid_which_idnums=valid_which_idnums)
4013def get_group_from_request_group_id_or_raise(req: "CamcopsRequest") -> Group:
4014 """
4015 Returns the :class:`camcops_server.cc_modules.cc_group.Group` represented
4016 by the request's ``ViewParam.GROUP_ID`` parameter, or raise
4017 :exc:`HTTPBadRequest`.
4018 """
4019 group_id = req.get_int_param(ViewParam.GROUP_ID)
4020 group = None
4021 if group_id is not None:
4022 dbsession = req.dbsession
4023 group = dbsession.query(Group).filter(Group.id == group_id).first()
4024 if not group:
4025 _ = req.gettext
4026 raise HTTPBadRequest(f"{_('No such group ID:')} {group_id!r}")
4027 return group
4030class EditGroupView(UpdateView):
4031 """
4032 Django-style view to edit a CamCOPS group.
4033 """
4035 form_class = EditGroupForm
4036 model_form_dict = {
4037 "name": ViewParam.NAME,
4038 "description": ViewParam.DESCRIPTION,
4039 "upload_policy": ViewParam.UPLOAD_POLICY,
4040 "finalize_policy": ViewParam.FINALIZE_POLICY,
4041 }
4042 object_class = Group
4043 pk_param = ViewParam.GROUP_ID
4044 server_pk_name = "id"
4045 template_name = "group_edit.mako"
4047 def get_form_kwargs(self) -> Dict[str, Any]:
4048 kwargs = super().get_form_kwargs()
4050 group = cast(Group, self.object)
4051 kwargs.update(group=group)
4053 return kwargs
4055 def get_form_values(self) -> Dict:
4056 # will populate with model_form_dict
4057 form_values = super().get_form_values()
4059 group = cast(Group, self.object)
4061 other_group_ids = list(group.ids_of_other_groups_group_may_see())
4062 other_groups = Group.get_groups_from_id_list(
4063 self.request.dbsession, other_group_ids
4064 )
4065 other_groups.sort(key=lambda g: g.name)
4067 form_values.update(
4068 {
4069 ViewParam.IP_USE: group.ip_use,
4070 ViewParam.GROUP_ID: group.id,
4071 ViewParam.GROUP_IDS: [g.id for g in other_groups],
4072 }
4073 )
4075 return form_values
4077 def get_success_url(self) -> str:
4078 return self.request.route_url(Routes.VIEW_GROUPS)
4080 def save_object(self, appstruct: Dict[str, Any]) -> None:
4081 super().save_object(appstruct)
4083 group = cast(Group, self.object)
4085 # Group cross-references
4086 group_ids = appstruct.get(ViewParam.GROUP_IDS)
4087 # The form validation will prevent our own group from being in here
4088 other_groups = Group.get_groups_from_id_list(
4089 self.request.dbsession, group_ids
4090 )
4091 group.can_see_other_groups = other_groups
4093 ip_use = appstruct.get(ViewParam.IP_USE)
4094 if group.ip_use is not None:
4095 ip_use.id = group.ip_use.id
4097 group.ip_use = ip_use
4100@view_config(
4101 route_name=Routes.EDIT_GROUP,
4102 permission=Permission.SUPERUSER,
4103 http_cache=NEVER_CACHE,
4104)
4105def edit_group(req: "CamcopsRequest") -> Response:
4106 """
4107 View to edit a group. Superusers only.
4108 """
4109 return EditGroupView(req).dispatch()
4112@view_config(
4113 route_name=Routes.ADD_GROUP,
4114 permission=Permission.SUPERUSER,
4115 renderer="group_add.mako",
4116 http_cache=NEVER_CACHE,
4117)
4118def add_group(req: "CamcopsRequest") -> Dict[str, Any]:
4119 """
4120 View to add a group. Superusers only.
4121 """
4122 route_back = Routes.VIEW_GROUPS
4123 if FormAction.CANCEL in req.POST:
4124 raise HTTPFound(req.route_url(route_back))
4125 form = AddGroupForm(request=req)
4126 dbsession = req.dbsession
4127 if FormAction.SUBMIT in req.POST:
4128 try:
4129 controls = list(req.POST.items())
4130 appstruct = form.validate(controls)
4131 # -----------------------------------------------------------------
4132 # Add the group
4133 # -----------------------------------------------------------------
4134 group = Group()
4135 group.name = appstruct.get(ViewParam.NAME)
4136 dbsession.add(group)
4137 raise HTTPFound(req.route_url(route_back))
4138 except ValidationFailure as e:
4139 rendered_form = e.render()
4140 else:
4141 rendered_form = form.render()
4142 return dict(
4143 form=rendered_form, head_form_html=get_head_form_html(req, [form])
4144 )
4147def any_records_use_group(req: "CamcopsRequest", group: Group) -> bool:
4148 """
4149 Do any records in the database refer to the specified group?
4151 (Used when we're thinking about deleting a group; would it leave broken
4152 references? If so, we will prevent deletion; see :func:`delete_group`.)
4153 """
4154 dbsession = req.dbsession
4155 group_id = group.id
4156 # Our own or users filtering on us?
4157 # ... doesn't matter; see TaskFilter; stored as a CSV list so not part of
4158 # database integrity checks.
4159 # Uploaded records?
4160 for cls in gen_orm_classes_from_base(
4161 GenericTabletRecordMixin
4162 ): # type: Type[GenericTabletRecordMixin]
4163 # noinspection PyProtectedMember
4164 q = CountStarSpecializedQuery(cls, session=dbsession).filter( # type: ignore[arg-type] # noqa: E501
4165 cls._group_id == group_id
4166 )
4167 if q.count_star() > 0:
4168 return True
4169 # No; all clean.
4170 return False
4173@view_config(
4174 route_name=Routes.DELETE_GROUP,
4175 permission=Permission.SUPERUSER,
4176 renderer="group_delete.mako",
4177 http_cache=NEVER_CACHE,
4178)
4179def delete_group(req: "CamcopsRequest") -> Dict[str, Any]:
4180 """
4181 View to delete a group. Superusers only.
4182 """
4183 route_back = Routes.VIEW_GROUPS
4184 if FormAction.CANCEL in req.POST:
4185 raise HTTPFound(req.route_url(route_back))
4186 group = get_group_from_request_group_id_or_raise(req)
4187 form = DeleteGroupForm(request=req)
4188 rendered_form = ""
4189 error = ""
4190 _ = req.gettext
4191 if group.users:
4192 error = _("Unable to delete group; there are users who are members!")
4193 else:
4194 if any_records_use_group(req, group):
4195 error = _("Unable to delete group; records refer to it.")
4196 else:
4197 if FormAction.DELETE in req.POST:
4198 try:
4199 controls = list(req.POST.items())
4200 appstruct = form.validate(controls)
4201 assert appstruct.get(ViewParam.GROUP_ID) == group.id
4202 # ---------------------------------------------------------
4203 # Delete the group
4204 # ---------------------------------------------------------
4205 req.dbsession.delete(group)
4206 raise HTTPFound(req.route_url(route_back))
4207 except ValidationFailure as e:
4208 rendered_form = e.render()
4209 else:
4210 appstruct = {ViewParam.GROUP_ID: group.id}
4211 rendered_form = form.render(appstruct)
4212 return dict(
4213 group=group,
4214 error=error,
4215 form=rendered_form,
4216 head_form_html=get_head_form_html(req, [form]),
4217 )
4220# =============================================================================
4221# Edit server settings
4222# =============================================================================
4225@view_config(
4226 route_name=Routes.EDIT_SERVER_SETTINGS,
4227 permission=Permission.SUPERUSER,
4228 renderer="server_settings_edit.mako",
4229 http_cache=NEVER_CACHE,
4230)
4231def edit_server_settings(req: "CamcopsRequest") -> Dict[str, Any]:
4232 """
4233 View to edit server settings (like the database title).
4234 """
4235 if FormAction.CANCEL in req.POST:
4236 raise HTTPFound(req.route_url(Routes.HOME))
4237 form = EditServerSettingsForm(request=req)
4238 if FormAction.SUBMIT in req.POST:
4239 try:
4240 controls = list(req.POST.items())
4241 appstruct = form.validate(controls)
4242 title = appstruct.get(ViewParam.DATABASE_TITLE)
4243 # -----------------------------------------------------------------
4244 # Apply changes
4245 # -----------------------------------------------------------------
4246 req.set_database_title(title)
4247 raise HTTPFound(req.route_url(Routes.HOME))
4248 except ValidationFailure as e:
4249 rendered_form = e.render()
4250 else:
4251 title = req.database_title
4252 appstruct = {ViewParam.DATABASE_TITLE: title}
4253 rendered_form = form.render(appstruct)
4254 return dict(
4255 form=rendered_form, head_form_html=get_head_form_html(req, [form])
4256 )
4259@view_config(
4260 route_name=Routes.VIEW_ID_DEFINITIONS,
4261 permission=Permission.SUPERUSER,
4262 renderer="id_definitions_view.mako",
4263 http_cache=NEVER_CACHE,
4264)
4265def view_id_definitions(req: "CamcopsRequest") -> Dict[str, Any]:
4266 """
4267 View to show all ID number definitions (with hyperlinks to edit them).
4268 Superusers only.
4269 """
4270 return dict(idnum_definitions=req.idnum_definitions)
4273def get_iddef_from_request_which_idnum_or_raise(
4274 req: "CamcopsRequest",
4275) -> IdNumDefinition:
4276 """
4277 Returns the :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`
4278 represented by the request's ``ViewParam.WHICH_IDNUM`` parameter, or raise
4279 :exc:`HTTPBadRequest`.
4280 """
4281 which_idnum = req.get_int_param(ViewParam.WHICH_IDNUM)
4282 iddef = (
4283 req.dbsession.query(IdNumDefinition)
4284 .filter(IdNumDefinition.which_idnum == which_idnum)
4285 .first()
4286 )
4287 if not iddef:
4288 _ = req.gettext
4289 raise HTTPBadRequest(f"{_('No such ID definition:')} {which_idnum!r}")
4290 return iddef
4293@view_config(
4294 route_name=Routes.EDIT_ID_DEFINITION,
4295 permission=Permission.SUPERUSER,
4296 renderer="id_definition_edit.mako",
4297 http_cache=NEVER_CACHE,
4298)
4299def edit_id_definition(req: "CamcopsRequest") -> Dict[str, Any]:
4300 """
4301 View to edit an ID number definition. Superusers only.
4302 """
4303 route_back = Routes.VIEW_ID_DEFINITIONS
4304 if FormAction.CANCEL in req.POST:
4305 raise HTTPFound(req.route_url(route_back))
4306 iddef = get_iddef_from_request_which_idnum_or_raise(req)
4307 form = EditIdDefinitionForm(request=req)
4308 if FormAction.SUBMIT in req.POST:
4309 try:
4310 controls = list(req.POST.items())
4311 appstruct = form.validate(controls)
4312 # -----------------------------------------------------------------
4313 # Alter the ID definition
4314 # -----------------------------------------------------------------
4315 iddef.description = appstruct.get(ViewParam.DESCRIPTION)
4316 iddef.short_description = appstruct.get(
4317 ViewParam.SHORT_DESCRIPTION
4318 )
4319 iddef.validation_method = appstruct.get(
4320 ViewParam.VALIDATION_METHOD
4321 )
4322 iddef.hl7_id_type = appstruct.get(ViewParam.HL7_ID_TYPE)
4323 iddef.hl7_assigning_authority = appstruct.get(
4324 ViewParam.HL7_ASSIGNING_AUTHORITY
4325 )
4326 iddef.fhir_id_system = appstruct.get(ViewParam.FHIR_ID_SYSTEM)
4327 # REMOVED # clear_idnum_definition_cache() # SPECIAL
4328 raise HTTPFound(req.route_url(route_back))
4329 except ValidationFailure as e:
4330 rendered_form = e.render()
4331 else:
4332 appstruct = {
4333 ViewParam.WHICH_IDNUM: iddef.which_idnum,
4334 ViewParam.DESCRIPTION: iddef.description or "",
4335 ViewParam.SHORT_DESCRIPTION: iddef.short_description or "",
4336 ViewParam.VALIDATION_METHOD: iddef.validation_method or "",
4337 ViewParam.HL7_ID_TYPE: iddef.hl7_id_type or "",
4338 ViewParam.HL7_ASSIGNING_AUTHORITY: iddef.hl7_assigning_authority
4339 or "",
4340 ViewParam.FHIR_ID_SYSTEM: iddef.fhir_id_system or "",
4341 }
4342 rendered_form = form.render(appstruct)
4343 return dict(
4344 iddef=iddef,
4345 form=rendered_form,
4346 head_form_html=get_head_form_html(req, [form]),
4347 )
4350@view_config(
4351 route_name=Routes.ADD_ID_DEFINITION,
4352 permission=Permission.SUPERUSER,
4353 renderer="id_definition_add.mako",
4354 http_cache=NEVER_CACHE,
4355)
4356def add_id_definition(req: "CamcopsRequest") -> Dict[str, Any]:
4357 """
4358 View to add an ID number definition. Superusers only.
4359 """
4360 route_back = Routes.VIEW_ID_DEFINITIONS
4361 if FormAction.CANCEL in req.POST:
4362 raise HTTPFound(req.route_url(route_back))
4363 form = AddIdDefinitionForm(request=req)
4364 dbsession = req.dbsession
4365 if FormAction.SUBMIT in req.POST:
4366 try:
4367 controls = list(req.POST.items())
4368 appstruct = form.validate(controls)
4369 iddef = IdNumDefinition(
4370 which_idnum=appstruct.get(ViewParam.WHICH_IDNUM),
4371 description=appstruct.get(ViewParam.DESCRIPTION),
4372 short_description=appstruct.get(ViewParam.SHORT_DESCRIPTION),
4373 # we skip hl7_id_type at this stage
4374 # we skip hl7_assigning_authority at this stage
4375 validation_method=appstruct.get(ViewParam.VALIDATION_METHOD),
4376 )
4377 # -----------------------------------------------------------------
4378 # Add ID definition
4379 # -----------------------------------------------------------------
4380 dbsession.add(iddef)
4381 # REMOVED # clear_idnum_definition_cache() # SPECIAL
4382 raise HTTPFound(req.route_url(route_back))
4383 except ValidationFailure as e:
4384 rendered_form = e.render()
4385 else:
4386 rendered_form = form.render()
4387 return dict(
4388 form=rendered_form, head_form_html=get_head_form_html(req, [form])
4389 )
4392def any_records_use_iddef(
4393 req: "CamcopsRequest", iddef: IdNumDefinition
4394) -> bool:
4395 """
4396 Do any records in the database refer to the specified ID number definition?
4398 (Used when we're thinking about deleting one; would it leave broken
4399 references? If so, we will prevent deletion; see
4400 :func:`delete_id_definition`.)
4401 """
4402 # Helpfully, these are only referred to permanently from one place:
4403 q = CountStarSpecializedQuery(PatientIdNum, session=req.dbsession).filter( # type: ignore[arg-type] # noqa: E501
4404 PatientIdNum.which_idnum == iddef.which_idnum
4405 )
4406 if q.count_star() > 0:
4407 return True
4408 # No; all clean.
4409 return False
4412@view_config(
4413 route_name=Routes.DELETE_ID_DEFINITION,
4414 permission=Permission.SUPERUSER,
4415 renderer="id_definition_delete.mako",
4416 http_cache=NEVER_CACHE,
4417)
4418def delete_id_definition(req: "CamcopsRequest") -> Dict[str, Any]:
4419 """
4420 View to delete an ID number definition. Superusers only.
4421 """
4422 route_back = Routes.VIEW_ID_DEFINITIONS
4423 if FormAction.CANCEL in req.POST:
4424 raise HTTPFound(req.route_url(route_back))
4425 iddef = get_iddef_from_request_which_idnum_or_raise(req)
4426 form = DeleteIdDefinitionForm(request=req)
4427 rendered_form = ""
4428 error = ""
4429 if any_records_use_iddef(req, iddef):
4430 _ = req.gettext
4431 error = _("Unable to delete ID definition; records refer to it.")
4432 else:
4433 if FormAction.DELETE in req.POST:
4434 try:
4435 controls = list(req.POST.items())
4436 appstruct = form.validate(controls)
4437 assert (
4438 appstruct.get(ViewParam.WHICH_IDNUM) == iddef.which_idnum
4439 )
4440 # -------------------------------------------------------------
4441 # Delete ID definition
4442 # -------------------------------------------------------------
4443 req.dbsession.delete(iddef)
4444 # REMOVED # clear_idnum_definition_cache() # SPECIAL
4445 raise HTTPFound(req.route_url(route_back))
4446 except ValidationFailure as e:
4447 rendered_form = e.render()
4448 else:
4449 appstruct = {ViewParam.WHICH_IDNUM: iddef.which_idnum}
4450 rendered_form = form.render(appstruct)
4451 return dict(
4452 iddef=iddef,
4453 error=error,
4454 form=rendered_form,
4455 head_form_html=get_head_form_html(req, [form]),
4456 )
4459# =============================================================================
4460# Altering data. Some of the more complex logic is here.
4461# =============================================================================
4464@view_config(
4465 route_name=Routes.ADD_SPECIAL_NOTE,
4466 renderer="special_note_add.mako",
4467 http_cache=NEVER_CACHE,
4468)
4469def add_special_note(req: "CamcopsRequest") -> Dict[str, Any]:
4470 """
4471 View to add a special note to a task (after confirmation).
4473 (Note that users can't add special notes to patients -- those get added
4474 automatically when a patient is edited. So the context here is always of a
4475 task.)
4476 """
4477 table_name = req.get_str_param(
4478 ViewParam.TABLE_NAME, validator=validate_task_tablename
4479 )
4480 server_pk = req.get_int_param(ViewParam.SERVER_PK, None)
4481 url_back = req.route_url(
4482 Routes.TASK,
4483 _query={
4484 ViewParam.TABLE_NAME: table_name,
4485 ViewParam.SERVER_PK: server_pk,
4486 ViewParam.VIEWTYPE: ViewArg.HTML,
4487 },
4488 )
4489 if FormAction.CANCEL in req.POST:
4490 raise HTTPFound(url_back)
4491 task = task_factory(req, table_name, server_pk)
4492 _ = req.gettext
4493 if task is None:
4494 raise HTTPBadRequest(
4495 f"{_('No such task:')} {table_name}, PK={server_pk}"
4496 )
4497 user = req.user
4498 if not user.authorized_to_add_special_note(task.group_id):
4499 raise HTTPBadRequest(
4500 _("Not authorized to add special notes for this task's group")
4501 )
4502 form = AddSpecialNoteForm(request=req)
4503 if FormAction.SUBMIT in req.POST:
4504 try:
4505 controls = list(req.POST.items())
4506 appstruct = form.validate(controls)
4507 note = appstruct.get(ViewParam.NOTE)
4508 # -----------------------------------------------------------------
4509 # Apply special note
4510 # -----------------------------------------------------------------
4511 task.apply_special_note(req, note)
4512 raise HTTPFound(url_back)
4513 except ValidationFailure as e:
4514 rendered_form = e.render()
4515 else:
4516 appstruct = {
4517 ViewParam.TABLE_NAME: table_name,
4518 ViewParam.SERVER_PK: server_pk,
4519 }
4520 rendered_form = form.render(appstruct)
4521 return dict(
4522 task=task,
4523 form=rendered_form,
4524 head_form_html=get_head_form_html(req, [form]),
4525 viewtype=ViewArg.HTML,
4526 )
4529@view_config(
4530 route_name=Routes.DELETE_SPECIAL_NOTE,
4531 renderer="special_note_delete.mako",
4532 http_cache=NEVER_CACHE,
4533)
4534def delete_special_note(req: "CamcopsRequest") -> Dict[str, Any]:
4535 """
4536 View to delete a special note (after confirmation).
4537 """
4538 note_id = req.get_int_param(ViewParam.NOTE_ID, None)
4539 sn = SpecialNote.get_specialnote_by_id(req.dbsession, note_id)
4540 _ = req.gettext
4541 if sn is None:
4542 raise HTTPBadRequest(f"{_('No such SpecialNote:')} note_id={note_id}")
4543 if sn.hidden:
4544 raise HTTPBadRequest(
4545 f"{_('SpecialNote already deleted/hidden:')} " f"note_id={note_id}"
4546 )
4547 if not sn.user_may_delete_specialnote(req.user):
4548 raise HTTPBadRequest(_("Not authorized to delete this special note"))
4549 url_back = req.route_url(Routes.VIEW_TASKS) # default
4550 if sn.refers_to_patient():
4551 # Special note on a patient.
4552 # We might have come here from any number of tasks relating to this
4553 # patient. In principle this information is retrievable; in practice it
4554 # is a considerable faff for a rare operation, since special notes are
4555 # displayed via special_notes.mako, which only looks at information
4556 # stored with the note itself.
4557 pass
4558 else:
4559 # Special note on a task.
4560 task = sn.target_task()
4561 if task:
4562 url_back = req.route_url(
4563 Routes.TASK,
4564 _query={
4565 ViewParam.TABLE_NAME: task.tablename,
4566 ViewParam.SERVER_PK: task.pk,
4567 ViewParam.VIEWTYPE: ViewArg.HTML,
4568 },
4569 )
4570 if FormAction.CANCEL in req.POST:
4571 raise HTTPFound(url_back)
4572 form = DeleteSpecialNoteForm(request=req)
4573 if FormAction.SUBMIT in req.POST:
4574 try:
4575 controls = list(req.POST.items())
4576 form.validate(controls)
4577 # -----------------------------------------------------------------
4578 # Delete special note
4579 # -----------------------------------------------------------------
4580 sn.hidden = True
4581 raise HTTPFound(url_back)
4582 except ValidationFailure as e:
4583 rendered_form = e.render()
4584 else:
4585 appstruct = {ViewParam.NOTE_ID: note_id}
4586 rendered_form = form.render(appstruct)
4587 return dict(
4588 sn=sn,
4589 form=rendered_form,
4590 head_form_html=get_head_form_html(req, [form]),
4591 )
4594class EraseTaskBaseView(DeleteView):
4595 """
4596 Django-style view to erase a task.
4597 """
4599 form_class = EraseTaskForm
4601 def get_object(self) -> Any:
4602 # noinspection PyAttributeOutsideInit
4603 self.table_name = self.request.get_str_param(
4604 ViewParam.TABLE_NAME, validator=validate_task_tablename
4605 )
4606 # noinspection PyAttributeOutsideInit
4607 self.server_pk = self.request.get_int_param(ViewParam.SERVER_PK, None)
4609 task = task_factory(self.request, self.table_name, self.server_pk)
4610 _ = self.request.gettext
4611 if task is None:
4612 raise HTTPBadRequest(
4613 f"{_('No such task:')} {self.table_name}, PK={self.server_pk}"
4614 )
4615 if task.is_live_on_tablet():
4616 raise HTTPBadRequest(errormsg_task_live(self.request))
4617 self.check_user_is_authorized(task)
4619 return task
4621 def check_user_is_authorized(self, task: Task) -> None:
4622 if not self.request.user.authorized_to_erase_tasks(task.group_id):
4623 _ = self.request.gettext
4624 raise HTTPBadRequest(
4625 _("Not authorized to erase tasks for this task's group")
4626 )
4628 def get_cancel_url(self) -> str:
4629 return self.request.route_url(
4630 Routes.TASK,
4631 _query={
4632 ViewParam.TABLE_NAME: self.table_name,
4633 ViewParam.SERVER_PK: self.server_pk,
4634 ViewParam.VIEWTYPE: ViewArg.HTML,
4635 },
4636 )
4639class EraseTaskLeavingPlaceholderView(EraseTaskBaseView):
4640 """
4641 Django-style view to erase data from a task, leaving an empty
4642 "placeholder".
4643 """
4645 template_name = "task_erase.mako"
4647 def get_object(self) -> Any:
4648 task = cast(Task, super().get_object())
4649 if task.is_erased():
4650 _ = self.request.gettext
4651 raise HTTPBadRequest(_("Task already erased"))
4653 return task
4655 def delete(self) -> None:
4656 task = cast(Task, self.object)
4658 task.manually_erase(self.request)
4660 def get_success_url(self) -> str:
4661 return self.request.route_url(
4662 Routes.TASK,
4663 _query={
4664 ViewParam.TABLE_NAME: self.table_name,
4665 ViewParam.SERVER_PK: self.server_pk,
4666 ViewParam.VIEWTYPE: ViewArg.HTML,
4667 },
4668 )
4671class EraseTaskEntirelyView(EraseTaskBaseView):
4672 """
4673 Django-style view to erase (delete) a task entirely.
4674 """
4676 template_name = "task_erase_entirely.mako"
4678 def delete(self) -> None:
4679 task = cast(Task, self.object)
4681 TaskIndexEntry.unindex_task(task, self.request.dbsession)
4682 task.delete_entirely(self.request)
4684 _ = self.request.gettext
4686 msg_erased = _("Task erased:")
4688 self.request.session.flash(
4689 f"{msg_erased} ({self.table_name}, server PK {self.server_pk}).",
4690 queue=FlashQueue.SUCCESS,
4691 )
4693 def get_success_url(self) -> str:
4694 return self.request.route_url(Routes.VIEW_TASKS)
4697@view_config(
4698 route_name=Routes.ERASE_TASK_LEAVING_PLACEHOLDER,
4699 permission=Permission.GROUPADMIN,
4700 http_cache=NEVER_CACHE,
4701)
4702def erase_task_leaving_placeholder(req: "CamcopsRequest") -> Response:
4703 """
4704 View to wipe all data from a task (after confirmation).
4706 Leaves the task record as a placeholder.
4707 """
4708 return EraseTaskLeavingPlaceholderView(req).dispatch()
4711@view_config(
4712 route_name=Routes.ERASE_TASK_ENTIRELY,
4713 permission=Permission.GROUPADMIN,
4714 http_cache=NEVER_CACHE,
4715)
4716def erase_task_entirely(req: "CamcopsRequest") -> Response:
4717 """
4718 View to erase a task from the database entirely (after confirmation).
4719 """
4720 return EraseTaskEntirelyView(req).dispatch()
4723@view_config(
4724 route_name=Routes.DELETE_PATIENT,
4725 permission=Permission.GROUPADMIN,
4726 http_cache=NEVER_CACHE,
4727)
4728def delete_patient(req: "CamcopsRequest") -> Response:
4729 """
4730 View to delete completely all data for a patient (after confirmation),
4731 within a specific group.
4732 """
4733 if FormAction.CANCEL in req.POST:
4734 raise HTTPFound(req.route_url(Routes.HOME))
4736 first_form = DeletePatientChooseForm(request=req)
4737 second_form = DeletePatientConfirmForm(request=req)
4738 form = None
4739 final_phase = False
4740 if FormAction.SUBMIT in req.POST:
4741 # FIRST form has been submitted
4742 form = first_form
4743 elif FormAction.DELETE in req.POST:
4744 # SECOND AND FINAL form has been submitted
4745 form = second_form
4746 final_phase = True
4747 _ = req.gettext
4748 if form is not None:
4749 try:
4750 controls = list(req.POST.items())
4751 appstruct = form.validate(controls)
4752 which_idnum = appstruct.get(ViewParam.WHICH_IDNUM)
4753 idnum_value = appstruct.get(ViewParam.IDNUM_VALUE)
4754 group_id = appstruct.get(ViewParam.GROUP_ID)
4755 if group_id not in req.user.ids_of_groups_user_is_admin_for:
4756 # rare occurrence; form should prevent it;
4757 # unless superuser has changed status since form was read
4758 raise HTTPBadRequest(_("You're not an admin for this group"))
4759 # -----------------------------------------------------------------
4760 # Fetch tasks to be deleted.
4761 # -----------------------------------------------------------------
4762 dbsession = req.dbsession
4763 # Tasks first:
4764 idnum_ref = IdNumReference(
4765 which_idnum=which_idnum, idnum_value=idnum_value
4766 )
4767 taskfilter = TaskFilter()
4768 taskfilter.idnum_criteria = [idnum_ref]
4769 taskfilter.group_ids = [group_id]
4770 collection = TaskCollection(
4771 req=req,
4772 taskfilter=taskfilter,
4773 sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
4774 current_only=False, # unusual option!
4775 )
4776 tasks = collection.all_tasks
4777 n_tasks = len(tasks)
4778 patient_lineage_instances = Patient.get_patients_by_idnum(
4779 dbsession=dbsession,
4780 which_idnum=which_idnum,
4781 idnum_value=idnum_value,
4782 group_id=group_id,
4783 current_only=False,
4784 )
4785 n_patient_instances = len(patient_lineage_instances)
4787 # -----------------------------------------------------------------
4788 # Bin out at this stage and offer confirmation page?
4789 # -----------------------------------------------------------------
4790 if not final_phase:
4791 # New appstruct; we don't want the validation code persisting
4792 appstruct = {
4793 ViewParam.WHICH_IDNUM: which_idnum,
4794 ViewParam.IDNUM_VALUE: idnum_value,
4795 ViewParam.GROUP_ID: group_id,
4796 }
4797 rendered_form = second_form.render(appstruct)
4798 return render_to_response(
4799 "patient_delete_confirm.mako",
4800 dict(
4801 form=rendered_form,
4802 tasks=tasks,
4803 n_patient_instances=n_patient_instances,
4804 head_form_html=get_head_form_html(req, [form]),
4805 ),
4806 request=req,
4807 )
4809 # -----------------------------------------------------------------
4810 # Delete patient and associated tasks
4811 # -----------------------------------------------------------------
4812 for task in tasks:
4813 TaskIndexEntry.unindex_task(task, req.dbsession)
4814 task.delete_entirely(req)
4815 # Then patients:
4816 for p in patient_lineage_instances:
4817 PatientIdNumIndexEntry.unindex_patient(p, req.dbsession)
4818 p.delete_with_dependants(req)
4819 msg = (
4820 f"{_('Patient and associated tasks DELETED from group')} "
4821 f"{group_id}: idnum{which_idnum} = {idnum_value}. "
4822 f"{_('Task records deleted:')} {n_tasks}."
4823 f"{_('Patient records (current and/or old) deleted')} "
4824 f"{n_patient_instances}."
4825 )
4826 audit(req, msg)
4828 req.session.flash(msg, FlashQueue.SUCCESS)
4829 raise HTTPFound(req.route_url(Routes.HOME))
4831 except ValidationFailure as e:
4832 rendered_form = e.render()
4833 else:
4834 form = first_form
4835 rendered_form = first_form.render()
4836 return render_to_response(
4837 "patient_delete_choose.mako",
4838 dict(
4839 form=rendered_form, head_form_html=get_head_form_html(req, [form])
4840 ),
4841 request=req,
4842 )
4845@view_config(
4846 route_name=Routes.FORCIBLY_FINALIZE,
4847 permission=Permission.GROUPADMIN,
4848 http_cache=NEVER_CACHE,
4849)
4850def forcibly_finalize(req: "CamcopsRequest") -> Response:
4851 """
4852 View to force-finalize all live (``_era == ERA_NOW``) records from a
4853 device. Available to group administrators if all those records are within
4854 their groups (otherwise, it's a superuser operation).
4855 """
4856 if FormAction.CANCEL in req.POST:
4857 return HTTPFound(req.route_url(Routes.HOME))
4859 dbsession = req.dbsession
4860 first_form = ForciblyFinalizeChooseDeviceForm(request=req)
4861 second_form = ForciblyFinalizeConfirmForm(request=req)
4862 form = None
4863 final_phase = False
4864 if FormAction.SUBMIT in req.POST:
4865 # FIRST form has been submitted
4866 form = first_form
4867 elif FormAction.FINALIZE in req.POST:
4868 # SECOND form has been submitted:
4869 form = second_form
4870 final_phase = True
4871 _ = req.gettext
4872 if form is not None:
4873 try:
4874 controls = list(req.POST.items())
4875 appstruct = form.validate(controls)
4876 # log.debug("{}", pformat(appstruct))
4877 device_id = appstruct.get(ViewParam.DEVICE_ID)
4878 device = Device.get_device_by_id(dbsession, device_id)
4879 if device is None:
4880 raise HTTPBadRequest(f"{_('No such device:')} {device_id!r}")
4881 # -----------------------------------------------------------------
4882 # If at the first stage, bin out and offer confirmation page
4883 # -----------------------------------------------------------------
4884 if not final_phase:
4885 appstruct = {ViewParam.DEVICE_ID: device_id}
4886 rendered_form = second_form.render(appstruct)
4887 taskfilter = TaskFilter()
4888 taskfilter.device_ids = [device_id]
4889 taskfilter.era = ERA_NOW
4890 collection = TaskCollection(
4891 req=req,
4892 taskfilter=taskfilter,
4893 sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
4894 current_only=False, # unusual option!
4895 via_index=False, # required for current_only=False
4896 )
4897 tasks = collection.all_tasks
4898 return render_to_response(
4899 "device_forcibly_finalize_confirm.mako",
4900 dict(
4901 form=rendered_form,
4902 tasks=tasks,
4903 head_form_html=get_head_form_html(req, [form]),
4904 ),
4905 request=req,
4906 )
4907 # -----------------------------------------------------------------
4908 # Check it's permitted
4909 # -----------------------------------------------------------------
4910 if not req.user.superuser:
4911 admin_group_ids = req.user.ids_of_groups_user_is_admin_for
4912 for clienttable in CLIENT_TABLE_MAP.values():
4913 # noinspection PyPropertyAccess
4914 count_query = (
4915 select(func.count())
4916 .select_from(clienttable)
4917 .where(clienttable.c[FN_DEVICE_ID] == device_id)
4918 .where(clienttable.c[FN_ERA] == ERA_NOW)
4919 .where(
4920 clienttable.c[FN_GROUP_ID].notin_(admin_group_ids)
4921 )
4922 )
4923 n = dbsession.execute(count_query).scalar()
4924 if n > 0:
4925 raise HTTPBadRequest(
4926 _(
4927 "Some records for this device are in groups "
4928 "for which you are not an administrator"
4929 )
4930 )
4931 # -----------------------------------------------------------------
4932 # Forcibly finalize
4933 # -----------------------------------------------------------------
4934 msgs = [] # type: List[str]
4935 batchdetails = BatchDetails(batchtime=req.now_utc)
4936 alltables = sorted(
4937 CLIENT_TABLE_MAP.values(), key=upload_commit_order_sorter
4938 )
4939 for clienttable in alltables:
4940 liverecs = get_server_live_records(
4941 req, device_id, clienttable, current_only=False
4942 )
4943 preservation_pks = [r.server_pk for r in liverecs]
4944 if not preservation_pks:
4945 continue
4946 current_pks = [r.server_pk for r in liverecs if r.current]
4947 tablechanges = UploadTableChanges(clienttable)
4948 tablechanges.note_preservation_pks(preservation_pks)
4949 tablechanges.note_current_pks(current_pks)
4950 dbsession.execute(
4951 update(clienttable)
4952 .where(clienttable.c[FN_PK].in_(preservation_pks))
4953 .values(
4954 values_preserve_now(
4955 req, batchdetails, forcibly_preserved=True
4956 )
4957 )
4958 )
4959 update_indexes_and_push_exports(
4960 req, batchdetails, tablechanges
4961 )
4962 msgs.append(f"{clienttable.name} {preservation_pks}")
4963 # Field names are different in server-side tables, so they need
4964 # special handling:
4965 SpecialNote.forcibly_preserve_special_notes_for_device(
4966 req, device_id
4967 )
4968 # -----------------------------------------------------------------
4969 # Done
4970 # -----------------------------------------------------------------
4971 msg = (
4972 f"{_('Live records for device')} {device_id} "
4973 f"({device.friendly_name}) {_('forcibly finalized')} "
4974 f"(PKs: {'; '.join(msgs)})"
4975 )
4976 audit(req, msg)
4977 log.info(msg)
4979 req.session.flash(msg, queue=FlashQueue.SUCCESS)
4980 raise HTTPFound(req.route_url(Routes.HOME))
4982 except ValidationFailure as e:
4983 rendered_form = e.render()
4984 else:
4985 form = first_form
4986 rendered_form = form.render() # no appstruct
4987 return render_to_response(
4988 "device_forcibly_finalize_choose.mako",
4989 dict(
4990 form=rendered_form, head_form_html=get_head_form_html(req, [form])
4991 ),
4992 request=req,
4993 )
4996# =============================================================================
4997# Patient creation/editing (primarily for task scheduling)
4998# =============================================================================
5001class PatientMixin(object):
5002 """
5003 Mixin for views involving a patient.
5004 """
5006 object: Any
5007 object_class = Patient
5008 server_pk_name = "_pk"
5010 model_form_dict = {
5011 "forename": ViewParam.FORENAME,
5012 "surname": ViewParam.SURNAME,
5013 "dob": ViewParam.DOB,
5014 "sex": ViewParam.SEX,
5015 "email": ViewParam.EMAIL,
5016 "address": ViewParam.ADDRESS,
5017 "gp": ViewParam.GP,
5018 "other": ViewParam.OTHER,
5019 }
5021 def get_form_values(self) -> Dict:
5022 # will populate with model_form_dict
5023 # noinspection PyUnresolvedReferences
5024 form_values = super().get_form_values() # type: ignore[misc]
5026 patient = cast(Patient, self.object)
5028 if patient is not None:
5029 form_values[ViewParam.SERVER_PK] = patient.pk
5030 form_values[ViewParam.GROUP_ID] = patient.group.id
5031 form_values[ViewParam.ID_REFERENCES] = [
5032 {
5033 ViewParam.WHICH_IDNUM: pidnum.which_idnum,
5034 ViewParam.IDNUM_VALUE: pidnum.idnum_value,
5035 }
5036 for pidnum in patient.idnums
5037 ]
5038 ts_list = [] # type: List[Dict]
5039 for pts in patient.task_schedules:
5040 ts_dict = {
5041 ViewParam.PATIENT_TASK_SCHEDULE_ID: pts.id,
5042 ViewParam.SCHEDULE_ID: pts.schedule_id,
5043 ViewParam.START_DATETIME: pts.start_datetime,
5044 }
5045 if DEFORM_ACCORDION_BUG:
5046 ts_dict[ViewParam.SETTINGS] = pts.settings
5047 else:
5048 ts_dict[ViewParam.ADVANCED] = {
5049 ViewParam.SETTINGS: pts.settings
5050 }
5051 ts_list.append(ts_dict)
5052 form_values[ViewParam.TASK_SCHEDULES] = ts_list
5054 return form_values
5057class EditPatientBaseView(PatientMixin, UpdateView):
5058 """
5059 View to edit details for a patient.
5060 """
5062 pk_param = ViewParam.SERVER_PK
5064 def get_object(self) -> Any:
5065 patient = cast(Patient, super().get_object())
5067 _ = self.request.gettext
5069 if not patient.group:
5070 raise HTTPBadRequest(_("Bad patient: not in a group"))
5072 if not patient.user_may_edit(self.request):
5073 raise HTTPBadRequest(_("Not authorized to edit this patient"))
5075 return patient
5077 def save_object(self, appstruct: Dict[str, Any]) -> None:
5078 # -----------------------------------------------------------------
5079 # Apply edits
5080 # -----------------------------------------------------------------
5081 # Calculate the changes, and apply them to the Patient object
5082 _ = self.request.gettext
5084 patient = cast(Patient, self.object)
5086 changes = OrderedDict() # type: OrderedDict
5088 self.save_changes(appstruct, changes)
5090 if not changes:
5091 self.request.session.flash(
5092 f"{_('No changes required for patient record with server PK')} " # noqa
5093 f"{patient.pk} {_('(all new values matched old values)')}",
5094 queue=FlashQueue.INFO,
5095 )
5096 return
5098 formatted_changes = []
5100 for k, details in changes.items():
5101 if len(details) == 1:
5102 change = f"{k}: {details[0]}" # usually a plain message
5103 else:
5104 change = f"{k}: {details[0]!r} → {details[1]!r}"
5106 formatted_changes.append(change)
5108 # Below here, changes have definitely been made.
5109 change_msg = (
5110 _("Patient details edited. Changes:")
5111 + " "
5112 + "; ".join(formatted_changes)
5113 )
5115 # Apply special note to patient
5116 patient.apply_special_note(self.request, change_msg, "Patient edited")
5118 # Patient details changed, so resend any tasks via HL7
5119 for task in self.get_affected_tasks():
5120 task.cancel_from_export_log(self.request)
5122 # Done
5123 self.request.session.flash(
5124 f"{_('Amended patient record with server PK')} "
5125 f"{patient.pk}. "
5126 f"{_('Changes were:')} {change_msg}",
5127 queue=FlashQueue.SUCCESS,
5128 )
5130 def save_changes(
5131 self, appstruct: Dict[str, Any], changes: OrderedDict
5132 ) -> None:
5133 self._save_simple_params(appstruct, changes)
5134 self._save_idrefs(appstruct, changes)
5136 def _save_simple_params(
5137 self, appstruct: Dict[str, Any], changes: OrderedDict
5138 ) -> None:
5139 patient = cast(Patient, self.object)
5140 for k in EDIT_PATIENT_SIMPLE_PARAMS:
5141 new_value = appstruct.get(k)
5142 old_value = getattr(patient, k)
5143 if new_value == old_value:
5144 continue
5145 if new_value in (None, "") and old_value in (None, ""):
5146 # Nothing really changing!
5147 continue
5148 changes[k] = (old_value, new_value)
5149 setattr(patient, k, new_value)
5151 def _save_idrefs(
5152 self, appstruct: Dict[str, Any], changes: OrderedDict
5153 ) -> None:
5155 # The ID numbers are more complex.
5156 # log.debug("{}", pformat(appstruct))
5157 patient = cast(Patient, self.object)
5158 new_idrefs = [
5159 IdNumReference(
5160 which_idnum=idrefdict[ViewParam.WHICH_IDNUM],
5161 idnum_value=idrefdict[ViewParam.IDNUM_VALUE],
5162 )
5163 for idrefdict in appstruct.get(ViewParam.ID_REFERENCES, {})
5164 ]
5165 for idnum in patient.idnums:
5166 matching_idref = next(
5167 (
5168 idref
5169 for idref in new_idrefs
5170 if idref.which_idnum == idnum.which_idnum
5171 ),
5172 None,
5173 )
5174 if not matching_idref:
5175 # Delete ID numbers not present in the new set
5176 changes[
5177 "idnum{} ({})".format(
5178 idnum.which_idnum,
5179 self.request.get_id_desc(idnum.which_idnum),
5180 )
5181 ] = (idnum.idnum_value, None)
5182 idnum.mark_as_deleted(self.request)
5183 elif matching_idref.idnum_value != idnum.idnum_value:
5184 # Modify altered ID numbers present in the old + new sets
5185 changes[
5186 "idnum{} ({})".format(
5187 idnum.which_idnum,
5188 self.request.get_id_desc(idnum.which_idnum),
5189 )
5190 ] = (idnum.idnum_value, matching_idref.idnum_value)
5191 new_idnum = PatientIdNum()
5192 new_idnum.id = idnum.id
5193 new_idnum.patient_id = idnum.patient_id
5194 new_idnum.which_idnum = idnum.which_idnum
5195 new_idnum.idnum_value = matching_idref.idnum_value
5196 new_idnum.set_predecessor(self.request, idnum)
5198 for idref in new_idrefs:
5199 matching_idnum = next(
5200 (
5201 idnum
5202 for idnum in patient.idnums
5203 if idnum.which_idnum == idref.which_idnum
5204 ),
5205 None,
5206 )
5207 if not matching_idnum:
5208 # Create ID numbers where they were absent
5209 changes[
5210 "idnum{} ({})".format(
5211 idref.which_idnum,
5212 self.request.get_id_desc(idref.which_idnum),
5213 )
5214 ] = (None, idref.idnum_value)
5215 # We need to establish an "id" field, which is the PK as
5216 # seen by the tablet. The tablet has lost interest in these
5217 # records, since _era != ERA_NOW, so all we have to do is
5218 # pick a number that's not in use.
5219 new_idnum = PatientIdNum()
5220 new_idnum.patient_id = patient.id
5221 new_idnum.which_idnum = idref.which_idnum
5222 new_idnum.idnum_value = idref.idnum_value
5223 new_idnum.create_fresh(
5224 self.request,
5225 device_id=patient.device_id,
5226 era=patient.era,
5227 group_id=patient.group_id,
5228 )
5229 new_idnum.save_with_next_available_id(
5230 self.request, patient.device_id, era=patient.era
5231 )
5233 def get_context_data(self, **kwargs: Any) -> Any:
5234 # This parameter is (I think) used by Mako templates such as
5235 # finalized_patient_edit.mako
5236 # Todo:
5237 # Potential inefficiency: we fetch tasks regardless of the stage
5238 # of this form.
5239 kwargs["tasks"] = self.get_affected_tasks()
5241 return super().get_context_data(**kwargs)
5243 def get_affected_tasks(self) -> Optional[List[Task]]:
5244 patient = cast(Patient, self.object)
5246 taskfilter = TaskFilter()
5247 taskfilter.device_ids = [patient.device_id]
5248 taskfilter.group_ids = [patient.group.id]
5249 taskfilter.era = patient.era
5250 collection = TaskCollection(
5251 req=self.request,
5252 taskfilter=taskfilter,
5253 sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
5254 current_only=False, # unusual option!
5255 via_index=False, # for current_only=False, or we'll get a warning
5256 )
5257 return collection.all_tasks
5260class EditServerCreatedPatientView(EditPatientBaseView):
5261 """
5262 View to edit a patient created on the server (as part of task scheduling).
5263 """
5265 template_name = "server_created_patient_edit.mako"
5266 form_class = EditServerCreatedPatientForm
5268 def get_success_url(self) -> str:
5269 return self.request.route_url(Routes.VIEW_PATIENT_TASK_SCHEDULES)
5271 def get_object(self) -> Any:
5272 patient = cast(Patient, super().get_object())
5274 if not patient.created_on_server(self.request):
5275 _ = self.request.gettext
5277 raise HTTPBadRequest(
5278 _("Patient is not editable - was not created on the server")
5279 )
5281 return patient
5283 def save_changes(
5284 self, appstruct: Dict[str, Any], changes: OrderedDict
5285 ) -> None:
5286 self._save_group(appstruct, changes)
5287 super().save_changes(appstruct, changes)
5288 self._save_task_schedules(appstruct, changes)
5290 def _save_group(
5291 self, appstruct: Dict[str, Any], changes: OrderedDict
5292 ) -> None:
5293 patient = cast(Patient, self.object)
5295 old_group_id = patient.group.id
5296 old_group_name = patient.group.name
5297 new_group_id = appstruct.get(ViewParam.GROUP_ID, None)
5298 new_group = (
5299 self.request.dbsession.query(Group)
5300 .filter(Group.id == new_group_id)
5301 .first()
5302 )
5304 if old_group_id != new_group_id:
5305 patient._group_id = new_group_id
5306 changes["group"] = (old_group_name, new_group.name)
5308 def _save_task_schedules(
5309 self, appstruct: Dict[str, Any], changes: OrderedDict
5310 ) -> None:
5312 _ = self.request.gettext
5313 patient = cast(Patient, self.object)
5314 ids_to_delete = [pts.id for pts in patient.task_schedules]
5316 anything_changed = False
5318 for schedule_dict in appstruct.get(ViewParam.TASK_SCHEDULES, {}):
5319 pts_id = schedule_dict[ViewParam.PATIENT_TASK_SCHEDULE_ID]
5320 schedule_id = schedule_dict[ViewParam.SCHEDULE_ID]
5321 start_datetime = schedule_dict[ViewParam.START_DATETIME]
5322 if DEFORM_ACCORDION_BUG:
5323 settings = schedule_dict[ViewParam.SETTINGS]
5324 else:
5325 settings = schedule_dict[ViewParam.ADVANCED][
5326 ViewParam.SETTINGS
5327 ]
5329 if pts_id is None:
5330 pts = PatientTaskSchedule()
5331 pts.patient_pk = patient.pk
5332 pts.schedule_id = schedule_id
5333 pts.start_datetime = start_datetime
5334 pts.settings = settings
5336 self.request.dbsession.add(pts)
5337 anything_changed = True
5338 else:
5339 old_pts = (
5340 self.request.dbsession.query(PatientTaskSchedule)
5341 .filter(PatientTaskSchedule.id == pts_id)
5342 .first()
5343 )
5345 updates = {}
5346 if old_pts.start_datetime != start_datetime:
5347 updates[PatientTaskSchedule.start_datetime] = (
5348 start_datetime
5349 )
5351 if old_pts.schedule_id != schedule_id:
5352 updates[PatientTaskSchedule.schedule_id] = schedule_id # type: ignore[index] # noqa: E501
5354 if old_pts.settings != settings:
5355 updates[PatientTaskSchedule.settings] = settings
5357 if updates:
5358 anything_changed = True
5359 self.request.dbsession.query(PatientTaskSchedule).filter(
5360 PatientTaskSchedule.id == pts_id
5361 ).update(updates, synchronize_session="fetch")
5363 ids_to_delete.remove(pts_id)
5365 pts_to_delete = self.request.dbsession.query(
5366 PatientTaskSchedule
5367 ).filter(PatientTaskSchedule.id.in_(ids_to_delete))
5369 # Previously we had:
5370 # pts_to_delete.delete(synchronize_session="fetch")
5371 #
5372 # This won't cascade the deletion because we are calling delete() on
5373 # the query object. We could set up cascade at the database level
5374 # instead but there is little performance gain here.
5375 # https://stackoverflow.com/questions/19243964/sqlalchemy-delete-doesnt-cascade
5377 for pts in pts_to_delete:
5378 self.request.dbsession.delete(pts)
5379 anything_changed = True
5381 if anything_changed:
5382 changes[_("Task schedules")] = (_("Updated"),)
5385class EditFinalizedPatientView(EditPatientBaseView):
5386 """
5387 View to edit a finalized patient.
5388 """
5390 template_name = "finalized_patient_edit.mako"
5391 form_class = EditFinalizedPatientForm
5393 def __init__(
5394 self,
5395 req: CamcopsRequest,
5396 task_tablename: str = None,
5397 task_server_pk: int = None,
5398 ) -> None:
5399 """
5400 The two additional parameters are for returning the user to the task
5401 from which editing was initiated.
5402 """
5403 super().__init__(req)
5404 self.task_tablename = task_tablename
5405 self.task_server_pk = task_server_pk
5407 def get_success_url(self) -> str:
5408 """
5409 We got here by editing a patient from an uploaded task, so that's our
5410 return point.
5411 """
5412 if self.task_tablename and self.task_server_pk:
5413 return self.request.route_url(
5414 Routes.TASK,
5415 _query={
5416 ViewParam.TABLE_NAME: self.task_tablename,
5417 ViewParam.SERVER_PK: self.task_server_pk,
5418 ViewParam.VIEWTYPE: ViewArg.HTML,
5419 },
5420 )
5421 else:
5422 # Likely in a testing environment!
5423 return self.request.route_url(Routes.HOME)
5425 def get_object(self) -> Any:
5426 patient = cast(Patient, super().get_object())
5428 if not patient.is_finalized():
5429 _ = self.request.gettext
5431 raise HTTPBadRequest(
5432 _(
5433 "Patient is not editable (likely: not finalized, so a "
5434 "copy still on a client device)"
5435 )
5436 )
5438 return patient
5441@view_config(
5442 route_name=Routes.EDIT_FINALIZED_PATIENT,
5443 permission=Permission.GROUPADMIN,
5444 http_cache=NEVER_CACHE,
5445)
5446def edit_finalized_patient(req: "CamcopsRequest") -> Response:
5447 """
5448 View to edit details for a patient.
5449 """
5450 task_table_name = req.get_str_param(
5451 ViewParam.BACK_TASK_TABLENAME, validator=validate_task_tablename
5452 )
5453 task_server_pk = req.get_int_param(ViewParam.BACK_TASK_SERVER_PK, None)
5455 return EditFinalizedPatientView(
5456 req, task_tablename=task_table_name, task_server_pk=task_server_pk
5457 ).dispatch()
5460@view_config(
5461 route_name=Routes.EDIT_SERVER_CREATED_PATIENT, http_cache=NEVER_CACHE
5462)
5463def edit_server_created_patient(req: "CamcopsRequest") -> Response:
5464 """
5465 View to edit details for a patient created on the server (for scheduling
5466 tasks).
5467 """
5468 return EditServerCreatedPatientView(req).dispatch()
5471class AddPatientView(PatientMixin, CreateView):
5472 """
5473 View to add a patient (for task scheduling).
5474 """
5476 form_class = EditServerCreatedPatientForm
5477 template_name = "patient_add.mako"
5479 def dispatch(self) -> Response:
5480 if not self.request.user.authorized_to_manage_patients:
5481 _ = self.request.gettext
5482 raise HTTPBadRequest(_("Not authorized to manage patients"))
5484 return super().dispatch()
5486 def get_success_url(self) -> str:
5487 return self.request.route_url(Routes.VIEW_PATIENT_TASK_SCHEDULES)
5489 def save_object(self, appstruct: Dict[str, Any]) -> None:
5490 server_device = Device.get_server_device(self.request.dbsession)
5492 patient = Patient()
5493 patient.create_fresh(
5494 self.request,
5495 device_id=server_device.id,
5496 era=ERA_NOW,
5497 group_id=appstruct.get(ViewParam.GROUP_ID),
5498 )
5500 for k in EDIT_PATIENT_SIMPLE_PARAMS:
5501 new_value = appstruct.get(k)
5502 setattr(patient, k, new_value)
5504 patient.save_with_next_available_id(self.request, server_device.id)
5506 new_idrefs = [
5507 IdNumReference(
5508 which_idnum=idrefdict[ViewParam.WHICH_IDNUM],
5509 idnum_value=idrefdict[ViewParam.IDNUM_VALUE],
5510 )
5511 for idrefdict in appstruct.get(ViewParam.ID_REFERENCES)
5512 ]
5514 for idref in new_idrefs:
5515 new_idnum = PatientIdNum()
5516 new_idnum.patient_id = patient.id
5517 new_idnum.which_idnum = idref.which_idnum
5518 new_idnum.idnum_value = idref.idnum_value
5519 new_idnum.create_fresh(
5520 self.request,
5521 device_id=server_device.id,
5522 era=ERA_NOW,
5523 group_id=appstruct.get(ViewParam.GROUP_ID),
5524 )
5526 new_idnum.save_with_next_available_id(
5527 self.request, server_device.id
5528 )
5530 task_schedules = appstruct.get(ViewParam.TASK_SCHEDULES)
5532 self.request.dbsession.commit()
5534 for task_schedule in task_schedules:
5535 schedule_id = task_schedule[ViewParam.SCHEDULE_ID]
5536 start_datetime = task_schedule[ViewParam.START_DATETIME]
5537 if DEFORM_ACCORDION_BUG:
5538 settings = task_schedule[ViewParam.SETTINGS]
5539 else:
5540 settings = task_schedule[ViewParam.ADVANCED][
5541 ViewParam.SETTINGS
5542 ]
5543 patient_task_schedule = PatientTaskSchedule()
5544 patient_task_schedule.patient_pk = patient.pk
5545 patient_task_schedule.schedule_id = schedule_id
5546 patient_task_schedule.start_datetime = start_datetime
5547 patient_task_schedule.settings = settings
5549 self.request.dbsession.add(patient_task_schedule)
5551 self.object = patient
5554@view_config(route_name=Routes.ADD_PATIENT, http_cache=NEVER_CACHE)
5555def add_patient(req: "CamcopsRequest") -> Response:
5556 """
5557 View to add a patient.
5558 """
5559 return AddPatientView(req).dispatch()
5562class DeleteServerCreatedPatientView(DeleteView):
5563 """
5564 View to delete a patient that had been created on the server.
5565 """
5567 form_class = DeleteServerCreatedPatientForm
5568 object_class = Patient
5569 pk_param = ViewParam.SERVER_PK
5570 server_pk_name = "_pk"
5571 template_name = TEMPLATE_GENERIC_FORM
5573 def get_object(self) -> Any:
5574 patient = cast(Patient, super().get_object())
5575 if not patient.user_may_edit(self.request):
5576 _ = self.request.gettext
5577 raise HTTPBadRequest(_("Not authorized to delete this patient"))
5578 return patient
5580 def get_extra_context(self) -> Dict[str, Any]:
5581 _ = self.request.gettext
5582 return {
5583 MAKO_VAR_TITLE: self.request.icon_text(
5584 icon=Icons.DELETE, text=_("Delete patient")
5585 )
5586 }
5588 def get_success_url(self) -> str:
5589 return self.request.route_url(Routes.VIEW_PATIENT_TASK_SCHEDULES)
5591 def delete(self) -> None:
5592 patient = cast(Patient, self.object)
5594 PatientIdNumIndexEntry.unindex_patient(patient, self.request.dbsession)
5596 patient.delete_with_dependants(self.request)
5599@view_config(
5600 route_name=Routes.DELETE_SERVER_CREATED_PATIENT, http_cache=NEVER_CACHE
5601)
5602def delete_server_created_patient(req: "CamcopsRequest") -> Response:
5603 """
5604 Page to delete a patient created on the server (as part of task
5605 scheduling).
5606 """
5607 return DeleteServerCreatedPatientView(req).dispatch()
5610# =============================================================================
5611# Task scheduling
5612# =============================================================================
5615@view_config(
5616 route_name=Routes.VIEW_TASK_SCHEDULES,
5617 permission=Permission.GROUPADMIN,
5618 renderer="view_task_schedules.mako",
5619 http_cache=NEVER_CACHE,
5620)
5621def view_task_schedules(req: "CamcopsRequest") -> Dict[str, Any]:
5622 """
5623 View whole task schedules.
5624 """
5625 rows_per_page = req.get_int_param(
5626 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE
5627 )
5628 page_num = req.get_int_param(ViewParam.PAGE, 1)
5629 group_ids = req.user.ids_of_groups_user_is_admin_for
5630 q = (
5631 req.dbsession.query(TaskSchedule)
5632 .join(TaskSchedule.group)
5633 .filter(TaskSchedule.group_id.in_(group_ids))
5634 .order_by(Group.name, TaskSchedule.name)
5635 )
5636 page = SqlalchemyOrmPage(
5637 query=q,
5638 page=page_num,
5639 items_per_page=rows_per_page,
5640 url_maker=PageUrl(req),
5641 request=req,
5642 )
5643 return dict(page=page)
5646@view_config(
5647 route_name=Routes.VIEW_TASK_SCHEDULE_ITEMS,
5648 permission=Permission.GROUPADMIN,
5649 renderer="view_task_schedule_items.mako",
5650 http_cache=NEVER_CACHE,
5651)
5652def view_task_schedule_items(req: "CamcopsRequest") -> Dict[str, Any]:
5653 """
5654 View items within a task schedule.
5655 """
5656 rows_per_page = req.get_int_param(
5657 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE
5658 )
5659 page_num = req.get_int_param(ViewParam.PAGE, 1)
5660 schedule_id = req.get_int_param(ViewParam.SCHEDULE_ID)
5662 schedule = (
5663 req.dbsession.query(TaskSchedule)
5664 .filter(TaskSchedule.id == schedule_id)
5665 .one_or_none()
5666 )
5668 if schedule is None:
5669 _ = req.gettext
5670 raise HTTPBadRequest(_("Schedule does not exist"))
5672 q = (
5673 req.dbsession.query(TaskScheduleItem)
5674 .filter(TaskScheduleItem.schedule_id == schedule_id)
5675 .order_by(*task_schedule_item_sort_order())
5676 )
5677 page = SqlalchemyOrmPage(
5678 query=q,
5679 page=page_num,
5680 items_per_page=rows_per_page,
5681 url_maker=PageUrl(req),
5682 request=req,
5683 )
5684 return dict(page=page, schedule_name=schedule.name)
5687@view_config(
5688 route_name=Routes.VIEW_PATIENT_TASK_SCHEDULES,
5689 renderer="view_patient_task_schedules.mako",
5690 http_cache=NEVER_CACHE,
5691)
5692def view_patient_task_schedules(req: "CamcopsRequest") -> Dict[str, Any]:
5693 """
5694 View all patients and their assigned schedules (as well as their access
5695 keys, etc.).
5696 """
5697 server_device = Device.get_server_device(req.dbsession)
5699 rows_per_page = req.get_int_param(
5700 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE
5701 )
5702 page_num = req.get_int_param(ViewParam.PAGE, 1)
5703 allowed_group_ids = req.user.ids_of_groups_user_may_manage_patients_in
5704 # noinspection PyProtectedMember
5705 q = (
5706 req.dbsession.query(Patient)
5707 .filter(Patient._era == ERA_NOW)
5708 .filter(Patient._group_id.in_(allowed_group_ids))
5709 .filter(Patient._device_id == server_device.id)
5710 .order_by(Patient.surname, Patient.forename)
5711 )
5713 page = SqlalchemyOrmPage(
5714 query=q,
5715 page=page_num,
5716 items_per_page=rows_per_page,
5717 url_maker=PageUrl(req),
5718 request=req,
5719 )
5720 return dict(page=page)
5723@view_config(
5724 route_name=Routes.VIEW_PATIENT_TASK_SCHEDULE,
5725 renderer="view_patient_task_schedule.mako",
5726 http_cache=NEVER_CACHE,
5727)
5728def view_patient_task_schedule(req: "CamcopsRequest") -> Dict[str, Any]:
5729 """
5730 View scheduled tasks for one patient's specific task schedule.
5731 """
5732 pts_id = req.get_int_param(ViewParam.PATIENT_TASK_SCHEDULE_ID)
5734 pts = (
5735 req.dbsession.query(PatientTaskSchedule)
5736 .filter(PatientTaskSchedule.id == pts_id)
5737 .options(
5738 joinedload(PatientTaskSchedule.patient).joinedload(Patient.idnums),
5739 joinedload(PatientTaskSchedule.task_schedule).joinedload(
5740 TaskSchedule.items
5741 ),
5742 )
5743 .one_or_none()
5744 )
5746 _ = req.gettext
5747 if pts is None:
5748 raise HTTPBadRequest(_("Patient's task schedule does not exist"))
5750 if not pts.patient.user_may_edit(req):
5751 raise HTTPBadRequest(_("Not authorized to manage this patient"))
5753 patient_descriptor = pts.patient.prettystr(req)
5755 return dict(
5756 pts=pts,
5757 patient_descriptor=patient_descriptor,
5758 schedule_name=pts.task_schedule.name,
5759 task_list=pts.get_list_of_scheduled_tasks(req),
5760 )
5763class TaskScheduleMixin(object):
5764 """
5765 Mixin for viewing/editing a task schedule.
5766 """
5768 form_class = EditTaskScheduleForm
5769 model_form_dict = {
5770 "name": ViewParam.NAME,
5771 "group_id": ViewParam.GROUP_ID,
5772 "email_bcc": ViewParam.EMAIL_BCC,
5773 "email_cc": ViewParam.EMAIL_CC,
5774 "email_from": ViewParam.EMAIL_FROM,
5775 "email_subject": ViewParam.EMAIL_SUBJECT,
5776 "email_template": ViewParam.EMAIL_TEMPLATE,
5777 }
5778 object_class = TaskSchedule
5779 request: "CamcopsRequest"
5780 server_pk_name = "id"
5781 template_name = TEMPLATE_GENERIC_FORM
5783 def get_success_url(self) -> str:
5784 return self.request.route_url(Routes.VIEW_TASK_SCHEDULES)
5786 def get_object(self) -> Any:
5787 # noinspection PyUnresolvedReferences
5788 schedule = cast(TaskSchedule, super().get_object()) # type: ignore[misc] # noqa: E501
5790 if not schedule.user_may_edit(self.request):
5791 _ = self.request.gettext
5792 raise HTTPBadRequest(
5793 _(
5794 "You a not a group administrator for this "
5795 "task schedule's group"
5796 )
5797 )
5799 return schedule
5802class AddTaskScheduleView(TaskScheduleMixin, CreateView):
5803 """
5804 Django-style view class to add a task schedule.
5805 """
5807 def get_extra_context(self) -> Dict[str, Any]:
5808 _ = self.request.gettext
5809 return {
5810 MAKO_VAR_TITLE: self.request.icon_text(
5811 icon=Icons.TASK_SCHEDULE_ADD, text=_("Add a task schedule")
5812 )
5813 }
5816class EditTaskScheduleView(TaskScheduleMixin, UpdateView):
5817 """
5818 Django-style view class to edit a task schedule.
5819 """
5821 pk_param = ViewParam.SCHEDULE_ID
5823 def get_extra_context(self) -> Dict[str, Any]:
5824 _ = self.request.gettext
5825 return {
5826 MAKO_VAR_TITLE: self.request.icon_text(
5827 icon=Icons.TASK_SCHEDULE,
5828 text=_("Edit details for a task schedule"),
5829 )
5830 }
5833class DeleteTaskScheduleView(TaskScheduleMixin, DeleteView):
5834 """
5835 Django-style view class to delete a task schedule.
5836 """
5838 form_class = DeleteTaskScheduleForm
5839 pk_param = ViewParam.SCHEDULE_ID
5841 def get_extra_context(self) -> Dict[str, Any]:
5842 _ = self.request.gettext
5843 return {
5844 MAKO_VAR_TITLE: self.request.icon_text(
5845 icon=Icons.DELETE, text=_("Delete a task schedule")
5846 )
5847 }
5850@view_config(
5851 route_name=Routes.ADD_TASK_SCHEDULE,
5852 permission=Permission.GROUPADMIN,
5853 http_cache=NEVER_CACHE,
5854)
5855def add_task_schedule(req: "CamcopsRequest") -> Response:
5856 """
5857 View to add a task schedule.
5858 """
5859 return AddTaskScheduleView(req).dispatch()
5862@view_config(
5863 route_name=Routes.EDIT_TASK_SCHEDULE, permission=Permission.GROUPADMIN
5864)
5865def edit_task_schedule(req: "CamcopsRequest") -> Response:
5866 """
5867 View to edit a task schedule.
5868 """
5869 return EditTaskScheduleView(req).dispatch()
5872@view_config(
5873 route_name=Routes.DELETE_TASK_SCHEDULE, permission=Permission.GROUPADMIN
5874)
5875def delete_task_schedule(req: "CamcopsRequest") -> Response:
5876 """
5877 View to delete a task schedule.
5878 """
5879 return DeleteTaskScheduleView(req).dispatch()
5882class TaskScheduleItemMixin(object):
5883 """
5884 Mixin for viewing/editing a task schedule items.
5885 """
5887 form_class = EditTaskScheduleItemForm
5888 template_name = TEMPLATE_GENERIC_FORM
5889 model_form_dict = {
5890 "schedule_id": ViewParam.SCHEDULE_ID,
5891 "task_table_name": ViewParam.TABLE_NAME,
5892 "due_from": ViewParam.DUE_FROM,
5893 # we need to convert due_within to due_by
5894 }
5895 object: Any
5896 # noinspection PyTypeChecker
5897 object_class = cast(Type["Base"], TaskScheduleItem)
5898 pk_param = ViewParam.SCHEDULE_ITEM_ID
5899 request: "CamcopsRequest"
5900 server_pk_name = "id"
5902 def get_success_url(self) -> str:
5903 # noinspection PyUnresolvedReferences
5904 return self.request.route_url(
5905 Routes.VIEW_TASK_SCHEDULE_ITEMS,
5906 _query={ViewParam.SCHEDULE_ID: self.get_schedule_id()}, # type: ignore[attr-defined] # noqa: E501
5907 )
5910class EditTaskScheduleItemMixin(TaskScheduleItemMixin):
5911 """
5912 Django-style view class to edit a task schedule item.
5913 """
5915 def set_object_properties(self, appstruct: Dict[str, Any]) -> None:
5916 # noinspection PyUnresolvedReferences
5917 super().set_object_properties(appstruct) # type: ignore[misc]
5919 due_from = appstruct.get(ViewParam.DUE_FROM)
5920 due_within = appstruct.get(ViewParam.DUE_WITHIN)
5922 setattr(self.object, "due_by", due_from + due_within)
5924 def get_schedule(self) -> TaskSchedule:
5925 # noinspection PyUnresolvedReferences
5926 schedule_id = self.get_schedule_id() # type: ignore[attr-defined]
5928 schedule = (
5929 self.request.dbsession.query(TaskSchedule)
5930 .filter(TaskSchedule.id == schedule_id)
5931 .one_or_none()
5932 )
5934 if schedule is None:
5935 _ = self.request.gettext
5936 raise HTTPBadRequest(
5937 f"{_('Missing Task Schedule for id')} {schedule_id}"
5938 )
5940 if not schedule.user_may_edit(self.request):
5941 _ = self.request.gettext
5942 raise HTTPBadRequest(
5943 _(
5944 "You a not a group administrator for this "
5945 "task schedule's group"
5946 )
5947 )
5949 return schedule
5952class AddTaskScheduleItemView(EditTaskScheduleItemMixin, CreateView):
5953 """
5954 Django-style view class to add a task schedule item.
5955 """
5957 def get_extra_context(self) -> Dict[str, Any]:
5958 _ = self.request.gettext
5960 schedule = self.get_schedule()
5962 return {
5963 MAKO_VAR_TITLE: self.request.icon_text(
5964 icon=Icons.TASK_SCHEDULE_ITEM_ADD,
5965 text=_("Add an item to the {schedule_name} schedule").format(
5966 schedule_name=schedule.name
5967 ),
5968 )
5969 }
5971 def get_schedule_id(self) -> int:
5972 return self.request.get_int_param(ViewParam.SCHEDULE_ID)
5974 def get_form_values(self) -> Dict:
5975 schedule = self.get_schedule()
5977 form_values = super().get_form_values()
5978 form_values[ViewParam.SCHEDULE_ID] = schedule.id
5980 return form_values
5983class EditTaskScheduleItemView(EditTaskScheduleItemMixin, UpdateView):
5984 """
5985 Django-style view class to edit a task schedule item.
5986 """
5988 def get_extra_context(self) -> Dict[str, Any]:
5989 _ = self.request.gettext
5990 return {
5991 MAKO_VAR_TITLE: self.request.icon_text(
5992 icon=Icons.EDIT,
5993 text=_("Edit details for a task schedule item"),
5994 )
5995 }
5997 def get_schedule_id(self) -> int:
5998 item = cast(TaskScheduleItem, self.object)
6000 return item.schedule_id
6002 def get_form_values(self) -> Dict:
6003 schedule = self.get_schedule()
6005 form_values = super().get_form_values()
6006 form_values[ViewParam.SCHEDULE_ID] = schedule.id
6008 item = cast(TaskScheduleItem, self.object)
6009 due_within = item.due_by - form_values[ViewParam.DUE_FROM]
6010 form_values[ViewParam.DUE_WITHIN] = due_within
6012 return form_values
6015class DeleteTaskScheduleItemView(TaskScheduleItemMixin, DeleteView):
6016 """
6017 Django-style view class to delete a task schedule item.
6018 """
6020 form_class = DeleteTaskScheduleItemForm
6022 def get_extra_context(self) -> Dict[str, Any]:
6023 _ = self.request.gettext
6024 return {
6025 MAKO_VAR_TITLE: self.request.icon_text(
6026 icon=Icons.DELETE, text=_("Delete a task schedule item")
6027 )
6028 }
6030 def get_schedule_id(self) -> int:
6031 item = cast(TaskScheduleItem, self.object)
6033 return item.schedule_id
6036@view_config(
6037 route_name=Routes.ADD_TASK_SCHEDULE_ITEM, permission=Permission.GROUPADMIN
6038)
6039def add_task_schedule_item(req: "CamcopsRequest") -> Response:
6040 """
6041 View to add a task schedule item.
6042 """
6043 return AddTaskScheduleItemView(req).dispatch()
6046@view_config(
6047 route_name=Routes.EDIT_TASK_SCHEDULE_ITEM, permission=Permission.GROUPADMIN
6048)
6049def edit_task_schedule_item(req: "CamcopsRequest") -> Response:
6050 """
6051 View to edit a task schedule item.
6052 """
6053 return EditTaskScheduleItemView(req).dispatch()
6056@view_config(
6057 route_name=Routes.DELETE_TASK_SCHEDULE_ITEM,
6058 permission=Permission.GROUPADMIN,
6059)
6060def delete_task_schedule_item(req: "CamcopsRequest") -> Response:
6061 """
6062 View to delete a task schedule item.
6063 """
6064 return DeleteTaskScheduleItemView(req).dispatch()
6067@view_config(
6068 route_name=Routes.CLIENT_API,
6069 request_method=HttpMethod.GET,
6070 permission=NO_PERMISSION_REQUIRED,
6071 renderer="client_api_signposting.mako",
6072)
6073@view_config(
6074 route_name=Routes.CLIENT_API_ALIAS,
6075 request_method=HttpMethod.GET,
6076 permission=NO_PERMISSION_REQUIRED,
6077 renderer="client_api_signposting.mako",
6078)
6079def client_api_signposting(req: "CamcopsRequest") -> Dict[str, Any]:
6080 """
6081 Patients are likely to enter the ``/api`` address into a web browser,
6082 especially if it appears as a hyperlink in an email. If so, that will
6083 arrive as a ``GET`` request. This page will direct them to download the
6084 app.
6085 """
6086 return {
6087 "github_link": req.icon_text(
6088 icon=Icons.GITHUB, url=GITHUB_RELEASES_URL, text="GitHub"
6089 ),
6090 "server_url": req.route_url(Routes.CLIENT_API),
6091 }
6094class SendPatientEmailBaseView(FormView):
6095 """
6096 Send an e-mail to a patient (such as: "please download the app and register
6097 with this URL/code").
6098 """
6100 form_class = SendEmailForm
6101 template_name = "send_patient_email.mako"
6103 def __init__(self, *args: Any, **kwargs: Any) -> None:
6104 self._pts = None
6106 super().__init__(*args, **kwargs)
6108 def dispatch(self) -> Response:
6109 if not self.request.user.authorized_to_email_patients:
6110 _ = self.request.gettext
6111 raise HTTPBadRequest(_("Not authorized to email patients"))
6113 return super().dispatch()
6115 def get_context_data(self, **kwargs: Any) -> Dict[str, Any]:
6116 kwargs["pts"] = self._get_patient_task_schedule()
6118 return super().get_context_data(**kwargs)
6120 def form_valid(self, form: "Form", appstruct: Dict[str, Any]) -> Response:
6121 config = self.request.config
6123 patient_email = appstruct.get(ViewParam.EMAIL)
6125 kwargs = dict(
6126 from_addr=appstruct.get(ViewParam.EMAIL_FROM),
6127 to=patient_email,
6128 subject=appstruct.get(ViewParam.EMAIL_SUBJECT),
6129 body=appstruct.get(ViewParam.EMAIL_BODY),
6130 content_type=MimeType.HTML,
6131 )
6133 cc = appstruct.get(ViewParam.EMAIL_CC)
6134 if cc:
6135 kwargs["cc"] = cc
6137 bcc = appstruct.get(ViewParam.EMAIL_BCC)
6138 if bcc:
6139 kwargs["bcc"] = bcc
6141 email = Email(**kwargs)
6142 ok = email.send(
6143 host=config.email_host,
6144 username=config.email_host_username,
6145 password=config.email_host_password,
6146 port=config.email_port,
6147 use_tls=config.email_use_tls,
6148 )
6149 if ok:
6150 self._display_success_message(patient_email)
6151 else:
6152 self._display_failure_message(patient_email)
6154 self.request.dbsession.add(email)
6155 self.request.dbsession.flush()
6156 pts_id = self.request.get_int_param(ViewParam.PATIENT_TASK_SCHEDULE_ID)
6157 if pts_id is None:
6158 _ = self.request.gettext
6159 raise HTTPBadRequest(_("Patient task schedule does not exist"))
6161 pts_email = PatientTaskScheduleEmail()
6162 pts_email.patient_task_schedule_id = pts_id
6163 pts_email.email_id = email.id
6164 self.request.dbsession.add(pts_email)
6165 self.request.dbsession.commit()
6167 return super().form_valid(form, appstruct)
6169 def _display_success_message(self, patient_email: str) -> None:
6170 _ = self.request.gettext
6171 message = _("Email sent to {patient_email}").format(
6172 patient_email=patient_email
6173 )
6175 self.request.session.flash(message, queue=FlashQueue.SUCCESS)
6177 def _display_failure_message(self, patient_email: str) -> None:
6178 _ = self.request.gettext
6179 message = _("Failed to send email to {patient_email}").format(
6180 patient_email=patient_email
6181 )
6183 self.request.session.flash(message, queue=FlashQueue.DANGER)
6185 def get_form_values(self) -> Dict:
6186 pts = self._get_patient_task_schedule()
6188 if pts is None:
6189 _ = self.request.gettext
6190 raise HTTPBadRequest(_("Patient task schedule does not exist"))
6192 return {
6193 ViewParam.EMAIL: pts.patient.email,
6194 ViewParam.EMAIL_CC: pts.task_schedule.email_cc,
6195 ViewParam.EMAIL_BCC: pts.task_schedule.email_bcc,
6196 ViewParam.EMAIL_FROM: pts.task_schedule.email_from,
6197 ViewParam.EMAIL_SUBJECT: pts.task_schedule.email_subject,
6198 ViewParam.EMAIL_BODY: pts.email_body(self.request),
6199 }
6201 def _get_patient_task_schedule(self) -> Optional[PatientTaskSchedule]:
6202 if self._pts is not None:
6203 return self._pts
6205 pts_id = self.request.get_int_param(ViewParam.PATIENT_TASK_SCHEDULE_ID)
6207 self._pts = (
6208 self.request.dbsession.query(PatientTaskSchedule)
6209 .filter(PatientTaskSchedule.id == pts_id)
6210 .one_or_none()
6211 )
6213 return self._pts
6216class SendEmailFromPatientListView(SendPatientEmailBaseView):
6217 """
6218 Send an e-mail to a patient and return to the patient task schedule list
6219 view.
6220 """
6222 def get_success_url(self) -> str:
6223 return self.request.route_url(Routes.VIEW_PATIENT_TASK_SCHEDULES)
6226class SendEmailFromPatientTaskScheduleView(SendPatientEmailBaseView):
6227 """
6228 Send an e-mail to a patient and return to the task schedule view for that
6229 specific patient.
6230 """
6232 def get_success_url(self) -> str:
6233 pts_id = self.request.get_int_param(ViewParam.PATIENT_TASK_SCHEDULE_ID)
6235 return self.request.route_url(
6236 Routes.VIEW_PATIENT_TASK_SCHEDULE,
6237 _query={ViewParam.PATIENT_TASK_SCHEDULE_ID: pts_id},
6238 )
6241@view_config(
6242 route_name=Routes.SEND_EMAIL_FROM_PATIENT_TASK_SCHEDULE,
6243 http_cache=NEVER_CACHE,
6244)
6245def send_email_from_patient_task_schedule(req: "CamcopsRequest") -> Response:
6246 """
6247 View to send an email to a patient from their task schedule page.
6248 """
6249 return SendEmailFromPatientTaskScheduleView(req).dispatch()
6252@view_config(
6253 route_name=Routes.SEND_EMAIL_FROM_PATIENT_LIST, http_cache=NEVER_CACHE
6254)
6255def send_email_from_patient_list(req: "CamcopsRequest") -> Response:
6256 """
6257 View to send an email to a patient from the list of patients.
6258 """
6259 return SendEmailFromPatientListView(req).dispatch()
6262# =============================================================================
6263# FHIR identifier "system" information
6264# =============================================================================
6267@view_config(
6268 route_name=Routes.FHIR_PATIENT_ID_SYSTEM,
6269 request_method=HttpMethod.GET,
6270 renderer="fhir_patient_id_system.mako",
6271 http_cache=NEVER_CACHE,
6272)
6273def view_fhir_patient_id_system(req: "CamcopsRequest") -> Dict[str, Any]:
6274 """
6275 Placeholder view for FHIR patient identifier "system" types (from the ID
6276 that we may have provided to a FHIR server).
6278 Within each system, the "value" is the actual patient's ID number (not
6279 part of what we show here).
6280 """
6281 which_idnum = int(req.matchdict[ViewParam.WHICH_IDNUM])
6282 if which_idnum not in req.valid_which_idnums:
6283 _ = req.gettext
6284 raise HTTPBadRequest(
6285 f"{_('Unknown patient ID type:')} " f"{which_idnum!r}"
6286 )
6287 return dict(which_idnum=which_idnum)
6290# noinspection PyUnusedLocal
6291@view_config(
6292 route_name=Routes.FHIR_QUESTIONNAIRE_SYSTEM,
6293 request_method=HttpMethod.GET,
6294 renderer="all_tasks.mako",
6295 http_cache=NEVER_CACHE,
6296)
6297@view_config(
6298 route_name=Routes.TASK_LIST,
6299 request_method=HttpMethod.GET,
6300 renderer="all_tasks.mako",
6301 http_cache=NEVER_CACHE,
6302)
6303def view_task_list(req: "CamcopsRequest") -> Dict[str, Any]:
6304 """
6305 Lists all tasks.
6307 Also the placeholder view for FHIR Questionnaire "system".
6308 There's only one system -- the "value" is the task type.
6309 """
6310 return dict(all_task_classes=Task.all_subclasses_by_tablename())
6313@view_config(
6314 route_name=Routes.TASK_DETAILS,
6315 request_method=HttpMethod.GET,
6316 renderer="task_details.mako",
6317 http_cache=NEVER_CACHE,
6318)
6319def view_task_details(req: "CamcopsRequest") -> Dict[str, Any]:
6320 """
6321 View details of a specific task type.
6323 Used also for for FHIR DocumentReference, Observation,and
6324 QuestionnaireResponse "system" types. (There's one system per task. Within
6325 each task, the "value" relates to the specific task PK.)
6326 """
6327 table_name = req.matchdict[ViewParam.TABLE_NAME]
6328 task_class_dict = tablename_to_task_class_dict()
6329 if table_name not in task_class_dict:
6330 _ = req.gettext
6331 raise HTTPBadRequest(f"{_('Unknown task:')} {table_name!r}")
6332 task_class = task_class_dict[table_name]
6333 task_instance = task_class()
6335 fhir_aq_items = task_instance.get_fhir_questionnaire(req)
6336 # ddl = task_instance.get_ddl()
6337 # ddl_html, ddl_css = format_sql_as_html(ddl)
6339 return dict(
6340 task_class=task_class,
6341 task_instance=task_instance,
6342 fhir_aq_items=fhir_aq_items,
6343 # ddl_html=ddl_html,
6344 # css=ddl_css,
6345 )
6348@view_config(
6349 route_name=Routes.FHIR_CONDITION,
6350 request_method=HttpMethod.GET,
6351 http_cache=NEVER_CACHE,
6352)
6353@view_config(
6354 route_name=Routes.FHIR_DOCUMENT_REFERENCE,
6355 request_method=HttpMethod.GET,
6356 http_cache=NEVER_CACHE,
6357)
6358@view_config(
6359 route_name=Routes.FHIR_OBSERVATION,
6360 request_method=HttpMethod.GET,
6361 http_cache=NEVER_CACHE,
6362)
6363@view_config(
6364 route_name=Routes.FHIR_PRACTITIONER,
6365 request_method=HttpMethod.GET,
6366 http_cache=NEVER_CACHE,
6367)
6368@view_config(
6369 route_name=Routes.FHIR_QUESTIONNAIRE_RESPONSE,
6370 request_method=HttpMethod.GET,
6371 http_cache=NEVER_CACHE,
6372)
6373def fhir_view_task(req: "CamcopsRequest") -> Response:
6374 """
6375 Retrieve parameters from a FHIR URL referring back to this server, and
6376 serve the relevant task (as HTML).
6378 The "canonical URL" or "business identifier" of a FHIR resource is the
6379 reference to the master copy -- in this case, our copy. See
6380 https://www.hl7.org/fhir/datatypes.html#Identifier;
6381 https://www.hl7.org/fhir/resource.html#identifiers.
6383 FHIR identifiers have a "system" (which is a URL) and a "value". I don't
6384 think that FHIR has a rule for combining the system and value to create a
6385 full URL. For some (but by no means all) identifiers that we provide to
6386 FHIR servers, the "system" refers to a CamCOPS task (and the value to some
6387 attribute of that task, like the answer to a question (value of a field),
6388 or a fixed string like "patient", and so on.
6389 """
6390 table_name = req.matchdict[ViewParam.TABLE_NAME]
6391 server_pk = req.matchdict[ViewParam.SERVER_PK]
6392 return HTTPFound(
6393 req.route_url(
6394 Routes.TASK,
6395 _query={
6396 ViewParam.TABLE_NAME: table_name,
6397 ViewParam.SERVER_PK: server_pk,
6398 ViewParam.VIEWTYPE: ViewArg.HTML,
6399 },
6400 )
6401 )
6404@view_config(
6405 route_name=Routes.FHIR_TABLENAME_PK_ID,
6406 request_method=HttpMethod.GET,
6407 http_cache=NEVER_CACHE,
6408)
6409def fhir_view_tablename_pk(req: "CamcopsRequest") -> Response:
6410 """
6411 Deal with the slightly silly system that just takes a tablename and PK
6412 directly. Security is key here!
6413 """
6414 table_name = req.matchdict[ViewParam.TABLE_NAME]
6415 server_pk = req.matchdict[ViewParam.SERVER_PK]
6416 if table_name == Patient.__tablename__:
6417 return view_patient(req, server_pk)
6418 return HTTPFound(
6419 req.route_url(
6420 Routes.TASK,
6421 _query={
6422 ViewParam.TABLE_NAME: table_name,
6423 ViewParam.SERVER_PK: server_pk,
6424 ViewParam.VIEWTYPE: ViewArg.HTML,
6425 },
6426 )
6427 )
6430# =============================================================================
6431# Static assets
6432# =============================================================================
6433# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/assets.html#advanced-static # noqa
6436def debug_form_rendering() -> None:
6437 r"""
6438 Test code for form rendering.
6440 From the command line:
6442 .. code-block:: bash
6444 # Start in the CamCOPS source root directory.
6445 # - Needs the "-f" option to follow forks.
6446 # - "open" doesn't show all files opened. To see what you need, try
6447 # strace cat /proc/version
6448 # - ... which shows that "openat" is most useful.
6450 strace -f --trace=openat \
6451 python -c 'from camcops_server.cc_modules.webview import debug_form_rendering; debug_form_rendering()' \
6452 | grep site-packages \
6453 | grep -v "\.pyc"
6455 This tells us that the templates are files like:
6457 .. code-block:: none
6459 site-packages/deform/templates/form.pt
6460 site-packages/deform/templates/select.pt
6461 site-packages/deform/templates/textinput.pt
6463 On 2020-06-29 we are interested in why a newer (Docker) installation
6464 renders buggy HTML like:
6466 .. code-block:: none
6468 <select name="which_idnum" id="deformField2" class=" form-control " multiple="False">
6469 <option value="1">CPFT RiO number</option>
6470 <option value="2">NHS number</option>
6471 <option value="1000">MyHospital number</option>
6472 </select>
6474 ... the bug being that ``multiple="False"`` is wrong; an HTML boolean
6475 attribute is false when *absent*, not when set to a certain value (see
6476 https://developer.mozilla.org/en-US/docs/Web/HTML/Attributes#Boolean_Attributes).
6477 The ``multiple`` attribute of ``<select>`` is a boolean attribute
6478 (https://developer.mozilla.org/en-US/docs/Web/HTML/Element/select).
6480 The ``select.pt`` file indicates that this is controlled by
6481 ``tal:attributes`` syntax. TAL is Template Attribution Language
6482 (https://sharptal.readthedocs.io/en/latest/tal.html).
6484 TAL is either provided by Zope (given ZPT files) or Chameleon or both. The
6485 tracing suggests Chameleon. So the TAL language reference is
6486 https://chameleon.readthedocs.io/en/latest/reference.html.
6488 Chameleon changelog is
6489 https://github.com/malthe/chameleon/blob/master/CHANGES.rst.
6491 Multiple sources for ``tal:attributes`` syntax say that a null value
6492 (presumably: ``None``) is required to omit the attribute, not a false
6493 value.
6495 """ # noqa
6497 import sys
6499 from camcops_server.cc_modules.cc_debug import makefunc_trace_unique_calls
6500 from camcops_server.cc_modules.cc_forms import ChooseTrackerForm
6501 from camcops_server.cc_modules.cc_request import get_core_debugging_request
6503 req = get_core_debugging_request()
6504 form = ChooseTrackerForm(req, as_ctv=False)
6506 sys.settrace(makefunc_trace_unique_calls(file_only=True))
6507 _ = form.render()
6508 sys.settrace(None)