Coverage for cc_modules/webview.py : 22%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
3"""
4camcops_server/cc_modules/webview.py
6===============================================================================
8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com).
10 This file is part of CamCOPS.
12 CamCOPS is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
17 CamCOPS is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License for more details.
22 You should have received a copy of the GNU General Public License
23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
25===============================================================================
27**Implements the CamCOPS web front end.**
29Quick tutorial on Pyramid views:
31- The configurator registers routes, and routes have URLs associated with
32 them. Those URLs can be templatized, e.g. to accept numerical parameters.
33 The configurator associates view callables ("views" for short) with routes,
34 and one method for doing that is an automatic scan via Venusian for views
35 decorated with @view_config().
37- All views take a Request object and return a Response or raise an exception
38 that Pyramid will translate into a Response.
40- Having matched a route, Pyramid uses its "view lookup" process to choose
41 one from potentially several views. For example, a single route might be
42 associated with:
44 .. code-block:: python
46 @view_config(route_name="myroute")
47 def myroute_default(req: Request) -> Response:
48 pass
50 @view_config(route_name="myroute", request_method="POST")
51 def myroute_post(req: Request) -> Response:
52 pass
54 In this example, POST requests will go to the second; everything else will
55 go to the first. Pyramid's view lookup rule is essentially: if multiple
56 views match, choose the one with the most specifiers.
58- Specifiers include:
60 .. code-block:: none
62 route_name=ROUTENAME
64 the route
66 request_method="POST"
68 requires HTTP GET, POST, etc.
70 request_param="XXX"
72 ... requires the presence of a GET/POST variable with this name in
73 the request.params dictionary
75 request_param="XXX=YYY"
77 ... requires the presence of a GET/POST variable called XXX whose
78 value is YYY, in the request.params dictionary
80 match_param="XXX=YYY"
82 .. requires the presence of this key/value pair in
83 request.matchdict, which contains parameters from the URL
85 https://docs.pylonsproject.org/projects/pyramid/en/latest/api/config.html#pyramid.config.Configurator.add_view # noqa
87- Getting parameters
89 .. code-block:: none
91 request.params
93 ... parameters from HTTP GET or POST, including both the query
94 string (as in https://somewhere/path?key=value) and the body (e.g.
95 POST).
97 request.matchdict
99 ... parameters from the URL, via URL dispatch; see
100 https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/urldispatch.html#urldispatch-chapter # noqa
102- Regarding rendering:
104 There might be some simplicity benefits from converting to a template
105 system like Mako. On the downside, that would entail a bit more work;
106 likely a minor performance hit (relative to plain Python string rendering);
107 and a loss of type checking. The type checking is also why we prefer:
109 .. code-block:: python
111 html = " ... {param_blah} ...".format(param_blah=PARAM.BLAH)
113 to
115 .. code-block:: python
117 html = " ... {PARAM.BLAH} ...".format(PARAM=PARAM)
119 as in the first situation, PyCharm will check that "BLAH" is present in
120 "PARAM", and in the second it won't. Automatic checking is worth a lot.
122"""
124from collections import OrderedDict
125import logging
126import os
127# from pprint import pformat
128from typing import (
129 Any,
130 cast,
131 Dict,
132 List,
133 Optional,
134 Type,
135 TYPE_CHECKING,
136)
138from cardinal_pythonlib.datetimefunc import format_datetime
139from cardinal_pythonlib.deform_utils import get_head_form_html
140from cardinal_pythonlib.httpconst import MimeType
141from cardinal_pythonlib.logs import BraceStyleAdapter
142from cardinal_pythonlib.pyramid.responses import (
143 BinaryResponse,
144 PdfResponse,
145 XmlResponse,
146)
147from cardinal_pythonlib.sqlalchemy.dialect import (
148 get_dialect_name,
149 SqlaDialectName,
150)
151from cardinal_pythonlib.sizeformatter import bytes2human
152from cardinal_pythonlib.sqlalchemy.orm_inspect import gen_orm_classes_from_base
153from cardinal_pythonlib.sqlalchemy.orm_query import CountStarSpecializedQuery
154from cardinal_pythonlib.sqlalchemy.session import get_engine_from_session
155from deform.exception import ValidationFailure
156from pendulum import DateTime as Pendulum
157from pyramid.httpexceptions import HTTPBadRequest, HTTPFound, HTTPNotFound
158from pyramid.view import (
159 forbidden_view_config,
160 notfound_view_config,
161 view_config,
162)
163from pyramid.renderers import render_to_response
164from pyramid.response import Response
165from pyramid.security import Authenticated, NO_PERMISSION_REQUIRED
166import pygments
167import pygments.lexers
168import pygments.lexers.sql
169import pygments.lexers.web
170import pygments.formatters
171from sqlalchemy.orm import joinedload, Query
172from sqlalchemy.sql.functions import func
173from sqlalchemy.sql.expression import desc, or_, select, update
175from camcops_server.cc_modules.cc_audit import audit, AuditEntry
176from camcops_server.cc_modules.cc_all_models import CLIENT_TABLE_MAP
177from camcops_server.cc_modules.cc_client_api_core import (
178 BatchDetails,
179 get_server_live_records,
180 UploadTableChanges,
181 values_preserve_now,
182)
183from camcops_server.cc_modules.cc_client_api_helpers import (
184 upload_commit_order_sorter,
185)
186from camcops_server.cc_modules.cc_constants import (
187 CAMCOPS_URL,
188 DateFormat,
189 ERA_NOW,
190 GITHUB_RELEASES_URL,
191 MINIMUM_PASSWORD_LENGTH,
192)
193from camcops_server.cc_modules.cc_db import (
194 GenericTabletRecordMixin,
195 FN_DEVICE_ID,
196 FN_ERA,
197 FN_GROUP_ID,
198 FN_PK,
199)
200from camcops_server.cc_modules.cc_device import Device
201from camcops_server.cc_modules.cc_email import Email
202from camcops_server.cc_modules.cc_export import (
203 DownloadOptions,
204 make_exporter,
205 UserDownloadFile,
206)
207from camcops_server.cc_modules.cc_exportmodels import (
208 ExportedTask,
209 ExportedTaskEmail,
210 ExportedTaskFileGroup,
211 ExportedTaskHL7Message,
212)
213from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
214from camcops_server.cc_modules.cc_forms import (
215 AddGroupForm,
216 AddIdDefinitionForm,
217 AddSpecialNoteForm,
218 AddUserGroupadminForm,
219 AddUserSuperuserForm,
220 AuditTrailForm,
221 ChangeOtherPasswordForm,
222 ChangeOwnPasswordForm,
223 ChooseTrackerForm,
224 DEFAULT_ROWS_PER_PAGE,
225 DeleteGroupForm,
226 DeleteIdDefinitionForm,
227 DeletePatientChooseForm,
228 DeletePatientConfirmForm,
229 DeleteServerCreatedPatientForm,
230 DeleteSpecialNoteForm,
231 DeleteTaskScheduleForm,
232 DeleteTaskScheduleItemForm,
233 DeleteUserForm,
234 EditGroupForm,
235 EDIT_PATIENT_SIMPLE_PARAMS,
236 EditFinalizedPatientForm,
237 EditIdDefinitionForm,
238 EditServerCreatedPatientForm,
239 EditServerSettingsForm,
240 EditTaskScheduleForm,
241 EditTaskScheduleItemForm,
242 EditUserFullForm,
243 EditUserGroupAdminForm,
244 EditUserGroupMembershipGroupAdminForm,
245 EditUserGroupPermissionsFullForm,
246 EraseTaskForm,
247 ExportedTaskListForm,
248 get_sql_dialect_choices,
249 ForciblyFinalizeChooseDeviceForm,
250 ForciblyFinalizeConfirmForm,
251 LoginForm,
252 OfferBasicDumpForm,
253 OfferSqlDumpForm,
254 OfferTermsForm,
255 RefreshTasksForm,
256 SetUserUploadGroupForm,
257 EditTaskFilterForm,
258 TasksPerPageForm,
259 UserDownloadDeleteForm,
260 UserFilterForm,
261 ViewDdlForm,
262)
263from camcops_server.cc_modules.cc_group import Group
264from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition
265from camcops_server.cc_modules.cc_membership import UserGroupMembership
266from camcops_server.cc_modules.cc_patient import Patient
267from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
268# noinspection PyUnresolvedReferences
269import camcops_server.cc_modules.cc_plot # import side effects (configure matplotlib) # noqa
270from camcops_server.cc_modules.cc_pyramid import (
271 CamcopsPage,
272 FormAction,
273 HTTPFoundDebugVersion,
274 PageUrl,
275 Permission,
276 Routes,
277 SqlalchemyOrmPage,
278 ViewArg,
279 ViewParam,
280)
281from camcops_server.cc_modules.cc_report import get_report_instance
282from camcops_server.cc_modules.cc_request import CamcopsRequest
283from camcops_server.cc_modules.cc_simpleobjects import (
284 IdNumReference,
285 TaskExportOptions,
286)
287from camcops_server.cc_modules.cc_specialnote import SpecialNote
288from camcops_server.cc_modules.cc_session import CamcopsSession
289from camcops_server.cc_modules.cc_sqlalchemy import get_all_ddl
290from camcops_server.cc_modules.cc_task import Task
291from camcops_server.cc_modules.cc_taskcollection import (
292 TaskFilter,
293 TaskCollection,
294 TaskSortMethod,
295)
296from camcops_server.cc_modules.cc_taskfactory import task_factory
297from camcops_server.cc_modules.cc_taskfilter import (
298 task_classes_from_table_names,
299 TaskClassSortMethod,
300)
301from camcops_server.cc_modules.cc_taskindex import (
302 PatientIdNumIndexEntry,
303 TaskIndexEntry,
304 update_indexes_and_push_exports
305)
306from camcops_server.cc_modules.cc_taskschedule import (
307 PatientTaskSchedule,
308 TaskSchedule,
309 TaskScheduleItem,
310 task_schedule_item_sort_order,
311)
312from camcops_server.cc_modules.cc_text import SS
313from camcops_server.cc_modules.cc_tracker import ClinicalTextView, Tracker
314from camcops_server.cc_modules.cc_user import (
315 SecurityAccountLockout,
316 SecurityLoginFailure,
317 User,
318)
319from camcops_server.cc_modules.cc_validators import (
320 validate_export_recipient_name,
321 validate_ip_address,
322 validate_task_tablename,
323 validate_username,
324)
325from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION
326from camcops_server.cc_modules.cc_view_classes import (
327 CreateView,
328 DeleteView,
329 UpdateView,
330)
332if TYPE_CHECKING:
333 # noinspection PyUnresolvedReferences
334 from camcops_server.cc_modules.cc_sqlalchemy import Base
336log = BraceStyleAdapter(logging.getLogger(__name__))
339# =============================================================================
340# Debugging options
341# =============================================================================
343DEBUG_REDIRECT = False
345if DEBUG_REDIRECT:
346 log.warning("Debugging options enabled!")
348if DEBUG_REDIRECT:
349 HTTPFound = HTTPFoundDebugVersion # noqa: F811
352# =============================================================================
353# Flash message queues: https://getbootstrap.com/docs/3.3/components/#alerts
354# =============================================================================
356FLASH_SUCCESS = "success"
357FLASH_INFO = "info"
358FLASH_WARNING = "warning"
359FLASH_DANGER = "danger"
362# =============================================================================
363# Cache control, for the http_cache parameter of view_config etc.
364# =============================================================================
366NEVER_CACHE = 0
369# =============================================================================
370# Constants -- mutated into translated phrases
371# =============================================================================
373def errormsg_cannot_dump(req: "CamcopsRequest") -> str:
374 _ = req.gettext
375 return _("User not authorized to dump data (for any group).")
378def errormsg_cannot_report(req: "CamcopsRequest") -> str:
379 _ = req.gettext
380 return _("User not authorized to run reports (for any group).")
383def errormsg_task_live(req: "CamcopsRequest") -> str:
384 _ = req.gettext
385 return _("Task is live on tablet; finalize (or force-finalize) first.")
388# =============================================================================
389# Unused
390# =============================================================================
392# def query_result_html_core(req: "CamcopsRequest",
393# descriptions: Sequence[str],
394# rows: Sequence[Sequence[Any]],
395# null_html: str = "<i>NULL</i>") -> str:
396# return render("query_result_core.mako",
397# dict(descriptions=descriptions,
398# rows=rows,
399# null_html=null_html),
400# request=req)
403# def query_result_html_orm(req: "CamcopsRequest",
404# attrnames: List[str],
405# descriptions: List[str],
406# orm_objects: Sequence[Sequence[Any]],
407# null_html: str = "<i>NULL</i>") -> str:
408# return render("query_result_orm.mako",
409# dict(attrnames=attrnames,
410# descriptions=descriptions,
411# orm_objects=orm_objects,
412# null_html=null_html),
413# request=req)
416# =============================================================================
417# Error views
418# =============================================================================
420# noinspection PyUnusedLocal
421@notfound_view_config(renderer="not_found.mako",
422 http_cache=NEVER_CACHE)
423def not_found(req: "CamcopsRequest") -> Dict[str, Any]:
424 """
425 "Page not found" view.
426 """
427 return {
428 "msg": "",
429 "extra_html": "",
430 }
433# noinspection PyUnusedLocal
434@view_config(context=HTTPBadRequest,
435 renderer="bad_request.mako",
436 http_cache=NEVER_CACHE)
437def bad_request(req: "CamcopsRequest") -> Dict[str, Any]:
438 """
439 "Bad request" view.
441 NOTE that this view only gets used from
443 .. code-block:: python
445 raise HTTPBadRequest("message")
447 and not
449 .. code-block:: python
451 return HTTPBadRequest("message")
453 ... so always raise it.
454 """
455 return {
456 "msg": "",
457 "extra_html": "",
458 }
461# =============================================================================
462# Test pages
463# =============================================================================
465# noinspection PyUnusedLocal
466@view_config(route_name=Routes.TESTPAGE_PUBLIC_1,
467 permission=NO_PERMISSION_REQUIRED,
468 http_cache=NEVER_CACHE)
469def test_page_1(req: "CamcopsRequest") -> Response:
470 """
471 A public test page with no content.
472 """
473 _ = req.gettext
474 return Response(_("Hello! This is a public CamCOPS test page."))
477# noinspection PyUnusedLocal
478@view_config(route_name=Routes.TESTPAGE_PRIVATE_1,
479 http_cache=NEVER_CACHE)
480def test_page_private_1(req: "CamcopsRequest") -> Response:
481 """
482 A private test page with no informative content, but which should only
483 be accessible to authenticated users.
484 """
485 _ = req.gettext
486 return Response(_("Private test page."))
489# noinspection PyUnusedLocal
490@view_config(route_name=Routes.TESTPAGE_PRIVATE_2,
491 permission=Permission.SUPERUSER,
492 renderer="testpage.mako",
493 http_cache=NEVER_CACHE)
494def test_page_2(req: "CamcopsRequest") -> Dict[str, Any]:
495 """
496 A private test page containing POTENTIALLY SENSITIVE test information,
497 including environment variables, that should only be accessible to
498 superusers.
499 """
500 return dict(param1="world")
503# noinspection PyUnusedLocal
504@view_config(route_name=Routes.TESTPAGE_PRIVATE_3,
505 permission=Permission.SUPERUSER,
506 renderer="inherit_cache_test_child.mako",
507 http_cache=NEVER_CACHE)
508def test_page_3(req: "CamcopsRequest") -> Dict[str, Any]:
509 """
510 A private test page that tests template inheritance.
511 """
512 return {}
515# noinspection PyUnusedLocal
516@view_config(route_name=Routes.TESTPAGE_PRIVATE_4,
517 permission=Permission.SUPERUSER,
518 renderer="test_template_filters.mako",
519 http_cache=NEVER_CACHE)
520def test_page_4(req: "CamcopsRequest") -> Dict[str, Any]:
521 """
522 A private test page that tests Mako filtering.
523 """
524 return dict(
525 test_strings=[
526 "plain",
527 "normal <b>bold</b> normal",
528 ],
529 )
532# noinspection PyUnusedLocal,PyTypeChecker
533@view_config(route_name=Routes.CRASH,
534 permission=Permission.SUPERUSER,
535 http_cache=NEVER_CACHE)
536def crash(req: "CamcopsRequest") -> Response:
537 """
538 A view that deliberately raises an exception.
539 """
540 _ = req.gettext
541 raise RuntimeError(_(
542 "Deliberately crashed. Should not affect other processes."))
545# noinspection PyUnusedLocal
546@view_config(route_name=Routes.DEVELOPER,
547 permission=Permission.SUPERUSER,
548 renderer="developer.mako",
549 http_cache=NEVER_CACHE)
550def developer_page(req: "CamcopsRequest") -> Dict[str, Any]:
551 """
552 Shows the developer menu.
553 """
554 return {}
557# noinspection PyUnusedLocal
558@view_config(route_name=Routes.AUDIT_MENU,
559 permission=Permission.SUPERUSER,
560 renderer="audit_menu.mako",
561 http_cache=NEVER_CACHE)
562def audit_menu(req: "CamcopsRequest") -> Dict[str, Any]:
563 """
564 Shows the auditing menu.
565 """
566 return {}
569# =============================================================================
570# Authorization: login, logout, login failures, terms/conditions
571# =============================================================================
573# Do NOT use extra parameters for functions decorated with @view_config;
574# @view_config can take functions like "def view(request)" but also
575# "def view(context, request)", so if you add additional parameters, it thinks
576# you're doing the latter and sends parameters accordingly.
578@view_config(route_name=Routes.LOGIN,
579 permission=NO_PERMISSION_REQUIRED,
580 http_cache=NEVER_CACHE)
581def login_view(req: "CamcopsRequest") -> Response:
582 """
583 Login view.
585 - GET: presents the login screen
586 - POST/submit: attempts to log in;
588 - failure: returns a login failure view or an account lockout view
589 - success:
591 - redirects to the redirection view if one was specified;
592 - redirects to the home view if not.
593 """
594 cfg = req.config
595 autocomplete_password = not cfg.disable_password_autocomplete
597 form = LoginForm(request=req, autocomplete_password=autocomplete_password)
599 if FormAction.SUBMIT in req.POST:
600 try:
601 controls = list(req.POST.items())
602 appstruct = form.validate(controls)
603 log.debug("Validating user login.")
604 ccsession = req.camcops_session
605 username = appstruct.get(ViewParam.USERNAME)
606 password = appstruct.get(ViewParam.PASSWORD)
607 redirect_url = appstruct.get(ViewParam.REDIRECT_URL)
608 # 1. If we don't have a username, let's stop quickly.
609 if not username:
610 ccsession.logout()
611 return login_failed(req)
612 # 2. Is the user locked?
613 locked_out_until = SecurityAccountLockout.user_locked_out_until(
614 req, username)
615 if locked_out_until is not None:
616 return account_locked(req, locked_out_until)
617 # 3. Is the username/password combination correct?
618 user = User.get_user_from_username_password(
619 req, username, password) # checks password
620 if user is not None and user.may_use_webviewer:
621 # Successful login.
622 user.login(req) # will clear login failure record
623 ccsession.login(user)
624 audit(req, "Login", user_id=user.id)
625 elif user is not None:
626 # This means a user who can upload from tablet but who cannot
627 # log in via the web front end.
628 return login_failed(req)
629 else:
630 # Unsuccessful. Note that the username may/may not be genuine.
631 SecurityLoginFailure.act_on_login_failure(req, username)
632 # ... may lock the account
633 # Now, call audit() before session.logout(), as the latter
634 # will wipe the session IP address:
635 ccsession.logout()
636 return login_failed(req)
638 # OK, logged in.
639 # Redirect to the main menu, or wherever the user was heading.
640 # HOWEVER, that may lead us to a "change password" or "agree terms"
641 # page, via the permissions system (Permission.HAPPY or not).
643 if redirect_url:
644 # log.debug("Redirecting to {!r}", redirect_url)
645 return HTTPFound(redirect_url) # redirect
646 return HTTPFound(req.route_url(Routes.HOME)) # redirect
648 except ValidationFailure as e:
649 rendered_form = e.render()
651 else:
652 redirect_url = req.get_redirect_url_param(ViewParam.REDIRECT_URL, "")
653 # ... use default of "", because None gets serialized to "None", which
654 # would then get read back later as "None".
655 appstruct = {ViewParam.REDIRECT_URL: redirect_url}
656 # log.debug("appstruct from GET/POST: {!r}", appstruct)
657 rendered_form = form.render(appstruct)
659 return render_to_response(
660 "login.mako",
661 dict(form=rendered_form,
662 head_form_html=get_head_form_html(req, [form])),
663 request=req
664 )
667def login_failed(req: "CamcopsRequest") -> Response:
668 """
669 Response given after login failure.
670 Returned by :func:`login_view` only.
671 """
672 return render_to_response(
673 "login_failed.mako",
674 dict(),
675 request=req
676 )
679def account_locked(req: "CamcopsRequest", locked_until: Pendulum) -> Response:
680 """
681 Response given when account locked out.
682 Returned by :func:`login_view` only.
683 """
684 _ = req.gettext
685 return render_to_response(
686 "account_locked.mako",
687 dict(
688 locked_until=format_datetime(locked_until,
689 DateFormat.LONG_DATETIME_WITH_DAY,
690 _("(never)")),
691 msg="",
692 extra_html="",
693 ),
694 request=req
695 )
698@view_config(route_name=Routes.LOGOUT,
699 renderer="logged_out.mako",
700 http_cache=NEVER_CACHE)
701def logout(req: "CamcopsRequest") -> Dict[str, Any]:
702 """
703 Logs a session out, and returns the "logged out" view.
704 """
705 audit(req, "Logout")
706 ccsession = req.camcops_session
707 ccsession.logout()
708 return dict()
711@view_config(route_name=Routes.OFFER_TERMS,
712 permission=Authenticated,
713 renderer="offer_terms.mako",
714 http_cache=NEVER_CACHE)
715def offer_terms(req: "CamcopsRequest") -> Response:
716 """
717 - GET: show terms/conditions and request acknowledgement
718 - POST/submit: note the user's agreement; redirect to the home view.
719 """
720 form = OfferTermsForm(
721 request=req,
722 agree_button_text=req.wsstring(SS.DISCLAIMER_AGREE))
724 if FormAction.SUBMIT in req.POST:
725 req.user.agree_terms(req)
726 return HTTPFound(req.route_url(Routes.HOME)) # redirect
728 return render_to_response(
729 "offer_terms.mako",
730 dict(
731 title=req.wsstring(SS.DISCLAIMER_TITLE),
732 subtitle=req.wsstring(SS.DISCLAIMER_SUBTITLE),
733 content=req.wsstring(SS.DISCLAIMER_CONTENT),
734 form=form.render(),
735 head_form_html=get_head_form_html(req, [form]),
736 ),
737 request=req
738 )
741@forbidden_view_config(http_cache=NEVER_CACHE)
742def forbidden(req: "CamcopsRequest") -> Response:
743 """
744 Generic place that Pyramid comes when permission is denied for a view.
746 We will offer one of these:
748 - Must change password? Redirect to "change own password" view.
749 - Must agree terms? Redirect to "offer terms" view.
750 - Otherwise: a generic "forbidden" view.
751 """
752 # I was doing this:
753 if req.has_permission(Authenticated):
754 user = req.user
755 assert user, "Bug! Authenticated but no user...!?"
756 if user.must_change_password:
757 return HTTPFound(req.route_url(Routes.CHANGE_OWN_PASSWORD))
758 if user.must_agree_terms:
759 return HTTPFound(req.route_url(Routes.OFFER_TERMS))
760 # ... but with "raise HTTPFound" instead.
761 # BUT there is only one level of exception handling in Pyramid, i.e. you
762 # can't raise exceptions from exceptions:
763 # https://github.com/Pylons/pyramid/issues/436
764 # The simplest way round is to use "return", not "raise".
766 redirect_url = req.url
767 # Redirects to login page, with onwards redirection to requested
768 # destination once logged in:
769 querydict = {ViewParam.REDIRECT_URL: redirect_url}
770 return render_to_response("forbidden.mako",
771 dict(querydict=querydict),
772 request=req)
775# =============================================================================
776# Changing passwords
777# =============================================================================
779@view_config(route_name=Routes.CHANGE_OWN_PASSWORD,
780 permission=Authenticated,
781 http_cache=NEVER_CACHE)
782def change_own_password(req: "CamcopsRequest") -> Response:
783 """
784 For any user: to change their own password.
786 - GET: offer "change own password" view
787 - POST/submit: change the password and return :func:`password_changed`.
788 """
789 user = req.user
790 assert user is not None
791 expired = user.must_change_password
792 form = ChangeOwnPasswordForm(request=req, must_differ=True)
793 if FormAction.SUBMIT in req.POST:
794 try:
795 controls = list(req.POST.items())
796 appstruct = form.validate(controls)
797 # -----------------------------------------------------------------
798 # Change the password
799 # -----------------------------------------------------------------
800 new_password = appstruct.get(ViewParam.NEW_PASSWORD)
801 # ... form will validate old password, etc.
802 # OK
803 user.set_password(req, new_password)
804 return password_changed(req, user.username, own_password=True)
805 except ValidationFailure as e:
806 rendered_form = e.render()
807 else:
808 rendered_form = form.render()
809 return render_to_response(
810 "change_own_password.mako",
811 dict(form=rendered_form,
812 expired=expired,
813 min_pw_length=MINIMUM_PASSWORD_LENGTH,
814 head_form_html=get_head_form_html(req, [form])),
815 request=req)
818@view_config(route_name=Routes.CHANGE_OTHER_PASSWORD,
819 permission=Permission.GROUPADMIN,
820 renderer="change_other_password.mako",
821 http_cache=NEVER_CACHE)
822def change_other_password(req: "CamcopsRequest") -> Response:
823 """
824 For administrators, to change another's password.
826 - GET: offer "change another's password" view (except that if you're
827 changing your own password, return :func:`change_own_password`.
828 - POST/submit: change the password and return :func:`password_changed`.
829 """
830 form = ChangeOtherPasswordForm(request=req)
831 username = None # for type checker
832 _ = req.gettext
833 if FormAction.SUBMIT in req.POST:
834 try:
835 controls = list(req.POST.items())
836 appstruct = form.validate(controls)
837 # -----------------------------------------------------------------
838 # Change the password
839 # -----------------------------------------------------------------
840 user_id = appstruct.get(ViewParam.USER_ID)
841 must_change_pw = appstruct.get(ViewParam.MUST_CHANGE_PASSWORD)
842 new_password = appstruct.get(ViewParam.NEW_PASSWORD)
843 user = User.get_user_by_id(req.dbsession, user_id)
844 if not user:
845 raise HTTPBadRequest(f"{_('Missing user for id')} {user_id}")
846 assert_may_edit_user(req, user)
847 user.set_password(req, new_password)
848 if must_change_pw:
849 user.force_password_change()
850 return password_changed(req, user.username, own_password=False)
851 except ValidationFailure as e:
852 rendered_form = e.render()
853 else:
854 user_id = req.get_int_param(ViewParam.USER_ID)
855 if user_id is None:
856 raise HTTPBadRequest(f"{_('Improper user_id of')} {user_id!r}")
857 if user_id == req.user_id:
858 raise HTTPFound(req.route_url(Routes.CHANGE_OWN_PASSWORD))
859 user = User.get_user_by_id(req.dbsession, user_id)
860 if user is None:
861 raise HTTPBadRequest(f"{_('Missing user for id')} {user_id}")
862 assert_may_edit_user(req, user)
863 username = user.username
864 appstruct = {ViewParam.USER_ID: user_id}
865 rendered_form = form.render(appstruct)
866 return render_to_response(
867 "change_other_password.mako",
868 dict(username=username,
869 form=rendered_form,
870 min_pw_length=MINIMUM_PASSWORD_LENGTH,
871 head_form_html=get_head_form_html(req, [form])),
872 request=req)
875def password_changed(req: "CamcopsRequest",
876 username: str,
877 own_password: bool) -> Response:
878 """
879 Generic "the password has been changed" view (whether changing your own
880 or another's password).
882 Args:
883 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
884 username: the username whose password is being changed?
885 own_password: is the user changing their own password?
886 """
887 return render_to_response("password_changed.mako",
888 dict(username=username,
889 own_password=own_password),
890 request=req)
893# =============================================================================
894# Main menu; simple information things
895# =============================================================================
897@view_config(route_name=Routes.HOME,
898 renderer="main_menu.mako",
899 http_cache=NEVER_CACHE)
900def main_menu(req: "CamcopsRequest") -> Dict[str, Any]:
901 """
902 Main CamCOPS menu view.
903 """
904 # log.debug("main_menu: start")
905 user = req.user
906 # log.debug("main_menu: middle")
907 result = dict(
908 authorized_as_groupadmin=user.authorized_as_groupadmin,
909 authorized_as_superuser=user.superuser,
910 authorized_for_reports=user.authorized_for_reports,
911 authorized_to_dump=user.authorized_to_dump,
912 camcops_url=CAMCOPS_URL,
913 now=format_datetime(req.now, DateFormat.SHORT_DATETIME_SECONDS),
914 server_version=CAMCOPS_SERVER_VERSION,
915 )
916 # log.debug("main_menu: returning")
917 return result
920# =============================================================================
921# Tasks
922# =============================================================================
924def edit_filter(req: "CamcopsRequest",
925 task_filter: TaskFilter,
926 redirect_url: str) -> Response:
927 """
928 Edit the task filter for the current user.
930 Args:
931 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
932 task_filter: the user's
933 :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter`
934 redirect_url: URL to redirect (back) to upon success
935 """
936 if FormAction.SET_FILTERS in req.POST:
937 form = EditTaskFilterForm(request=req)
938 try:
939 controls = list(req.POST.items())
940 fa = form.validate(controls)
941 # -----------------------------------------------------------------
942 # Apply the changes
943 # -----------------------------------------------------------------
944 who = fa.get(ViewParam.WHO)
945 what = fa.get(ViewParam.WHAT)
946 when = fa.get(ViewParam.WHEN)
947 admin = fa.get(ViewParam.ADMIN)
948 task_filter.surname = who.get(ViewParam.SURNAME)
949 task_filter.forename = who.get(ViewParam.FORENAME)
950 task_filter.dob = who.get(ViewParam.DOB)
951 task_filter.sex = who.get(ViewParam.SEX)
952 task_filter.idnum_criteria = [
953 IdNumReference(which_idnum=x[ViewParam.WHICH_IDNUM],
954 idnum_value=x[ViewParam.IDNUM_VALUE])
955 for x in who.get(ViewParam.ID_REFERENCES)
956 ]
957 task_filter.task_types = what.get(ViewParam.TASKS)
958 task_filter.text_contents = what.get(ViewParam.TEXT_CONTENTS)
959 task_filter.complete_only = what.get(ViewParam.COMPLETE_ONLY)
960 task_filter.start_datetime = when.get(ViewParam.START_DATETIME)
961 task_filter.end_datetime = when.get(ViewParam.END_DATETIME)
962 task_filter.device_ids = admin.get(ViewParam.DEVICE_IDS)
963 task_filter.adding_user_ids = admin.get(ViewParam.USER_IDS)
964 task_filter.group_ids = admin.get(ViewParam.GROUP_IDS)
966 return HTTPFound(redirect_url)
967 except ValidationFailure as e:
968 rendered_form = e.render()
969 else:
970 if FormAction.CLEAR_FILTERS in req.POST:
971 # skip validation
972 task_filter.clear()
973 who = {
974 ViewParam.SURNAME: task_filter.surname,
975 ViewParam.FORENAME: task_filter.forename,
976 ViewParam.DOB: task_filter.dob,
977 ViewParam.SEX: task_filter.sex or "",
978 ViewParam.ID_REFERENCES: [
979 {ViewParam.WHICH_IDNUM: x.which_idnum,
980 ViewParam.IDNUM_VALUE: x.idnum_value}
981 for x in task_filter.idnum_criteria
982 ],
983 }
984 what = {
985 ViewParam.TASKS: task_filter.task_types,
986 ViewParam.TEXT_CONTENTS: task_filter.text_contents,
987 ViewParam.COMPLETE_ONLY: task_filter.complete_only,
988 }
989 when = {
990 ViewParam.START_DATETIME: task_filter.start_datetime,
991 ViewParam.END_DATETIME: task_filter.end_datetime,
992 }
993 admin = {
994 ViewParam.DEVICE_IDS: task_filter.device_ids,
995 ViewParam.USER_IDS: task_filter.adding_user_ids,
996 ViewParam.GROUP_IDS: task_filter.group_ids,
997 }
998 open_who = any(i for i in who.values())
999 open_what = any(i for i in what.values())
1000 open_when = any(i for i in when.values())
1001 open_admin = any(i for i in admin.values())
1002 fa = {
1003 ViewParam.WHO: who,
1004 ViewParam.WHAT: what,
1005 ViewParam.WHEN: when,
1006 ViewParam.ADMIN: admin,
1007 }
1008 form = EditTaskFilterForm(request=req,
1009 open_admin=open_admin,
1010 open_what=open_what,
1011 open_when=open_when,
1012 open_who=open_who)
1013 rendered_form = form.render(fa)
1015 return render_to_response(
1016 "filter_edit.mako",
1017 dict(
1018 form=rendered_form,
1019 head_form_html=get_head_form_html(req, [form])
1020 ),
1021 request=req
1022 )
1025@view_config(route_name=Routes.SET_FILTERS,
1026 http_cache=NEVER_CACHE)
1027def set_filters(req: "CamcopsRequest") -> Response:
1028 """
1029 View to set the task filters for the current user.
1030 """
1031 redirect_url = req.get_redirect_url_param(ViewParam.REDIRECT_URL,
1032 req.route_url(Routes.VIEW_TASKS))
1033 task_filter = req.camcops_session.get_task_filter()
1034 return edit_filter(req, task_filter=task_filter, redirect_url=redirect_url)
1037@view_config(route_name=Routes.VIEW_TASKS,
1038 renderer="view_tasks.mako",
1039 http_cache=NEVER_CACHE)
1040def view_tasks(req: "CamcopsRequest") -> Dict[str, Any]:
1041 """
1042 Main view displaying tasks and applicable filters.
1043 """
1044 ccsession = req.camcops_session
1045 user = req.user
1046 taskfilter = ccsession.get_task_filter()
1048 # Read from the GET parameters (or in some cases potentially POST but those
1049 # will be re-read).
1050 rows_per_page = req.get_int_param(
1051 ViewParam.ROWS_PER_PAGE,
1052 ccsession.number_to_view or DEFAULT_ROWS_PER_PAGE)
1053 page_num = req.get_int_param(ViewParam.PAGE, 1)
1054 via_index = req.get_bool_param(ViewParam.VIA_INDEX, True)
1056 errors = False
1058 # "Number of tasks per page" form
1059 tpp_form = TasksPerPageForm(request=req)
1060 if FormAction.SUBMIT_TASKS_PER_PAGE in req.POST:
1061 try:
1062 controls = list(req.POST.items())
1063 tpp_appstruct = tpp_form.validate(controls)
1064 rows_per_page = tpp_appstruct.get(ViewParam.ROWS_PER_PAGE)
1065 ccsession.number_to_view = rows_per_page
1066 except ValidationFailure:
1067 errors = True
1068 rendered_tpp_form = tpp_form.render()
1069 else:
1070 tpp_appstruct = {ViewParam.ROWS_PER_PAGE: rows_per_page}
1071 rendered_tpp_form = tpp_form.render(tpp_appstruct)
1073 # Refresh tasks. Slightly pointless. Doesn't need validating. The user
1074 # could just press the browser's refresh button, but this improves the UI
1075 # slightly.
1076 refresh_form = RefreshTasksForm(request=req)
1077 rendered_refresh_form = refresh_form.render()
1079 # Get tasks, unless there have been form errors.
1080 # In principle, for some filter settings (single task, no "complete"
1081 # preference...) we could produce an ORM query and use SqlalchemyOrmPage,
1082 # which would apply LIMIT/OFFSET (or equivalent) to the query, and be
1083 # very nippy. In practice, this is probably an unusual setting, so we'll
1084 # simplify things here with a Python list regardless of the settings.
1085 if errors:
1086 collection = []
1087 else:
1088 collection = TaskCollection(
1089 req=req,
1090 taskfilter=taskfilter,
1091 sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
1092 via_index=via_index
1093 ).all_tasks_or_indexes_or_query or []
1094 paginator = SqlalchemyOrmPage if isinstance(collection, Query) else CamcopsPage # noqa
1095 page = paginator(collection,
1096 page=page_num,
1097 items_per_page=rows_per_page,
1098 url_maker=PageUrl(req),
1099 request=req)
1100 return dict(
1101 page=page,
1102 head_form_html=get_head_form_html(req, [tpp_form,
1103 refresh_form]),
1104 tpp_form=rendered_tpp_form,
1105 refresh_form=rendered_refresh_form,
1106 no_patient_selected_and_user_restricted=(
1107 not user.may_view_all_patients_when_unfiltered and
1108 not taskfilter.any_specific_patient_filtering()
1109 ),
1110 user=user,
1111 )
1114@view_config(route_name=Routes.TASK,
1115 http_cache=NEVER_CACHE)
1116def serve_task(req: "CamcopsRequest") -> Response:
1117 """
1118 View that serves an individual task, in a variety of possible formats
1119 (e.g. HTML, PDF, XML).
1120 """
1121 _ = req.gettext
1122 viewtype = req.get_str_param(ViewParam.VIEWTYPE, ViewArg.HTML, lower=True)
1123 tablename = req.get_str_param(ViewParam.TABLE_NAME,
1124 validator=validate_task_tablename)
1125 server_pk = req.get_int_param(ViewParam.SERVER_PK)
1126 anonymise = req.get_bool_param(ViewParam.ANONYMISE, False)
1128 task = task_factory(req, tablename, server_pk)
1130 if task is None:
1131 return HTTPNotFound(
1132 f"{_('Task not found or not permitted:')} "
1133 f"tablename={tablename!r}, server_pk={server_pk!r}")
1135 task.audit(req, "Viewed " + viewtype.upper())
1137 if viewtype == ViewArg.HTML:
1138 return Response(
1139 task.get_html(req=req, anonymise=anonymise)
1140 )
1141 elif viewtype == ViewArg.PDF:
1142 return PdfResponse(
1143 body=task.get_pdf(req, anonymise=anonymise),
1144 filename=task.suggested_pdf_filename(req, anonymise=anonymise)
1145 )
1146 elif viewtype == ViewArg.PDFHTML: # debugging option
1147 return Response(
1148 task.get_pdf_html(req, anonymise=anonymise)
1149 )
1150 elif viewtype == ViewArg.XML:
1151 options = TaskExportOptions(
1152 xml_include_ancillary=True,
1153 include_blobs=req.get_bool_param(ViewParam.INCLUDE_BLOBS, True),
1154 xml_include_comments=req.get_bool_param(
1155 ViewParam.INCLUDE_COMMENTS, True),
1156 xml_include_calculated=req.get_bool_param(
1157 ViewParam.INCLUDE_CALCULATED, True),
1158 xml_include_patient=req.get_bool_param(
1159 ViewParam.INCLUDE_PATIENT, True),
1160 xml_include_plain_columns=True,
1161 xml_include_snomed=req.get_bool_param(
1162 ViewParam.INCLUDE_SNOMED, True),
1163 xml_with_header_comments=True,
1164 )
1165 return XmlResponse(task.get_xml(req=req, options=options))
1166 else:
1167 permissible = [ViewArg.HTML, ViewArg.PDF, ViewArg.PDFHTML, ViewArg.XML]
1168 raise HTTPBadRequest(
1169 f"{_('Bad output type:')} {viewtype!r} "
1170 f"({_('permissible:')} {permissible!r})")
1173# =============================================================================
1174# Trackers, CTVs
1175# =============================================================================
1177def choose_tracker_or_ctv(req: "CamcopsRequest",
1178 as_ctv: bool) -> Dict[str, Any]:
1179 """
1180 Returns a dictionary for a Mako template to configure a
1181 :class:`camcops_server.cc_modules.cc_tracker.Tracker` or
1182 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`.
1184 Upon success, it redirects to the tracker or CTV view itself, with the
1185 tracker's parameters embedded as URL parameters.
1187 Args:
1188 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
1189 as_ctv: CTV, rather than tracker?
1190 """
1192 form = ChooseTrackerForm(req, as_ctv=as_ctv) # , css_class="form-inline")
1194 if FormAction.SUBMIT in req.POST:
1195 try:
1196 controls = list(req.POST.items())
1197 appstruct = form.validate(controls)
1198 keys = [
1199 ViewParam.WHICH_IDNUM,
1200 ViewParam.IDNUM_VALUE,
1201 ViewParam.START_DATETIME,
1202 ViewParam.END_DATETIME,
1203 ViewParam.TASKS,
1204 ViewParam.ALL_TASKS,
1205 ViewParam.VIA_INDEX,
1206 ViewParam.VIEWTYPE,
1207 ]
1208 querydict = {k: appstruct.get(k) for k in keys}
1209 # Not so obvious this can be redirected cleanly via POST.
1210 # It is possible by returning a form that then autosubmits: see
1211 # https://stackoverflow.com/questions/46582/response-redirect-with-post-instead-of-get # noqa
1212 # However, since everything's on this server, we could just return
1213 # an appropriate Response directly. But the request information is
1214 # not sensitive, so we lose nothing by using a GET redirect:
1215 raise HTTPFound(req.route_url(
1216 Routes.CTV if as_ctv else Routes.TRACKER,
1217 _query=querydict))
1218 except ValidationFailure as e:
1219 rendered_form = e.render()
1220 else:
1221 rendered_form = form.render()
1222 return dict(form=rendered_form,
1223 head_form_html=get_head_form_html(req, [form]))
1226@view_config(route_name=Routes.CHOOSE_TRACKER,
1227 renderer="choose_tracker.mako",
1228 http_cache=NEVER_CACHE)
1229def choose_tracker(req: "CamcopsRequest") -> Dict[str, Any]:
1230 """
1231 View to choose/configure a
1232 :class:`camcops_server.cc_modules.cc_tracker.Tracker`.
1233 """
1234 return choose_tracker_or_ctv(req, as_ctv=False)
1237@view_config(route_name=Routes.CHOOSE_CTV,
1238 renderer="choose_ctv.mako",
1239 http_cache=NEVER_CACHE)
1240def choose_ctv(req: "CamcopsRequest") -> Dict[str, Any]:
1241 """
1242 View to choose/configure a
1243 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`.
1244 """
1245 return choose_tracker_or_ctv(req, as_ctv=True)
1248def serve_tracker_or_ctv(req: "CamcopsRequest",
1249 as_ctv: bool) -> Response:
1250 """
1251 Returns a response to show a
1252 :class:`camcops_server.cc_modules.cc_tracker.Tracker` or
1253 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`, in a
1254 variety of formats (e.g. HTML, PDF, XML).
1256 Args:
1257 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
1258 as_ctv: CTV, rather than tracker?
1259 """
1260 as_tracker = not as_ctv
1261 _ = req.gettext
1262 which_idnum = req.get_int_param(ViewParam.WHICH_IDNUM)
1263 idnum_value = req.get_int_param(ViewParam.IDNUM_VALUE)
1264 start_datetime = req.get_datetime_param(ViewParam.START_DATETIME)
1265 end_datetime = req.get_datetime_param(ViewParam.END_DATETIME)
1266 tasks = req.get_str_list_param(ViewParam.TASKS,
1267 validator=validate_task_tablename)
1268 all_tasks = req.get_bool_param(ViewParam.ALL_TASKS, True)
1269 viewtype = req.get_str_param(ViewParam.VIEWTYPE, ViewArg.HTML)
1270 via_index = req.get_bool_param(ViewParam.VIA_INDEX, True)
1272 if all_tasks:
1273 task_classes = [] # type: List[Type[Task]]
1274 else:
1275 try:
1276 task_classes = task_classes_from_table_names(
1277 tasks, sortmethod=TaskClassSortMethod.SHORTNAME)
1278 except KeyError:
1279 raise HTTPBadRequest(_("Invalid tasks specified"))
1280 if as_tracker and not all(c.provides_trackers for c in task_classes):
1281 raise HTTPBadRequest(_("Not all tasks specified provide trackers"))
1283 iddefs = [IdNumReference(which_idnum, idnum_value)]
1285 taskfilter = TaskFilter()
1286 taskfilter.task_types = [tc.__tablename__ for tc in task_classes] # a bit silly... # noqa
1287 taskfilter.idnum_criteria = iddefs
1288 taskfilter.start_datetime = start_datetime
1289 taskfilter.end_datetime = end_datetime
1290 taskfilter.complete_only = True # trackers require complete tasks
1291 taskfilter.set_sort_method(TaskClassSortMethod.SHORTNAME)
1292 taskfilter.tasks_offering_trackers_only = as_tracker
1293 taskfilter.tasks_with_patient_only = True
1295 tracker_ctv_class = ClinicalTextView if as_ctv else Tracker
1296 tracker = tracker_ctv_class(req=req, taskfilter=taskfilter,
1297 via_index=via_index)
1299 if viewtype == ViewArg.HTML:
1300 return Response(
1301 tracker.get_html()
1302 )
1303 elif viewtype == ViewArg.PDF:
1304 return PdfResponse(
1305 body=tracker.get_pdf(),
1306 filename=tracker.suggested_pdf_filename()
1307 )
1308 elif viewtype == ViewArg.PDFHTML: # debugging option
1309 return Response(
1310 tracker.get_pdf_html()
1311 )
1312 elif viewtype == ViewArg.XML:
1313 include_comments = req.get_bool_param(ViewParam.INCLUDE_COMMENTS, True)
1314 return XmlResponse(
1315 tracker.get_xml(include_comments=include_comments)
1316 )
1317 else:
1318 permissible = [ViewArg.HTML, ViewArg.PDF, ViewArg.PDFHTML, ViewArg.XML]
1319 raise HTTPBadRequest(
1320 f"{_('Invalid view type:')} {viewtype!r} "
1321 f"({_('permissible:')} {permissible!r})")
1324@view_config(route_name=Routes.TRACKER,
1325 http_cache=NEVER_CACHE)
1326def serve_tracker(req: "CamcopsRequest") -> Response:
1327 """
1328 View to serve a :class:`camcops_server.cc_modules.cc_tracker.Tracker`; see
1329 :func:`serve_tracker_or_ctv`.
1330 """
1331 return serve_tracker_or_ctv(req, as_ctv=False)
1334@view_config(route_name=Routes.CTV,
1335 http_cache=NEVER_CACHE)
1336def serve_ctv(req: "CamcopsRequest") -> Response:
1337 """
1338 View to serve a
1339 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`; see
1340 :func:`serve_tracker_or_ctv`.
1341 """
1342 return serve_tracker_or_ctv(req, as_ctv=True)
1345# =============================================================================
1346# Reports
1347# =============================================================================
1349@view_config(route_name=Routes.REPORTS_MENU,
1350 renderer="reports_menu.mako",
1351 http_cache=NEVER_CACHE)
1352def reports_menu(req: "CamcopsRequest") -> Dict[str, Any]:
1353 """
1354 Offer a menu of reports.
1356 Note: Reports are not group-specific.
1357 If you're authorized to see any, you'll see the whole menu.
1358 (The *data* you get will be restricted to the group's you're authorized
1359 to run reports for.)
1360 """
1361 if not req.user.authorized_for_reports:
1362 raise HTTPBadRequest(errormsg_cannot_report(req))
1363 return {}
1366@view_config(route_name=Routes.OFFER_REPORT,
1367 http_cache=NEVER_CACHE)
1368def offer_report(req: "CamcopsRequest") -> Response:
1369 """
1370 Offer configuration options for a single report, or (following submission)
1371 redirect to serve that report (with configuration parameters in the URL).
1372 """
1373 if not req.user.authorized_for_reports:
1374 raise HTTPBadRequest(errormsg_cannot_report(req))
1375 report_id = req.get_str_param(ViewParam.REPORT_ID)
1376 report = get_report_instance(report_id)
1377 _ = req.gettext
1378 if not report:
1379 raise HTTPBadRequest(f"{_('No such report ID:')} {report_id!r}")
1380 if report.superuser_only and not req.user.superuser:
1381 raise HTTPBadRequest(
1382 f"{_('Report is restricted to the superuser:')} {report_id!r}")
1383 form = report.get_form(req)
1384 if FormAction.SUBMIT in req.POST:
1385 try:
1386 controls = list(req.POST.items())
1387 appstruct = form.validate(controls) # may raise
1388 keys = report.get_http_query_keys()
1389 querydict = {k: appstruct.get(k) for k in keys}
1390 querydict[ViewParam.REPORT_ID] = report_id
1391 querydict[ViewParam.PAGE] = 1
1392 # Send the user to the actual data using GET: this allows page
1393 # navigation whilst maintaining any report-specific parameters.
1394 raise HTTPFound(req.route_url(Routes.REPORT, _query=querydict))
1395 except ValidationFailure as e:
1396 rendered_form = e.render()
1397 else:
1398 rendered_form = form.render({ViewParam.REPORT_ID: report_id})
1399 return render_to_response(
1400 "report_offer.mako",
1401 dict(
1402 report=report,
1403 form=rendered_form,
1404 head_form_html=get_head_form_html(req, [form])
1405 ),
1406 request=req
1407 )
1410@view_config(route_name=Routes.REPORT,
1411 http_cache=NEVER_CACHE)
1412def serve_report(req: "CamcopsRequest") -> Response:
1413 """
1414 Serve a configured report.
1415 """
1416 if not req.user.authorized_for_reports:
1417 raise HTTPBadRequest(errormsg_cannot_report(req))
1418 report_id = req.get_str_param(ViewParam.REPORT_ID)
1419 report = get_report_instance(report_id)
1420 _ = req.gettext
1421 if not report:
1422 raise HTTPBadRequest(f"{_('No such report ID:')} {report_id!r}")
1423 if report.superuser_only and not req.user.superuser:
1424 raise HTTPBadRequest(
1425 f"{_('Report is restricted to the superuser:')} {report_id!r}")
1427 return report.get_response(req)
1430# =============================================================================
1431# Research downloads
1432# =============================================================================
1434@view_config(route_name=Routes.OFFER_BASIC_DUMP,
1435 http_cache=NEVER_CACHE)
1436def offer_basic_dump(req: "CamcopsRequest") -> Response:
1437 """
1438 View to configure a basic research dump.
1439 Following submission success, it redirects to a view serving a TSV/ZIP
1440 dump.
1441 """
1442 if not req.user.authorized_to_dump:
1443 raise HTTPBadRequest(errormsg_cannot_dump(req))
1444 form = OfferBasicDumpForm(request=req)
1445 if FormAction.SUBMIT in req.POST:
1446 try:
1447 controls = list(req.POST.items())
1448 appstruct = form.validate(controls)
1449 manual = appstruct.get(ViewParam.MANUAL)
1450 querydict = {
1451 ViewParam.DUMP_METHOD: appstruct.get(ViewParam.DUMP_METHOD),
1452 ViewParam.SORT: appstruct.get(ViewParam.SORT),
1453 ViewParam.GROUP_IDS: manual.get(ViewParam.GROUP_IDS),
1454 ViewParam.TASKS: manual.get(ViewParam.TASKS),
1455 ViewParam.VIEWTYPE: appstruct.get(ViewParam.VIEWTYPE),
1456 ViewParam.DELIVERY_MODE: appstruct.get(ViewParam.DELIVERY_MODE),
1457 ViewParam.INCLUDE_INFORMATION_SCHEMA_COLUMNS: appstruct.get(
1458 ViewParam.INCLUDE_INFORMATION_SCHEMA_COLUMNS),
1459 }
1460 # We could return a response, or redirect via GET.
1461 # The request is not sensitive, so let's redirect.
1462 return HTTPFound(req.route_url(Routes.BASIC_DUMP,
1463 _query=querydict))
1464 except ValidationFailure as e:
1465 rendered_form = e.render()
1466 else:
1467 rendered_form = form.render()
1468 return render_to_response(
1469 "dump_basic_offer.mako",
1470 dict(form=rendered_form,
1471 head_form_html=get_head_form_html(req, [form])),
1472 request=req
1473 )
1476def get_dump_collection(req: "CamcopsRequest") -> TaskCollection:
1477 """
1478 Returns the collection of tasks being requested for a dump operation.
1479 Raises an error if the request is bad.
1480 """
1481 if not req.user.authorized_to_dump:
1482 raise HTTPBadRequest(errormsg_cannot_dump(req))
1483 # -------------------------------------------------------------------------
1484 # Get parameters
1485 # -------------------------------------------------------------------------
1486 dump_method = req.get_str_param(ViewParam.DUMP_METHOD)
1487 group_ids = req.get_int_list_param(ViewParam.GROUP_IDS)
1488 task_names = req.get_str_list_param(ViewParam.TASKS,
1489 validator=validate_task_tablename)
1491 # -------------------------------------------------------------------------
1492 # Select tasks
1493 # -------------------------------------------------------------------------
1494 if dump_method == ViewArg.EVERYTHING:
1495 taskfilter = TaskFilter()
1496 elif dump_method == ViewArg.USE_SESSION_FILTER:
1497 taskfilter = req.camcops_session.get_task_filter()
1498 elif dump_method == ViewArg.SPECIFIC_TASKS_GROUPS:
1499 taskfilter = TaskFilter()
1500 taskfilter.task_types = task_names
1501 taskfilter.group_ids = group_ids
1502 else:
1503 _ = req.gettext
1504 raise HTTPBadRequest(f"{_('Bad parameter:')} "
1505 f"{ViewParam.DUMP_METHOD}={dump_method!r}")
1506 return TaskCollection(
1507 req=req,
1508 taskfilter=taskfilter,
1509 as_dump=True,
1510 sort_method_by_class=TaskSortMethod.CREATION_DATE_ASC
1511 )
1514@view_config(route_name=Routes.BASIC_DUMP,
1515 http_cache=NEVER_CACHE)
1516def serve_basic_dump(req: "CamcopsRequest") -> Response:
1517 """
1518 View serving a TSV/ZIP basic research dump.
1519 """
1520 # Get view-specific parameters
1521 sort_by_heading = req.get_bool_param(ViewParam.SORT, False)
1522 viewtype = req.get_str_param(
1523 ViewParam.VIEWTYPE, ViewArg.XLSX, lower=True)
1524 delivery_mode = req.get_str_param(
1525 ViewParam.DELIVERY_MODE, ViewArg.EMAIL, lower=True)
1526 include_information_schema_columns = req.get_bool_param(
1527 ViewParam.INCLUDE_INFORMATION_SCHEMA_COLUMNS, False)
1529 # Get tasks (and perform checks)
1530 collection = get_dump_collection(req)
1531 # Create object that knows how to export
1532 exporter = make_exporter(
1533 req=req,
1534 collection=collection,
1535 options=DownloadOptions(
1536 user_id=req.user_id,
1537 viewtype=viewtype,
1538 delivery_mode=delivery_mode,
1539 spreadsheet_sort_by_heading=sort_by_heading,
1540 include_information_schema_columns=include_information_schema_columns # noqa
1541 )
1542 ) # may raise
1543 # Export, or schedule an email/download
1544 return exporter.immediate_response(req)
1547@view_config(route_name=Routes.OFFER_SQL_DUMP,
1548 http_cache=NEVER_CACHE)
1549def offer_sql_dump(req: "CamcopsRequest") -> Response:
1550 """
1551 View to configure a SQL research dump.
1552 Following submission success, it redirects to a view serving the SQL dump.
1553 """
1554 if not req.user.authorized_to_dump:
1555 raise HTTPBadRequest(errormsg_cannot_dump(req))
1556 form = OfferSqlDumpForm(request=req)
1557 if FormAction.SUBMIT in req.POST:
1558 try:
1559 controls = list(req.POST.items())
1560 appstruct = form.validate(controls)
1561 manual = appstruct.get(ViewParam.MANUAL)
1562 querydict = {
1563 ViewParam.DUMP_METHOD: appstruct.get(ViewParam.DUMP_METHOD),
1564 ViewParam.SQLITE_METHOD: appstruct.get(ViewParam.SQLITE_METHOD), # noqa
1565 ViewParam.INCLUDE_BLOBS: appstruct.get(ViewParam.INCLUDE_BLOBS), # noqa
1566 ViewParam.PATIENT_ID_PER_ROW: appstruct.get(ViewParam.PATIENT_ID_PER_ROW), # noqa
1567 ViewParam.GROUP_IDS: manual.get(ViewParam.GROUP_IDS),
1568 ViewParam.TASKS: manual.get(ViewParam.TASKS),
1569 ViewParam.DELIVERY_MODE: appstruct.get(ViewParam.DELIVERY_MODE),
1570 ViewParam.INCLUDE_INFORMATION_SCHEMA_COLUMNS: appstruct.get(
1571 ViewParam.INCLUDE_INFORMATION_SCHEMA_COLUMNS),
1572 }
1573 # We could return a response, or redirect via GET.
1574 # The request is not sensitive, so let's redirect.
1575 return HTTPFound(req.route_url(Routes.SQL_DUMP, _query=querydict))
1576 except ValidationFailure as e:
1577 rendered_form = e.render()
1578 else:
1579 rendered_form = form.render()
1580 return render_to_response(
1581 "dump_sql_offer.mako",
1582 dict(form=rendered_form,
1583 head_form_html=get_head_form_html(req, [form])),
1584 request=req
1585 )
1588@view_config(route_name=Routes.SQL_DUMP,
1589 http_cache=NEVER_CACHE)
1590def sql_dump(req: "CamcopsRequest") -> Response:
1591 """
1592 View serving an SQL dump in the chosen format (e.g. SQLite binary, SQL).
1593 """
1594 # Get view-specific parameters
1595 sqlite_method = req.get_str_param(ViewParam.SQLITE_METHOD)
1596 include_blobs = req.get_bool_param(ViewParam.INCLUDE_BLOBS, False)
1597 patient_id_per_row = req.get_bool_param(ViewParam.PATIENT_ID_PER_ROW, True)
1598 delivery_mode = req.get_str_param(ViewParam.DELIVERY_MODE,
1599 ViewArg.EMAIL, lower=True)
1600 include_information_schema_columns = req.get_bool_param(
1601 ViewParam.INCLUDE_INFORMATION_SCHEMA_COLUMNS, False)
1603 # Get tasks (and perform checks)
1604 collection = get_dump_collection(req)
1605 # Create object that knows how to export
1606 exporter = make_exporter(
1607 req=req,
1608 collection=collection,
1609 options=DownloadOptions(
1610 user_id=req.user_id,
1611 viewtype=sqlite_method,
1612 delivery_mode=delivery_mode,
1613 db_include_blobs=include_blobs,
1614 db_patient_id_per_row=patient_id_per_row,
1615 include_information_schema_columns=include_information_schema_columns # noqa
1616 )
1617 ) # may raise
1618 # Export, or schedule an email/download
1619 return exporter.immediate_response(req)
1622# noinspection PyUnusedLocal
1623@view_config(route_name=Routes.DOWNLOAD_AREA,
1624 renderer="download_area.mako",
1625 http_cache=NEVER_CACHE)
1626def download_area(req: "CamcopsRequest") -> Dict[str, Any]:
1627 """
1628 Shows the user download area.
1629 """
1630 userdir = req.user_download_dir
1631 if userdir:
1632 files = UserDownloadFile.from_directory_scan(
1633 directory=userdir,
1634 permitted_lifespan_min=req.config.user_download_file_lifetime_min,
1635 req=req)
1636 else:
1637 files = [] # type: List[UserDownloadFile]
1638 return dict(
1639 files=files,
1640 available=bytes2human(req.user_download_bytes_available),
1641 permitted=bytes2human(req.user_download_bytes_permitted),
1642 used=bytes2human(req.user_download_bytes_used),
1643 lifetime_min=req.config.user_download_file_lifetime_min,
1644 )
1647@view_config(route_name=Routes.DOWNLOAD_FILE,
1648 http_cache=NEVER_CACHE)
1649def download_file(req: "CamcopsRequest") -> Response:
1650 """
1651 Downloads a file.
1652 """
1653 _ = req.gettext
1654 filename = req.get_str_param(ViewParam.FILENAME, "")
1655 # Security comes here: we do NOT permit any path information in the
1656 # filename. It MUST be relative to and within the user download directory.
1657 # We cannot trust the input.
1658 filename = os.path.basename(filename)
1659 udf = UserDownloadFile(directory=req.user_download_dir,
1660 filename=filename)
1661 if not udf.exists:
1662 raise HTTPBadRequest(f'{_("No such file:")} {filename}')
1663 try:
1664 return BinaryResponse(
1665 body=udf.contents,
1666 filename=udf.filename,
1667 content_type=MimeType.BINARY,
1668 as_inline=False
1669 )
1670 except OSError:
1671 raise HTTPBadRequest(f'{_("Error reading file:")} {filename}')
1674@view_config(route_name=Routes.DELETE_FILE,
1675 request_method="POST",
1676 http_cache=NEVER_CACHE)
1677def delete_file(req: "CamcopsRequest") -> Response:
1678 """
1679 Deletes a file.
1680 """
1681 form = UserDownloadDeleteForm(request=req)
1682 controls = list(req.POST.items())
1683 appstruct = form.validate(controls) # CSRF; may raise ValidationError
1684 filename = appstruct.get(ViewParam.FILENAME, "")
1685 # Security comes here: we do NOT permit any path information in the
1686 # filename. It MUST be relative to and within the user download directory.
1687 # We cannot trust the input.
1688 filename = os.path.basename(filename)
1689 udf = UserDownloadFile(directory=req.user_download_dir,
1690 filename=filename)
1691 if not udf.exists:
1692 _ = req.gettext
1693 raise HTTPBadRequest(f'{_("No such file:")} {filename}')
1694 udf.delete()
1695 return HTTPFound(req.route_url(Routes.DOWNLOAD_AREA)) # redirect
1698# =============================================================================
1699# View DDL (table definitions)
1700# =============================================================================
1702LEXERMAP = {
1703 SqlaDialectName.MYSQL: pygments.lexers.sql.MySqlLexer,
1704 SqlaDialectName.MSSQL: pygments.lexers.sql.SqlLexer, # generic
1705 SqlaDialectName.ORACLE: pygments.lexers.sql.SqlLexer, # generic
1706 SqlaDialectName.FIREBIRD: pygments.lexers.sql.SqlLexer, # generic
1707 SqlaDialectName.POSTGRES: pygments.lexers.sql.PostgresLexer,
1708 SqlaDialectName.SQLITE: pygments.lexers.sql.SqlLexer, # generic; SqliteConsoleLexer is wrong # noqa
1709 SqlaDialectName.SYBASE: pygments.lexers.sql.SqlLexer, # generic
1710}
1713@view_config(route_name=Routes.VIEW_DDL,
1714 http_cache=NEVER_CACHE)
1715def view_ddl(req: "CamcopsRequest") -> Response:
1716 """
1717 Inspect table definitions (data definition language, DDL) with field
1718 comments.
1720 2021-04-30: restricted to users with "dump" authority -- not because this
1721 is a vulnerability, as the penetration testers suggested, but just to make
1722 it consistent with the menu item for this.
1723 """
1724 if not req.user.authorized_to_dump:
1725 raise HTTPBadRequest(errormsg_cannot_dump(req))
1726 form = ViewDdlForm(request=req)
1727 if FormAction.SUBMIT in req.POST:
1728 try:
1729 controls = list(req.POST.items())
1730 appstruct = form.validate(controls)
1731 dialect = appstruct.get(ViewParam.DIALECT)
1732 ddl = get_all_ddl(dialect_name=dialect)
1733 lexer = LEXERMAP[dialect]()
1734 # noinspection PyUnresolvedReferences
1735 formatter = pygments.formatters.HtmlFormatter()
1736 html = pygments.highlight(ddl, lexer, formatter)
1737 css = formatter.get_style_defs('.highlight')
1738 return render_to_response("introspect_file.mako",
1739 dict(css=css,
1740 code_html=html),
1741 request=req)
1742 except ValidationFailure as e:
1743 rendered_form = e.render()
1744 else:
1745 rendered_form = form.render()
1746 current_dialect = get_dialect_name(get_engine_from_session(req.dbsession))
1747 sql_dialect_choices = get_sql_dialect_choices(req)
1748 current_dialect_description = {k: v for k, v in sql_dialect_choices}.get(
1749 current_dialect, "?")
1750 return render_to_response(
1751 "view_ddl_choose_dialect.mako",
1752 dict(current_dialect=current_dialect,
1753 current_dialect_description=current_dialect_description,
1754 form=rendered_form,
1755 head_form_html=get_head_form_html(req, [form])),
1756 request=req)
1759# =============================================================================
1760# View audit trail
1761# =============================================================================
1763@view_config(route_name=Routes.OFFER_AUDIT_TRAIL,
1764 permission=Permission.SUPERUSER,
1765 http_cache=NEVER_CACHE)
1766def offer_audit_trail(req: "CamcopsRequest") -> Response:
1767 """
1768 View to configure how we'll view the audit trail. Once configured, it
1769 redirects to a view that shows the audit trail (with query parameters in
1770 the URL).
1771 """
1772 form = AuditTrailForm(request=req)
1773 if FormAction.SUBMIT in req.POST:
1774 try:
1775 controls = list(req.POST.items())
1776 appstruct = form.validate(controls)
1777 keys = [
1778 ViewParam.ROWS_PER_PAGE,
1779 ViewParam.START_DATETIME,
1780 ViewParam.END_DATETIME,
1781 ViewParam.SOURCE,
1782 ViewParam.REMOTE_IP_ADDR,
1783 ViewParam.USERNAME,
1784 ViewParam.TABLE_NAME,
1785 ViewParam.SERVER_PK,
1786 ViewParam.TRUNCATE,
1787 ]
1788 querydict = {k: appstruct.get(k) for k in keys}
1789 querydict[ViewParam.PAGE] = 1
1790 # Send the user to the actual data using GET:
1791 # (the parameters are NOT sensitive)
1792 raise HTTPFound(req.route_url(Routes.VIEW_AUDIT_TRAIL,
1793 _query=querydict))
1794 except ValidationFailure as e:
1795 rendered_form = e.render()
1796 else:
1797 rendered_form = form.render()
1798 return render_to_response(
1799 "audit_trail_choices.mako",
1800 dict(form=rendered_form,
1801 head_form_html=get_head_form_html(req, [form])),
1802 request=req)
1805AUDIT_TRUNCATE_AT = 100
1808@view_config(route_name=Routes.VIEW_AUDIT_TRAIL,
1809 permission=Permission.SUPERUSER,
1810 http_cache=NEVER_CACHE)
1811def view_audit_trail(req: "CamcopsRequest") -> Response:
1812 """
1813 View to serve the audit trail.
1814 """
1815 rows_per_page = req.get_int_param(ViewParam.ROWS_PER_PAGE,
1816 DEFAULT_ROWS_PER_PAGE)
1817 start_datetime = req.get_datetime_param(ViewParam.START_DATETIME)
1818 end_datetime = req.get_datetime_param(ViewParam.END_DATETIME)
1819 source = req.get_str_param(ViewParam.SOURCE, None)
1820 remote_addr = req.get_str_param(ViewParam.REMOTE_IP_ADDR, None,
1821 validator=validate_ip_address)
1822 username = req.get_str_param(ViewParam.USERNAME, None,
1823 validator=validate_username)
1824 table_name = req.get_str_param(ViewParam.TABLE_NAME, None,
1825 validator=validate_task_tablename)
1826 server_pk = req.get_int_param(ViewParam.SERVER_PK, None)
1827 truncate = req.get_bool_param(ViewParam.TRUNCATE, True)
1828 page_num = req.get_int_param(ViewParam.PAGE, 1)
1830 conditions = [] # type: List[str]
1832 def add_condition(key: str, value: Any) -> None:
1833 conditions.append(f"{key} = {value}")
1835 dbsession = req.dbsession
1836 q = dbsession.query(AuditEntry)
1837 if start_datetime:
1838 q = q.filter(AuditEntry.when_access_utc >= start_datetime)
1839 add_condition(ViewParam.START_DATETIME, start_datetime)
1840 if end_datetime:
1841 q = q.filter(AuditEntry.when_access_utc < end_datetime)
1842 add_condition(ViewParam.END_DATETIME, end_datetime)
1843 if source:
1844 q = q.filter(AuditEntry.source == source)
1845 add_condition(ViewParam.SOURCE, source)
1846 if remote_addr:
1847 q = q.filter(AuditEntry.remote_addr == remote_addr)
1848 add_condition(ViewParam.REMOTE_IP_ADDR, remote_addr)
1849 if username:
1850 # https://stackoverflow.com/questions/8561470/sqlalchemy-filtering-by-relationship-attribute # noqa
1851 q = q.join(User).filter(User.username == username)
1852 add_condition(ViewParam.USERNAME, username)
1853 if table_name:
1854 q = q.filter(AuditEntry.table_name == table_name)
1855 add_condition(ViewParam.TABLE_NAME, table_name)
1856 if server_pk is not None:
1857 q = q.filter(AuditEntry.server_pk == server_pk)
1858 add_condition(ViewParam.SERVER_PK, server_pk)
1860 q = q.order_by(desc(AuditEntry.id))
1862 # audit_entries = dbsession.execute(q).fetchall()
1863 # ... no! That executes to give you row-type results.
1864 # audit_entries = q.all()
1865 # ... yes! But let's paginate, too:
1866 page = SqlalchemyOrmPage(query=q,
1867 page=page_num,
1868 items_per_page=rows_per_page,
1869 url_maker=PageUrl(req),
1870 request=req)
1871 return render_to_response("audit_trail_view.mako",
1872 dict(conditions="; ".join(conditions),
1873 page=page,
1874 truncate=truncate,
1875 truncate_at=AUDIT_TRUNCATE_AT),
1876 request=req)
1879# =============================================================================
1880# View export logs
1881# =============================================================================
1882# Overview:
1883# - View exported tasks (ExportedTask) collectively
1884# ... option to filter by recipient_name
1885# ... option to filter by date/etc.
1886# - View exported tasks (ExportedTask) individually
1887# ... hyperlinks to individual views of:
1888# Email (not necessary: ExportedTaskEmail)
1889# ExportRecipient
1890# ExportedTaskFileGroup
1891# ExportedTaskHL7Message
1893@view_config(route_name=Routes.OFFER_EXPORTED_TASK_LIST,
1894 permission=Permission.SUPERUSER,
1895 http_cache=NEVER_CACHE)
1896def offer_exported_task_list(req: "CamcopsRequest") -> Response:
1897 """
1898 View to choose how we'll view the exported task log.
1899 """
1900 form = ExportedTaskListForm(request=req)
1901 if FormAction.SUBMIT in req.POST:
1902 try:
1903 controls = list(req.POST.items())
1904 appstruct = form.validate(controls)
1905 keys = [
1906 ViewParam.ROWS_PER_PAGE,
1907 ViewParam.RECIPIENT_NAME,
1908 ViewParam.TABLE_NAME,
1909 ViewParam.SERVER_PK,
1910 ViewParam.ID,
1911 ViewParam.START_DATETIME,
1912 ViewParam.END_DATETIME,
1913 ]
1914 querydict = {k: appstruct.get(k) for k in keys}
1915 querydict[ViewParam.PAGE] = 1
1916 # Send the user to the actual data using GET
1917 # (the parameters are NOT sensitive)
1918 return HTTPFound(req.route_url(Routes.VIEW_EXPORTED_TASK_LIST,
1919 _query=querydict))
1920 except ValidationFailure as e:
1921 rendered_form = e.render()
1922 else:
1923 rendered_form = form.render()
1924 return render_to_response(
1925 "exported_task_choose.mako",
1926 dict(form=rendered_form,
1927 head_form_html=get_head_form_html(req, [form])),
1928 request=req)
1931@view_config(route_name=Routes.VIEW_EXPORTED_TASK_LIST,
1932 permission=Permission.SUPERUSER,
1933 http_cache=NEVER_CACHE)
1934def view_exported_task_list(req: "CamcopsRequest") -> Response:
1935 """
1936 View to serve the exported task log.
1937 """
1938 rows_per_page = req.get_int_param(ViewParam.ROWS_PER_PAGE,
1939 DEFAULT_ROWS_PER_PAGE)
1940 recipient_name = req.get_str_param(
1941 ViewParam.RECIPIENT_NAME, None,
1942 validator=validate_export_recipient_name)
1943 table_name = req.get_str_param(
1944 ViewParam.TABLE_NAME, None,
1945 validator=validate_task_tablename)
1946 server_pk = req.get_int_param(ViewParam.SERVER_PK, None)
1947 et_id = req.get_int_param(ViewParam.ID, None)
1948 start_datetime = req.get_datetime_param(ViewParam.START_DATETIME)
1949 end_datetime = req.get_datetime_param(ViewParam.END_DATETIME)
1950 page_num = req.get_int_param(ViewParam.PAGE, 1)
1952 conditions = [] # type: List[str]
1954 def add_condition(key: str, value: Any) -> None:
1955 conditions.append(f"{key} = {value}")
1957 dbsession = req.dbsession
1958 q = dbsession.query(ExportedTask)
1960 if recipient_name:
1961 q = (
1962 q.join(ExportRecipient)
1963 .filter(ExportRecipient.recipient_name == recipient_name)
1964 )
1965 add_condition(ViewParam.RECIPIENT_NAME, recipient_name)
1966 if table_name:
1967 q = q.filter(ExportedTask.basetable == table_name)
1968 add_condition(ViewParam.TABLE_NAME, table_name)
1969 if server_pk is not None:
1970 q = q.filter(ExportedTask.task_server_pk == server_pk)
1971 add_condition(ViewParam.SERVER_PK, server_pk)
1972 if et_id is not None:
1973 q = q.filter(ExportedTask.id == et_id)
1974 add_condition(ViewParam.ID, et_id)
1975 if start_datetime:
1976 q = q.filter(ExportedTask.start_at_utc >= start_datetime)
1977 add_condition(ViewParam.START_DATETIME, start_datetime)
1978 if end_datetime:
1979 q = q.filter(ExportedTask.start_at_utc < end_datetime)
1980 add_condition(ViewParam.END_DATETIME, end_datetime)
1982 q = q.order_by(desc(ExportedTask.id))
1984 page = SqlalchemyOrmPage(query=q,
1985 page=page_num,
1986 items_per_page=rows_per_page,
1987 url_maker=PageUrl(req),
1988 request=req)
1989 return render_to_response("exported_task_list.mako",
1990 dict(conditions="; ".join(conditions),
1991 page=page),
1992 request=req)
1995# =============================================================================
1996# View helpers for ORM objects
1997# =============================================================================
1999def _view_generic_object_by_id(req: "CamcopsRequest",
2000 cls: Type,
2001 instance_name_for_mako: str,
2002 mako_template: str) -> Response:
2003 """
2004 Boilerplate code to view an individual SQLAlchemy ORM object. The object
2005 must have an integer ``id`` field as its primary key, and the ID value must
2006 be present in the ``ViewParam.ID`` field of the request.
2008 Args:
2009 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
2010 cls: the SQLAlchemy ORM class
2011 instance_name_for_mako: what will the object be called when it's
2012 mako_template: Mako template filename
2014 Returns:
2015 :class:`pyramid.response.Response`
2016 """
2017 item_id = req.get_int_param(ViewParam.ID, None)
2018 dbsession = req.dbsession
2019 # noinspection PyUnresolvedReferences
2020 obj = (
2021 dbsession.query(cls)
2022 .filter(cls.id == item_id)
2023 .first()
2024 )
2025 if obj is None:
2026 _ = req.gettext
2027 raise HTTPBadRequest(f"{_('Bad ID for object type')} "
2028 f"{cls.__name__}: {item_id}")
2029 d = {instance_name_for_mako: obj}
2030 return render_to_response(mako_template, d, request=req)
2033# =============================================================================
2034# Specialized views for ORM objects
2035# =============================================================================
2037@view_config(route_name=Routes.VIEW_EMAIL,
2038 permission=Permission.SUPERUSER,
2039 http_cache=NEVER_CACHE)
2040def view_email(req: "CamcopsRequest") -> Response:
2041 """
2042 View on an individual :class:`camcops_server.cc_modules.cc_email.Email`.
2043 """
2044 return _view_generic_object_by_id(
2045 req=req,
2046 cls=Email,
2047 instance_name_for_mako="email",
2048 mako_template="view_email.mako",
2049 )
2052@view_config(route_name=Routes.VIEW_EXPORT_RECIPIENT,
2053 permission=Permission.SUPERUSER,
2054 http_cache=NEVER_CACHE)
2055def view_export_recipient(req: "CamcopsRequest") -> Response:
2056 """
2057 View on an individual
2058 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTask`.
2059 """
2060 return _view_generic_object_by_id(
2061 req=req,
2062 cls=ExportRecipient,
2063 instance_name_for_mako="recipient",
2064 mako_template="export_recipient.mako",
2065 )
2068@view_config(route_name=Routes.VIEW_EXPORTED_TASK,
2069 permission=Permission.SUPERUSER,
2070 http_cache=NEVER_CACHE)
2071def view_exported_task(req: "CamcopsRequest") -> Response:
2072 """
2073 View on an individual
2074 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTask`.
2075 """
2076 return _view_generic_object_by_id(
2077 req=req,
2078 cls=ExportedTask,
2079 instance_name_for_mako="et",
2080 mako_template="exported_task.mako",
2081 )
2084@view_config(route_name=Routes.VIEW_EXPORTED_TASK_EMAIL,
2085 permission=Permission.SUPERUSER,
2086 http_cache=NEVER_CACHE)
2087def view_exported_task_email(req: "CamcopsRequest") -> Response:
2088 """
2089 View on an individual
2090 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskEmail`.
2091 """
2092 return _view_generic_object_by_id(
2093 req=req,
2094 cls=ExportedTaskEmail,
2095 instance_name_for_mako="ete",
2096 mako_template="exported_task_email.mako",
2097 )
2100@view_config(route_name=Routes.VIEW_EXPORTED_TASK_FILE_GROUP,
2101 permission=Permission.SUPERUSER,
2102 http_cache=NEVER_CACHE)
2103def view_exported_task_file_group(req: "CamcopsRequest") -> Response:
2104 """
2105 View on an individual
2106 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup`.
2107 """
2108 return _view_generic_object_by_id(
2109 req=req,
2110 cls=ExportedTaskFileGroup,
2111 instance_name_for_mako="fg",
2112 mako_template="exported_task_file_group.mako",
2113 )
2116@view_config(route_name=Routes.VIEW_EXPORTED_TASK_HL7_MESSAGE,
2117 permission=Permission.SUPERUSER,
2118 http_cache=NEVER_CACHE)
2119def view_exported_task_hl7_message(req: "CamcopsRequest") -> Response:
2120 """
2121 View on an individual
2122 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskHL7Message`.
2123 """
2124 return _view_generic_object_by_id(
2125 req=req,
2126 cls=ExportedTaskHL7Message,
2127 instance_name_for_mako="msg",
2128 mako_template="exported_task_hl7_message.mako",
2129 )
2132# =============================================================================
2133# User/server info views
2134# =============================================================================
2136@view_config(route_name=Routes.VIEW_OWN_USER_INFO,
2137 renderer="view_own_user_info.mako",
2138 http_cache=NEVER_CACHE)
2139def view_own_user_info(req: "CamcopsRequest") -> Dict[str, Any]:
2140 """
2141 View to provide information about your own user.
2142 """
2143 groups_page = CamcopsPage(req.user.groups,
2144 url_maker=PageUrl(req),
2145 request=req)
2146 return dict(user=req.user,
2147 groups_page=groups_page,
2148 valid_which_idnums=req.valid_which_idnums)
2151@view_config(route_name=Routes.VIEW_SERVER_INFO,
2152 renderer="view_server_info.mako",
2153 http_cache=NEVER_CACHE)
2154def view_server_info(req: "CamcopsRequest") -> Dict[str, Any]:
2155 """
2156 View to show the server's ID policies, etc.
2157 """
2158 _ = req.gettext
2159 now = req.now
2160 recent_activity = OrderedDict([
2161 (_("Last 1 minute"), CamcopsSession.n_sessions_active_since(
2162 req, now.subtract(minutes=1))),
2163 (_("Last 5 minutes"), CamcopsSession.n_sessions_active_since(
2164 req, now.subtract(minutes=5))),
2165 (_("Last 10 minutes"), CamcopsSession.n_sessions_active_since(
2166 req, now.subtract(minutes=10))),
2167 (_("Last 1 hour"), CamcopsSession.n_sessions_active_since(
2168 req, now.subtract(hours=1))),
2169 ])
2170 return dict(
2171 idnum_definitions=req.idnum_definitions,
2172 string_families=req.extrastring_families(),
2173 all_task_classes=Task.all_subclasses_by_longname(req),
2174 recent_activity=recent_activity,
2175 session_timeout_minutes=req.config.session_timeout_minutes,
2176 restricted_tasks=req.config.restricted_tasks,
2177 )
2180# =============================================================================
2181# User management
2182# =============================================================================
2184EDIT_USER_KEYS_GROUPADMIN = [
2185 # SPECIAL HANDLING # ViewParam.USER_ID,
2186 ViewParam.USERNAME,
2187 ViewParam.FULLNAME,
2188 ViewParam.EMAIL,
2189 ViewParam.MUST_CHANGE_PASSWORD,
2190 ViewParam.LANGUAGE,
2191 # SPECIAL HANDLING # ViewParam.GROUP_IDS,
2192]
2193EDIT_USER_KEYS_SUPERUSER = EDIT_USER_KEYS_GROUPADMIN + [
2194 ViewParam.SUPERUSER,
2195]
2196EDIT_USER_GROUP_MEMBERSHIP_KEYS_GROUPADMIN = [
2197 ViewParam.MAY_UPLOAD,
2198 ViewParam.MAY_REGISTER_DEVICES,
2199 ViewParam.MAY_USE_WEBVIEWER,
2200 ViewParam.VIEW_ALL_PATIENTS_WHEN_UNFILTERED,
2201 ViewParam.MAY_DUMP_DATA,
2202 ViewParam.MAY_RUN_REPORTS,
2203 ViewParam.MAY_ADD_NOTES,
2204]
2205EDIT_USER_GROUP_MEMBERSHIP_KEYS_SUPERUSER = EDIT_USER_GROUP_MEMBERSHIP_KEYS_GROUPADMIN + [ # noqa
2206 ViewParam.GROUPADMIN,
2207]
2210def get_user_from_request_user_id_or_raise(req: "CamcopsRequest") -> User:
2211 """
2212 Returns the :class:`camcops_server.cc_modules.cc_user.User` represented by
2213 the request's ``ViewParam.USER_ID`` parameter, or raise
2214 :exc:`HTTPBadRequest`.
2215 """
2216 user_id = req.get_int_param(ViewParam.USER_ID)
2217 user = User.get_user_by_id(req.dbsession, user_id)
2218 if not user:
2219 _ = req.gettext
2220 raise HTTPBadRequest(f"{_('No such user ID:')} {user_id!r}")
2221 return user
2224def query_users_that_i_manage(req: "CamcopsRequest") -> Query:
2225 me = req.user
2226 return me.managed_users()
2229@view_config(route_name=Routes.VIEW_ALL_USERS,
2230 permission=Permission.GROUPADMIN,
2231 renderer="users_view.mako",
2232 http_cache=NEVER_CACHE)
2233def view_all_users(req: "CamcopsRequest") -> Dict[str, Any]:
2234 """
2235 View all users that the current user administers. The view has hyperlinks
2236 to edit those users too.
2237 """
2238 include_auto_generated = req.get_bool_param(
2239 ViewParam.INCLUDE_AUTO_GENERATED, False
2240 )
2241 rows_per_page = req.get_int_param(ViewParam.ROWS_PER_PAGE,
2242 DEFAULT_ROWS_PER_PAGE)
2243 page_num = req.get_int_param(ViewParam.PAGE, 1)
2244 q = query_users_that_i_manage(req)
2245 if not include_auto_generated:
2246 q = q.filter(User.auto_generated == False) # noqa: E712
2247 page = SqlalchemyOrmPage(query=q,
2248 page=page_num,
2249 items_per_page=rows_per_page,
2250 url_maker=PageUrl(req),
2251 request=req)
2253 form = UserFilterForm(request=req)
2254 appstruct = {
2255 ViewParam.INCLUDE_AUTO_GENERATED: include_auto_generated,
2256 }
2257 rendered_form = form.render(appstruct)
2259 return dict(
2260 page=page,
2261 head_form_html=get_head_form_html(req, [form]),
2262 form=rendered_form
2263 )
2266@view_config(route_name=Routes.VIEW_USER_EMAIL_ADDRESSES,
2267 permission=Permission.GROUPADMIN,
2268 renderer="view_user_email_addresses.mako",
2269 http_cache=NEVER_CACHE)
2270def view_user_email_addresses(req: "CamcopsRequest") -> Dict[str, Any]:
2271 """
2272 View e-mail addresses of all users that the requesting user is authorized
2273 to manage.
2274 """
2275 q = query_users_that_i_manage(req).filter(
2276 User.auto_generated == False # noqa: E712
2277 )
2278 return dict(query=q)
2281def assert_may_edit_user(req: "CamcopsRequest", user: User) -> None:
2282 """
2283 Checks that the requesting user (``req.user``) is allowed to edit the other
2284 user (``user``). Raises :exc:`HTTPBadRequest` otherwise.
2285 """
2286 may_edit, why_not = req.user.may_edit_user(req, user)
2287 if not may_edit:
2288 raise HTTPBadRequest(why_not)
2291def assert_may_administer_group(req: "CamcopsRequest", group_id: int) -> None:
2292 """
2293 Checks that the requesting user (``req.user``) is allowed to adminster the
2294 specified group (specified by ``group_id``). Raises :exc:`HTTPBadRequest`
2295 otherwise.
2296 """
2297 if not req.user.may_administer_group(group_id):
2298 _ = req.gettext
2299 raise HTTPBadRequest(_("You may not administer this group"))
2302@view_config(route_name=Routes.VIEW_USER,
2303 permission=Permission.GROUPADMIN,
2304 renderer="view_other_user_info.mako",
2305 http_cache=NEVER_CACHE)
2306def view_user(req: "CamcopsRequest") -> Dict[str, Any]:
2307 """
2308 View to show details of another user, for administrators.
2309 """
2310 user = get_user_from_request_user_id_or_raise(req)
2311 assert_may_edit_user(req, user)
2312 return dict(user=user)
2313 # Groupadmins may see some information regarding groups that aren't theirs
2314 # here, but can't alter it.
2317@view_config(route_name=Routes.EDIT_USER,
2318 renderer="user_edit.mako",
2319 permission=Permission.GROUPADMIN,
2320 http_cache=NEVER_CACHE)
2321def edit_user(req: "CamcopsRequest") -> Dict[str, Any]:
2322 """
2323 View to edit a user (for administrators).
2324 """
2325 route_back = Routes.VIEW_ALL_USERS
2326 if FormAction.CANCEL in req.POST:
2327 raise HTTPFound(req.route_url(route_back))
2328 user = get_user_from_request_user_id_or_raise(req)
2329 assert_may_edit_user(req, user)
2330 # Superusers can do everything, of course.
2331 # Groupadmins can change group memberships only for groups they control
2332 # (here: "fluid"). That means that there may be a subset of group
2333 # memberships for this user that they will neither see nor be able to
2334 # alter (here: "frozen"). They can also edit only a restricted set of
2335 # permissions.
2336 if req.user.superuser:
2337 form = EditUserFullForm(request=req)
2338 keys = EDIT_USER_KEYS_SUPERUSER
2339 else:
2340 form = EditUserGroupAdminForm(request=req)
2341 keys = EDIT_USER_KEYS_GROUPADMIN
2342 # Groups that we might change memberships for:
2343 all_fluid_groups = req.user.ids_of_groups_user_is_admin_for
2344 # All groups that the user is currently in:
2345 user_group_ids = user.group_ids
2346 # Group membership we won't touch:
2347 user_frozen_group_ids = list(set(user_group_ids) - set(all_fluid_groups))
2348 # Group memberships we might alter:
2349 user_fluid_group_ids = list(set(user_group_ids) & set(all_fluid_groups))
2350 # log.debug(
2351 # "all_fluid_groups={}, user_group_ids={}, "
2352 # "user_frozen_group_ids={}, user_fluid_group_ids={}",
2353 # all_fluid_groups, user_group_ids,
2354 # user_frozen_group_ids, user_fluid_group_ids
2355 # )
2356 if FormAction.SUBMIT in req.POST:
2357 try:
2358 controls = list(req.POST.items())
2359 appstruct = form.validate(controls)
2360 # -----------------------------------------------------------------
2361 # Apply the edits
2362 # -----------------------------------------------------------------
2363 dbsession = req.dbsession
2364 new_user_name = appstruct.get(ViewParam.USERNAME)
2365 existing_user = User.get_user_by_name(dbsession, new_user_name)
2366 if existing_user and existing_user.id != user.id:
2367 # noinspection PyUnresolvedReferences
2368 _ = req.gettext
2369 cant_rename_user = _("Can't rename user")
2370 conflicts = _("that conflicts with an existing user with ID")
2371 raise HTTPBadRequest(
2372 f"{cant_rename_user} {user.username!r} (#{user.id!r}) → "
2373 f"{new_user_name!r}; {conflicts} {existing_user.id!r}")
2374 for k in keys:
2375 # What follows assumes that the keys are relevant and valid
2376 # attributes of a User.
2377 setattr(user, k, appstruct.get(k))
2378 group_ids = appstruct.get(ViewParam.GROUP_IDS)
2379 # Add back in the groups we're not going to alter:
2380 final_group_ids = list(set(group_ids) | set(user_frozen_group_ids))
2381 user.set_group_ids(final_group_ids)
2382 # Also, if the user was uploading to a group that they are now no
2383 # longer a member of, we need to fix that
2384 if user.upload_group_id not in final_group_ids:
2385 user.upload_group_id = None
2386 raise HTTPFound(req.route_url(route_back))
2387 except ValidationFailure as e:
2388 rendered_form = e.render()
2389 else:
2390 appstruct = {k: getattr(user, k) for k in keys}
2391 appstruct[ViewParam.USER_ID] = user.id
2392 appstruct[ViewParam.GROUP_IDS] = user_fluid_group_ids
2393 rendered_form = form.render(appstruct)
2394 return dict(user=user,
2395 form=rendered_form,
2396 head_form_html=get_head_form_html(req, [form]))
2399@view_config(route_name=Routes.EDIT_USER_GROUP_MEMBERSHIP,
2400 renderer="user_edit_group_membership.mako",
2401 permission=Permission.GROUPADMIN,
2402 http_cache=NEVER_CACHE)
2403def edit_user_group_membership(req: "CamcopsRequest") -> Dict[str, Any]:
2404 """
2405 View to edit the group memberships of a user (for administrators).
2406 """
2407 route_back = Routes.VIEW_ALL_USERS
2408 if FormAction.CANCEL in req.POST:
2409 raise HTTPFound(req.route_url(route_back))
2410 ugm_id = req.get_int_param(ViewParam.USER_GROUP_MEMBERSHIP_ID)
2411 ugm = UserGroupMembership.get_ugm_by_id(req.dbsession, ugm_id)
2412 if not ugm:
2413 _ = req.gettext
2414 raise HTTPBadRequest(
2415 f"{_('No such UserGroupMembership ID:')} {ugm_id!r}")
2416 user = ugm.user
2417 assert_may_edit_user(req, user)
2418 assert_may_administer_group(req, ugm.group_id)
2419 if req.user.superuser:
2420 form = EditUserGroupPermissionsFullForm(request=req)
2421 keys = EDIT_USER_GROUP_MEMBERSHIP_KEYS_SUPERUSER
2422 else:
2423 form = EditUserGroupMembershipGroupAdminForm(request=req)
2424 keys = EDIT_USER_GROUP_MEMBERSHIP_KEYS_GROUPADMIN
2425 if FormAction.SUBMIT in req.POST:
2426 try:
2427 controls = list(req.POST.items())
2428 appstruct = form.validate(controls)
2429 # -----------------------------------------------------------------
2430 # Apply the changes
2431 # -----------------------------------------------------------------
2432 for k in keys:
2433 setattr(ugm, k, appstruct.get(k))
2434 raise HTTPFound(req.route_url(route_back))
2435 except ValidationFailure as e:
2436 rendered_form = e.render()
2437 else:
2438 appstruct = {k: getattr(ugm, k) for k in keys}
2439 rendered_form = form.render(appstruct)
2440 return dict(ugm=ugm,
2441 form=rendered_form,
2442 head_form_html=get_head_form_html(req, [form]))
2445def set_user_upload_group(req: "CamcopsRequest",
2446 user: User,
2447 by_another: bool) -> Response:
2448 """
2449 Provides a view to choose which group a user uploads into.
2451 TRUSTS ITS CALLER that this is permitted.
2453 Args:
2454 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
2455 user: the :class:`camcops_server.cc_modules.cc_user.User` to edit
2456 by_another: is the current user a superuser/group administrator, i.e.
2457 another user? Determines the screen we return to afterwards.
2458 """
2459 route_back = Routes.VIEW_ALL_USERS if by_another else Routes.HOME
2460 if FormAction.CANCEL in req.POST:
2461 return HTTPFound(req.route_url(route_back))
2462 form = SetUserUploadGroupForm(request=req, user=user)
2463 # ... need to show the groups permitted to THAT user, not OUR user
2464 if FormAction.SUBMIT in req.POST:
2465 try:
2466 controls = list(req.POST.items())
2467 appstruct = form.validate(controls)
2468 # -----------------------------------------------------------------
2469 # Apply the changes
2470 # -----------------------------------------------------------------
2471 user.upload_group_id = appstruct.get(ViewParam.UPLOAD_GROUP_ID)
2472 return HTTPFound(req.route_url(route_back))
2473 except ValidationFailure as e:
2474 rendered_form = e.render()
2475 else:
2476 appstruct = {
2477 ViewParam.USER_ID: user.id,
2478 ViewParam.UPLOAD_GROUP_ID: user.upload_group_id
2479 }
2480 rendered_form = form.render(appstruct)
2481 return render_to_response(
2482 "set_user_upload_group.mako",
2483 dict(user=user,
2484 form=rendered_form,
2485 head_form_html=get_head_form_html(req, [form])),
2486 request=req
2487 )
2490@view_config(route_name=Routes.SET_OWN_USER_UPLOAD_GROUP,
2491 http_cache=NEVER_CACHE)
2492def set_own_user_upload_group(req: "CamcopsRequest") -> Response:
2493 """
2494 View to set the upload group for your own user.
2495 """
2496 return set_user_upload_group(req, req.user, False)
2499@view_config(route_name=Routes.SET_OTHER_USER_UPLOAD_GROUP,
2500 permission=Permission.GROUPADMIN,
2501 http_cache=NEVER_CACHE)
2502def set_other_user_upload_group(req: "CamcopsRequest") -> Response:
2503 """
2504 View to set the upload group for another user.
2505 """
2506 user = get_user_from_request_user_id_or_raise(req)
2507 if user.id != req.user.id:
2508 assert_may_edit_user(req, user)
2509 # ... but always OK to edit this for your own user; no such check required
2510 return set_user_upload_group(req, user, True)
2513# noinspection PyTypeChecker
2514@view_config(route_name=Routes.UNLOCK_USER,
2515 permission=Permission.GROUPADMIN,
2516 http_cache=NEVER_CACHE)
2517def unlock_user(req: "CamcopsRequest") -> Response:
2518 """
2519 View to unlock a locked user account.
2520 """
2521 user = get_user_from_request_user_id_or_raise(req)
2522 assert_may_edit_user(req, user)
2523 user.enable(req)
2524 _ = req.gettext
2526 req.session.flash(
2527 _("User {username} enabled").format(username=user.username),
2528 queue=FLASH_SUCCESS
2529 )
2530 raise HTTPFound(req.route_url(Routes.VIEW_ALL_USERS))
2533@view_config(route_name=Routes.ADD_USER,
2534 permission=Permission.GROUPADMIN,
2535 renderer="user_add.mako",
2536 http_cache=NEVER_CACHE)
2537def add_user(req: "CamcopsRequest") -> Dict[str, Any]:
2538 """
2539 View to add a user.
2540 """
2541 route_back = Routes.VIEW_ALL_USERS
2542 if FormAction.CANCEL in req.POST:
2543 raise HTTPFound(req.route_url(route_back))
2544 if req.user.superuser:
2545 form = AddUserSuperuserForm(request=req)
2546 else:
2547 form = AddUserGroupadminForm(request=req)
2548 dbsession = req.dbsession
2549 if FormAction.SUBMIT in req.POST:
2550 try:
2551 controls = list(req.POST.items())
2552 appstruct = form.validate(controls)
2553 # -----------------------------------------------------------------
2554 # Add the user
2555 # -----------------------------------------------------------------
2556 user = User()
2557 user.username = appstruct.get(ViewParam.USERNAME)
2558 user.set_password(req, appstruct.get(ViewParam.NEW_PASSWORD))
2559 user.must_change_password = appstruct.get(ViewParam.MUST_CHANGE_PASSWORD) # noqa
2560 # We don't ask for language initially; that can be configured
2561 # later. But is is a reasonable guess that it should be the same
2562 # language as used by the person creating the new user.
2563 user.language = req.language
2564 if User.get_user_by_name(dbsession, user.username):
2565 raise HTTPBadRequest(
2566 f"User with username {user.username!r} already exists!")
2567 dbsession.add(user)
2568 group_ids = appstruct.get(ViewParam.GROUP_IDS)
2569 for gid in group_ids:
2570 # noinspection PyUnresolvedReferences
2571 user.user_group_memberships.append(UserGroupMembership(
2572 user_id=user.id,
2573 group_id=gid
2574 ))
2575 raise HTTPFound(req.route_url(route_back))
2576 except ValidationFailure as e:
2577 rendered_form = e.render()
2578 else:
2579 rendered_form = form.render()
2580 return dict(form=rendered_form,
2581 head_form_html=get_head_form_html(req, [form]))
2584def any_records_use_user(req: "CamcopsRequest", user: User) -> bool:
2585 """
2586 Do any records in the database refer to the specified user?
2588 (Used when we're thinking about deleting a user; would it leave broken
2589 references? If so, we will prevent deletion; see :func:`delete_user`.)
2590 """
2591 dbsession = req.dbsession
2592 user_id = user.id
2593 # Device?
2594 q = CountStarSpecializedQuery(Device, session=dbsession)\
2595 .filter(or_(Device.registered_by_user_id == user_id,
2596 Device.uploading_user_id == user_id))
2597 if q.count_star() > 0:
2598 return True
2599 # SpecialNote?
2600 q = CountStarSpecializedQuery(SpecialNote, session=dbsession)\
2601 .filter(SpecialNote.user_id == user_id)
2602 if q.count_star() > 0:
2603 return True
2604 # Audit trail?
2605 q = CountStarSpecializedQuery(AuditEntry, session=dbsession)\
2606 .filter(AuditEntry.user_id == user_id)
2607 if q.count_star() > 0:
2608 return True
2609 # Uploaded records?
2610 for cls in gen_orm_classes_from_base(GenericTabletRecordMixin): # type: Type[GenericTabletRecordMixin] # noqa
2611 # noinspection PyProtectedMember
2612 q = CountStarSpecializedQuery(cls, session=dbsession)\
2613 .filter(or_(cls._adding_user_id == user_id,
2614 cls._removing_user_id == user_id,
2615 cls._preserving_user_id == user_id,
2616 cls._manually_erasing_user_id == user_id))
2617 if q.count_star() > 0:
2618 return True
2619 # No; all clean.
2620 return False
2623@view_config(route_name=Routes.DELETE_USER,
2624 permission=Permission.GROUPADMIN,
2625 renderer="user_delete.mako",
2626 http_cache=NEVER_CACHE)
2627def delete_user(req: "CamcopsRequest") -> Dict[str, Any]:
2628 """
2629 View to delete a user (and make it hard work).
2630 """
2631 if FormAction.CANCEL in req.POST:
2632 raise HTTPFound(req.route_url(Routes.VIEW_ALL_USERS))
2633 user = get_user_from_request_user_id_or_raise(req)
2634 assert_may_edit_user(req, user)
2635 form = DeleteUserForm(request=req)
2636 rendered_form = ""
2637 error = ""
2638 _ = req.gettext
2639 if user.id == req.user.id:
2640 error = _("Can't delete your own user!")
2641 elif user.may_use_webviewer or user.may_upload:
2642 error = _("Unable to delete user: user still has webviewer login "
2643 "and/or tablet upload permission")
2644 elif user.superuser and (not req.user.superuser):
2645 error = _("Unable to delete user: "
2646 "they are a superuser and you are not")
2647 elif ((not req.user.superuser) and
2648 bool(set(user.group_ids) -
2649 set(req.user.ids_of_groups_user_is_admin_for))):
2650 error = _("Unable to delete user: "
2651 "user belongs to groups that you do not administer")
2652 else:
2653 if any_records_use_user(req, user):
2654 error = _(
2655 "Unable to delete user; records (or audit trails) refer to "
2656 "that user. Disable login and upload permissions instead."
2657 )
2658 else:
2659 if FormAction.DELETE in req.POST:
2660 try:
2661 controls = list(req.POST.items())
2662 appstruct = form.validate(controls)
2663 assert appstruct.get(ViewParam.USER_ID) == user.id
2664 # ---------------------------------------------------------
2665 # Delete the user and associated objects
2666 # ---------------------------------------------------------
2667 # (*) Sessions belonging to this user
2668 # ... done by modifying its ForeignKey to use "ondelete"
2669 # (*) user_group_table mapping
2670 # http://docs.sqlalchemy.org/en/latest/orm/basic_relationships.html#relationships-many-to-many-deletion # noqa
2671 # Simplest way:
2672 user.groups = [] # will delete the mapping entries
2673 # (*) User itself
2674 req.dbsession.delete(user)
2675 # Done
2676 raise HTTPFound(req.route_url(Routes.VIEW_ALL_USERS))
2677 except ValidationFailure as e:
2678 rendered_form = e.render()
2679 else:
2680 appstruct = {ViewParam.USER_ID: user.id}
2681 rendered_form = form.render(appstruct)
2683 return dict(user=user,
2684 error=error,
2685 form=rendered_form,
2686 head_form_html=get_head_form_html(req, [form]))
2689# =============================================================================
2690# Group management
2691# =============================================================================
2693@view_config(route_name=Routes.VIEW_GROUPS,
2694 permission=Permission.SUPERUSER,
2695 renderer="groups_view.mako",
2696 http_cache=NEVER_CACHE)
2697def view_groups(req: "CamcopsRequest") -> Dict[str, Any]:
2698 """
2699 View to show all groups (with hyperlinks to edit them).
2700 Superusers only.
2701 """
2702 rows_per_page = req.get_int_param(ViewParam.ROWS_PER_PAGE,
2703 DEFAULT_ROWS_PER_PAGE)
2704 page_num = req.get_int_param(ViewParam.PAGE, 1)
2705 dbsession = req.dbsession
2706 groups = dbsession.query(Group).order_by(Group.name).all() # type: List[Group] # noqa
2707 page = CamcopsPage(collection=groups,
2708 page=page_num,
2709 items_per_page=rows_per_page,
2710 url_maker=PageUrl(req),
2711 request=req)
2713 valid_which_idnums = req.valid_which_idnums
2715 return dict(groups_page=page,
2716 valid_which_idnums=valid_which_idnums)
2719def get_group_from_request_group_id_or_raise(req: "CamcopsRequest") -> Group:
2720 """
2721 Returns the :class:`camcops_server.cc_modules.cc_group.Group` represented
2722 by the request's ``ViewParam.GROUP_ID`` parameter, or raise
2723 :exc:`HTTPBadRequest`.
2724 """
2725 group_id = req.get_int_param(ViewParam.GROUP_ID)
2726 group = None
2727 if group_id is not None:
2728 dbsession = req.dbsession
2729 group = dbsession.query(Group).filter(Group.id == group_id).first()
2730 if not group:
2731 _ = req.gettext
2732 raise HTTPBadRequest(f"{_('No such group ID:')} {group_id!r}")
2733 return group
2736class EditGroupView(UpdateView):
2737 """
2738 Django-style view to edit a CamCOPS group.
2739 """
2740 form_class = EditGroupForm
2741 model_form_dict = {
2742 "name": ViewParam.NAME,
2743 "description": ViewParam.DESCRIPTION,
2744 "upload_policy": ViewParam.UPLOAD_POLICY,
2745 "finalize_policy": ViewParam.FINALIZE_POLICY,
2746 }
2747 object_class = Group
2748 pk_param = ViewParam.GROUP_ID
2749 server_pk_name = "id"
2750 template_name = "group_edit.mako"
2752 def get_form_kwargs(self) -> Dict[str, Any]:
2753 kwargs = super().get_form_kwargs()
2755 group = cast(Group, self.object)
2756 kwargs.update(group=group)
2758 return kwargs
2760 def get_form_values(self) -> Dict:
2761 # will populate with model_form_dict
2762 form_values = super().get_form_values()
2764 group = cast(Group, self.object)
2766 other_group_ids = list(group.ids_of_other_groups_group_may_see())
2767 other_groups = Group.get_groups_from_id_list(self.request.dbsession,
2768 other_group_ids)
2769 other_groups.sort(key=lambda g: g.name)
2771 form_values.update({
2772 ViewParam.IP_USE: group.ip_use,
2773 ViewParam.GROUP_ID: group.id,
2774 ViewParam.GROUP_IDS: [g.id for g in other_groups]
2775 })
2777 return form_values
2779 def get_success_url(self) -> str:
2780 return self.request.route_url(Routes.VIEW_GROUPS)
2782 def save_object(self, appstruct: Dict[str, Any]) -> None:
2783 super().save_object(appstruct)
2785 group = cast(Group, self.object)
2787 # Group cross-references
2788 group_ids = appstruct.get(ViewParam.GROUP_IDS)
2789 # The form validation will prevent our own group from being in here
2790 other_groups = Group.get_groups_from_id_list(self.request.dbsession,
2791 group_ids)
2792 group.can_see_other_groups = other_groups
2794 ip_use = appstruct.get(ViewParam.IP_USE)
2795 if group.ip_use is not None:
2796 ip_use.id = group.ip_use.id
2798 group.ip_use = ip_use
2801@view_config(route_name=Routes.EDIT_GROUP,
2802 permission=Permission.SUPERUSER,
2803 http_cache=NEVER_CACHE)
2804def edit_group(req: "CamcopsRequest") -> Response:
2805 """
2806 View to edit a group. Superusers only.
2807 """
2808 return EditGroupView(req).dispatch()
2811@view_config(route_name=Routes.ADD_GROUP,
2812 permission=Permission.SUPERUSER,
2813 renderer="group_add.mako",
2814 http_cache=NEVER_CACHE)
2815def add_group(req: "CamcopsRequest") -> Dict[str, Any]:
2816 """
2817 View to add a group. Superusers only.
2818 """
2819 route_back = Routes.VIEW_GROUPS
2820 if FormAction.CANCEL in req.POST:
2821 raise HTTPFound(req.route_url(route_back))
2822 form = AddGroupForm(request=req)
2823 dbsession = req.dbsession
2824 if FormAction.SUBMIT in req.POST:
2825 try:
2826 controls = list(req.POST.items())
2827 appstruct = form.validate(controls)
2828 # -----------------------------------------------------------------
2829 # Add the group
2830 # -----------------------------------------------------------------
2831 group = Group()
2832 group.name = appstruct.get(ViewParam.NAME)
2833 dbsession.add(group)
2834 raise HTTPFound(req.route_url(route_back))
2835 except ValidationFailure as e:
2836 rendered_form = e.render()
2837 else:
2838 rendered_form = form.render()
2839 return dict(form=rendered_form,
2840 head_form_html=get_head_form_html(req, [form]))
2843def any_records_use_group(req: "CamcopsRequest", group: Group) -> bool:
2844 """
2845 Do any records in the database refer to the specified group?
2847 (Used when we're thinking about deleting a group; would it leave broken
2848 references? If so, we will prevent deletion; see :func:`delete_group`.)
2849 """
2850 dbsession = req.dbsession
2851 group_id = group.id
2852 # Our own or users filtering on us?
2853 # ... doesn't matter; see TaskFilter; stored as a CSV list so not part of
2854 # database integrity checks.
2855 # Uploaded records?
2856 for cls in gen_orm_classes_from_base(GenericTabletRecordMixin): # type: Type[GenericTabletRecordMixin] # noqa
2857 # noinspection PyProtectedMember
2858 q = CountStarSpecializedQuery(cls, session=dbsession)\
2859 .filter(cls._group_id == group_id)
2860 if q.count_star() > 0:
2861 return True
2862 # No; all clean.
2863 return False
2866@view_config(route_name=Routes.DELETE_GROUP,
2867 permission=Permission.SUPERUSER,
2868 renderer="group_delete.mako",
2869 http_cache=NEVER_CACHE)
2870def delete_group(req: "CamcopsRequest") -> Dict[str, Any]:
2871 """
2872 View to delete a group. Superusers only.
2873 """
2874 route_back = Routes.VIEW_GROUPS
2875 if FormAction.CANCEL in req.POST:
2876 raise HTTPFound(req.route_url(route_back))
2877 group = get_group_from_request_group_id_or_raise(req)
2878 form = DeleteGroupForm(request=req)
2879 rendered_form = ""
2880 error = ""
2881 _ = req.gettext
2882 if group.users:
2883 error = _("Unable to delete group; there are users who are members!")
2884 else:
2885 if any_records_use_group(req, group):
2886 error = _("Unable to delete group; records refer to it.")
2887 else:
2888 if FormAction.DELETE in req.POST:
2889 try:
2890 controls = list(req.POST.items())
2891 appstruct = form.validate(controls)
2892 assert appstruct.get(ViewParam.GROUP_ID) == group.id
2893 # ---------------------------------------------------------
2894 # Delete the group
2895 # ---------------------------------------------------------
2896 req.dbsession.delete(group)
2897 raise HTTPFound(req.route_url(route_back))
2898 except ValidationFailure as e:
2899 rendered_form = e.render()
2900 else:
2901 appstruct = {ViewParam.GROUP_ID: group.id}
2902 rendered_form = form.render(appstruct)
2903 return dict(group=group,
2904 error=error,
2905 form=rendered_form,
2906 head_form_html=get_head_form_html(req, [form]))
2909# =============================================================================
2910# Edit server settings
2911# =============================================================================
2913@view_config(route_name=Routes.EDIT_SERVER_SETTINGS,
2914 permission=Permission.SUPERUSER,
2915 renderer="server_settings_edit.mako",
2916 http_cache=NEVER_CACHE)
2917def edit_server_settings(req: "CamcopsRequest") -> Dict[str, Any]:
2918 """
2919 View to edit server settings (like the database title).
2920 """
2921 if FormAction.CANCEL in req.POST:
2922 raise HTTPFound(req.route_url(Routes.HOME))
2923 form = EditServerSettingsForm(request=req)
2924 if FormAction.SUBMIT in req.POST:
2925 try:
2926 controls = list(req.POST.items())
2927 appstruct = form.validate(controls)
2928 title = appstruct.get(ViewParam.DATABASE_TITLE)
2929 # -----------------------------------------------------------------
2930 # Apply changes
2931 # -----------------------------------------------------------------
2932 req.set_database_title(title)
2933 raise HTTPFound(req.route_url(Routes.HOME))
2934 except ValidationFailure as e:
2935 rendered_form = e.render()
2936 else:
2937 title = req.database_title
2938 appstruct = {ViewParam.DATABASE_TITLE: title}
2939 rendered_form = form.render(appstruct)
2940 return dict(form=rendered_form,
2941 head_form_html=get_head_form_html(req, [form]))
2944@view_config(route_name=Routes.VIEW_ID_DEFINITIONS,
2945 permission=Permission.SUPERUSER,
2946 renderer="id_definitions_view.mako",
2947 http_cache=NEVER_CACHE)
2948def view_id_definitions(req: "CamcopsRequest") -> Dict[str, Any]:
2949 """
2950 View to show all ID number definitions (with hyperlinks to edit them).
2951 Superusers only.
2952 """
2953 return dict(
2954 idnum_definitions=req.idnum_definitions,
2955 )
2958def get_iddef_from_request_which_idnum_or_raise(
2959 req: "CamcopsRequest") -> IdNumDefinition:
2960 """
2961 Returns the :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition`
2962 represented by the request's ``ViewParam.WHICH_IDNUM`` parameter, or raise
2963 :exc:`HTTPBadRequest`.
2964 """
2965 which_idnum = req.get_int_param(ViewParam.WHICH_IDNUM)
2966 iddef = req.dbsession.query(IdNumDefinition)\
2967 .filter(IdNumDefinition.which_idnum == which_idnum)\
2968 .first()
2969 if not iddef:
2970 _ = req.gettext
2971 raise HTTPBadRequest(f"{_('No such ID definition:')} {which_idnum!r}")
2972 return iddef
2975@view_config(route_name=Routes.EDIT_ID_DEFINITION,
2976 permission=Permission.SUPERUSER,
2977 renderer="id_definition_edit.mako",
2978 http_cache=NEVER_CACHE)
2979def edit_id_definition(req: "CamcopsRequest") -> Dict[str, Any]:
2980 """
2981 View to edit an ID number definition. Superusers only.
2982 """
2983 route_back = Routes.VIEW_ID_DEFINITIONS
2984 if FormAction.CANCEL in req.POST:
2985 raise HTTPFound(req.route_url(route_back))
2986 iddef = get_iddef_from_request_which_idnum_or_raise(req)
2987 form = EditIdDefinitionForm(request=req)
2988 if FormAction.SUBMIT in req.POST:
2989 try:
2990 controls = list(req.POST.items())
2991 appstruct = form.validate(controls)
2992 # -----------------------------------------------------------------
2993 # Alter the ID definition
2994 # -----------------------------------------------------------------
2995 iddef.description = appstruct.get(ViewParam.DESCRIPTION)
2996 iddef.short_description = appstruct.get(ViewParam.SHORT_DESCRIPTION) # noqa
2997 iddef.validation_method = appstruct.get(ViewParam.VALIDATION_METHOD) # noqa
2998 iddef.hl7_id_type = appstruct.get(ViewParam.HL7_ID_TYPE)
2999 iddef.hl7_assigning_authority = appstruct.get(ViewParam.HL7_ASSIGNING_AUTHORITY) # noqa
3000 # REMOVED # clear_idnum_definition_cache() # SPECIAL
3001 raise HTTPFound(req.route_url(route_back))
3002 except ValidationFailure as e:
3003 rendered_form = e.render()
3004 else:
3005 appstruct = {
3006 ViewParam.WHICH_IDNUM: iddef.which_idnum,
3007 ViewParam.DESCRIPTION: iddef.description or "",
3008 ViewParam.SHORT_DESCRIPTION: iddef.short_description or "",
3009 ViewParam.VALIDATION_METHOD: iddef.validation_method or "",
3010 ViewParam.HL7_ID_TYPE: iddef.hl7_id_type or "",
3011 ViewParam.HL7_ASSIGNING_AUTHORITY: iddef.hl7_assigning_authority or "", # noqa
3012 }
3013 rendered_form = form.render(appstruct)
3014 return dict(iddef=iddef,
3015 form=rendered_form,
3016 head_form_html=get_head_form_html(req, [form]))
3019@view_config(route_name=Routes.ADD_ID_DEFINITION,
3020 permission=Permission.SUPERUSER,
3021 renderer="id_definition_add.mako",
3022 http_cache=NEVER_CACHE)
3023def add_id_definition(req: "CamcopsRequest") -> Dict[str, Any]:
3024 """
3025 View to add an ID number definition. Superusers only.
3026 """
3027 route_back = Routes.VIEW_ID_DEFINITIONS
3028 if FormAction.CANCEL in req.POST:
3029 raise HTTPFound(req.route_url(route_back))
3030 form = AddIdDefinitionForm(request=req)
3031 dbsession = req.dbsession
3032 if FormAction.SUBMIT in req.POST:
3033 try:
3034 controls = list(req.POST.items())
3035 appstruct = form.validate(controls)
3036 iddef = IdNumDefinition(
3037 which_idnum=appstruct.get(ViewParam.WHICH_IDNUM),
3038 description=appstruct.get(ViewParam.DESCRIPTION),
3039 short_description=appstruct.get(ViewParam.SHORT_DESCRIPTION),
3040 # we skip hl7_id_type at this stage
3041 # we skip hl7_assigning_authority at this stage
3042 validation_method=appstruct.get(ViewParam.VALIDATION_METHOD),
3043 )
3044 # -----------------------------------------------------------------
3045 # Add ID definition
3046 # -----------------------------------------------------------------
3047 dbsession.add(iddef)
3048 # REMOVED # clear_idnum_definition_cache() # SPECIAL
3049 raise HTTPFound(req.route_url(route_back))
3050 except ValidationFailure as e:
3051 rendered_form = e.render()
3052 else:
3053 rendered_form = form.render()
3054 return dict(form=rendered_form,
3055 head_form_html=get_head_form_html(req, [form]))
3058def any_records_use_iddef(req: "CamcopsRequest",
3059 iddef: IdNumDefinition) -> bool:
3060 """
3061 Do any records in the database refer to the specified ID number definition?
3063 (Used when we're thinking about deleting one; would it leave broken
3064 references? If so, we will prevent deletion; see
3065 :func:`delete_id_definition`.)
3066 """
3067 # Helpfully, these are only referred to permanently from one place:
3068 q = CountStarSpecializedQuery(PatientIdNum, session=req.dbsession)\
3069 .filter(PatientIdNum.which_idnum == iddef.which_idnum)
3070 if q.count_star() > 0:
3071 return True
3072 # No; all clean.
3073 return False
3076@view_config(route_name=Routes.DELETE_ID_DEFINITION,
3077 permission=Permission.SUPERUSER,
3078 renderer="id_definition_delete.mako",
3079 http_cache=NEVER_CACHE)
3080def delete_id_definition(req: "CamcopsRequest") -> Dict[str, Any]:
3081 """
3082 View to delete an ID number definition. Superusers only.
3083 """
3084 route_back = Routes.VIEW_ID_DEFINITIONS
3085 if FormAction.CANCEL in req.POST:
3086 raise HTTPFound(req.route_url(route_back))
3087 iddef = get_iddef_from_request_which_idnum_or_raise(req)
3088 form = DeleteIdDefinitionForm(request=req)
3089 rendered_form = ""
3090 error = ""
3091 if any_records_use_iddef(req, iddef):
3092 _ = req.gettext
3093 error = _("Unable to delete ID definition; records refer to it.")
3094 else:
3095 if FormAction.DELETE in req.POST:
3096 try:
3097 controls = list(req.POST.items())
3098 appstruct = form.validate(controls)
3099 assert appstruct.get(ViewParam.WHICH_IDNUM) == iddef.which_idnum # noqa
3100 # -------------------------------------------------------------
3101 # Delete ID definition
3102 # -------------------------------------------------------------
3103 req.dbsession.delete(iddef)
3104 # REMOVED # clear_idnum_definition_cache() # SPECIAL
3105 raise HTTPFound(req.route_url(route_back))
3106 except ValidationFailure as e:
3107 rendered_form = e.render()
3108 else:
3109 appstruct = {ViewParam.WHICH_IDNUM: iddef.which_idnum}
3110 rendered_form = form.render(appstruct)
3111 return dict(iddef=iddef,
3112 error=error,
3113 form=rendered_form,
3114 head_form_html=get_head_form_html(req, [form]))
3117# =============================================================================
3118# Altering data. Some of the more complex logic is here.
3119# =============================================================================
3121@view_config(route_name=Routes.ADD_SPECIAL_NOTE,
3122 renderer="special_note_add.mako",
3123 http_cache=NEVER_CACHE)
3124def add_special_note(req: "CamcopsRequest") -> Dict[str, Any]:
3125 """
3126 View to add a special note to a task (after confirmation).
3127 """
3128 table_name = req.get_str_param(ViewParam.TABLE_NAME,
3129 validator=validate_task_tablename)
3130 server_pk = req.get_int_param(ViewParam.SERVER_PK, None)
3131 url_back = req.route_url(
3132 Routes.TASK,
3133 _query={
3134 ViewParam.TABLE_NAME: table_name,
3135 ViewParam.SERVER_PK: server_pk,
3136 ViewParam.VIEWTYPE: ViewArg.HTML,
3137 }
3138 )
3139 if FormAction.CANCEL in req.POST:
3140 raise HTTPFound(url_back)
3141 task = task_factory(req, table_name, server_pk)
3142 _ = req.gettext
3143 if task is None:
3144 raise HTTPBadRequest(
3145 f"{_('No such task:')} {table_name}, PK={server_pk}")
3146 user = req.user
3147 if not user.authorized_to_add_special_note(task.group_id):
3148 raise HTTPBadRequest(
3149 _("Not authorized to add special notes for this task's group"))
3150 form = AddSpecialNoteForm(request=req)
3151 if FormAction.SUBMIT in req.POST:
3152 try:
3153 controls = list(req.POST.items())
3154 appstruct = form.validate(controls)
3155 note = appstruct.get(ViewParam.NOTE)
3156 # -----------------------------------------------------------------
3157 # Apply special note
3158 # -----------------------------------------------------------------
3159 task.apply_special_note(req, note)
3160 raise HTTPFound(url_back)
3161 except ValidationFailure as e:
3162 rendered_form = e.render()
3163 else:
3164 appstruct = {
3165 ViewParam.TABLE_NAME: table_name,
3166 ViewParam.SERVER_PK: server_pk,
3167 }
3168 rendered_form = form.render(appstruct)
3169 return dict(task=task,
3170 form=rendered_form,
3171 head_form_html=get_head_form_html(req, [form]),
3172 viewtype=ViewArg.HTML)
3175@view_config(route_name=Routes.DELETE_SPECIAL_NOTE,
3176 renderer="special_note_delete.mako",
3177 http_cache=NEVER_CACHE)
3178def delete_special_note(req: "CamcopsRequest") -> Dict[str, Any]:
3179 """
3180 View to delete a special note (after confirmation).
3181 """
3182 note_id = req.get_int_param(ViewParam.NOTE_ID, None)
3183 url_back = req.route_url(Routes.HOME)
3184 # ... too fiddly to be more precise as we could be routing back to the task
3185 # relating to a patient relating to this special note
3186 if FormAction.CANCEL in req.POST:
3187 raise HTTPFound(url_back)
3188 sn = SpecialNote.get_specialnote_by_id(req.dbsession, note_id)
3189 _ = req.gettext
3190 if sn is None:
3191 raise HTTPBadRequest(f"{_('No such SpecialNote:')} note_id={note_id}")
3192 if sn.hidden:
3193 raise HTTPBadRequest(f"{_('SpecialNote already deleted/hidden:')} "
3194 f"note_id={note_id}")
3195 if not sn.user_may_delete_specialnote(req.user):
3196 raise HTTPBadRequest(_("Not authorized to delete this special note"))
3197 form = DeleteSpecialNoteForm(request=req)
3198 if FormAction.SUBMIT in req.POST:
3199 try:
3200 controls = list(req.POST.items())
3201 form.validate(controls)
3202 # -----------------------------------------------------------------
3203 # Delete special note
3204 # -----------------------------------------------------------------
3205 sn.hidden = True
3206 raise HTTPFound(url_back)
3207 except ValidationFailure as e:
3208 rendered_form = e.render()
3209 else:
3210 appstruct = {
3211 ViewParam.NOTE_ID: note_id,
3212 }
3213 rendered_form = form.render(appstruct)
3214 return dict(sn=sn,
3215 form=rendered_form,
3216 head_form_html=get_head_form_html(req, [form]))
3219class EraseTaskBaseView(DeleteView):
3220 """
3221 Django-style view to erase a task.
3222 """
3223 form_class = EraseTaskForm
3225 def get_object(self) -> Any:
3226 # noinspection PyAttributeOutsideInit
3227 self.table_name = self.request.get_str_param(
3228 ViewParam.TABLE_NAME, validator=validate_task_tablename)
3229 # noinspection PyAttributeOutsideInit
3230 self.server_pk = self.request.get_int_param(ViewParam.SERVER_PK, None)
3232 task = task_factory(self.request, self.table_name, self.server_pk)
3233 _ = self.request.gettext
3234 if task is None:
3235 raise HTTPBadRequest(
3236 f"{_('No such task:')} {self.table_name}, PK={self.server_pk}")
3237 if task.is_live_on_tablet():
3238 raise HTTPBadRequest(errormsg_task_live(self.request))
3239 self.check_user_is_authorized(task)
3241 return task
3243 def check_user_is_authorized(self, task: Task) -> None:
3244 if not self.request.user.authorized_to_erase_tasks(task.group_id):
3245 _ = self.request.gettext
3246 raise HTTPBadRequest(
3247 _("Not authorized to erase tasks for this task's group"))
3249 def get_cancel_url(self) -> str:
3250 return self.request.route_url(
3251 Routes.TASK,
3252 _query={
3253 ViewParam.TABLE_NAME: self.table_name,
3254 ViewParam.SERVER_PK: self.server_pk,
3255 ViewParam.VIEWTYPE: ViewArg.HTML,
3256 }
3257 )
3260class EraseTaskLeavingPlaceholderView(EraseTaskBaseView):
3261 """
3262 Django-style view to erase data from a task, leaving an empty
3263 "placeholder".
3264 """
3265 template_name = "task_erase.mako"
3267 def get_object(self) -> Any:
3268 task = cast(Task, super().get_object())
3269 if task.is_erased():
3270 _ = self.request.gettext
3271 raise HTTPBadRequest(_("Task already erased"))
3273 return task
3275 def delete(self) -> None:
3276 task = cast(Task, self.object)
3278 task.manually_erase(self.request)
3280 def get_success_url(self) -> str:
3281 return self.request.route_url(
3282 Routes.TASK,
3283 _query={
3284 ViewParam.TABLE_NAME: self.table_name,
3285 ViewParam.SERVER_PK: self.server_pk,
3286 ViewParam.VIEWTYPE: ViewArg.HTML,
3287 }
3288 )
3291class EraseTaskEntirelyView(EraseTaskBaseView):
3292 """
3293 Django-style view to erase (delete) a task entirely.
3294 """
3295 template_name = "task_erase_entirely.mako"
3297 def delete(self) -> None:
3298 task = cast(Task, self.object)
3300 TaskIndexEntry.unindex_task(task, self.request.dbsession)
3301 task.delete_entirely(self.request)
3303 _ = self.request.gettext
3305 msg_erased = _("Task erased:")
3307 self.request.session.flash(
3308 f"{msg_erased} ({self.table_name}, server PK {self.server_pk}).",
3309 queue=FLASH_SUCCESS
3310 )
3312 def get_success_url(self) -> str:
3313 return self.request.route_url(Routes.VIEW_TASKS)
3316@view_config(route_name=Routes.ERASE_TASK_LEAVING_PLACEHOLDER,
3317 permission=Permission.GROUPADMIN,
3318 http_cache=NEVER_CACHE)
3319def erase_task_leaving_placeholder(req: "CamcopsRequest") -> Response:
3320 """
3321 View to wipe all data from a task (after confirmation).
3323 Leaves the task record as a placeholder.
3324 """
3325 return EraseTaskLeavingPlaceholderView(req).dispatch()
3328@view_config(route_name=Routes.ERASE_TASK_ENTIRELY,
3329 permission=Permission.GROUPADMIN,
3330 http_cache=NEVER_CACHE)
3331def erase_task_entirely(req: "CamcopsRequest") -> Response:
3332 """
3333 View to erase a task from the database entirely (after confirmation).
3334 """
3335 return EraseTaskEntirelyView(req).dispatch()
3338@view_config(route_name=Routes.DELETE_PATIENT,
3339 permission=Permission.GROUPADMIN,
3340 http_cache=NEVER_CACHE)
3341def delete_patient(req: "CamcopsRequest") -> Response:
3342 """
3343 View to delete completely all data for a patient (after confirmation),
3344 within a specific group.
3345 """
3346 if FormAction.CANCEL in req.POST:
3347 raise HTTPFound(req.route_url(Routes.HOME))
3349 first_form = DeletePatientChooseForm(request=req)
3350 second_form = DeletePatientConfirmForm(request=req)
3351 form = None
3352 final_phase = False
3353 if FormAction.SUBMIT in req.POST:
3354 # FIRST form has been submitted
3355 form = first_form
3356 elif FormAction.DELETE in req.POST:
3357 # SECOND AND FINAL form has been submitted
3358 form = second_form
3359 final_phase = True
3360 _ = req.gettext
3361 if form is not None:
3362 try:
3363 controls = list(req.POST.items())
3364 appstruct = form.validate(controls)
3365 which_idnum = appstruct.get(ViewParam.WHICH_IDNUM)
3366 idnum_value = appstruct.get(ViewParam.IDNUM_VALUE)
3367 group_id = appstruct.get(ViewParam.GROUP_ID)
3368 if group_id not in req.user.ids_of_groups_user_is_admin_for:
3369 # rare occurrence; form should prevent it;
3370 # unless superuser has changed status since form was read
3371 raise HTTPBadRequest(_("You're not an admin for this group"))
3372 # -----------------------------------------------------------------
3373 # Fetch tasks to be deleted.
3374 # -----------------------------------------------------------------
3375 dbsession = req.dbsession
3376 # Tasks first:
3377 idnum_ref = IdNumReference(which_idnum=which_idnum,
3378 idnum_value=idnum_value)
3379 taskfilter = TaskFilter()
3380 taskfilter.idnum_criteria = [idnum_ref]
3381 taskfilter.group_ids = [group_id]
3382 collection = TaskCollection(
3383 req=req,
3384 taskfilter=taskfilter,
3385 sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
3386 current_only=False # unusual option!
3387 )
3388 tasks = collection.all_tasks
3389 n_tasks = len(tasks)
3390 patient_lineage_instances = Patient.get_patients_by_idnum(
3391 dbsession=dbsession,
3392 which_idnum=which_idnum,
3393 idnum_value=idnum_value,
3394 group_id=group_id,
3395 current_only=False
3396 )
3397 n_patient_instances = len(patient_lineage_instances)
3399 # -----------------------------------------------------------------
3400 # Bin out at this stage and offer confirmation page?
3401 # -----------------------------------------------------------------
3402 if not final_phase:
3403 # New appstruct; we don't want the validation code persisting
3404 appstruct = {
3405 ViewParam.WHICH_IDNUM: which_idnum,
3406 ViewParam.IDNUM_VALUE: idnum_value,
3407 ViewParam.GROUP_ID: group_id,
3408 }
3409 rendered_form = second_form.render(appstruct)
3410 return render_to_response(
3411 "patient_delete_confirm.mako",
3412 dict(
3413 form=rendered_form,
3414 tasks=tasks,
3415 n_patient_instances=n_patient_instances,
3416 head_form_html=get_head_form_html(req, [form])
3417 ),
3418 request=req
3419 )
3421 # -----------------------------------------------------------------
3422 # Delete patient and associated tasks
3423 # -----------------------------------------------------------------
3424 for task in tasks:
3425 TaskIndexEntry.unindex_task(task, req.dbsession)
3426 task.delete_entirely(req)
3427 # Then patients:
3428 for p in patient_lineage_instances:
3429 PatientIdNumIndexEntry.unindex_patient(p, req.dbsession)
3430 p.delete_with_dependants(req)
3431 msg = (
3432 f"{_('Patient and associated tasks DELETED from group')} "
3433 f"{group_id}: idnum{which_idnum} = {idnum_value}. "
3434 f"{_('Task records deleted:')} {n_tasks}."
3435 f"{_('Patient records (current and/or old) deleted')} "
3436 f"{n_patient_instances}."
3437 )
3438 audit(req, msg)
3440 req.session.flash(msg, FLASH_SUCCESS)
3441 raise HTTPFound(req.route_url(Routes.HOME))
3443 except ValidationFailure as e:
3444 rendered_form = e.render()
3445 else:
3446 form = first_form
3447 rendered_form = first_form.render()
3448 return render_to_response(
3449 "patient_delete_choose.mako",
3450 dict(
3451 form=rendered_form,
3452 head_form_html=get_head_form_html(req, [form])
3453 ),
3454 request=req
3455 )
3458@view_config(route_name=Routes.FORCIBLY_FINALIZE,
3459 permission=Permission.GROUPADMIN,
3460 http_cache=NEVER_CACHE)
3461def forcibly_finalize(req: "CamcopsRequest") -> Response:
3462 """
3463 View to force-finalize all live (``_era == ERA_NOW``) records from a
3464 device. Available to group administrators if all those records are within
3465 their groups (otherwise, it's a superuser operation).
3466 """
3467 if FormAction.CANCEL in req.POST:
3468 return HTTPFound(req.route_url(Routes.HOME))
3470 dbsession = req.dbsession
3471 first_form = ForciblyFinalizeChooseDeviceForm(request=req)
3472 second_form = ForciblyFinalizeConfirmForm(request=req)
3473 form = None
3474 final_phase = False
3475 if FormAction.SUBMIT in req.POST:
3476 # FIRST form has been submitted
3477 form = first_form
3478 elif FormAction.FINALIZE in req.POST:
3479 # SECOND form has been submitted:
3480 form = second_form
3481 final_phase = True
3482 _ = req.gettext
3483 if form is not None:
3484 try:
3485 controls = list(req.POST.items())
3486 appstruct = form.validate(controls)
3487 # log.debug("{}", pformat(appstruct))
3488 device_id = appstruct.get(ViewParam.DEVICE_ID)
3489 device = Device.get_device_by_id(dbsession, device_id)
3490 if device is None:
3491 raise HTTPBadRequest(f"{_('No such device:')} {device_id!r}")
3492 # -----------------------------------------------------------------
3493 # If at the first stage, bin out and offer confirmation page
3494 # -----------------------------------------------------------------
3495 if not final_phase:
3496 appstruct = {ViewParam.DEVICE_ID: device_id}
3497 rendered_form = second_form.render(appstruct)
3498 taskfilter = TaskFilter()
3499 taskfilter.device_ids = [device_id]
3500 taskfilter.era = ERA_NOW
3501 collection = TaskCollection(
3502 req=req,
3503 taskfilter=taskfilter,
3504 sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
3505 current_only=False, # unusual option!
3506 via_index=False # required for current_only=False
3507 )
3508 tasks = collection.all_tasks
3509 return render_to_response(
3510 "device_forcibly_finalize_confirm.mako",
3511 dict(form=rendered_form,
3512 tasks=tasks,
3513 head_form_html=get_head_form_html(req, [form])),
3514 request=req
3515 )
3516 # -----------------------------------------------------------------
3517 # Check it's permitted
3518 # -----------------------------------------------------------------
3519 if not req.user.superuser:
3520 admin_group_ids = req.user.ids_of_groups_user_is_admin_for
3521 for clienttable in CLIENT_TABLE_MAP.values():
3522 # noinspection PyPropertyAccess
3523 count_query = (
3524 select([func.count()])
3525 .select_from(clienttable)
3526 .where(clienttable.c[FN_DEVICE_ID] == device_id)
3527 .where(clienttable.c[FN_ERA] == ERA_NOW)
3528 .where(clienttable.c[FN_GROUP_ID].notin_(admin_group_ids)) # noqa
3529 )
3530 n = dbsession.execute(count_query).scalar()
3531 if n > 0:
3532 raise HTTPBadRequest(
3533 _("Some records for this device are in groups for "
3534 "which you are not an administrator"))
3535 # -----------------------------------------------------------------
3536 # Forcibly finalize
3537 # -----------------------------------------------------------------
3538 msgs = [] # type: List[str]
3539 batchdetails = BatchDetails(batchtime=req.now_utc)
3540 alltables = sorted(CLIENT_TABLE_MAP.values(),
3541 key=upload_commit_order_sorter)
3542 for clienttable in alltables:
3543 liverecs = get_server_live_records(
3544 req, device_id, clienttable, current_only=False)
3545 preservation_pks = [r.server_pk for r in liverecs]
3546 if not preservation_pks:
3547 continue
3548 current_pks = [r.server_pk for r in liverecs if r.current]
3549 tablechanges = UploadTableChanges(clienttable)
3550 tablechanges.note_preservation_pks(preservation_pks)
3551 tablechanges.note_current_pks(current_pks)
3552 dbsession.execute(
3553 update(clienttable)
3554 .where(clienttable.c[FN_PK].in_(preservation_pks))
3555 .values(values_preserve_now(req, batchdetails,
3556 forcibly_preserved=True))
3557 )
3558 update_indexes_and_push_exports(req, batchdetails, tablechanges)
3559 msgs.append(f"{clienttable.name} {preservation_pks}")
3560 # Field names are different in server-side tables, so they need
3561 # special handling:
3562 SpecialNote.forcibly_preserve_special_notes_for_device(req,
3563 device_id)
3564 # -----------------------------------------------------------------
3565 # Done
3566 # -----------------------------------------------------------------
3567 msg = (
3568 f"{_('Live records for device')} {device_id} "
3569 f"({device.friendly_name}) {_('forcibly finalized')} "
3570 f"(PKs: {'; '.join(msgs)})"
3571 )
3572 audit(req, msg)
3573 log.info(msg)
3575 req.session.flash(msg, queue=FLASH_SUCCESS)
3576 raise HTTPFound(req.route_url(Routes.HOME))
3578 except ValidationFailure as e:
3579 rendered_form = e.render()
3580 else:
3581 form = first_form
3582 rendered_form = form.render() # no appstruct
3583 return render_to_response(
3584 "device_forcibly_finalize_choose.mako",
3585 dict(form=rendered_form,
3586 head_form_html=get_head_form_html(req, [form])),
3587 request=req
3588 )
3591# =============================================================================
3592# Patient creation/editing (primarily for task scheduling)
3593# =============================================================================
3595class PatientMixin(object):
3596 """
3597 Mixin for views involving a patient.
3598 """
3599 object: Any
3600 object_class = Patient
3601 server_pk_name = "_pk"
3603 model_form_dict = {
3604 "forename": ViewParam.FORENAME,
3605 "surname": ViewParam.SURNAME,
3606 "dob": ViewParam.DOB,
3607 "sex": ViewParam.SEX,
3608 "email": ViewParam.EMAIL,
3609 "address": ViewParam.ADDRESS,
3610 "gp": ViewParam.GP,
3611 "other": ViewParam.OTHER,
3612 }
3614 def get_form_values(self) -> Dict:
3615 # will populate with model_form_dict
3616 # noinspection PyUnresolvedReferences
3617 form_values = super().get_form_values()
3619 patient = cast(Patient, self.object)
3621 if patient is not None:
3622 form_values[ViewParam.SERVER_PK] = patient.pk
3623 form_values[ViewParam.GROUP_ID] = patient.group.id
3624 form_values[ViewParam.ID_REFERENCES] = [
3625 {ViewParam.WHICH_IDNUM: pidnum.which_idnum,
3626 ViewParam.IDNUM_VALUE: pidnum.idnum_value}
3627 for pidnum in patient.idnums
3628 ]
3629 form_values[ViewParam.TASK_SCHEDULES] = [
3630 {
3631 ViewParam.SCHEDULE_ID: pts.schedule_id,
3632 ViewParam.START_DATETIME: pts.start_datetime,
3633 ViewParam.SETTINGS: pts.settings,
3634 }
3635 for pts in patient.task_schedules
3636 ]
3638 return form_values
3641class EditPatientBaseView(PatientMixin, UpdateView):
3642 """
3643 View to edit details for a patient.
3644 """
3645 pk_param = ViewParam.SERVER_PK
3647 def get_object(self) -> Any:
3648 patient = cast(Patient, super().get_object())
3650 _ = self.request.gettext
3652 if not patient.group:
3653 raise HTTPBadRequest(_("Bad patient: not in a group"))
3655 if not patient.user_may_edit(self.request):
3656 raise HTTPBadRequest(_("Not authorized to edit this patient"))
3658 return patient
3660 def save_object(self, appstruct: Dict[str, Any]) -> None:
3661 # -----------------------------------------------------------------
3662 # Apply edits
3663 # -----------------------------------------------------------------
3664 # Calculate the changes, and apply them to the Patient object
3665 _ = self.request.gettext
3667 patient = cast(Patient, self.object)
3669 changes = OrderedDict() # type: OrderedDict
3671 self.save_changes(appstruct, changes)
3673 if not changes:
3674 self.request.session.flash(
3675 f"{_('No changes required for patient record with server PK')} " # noqa
3676 f"{patient.pk} {_('(all new values matched old values)')}",
3677 queue=FLASH_INFO
3678 )
3679 return
3681 # Below here, changes have definitely been made.
3682 change_msg = (
3683 _("Patient details edited. Changes:") + " " + "; ".join(
3684 f"{k}: {old!r} → {new!r}"
3685 for k, (old, new) in changes.items()
3686 )
3687 )
3689 # Apply special note to patient
3690 patient.apply_special_note(self.request, change_msg,
3691 "Patient edited")
3693 # Patient details changed, so resend any tasks via HL7
3694 for task in self.get_affected_tasks():
3695 task.cancel_from_export_log(self.request)
3697 # Done
3698 self.request.session.flash(
3699 f"{_('Amended patient record with server PK')} "
3700 f"{patient.pk}. "
3701 f"{_('Changes were:')} {change_msg}",
3702 queue=FLASH_SUCCESS
3703 )
3705 def save_changes(self,
3706 appstruct: Dict[str, Any], changes: OrderedDict) -> None:
3707 self._save_simple_params(appstruct, changes)
3708 self._save_idrefs(appstruct, changes)
3710 def _save_simple_params(self,
3711 appstruct: Dict[str, Any],
3712 changes: OrderedDict) -> None:
3713 patient = cast(Patient, self.object)
3714 for k in EDIT_PATIENT_SIMPLE_PARAMS:
3715 new_value = appstruct.get(k)
3716 old_value = getattr(patient, k)
3717 if new_value == old_value:
3718 continue
3719 if new_value in [None, ""] and old_value in [None, ""]:
3720 # Nothing really changing!
3721 continue
3722 changes[k] = (old_value, new_value)
3723 setattr(patient, k, new_value)
3725 def _save_idrefs(self,
3726 appstruct: Dict[str, Any],
3727 changes: OrderedDict) -> None:
3729 # The ID numbers are more complex.
3730 # log.debug("{}", pformat(appstruct))
3731 patient = cast(Patient, self.object)
3732 new_idrefs = [
3733 IdNumReference(which_idnum=idrefdict[ViewParam.WHICH_IDNUM],
3734 idnum_value=idrefdict[ViewParam.IDNUM_VALUE])
3735 for idrefdict in appstruct.get(ViewParam.ID_REFERENCES, {})
3736 ]
3737 for idnum in patient.idnums:
3738 matching_idref = next(
3739 (idref for idref in new_idrefs
3740 if idref.which_idnum == idnum.which_idnum), None)
3741 if not matching_idref:
3742 # Delete ID numbers not present in the new set
3743 changes["idnum{} ({})".format(
3744 idnum.which_idnum,
3745 self.request.get_id_desc(idnum.which_idnum))
3746 ] = (idnum.idnum_value, None)
3747 idnum.mark_as_deleted(self.request)
3748 elif matching_idref.idnum_value != idnum.idnum_value:
3749 # Modify altered ID numbers present in the old + new sets
3750 changes["idnum{} ({})".format(
3751 idnum.which_idnum,
3752 self.request.get_id_desc(idnum.which_idnum))
3753 ] = (idnum.idnum_value, matching_idref.idnum_value)
3754 new_idnum = PatientIdNum()
3755 new_idnum.id = idnum.id
3756 new_idnum.patient_id = idnum.patient_id
3757 new_idnum.which_idnum = idnum.which_idnum
3758 new_idnum.idnum_value = matching_idref.idnum_value
3759 new_idnum.set_predecessor(self.request, idnum)
3761 for idref in new_idrefs:
3762 matching_idnum = next(
3763 (idnum for idnum in patient.idnums
3764 if idnum.which_idnum == idref.which_idnum), None)
3765 if not matching_idnum:
3766 # Create ID numbers where they were absent
3767 changes["idnum{} ({})".format(
3768 idref.which_idnum,
3769 self.request.get_id_desc(idref.which_idnum))
3770 ] = (None, idref.idnum_value)
3771 # We need to establish an "id" field, which is the PK as
3772 # seen by the tablet. The tablet has lost interest in these
3773 # records, since _era != ERA_NOW, so all we have to do is
3774 # pick a number that's not in use.
3775 new_idnum = PatientIdNum()
3776 new_idnum.patient_id = patient.id
3777 new_idnum.which_idnum = idref.which_idnum
3778 new_idnum.idnum_value = idref.idnum_value
3779 new_idnum.create_fresh(self.request,
3780 device_id=patient.device_id,
3781 era=patient.era,
3782 group_id=patient.group_id)
3783 new_idnum.save_with_next_available_id(
3784 self.request,
3785 patient.device_id,
3786 era=patient.era
3787 )
3789 def get_context_data(self, **kwargs: Any) -> Any:
3790 # This parameter is (I think) used by Mako templates such as
3791 # finalized_patient_edit.mako
3792 # Todo:
3793 # Potential inefficiency: we fetch tasks regardless of the stage
3794 # of this form.
3795 kwargs["tasks"] = self.get_affected_tasks()
3797 return super().get_context_data(**kwargs)
3799 def get_affected_tasks(self) -> Optional[List[Task]]:
3800 patient = cast(Patient, self.object)
3802 taskfilter = TaskFilter()
3803 taskfilter.device_ids = [patient.device_id]
3804 taskfilter.group_ids = [patient.group.id]
3805 taskfilter.era = patient.era
3806 collection = TaskCollection(
3807 req=self.request,
3808 taskfilter=taskfilter,
3809 sort_method_global=TaskSortMethod.CREATION_DATE_DESC,
3810 current_only=False, # unusual option!
3811 via_index=False # for current_only=False, or we'll get a warning
3812 )
3813 return collection.all_tasks
3816class EditServerCreatedPatientView(EditPatientBaseView):
3817 """
3818 View to edit a patient created on the server (as part of task scheduling).
3819 """
3820 template_name = "server_created_patient_edit.mako"
3821 form_class = EditServerCreatedPatientForm
3823 def get_success_url(self) -> str:
3824 return self.request.route_url(
3825 Routes.VIEW_PATIENT_TASK_SCHEDULES
3826 )
3828 def get_object(self) -> Any:
3829 patient = cast(Patient, super().get_object())
3831 if not patient.created_on_server(self.request):
3832 _ = self.request.gettext
3834 raise HTTPBadRequest(
3835 _("Patient is not editable - was not created on the server"))
3837 return patient
3839 def save_changes(self,
3840 appstruct: Dict[str, Any], changes: OrderedDict) -> None:
3841 self._save_group(appstruct, changes)
3842 super().save_changes(appstruct, changes)
3843 self._save_task_schedules(appstruct, changes)
3845 def _save_group(self,
3846 appstruct: Dict[str, Any], changes: OrderedDict) -> None:
3847 patient = cast(Patient, self.object)
3849 old_group_id = patient.group.id
3850 old_group_name = patient.group.name
3851 new_group_id = appstruct.get(ViewParam.GROUP_ID, None)
3852 new_group = self.request.dbsession.query(Group).filter(
3853 Group.id == new_group_id
3854 ).first()
3856 if old_group_id != new_group_id:
3857 patient._group_id = new_group_id
3858 changes["group"] = (old_group_name, new_group.name)
3860 def _save_task_schedules(self,
3861 appstruct: Dict[str, Any],
3862 changes: OrderedDict) -> None:
3864 patient = cast(Patient, self.object)
3865 new_schedules = {
3866 schedule_dict[ViewParam.SCHEDULE_ID]: schedule_dict
3867 for schedule_dict in appstruct.get(ViewParam.TASK_SCHEDULES, {})
3868 }
3870 schedule_query = self.request.dbsession.query(TaskSchedule)
3871 schedule_name_dict = {schedule.id: schedule.name
3872 for schedule in schedule_query}
3874 old_schedules = {}
3875 for pts in patient.task_schedules:
3876 old_schedules[pts.task_schedule.id] = {
3877 "start_datetime": pts.start_datetime,
3878 "settings": pts.settings
3879 }
3881 ids_to_add = new_schedules.keys() - old_schedules.keys()
3882 ids_to_update = old_schedules.keys() & new_schedules.keys()
3883 ids_to_delete = old_schedules.keys() - new_schedules.keys()
3885 for schedule_id in ids_to_add:
3886 pts = PatientTaskSchedule()
3887 pts.patient_pk = patient.pk
3888 pts.schedule_id = schedule_id
3889 pts.start_datetime = new_schedules[schedule_id]["start_datetime"]
3890 pts.settings = new_schedules[schedule_id]["settings"]
3892 self.request.dbsession.add(pts)
3893 changes["schedule{} ({})".format(
3894 schedule_id, schedule_name_dict[schedule_id]
3895 )] = ((None, None), (pts.start_datetime, pts.settings))
3897 for schedule_id in ids_to_update:
3898 updates = {}
3900 new_start_datetime = new_schedules[schedule_id]["start_datetime"]
3901 old_start_datetime = old_schedules[schedule_id]["start_datetime"]
3902 if new_start_datetime != old_start_datetime:
3903 updates[PatientTaskSchedule.start_datetime] = new_start_datetime
3905 new_settings = new_schedules[schedule_id]["settings"]
3906 old_settings = old_schedules[schedule_id]["settings"]
3907 if new_settings != old_settings:
3908 updates[PatientTaskSchedule.settings] = new_settings
3910 if len(updates) > 0:
3911 self.request.dbsession.query(PatientTaskSchedule).filter(
3912 PatientTaskSchedule.patient_pk == patient.pk,
3913 PatientTaskSchedule.schedule_id == schedule_id
3914 ).update(updates, synchronize_session="fetch")
3916 changes["schedule{} ({})".format(
3917 schedule_id, schedule_name_dict[schedule_id]
3918 )] = ((old_start_datetime, old_settings),
3919 (new_start_datetime, new_settings))
3921 self.request.dbsession.query(PatientTaskSchedule).filter(
3922 PatientTaskSchedule.patient_pk == patient.pk,
3923 PatientTaskSchedule.schedule_id.in_(ids_to_delete)
3924 ).delete(synchronize_session="fetch")
3926 for schedule_id in ids_to_delete:
3927 old_start_datetime = old_schedules[schedule_id]["start_datetime"]
3928 old_settings = old_schedules[schedule_id]["settings"]
3930 changes["schedule{} ({})".format(
3931 schedule_id, schedule_name_dict[schedule_id]
3932 )] = ((old_start_datetime, old_settings), (None, None))
3935class EditFinalizedPatientView(EditPatientBaseView):
3936 """
3937 View to edit a finalized patient.
3938 """
3939 template_name = "finalized_patient_edit.mako"
3940 form_class = EditFinalizedPatientForm
3942 def get_success_url(self) -> str:
3943 return self.request.route_url(Routes.HOME)
3945 def get_object(self) -> Any:
3946 patient = cast(Patient, super().get_object())
3948 if not patient.is_finalized():
3949 _ = self.request.gettext
3951 raise HTTPBadRequest(
3952 _("Patient is not editable (likely: not finalized, so a copy "
3953 "still on a client device)"))
3955 return patient
3958@view_config(route_name=Routes.EDIT_FINALIZED_PATIENT,
3959 permission=Permission.GROUPADMIN,
3960 http_cache=NEVER_CACHE)
3961def edit_finalized_patient(req: "CamcopsRequest") -> Response:
3962 """
3963 View to edit details for a patient.
3964 """
3965 return EditFinalizedPatientView(req).dispatch()
3968@view_config(route_name=Routes.EDIT_SERVER_CREATED_PATIENT,
3969 permission=Permission.GROUPADMIN,
3970 http_cache=NEVER_CACHE)
3971def edit_server_created_patient(req: "CamcopsRequest") -> Response:
3972 """
3973 View to edit details for a patient created on the server (for scheduling
3974 tasks).
3975 """
3976 return EditServerCreatedPatientView(req).dispatch()
3979class AddPatientView(PatientMixin, CreateView):
3980 """
3981 View to add a patient (for task scheduling).
3982 """
3983 form_class = EditServerCreatedPatientForm
3984 template_name = "patient_add.mako"
3986 def get_success_url(self) -> str:
3987 return self.request.route_url(
3988 Routes.VIEW_PATIENT_TASK_SCHEDULES
3989 )
3991 def save_object(self, appstruct: Dict[str, Any]) -> None:
3992 server_device = Device.get_server_device(
3993 self.request.dbsession
3994 )
3996 patient = Patient()
3997 patient.create_fresh(
3998 self.request,
3999 device_id=server_device.id,
4000 era=ERA_NOW,
4001 group_id=appstruct.get(ViewParam.GROUP_ID)
4002 )
4004 for k in EDIT_PATIENT_SIMPLE_PARAMS:
4005 new_value = appstruct.get(k)
4006 setattr(patient, k, new_value)
4008 patient.save_with_next_available_id(self.request, server_device.id)
4010 new_idrefs = [
4011 IdNumReference(which_idnum=idrefdict[ViewParam.WHICH_IDNUM],
4012 idnum_value=idrefdict[ViewParam.IDNUM_VALUE])
4013 for idrefdict in appstruct.get(ViewParam.ID_REFERENCES)
4014 ]
4016 for idref in new_idrefs:
4017 new_idnum = PatientIdNum()
4018 new_idnum.patient_id = patient.id
4019 new_idnum.which_idnum = idref.which_idnum
4020 new_idnum.idnum_value = idref.idnum_value
4021 new_idnum.create_fresh(
4022 self.request,
4023 device_id=server_device.id,
4024 era=ERA_NOW,
4025 group_id=appstruct.get(ViewParam.GROUP_ID)
4026 )
4028 new_idnum.save_with_next_available_id(
4029 self.request, server_device.id
4030 )
4032 task_schedules = appstruct.get(ViewParam.TASK_SCHEDULES)
4034 self.request.dbsession.commit()
4036 for task_schedule in task_schedules:
4037 schedule_id = task_schedule[ViewParam.SCHEDULE_ID]
4038 start_datetime = task_schedule[ViewParam.START_DATETIME]
4039 settings = task_schedule[ViewParam.SETTINGS]
4040 patient_task_schedule = PatientTaskSchedule()
4041 patient_task_schedule.patient_pk = patient.pk
4042 patient_task_schedule.schedule_id = schedule_id
4043 patient_task_schedule.start_datetime = start_datetime
4044 patient_task_schedule.settings = settings
4046 self.request.dbsession.add(patient_task_schedule)
4048 self.object = patient
4051@view_config(route_name=Routes.ADD_PATIENT,
4052 permission=Permission.GROUPADMIN,
4053 http_cache=NEVER_CACHE)
4054def add_patient(req: "CamcopsRequest") -> Response:
4055 """
4056 View to add a patient.
4057 """
4058 return AddPatientView(req).dispatch()
4061class DeleteServerCreatedPatientView(DeleteView):
4062 """
4063 View to delete a patient that had been created on the server.
4064 """
4065 form_class = DeleteServerCreatedPatientForm
4066 object_class = Patient
4067 pk_param = ViewParam.SERVER_PK
4068 server_pk_name = "_pk"
4069 template_name = "generic_form.mako"
4071 def get_extra_context(self) -> Dict[str, Any]:
4072 _ = self.request.gettext
4073 return {
4074 "title": _("Delete patient"),
4075 }
4077 def get_success_url(self) -> str:
4078 return self.request.route_url(
4079 Routes.VIEW_PATIENT_TASK_SCHEDULES
4080 )
4082 def delete(self) -> None:
4083 patient = cast(Patient, self.object)
4085 PatientIdNumIndexEntry.unindex_patient(
4086 patient, self.request.dbsession
4087 )
4089 patient.delete_with_dependants(self.request)
4092@view_config(route_name=Routes.DELETE_SERVER_CREATED_PATIENT,
4093 permission=Permission.GROUPADMIN,
4094 http_cache=NEVER_CACHE)
4095def delete_server_created_patient(req: "CamcopsRequest") -> Response:
4096 """
4097 Page to delete a patient created on the server (as part of task
4098 scheduling).
4099 """
4100 return DeleteServerCreatedPatientView(req).dispatch()
4103# =============================================================================
4104# Task scheduling
4105# =============================================================================
4107@view_config(route_name=Routes.VIEW_TASK_SCHEDULES,
4108 permission=Permission.GROUPADMIN,
4109 renderer="view_task_schedules.mako",
4110 http_cache=NEVER_CACHE)
4111def view_task_schedules(req: "CamcopsRequest") -> Dict[str, Any]:
4112 """
4113 View whole task schedules.
4114 """
4115 rows_per_page = req.get_int_param(ViewParam.ROWS_PER_PAGE,
4116 DEFAULT_ROWS_PER_PAGE)
4117 page_num = req.get_int_param(ViewParam.PAGE, 1)
4118 group_ids = req.user.ids_of_groups_user_is_admin_for
4119 q = req.dbsession.query(TaskSchedule).join(TaskSchedule.group).filter(
4120 TaskSchedule.group_id.in_(group_ids)
4121 ).order_by(Group.name, TaskSchedule.name)
4122 page = SqlalchemyOrmPage(query=q,
4123 page=page_num,
4124 items_per_page=rows_per_page,
4125 url_maker=PageUrl(req),
4126 request=req)
4127 return dict(page=page)
4130@view_config(route_name=Routes.VIEW_TASK_SCHEDULE_ITEMS,
4131 permission=Permission.GROUPADMIN,
4132 renderer="view_task_schedule_items.mako",
4133 http_cache=NEVER_CACHE)
4134def view_task_schedule_items(req: "CamcopsRequest") -> Dict[str, Any]:
4135 """
4136 View items within a task schedule.
4137 """
4138 rows_per_page = req.get_int_param(ViewParam.ROWS_PER_PAGE,
4139 DEFAULT_ROWS_PER_PAGE)
4140 page_num = req.get_int_param(ViewParam.PAGE, 1)
4141 schedule_id = req.get_int_param(ViewParam.SCHEDULE_ID)
4143 schedule = req.dbsession.query(TaskSchedule).filter(
4144 TaskSchedule.id == schedule_id
4145 ).one_or_none()
4147 if schedule is None:
4148 _ = req.gettext
4149 raise HTTPBadRequest(_("Schedule does not exist"))
4151 q = req.dbsession.query(TaskScheduleItem).filter(
4152 TaskScheduleItem.schedule_id == schedule_id
4153 ).order_by(*task_schedule_item_sort_order())
4154 page = SqlalchemyOrmPage(query=q,
4155 page=page_num,
4156 items_per_page=rows_per_page,
4157 url_maker=PageUrl(req),
4158 request=req)
4159 return dict(page=page, schedule_name=schedule.name)
4162@view_config(route_name=Routes.VIEW_PATIENT_TASK_SCHEDULES,
4163 permission=Permission.GROUPADMIN,
4164 renderer="view_patient_task_schedules.mako",
4165 http_cache=NEVER_CACHE)
4166def view_patient_task_schedules(req: "CamcopsRequest") -> Dict[str, Any]:
4167 """
4168 View all patients and their assigned schedules (as well as their access
4169 keys, etc.).
4170 """
4171 server_device = Device.get_server_device(req.dbsession)
4173 rows_per_page = req.get_int_param(ViewParam.ROWS_PER_PAGE,
4174 DEFAULT_ROWS_PER_PAGE)
4175 page_num = req.get_int_param(ViewParam.PAGE, 1)
4176 allowed_group_ids = req.user.ids_of_groups_user_is_admin_for
4177 # noinspection PyProtectedMember
4178 q = (
4179 req.dbsession.query(Patient)
4180 .filter(Patient._era == ERA_NOW)
4181 .filter(Patient._group_id.in_(allowed_group_ids))
4182 .filter(Patient._device_id == server_device.id)
4183 .order_by(Patient.surname, Patient.forename)
4184 .options(joinedload("task_schedules"))
4185 .options(joinedload("idnums"))
4186 )
4188 page = SqlalchemyOrmPage(query=q,
4189 page=page_num,
4190 items_per_page=rows_per_page,
4191 url_maker=PageUrl(req),
4192 request=req)
4193 return dict(page=page)
4196@view_config(route_name=Routes.VIEW_PATIENT_TASK_SCHEDULE,
4197 permission=Permission.GROUPADMIN,
4198 renderer="view_patient_task_schedule.mako",
4199 http_cache=NEVER_CACHE)
4200def view_patient_task_schedule(req: "CamcopsRequest") -> Dict[str, Any]:
4201 """
4202 View scheduled tasks for one patient's specific task schedule.
4203 """
4204 pts_id = req.get_int_param(ViewParam.PATIENT_TASK_SCHEDULE_ID)
4206 pts = req.dbsession.query(PatientTaskSchedule).filter(
4207 PatientTaskSchedule.id == pts_id).options(
4208 joinedload("patient.idnums"),
4209 joinedload("task_schedule.items"),
4210 ).one_or_none()
4212 if pts is None:
4213 _ = req.gettext
4214 raise HTTPBadRequest(_("Patient's task schedule does not exist"))
4216 patient_descriptor = pts.patient.prettystr(req)
4218 return dict(
4219 patient_descriptor=patient_descriptor,
4220 schedule_name=pts.task_schedule.name,
4221 task_list=pts.get_list_of_scheduled_tasks(req),
4222 )
4225class TaskScheduleMixin(object):
4226 """
4227 Mixin for viewing/editing a task schedule.
4228 """
4229 form_class = EditTaskScheduleForm
4230 model_form_dict = {
4231 "name": ViewParam.NAME,
4232 "group_id": ViewParam.GROUP_ID,
4233 "email_subject": ViewParam.EMAIL_SUBJECT,
4234 "email_template": ViewParam.EMAIL_TEMPLATE,
4235 }
4236 object_class = TaskSchedule
4237 request: "CamcopsRequest"
4238 server_pk_name = "id"
4239 template_name = "generic_form.mako"
4241 def get_success_url(self) -> str:
4242 return self.request.route_url(
4243 Routes.VIEW_TASK_SCHEDULES
4244 )
4246 def get_object(self) -> Any:
4247 # noinspection PyUnresolvedReferences
4248 schedule = cast(TaskSchedule, super().get_object())
4250 if not schedule.user_may_edit(self.request):
4251 _ = self.request.gettext
4252 raise HTTPBadRequest(_("You a not a group administrator for this "
4253 "task schedule's group"))
4255 return schedule
4258class AddTaskScheduleView(TaskScheduleMixin, CreateView):
4259 """
4260 Django-style view class to add a task schedule.
4261 """
4262 def get_extra_context(self) -> Dict[str, Any]:
4263 _ = self.request.gettext
4264 return {
4265 "title": _("Add a task schedule"),
4266 }
4269class EditTaskScheduleView(TaskScheduleMixin, UpdateView):
4270 """
4271 Django-style view class to edit a task schedule.
4272 """
4273 pk_param = ViewParam.SCHEDULE_ID
4275 def get_extra_context(self) -> Dict[str, Any]:
4276 _ = self.request.gettext
4277 return {
4278 "title": _("Edit details for a task schedule"),
4279 }
4282class DeleteTaskScheduleView(TaskScheduleMixin, DeleteView):
4283 """
4284 Django-style view class to delete a task schedule.
4285 """
4286 form_class = DeleteTaskScheduleForm
4287 pk_param = ViewParam.SCHEDULE_ID
4289 def get_extra_context(self) -> Dict[str, Any]:
4290 _ = self.request.gettext
4291 return {
4292 "title": _("Delete a task schedule"),
4293 }
4296@view_config(route_name=Routes.ADD_TASK_SCHEDULE,
4297 permission=Permission.GROUPADMIN,
4298 http_cache=NEVER_CACHE)
4299def add_task_schedule(req: "CamcopsRequest") -> Response:
4300 """
4301 View to add a task schedule.
4302 """
4303 return AddTaskScheduleView(req).dispatch()
4306@view_config(route_name=Routes.EDIT_TASK_SCHEDULE,
4307 permission=Permission.GROUPADMIN)
4308def edit_task_schedule(req: "CamcopsRequest") -> Response:
4309 """
4310 View to edit a task schedule.
4311 """
4312 return EditTaskScheduleView(req).dispatch()
4315@view_config(route_name=Routes.DELETE_TASK_SCHEDULE,
4316 permission=Permission.GROUPADMIN)
4317def delete_task_schedule(req: "CamcopsRequest") -> Response:
4318 """
4319 View to delete a task schedule.
4320 """
4321 return DeleteTaskScheduleView(req).dispatch()
4324class TaskScheduleItemMixin(object):
4325 """
4326 Mixin for viewing/editing a task schedule items.
4327 """
4328 form_class = EditTaskScheduleItemForm
4329 template_name = "generic_form.mako"
4330 model_form_dict = {
4331 "schedule_id": ViewParam.SCHEDULE_ID,
4332 "task_table_name": ViewParam.TABLE_NAME,
4333 "due_from": ViewParam.DUE_FROM,
4334 # we need to convert due_within to due_by
4335 }
4336 object: Any
4337 # noinspection PyTypeChecker
4338 object_class = cast(Type["Base"], TaskScheduleItem)
4339 pk_param = ViewParam.SCHEDULE_ITEM_ID
4340 request: "CamcopsRequest"
4341 server_pk_name = "id"
4343 def get_success_url(self) -> str:
4344 # noinspection PyUnresolvedReferences
4345 return self.request.route_url(
4346 Routes.VIEW_TASK_SCHEDULE_ITEMS,
4347 _query={
4348 ViewParam.SCHEDULE_ID: self.get_schedule_id(),
4349 }
4350 )
4353class EditTaskScheduleItemMixin(TaskScheduleItemMixin):
4354 """
4355 Django-style view class to edit a task schedule item.
4356 """
4357 def set_object_properties(self, appstruct: Dict[str, Any]) -> None:
4358 # noinspection PyUnresolvedReferences
4359 super().set_object_properties(appstruct)
4361 due_from = appstruct.get(ViewParam.DUE_FROM)
4362 due_within = appstruct.get(ViewParam.DUE_WITHIN)
4364 setattr(self.object, "due_by", due_from + due_within)
4366 def get_schedule(self) -> TaskSchedule:
4367 # noinspection PyUnresolvedReferences
4368 schedule_id = self.get_schedule_id()
4370 schedule = self.request.dbsession.query(TaskSchedule).filter(
4371 TaskSchedule.id == schedule_id
4372 ).one_or_none()
4374 if schedule is None:
4375 _ = self.request.gettext
4376 raise HTTPBadRequest(
4377 f"{_('Missing Task Schedule for id')} {schedule_id}"
4378 )
4380 if not schedule.user_may_edit(self.request):
4381 _ = self.request.gettext
4382 raise HTTPBadRequest(_("You a not a group administrator for this "
4383 "task schedule's group"))
4385 return schedule
4388class AddTaskScheduleItemView(EditTaskScheduleItemMixin, CreateView):
4389 """
4390 Django-style view class to add a task schedule item.
4391 """
4392 def get_extra_context(self) -> Dict[str, Any]:
4393 _ = self.request.gettext
4395 schedule = self.get_schedule()
4397 return {
4398 "title": _("Add an item to the {schedule_name} schedule").format(
4399 schedule_name=schedule.name),
4400 }
4402 def get_schedule_id(self) -> int:
4403 return self.request.get_int_param(ViewParam.SCHEDULE_ID)
4405 def get_form_values(self) -> Dict:
4406 schedule = self.get_schedule()
4408 form_values = super().get_form_values()
4409 form_values[ViewParam.SCHEDULE_ID] = schedule.id
4411 return form_values
4414class EditTaskScheduleItemView(EditTaskScheduleItemMixin, UpdateView):
4415 """
4416 Django-style view class to edit a task schedule item.
4417 """
4418 def get_extra_context(self) -> Dict[str, Any]:
4419 _ = self.request.gettext
4420 return {
4421 "title": _("Edit details for a task schedule item"),
4422 }
4424 def get_schedule_id(self) -> int:
4425 item = cast(TaskScheduleItem, self.object)
4427 return item.schedule_id
4429 def get_form_values(self) -> Dict:
4430 schedule = self.get_schedule()
4432 form_values = super().get_form_values()
4433 form_values[ViewParam.SCHEDULE_ID] = schedule.id
4435 item = cast(TaskScheduleItem, self.object)
4436 due_within = item.due_by - form_values[ViewParam.DUE_FROM]
4437 form_values[ViewParam.DUE_WITHIN] = due_within
4439 return form_values
4442class DeleteTaskScheduleItemView(TaskScheduleItemMixin, DeleteView):
4443 """
4444 Django-style view class to delete a task schedule item.
4445 """
4446 form_class = DeleteTaskScheduleItemForm
4448 def get_extra_context(self) -> Dict[str, Any]:
4449 _ = self.request.gettext
4450 return {
4451 "title": _("Delete a task schedule item"),
4452 }
4454 def get_schedule_id(self) -> int:
4455 item = cast(TaskScheduleItem, self.object)
4457 return item.schedule_id
4460@view_config(route_name=Routes.ADD_TASK_SCHEDULE_ITEM,
4461 permission=Permission.GROUPADMIN)
4462def add_task_schedule_item(req: "CamcopsRequest") -> Response:
4463 """
4464 View to add a task schedule item.
4465 """
4466 return AddTaskScheduleItemView(req).dispatch()
4469@view_config(route_name=Routes.EDIT_TASK_SCHEDULE_ITEM,
4470 permission=Permission.GROUPADMIN)
4471def edit_task_schedule_item(req: "CamcopsRequest") -> Response:
4472 """
4473 View to edit a task schedule item.
4474 """
4475 return EditTaskScheduleItemView(req).dispatch()
4478@view_config(route_name=Routes.DELETE_TASK_SCHEDULE_ITEM,
4479 permission=Permission.GROUPADMIN)
4480def delete_task_schedule_item(req: "CamcopsRequest") -> Response:
4481 """
4482 View to delete a task schedule item.
4483 """
4484 return DeleteTaskScheduleItemView(req).dispatch()
4487@view_config(route_name=Routes.CLIENT_API, request_method="GET",
4488 permission=NO_PERMISSION_REQUIRED,
4489 renderer="client_api_signposting.mako")
4490@view_config(route_name=Routes.CLIENT_API_ALIAS, request_method="GET",
4491 permission=NO_PERMISSION_REQUIRED,
4492 renderer="client_api_signposting.mako")
4493def client_api_signposting(req: "CamcopsRequest") -> Dict[str, Any]:
4494 """
4495 Patients are likely to enter the ``/api`` address into a web browser,
4496 especially if it appears as a hyperlink in an email. If so, that will
4497 arrive as a ``GET`` request. This page will direct them to download the
4498 app.
4499 """
4500 return {
4501 "github_link": f"<a href='{GITHUB_RELEASES_URL}'>GitHub</a>",
4502 "server_url": req.route_url(Routes.CLIENT_API)
4503 }
4506# =============================================================================
4507# Static assets
4508# =============================================================================
4509# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/assets.html#advanced-static # noqa
4511def debug_form_rendering() -> None:
4512 r"""
4513 Test code for form rendering.
4515 From the command line:
4517 .. code-block:: bash
4519 # Start in the CamCOPS source root directory.
4520 # - Needs the "-f" option to follow forks.
4521 # - "open" doesn't show all files opened. To see what you need, try
4522 # strace cat /proc/version
4523 # - ... which shows that "openat" is most useful.
4525 strace -f --trace=openat \
4526 python -c 'from camcops_server.cc_modules.webview import debug_form_rendering; debug_form_rendering()' \
4527 | grep site-packages \
4528 | grep -v "\.pyc"
4530 This tells us that the templates are files like:
4532 .. code-block:: none
4534 site-packages/deform/templates/form.pt
4535 site-packages/deform/templates/select.pt
4536 site-packages/deform/templates/textinput.pt
4538 On 2020-06-29 we are interested in why a newer (Docker) installation
4539 renders buggy HTML like:
4541 .. code-block:: none
4543 <select name="which_idnum" id="deformField2" class=" form-control " multiple="False">
4544 <option value="1">CPFT RiO number</option>
4545 <option value="2">NHS number</option>
4546 <option value="1000">MyHospital number</option>
4547 </select>
4549 ... the bug being that ``multiple="False"`` is wrong; an HTML boolean
4550 attribute is false when *absent*, not when set to a certain value (see
4551 https://developer.mozilla.org/en-US/docs/Web/HTML/Attributes#Boolean_Attributes).
4552 The ``multiple`` attribute of ``<select>`` is a boolean attribute
4553 (https://developer.mozilla.org/en-US/docs/Web/HTML/Element/select).
4555 The ``select.pt`` file indicates that this is controlled by
4556 ``tal:attributes`` syntax. TAL is Template Attribution Language
4557 (https://sharptal.readthedocs.io/en/latest/tal.html).
4559 TAL is either provided by Zope (given ZPT files) or Chameleon or both. The
4560 tracing suggests Chameleon. So the TAL language reference is
4561 https://chameleon.readthedocs.io/en/latest/reference.html.
4563 Chameleon changelog is
4564 https://github.com/malthe/chameleon/blob/master/CHANGES.rst.
4566 Multiple sources for ``tal:attributes`` syntax say that a null value
4567 (presumably: ``None``) is required to omit the attribute, not a false
4568 value.
4570 """ # noqa
4572 import sys
4574 from camcops_server.cc_modules.cc_debug import makefunc_trace_unique_calls
4575 from camcops_server.cc_modules.cc_forms import ChooseTrackerForm
4576 from camcops_server.cc_modules.cc_request import get_core_debugging_request
4578 req = get_core_debugging_request()
4579 form = ChooseTrackerForm(req, as_ctv=False)
4581 sys.settrace(makefunc_trace_unique_calls(file_only=True))
4582 _ = form.render()
4583 sys.settrace(None)