Coverage for cc_modules/cc_request.py : 44%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
3"""
4camcops_server/cc_modules/cc_request.py
6===============================================================================
8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com).
10 This file is part of CamCOPS.
12 CamCOPS is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
17 CamCOPS is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License for more details.
22 You should have received a copy of the GNU General Public License
23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
25===============================================================================
27**Implements a Pyramid Request object customized for CamCOPS.**
29"""
31import collections
32from contextlib import contextmanager
33import datetime
34import gettext
35import logging
36import os
37import secrets
38from typing import (Any, Dict, Generator, List, Optional, Set,
39 Tuple, TYPE_CHECKING, Union)
40import urllib.parse
42from cardinal_pythonlib.datetimefunc import (
43 coerce_to_pendulum,
44 coerce_to_pendulum_date,
45 convert_datetime_to_utc,
46 format_datetime,
47 pendulum_to_utc_datetime_without_tz,
48)
49# from cardinal_pythonlib.debugging import get_caller_stack_info
50from cardinal_pythonlib.fileops import get_directory_contents_size, mkdir_p
51from cardinal_pythonlib.logs import BraceStyleAdapter
52from cardinal_pythonlib.plot import (
53 png_img_html_from_pyplot_figure,
54 svg_html_from_pyplot_figure,
55)
56import cardinal_pythonlib.rnc_web as ws
57from cardinal_pythonlib.wsgi.constants import WsgiEnvVar
58import lockfile
59from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
60from matplotlib.figure import Figure
61from matplotlib.font_manager import FontProperties
62from pendulum import Date, DateTime as Pendulum, Duration
63from pendulum.parsing.exceptions import ParserError
64from pyramid.config import Configurator
65from pyramid.decorator import reify
66from pyramid.httpexceptions import HTTPBadRequest, HTTPException
67from pyramid.interfaces import ISession
68from pyramid.request import Request
69from pyramid.response import Response
70from pyramid.testing import DummyRequest
71from sqlalchemy.engine.base import Engine
72from sqlalchemy.orm import sessionmaker
73from sqlalchemy.orm import Session as SqlASession
74from webob.multidict import MultiDict
76# Note: everything under the sun imports this file, so keep the intra-package
77# imports as minimal as possible.
78from camcops_server.cc_modules.cc_baseconstants import (
79 DOCUMENTATION_URL,
80 ENVVAR_CONFIG_FILE,
81 TRANSLATIONS_DIR,
82)
83from camcops_server.cc_modules.cc_config import (
84 CamcopsConfig,
85 get_config,
86 get_config_filename_from_os_env,
87)
88from camcops_server.cc_modules.cc_constants import (
89 CSS_PAGED_MEDIA,
90 DateFormat,
91 PlotDefaults,
92 USE_SVG_IN_HTML,
93)
94from camcops_server.cc_modules.cc_idnumdef import (
95 get_idnum_definitions,
96 IdNumDefinition,
97 validate_id_number,
98)
99from camcops_server.cc_modules.cc_language import (
100 DEFAULT_LOCALE,
101 GETTEXT_DOMAIN,
102)
103# noinspection PyUnresolvedReferences
104import camcops_server.cc_modules.cc_plot # import side effects (configure matplotlib) # noqa
105from camcops_server.cc_modules.cc_pyramid import (
106 camcops_add_mako_renderer,
107 CamcopsAuthenticationPolicy,
108 CamcopsAuthorizationPolicy,
109 CookieKey,
110 get_session_factory,
111 Permission,
112 RequestMethod,
113 RouteCollection,
114 STATIC_CAMCOPS_PACKAGE_PATH,
115)
116from camcops_server.cc_modules.cc_response import camcops_response_factory
117from camcops_server.cc_modules.cc_serversettings import (
118 get_server_settings,
119 ServerSettings,
120)
121from camcops_server.cc_modules.cc_string import (
122 all_extra_strings_as_dicts,
123 APPSTRING_TASKNAME,
124 MISSING_LOCALE,
125)
126from camcops_server.cc_modules.cc_tabletsession import TabletSession
127from camcops_server.cc_modules.cc_text import SS, server_string
128from camcops_server.cc_modules.cc_user import User
129from camcops_server.cc_modules.cc_validators import (
130 STRING_VALIDATOR_TYPE,
131 validate_alphanum_underscore,
132 validate_redirect_url,
133)
135if TYPE_CHECKING:
136 from matplotlib.axis import Axis
137 from matplotlib.axes import Axes
138 # from matplotlib.figure import SubplotBase
139 from matplotlib.text import Text
140 from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
141 from camcops_server.cc_modules.cc_exportrecipientinfo import ExportRecipientInfo # noqa
142 from camcops_server.cc_modules.cc_session import CamcopsSession
143 from camcops_server.cc_modules.cc_snomed import SnomedConcept
145log = BraceStyleAdapter(logging.getLogger(__name__))
148# =============================================================================
149# Debugging options
150# =============================================================================
# Developer-only switches; each one enables extra logging in one area of this
# module. All of them must be False in production.
DEBUG_ADD_ROUTES = False
DEBUG_AUTHORIZATION = False
DEBUG_CAMCOPS_SESSION = False
DEBUG_DBSESSION_MANAGEMENT = False
DEBUG_GETTEXT = False
DEBUG_REQUEST_CREATION = False
DEBUG_TABLET_SESSION = False

# Warn at import time if any debugging switch has been left on.
if (DEBUG_ADD_ROUTES or
        DEBUG_AUTHORIZATION or
        DEBUG_CAMCOPS_SESSION or
        DEBUG_DBSESSION_MANAGEMENT or
        DEBUG_GETTEXT or
        DEBUG_REQUEST_CREATION or
        DEBUG_TABLET_SESSION):
    log.warning("cc_request: Debugging options enabled!")
170# =============================================================================
171# Constants
172# =============================================================================
# Lower-case spellings accepted as boolean "true"/"false" HTTP parameter
# values; used by CamcopsRequest.get_bool_param().
TRUE_STRINGS_LOWER_CASE = [
    "true",
    "t",
    "1",
    "yes",
    "y",
]
FALSE_STRINGS_LOWER_CASE = [
    "false",
    "f",
    "0",
    "no",
    "n",
]
178# =============================================================================
179# Modified Request interface, for type checking
180# =============================================================================
181# https://docs.pylonsproject.org/projects/pyramid_cookbook/en/latest/auth/user_object.html
182# https://rollbar.com/blog/using-pyramid-request-factory-to-write-less-code/
183#
184# ... everything with reify=True is cached, so if we ask for something
185# more than once, we keep getting the same thing
186# ... https://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html#pyramid.request.Request.set_property # noqa
188class CamcopsRequest(Request):
189 """
190 The CamcopsRequest is an object central to all HTTP requests. It is the
191 main thing passed all around the server, and embodies what we need to know
192 about the client request -- including user information, ways of accessing
193 the database, and so on.
194 """
def __init__(self, *args, **kwargs):
    """
    Builds the request. Pyramid calls this as its request factory; see
    ``config.set_request_factory(CamcopsRequest)``.

    Design notes on sessions for the client (tablet) database API:

    - Titanium clients could not use cookies; Qt clients can. Cookies are
      nonetheless somewhat overcomplicated for this purpose, so we carry on
      avoiding them for the client API. See also

      - https://softwareengineering.stackexchange.com/questions/141019/
      - https://stackoverflow.com/questions/6068113/do-sessions-really-violate-restfulness

    - Nothing *has* to be cached (username/password details are still sent
      with every request, which is RESTful), but caching saves
      authentication time on every call after the first.
    - The scheme is therefore:

      - look up a session at request creation time, adding a new one if
        none was found;
      - let the database API code replace that session (BEFORE it is
        saved to the database and gains its PK) with another one chosen
        from the request content.

    - That costs one more database hit, but avoids the bcrypt time.
    """  # noqa
    super().__init__(*args, **kwargs)

    # Graphics preferences for this request.
    self.use_svg = False  # use SVG (not just PNG) for graphics
    self.provide_png_fallback_for_svg = True  # for SVG: provide PNG fallback image?  # noqa

    # Session cookies are written late, via a response callback, so that
    # the session can still be swapped while the request is processed.
    self.add_response_callback(complete_request_add_cookies)

    # The CamcopsSession is created lazily: it needs a Registry, which may
    # not exist yet; see command_line_request().
    self._camcops_session = None  # type: Optional[CamcopsSession]

    # Hooks used by unit tests only.
    self._debugging_db_session = None  # type: Optional[SqlASession]
    self._debugging_user = None  # type: Optional[User]

    self._pending_export_push_requests = []  # type: List[Tuple[str, str, int]]  # noqa
    self._cached_sstring = {}  # type: Dict[SS, str]

    if DEBUG_REQUEST_CREATION:
        log.debug("CamcopsRequest.__init__: args={!r}, kwargs={!r}",
                  args, kwargs)
240 # -------------------------------------------------------------------------
241 # HTTP nonce
242 # -------------------------------------------------------------------------
@reify
def nonce(self) -> str:
    """
    Returns a random nonce for this request. ``@reify`` caches the first
    value, so the nonce is different for every request but constant
    within one.

    Background: https://content-security-policy.com/examples/allow-inline-style/

    On generating nonces:
    https://stackoverflow.com/questions/5590170/what-is-the-standard-method-for-generating-a-nonce-in-python
    """  # noqa
    token = secrets.token_urlsafe()
    return token
257 # -------------------------------------------------------------------------
258 # CamcopsSession
259 # -------------------------------------------------------------------------
@property
def camcops_session(self) -> "CamcopsSession":
    """
    Returns the
    :class:`camcops_server.cc_modules.cc_session.CamcopsSession` for this
    request, creating it from the request's cookies on first access.

    Not to be confused with the Pyramid session:

    .. code-block:: none

        ccsession = request.camcops_session  # type: CamcopsSession
        pyramid_session = request.session  # type: ISession
    """
    existing = self._camcops_session
    if existing is not None:
        return existing
    # First access: build the session now.
    from camcops_server.cc_modules.cc_session import CamcopsSession  # delayed import  # noqa
    self._camcops_session = CamcopsSession.get_session_using_cookies(self)
    if DEBUG_CAMCOPS_SESSION:
        log.debug("{!r}", self._camcops_session)
    return self._camcops_session
def replace_camcops_session(self, ccsession: "CamcopsSession") -> None:
    """
    Swaps the request's current
    :class:`camcops_server.cc_modules.cc_session.CamcopsSession` (if any)
    for ``ccsession``.

    Why: a request that arrived without cookies may already have been given
    a brand-new HTTP session (added to the DB session but not yet saved).
    The database/tablet upload API may then find session details in the
    POST data instead. The old session is expunged from the DB session so
    that it is never written to disk, and the new one takes its place.

    Args:
        ccsession: the session to use from now on
    """
    previous = self._camcops_session
    if previous is not None:
        self.dbsession.expunge(previous)
    self._camcops_session = ccsession
def complete_request_add_cookies(self) -> None:
    """
    Finalizes the response by storing the CamCOPS session ID and token in
    the Pyramid session (and hence in cookies).

    This is done late so that the session can be hot-swapped when the
    database/tablet API, rather than a human web browser, is in use.

    Response callbacks are called in the order first-to-most-recently-added;
    see :class:`pyramid.request.CallbackMethodsMixin`. That suggests a
    callback may itself add another callback, which is what happens here:
    writing to the Pyramid session makes it register its own
    cookie-setting callback.
    """
    # 2019-03-21: if no CamcopsSession was ever used (e.g. when serving a
    # static view), there is nothing to persist.
    if self._camcops_session is None:
        return

    # Flushing sets the PK for the CamcopsSession, if it wasn't set.
    self.dbsession.flush()

    # Anything stored in the Pyramid session is persisted via the Response
    # automatically.
    cookie_store = self.session  # type: ISession
    ccsession = self.camcops_session
    cookie_store[CookieKey.SESSION_ID] = str(ccsession.id)
    cookie_store[CookieKey.SESSION_TOKEN] = ccsession.token
    # ... which should make the ISession add a callback to add cookies;
    # that callback runs immediately after this one.
331 # -------------------------------------------------------------------------
332 # Config
333 # -------------------------------------------------------------------------
@reify
def config_filename(self) -> str:
    """
    Returns the name of the CamCOPS config file in use, as reported by
    :func:`get_config_filename_from_os_env`.
    """
    filename = get_config_filename_from_os_env()
    return filename
@reify
def config(self) -> CamcopsConfig:
    """
    Returns the
    :class:`camcops_server/cc_modules/cc_config.CamcopsConfig` for this
    request, loaded from :attr:`config_filename`.

    Use it as an attribute: ``request.config``, with no brackets.
    """
    return get_config(config_filename=self.config_filename)
354 # -------------------------------------------------------------------------
355 # Database
356 # -------------------------------------------------------------------------
@reify
def engine(self) -> Engine:
    """
    Returns the SQLAlchemy :class:`Engine` for this request, obtained from
    the request's config.
    """
    return self.config.get_sqla_engine()
@reify
def dbsession(self) -> SqlASession:
    """
    Returns the SQLAlchemy session for this request.

    Thanks to ``@reify``, the session is created only if a view actually
    asks for it:

    .. code-block:: python

        dbsession = request.dbsession

    and only then is the cleanup callback (COMMIT or ROLLBACK) installed.
    """
    # log.critical("CamcopsRequest.dbsession: caller stack:\n{}",
    #              "\n".join(get_caller_stack_info()))
    session = self.get_bare_dbsession()

    def _finish_session_when_request_ends(req: Request) -> None:
        # noinspection PyProtectedMember
        req._finish_dbsession()

    # Notes on command-line pseudo-requests:
    # - add_finished_callback is no use for them, because finished
    #   callbacks are called by the Pyramid routing framework.
    # - Hooking into CamcopsRequest.__del__ did not work either: it was
    #   called, but object state (e.g. newly inserted User objects) went
    #   wrong (e.g. the objects had been blanked somehow, or that's what
    #   the INSERT statements looked like).
    # - So command-line sessions use a context manager instead.
    self.add_finished_callback(_finish_session_when_request_ends)

    if DEBUG_DBSESSION_MANAGEMENT:
        log.warning(
            "Returning SQLAlchemy session as CamcopsRequest.dbsession")

    return session
def _finish_dbsession(self) -> None:
    """
    A database session has finished. COMMIT or ROLLBACK, depending on how
    things went, then close the session.

    The session is closed in a ``finally`` clause, so it is released even
    if the commit, the rollback, or the processing of pending export push
    requests raises. Any such exception still propagates to the caller.
    """
    # Do NOT roll back "if req.exception is not None"; that includes
    # all sorts of exceptions like HTTPFound, HTTPForbidden, etc.
    # See also
    # - https://docs.pylonsproject.org/projects/pyramid_cookbook/en/latest/pylons/exceptions.html  # noqa
    # But they are neatly subclasses of HTTPException, so only exceptions
    # that are not HTTPExceptions trigger a rollback.
    session = self.dbsession
    try:
        if (self.exception is not None and
                not isinstance(self.exception, HTTPException)):
            log.critical(
                "Request raised exception that wasn't an HTTPException; "
                "rolling back; exception was: {!r}", self.exception)
            session.rollback()
        else:
            if DEBUG_DBSESSION_MANAGEMENT:
                log.warning("Committing to database")
            session.commit()
            # Export pushes are only dealt with after a successful commit.
            if self._pending_export_push_requests:
                self._process_pending_export_push_requests()
    finally:
        if DEBUG_DBSESSION_MANAGEMENT:
            log.warning("Closing SQLAlchemy session")
        session.close()
def get_bare_dbsession(self) -> SqlASession:
    """
    Returns a bare SQLAlchemy session for the request: either the debugging
    session (if one has been installed for unit testing) or a new session
    bound to the request's engine.

    See :func:`dbsession`, the more commonly used wrapper function.
    """
    debugging_session = self._debugging_db_session
    if debugging_session:
        log.debug("Request is using debugging SQLAlchemy session")
        return debugging_session
    if DEBUG_DBSESSION_MANAGEMENT:
        log.warning("Making SQLAlchemy session")
    session_factory = sessionmaker(bind=self.engine)
    new_session = session_factory()  # type: SqlASession
    return new_session
449 # -------------------------------------------------------------------------
450 # TabletSession
451 # -------------------------------------------------------------------------
@reify
def tabletsession(self) -> TabletSession:
    """
    Returns a
    :class:`camcops_server.cc_modules.cc_tabletsession.TabletSession`: an
    information structure geared to client (tablet) database accesses.

    Using this interface also switches the request to the
    :class:`camcops_server.cc_modules.cc_session.CamcopsSession` that
    matches the information the tablet sent in its POST request, replacing
    anything already loaded/reset via cookies.
    """
    from camcops_server.cc_modules.cc_session import CamcopsSession  # delayed import  # noqa
    tablet_session = TabletSession(self)
    # Finding the session for the tablet also performs the login:
    session_for_tablet = CamcopsSession.get_session_for_tablet(
        tablet_session)
    self.replace_camcops_session(session_for_tablet)
    tablet_session.set_session_id_token(session_for_tablet.id,
                                        session_for_tablet.token)
    if DEBUG_TABLET_SESSION:
        log.debug("CamcopsRequest: {!r}", self)
        log.debug("CamcopsRequest.tabletsession: {!r}", tablet_session)
        log.debug("CamcopsRequest.camcops_session: {!r}",
                  self._camcops_session)
    return tablet_session
479 # -------------------------------------------------------------------------
480 # Date/time
481 # -------------------------------------------------------------------------
@reify
def now(self) -> Pendulum:
    """
    Returns the time of the request as a Pendulum object.

    Reified, so a request only ever has one time. Use it as an attribute
    (``request.now``).
    """
    request_time = Pendulum.now()
    return request_time
@reify
def now_utc(self) -> Pendulum:
    """
    Returns the time of the request (:attr:`now`) converted to UTC.
    """
    local_now = self.now  # type: Pendulum
    return convert_datetime_to_utc(local_now)
@reify
def now_utc_no_tzinfo(self) -> datetime.datetime:
    """
    Returns the time of the request as a UTC datetime with no timezone
    information attached.

    Use this to compare against other naive datetimes without getting the
    error "TypeError: can't compare offset-naive and offset-aware
    datetimes".
    """
    local_now = self.now  # type: Pendulum
    return pendulum_to_utc_datetime_without_tz(local_now)
@reify
def now_era_format(self) -> str:
    """
    Returns the request time (UTC), formatted with ``DateFormat.ERA``: an
    ISO-8601 format suitable for use as a CamCOPS ``era``.
    """
    utc_now = self.now_utc
    return format_datetime(utc_now, DateFormat.ERA)
@property
def today(self) -> Date:
    """
    Returns today's date, i.e. the date part of the request time
    (:attr:`now`).
    """
    request_time = self.now
    # noinspection PyTypeChecker
    return request_time.date()
528 # -------------------------------------------------------------------------
529 # Logos, static files, and other institution-specific stuff
530 # -------------------------------------------------------------------------
@property
def url_local_institution(self) -> str:
    """
    Returns the home URL of the local institution, as set in the config.
    """
    cfg = self.config
    return cfg.local_institution_url
@property
def url_camcops_favicon(self) -> str:
    """
    Returns a URL to the favicon (see
    https://en.wikipedia.org/wiki/Favicon) within the CamCOPS static files.
    """
    # static_url() copes with reverse proxies, etc.
    # https://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html#pyramid.request.Request.static_url  # noqa
    favicon_path = STATIC_CAMCOPS_PACKAGE_PATH + "favicon_camcops.png"
    return self.static_url(favicon_path)
@property
def url_camcops_logo(self) -> str:
    """
    Returns a URL to the CamCOPS logo within our static files.
    """
    logo_path = STATIC_CAMCOPS_PACKAGE_PATH + "logo_camcops.png"
    return self.static_url(logo_path)
@property
def url_local_logo(self) -> str:
    """
    Returns a URL to the local institution's logo, which is served from
    our own static files.
    """
    logo_path = STATIC_CAMCOPS_PACKAGE_PATH + "logo_local.png"
    return self.static_url(logo_path)
@property
def url_camcops_docs(self) -> str:
    """
    Returns the URL of the CamCOPS documentation (``DOCUMENTATION_URL``).
    """
    docs_url = DOCUMENTATION_URL
    return docs_url
576 # -------------------------------------------------------------------------
577 # Low-level HTTP information
578 # -------------------------------------------------------------------------
@reify
def remote_port(self) -> Optional[int]:
    """
    Returns the port number that the client is using, or ``None`` if it is
    unavailable or not a valid integer.

    ``REMOTE_PORT`` is an optional WSGI extra provided by some frameworks,
    such as mod_wsgi.

    The WSGI spec:
    - https://www.python.org/dev/peps/pep-0333/

    The CGI spec:
    - https://en.wikipedia.org/wiki/Common_Gateway_Interface

    The Pyramid Request object:
    - https://docs.pylonsproject.org/projects/pyramid/en/latest/api/request.html#pyramid.request.Request
    - ... note: that includes ``remote_addr``, but not ``remote_port``.
    """  # noqa
    try:
        raw_port = self.environ.get("REMOTE_PORT", "")
        return int(raw_port)
    except (TypeError, ValueError):
        # Absent (the "" default), or not a number.
        return None
603 # -------------------------------------------------------------------------
604 # HTTP request convenience functions
605 # -------------------------------------------------------------------------
def has_param(self, key: str) -> bool:
    """
    Does the request contain the named parameter?

    Args:
        key: the parameter's name
    """
    request_params = self.params
    return key in request_params
def get_str_param(
        self,
        key: str,
        default: str = None,
        lower: bool = False,
        upper: bool = False,
        validator: STRING_VALIDATOR_TYPE = validate_alphanum_underscore) \
        -> Optional[str]:
    """
    Returns a string HTTP parameter from the request (GET or POST).

    A parameter that is missing or blank yields ``default``. A value that
    fails validation raises :exc:`pyramid.httpexceptions.HTTPBadRequest`.

    Args:
        key: the parameter's name
        default: the value to return if the parameter is missing or blank
        lower: convert to lower case? (takes precedence over ``upper``)
        upper: convert to upper case?
        validator: validator function, called as ``validator(value,
            request)`` after any case conversion; it signals a bad value
            by raising :exc:`ValueError`

    Returns:
        the parameter's (string) contents, or ``default``

    """
    # HTTP parameters are always strings at heart
    request_params = self.params
    if key not in request_params:  # missing from request?
        return default
    text = request_params.get(key)
    if not text:  # blank, e.g. "source=" in URL?
        return default
    assert isinstance(text, str)  # ... or we wouldn't have got here
    if lower:
        text = text.lower()
    elif upper:
        text = text.upper()
    try:
        validator(text, self)
    except ValueError as e:
        raise HTTPBadRequest(f"Bad {key!r} parameter: {e}")
    return text
def get_str_list_param(
        self,
        key: str,
        lower: bool = False,
        upper: bool = False,
        validator: STRING_VALIDATOR_TYPE = validate_alphanum_underscore) \
        -> List[str]:
    """
    Returns every value supplied for an HTTP parameter, after validating
    each of them. A bad value raises
    :exc:`pyramid.httpexceptions.HTTPBadRequest`.

    Args:
        key: the parameter's name
        lower: convert to lower case? (takes precedence over ``upper``)
        upper: convert to upper case?
        validator: validator function, called as ``validator(value,
            request)`` after any case conversion; it signals a bad value
            by raising :exc:`ValueError`

    Returns:
        a list of string values

    """
    raw_values = self.params.getall(key)
    if lower:
        values = [item.lower() for item in raw_values]
    elif upper:
        values = [item.upper() for item in raw_values]
    else:
        values = raw_values
    try:
        for item in values:
            validator(item, self)
    except ValueError as e:
        raise HTTPBadRequest(
            f"Parameter {key!r} contains a bad value: {e}")
    return values
def get_int_param(self, key: str, default: int = None) -> Optional[int]:
    """
    Returns an integer parameter from the HTTP request.

    Args:
        key: the parameter's name
        default: the value to return if the parameter is missing or is
            not a valid integer

    Returns:
        an integer, or ``default``

    """
    try:
        raw_value = self.params[key]
        return int(raw_value)
    except (KeyError, TypeError, ValueError):
        return default
def get_int_list_param(self, key: str) -> List[int]:
    """
    Returns every value supplied for an HTTP parameter, as integers. If
    any one of them is not a valid integer, the result is an empty list.

    Args:
        key: the parameter's name

    Returns:
        a list of integer values

    """
    raw_values = self.params.getall(key)
    numbers = []
    try:
        for raw_value in raw_values:
            numbers.append(int(raw_value))
    except (KeyError, TypeError, ValueError):
        # All or nothing: one bad value discards the lot.
        return []
    return numbers
def get_bool_param(self, key: str, default: bool) -> bool:
    """
    Returns a boolean parameter from the HTTP request.

    Args:
        key: the parameter's name
        default: the value to return if the parameter is missing or is
            not a valid boolean value

    Returns:
        a boolean, or ``default``

    Valid "true" and "false" values (case-insensitive): see
    ``TRUE_STRINGS_LOWER_CASE``, ``FALSE_STRINGS_LOWER_CASE``.
    """
    try:
        text = self.params[key].lower()
    except (AttributeError, KeyError, TypeError, ValueError):
        # Missing, or not a string.
        return default
    if text in TRUE_STRINGS_LOWER_CASE:
        return True
    if text in FALSE_STRINGS_LOWER_CASE:
        return False
    return default
def get_date_param(self, key: str) -> Optional[Date]:
    """
    Returns a date parameter from the HTTP request, or ``None`` if it is
    missing or cannot be interpreted as a date.

    Args:
        key: the parameter's name

    Returns:
        a :class:`pendulum.Date`, or ``None``
    """
    try:
        raw_value = self.params[key]
        return coerce_to_pendulum_date(raw_value)
    except (KeyError, ParserError, TypeError, ValueError):
        return None
def get_datetime_param(self, key: str) -> Optional[Pendulum]:
    """
    Returns a datetime parameter from the HTTP request, or ``None`` if it
    is missing or cannot be interpreted as a datetime.

    Args:
        key: the parameter's name

    Returns:
        a :class:`pendulum.DateTime`, or ``None``
    """
    try:
        raw_value = self.params[key]
        return coerce_to_pendulum(raw_value)
    except (KeyError, ParserError, TypeError, ValueError):
        return None
def get_redirect_url_param(self,
                           key: str,
                           default: str = None) -> Optional[str]:
    """
    Returns a redirection URL parameter from the HTTP request, validated
    with ``validate_redirect_url``. (The validation process does not allow
    all types of URLs!)

    This delegates to :meth:`get_str_param`: a missing or blank parameter
    yields ``default``, and a value that fails validation raises
    :exc:`pyramid.httpexceptions.HTTPBadRequest`.

    Args:
        key:
            the parameter's name
        default:
            the value to return if the parameter is missing or blank

    Returns:
        a URL string, or ``default``
    """
    url = self.get_str_param(key, default=default,
                             validator=validate_redirect_url)
    return url
806 # -------------------------------------------------------------------------
807 # Routing
808 # -------------------------------------------------------------------------
def route_url_params(self, route_name: str,
                     paramdict: Dict[str, Any]) -> str:
    """
    A simplified front end to :func:`Request.route_url` for when you have
    parameters to pass.

    It does two things:

    (1) converts every parameter value to its ``str()`` form;
    (2) lets you name parameters with strings (e.g. constants), since
        they are passed as dictionary keys.

    Plain Pyramid usage is:

    .. code-block:: python

        Request.route_url(route_name, param1=value1, param2=value2)

    where "param1" is the literal name of the parameter; with this method
    you can write

    .. code-block:: python

        CamcopsRequest.route_url_params(route_name, {
            PARAM1_NAME: value1_not_necessarily_str,
            PARAM2_NAME: value2
        })

    """
    stringified = {}
    for param_name, param_value in paramdict.items():
        stringified[param_name] = str(param_value)
    return self.route_url(route_name, **stringified)
841 # -------------------------------------------------------------------------
842 # Strings
843 # -------------------------------------------------------------------------
@reify
def _all_extra_strings(self) -> Dict[str, Dict[str, Dict[str, str]]]:
    """
    Returns all CamCOPS "extra strings" (from XML files) in the format
    used by :func:`camcops_server.cc_string.all_extra_strings_as_dicts`.
    The code in this class reads it as a nested dictionary keyed by task
    name, then string name, then language.
    """
    extra_strings = all_extra_strings_as_dicts(self.config_filename)
    return extra_strings
def xstring(self,
            taskname: str,
            stringname: str,
            default: str = None,
            provide_default_if_none: bool = True,
            language: str = None) -> Optional[str]:
    """
    Looks up a string from one of the optional extra XML string files.

    The language versions of the string are tried in this order:

    1. the requested language, e.g. ``en-GB``;
    2. the same language for a different country, e.g. ``en-US`` (any
       version whose code starts with the same two letters);
    3. the default language (``DEFAULT_LOCALE``);
    4. the version with no language specified in the XML
       (``MISSING_LOCALE``).

    Args:
        taskname: task name (top-level key)
        stringname: string name within task (second-level key)
        default: default to return if the string is not found
        provide_default_if_none: if ``True`` and ``default is None``,
            return a helpful missing-string message in the style
            "string x.y not found"
        language: language code to use, e.g. ``en-GB``; if ``None`` is
            passed, the default behaviour is to look up the current
            language for this request (see :meth:`language`).

    Returns:
        the "extra string", or ``default`` (or the missing-string message)
        if it was not found

    """
    allstrings = self._all_extra_strings
    if taskname in allstrings:
        taskstrings = allstrings[taskname]
        if stringname in taskstrings:
            langversions = taskstrings[stringname]
            if language is None:
                language = self.language
            if language:  # Specific language requested
                # 1. Requested language, e.g. "en-GB"
                if language in langversions:
                    return langversions[language]
                # 2. Same language, different country, e.g. "en-US"
                shortlang = language[:2]  # e.g. "en"
                for key in langversions:
                    if key.startswith(shortlang):
                        # Return the version stored under the key that
                        # matched. Indexing by the bare prefix would raise
                        # KeyError unless a version happened to be stored
                        # under exactly that prefix.
                        return langversions[key]
            # 3. Default language
            if DEFAULT_LOCALE in langversions:
                return langversions[DEFAULT_LOCALE]
            # 4. Strings with no language specified in the XML
            if MISSING_LOCALE in langversions:
                return langversions[MISSING_LOCALE]
    # Not found. For speed, the default message is built only if needed.
    if default is None and provide_default_if_none:
        default = (
            f"EXTRA_STRING_NOT_FOUND({taskname}.{stringname}[{language}])"
        )
    return default
def wxstring(self,
             taskname: str,
             stringname: str,
             default: str = None,
             provide_default_if_none: bool = True,
             language: str = None) -> Optional[str]:
    """
    Returns a web-safe version of an :func:`xstring` (q.v.); the arguments
    are passed straight through to it.

    If the string is not found and ``provide_default_if_none`` is
    ``False``, the result is ``None``.
    """
    raw_text = self.xstring(taskname, stringname, default,
                            provide_default_if_none=provide_default_if_none,
                            language=language)
    if raw_text is None and not provide_default_if_none:
        return None
    return ws.webify(raw_text)
def wappstring(self,
               stringname: str,
               default: str = None,
               provide_default_if_none: bool = True,
               language: str = None) -> Optional[str]:
    """
    Returns a web-safe version of an appstring (an app-wide extra string),
    looked up via :func:`xstring` under ``APPSTRING_TASKNAME``. These come
    from the XML file shared between the client and the server.

    If the string is not found and ``provide_default_if_none`` is
    ``False``, the result is ``None``.
    """
    raw_text = self.xstring(APPSTRING_TASKNAME, stringname, default,
                            provide_default_if_none=provide_default_if_none,
                            language=language)
    if raw_text is None and not provide_default_if_none:
        return None
    return ws.webify(raw_text)
def get_all_extra_strings(self) -> List[Tuple[str, str, str, str]]:
    """
    Returns all extra strings, as a list of ``task, name, language, value``
    tuples.

    2019-09-16: these are filtered according to the :ref:`RESTRICTED_TASKS
    <RESTRICTED_TASKS>` option: strings for a restricted task are included
    only if the user belongs to at least one of the groups permitted for
    that task.
    """
    restricted = self.config.restricted_tasks
    # Looked up lazily, and at most once: only if a restricted task turns up.
    groups_of_user = None  # type: Optional[Set[str]]

    def _user_may_see(task_xml_name: str) -> bool:
        nonlocal groups_of_user
        if task_xml_name not in restricted:
            return True
        if groups_of_user is None:
            groups_of_user = set(self.user.group_names)
        groups_allowed = set(restricted[task_xml_name])
        return bool(groups_allowed & groups_of_user)

    rows = []
    for task, taskstrings in self._all_extra_strings.items():
        if not _user_may_see(task):
            log.debug(f"Skipping extra string download for task {task}: "
                      f"not permitted for user {self.user.username}")
            continue
        rows.extend(
            (task, name, language, value)
            for name, langversions in taskstrings.items()
            for language, value in langversions.items()
        )
    return rows
def task_extrastrings_exist(self, taskname: str) -> bool:
    """
    Has the server been supplied with any extra strings for a specific
    task?

    Args:
        taskname: key to look for in the extra-string collection

    Returns:
        bool: is ``taskname`` a key of ``self._all_extra_strings``?
    """
    return taskname in self._all_extra_strings
def extrastring_families(self, sort: bool = True) -> List[str]:
    """
    Which sets of extra strings do we have? A "family" here means, for
    example, "the server itself", "the PHQ9 task", etc.

    Args:
        sort: return the family names in sorted order?

    Returns:
        a new list of the keys of ``self._all_extra_strings``
    """
    names = self._all_extra_strings.keys()
    return sorted(names) if sort else list(names)
@reify
def language(self) -> str:
    """
    Returns the language code selected by the current user, or if none is
    selected (or the user isn't logged in) the server's default language.

    Returns:
        str: a language code of the form ``en-GB``

    """
    current_user = self.user
    preferred = current_user.language if current_user is not None else None
    # An empty/None preference falls back to the configured default.
    return preferred or self.config.language
def gettext(self, message: str) -> str:
    """
    Returns a version of ``message`` translated into the current language.
    This is used for server-only strings.

    The ``gettext()`` function is normally aliased to ``_()`` for
    auto-translation tools to read the source code.

    Args:
        message: the source-language text

    Returns:
        the translation; or ``message`` itself if the current language is
        ``DEFAULT_LOCALE`` or the translation files can't be loaded. If
        ``DEBUG_GETTEXT`` is set, a bracketed ``message→lang→translated``
        debugging string is returned instead.
    """
    lang = self.language
    # We can't work out if the string is missing; gettext falls back to
    # the source message.
    translated = message
    if lang != DEFAULT_LOCALE:
        try:
            translator = gettext.translation(
                domain=GETTEXT_DOMAIN,
                localedir=TRANSLATIONS_DIR,
                languages=[lang]
            )
            translated = translator.gettext(message)
        except OSError:  # e.g. translation file not found
            log.warning(f"Failed to find translation files for {lang}")
    if DEBUG_GETTEXT:
        return f"[{message}→{lang}→{translated}]"
    return translated
def wgettext(self, message: str) -> str:
    """
    A web-safe version of :func:`gettext`: translates ``message`` and
    passes the result through ``ws.webify()``.
    """
    translated = self.gettext(message)
    return ws.webify(translated)
def sstring(self, which_string: SS) -> str:
    """
    Returns a translated server string via a lookup mechanism.

    Results are remembered in ``self._cached_sstring``, so
    ``server_string()`` is called at most once per string for this request
    object.

    Args:
        which_string:
            which string? A :class:`camcops_server.cc_modules.cc_text.SS`
            enumeration value

    Returns:
        str: the string

    """
    cache = self._cached_sstring
    if which_string not in cache:
        cache[which_string] = server_string(self, which_string)
    return cache[which_string]
def wsstring(self, which_string: SS) -> str:
    """
    Returns a web-safe version of a translated server string via a lookup
    mechanism: the result of :func:`sstring`, passed through
    ``ws.webify()``.

    Args:
        which_string:
            which string? A :class:`camcops_server.cc_modules.cc_text.SS`
            enumeration value

    Returns:
        str: the string

    """
    text = self.sstring(which_string)
    return ws.webify(text)
1077 # -------------------------------------------------------------------------
1078 # PNG versus SVG output, so tasks don't have to care (for e.g. PDF/web)
1079 # -------------------------------------------------------------------------
def prepare_for_pdf_figures(self) -> None:
    """
    Switch the server (for this request) to producing figures in a format
    most suitable for PDF: SVG without a PNG fallback, unless
    ``CSS_PAGED_MEDIA`` is set, in which case PNG.
    """
    if not CSS_PAGED_MEDIA:
        # This is the main method -- we use wkhtmltopdf these days
        self.switch_output_to_svg(provide_png_fallback=False)
        # ... wkhtmltopdf can cope with SVGs
        return
    # unlikely -- we use wkhtmltopdf instead now
    self.switch_output_to_png()
    # ... even weasyprint's SVG handling is inadequate
def prepare_for_html_figures(self) -> None:
    """
    Switch the server (for this request) to producing figures in a format
    most suitable for HTML: SVG, with the PNG fallback that
    :func:`switch_output_to_svg` provides by default.
    """
    self.switch_output_to_svg(provide_png_fallback=True)
def switch_output_to_png(self) -> None:
    """
    Switch server (for this request) to producing figures in PNG format.

    Does so by clearing the ``use_svg`` flag on this request object.
    """
    self.use_svg = False
def switch_output_to_svg(self, provide_png_fallback: bool = True) -> None:
    """
    Switch server (for this request) to producing figures in SVG format.

    Args:
        provide_png_fallback:
            Offer a PNG fallback option? Stored on the request as
            ``provide_png_fallback_for_svg``.
    """
    self.provide_png_fallback_for_svg = provide_png_fallback
    self.use_svg = True
@staticmethod
def create_figure(**kwargs) -> Figure:
    """
    Creates and returns a :class:`matplotlib.figure.Figure` with a canvas.
    The canvas will be available as ``fig.canvas``.

    Args:
        kwargs: passed unchanged to the ``Figure`` constructor
    """
    fig = Figure(**kwargs)
    # Constructing the canvas is done purely for its side effect: the
    # canvas will now be available as fig.canvas, since
    # FigureCanvasBase.__init__ calls fig.set_canvas(self); similarly, the
    # figure is available from the canvas as canvas.figure
    FigureCanvas(fig)

    # How do we set the font, so the caller doesn't have to?
    # The "nasty global" way is:
    #       matplotlib.rc('font', **fontdict)
    #       matplotlib.rc('legend', **fontdict)
    # or similar. Then matplotlib often works its way round to using its
    # global rcParams object, which is Not OK in a multithreaded context.
    #
    # https://github.com/matplotlib/matplotlib/issues/6514
    # https://github.com/matplotlib/matplotlib/issues/6518
    #
    # The other way is to specify a fontdict with each call, e.g.
    #       ax.set_xlabel("some label", **fontdict)
    # https://stackoverflow.com/questions/21321670/how-to-change-fonts-in-matplotlib-python  # noqa
    # Relevant calls with explicit "fontdict: Dict" parameters:
    #       ax.set_xlabel(..., fontdict=XXX, ...)
    #       ax.set_ylabel(..., fontdict=XXX, ...)
    #       ax.set_xticklabels(..., fontdict=XXX, ...)
    #       ax.set_yticklabels(..., fontdict=XXX, ...)
    #       ax.text(..., fontdict=XXX, ...)
    #       ax.set_label_text(..., fontdict=XXX, ...)
    #       ax.set_title(..., fontdict=XXX, ...)
    #
    # And with "fontproperties: FontProperties"
    #       fig.suptitle(..., fontproperties=XXX, ...)
    #
    # And with "prop: FontProperties":
    #       ax.legend(..., prop=XXX, ...)
    #
    # Then, some things are automatically plotted...

    return fig
@reify
def fontdict(self) -> Dict[str, Any]:
    """
    Returns a font dictionary for use with Matplotlib plotting. The font
    size comes from ``self.config.plot_fontsize``; everything else is
    fixed (sans-serif, normal style/variant/weight).

    **matplotlib font handling and fontdict parameter**

    - https://stackoverflow.com/questions/3899980
    - https://matplotlib.org/users/customizing.html
    - matplotlib/font_manager.py

      - Note that the default TrueType font is "DejaVu Sans"; see
        :class:`matplotlib.font_manager.FontManager`

    - Example sequence:

      - CamCOPS does e.g. ``ax.set_xlabel("Date/time",
        fontdict=self.req.fontdict)``

      - matplotlib.axes.Axes.set_xlabel:
        https://matplotlib.org/api/_as_gen/matplotlib.axes.Axes.set_xlabel.html

      - matplotlib.axes.Axes.text documentation, explaining the fontdict
        parameter:
        https://matplotlib.org/api/_as_gen/matplotlib.axes.Axes.text.html

      - What's created is probably a :class:`matplotlib.text.Text` object,
        whose ``update()`` function is called with the dictionary. Via its
        superclass :class:`matplotlib.artist.Artist` and its ``update()``
        function, this sets attributes on the Text object. Ultimately,
        without having explored this in too much depth, it's probably the
        ``self._fontproperties`` object of Text that holds this info.

      - That is an instance of
        :class:`matplotlib.font_manager.FontProperties`.

    **Linux fonts**

    Anyway, the main things are (1) that the relevant fonts need to be
    installed, and (2) that the default is DejaVu Sans.

    - Linux fonts are installed in ``/usr/share/fonts``, and TrueType fonts
      within ``/usr/share/fonts/truetype``.

    - Use ``fc-match`` to see the font mappings being used.

    - Use ``fc-list`` to list available fonts.

    - Use ``fc-cache`` to rebuild the font cache.

    - Files in ``/etc/fonts/conf.avail/`` do some thinking.

    **Problems with pixellated fonts in PDFs made via wkhtmltopdf**

    - See also https://github.com/wkhtmltopdf/wkhtmltopdf/issues/2193,
      about pixellated fonts via wkhtmltopdf (which was our problem for a
      subset of the fonts in trackers, on 2020-06-28, using wkhtmltopdf
      0.12.5 with patched Qt).

    - When you get pixellated fonts in a PDF, look also at the embedded
      font list in the PDF (e.g. in Okular: File -> Properties -> Fonts).

    - Matplotlib helpfully puts the text (rendered as lines in SVG) as
      comments.

    - As a debugging sequence, we can manually trim the "pdfhtml" output
      down to just the SVG file. Still has problems. Yet there's no text
      in it; the text is made of pure SVG lines. And Chrome renders it
      perfectly. As does Firefox.

    - The rendering bug goes away entirely if you delete the opacity
      styling throughout the SVG:

      .. code-block:: none

        <g style="opacity:0.5;" transform=...>
           ^^^^^^^^^^^^^^^^^^^^
           this

    - So, simple fix:

      - rather than opacity (alpha) 0.5 and on top...

      - 50% grey colour and on the bottom.

    """  # noqa
    return {
        # family: serif, sans-serif, cursive, fantasy, monospace
        'family': 'sans-serif',
        # style: normal (roman), italic, oblique
        'style': 'normal',
        # variant: normal, small-caps
        'variant': 'normal',
        # weight: normal [=400], bold [=700], bolder [relative to current],
        # lighter [relative], 100, 200, 300, ..., 900
        'weight': 'normal',
        # size: in pt (default 12)
        'size': self.config.plot_fontsize,
    }
@reify
def fontprops(self) -> FontProperties:
    """
    Return a :class:`matplotlib.font_manager.FontProperties` object for
    use with Matplotlib plotting, built from the entries of
    ``self.fontdict``.
    """
    settings = self.fontdict
    return FontProperties(**settings)
def set_figure_font_sizes(self,
                          ax: "Axes",  # "SubplotBase",
                          fontdict: Dict[str, Any] = None,
                          x_ticklabels: bool = True,
                          y_ticklabels: bool = True) -> None:
    """
    Sets the font properties of the tick labels on the specified
    Matplotlib axes.

    Args:
        ax: the axes object whose tick labels should be modified
        fontdict: font settings that override those in ``self.fontdict``
            (if omitted, ``self.fontdict`` is used unchanged)
        x_ticklabels: if ``True``, modify the X-axis tick labels
        y_ticklabels: if ``True``, modify the Y-axis tick labels
    """
    # Start from the request's defaults, then apply caller overrides to a
    # copy (so the cached default dictionary is never modified).
    merged = dict(self.fontdict)
    if fontdict:
        merged.update(fontdict)
    props = FontProperties(**merged)

    target_axes = []  # type: List[Axis]
    if x_ticklabels:  # and hasattr(ax, "xaxis"):
        target_axes.append(ax.xaxis)
    if y_ticklabels:  # and hasattr(ax, "yaxis"):
        target_axes.append(ax.yaxis)

    for axis in target_axes:
        for label in axis.get_ticklabels(which='both'):  # type: Text  # I think!  # noqa
            label.set_fontproperties(props)
def get_html_from_pyplot_figure(self, fig: Figure) -> str:
    """
    Make HTML (as PNG or SVG) from pyplot
    :class:`matplotlib.figure.Figure`.

    SVG is produced only if both ``USE_SVG_IN_HTML`` and ``self.use_svg``
    are set; otherwise a PNG image is produced.
    """
    if not (USE_SVG_IN_HTML and self.use_svg):
        return png_img_html_from_pyplot_figure(
            fig, PlotDefaults.DEFAULT_PLOT_DPI)

    html = svg_html_from_pyplot_figure(fig)
    if self.provide_png_fallback_for_svg:
        # return both an SVG and a PNG image, for browsers that can't
        # deal with SVG; the Javascript header will sort this out
        # http://www.voormedia.nl/blog/2012/10/displaying-and-detecting-support-for-svg-images  # noqa
        html += png_img_html_from_pyplot_figure(
            fig, PlotDefaults.DEFAULT_PLOT_DPI, "pngfallback")
    return html
1317 # -------------------------------------------------------------------------
1318 # Convenience functions for user information
1319 # -------------------------------------------------------------------------
@property
def user(self) -> Optional["User"]:
    """
    Returns the :class:`camcops_server.cc_modules.cc_user.User` for the
    current request: the debugging user if one is set, otherwise the user
    of the CamCOPS session.
    """
    debugging_user = self._debugging_user
    if debugging_user:
        return debugging_user
    return self.camcops_session.user
@property
def user_id(self) -> Optional[int]:
    """
    Returns the integer user ID for the current request: that of the
    debugging user if one is set, otherwise that of the CamCOPS session.
    """
    debugging_user = self._debugging_user
    return debugging_user.id if debugging_user else self.camcops_session.user_id
1338 # -------------------------------------------------------------------------
1339 # ID number definitions
1340 # -------------------------------------------------------------------------
@reify
def idnum_definitions(self) -> List[IdNumDefinition]:
    """
    Returns all
    :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` objects,
    as fetched by ``get_idnum_definitions()`` using this request's
    database session.
    """
    session = self.dbsession
    return get_idnum_definitions(session)  # no longer cached
@reify
def valid_which_idnums(self) -> List[int]:
    """
    Returns the ``which_idnum`` values for all ID number definitions, in
    the order of ``self.idnum_definitions``.
    """
    which_idnums = []  # type: List[int]
    for iddef in self.idnum_definitions:
        which_idnums.append(iddef.which_idnum)
    return which_idnums
    # ... pre-sorted
def get_idnum_definition(self,
                         which_idnum: int) -> Optional[IdNumDefinition]:
    """
    Retrieves an
    :class:`camcops_server.cc_modules.cc_idnumdef.IdNumDefinition` for the
    specified ``which_idnum`` value.

    Returns:
        the first matching definition in ``self.idnum_definitions``, or
        ``None`` if there is no match
    """
    for iddef in self.idnum_definitions:
        if iddef.which_idnum == which_idnum:
            return iddef
    return None
def get_id_desc(self, which_idnum: int,
                default: str = None) -> Optional[str]:
    """
    Get the server's ID description for the specified ``which_idnum``
    value.

    Args:
        which_idnum: which ID number type?
        default: value to return if no definition matches

    Returns:
        the ``description`` of the first matching ID number definition,
        or ``default``
    """
    for iddef in self.idnum_definitions:
        if iddef.which_idnum == which_idnum:
            return iddef.description
    return default
def get_id_shortdesc(self, which_idnum: int,
                     default: str = None) -> Optional[str]:
    """
    Get the server's short ID description for the specified ``which_idnum``
    value.

    Args:
        which_idnum: which ID number type?
        default: value to return if no definition matches

    Returns:
        the ``short_description`` of the first matching ID number
        definition, or ``default``
    """
    for iddef in self.idnum_definitions:
        if iddef.which_idnum == which_idnum:
            return iddef.short_description
    return default
def is_idnum_valid(self, which_idnum: int,
                   idnum_value: Optional[int]) -> bool:
    """
    Does the ID number pass any extended validation checks?

    Args:
        which_idnum: which ID number type is this?
        idnum_value: ID number value

    Returns:
        bool: valid? Always ``False`` if there is no ID number definition
        for ``which_idnum``.
    """
    idnumdef = self.get_idnum_definition(which_idnum)
    if idnumdef:
        is_valid, _reason = validate_id_number(
            self, idnum_value, idnumdef.validation_method)
        return is_valid
    return False
def why_idnum_invalid(self, which_idnum: int,
                      idnum_value: Optional[int]) -> str:
    """
    Why does the ID number fail any extended validation checks?

    Args:
        which_idnum: which ID number type is this?
        idnum_value: ID number value

    Returns:
        str: why invalid? (Human-readable string.) If there is no ID number
        definition for ``which_idnum``, a translated message saying so.
    """
    idnumdef = self.get_idnum_definition(which_idnum)
    if idnumdef:
        _valid, reason = validate_id_number(
            self, idnum_value, idnumdef.validation_method)
        return reason
    # Keep the conventional "_" alias here, as used for translatable text.
    _ = self.gettext
    return _("Can't fetch ID number definition")
1428 # -------------------------------------------------------------------------
1429 # Server settings
1430 # -------------------------------------------------------------------------
@reify
def server_settings(self) -> ServerSettings:
    """
    Return the
    :class:`camcops_server.cc_modules.cc_serversettings.ServerSettings` for
    the server, as obtained from ``get_server_settings()`` for this
    request.
    """
    settings = get_server_settings(self)
    return settings
@reify
def database_title(self) -> str:
    """
    Return the database friendly title for the server, or an empty string
    if the server settings hold no title.
    """
    title = self.server_settings.database_title
    return title or ""
def set_database_title(self, title: str) -> None:
    """
    Sets the database friendly title for the server, by assigning it to
    the ``database_title`` attribute of ``self.server_settings``.

    Args:
        title: the new title
    """
    # NOTE(review): the ``database_title`` attribute of this class is
    # declared with ``@reify``; presumably a value already read on this
    # request is not refreshed by this call -- confirm whether that matters.
    self.server_settings.database_title = title
1456 # -------------------------------------------------------------------------
1457 # SNOMED-CT
1458 # -------------------------------------------------------------------------
@reify
def snomed_supported(self) -> bool:
    """
    Is SNOMED-CT supported for CamCOPS tasks? True if the config provides
    a non-empty set of task SNOMED concepts.
    """
    concepts = self.config.get_task_snomed_concepts()
    return bool(concepts)
def snomed(self, lookup: str) -> "SnomedConcept":
    """
    Fetches a SNOMED-CT concept for a CamCOPS task.

    Args:
        lookup: a CamCOPS SNOMED lookup string

    Returns:
        a :class:`camcops_server.cc_modules.cc_snomed.SnomedConcept`

    Raises:
        :exc:`KeyError`, if the lookup cannot be found (e.g. UK data not
        installed)
        :exc:`AssertionError`, if no task SNOMED-CT data is available at
        all (when assertions are enabled)
    """
    available = self.config.get_task_snomed_concepts()
    assert available, "No SNOMED-CT data available for CamCOPS tasks"
    concept = available[lookup]
    return concept
@reify
def icd9cm_snomed_supported(self) -> bool:
    """
    Is SNOMED-CT supported for ICD-9-CM codes? True if the config provides
    a non-empty set of ICD-9-CM SNOMED concepts.
    """
    concepts = self.config.get_icd9cm_snomed_concepts()
    return bool(concepts)
def icd9cm_snomed(self, code: str) -> List["SnomedConcept"]:
    """
    Fetches the SNOMED-CT concept(s) for an ICD-9-CM code.

    Args:
        code: an ICD-9-CM code

    Returns:
        a list of
        :class:`camcops_server.cc_modules.cc_snomed.SnomedConcept` objects
        (per the return annotation)

    Raises:
        :exc:`KeyError`, if the lookup cannot be found (e.g. data not
        installed)
        :exc:`AssertionError`, if no ICD-9-CM SNOMED-CT data is available
        at all (when assertions are enabled)
    """
    available = self.config.get_icd9cm_snomed_concepts()
    assert available, "No SNOMED-CT data available for ICD-9-CM"
    found = available[code]
    return found
@reify
def icd10_snomed_supported(self) -> bool:
    """
    Is SNOMED-CT supported for ICD-10 codes? True if the config provides
    a non-empty set of ICD-10 SNOMED concepts.
    """
    # Must consult the ICD-10 concepts (the same source that icd10_snomed()
    # reads), not the ICD-9-CM ones.
    return bool(self.config.get_icd10_snomed_concepts())
def icd10_snomed(self, code: str) -> List["SnomedConcept"]:
    """
    Fetches the SNOMED-CT concept(s) for an ICD-10 code.

    Args:
        code: an ICD-10 code

    Returns:
        a list of
        :class:`camcops_server.cc_modules.cc_snomed.SnomedConcept` objects
        (per the return annotation)

    Raises:
        :exc:`KeyError`, if the lookup cannot be found (e.g. data not
        installed)
        :exc:`AssertionError`, if no ICD-10 SNOMED-CT data is available at
        all (when assertions are enabled)
    """
    available = self.config.get_icd10_snomed_concepts()
    assert available, "No SNOMED-CT data available for ICD-10"
    found = available[code]
    return found
1535 # -------------------------------------------------------------------------
1536 # Export recipients
1537 # -------------------------------------------------------------------------
def get_export_recipients(self,
                          recipient_names: List[str] = None,
                          all_recipients: bool = False,
                          all_push_recipients: bool = False,
                          save: bool = True,
                          database_versions: bool = True) \
        -> List[Union["ExportRecipient", "ExportRecipientInfo"]]:
    """
    Returns a list of export recipients, with some filtering if desired.
    Validates them against the database.

    - If ``all_recipients``, return all.
    - Otherwise, if ``all_push_recipients``, return all "push" recipients.
    - Otherwise, return all named in ``recipient_names``.

    - If any are invalid, raise an error.
    - If any are duplicate, raise an error.

    Args:
        all_recipients: use all recipients?
        all_push_recipients: use all "push" recipients?
        recipient_names: recipient names
        save: save any freshly created recipient records to the DB?
        database_versions: return ExportRecipient objects that are attached
            to a database session (rather than ExportRecipientInfo objects
            that aren't)?

    Returns:
        list: of :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
        (or of ``ExportRecipientInfo`` objects, if ``database_versions`` is
        false)

    Raises:
        - :exc:`ValueError` if a name is invalid
        - :exc:`ValueError` if a name is duplicated
        - :exc:`AssertionError` if ``save`` is requested without
          ``database_versions``
        - :exc:`camcops_server.cc_modules.cc_exportrecipient.InvalidExportRecipient`
          if an export recipient configuration is invalid
    """  # noqa
    # Delayed imports
    from camcops_server.cc_modules.cc_exportrecipient import \
        ExportRecipient  # delayed import  # noqa

    # Check parameters
    recipient_names = recipient_names or []  # type: List[str]
    if save and not database_versions:
        raise AssertionError("Can't save unless taking database versions")

    # Start with ExportRecipientInfo objects:
    recipientinfolist = self.config.get_all_export_recipient_info()

    # Restrict
    if not all_recipients:
        if all_push_recipients:
            recipientinfolist = [r for r in recipientinfolist if r.push]
        else:
            # Specified by name
            duplicates = [name for name, count in
                          collections.Counter(recipient_names).items()
                          if count > 1]
            if duplicates:
                raise ValueError(f"Duplicate export recipients "
                                 f"specified: {duplicates!r}")
            valid_names = set(r.recipient_name for r in recipientinfolist)
            bad_names = [name for name in recipient_names
                         if name not in valid_names]
            if bad_names:
                raise ValueError(
                    f"Bad export recipients specified: {bad_names!r}. "
                    f"Valid recipients are: {valid_names!r}")
            recipientinfolist = [r for r in recipientinfolist
                                 if r.recipient_name in recipient_names]

    # Complete validation
    for r in recipientinfolist:
        r.validate(self)

    # Does the caller want them as ExportRecipientInfo objects
    if not database_versions:
        return recipientinfolist

    # Convert to SQLAlchemy ORM ExportRecipient objects:
    recipients = [ExportRecipient(x) for x in recipientinfolist]  # type: List[ExportRecipient]  # noqa

    final_recipients = []  # type: List[ExportRecipient]
    dbsession = self.dbsession

    def process_final_recipients(_save: bool) -> None:
        # Fills final_recipients, reusing matching database records where
        # they exist; adds new records to the session only if _save.
        for r in recipients:
            other = ExportRecipient.get_existing_matching_recipient(
                dbsession, r)
            if other:
                # This other one matches, and is already in the database.
                # Use it. But first...
                for attrname in ExportRecipient.NEEDS_RECOPYING_EACH_TIME_FROM_CONFIG_ATTRNAMES:  # noqa
                    setattr(other, attrname, getattr(r, attrname))
                # OK.
                final_recipients.append(other)
            else:
                # Our new object doesn't match. Use (+/- save) it.
                if _save:  # honour the helper's own argument
                    log.debug(
                        "Creating new ExportRecipient record in database")
                    dbsession.add(r)
                # NOTE(review): "current" is taken to be set whether or
                # not the record is saved -- confirm against the original
                # file's indentation.
                r.current = True
                final_recipients.append(r)

    if save:
        # Hold the master lock while creating recipient records.
        lockfilename = self.config.get_master_export_recipient_lockfilename()  # noqa
        with lockfile.FileLock(lockfilename, timeout=None):  # waits forever if necessary  # noqa
            process_final_recipients(_save=True)
    else:
        process_final_recipients(_save=False)

    # OK
    return final_recipients
def get_export_recipient(self,
                         recipient_name: str,
                         save: bool = True) -> "ExportRecipient":
    """
    Returns a single validated export recipient, given its name.

    Args:
        recipient_name: recipient name
        save: save any freshly created recipient records to the DB?

    Returns:
        a single
        :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`

    Raises:
        - :exc:`ValueError` if a name is invalid
        - :exc:`camcops_server.cc_modules.cc_exportrecipient.InvalidExportRecipient`
          if an export recipient configuration is invalid
    """  # noqa
    matches = self.get_export_recipients([recipient_name], save=save)
    # One name in, so exactly one recipient is expected back.
    assert len(matches) == 1
    only_recipient = matches[0]
    return only_recipient
@reify
def all_push_recipients(self) -> List["ExportRecipient"]:
    """
    All "push" export recipients, as database-attached objects, without
    saving any new records.

    Cached for speed (will potentially be called for multiple tables in
    a bulk upload).
    """
    push_recipients = self.get_export_recipients(
        all_push_recipients=True,
        save=False,
        database_versions=True,  # we need group ID info somehow
    )
    return push_recipients
def add_export_push_request(self,
                            recipient_name: str,
                            basetable: str,
                            task_pk: int) -> None:
    """
    Adds a request to push a task to an export recipient. The request is
    only queued here (in ``self._pending_export_push_requests``).

    The reason we use this slightly convoluted approach is because
    otherwise, it's very easy to generate a backend request for a new task
    before it's actually been committed (so the backend finds no task).

    Args:
        recipient_name: name of the recipient
        basetable: name of the task's base table
        task_pk: server PK of the task
    """
    pending = (recipient_name, basetable, task_pk)
    self._pending_export_push_requests.append(pending)
def _process_pending_export_push_requests(self) -> None:
    """
    Sends pending export push requests to the backend, one
    ``export_task_backend.delay()`` call per queued request.

    Called after the COMMIT.
    """
    from camcops_server.cc_modules.celery import export_task_backend  # delayed import  # noqa

    for pending in self._pending_export_push_requests:
        recipient_name, basetable, task_pk = pending
        export_task_backend.delay(
            recipient_name=recipient_name,
            basetable=basetable,
            task_pk=task_pk
        )
1722 # -------------------------------------------------------------------------
1723 # User downloads
1724 # -------------------------------------------------------------------------
@property
def user_download_dir(self) -> str:
    """
    The directory in which this user's downloads should be/are stored, or a
    blank string if user downloads are not available. Also ensures it
    exists.

    Downloads are unavailable if the configured maximum space is not
    positive, if no base download directory is configured, or if there is
    no user ID for this request.
    """
    config = self.config
    if config.user_download_max_space_mb <= 0:
        return ""
    basedir = config.user_download_dir
    if not basedir:
        return ""
    uid = self.user_id
    if uid is None:
        return ""
    # One subdirectory per user, named by user ID.
    userdir = os.path.join(basedir, str(uid))
    mkdir_p(userdir)
    return userdir
@property
def user_download_bytes_permitted(self) -> int:
    """
    Amount of space the user is permitted, in bytes: the configured
    maximum (in MB) times 1024 * 1024, or zero if the user has no download
    directory.
    """
    if self.user_download_dir:
        return self.config.user_download_max_space_mb * 1024 * 1024
    return 0
@reify
def user_download_bytes_used(self) -> int:
    """
    Returns the disk space used by this user: the size of the contents of
    their download directory, or zero if they have no download directory.
    """
    download_dir = self.user_download_dir
    if download_dir:
        return get_directory_contents_size(download_dir)
    return 0
@property
def user_download_bytes_available(self) -> int:
    """
    Returns the available space for this user in their download area:
    bytes permitted minus bytes used.
    """
    return self.user_download_bytes_permitted - self.user_download_bytes_used
@property
def user_download_lifetime_duration(self) -> Duration:
    """
    Returns the lifetime of user download objects, as a ``Duration`` built
    from the configured lifetime in minutes.
    """
    lifetime_min = self.config.user_download_file_lifetime_min
    return Duration(minutes=lifetime_min)
# noinspection PyUnusedLocal
def complete_request_add_cookies(req: CamcopsRequest,
                                 response: Response) -> None:
    """
    Finalizes the response by adding session cookies.

    This simply delegates to the request; the ``response`` argument is not
    used here.

    See :meth:`CamcopsRequest.complete_request_add_cookies`.
    """
    req.complete_request_add_cookies()
1793# =============================================================================
1794# Configurator
1795# =============================================================================
@contextmanager
def camcops_pyramid_configurator_context(
        debug_toolbar: bool = False,
        static_cache_duration_s: int = 0) -> Generator[Configurator, None, None]:  # noqa
    """
    Context manager to create a Pyramid configuration context, for making
    (for example) a WSGI server or a debugging request. That means setting up
    things like:

    - the authentication and authorization policies
    - our request and session factories
    - our Mako renderer
    - our routes and views

    Args:
        debug_toolbar:
            Add the Pyramid debug toolbar?
        static_cache_duration_s:
            Lifetime (in seconds) for the HTTP cache-control setting for
            static content.

    Yields:
        a :class:`Configurator` object (this is a generator wrapped by
        ``@contextmanager``, so ``with ... as config`` receives it)

    Note this includes settings that transcend the config file.

    Most things should be in the config file. This enables us to run multiple
    configs (e.g. multiple CamCOPS databases) through the same process.
    However, some things we need to know right now, to make the WSGI app.
    Here, OS environment variables and command-line switches are appropriate.
    """

    # -------------------------------------------------------------------------
    # 1. Base app
    # -------------------------------------------------------------------------
    settings = {  # Settings that can't be set directly?
        'debug_authorization': DEBUG_AUTHORIZATION,
        # ... see https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/security.html#debugging-view-authorization-failures  # noqa
    }
    with Configurator(settings=settings) as config:
        # ---------------------------------------------------------------------
        # Authentication; authorization (permissions)
        # ---------------------------------------------------------------------
        authentication_policy = CamcopsAuthenticationPolicy()
        config.set_authentication_policy(authentication_policy)
        # Let's not use ACLAuthorizationPolicy, which checks an access control
        # list for a resource hierarchy of objects, but instead:
        authorization_policy = CamcopsAuthorizationPolicy()
        config.set_authorization_policy(authorization_policy)
        config.set_default_permission(Permission.HAPPY)
        # ... applies to all SUBSEQUENT view configuration registrations

        # ---------------------------------------------------------------------
        # Factories
        # ---------------------------------------------------------------------
        config.set_request_factory(CamcopsRequest)
        # ... for request attributes: config, database, etc.
        config.set_session_factory(get_session_factory())
        # ... for request.session
        config.set_response_factory(camcops_response_factory)

        # ---------------------------------------------------------------------
        # Renderers
        # ---------------------------------------------------------------------
        camcops_add_mako_renderer(config, extension='.mako')

        # deform_bootstrap.includeme(config)

        # ---------------------------------------------------------------------
        # Routes and accompanying views
        # ---------------------------------------------------------------------

        # Add static views
        # https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/assets.html#serving-static-assets  # noqa
        # Hmm. We cannot fail to set up a static file route, because otherwise
        # we can't provide URLs to them.
        static_filepath = STATIC_CAMCOPS_PACKAGE_PATH
        static_name = RouteCollection.STATIC.route
        log.debug("... including static files from {!r} at Pyramid static "
                  "name {!r}", static_filepath, static_name)
        # ... does the name need to start with "/" or the pattern "static/"
        # will override the later "deform_static"? Not sure.

        # We were doing this:
        #       config.add_static_view(name=static_name, path=static_filepath)
        # But now we need to (a) add the
        # "cache_max_age=static_cache_duration_s" argument, and (b) set the
        # HTTP header 'Cache-Control: no-cache="Set-Cookie, Set-Cookie2"',
        # for the ZAP penetration tester:
        # ... https://cheatsheetseries.owasp.org/cheatsheets/Session_Management_Cheat_Sheet.html#web-content-caching  # noqa
        # We can do the former, but not the latter, via add_static_view(),
        # because it sends its keyword arguments to add_route(), not the view
        # creation. So, alternative ways...
        # - from https://github.com/Pylons/pyramid/issues/1486
        # - and https://stackoverflow.com/questions/24854300/
        # - to https://github.com/Pylons/pyramid/pull/2021
        # - to https://docs.pylonsproject.org/projects/pyramid/en/latest/narr/hooks.html#view-derivers  # noqa

        config.add_static_view(name=static_name,
                               path=static_filepath,
                               cache_max_age=static_cache_duration_s)

        # Add all the routes:
        for pr in RouteCollection.all_routes():
            if DEBUG_ADD_ROUTES:
                log.info("{} -> {}", pr.route, pr.path)
            config.add_route(pr.route, pr.path)
        # See also:
        # https://stackoverflow.com/questions/19184612/how-to-ensure-urls-generated-by-pyramids-route-url-and-route-path-are-valid  # noqa

        # Routes added EARLIER have priority. So add this AFTER our custom
        # bugfix:
        config.add_static_view(name='/deform_static',
                               path='deform:static/',
                               cache_max_age=static_cache_duration_s)

        # Most views are using @view_config() which calls add_view().
        # Scan for @view_config decorators, to map views to routes:
        # https://docs.pylonsproject.org/projects/venusian/en/latest/api.html
        config.scan("camcops_server.cc_modules")

        # ---------------------------------------------------------------------
        # Add tweens (inner to outer)
        # ---------------------------------------------------------------------
        # We will use implicit positioning:
        # - https://www.slideshare.net/aconrad/alex-conrad-pyramid-tweens-ploneconf-2011  # noqa

        # config.add_tween('camcops_server.camcops.http_session_tween_factory')

        # ---------------------------------------------------------------------
        # Debug toolbar
        # ---------------------------------------------------------------------
        if debug_toolbar:
            log.debug("Enabling Pyramid debug toolbar")
            config.include('pyramid_debugtoolbar')  # BEWARE! SIDE EFFECTS
            # ... Will trigger an import that hooks events into all
            # SQLAlchemy queries. There's a bug somewhere relating to that;
            # see notes below relating to the "mergedb" function.
            config.add_route(RouteCollection.DEBUG_TOOLBAR.route,
                             RouteCollection.DEBUG_TOOLBAR.path)

        # Hand the configurator to the caller while still inside the
        # "with Configurator(...)" block.
        yield config
1941# =============================================================================
1942# Debugging requests
1943# =============================================================================
def make_post_body_from_dict(d: Dict[str, str],
                             encoding: str = "utf8") -> bytes:
    """
    Builds the raw body of an HTTP POST request from a dictionary.

    Intended for debugging HTTP requests; the output imitates what the tablet
    client sends.

    Args:
        d: dictionary of ``{parameter: value}`` pairs
        encoding: character encoding used to turn the URL-encoded text into
            bytes

    Returns:
        the URL-encoded form data, as bytes
    """
    # https://docs.pylonsproject.org/projects/pyramid-cookbook/en/latest/testing/testing_post_curl.html  # noqa
    # URL-encode first (giving text), then convert that text to bytes; this
    # encoding step mimics how the tablet operates.
    return urllib.parse.urlencode(query=d).encode(encoding)
class CamcopsDummyRequest(CamcopsRequest, DummyRequest):
    """
    A request whose GET/POST parameters can be edited by hand, for debugging.

    Notes:

    - The important base class is :class:`webob.request.BaseRequest`.
    - ``self.params`` is a read-only property; reading it combines the data
      from ``self.GET`` and ``self.POST`` into a :class:`NestedMultiDict`
      (see ``webob/multidict.py``), and those are intrinsically read-only
      too.
    - So, rather than touching ``params``, the methods below rewrite the
      underlying GET/POST data.

    """
    # Key in the WSGI environment under which the parsed query variables are
    # cached:
    _CACHE_KEY = "webob._parsed_query_vars"
    # Key in the WSGI environment holding the raw query string:
    _QUERY_STRING_KEY = "QUERY_STRING"

    # def __init__(self, *args, **kwargs) -> None:
    #     super().__init__(*args, **kwargs)
    #     # Just a technique worth noting:
    #     #
    #     # self._original_params_property = CamcopsRequest.params  # type: property  # noqa
    #     # self._original_params = self._original_params_property.fget(self)  # type: NestedMultiDict  # noqa
    #     # self._fake_params = self._original_params.copy()  # type: MultiDict
    #     # if params:
    #     #     self._fake_params.update(params)
    #
    # @property
    # def params(self):
    #     log.debug(repr(self._fake_params))
    #     return self._fake_params
    #     # Returning the member object allows clients to call
    #     # dummyreq.params.update(...)
    #
    # @params.setter
    # def params(self, value):
    #     self._fake_params = value

    def set_method_get(self) -> None:
        """
        Makes this fictional request a GET request.
        """
        self.method = RequestMethod.GET

    def set_method_post(self) -> None:
        """
        Makes this fictional request a POST request.
        """
        self.method = RequestMethod.POST

    def clear_get_params(self) -> None:
        """
        Removes every GET parameter: drops the cached parsed query variables
        (if present) and blanks the query string.
        """
        environ = self.environ
        environ.pop(self._CACHE_KEY, None)
        environ[self._QUERY_STRING_KEY] = ""

    def add_get_params(self, d: Dict[str, str],
                       set_method_get: bool = True) -> None:
        """
        Merges extra GET parameters into the existing ones.

        Does nothing at all (the method is not changed either) if ``d`` is
        empty or ``None``.

        Args:
            d: dictionary of ``{parameter: value}`` pairs.
            set_method_get: also set the request's method to GET?
        """
        if not d:
            return
        # webob.request.BaseRequest.GET reads from self.environ['QUERY_STRING']
        merged = self.GET.copy()  # type: MultiDict
        merged.update(d)
        environ = self.environ
        # Throw away the cached parse, so the new query string gets read...
        environ.pop(self._CACHE_KEY, None)
        # ... and store the new query string.
        environ[self._QUERY_STRING_KEY] = urllib.parse.urlencode(query=merged)
        if set_method_get:
            self.set_method_get()

    def set_get_params(self, d: Dict[str, str],
                       set_method_get: bool = True) -> None:
        """
        Replaces the GET parameters: clears the current ones, then adds the
        new ones. See :func:`add_get_params`.
        """
        self.clear_get_params()
        self.add_get_params(d, set_method_get=set_method_get)

    def set_post_body(self, body: bytes,
                      set_method_post: bool = True) -> None:
        """
        Installs a fake POST body (and the matching content length).

        Args:
            body: the body to set
            set_method_post: also set the request's method to POST?
        """
        log.debug("Applying fake POST body: {!r}", body)
        self.body = body
        self.content_length = len(body)
        if set_method_post:
            self.set_method_post()

    def fake_request_post_from_dict(self,
                                    d: Dict[str, str],
                                    encoding: str = "utf8",
                                    set_method_post: bool = True) -> None:
        """
        Builds a POST body from a dictionary and installs it in the request.

        Args:
            d: dictionary of ``{parameter: value}`` pairs.
            encoding: character encoding to use
            set_method_post: also set the request's method to POST?
        """
        # webob.request.BaseRequest.POST reads from 'body' (indirectly).
        self.set_post_body(
            make_post_body_from_dict(d, encoding=encoding),
            set_method_post=set_method_post
        )
# The string below is never executed: it is a code sample kept for reference
# and bound to the throwaway module-level name "_".
_ = """
# A demonstration of the manipulation of superclass properties:

class Test(object):
    def __init__(self):
        self.a = 3

    @property
    def b(self):
        return 4


class Derived(Test):
    def __init__(self):
        super().__init__()
        self._superclass_b = super().b
        self._b = 4

    @property
    def b(self):
        print("Superclass b: {}".format(self._superclass_b.fget(self)))
        print("Self _b: {}".format(self._b))
        return self._b
    @b.setter
    def b(self, value):
        self._b = value


x = Test()
x.a # 3
x.a = 5
x.a # 5
x.b # 4
x.b = 6 # can't set attribute

y = Derived()
y.a # 3
y.a = 5
y.a # 5
y.b # 4
y.b = 6
y.b # 6

"""
def get_core_debugging_request() -> CamcopsDummyRequest:
    """
    Creates a bare-bones :class:`CamcopsDummyRequest`.

    The request is given a minimal fake WSGI environment (root path, empty
    script name, server ``127.0.0.1:8000``, ``http`` scheme) that names a
    deliberately nonexistent config file. It is attached to the registry of
    a Pyramid configurator created without the debug toolbar.
    """
    with camcops_pyramid_configurator_context(debug_toolbar=False) as pyr_cfg:
        # The "environ" parameter must receive an actual dict; os.environ
        # itself isn't OK ("TypeError: WSGI environ must be a dict; you passed
        # environ({'key1': 'value1', ...})
        fake_environ = {
            ENVVAR_CONFIG_FILE: "nonexistent_camcops_config_file.nonexistent",  # noqa
            WsgiEnvVar.PATH_INFO: '/',
            WsgiEnvVar.SCRIPT_NAME: '',
            WsgiEnvVar.SERVER_NAME: '127.0.0.1',
            WsgiEnvVar.SERVER_PORT: '8000',
            WsgiEnvVar.WSGI_URL_SCHEME: 'http',
        }
        req = CamcopsDummyRequest(environ=fake_environ)

        # Hook the request up to the configurator while its context is still
        # open, and hand it back from inside that context.
        req.registry = pyr_cfg.registry
        pyr_cfg.begin(request=req)
        return req
def get_command_line_request(user_id: int = None) -> CamcopsRequest:
    """
    Builds a dummy CamcopsRequest suitable for command-line use.

    The request acts as the system user unless ``user_id`` is given, in which
    case the user with that ID number is looked up and used instead.

    - Presupposes that ``os.environ[ENVVAR_CONFIG_FILE]`` has been set, as it
      is in :func:`camcops_server.camcops.main`.

    **WARNING:** this does not provide a COMMIT/ROLLBACK context. If you use
    this directly, you must manage that yourself. Consider using
    :func:`command_line_request_context` instead.

    Args:
        user_id: optional ID number of the user to act as
    """
    log.debug(f"Creating command-line pseudo-request (user_id={user_id})")
    req = get_core_debugging_request()

    # Refuse to continue with an out-of-date database: the resulting problems
    # may not show up immediately, which is bad.
    req.config.assert_database_ok()

    # Choose the user that the request will act as.
    if user_id is None:
        acting_user = User.get_system_user(req.dbsession)
    else:
        acting_user = User.get_user_by_id(req.dbsession, user_id)
    req._debugging_user = acting_user

    return req
@contextmanager
def command_line_request_context(user_id: int = None) \
        -> Generator[CamcopsRequest, None, None]:
    """
    Context manager yielding a command-line request, then finishing its
    database session.

    Request objects are ubiquitous, and give code access to the HTTP request,
    config, HTTP session, database session, and so on. From the command line
    the normal route (via the Pyramid router) is unavailable, so this makes a
    special request and COMMITs the database once the ``with`` block is done.

    The finishing step sits after the ``yield`` without a ``try``/``finally``,
    so it only runs when the ``with`` block completes without raising.

    Args:
        user_id: optional ID number of the user to act as; see
            :func:`get_command_line_request`
    """
    request = get_command_line_request(user_id=user_id)
    yield request
    # noinspection PyProtectedMember
    request._finish_dbsession()
def get_unittest_request(dbsession: SqlASession,
                         params: Dict[str, Any] = None) -> CamcopsDummyRequest:
    """
    Builds a :class:`CamcopsDummyRequest` for unit tests.

    - Points to an existing database (e.g. SQLite in-memory database).
    - Presupposes that ``os.environ[ENVVAR_CONFIG_FILE]`` has been set, as it
      is in :func:`camcops_server.camcops.main`.

    Args:
        dbsession: the database session that the request should use
        params: optional GET parameters to install in the request

    Returns:
        the request, with a freshly created superuser as its debugging user
    """
    log.debug("Creating unit testing pseudo-request")
    req = get_core_debugging_request()
    req.set_get_params(params)

    # Use the caller's database session and a new superuser object.
    req._debugging_db_session = dbsession
    superuser = User()
    superuser.superuser = True
    req._debugging_user = superuser

    return req