Coverage for cc_modules/cc_request.py: 54%
667 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
1"""
2camcops_server/cc_modules/cc_request.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Implements a Pyramid Request object customized for CamCOPS.**
28"""
30import collections
31from contextlib import contextmanager
32import datetime
33import gettext
34import logging
35import os
36import re
37import secrets
38from typing import (
39 Any,
40 Dict,
41 Generator,
42 List,
43 Optional,
44 Set,
45 Tuple,
46 TYPE_CHECKING,
47)
48import urllib.parse
50from cardinal_pythonlib.datetimefunc import (
51 coerce_to_pendulum,
52 coerce_to_pendulum_date,
53 convert_datetime_to_utc,
54 format_datetime,
55 pendulum_to_utc_datetime_without_tz,
56)
57from cardinal_pythonlib.fileops import get_directory_contents_size, mkdir_p
58from cardinal_pythonlib.httpconst import HttpMethod
59from cardinal_pythonlib.logs import BraceStyleAdapter
60from cardinal_pythonlib.plot import (
61 png_img_html_from_pyplot_figure,
62 svg_html_from_pyplot_figure,
63)
64import cardinal_pythonlib.rnc_web as ws
65from cardinal_pythonlib.wsgi.constants import WsgiEnvVar
66import lockfile
67from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
68from matplotlib.figure import Figure
69from matplotlib.font_manager import FontProperties
70from pendulum import Date, DateTime as Pendulum, Duration
71from pendulum.parsing.exceptions import ParserError
72from pyramid.config import Configurator
73from pyramid.decorator import reify
74from pyramid.httpexceptions import HTTPBadRequest, HTTPException
75from pyramid.interfaces import ISession
76from pyramid.request import Request
77from pyramid.response import Response
78from pyramid.testing import DummyRequest
79from sqlalchemy.engine.base import Engine
80from sqlalchemy.orm import sessionmaker
81from sqlalchemy.orm import Session as SqlASession
82from webob.multidict import MultiDict
84# Note: everything uder the sun imports this file, so keep the intra-package
85# imports as minimal as possible.
86from camcops_server.cc_modules.cc_baseconstants import (
87 DOCUMENTATION_URL,
88 TRANSLATIONS_DIR,
89)
90from camcops_server.cc_modules.cc_config import (
91 CamcopsConfig,
92 get_config,
93 get_config_filename_from_os_env,
94)
95from camcops_server.cc_modules.cc_constants import (
96 CSS_PAGED_MEDIA,
97 DateFormat,
98 PlotDefaults,
99 USE_SVG_IN_HTML,
100)
101from camcops_server.cc_modules.cc_idnumdef import (
102 get_idnum_definitions,
103 IdNumDefinition,
104 validate_id_number,
105)
106from camcops_server.cc_modules.cc_language import (
107 DEFAULT_LOCALE,
108 GETTEXT_DOMAIN,
109 POSSIBLE_LOCALES,
110)
112# noinspection PyUnresolvedReferences
113import camcops_server.cc_modules.cc_plot # import side effects (configure matplotlib) # noqa
114from camcops_server.cc_modules.cc_pyramid import (
115 camcops_add_mako_renderer,
116 CamcopsAuthenticationPolicy,
117 CamcopsAuthorizationPolicy,
118 CookieKey,
119 get_session_factory,
120 icon_html,
121 icon_text,
122 icons_text,
123 Permission,
124 RouteCollection,
125 Routes,
126 STATIC_CAMCOPS_PACKAGE_PATH,
127)
128from camcops_server.cc_modules.cc_response import camcops_response_factory
129from camcops_server.cc_modules.cc_serversettings import (
130 get_server_settings,
131 ServerSettings,
132)
133from camcops_server.cc_modules.cc_string import (
134 all_extra_strings_as_dicts,
135 APPSTRING_TASKNAME,
136 MISSING_LOCALE,
137)
138from camcops_server.cc_modules.cc_tabletsession import TabletSession
139from camcops_server.cc_modules.cc_text import SS, server_string
140from camcops_server.cc_modules.cc_user import User
141from camcops_server.cc_modules.cc_validators import (
142 STRING_VALIDATOR_TYPE,
143 validate_alphanum_underscore,
144 validate_redirect_url,
145)
147if TYPE_CHECKING:
148 from matplotlib.axis import Axis
149 from matplotlib.axes import Axes
150 from matplotlib.text import Text
151 from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
152 from camcops_server.cc_modules.cc_session import CamcopsSession
153 from camcops_server.cc_modules.cc_snomed import SnomedConcept
155log = BraceStyleAdapter(logging.getLogger(__name__))
158# =============================================================================
159# Debugging options
160# =============================================================================
162DEBUG_ADD_ROUTES = False
163DEBUG_AUTHORIZATION = False
164DEBUG_CAMCOPS_SESSION = False
165DEBUG_DBSESSION_MANAGEMENT = False
166DEBUG_GETTEXT = False
167DEBUG_REQUEST_CREATION = False
168DEBUG_TABLET_SESSION = False
170if any(
171 [
172 DEBUG_ADD_ROUTES,
173 DEBUG_AUTHORIZATION,
174 DEBUG_CAMCOPS_SESSION,
175 DEBUG_DBSESSION_MANAGEMENT,
176 DEBUG_GETTEXT,
177 DEBUG_REQUEST_CREATION,
178 DEBUG_TABLET_SESSION,
179 ]
180):
181 log.warning("Debugging options enabled!")
184# =============================================================================
185# Constants
186# =============================================================================
188TRUE_STRINGS_LOWER_CASE = ["true", "t", "1", "yes", "y"]
189FALSE_STRINGS_LOWER_CASE = ["false", "f", "0", "no", "n"]
192# =============================================================================
193# Modified Request interface, for type checking
194# =============================================================================
195# https://docs.pylonsproject.org/projects/pyramid_cookbook/en/latest/auth/user_object.html
196# https://rollbar.com/blog/using-pyramid-request-factory-to-write-less-code/
197#
198# ... everything with reify=True is cached, so if we ask for something
199# more than once, we keep getting the same thing
200# ... https://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html#pyramid.request.Request.set_property # noqa
203class CamcopsRequest(Request):
204 """
205 The CamcopsRequest is an object central to all HTTP requests. It is the
206 main thing passed all around the server, and embodies what we need to know
207 about the client request -- including user information, ways of accessing
208 the database, and so on.
210 It reads its config (on first demand) from the config file specified in
211 ``os.environ[ENVVAR_CONFIG_FILE]``.
213 """
215 def __init__(self, *args: Any, **kwargs: Any) -> None:
216 """
217 This is called as the Pyramid request factory; see
218 ``config.set_request_factory(CamcopsRequest)``
220 What's the best way of handling the database client?
222 - With Titanium, we were constrained not to use cookies. With Qt, we
223 have the option.
224 - But are cookies a good idea?
225 Probably not; they are somewhat overcomplicated for this.
226 See also
228 - https://softwareengineering.stackexchange.com/questions/141019/
229 - https://stackoverflow.com/questions/6068113/do-sessions-really-violate-restfulness
231 - Let's continue to avoid cookies.
232 - We don't have to cache any information (we still send username/
233 password details with each request, and that is RESTful) but it
234 does save authentication time to do so on calls after the first.
235 - What we could try to do is:
237 - look up a session here, at Request creation time;
238 - add a new session if there wasn't one;
239 - but allow the database API code to replace that session (BEFORE
240 it's saved to the database and gains its PK) with another,
241 determined by the content.
242 - This gives one more database hit, but avoids the bcrypt time.
244 """ # noqa
245 super().__init__(*args, **kwargs)
246 self.use_svg = False # use SVG (not just PNG) for graphics
247 self.provide_png_fallback_for_svg = (
248 True # for SVG: provide PNG fallback image?
249 )
250 self.add_response_callback(complete_request_add_cookies)
251 self._camcops_session = None # type: Optional[CamcopsSession]
252 self._debugging_db_session = (
253 None
254 ) # type: Optional[SqlASession] # for unit testing only
255 self._debugging_user = (
256 None
257 ) # type: Optional[User] # for unit testing only
258 self._pending_export_push_requests = (
259 []
260 ) # type: List[Tuple[str, str, int]]
261 self._cached_sstring = {} # type: Dict[SS, str]
262 # Don't make the _camcops_session yet; it will want a Registry, and
263 # we may not have one yet; see command_line_request().
264 if DEBUG_REQUEST_CREATION:
265 log.debug(
266 "CamcopsRequest.__init__: args={!r}, kwargs={!r}", args, kwargs
267 )
269 # -------------------------------------------------------------------------
270 # HTTP nonce
271 # -------------------------------------------------------------------------
273 @reify
274 def nonce(self) -> str:
275 """
276 Return a nonce that is generated at random for each request, but
277 remains constant for that request (because we use ``@reify``).
279 See https://content-security-policy.com/examples/allow-inline-style/.
281 And for how to make one:
282 https://stackoverflow.com/questions/5590170/what-is-the-standard-method-for-generating-a-nonce-in-python
283 """
284 return secrets.token_urlsafe()
286 # -------------------------------------------------------------------------
287 # CamcopsSession
288 # -------------------------------------------------------------------------
290 @property
291 def camcops_session(self) -> "CamcopsSession":
292 """
293 Returns the
294 :class:`camcops_server.cc_modules.cc_session.CamcopsSession` for this
295 request (q.v.).
297 Contrast:
299 .. code-block:: none
301 ccsession = request.camcops_session # type: CamcopsSession
302 pyramid_session = request.session # type: ISession
303 """
304 if self._camcops_session is None:
305 from camcops_server.cc_modules.cc_session import (
306 CamcopsSession,
307 ) # delayed import
309 self._camcops_session = CamcopsSession.get_session_using_cookies(
310 self
311 )
312 if DEBUG_CAMCOPS_SESSION:
313 log.debug("{!r}", self._camcops_session)
314 return self._camcops_session
316 def replace_camcops_session(self, ccsession: "CamcopsSession") -> None:
317 """
318 Replaces any existing
319 :class:`camcops_server.cc_modules.cc_session.CamcopsSession` with a new
320 one.
322 Rationale:
324 We may have created a new HTTP session because the request had no
325 cookies (added to the DB session but not yet saved), but we might
326 then enter the database/tablet upload API and find session details,
327 not from the cookies, but from the POST data. At that point, we
328 want to replace the session in the Request, without committing the
329 first one to disk.
330 """
331 if self._camcops_session is not None:
332 self.dbsession.expunge(self._camcops_session)
333 self._camcops_session = ccsession
335 def complete_request_add_cookies(self) -> None:
336 """
337 Finializes the response by adding session cookies.
338 We do this late so that we can hot-swap the session if we're using the
339 database/tablet API rather than a human web browser.
341 Response callbacks are called in the order
342 first-to-most-recently-added. See
343 :class:`pyramid.request.CallbackMethodsMixin`.
345 That looks like we can add a callback in the process of running a
346 callback. And when we add a cookie to a Pyramid session, that sets a
347 callback. Let's give it a go...
348 """
349 # 2019-03-21: If we've not used a CamcopsSession (e.g. for serving
350 # a static view), do we care?
351 if self._camcops_session is None:
352 return
354 dbsession = self.dbsession
355 dbsession.flush() # sets the PK for ccsession, if it wasn't set
356 # Write the details back to the Pyramid session (will be persisted
357 # via the Response automatically):
358 pyramid_session = self.session # type: ISession
359 ccsession = self.camcops_session
360 pyramid_session[CookieKey.SESSION_ID] = str(ccsession.id)
361 pyramid_session[CookieKey.SESSION_TOKEN] = ccsession.token
362 # ... should cause the ISession to add a callback to add cookies,
363 # which will be called immediately after this one.
365 # -------------------------------------------------------------------------
366 # Config
367 # -------------------------------------------------------------------------
369 @reify
370 def config_filename(self) -> str:
371 """
372 Gets the CamCOPS config filename in use, from the config file specified
373 in ``os.environ[ENVVAR_CONFIG_FILE]``.
374 """
375 return get_config_filename_from_os_env()
377 @reify
378 def config(self) -> CamcopsConfig:
379 """
380 Return an instance of
381 :class:`camcops_server/cc_modules/cc_config.CamcopsConfig` for the
382 request.
384 Access it as ``request.config``, with no brackets.
385 """
386 config = get_config(config_filename=self.config_filename)
387 return config
389 # -------------------------------------------------------------------------
390 # Database
391 # -------------------------------------------------------------------------
393 @reify
394 def engine(self) -> Engine:
395 """
396 Returns the SQLAlchemy :class:`Engine` for the request.
397 """
398 cfg = self.config
399 return cfg.get_sqla_engine()
401 @reify
402 def dbsession(self) -> SqlASession:
403 """
404 Return an SQLAlchemy session for the relevant request.
406 The use of ``@reify`` makes this elegant. If and only if a view wants a
407 database, it can say
409 .. code-block:: python
411 dbsession = request.dbsession
413 and if it requests that, the cleanup callbacks (COMMIT or ROLLBACK) get
414 installed.
415 """
416 # log.debug("CamcopsRequest.dbsession: caller stack:\n{}",
417 # "\n".join(get_caller_stack_info()))
418 _dbsession = self.get_bare_dbsession()
420 def end_sqlalchemy_session(req: Request) -> None:
421 # noinspection PyProtectedMember
422 req._finish_dbsession()
424 # - For command-line pseudo-requests, add_finished_callback is no use,
425 # because that's called by the Pyramid routing framework.
426 # - So how do we autocommit a command-line session?
427 # - Hooking into CamcopsRequest.__del__ did not work: called, yes, but
428 # object state (e.g. newly inserted User objects) went wrong (e.g.
429 # the objects had been blanked somehow, or that's what the INSERT
430 # statements looked like).
431 # - Use a context manager instead; see below.
432 self.add_finished_callback(end_sqlalchemy_session)
434 if DEBUG_DBSESSION_MANAGEMENT:
435 log.debug(
436 "Returning SQLAlchemy session as " "CamcopsRequest.dbsession"
437 )
439 return _dbsession
441 def _finish_dbsession(self) -> None:
442 """
443 A database session has finished. COMMIT or ROLLBACK, depending on how
444 things went.
445 """
446 # Do NOT roll back "if req.exception is not None"; that includes
447 # all sorts of exceptions like HTTPFound, HTTPForbidden, etc.
448 # See also
449 # - https://docs.pylonsproject.org/projects/pyramid_cookbook/en/latest/pylons/exceptions.html # noqa
450 # But they are neatly subclasses of HTTPException, and isinstance()
451 # deals with None, so:
452 session = self.dbsession
453 if self.exception is not None and not isinstance(
454 self.exception, HTTPException
455 ):
456 log.critical(
457 "Request raised exception that wasn't an "
458 "HTTPException; rolling back; exception was: {!r}",
459 self.exception,
460 )
461 session.rollback()
462 else:
463 if DEBUG_DBSESSION_MANAGEMENT:
464 log.debug("Committing to database")
465 session.commit()
466 if self._pending_export_push_requests:
467 self._process_pending_export_push_requests()
468 if DEBUG_DBSESSION_MANAGEMENT:
469 log.debug("Closing SQLAlchemy session")
470 session.close()
472 def get_bare_dbsession(self) -> SqlASession:
473 """
474 Returns a bare SQLAlchemy session for the request.
476 See :func:`dbsession`, the more commonly used wrapper function.
477 """
478 if self._debugging_db_session:
479 log.debug("Request is using debugging SQLAlchemy session")
480 return self._debugging_db_session
481 if DEBUG_DBSESSION_MANAGEMENT:
482 log.debug("Making SQLAlchemy session")
483 engine = self.engine
484 maker = sessionmaker(bind=engine)
485 session = maker() # type: SqlASession
486 return session
488 # -------------------------------------------------------------------------
489 # TabletSession
490 # -------------------------------------------------------------------------
492 @reify
493 def tabletsession(self) -> TabletSession:
494 """
495 Request a
496 :class:`camcops_server.cc_modules.cc_tabletsession.TabletSession`,
497 which is an information structure geared to client (tablet) database
498 accesses.
500 If we're using this interface, we also want to ensure we're using
501 the :class:`camcops_server.cc_modules.cc_session.CamcopsSession` for
502 the information provided by the tablet in the POST request, not
503 anything already loaded/reset via cookies.
504 """
505 from camcops_server.cc_modules.cc_session import (
506 CamcopsSession,
507 ) # delayed import
509 ts = TabletSession(self) # may raise UserErrorException
510 new_cc_session = CamcopsSession.get_session_for_tablet(ts)
511 # ... does login
512 self.replace_camcops_session(new_cc_session)
513 ts.set_session_id_token(new_cc_session.id, new_cc_session.token)
514 if DEBUG_TABLET_SESSION:
515 log.debug("CamcopsRequest: {!r}", self)
516 log.debug("CamcopsRequest.tabletsession: {!r}", ts)
517 log.debug(
518 "CamcopsRequest.camcops_session: {!r}", self._camcops_session
519 )
520 return ts
522 # -------------------------------------------------------------------------
523 # Date/time
524 # -------------------------------------------------------------------------
526 @reify
527 def now(self) -> Pendulum:
528 """
529 Returns the time of the request as an Pendulum object.
531 (Reified, so a request only ever has one time.)
532 Exposed as a property.
533 """
534 return Pendulum.now()
536 @reify
537 def now_utc(self) -> Pendulum:
538 """
539 Returns the time of the request as a UTC Pendulum.
540 """
541 p = self.now # type: Pendulum
542 return convert_datetime_to_utc(p)
544 @reify
545 def now_utc_no_tzinfo(self) -> datetime.datetime:
546 """
547 Returns the time of the request as a datetime in UTC with no timezone
548 information attached. For when you want to compare to something similar
549 without getting the error "TypeError: can't compare offset-naive and
550 offset-aware datetimes".
551 """
552 p = self.now # type: Pendulum
553 return pendulum_to_utc_datetime_without_tz(p)
555 @reify
556 def now_era_format(self) -> str:
557 """
558 Returns the request time in an ISO-8601 format suitable for use as a
559 CamCOPS ``era``.
560 """
561 return format_datetime(self.now_utc, DateFormat.ERA)
563 @property
564 def today(self) -> Date:
565 """
566 Returns today's date.
567 """
568 # noinspection PyTypeChecker
569 return self.now.date()
571 # -------------------------------------------------------------------------
572 # Logos, static files, and other institution-specific stuff
573 # -------------------------------------------------------------------------
575 @property
576 def url_local_institution(self) -> str:
577 """
578 Returns the local institution's home URL.
579 """
580 return self.config.local_institution_url
582 @property
583 def url_camcops_favicon(self) -> str:
584 """
585 Returns a URL to the favicon (see
586 https://en.wikipedia.org/wiki/Favicon) from within the CamCOPS static
587 files.
588 """
589 # Cope with reverse proxies, etc.
590 # https://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html#pyramid.request.Request.static_url # noqa
591 return self.static_url(
592 STATIC_CAMCOPS_PACKAGE_PATH + "favicon_camcops.png"
593 )
595 @property
596 def url_camcops_logo(self) -> str:
597 """
598 Returns a URL to the CamCOPS logo from within our static files.
599 Returns:
601 """
602 return self.static_url(
603 STATIC_CAMCOPS_PACKAGE_PATH + "logo_camcops.png"
604 )
606 @property
607 def url_local_logo(self) -> str:
608 """
609 Returns a URL to the local institution's logo, from somewhere on our
610 server.
611 """
612 return self.static_url(STATIC_CAMCOPS_PACKAGE_PATH + "logo_local.png")
614 @property
615 def url_camcops_docs(self) -> str:
616 """
617 Returns the URL to the CamCOPS documentation.
618 """
619 return DOCUMENTATION_URL
621 # -------------------------------------------------------------------------
622 # Icons
623 # -------------------------------------------------------------------------
625 @staticmethod
626 def icon(
627 icon: str,
628 alt: str,
629 url: str = None,
630 extra_classes: List[str] = None,
631 extra_styles: List[str] = None,
632 escape_alt: bool = True,
633 ) -> str:
634 """
635 Instantiates a Bootstrap icon, usually with a hyperlink. Returns
636 rendered HTML.
638 Args:
639 icon:
640 Icon name, without ".svg" extension (or "bi-" prefix!).
641 alt:
642 Alternative text for image.
643 url:
644 Optional URL of hyperlink.
645 extra_classes:
646 Optional extra CSS classes for the icon.
647 extra_styles:
648 Optional extra CSS styles for the icon (each looks like:
649 "color: blue").
650 escape_alt:
651 HTML-escape the alt text? Default is True.
652 """
653 return icon_html(
654 icon=icon,
655 alt=alt,
656 url=url,
657 extra_classes=extra_classes,
658 extra_styles=extra_styles,
659 escape_alt=escape_alt,
660 )
662 @staticmethod
663 def icon_text(
664 icon: str,
665 text: str,
666 url: str = None,
667 alt: str = None,
668 extra_icon_classes: List[str] = None,
669 extra_icon_styles: List[str] = None,
670 extra_a_classes: List[str] = None,
671 extra_a_styles: List[str] = None,
672 escape_alt: bool = True,
673 escape_text: bool = True,
674 hyperlink_together: bool = False,
675 ) -> str:
676 """
677 Provide an icon and accompanying text. Usually, both are hyperlinked
678 (to the same destination URL). Returns rendered HTML.
680 Args:
681 icon:
682 Icon name, without ".svg" extension.
683 url:
684 Optional URL of hyperlink.
685 alt:
686 Alternative text for image. Will default to the main text.
687 text:
688 Main text to display.
689 extra_icon_classes:
690 Optional extra CSS classes for the icon.
691 extra_icon_styles:
692 Optional extra CSS styles for the icon (each looks like:
693 "color: blue").
694 extra_a_classes:
695 Optional extra CSS classes for the <a> element.
696 extra_a_styles:
697 Optional extra CSS styles for the <a> element.
698 escape_alt:
699 HTML-escape the alt text?
700 escape_text:
701 HTML-escape the main text?
702 hyperlink_together:
703 Hyperlink the image and text as one (rather than separately and
704 adjacent to each other)?
705 """
706 return icon_text(
707 icon=icon,
708 text=text,
709 url=url,
710 alt=alt,
711 extra_icon_classes=extra_icon_classes,
712 extra_icon_styles=extra_icon_styles,
713 extra_a_classes=extra_a_classes,
714 extra_a_styles=extra_a_styles,
715 escape_alt=escape_alt,
716 escape_text=escape_text,
717 hyperlink_together=hyperlink_together,
718 )
720 @staticmethod
721 def icons_text(
722 icons: List[str],
723 text: str,
724 url: str = None,
725 alt: str = None,
726 extra_icon_classes: List[str] = None,
727 extra_icon_styles: List[str] = None,
728 extra_a_classes: List[str] = None,
729 extra_a_styles: List[str] = None,
730 escape_alt: bool = True,
731 escape_text: bool = True,
732 hyperlink_together: bool = False,
733 ) -> str:
734 """
735 Multiple-icon version of :meth:``icon_text``.
736 """
737 return icons_text(
738 icons=icons,
739 text=text,
740 url=url,
741 alt=alt,
742 extra_icon_classes=extra_icon_classes,
743 extra_icon_styles=extra_icon_styles,
744 extra_a_classes=extra_a_classes,
745 extra_a_styles=extra_a_styles,
746 escape_alt=escape_alt,
747 escape_text=escape_text,
748 hyperlink_together=hyperlink_together,
749 )
751 # -------------------------------------------------------------------------
752 # Low-level HTTP information
753 # -------------------------------------------------------------------------
755 @reify
756 def remote_port(self) -> Optional[int]:
757 """
758 What port number is the client using?
760 The ``remote_port`` variable is an optional WSGI extra provided by some
761 frameworks, such as mod_wsgi.
763 The WSGI spec:
764 - https://www.python.org/dev/peps/pep-0333/
766 The CGI spec:
767 - https://en.wikipedia.org/wiki/Common_Gateway_Interface
769 The Pyramid Request object:
770 - https://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html#pyramid.request.Request
771 - ... note: that includes ``remote_addr``, but not ``remote_port``.
772 """ # noqa
773 try:
774 return int(self.environ.get("REMOTE_PORT", ""))
775 except (TypeError, ValueError):
776 return None
778 # -------------------------------------------------------------------------
779 # HTTP request convenience functions
780 # -------------------------------------------------------------------------
782 def has_param(self, key: str) -> bool:
783 """
784 Is the parameter in the request?
786 Args:
787 key: the parameter's name
788 """
789 return key in self.params
791 def get_str_param(
792 self,
793 key: str,
794 default: str = None,
795 lower: bool = False,
796 upper: bool = False,
797 validator: STRING_VALIDATOR_TYPE = validate_alphanum_underscore,
798 ) -> Optional[str]:
799 """
800 Returns an HTTP parameter from the request (GET or POST). If it does
801 not exist, or is blank, return ``default``. If it fails the validator,
802 raise :exc:`pyramid.httpexceptions.HTTPBadRequest`.
804 Args:
805 key: the parameter's name
806 default: the value to return if the parameter is not found
807 lower: convert to lower case?
808 upper: convert to upper case?
809 validator: validator function
811 Returns:
812 the parameter's (string) contents, or ``default``
814 """
815 # HTTP parameters are always strings at heart
816 if key not in self.params: # missing from request?
817 return default
818 value = self.params.get(key)
819 if not value: # blank, e.g. "source=" in URL?
820 return default
821 assert isinstance(value, str) # ... or we wouldn't have got here
822 if lower:
823 value = value.lower()
824 elif upper:
825 value = value.upper()
826 try:
827 validator(value, self)
828 return value
829 except ValueError as e:
830 raise HTTPBadRequest(f"Bad {key!r} parameter: {e}")
832 def get_str_list_param(
833 self,
834 key: str,
835 lower: bool = False,
836 upper: bool = False,
837 validator: STRING_VALIDATOR_TYPE = validate_alphanum_underscore,
838 ) -> List[str]:
839 """
840 Returns a list of HTTP parameter values from the request. Ensures all
841 have been validated.
843 Args:
844 key: the parameter's name
845 lower: convert to lower case?
846 upper: convert to upper case?
847 validator: validator function
849 Returns:
850 a list of string values
852 """
853 values = self.params.getall(key)
854 if lower:
855 values = [x.lower() for x in values]
856 elif upper:
857 values = [x.upper() for x in values]
858 try:
859 for v in values:
860 validator(v, self)
861 except ValueError as e:
862 raise HTTPBadRequest(
863 f"Parameter {key!r} contains a bad value: {e}"
864 )
865 return values
867 def get_int_param(self, key: str, default: int = None) -> Optional[int]:
868 """
869 Returns an integer parameter from the HTTP request.
871 Args:
872 key: the parameter's name
873 default: the value to return if the parameter is not found or is
874 not a valid integer
876 Returns:
877 an integer, or ``default``
879 """
880 try:
881 return int(self.params[key])
882 except (KeyError, TypeError, ValueError):
883 return default
885 def get_int_list_param(self, key: str) -> List[int]:
886 """
887 Returns a list of integer parameter values from the HTTP request.
889 Args:
890 key: the parameter's name
892 Returns:
893 a list of integer values
895 """
896 values = self.params.getall(key)
897 try:
898 return [int(x) for x in values]
899 except (KeyError, TypeError, ValueError):
900 return []
902 def get_bool_param(self, key: str, default: bool) -> bool:
903 """
904 Returns a boolean parameter from the HTTP request.
906 Args:
907 key: the parameter's name
908 default: the value to return if the parameter is not found or is
909 not a valid boolean value
911 Returns:
912 an integer, or ``default``
914 Valid "true" and "false" values (case-insensitive): see
915 ``TRUE_STRINGS_LOWER_CASE``, ``FALSE_STRINGS_LOWER_CASE``.
916 """
917 try:
918 param_str = self.params[key].lower()
919 if param_str in TRUE_STRINGS_LOWER_CASE:
920 return True
921 elif param_str in FALSE_STRINGS_LOWER_CASE:
922 return False
923 else:
924 return default
925 except (AttributeError, KeyError, TypeError, ValueError):
926 return default
928 def get_date_param(self, key: str) -> Optional[Date]:
929 """
930 Returns a date parameter from the HTTP request. If it is missing or
931 looks bad, return ``None``.
933 Args:
934 key: the parameter's name
936 Returns:
937 a :class:`pendulum.Date`, or ``None``
938 """
939 try:
940 return coerce_to_pendulum_date(self.params[key])
941 except (KeyError, ParserError, TypeError, ValueError):
942 return None
944 def get_datetime_param(self, key: str) -> Optional[Pendulum]:
945 """
946 Returns a datetime parameter from the HTTP request. If it is missing or
947 looks bad, return ``None``.
949 Args:
950 key: the parameter's name
952 Returns:
953 a :class:`pendulum.DateTime`, or ``None``
954 """
955 try:
956 return coerce_to_pendulum(self.params[key])
957 except (KeyError, ParserError, TypeError, ValueError):
958 return None
960 def get_redirect_url_param(
961 self, key: str, default: str = None
962 ) -> Optional[str]:
963 """
964 Returns a redirection URL parameter from the HTTP request, validating
965 it. (The validation process does not allow all types of URLs!)
966 If it was missing, return ``default``. If it was bad, raise
967 :exc:`pyramid.httpexceptions.HTTPBadRequest`.
969 Args:
970 key:
971 the parameter's name
972 default:
973 the value to return if the parameter is not found, or is
974 invalid
976 Returns:
977 a URL string, or ``default``
978 """
979 return self.get_str_param(
980 key, default=default, validator=validate_redirect_url
981 )
983 # -------------------------------------------------------------------------
984 # Routing
985 # -------------------------------------------------------------------------
987 def route_url_params(
988 self, route_name: str, paramdict: Dict[str, Any]
989 ) -> str:
990 """
991 Provides a simplified interface to :func:`Request.route_url` when you
992 have parameters to pass.
994 It does two things:
996 (1) convert all params to their ``str()`` form;
997 (2) allow you to pass parameters more easily using a string
998 parameter name.
1000 The normal Pyramid Request use is:
1002 .. code-block:: python
1004 Request.route_url(route_name, param1=value1, param2=value2)
1006 where "param1" is the literal name of the parameter, but here we can do
1008 .. code-block:: python
1010 CamcopsRequest.route_url_params(route_name, {
1011 PARAM1_NAME: value1_not_necessarily_str,
1012 PARAM2_NAME: value2
1013 })
1015 """
1016 strparamdict = {k: str(v) for k, v in paramdict.items()}
1017 return self.route_url(route_name, **strparamdict)
1019 # -------------------------------------------------------------------------
1020 # Strings
1021 # -------------------------------------------------------------------------
1023 @reify
1024 def _all_extra_strings(self) -> Dict[str, Dict[str, Dict[str, str]]]:
1025 """
1026 Returns all CamCOPS "extra strings" (from XML files) in the format
1027 used by :func:`camcops_server.cc_string.all_extra_strings_as_dicts`.
1028 """
1029 return all_extra_strings_as_dicts(self.config_filename)
1031 def xstring(
1032 self,
1033 taskname: str,
1034 stringname: str,
1035 default: str = None,
1036 provide_default_if_none: bool = True,
1037 language: str = None,
1038 ) -> Optional[str]:
1039 """
1040 Looks up a string from one of the optional extra XML string files.
1042 Args:
1043 taskname: task name (top-level key)
1044 stringname: string name within task (second-level key)
1045 default: default to return if the string is not found
1046 provide_default_if_none: if ``True`` and ``default is None``,
1047 return a helpful missing-string message in the style
1048 "string x.y not found"
1049 language: language code to use, e.g. ``en-GB``; if ``None`` is
1050 passed, the default behaviour is to look up the current
1051 language for this request (see :meth:`language`).
1053 Returns:
1054 the "extra string"
1056 """
1057 # For speed, calculate default only if needed:
1058 allstrings = self._all_extra_strings
1059 if taskname in allstrings:
1060 taskstrings = allstrings[taskname]
1061 if stringname in taskstrings:
1062 langversions = taskstrings[stringname]
1063 if language is None:
1064 language = self.language
1065 if language: # Specific language requested
1066 # 1. Requested language, e.g. "en-GB"
1067 if language in langversions:
1068 return langversions[language]
1069 # 2. Same language, different country, e.g. "en-US"
1070 shortlang = language[:2] # e.g. "en"
1071 for key in langversions.keys():
1072 if key.startswith(shortlang):
1073 return langversions[shortlang]
1074 # 3. Default language
1075 if DEFAULT_LOCALE in langversions:
1076 return langversions[DEFAULT_LOCALE]
1077 # 4. Strings with no language specified in the XML
1078 if MISSING_LOCALE in langversions:
1079 return langversions[MISSING_LOCALE]
1080 # Not found
1081 if default is None and provide_default_if_none:
1082 default = (
1083 f"EXTRA_STRING_NOT_FOUND({taskname}.{stringname}[{language}])"
1084 )
1085 return default
1087 def wxstring(
1088 self,
1089 taskname: str,
1090 stringname: str,
1091 default: str = None,
1092 provide_default_if_none: bool = True,
1093 language: str = None,
1094 ) -> Optional[str]:
1095 """
1096 Returns a web-safe version of an :func:`xstring` (q.v.).
1097 """
1098 value = self.xstring(
1099 taskname,
1100 stringname,
1101 default,
1102 provide_default_if_none=provide_default_if_none,
1103 language=language,
1104 )
1105 if value is None and not provide_default_if_none:
1106 return None
1107 return ws.webify(value)
1109 def wappstring(
1110 self,
1111 stringname: str,
1112 default: str = None,
1113 provide_default_if_none: bool = True,
1114 language: str = None,
1115 ) -> Optional[str]:
1116 """
1117 Returns a web-safe version of an appstring (an app-wide extra string).
1118 This uses the XML file shared between the client and the server.
1119 """
1120 value = self.xstring(
1121 APPSTRING_TASKNAME,
1122 stringname,
1123 default,
1124 provide_default_if_none=provide_default_if_none,
1125 language=language,
1126 )
1127 if value is None and not provide_default_if_none:
1128 return None
1129 return ws.webify(value)
1131 def get_all_extra_strings(self) -> List[Tuple[str, str, str, str]]:
1132 """
1133 Returns all extra strings, as a list of ``task, name, language, value``
1134 tuples.
1136 2019-09-16: these are filtered according to the :ref:`RESTRICTED_TASKS
1137 <RESTRICTED_TASKS>` option.
1138 """
1139 restricted_tasks = self.config.restricted_tasks
1140 user_group_names = None # type: Optional[Set[str]]
1142 def task_permitted(task_xml_name: str) -> bool:
1143 nonlocal user_group_names
1144 if task_xml_name not in restricted_tasks:
1145 return True
1146 if user_group_names is None:
1147 user_group_names = set(self.user.group_names)
1148 permitted_group_names = set(restricted_tasks[task_xml_name])
1149 return bool(permitted_group_names.intersection(user_group_names))
1151 allstrings = self._all_extra_strings
1152 rows = []
1153 for task, taskstrings in allstrings.items():
1154 if not task_permitted(task):
1155 log.debug(
1156 f"Skipping extra string download for task {task}: "
1157 f"not permitted for user {self.user.username}"
1158 )
1159 continue
1160 for name, langversions in taskstrings.items():
1161 for language, value in langversions.items():
1162 rows.append((task, name, language, value))
1163 return rows
1165 def task_extrastrings_exist(self, taskname: str) -> bool:
1166 """
1167 Has the server been supplied with any extra strings for a specific
1168 task?
1169 """
1170 allstrings = self._all_extra_strings
1171 return taskname in allstrings
1173 def extrastring_families(self, sort: bool = True) -> List[str]:
1174 """
1175 Which sets of extra strings do we have? A "family" here means, for
1176 example, "the server itself", "the PHQ9 task", etc.
1177 """
1178 families = list(self._all_extra_strings.keys())
1179 if sort:
1180 families.sort()
1181 return families
1183 @reify
1184 def language(self) -> str:
1185 """
1186 Returns the language code selected by the current user, or if none is
1187 selected (or the user isn't logged in) the server's default language.
1189 Returns:
1190 str: a language code of the form ``en-GB``
1192 """
1193 if self.user is not None:
1194 language = self.user.language
1195 if language in POSSIBLE_LOCALES:
1196 return language
1198 # Fallback to default
1199 return self.config.language
1201 @reify
1202 def language_iso_639_1(self) -> str:
1203 """
1204 Returns the language code selected by the current user, or if none is
1205 selected (or the user isn't logged in) the server's default language.
1207 This assumes all the possible supported languages start with a
1208 two-letter primary language tag, which currently they do.
1210 Returns:
1211 str: a two-letter language code of the form ``en``
1213 """
1214 return self.language[:2]
1216 def gettext(self, message: str) -> str:
1217 """
1218 Returns a version of ``msg`` translated into the current language.
1219 This is used for server-only strings.
1221 The ``gettext()`` function is normally aliased to ``_()`` for
1222 auto-translation tools to read the souce code.
1223 """
1224 lang = self.language
1225 # We can't work out if the string is missing; gettext falls back to
1226 # the source message.
1227 if lang == DEFAULT_LOCALE:
1228 translated = message
1229 else:
1230 try:
1231 translator = gettext.translation(
1232 domain=GETTEXT_DOMAIN,
1233 localedir=TRANSLATIONS_DIR,
1234 languages=[lang],
1235 )
1236 translated = translator.gettext(message)
1237 except OSError: # e.g. translation file not found
1238 log.warning(f"Failed to find translation files for {lang}")
1239 translated = message
1240 if DEBUG_GETTEXT:
1241 return f"[{message}→{lang}→{translated}]"
1242 else:
1243 return translated
1245 def wgettext(self, message: str) -> str:
1246 """
1247 A web-safe version of :func:`gettext`.
1248 """
1249 return ws.webify(self.gettext(message))
1251 def sstring(self, which_string: SS) -> str:
1252 """
1253 Returns a translated server string via a lookup mechanism.
1255 Args:
1256 which_string:
1257 which string? A :class:`camcops_server.cc_modules.cc_text.SS`
1258 enumeration value
1260 Returns:
1261 str: the string
1263 """
1264 try:
1265 result = self._cached_sstring[which_string]
1266 except KeyError:
1267 result = server_string(self, which_string)
1268 self._cached_sstring[which_string] = result
1269 return result
1271 def wsstring(self, which_string: SS) -> str:
1272 """
1273 Returns a web-safe version of a translated server string via a lookup
1274 mechanism.
1276 Args:
1277 which_string:
1278 which string? A :class:`camcops_server.cc_modules.cc_text.SS`
1279 enumeration value
1281 Returns:
1282 str: the string
1284 """
1285 return ws.webify(self.sstring(which_string))
1287 # -------------------------------------------------------------------------
1288 # PNG versus SVG output, so tasks don't have to care (for e.g. PDF/web)
1289 # -------------------------------------------------------------------------
1291 def prepare_for_pdf_figures(self) -> None:
1292 """
1293 Switch the server (for this request) to producing figures in a format
1294 most suitable for PDF.
1295 """
1296 if CSS_PAGED_MEDIA:
1297 # unlikely -- we use wkhtmltopdf instead now
1298 self.switch_output_to_png()
1299 # ... even weasyprint's SVG handling is inadequate
1300 else:
1301 # This is the main method -- we use wkhtmltopdf these days
1302 self.switch_output_to_svg(provide_png_fallback=False)
1303 # ... wkhtmltopdf can cope with SVGs
1305 def prepare_for_html_figures(self) -> None:
1306 """
1307 Switch the server (for this request) to producing figures in a format
1308 most suitable for HTML.
1309 """
1310 self.switch_output_to_svg()
1312 def switch_output_to_png(self) -> None:
1313 """
1314 Switch server (for this request) to producing figures in PNG format.
1315 """
1316 self.use_svg = False
1318 def switch_output_to_svg(self, provide_png_fallback: bool = True) -> None:
1319 """
1320 Switch server (for this request) to producing figures in SVG format.
1322 Args:
1323 provide_png_fallback:
1324 Offer a PNG fallback option/
1325 """
1326 self.use_svg = True
1327 self.provide_png_fallback_for_svg = provide_png_fallback
1329 @staticmethod
1330 def create_figure(**kwargs: Any) -> Figure:
1331 """
1332 Creates and returns a :class:`matplotlib.figure.Figure` with a canvas.
1333 The canvas will be available as ``fig.canvas``.
1334 """
1335 fig = Figure(**kwargs)
1336 # noinspection PyUnusedLocal
1337 canvas = FigureCanvas(fig) # noqa: F841
1338 # The canvas will be now available as fig.canvas, since
1339 # FigureCanvasBase.__init__ calls fig.set_canvas(self); similarly, the
1340 # figure is available from the canvas as canvas.figure
1342 # How do we set the font, so the caller doesn't have to?
1343 # The "nasty global" way is:
1344 # matplotlib.rc('font', **fontdict)
1345 # matplotlib.rc('legend', **fontdict)
1346 # or similar. Then matplotlib often works its way round to using its
1347 # global rcParams object, which is Not OK in a multithreaded context.
1348 #
1349 # https://github.com/matplotlib/matplotlib/issues/6514
1350 # https://github.com/matplotlib/matplotlib/issues/6518
1351 #
1352 # The other way is to specify a fontdict with each call, e.g.
1353 # ax.set_xlabel("some label", **fontdict)
1354 # https://stackoverflow.com/questions/21321670/how-to-change-fonts-in-matplotlib-python # noqa
1355 # Relevant calls with explicit "fontdict: Dict" parameters:
1356 # ax.set_xlabel(..., fontdict=XXX, ...)
1357 # ax.set_ylabel(..., fontdict=XXX, ...)
1358 # ax.set_xticklabels(..., fontdict=XXX, ...)
1359 # ax.set_yticklabels(..., fontdict=XXX, ...)
1360 # ax.text(..., fontdict=XXX, ...)
1361 # ax.set_label_text(..., fontdict=XXX, ...)
1362 # ax.set_title(..., fontdict=XXX, ...)
1363 #
1364 # And with "fontproperties: FontProperties"
1365 # sig.suptitle(..., fontproperties=XXX, ...)
1366 #
1367 # And with "prop: FontProperties":
1368 # ax.legend(..., prop=XXX, ...)
1369 #
1370 # Then, some things are automatically plotted...
1372 return fig
1374 @reify
1375 def fontdict(self) -> Dict[str, Any]:
1376 """
1377 Returns a font dictionary for use with Matplotlib plotting.
1379 **matplotlib font handling and fontdict parameter**
1381 - https://stackoverflow.com/questions/3899980
1382 - https://matplotlib.org/users/customizing.html
1383 - matplotlib/font_manager.py
1385 - Note that the default TrueType font is "DejaVu Sans"; see
1386 :class:`matplotlib.font_manager.FontManager`
1388 - Example sequence:
1390 - CamCOPS does e.g. ``ax.set_xlabel("Date/time",
1391 fontdict=self.req.fontdict)``
1393 - matplotlib.axes.Axes.set_xlabel:
1394 https://matplotlib.org/api/_as_gen/matplotlib.axes.Axes.set_xlabel.html
1396 - matplotlib.axes.Axes.text documentation, explaining the fontdict
1397 parameter:
1398 https://matplotlib.org/api/_as_gen/matplotlib.axes.Axes.text.html
1400 - What's created is probably a :class:`matplotlib.text.Text` object,
1401 whose ``update()`` function is called with the dictionary. Via its
1402 superclass :class:`matplotlib.artist.Artist` and its ``update()``
1403 function, this sets attributes on the Text object. Ultimately,
1404 without having explored this in too much depth, it's probably the
1405 ``self._fontproperties`` object of Text that holds this info.
1407 - That is an instance of
1408 :class:`matplotlib.font_manager.FontProperties`.
1410 **Linux fonts**
1412 Anyway, the main things are (1) that the relevant fonts need to be
1413 installed, and (2) that the default is DejaVu Sans.
1415 - Linux fonts are installed in ``/usr/share/fonts``, and TrueType fonts
1416 within ``/usr/share/fonts/truetype``.
1418 - Use ``fc-match`` to see the font mappings being used.
1420 - Use ``fc-list`` to list available fonts.
1422 - Use ``fc-cache`` to rebuild the font cache.
1424 - Files in ``/etc/fonts/conf.avail/`` do some thinking.
1426 **Problems with pixellated fonts in PDFs made via wkhtmltopdf**
1428 - See also https://github.com/wkhtmltopdf/wkhtmltopdf/issues/2193,
1429 about pixellated fonts via wkhtmltopdf (which was our problem for a
1430 subset of the fonts in trackers, on 2020-06-28, using wkhtmltopd
1431 0.12.5 with patched Qt).
1433 - When you get pixellated fonts in a PDF, look also at the embedded
1434 font list in the PDF (e.g. in Okular: File -> Properties -> Fonts).
1436 - Matplotlib helpfully puts the text (rendered as lines in SVG) as
1437 comments.
1439 - As a debugging sequence, we can manually trim the "pdfhtml" output
1440 down to just the SVG file. Still has problems. Yet there's no text
1441 in it; the text is made of pure SVG lines. And Chrome renders it
1442 perfectly. As does Firefox.
1444 - The rendering bug goes away entirely if you delete the opacity
1445 styling throughout the SVG:
1447 .. code-block:: none
1449 <g style="opacity:0.5;" transform=...>
1450 ^^^^^^^^^^^^^^^^^^^^
1451 this
1453 - So, simple fix:
1455 - rather than opacity (alpha) 0.5 and on top...
1457 - 50% grey colour and on the bottom.
1459 """
1460 fontsize = self.config.plot_fontsize
1461 return dict(
1462 family="sans-serif",
1463 # ... serif, sans-serif, cursive, fantasy, monospace
1464 style="normal", # normal (roman), italic, oblique
1465 variant="normal", # normal, small-caps
1466 weight="normal",
1467 # ... normal [=400], bold [=700], bolder [relative to current],
1468 # lighter [relative], 100, 200, 300, ..., 900
1469 size=fontsize, # in pt (default 12)
1470 )
1472 @reify
1473 def fontprops(self) -> FontProperties:
1474 """
1475 Return a :class:`matplotlib.font_manager.FontProperties` object for
1476 use with Matplotlib plotting.
1477 """
1478 return FontProperties(**self.fontdict)
1480 def set_figure_font_sizes(
1481 self,
1482 ax: "Axes", # "SubplotBase",
1483 fontdict: Dict[str, Any] = None,
1484 x_ticklabels: bool = True,
1485 y_ticklabels: bool = True,
1486 ) -> None:
1487 """
1488 Sets font sizes for the axes of the specified Matplotlib figure.
1490 Args:
1491 ax: the figure to modify
1492 fontdict: the font dictionary to use (if omitted, the default
1493 will be used)
1494 x_ticklabels: if ``True``, modify the X-axis tick labels
1495 y_ticklabels: if ``True``, modify the Y-axis tick labels
1496 """
1497 final_fontdict = self.fontdict.copy()
1498 if fontdict:
1499 final_fontdict.update(fontdict)
1500 fp = FontProperties(**final_fontdict)
1502 axes = [] # type: List[Axis]
1503 if x_ticklabels: # and hasattr(ax, "xaxis"):
1504 axes.append(ax.xaxis)
1505 if y_ticklabels: # and hasattr(ax, "yaxis"):
1506 axes.append(ax.yaxis)
1507 for axis in axes:
1508 for ticklabel in axis.get_ticklabels(
1509 which="both"
1510 ): # type: Text # I think!
1511 ticklabel.set_fontproperties(fp)
1513 def get_html_from_pyplot_figure(self, fig: Figure) -> str:
1514 """
1515 Make HTML (as PNG or SVG) from pyplot
1516 :class:`matplotlib.figure.Figure`.
1517 """
1518 if USE_SVG_IN_HTML and self.use_svg:
1519 result = svg_html_from_pyplot_figure(fig)
1520 if self.provide_png_fallback_for_svg:
1521 # return both an SVG and a PNG image, for browsers that can't
1522 # deal with SVG; the Javascript header will sort this out
1523 # http://www.voormedia.nl/blog/2012/10/displaying-and-detecting-support-for-svg-images # noqa
1524 result += png_img_html_from_pyplot_figure(
1525 fig, PlotDefaults.DEFAULT_PLOT_DPI, "pngfallback"
1526 )
1527 return result
1528 else:
1529 return png_img_html_from_pyplot_figure(
1530 fig, PlotDefaults.DEFAULT_PLOT_DPI
1531 )
1533 # -------------------------------------------------------------------------
1534 # Convenience functions for user information
1535 # -------------------------------------------------------------------------
1537 @property
1538 def user(self) -> Optional["User"]:
1539 """
1540 Returns the :class:`camcops_server.cc_modules.cc_user.User` for the
1541 current request.
1542 """
1543 return self._debugging_user or self.camcops_session.user
1545 @property
1546 def user_id(self) -> Optional[int]:
1547 """
1548 Returns the integer user ID for the current request.
1549 """
1550 if self._debugging_user:
1551 return self._debugging_user.id
1552 return self.camcops_session.user_id
1554 # -------------------------------------------------------------------------
1555 # ID number definitions
1556 # -------------------------------------------------------------------------
1558 @reify
1559 def idnum_definitions(self) -> List[IdNumDefinition]:
1560 """
1561 Returns all
1562 :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` objects.
1563 """
1564 return get_idnum_definitions(self.dbsession) # no longer cached
1566 @reify
1567 def valid_which_idnums(self) -> List[int]:
1568 """
1569 Returns the ``which_idnum`` values for all ID number definitions.
1570 """
1571 return [iddef.which_idnum for iddef in self.idnum_definitions]
1572 # ... pre-sorted
1574 def get_idnum_definition(
1575 self, which_idnum: int
1576 ) -> Optional[IdNumDefinition]:
1577 """
1578 Retrieves an
1579 :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` for the
1580 specified ``which_idnum`` value.
1581 """
1582 return next(
1583 (
1584 iddef
1585 for iddef in self.idnum_definitions
1586 if iddef.which_idnum == which_idnum
1587 ),
1588 None,
1589 )
1591 def get_id_desc(
1592 self, which_idnum: int, default: str = None
1593 ) -> Optional[str]:
1594 """
1595 Get the server's ID description for the specified ``which_idnum``
1596 value.
1597 """
1598 return next(
1599 (
1600 iddef.description
1601 for iddef in self.idnum_definitions
1602 if iddef.which_idnum == which_idnum
1603 ),
1604 default,
1605 )
1607 def get_id_shortdesc(
1608 self, which_idnum: int, default: str = None
1609 ) -> Optional[str]:
1610 """
1611 Get the server's short ID description for the specified ``which_idnum``
1612 value.
1613 """
1614 return next(
1615 (
1616 iddef.short_description
1617 for iddef in self.idnum_definitions
1618 if iddef.which_idnum == which_idnum
1619 ),
1620 default,
1621 )
1623 def is_idnum_valid(
1624 self, which_idnum: int, idnum_value: Optional[int]
1625 ) -> bool:
1626 """
1627 Does the ID number pass any extended validation checks?
1629 Args:
1630 which_idnum: which ID number type is this?
1631 idnum_value: ID number value
1633 Returns:
1634 bool: valid?
1635 """
1636 idnumdef = self.get_idnum_definition(which_idnum)
1637 if not idnumdef:
1638 return False
1639 valid, _ = validate_id_number(
1640 self, idnum_value, idnumdef.validation_method
1641 )
1642 return valid
1644 def why_idnum_invalid(
1645 self, which_idnum: int, idnum_value: Optional[int]
1646 ) -> str:
1647 """
1648 Why does the ID number fail any extended validation checks?
1650 Args:
1651 which_idnum: which ID number type is this?
1652 idnum_value: ID number value
1654 Returns:
1655 str: why invalid? (Human-readable string.)
1656 """
1657 idnumdef = self.get_idnum_definition(which_idnum)
1658 if not idnumdef:
1659 _ = self.gettext
1660 return _("Can't fetch ID number definition")
1661 _, why = validate_id_number(
1662 self, idnum_value, idnumdef.validation_method
1663 )
1664 return why
1666 # -------------------------------------------------------------------------
1667 # Server settings
1668 # -------------------------------------------------------------------------
1670 @reify
1671 def server_settings(self) -> ServerSettings:
1672 """
1673 Return the
1674 :class:`camcops_server.cc_modules.cc_serversettings.ServerSettings` for
1675 the server.
1676 """
1677 return get_server_settings(self)
1679 @reify
1680 def database_title(self) -> str:
1681 """
1682 Return the database friendly title for the server.
1683 """
1684 ss = self.server_settings
1685 return ss.database_title or ""
1687 def set_database_title(self, title: str) -> None:
1688 """
1689 Sets the database friendly title for the server.
1690 """
1691 ss = self.server_settings
1692 ss.database_title = title
1694 # -------------------------------------------------------------------------
1695 # SNOMED-CT
1696 # -------------------------------------------------------------------------
1698 @reify
1699 def snomed_supported(self) -> bool:
1700 """
1701 Is SNOMED-CT supported for CamCOPS tasks?
1702 """
1703 return bool(self.config.get_task_snomed_concepts())
1705 def snomed(self, lookup: str) -> "SnomedConcept":
1706 """
1707 Fetches a SNOMED-CT concept for a CamCOPS task.
1709 Args:
1710 lookup: a CamCOPS SNOMED lookup string
1712 Returns:
1713 a :class:`camcops_server.cc_modules.cc_snomed.SnomedConcept`
1715 Raises:
1716 :exc:`KeyError`, if the lookup cannot be found (e.g. UK data not
1717 installed)
1718 """
1719 concepts = self.config.get_task_snomed_concepts()
1720 assert concepts, "No SNOMED-CT data available for CamCOPS tasks"
1721 return concepts[lookup]
1723 @reify
1724 def icd9cm_snomed_supported(self) -> bool:
1725 """
1726 Is SNOMED-CT supported for ICD-9-CM codes?
1727 """
1728 return bool(self.config.get_icd9cm_snomed_concepts())
1730 def icd9cm_snomed(self, code: str) -> List["SnomedConcept"]:
1731 """
1732 Fetches a SNOMED-CT concept for an ICD-9-CM code
1734 Args:
1735 code: an ICD-9-CM code
1737 Returns:
1738 a :class:`camcops_server.cc_modules.cc_snomed.SnomedConcept`
1740 Raises:
1741 :exc:`KeyError`, if the lookup cannot be found (e.g. data not
1742 installed)
1743 """
1744 concepts = self.config.get_icd9cm_snomed_concepts()
1745 assert concepts, "No SNOMED-CT data available for ICD-9-CM"
1746 return concepts[code]
1748 @reify
1749 def icd10_snomed_supported(self) -> bool:
1750 """
1751 Is SNOMED-CT supported for ICD-10 codes?
1752 """
1753 return bool(self.config.get_icd9cm_snomed_concepts())
1755 def icd10_snomed(self, code: str) -> List["SnomedConcept"]:
1756 """
1757 Fetches a SNOMED-CT concept for an ICD-10 code
1759 Args:
1760 code: an ICD-10 code
1762 Returns:
1763 a :class:`camcops_server.cc_modules.cc_snomed.SnomedConcept`
1765 Raises:
1766 :exc:`KeyError`, if the lookup cannot be found (e.g. data not
1767 installed)
1768 """
1769 concepts = self.config.get_icd10_snomed_concepts()
1770 assert concepts, "No SNOMED-CT data available for ICD-10"
1771 return concepts[code]
1773 # -------------------------------------------------------------------------
1774 # Export recipients
1775 # -------------------------------------------------------------------------
1777 def get_export_recipients(
1778 self,
1779 recipient_names: List[str] = None,
1780 all_recipients: bool = False,
1781 all_push_recipients: bool = False,
1782 save: bool = True,
1783 database_versions: bool = True,
1784 ) -> List["ExportRecipient"]:
1785 """
1786 Returns a list of export recipients, with some filtering if desired.
1787 Validates them against the database.
1789 - If ``all_recipients``, return all.
1790 - Otherwise, if ``all_push_recipients``, return all "push" recipients.
1791 - Otherwise, return all named in ``recipient_names``.
1793 - If any are invalid, raise an error.
1794 - If any are duplicate, raise an error.
1796 - Holds a global export file lock for some database access relating to
1797 export recipient records.
1799 Args:
1800 all_recipients: use all recipients?
1801 all_push_recipients: use all "push" recipients?
1802 recipient_names: recipient names
1803 save: save any freshly created recipient records to the DB?
1804 database_versions: return ExportRecipient objects that are attached
1805 to a database session (rather than ExportRecipientInfo objects
1806 that aren't)?
1808 Returns:
1809 list: of :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
1811 Raises:
1812 - :exc:`ValueError` if a name is invalid
1813 - :exc:`ValueError` if a name is duplicated
1814 - :exc:`camcops_server.cc_modules.cc_exportrecipient.InvalidExportRecipient`
1815 if an export recipient configuration is invalid
1816 """ # noqa
1817 # Delayed imports
1818 from camcops_server.cc_modules.cc_exportrecipient import (
1819 ExportRecipient,
1820 ) # delayed import
1822 # Check parameters
1823 recipient_names = recipient_names or [] # type: List[str]
1824 if save and not database_versions:
1825 raise AssertionError("Can't save unless taking database versions")
1827 # Start with ExportRecipientInfo objects:
1828 recipientinfolist = self.config.get_all_export_recipient_info()
1830 # Restrict
1831 if not all_recipients:
1832 if all_push_recipients:
1833 recipientinfolist = [r for r in recipientinfolist if r.push]
1834 else:
1835 # Specified by name
1836 duplicates = [
1837 name
1838 for name, count in collections.Counter(
1839 recipient_names
1840 ).items()
1841 if count > 1
1842 ]
1843 if duplicates:
1844 raise ValueError(
1845 f"Duplicate export recipients "
1846 f"specified: {duplicates!r}"
1847 )
1848 valid_names = set(r.recipient_name for r in recipientinfolist)
1849 bad_names = [
1850 name for name in recipient_names if name not in valid_names
1851 ]
1852 if bad_names:
1853 raise ValueError(
1854 f"Bad export recipients specified: {bad_names!r}. "
1855 f"Valid recipients are: {valid_names!r}"
1856 )
1857 recipientinfolist = [
1858 r
1859 for r in recipientinfolist
1860 if r.recipient_name in recipient_names
1861 ]
1863 # Complete validation
1864 for r in recipientinfolist:
1865 r.validate(self)
1867 # Does the caller want them as ExportRecipientInfo objects
1868 if not database_versions:
1869 return recipientinfolist
1871 # Convert to SQLAlchemy ORM ExportRecipient objects:
1872 recipients = [
1873 ExportRecipient(other=x) for x in recipientinfolist
1874 ] # type: List[ExportRecipient]
1876 final_recipients = [] # type: List[ExportRecipient]
1877 dbsession = self.dbsession
1879 def process_final_recipients(_save: bool) -> None:
1880 for r in recipients:
1881 other = ExportRecipient.get_existing_matching_recipient(
1882 dbsession, r
1883 )
1884 if other:
1885 # This other one matches, and is already in the database.
1886 # Use it. But first...
1887 for (
1888 attrname
1889 ) in (
1890 ExportRecipient.RECOPY_EACH_TIME_FROM_CONFIG_ATTRNAMES
1891 ):
1892 setattr(other, attrname, getattr(r, attrname))
1893 # OK.
1894 final_recipients.append(other)
1895 else:
1896 # Our new object doesn't match. Use (+/- save) it.
1897 if save:
1898 log.debug(
1899 "Creating new ExportRecipient record in database"
1900 )
1901 dbsession.add(r)
1902 r.current = True
1903 final_recipients.append(r)
1905 if save:
1906 lockfilename = (
1907 self.config.get_master_export_recipient_lockfilename()
1908 )
1909 with lockfile.FileLock(
1910 lockfilename, timeout=None
1911 ): # waits forever if necessary
1912 process_final_recipients(_save=True)
1913 else:
1914 process_final_recipients(_save=False)
1916 # OK
1917 return final_recipients
1919 def get_export_recipient(
1920 self, recipient_name: str, save: bool = True
1921 ) -> "ExportRecipient":
1922 """
1923 Returns a single validated export recipient, given its name.
1925 Args:
1926 recipient_name: recipient name
1927 save: save any freshly created recipient records to the DB?
1929 Returns:
1930 list: of :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
1932 Raises:
1933 - :exc:`ValueError` if a name is invalid
1934 - :exc:`camcops_server.cc_modules.cc_exportrecipient.InvalidExportRecipient`
1935 if an export recipient configuration is invalid
1936 """ # noqa
1937 recipients = self.get_export_recipients([recipient_name], save=save)
1938 assert len(recipients) == 1
1939 return recipients[0]
1941 @reify
1942 def all_push_recipients(self) -> List["ExportRecipient"]:
1943 """
1944 Cached for speed (will potentially be called for multiple tables in
1945 a bulk upload).
1946 """
1947 return self.get_export_recipients(
1948 all_push_recipients=True,
1949 save=False,
1950 database_versions=True, # we need group ID info somehow
1951 )
1953 def add_export_push_request(
1954 self, recipient_name: str, basetable: str, task_pk: int
1955 ) -> None:
1956 """
1957 Adds a request to push a task to an export recipient.
1959 The reason we use this slightly convoluted approach is because
1960 otherwise, it's very easy to generate a backend request for a new task
1961 before it's actually been committed (so the backend finds no task).
1963 Args:
1964 recipient_name: name of the recipient
1965 basetable: name of the task's base table
1966 task_pk: server PK of the task
1967 """
1968 self._pending_export_push_requests.append(
1969 (recipient_name, basetable, task_pk)
1970 )
1972 def _process_pending_export_push_requests(self) -> None:
1973 """
1974 Sends pending export push requests to the backend.
1976 Called after the COMMIT.
1977 """
1978 from camcops_server.cc_modules.celery import (
1979 export_task_backend,
1980 ) # delayed import
1982 for (
1983 recipient_name,
1984 basetable,
1985 task_pk,
1986 ) in self._pending_export_push_requests:
1987 log.info(
1988 "Submitting background job to export task {}.{} to {}",
1989 basetable,
1990 task_pk,
1991 recipient_name,
1992 )
1993 export_task_backend.delay(
1994 recipient_name=recipient_name,
1995 basetable=basetable,
1996 task_pk=task_pk,
1997 )
1999 # -------------------------------------------------------------------------
2000 # User downloads
2001 # -------------------------------------------------------------------------
2003 @property
2004 def user_download_dir(self) -> str:
2005 """
2006 The directory in which this user's downloads should be/are stored, or a
2007 blank string if user downloads are not available. Also ensures it
2008 exists.
2009 """
2010 if self.config.user_download_max_space_mb <= 0:
2011 return ""
2012 basedir = self.config.user_download_dir
2013 if not basedir:
2014 return ""
2015 user_id = self.user_id
2016 if user_id is None:
2017 return ""
2018 userdir = os.path.join(basedir, str(user_id))
2019 mkdir_p(userdir)
2020 return userdir
2022 @property
2023 def user_download_bytes_permitted(self) -> int:
2024 """
2025 Amount of space the user is permitted.
2026 """
2027 if not self.user_download_dir:
2028 return 0
2029 return self.config.user_download_max_space_mb * 1024 * 1024
2031 @reify
2032 def user_download_bytes_used(self) -> int:
2033 """
2034 Returns the disk space used by this user.
2035 """
2036 download_dir = self.user_download_dir
2037 if not download_dir:
2038 return 0
2039 return get_directory_contents_size(download_dir)
2041 @property
2042 def user_download_bytes_available(self) -> int:
2043 """
2044 Returns the available space for this user in their download area.
2045 """
2046 permitted = self.user_download_bytes_permitted
2047 used = self.user_download_bytes_used
2048 available = permitted - used
2049 return available
2051 @property
2052 def user_download_lifetime_duration(self) -> Duration:
2053 """
2054 Returns the lifetime of user download objects.
2055 """
2056 return Duration(minutes=self.config.user_download_file_lifetime_min)
2059# noinspection PyUnusedLocal
2060def complete_request_add_cookies(
2061 req: CamcopsRequest, response: Response
2062) -> None:
2063 """
2064 Finializes the response by adding session cookies.
2066 See :meth:`CamcopsRequest.complete_request_add_cookies`.
2067 """
2068 req.complete_request_add_cookies()
2071# =============================================================================
2072# Configurator
2073# =============================================================================
2076@contextmanager
2077def camcops_pyramid_configurator_context(
2078 debug_toolbar: bool = False, static_cache_duration_s: int = 0
2079) -> Configurator:
2080 """
2081 Context manager to create a Pyramid configuration context, for making
2082 (for example) a WSGI server or a debugging request. That means setting up
2083 things like:
2085 - the authentication and authorization policies
2086 - our request and session factories
2087 - our Mako renderer
2088 - our routes and views
2090 Args:
2091 debug_toolbar:
2092 Add the Pyramid debug toolbar?
2093 static_cache_duration_s:
2094 Lifetime (in seconds) for the HTTP cache-control setting for
2095 static content.
2097 Returns:
2098 a :class:`Configurator` object
2100 Note this includes settings that transcend the config file.
2102 Most things should be in the config file. This enables us to run multiple
2103 configs (e.g. multiple CamCOPS databases) through the same process.
2104 However, some things we need to know right now, to make the WSGI app.
2105 Here, OS environment variables and command-line switches are appropriate.
2106 """
2108 # -------------------------------------------------------------------------
2109 # 1. Base app
2110 # -------------------------------------------------------------------------
2111 settings = { # Settings that can't be set directly?
2112 "debug_authorization": DEBUG_AUTHORIZATION,
2113 # ... see https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/security.html#debugging-view-authorization-failures # noqa
2114 }
2115 with Configurator(settings=settings) as config:
2116 # ---------------------------------------------------------------------
2117 # Authentication; authorizaion (permissions)
2118 # ---------------------------------------------------------------------
2119 authentication_policy = CamcopsAuthenticationPolicy()
2120 config.set_authentication_policy(authentication_policy)
2121 # Let's not use ACLAuthorizationPolicy, which checks an access control
2122 # list for a resource hierarchy of objects, but instead:
2123 authorization_policy = CamcopsAuthorizationPolicy()
2124 config.set_authorization_policy(authorization_policy)
2125 config.set_default_permission(Permission.HAPPY)
2126 # ... applies to all SUBSEQUENT view configuration registrations
2128 # ---------------------------------------------------------------------
2129 # Factories
2130 # ---------------------------------------------------------------------
2131 config.set_request_factory(CamcopsRequest)
2132 # ... for request attributes: config, database, etc.
2133 config.set_session_factory(get_session_factory())
2134 # ... for request.session
2135 config.set_response_factory(camcops_response_factory)
2137 # ---------------------------------------------------------------------
2138 # Renderers
2139 # ---------------------------------------------------------------------
2140 camcops_add_mako_renderer(config, extension=".mako")
2142 # deform_bootstrap.includeme(config)
2144 # ---------------------------------------------------------------------
2145 # Routes and accompanying views
2146 # ---------------------------------------------------------------------
2148 # Add static views
2149 # https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/assets.html#serving-static-assets # noqa
2150 # Hmm. We cannot fail to set up a static file route, because otherwise
2151 # we can't provide URLs to them.
2152 static_filepath = STATIC_CAMCOPS_PACKAGE_PATH
2153 static_name = RouteCollection.STATIC.route
2154 log.debug(
2155 "... including static files from {!r} at Pyramid static "
2156 "name {!r}",
2157 static_filepath,
2158 static_name,
2159 )
2160 # ... does the name needs to start with "/" or the pattern "static/"
2161 # will override the later "deform_static"? Not sure.
2163 # We were doing this:
2164 # config.add_static_view(name=static_name, path=static_filepath)
2165 # But now we need to (a) add the
2166 # "cache_max_age=static_cache_duration_s" argument, and (b) set the
2167 # HTTP header 'Cache-Control: no-cache="Set-Cookie, Set-Cookie2"',
2168 # for the ZAP penetration tester:
2169 # ... https://cheatsheetseries.owasp.org/cheatsheets/Session_Management_Cheat_Sheet.html#web-content-caching # noqa
2170 # We can do the former, but not the latter, via add_static_view(),
2171 # because it sends its keyword arguments to add_route(), not the view
2172 # creation. So, alternatives ways...
2173 # - from https://github.com/Pylons/pyramid/issues/1486
2174 # - and https://stackoverflow.com/questions/24854300/
2175 # - to https://github.com/Pylons/pyramid/pull/2021
2176 # - to https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-derivers # noqa
2178 config.add_static_view(
2179 name=static_name,
2180 path=static_filepath,
2181 cache_max_age=static_cache_duration_s,
2182 )
2184 # Add all the routes:
2185 for pr in RouteCollection.all_routes():
2186 if DEBUG_ADD_ROUTES:
2187 suffix = (
2188 f", pregenerator={pr.pregenerator}"
2189 if pr.pregenerator
2190 else ""
2191 )
2192 log.info("Adding route: {} -> {}{}", pr.route, pr.path, suffix)
2193 config.add_route(pr.route, pr.path, pregenerator=pr.pregenerator)
2194 # See also:
2195 # https://stackoverflow.com/questions/19184612/how-to-ensure-urls-generated-by-pyramids-route-url-and-route-path-are-valid # noqa
2197 # Routes added EARLIER have priority. So add this AFTER our custom
2198 # bugfix:
2199 config.add_static_view(
2200 name="/deform_static",
2201 path="deform:static/",
2202 cache_max_age=static_cache_duration_s,
2203 )
2205 # Most views are using @view_config() which calls add_view().
2206 # Scan for @view_config decorators, to map views to routes:
2207 # https://docs.pylonsproject.org/projects/venusian/en/latest/api.html
2208 config.scan(
2209 "camcops_server.cc_modules", ignore=[re.compile("_tests$").search]
2210 )
2212 # ---------------------------------------------------------------------
2213 # Add tweens (inner to outer)
2214 # ---------------------------------------------------------------------
2215 # We will use implicit positioning:
2216 # - https://www.slideshare.net/aconrad/alex-conrad-pyramid-tweens-ploneconf-2011 # noqa
2218 # config.add_tween('camcops_server.camcops.http_session_tween_factory')
2220 # ---------------------------------------------------------------------
2221 # Debug toolbar
2222 # ---------------------------------------------------------------------
2223 if debug_toolbar:
2224 log.debug("Enabling Pyramid debug toolbar")
2225 config.include("pyramid_debugtoolbar") # BEWARE! SIDE EFFECTS
2226 # ... Will trigger an import that hooks events into all
2227 # SQLAlchemy queries. There's a bug somewhere relating to that;
2228 # see notes below relating to the "mergedb" function.
2229 config.add_route(
2230 RouteCollection.DEBUG_TOOLBAR.route,
2231 RouteCollection.DEBUG_TOOLBAR.path,
2232 )
2234 yield config
2237# =============================================================================
2238# Debugging requests
2239# =============================================================================
2242def make_post_body_from_dict(
2243 d: Dict[str, str], encoding: str = "utf8"
2244) -> bytes:
2245 """
2246 Makes an HTTP POST body from a dictionary.
2248 For debugging HTTP requests.
2250 It mimics how the tablet operates.
2251 """
2252 # https://docs.pylonsproject.org/projects/pyramid-cookbook/en/latest/testing/testing_post_curl.html # noqa
2253 txt = urllib.parse.urlencode(query=d)
2254 # ... this encoding mimics how the tablet operates
2255 body = txt.encode(encoding)
2256 return body
2259class CamcopsDummyRequest(CamcopsRequest, DummyRequest):
2260 """
2261 Request class that allows manual manipulation of GET/POST parameters
2262 for debugging.
2264 It reads its config (on first demand) from the config file specified in
2265 ``os.environ[ENVVAR_CONFIG_FILE]``.
2267 Notes:
2269 - The important base class is :class:`webob.request.BaseRequest`.
2270 - ``self.params`` is a :class:`NestedMultiDict` (see
2271 ``webob/multidict.py``); these are intrinsically read-only.
2272 - ``self.params`` is also a read-only property. When read, it combines
2273 data from ``self.GET`` and ``self.POST``.
2274 - What we do here is to manipulate the underlying GET/POST data.
2276 """
2278 _CACHE_KEY = "webob._parsed_query_vars"
2279 _QUERY_STRING_KEY = "QUERY_STRING"
2281 # def __init__(self, *args, **kwargs) -> None:
2282 # super().__init__(*args, **kwargs)
2283 # # Just a technique worth noting:
2284 # #
2285 # # self._original_params_property = CamcopsRequest.params # type: property # noqa
2286 # # self._original_params = self._original_params_property.fget(self) # type: NestedMultiDict # noqa
2287 # # self._fake_params = self._original_params.copy() # type: MultiDict
2288 # # if params:
2289 # # self._fake_params.update(params)
2290 #
2291 # @property
2292 # def params(self):
2293 # log.debug(repr(self._fake_params))
2294 # return self._fake_params
2295 # # Returning the member object allows clients to call
2296 # # dummyreq.params.update(...)
2297 #
2298 # @params.setter
2299 # def params(self, value):
2300 # self._fake_params = value
2302 def set_method_get(self) -> None:
2303 """
2304 Sets the fictional request method to GET.
2305 """
2306 self.method = HttpMethod.GET
2308 def set_method_post(self) -> None:
2309 """
2310 Sets the fictional request method to POST.
2311 """
2312 self.method = HttpMethod.POST
2314 def clear_get_params(self) -> None:
2315 """
2316 Clear all GET parameters.
2317 """
2318 env = self.environ
2319 if self._CACHE_KEY in env:
2320 del env[self._CACHE_KEY]
2321 env[self._QUERY_STRING_KEY] = ""
2323 def add_get_params(
2324 self, d: Dict[str, str], set_method_get: bool = True
2325 ) -> None:
2326 """
2327 Add GET parameters.
2329 Args:
2330 d: dictionary of ``{parameter: value}`` pairs.
2331 set_method_get: also set the request's method to GET?
2332 """
2333 if not d:
2334 return
2335 # webob.request.BaseRequest.GET reads from self.environ['QUERY_STRING']
2336 paramdict = self.GET.copy() # type: MultiDict
2337 paramdict.update(d)
2338 env = self.environ
2339 # Delete the cached version.
2340 if self._CACHE_KEY in env:
2341 del env[self._CACHE_KEY]
2342 # Write the new version
2343 env[self._QUERY_STRING_KEY] = urllib.parse.urlencode(query=paramdict)
2344 if set_method_get:
2345 self.set_method_get()
2347 def set_get_params(
2348 self, d: Dict[str, str], set_method_get: bool = True
2349 ) -> None:
2350 """
2351 Clear any GET parameters, and then set them to new values.
2352 See :func:`add_get_params`.
2353 """
2354 self.clear_get_params()
2355 self.add_get_params(d, set_method_get=set_method_get)
2357 def set_post_body(self, body: bytes, set_method_post: bool = True) -> None:
2358 """
2359 Sets the fake POST body.
2361 Args:
2362 body: the body to set
2363 set_method_post: also set the request's method to POST?
2364 """
2365 log.debug("Applying fake POST body: {!r}", body)
2366 self.body = body
2367 self.content_length = len(body)
2368 if set_method_post:
2369 self.set_method_post()
2371 def fake_request_post_from_dict(
2372 self,
2373 d: Dict[str, str],
2374 encoding: str = "utf8",
2375 set_method_post: bool = True,
2376 ) -> None:
2377 """
2378 Sets the request's POST body according to a dictionary.
2380 Args:
2381 d: dictionary of ``{parameter: value}`` pairs.
2382 encoding: character encoding to use
2383 set_method_post: also set the request's method to POST?
2384 """
2385 # webob.request.BaseRequest.POST reads from 'body' (indirectly).
2386 body = make_post_body_from_dict(d, encoding=encoding)
2387 self.set_post_body(body, set_method_post=set_method_post)
2390_ = """
2391# A demonstration of the manipulation of superclass properties:
2393class Test(object):
2394 def __init__(self):
2395 self.a = 3
2397 @property
2398 def b(self):
2399 return 4
2402class Derived(Test):
2403 def __init__(self):
2404 super().__init__()
2405 self._superclass_b = super().b
2406 self._b = 4
2408 @property
2409 def b(self):
2410 print("Superclass b: {}".format(self._superclass_b.fget(self)))
2411 print("Self _b: {}".format(self._b))
2412 return self._b
2413 @b.setter
2414 def b(self, value):
2415 self._b = value
2418x = Test()
2419x.a # 3
2420x.a = 5
2421x.a # 5
2422x.b # 4
2423x.b = 6 # can't set attribute
2425y = Derived()
2426y.a # 3
2427y.a = 5
2428y.a # 5
2429y.b # 4
2430y.b = 6
2431y.b # 6
2433"""
2436def get_core_debugging_request() -> CamcopsDummyRequest:
2437 """
2438 Returns a basic :class:`CamcopsDummyRequest`.
2440 It reads its config (on first demand) from the config file specified in
2441 ``os.environ[ENVVAR_CONFIG_FILE]``.
2442 """
2443 with camcops_pyramid_configurator_context(debug_toolbar=False) as pyr_cfg:
2444 req = CamcopsDummyRequest(
2445 environ={
2446 # In URL sequence:
2447 WsgiEnvVar.WSGI_URL_SCHEME: "http",
2448 WsgiEnvVar.SERVER_NAME: "127.0.0.1",
2449 WsgiEnvVar.SERVER_PORT: "8000",
2450 WsgiEnvVar.SCRIPT_NAME: "",
2451 WsgiEnvVar.PATH_INFO: "/",
2452 } # environ parameter: goes to pyramid.testing.DummyRequest.__init__ # noqa
2453 )
2454 # ... must pass an actual dict to the "environ" parameter; os.environ
2455 # itself isn't OK ("TypeError: WSGI environ must be a dict; you passed
2456 # environ({'key1': 'value1', ...})
2458 # Being a CamcopsRequest, this object will read a config file from
2459 # os.environ[ENVVAR_CONFIG_FILE] -- not the environ dictionary above --
2460 # when needed. That means we can now rewrite some of these URL
2461 # components to give a valid external URL, if the config has the right
2462 # information.
2463 cfg = req.config
2464 req.environ[WsgiEnvVar.WSGI_URL_SCHEME] = cfg.external_url_scheme
2465 req.environ[WsgiEnvVar.SERVER_NAME] = cfg.external_server_name
2466 req.environ[WsgiEnvVar.SERVER_PORT] = cfg.external_server_port
2467 req.environ[WsgiEnvVar.SCRIPT_NAME] = cfg.external_script_name
2468 # PATH_INFO remains "/"
2470 req.registry = pyr_cfg.registry
2471 pyr_cfg.begin(request=req)
2472 return req
2475def get_command_line_request(user_id: int = None) -> CamcopsRequest:
2476 """
2477 Creates a dummy CamcopsRequest for use on the command line.
2478 By default, it does so for the system user. Optionally, you can specify a
2479 user by their ID number.
2481 - Presupposes that ``os.environ[ENVVAR_CONFIG_FILE]`` has been set, as it
2482 is in :func:`camcops_server.camcops.main`.
2484 **WARNING:** this does not provide a COMMIT/ROLLBACK context. If you use
2485 this directly, you must manage that yourself. Consider using
2486 :func:`command_line_request_context` instead.
2487 """
2488 log.debug(f"Creating command-line pseudo-request (user_id={user_id})")
2489 req = get_core_debugging_request()
2491 # If we proceed with an out-of-date database, we will have problems, and
2492 # those problems may not be immediately apparent, which is bad. So:
2493 req.config.assert_database_ok()
2495 # Ensure we have a user
2496 if user_id is None:
2497 req._debugging_user = User.get_system_user(req.dbsession)
2498 else:
2499 req._debugging_user = User.get_user_by_id(req.dbsession, user_id)
2501 log.debug(
2502 "Command-line request: external URL is {}", req.route_url(Routes.HOME)
2503 )
2504 return req
2507@contextmanager
2508def command_line_request_context(
2509 user_id: int = None,
2510) -> Generator[CamcopsRequest, None, None]:
2511 """
2512 Request objects are ubiquitous, and allow code to refer to the HTTP
2513 request, config, HTTP session, database session, and so on. Here we make
2514 a special sort of request for use from the command line, and provide it
2515 as a context manager that will COMMIT the database afterwards (because the
2516 normal method, via the Pyramid router, is unavailable).
2517 """
2518 req = get_command_line_request(user_id=user_id)
2519 yield req
2520 # noinspection PyProtectedMember
2521 req._finish_dbsession()
2524def get_unittest_request(
2525 dbsession: SqlASession, params: Dict[str, Any] = None
2526) -> CamcopsDummyRequest:
2527 """
2528 Creates a :class:`CamcopsDummyRequest` for use by unit tests.
2530 - Points to an existing database (e.g. SQLite in-memory database).
2531 - Presupposes that ``os.environ[ENVVAR_CONFIG_FILE]`` has been set, as it
2532 is in :func:`camcops_server.camcops.main`.
2533 """
2534 log.debug("Creating unit testing pseudo-request")
2535 req = get_core_debugging_request()
2536 req.set_get_params(params)
2538 req._debugging_db_session = dbsession
2540 return req