Coverage for cc_modules/webview.py: 25%

2277 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

1""" 

2camcops_server/cc_modules/webview.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26**Implements the CamCOPS web front end.** 

27 

28Quick tutorial on Pyramid views: 

29 

30- The configurator registers routes, and routes have URLs associated with 

31 them. Those URLs can be templatized, e.g. to accept numerical parameters. 

32 The configurator associates view callables ("views" for short) with routes, 

33 and one method for doing that is an automatic scan via Venusian for views 

34 decorated with @view_config(). 

35 

36- All views take a Request object and return a Response or raise an exception 

37 that Pyramid will translate into a Response. 

38 

39- Having matched a route, Pyramid uses its "view lookup" process to choose 

40 one from potentially several views. For example, a single route might be 

41 associated with: 

42 

43 .. code-block:: python 

44 

45 @view_config(route_name="myroute") 

46 def myroute_default(req: Request) -> Response: 

47 pass 

48 

49 @view_config(route_name="myroute", request_method="POST") 

50 def myroute_post(req: Request) -> Response: 

51 pass 

52 

53 In this example, POST requests will go to the second; everything else will 

54 go to the first. Pyramid's view lookup rule is essentially: if multiple 

55 views match, choose the one with the most specifiers. 

56 

57- Specifiers include: 

58 

59 .. code-block:: none 

60 

61 route_name=ROUTENAME 

62 

63 the route 

64 

65 request_method="POST" 

66 

67 requires HTTP GET, POST, etc. 

68 

69 request_param="XXX" 

70 

71 ... requires the presence of a GET/POST variable with this name in 

72 the request.params dictionary 

73 

74 request_param="XXX=YYY" 

75 

76 ... requires the presence of a GET/POST variable called XXX whose 

77 value is YYY, in the request.params dictionary 

78 

79 match_param="XXX=YYY" 

80 

81 .. requires the presence of this key/value pair in 

82 request.matchdict, which contains parameters from the URL 

83 

84 https://docs.pylonsproject.org/projects/pyramid/en/latest/api/config.html#pyramid.config.Configurator.add_view # noqa 

85 

86- Getting parameters 

87 

88 .. code-block:: none 

89 

90 request.params 

91 

92 ... parameters from HTTP GET or POST, including both the query 

93 string (as in https://somewhere/path?key=value) and the body (e.g. 

94 POST). 

95 

96 request.matchdict 

97 

98 ... parameters from the URL, via URL dispatch; see 

99 https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/urldispatch.html#urldispatch-chapter # noqa 

100 

101- Regarding rendering: 

102 

103 There might be some simplicity benefits from converting to a template 

104 system like Mako. On the downside, that would entail a bit more work; 

105 likely a minor performance hit (relative to plain Python string rendering); 

106 and a loss of type checking. The type checking is also why we prefer: 

107 

108 .. code-block:: python 

109 

110 html = " ... {param_blah} ...".format(param_blah=PARAM.BLAH) 

111 

112 to 

113 

114 .. code-block:: python 

115 

116 html = " ... {PARAM.BLAH} ...".format(PARAM=PARAM) 

117 

118 as in the first situation, PyCharm will check that "BLAH" is present in 

119 "PARAM", and in the second it won't. Automatic checking is worth a lot. 

120 

121""" 

122 

123from collections import OrderedDict 

124import json 

125import logging 

126import os 

127 

128# from pprint import pformat 

129import time 

130from typing import ( 

131 Any, 

132 cast, 

133 Dict, 

134 List, 

135 NoReturn, 

136 Optional, 

137 Tuple, 

138 Type, 

139 TYPE_CHECKING, 

140) 

141 

142from cardinal_pythonlib.datetimefunc import format_datetime 

143from cardinal_pythonlib.deform_utils import get_head_form_html 

144from cardinal_pythonlib.httpconst import HttpMethod, MimeType 

145from cardinal_pythonlib.logs import BraceStyleAdapter 

146from cardinal_pythonlib.pyramid.responses import ( 

147 BinaryResponse, 

148 JsonResponse, 

149 PdfResponse, 

150 XmlResponse, 

151) 

152from cardinal_pythonlib.sqlalchemy.dialect import ( 

153 get_dialect_name, 

154 SqlaDialectName, 

155) 

156from cardinal_pythonlib.sizeformatter import bytes2human 

157from cardinal_pythonlib.sqlalchemy.orm_inspect import gen_orm_classes_from_base 

158from cardinal_pythonlib.sqlalchemy.orm_query import CountStarSpecializedQuery 

159from cardinal_pythonlib.sqlalchemy.session import get_engine_from_session 

160from deform.exception import ValidationFailure 

161from pendulum import DateTime as Pendulum 

162import pyotp 

163from pyramid.httpexceptions import HTTPBadRequest, HTTPFound, HTTPNotFound 

164from pyramid.view import ( 

165 forbidden_view_config, 

166 notfound_view_config, 

167 view_config, 

168) 

169from pyramid.renderers import render_to_response 

170from pyramid.response import Response 

171from pyramid.security import Authenticated, NO_PERMISSION_REQUIRED 

172import pygments 

173import pygments.lexers 

174import pygments.lexers.sql 

175import pygments.lexers.web 

176import pygments.formatters 

177from sqlalchemy.orm import joinedload, Query 

178from sqlalchemy.sql.functions import func 

179from sqlalchemy.sql.expression import desc, or_, select, update 

180 

181from camcops_server.cc_modules.cc_audit import audit, AuditEntry 

182from camcops_server.cc_modules.cc_all_models import CLIENT_TABLE_MAP 

183from camcops_server.cc_modules.cc_client_api_core import ( 

184 BatchDetails, 

185 get_server_live_records, 

186 UploadTableChanges, 

187 values_preserve_now, 

188) 

189from camcops_server.cc_modules.cc_client_api_helpers import ( 

190 upload_commit_order_sorter, 

191) 

192from camcops_server.cc_modules.cc_constants import ( 

193 CAMCOPS_URL, 

194 DateFormat, 

195 ERA_NOW, 

196 GITHUB_RELEASES_URL, 

197 JSON_INDENT, 

198 MfaMethod, 

199) 

200from camcops_server.cc_modules.cc_db import ( 

201 GenericTabletRecordMixin, 

202 FN_DEVICE_ID, 

203 FN_ERA, 

204 FN_GROUP_ID, 

205 FN_PK, 

206) 

207from camcops_server.cc_modules.cc_device import Device 

208from camcops_server.cc_modules.cc_email import Email 

209from camcops_server.cc_modules.cc_export import ( 

210 DownloadOptions, 

211 make_exporter, 

212 UserDownloadFile, 

213) 

214from camcops_server.cc_modules.cc_exportmodels import ( 

215 ExportedTask, 

216 ExportedTaskEmail, 

217 ExportedTaskFhir, 

218 ExportedTaskFhirEntry, 

219 ExportedTaskFileGroup, 

220 ExportedTaskHL7Message, 

221 ExportedTaskRedcap, 

222) 

223from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient 

224from camcops_server.cc_modules.cc_forms import ( 

225 AddGroupForm, 

226 AddIdDefinitionForm, 

227 AddSpecialNoteForm, 

228 AddUserGroupadminForm, 

229 AddUserSuperuserForm, 

230 AuditTrailForm, 

231 ChangeOtherPasswordForm, 

232 ChangeOwnPasswordForm, 

233 ChooseTrackerForm, 

234 DEFORM_ACCORDION_BUG, 

235 DEFAULT_ROWS_PER_PAGE, 

236 DeleteGroupForm, 

237 DeleteIdDefinitionForm, 

238 DeletePatientChooseForm, 

239 DeletePatientConfirmForm, 

240 DeleteServerCreatedPatientForm, 

241 DeleteSpecialNoteForm, 

242 DeleteTaskScheduleForm, 

243 DeleteTaskScheduleItemForm, 

244 DeleteUserForm, 

245 EDIT_PATIENT_SIMPLE_PARAMS, 

246 EditFinalizedPatientForm, 

247 EditGroupForm, 

248 EditIdDefinitionForm, 

249 EditOtherUserMfaForm, 

250 EditServerCreatedPatientForm, 

251 EditServerSettingsForm, 

252 EditTaskFilterForm, 

253 EditTaskScheduleForm, 

254 EditTaskScheduleItemForm, 

255 EditUserFullForm, 

256 EditUserGroupAdminForm, 

257 EditUserGroupMembershipGroupAdminForm, 

258 EditUserGroupPermissionsFullForm, 

259 EraseTaskForm, 

260 ExportedTaskListForm, 

261 ForciblyFinalizeChooseDeviceForm, 

262 ForciblyFinalizeConfirmForm, 

263 get_sql_dialect_choices, 

264 LoginForm, 

265 MfaHotpEmailForm, 

266 MfaHotpSmsForm, 

267 MfaMethodForm, 

268 MfaTotpForm, 

269 OfferBasicDumpForm, 

270 OfferSqlDumpForm, 

271 OfferTermsForm, 

272 OtpTokenForm, 

273 RefreshTasksForm, 

274 SendEmailForm, 

275 SetUserUploadGroupForm, 

276 TasksPerPageForm, 

277 UserDownloadDeleteForm, 

278 UserFilterForm, 

279 ViewDdlForm, 

280) 

281from camcops_server.cc_modules.cc_group import Group 

282from camcops_server.cc_modules.cc_idnumdef import IdNumDefinition 

283from camcops_server.cc_modules.cc_membership import UserGroupMembership 

284from camcops_server.cc_modules.cc_patient import Patient 

285from camcops_server.cc_modules.cc_patientidnum import PatientIdNum 

286 

287# noinspection PyUnresolvedReferences 

288import camcops_server.cc_modules.cc_plot # import side effects (configure matplotlib) # noqa 

289from camcops_server.cc_modules.cc_pyramid import ( 

290 CamcopsPage, 

291 FlashQueue, 

292 FormAction, 

293 HTTPFoundDebugVersion, 

294 Icons, 

295 PageUrl, 

296 Permission, 

297 Routes, 

298 SqlalchemyOrmPage, 

299 ViewArg, 

300 ViewParam, 

301) 

302from camcops_server.cc_modules.cc_report import get_report_instance 

303from camcops_server.cc_modules.cc_request import CamcopsRequest 

304from camcops_server.cc_modules.cc_simpleobjects import ( 

305 IdNumReference, 

306 TaskExportOptions, 

307) 

308from camcops_server.cc_modules.cc_specialnote import SpecialNote 

309from camcops_server.cc_modules.cc_session import CamcopsSession 

310from camcops_server.cc_modules.cc_sqlalchemy import get_all_ddl 

311from camcops_server.cc_modules.cc_task import ( 

312 tablename_to_task_class_dict, 

313 Task, 

314) 

315from camcops_server.cc_modules.cc_taskcollection import ( 

316 TaskFilter, 

317 TaskCollection, 

318 TaskSortMethod, 

319) 

320from camcops_server.cc_modules.cc_taskfactory import task_factory 

321from camcops_server.cc_modules.cc_taskfilter import ( 

322 task_classes_from_table_names, 

323 TaskClassSortMethod, 

324) 

325from camcops_server.cc_modules.cc_taskindex import ( 

326 PatientIdNumIndexEntry, 

327 TaskIndexEntry, 

328 update_indexes_and_push_exports, 

329) 

330from camcops_server.cc_modules.cc_taskschedule import ( 

331 PatientTaskSchedule, 

332 PatientTaskScheduleEmail, 

333 TaskSchedule, 

334 TaskScheduleItem, 

335 task_schedule_item_sort_order, 

336) 

337from camcops_server.cc_modules.cc_text import SS 

338from camcops_server.cc_modules.cc_tracker import ClinicalTextView, Tracker 

339from camcops_server.cc_modules.cc_user import ( 

340 SecurityAccountLockout, 

341 SecurityLoginFailure, 

342 User, 

343) 

344from camcops_server.cc_modules.cc_validators import ( 

345 validate_download_filename, 

346 validate_export_recipient_name, 

347 validate_ip_address, 

348 validate_task_tablename, 

349 validate_username, 

350) 

351from camcops_server.cc_modules.cc_version import CAMCOPS_SERVER_VERSION 

352from camcops_server.cc_modules.cc_view_classes import ( 

353 CreateView, 

354 DeleteView, 

355 FormView, 

356 FormWizardMixin, 

357 UpdateView, 

358) 

359 

360if TYPE_CHECKING: 

361 # noinspection PyUnresolvedReferences 

362 from deform.form import Form 

363 

364 # noinspection PyUnresolvedReferences 

365 from camcops_server.cc_modules.cc_sqlalchemy import Base 

366 

367log = BraceStyleAdapter(logging.getLogger(__name__)) 

368 

369 

370# ============================================================================= 

371# Debugging options 

372# ============================================================================= 

373 

374DEBUG_REDIRECT = False 

375 

376if DEBUG_REDIRECT: 

377 log.warning("Debugging options enabled!") 

378 

379if DEBUG_REDIRECT: 

380 HTTPFound = HTTPFoundDebugVersion # noqa: F811 

381 

382 

383# ============================================================================= 

384# Cache control, for the http_cache parameter of view_config etc. 

385# ============================================================================= 

386 

387NEVER_CACHE = 0 

388 

389 

390# ============================================================================= 

391# Constants -- for Mako templates 

392# ============================================================================= 

393# Keys that will be added to a context dictionary that is passed to a Mako 

394# template. For example, a key of "title" can be rendered within the template 

395# as ${title}. Some are used frequently, so we have them here as constants. 

396 

397MAKO_VAR_TITLE = "title" 

398TEMPLATE_GENERIC_FORM = "generic_form.mako" 

399 

400 

401# ============================================================================= 

402# Constants -- mutated into translated phrases 

403# ============================================================================= 

404 

405 

406def errormsg_cannot_dump(req: "CamcopsRequest") -> str: 

407 _ = req.gettext 

408 return _("User not authorized to dump data (for any group).") 

409 

410 

411def errormsg_cannot_report(req: "CamcopsRequest") -> str: 

412 _ = req.gettext 

413 return _("User not authorized to run reports (for any group).") 

414 

415 

416def errormsg_task_live(req: "CamcopsRequest") -> str: 

417 _ = req.gettext 

418 return _("Task is live on tablet; finalize (or force-finalize) first.") 

419 

420 

421# ============================================================================= 

422# Unused 

423# ============================================================================= 

424 

425# def query_result_html_core(req: "CamcopsRequest", 

426# descriptions: Sequence[str], 

427# rows: Sequence[Sequence[Any]], 

428# null_html: str = "<i>NULL</i>") -> str: 

429# return render("query_result_core.mako", 

430# dict(descriptions=descriptions, 

431# rows=rows, 

432# null_html=null_html), 

433# request=req) 

434 

435 

436# def query_result_html_orm(req: "CamcopsRequest", 

437# attrnames: List[str], 

438# descriptions: List[str], 

439# orm_objects: Sequence[Sequence[Any]], 

440# null_html: str = "<i>NULL</i>") -> str: 

441# return render("query_result_orm.mako", 

442# dict(attrnames=attrnames, 

443# descriptions=descriptions, 

444# orm_objects=orm_objects, 

445# null_html=null_html), 

446# request=req) 

447 

448 

449# ============================================================================= 

450# Error views 

451# ============================================================================= 

452 

453 

454# noinspection PyUnusedLocal 

455@notfound_view_config(renderer="not_found.mako", http_cache=NEVER_CACHE) 

456def not_found(req: "CamcopsRequest") -> Dict[str, Any]: 

457 """ 

458 "Page not found" view. 

459 """ 

460 return {"msg": "", "extra_html": ""} 

461 

462 

463# noinspection PyUnusedLocal 

464@view_config( 

465 context=HTTPBadRequest, renderer="bad_request.mako", http_cache=NEVER_CACHE 

466) 

467def bad_request(req: "CamcopsRequest") -> Dict[str, Any]: 

468 """ 

469 "Bad request" view. 

470 

471 NOTE that this view only gets used from 

472 

473 .. code-block:: python 

474 

475 raise HTTPBadRequest("message") 

476 

477 and not 

478 

479 .. code-block:: python 

480 

481 return HTTPBadRequest("message") 

482 

483 ... so always raise it. 

484 """ 

485 return {"msg": "", "extra_html": ""} 

486 

487 

488# ============================================================================= 

489# Test pages 

490# ============================================================================= 

491 

492 

493# noinspection PyUnusedLocal 

494@view_config( 

495 route_name=Routes.TESTPAGE_PUBLIC_1, 

496 permission=NO_PERMISSION_REQUIRED, 

497 http_cache=NEVER_CACHE, 

498) 

499def test_page_1(req: "CamcopsRequest") -> Response: 

500 """ 

501 A public test page with no content. 

502 """ 

503 _ = req.gettext 

504 return Response(_("Hello! This is a public CamCOPS test page.")) 

505 

506 

507# noinspection PyUnusedLocal 

508@view_config( 

509 route_name=Routes.TEST_NHS_NUMBERS, 

510 permission=NO_PERMISSION_REQUIRED, 

511 renderer="test_nhs_numbers.mako", 

512 http_cache=NEVER_CACHE, 

513) 

514def test_nhs_numbers(req: "CamcopsRequest") -> Response: 

515 """ 

516 Random Test NHS numbers for testing 

517 """ 

518 from cardinal_pythonlib.nhs import generate_random_nhs_number 

519 

520 nhs_numbers = [generate_random_nhs_number() for _ in range(10)] 

521 return dict(test_nhs_numbers=nhs_numbers) 

522 

523 

524# noinspection PyUnusedLocal 

525@view_config(route_name=Routes.TESTPAGE_PRIVATE_1, http_cache=NEVER_CACHE) 

526def test_page_private_1(req: "CamcopsRequest") -> Response: 

527 """ 

528 A private test page with no informative content, but which should only 

529 be accessible to authenticated users. 

530 """ 

531 _ = req.gettext 

532 return Response(_("Private test page.")) 

533 

534 

535# noinspection PyUnusedLocal 

536@view_config( 

537 route_name=Routes.TESTPAGE_PRIVATE_2, 

538 permission=Permission.SUPERUSER, 

539 renderer="testpage.mako", 

540 http_cache=NEVER_CACHE, 

541) 

542def test_page_2(req: "CamcopsRequest") -> Dict[str, Any]: 

543 """ 

544 A private test page containing POTENTIALLY SENSITIVE test information, 

545 including environment variables, that should only be accessible to 

546 superusers. 

547 """ 

548 return dict(param1="world") 

549 

550 

551# noinspection PyUnusedLocal 

552@view_config( 

553 route_name=Routes.TESTPAGE_PRIVATE_3, 

554 permission=Permission.SUPERUSER, 

555 renderer="inherit_cache_test_child.mako", 

556 http_cache=NEVER_CACHE, 

557) 

558def test_page_3(req: "CamcopsRequest") -> Dict[str, Any]: 

559 """ 

560 A private test page that tests template inheritance. 

561 """ 

562 return {} 

563 

564 

565# noinspection PyUnusedLocal 

566@view_config( 

567 route_name=Routes.TESTPAGE_PRIVATE_4, 

568 permission=Permission.SUPERUSER, 

569 renderer="test_template_filters.mako", 

570 http_cache=NEVER_CACHE, 

571) 

572def test_page_4(req: "CamcopsRequest") -> Dict[str, Any]: 

573 """ 

574 A private test page that tests Mako filtering. 

575 """ 

576 return dict(test_strings=["plain", "normal <b>bold</b> normal"]) 

577 

578 

579# noinspection PyUnusedLocal,PyTypeChecker 

580@view_config( 

581 route_name=Routes.CRASH, 

582 permission=Permission.SUPERUSER, 

583 http_cache=NEVER_CACHE, 

584) 

585def crash(req: "CamcopsRequest") -> Response: 

586 """ 

587 A view that deliberately raises an exception. 

588 """ 

589 _ = req.gettext 

590 raise RuntimeError( 

591 _("Deliberately crashed. Should not affect other processes.") 

592 ) 

593 

594 

595# noinspection PyUnusedLocal 

596@view_config( 

597 route_name=Routes.DEVELOPER, 

598 permission=Permission.SUPERUSER, 

599 renderer="developer.mako", 

600 http_cache=NEVER_CACHE, 

601) 

602def developer_page(req: "CamcopsRequest") -> Dict[str, Any]: 

603 """ 

604 Shows the developer menu. 

605 """ 

606 return {} 

607 

608 

609# noinspection PyUnusedLocal 

610@view_config( 

611 route_name=Routes.AUDIT_MENU, 

612 permission=Permission.SUPERUSER, 

613 renderer="audit_menu.mako", 

614 http_cache=NEVER_CACHE, 

615) 

616def audit_menu(req: "CamcopsRequest") -> Dict[str, Any]: 

617 """ 

618 Shows the auditing menu. 

619 """ 

620 return {} 

621 

622 

623# ============================================================================= 

624# Authorization: login, logout, login failures, terms/conditions 

625# ============================================================================= 

626 

627# Do NOT use extra parameters for functions decorated with @view_config; 

628# @view_config can take functions like "def view(request)" but also 

629# "def view(context, request)", so if you add additional parameters, it thinks 

630# you're doing the latter and sends parameters accordingly. 

631 

632 

633class MfaMixin(FormWizardMixin): 

634 """ 

635 Enhances FormWizardMixin to include a multi-factor authentication step. 

636 This must be named "mfa" in the subclass, via the ``SELF_MFA`` variable. 

637 

638 This handles: 

639 

640 - Timing out 

641 - Generating, sending and checking the six-digit code used for 

642 authentication 

643 

644 The subclass should: 

645 

646 - Set ``mfa_user`` on the class to be an instance of the User to be 

647 authenticated. 

648 - Call ``handle_authentication_type()`` in the appropriate step. 

649 - Call ``otp_is_valid()`` and ``fail_bad_mfa_code()`` in the appropriate 

650 step. 

651 

652 See ``LoginView`` for an example that works with the yet-to-be-logged-in 

653 user. 

654 See ``ChangeOwnPasswordView`` for an example with the logged-in user. 

655 """ 

656 

657 STEP_PASSWORD = "password" 

658 STEP_MFA = "mfa" 

659 

660 KEY_TITLE_HTML = "title_html" 

661 KEY_INSTRUCTIONS = "instructions" 

662 KEY_MFA_TIME = "mfa_time" 

663 

664 def __init__(self, *args: Any, **kwargs: Any) -> None: 

665 self._mfa_user: Optional[User] = None 

666 super().__init__(*args, **kwargs) 

667 

668 # ------------------------------------------------------------------------- 

669 # mfa_user 

670 # ------------------------------------------------------------------------- 

671 # Set during __init__ by LoggedInUserMfaMixin, or via a more complex 

672 # process by LoginView. 

673 

674 @property 

675 def mfa_user(self) -> Optional[User]: 

676 """ 

677 The user undergoing authentication. 

678 """ 

679 return self._mfa_user 

680 

681 @mfa_user.setter 

682 def mfa_user(self, user: Optional[User]) -> None: 

683 """ 

684 Sets the current user being authenticated. 

685 """ 

686 self._mfa_user = user 

687 

688 # ------------------------------------------------------------------------- 

689 # Dispatch and timeouts 

690 # ------------------------------------------------------------------------- 

691 

692 def dispatch(self) -> Response: 

693 # Docstring in superclass. 

694 if self.timed_out(): 

695 self.fail_timed_out() # will raise 

696 

697 return super().dispatch() 

698 

699 def timed_out(self) -> bool: 

700 """ 

701 Has authentication timed out? 

702 """ 

703 if self.step != self.STEP_MFA: 

704 return False 

705 

706 timeout = self.request.config.mfa_timeout_s 

707 if timeout == 0: 

708 return False 

709 

710 login_time = self.state.get(self.KEY_MFA_TIME) 

711 if login_time is None: 

712 return False 

713 

714 return int(time.time()) > login_time + timeout 

715 

716 # ------------------------------------------------------------------------- 

717 # Extra context for templates 

718 # ------------------------------------------------------------------------- 

719 

720 def get_extra_context(self) -> Dict[str, Any]: 

721 # Docstring in superclass. 

722 if self.step == self.STEP_MFA: 

723 context = { 

724 self.KEY_TITLE_HTML: self.request.icon_text( 

725 icon=self.get_mfa_icon(), text=self.get_mfa_title() 

726 ), 

727 self.KEY_INSTRUCTIONS: self.get_mfa_instructions(), 

728 } 

729 return context 

730 else: 

731 return {} 

732 

733 def get_mfa_icon(self) -> str: 

734 """ 

735 Returns an icon to let the user know which MFA method is being used. 

736 """ 

737 method = self.mfa_user.mfa_method 

738 

739 if method == MfaMethod.TOTP: 

740 return "shield-shaded" 

741 

742 elif method == MfaMethod.HOTP_EMAIL: 

743 return "envelope" 

744 

745 elif method == MfaMethod.HOTP_SMS: 

746 return "chat-left-dots" 

747 

748 else: 

749 return "Error: get_mfa_icon() called for invalid MFA method" 

750 

751 def get_mfa_title(self) -> str: 

752 """ 

753 Returns a title for the page that requests the code itself. 

754 """ 

755 _ = self.request.gettext 

756 method = self.mfa_user.mfa_method 

757 

758 if method == MfaMethod.TOTP: 

759 return _("Authenticate via your authentication app") 

760 

761 elif method == MfaMethod.HOTP_EMAIL: 

762 return _("Authenticate via e-mail") 

763 

764 elif method == MfaMethod.HOTP_SMS: 

765 return _("Authenticate via SMS") 

766 

767 else: 

768 return "Error: get_mfa_title() called for invalid MFA method" 

769 

770 def get_mfa_instructions(self) -> str: 

771 """ 

772 Return user instructions for the relevant MFA method. 

773 """ 

774 _ = self.request.gettext 

775 method = self.mfa_user.mfa_method 

776 

777 if method == MfaMethod.TOTP: 

778 return _( 

779 "Enter the code for CamCOPS displayed on your " 

780 "authentication app." 

781 ) 

782 

783 elif method == MfaMethod.HOTP_EMAIL: 

784 return _("We've sent a code by email to {}.").format( 

785 self.mfa_user.partial_email 

786 ) 

787 

788 elif method == MfaMethod.HOTP_SMS: 

789 return _("We've sent a code by text message to {}").format( 

790 self.mfa_user.partial_phone_number 

791 ) 

792 

793 else: 

794 return "Error: get_mfa_instruction() called for invalid MFA method" 

795 

796 # ------------------------------------------------------------------------- 

797 # MFA handling 

798 # ------------------------------------------------------------------------- 

799 

800 def handle_authentication_type(self) -> None: 

801 """ 

802 Function to be called when we want an MFA code to be created. 

803 """ 

804 mfa_user = self.mfa_user 

805 mfa_user.ensure_mfa_info() 

806 mfa_method = mfa_user.mfa_method 

807 

808 if mfa_method == MfaMethod.TOTP: 

809 # Nothing to do. The app generates the code. 

810 return 

811 

812 # Record the time of code creation: 

813 self.state[self.KEY_MFA_TIME] = int(time.time()) 

814 

815 if mfa_method == MfaMethod.HOTP_EMAIL: 

816 self.send_authentication_email() 

817 elif mfa_method == MfaMethod.HOTP_SMS: 

818 self.send_authentication_sms() 

819 else: 

820 raise ValueError( 

821 f"MfaMixin.handle_authentication_type: " 

822 f"unexpected mfa_method {mfa_method!r}" 

823 ) 

824 

825 def send_authentication_email(self) -> None: 

826 """ 

827 E-mail the code to the user. 

828 """ 

829 _ = self.request.gettext 

830 config = self.request.config 

831 kwargs = dict( 

832 from_addr=config.email_from, 

833 to=self.mfa_user.email, 

834 subject=_("CamCOPS authentication"), 

835 body=self.get_hotp_message(), 

836 content_type=MimeType.TEXT, 

837 ) 

838 

839 email = Email(**kwargs) 

840 success = email.send( 

841 host=config.email_host, 

842 username=config.email_host_username, 

843 password=config.email_host_password, 

844 port=config.email_port, 

845 use_tls=config.email_use_tls, 

846 ) 

847 if success: 

848 msg = _("E-mail sent") 

849 queue = FlashQueue.SUCCESS 

850 else: 

851 msg = _( 

852 "Failed to send e-mail! " 

853 "Please try again or contact your administrator." 

854 ) 

855 queue = FlashQueue.DANGER 

856 self.request.session.flash(msg, queue=queue) 

857 

858 def send_authentication_sms(self) -> None: 

859 """ 

860 Send a code to the user via SMS (text message). 

861 """ 

862 backend = self.request.config.sms_backend 

863 backend.send_sms( 

864 self.mfa_user.raw_phone_number, self.get_hotp_message() 

865 ) 

866 

867 def get_hotp_message(self) -> str: 

868 """ 

869 Return a human-readable message containing an HOTP (HMAC-Based One-Time 

870 Password). 

871 """ 

872 self.mfa_user.hotp_counter += 1 

873 self.request.dbsession.add(self.mfa_user) 

874 _ = self.request.gettext 

875 key = self.mfa_user.mfa_secret_key 

876 assert key, f"Bug: self.mfa_user.mfa_secret_key = {key!r}" 

877 handler = pyotp.HOTP(key) 

878 code = handler.at(self.mfa_user.hotp_counter) 

879 return _("Your CamCOPS verification code is {}").format(code) 

880 

881 def otp_is_valid(self, appstruct: Dict[str, Any]) -> bool: 

882 """ 

883 Is the code being offered by the user the right one? 

884 """ 

885 otp = appstruct.get(ViewParam.ONE_TIME_PASSWORD) 

886 return self.mfa_user.verify_one_time_password(otp) 

887 

888 # ------------------------------------------------------------------------- 

889 # Ways to fail 

890 # ------------------------------------------------------------------------- 

891 

892 def fail_bad_mfa_code(self) -> NoReturn: 

893 """ 

894 Fail because the code was wrong. 

895 """ 

896 _ = self.request.gettext 

897 self.fail(_("You entered an invalid code. Please try again.")) 

898 

899 def fail_timed_out(self) -> NoReturn: 

900 """ 

901 Fail because the process timed out. 

902 """ 

903 _ = self.request.gettext 

904 self.fail(_("Your code expired. Please try again.")) 

905 

906 

907class LoggedInUserMfaMixin(MfaMixin): 

908 """ 

909 Handles multi-factor authentication for the currently logged in user 

910 (everything except :class:`LoginView`). 

911 """ 

912 

913 def __init__(self, *args: Any, **kwargs: Any) -> None: 

914 super().__init__(*args, **kwargs) 

915 self.mfa_user = self.request.user 

916 

917 

918class LoginView(MfaMixin, FormView): 

919 """ 

920 Multi-factor authentication for the login process. 

921 Sequences is: (1) password; (2) MFA, if enabled. 

922 

923 Inheritance (as of 2021-10-06): 

924 

925 - webview.LoginView 

926 

927 - webview.MfaMixin 

928 

929 - cc_view_classes.FormWizardMixin 

930 

931 - cc_view_classes.FormView 

932 

933 - cc_view_classes.TemplateResponseMixin 

934 

935 - cc_view_classes.BaseFormView 

936 

937 - cc_view_classes.FormMixin 

938 

939 - cc_view_classes.ContextMixin 

940 

941 - cc_view_classes.ProcessFormView -- provides ``get()``, ``post()`` 

942 

943 - cc_view_classes.View -- owns ``request``, provides ``dispatch()`` 

944 """ 

945 

946 KEY_MFA_USER_ID = "mfa_user_id" 

947 

948 _mfa_user: Optional[User] 

949 wizard_first_step = MfaMixin.STEP_PASSWORD 

950 wizard_forms = { 

951 MfaMixin.STEP_PASSWORD: LoginForm, # 1. enter username/password 

952 MfaMixin.STEP_MFA: OtpTokenForm, # 2. enter one-time code 

953 } 

954 wizard_templates = { 

955 MfaMixin.STEP_PASSWORD: "login.mako", 

956 MfaMixin.STEP_MFA: "login_token.mako", 

957 } 

958 

959 def __init__(self, *args: Any, **kwargs: Any) -> None: 

960 super().__init__(*args, **kwargs) 

961 

962 # ------------------------------------------------------------------------- 

963 # mfa_user 

964 # ------------------------------------------------------------------------- 

965 # Slightly more complex here, since our user isn't logged in properly yet. 

966 

967 @property 

968 def mfa_user(self) -> Optional[User]: 

969 # Docstring in superclass. 

970 if self._mfa_user is None: 

971 try: 

972 user_id = self.state[self.KEY_MFA_USER_ID] 

973 self._mfa_user = ( 

974 self.request.dbsession.query(User) 

975 .filter(User.id == user_id) 

976 .one_or_none() 

977 ) 

978 except KeyError: 

979 pass 

980 

981 return self._mfa_user 

982 

983 @mfa_user.setter 

984 def mfa_user(self, user: Optional[User]) -> None: 

985 # Docstring in superclass. 

986 self._mfa_user = user 

987 if user is None: 

988 self.state[self.KEY_MFA_USER_ID] = None 

989 return 

990 

991 self.state[self.KEY_MFA_USER_ID] = user.id 

992 

993 # ------------------------------------------------------------------------- 

994 # Content for forms 

995 # ------------------------------------------------------------------------- 

996 

997 def get_form_values(self) -> Dict: 

998 # Docstring in superclass. 

999 return {ViewParam.REDIRECT_URL: self.get_redirect_url()} 

1000 

1001 def get_form_kwargs(self) -> Dict[str, Any]: 

1002 # Docstring in superclass. 

1003 kwargs = super().get_form_kwargs() 

1004 

1005 cfg = self.request.config 

1006 autocomplete_password = not cfg.disable_password_autocomplete 

1007 kwargs["autocomplete_password"] = autocomplete_password 

1008 

1009 return kwargs 

1010 

1011 # ------------------------------------------------------------------------- 

1012 # Form validation, and sequence handling 

1013 # ------------------------------------------------------------------------- 

1014 

1015 def form_valid_process_data( 

1016 self, form: "Form", appstruct: Dict[str, Any] 

1017 ) -> None: 

1018 # Docstring in superclass. 

1019 if self.step == self.STEP_PASSWORD: 

1020 self._form_valid_password(appstruct) 

1021 else: 

1022 self._form_valid_mfa(appstruct) 

1023 

1024 super().form_valid_process_data(form, appstruct) 

1025 

1026 def _form_valid_password(self, appstruct: Dict[str, Any]) -> None: 

1027 """ 

1028 Called when the user has entered a username/password (via a validated 

1029 form). 

1030 """ 

1031 username = appstruct.get(ViewParam.USERNAME) 

1032 

1033 # Is the user locked? 

1034 locked_out_until = SecurityAccountLockout.user_locked_out_until( 

1035 self.request, username 

1036 ) 

1037 if locked_out_until is not None: 

1038 self.fail_locked_out(locked_out_until) # will raise 

1039 

1040 password = appstruct.get(ViewParam.PASSWORD) 

1041 

1042 # Is the username/password combination correct? 

1043 user = User.get_user_from_username_password( 

1044 self.request, username, password 

1045 ) # checks password 

1046 

1047 # Some trade-off between usability and security here. 

1048 # For failed attempts, the user has some idea as to what the problem 

1049 # is. 

1050 if user is None: 

1051 # Unsuccessful. Note that the username may/may not be genuine. 

1052 SecurityLoginFailure.act_on_login_failure(self.request, username) 

1053 # ... may lock the account 

1054 # Now, call audit() before session.logout(), as the latter 

1055 # will wipe the session IP address: 

1056 self.request.camcops_session.logout() 

1057 self.fail_not_authorized() # will raise 

1058 

1059 if not user.may_use_webviewer: 

1060 # This means a user who can upload from tablet but who cannot 

1061 # log in via the web front end. 

1062 self.fail_not_authorized() # will raise 

1063 

1064 self.mfa_user = user 

1065 self._password_next_step() 

1066 self._form_valid_success() 

1067 

1068 def _password_next_step(self) -> None: 

1069 """ 

1070 The user has entered a password correctly; what's the next step? 

1071 """ 

1072 method = self.mfa_user.mfa_method 

1073 if MfaMethod.requires_second_step(method): 

1074 self.step = self.STEP_MFA 

1075 self.handle_authentication_type() 

1076 else: 

1077 self.finish() 

1078 # Guaranteed to be valid; see constructor. 

1079 

1080 def _form_valid_mfa(self, appstruct: Dict[str, Any]) -> None: 

1081 """ 

1082 Called when the user has entered an MFA code (via a validated form). 

1083 """ 

1084 if not self.otp_is_valid(appstruct): 

1085 self.fail_bad_mfa_code() # will raise 

1086 

1087 self.finish() 

1088 self._form_valid_success() 

1089 

1090 def _form_valid_success(self) -> None: 

1091 """ 

1092 Called when the next step has been determined. One possible outcome is 

1093 a successful login. 

1094 """ 

1095 if self.finished(): 

1096 # Successful login. 

1097 self.mfa_user.login( 

1098 self.request 

1099 ) # will clear login failure record 

1100 self.request.camcops_session.login(self.mfa_user) 

1101 audit(self.request, "Login", user_id=self.mfa_user.id) 

1102 

1103 # OK, logged in. 

1104 # Redirect to the main menu, or wherever the user was heading. 

1105 # HOWEVER, that may lead us to a "change password" or "agree terms" 

1106 # page, via the permissions system (Permission.HAPPY or not). 

1107 

1108 # ------------------------------------------------------------------------- 

1109 # Next destinations 

1110 # ------------------------------------------------------------------------- 

1111 

1112 def get_success_url(self) -> str: 

1113 # Docstring in superclass. 

1114 if self.finished(): 

1115 return self.get_redirect_url() 

1116 

1117 return self.request.route_url( 

1118 Routes.LOGIN, 

1119 _query={ViewParam.REDIRECT_URL: self.get_redirect_url()}, 

1120 ) 

1121 

1122 def get_failure_url(self) -> None: 

1123 # Docstring in superclass. 

1124 return self.request.route_url( 

1125 Routes.LOGIN, 

1126 _query={ViewParam.REDIRECT_URL: self.get_redirect_url()}, 

1127 ) 

1128 

1129 def get_redirect_url(self) -> str: 

1130 """ 

1131 We may be logging in after a timeout, in which case we can redirect the 

1132 user back to where they were before. Otherwise, they go to the main 

1133 page. 

1134 """ 

1135 return self.request.get_redirect_url_param( 

1136 ViewParam.REDIRECT_URL, default=self.request.route_url(Routes.HOME) 

1137 ) 

1138 

1139 # ------------------------------------------------------------------------- 

1140 # Ways to fail 

1141 # ------------------------------------------------------------------------- 

1142 

1143 def fail_not_authorized(self) -> NoReturn: 

1144 """ 

1145 Fail because the user has not logged in correctly or is not authorized 

1146 to log in. 

1147 

1148 Pretends to the type checker that it returns a response, so callers can 

1149 use ``return`` for code safety. 

1150 """ 

1151 _ = self.request.gettext 

1152 self.fail( 

1153 _("Invalid username/password (or user not authorized).") 

1154 ) # will raise 

1155 # assert False, "Bug: LoginView.fail_not_authorized() falling through" 

1156 

1157 def fail_locked_out(self, locked_until: Pendulum) -> NoReturn: 

1158 """ 

1159 Raises a failure because the user is locked out. 

1160 

1161 Pretends to the type checker that it returns a response, so callers can 

1162 use ``return`` for code safety. 

1163 """ 

1164 _ = self.request.gettext 

1165 locked_until = format_datetime( 

1166 locked_until, DateFormat.LONG_DATETIME_WITH_DAY, _("(never)") 

1167 ) 

1168 message = _( 

1169 "Account locked until {} due to multiple login failures. " 

1170 "Try again later or contact your administrator." 

1171 ).format(locked_until) 

1172 self.fail(message) # will raise 

1173 # assert False, "Bug: LoginView.fail_locked_out() falling through" 

1174 

1175 

1176@view_config( 

1177 route_name=Routes.LOGIN, 

1178 permission=NO_PERMISSION_REQUIRED, 

1179 http_cache=NEVER_CACHE, 

1180) 

1181def login_view(req: "CamcopsRequest") -> Response: 

1182 """ 

1183 Login view. 

1184 

1185 - GET: presents the login screen 

1186 - POST/submit: attempts to log in (with optional multi-factor 

1187 authentication); 

1188 

1189 - failure: returns a login failure view or an account lockout view 

1190 - success: 

1191 

1192 - redirects to the redirection view if one was specified; 

1193 - redirects to the home view if not. 

1194 """ 

1195 return LoginView(req).dispatch() 

1196 

1197 

1198@view_config( 

1199 route_name=Routes.LOGOUT, 

1200 permission=Authenticated, 

1201 renderer="logged_out.mako", 

1202 http_cache=NEVER_CACHE, 

1203) 

1204def logout(req: "CamcopsRequest") -> Dict[str, Any]: 

1205 """ 

1206 Logs a session out, and returns the "logged out" view. 

1207 """ 

1208 audit(req, "Logout") 

1209 ccsession = req.camcops_session 

1210 ccsession.logout() 

1211 return dict() 

1212 

1213 

1214@view_config( 

1215 route_name=Routes.OFFER_TERMS, 

1216 permission=Authenticated, 

1217 renderer="offer_terms.mako", 

1218 http_cache=NEVER_CACHE, 

1219) 

1220def offer_terms(req: "CamcopsRequest") -> Response: 

1221 """ 

1222 - GET: show terms/conditions and request acknowledgement 

1223 - POST/submit: note the user's agreement; redirect to the home view. 

1224 """ 

1225 form = OfferTermsForm( 

1226 request=req, agree_button_text=req.wsstring(SS.DISCLAIMER_AGREE) 

1227 ) 

1228 

1229 if FormAction.SUBMIT in req.POST: 

1230 req.user.agree_terms(req) 

1231 return HTTPFound(req.route_url(Routes.HOME)) # redirect 

1232 

1233 return render_to_response( 

1234 "offer_terms.mako", 

1235 dict( 

1236 title=req.wsstring(SS.DISCLAIMER_TITLE), 

1237 subtitle=req.wsstring(SS.DISCLAIMER_SUBTITLE), 

1238 content=req.wsstring(SS.DISCLAIMER_CONTENT), 

1239 form=form.render(), 

1240 head_form_html=get_head_form_html(req, [form]), 

1241 ), 

1242 request=req, 

1243 ) 

1244 

1245 

1246@forbidden_view_config(http_cache=NEVER_CACHE) 

1247def forbidden(req: "CamcopsRequest") -> Response: 

1248 """ 

1249 Generic place that Pyramid comes when permission is denied for a view. 

1250 

1251 We will offer one of these: 

1252 

1253 - Must change password? Redirect to "change own password" view. 

1254 - Must agree terms? Redirect to "offer terms" view. 

1255 - Otherwise: a generic "forbidden" view. 

1256 """ 

1257 # I was doing this: 

1258 if req.has_permission(Authenticated): 

1259 user = req.user 

1260 assert user, "Bug! Authenticated but no user...!?" 

1261 if user.must_change_password: 

1262 return HTTPFound(req.route_url(Routes.CHANGE_OWN_PASSWORD)) 

1263 if user.must_agree_terms: 

1264 return HTTPFound(req.route_url(Routes.OFFER_TERMS)) 

1265 if user.must_set_mfa_method(req): 

1266 return HTTPFound(req.route_url(Routes.EDIT_OWN_USER_MFA)) 

1267 # ... but with "raise HTTPFound" instead. 

1268 # BUT there is only one level of exception handling in Pyramid, i.e. you 

1269 # can't raise exceptions from exceptions: 

1270 # https://github.com/Pylons/pyramid/issues/436 

1271 # The simplest way round is to use "return", not "raise". 

1272 

1273 redirect_url = req.url 

1274 # Redirects to login page, with onwards redirection to requested 

1275 # destination once logged in: 

1276 querydict = {ViewParam.REDIRECT_URL: redirect_url} 

1277 return render_to_response( 

1278 "forbidden.mako", dict(querydict=querydict), request=req 

1279 ) 

1280 

1281 

1282# ============================================================================= 

1283# Changing passwords 

1284# ============================================================================= 

1285 

1286 

1287class ChangeOwnPasswordView(LoggedInUserMfaMixin, UpdateView): 

1288 """ 

1289 View to change one's own password. 

1290 

1291 If MFA is enabled, you need to (re-)authenticate via MFA to do so. 

1292 Then, you need to supply your own password to change it (regardless). 

1293 Sequence is therefore (1) MFA, optionally; (2) change password. 

1294 

1295 Most documentation in superclass. 

1296 """ 

1297 

1298 model_form_dict: Dict[str, "Form"] = {} 

1299 STEP_CHANGE_PASSWORD = "change_password" 

1300 

1301 wizard_forms = { 

1302 MfaMixin.STEP_MFA: OtpTokenForm, 

1303 STEP_CHANGE_PASSWORD: ChangeOwnPasswordForm, 

1304 } 

1305 

1306 wizard_templates = { 

1307 MfaMixin.STEP_MFA: "login_token.mako", 

1308 STEP_CHANGE_PASSWORD: "change_own_password.mako", 

1309 } 

1310 

1311 wizard_extra_contexts: Dict[str, Dict[str, Any]] = { 

1312 MfaMixin.STEP_MFA: {}, 

1313 STEP_CHANGE_PASSWORD: {}, 

1314 } 

1315 

1316 def get_first_step(self) -> str: 

1317 if self.request.user.mfa_method == MfaMethod.NO_MFA: 

1318 return self.STEP_CHANGE_PASSWORD 

1319 

1320 return self.STEP_MFA 

1321 

1322 def get(self) -> Response: 

1323 if self.step == self.STEP_MFA: 

1324 self.handle_authentication_type() 

1325 

1326 _ = self.request.gettext 

1327 

1328 if self.request.user.must_change_password: 

1329 self.request.session.flash( 

1330 _("Your password has expired and must be changed."), 

1331 queue=FlashQueue.DANGER, 

1332 ) 

1333 return super().get() 

1334 

1335 def get_object(self) -> User: 

1336 return self.request.user 

1337 

1338 def get_form_kwargs(self) -> Dict[str, Any]: 

1339 kwargs = super().get_form_kwargs() 

1340 kwargs.update(must_differ=True) 

1341 return kwargs 

1342 

1343 def get_success_url(self) -> str: 

1344 if self.finished(): 

1345 return self.request.route_url(Routes.HOME) 

1346 

1347 return self.request.route_url(Routes.CHANGE_OWN_PASSWORD) 

1348 

1349 def get_failure_url(self) -> str: 

1350 return self.request.route_url(Routes.HOME) 

1351 

1352 def form_valid_process_data( 

1353 self, form: "Form", appstruct: Dict[str, Any] 

1354 ) -> None: 

1355 if self.step == self.STEP_MFA: 

1356 if not self.otp_is_valid(appstruct): 

1357 self.fail_bad_mfa_code() # will raise 

1358 

1359 super().form_valid_process_data(form, appstruct) 

1360 

1361 def set_object_properties(self, appstruct: Dict[str, Any]) -> None: 

1362 # Superclass method overridden, not called. 

1363 if self.step == self.STEP_MFA: 

1364 self.step = self.STEP_CHANGE_PASSWORD 

1365 elif self.step == self.STEP_CHANGE_PASSWORD: 

1366 self.set_password(appstruct) 

1367 self.finish() 

1368 else: 

1369 assert f"ChangeOwnPasswordView: bad step {self.step!r}" 

1370 

1371 def set_password(self, appstruct: Dict[str, Any]) -> None: 

1372 """ 

1373 Success; change the user's password. 

1374 """ 

1375 user = cast(User, self.object) 

1376 # ... form has validated old password, etc. 

1377 new_password = appstruct[ViewParam.NEW_PASSWORD] 

1378 user.set_password(self.request, new_password) 

1379 

1380 _ = self.request.gettext 

1381 self.request.session.flash( 

1382 _( 

1383 "You have changed your password. " 

1384 "If you store your password in your CamCOPS tablet " 

1385 "application, remember to change it there as well." 

1386 ), 

1387 queue=FlashQueue.SUCCESS, 

1388 ) 

1389 

1390 

1391@view_config( 

1392 route_name=Routes.CHANGE_OWN_PASSWORD, 

1393 permission=Authenticated, 

1394 http_cache=NEVER_CACHE, 

1395) 

1396def change_own_password(req: "CamcopsRequest") -> Response: 

1397 """ 

1398 For any user: to change their own password. 

1399 

1400 - GET: offer "change own password" view 

1401 - POST/submit: change the password and display success message. 

1402 """ 

1403 view = ChangeOwnPasswordView(req) 

1404 

1405 return view.dispatch() 

1406 

1407 

1408class EditUserAuthenticationView(LoggedInUserMfaMixin, UpdateView): 

1409 """ 

1410 View to edit aspects of another user. 

1411 """ 

1412 

1413 model_form_dict: Dict[str, "Form"] = {} 

1414 object_class = User 

1415 pk_param = ViewParam.USER_ID 

1416 server_pk_name = "id" 

1417 

1418 def get(self) -> Response: 

1419 if self.step == self.STEP_MFA: 

1420 self.handle_authentication_type() 

1421 

1422 return super().get() 

1423 

1424 def get_object(self) -> User: 

1425 user = cast(User, super().get_object()) 

1426 assert_may_edit_user(self.request, user) 

1427 

1428 return user 

1429 

1430 def get_extra_context(self) -> Dict[str, Any]: 

1431 if self.step == self.STEP_MFA: 

1432 return super().get_extra_context() 

1433 

1434 user = cast(User, self.object) 

1435 

1436 return {"username": user.username} 

1437 

1438 def form_valid_process_data( 

1439 self, form: "Form", appstruct: Dict[str, Any] 

1440 ) -> None: 

1441 if self.step == self.STEP_MFA: 

1442 if not self.otp_is_valid(appstruct): 

1443 self.fail_bad_mfa_code() # will raise 

1444 

1445 super().form_valid_process_data(form, appstruct) 

1446 

1447 def get_failure_url(self) -> str: 

1448 return self.request.route_url(Routes.VIEW_ALL_USERS) 

1449 

1450 

1451class ChangeOtherPasswordView(EditUserAuthenticationView): 

1452 """ 

1453 View to change the password for another user. 

1454 """ 

1455 

1456 STEP_CHANGE_PASSWORD = "change_password" 

1457 

1458 wizard_forms = { 

1459 MfaMixin.STEP_MFA: OtpTokenForm, 

1460 STEP_CHANGE_PASSWORD: ChangeOtherPasswordForm, 

1461 } 

1462 

1463 wizard_templates = { 

1464 MfaMixin.STEP_MFA: "login_token.mako", 

1465 STEP_CHANGE_PASSWORD: "change_other_password.mako", 

1466 } 

1467 

1468 def get(self) -> Response: 

1469 if self.get_pk_value() == self.request.user_id: 

1470 raise HTTPFound(self.request.route_url(Routes.CHANGE_OWN_PASSWORD)) 

1471 

1472 return super().get() 

1473 

1474 def get_first_step(self) -> str: 

1475 if self.request.user.mfa_method != MfaMethod.NO_MFA: 

1476 return self.STEP_MFA 

1477 

1478 return self.STEP_CHANGE_PASSWORD 

1479 

1480 def set_object_properties(self, appstruct: Dict[str, Any]) -> None: 

1481 # Superclass method overridden, not called. 

1482 if self.step == self.STEP_CHANGE_PASSWORD: 

1483 self.set_password(appstruct) 

1484 self.finish() 

1485 return 

1486 

1487 if self.step == self.STEP_MFA: 

1488 self.step = self.STEP_CHANGE_PASSWORD 

1489 

1490 def set_password(self, appstruct: Dict[str, Any]) -> None: 

1491 """ 

1492 Success; change the password for the other user. 

1493 """ 

1494 user = cast(User, self.object) 

1495 _ = self.request.gettext 

1496 new_password = appstruct[ViewParam.NEW_PASSWORD] 

1497 user.set_password(self.request, new_password) 

1498 must_change_pw = appstruct.get(ViewParam.MUST_CHANGE_PASSWORD) 

1499 if must_change_pw: 

1500 user.force_password_change() 

1501 self.request.session.flash( 

1502 _("Password changed for user '{username}'").format( 

1503 username=user.username 

1504 ), 

1505 queue=FlashQueue.SUCCESS, 

1506 ) 

1507 

1508 def get_success_url(self) -> str: 

1509 if self.finished(): 

1510 return self.request.route_url(Routes.VIEW_ALL_USERS) 

1511 

1512 user = cast(User, self.object) 

1513 

1514 return self.request.route_url( 

1515 Routes.CHANGE_OTHER_PASSWORD, _query={ViewParam.USER_ID: user.id} 

1516 ) 

1517 

1518 

1519@view_config( 

1520 route_name=Routes.CHANGE_OTHER_PASSWORD, 

1521 permission=Permission.GROUPADMIN, 

1522 http_cache=NEVER_CACHE, 

1523) 

1524def change_other_password(req: "CamcopsRequest") -> Response: 

1525 """ 

1526 For administrators, to change another's password. 

1527 

1528 - GET: offer "change another's password" view (except that if you're 

1529 changing your own password, return :func:`change_own_password`. 

1530 - POST/submit: change the password and display success message. 

1531 """ 

1532 view = ChangeOtherPasswordView(req) 

1533 return view.dispatch() 

1534 

1535 

1536class EditOtherUserMfaView(EditUserAuthenticationView): 

1537 """ 

1538 View to edit the MFA method for another user. Only permits disabling of 

1539 MFA. (If MFA is mandatory, that will require the other user to set their 

1540 MFA method at next logon.) 

1541 """ 

1542 

1543 STEP_OTHER_USER_MFA = "other_user_mfa" 

1544 

1545 wizard_forms = { 

1546 MfaMixin.STEP_MFA: OtpTokenForm, 

1547 STEP_OTHER_USER_MFA: EditOtherUserMfaForm, 

1548 } 

1549 

1550 wizard_templates = { 

1551 MfaMixin.STEP_MFA: "login_token.mako", 

1552 STEP_OTHER_USER_MFA: "edit_other_user_mfa.mako", 

1553 } 

1554 

1555 def get(self) -> Response: 

1556 if self.get_pk_value() == self.request.user_id: 

1557 raise HTTPFound(self.request.route_url(Routes.EDIT_OWN_USER_MFA)) 

1558 

1559 return super().get() 

1560 

1561 def get_first_step(self) -> str: 

1562 if self.request.user.mfa_method != MfaMethod.NO_MFA: 

1563 return self.STEP_MFA 

1564 

1565 return self.STEP_OTHER_USER_MFA 

1566 

1567 def set_object_properties(self, appstruct: Dict[str, Any]) -> None: 

1568 # Superclass method overridden, not called. 

1569 if self.step == self.STEP_OTHER_USER_MFA: 

1570 self.maybe_disable_mfa(appstruct) 

1571 self.finish() 

1572 return 

1573 

1574 if self.step == self.STEP_MFA: 

1575 self.step = self.STEP_OTHER_USER_MFA 

1576 

1577 def maybe_disable_mfa(self, appstruct: Dict[str, Any]) -> None: 

1578 """ 

1579 If our user asked for it, disable MFA for the user being edited. 

1580 """ 

1581 if appstruct.get(ViewParam.DISABLE_MFA): 

1582 user = cast(User, self.object) 

1583 _ = self.request.gettext 

1584 

1585 user.mfa_method = MfaMethod.NO_MFA 

1586 self.request.session.flash( 

1587 _( 

1588 "Multi-factor authentication disabled for user " 

1589 "'{username}'" 

1590 ).format(username=user.username), 

1591 queue=FlashQueue.SUCCESS, 

1592 ) 

1593 

1594 def get_success_url(self) -> str: 

1595 if self.finished(): 

1596 return self.request.route_url(Routes.VIEW_ALL_USERS) 

1597 

1598 user = cast(User, self.object) 

1599 

1600 return self.request.route_url( 

1601 Routes.EDIT_OTHER_USER_MFA, _query={ViewParam.USER_ID: user.id} 

1602 ) 

1603 

1604 

1605@view_config( 

1606 route_name=Routes.EDIT_OTHER_USER_MFA, 

1607 permission=Permission.GROUPADMIN, 

1608 http_cache=NEVER_CACHE, 

1609) 

1610def edit_other_user_mfa(req: "CamcopsRequest") -> Response: 

1611 """ 

1612 For administrators, to change another users's Multi-factor Authentication. 

1613 Currently it is only possible to disable Multi-factor authentication for 

1614 a user. 

1615 

1616 - GET: offer "edit another's MFA" view (except that if you're 

1617 changing your own MFA, return :func:`edit_own_user_mfa`. 

1618 - POST/submit: edit MFA and display success message. 

1619 """ 

1620 view = EditOtherUserMfaView(req) 

1621 return view.dispatch() 

1622 

1623 

1624class EditOwnUserMfaView(LoggedInUserMfaMixin, UpdateView): 

1625 """ 

1626 View to edit your own MFA method. 

1627 

1628 The inheritance (as of 2021-10-06) illustrates a typical situation: 

1629 

1630 SPECIMEN VIEW CLASS: 

1631 

1632 - webview.EditOwnUserMfaView 

1633 

1634 - webview.LoggedInUserMfaMixin 

1635 

1636 - webview.MfaMixin 

1637 

1638 - cc_view_classes.FormWizardMixin -- with typehint for FormMixin -- 

1639 implements ``state``. 

1640 

1641 - cc_view_classes.UpdateView 

1642 

1643 - cc_view_classes.TemplateResponseMixin 

1644 

1645 - cc_view_classes.BaseUpdateView 

1646 

1647 - cc_view_classes.ModelFormMixin -- implements ``form_valid()`` --> 

1648 ``save_object()`` > ``set_object_properties()`` 

1649 

1650 - cc_view_classes.FormMixin -- implements ``form_valid()``, 

1651 ``get_context_data()``, etc. 

1652 

1653 - cc_view_classes.ContextMixin 

1654 

1655 - cc_view_classes.SingleObjectMixin -- implements ``get_object()`` 

1656 etc. 

1657 

1658 - cc_view_classes.ContextMixin 

1659 

1660 - cc_view_classes.ProcessFormView -- implements ``get()``, ``post()`` 

1661 

1662 - cc_view_classes.View -- owns ``request``, implements 

1663 ``dispatch()`` (which calls ``get()``, ``post()``). 

1664 

1665 SPECIMEN FORM WITHIN THAT VIEW: 

1666 

1667 - cc_forms.MfaMethodForm 

1668 

1669 - cc_forms.InformativeNonceForm 

1670 

1671 - cc_forms.InformativeForm 

1672 

1673 - deform.Form 

1674 

1675 If you subclass A(B, C), then B's superclass methods are called before C's: 

1676 https://www.python.org/download/releases/2.3/mro/; 

1677 https://makina-corpus.com/blog/metier/2014/python-tutorial-understanding-python-mro-class-search-path; 

1678 """ 

1679 

1680 STEP_MFA_METHOD = "mfa_method" 

1681 STEP_TOTP = MfaMethod.TOTP 

1682 STEP_HOTP_EMAIL = MfaMethod.HOTP_EMAIL 

1683 STEP_HOTP_SMS = MfaMethod.HOTP_SMS 

1684 wizard_first_step = STEP_MFA_METHOD 

1685 

1686 wizard_forms = { 

1687 STEP_MFA_METHOD: MfaMethodForm, # 1. choose your MFA method 

1688 STEP_TOTP: MfaTotpForm, # 2a. show TOTP (auth app) QR/alphanumeric code # noqa: E501 

1689 STEP_HOTP_EMAIL: MfaHotpEmailForm, # 2b. choose e-mail address 

1690 STEP_HOTP_SMS: MfaHotpSmsForm, # 2c. choose phone number for SMS 

1691 MfaMixin.STEP_MFA: OtpTokenForm, # 4. request code from user 

1692 } 

1693 

1694 FORM_WITH_TITLE_TEMPLATE = "form_with_title.mako" 

1695 

1696 wizard_templates = { 

1697 STEP_MFA_METHOD: FORM_WITH_TITLE_TEMPLATE, 

1698 STEP_TOTP: FORM_WITH_TITLE_TEMPLATE, 

1699 STEP_HOTP_EMAIL: FORM_WITH_TITLE_TEMPLATE, 

1700 STEP_HOTP_SMS: FORM_WITH_TITLE_TEMPLATE, 

1701 MfaMixin.STEP_MFA: "login_token.mako", 

1702 } 

1703 

1704 hotp_steps = (STEP_HOTP_EMAIL, STEP_HOTP_SMS) 

1705 secret_key_steps = (STEP_TOTP, STEP_HOTP_EMAIL, STEP_HOTP_SMS) 

1706 

1707 def get(self) -> Response: 

1708 if self.step == self.STEP_MFA: 

1709 self.handle_authentication_type() 

1710 

1711 return super().get() 

1712 

1713 def get_model_form_dict(self) -> Dict[str, Any]: 

1714 model_form_dict = {} 

1715 

1716 # Dictionary keys here are attribute names of the User object. 

1717 # Values are form attributes. 

1718 

1719 if self.step == self.STEP_MFA_METHOD: 

1720 model_form_dict["mfa_method"] = ViewParam.MFA_METHOD 

1721 

1722 elif self.step == self.STEP_HOTP_EMAIL: 

1723 model_form_dict["email"] = ViewParam.EMAIL 

1724 

1725 elif self.step == self.STEP_HOTP_SMS: 

1726 model_form_dict["phone_number"] = ViewParam.PHONE_NUMBER 

1727 

1728 if self.step in self.secret_key_steps: 

1729 model_form_dict["mfa_secret_key"] = ViewParam.MFA_SECRET_KEY 

1730 

1731 return model_form_dict 

1732 

1733 def get_object(self) -> User: 

1734 return self.request.user 

1735 

1736 def get_form_values(self) -> Dict[str, Any]: 

1737 # Will call get_model_form_dict() 

1738 form_values = super().get_form_values() 

1739 

1740 if self.step in self.secret_key_steps: 

1741 # Always create a new secret key. This will be written to the 

1742 # user object at the next step, via set_object_properties. 

1743 form_values[ViewParam.MFA_SECRET_KEY] = pyotp.random_base32() 

1744 

1745 return form_values 

1746 

1747 def get_extra_context(self) -> Dict[str, Any]: 

1748 req = self.request 

1749 _ = req.gettext 

1750 if self.step == self.STEP_MFA: 

1751 test_msg = _("Let's test it!") + " " 

1752 context = super().get_extra_context() 

1753 context[self.KEY_INSTRUCTIONS] = ( 

1754 test_msg + self.get_mfa_instructions() 

1755 ) 

1756 return context 

1757 

1758 titles = { 

1759 self.STEP_MFA_METHOD: req.icon_text( 

1760 icon=Icons.MFA, 

1761 text=_("Configure multi-factor authentication settings"), 

1762 ), 

1763 self.STEP_TOTP: req.icon_text( 

1764 icon=Icons.APP_AUTHENTICATOR, 

1765 text=_("Configure authentication with app"), 

1766 ), 

1767 self.STEP_HOTP_EMAIL: req.icon_text( 

1768 icon=Icons.EMAIL_SEND, 

1769 text=_("Configure authentication by email"), 

1770 ), 

1771 self.STEP_HOTP_SMS: req.icon_text( 

1772 icon=Icons.SMS, 

1773 text=_("Configure authentication by text message"), 

1774 ), 

1775 } 

1776 return {MAKO_VAR_TITLE: titles[self.step]} 

1777 

1778 def get_success_url(self) -> str: 

1779 if self.finished(): 

1780 return self.request.route_url(Routes.HOME) 

1781 

1782 return self.request.route_url(Routes.EDIT_OWN_USER_MFA) 

1783 

1784 def get_failure_url(self) -> str: 

1785 # We get here because the user, who has already logged in successfully, 

1786 # has changed their MFA method. Failure doesn't mean they should be 

1787 # logged out instantly -- they may have (for example) misconfigured 

1788 # their phone number, and if they are forcibly logged out now, they are 

1789 # stuffed and require administrator assistance. Instead, we return them 

1790 # to the home screen. 

1791 return self.request.route_url(Routes.HOME) 

1792 

1793 def set_object_properties(self, appstruct: Dict[str, Any]) -> None: 

1794 # Called by ModelFormMixin.form_valid_process_data() -> 

1795 # ModelFormMixin.save_object(). 

1796 

1797 super().set_object_properties(appstruct) 

1798 

1799 if self.step == self.STEP_MFA_METHOD: 

1800 # We are setting the MFA method, including secret key etc. 

1801 user = cast(User, self.object) 

1802 user.set_mfa_method(appstruct.get(ViewParam.MFA_METHOD)) 

1803 

1804 elif self.step == self.STEP_MFA: 

1805 # Code entered. 

1806 if self.otp_is_valid(appstruct): 

1807 _ = self.request.gettext 

1808 self.request.session.flash( 

1809 _("Multi-factor authentication: success!"), 

1810 queue=FlashQueue.SUCCESS, 

1811 ) 

1812 # ... and continue as below 

1813 else: 

1814 return self.fail_bad_mfa_code() # type: ignore[return-value] 

1815 

1816 self._next_step(appstruct) 

1817 

1818 def _next_step(self, appstruct: Dict[str, Any]) -> None: 

1819 if self.step == self.STEP_MFA_METHOD: 

1820 # The user has just chosen their method. 

1821 # 2. Offer them method-specific options 

1822 mfa_method = appstruct.get(ViewParam.MFA_METHOD) 

1823 if mfa_method == MfaMethod.NO_MFA: 

1824 self.finish() 

1825 else: 

1826 self.step = mfa_method 

1827 

1828 elif self.step in ( 

1829 self.STEP_TOTP, 

1830 self.STEP_HOTP_EMAIL, 

1831 self.STEP_HOTP_SMS, 

1832 ): 

1833 # Coming from one of the method-specific steps. 

1834 # 3. Ask for the authentication code. 

1835 self.step = self.STEP_MFA 

1836 

1837 elif self.step == self.STEP_MFA: 

1838 # Authentication code provided. End. 

1839 self.finish() 

1840 

1841 else: 

1842 raise AssertionError( 

1843 f"EditOwnUserMfaView.next_step(): " f"Bad step {self.step!r}" 

1844 ) 

1845 

1846 

1847@view_config( 

1848 route_name=Routes.EDIT_OWN_USER_MFA, 

1849 permission=Authenticated, 

1850 http_cache=NEVER_CACHE, 

1851) 

1852def edit_own_user_mfa(request: "CamcopsRequest") -> Response: 

1853 """ 

1854 Edit your own MFA method. 

1855 """ 

1856 view = EditOwnUserMfaView(request) 

1857 return view.dispatch() 

1858 

1859 

1860# ============================================================================= 

1861# Main menu; simple information things 

1862# ============================================================================= 

1863 

1864 

1865@view_config( 

1866 route_name=Routes.HOME, renderer="main_menu.mako", http_cache=NEVER_CACHE 

1867) 

1868def main_menu(req: "CamcopsRequest") -> Dict[str, Any]: 

1869 """ 

1870 Main CamCOPS menu view. 

1871 """ 

1872 user = req.user 

1873 result = dict( 

1874 authorized_as_groupadmin=user.authorized_as_groupadmin, 

1875 authorized_as_superuser=user.superuser, 

1876 authorized_for_reports=user.authorized_for_reports, 

1877 authorized_to_dump=user.authorized_to_dump, 

1878 authorized_to_manage_patients=user.authorized_to_manage_patients, 

1879 camcops_url=CAMCOPS_URL, 

1880 now=format_datetime(req.now, DateFormat.SHORT_DATETIME_SECONDS), 

1881 server_version=CAMCOPS_SERVER_VERSION, 

1882 ) 

1883 return result 

1884 

1885 

1886# ============================================================================= 

1887# Tasks 

1888# ============================================================================= 

1889 

1890 

1891def edit_filter( 

1892 req: "CamcopsRequest", task_filter: TaskFilter, redirect_url: str 

1893) -> Response: 

1894 """ 

1895 Edit the task filter for the current user. 

1896 

1897 Args: 

1898 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

1899 task_filter: the user's 

1900 :class:`camcops_server.cc_modules.cc_taskfilter.TaskFilter` 

1901 redirect_url: URL to redirect (back) to upon success 

1902 """ 

1903 if FormAction.SET_FILTERS in req.POST: 

1904 form = EditTaskFilterForm(request=req) 

1905 try: 

1906 controls = list(req.POST.items()) 

1907 fa = form.validate(controls) 

1908 # ----------------------------------------------------------------- 

1909 # Apply the changes 

1910 # ----------------------------------------------------------------- 

1911 who = fa.get(ViewParam.WHO) 

1912 what = fa.get(ViewParam.WHAT) 

1913 when = fa.get(ViewParam.WHEN) 

1914 admin = fa.get(ViewParam.ADMIN) 

1915 task_filter.surname = who.get(ViewParam.SURNAME) 

1916 task_filter.forename = who.get(ViewParam.FORENAME) 

1917 task_filter.dob = who.get(ViewParam.DOB) 

1918 task_filter.sex = who.get(ViewParam.SEX) 

1919 task_filter.idnum_criteria = [ 

1920 IdNumReference( 

1921 which_idnum=x[ViewParam.WHICH_IDNUM], 

1922 idnum_value=x[ViewParam.IDNUM_VALUE], 

1923 ) 

1924 for x in who.get(ViewParam.ID_REFERENCES) 

1925 ] 

1926 task_filter.task_types = what.get(ViewParam.TASKS) 

1927 task_filter.text_contents = what.get(ViewParam.TEXT_CONTENTS) 

1928 task_filter.complete_only = what.get(ViewParam.COMPLETE_ONLY) 

1929 task_filter.start_datetime = when.get(ViewParam.START_DATETIME) 

1930 task_filter.end_datetime = when.get(ViewParam.END_DATETIME) 

1931 task_filter.device_ids = admin.get(ViewParam.DEVICE_IDS) 

1932 task_filter.adding_user_ids = admin.get(ViewParam.USER_IDS) 

1933 task_filter.group_ids = admin.get(ViewParam.GROUP_IDS) 

1934 

1935 return HTTPFound(redirect_url) 

1936 except ValidationFailure as e: 

1937 rendered_form = e.render() 

1938 else: 

1939 if FormAction.CLEAR_FILTERS in req.POST: 

1940 # skip validation 

1941 task_filter.clear() 

1942 who = { 

1943 ViewParam.SURNAME: task_filter.surname, 

1944 ViewParam.FORENAME: task_filter.forename, 

1945 ViewParam.DOB: task_filter.dob, 

1946 ViewParam.SEX: task_filter.sex or "", 

1947 ViewParam.ID_REFERENCES: [ 

1948 { 

1949 ViewParam.WHICH_IDNUM: x.which_idnum, 

1950 ViewParam.IDNUM_VALUE: x.idnum_value, 

1951 } 

1952 for x in task_filter.idnum_criteria 

1953 ], 

1954 } 

1955 what = { 

1956 ViewParam.TASKS: task_filter.task_types, 

1957 ViewParam.TEXT_CONTENTS: task_filter.text_contents, 

1958 ViewParam.COMPLETE_ONLY: task_filter.complete_only, 

1959 } 

1960 when = { 

1961 ViewParam.START_DATETIME: task_filter.start_datetime, 

1962 ViewParam.END_DATETIME: task_filter.end_datetime, 

1963 } 

1964 admin = { 

1965 ViewParam.DEVICE_IDS: task_filter.device_ids, 

1966 ViewParam.USER_IDS: task_filter.adding_user_ids, 

1967 ViewParam.GROUP_IDS: task_filter.group_ids, 

1968 } 

1969 open_who = any(i for i in who.values()) 

1970 open_what = any(i for i in what.values()) 

1971 open_when = any(i for i in when.values()) 

1972 open_admin = any(i for i in admin.values()) 

1973 fa = { 

1974 ViewParam.WHO: who, 

1975 ViewParam.WHAT: what, 

1976 ViewParam.WHEN: when, 

1977 ViewParam.ADMIN: admin, 

1978 } 

1979 form = EditTaskFilterForm( 

1980 request=req, 

1981 open_admin=open_admin, 

1982 open_what=open_what, 

1983 open_when=open_when, 

1984 open_who=open_who, 

1985 ) 

1986 rendered_form = form.render(fa) 

1987 

1988 return render_to_response( 

1989 "filter_edit.mako", 

1990 dict( 

1991 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

1992 ), 

1993 request=req, 

1994 ) 

1995 

1996 

1997@view_config(route_name=Routes.SET_FILTERS, http_cache=NEVER_CACHE) 

1998def set_filters(req: "CamcopsRequest") -> Response: 

1999 """ 

2000 View to set the task filters for the current user. 

2001 """ 

2002 redirect_url = req.get_redirect_url_param( 

2003 ViewParam.REDIRECT_URL, req.route_url(Routes.VIEW_TASKS) 

2004 ) 

2005 task_filter = req.camcops_session.get_task_filter() 

2006 return edit_filter(req, task_filter=task_filter, redirect_url=redirect_url) 

2007 

2008 

2009@view_config( 

2010 route_name=Routes.VIEW_TASKS, 

2011 renderer="view_tasks.mako", 

2012 http_cache=NEVER_CACHE, 

2013) 

2014def view_tasks(req: "CamcopsRequest") -> Dict[str, Any]: 

2015 """ 

2016 Main view displaying tasks and applicable filters. 

2017 """ 

2018 ccsession = req.camcops_session 

2019 user = req.user 

2020 taskfilter = ccsession.get_task_filter() 

2021 

2022 # Read from the GET parameters (or in some cases potentially POST but those 

2023 # will be re-read). 

2024 rows_per_page = req.get_int_param( 

2025 ViewParam.ROWS_PER_PAGE, 

2026 ccsession.number_to_view or DEFAULT_ROWS_PER_PAGE, 

2027 ) 

2028 page_num = req.get_int_param(ViewParam.PAGE, 1) 

2029 via_index = req.get_bool_param(ViewParam.VIA_INDEX, True) 

2030 

2031 errors = False 

2032 

2033 # "Number of tasks per page" form 

2034 tpp_form = TasksPerPageForm(request=req) 

2035 if FormAction.SUBMIT_TASKS_PER_PAGE in req.POST: 

2036 try: 

2037 controls = list(req.POST.items()) 

2038 tpp_appstruct = tpp_form.validate(controls) 

2039 rows_per_page = tpp_appstruct.get(ViewParam.ROWS_PER_PAGE) 

2040 ccsession.number_to_view = rows_per_page 

2041 except ValidationFailure: 

2042 errors = True 

2043 rendered_tpp_form = tpp_form.render() 

2044 else: 

2045 tpp_appstruct = {ViewParam.ROWS_PER_PAGE: rows_per_page} 

2046 rendered_tpp_form = tpp_form.render(tpp_appstruct) 

2047 

2048 # Refresh tasks. Slightly pointless. Doesn't need validating. The user 

2049 # could just press the browser's refresh button, but this improves the UI 

2050 # slightly. 

2051 refresh_form = RefreshTasksForm(request=req) 

2052 rendered_refresh_form = refresh_form.render() 

2053 

2054 # Get tasks, unless there have been form errors. 

2055 # In principle, for some filter settings (single task, no "complete" 

2056 # preference...) we could produce an ORM query and use SqlalchemyOrmPage, 

2057 # which would apply LIMIT/OFFSET (or equivalent) to the query, and be 

2058 # very nippy. In practice, this is probably an unusual setting, so we'll 

2059 # simplify things here with a Python list regardless of the settings. 

2060 if errors: 

2061 collection = [] # type: ignore[var-annotated] 

2062 else: 

2063 collection = ( 

2064 # SECURITY APPLIED HERE 

2065 TaskCollection( # type: ignore[assignment] 

2066 req=req, 

2067 taskfilter=taskfilter, 

2068 sort_method_global=TaskSortMethod.CREATION_DATE_DESC, 

2069 via_index=via_index, 

2070 ).all_tasks_or_indexes_or_query 

2071 or [] 

2072 ) 

2073 paginator = ( 

2074 SqlalchemyOrmPage if isinstance(collection, Query) else CamcopsPage 

2075 ) 

2076 page = paginator( 

2077 collection, 

2078 page=page_num, 

2079 items_per_page=rows_per_page, 

2080 url_maker=PageUrl(req), 

2081 request=req, 

2082 ) 

2083 return dict( 

2084 page=page, 

2085 head_form_html=get_head_form_html(req, [tpp_form, refresh_form]), 

2086 tpp_form=rendered_tpp_form, 

2087 refresh_form=rendered_refresh_form, 

2088 no_patient_selected_and_user_restricted=( 

2089 not user.may_view_all_patients_when_unfiltered 

2090 and not taskfilter.any_specific_patient_filtering() 

2091 ), 

2092 user=user, 

2093 ) 

2094 

2095 

2096@view_config(route_name=Routes.TASK, http_cache=NEVER_CACHE) 

2097def serve_task(req: "CamcopsRequest") -> Response: 

2098 """ 

2099 View that serves an individual task, in a variety of possible formats 

2100 (e.g. HTML, PDF, XML). 

2101 """ 

2102 _ = req.gettext 

2103 viewtype = req.get_str_param(ViewParam.VIEWTYPE, ViewArg.HTML, lower=True) 

2104 tablename = req.get_str_param( 

2105 ViewParam.TABLE_NAME, validator=validate_task_tablename 

2106 ) 

2107 server_pk = req.get_int_param(ViewParam.SERVER_PK) 

2108 anonymise = req.get_bool_param(ViewParam.ANONYMISE, False) 

2109 

2110 task = task_factory(req, tablename, server_pk) # SECURITY APPLIED HERE 

2111 

2112 if task is None: 

2113 raise HTTPNotFound( # raise, don't return 

2114 f"{_('Task not found or not permitted:')} " 

2115 f"tablename={tablename!r}, server_pk={server_pk!r}" 

2116 ) 

2117 

2118 task.audit(req, "Viewed " + viewtype.upper()) 

2119 

2120 if viewtype == ViewArg.HTML: 

2121 return Response(task.get_html(req=req, anonymise=anonymise)) 

2122 elif viewtype == ViewArg.PDF: 

2123 return PdfResponse( 

2124 body=task.get_pdf(req, anonymise=anonymise), 

2125 filename=task.suggested_pdf_filename(req, anonymise=anonymise), 

2126 ) 

2127 elif viewtype == ViewArg.PDFHTML: # debugging option; no direct hyperlink 

2128 return Response(task.get_pdf_html(req, anonymise=anonymise)) 

2129 elif viewtype == ViewArg.XML: 

2130 options = TaskExportOptions( 

2131 xml_include_ancillary=True, 

2132 include_blobs=req.get_bool_param(ViewParam.INCLUDE_BLOBS, True), 

2133 xml_include_comments=req.get_bool_param( 

2134 ViewParam.INCLUDE_COMMENTS, True 

2135 ), 

2136 xml_include_calculated=req.get_bool_param( 

2137 ViewParam.INCLUDE_CALCULATED, True 

2138 ), 

2139 xml_include_patient=req.get_bool_param( 

2140 ViewParam.INCLUDE_PATIENT, True 

2141 ), 

2142 xml_include_plain_columns=True, 

2143 xml_include_snomed=req.get_bool_param( 

2144 ViewParam.INCLUDE_SNOMED, True 

2145 ), 

2146 xml_with_header_comments=True, 

2147 ) 

2148 return XmlResponse(task.get_xml(req=req, options=options)) 

2149 elif viewtype == ViewArg.FHIRJSON: # debugging option 

2150 dummy_recipient = ExportRecipient() 

2151 bundle = task.get_fhir_bundle( 

2152 req, dummy_recipient, skip_docs_if_other_content=True 

2153 ) 

2154 return JsonResponse(json.dumps(bundle.as_json(), indent=JSON_INDENT)) 

2155 else: 

2156 permissible = ( 

2157 ViewArg.FHIRJSON, 

2158 ViewArg.HTML, 

2159 ViewArg.PDF, 

2160 ViewArg.PDFHTML, 

2161 ViewArg.XML, 

2162 ) 

2163 raise HTTPBadRequest( 

2164 f"{_('Bad output type:')} {viewtype!r} " 

2165 f"({_('permissible:')} {permissible!r})" 

2166 ) 

2167 

2168 

2169def view_patient(req: "CamcopsRequest", patient_server_pk: int) -> Response: 

2170 """ 

2171 Primarily for FHIR views: show just a patient's details. 

2172 Must check security carefully for this one. 

2173 """ 

2174 user = req.user 

2175 patient = Patient.get_patient_by_pk(req.dbsession, patient_server_pk) 

2176 if not patient or not patient.user_may_view(user): 

2177 _ = req.gettext 

2178 raise HTTPBadRequest(_("No such patient or not authorized")) 

2179 return render_to_response( 

2180 "patient.mako", 

2181 dict(patient=patient, viewtype=ViewArg.HTML), 

2182 request=req, 

2183 ) 

2184 

2185 

2186# ============================================================================= 

2187# Trackers, CTVs 

2188# ============================================================================= 

2189 

2190 

2191def choose_tracker_or_ctv( 

2192 req: "CamcopsRequest", as_ctv: bool 

2193) -> Dict[str, Any]: 

2194 """ 

2195 Returns a dictionary for a Mako template to configure a 

2196 :class:`camcops_server.cc_modules.cc_tracker.Tracker` or 

2197 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`. 

2198 

2199 Upon success, it redirects to the tracker or CTV view itself, with the 

2200 tracker's parameters embedded as URL parameters. 

2201 

2202 Args: 

2203 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

2204 as_ctv: CTV, rather than tracker? 

2205 """ 

2206 

2207 form = ChooseTrackerForm(req, as_ctv=as_ctv) # , css_class="form-inline") 

2208 

2209 if FormAction.SUBMIT in req.POST: 

2210 try: 

2211 controls = list(req.POST.items()) 

2212 appstruct = form.validate(controls) 

2213 keys = [ 

2214 ViewParam.WHICH_IDNUM, 

2215 ViewParam.IDNUM_VALUE, 

2216 ViewParam.START_DATETIME, 

2217 ViewParam.END_DATETIME, 

2218 ViewParam.TASKS, 

2219 ViewParam.ALL_TASKS, 

2220 ViewParam.VIA_INDEX, 

2221 ViewParam.VIEWTYPE, 

2222 ] 

2223 querydict = {k: appstruct.get(k) for k in keys} 

2224 # Not so obvious this can be redirected cleanly via POST. 

2225 # It is possible by returning a form that then autosubmits: see 

2226 # https://stackoverflow.com/questions/46582/response-redirect-with-post-instead-of-get # noqa 

2227 # However, since everything's on this server, we could just return 

2228 # an appropriate Response directly. But the request information is 

2229 # not sensitive, so we lose nothing by using a GET redirect: 

2230 raise HTTPFound( 

2231 req.route_url( 

2232 Routes.CTV if as_ctv else Routes.TRACKER, _query=querydict 

2233 ) 

2234 ) 

2235 except ValidationFailure as e: 

2236 rendered_form = e.render() 

2237 else: 

2238 rendered_form = form.render() 

2239 return dict( 

2240 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

2241 ) 

2242 

2243 

2244@view_config( 

2245 route_name=Routes.CHOOSE_TRACKER, 

2246 renderer="choose_tracker.mako", 

2247 http_cache=NEVER_CACHE, 

2248) 

2249def choose_tracker(req: "CamcopsRequest") -> Dict[str, Any]: 

2250 """ 

2251 View to choose/configure a 

2252 :class:`camcops_server.cc_modules.cc_tracker.Tracker`. 

2253 """ 

2254 return choose_tracker_or_ctv(req, as_ctv=False) 

2255 

2256 

2257@view_config( 

2258 route_name=Routes.CHOOSE_CTV, 

2259 renderer="choose_ctv.mako", 

2260 http_cache=NEVER_CACHE, 

2261) 

2262def choose_ctv(req: "CamcopsRequest") -> Dict[str, Any]: 

2263 """ 

2264 View to choose/configure a 

2265 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`. 

2266 """ 

2267 return choose_tracker_or_ctv(req, as_ctv=True) 

2268 

2269 

2270def serve_tracker_or_ctv(req: "CamcopsRequest", as_ctv: bool) -> Response: 

2271 """ 

2272 Returns a response to show a 

2273 :class:`camcops_server.cc_modules.cc_tracker.Tracker` or 

2274 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`, in a 

2275 variety of formats (e.g. HTML, PDF, XML). 

2276 

2277 Args: 

2278 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

2279 as_ctv: CTV, rather than tracker? 

2280 """ 

2281 as_tracker = not as_ctv 

2282 _ = req.gettext 

2283 which_idnum = req.get_int_param(ViewParam.WHICH_IDNUM) 

2284 idnum_value = req.get_int_param(ViewParam.IDNUM_VALUE) 

2285 start_datetime = req.get_datetime_param(ViewParam.START_DATETIME) 

2286 end_datetime = req.get_datetime_param(ViewParam.END_DATETIME) 

2287 tasks = req.get_str_list_param( 

2288 ViewParam.TASKS, validator=validate_task_tablename 

2289 ) 

2290 all_tasks = req.get_bool_param(ViewParam.ALL_TASKS, True) 

2291 viewtype = req.get_str_param(ViewParam.VIEWTYPE, ViewArg.HTML) 

2292 via_index = req.get_bool_param(ViewParam.VIA_INDEX, True) 

2293 

2294 if all_tasks: 

2295 task_classes = [] # type: List[Type[Task]] 

2296 else: 

2297 try: 

2298 task_classes = task_classes_from_table_names( 

2299 tasks, sortmethod=TaskClassSortMethod.SHORTNAME 

2300 ) 

2301 except KeyError: 

2302 raise HTTPBadRequest(_("Invalid tasks specified")) 

2303 if as_tracker and not all(c.provides_trackers for c in task_classes): 

2304 raise HTTPBadRequest(_("Not all tasks specified provide trackers")) 

2305 

2306 iddefs = [IdNumReference(which_idnum, idnum_value)] 

2307 

2308 taskfilter = TaskFilter() 

2309 taskfilter.task_types = [ 

2310 tc.__tablename__ for tc in task_classes 

2311 ] # a bit silly... 

2312 taskfilter.idnum_criteria = iddefs 

2313 taskfilter.start_datetime = start_datetime 

2314 taskfilter.end_datetime = end_datetime 

2315 taskfilter.complete_only = True # trackers require complete tasks 

2316 taskfilter.set_sort_method(TaskClassSortMethod.SHORTNAME) 

2317 taskfilter.tasks_offering_trackers_only = as_tracker 

2318 taskfilter.tasks_with_patient_only = True 

2319 

2320 tracker_ctv_class = ClinicalTextView if as_ctv else Tracker 

2321 tracker = tracker_ctv_class( 

2322 req=req, taskfilter=taskfilter, via_index=via_index 

2323 ) 

2324 

2325 if viewtype == ViewArg.HTML: 

2326 return Response(tracker.get_html()) 

2327 elif viewtype == ViewArg.PDF: 

2328 return PdfResponse( 

2329 body=tracker.get_pdf(), filename=tracker.suggested_pdf_filename() 

2330 ) 

2331 elif viewtype == ViewArg.PDFHTML: # debugging option 

2332 return Response(tracker.get_pdf_html()) 

2333 elif viewtype == ViewArg.XML: 

2334 include_comments = req.get_bool_param(ViewParam.INCLUDE_COMMENTS, True) 

2335 return XmlResponse(tracker.get_xml(include_comments=include_comments)) 

2336 else: 

2337 permissible = [ViewArg.HTML, ViewArg.PDF, ViewArg.PDFHTML, ViewArg.XML] 

2338 raise HTTPBadRequest( 

2339 f"{_('Invalid view type:')} {viewtype!r} " 

2340 f"({_('permissible:')} {permissible!r})" 

2341 ) 

2342 

2343 

2344@view_config(route_name=Routes.TRACKER, http_cache=NEVER_CACHE) 

2345def serve_tracker(req: "CamcopsRequest") -> Response: 

2346 """ 

2347 View to serve a :class:`camcops_server.cc_modules.cc_tracker.Tracker`; see 

2348 :func:`serve_tracker_or_ctv`. 

2349 """ 

2350 return serve_tracker_or_ctv(req, as_ctv=False) 

2351 

2352 

2353@view_config(route_name=Routes.CTV, http_cache=NEVER_CACHE) 

2354def serve_ctv(req: "CamcopsRequest") -> Response: 

2355 """ 

2356 View to serve a 

2357 :class:`camcops_server.cc_modules.cc_tracker.ClinicalTextView`; see 

2358 :func:`serve_tracker_or_ctv`. 

2359 """ 

2360 return serve_tracker_or_ctv(req, as_ctv=True) 

2361 

2362 

2363# ============================================================================= 

2364# Reports 

2365# ============================================================================= 

2366 

2367 

2368@view_config( 

2369 route_name=Routes.REPORTS_MENU, 

2370 renderer="reports_menu.mako", 

2371 http_cache=NEVER_CACHE, 

2372) 

2373def reports_menu(req: "CamcopsRequest") -> Dict[str, Any]: 

2374 """ 

2375 Offer a menu of reports. 

2376 

2377 Note: Reports are not group-specific. 

2378 If you're authorized to see any, you'll see the whole menu. 

2379 (The *data* you get will be restricted to the group's you're authorized 

2380 to run reports for.) 

2381 """ 

2382 if not req.user.authorized_for_reports: 

2383 raise HTTPBadRequest(errormsg_cannot_report(req)) 

2384 return {} 

2385 

2386 

2387@view_config(route_name=Routes.OFFER_REPORT, http_cache=NEVER_CACHE) 

2388def offer_report(req: "CamcopsRequest") -> Response: 

2389 """ 

2390 Offer configuration options for a single report, or (following submission) 

2391 redirect to serve that report (with configuration parameters in the URL). 

2392 """ 

2393 if not req.user.authorized_for_reports: 

2394 raise HTTPBadRequest(errormsg_cannot_report(req)) 

2395 report_id = req.get_str_param(ViewParam.REPORT_ID) 

2396 report = get_report_instance(report_id) 

2397 _ = req.gettext 

2398 if not report: 

2399 raise HTTPBadRequest(f"{_('No such report ID:')} {report_id!r}") 

2400 if report.superuser_only and not req.user.superuser: 

2401 raise HTTPBadRequest( 

2402 f"{_('Report is restricted to the superuser:')} {report_id!r}" 

2403 ) 

2404 form = report.get_form(req) 

2405 if FormAction.SUBMIT in req.POST: 

2406 try: 

2407 controls = list(req.POST.items()) 

2408 appstruct = form.validate(controls) # may raise 

2409 keys = report.get_http_query_keys() 

2410 querydict = {k: appstruct.get(k) for k in keys} 

2411 querydict[ViewParam.REPORT_ID] = report_id 

2412 querydict[ViewParam.PAGE] = 1 

2413 # Send the user to the actual data using GET: this allows page 

2414 # navigation whilst maintaining any report-specific parameters. 

2415 raise HTTPFound(req.route_url(Routes.REPORT, _query=querydict)) 

2416 except ValidationFailure as e: 

2417 rendered_form = e.render() 

2418 else: 

2419 rendered_form = form.render({ViewParam.REPORT_ID: report_id}) 

2420 return render_to_response( 

2421 "report_offer.mako", 

2422 dict( 

2423 report=report, 

2424 form=rendered_form, 

2425 head_form_html=get_head_form_html(req, [form]), 

2426 ), 

2427 request=req, 

2428 ) 

2429 

2430 

2431@view_config(route_name=Routes.REPORT, http_cache=NEVER_CACHE) 

2432def serve_report(req: "CamcopsRequest") -> Response: 

2433 """ 

2434 Serve a configured report. 

2435 """ 

2436 if not req.user.authorized_for_reports: 

2437 raise HTTPBadRequest(errormsg_cannot_report(req)) 

2438 report_id = req.get_str_param(ViewParam.REPORT_ID) 

2439 report = get_report_instance(report_id) 

2440 _ = req.gettext 

2441 if not report: 

2442 raise HTTPBadRequest(f"{_('No such report ID:')} {report_id!r}") 

2443 if report.superuser_only and not req.user.superuser: 

2444 raise HTTPBadRequest( 

2445 f"{_('Report is restricted to the superuser:')} {report_id!r}" 

2446 ) 

2447 

2448 return report.get_response(req) 

2449 

2450 

2451# ============================================================================= 

2452# Research downloads 

2453# ============================================================================= 

2454 

2455 

2456@view_config(route_name=Routes.OFFER_BASIC_DUMP, http_cache=NEVER_CACHE) 

2457def offer_basic_dump(req: "CamcopsRequest") -> Response: 

2458 """ 

2459 View to configure a basic research dump. 

2460 Following submission success, it redirects to a view serving a TSV/ZIP 

2461 dump. 

2462 """ 

2463 if not req.user.authorized_to_dump: 

2464 raise HTTPBadRequest(errormsg_cannot_dump(req)) 

2465 form = OfferBasicDumpForm(request=req) 

2466 if FormAction.SUBMIT in req.POST: 

2467 try: 

2468 controls = list(req.POST.items()) 

2469 appstruct = form.validate(controls) 

2470 manual = appstruct.get(ViewParam.MANUAL) 

2471 querydict = { 

2472 ViewParam.DUMP_METHOD: appstruct.get(ViewParam.DUMP_METHOD), 

2473 ViewParam.SORT: appstruct.get(ViewParam.SORT), 

2474 ViewParam.GROUP_IDS: manual.get(ViewParam.GROUP_IDS), 

2475 ViewParam.TASKS: manual.get(ViewParam.TASKS), 

2476 ViewParam.VIEWTYPE: appstruct.get(ViewParam.VIEWTYPE), 

2477 ViewParam.DELIVERY_MODE: appstruct.get( 

2478 ViewParam.DELIVERY_MODE 

2479 ), 

2480 ViewParam.INCLUDE_SCHEMA: appstruct.get( 

2481 ViewParam.INCLUDE_SCHEMA 

2482 ), 

2483 ViewParam.SIMPLIFIED: appstruct.get(ViewParam.SIMPLIFIED), 

2484 } 

2485 # We could return a response, or redirect via GET. 

2486 # The request is not sensitive, so let's redirect. 

2487 return HTTPFound( 

2488 req.route_url(Routes.BASIC_DUMP, _query=querydict) 

2489 ) 

2490 except ValidationFailure as e: 

2491 rendered_form = e.render() 

2492 else: 

2493 rendered_form = form.render() 

2494 return render_to_response( 

2495 "dump_basic_offer.mako", 

2496 dict( 

2497 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

2498 ), 

2499 request=req, 

2500 ) 

2501 

2502 

2503def get_dump_collection(req: "CamcopsRequest") -> TaskCollection: 

2504 """ 

2505 Returns the collection of tasks being requested for a dump operation. 

2506 Raises an error if the request is bad. 

2507 """ 

2508 if not req.user.authorized_to_dump: 

2509 raise HTTPBadRequest(errormsg_cannot_dump(req)) 

2510 # ------------------------------------------------------------------------- 

2511 # Get parameters 

2512 # ------------------------------------------------------------------------- 

2513 dump_method = req.get_str_param(ViewParam.DUMP_METHOD) 

2514 group_ids = req.get_int_list_param(ViewParam.GROUP_IDS) 

2515 task_names = req.get_str_list_param( 

2516 ViewParam.TASKS, validator=validate_task_tablename 

2517 ) 

2518 

2519 # ------------------------------------------------------------------------- 

2520 # Select tasks 

2521 # ------------------------------------------------------------------------- 

2522 if dump_method == ViewArg.EVERYTHING: 

2523 taskfilter = TaskFilter() 

2524 elif dump_method == ViewArg.USE_SESSION_FILTER: 

2525 taskfilter = req.camcops_session.get_task_filter() 

2526 elif dump_method == ViewArg.SPECIFIC_TASKS_GROUPS: 

2527 taskfilter = TaskFilter() 

2528 taskfilter.task_types = task_names 

2529 taskfilter.group_ids = group_ids 

2530 else: 

2531 _ = req.gettext 

2532 raise HTTPBadRequest( 

2533 f"{_('Bad parameter:')} " 

2534 f"{ViewParam.DUMP_METHOD}={dump_method!r}" 

2535 ) 

2536 return TaskCollection( 

2537 req=req, 

2538 taskfilter=taskfilter, 

2539 as_dump=True, 

2540 sort_method_by_class=TaskSortMethod.CREATION_DATE_ASC, 

2541 ) 

2542 

2543 

2544@view_config(route_name=Routes.BASIC_DUMP, http_cache=NEVER_CACHE) 

2545def serve_basic_dump(req: "CamcopsRequest") -> Response: 

2546 """ 

2547 View serving a spreadsheet-style basic research dump. 

2548 """ 

2549 # Get view-specific parameters 

2550 simplified = req.get_bool_param(ViewParam.SIMPLIFIED, False) 

2551 sort_by_heading = req.get_bool_param(ViewParam.SORT, False) 

2552 viewtype = req.get_str_param(ViewParam.VIEWTYPE, ViewArg.XLSX, lower=True) 

2553 delivery_mode = req.get_str_param( 

2554 ViewParam.DELIVERY_MODE, ViewArg.EMAIL, lower=True 

2555 ) 

2556 include_schema = req.get_bool_param(ViewParam.INCLUDE_SCHEMA, False) 

2557 

2558 # Get tasks (and perform checks) 

2559 collection = get_dump_collection(req) 

2560 # Create object that knows how to export 

2561 exporter = make_exporter( 

2562 req=req, 

2563 collection=collection, 

2564 options=DownloadOptions( 

2565 # Exporting to spreadsheets 

2566 user_id=req.user_id, 

2567 viewtype=viewtype, 

2568 delivery_mode=delivery_mode, 

2569 spreadsheet_simplified=simplified, 

2570 spreadsheet_sort_by_heading=sort_by_heading, 

2571 include_information_schema_columns=include_schema, 

2572 include_summary_schema=True, 

2573 ), 

2574 ) # may raise 

2575 # Export, or schedule an email/download 

2576 return exporter.immediate_response(req) 

2577 

2578 

2579@view_config(route_name=Routes.OFFER_SQL_DUMP, http_cache=NEVER_CACHE) 

2580def offer_sql_dump(req: "CamcopsRequest") -> Response: 

2581 """ 

2582 View to configure a SQL research dump. 

2583 Following submission success, it redirects to a view serving the SQL dump. 

2584 """ 

2585 if not req.user.authorized_to_dump: 

2586 raise HTTPBadRequest(errormsg_cannot_dump(req)) 

2587 form = OfferSqlDumpForm(request=req) 

2588 if FormAction.SUBMIT in req.POST: 

2589 try: 

2590 controls = list(req.POST.items()) 

2591 appstruct = form.validate(controls) 

2592 manual = appstruct.get(ViewParam.MANUAL) 

2593 querydict = { 

2594 ViewParam.DUMP_METHOD: appstruct.get(ViewParam.DUMP_METHOD), 

2595 ViewParam.SQLITE_METHOD: appstruct.get( 

2596 ViewParam.SQLITE_METHOD 

2597 ), 

2598 ViewParam.INCLUDE_BLOBS: appstruct.get( 

2599 ViewParam.INCLUDE_BLOBS 

2600 ), 

2601 ViewParam.PATIENT_ID_PER_ROW: appstruct.get( 

2602 ViewParam.PATIENT_ID_PER_ROW 

2603 ), 

2604 ViewParam.GROUP_IDS: manual.get(ViewParam.GROUP_IDS), 

2605 ViewParam.TASKS: manual.get(ViewParam.TASKS), 

2606 ViewParam.DELIVERY_MODE: appstruct.get( 

2607 ViewParam.DELIVERY_MODE 

2608 ), 

2609 ViewParam.INCLUDE_SCHEMA: appstruct.get( 

2610 ViewParam.INCLUDE_SCHEMA 

2611 ), 

2612 } 

2613 # We could return a response, or redirect via GET. 

2614 # The request is not sensitive, so let's redirect. 

2615 return HTTPFound(req.route_url(Routes.SQL_DUMP, _query=querydict)) 

2616 except ValidationFailure as e: 

2617 rendered_form = e.render() 

2618 else: 

2619 rendered_form = form.render() 

2620 return render_to_response( 

2621 "dump_sql_offer.mako", 

2622 dict( 

2623 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

2624 ), 

2625 request=req, 

2626 ) 

2627 

2628 

2629@view_config(route_name=Routes.SQL_DUMP, http_cache=NEVER_CACHE) 

2630def sql_dump(req: "CamcopsRequest") -> Response: 

2631 """ 

2632 View serving an SQL dump in the chosen format (e.g. SQLite binary, SQL). 

2633 """ 

2634 # Get view-specific parameters 

2635 sqlite_method = req.get_str_param(ViewParam.SQLITE_METHOD) 

2636 include_blobs = req.get_bool_param(ViewParam.INCLUDE_BLOBS, False) 

2637 patient_id_per_row = req.get_bool_param(ViewParam.PATIENT_ID_PER_ROW, True) 

2638 delivery_mode = req.get_str_param( 

2639 ViewParam.DELIVERY_MODE, ViewArg.EMAIL, lower=True 

2640 ) 

2641 include_schema = req.get_bool_param(ViewParam.INCLUDE_SCHEMA, False) 

2642 

2643 # Get tasks (and perform checks) 

2644 collection = get_dump_collection(req) 

2645 # Create object that knows how to export 

2646 exporter = make_exporter( 

2647 req=req, 

2648 collection=collection, 

2649 options=DownloadOptions( 

2650 # Exporting to SQL 

2651 user_id=req.user_id, 

2652 viewtype=sqlite_method, 

2653 delivery_mode=delivery_mode, 

2654 db_include_blobs=include_blobs, 

2655 db_patient_id_per_row=patient_id_per_row, 

2656 include_information_schema_columns=include_schema, 

2657 include_summary_schema=include_schema, # doesn't do much for SQL export at present # noqa 

2658 ), 

2659 ) # may raise 

2660 # Export, or schedule an email/download 

2661 return exporter.immediate_response(req) 

2662 

2663 

2664# noinspection PyUnusedLocal 

2665@view_config( 

2666 route_name=Routes.DOWNLOAD_AREA, 

2667 renderer="download_area.mako", 

2668 http_cache=NEVER_CACHE, 

2669) 

2670def download_area(req: "CamcopsRequest") -> Dict[str, Any]: 

2671 """ 

2672 Shows the user download area. 

2673 """ 

2674 userdir = req.user_download_dir 

2675 if userdir: 

2676 files = UserDownloadFile.from_directory_scan( 

2677 directory=userdir, 

2678 permitted_lifespan_min=req.config.user_download_file_lifetime_min, 

2679 req=req, 

2680 ) 

2681 else: 

2682 files = [] # type: ignore[no-redef] # type: List[UserDownloadFile] 

2683 return dict( 

2684 files=files, 

2685 available=bytes2human(req.user_download_bytes_available), 

2686 permitted=bytes2human(req.user_download_bytes_permitted), 

2687 used=bytes2human(req.user_download_bytes_used), 

2688 lifetime_min=req.config.user_download_file_lifetime_min, 

2689 ) 

2690 

2691 

2692@view_config(route_name=Routes.DOWNLOAD_FILE, http_cache=NEVER_CACHE) 

2693def download_file(req: "CamcopsRequest") -> Response: 

2694 """ 

2695 Downloads a file. 

2696 """ 

2697 _ = req.gettext 

2698 filename = req.get_str_param( 

2699 ViewParam.FILENAME, "", validator=validate_download_filename 

2700 ) 

2701 # Security comes here: we do NOT permit any path information in the 

2702 # filename. It MUST be relative to and within the user download directory. 

2703 # We cannot trust the input. 

2704 filename = os.path.basename(filename) 

2705 udf = UserDownloadFile(directory=req.user_download_dir, filename=filename) 

2706 if not udf.exists: 

2707 raise HTTPBadRequest(f'{_("No such file:")} {filename}') 

2708 try: 

2709 return BinaryResponse( 

2710 body=udf.contents, 

2711 filename=udf.filename, 

2712 content_type=MimeType.BINARY, 

2713 as_inline=False, 

2714 ) 

2715 except OSError: 

2716 raise HTTPBadRequest(f'{_("Error reading file:")} {filename}') 

2717 

2718 

2719@view_config( 

2720 route_name=Routes.DELETE_FILE, 

2721 request_method=HttpMethod.POST, 

2722 http_cache=NEVER_CACHE, 

2723) 

2724def delete_file(req: "CamcopsRequest") -> Response: 

2725 """ 

2726 Deletes a file. 

2727 """ 

2728 form = UserDownloadDeleteForm(request=req) 

2729 controls = list(req.POST.items()) 

2730 appstruct = form.validate(controls) # CSRF; may raise ValidationError 

2731 filename = appstruct.get(ViewParam.FILENAME, "") 

2732 # Security comes here: we do NOT permit any path information in the 

2733 # filename. It MUST be relative to and within the user download directory. 

2734 # We cannot trust the input. 

2735 filename = os.path.basename(filename) 

2736 udf = UserDownloadFile(directory=req.user_download_dir, filename=filename) 

2737 if not udf.exists: 

2738 _ = req.gettext 

2739 raise HTTPBadRequest(f'{_("No such file:")} {filename}') 

2740 udf.delete() 

2741 return HTTPFound(req.route_url(Routes.DOWNLOAD_AREA)) # redirect 

2742 

2743 

2744# ============================================================================= 

2745# View DDL (table definitions) 

2746# ============================================================================= 

2747 

2748LEXERMAP = { 

2749 SqlaDialectName.MYSQL: pygments.lexers.sql.MySqlLexer, 

2750 SqlaDialectName.MSSQL: pygments.lexers.sql.SqlLexer, # generic 

2751 SqlaDialectName.ORACLE: pygments.lexers.sql.SqlLexer, # generic 

2752 SqlaDialectName.FIREBIRD: pygments.lexers.sql.SqlLexer, # generic 

2753 SqlaDialectName.POSTGRES: pygments.lexers.sql.PostgresLexer, 

2754 SqlaDialectName.SQLITE: pygments.lexers.sql.SqlLexer, # generic; SqliteConsoleLexer is wrong # noqa 

2755 SqlaDialectName.SYBASE: pygments.lexers.sql.SqlLexer, # generic 

2756} 

2757 

2758 

2759def format_sql_as_html( 

2760 sql: str, dialect: str = SqlaDialectName.MYSQL 

2761) -> Tuple[str, str]: 

2762 """ 

2763 Formats SQL as HTML with CSS. 

2764 """ 

2765 lexer = LEXERMAP[dialect]() 

2766 # noinspection PyUnresolvedReferences 

2767 formatter = pygments.formatters.HtmlFormatter() 

2768 html = pygments.highlight(sql, lexer, formatter) 

2769 css = formatter.get_style_defs(".highlight") 

2770 return html, css 

2771 

2772 

2773@view_config(route_name=Routes.VIEW_DDL, http_cache=NEVER_CACHE) 

2774def view_ddl(req: "CamcopsRequest") -> Response: 

2775 """ 

2776 Inspect table definitions (data definition language, DDL) with field 

2777 comments. 

2778 

2779 2021-04-30: restricted to users with "dump" authority -- not because this 

2780 is a vulnerability, as the penetration testers suggested, but just to make 

2781 it consistent with the menu item for this. 

2782 """ 

2783 if not req.user.authorized_to_dump: 

2784 raise HTTPBadRequest(errormsg_cannot_dump(req)) 

2785 form = ViewDdlForm(request=req) 

2786 if FormAction.SUBMIT in req.POST: 

2787 try: 

2788 controls = list(req.POST.items()) 

2789 appstruct = form.validate(controls) 

2790 dialect = appstruct.get(ViewParam.DIALECT) 

2791 ddl = get_all_ddl(dialect_name=dialect) 

2792 html, css = format_sql_as_html(ddl, dialect) 

2793 return render_to_response( 

2794 "introspect_file.mako", 

2795 dict(css=css, code_html=html), 

2796 request=req, 

2797 ) 

2798 except ValidationFailure as e: 

2799 rendered_form = e.render() 

2800 else: 

2801 rendered_form = form.render() 

2802 current_dialect = get_dialect_name(get_engine_from_session(req.dbsession)) 

2803 sql_dialect_choices = get_sql_dialect_choices(req) 

2804 current_dialect_description = {k: v for k, v in sql_dialect_choices}.get( 

2805 current_dialect, "?" 

2806 ) 

2807 return render_to_response( 

2808 "view_ddl_choose_dialect.mako", 

2809 dict( 

2810 current_dialect=current_dialect, 

2811 current_dialect_description=current_dialect_description, 

2812 form=rendered_form, 

2813 head_form_html=get_head_form_html(req, [form]), 

2814 ), 

2815 request=req, 

2816 ) 

2817 

2818 

2819# ============================================================================= 

2820# View audit trail 

2821# ============================================================================= 

2822 

2823 

2824@view_config( 

2825 route_name=Routes.OFFER_AUDIT_TRAIL, 

2826 permission=Permission.SUPERUSER, 

2827 http_cache=NEVER_CACHE, 

2828) 

2829def offer_audit_trail(req: "CamcopsRequest") -> Response: 

2830 """ 

2831 View to configure how we'll view the audit trail. Once configured, it 

2832 redirects to a view that shows the audit trail (with query parameters in 

2833 the URL). 

2834 """ 

2835 form = AuditTrailForm(request=req) 

2836 if FormAction.SUBMIT in req.POST: 

2837 try: 

2838 controls = list(req.POST.items()) 

2839 appstruct = form.validate(controls) 

2840 keys = [ 

2841 ViewParam.ROWS_PER_PAGE, 

2842 ViewParam.START_DATETIME, 

2843 ViewParam.END_DATETIME, 

2844 ViewParam.SOURCE, 

2845 ViewParam.REMOTE_IP_ADDR, 

2846 ViewParam.USERNAME, 

2847 ViewParam.TABLE_NAME, 

2848 ViewParam.SERVER_PK, 

2849 ViewParam.TRUNCATE, 

2850 ] 

2851 querydict = {k: appstruct.get(k) for k in keys} 

2852 querydict[ViewParam.PAGE] = 1 

2853 # Send the user to the actual data using GET: 

2854 # (the parameters are NOT sensitive) 

2855 raise HTTPFound( 

2856 req.route_url(Routes.VIEW_AUDIT_TRAIL, _query=querydict) 

2857 ) 

2858 except ValidationFailure as e: 

2859 rendered_form = e.render() 

2860 else: 

2861 rendered_form = form.render() 

2862 return render_to_response( 

2863 "audit_trail_choices.mako", 

2864 dict( 

2865 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

2866 ), 

2867 request=req, 

2868 ) 

2869 

2870 

2871AUDIT_TRUNCATE_AT = 100 

2872 

2873 

2874@view_config( 

2875 route_name=Routes.VIEW_AUDIT_TRAIL, 

2876 permission=Permission.SUPERUSER, 

2877 http_cache=NEVER_CACHE, 

2878) 

2879def view_audit_trail(req: "CamcopsRequest") -> Response: 

2880 """ 

2881 View to serve the audit trail. 

2882 """ 

2883 rows_per_page = req.get_int_param( 

2884 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE 

2885 ) 

2886 start_datetime = req.get_datetime_param(ViewParam.START_DATETIME) 

2887 end_datetime = req.get_datetime_param(ViewParam.END_DATETIME) 

2888 source = req.get_str_param(ViewParam.SOURCE, None) 

2889 remote_addr = req.get_str_param( 

2890 ViewParam.REMOTE_IP_ADDR, None, validator=validate_ip_address 

2891 ) 

2892 username = req.get_str_param( 

2893 ViewParam.USERNAME, None, validator=validate_username 

2894 ) 

2895 table_name = req.get_str_param( 

2896 ViewParam.TABLE_NAME, None, validator=validate_task_tablename 

2897 ) 

2898 server_pk = req.get_int_param(ViewParam.SERVER_PK, None) 

2899 truncate = req.get_bool_param(ViewParam.TRUNCATE, True) 

2900 page_num = req.get_int_param(ViewParam.PAGE, 1) 

2901 

2902 conditions = [] # type: List[str] 

2903 

2904 def add_condition(key: str, value: Any) -> None: 

2905 conditions.append(f"{key} = {value}") 

2906 

2907 dbsession = req.dbsession 

2908 q = dbsession.query(AuditEntry) 

2909 if start_datetime: 

2910 q = q.filter(AuditEntry.when_access_utc >= start_datetime) 

2911 add_condition(ViewParam.START_DATETIME, start_datetime) 

2912 if end_datetime: 

2913 q = q.filter(AuditEntry.when_access_utc < end_datetime) 

2914 add_condition(ViewParam.END_DATETIME, end_datetime) 

2915 if source: 

2916 q = q.filter(AuditEntry.source == source) 

2917 add_condition(ViewParam.SOURCE, source) 

2918 if remote_addr: 

2919 q = q.filter(AuditEntry.remote_addr == remote_addr) 

2920 add_condition(ViewParam.REMOTE_IP_ADDR, remote_addr) 

2921 if username: 

2922 # https://stackoverflow.com/questions/8561470/sqlalchemy-filtering-by-relationship-attribute # noqa 

2923 q = q.join(User).filter(User.username == username) 

2924 add_condition(ViewParam.USERNAME, username) 

2925 if table_name: 

2926 q = q.filter(AuditEntry.table_name == table_name) 

2927 add_condition(ViewParam.TABLE_NAME, table_name) 

2928 if server_pk is not None: 

2929 q = q.filter(AuditEntry.server_pk == server_pk) 

2930 add_condition(ViewParam.SERVER_PK, server_pk) 

2931 

2932 q = q.order_by(desc(AuditEntry.id)) 

2933 

2934 # audit_entries = dbsession.execute(q).fetchall() 

2935 # ... no! That executes to give you row-type results. 

2936 # audit_entries = q.all() 

2937 # ... yes! But let's paginate, too: 

2938 page = SqlalchemyOrmPage( 

2939 query=q, 

2940 page=page_num, 

2941 items_per_page=rows_per_page, 

2942 url_maker=PageUrl(req), 

2943 request=req, 

2944 ) 

2945 return render_to_response( 

2946 "audit_trail_view.mako", 

2947 dict( 

2948 conditions="; ".join(conditions), 

2949 page=page, 

2950 truncate=truncate, 

2951 truncate_at=AUDIT_TRUNCATE_AT, 

2952 ), 

2953 request=req, 

2954 ) 

2955 

2956 

2957# ============================================================================= 

2958# View export logs 

2959# ============================================================================= 

2960# Overview: 

2961# - View exported tasks (ExportedTask) collectively 

2962# ... option to filter by recipient_name 

2963# ... option to filter by date/etc. 

2964# - View exported tasks (ExportedTask) individually 

2965# ... hyperlinks to individual views of: 

2966# Email (not necessary: ExportedTaskEmail) 

2967# ExportRecipient 

2968# ExportedTaskFileGroup 

2969# ExportedTaskHL7Message 

2970 

2971 

2972@view_config( 

2973 route_name=Routes.OFFER_EXPORTED_TASK_LIST, 

2974 permission=Permission.SUPERUSER, 

2975 http_cache=NEVER_CACHE, 

2976) 

2977def offer_exported_task_list(req: "CamcopsRequest") -> Response: 

2978 """ 

2979 View to choose how we'll view the exported task log. 

2980 """ 

2981 form = ExportedTaskListForm(request=req) 

2982 if FormAction.SUBMIT in req.POST: 

2983 try: 

2984 controls = list(req.POST.items()) 

2985 appstruct = form.validate(controls) 

2986 keys = [ 

2987 ViewParam.ROWS_PER_PAGE, 

2988 ViewParam.RECIPIENT_NAME, 

2989 ViewParam.TABLE_NAME, 

2990 ViewParam.SERVER_PK, 

2991 ViewParam.ID, 

2992 ViewParam.START_DATETIME, 

2993 ViewParam.END_DATETIME, 

2994 ] 

2995 querydict = {k: appstruct.get(k) for k in keys} 

2996 querydict[ViewParam.PAGE] = 1 

2997 # Send the user to the actual data using GET 

2998 # (the parameters are NOT sensitive) 

2999 return HTTPFound( 

3000 req.route_url(Routes.VIEW_EXPORTED_TASK_LIST, _query=querydict) 

3001 ) 

3002 except ValidationFailure as e: 

3003 rendered_form = e.render() 

3004 else: 

3005 rendered_form = form.render() 

3006 return render_to_response( 

3007 "exported_task_choose.mako", 

3008 dict( 

3009 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

3010 ), 

3011 request=req, 

3012 ) 

3013 

3014 

3015@view_config( 

3016 route_name=Routes.VIEW_EXPORTED_TASK_LIST, 

3017 permission=Permission.SUPERUSER, 

3018 http_cache=NEVER_CACHE, 

3019) 

3020def view_exported_task_list(req: "CamcopsRequest") -> Response: 

3021 """ 

3022 View to serve the exported task log. 

3023 """ 

3024 rows_per_page = req.get_int_param( 

3025 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE 

3026 ) 

3027 recipient_name = req.get_str_param( 

3028 ViewParam.RECIPIENT_NAME, 

3029 None, 

3030 validator=validate_export_recipient_name, 

3031 ) 

3032 table_name = req.get_str_param( 

3033 ViewParam.TABLE_NAME, None, validator=validate_task_tablename 

3034 ) 

3035 server_pk = req.get_int_param(ViewParam.SERVER_PK, None) 

3036 et_id = req.get_int_param(ViewParam.ID, None) 

3037 start_datetime = req.get_datetime_param(ViewParam.START_DATETIME) 

3038 end_datetime = req.get_datetime_param(ViewParam.END_DATETIME) 

3039 page_num = req.get_int_param(ViewParam.PAGE, 1) 

3040 

3041 conditions = [] # type: List[str] 

3042 

3043 def add_condition(key: str, value: Any) -> None: 

3044 conditions.append(f"{key} = {value}") 

3045 

3046 dbsession = req.dbsession 

3047 q = dbsession.query(ExportedTask) 

3048 

3049 if recipient_name: 

3050 q = q.join(ExportRecipient).filter( 

3051 ExportRecipient.recipient_name == recipient_name 

3052 ) 

3053 add_condition(ViewParam.RECIPIENT_NAME, recipient_name) 

3054 if table_name: 

3055 q = q.filter(ExportedTask.basetable == table_name) 

3056 add_condition(ViewParam.TABLE_NAME, table_name) 

3057 if server_pk is not None: 

3058 q = q.filter(ExportedTask.task_server_pk == server_pk) 

3059 add_condition(ViewParam.SERVER_PK, server_pk) 

3060 if et_id is not None: 

3061 q = q.filter(ExportedTask.id == et_id) 

3062 add_condition(ViewParam.ID, et_id) 

3063 if start_datetime: 

3064 q = q.filter(ExportedTask.start_at_utc >= start_datetime) 

3065 add_condition(ViewParam.START_DATETIME, start_datetime) 

3066 if end_datetime: 

3067 q = q.filter(ExportedTask.start_at_utc < end_datetime) 

3068 add_condition(ViewParam.END_DATETIME, end_datetime) 

3069 

3070 q = q.order_by(desc(ExportedTask.id)) 

3071 

3072 page = SqlalchemyOrmPage( 

3073 query=q, 

3074 page=page_num, 

3075 items_per_page=rows_per_page, 

3076 url_maker=PageUrl(req), 

3077 request=req, 

3078 ) 

3079 return render_to_response( 

3080 "exported_task_list.mako", 

3081 dict(conditions="; ".join(conditions), page=page), 

3082 request=req, 

3083 ) 

3084 

3085 

3086# ============================================================================= 

3087# View helpers for ORM objects 

3088# ============================================================================= 

3089 

3090 

3091def _view_generic_object_by_id( 

3092 req: "CamcopsRequest", 

3093 cls: Type, 

3094 instance_name_for_mako: str, 

3095 mako_template: str, 

3096) -> Response: 

3097 """ 

3098 Boilerplate code to view an individual SQLAlchemy ORM object. The object 

3099 must have an integer ``id`` field as its primary key, and the ID value must 

3100 be present in the ``ViewParam.ID`` field of the request. 

3101 

3102 Args: 

3103 req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

3104 cls: the SQLAlchemy ORM class 

3105 instance_name_for_mako: what will the object be called when it's 

3106 mako_template: Mako template filename 

3107 

3108 Returns: 

3109 :class:`pyramid.response.Response` 

3110 """ 

3111 item_id = req.get_int_param(ViewParam.ID, None) 

3112 dbsession = req.dbsession 

3113 # noinspection PyUnresolvedReferences 

3114 obj = dbsession.query(cls).filter(cls.id == item_id).first() 

3115 if obj is None: 

3116 _ = req.gettext 

3117 raise HTTPBadRequest( 

3118 f"{_('Bad ID for object type')} " f"{cls.__name__}: {item_id}" 

3119 ) 

3120 d = {instance_name_for_mako: obj} 

3121 return render_to_response(mako_template, d, request=req) 

3122 

3123 

3124# ============================================================================= 

3125# Specialized views for ORM objects 

3126# ============================================================================= 

3127 

3128 

3129@view_config( 

3130 route_name=Routes.VIEW_EMAIL, 

3131 permission=Permission.SUPERUSER, 

3132 http_cache=NEVER_CACHE, 

3133) 

3134def view_email(req: "CamcopsRequest") -> Response: 

3135 """ 

3136 View on an individual :class:`camcops_server.cc_modules.cc_email.Email`. 

3137 """ 

3138 return _view_generic_object_by_id( 

3139 req=req, 

3140 cls=Email, 

3141 instance_name_for_mako="email", 

3142 mako_template="view_email.mako", 

3143 ) 

3144 

3145 

3146@view_config( 

3147 route_name=Routes.VIEW_EXPORT_RECIPIENT, 

3148 permission=Permission.SUPERUSER, 

3149 http_cache=NEVER_CACHE, 

3150) 

3151def view_export_recipient(req: "CamcopsRequest") -> Response: 

3152 """ 

3153 View on an individual 

3154 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTask`. 

3155 """ 

3156 return _view_generic_object_by_id( 

3157 req=req, 

3158 cls=ExportRecipient, 

3159 instance_name_for_mako="recipient", 

3160 mako_template="export_recipient.mako", 

3161 ) 

3162 

3163 

3164@view_config( 

3165 route_name=Routes.VIEW_EXPORTED_TASK, 

3166 permission=Permission.SUPERUSER, 

3167 http_cache=NEVER_CACHE, 

3168) 

3169def view_exported_task(req: "CamcopsRequest") -> Response: 

3170 """ 

3171 View on an individual 

3172 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTask`. 

3173 """ 

3174 return _view_generic_object_by_id( 

3175 req=req, 

3176 cls=ExportedTask, 

3177 instance_name_for_mako="et", 

3178 mako_template="exported_task.mako", 

3179 ) 

3180 

3181 

3182@view_config( 

3183 route_name=Routes.VIEW_EXPORTED_TASK_EMAIL, 

3184 permission=Permission.SUPERUSER, 

3185 http_cache=NEVER_CACHE, 

3186) 

3187def view_exported_task_email(req: "CamcopsRequest") -> Response: 

3188 """ 

3189 View on an individual 

3190 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskEmail`. 

3191 """ 

3192 return _view_generic_object_by_id( 

3193 req=req, 

3194 cls=ExportedTaskEmail, 

3195 instance_name_for_mako="ete", 

3196 mako_template="exported_task_email.mako", 

3197 ) 

3198 

3199 

3200@view_config( 

3201 route_name=Routes.VIEW_EXPORTED_TASK_FILE_GROUP, 

3202 permission=Permission.SUPERUSER, 

3203 http_cache=NEVER_CACHE, 

3204) 

3205def view_exported_task_file_group(req: "CamcopsRequest") -> Response: 

3206 """ 

3207 View on an individual 

3208 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup`. 

3209 """ 

3210 return _view_generic_object_by_id( 

3211 req=req, 

3212 cls=ExportedTaskFileGroup, 

3213 instance_name_for_mako="fg", 

3214 mako_template="exported_task_file_group.mako", 

3215 ) 

3216 

3217 

3218@view_config( 

3219 route_name=Routes.VIEW_EXPORTED_TASK_HL7_MESSAGE, 

3220 permission=Permission.SUPERUSER, 

3221 http_cache=NEVER_CACHE, 

3222) 

3223def view_exported_task_hl7_message(req: "CamcopsRequest") -> Response: 

3224 """ 

3225 View on an individual 

3226 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskHL7Message`. 

3227 """ 

3228 return _view_generic_object_by_id( 

3229 req=req, 

3230 cls=ExportedTaskHL7Message, 

3231 instance_name_for_mako="msg", 

3232 mako_template="exported_task_hl7_message.mako", 

3233 ) 

3234 

3235 

3236@view_config( 

3237 route_name=Routes.VIEW_EXPORTED_TASK_REDCAP, 

3238 permission=Permission.SUPERUSER, 

3239 http_cache=NEVER_CACHE, 

3240) 

3241def view_exported_task_redcap(req: "CamcopsRequest") -> Response: 

3242 """ 

3243 View on an individual 

3244 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskRedcap`. 

3245 """ 

3246 return _view_generic_object_by_id( 

3247 req=req, 

3248 cls=ExportedTaskRedcap, 

3249 instance_name_for_mako="etr", 

3250 mako_template="exported_task_redcap.mako", 

3251 ) 

3252 

3253 

3254@view_config( 

3255 route_name=Routes.VIEW_EXPORTED_TASK_FHIR, 

3256 permission=Permission.SUPERUSER, 

3257 http_cache=NEVER_CACHE, 

3258) 

3259def view_exported_task_fhir(req: "CamcopsRequest") -> Response: 

3260 """ 

3261 View on an individual 

3262 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskRedcap`. 

3263 """ 

3264 return _view_generic_object_by_id( 

3265 req=req, 

3266 cls=ExportedTaskFhir, 

3267 instance_name_for_mako="etf", 

3268 mako_template="exported_task_fhir.mako", 

3269 ) 

3270 

3271 

3272@view_config( 

3273 route_name=Routes.VIEW_EXPORTED_TASK_FHIR_ENTRY, 

3274 permission=Permission.SUPERUSER, 

3275 http_cache=NEVER_CACHE, 

3276) 

3277def view_exported_task_fhir_entry(req: "CamcopsRequest") -> Response: 

3278 """ 

3279 View on an individual 

3280 :class:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskRedcap`. 

3281 """ 

3282 return _view_generic_object_by_id( 

3283 req=req, 

3284 cls=ExportedTaskFhirEntry, 

3285 instance_name_for_mako="etfe", 

3286 mako_template="exported_task_fhir_entry.mako", 

3287 ) 

3288 

3289 

3290# ============================================================================= 

3291# User/server info views 

3292# ============================================================================= 

3293 

3294 

3295@view_config( 

3296 route_name=Routes.VIEW_OWN_USER_INFO, 

3297 renderer="view_own_user_info.mako", 

3298 http_cache=NEVER_CACHE, 

3299) 

3300def view_own_user_info(req: "CamcopsRequest") -> Dict[str, Any]: 

3301 """ 

3302 View to provide information about your own user. 

3303 """ 

3304 groups_page = CamcopsPage( 

3305 req.user.groups, url_maker=PageUrl(req), request=req 

3306 ) 

3307 return dict( 

3308 user=req.user, 

3309 groups_page=groups_page, 

3310 valid_which_idnums=req.valid_which_idnums, 

3311 ) 

3312 

3313 

3314@view_config( 

3315 route_name=Routes.VIEW_SERVER_INFO, 

3316 renderer="view_server_info.mako", 

3317 http_cache=NEVER_CACHE, 

3318) 

3319def view_server_info(req: "CamcopsRequest") -> Dict[str, Any]: 

3320 """ 

3321 View to show the server's ID policies, etc. 

3322 """ 

3323 _ = req.gettext 

3324 now = req.now 

3325 recent_activity = OrderedDict( 

3326 [ 

3327 ( 

3328 _("Last 1 minute"), 

3329 CamcopsSession.n_sessions_active_since( 

3330 req, now.subtract(minutes=1) 

3331 ), 

3332 ), 

3333 ( 

3334 _("Last 5 minutes"), 

3335 CamcopsSession.n_sessions_active_since( 

3336 req, now.subtract(minutes=5) 

3337 ), 

3338 ), 

3339 ( 

3340 _("Last 10 minutes"), 

3341 CamcopsSession.n_sessions_active_since( 

3342 req, now.subtract(minutes=10) 

3343 ), 

3344 ), 

3345 ( 

3346 _("Last 1 hour"), 

3347 CamcopsSession.n_sessions_active_since( 

3348 req, now.subtract(hours=1) 

3349 ), 

3350 ), 

3351 ] 

3352 ) 

3353 return dict( 

3354 idnum_definitions=req.idnum_definitions, 

3355 string_families=req.extrastring_families(), 

3356 recent_activity=recent_activity, 

3357 session_timeout_minutes=req.config.session_timeout_minutes, 

3358 restricted_tasks=req.config.restricted_tasks, 

3359 ) 

3360 

3361 

3362# ============================================================================= 

3363# User management 

3364# ============================================================================= 

3365 

3366 

3367def get_user_from_request_user_id_or_raise(req: "CamcopsRequest") -> User: 

3368 """ 

3369 Returns the :class:`camcops_server.cc_modules.cc_user.User` represented by 

3370 the request's ``ViewParam.USER_ID`` parameter, or raise 

3371 :exc:`HTTPBadRequest`. 

3372 """ 

3373 user_id = req.get_int_param(ViewParam.USER_ID) 

3374 user = User.get_user_by_id(req.dbsession, user_id) 

3375 if not user: 

3376 _ = req.gettext 

3377 raise HTTPBadRequest(f"{_('No such user ID:')} {user_id!r}") 

3378 return user 

3379 

3380 

3381def query_users_that_i_manage(req: "CamcopsRequest") -> Query: 

3382 me = req.user 

3383 return me.managed_users() 

3384 

3385 

3386@view_config( 

3387 route_name=Routes.VIEW_ALL_USERS, 

3388 permission=Permission.GROUPADMIN, 

3389 renderer="users_view.mako", 

3390 http_cache=NEVER_CACHE, 

3391) 

3392def view_all_users(req: "CamcopsRequest") -> Dict[str, Any]: 

3393 """ 

3394 View all users that the current user administers. The view has hyperlinks 

3395 to edit those users too. 

3396 """ 

3397 include_auto_generated = req.get_bool_param( 

3398 ViewParam.INCLUDE_AUTO_GENERATED, False 

3399 ) 

3400 rows_per_page = req.get_int_param( 

3401 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE 

3402 ) 

3403 page_num = req.get_int_param(ViewParam.PAGE, 1) 

3404 q = query_users_that_i_manage(req) 

3405 if not include_auto_generated: 

3406 q = q.filter(User.auto_generated == False) # noqa: E712 

3407 page = SqlalchemyOrmPage( 

3408 query=q, 

3409 page=page_num, 

3410 items_per_page=rows_per_page, 

3411 url_maker=PageUrl(req), 

3412 request=req, 

3413 ) 

3414 

3415 form = UserFilterForm(request=req) 

3416 appstruct = {ViewParam.INCLUDE_AUTO_GENERATED: include_auto_generated} 

3417 rendered_form = form.render(appstruct) 

3418 

3419 return dict( 

3420 page=page, 

3421 head_form_html=get_head_form_html(req, [form]), 

3422 form=rendered_form, 

3423 ) 

3424 

3425 

3426@view_config( 

3427 route_name=Routes.VIEW_USER_EMAIL_ADDRESSES, 

3428 permission=Permission.GROUPADMIN, 

3429 renderer="view_user_email_addresses.mako", 

3430 http_cache=NEVER_CACHE, 

3431) 

3432def view_user_email_addresses(req: "CamcopsRequest") -> Dict[str, Any]: 

3433 """ 

3434 View e-mail addresses of all users that the requesting user is authorized 

3435 to manage. 

3436 """ 

3437 q = query_users_that_i_manage(req).filter( 

3438 User.auto_generated == False # noqa: E712 

3439 ) 

3440 return dict(query=q) 

3441 

3442 

3443def assert_may_edit_user(req: "CamcopsRequest", user: User) -> None: 

3444 """ 

3445 Checks that the requesting user (``req.user``) is allowed to edit the other 

3446 user (``user``). Raises :exc:`HTTPBadRequest` otherwise. 

3447 """ 

3448 may_edit, why_not = req.user.may_edit_user(req, user) 

3449 if not may_edit: 

3450 raise HTTPBadRequest(why_not) 

3451 

3452 

3453def assert_may_administer_group(req: "CamcopsRequest", group_id: int) -> None: 

3454 """ 

3455 Checks that the requesting user (``req.user``) is allowed to adminster the 

3456 specified group (specified by ``group_id``). Raises :exc:`HTTPBadRequest` 

3457 otherwise. 

3458 """ 

3459 if not req.user.may_administer_group(group_id): 

3460 _ = req.gettext 

3461 raise HTTPBadRequest(_("You may not administer this group")) 

3462 

3463 

3464@view_config( 

3465 route_name=Routes.VIEW_USER, 

3466 permission=Permission.GROUPADMIN, 

3467 renderer="view_other_user_info.mako", 

3468 http_cache=NEVER_CACHE, 

3469) 

3470def view_user(req: "CamcopsRequest") -> Dict[str, Any]: 

3471 """ 

3472 View to show details of another user, for administrators. 

3473 """ 

3474 user = get_user_from_request_user_id_or_raise(req) 

3475 assert_may_edit_user(req, user) 

3476 return dict(user=user) 

3477 # Groupadmins may see some information regarding groups that aren't theirs 

3478 # here, but can't alter it. 

3479 

3480 

3481class EditUserBaseView(UpdateView): 

3482 """ 

3483 Django-style view to edit a user and their groups 

3484 """ 

3485 

3486 model_form_dict = { 

3487 "username": ViewParam.USERNAME, 

3488 "fullname": ViewParam.FULLNAME, 

3489 "email": ViewParam.EMAIL, 

3490 "must_change_password": ViewParam.MUST_CHANGE_PASSWORD, 

3491 "language": ViewParam.LANGUAGE, 

3492 } 

3493 object_class = User 

3494 pk_param = ViewParam.USER_ID 

3495 server_pk_name = "id" 

3496 template_name = "user_edit.mako" 

3497 

3498 def get_success_url(self) -> str: 

3499 return self.request.route_url(Routes.VIEW_ALL_USERS) 

3500 

3501 def get_object(self) -> Any: 

3502 user = cast(User, super().get_object()) 

3503 

3504 assert_may_edit_user(self.request, user) 

3505 

3506 return user 

3507 

3508 def set_object_properties(self, appstruct: Dict[str, Any]) -> None: 

3509 user = cast(User, self.object) 

3510 _ = self.request.gettext 

3511 

3512 new_user_name = appstruct.get(ViewParam.USERNAME) 

3513 existing_user = User.get_user_by_name( 

3514 self.request.dbsession, new_user_name 

3515 ) 

3516 if existing_user and existing_user.id != user.id: 

3517 # noinspection PyUnresolvedReferences 

3518 cant_rename_user = _("Can't rename user") 

3519 conflicts = _("that conflicts with an existing user with ID") 

3520 raise HTTPBadRequest( 

3521 f"{cant_rename_user} {user.username!r} (#{user.id!r}) → " 

3522 f"{new_user_name!r}; {conflicts} {existing_user.id!r}" 

3523 ) 

3524 

3525 email = appstruct.get(ViewParam.EMAIL) 

3526 if not email and user.mfa_method == MfaMethod.HOTP_EMAIL: 

3527 message = _( 

3528 "This user's email address is used for multi-factor " 

3529 "authentication. If you want to remove their email " 

3530 "address, you must first disable multi-factor " 

3531 "authentication" 

3532 ) 

3533 

3534 raise HTTPBadRequest(message) 

3535 

3536 super().set_object_properties(appstruct) 

3537 

3538 # Groups that we might change memberships for: 

3539 all_fluid_groups = self.request.user.ids_of_groups_user_is_admin_for 

3540 # All groups that the user is currently in: 

3541 user_group_ids = user.group_ids 

3542 # Group membership we won't touch: 

3543 user_frozen_group_ids = list( 

3544 set(user_group_ids) - set(all_fluid_groups) 

3545 ) 

3546 group_ids = appstruct.get(ViewParam.GROUP_IDS) 

3547 # Add back in the groups we're not going to alter: 

3548 final_group_ids = list(set(group_ids) | set(user_frozen_group_ids)) 

3549 user.set_group_ids(final_group_ids) 

3550 # Also, if the user was uploading to a group that they are now no 

3551 # longer a member of, we need to fix that 

3552 if user.upload_group_id not in final_group_ids: 

3553 user.upload_group_id = None 

3554 

3555 def get_form_values(self) -> Dict[str, Any]: 

3556 # will populate with model_form_dict 

3557 form_values = super().get_form_values() 

3558 

3559 user = cast(User, self.object) 

3560 

3561 # Superusers can do everything, of course. 

3562 # Groupadmins can change group memberships only for groups they control 

3563 # (here: "fluid"). That means that there may be a subset of group 

3564 # memberships for this user that they will neither see nor be able to 

3565 # alter (here: "frozen"). They can also edit only a restricted set of 

3566 # permissions. 

3567 

3568 # Groups that we might change memberships for: 

3569 all_fluid_groups = self.request.user.ids_of_groups_user_is_admin_for 

3570 # All groups that the user is currently in: 

3571 user_group_ids = user.group_ids 

3572 # Group memberships we might alter: 

3573 user_fluid_group_ids = list( 

3574 set(user_group_ids) & set(all_fluid_groups) 

3575 ) 

3576 form_values.update( 

3577 { 

3578 ViewParam.USER_ID: user.id, 

3579 ViewParam.GROUP_IDS: user_fluid_group_ids, 

3580 } 

3581 ) 

3582 

3583 return form_values 

3584 

3585 

3586class EditUserGroupAdminView(EditUserBaseView): 

3587 """ 

3588 For group administrators to edit a user. 

3589 """ 

3590 

3591 form_class = EditUserGroupAdminForm 

3592 

3593 

3594class EditUserSuperUserView(EditUserBaseView): 

3595 """ 

3596 For superusers to edit a user. 

3597 """ 

3598 

3599 form_class = EditUserFullForm 

3600 

3601 def get_model_form_dict(self) -> Dict[str, Any]: 

3602 model_form_dict = super().get_model_form_dict() 

3603 model_form_dict["superuser"] = ViewParam.SUPERUSER 

3604 

3605 return model_form_dict 

3606 

3607 

3608@view_config( 

3609 route_name=Routes.EDIT_USER, 

3610 permission=Permission.GROUPADMIN, 

3611 http_cache=NEVER_CACHE, 

3612) 

3613def edit_user(req: "CamcopsRequest") -> Response: 

3614 """ 

3615 View to edit a user (for administrators). 

3616 """ 

3617 view: EditUserBaseView 

3618 

3619 if req.user.superuser: 

3620 view = EditUserSuperUserView(req) 

3621 else: 

3622 view = EditUserGroupAdminView(req) 

3623 

3624 return view.dispatch() 

3625 

3626 

3627class EditUserGroupMembershipBaseView(UpdateView): 

3628 """ 

3629 Django-style view to edit a user's group membership permissions. 

3630 """ 

3631 

3632 model_form_dict = { 

3633 "may_upload": ViewParam.MAY_UPLOAD, 

3634 "may_register_devices": ViewParam.MAY_REGISTER_DEVICES, 

3635 "may_use_webviewer": ViewParam.MAY_USE_WEBVIEWER, 

3636 "view_all_patients_when_unfiltered": ViewParam.VIEW_ALL_PATIENTS_WHEN_UNFILTERED, # noqa: E501 

3637 "may_dump_data": ViewParam.MAY_DUMP_DATA, 

3638 "may_run_reports": ViewParam.MAY_RUN_REPORTS, 

3639 "may_add_notes": ViewParam.MAY_ADD_NOTES, 

3640 "may_manage_patients": ViewParam.MAY_MANAGE_PATIENTS, 

3641 "may_email_patients": ViewParam.MAY_EMAIL_PATIENTS, 

3642 } 

3643 

3644 object_class = UserGroupMembership 

3645 pk_param = ViewParam.USER_GROUP_MEMBERSHIP_ID 

3646 server_pk_name = "id" 

3647 template_name = "user_edit_group_membership.mako" 

3648 

3649 def get_success_url(self) -> str: 

3650 return self.request.route_url(Routes.VIEW_ALL_USERS) 

3651 

3652 def get_object(self) -> Any: 

3653 # noinspection PyUnresolvedReferences 

3654 ugm = cast(UserGroupMembership, super().get_object()) 

3655 user = ugm.user 

3656 assert_may_edit_user(self.request, user) 

3657 assert_may_administer_group(self.request, ugm.group_id) 

3658 

3659 return ugm 

3660 

3661 

3662class EditUserGroupMembershipSuperUserView(EditUserGroupMembershipBaseView): 

3663 """ 

3664 For superusers to edit a user's group memberships. 

3665 """ 

3666 

3667 form_class = EditUserGroupPermissionsFullForm 

3668 

3669 def get_model_form_dict(self) -> Dict[str, str]: 

3670 model_form_dict = super().get_model_form_dict() 

3671 model_form_dict["groupadmin"] = ViewParam.GROUPADMIN 

3672 

3673 return model_form_dict 

3674 

3675 

3676class EditUserGroupMembershipGroupAdminView(EditUserGroupMembershipBaseView): 

3677 """ 

3678 For group administrators to edit a user's group memberships. 

3679 """ 

3680 

3681 form_class = EditUserGroupMembershipGroupAdminForm 

3682 

3683 

3684@view_config( 

3685 route_name=Routes.EDIT_USER_GROUP_MEMBERSHIP, 

3686 permission=Permission.GROUPADMIN, 

3687 http_cache=NEVER_CACHE, 

3688) 

3689def edit_user_group_membership(req: "CamcopsRequest") -> Response: 

3690 """ 

3691 View to edit the group memberships of a user (for administrators). 

3692 """ 

3693 if req.user.superuser: 

3694 view = EditUserGroupMembershipSuperUserView(req) 

3695 else: 

3696 view = EditUserGroupMembershipGroupAdminView(req) 

3697 

3698 return view.dispatch() 

3699 

3700 

3701def set_user_upload_group( 

3702 req: "CamcopsRequest", user: User, by_another: bool 

3703) -> Response: 

3704 """ 

3705 Provides a view to choose which group a user uploads into. 

3706 

3707 TRUSTS ITS CALLER that this is permitted. 

3708 

3709 Args: 

3710 req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest` 

3711 user: the :class:`camcops_server.cc_modules.cc_user.User` to edit 

3712 by_another: is the current user a superuser/group administrator, i.e. 

3713 another user? Determines the screen we return to afterwards. 

3714 """ 

3715 route_back = Routes.VIEW_ALL_USERS if by_another else Routes.HOME 

3716 if FormAction.CANCEL in req.POST: 

3717 return HTTPFound(req.route_url(route_back)) 

3718 form = SetUserUploadGroupForm(request=req, user=user) 

3719 # ... need to show the groups permitted to THAT user, not OUR user 

3720 if FormAction.SUBMIT in req.POST: 

3721 try: 

3722 controls = list(req.POST.items()) 

3723 appstruct = form.validate(controls) 

3724 # ----------------------------------------------------------------- 

3725 # Apply the changes 

3726 # ----------------------------------------------------------------- 

3727 user.upload_group_id = appstruct.get(ViewParam.UPLOAD_GROUP_ID) 

3728 return HTTPFound(req.route_url(route_back)) 

3729 except ValidationFailure as e: 

3730 rendered_form = e.render() 

3731 else: 

3732 appstruct = { 

3733 ViewParam.USER_ID: user.id, 

3734 ViewParam.UPLOAD_GROUP_ID: user.upload_group_id, 

3735 } 

3736 rendered_form = form.render(appstruct) 

3737 return render_to_response( 

3738 "set_user_upload_group.mako", 

3739 dict( 

3740 user=user, 

3741 form=rendered_form, 

3742 head_form_html=get_head_form_html(req, [form]), 

3743 ), 

3744 request=req, 

3745 ) 

3746 

3747 

3748@view_config( 

3749 route_name=Routes.SET_OWN_USER_UPLOAD_GROUP, http_cache=NEVER_CACHE 

3750) 

3751def set_own_user_upload_group(req: "CamcopsRequest") -> Response: 

3752 """ 

3753 View to set the upload group for your own user. 

3754 """ 

3755 return set_user_upload_group(req, req.user, False) 

3756 

3757 

3758@view_config( 

3759 route_name=Routes.SET_OTHER_USER_UPLOAD_GROUP, 

3760 permission=Permission.GROUPADMIN, 

3761 http_cache=NEVER_CACHE, 

3762) 

3763def set_other_user_upload_group(req: "CamcopsRequest") -> Response: 

3764 """ 

3765 View to set the upload group for another user. 

3766 """ 

3767 user = get_user_from_request_user_id_or_raise(req) 

3768 if user.id != req.user.id: 

3769 assert_may_edit_user(req, user) 

3770 # ... but always OK to edit this for your own user; no such check required 

3771 return set_user_upload_group(req, user, True) 

3772 

3773 

3774# noinspection PyTypeChecker 

3775@view_config( 

3776 route_name=Routes.UNLOCK_USER, 

3777 permission=Permission.GROUPADMIN, 

3778 http_cache=NEVER_CACHE, 

3779) 

3780def unlock_user(req: "CamcopsRequest") -> Response: 

3781 """ 

3782 View to unlock a locked user account. 

3783 """ 

3784 user = get_user_from_request_user_id_or_raise(req) 

3785 assert_may_edit_user(req, user) 

3786 user.enable(req) 

3787 _ = req.gettext 

3788 

3789 req.session.flash( 

3790 _("User {username} enabled").format(username=user.username), 

3791 queue=FlashQueue.SUCCESS, 

3792 ) 

3793 raise HTTPFound(req.route_url(Routes.VIEW_ALL_USERS)) 

3794 

3795 

3796@view_config( 

3797 route_name=Routes.ADD_USER, 

3798 permission=Permission.GROUPADMIN, 

3799 renderer="user_add.mako", 

3800 http_cache=NEVER_CACHE, 

3801) 

3802def add_user(req: "CamcopsRequest") -> Dict[str, Any]: 

3803 """ 

3804 View to add a user. 

3805 """ 

3806 route_back = Routes.VIEW_ALL_USERS 

3807 if FormAction.CANCEL in req.POST: 

3808 raise HTTPFound(req.route_url(route_back)) 

3809 if req.user.superuser: 

3810 form = AddUserSuperuserForm(request=req) 

3811 else: 

3812 form = AddUserGroupadminForm(request=req) 

3813 dbsession = req.dbsession 

3814 if FormAction.SUBMIT in req.POST: 

3815 try: 

3816 controls = list(req.POST.items()) 

3817 appstruct = form.validate(controls) 

3818 # ----------------------------------------------------------------- 

3819 # Add the user 

3820 # ----------------------------------------------------------------- 

3821 user = User() 

3822 user.username = appstruct.get(ViewParam.USERNAME) 

3823 user.set_password(req, appstruct.get(ViewParam.NEW_PASSWORD)) 

3824 user.must_change_password = appstruct.get( 

3825 ViewParam.MUST_CHANGE_PASSWORD 

3826 ) 

3827 # We don't ask for language initially; that can be configured 

3828 # later. But is is a reasonable guess that it should be the same 

3829 # language as used by the person creating the new user. 

3830 user.language = req.language 

3831 if User.get_user_by_name(dbsession, user.username): 

3832 raise HTTPBadRequest( 

3833 f"User with username {user.username!r} already exists!" 

3834 ) 

3835 dbsession.add(user) 

3836 group_ids = appstruct.get(ViewParam.GROUP_IDS) 

3837 for gid in group_ids: 

3838 # noinspection PyUnresolvedReferences 

3839 user.user_group_memberships.append( 

3840 UserGroupMembership(user_id=user.id, group_id=gid) 

3841 ) 

3842 raise HTTPFound(req.route_url(route_back)) 

3843 except ValidationFailure as e: 

3844 rendered_form = e.render() 

3845 else: 

3846 rendered_form = form.render() 

3847 return dict( 

3848 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

3849 ) 

3850 

3851 

3852def any_records_use_user(req: "CamcopsRequest", user: User) -> bool: 

3853 """ 

3854 Do any records in the database refer to the specified user? 

3855 

3856 (Used when we're thinking about deleting a user; would it leave broken 

3857 references? If so, we will prevent deletion; see :func:`delete_user`.) 

3858 """ 

3859 dbsession = req.dbsession 

3860 user_id = user.id 

3861 # Device? 

3862 q = CountStarSpecializedQuery(Device, session=dbsession).filter( # type: ignore[arg-type] # noqa: E501 

3863 or_( 

3864 Device.registered_by_user_id == user_id, 

3865 Device.uploading_user_id == user_id, 

3866 ) 

3867 ) 

3868 if q.count_star() > 0: 

3869 return True 

3870 # SpecialNote? 

3871 q = CountStarSpecializedQuery(SpecialNote, session=dbsession).filter( # type: ignore[arg-type] # noqa: E501 

3872 SpecialNote.user_id == user_id 

3873 ) 

3874 if q.count_star() > 0: 

3875 return True 

3876 # Audit trail? 

3877 q = CountStarSpecializedQuery(AuditEntry, session=dbsession).filter( # type: ignore[arg-type] # noqa: E501 

3878 AuditEntry.user_id == user_id 

3879 ) 

3880 if q.count_star() > 0: 

3881 return True 

3882 # Uploaded records? 

3883 for cls in gen_orm_classes_from_base( 

3884 GenericTabletRecordMixin 

3885 ): # type: Type[GenericTabletRecordMixin] 

3886 # noinspection PyProtectedMember 

3887 q = CountStarSpecializedQuery(cls, session=dbsession).filter( # type: ignore[arg-type] # noqa: E501 

3888 or_( 

3889 cls._adding_user_id == user_id, 

3890 cls._removing_user_id == user_id, 

3891 cls._preserving_user_id == user_id, 

3892 cls._manually_erasing_user_id == user_id, 

3893 ) 

3894 ) 

3895 if q.count_star() > 0: 

3896 return True 

3897 # No; all clean. 

3898 return False 

3899 

3900 

3901@view_config( 

3902 route_name=Routes.DELETE_USER, 

3903 permission=Permission.GROUPADMIN, 

3904 renderer="user_delete.mako", 

3905 http_cache=NEVER_CACHE, 

3906) 

3907def delete_user(req: "CamcopsRequest") -> Dict[str, Any]: 

3908 """ 

3909 View to delete a user (and make it hard work). 

3910 """ 

3911 if FormAction.CANCEL in req.POST: 

3912 raise HTTPFound(req.route_url(Routes.VIEW_ALL_USERS)) 

3913 user = get_user_from_request_user_id_or_raise(req) 

3914 assert_may_edit_user(req, user) 

3915 form = DeleteUserForm(request=req) 

3916 rendered_form = "" 

3917 error = "" 

3918 _ = req.gettext 

3919 if user.id == req.user.id: 

3920 error = _("Can't delete your own user!") 

3921 elif user.may_use_webviewer or user.may_upload: 

3922 error = _( 

3923 "Unable to delete user: user still has webviewer login " 

3924 "and/or tablet upload permission" 

3925 ) 

3926 elif user.superuser and (not req.user.superuser): 

3927 error = _( 

3928 "Unable to delete user: " "they are a superuser and you are not" 

3929 ) 

3930 elif (not req.user.superuser) and bool( 

3931 set(user.group_ids) - set(req.user.ids_of_groups_user_is_admin_for) 

3932 ): 

3933 error = _( 

3934 "Unable to delete user: " 

3935 "user belongs to groups that you do not administer" 

3936 ) 

3937 else: 

3938 if any_records_use_user(req, user): 

3939 error = _( 

3940 "Unable to delete user; records (or audit trails) refer to " 

3941 "that user. Disable login and upload permissions instead." 

3942 ) 

3943 else: 

3944 if FormAction.DELETE in req.POST: 

3945 try: 

3946 controls = list(req.POST.items()) 

3947 appstruct = form.validate(controls) 

3948 assert appstruct.get(ViewParam.USER_ID) == user.id 

3949 # --------------------------------------------------------- 

3950 # Delete the user and associated objects 

3951 # --------------------------------------------------------- 

3952 # (*) Sessions belonging to this user 

3953 # ... done by modifying its ForeignKey to use "ondelete" 

3954 # (*) user_group_table mapping 

3955 # https://docs.sqlalchemy.org/en/latest/orm/basic_relationships.html#relationships-many-to-many-deletion # noqa 

3956 # Simplest way: 

3957 user.groups = [] # will delete the mapping entries 

3958 # (*) User itself 

3959 req.dbsession.delete(user) 

3960 # Done 

3961 raise HTTPFound(req.route_url(Routes.VIEW_ALL_USERS)) 

3962 except ValidationFailure as e: 

3963 rendered_form = e.render() 

3964 else: 

3965 appstruct = {ViewParam.USER_ID: user.id} 

3966 rendered_form = form.render(appstruct) 

3967 

3968 return dict( 

3969 user=user, 

3970 error=error, 

3971 form=rendered_form, 

3972 head_form_html=get_head_form_html(req, [form]), 

3973 ) 

3974 

3975 

3976# ============================================================================= 

3977# Group management 

3978# ============================================================================= 

3979 

3980 

3981@view_config( 

3982 route_name=Routes.VIEW_GROUPS, 

3983 permission=Permission.SUPERUSER, 

3984 renderer="groups_view.mako", 

3985 http_cache=NEVER_CACHE, 

3986) 

3987def view_groups(req: "CamcopsRequest") -> Dict[str, Any]: 

3988 """ 

3989 View to show all groups (with hyperlinks to edit them). 

3990 Superusers only. 

3991 """ 

3992 rows_per_page = req.get_int_param( 

3993 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE 

3994 ) 

3995 page_num = req.get_int_param(ViewParam.PAGE, 1) 

3996 dbsession = req.dbsession 

3997 groups = ( 

3998 dbsession.query(Group).order_by(Group.name).all() 

3999 ) # type: List[Group] 

4000 page = CamcopsPage( 

4001 collection=groups, 

4002 page=page_num, 

4003 items_per_page=rows_per_page, 

4004 url_maker=PageUrl(req), 

4005 request=req, 

4006 ) 

4007 

4008 valid_which_idnums = req.valid_which_idnums 

4009 

4010 return dict(groups_page=page, valid_which_idnums=valid_which_idnums) 

4011 

4012 

4013def get_group_from_request_group_id_or_raise(req: "CamcopsRequest") -> Group: 

4014 """ 

4015 Returns the :class:`camcops_server.cc_modules.cc_group.Group` represented 

4016 by the request's ``ViewParam.GROUP_ID`` parameter, or raise 

4017 :exc:`HTTPBadRequest`. 

4018 """ 

4019 group_id = req.get_int_param(ViewParam.GROUP_ID) 

4020 group = None 

4021 if group_id is not None: 

4022 dbsession = req.dbsession 

4023 group = dbsession.query(Group).filter(Group.id == group_id).first() 

4024 if not group: 

4025 _ = req.gettext 

4026 raise HTTPBadRequest(f"{_('No such group ID:')} {group_id!r}") 

4027 return group 

4028 

4029 

4030class EditGroupView(UpdateView): 

4031 """ 

4032 Django-style view to edit a CamCOPS group. 

4033 """ 

4034 

4035 form_class = EditGroupForm 

4036 model_form_dict = { 

4037 "name": ViewParam.NAME, 

4038 "description": ViewParam.DESCRIPTION, 

4039 "upload_policy": ViewParam.UPLOAD_POLICY, 

4040 "finalize_policy": ViewParam.FINALIZE_POLICY, 

4041 } 

4042 object_class = Group 

4043 pk_param = ViewParam.GROUP_ID 

4044 server_pk_name = "id" 

4045 template_name = "group_edit.mako" 

4046 

4047 def get_form_kwargs(self) -> Dict[str, Any]: 

4048 kwargs = super().get_form_kwargs() 

4049 

4050 group = cast(Group, self.object) 

4051 kwargs.update(group=group) 

4052 

4053 return kwargs 

4054 

4055 def get_form_values(self) -> Dict: 

4056 # will populate with model_form_dict 

4057 form_values = super().get_form_values() 

4058 

4059 group = cast(Group, self.object) 

4060 

4061 other_group_ids = list(group.ids_of_other_groups_group_may_see()) 

4062 other_groups = Group.get_groups_from_id_list( 

4063 self.request.dbsession, other_group_ids 

4064 ) 

4065 other_groups.sort(key=lambda g: g.name) 

4066 

4067 form_values.update( 

4068 { 

4069 ViewParam.IP_USE: group.ip_use, 

4070 ViewParam.GROUP_ID: group.id, 

4071 ViewParam.GROUP_IDS: [g.id for g in other_groups], 

4072 } 

4073 ) 

4074 

4075 return form_values 

4076 

4077 def get_success_url(self) -> str: 

4078 return self.request.route_url(Routes.VIEW_GROUPS) 

4079 

4080 def save_object(self, appstruct: Dict[str, Any]) -> None: 

4081 super().save_object(appstruct) 

4082 

4083 group = cast(Group, self.object) 

4084 

4085 # Group cross-references 

4086 group_ids = appstruct.get(ViewParam.GROUP_IDS) 

4087 # The form validation will prevent our own group from being in here 

4088 other_groups = Group.get_groups_from_id_list( 

4089 self.request.dbsession, group_ids 

4090 ) 

4091 group.can_see_other_groups = other_groups 

4092 

4093 ip_use = appstruct.get(ViewParam.IP_USE) 

4094 if group.ip_use is not None: 

4095 ip_use.id = group.ip_use.id 

4096 

4097 group.ip_use = ip_use 

4098 

4099 

4100@view_config( 

4101 route_name=Routes.EDIT_GROUP, 

4102 permission=Permission.SUPERUSER, 

4103 http_cache=NEVER_CACHE, 

4104) 

4105def edit_group(req: "CamcopsRequest") -> Response: 

4106 """ 

4107 View to edit a group. Superusers only. 

4108 """ 

4109 return EditGroupView(req).dispatch() 

4110 

4111 

4112@view_config( 

4113 route_name=Routes.ADD_GROUP, 

4114 permission=Permission.SUPERUSER, 

4115 renderer="group_add.mako", 

4116 http_cache=NEVER_CACHE, 

4117) 

4118def add_group(req: "CamcopsRequest") -> Dict[str, Any]: 

4119 """ 

4120 View to add a group. Superusers only. 

4121 """ 

4122 route_back = Routes.VIEW_GROUPS 

4123 if FormAction.CANCEL in req.POST: 

4124 raise HTTPFound(req.route_url(route_back)) 

4125 form = AddGroupForm(request=req) 

4126 dbsession = req.dbsession 

4127 if FormAction.SUBMIT in req.POST: 

4128 try: 

4129 controls = list(req.POST.items()) 

4130 appstruct = form.validate(controls) 

4131 # ----------------------------------------------------------------- 

4132 # Add the group 

4133 # ----------------------------------------------------------------- 

4134 group = Group() 

4135 group.name = appstruct.get(ViewParam.NAME) 

4136 dbsession.add(group) 

4137 raise HTTPFound(req.route_url(route_back)) 

4138 except ValidationFailure as e: 

4139 rendered_form = e.render() 

4140 else: 

4141 rendered_form = form.render() 

4142 return dict( 

4143 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

4144 ) 

4145 

4146 

4147def any_records_use_group(req: "CamcopsRequest", group: Group) -> bool: 

4148 """ 

4149 Do any records in the database refer to the specified group? 

4150 

4151 (Used when we're thinking about deleting a group; would it leave broken 

4152 references? If so, we will prevent deletion; see :func:`delete_group`.) 

4153 """ 

4154 dbsession = req.dbsession 

4155 group_id = group.id 

4156 # Our own or users filtering on us? 

4157 # ... doesn't matter; see TaskFilter; stored as a CSV list so not part of 

4158 # database integrity checks. 

4159 # Uploaded records? 

4160 for cls in gen_orm_classes_from_base( 

4161 GenericTabletRecordMixin 

4162 ): # type: Type[GenericTabletRecordMixin] 

4163 # noinspection PyProtectedMember 

4164 q = CountStarSpecializedQuery(cls, session=dbsession).filter( # type: ignore[arg-type] # noqa: E501 

4165 cls._group_id == group_id 

4166 ) 

4167 if q.count_star() > 0: 

4168 return True 

4169 # No; all clean. 

4170 return False 

4171 

4172 

4173@view_config( 

4174 route_name=Routes.DELETE_GROUP, 

4175 permission=Permission.SUPERUSER, 

4176 renderer="group_delete.mako", 

4177 http_cache=NEVER_CACHE, 

4178) 

4179def delete_group(req: "CamcopsRequest") -> Dict[str, Any]: 

4180 """ 

4181 View to delete a group. Superusers only. 

4182 """ 

4183 route_back = Routes.VIEW_GROUPS 

4184 if FormAction.CANCEL in req.POST: 

4185 raise HTTPFound(req.route_url(route_back)) 

4186 group = get_group_from_request_group_id_or_raise(req) 

4187 form = DeleteGroupForm(request=req) 

4188 rendered_form = "" 

4189 error = "" 

4190 _ = req.gettext 

4191 if group.users: 

4192 error = _("Unable to delete group; there are users who are members!") 

4193 else: 

4194 if any_records_use_group(req, group): 

4195 error = _("Unable to delete group; records refer to it.") 

4196 else: 

4197 if FormAction.DELETE in req.POST: 

4198 try: 

4199 controls = list(req.POST.items()) 

4200 appstruct = form.validate(controls) 

4201 assert appstruct.get(ViewParam.GROUP_ID) == group.id 

4202 # --------------------------------------------------------- 

4203 # Delete the group 

4204 # --------------------------------------------------------- 

4205 req.dbsession.delete(group) 

4206 raise HTTPFound(req.route_url(route_back)) 

4207 except ValidationFailure as e: 

4208 rendered_form = e.render() 

4209 else: 

4210 appstruct = {ViewParam.GROUP_ID: group.id} 

4211 rendered_form = form.render(appstruct) 

4212 return dict( 

4213 group=group, 

4214 error=error, 

4215 form=rendered_form, 

4216 head_form_html=get_head_form_html(req, [form]), 

4217 ) 

4218 

4219 

4220# ============================================================================= 

4221# Edit server settings 

4222# ============================================================================= 

4223 

4224 

4225@view_config( 

4226 route_name=Routes.EDIT_SERVER_SETTINGS, 

4227 permission=Permission.SUPERUSER, 

4228 renderer="server_settings_edit.mako", 

4229 http_cache=NEVER_CACHE, 

4230) 

4231def edit_server_settings(req: "CamcopsRequest") -> Dict[str, Any]: 

4232 """ 

4233 View to edit server settings (like the database title). 

4234 """ 

4235 if FormAction.CANCEL in req.POST: 

4236 raise HTTPFound(req.route_url(Routes.HOME)) 

4237 form = EditServerSettingsForm(request=req) 

4238 if FormAction.SUBMIT in req.POST: 

4239 try: 

4240 controls = list(req.POST.items()) 

4241 appstruct = form.validate(controls) 

4242 title = appstruct.get(ViewParam.DATABASE_TITLE) 

4243 # ----------------------------------------------------------------- 

4244 # Apply changes 

4245 # ----------------------------------------------------------------- 

4246 req.set_database_title(title) 

4247 raise HTTPFound(req.route_url(Routes.HOME)) 

4248 except ValidationFailure as e: 

4249 rendered_form = e.render() 

4250 else: 

4251 title = req.database_title 

4252 appstruct = {ViewParam.DATABASE_TITLE: title} 

4253 rendered_form = form.render(appstruct) 

4254 return dict( 

4255 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

4256 ) 

4257 

4258 

4259@view_config( 

4260 route_name=Routes.VIEW_ID_DEFINITIONS, 

4261 permission=Permission.SUPERUSER, 

4262 renderer="id_definitions_view.mako", 

4263 http_cache=NEVER_CACHE, 

4264) 

4265def view_id_definitions(req: "CamcopsRequest") -> Dict[str, Any]: 

4266 """ 

4267 View to show all ID number definitions (with hyperlinks to edit them). 

4268 Superusers only. 

4269 """ 

4270 return dict(idnum_definitions=req.idnum_definitions) 

4271 

4272 

4273def get_iddef_from_request_which_idnum_or_raise( 

4274 req: "CamcopsRequest", 

4275) -> IdNumDefinition: 

4276 """ 

4277 Returns the :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` 

4278 represented by the request's ``ViewParam.WHICH_IDNUM`` parameter, or raise 

4279 :exc:`HTTPBadRequest`. 

4280 """ 

4281 which_idnum = req.get_int_param(ViewParam.WHICH_IDNUM) 

4282 iddef = ( 

4283 req.dbsession.query(IdNumDefinition) 

4284 .filter(IdNumDefinition.which_idnum == which_idnum) 

4285 .first() 

4286 ) 

4287 if not iddef: 

4288 _ = req.gettext 

4289 raise HTTPBadRequest(f"{_('No such ID definition:')} {which_idnum!r}") 

4290 return iddef 

4291 

4292 

4293@view_config( 

4294 route_name=Routes.EDIT_ID_DEFINITION, 

4295 permission=Permission.SUPERUSER, 

4296 renderer="id_definition_edit.mako", 

4297 http_cache=NEVER_CACHE, 

4298) 

4299def edit_id_definition(req: "CamcopsRequest") -> Dict[str, Any]: 

4300 """ 

4301 View to edit an ID number definition. Superusers only. 

4302 """ 

4303 route_back = Routes.VIEW_ID_DEFINITIONS 

4304 if FormAction.CANCEL in req.POST: 

4305 raise HTTPFound(req.route_url(route_back)) 

4306 iddef = get_iddef_from_request_which_idnum_or_raise(req) 

4307 form = EditIdDefinitionForm(request=req) 

4308 if FormAction.SUBMIT in req.POST: 

4309 try: 

4310 controls = list(req.POST.items()) 

4311 appstruct = form.validate(controls) 

4312 # ----------------------------------------------------------------- 

4313 # Alter the ID definition 

4314 # ----------------------------------------------------------------- 

4315 iddef.description = appstruct.get(ViewParam.DESCRIPTION) 

4316 iddef.short_description = appstruct.get( 

4317 ViewParam.SHORT_DESCRIPTION 

4318 ) 

4319 iddef.validation_method = appstruct.get( 

4320 ViewParam.VALIDATION_METHOD 

4321 ) 

4322 iddef.hl7_id_type = appstruct.get(ViewParam.HL7_ID_TYPE) 

4323 iddef.hl7_assigning_authority = appstruct.get( 

4324 ViewParam.HL7_ASSIGNING_AUTHORITY 

4325 ) 

4326 iddef.fhir_id_system = appstruct.get(ViewParam.FHIR_ID_SYSTEM) 

4327 # REMOVED # clear_idnum_definition_cache() # SPECIAL 

4328 raise HTTPFound(req.route_url(route_back)) 

4329 except ValidationFailure as e: 

4330 rendered_form = e.render() 

4331 else: 

4332 appstruct = { 

4333 ViewParam.WHICH_IDNUM: iddef.which_idnum, 

4334 ViewParam.DESCRIPTION: iddef.description or "", 

4335 ViewParam.SHORT_DESCRIPTION: iddef.short_description or "", 

4336 ViewParam.VALIDATION_METHOD: iddef.validation_method or "", 

4337 ViewParam.HL7_ID_TYPE: iddef.hl7_id_type or "", 

4338 ViewParam.HL7_ASSIGNING_AUTHORITY: iddef.hl7_assigning_authority 

4339 or "", 

4340 ViewParam.FHIR_ID_SYSTEM: iddef.fhir_id_system or "", 

4341 } 

4342 rendered_form = form.render(appstruct) 

4343 return dict( 

4344 iddef=iddef, 

4345 form=rendered_form, 

4346 head_form_html=get_head_form_html(req, [form]), 

4347 ) 

4348 

4349 

4350@view_config( 

4351 route_name=Routes.ADD_ID_DEFINITION, 

4352 permission=Permission.SUPERUSER, 

4353 renderer="id_definition_add.mako", 

4354 http_cache=NEVER_CACHE, 

4355) 

4356def add_id_definition(req: "CamcopsRequest") -> Dict[str, Any]: 

4357 """ 

4358 View to add an ID number definition. Superusers only. 

4359 """ 

4360 route_back = Routes.VIEW_ID_DEFINITIONS 

4361 if FormAction.CANCEL in req.POST: 

4362 raise HTTPFound(req.route_url(route_back)) 

4363 form = AddIdDefinitionForm(request=req) 

4364 dbsession = req.dbsession 

4365 if FormAction.SUBMIT in req.POST: 

4366 try: 

4367 controls = list(req.POST.items()) 

4368 appstruct = form.validate(controls) 

4369 iddef = IdNumDefinition( 

4370 which_idnum=appstruct.get(ViewParam.WHICH_IDNUM), 

4371 description=appstruct.get(ViewParam.DESCRIPTION), 

4372 short_description=appstruct.get(ViewParam.SHORT_DESCRIPTION), 

4373 # we skip hl7_id_type at this stage 

4374 # we skip hl7_assigning_authority at this stage 

4375 validation_method=appstruct.get(ViewParam.VALIDATION_METHOD), 

4376 ) 

4377 # ----------------------------------------------------------------- 

4378 # Add ID definition 

4379 # ----------------------------------------------------------------- 

4380 dbsession.add(iddef) 

4381 # REMOVED # clear_idnum_definition_cache() # SPECIAL 

4382 raise HTTPFound(req.route_url(route_back)) 

4383 except ValidationFailure as e: 

4384 rendered_form = e.render() 

4385 else: 

4386 rendered_form = form.render() 

4387 return dict( 

4388 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

4389 ) 

4390 

4391 

4392def any_records_use_iddef( 

4393 req: "CamcopsRequest", iddef: IdNumDefinition 

4394) -> bool: 

4395 """ 

4396 Do any records in the database refer to the specified ID number definition? 

4397 

4398 (Used when we're thinking about deleting one; would it leave broken 

4399 references? If so, we will prevent deletion; see 

4400 :func:`delete_id_definition`.) 

4401 """ 

4402 # Helpfully, these are only referred to permanently from one place: 

4403 q = CountStarSpecializedQuery(PatientIdNum, session=req.dbsession).filter( # type: ignore[arg-type] # noqa: E501 

4404 PatientIdNum.which_idnum == iddef.which_idnum 

4405 ) 

4406 if q.count_star() > 0: 

4407 return True 

4408 # No; all clean. 

4409 return False 

4410 

4411 

4412@view_config( 

4413 route_name=Routes.DELETE_ID_DEFINITION, 

4414 permission=Permission.SUPERUSER, 

4415 renderer="id_definition_delete.mako", 

4416 http_cache=NEVER_CACHE, 

4417) 

4418def delete_id_definition(req: "CamcopsRequest") -> Dict[str, Any]: 

4419 """ 

4420 View to delete an ID number definition. Superusers only. 

4421 """ 

4422 route_back = Routes.VIEW_ID_DEFINITIONS 

4423 if FormAction.CANCEL in req.POST: 

4424 raise HTTPFound(req.route_url(route_back)) 

4425 iddef = get_iddef_from_request_which_idnum_or_raise(req) 

4426 form = DeleteIdDefinitionForm(request=req) 

4427 rendered_form = "" 

4428 error = "" 

4429 if any_records_use_iddef(req, iddef): 

4430 _ = req.gettext 

4431 error = _("Unable to delete ID definition; records refer to it.") 

4432 else: 

4433 if FormAction.DELETE in req.POST: 

4434 try: 

4435 controls = list(req.POST.items()) 

4436 appstruct = form.validate(controls) 

4437 assert ( 

4438 appstruct.get(ViewParam.WHICH_IDNUM) == iddef.which_idnum 

4439 ) 

4440 # ------------------------------------------------------------- 

4441 # Delete ID definition 

4442 # ------------------------------------------------------------- 

4443 req.dbsession.delete(iddef) 

4444 # REMOVED # clear_idnum_definition_cache() # SPECIAL 

4445 raise HTTPFound(req.route_url(route_back)) 

4446 except ValidationFailure as e: 

4447 rendered_form = e.render() 

4448 else: 

4449 appstruct = {ViewParam.WHICH_IDNUM: iddef.which_idnum} 

4450 rendered_form = form.render(appstruct) 

4451 return dict( 

4452 iddef=iddef, 

4453 error=error, 

4454 form=rendered_form, 

4455 head_form_html=get_head_form_html(req, [form]), 

4456 ) 

4457 

4458 

4459# ============================================================================= 

4460# Altering data. Some of the more complex logic is here. 

4461# ============================================================================= 

4462 

4463 

4464@view_config( 

4465 route_name=Routes.ADD_SPECIAL_NOTE, 

4466 renderer="special_note_add.mako", 

4467 http_cache=NEVER_CACHE, 

4468) 

4469def add_special_note(req: "CamcopsRequest") -> Dict[str, Any]: 

4470 """ 

4471 View to add a special note to a task (after confirmation). 

4472 

4473 (Note that users can't add special notes to patients -- those get added 

4474 automatically when a patient is edited. So the context here is always of a 

4475 task.) 

4476 """ 

4477 table_name = req.get_str_param( 

4478 ViewParam.TABLE_NAME, validator=validate_task_tablename 

4479 ) 

4480 server_pk = req.get_int_param(ViewParam.SERVER_PK, None) 

4481 url_back = req.route_url( 

4482 Routes.TASK, 

4483 _query={ 

4484 ViewParam.TABLE_NAME: table_name, 

4485 ViewParam.SERVER_PK: server_pk, 

4486 ViewParam.VIEWTYPE: ViewArg.HTML, 

4487 }, 

4488 ) 

4489 if FormAction.CANCEL in req.POST: 

4490 raise HTTPFound(url_back) 

4491 task = task_factory(req, table_name, server_pk) 

4492 _ = req.gettext 

4493 if task is None: 

4494 raise HTTPBadRequest( 

4495 f"{_('No such task:')} {table_name}, PK={server_pk}" 

4496 ) 

4497 user = req.user 

4498 if not user.authorized_to_add_special_note(task.group_id): 

4499 raise HTTPBadRequest( 

4500 _("Not authorized to add special notes for this task's group") 

4501 ) 

4502 form = AddSpecialNoteForm(request=req) 

4503 if FormAction.SUBMIT in req.POST: 

4504 try: 

4505 controls = list(req.POST.items()) 

4506 appstruct = form.validate(controls) 

4507 note = appstruct.get(ViewParam.NOTE) 

4508 # ----------------------------------------------------------------- 

4509 # Apply special note 

4510 # ----------------------------------------------------------------- 

4511 task.apply_special_note(req, note) 

4512 raise HTTPFound(url_back) 

4513 except ValidationFailure as e: 

4514 rendered_form = e.render() 

4515 else: 

4516 appstruct = { 

4517 ViewParam.TABLE_NAME: table_name, 

4518 ViewParam.SERVER_PK: server_pk, 

4519 } 

4520 rendered_form = form.render(appstruct) 

4521 return dict( 

4522 task=task, 

4523 form=rendered_form, 

4524 head_form_html=get_head_form_html(req, [form]), 

4525 viewtype=ViewArg.HTML, 

4526 ) 

4527 

4528 

4529@view_config( 

4530 route_name=Routes.DELETE_SPECIAL_NOTE, 

4531 renderer="special_note_delete.mako", 

4532 http_cache=NEVER_CACHE, 

4533) 

4534def delete_special_note(req: "CamcopsRequest") -> Dict[str, Any]: 

4535 """ 

4536 View to delete a special note (after confirmation). 

4537 """ 

4538 note_id = req.get_int_param(ViewParam.NOTE_ID, None) 

4539 sn = SpecialNote.get_specialnote_by_id(req.dbsession, note_id) 

4540 _ = req.gettext 

4541 if sn is None: 

4542 raise HTTPBadRequest(f"{_('No such SpecialNote:')} note_id={note_id}") 

4543 if sn.hidden: 

4544 raise HTTPBadRequest( 

4545 f"{_('SpecialNote already deleted/hidden:')} " f"note_id={note_id}" 

4546 ) 

4547 if not sn.user_may_delete_specialnote(req.user): 

4548 raise HTTPBadRequest(_("Not authorized to delete this special note")) 

4549 url_back = req.route_url(Routes.VIEW_TASKS) # default 

4550 if sn.refers_to_patient(): 

4551 # Special note on a patient. 

4552 # We might have come here from any number of tasks relating to this 

4553 # patient. In principle this information is retrievable; in practice it 

4554 # is a considerable faff for a rare operation, since special notes are 

4555 # displayed via special_notes.mako, which only looks at information 

4556 # stored with the note itself. 

4557 pass 

4558 else: 

4559 # Special note on a task. 

4560 task = sn.target_task() 

4561 if task: 

4562 url_back = req.route_url( 

4563 Routes.TASK, 

4564 _query={ 

4565 ViewParam.TABLE_NAME: task.tablename, 

4566 ViewParam.SERVER_PK: task.pk, 

4567 ViewParam.VIEWTYPE: ViewArg.HTML, 

4568 }, 

4569 ) 

4570 if FormAction.CANCEL in req.POST: 

4571 raise HTTPFound(url_back) 

4572 form = DeleteSpecialNoteForm(request=req) 

4573 if FormAction.SUBMIT in req.POST: 

4574 try: 

4575 controls = list(req.POST.items()) 

4576 form.validate(controls) 

4577 # ----------------------------------------------------------------- 

4578 # Delete special note 

4579 # ----------------------------------------------------------------- 

4580 sn.hidden = True 

4581 raise HTTPFound(url_back) 

4582 except ValidationFailure as e: 

4583 rendered_form = e.render() 

4584 else: 

4585 appstruct = {ViewParam.NOTE_ID: note_id} 

4586 rendered_form = form.render(appstruct) 

4587 return dict( 

4588 sn=sn, 

4589 form=rendered_form, 

4590 head_form_html=get_head_form_html(req, [form]), 

4591 ) 

4592 

4593 

4594class EraseTaskBaseView(DeleteView): 

4595 """ 

4596 Django-style view to erase a task. 

4597 """ 

4598 

4599 form_class = EraseTaskForm 

4600 

4601 def get_object(self) -> Any: 

4602 # noinspection PyAttributeOutsideInit 

4603 self.table_name = self.request.get_str_param( 

4604 ViewParam.TABLE_NAME, validator=validate_task_tablename 

4605 ) 

4606 # noinspection PyAttributeOutsideInit 

4607 self.server_pk = self.request.get_int_param(ViewParam.SERVER_PK, None) 

4608 

4609 task = task_factory(self.request, self.table_name, self.server_pk) 

4610 _ = self.request.gettext 

4611 if task is None: 

4612 raise HTTPBadRequest( 

4613 f"{_('No such task:')} {self.table_name}, PK={self.server_pk}" 

4614 ) 

4615 if task.is_live_on_tablet(): 

4616 raise HTTPBadRequest(errormsg_task_live(self.request)) 

4617 self.check_user_is_authorized(task) 

4618 

4619 return task 

4620 

4621 def check_user_is_authorized(self, task: Task) -> None: 

4622 if not self.request.user.authorized_to_erase_tasks(task.group_id): 

4623 _ = self.request.gettext 

4624 raise HTTPBadRequest( 

4625 _("Not authorized to erase tasks for this task's group") 

4626 ) 

4627 

4628 def get_cancel_url(self) -> str: 

4629 return self.request.route_url( 

4630 Routes.TASK, 

4631 _query={ 

4632 ViewParam.TABLE_NAME: self.table_name, 

4633 ViewParam.SERVER_PK: self.server_pk, 

4634 ViewParam.VIEWTYPE: ViewArg.HTML, 

4635 }, 

4636 ) 

4637 

4638 

4639class EraseTaskLeavingPlaceholderView(EraseTaskBaseView): 

4640 """ 

4641 Django-style view to erase data from a task, leaving an empty 

4642 "placeholder". 

4643 """ 

4644 

4645 template_name = "task_erase.mako" 

4646 

4647 def get_object(self) -> Any: 

4648 task = cast(Task, super().get_object()) 

4649 if task.is_erased(): 

4650 _ = self.request.gettext 

4651 raise HTTPBadRequest(_("Task already erased")) 

4652 

4653 return task 

4654 

4655 def delete(self) -> None: 

4656 task = cast(Task, self.object) 

4657 

4658 task.manually_erase(self.request) 

4659 

4660 def get_success_url(self) -> str: 

4661 return self.request.route_url( 

4662 Routes.TASK, 

4663 _query={ 

4664 ViewParam.TABLE_NAME: self.table_name, 

4665 ViewParam.SERVER_PK: self.server_pk, 

4666 ViewParam.VIEWTYPE: ViewArg.HTML, 

4667 }, 

4668 ) 

4669 

4670 

4671class EraseTaskEntirelyView(EraseTaskBaseView): 

4672 """ 

4673 Django-style view to erase (delete) a task entirely. 

4674 """ 

4675 

4676 template_name = "task_erase_entirely.mako" 

4677 

4678 def delete(self) -> None: 

4679 task = cast(Task, self.object) 

4680 

4681 TaskIndexEntry.unindex_task(task, self.request.dbsession) 

4682 task.delete_entirely(self.request) 

4683 

4684 _ = self.request.gettext 

4685 

4686 msg_erased = _("Task erased:") 

4687 

4688 self.request.session.flash( 

4689 f"{msg_erased} ({self.table_name}, server PK {self.server_pk}).", 

4690 queue=FlashQueue.SUCCESS, 

4691 ) 

4692 

4693 def get_success_url(self) -> str: 

4694 return self.request.route_url(Routes.VIEW_TASKS) 

4695 

4696 

4697@view_config( 

4698 route_name=Routes.ERASE_TASK_LEAVING_PLACEHOLDER, 

4699 permission=Permission.GROUPADMIN, 

4700 http_cache=NEVER_CACHE, 

4701) 

4702def erase_task_leaving_placeholder(req: "CamcopsRequest") -> Response: 

4703 """ 

4704 View to wipe all data from a task (after confirmation). 

4705 

4706 Leaves the task record as a placeholder. 

4707 """ 

4708 return EraseTaskLeavingPlaceholderView(req).dispatch() 

4709 

4710 

4711@view_config( 

4712 route_name=Routes.ERASE_TASK_ENTIRELY, 

4713 permission=Permission.GROUPADMIN, 

4714 http_cache=NEVER_CACHE, 

4715) 

4716def erase_task_entirely(req: "CamcopsRequest") -> Response: 

4717 """ 

4718 View to erase a task from the database entirely (after confirmation). 

4719 """ 

4720 return EraseTaskEntirelyView(req).dispatch() 

4721 

4722 

4723@view_config( 

4724 route_name=Routes.DELETE_PATIENT, 

4725 permission=Permission.GROUPADMIN, 

4726 http_cache=NEVER_CACHE, 

4727) 

4728def delete_patient(req: "CamcopsRequest") -> Response: 

4729 """ 

4730 View to delete completely all data for a patient (after confirmation), 

4731 within a specific group. 

4732 """ 

4733 if FormAction.CANCEL in req.POST: 

4734 raise HTTPFound(req.route_url(Routes.HOME)) 

4735 

4736 first_form = DeletePatientChooseForm(request=req) 

4737 second_form = DeletePatientConfirmForm(request=req) 

4738 form = None 

4739 final_phase = False 

4740 if FormAction.SUBMIT in req.POST: 

4741 # FIRST form has been submitted 

4742 form = first_form 

4743 elif FormAction.DELETE in req.POST: 

4744 # SECOND AND FINAL form has been submitted 

4745 form = second_form 

4746 final_phase = True 

4747 _ = req.gettext 

4748 if form is not None: 

4749 try: 

4750 controls = list(req.POST.items()) 

4751 appstruct = form.validate(controls) 

4752 which_idnum = appstruct.get(ViewParam.WHICH_IDNUM) 

4753 idnum_value = appstruct.get(ViewParam.IDNUM_VALUE) 

4754 group_id = appstruct.get(ViewParam.GROUP_ID) 

4755 if group_id not in req.user.ids_of_groups_user_is_admin_for: 

4756 # rare occurrence; form should prevent it; 

4757 # unless superuser has changed status since form was read 

4758 raise HTTPBadRequest(_("You're not an admin for this group")) 

4759 # ----------------------------------------------------------------- 

4760 # Fetch tasks to be deleted. 

4761 # ----------------------------------------------------------------- 

4762 dbsession = req.dbsession 

4763 # Tasks first: 

4764 idnum_ref = IdNumReference( 

4765 which_idnum=which_idnum, idnum_value=idnum_value 

4766 ) 

4767 taskfilter = TaskFilter() 

4768 taskfilter.idnum_criteria = [idnum_ref] 

4769 taskfilter.group_ids = [group_id] 

4770 collection = TaskCollection( 

4771 req=req, 

4772 taskfilter=taskfilter, 

4773 sort_method_global=TaskSortMethod.CREATION_DATE_DESC, 

4774 current_only=False, # unusual option! 

4775 ) 

4776 tasks = collection.all_tasks 

4777 n_tasks = len(tasks) 

4778 patient_lineage_instances = Patient.get_patients_by_idnum( 

4779 dbsession=dbsession, 

4780 which_idnum=which_idnum, 

4781 idnum_value=idnum_value, 

4782 group_id=group_id, 

4783 current_only=False, 

4784 ) 

4785 n_patient_instances = len(patient_lineage_instances) 

4786 

4787 # ----------------------------------------------------------------- 

4788 # Bin out at this stage and offer confirmation page? 

4789 # ----------------------------------------------------------------- 

4790 if not final_phase: 

4791 # New appstruct; we don't want the validation code persisting 

4792 appstruct = { 

4793 ViewParam.WHICH_IDNUM: which_idnum, 

4794 ViewParam.IDNUM_VALUE: idnum_value, 

4795 ViewParam.GROUP_ID: group_id, 

4796 } 

4797 rendered_form = second_form.render(appstruct) 

4798 return render_to_response( 

4799 "patient_delete_confirm.mako", 

4800 dict( 

4801 form=rendered_form, 

4802 tasks=tasks, 

4803 n_patient_instances=n_patient_instances, 

4804 head_form_html=get_head_form_html(req, [form]), 

4805 ), 

4806 request=req, 

4807 ) 

4808 

4809 # ----------------------------------------------------------------- 

4810 # Delete patient and associated tasks 

4811 # ----------------------------------------------------------------- 

4812 for task in tasks: 

4813 TaskIndexEntry.unindex_task(task, req.dbsession) 

4814 task.delete_entirely(req) 

4815 # Then patients: 

4816 for p in patient_lineage_instances: 

4817 PatientIdNumIndexEntry.unindex_patient(p, req.dbsession) 

4818 p.delete_with_dependants(req) 

4819 msg = ( 

4820 f"{_('Patient and associated tasks DELETED from group')} " 

4821 f"{group_id}: idnum{which_idnum} = {idnum_value}. " 

4822 f"{_('Task records deleted:')} {n_tasks}." 

4823 f"{_('Patient records (current and/or old) deleted')} " 

4824 f"{n_patient_instances}." 

4825 ) 

4826 audit(req, msg) 

4827 

4828 req.session.flash(msg, FlashQueue.SUCCESS) 

4829 raise HTTPFound(req.route_url(Routes.HOME)) 

4830 

4831 except ValidationFailure as e: 

4832 rendered_form = e.render() 

4833 else: 

4834 form = first_form 

4835 rendered_form = first_form.render() 

4836 return render_to_response( 

4837 "patient_delete_choose.mako", 

4838 dict( 

4839 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

4840 ), 

4841 request=req, 

4842 ) 

4843 

4844 

4845@view_config( 

4846 route_name=Routes.FORCIBLY_FINALIZE, 

4847 permission=Permission.GROUPADMIN, 

4848 http_cache=NEVER_CACHE, 

4849) 

4850def forcibly_finalize(req: "CamcopsRequest") -> Response: 

4851 """ 

4852 View to force-finalize all live (``_era == ERA_NOW``) records from a 

4853 device. Available to group administrators if all those records are within 

4854 their groups (otherwise, it's a superuser operation). 

4855 """ 

4856 if FormAction.CANCEL in req.POST: 

4857 return HTTPFound(req.route_url(Routes.HOME)) 

4858 

4859 dbsession = req.dbsession 

4860 first_form = ForciblyFinalizeChooseDeviceForm(request=req) 

4861 second_form = ForciblyFinalizeConfirmForm(request=req) 

4862 form = None 

4863 final_phase = False 

4864 if FormAction.SUBMIT in req.POST: 

4865 # FIRST form has been submitted 

4866 form = first_form 

4867 elif FormAction.FINALIZE in req.POST: 

4868 # SECOND form has been submitted: 

4869 form = second_form 

4870 final_phase = True 

4871 _ = req.gettext 

4872 if form is not None: 

4873 try: 

4874 controls = list(req.POST.items()) 

4875 appstruct = form.validate(controls) 

4876 # log.debug("{}", pformat(appstruct)) 

4877 device_id = appstruct.get(ViewParam.DEVICE_ID) 

4878 device = Device.get_device_by_id(dbsession, device_id) 

4879 if device is None: 

4880 raise HTTPBadRequest(f"{_('No such device:')} {device_id!r}") 

4881 # ----------------------------------------------------------------- 

4882 # If at the first stage, bin out and offer confirmation page 

4883 # ----------------------------------------------------------------- 

4884 if not final_phase: 

4885 appstruct = {ViewParam.DEVICE_ID: device_id} 

4886 rendered_form = second_form.render(appstruct) 

4887 taskfilter = TaskFilter() 

4888 taskfilter.device_ids = [device_id] 

4889 taskfilter.era = ERA_NOW 

4890 collection = TaskCollection( 

4891 req=req, 

4892 taskfilter=taskfilter, 

4893 sort_method_global=TaskSortMethod.CREATION_DATE_DESC, 

4894 current_only=False, # unusual option! 

4895 via_index=False, # required for current_only=False 

4896 ) 

4897 tasks = collection.all_tasks 

4898 return render_to_response( 

4899 "device_forcibly_finalize_confirm.mako", 

4900 dict( 

4901 form=rendered_form, 

4902 tasks=tasks, 

4903 head_form_html=get_head_form_html(req, [form]), 

4904 ), 

4905 request=req, 

4906 ) 

4907 # ----------------------------------------------------------------- 

4908 # Check it's permitted 

4909 # ----------------------------------------------------------------- 

4910 if not req.user.superuser: 

4911 admin_group_ids = req.user.ids_of_groups_user_is_admin_for 

4912 for clienttable in CLIENT_TABLE_MAP.values(): 

4913 # noinspection PyPropertyAccess 

4914 count_query = ( 

4915 select(func.count()) 

4916 .select_from(clienttable) 

4917 .where(clienttable.c[FN_DEVICE_ID] == device_id) 

4918 .where(clienttable.c[FN_ERA] == ERA_NOW) 

4919 .where( 

4920 clienttable.c[FN_GROUP_ID].notin_(admin_group_ids) 

4921 ) 

4922 ) 

4923 n = dbsession.execute(count_query).scalar() 

4924 if n > 0: 

4925 raise HTTPBadRequest( 

4926 _( 

4927 "Some records for this device are in groups " 

4928 "for which you are not an administrator" 

4929 ) 

4930 ) 

4931 # ----------------------------------------------------------------- 

4932 # Forcibly finalize 

4933 # ----------------------------------------------------------------- 

4934 msgs = [] # type: List[str] 

4935 batchdetails = BatchDetails(batchtime=req.now_utc) 

4936 alltables = sorted( 

4937 CLIENT_TABLE_MAP.values(), key=upload_commit_order_sorter 

4938 ) 

4939 for clienttable in alltables: 

4940 liverecs = get_server_live_records( 

4941 req, device_id, clienttable, current_only=False 

4942 ) 

4943 preservation_pks = [r.server_pk for r in liverecs] 

4944 if not preservation_pks: 

4945 continue 

4946 current_pks = [r.server_pk for r in liverecs if r.current] 

4947 tablechanges = UploadTableChanges(clienttable) 

4948 tablechanges.note_preservation_pks(preservation_pks) 

4949 tablechanges.note_current_pks(current_pks) 

4950 dbsession.execute( 

4951 update(clienttable) 

4952 .where(clienttable.c[FN_PK].in_(preservation_pks)) 

4953 .values( 

4954 values_preserve_now( 

4955 req, batchdetails, forcibly_preserved=True 

4956 ) 

4957 ) 

4958 ) 

4959 update_indexes_and_push_exports( 

4960 req, batchdetails, tablechanges 

4961 ) 

4962 msgs.append(f"{clienttable.name} {preservation_pks}") 

4963 # Field names are different in server-side tables, so they need 

4964 # special handling: 

4965 SpecialNote.forcibly_preserve_special_notes_for_device( 

4966 req, device_id 

4967 ) 

4968 # ----------------------------------------------------------------- 

4969 # Done 

4970 # ----------------------------------------------------------------- 

4971 msg = ( 

4972 f"{_('Live records for device')} {device_id} " 

4973 f"({device.friendly_name}) {_('forcibly finalized')} " 

4974 f"(PKs: {'; '.join(msgs)})" 

4975 ) 

4976 audit(req, msg) 

4977 log.info(msg) 

4978 

4979 req.session.flash(msg, queue=FlashQueue.SUCCESS) 

4980 raise HTTPFound(req.route_url(Routes.HOME)) 

4981 

4982 except ValidationFailure as e: 

4983 rendered_form = e.render() 

4984 else: 

4985 form = first_form 

4986 rendered_form = form.render() # no appstruct 

4987 return render_to_response( 

4988 "device_forcibly_finalize_choose.mako", 

4989 dict( 

4990 form=rendered_form, head_form_html=get_head_form_html(req, [form]) 

4991 ), 

4992 request=req, 

4993 ) 

4994 

4995 

4996# ============================================================================= 

4997# Patient creation/editing (primarily for task scheduling) 

4998# ============================================================================= 

4999 

5000 

5001class PatientMixin(object): 

5002 """ 

5003 Mixin for views involving a patient. 

5004 """ 

5005 

5006 object: Any 

5007 object_class = Patient 

5008 server_pk_name = "_pk" 

5009 

5010 model_form_dict = { 

5011 "forename": ViewParam.FORENAME, 

5012 "surname": ViewParam.SURNAME, 

5013 "dob": ViewParam.DOB, 

5014 "sex": ViewParam.SEX, 

5015 "email": ViewParam.EMAIL, 

5016 "address": ViewParam.ADDRESS, 

5017 "gp": ViewParam.GP, 

5018 "other": ViewParam.OTHER, 

5019 } 

5020 

5021 def get_form_values(self) -> Dict: 

5022 # will populate with model_form_dict 

5023 # noinspection PyUnresolvedReferences 

5024 form_values = super().get_form_values() # type: ignore[misc] 

5025 

5026 patient = cast(Patient, self.object) 

5027 

5028 if patient is not None: 

5029 form_values[ViewParam.SERVER_PK] = patient.pk 

5030 form_values[ViewParam.GROUP_ID] = patient.group.id 

5031 form_values[ViewParam.ID_REFERENCES] = [ 

5032 { 

5033 ViewParam.WHICH_IDNUM: pidnum.which_idnum, 

5034 ViewParam.IDNUM_VALUE: pidnum.idnum_value, 

5035 } 

5036 for pidnum in patient.idnums 

5037 ] 

5038 ts_list = [] # type: List[Dict] 

5039 for pts in patient.task_schedules: 

5040 ts_dict = { 

5041 ViewParam.PATIENT_TASK_SCHEDULE_ID: pts.id, 

5042 ViewParam.SCHEDULE_ID: pts.schedule_id, 

5043 ViewParam.START_DATETIME: pts.start_datetime, 

5044 } 

5045 if DEFORM_ACCORDION_BUG: 

5046 ts_dict[ViewParam.SETTINGS] = pts.settings 

5047 else: 

5048 ts_dict[ViewParam.ADVANCED] = { 

5049 ViewParam.SETTINGS: pts.settings 

5050 } 

5051 ts_list.append(ts_dict) 

5052 form_values[ViewParam.TASK_SCHEDULES] = ts_list 

5053 

5054 return form_values 

5055 

5056 

5057class EditPatientBaseView(PatientMixin, UpdateView): 

5058 """ 

5059 View to edit details for a patient. 

5060 """ 

5061 

5062 pk_param = ViewParam.SERVER_PK 

5063 

5064 def get_object(self) -> Any: 

5065 patient = cast(Patient, super().get_object()) 

5066 

5067 _ = self.request.gettext 

5068 

5069 if not patient.group: 

5070 raise HTTPBadRequest(_("Bad patient: not in a group")) 

5071 

5072 if not patient.user_may_edit(self.request): 

5073 raise HTTPBadRequest(_("Not authorized to edit this patient")) 

5074 

5075 return patient 

5076 

5077 def save_object(self, appstruct: Dict[str, Any]) -> None: 

5078 # ----------------------------------------------------------------- 

5079 # Apply edits 

5080 # ----------------------------------------------------------------- 

5081 # Calculate the changes, and apply them to the Patient object 

5082 _ = self.request.gettext 

5083 

5084 patient = cast(Patient, self.object) 

5085 

5086 changes = OrderedDict() # type: OrderedDict 

5087 

5088 self.save_changes(appstruct, changes) 

5089 

5090 if not changes: 

5091 self.request.session.flash( 

5092 f"{_('No changes required for patient record with server PK')} " # noqa 

5093 f"{patient.pk} {_('(all new values matched old values)')}", 

5094 queue=FlashQueue.INFO, 

5095 ) 

5096 return 

5097 

5098 formatted_changes = [] 

5099 

5100 for k, details in changes.items(): 

5101 if len(details) == 1: 

5102 change = f"{k}: {details[0]}" # usually a plain message 

5103 else: 

5104 change = f"{k}: {details[0]!r} → {details[1]!r}" 

5105 

5106 formatted_changes.append(change) 

5107 

5108 # Below here, changes have definitely been made. 

5109 change_msg = ( 

5110 _("Patient details edited. Changes:") 

5111 + " " 

5112 + "; ".join(formatted_changes) 

5113 ) 

5114 

5115 # Apply special note to patient 

5116 patient.apply_special_note(self.request, change_msg, "Patient edited") 

5117 

5118 # Patient details changed, so resend any tasks via HL7 

5119 for task in self.get_affected_tasks(): 

5120 task.cancel_from_export_log(self.request) 

5121 

5122 # Done 

5123 self.request.session.flash( 

5124 f"{_('Amended patient record with server PK')} " 

5125 f"{patient.pk}. " 

5126 f"{_('Changes were:')} {change_msg}", 

5127 queue=FlashQueue.SUCCESS, 

5128 ) 

5129 

5130 def save_changes( 

5131 self, appstruct: Dict[str, Any], changes: OrderedDict 

5132 ) -> None: 

5133 self._save_simple_params(appstruct, changes) 

5134 self._save_idrefs(appstruct, changes) 

5135 

5136 def _save_simple_params( 

5137 self, appstruct: Dict[str, Any], changes: OrderedDict 

5138 ) -> None: 

5139 patient = cast(Patient, self.object) 

5140 for k in EDIT_PATIENT_SIMPLE_PARAMS: 

5141 new_value = appstruct.get(k) 

5142 old_value = getattr(patient, k) 

5143 if new_value == old_value: 

5144 continue 

5145 if new_value in (None, "") and old_value in (None, ""): 

5146 # Nothing really changing! 

5147 continue 

5148 changes[k] = (old_value, new_value) 

5149 setattr(patient, k, new_value) 

5150 

5151 def _save_idrefs( 

5152 self, appstruct: Dict[str, Any], changes: OrderedDict 

5153 ) -> None: 

5154 

5155 # The ID numbers are more complex. 

5156 # log.debug("{}", pformat(appstruct)) 

5157 patient = cast(Patient, self.object) 

5158 new_idrefs = [ 

5159 IdNumReference( 

5160 which_idnum=idrefdict[ViewParam.WHICH_IDNUM], 

5161 idnum_value=idrefdict[ViewParam.IDNUM_VALUE], 

5162 ) 

5163 for idrefdict in appstruct.get(ViewParam.ID_REFERENCES, {}) 

5164 ] 

5165 for idnum in patient.idnums: 

5166 matching_idref = next( 

5167 ( 

5168 idref 

5169 for idref in new_idrefs 

5170 if idref.which_idnum == idnum.which_idnum 

5171 ), 

5172 None, 

5173 ) 

5174 if not matching_idref: 

5175 # Delete ID numbers not present in the new set 

5176 changes[ 

5177 "idnum{} ({})".format( 

5178 idnum.which_idnum, 

5179 self.request.get_id_desc(idnum.which_idnum), 

5180 ) 

5181 ] = (idnum.idnum_value, None) 

5182 idnum.mark_as_deleted(self.request) 

5183 elif matching_idref.idnum_value != idnum.idnum_value: 

5184 # Modify altered ID numbers present in the old + new sets 

5185 changes[ 

5186 "idnum{} ({})".format( 

5187 idnum.which_idnum, 

5188 self.request.get_id_desc(idnum.which_idnum), 

5189 ) 

5190 ] = (idnum.idnum_value, matching_idref.idnum_value) 

5191 new_idnum = PatientIdNum() 

5192 new_idnum.id = idnum.id 

5193 new_idnum.patient_id = idnum.patient_id 

5194 new_idnum.which_idnum = idnum.which_idnum 

5195 new_idnum.idnum_value = matching_idref.idnum_value 

5196 new_idnum.set_predecessor(self.request, idnum) 

5197 

5198 for idref in new_idrefs: 

5199 matching_idnum = next( 

5200 ( 

5201 idnum 

5202 for idnum in patient.idnums 

5203 if idnum.which_idnum == idref.which_idnum 

5204 ), 

5205 None, 

5206 ) 

5207 if not matching_idnum: 

5208 # Create ID numbers where they were absent 

5209 changes[ 

5210 "idnum{} ({})".format( 

5211 idref.which_idnum, 

5212 self.request.get_id_desc(idref.which_idnum), 

5213 ) 

5214 ] = (None, idref.idnum_value) 

5215 # We need to establish an "id" field, which is the PK as 

5216 # seen by the tablet. The tablet has lost interest in these 

5217 # records, since _era != ERA_NOW, so all we have to do is 

5218 # pick a number that's not in use. 

5219 new_idnum = PatientIdNum() 

5220 new_idnum.patient_id = patient.id 

5221 new_idnum.which_idnum = idref.which_idnum 

5222 new_idnum.idnum_value = idref.idnum_value 

5223 new_idnum.create_fresh( 

5224 self.request, 

5225 device_id=patient.device_id, 

5226 era=patient.era, 

5227 group_id=patient.group_id, 

5228 ) 

5229 new_idnum.save_with_next_available_id( 

5230 self.request, patient.device_id, era=patient.era 

5231 ) 

5232 

5233 def get_context_data(self, **kwargs: Any) -> Any: 

5234 # This parameter is (I think) used by Mako templates such as 

5235 # finalized_patient_edit.mako 

5236 # Todo: 

5237 # Potential inefficiency: we fetch tasks regardless of the stage 

5238 # of this form. 

5239 kwargs["tasks"] = self.get_affected_tasks() 

5240 

5241 return super().get_context_data(**kwargs) 

5242 

5243 def get_affected_tasks(self) -> Optional[List[Task]]: 

5244 patient = cast(Patient, self.object) 

5245 

5246 taskfilter = TaskFilter() 

5247 taskfilter.device_ids = [patient.device_id] 

5248 taskfilter.group_ids = [patient.group.id] 

5249 taskfilter.era = patient.era 

5250 collection = TaskCollection( 

5251 req=self.request, 

5252 taskfilter=taskfilter, 

5253 sort_method_global=TaskSortMethod.CREATION_DATE_DESC, 

5254 current_only=False, # unusual option! 

5255 via_index=False, # for current_only=False, or we'll get a warning 

5256 ) 

5257 return collection.all_tasks 

5258 

5259 

5260class EditServerCreatedPatientView(EditPatientBaseView): 

5261 """ 

5262 View to edit a patient created on the server (as part of task scheduling). 

5263 """ 

5264 

5265 template_name = "server_created_patient_edit.mako" 

5266 form_class = EditServerCreatedPatientForm 

5267 

5268 def get_success_url(self) -> str: 

5269 return self.request.route_url(Routes.VIEW_PATIENT_TASK_SCHEDULES) 

5270 

5271 def get_object(self) -> Any: 

5272 patient = cast(Patient, super().get_object()) 

5273 

5274 if not patient.created_on_server(self.request): 

5275 _ = self.request.gettext 

5276 

5277 raise HTTPBadRequest( 

5278 _("Patient is not editable - was not created on the server") 

5279 ) 

5280 

5281 return patient 

5282 

5283 def save_changes( 

5284 self, appstruct: Dict[str, Any], changes: OrderedDict 

5285 ) -> None: 

5286 self._save_group(appstruct, changes) 

5287 super().save_changes(appstruct, changes) 

5288 self._save_task_schedules(appstruct, changes) 

5289 

5290 def _save_group( 

5291 self, appstruct: Dict[str, Any], changes: OrderedDict 

5292 ) -> None: 

5293 patient = cast(Patient, self.object) 

5294 

5295 old_group_id = patient.group.id 

5296 old_group_name = patient.group.name 

5297 new_group_id = appstruct.get(ViewParam.GROUP_ID, None) 

5298 new_group = ( 

5299 self.request.dbsession.query(Group) 

5300 .filter(Group.id == new_group_id) 

5301 .first() 

5302 ) 

5303 

5304 if old_group_id != new_group_id: 

5305 patient._group_id = new_group_id 

5306 changes["group"] = (old_group_name, new_group.name) 

5307 

5308 def _save_task_schedules( 

5309 self, appstruct: Dict[str, Any], changes: OrderedDict 

5310 ) -> None: 

5311 

5312 _ = self.request.gettext 

5313 patient = cast(Patient, self.object) 

5314 ids_to_delete = [pts.id for pts in patient.task_schedules] 

5315 

5316 anything_changed = False 

5317 

5318 for schedule_dict in appstruct.get(ViewParam.TASK_SCHEDULES, {}): 

5319 pts_id = schedule_dict[ViewParam.PATIENT_TASK_SCHEDULE_ID] 

5320 schedule_id = schedule_dict[ViewParam.SCHEDULE_ID] 

5321 start_datetime = schedule_dict[ViewParam.START_DATETIME] 

5322 if DEFORM_ACCORDION_BUG: 

5323 settings = schedule_dict[ViewParam.SETTINGS] 

5324 else: 

5325 settings = schedule_dict[ViewParam.ADVANCED][ 

5326 ViewParam.SETTINGS 

5327 ] 

5328 

5329 if pts_id is None: 

5330 pts = PatientTaskSchedule() 

5331 pts.patient_pk = patient.pk 

5332 pts.schedule_id = schedule_id 

5333 pts.start_datetime = start_datetime 

5334 pts.settings = settings 

5335 

5336 self.request.dbsession.add(pts) 

5337 anything_changed = True 

5338 else: 

5339 old_pts = ( 

5340 self.request.dbsession.query(PatientTaskSchedule) 

5341 .filter(PatientTaskSchedule.id == pts_id) 

5342 .first() 

5343 ) 

5344 

5345 updates = {} 

5346 if old_pts.start_datetime != start_datetime: 

5347 updates[PatientTaskSchedule.start_datetime] = ( 

5348 start_datetime 

5349 ) 

5350 

5351 if old_pts.schedule_id != schedule_id: 

5352 updates[PatientTaskSchedule.schedule_id] = schedule_id # type: ignore[index] # noqa: E501 

5353 

5354 if old_pts.settings != settings: 

5355 updates[PatientTaskSchedule.settings] = settings 

5356 

5357 if updates: 

5358 anything_changed = True 

5359 self.request.dbsession.query(PatientTaskSchedule).filter( 

5360 PatientTaskSchedule.id == pts_id 

5361 ).update(updates, synchronize_session="fetch") 

5362 

5363 ids_to_delete.remove(pts_id) 

5364 

5365 pts_to_delete = self.request.dbsession.query( 

5366 PatientTaskSchedule 

5367 ).filter(PatientTaskSchedule.id.in_(ids_to_delete)) 

5368 

5369 # Previously we had: 

5370 # pts_to_delete.delete(synchronize_session="fetch") 

5371 # 

5372 # This won't cascade the deletion because we are calling delete() on 

5373 # the query object. We could set up cascade at the database level 

5374 # instead but there is little performance gain here. 

5375 # https://stackoverflow.com/questions/19243964/sqlalchemy-delete-doesnt-cascade 

5376 

5377 for pts in pts_to_delete: 

5378 self.request.dbsession.delete(pts) 

5379 anything_changed = True 

5380 

5381 if anything_changed: 

5382 changes[_("Task schedules")] = (_("Updated"),) 

5383 

5384 

5385class EditFinalizedPatientView(EditPatientBaseView): 

5386 """ 

5387 View to edit a finalized patient. 

5388 """ 

5389 

5390 template_name = "finalized_patient_edit.mako" 

5391 form_class = EditFinalizedPatientForm 

5392 

5393 def __init__( 

5394 self, 

5395 req: CamcopsRequest, 

5396 task_tablename: str = None, 

5397 task_server_pk: int = None, 

5398 ) -> None: 

5399 """ 

5400 The two additional parameters are for returning the user to the task 

5401 from which editing was initiated. 

5402 """ 

5403 super().__init__(req) 

5404 self.task_tablename = task_tablename 

5405 self.task_server_pk = task_server_pk 

5406 

5407 def get_success_url(self) -> str: 

5408 """ 

5409 We got here by editing a patient from an uploaded task, so that's our 

5410 return point. 

5411 """ 

5412 if self.task_tablename and self.task_server_pk: 

5413 return self.request.route_url( 

5414 Routes.TASK, 

5415 _query={ 

5416 ViewParam.TABLE_NAME: self.task_tablename, 

5417 ViewParam.SERVER_PK: self.task_server_pk, 

5418 ViewParam.VIEWTYPE: ViewArg.HTML, 

5419 }, 

5420 ) 

5421 else: 

5422 # Likely in a testing environment! 

5423 return self.request.route_url(Routes.HOME) 

5424 

5425 def get_object(self) -> Any: 

5426 patient = cast(Patient, super().get_object()) 

5427 

5428 if not patient.is_finalized(): 

5429 _ = self.request.gettext 

5430 

5431 raise HTTPBadRequest( 

5432 _( 

5433 "Patient is not editable (likely: not finalized, so a " 

5434 "copy still on a client device)" 

5435 ) 

5436 ) 

5437 

5438 return patient 

5439 

5440 

5441@view_config( 

5442 route_name=Routes.EDIT_FINALIZED_PATIENT, 

5443 permission=Permission.GROUPADMIN, 

5444 http_cache=NEVER_CACHE, 

5445) 

5446def edit_finalized_patient(req: "CamcopsRequest") -> Response: 

5447 """ 

5448 View to edit details for a patient. 

5449 """ 

5450 task_table_name = req.get_str_param( 

5451 ViewParam.BACK_TASK_TABLENAME, validator=validate_task_tablename 

5452 ) 

5453 task_server_pk = req.get_int_param(ViewParam.BACK_TASK_SERVER_PK, None) 

5454 

5455 return EditFinalizedPatientView( 

5456 req, task_tablename=task_table_name, task_server_pk=task_server_pk 

5457 ).dispatch() 

5458 

5459 

5460@view_config( 

5461 route_name=Routes.EDIT_SERVER_CREATED_PATIENT, http_cache=NEVER_CACHE 

5462) 

5463def edit_server_created_patient(req: "CamcopsRequest") -> Response: 

5464 """ 

5465 View to edit details for a patient created on the server (for scheduling 

5466 tasks). 

5467 """ 

5468 return EditServerCreatedPatientView(req).dispatch() 

5469 

5470 

5471class AddPatientView(PatientMixin, CreateView): 

5472 """ 

5473 View to add a patient (for task scheduling). 

5474 """ 

5475 

5476 form_class = EditServerCreatedPatientForm 

5477 template_name = "patient_add.mako" 

5478 

5479 def dispatch(self) -> Response: 

5480 if not self.request.user.authorized_to_manage_patients: 

5481 _ = self.request.gettext 

5482 raise HTTPBadRequest(_("Not authorized to manage patients")) 

5483 

5484 return super().dispatch() 

5485 

5486 def get_success_url(self) -> str: 

5487 return self.request.route_url(Routes.VIEW_PATIENT_TASK_SCHEDULES) 

5488 

5489 def save_object(self, appstruct: Dict[str, Any]) -> None: 

5490 server_device = Device.get_server_device(self.request.dbsession) 

5491 

5492 patient = Patient() 

5493 patient.create_fresh( 

5494 self.request, 

5495 device_id=server_device.id, 

5496 era=ERA_NOW, 

5497 group_id=appstruct.get(ViewParam.GROUP_ID), 

5498 ) 

5499 

5500 for k in EDIT_PATIENT_SIMPLE_PARAMS: 

5501 new_value = appstruct.get(k) 

5502 setattr(patient, k, new_value) 

5503 

5504 patient.save_with_next_available_id(self.request, server_device.id) 

5505 

5506 new_idrefs = [ 

5507 IdNumReference( 

5508 which_idnum=idrefdict[ViewParam.WHICH_IDNUM], 

5509 idnum_value=idrefdict[ViewParam.IDNUM_VALUE], 

5510 ) 

5511 for idrefdict in appstruct.get(ViewParam.ID_REFERENCES) 

5512 ] 

5513 

5514 for idref in new_idrefs: 

5515 new_idnum = PatientIdNum() 

5516 new_idnum.patient_id = patient.id 

5517 new_idnum.which_idnum = idref.which_idnum 

5518 new_idnum.idnum_value = idref.idnum_value 

5519 new_idnum.create_fresh( 

5520 self.request, 

5521 device_id=server_device.id, 

5522 era=ERA_NOW, 

5523 group_id=appstruct.get(ViewParam.GROUP_ID), 

5524 ) 

5525 

5526 new_idnum.save_with_next_available_id( 

5527 self.request, server_device.id 

5528 ) 

5529 

5530 task_schedules = appstruct.get(ViewParam.TASK_SCHEDULES) 

5531 

5532 self.request.dbsession.commit() 

5533 

5534 for task_schedule in task_schedules: 

5535 schedule_id = task_schedule[ViewParam.SCHEDULE_ID] 

5536 start_datetime = task_schedule[ViewParam.START_DATETIME] 

5537 if DEFORM_ACCORDION_BUG: 

5538 settings = task_schedule[ViewParam.SETTINGS] 

5539 else: 

5540 settings = task_schedule[ViewParam.ADVANCED][ 

5541 ViewParam.SETTINGS 

5542 ] 

5543 patient_task_schedule = PatientTaskSchedule() 

5544 patient_task_schedule.patient_pk = patient.pk 

5545 patient_task_schedule.schedule_id = schedule_id 

5546 patient_task_schedule.start_datetime = start_datetime 

5547 patient_task_schedule.settings = settings 

5548 

5549 self.request.dbsession.add(patient_task_schedule) 

5550 

5551 self.object = patient 

5552 

5553 

5554@view_config(route_name=Routes.ADD_PATIENT, http_cache=NEVER_CACHE) 

5555def add_patient(req: "CamcopsRequest") -> Response: 

5556 """ 

5557 View to add a patient. 

5558 """ 

5559 return AddPatientView(req).dispatch() 

5560 

5561 

5562class DeleteServerCreatedPatientView(DeleteView): 

5563 """ 

5564 View to delete a patient that had been created on the server. 

5565 """ 

5566 

5567 form_class = DeleteServerCreatedPatientForm 

5568 object_class = Patient 

5569 pk_param = ViewParam.SERVER_PK 

5570 server_pk_name = "_pk" 

5571 template_name = TEMPLATE_GENERIC_FORM 

5572 

5573 def get_object(self) -> Any: 

5574 patient = cast(Patient, super().get_object()) 

5575 if not patient.user_may_edit(self.request): 

5576 _ = self.request.gettext 

5577 raise HTTPBadRequest(_("Not authorized to delete this patient")) 

5578 return patient 

5579 

5580 def get_extra_context(self) -> Dict[str, Any]: 

5581 _ = self.request.gettext 

5582 return { 

5583 MAKO_VAR_TITLE: self.request.icon_text( 

5584 icon=Icons.DELETE, text=_("Delete patient") 

5585 ) 

5586 } 

5587 

5588 def get_success_url(self) -> str: 

5589 return self.request.route_url(Routes.VIEW_PATIENT_TASK_SCHEDULES) 

5590 

5591 def delete(self) -> None: 

5592 patient = cast(Patient, self.object) 

5593 

5594 PatientIdNumIndexEntry.unindex_patient(patient, self.request.dbsession) 

5595 

5596 patient.delete_with_dependants(self.request) 

5597 

5598 

5599@view_config( 

5600 route_name=Routes.DELETE_SERVER_CREATED_PATIENT, http_cache=NEVER_CACHE 

5601) 

5602def delete_server_created_patient(req: "CamcopsRequest") -> Response: 

5603 """ 

5604 Page to delete a patient created on the server (as part of task 

5605 scheduling). 

5606 """ 

5607 return DeleteServerCreatedPatientView(req).dispatch() 

5608 

5609 

5610# ============================================================================= 

5611# Task scheduling 

5612# ============================================================================= 

5613 

5614 

5615@view_config( 

5616 route_name=Routes.VIEW_TASK_SCHEDULES, 

5617 permission=Permission.GROUPADMIN, 

5618 renderer="view_task_schedules.mako", 

5619 http_cache=NEVER_CACHE, 

5620) 

5621def view_task_schedules(req: "CamcopsRequest") -> Dict[str, Any]: 

5622 """ 

5623 View whole task schedules. 

5624 """ 

5625 rows_per_page = req.get_int_param( 

5626 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE 

5627 ) 

5628 page_num = req.get_int_param(ViewParam.PAGE, 1) 

5629 group_ids = req.user.ids_of_groups_user_is_admin_for 

5630 q = ( 

5631 req.dbsession.query(TaskSchedule) 

5632 .join(TaskSchedule.group) 

5633 .filter(TaskSchedule.group_id.in_(group_ids)) 

5634 .order_by(Group.name, TaskSchedule.name) 

5635 ) 

5636 page = SqlalchemyOrmPage( 

5637 query=q, 

5638 page=page_num, 

5639 items_per_page=rows_per_page, 

5640 url_maker=PageUrl(req), 

5641 request=req, 

5642 ) 

5643 return dict(page=page) 

5644 

5645 

5646@view_config( 

5647 route_name=Routes.VIEW_TASK_SCHEDULE_ITEMS, 

5648 permission=Permission.GROUPADMIN, 

5649 renderer="view_task_schedule_items.mako", 

5650 http_cache=NEVER_CACHE, 

5651) 

5652def view_task_schedule_items(req: "CamcopsRequest") -> Dict[str, Any]: 

5653 """ 

5654 View items within a task schedule. 

5655 """ 

5656 rows_per_page = req.get_int_param( 

5657 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE 

5658 ) 

5659 page_num = req.get_int_param(ViewParam.PAGE, 1) 

5660 schedule_id = req.get_int_param(ViewParam.SCHEDULE_ID) 

5661 

5662 schedule = ( 

5663 req.dbsession.query(TaskSchedule) 

5664 .filter(TaskSchedule.id == schedule_id) 

5665 .one_or_none() 

5666 ) 

5667 

5668 if schedule is None: 

5669 _ = req.gettext 

5670 raise HTTPBadRequest(_("Schedule does not exist")) 

5671 

5672 q = ( 

5673 req.dbsession.query(TaskScheduleItem) 

5674 .filter(TaskScheduleItem.schedule_id == schedule_id) 

5675 .order_by(*task_schedule_item_sort_order()) 

5676 ) 

5677 page = SqlalchemyOrmPage( 

5678 query=q, 

5679 page=page_num, 

5680 items_per_page=rows_per_page, 

5681 url_maker=PageUrl(req), 

5682 request=req, 

5683 ) 

5684 return dict(page=page, schedule_name=schedule.name) 

5685 

5686 

5687@view_config( 

5688 route_name=Routes.VIEW_PATIENT_TASK_SCHEDULES, 

5689 renderer="view_patient_task_schedules.mako", 

5690 http_cache=NEVER_CACHE, 

5691) 

5692def view_patient_task_schedules(req: "CamcopsRequest") -> Dict[str, Any]: 

5693 """ 

5694 View all patients and their assigned schedules (as well as their access 

5695 keys, etc.). 

5696 """ 

5697 server_device = Device.get_server_device(req.dbsession) 

5698 

5699 rows_per_page = req.get_int_param( 

5700 ViewParam.ROWS_PER_PAGE, DEFAULT_ROWS_PER_PAGE 

5701 ) 

5702 page_num = req.get_int_param(ViewParam.PAGE, 1) 

5703 allowed_group_ids = req.user.ids_of_groups_user_may_manage_patients_in 

5704 # noinspection PyProtectedMember 

5705 q = ( 

5706 req.dbsession.query(Patient) 

5707 .filter(Patient._era == ERA_NOW) 

5708 .filter(Patient._group_id.in_(allowed_group_ids)) 

5709 .filter(Patient._device_id == server_device.id) 

5710 .order_by(Patient.surname, Patient.forename) 

5711 ) 

5712 

5713 page = SqlalchemyOrmPage( 

5714 query=q, 

5715 page=page_num, 

5716 items_per_page=rows_per_page, 

5717 url_maker=PageUrl(req), 

5718 request=req, 

5719 ) 

5720 return dict(page=page) 

5721 

5722 

5723@view_config( 

5724 route_name=Routes.VIEW_PATIENT_TASK_SCHEDULE, 

5725 renderer="view_patient_task_schedule.mako", 

5726 http_cache=NEVER_CACHE, 

5727) 

5728def view_patient_task_schedule(req: "CamcopsRequest") -> Dict[str, Any]: 

5729 """ 

5730 View scheduled tasks for one patient's specific task schedule. 

5731 """ 

5732 pts_id = req.get_int_param(ViewParam.PATIENT_TASK_SCHEDULE_ID) 

5733 

5734 pts = ( 

5735 req.dbsession.query(PatientTaskSchedule) 

5736 .filter(PatientTaskSchedule.id == pts_id) 

5737 .options( 

5738 joinedload(PatientTaskSchedule.patient).joinedload(Patient.idnums), 

5739 joinedload(PatientTaskSchedule.task_schedule).joinedload( 

5740 TaskSchedule.items 

5741 ), 

5742 ) 

5743 .one_or_none() 

5744 ) 

5745 

5746 _ = req.gettext 

5747 if pts is None: 

5748 raise HTTPBadRequest(_("Patient's task schedule does not exist")) 

5749 

5750 if not pts.patient.user_may_edit(req): 

5751 raise HTTPBadRequest(_("Not authorized to manage this patient")) 

5752 

5753 patient_descriptor = pts.patient.prettystr(req) 

5754 

5755 return dict( 

5756 pts=pts, 

5757 patient_descriptor=patient_descriptor, 

5758 schedule_name=pts.task_schedule.name, 

5759 task_list=pts.get_list_of_scheduled_tasks(req), 

5760 ) 

5761 

5762 

5763class TaskScheduleMixin(object): 

5764 """ 

5765 Mixin for viewing/editing a task schedule. 

5766 """ 

5767 

5768 form_class = EditTaskScheduleForm 

5769 model_form_dict = { 

5770 "name": ViewParam.NAME, 

5771 "group_id": ViewParam.GROUP_ID, 

5772 "email_bcc": ViewParam.EMAIL_BCC, 

5773 "email_cc": ViewParam.EMAIL_CC, 

5774 "email_from": ViewParam.EMAIL_FROM, 

5775 "email_subject": ViewParam.EMAIL_SUBJECT, 

5776 "email_template": ViewParam.EMAIL_TEMPLATE, 

5777 } 

5778 object_class = TaskSchedule 

5779 request: "CamcopsRequest" 

5780 server_pk_name = "id" 

5781 template_name = TEMPLATE_GENERIC_FORM 

5782 

5783 def get_success_url(self) -> str: 

5784 return self.request.route_url(Routes.VIEW_TASK_SCHEDULES) 

5785 

5786 def get_object(self) -> Any: 

5787 # noinspection PyUnresolvedReferences 

5788 schedule = cast(TaskSchedule, super().get_object()) # type: ignore[misc] # noqa: E501 

5789 

5790 if not schedule.user_may_edit(self.request): 

5791 _ = self.request.gettext 

5792 raise HTTPBadRequest( 

5793 _( 

5794 "You a not a group administrator for this " 

5795 "task schedule's group" 

5796 ) 

5797 ) 

5798 

5799 return schedule 

5800 

5801 

5802class AddTaskScheduleView(TaskScheduleMixin, CreateView): 

5803 """ 

5804 Django-style view class to add a task schedule. 

5805 """ 

5806 

5807 def get_extra_context(self) -> Dict[str, Any]: 

5808 _ = self.request.gettext 

5809 return { 

5810 MAKO_VAR_TITLE: self.request.icon_text( 

5811 icon=Icons.TASK_SCHEDULE_ADD, text=_("Add a task schedule") 

5812 ) 

5813 } 

5814 

5815 

5816class EditTaskScheduleView(TaskScheduleMixin, UpdateView): 

5817 """ 

5818 Django-style view class to edit a task schedule. 

5819 """ 

5820 

5821 pk_param = ViewParam.SCHEDULE_ID 

5822 

5823 def get_extra_context(self) -> Dict[str, Any]: 

5824 _ = self.request.gettext 

5825 return { 

5826 MAKO_VAR_TITLE: self.request.icon_text( 

5827 icon=Icons.TASK_SCHEDULE, 

5828 text=_("Edit details for a task schedule"), 

5829 ) 

5830 } 

5831 

5832 

5833class DeleteTaskScheduleView(TaskScheduleMixin, DeleteView): 

5834 """ 

5835 Django-style view class to delete a task schedule. 

5836 """ 

5837 

5838 form_class = DeleteTaskScheduleForm 

5839 pk_param = ViewParam.SCHEDULE_ID 

5840 

5841 def get_extra_context(self) -> Dict[str, Any]: 

5842 _ = self.request.gettext 

5843 return { 

5844 MAKO_VAR_TITLE: self.request.icon_text( 

5845 icon=Icons.DELETE, text=_("Delete a task schedule") 

5846 ) 

5847 } 

5848 

5849 

5850@view_config( 

5851 route_name=Routes.ADD_TASK_SCHEDULE, 

5852 permission=Permission.GROUPADMIN, 

5853 http_cache=NEVER_CACHE, 

5854) 

5855def add_task_schedule(req: "CamcopsRequest") -> Response: 

5856 """ 

5857 View to add a task schedule. 

5858 """ 

5859 return AddTaskScheduleView(req).dispatch() 

5860 

5861 

5862@view_config( 

5863 route_name=Routes.EDIT_TASK_SCHEDULE, permission=Permission.GROUPADMIN 

5864) 

5865def edit_task_schedule(req: "CamcopsRequest") -> Response: 

5866 """ 

5867 View to edit a task schedule. 

5868 """ 

5869 return EditTaskScheduleView(req).dispatch() 

5870 

5871 

5872@view_config( 

5873 route_name=Routes.DELETE_TASK_SCHEDULE, permission=Permission.GROUPADMIN 

5874) 

5875def delete_task_schedule(req: "CamcopsRequest") -> Response: 

5876 """ 

5877 View to delete a task schedule. 

5878 """ 

5879 return DeleteTaskScheduleView(req).dispatch() 

5880 

5881 

5882class TaskScheduleItemMixin(object): 

5883 """ 

5884 Mixin for viewing/editing a task schedule items. 

5885 """ 

5886 

5887 form_class = EditTaskScheduleItemForm 

5888 template_name = TEMPLATE_GENERIC_FORM 

5889 model_form_dict = { 

5890 "schedule_id": ViewParam.SCHEDULE_ID, 

5891 "task_table_name": ViewParam.TABLE_NAME, 

5892 "due_from": ViewParam.DUE_FROM, 

5893 # we need to convert due_within to due_by 

5894 } 

5895 object: Any 

5896 # noinspection PyTypeChecker 

5897 object_class = cast(Type["Base"], TaskScheduleItem) 

5898 pk_param = ViewParam.SCHEDULE_ITEM_ID 

5899 request: "CamcopsRequest" 

5900 server_pk_name = "id" 

5901 

5902 def get_success_url(self) -> str: 

5903 # noinspection PyUnresolvedReferences 

5904 return self.request.route_url( 

5905 Routes.VIEW_TASK_SCHEDULE_ITEMS, 

5906 _query={ViewParam.SCHEDULE_ID: self.get_schedule_id()}, # type: ignore[attr-defined] # noqa: E501 

5907 ) 

5908 

5909 

5910class EditTaskScheduleItemMixin(TaskScheduleItemMixin): 

5911 """ 

5912 Django-style view class to edit a task schedule item. 

5913 """ 

5914 

5915 def set_object_properties(self, appstruct: Dict[str, Any]) -> None: 

5916 # noinspection PyUnresolvedReferences 

5917 super().set_object_properties(appstruct) # type: ignore[misc] 

5918 

5919 due_from = appstruct.get(ViewParam.DUE_FROM) 

5920 due_within = appstruct.get(ViewParam.DUE_WITHIN) 

5921 

5922 setattr(self.object, "due_by", due_from + due_within) 

5923 

5924 def get_schedule(self) -> TaskSchedule: 

5925 # noinspection PyUnresolvedReferences 

5926 schedule_id = self.get_schedule_id() # type: ignore[attr-defined] 

5927 

5928 schedule = ( 

5929 self.request.dbsession.query(TaskSchedule) 

5930 .filter(TaskSchedule.id == schedule_id) 

5931 .one_or_none() 

5932 ) 

5933 

5934 if schedule is None: 

5935 _ = self.request.gettext 

5936 raise HTTPBadRequest( 

5937 f"{_('Missing Task Schedule for id')} {schedule_id}" 

5938 ) 

5939 

5940 if not schedule.user_may_edit(self.request): 

5941 _ = self.request.gettext 

5942 raise HTTPBadRequest( 

5943 _( 

5944 "You a not a group administrator for this " 

5945 "task schedule's group" 

5946 ) 

5947 ) 

5948 

5949 return schedule 

5950 

5951 

5952class AddTaskScheduleItemView(EditTaskScheduleItemMixin, CreateView): 

5953 """ 

5954 Django-style view class to add a task schedule item. 

5955 """ 

5956 

5957 def get_extra_context(self) -> Dict[str, Any]: 

5958 _ = self.request.gettext 

5959 

5960 schedule = self.get_schedule() 

5961 

5962 return { 

5963 MAKO_VAR_TITLE: self.request.icon_text( 

5964 icon=Icons.TASK_SCHEDULE_ITEM_ADD, 

5965 text=_("Add an item to the {schedule_name} schedule").format( 

5966 schedule_name=schedule.name 

5967 ), 

5968 ) 

5969 } 

5970 

5971 def get_schedule_id(self) -> int: 

5972 return self.request.get_int_param(ViewParam.SCHEDULE_ID) 

5973 

5974 def get_form_values(self) -> Dict: 

5975 schedule = self.get_schedule() 

5976 

5977 form_values = super().get_form_values() 

5978 form_values[ViewParam.SCHEDULE_ID] = schedule.id 

5979 

5980 return form_values 

5981 

5982 

5983class EditTaskScheduleItemView(EditTaskScheduleItemMixin, UpdateView): 

5984 """ 

5985 Django-style view class to edit a task schedule item. 

5986 """ 

5987 

5988 def get_extra_context(self) -> Dict[str, Any]: 

5989 _ = self.request.gettext 

5990 return { 

5991 MAKO_VAR_TITLE: self.request.icon_text( 

5992 icon=Icons.EDIT, 

5993 text=_("Edit details for a task schedule item"), 

5994 ) 

5995 } 

5996 

5997 def get_schedule_id(self) -> int: 

5998 item = cast(TaskScheduleItem, self.object) 

5999 

6000 return item.schedule_id 

6001 

6002 def get_form_values(self) -> Dict: 

6003 schedule = self.get_schedule() 

6004 

6005 form_values = super().get_form_values() 

6006 form_values[ViewParam.SCHEDULE_ID] = schedule.id 

6007 

6008 item = cast(TaskScheduleItem, self.object) 

6009 due_within = item.due_by - form_values[ViewParam.DUE_FROM] 

6010 form_values[ViewParam.DUE_WITHIN] = due_within 

6011 

6012 return form_values 

6013 

6014 

6015class DeleteTaskScheduleItemView(TaskScheduleItemMixin, DeleteView): 

6016 """ 

6017 Django-style view class to delete a task schedule item. 

6018 """ 

6019 

6020 form_class = DeleteTaskScheduleItemForm 

6021 

6022 def get_extra_context(self) -> Dict[str, Any]: 

6023 _ = self.request.gettext 

6024 return { 

6025 MAKO_VAR_TITLE: self.request.icon_text( 

6026 icon=Icons.DELETE, text=_("Delete a task schedule item") 

6027 ) 

6028 } 

6029 

6030 def get_schedule_id(self) -> int: 

6031 item = cast(TaskScheduleItem, self.object) 

6032 

6033 return item.schedule_id 

6034 

6035 

6036@view_config( 

6037 route_name=Routes.ADD_TASK_SCHEDULE_ITEM, permission=Permission.GROUPADMIN 

6038) 

6039def add_task_schedule_item(req: "CamcopsRequest") -> Response: 

6040 """ 

6041 View to add a task schedule item. 

6042 """ 

6043 return AddTaskScheduleItemView(req).dispatch() 

6044 

6045 

6046@view_config( 

6047 route_name=Routes.EDIT_TASK_SCHEDULE_ITEM, permission=Permission.GROUPADMIN 

6048) 

6049def edit_task_schedule_item(req: "CamcopsRequest") -> Response: 

6050 """ 

6051 View to edit a task schedule item. 

6052 """ 

6053 return EditTaskScheduleItemView(req).dispatch() 

6054 

6055 

6056@view_config( 

6057 route_name=Routes.DELETE_TASK_SCHEDULE_ITEM, 

6058 permission=Permission.GROUPADMIN, 

6059) 

6060def delete_task_schedule_item(req: "CamcopsRequest") -> Response: 

6061 """ 

6062 View to delete a task schedule item. 

6063 """ 

6064 return DeleteTaskScheduleItemView(req).dispatch() 

6065 

6066 

6067@view_config( 

6068 route_name=Routes.CLIENT_API, 

6069 request_method=HttpMethod.GET, 

6070 permission=NO_PERMISSION_REQUIRED, 

6071 renderer="client_api_signposting.mako", 

6072) 

6073@view_config( 

6074 route_name=Routes.CLIENT_API_ALIAS, 

6075 request_method=HttpMethod.GET, 

6076 permission=NO_PERMISSION_REQUIRED, 

6077 renderer="client_api_signposting.mako", 

6078) 

6079def client_api_signposting(req: "CamcopsRequest") -> Dict[str, Any]: 

6080 """ 

6081 Patients are likely to enter the ``/api`` address into a web browser, 

6082 especially if it appears as a hyperlink in an email. If so, that will 

6083 arrive as a ``GET`` request. This page will direct them to download the 

6084 app. 

6085 """ 

6086 return { 

6087 "github_link": req.icon_text( 

6088 icon=Icons.GITHUB, url=GITHUB_RELEASES_URL, text="GitHub" 

6089 ), 

6090 "server_url": req.route_url(Routes.CLIENT_API), 

6091 } 

6092 

6093 

6094class SendPatientEmailBaseView(FormView): 

6095 """ 

6096 Send an e-mail to a patient (such as: "please download the app and register 

6097 with this URL/code"). 

6098 """ 

6099 

6100 form_class = SendEmailForm 

6101 template_name = "send_patient_email.mako" 

6102 

6103 def __init__(self, *args: Any, **kwargs: Any) -> None: 

6104 self._pts = None 

6105 

6106 super().__init__(*args, **kwargs) 

6107 

6108 def dispatch(self) -> Response: 

6109 if not self.request.user.authorized_to_email_patients: 

6110 _ = self.request.gettext 

6111 raise HTTPBadRequest(_("Not authorized to email patients")) 

6112 

6113 return super().dispatch() 

6114 

6115 def get_context_data(self, **kwargs: Any) -> Dict[str, Any]: 

6116 kwargs["pts"] = self._get_patient_task_schedule() 

6117 

6118 return super().get_context_data(**kwargs) 

6119 

6120 def form_valid(self, form: "Form", appstruct: Dict[str, Any]) -> Response: 

6121 config = self.request.config 

6122 

6123 patient_email = appstruct.get(ViewParam.EMAIL) 

6124 

6125 kwargs = dict( 

6126 from_addr=appstruct.get(ViewParam.EMAIL_FROM), 

6127 to=patient_email, 

6128 subject=appstruct.get(ViewParam.EMAIL_SUBJECT), 

6129 body=appstruct.get(ViewParam.EMAIL_BODY), 

6130 content_type=MimeType.HTML, 

6131 ) 

6132 

6133 cc = appstruct.get(ViewParam.EMAIL_CC) 

6134 if cc: 

6135 kwargs["cc"] = cc 

6136 

6137 bcc = appstruct.get(ViewParam.EMAIL_BCC) 

6138 if bcc: 

6139 kwargs["bcc"] = bcc 

6140 

6141 email = Email(**kwargs) 

6142 ok = email.send( 

6143 host=config.email_host, 

6144 username=config.email_host_username, 

6145 password=config.email_host_password, 

6146 port=config.email_port, 

6147 use_tls=config.email_use_tls, 

6148 ) 

6149 if ok: 

6150 self._display_success_message(patient_email) 

6151 else: 

6152 self._display_failure_message(patient_email) 

6153 

6154 self.request.dbsession.add(email) 

6155 self.request.dbsession.flush() 

6156 pts_id = self.request.get_int_param(ViewParam.PATIENT_TASK_SCHEDULE_ID) 

6157 if pts_id is None: 

6158 _ = self.request.gettext 

6159 raise HTTPBadRequest(_("Patient task schedule does not exist")) 

6160 

6161 pts_email = PatientTaskScheduleEmail() 

6162 pts_email.patient_task_schedule_id = pts_id 

6163 pts_email.email_id = email.id 

6164 self.request.dbsession.add(pts_email) 

6165 self.request.dbsession.commit() 

6166 

6167 return super().form_valid(form, appstruct) 

6168 

6169 def _display_success_message(self, patient_email: str) -> None: 

6170 _ = self.request.gettext 

6171 message = _("Email sent to {patient_email}").format( 

6172 patient_email=patient_email 

6173 ) 

6174 

6175 self.request.session.flash(message, queue=FlashQueue.SUCCESS) 

6176 

6177 def _display_failure_message(self, patient_email: str) -> None: 

6178 _ = self.request.gettext 

6179 message = _("Failed to send email to {patient_email}").format( 

6180 patient_email=patient_email 

6181 ) 

6182 

6183 self.request.session.flash(message, queue=FlashQueue.DANGER) 

6184 

6185 def get_form_values(self) -> Dict: 

6186 pts = self._get_patient_task_schedule() 

6187 

6188 if pts is None: 

6189 _ = self.request.gettext 

6190 raise HTTPBadRequest(_("Patient task schedule does not exist")) 

6191 

6192 return { 

6193 ViewParam.EMAIL: pts.patient.email, 

6194 ViewParam.EMAIL_CC: pts.task_schedule.email_cc, 

6195 ViewParam.EMAIL_BCC: pts.task_schedule.email_bcc, 

6196 ViewParam.EMAIL_FROM: pts.task_schedule.email_from, 

6197 ViewParam.EMAIL_SUBJECT: pts.task_schedule.email_subject, 

6198 ViewParam.EMAIL_BODY: pts.email_body(self.request), 

6199 } 

6200 

6201 def _get_patient_task_schedule(self) -> Optional[PatientTaskSchedule]: 

6202 if self._pts is not None: 

6203 return self._pts 

6204 

6205 pts_id = self.request.get_int_param(ViewParam.PATIENT_TASK_SCHEDULE_ID) 

6206 

6207 self._pts = ( 

6208 self.request.dbsession.query(PatientTaskSchedule) 

6209 .filter(PatientTaskSchedule.id == pts_id) 

6210 .one_or_none() 

6211 ) 

6212 

6213 return self._pts 

6214 

6215 

6216class SendEmailFromPatientListView(SendPatientEmailBaseView): 

6217 """ 

6218 Send an e-mail to a patient and return to the patient task schedule list 

6219 view. 

6220 """ 

6221 

6222 def get_success_url(self) -> str: 

6223 return self.request.route_url(Routes.VIEW_PATIENT_TASK_SCHEDULES) 

6224 

6225 

6226class SendEmailFromPatientTaskScheduleView(SendPatientEmailBaseView): 

6227 """ 

6228 Send an e-mail to a patient and return to the task schedule view for that 

6229 specific patient. 

6230 """ 

6231 

6232 def get_success_url(self) -> str: 

6233 pts_id = self.request.get_int_param(ViewParam.PATIENT_TASK_SCHEDULE_ID) 

6234 

6235 return self.request.route_url( 

6236 Routes.VIEW_PATIENT_TASK_SCHEDULE, 

6237 _query={ViewParam.PATIENT_TASK_SCHEDULE_ID: pts_id}, 

6238 ) 

6239 

6240 

6241@view_config( 

6242 route_name=Routes.SEND_EMAIL_FROM_PATIENT_TASK_SCHEDULE, 

6243 http_cache=NEVER_CACHE, 

6244) 

6245def send_email_from_patient_task_schedule(req: "CamcopsRequest") -> Response: 

6246 """ 

6247 View to send an email to a patient from their task schedule page. 

6248 """ 

6249 return SendEmailFromPatientTaskScheduleView(req).dispatch() 

6250 

6251 

6252@view_config( 

6253 route_name=Routes.SEND_EMAIL_FROM_PATIENT_LIST, http_cache=NEVER_CACHE 

6254) 

6255def send_email_from_patient_list(req: "CamcopsRequest") -> Response: 

6256 """ 

6257 View to send an email to a patient from the list of patients. 

6258 """ 

6259 return SendEmailFromPatientListView(req).dispatch() 

6260 

6261 

6262# ============================================================================= 

6263# FHIR identifier "system" information 

6264# ============================================================================= 

6265 

6266 

6267@view_config( 

6268 route_name=Routes.FHIR_PATIENT_ID_SYSTEM, 

6269 request_method=HttpMethod.GET, 

6270 renderer="fhir_patient_id_system.mako", 

6271 http_cache=NEVER_CACHE, 

6272) 

6273def view_fhir_patient_id_system(req: "CamcopsRequest") -> Dict[str, Any]: 

6274 """ 

6275 Placeholder view for FHIR patient identifier "system" types (from the ID 

6276 that we may have provided to a FHIR server). 

6277 

6278 Within each system, the "value" is the actual patient's ID number (not 

6279 part of what we show here). 

6280 """ 

6281 which_idnum = int(req.matchdict[ViewParam.WHICH_IDNUM]) 

6282 if which_idnum not in req.valid_which_idnums: 

6283 _ = req.gettext 

6284 raise HTTPBadRequest( 

6285 f"{_('Unknown patient ID type:')} " f"{which_idnum!r}" 

6286 ) 

6287 return dict(which_idnum=which_idnum) 

6288 

6289 

6290# noinspection PyUnusedLocal 

6291@view_config( 

6292 route_name=Routes.FHIR_QUESTIONNAIRE_SYSTEM, 

6293 request_method=HttpMethod.GET, 

6294 renderer="all_tasks.mako", 

6295 http_cache=NEVER_CACHE, 

6296) 

6297@view_config( 

6298 route_name=Routes.TASK_LIST, 

6299 request_method=HttpMethod.GET, 

6300 renderer="all_tasks.mako", 

6301 http_cache=NEVER_CACHE, 

6302) 

6303def view_task_list(req: "CamcopsRequest") -> Dict[str, Any]: 

6304 """ 

6305 Lists all tasks. 

6306 

6307 Also the placeholder view for FHIR Questionnaire "system". 

6308 There's only one system -- the "value" is the task type. 

6309 """ 

6310 return dict(all_task_classes=Task.all_subclasses_by_tablename()) 

6311 

6312 

6313@view_config( 

6314 route_name=Routes.TASK_DETAILS, 

6315 request_method=HttpMethod.GET, 

6316 renderer="task_details.mako", 

6317 http_cache=NEVER_CACHE, 

6318) 

6319def view_task_details(req: "CamcopsRequest") -> Dict[str, Any]: 

6320 """ 

6321 View details of a specific task type. 

6322 

6323 Used also for for FHIR DocumentReference, Observation,and 

6324 QuestionnaireResponse "system" types. (There's one system per task. Within 

6325 each task, the "value" relates to the specific task PK.) 

6326 """ 

6327 table_name = req.matchdict[ViewParam.TABLE_NAME] 

6328 task_class_dict = tablename_to_task_class_dict() 

6329 if table_name not in task_class_dict: 

6330 _ = req.gettext 

6331 raise HTTPBadRequest(f"{_('Unknown task:')} {table_name!r}") 

6332 task_class = task_class_dict[table_name] 

6333 task_instance = task_class() 

6334 

6335 fhir_aq_items = task_instance.get_fhir_questionnaire(req) 

6336 # ddl = task_instance.get_ddl() 

6337 # ddl_html, ddl_css = format_sql_as_html(ddl) 

6338 

6339 return dict( 

6340 task_class=task_class, 

6341 task_instance=task_instance, 

6342 fhir_aq_items=fhir_aq_items, 

6343 # ddl_html=ddl_html, 

6344 # css=ddl_css, 

6345 ) 

6346 

6347 

6348@view_config( 

6349 route_name=Routes.FHIR_CONDITION, 

6350 request_method=HttpMethod.GET, 

6351 http_cache=NEVER_CACHE, 

6352) 

6353@view_config( 

6354 route_name=Routes.FHIR_DOCUMENT_REFERENCE, 

6355 request_method=HttpMethod.GET, 

6356 http_cache=NEVER_CACHE, 

6357) 

6358@view_config( 

6359 route_name=Routes.FHIR_OBSERVATION, 

6360 request_method=HttpMethod.GET, 

6361 http_cache=NEVER_CACHE, 

6362) 

6363@view_config( 

6364 route_name=Routes.FHIR_PRACTITIONER, 

6365 request_method=HttpMethod.GET, 

6366 http_cache=NEVER_CACHE, 

6367) 

6368@view_config( 

6369 route_name=Routes.FHIR_QUESTIONNAIRE_RESPONSE, 

6370 request_method=HttpMethod.GET, 

6371 http_cache=NEVER_CACHE, 

6372) 

6373def fhir_view_task(req: "CamcopsRequest") -> Response: 

6374 """ 

6375 Retrieve parameters from a FHIR URL referring back to this server, and 

6376 serve the relevant task (as HTML). 

6377 

6378 The "canonical URL" or "business identifier" of a FHIR resource is the 

6379 reference to the master copy -- in this case, our copy. See 

6380 https://www.hl7.org/fhir/datatypes.html#Identifier; 

6381 https://www.hl7.org/fhir/resource.html#identifiers. 

6382 

6383 FHIR identifiers have a "system" (which is a URL) and a "value". I don't 

6384 think that FHIR has a rule for combining the system and value to create a 

6385 full URL. For some (but by no means all) identifiers that we provide to 

6386 FHIR servers, the "system" refers to a CamCOPS task (and the value to some 

6387 attribute of that task, like the answer to a question (value of a field), 

6388 or a fixed string like "patient", and so on. 

6389 """ 

6390 table_name = req.matchdict[ViewParam.TABLE_NAME] 

6391 server_pk = req.matchdict[ViewParam.SERVER_PK] 

6392 return HTTPFound( 

6393 req.route_url( 

6394 Routes.TASK, 

6395 _query={ 

6396 ViewParam.TABLE_NAME: table_name, 

6397 ViewParam.SERVER_PK: server_pk, 

6398 ViewParam.VIEWTYPE: ViewArg.HTML, 

6399 }, 

6400 ) 

6401 ) 

6402 

6403 

6404@view_config( 

6405 route_name=Routes.FHIR_TABLENAME_PK_ID, 

6406 request_method=HttpMethod.GET, 

6407 http_cache=NEVER_CACHE, 

6408) 

6409def fhir_view_tablename_pk(req: "CamcopsRequest") -> Response: 

6410 """ 

6411 Deal with the slightly silly system that just takes a tablename and PK 

6412 directly. Security is key here! 

6413 """ 

6414 table_name = req.matchdict[ViewParam.TABLE_NAME] 

6415 server_pk = req.matchdict[ViewParam.SERVER_PK] 

6416 if table_name == Patient.__tablename__: 

6417 return view_patient(req, server_pk) 

6418 return HTTPFound( 

6419 req.route_url( 

6420 Routes.TASK, 

6421 _query={ 

6422 ViewParam.TABLE_NAME: table_name, 

6423 ViewParam.SERVER_PK: server_pk, 

6424 ViewParam.VIEWTYPE: ViewArg.HTML, 

6425 }, 

6426 ) 

6427 ) 

6428 

6429 

6430# ============================================================================= 

6431# Static assets 

6432# ============================================================================= 

6433# https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/assets.html#advanced-static # noqa 

6434 

6435 

6436def debug_form_rendering() -> None: 

6437 r""" 

6438 Test code for form rendering. 

6439 

6440 From the command line: 

6441 

6442 .. code-block:: bash 

6443 

6444 # Start in the CamCOPS source root directory. 

6445 # - Needs the "-f" option to follow forks. 

6446 # - "open" doesn't show all files opened. To see what you need, try 

6447 # strace cat /proc/version 

6448 # - ... which shows that "openat" is most useful. 

6449 

6450 strace -f --trace=openat \ 

6451 python -c 'from camcops_server.cc_modules.webview import debug_form_rendering; debug_form_rendering()' \ 

6452 | grep site-packages \ 

6453 | grep -v "\.pyc" 

6454 

6455 This tells us that the templates are files like: 

6456 

6457 .. code-block:: none 

6458 

6459 site-packages/deform/templates/form.pt 

6460 site-packages/deform/templates/select.pt 

6461 site-packages/deform/templates/textinput.pt 

6462 

6463 On 2020-06-29 we are interested in why a newer (Docker) installation 

6464 renders buggy HTML like: 

6465 

6466 .. code-block:: none 

6467 

6468 <select name="which_idnum" id="deformField2" class=" form-control " multiple="False"> 

6469 <option value="1">CPFT RiO number</option> 

6470 <option value="2">NHS number</option> 

6471 <option value="1000">MyHospital number</option> 

6472 </select> 

6473 

6474 ... the bug being that ``multiple="False"`` is wrong; an HTML boolean 

6475 attribute is false when *absent*, not when set to a certain value (see 

6476 https://developer.mozilla.org/en-US/docs/Web/HTML/Attributes#Boolean_Attributes). 

6477 The ``multiple`` attribute of ``<select>`` is a boolean attribute 

6478 (https://developer.mozilla.org/en-US/docs/Web/HTML/Element/select). 

6479 

6480 The ``select.pt`` file indicates that this is controlled by 

6481 ``tal:attributes`` syntax. TAL is Template Attribution Language 

6482 (https://sharptal.readthedocs.io/en/latest/tal.html). 

6483 

6484 TAL is either provided by Zope (given ZPT files) or Chameleon or both. The 

6485 tracing suggests Chameleon. So the TAL language reference is 

6486 https://chameleon.readthedocs.io/en/latest/reference.html. 

6487 

6488 Chameleon changelog is 

6489 https://github.com/malthe/chameleon/blob/master/CHANGES.rst. 

6490 

6491 Multiple sources for ``tal:attributes`` syntax say that a null value 

6492 (presumably: ``None``) is required to omit the attribute, not a false 

6493 value. 

6494 

6495 """ # noqa 

6496 

6497 import sys 

6498 

6499 from camcops_server.cc_modules.cc_debug import makefunc_trace_unique_calls 

6500 from camcops_server.cc_modules.cc_forms import ChooseTrackerForm 

6501 from camcops_server.cc_modules.cc_request import get_core_debugging_request 

6502 

6503 req = get_core_debugging_request() 

6504 form = ChooseTrackerForm(req, as_ctv=False) 

6505 

6506 sys.settrace(makefunc_trace_unique_calls(file_only=True)) 

6507 _ = form.render() 

6508 sys.settrace(None)