Coverage for cc_modules/cc_task.py: 38%
910 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 15:51 +0100
1"""
2camcops_server/cc_modules/cc_task.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26**Represents CamCOPS tasks.**
28Core task export methods:
30======= =======================================================================
31Format Comment
32======= =======================================================================
33HTML The task in a user-friendly format.
34PDF Essentially the HTML output, but with page headers and (for clinician
35 tasks) a signature block, and without additional HTML administrative
36 hyperlinks.
37XML Centres on the task with its subdata integrated.
38TSV Tab-separated value format.
39SQL As part of an SQL or SQLite download.
40======= =======================================================================
42"""
44from base64 import b64encode
45from collections import Counter, OrderedDict
46import datetime
47import logging
48import statistics
49from typing import (
50 Any,
51 Dict,
52 Iterable,
53 Generator,
54 List,
55 Optional,
56 Set,
57 Tuple,
58 Type,
59 TYPE_CHECKING,
60 Union,
61)
63from cardinal_pythonlib.classes import classproperty
64from cardinal_pythonlib.datetimefunc import (
65 convert_datetime_to_utc,
66 format_datetime,
67 pendulum_to_utc_datetime_without_tz,
68)
69from cardinal_pythonlib.httpconst import MimeType
70from cardinal_pythonlib.logs import BraceStyleAdapter
71from cardinal_pythonlib.sqlalchemy.dialect import SqlaDialectName
72from cardinal_pythonlib.sqlalchemy.orm_inspect import (
73 gen_columns,
74 gen_orm_classes_from_base,
75)
76from cardinal_pythonlib.sqlalchemy.schema import (
77 is_sqlatype_binary,
78 is_sqlatype_string,
79)
80from cardinal_pythonlib.stringfunc import mangle_unicode_to_ascii
81from fhirclient.models.attachment import Attachment
82from fhirclient.models.bundle import Bundle
83from fhirclient.models.codeableconcept import CodeableConcept
84from fhirclient.models.coding import Coding
85from fhirclient.models.contactpoint import ContactPoint
86from fhirclient.models.documentreference import (
87 DocumentReference,
88 DocumentReferenceContent,
89)
90from fhirclient.models.fhirreference import FHIRReference
91from fhirclient.models.humanname import HumanName
92from fhirclient.models.identifier import Identifier
93from fhirclient.models.observation import Observation
94from fhirclient.models.practitioner import Practitioner
95from fhirclient.models.questionnaire import Questionnaire
96from fhirclient.models.questionnaireresponse import QuestionnaireResponse
97import hl7
98from pendulum import Date as PendulumDate, DateTime as Pendulum
99from pyramid.renderers import render
100from semantic_version import Version
101from sqlalchemy.ext.declarative import declared_attr
102from sqlalchemy.orm import Mapped, mapped_column, relationship
103from sqlalchemy.sql.expression import not_, update
104from sqlalchemy.sql.schema import Column, Table
105from sqlalchemy.sql.sqltypes import (
106 Boolean,
107 Date as DateColType,
108 DateTime,
109 Float,
110 Integer,
111 Numeric,
112 String,
113 Text,
114 Time,
115)
117from camcops_server.cc_modules.cc_audit import audit
118from camcops_server.cc_modules.cc_baseconstants import DOCUMENTATION_URL
119from camcops_server.cc_modules.cc_blob import Blob, get_blob_img_html
120from camcops_server.cc_modules.cc_cache import cache_region_static, fkg
121from camcops_server.cc_modules.cc_constants import (
122 ASCII,
123 CssClass,
124 CSS_PAGED_MEDIA,
125 DateFormat,
126 FHIRConst as Fc,
127 FileType,
128 ERA_NOW,
129 INVALID_VALUE,
130 UTF8,
131)
132from camcops_server.cc_modules.cc_dataclasses import SummarySchemaInfo
133from camcops_server.cc_modules.cc_db import (
134 GenericTabletRecordMixin,
135 SFN_CAMCOPS_SERVER_VERSION,
136 SFN_IS_COMPLETE,
137 SFN_SECONDS_CREATION_TO_FIRST_FINISH,
138 TASK_FREQUENT_FIELDS,
139 TFN_EDITING_TIME_S,
140 TFN_FIRSTEXIT_IS_ABORT,
141 TFN_FIRSTEXIT_IS_FINISH,
142 TFN_PATIENT_ID,
143 TFN_RESPONDENT_NAME,
144 TFN_RESPONDENT_RELATIONSHIP,
145 TFN_WHEN_CREATED,
146 TFN_WHEN_FIRSTEXIT,
147)
148from camcops_server.cc_modules.cc_exception import FhirExportException
149from camcops_server.cc_modules.cc_fhir import (
150 fhir_observation_component_from_snomed,
151 fhir_system_value,
152 fhir_sysval_from_id,
153 FHIRAnsweredQuestion,
154 FHIRAnswerType,
155 FHIRQuestionType,
156 make_fhir_bundle_entry,
157)
158from camcops_server.cc_modules.cc_filename import get_export_filename
159from camcops_server.cc_modules.cc_hl7 import make_obr_segment, make_obx_segment
160from camcops_server.cc_modules.cc_html import (
161 get_present_absent_none,
162 get_true_false_none,
163 get_yes_no,
164 get_yes_no_none,
165 tr,
166 tr_qa,
167)
168from camcops_server.cc_modules.cc_pdf import pdf_from_html
169from camcops_server.cc_modules.cc_pyramid import Routes, ViewArg
170from camcops_server.cc_modules.cc_simpleobjects import TaskExportOptions
171from camcops_server.cc_modules.cc_snomed import SnomedLookup
172from camcops_server.cc_modules.cc_specialnote import SpecialNote
173from camcops_server.cc_modules.cc_sqla_coltypes import (
174 camcops_column,
175 COLATTR_PERMITTED_VALUE_CHECKER,
176 gen_ancillary_relationships,
177 get_camcops_blob_column_attr_names,
178 get_column_attr_names,
179 mapped_camcops_column,
180 PendulumDateTimeAsIsoTextColType,
181 permitted_value_failure_msgs,
182 permitted_values_ok,
183 PermittedValueChecker,
184 SemanticVersionColType,
185 TableNameColType,
186)
187from camcops_server.cc_modules.cc_sqlalchemy import Base, get_table_ddl
188from camcops_server.cc_modules.cc_summaryelement import (
189 ExtraSummaryTable,
190 SummaryElement,
191)
192from camcops_server.cc_modules.cc_version import (
193 CAMCOPS_SERVER_VERSION,
194 CAMCOPS_SERVER_VERSION_STRING,
195 MINIMUM_TABLET_VERSION,
196)
197from camcops_server.cc_modules.cc_xml import (
198 get_xml_document,
199 XML_COMMENT_ANCILLARY,
200 XML_COMMENT_ANONYMOUS,
201 XML_COMMENT_BLOBS,
202 XML_COMMENT_CALCULATED,
203 XML_COMMENT_PATIENT,
204 XML_COMMENT_SNOMED_CT,
205 XML_COMMENT_SPECIAL_NOTES,
206 XML_NAME_SNOMED_CODES,
207 XmlElement,
208 XmlLiteral,
209)
211if TYPE_CHECKING:
212 from camcops_server.cc_modules.cc_ctvinfo import CtvInfo
213 from camcops_server.cc_modules.cc_exportrecipient import (
214 ExportRecipient,
215 )
216 from camcops_server.cc_modules.cc_patient import Patient
217 from camcops_server.cc_modules.cc_patientidnum import (
218 PatientIdNum,
219 )
220 from camcops_server.cc_modules.cc_request import (
221 CamcopsRequest,
222 )
223 from camcops_server.cc_modules.cc_snomed import (
224 SnomedExpression,
225 )
226 from camcops_server.cc_modules.cc_trackerhelpers import (
227 TrackerInfo,
228 )
229 from camcops_server.cc_modules.cc_spreadsheet import (
230 SpreadsheetPage,
231 )
log = BraceStyleAdapter(logging.getLogger(__name__))


# =============================================================================
# Debugging options
# =============================================================================

# Developer switches. A warning is logged at import time if either is set.
DEBUG_SKIP_FHIR_DOCS = False
DEBUG_SHOW_FHIR_QUESTIONNAIRE = False

if DEBUG_SKIP_FHIR_DOCS or DEBUG_SHOW_FHIR_QUESTIONNAIRE:
    log.warning("Debugging options enabled!")


# =============================================================================
# Constants
# =============================================================================

# Class names held as strings, for use as forward references (TASK_FWD_REF is
# used that way in type annotations further down this module).
ANCILLARY_FWD_REF = "Ancillary"
TASK_FWD_REF = "Task"

# Placeholder text for FHIR output.
# NOTE(review): exact usage is not visible in this part of the file.
FHIR_UNKNOWN_TEXT = "[?]"

# Table/column/XML names relating to SNOMED CT output.
# NOTE(review): presumably consumed by the export code -- confirm.
SNOMED_TABLENAME = "_snomed_ct"
SNOMED_COLNAME_TASKTABLE = "task_tablename"
SNOMED_COLNAME_TASKPK = "task_pk"
SNOMED_COLNAME_WHENCREATED_UTC = "when_created"
SNOMED_COLNAME_EXPRESSION = "snomed_expression"
UNUSED_SNOMED_XML_NAME = "snomed_ct_expressions"
264# =============================================================================
265# Patient mixin
266# =============================================================================
class TaskHasPatientMixin(object):
    """
    Mixin for tasks that belong to a patient (i.e. are not anonymous).

    Adds the ``patient_id`` column and a view-only ``patient`` relationship,
    and makes :attr:`has_patient` report ``True``.
    """

    # https://docs.sqlalchemy.org/en/latest/orm/extensions/declarative/mixins.html#using-advanced-relationship-arguments-e-g-primaryjoin-etc # noqa

    # Column that is a foreign key to the patient table.
    patient_id: Mapped[int] = mapped_column(
        TFN_PATIENT_ID,
        index=True,
        comment="(TASK) Foreign key to patient.id (for this device/era)",
    )

    # noinspection PyMethodParameters
    @declared_attr
    def patient(cls) -> Mapped["Patient"]:
        """
        SQLAlchemy relationship: "the patient for this task".

        The join matches on patient ID, device and era, and requires the
        patient record to be flagged as current. It therefore yields the
        CURRENT version of the patient; if there is an editing chain, older
        patient versions are not retrieved, so task version history does not
        show the history of patient edits. This is consistent with the
        relationship strategy used throughout for the web front-end viewer.

        Compare :func:`camcops_server.cc_modules.cc_blob.blob_relationship`,
        which uses the same strategy, as do several other similar functions.
        """
        task_name = cls.__name__  # type: ignore[attr-defined]
        join_condition = (
            "and_("
            f" remote(Patient.id) == foreign({task_name}.patient_id), "
            f" remote(Patient._device_id) == foreign({task_name}._device_id), "
            f" remote(Patient._era) == foreign({task_name}._era), "
            " remote(Patient._current) == True "
            ")"
        )
        return relationship(
            "Patient",
            primaryjoin=join_condition,
            uselist=False,
            viewonly=True,
            # Loader strategy chosen by profiling (2019-10-14, exporting 4185
            # phq9 records with unique patients to xlsx):
            #   lazy="select"  : 59.7s
            #   lazy="joined"  : 44.3s
            #   lazy="subquery": 36.9s
            #   lazy="selectin": 35.3s
            # See also idnums relationship on Patient class (cc_patient.py)
            lazy="selectin",
        )

    # noinspection PyMethodParameters
    @classproperty
    def has_patient(cls) -> bool:
        """
        Does this task have a patient? (Always yes for this mixin.)
        """
        return True
334# =============================================================================
335# Clinician mixin
336# =============================================================================
class TaskHasClinicianMixin(object):
    """
    Mixin that adds clinician columns and overrides clinician-related
    methods.

    Must be to the LEFT of ``Task`` in the class's base class list, i.e.
    must have higher precedence than ``Task`` in the method resolution order.
    """

    # All clinician columns are free text and exempt from anonymisation.

    clinician_specialty: Mapped[Optional[str]] = mapped_camcops_column(
        Text,
        exempt_from_anonymisation=True,
        comment="(CLINICIAN) Clinician's specialty "
        "(e.g. Liaison Psychiatry)",
    )

    clinician_name: Mapped[Optional[str]] = mapped_camcops_column(
        Text,
        exempt_from_anonymisation=True,
        comment="(CLINICIAN) Clinician's name (e.g. Dr X)",
    )

    clinician_professional_registration: Mapped[Optional[str]] = (
        mapped_camcops_column(
            Text,
            exempt_from_anonymisation=True,
            comment="(CLINICIAN) Clinician's professional registration (e.g. "
            "GMC# 12345)",
        )
    )

    clinician_post: Mapped[Optional[str]] = mapped_camcops_column(
        Text,
        exempt_from_anonymisation=True,
        comment="(CLINICIAN) Clinician's post (e.g. Consultant)",
    )

    clinician_service: Mapped[Optional[str]] = mapped_camcops_column(
        Text,
        exempt_from_anonymisation=True,
        comment="(CLINICIAN) Clinician's service (e.g. Liaison Psychiatry "
        "Service)",
    )

    clinician_contact_details: Mapped[Optional[str]] = mapped_camcops_column(
        Text,
        exempt_from_anonymisation=True,
        comment="(CLINICIAN) Clinician's contact details (e.g. bleep, "
        "extension)",
    )

    # For field order, see also:
    # https://stackoverflow.com/questions/3923910/sqlalchemy-move-mixin-columns-to-end # noqa

    # noinspection PyMethodParameters
    @classproperty
    def has_clinician(cls) -> bool:
        """
        Does the task have a clinician? (Always yes for this mixin.)
        """
        return True

    def get_clinician_name(self) -> str:
        """
        Returns the clinician's name, or an empty string if none is stored.
        """
        name = self.clinician_name
        return name if name else ""

    def get_clinician_fhir_telecom_other(self, req: "CamcopsRequest") -> str:
        """
        Returns a single " | "-separated string of the clinician details that
        don't fit neatly into a FHIR Practitioner object, but that people
        might actually want to know. Blank fields are omitted; labels are
        translated via ``req.gettext``.
        """
        _ = req.gettext
        parts = []  # type: List[str]

        # Fixed order; examples in the comments.
        post = self.clinician_post  # e.g. Consultant
        if post:
            parts.append("{} {}".format(_("Post:"), post))

        specialty = self.clinician_specialty  # e.g. Liaison Psychiatry
        if specialty:
            parts.append("{} {}".format(_("Specialty:"), specialty))

        registration = self.clinician_professional_registration  # GMC# 12345
        if registration:
            parts.append(
                "{} {}".format(_("Professional registration:"), registration)
            )

        service = self.clinician_service  # e.g. Liaison Psychiatry Service
        if service:
            parts.append("{} {}".format(_("Service:"), service))

        contact = self.clinician_contact_details  # e.g. tel. x12345
        if contact:
            parts.append("{} {}".format(_("Contact details:"), contact))

        return " | ".join(parts)
443# =============================================================================
444# Respondent mixin
445# =============================================================================
class TaskHasRespondentMixin(object):
    """
    Mixin that adds respondent columns and overrides respondent-related
    methods.

    A respondent is someone who isn't the patient and isn't a clinician, such
    as a family member or carer.

    Must be to the LEFT of ``Task`` in the class's base class list, i.e.
    must have higher precedence than ``Task`` in the method resolution order.

    Notes:

    - If you don't use ``@declared_attr``, the ``comment`` property on columns
      doesn't work.
    """

    # The respondent's name is flagged as patient-identifying.
    respondent_name: Mapped[Optional[str]] = camcops_column(  # type: ignore[assignment] # noqa: E501
        TFN_RESPONDENT_NAME,
        Text,
        identifies_patient=True,
        comment="(RESPONDENT) Respondent's name",
    )

    respondent_relationship: Mapped[Optional[str]] = mapped_column(
        TFN_RESPONDENT_RELATIONSHIP,
        Text,
        comment="(RESPONDENT) Respondent's relationship to patient",
    )

    # noinspection PyMethodParameters
    @classproperty
    def has_respondent(cls) -> bool:
        """
        Does the class have a respondent? (Always yes for this mixin.)
        """
        return True

    def is_respondent_complete(self) -> bool:
        """
        Do we have sufficient information about the respondent?
        That means both a name and a relationship to the patient.
        """
        has_name = bool(self.respondent_name)
        has_relationship = bool(self.respondent_relationship)
        return has_name and has_relationship
495# =============================================================================
496# Task base class
497# =============================================================================
500class Task(GenericTabletRecordMixin, Base):
501 """
502 Abstract base class for all tasks.
504 Note:
506 - For column definitions: use
507 :func:`camcops_server.cc_modules.cc_sqla_coltypes.camcops_column`, not
508 :class:`Column`, if you have fields that need to define permitted values,
509 mark them as BLOB-referencing fields, or do other CamCOPS-specific
510 things.
512 """
514 __abstract__ = True
516 # noinspection PyMethodParameters
517 @declared_attr.directive
def __mapper_args__(cls) -> dict[str, Any]:
    """
    Returns the SQLAlchemy mapper arguments for the class: a concrete
    mapping whose polymorphic identity is the class name.
    """
    identity = cls.__name__  # type: ignore[attr-defined]
    mapper_args = {"polymorphic_identity": identity, "concrete": True}
    return mapper_args
521 # =========================================================================
522 # PART 0: COLUMNS COMMON TO ALL TASKS
523 # =========================================================================
# Columns

# Creation time of the task instance.
when_created: Mapped[Pendulum] = mapped_column(
    TFN_WHEN_CREATED,
    PendulumDateTimeAsIsoTextColType,
    comment="(TASK) Date/time this task instance was created "
    "(ISO 8601)",
)

# When the user first exited the task's editor (i.e. first "finish" or first
# "abort").
when_firstexit: Mapped[Optional[Pendulum]] = mapped_column(
    TFN_WHEN_FIRSTEXIT,
    PendulumDateTimeAsIsoTextColType,
    comment="(TASK) Date/time of the first exit from this task (ISO 8601)",
)

# Was the first exit from the task's editor a successful "finish"?
firstexit_is_finish: Mapped[Optional[bool]] = mapped_column(
    TFN_FIRSTEXIT_IS_FINISH,
    comment="(TASK) Was the first exit from the task because it was "
    "finished (1)?",
)

# Was the first exit from the task's editor an "abort"?
firstexit_is_abort: Mapped[Optional[bool]] = mapped_column(
    TFN_FIRSTEXIT_IS_ABORT,
    comment="(TASK) Was the first exit from this task because it was "
    "aborted (1)?",
)

# How long the user has spent editing the task, in seconds (calculated by the
# CamCOPS client).
# NOTE(review): the attribute name ends in a bare underscore whereas the
# column-name constant is TFN_EDITING_TIME_S -- confirm this is intended.
editing_time_: Mapped[Optional[float]] = mapped_column(
    TFN_EDITING_TIME_S, comment="(TASK) Time spent editing (s)"
)
578 # Relationships
580 # noinspection PyMethodParameters
581 @declared_attr
def special_notes(cls) -> Mapped[List[SpecialNote]]:
    """
    List-style SQLAlchemy relationship to any :class:`SpecialNote` objects
    attached to this class, ordered by ``SpecialNote.note_at``.

    Notes are matched on base table name, task ID, device and era. Hidden
    (quasi-deleted) notes are skipped.
    """
    task_name = cls.__name__  # type: ignore[attr-defined]
    tablename_literal = repr(cls.__tablename__)
    join_condition = (
        "and_("
        f" remote(SpecialNote.basetable) == literal({tablename_literal}), "
        f" remote(SpecialNote.task_id) == foreign({task_name}.id), "
        f" remote(SpecialNote.device_id) == foreign({task_name}._device_id), "  # noqa
        f" remote(SpecialNote.era) == foreign({task_name}._era), "
        " not_(SpecialNote.hidden)"
        ")"
    )
    return relationship(
        SpecialNote,
        primaryjoin=join_condition,
        uselist=True,
        order_by="SpecialNote.note_at",
        viewonly=True,  # for now!
    )
606 # =========================================================================
607 # PART 1: THINGS THAT DERIVED CLASSES MAY CARE ABOUT
608 # =========================================================================
609 #
610 # Notes:
611 #
612 # - for summaries, see GenericTabletRecordMixin.get_summaries
614 # -------------------------------------------------------------------------
615 # Attributes that must be provided
616 # -------------------------------------------------------------------------
# Required: every concrete task must set these.
__tablename__ = None  # type: str # also the SQLAlchemy table name
shortname = None  # type: str

# -------------------------------------------------------------------------
# Attributes that can be overridden
# -------------------------------------------------------------------------

# if None, tablename is used instead
extrastring_taskname = None  # type: str
# if None, tablename is used instead
info_filename_stem = None  # type: str

provides_trackers = False
use_landscape_for_pdf = False
dependent_classes = []  # type: ignore[var-annotated]

# Usage restrictions; all permitted by default.
prohibits_clinical = False
prohibits_commercial = False
prohibits_educational = False
prohibits_research = False
638 @classmethod
def prohibits_anything(cls) -> bool:
    """
    Does the task prohibit any category of use (clinical, commercial,
    educational or research)?
    """
    return bool(
        cls.prohibits_clinical
        or cls.prohibits_commercial
        or cls.prohibits_educational
        or cls.prohibits_research
    )
649 # -------------------------------------------------------------------------
650 # Methods always overridden by the actual task
651 # -------------------------------------------------------------------------
653 @staticmethod
def longname(req: "CamcopsRequest") -> str:
    """
    Long name of the task (in the relevant language).

    Must be overridden; this base version always raises
    :exc:`NotImplementedError`.
    """
    message = "Task.longname must be overridden"
    raise NotImplementedError(message)
def is_complete(self) -> bool:
    """
    Is the task instance complete?

    Must be overridden; this base version always raises
    :exc:`NotImplementedError`.
    """
    message = "Task.is_complete must be overridden"
    raise NotImplementedError(message)
def get_task_html(self, req: "CamcopsRequest") -> str:
    """
    HTML for the main task content.

    Must be overridden by derived classes; this base version always raises
    :exc:`NotImplementedError`.
    """
    message = "No get_task_html() HTML generator for this task class!"
    raise NotImplementedError(message)
678 # -------------------------------------------------------------------------
679 # Implement if you provide trackers
680 # -------------------------------------------------------------------------
def get_trackers(self, req: "CamcopsRequest") -> List["TrackerInfo"]:
    """
    Returns tracker information; the default is an empty list (no trackers).

    Tasks that provide quantitative information for tracking over time
    should override this and return a list of
    :class:`camcops_server.cc_modules.cc_trackerhelpers.TrackerInfo`
    objects, one per tracker.

    The information is read by
    :meth:`camcops_server.cc_modules.cc_tracker.Tracker.get_all_plots_for_one_task_html`.

    Time information will be retrieved using :func:`get_creation_datetime`.
    """
    no_trackers = []  # type: List["TrackerInfo"]
    return no_trackers
696 # -------------------------------------------------------------------------
697 # Override to provide clinical text
698 # -------------------------------------------------------------------------
700 # noinspection PyMethodMayBeStatic
def get_clinical_text(
    self, req: "CamcopsRequest"
) -> Optional[List["CtvInfo"]]:
    """
    Returns clinical text information; the default is ``None``.

    Tasks that provide clinical text should override this. The return
    value is one of:

    - ``None`` (default): the task doesn't provide clinical text;
    - ``[]``: the task does in general, but has no information for this
      particular instance;
    - a list of :class:`camcops_server.cc_modules.cc_ctvinfo.CtvInfo`
      objects.
    """
    return None
716 # -------------------------------------------------------------------------
717 # Override some of these if you provide summaries
718 # -------------------------------------------------------------------------
720 # noinspection PyMethodMayBeStatic,PyUnusedLocal
def get_extra_summary_tables(
    self, req: "CamcopsRequest"
) -> List[ExtraSummaryTable]:
    """
    Returns extra summary tables; the default is an empty list.

    Override if you wish to create extra summary tables, not just add
    summary columns to task/ancillary tables. Overrides return a list of
    :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`
    objects.
    """
    no_tables = []  # type: List[ExtraSummaryTable]
    return no_tables
734 # -------------------------------------------------------------------------
735 # Implement if you provide SNOMED-CT codes
736 # -------------------------------------------------------------------------
738 # noinspection PyMethodMayBeStatic,PyUnusedLocal
def get_snomed_codes(
    self, req: "CamcopsRequest"
) -> List["SnomedExpression"]:
    """
    Returns all SNOMED-CT codes for this task; the default is an empty list.

    Args:
        req: the
            :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

    Returns:
        a list of
        :class:`camcops_server.cc_modules.cc_snomed.SnomedExpression`
        objects

    """
    no_codes = []  # type: List["SnomedExpression"]
    return no_codes
757 # =========================================================================
758 # PART 2: INTERNALS
759 # =========================================================================
761 # -------------------------------------------------------------------------
762 # Representations
763 # -------------------------------------------------------------------------
def __str__(self) -> str:
    """
    Human-readable description: table name, PK, creation time and (for
    tasks that are not anonymous) the patient.
    """
    patient_str = "" if self.is_anonymous else f", patient={self.patient}"
    if self.when_created:
        created = format_datetime(self.when_created, DateFormat.ERA)
    else:
        created = "None"
    return (
        f"{self.tablename} (_pk={self.pk}, "
        f"when_created={created}{patient_str})"
    )
def __repr__(self) -> str:
    """
    Debugging representation: class name, PK and creation time.
    """
    if self.when_created:
        created = format_datetime(self.when_created, DateFormat.ERA)
    else:
        created = "None"
    classname = self.__class__.__qualname__
    return f"<{classname}(_pk={self.pk}, when_created={created})>"
792 # -------------------------------------------------------------------------
793 # Way to fetch all task types
794 # -------------------------------------------------------------------------
796 @classmethod
def gen_all_subclasses(cls) -> Generator[Type[TASK_FWD_REF], None, None]:  # type: ignore[valid-type] # noqa: E501
    """
    Generate all non-abstract SQLAlchemy ORM subclasses of :class:`Task` --
    that is, all task classes. Delegates to
    ``gen_orm_classes_from_base``.

    We require that actual tasks are subclasses of both :class:`Task` and
    :class:`camcops_server.cc_modules.cc_sqlalchemy.Base`.

    OLD WAY (ignore): this meant we could (a) inherit from Task to make an
    abstract base class for actual tasks, as with PCL, HADS, HoNOS, etc.;
    and (b) not have those intermediate classes appear in the task list.
    Since all actual classes must be SQLAlchemy ORM objects inheriting from
    Base, that common inheritance was an excellent way to define them.

    NEW WAY: things now inherit from Base/Task without necessarily
    being actual tasks; we discriminate using ``__abstract__`` and/or
    ``__tablename__``. See
    https://docs.sqlalchemy.org/en/latest/orm/inheritance.html#abstract-concrete-classes
    """
    # noinspection PyTypeChecker
    task_classes = gen_orm_classes_from_base(cls)
    return task_classes
819 @classmethod
820 @cache_region_static.cache_on_arguments(function_key_generator=fkg)
def all_subclasses_by_tablename(cls) -> List[Type[TASK_FWD_REF]]:  # type: ignore[valid-type] # noqa: E501
    """
    Return a list of all task classes, sorted by table name.
    """
    return sorted(
        cls.gen_all_subclasses(),
        key=lambda task_cls: task_cls.tablename,  # type: ignore[attr-defined]
    )
829 @classmethod
830 @cache_region_static.cache_on_arguments(function_key_generator=fkg)
def all_subclasses_by_shortname(cls) -> List[Type[TASK_FWD_REF]]:  # type: ignore[valid-type] # noqa: E501
    """
    Return a list of all task classes, sorted by short name.
    """
    return sorted(
        cls.gen_all_subclasses(),
        key=lambda task_cls: task_cls.shortname,  # type: ignore[attr-defined]
    )
839 @classmethod
def all_subclasses_by_longname(
    cls, req: "CamcopsRequest"
) -> List[Type[TASK_FWD_REF]]:  # type: ignore[valid-type]
    """
    Return all task classes, ordered by long name.

    Args:
        req: the
            :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`,
            passed to each class's ``longname()`` (long names are
            language-dependent).

    Returns:
        a new list of task classes; the list returned by
        :meth:`all_subclasses_by_shortname` is left untouched.
    """
    # Build a new list with sorted() rather than sorting in place:
    # all_subclasses_by_shortname() is cached, and if the cache hands back
    # its stored list object, an in-place sort would reorder that shared
    # list and break the short-name ordering for every later caller.
    # Starting from the short-name order keeps the result deterministic for
    # tasks with equal long names (the sort is stable).
    return sorted(
        cls.all_subclasses_by_shortname(),
        key=lambda task_cls: task_cls.longname(req),
    )
850 # -------------------------------------------------------------------------
851 # Methods that may be overridden by mixins
852 # -------------------------------------------------------------------------
854 # noinspection PyMethodParameters
855 @classproperty
def has_patient(cls) -> bool:
    """
    Does the task have a patient? Not by default.

    May be overridden by :class:`TaskHasPatientMixin`.
    """
    return False
864 # noinspection PyMethodParameters
865 @classproperty
def is_anonymous(cls) -> bool:
    """
    Is the task anonymous? The logical opposite of :attr:`has_patient`.
    """
    if cls.has_patient:
        return False
    return True
872 # noinspection PyMethodParameters
873 @classproperty
def has_clinician(cls) -> bool:
    """
    Does the task have a clinician? Not by default.

    May be overridden by :class:`TaskHasClinicianMixin`.
    """
    return False
882 # noinspection PyMethodParameters
883 @classproperty
def has_respondent(cls) -> bool:
    """
    Does the task have a respondent? Not by default.

    May be overridden by :class:`TaskHasRespondentMixin`.
    """
    return False
892 # -------------------------------------------------------------------------
893 # Other classmethods
894 # -------------------------------------------------------------------------
896 # noinspection PyMethodParameters
897 @classproperty
def tablename(cls) -> str:
    """
    Returns the name of the task's primary database table (the class's
    ``__tablename__``).
    """
    name = cls.__tablename__
    return name
904 # noinspection PyMethodParameters
905 @classproperty
def minimum_client_version(cls) -> Version:
    """
    Returns the minimum client version that provides this task; by
    default, ``MINIMUM_TABLET_VERSION``.

    Override this as you add tasks.

    Used by
    :func:`camcops_server.cc_modules.client_api.ensure_valid_table_name`.

    (There are some pre-C++ client versions for which the default is not
    exactly accurate, and the tasks do not override, but this is of no
    consequence and the version numbering system also changed, from
    something legible as a float -- e.g. ``1.2 > 1.14`` -- to something
    interpreted as a semantic version -- e.g. ``1.2 < 1.14``. So we ignore
    that.)
    """
    default_version = MINIMUM_TABLET_VERSION
    return default_version
924 # noinspection PyMethodParameters
925 @classmethod
def all_tables_with_min_client_version(cls) -> Dict[str, Version]:
    """
    Returns a dictionary mapping every table name of this task (the primary
    table first, then each ancillary table) to the task's minimum client
    version.
    """
    version = cls.minimum_client_version
    versions_by_table = {cls.__tablename__: version}  # type: Dict[str, Version]  # noqa: E501
    versions_by_table.update(
        {
            ancillary_cls.__tablename__: version
            for _, _, ancillary_cls in gen_ancillary_relationships(cls)
        }
    )
    return versions_by_table
937 @classmethod
def all_tables(cls) -> List[Table]:
    """
    Returns all tables for this task: the primary table first, followed by
    the table of each ancillary relationship.
    """
    # noinspection PyUnresolvedReferences
    tables = [cls.__table__]
    for _, _, ancillary_cls in gen_ancillary_relationships(cls):
        tables.append(ancillary_cls.__table__)  # type: ignore[attr-defined]
    return tables  # type: ignore[return-value]
948 @classmethod
def get_ddl(cls, dialect_name: str = SqlaDialectName.MYSQL) -> str:
    """
    Returns DDL for the primary and any ancillary tables, in the given SQL
    dialect, with the per-table statements separated by a blank line.
    """
    statements = []  # type: List[str]
    for table in cls.all_tables():
        statements.append(get_table_ddl(table, dialect_name).strip())
    return "\n\n".join(statements)
957 @classmethod
def help_url(cls) -> str:
    """
    Returns the URL for task-specific online help.

    By default, this is based on the tablename -- e.g. ``phq9``, giving
    ``phq9.html`` in the documentation (from ``phq9.rst`` in the source).
    However, some tasks override this -- which they may do by writing

    .. code-block:: python

        info_filename_stem = "XXX"

    In the C++ code, compare infoFilenameStem() for individual tasks and
    urlconst::taskDocUrl() overall.

    The online help is presently only in English.
    """
    language = "en"
    page = f"{cls.help_url_basename()}.html"
    # DOCUMENTATION_URL has a trailing slash already
    return f"{DOCUMENTATION_URL}{language}/latest/tasks/{page}"
@classmethod
def help_url_basename(cls) -> str:
    """
    Returns the stem used for this task's help page: the class's
    ``info_filename_stem`` if that is set (truthy), otherwise its
    ``tablename``.
    """
    stem = cls.info_filename_stem
    if stem:
        return stem
    return cls.tablename
984 # -------------------------------------------------------------------------
985 # More on fields
986 # -------------------------------------------------------------------------
@classmethod
def get_fieldnames(cls) -> List[str]:
    """
    Returns the column attribute names of this task class, as reported by
    :func:`get_column_attr_names`.
    """
    fieldnames = get_column_attr_names(cls)
    return fieldnames
def field_contents_valid(self) -> bool:
    """
    Are this task's field contents valid?

    Delegates to :func:`permitted_values_ok`. This is the fast yes/no
    check (no explanations), because it is used by many task
    :func:`is_complete` calculations; see
    :meth:`field_contents_invalid_because` for the reasons.
    """
    contents_ok = permitted_values_ok(self)
    return contents_ok
def field_contents_invalid_because(self) -> List[str]:
    """
    Returns the failure messages from
    :func:`permitted_value_failure_msgs` for this task, explaining why
    its contents are invalid.
    """
    failure_messages = permitted_value_failure_msgs(self)
    return failure_messages
def get_blob_fields(self) -> List[str]:
    """
    Returns the names of this task's BLOB column attributes, as reported
    by :func:`get_camcops_blob_column_attr_names`.
    """
    blob_fieldnames = get_camcops_blob_column_attr_names(self)
    return blob_fieldnames
1016 # -------------------------------------------------------------------------
1017 # Server field calculations
1018 # -------------------------------------------------------------------------
def is_preserved(self) -> bool:
    """
    Is the task preserved and erased from the tablet?

    True when the record has a server PK (``_pk``) and its era (``_era``)
    is something other than ``ERA_NOW``.
    """
    if self._pk is None:
        return False
    return self._era != ERA_NOW
def was_forcibly_preserved(self) -> bool:
    """
    Was this task forcibly preserved?

    True only if the ``_forcibly_preserved`` flag is set and the task
    :meth:`is_preserved`. The result is always a real ``bool``: an unset
    flag (``None``) gives ``False`` rather than leaking ``None`` to the
    caller.
    """
    # "and" short-circuits, so is_preserved() is only consulted when the
    # flag is truthy; bool() normalises None/0/etc. to False.
    return bool(self._forcibly_preserved and self.is_preserved())
def get_creation_datetime(self) -> Optional[Pendulum]:
    """
    Returns the task's ``when_created`` value (which may be None).
    """
    created = self.when_created
    return created
def get_creation_datetime_utc(self) -> Optional[Pendulum]:
    """
    Returns the creation datetime converted to UTC via
    :func:`convert_datetime_to_utc`, or None if there is no creation
    datetime.
    """
    created = self.get_creation_datetime()
    return None if created is None else convert_datetime_to_utc(created)
def get_creation_datetime_utc_tz_unaware(
    self,
) -> Optional[datetime.datetime]:
    """
    Returns the creation time as a :class:`datetime.datetime` in UTC with
    no timezone attached (an "offset-naive" datetime), produced by
    :func:`pendulum_to_utc_datetime_without_tz`; or None if there is no
    creation datetime.
    """
    created = self.get_creation_datetime()
    return (
        None
        if created is None
        else pendulum_to_utc_datetime_without_tz(created)
    )
def get_seconds_from_creation_to_first_finish(self) -> Optional[float]:
    """
    Returns the number of seconds between the creation time and
    ``when_firstexit``.

    Returns None if the first exit was not a finish
    (``firstexit_is_finish`` is falsy), or if either timestamp is
    missing.
    """
    if not self.firstexit_is_finish:
        return None
    created = self.get_creation_datetime()
    first_exit = self.when_firstexit
    if not (created and first_exit):
        return None
    return (first_exit - created).total_seconds()
def get_adding_user_id(self) -> Optional[int]:
    """
    Returns the stored ``_adding_user_id``: the ID of the user who
    uploaded this task.
    """
    # noinspection PyTypeChecker
    adding_user_id = self._adding_user_id
    return adding_user_id
def get_adding_user_username(self) -> str:
    """
    Returns the username of the user who uploaded this task
    (``_adding_user``), or "" if there is no such user.
    """
    user = self._adding_user
    if not user:
        return ""
    return user.username
def get_removing_user_username(self) -> str:
    """
    Returns the username of the user who deleted this task (by removing
    it on the client and re-uploading), taken from ``_removing_user``; or
    "" if there is no such user.
    """
    user = self._removing_user
    if not user:
        return ""
    return user.username
def get_preserving_user_username(self) -> str:
    """
    Returns the username of the user who "preserved" this task (marking
    it to be saved on the server and then deleting it from the client),
    taken from ``_preserving_user``; or "" if there is no such user.
    """
    user = self._preserving_user
    if not user:
        return ""
    return user.username
def get_manually_erasing_user_username(self) -> str:
    """
    Returns the username of the user who erased this task manually on the
    server, taken from ``_manually_erasing_user``; or "" if there is no
    such user.
    """
    user = self._manually_erasing_user
    if not user:
        return ""
    return user.username
1111 # -------------------------------------------------------------------------
1112 # Summary tables
1113 # -------------------------------------------------------------------------
def standard_task_summary_fields(self) -> List[SummaryElement]:
    """
    Returns the three generic summary elements built here for any task:
    completeness, seconds from creation to first finish, and the CamCOPS
    server version that produced the summary.
    """
    completeness = SummaryElement(
        name=SFN_IS_COMPLETE,
        coltype=Boolean(),
        value=self.is_complete(),
        comment="(GENERIC) Task complete?",
    )
    time_to_finish = SummaryElement(
        name=SFN_SECONDS_CREATION_TO_FIRST_FINISH,
        coltype=Float(),
        value=self.get_seconds_from_creation_to_first_finish(),
        comment="(GENERIC) Time (in seconds) from record creation to "
        "first exit, if that was a finish not an abort",
    )
    server_version = SummaryElement(
        name=SFN_CAMCOPS_SERVER_VERSION,
        coltype=SemanticVersionColType(),
        value=CAMCOPS_SERVER_VERSION,
        comment="(GENERIC) CamCOPS server version that created the "
        "summary information",
    )
    return [completeness, time_to_finish, server_version]
def get_all_summary_tables(
    self, req: "CamcopsRequest"
) -> List[ExtraSummaryTable]:
    """
    Returns all
    :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`
    objects for this task: those from :meth:`get_extra_summary_tables`
    (which subclasses may provide), plus the SNOMED CT table if
    ``req.snomed_supported`` is true.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

    Returns:
        a new list; the list returned by :meth:`get_extra_summary_tables`
        is never modified
    """
    # Copy first: get_extra_summary_tables() is an overridable hook, and an
    # implementation may hand back a list it also keeps (e.g. a cached or
    # class-level one). Appending to that in place would add a further
    # SNOMED table on every call.
    tables = list(self.get_extra_summary_tables(req))
    if req.snomed_supported:
        tables.append(self._get_snomed_extra_summary_table(req))
    return tables
def _get_snomed_extra_summary_table(
    self, req: "CamcopsRequest"
) -> ExtraSummaryTable:
    """
    Builds the
    :class:`camcops_server.cc_modules.cc_summaryelement.ExtraSummaryTable`
    holding this task's SNOMED CT codes.

    The table has four columns (task table name, task server PK, task
    creation time in UTC, SNOMED CT expression) and one row per code
    returned by :meth:`get_snomed_codes`.
    """
    columns = [
        Column(
            SNOMED_COLNAME_TASKTABLE,
            TableNameColType,
            comment="Task's base table name",
        ),
        Column(
            SNOMED_COLNAME_TASKPK,
            Integer,
            comment="Task's server primary key",
        ),
        Column(
            SNOMED_COLNAME_WHENCREATED_UTC,
            DateTime,
            comment="Task's creation date/time (UTC)",
        ),
        camcops_column(
            SNOMED_COLNAME_EXPRESSION,
            Text,
            exempt_from_anonymisation=True,
            comment="SNOMED CT expression",
        ),
    ]
    # One ordered row per code; only the expression differs between rows.
    rows = [
        OrderedDict(
            [
                (SNOMED_COLNAME_TASKTABLE, self.tablename),
                (SNOMED_COLNAME_TASKPK, self.pk),
                (
                    SNOMED_COLNAME_WHENCREATED_UTC,
                    self.get_creation_datetime_utc_tz_unaware(),
                ),
                (SNOMED_COLNAME_EXPRESSION, snomed_code.as_string()),
            ]
        )
        for snomed_code in self.get_snomed_codes(req)
    ]  # type: List[Dict[str, Any]]
    return ExtraSummaryTable(
        tablename=SNOMED_TABLENAME,
        xmlname=UNUSED_SNOMED_XML_NAME,  # though actual XML doesn't use this route  # noqa
        columns=columns,  # type: ignore[arg-type]
        rows=rows,
        task=self,
    )
1210 # -------------------------------------------------------------------------
1211 # Testing
1212 # -------------------------------------------------------------------------
def dump(self) -> None:
    """
    Writes a description of this task instance to the Python log (at
    "info" level), for debugging.

    The message has one ``name: repr(value)`` line per name from
    :meth:`get_fieldnames`, between two rules of 79 ``=`` characters.
    """
    rule = "=" * 79
    field_lines = [
        f"{f}: {getattr(self, f)!r}" for f in self.get_fieldnames()
    ]
    # The leading empty string starts the message on a fresh line.
    log.info("\n".join(["", rule] + field_lines + [rule]))
1226 # -------------------------------------------------------------------------
1227 # Special notes
1228 # -------------------------------------------------------------------------
def apply_special_note(
    self, req: "CamcopsRequest", note: str, from_console: bool = False
) -> None:
    """
    Manually applies a special note to a task.

    WRITES TO THE DATABASE: a new :class:`SpecialNote` is added to
    ``req.dbsession``. The task is then audited and cancelled from the
    export log.

    The note is keyed on base table, client-side task ID, device and era
    rather than on the server PK, which is presumably how it comes to
    apply to all predecessor/successor versions of the task as well.
    NOTE(review): confirm against the SpecialNote lookup.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        note: the text of the note
        from_console: passed on to :meth:`audit` and
            :meth:`cancel_from_export_log`
    """
    special_note = SpecialNote()
    # Which task the note refers to:
    special_note.basetable = self.tablename
    special_note.task_id = self.id
    special_note.device_id = self._device_id
    special_note.era = self._era
    # When, by whom, and what:
    special_note.note_at = req.now
    special_note.user_id = req.user_id
    special_note.note = note
    req.dbsession.add(special_note)
    self.audit(req, "Special note applied manually", from_console)
    self.cancel_from_export_log(req, from_console)
1252 # -------------------------------------------------------------------------
1253 # Clinician
1254 # -------------------------------------------------------------------------
1256 # noinspection PyMethodMayBeStatic
def get_clinician_name(self) -> str:
    """
    Returns the clinician's name. This base implementation always returns
    ""; it may be overridden by :class:`TaskHasClinicianMixin`, q.v.
    """
    return ""
1263 # noinspection PyMethodMayBeStatic,PyUnusedLocal
def get_clinician_fhir_telecom_other(self, req: "CamcopsRequest") -> str:
    """
    Returns clinician details for the FHIR "telecom"/"other" field. This
    base implementation ignores ``req`` and always returns ""; it may be
    overridden by :class:`TaskHasClinicianMixin`, q.v.
    """
    return ""
1270 # -------------------------------------------------------------------------
1271 # Respondent
1272 # -------------------------------------------------------------------------
1274 # noinspection PyMethodMayBeStatic
def is_respondent_complete(self) -> bool:
    """
    Is the respondent information complete?

    This base implementation always returns False; it may be overridden
    by :class:`TaskHasRespondentMixin`.
    """
    return False
1283 # -------------------------------------------------------------------------
1284 # About the associated patient
1285 # -------------------------------------------------------------------------
@property
def patient(self) -> Optional["Patient"]:
    """
    The :class:`camcops_server.cc_modules.cc_patient.Patient` for this
    task.

    This base implementation always gives None; it is overridden by
    :class:`TaskHasPatientMixin`.
    """
    return None
def is_female(self) -> bool:
    """
    Is the patient female? False if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return False
    return patient.is_female()
def is_male(self) -> bool:
    """
    Is the patient male? False if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return False
    return patient.is_male()
def get_patient_server_pk(self) -> Optional[int]:
    """
    Returns the patient's ``pk`` (server PK), or None if the task has no
    patient.
    """
    patient = self.patient
    if not patient:
        return None
    return patient.pk
def get_patient_forename(self) -> str:
    """
    Returns the patient's forename as given by ``Patient.get_forename()``
    (documented as upper case), or "" if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return ""
    return patient.get_forename()
def get_patient_surname(self) -> str:
    """
    Returns the patient's surname as given by ``Patient.get_surname()``
    (documented as upper case), or "" if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return ""
    return patient.get_surname()
def get_patient_dob(self) -> Optional[PendulumDate]:
    """
    Returns the patient's date of birth from ``Patient.get_dob()``, or
    None if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return None
    return patient.get_dob()
def get_patient_dob_first11chars(self) -> Optional[str]:
    """
    Returns the first 11 characters of the patient's date-of-birth string
    (``Patient.get_dob_str()``), giving a short human-readable form such
    as ``29 Dec 1999``.

    Returns None if there is no patient or no date-of-birth string.
    """
    patient = self.patient
    dob_text = patient.get_dob_str() if patient else None
    return dob_text[:11] if dob_text else None
def get_patient_sex(self) -> str:
    """
    Returns the patient's sex from ``Patient.get_sex()``, or "" if the
    task has no patient.
    """
    patient = self.patient
    if not patient:
        return ""
    return patient.get_sex()
def get_patient_address(self) -> str:
    """
    Returns the patient's address from ``Patient.get_address()``, or ""
    if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return ""
    return patient.get_address()
def get_patient_idnum_objects(self) -> List["PatientIdNum"]:
    """
    Returns all
    :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum` objects
    for the patient (from ``Patient.get_idnum_objects()``), or an empty
    list if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return []
    return patient.get_idnum_objects()
def get_patient_idnum_object(
    self, which_idnum: int
) -> Optional["PatientIdNum"]:
    """
    Returns the patient's
    :class:`camcops_server.cc_modules.cc_patientidnum.PatientIdNum` for the
    specified ID number type (``which_idnum``), as given by
    ``Patient.get_idnum_object()``; or None if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return None
    return patient.get_idnum_object(which_idnum)
def any_patient_idnums_invalid(self, req: "CamcopsRequest") -> bool:
    """
    Does the patient have at least one ID number that fails
    ``is_fully_valid(req)``?

    False if there are no ID number objects at all.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
    """
    return any(
        not idnum.is_fully_valid(req)
        for idnum in self.get_patient_idnum_objects()
    )
def get_patient_idnum_value(self, which_idnum: int) -> Optional[int]:
    """
    Returns the ``idnum_value`` of the patient's ID number object for the
    specified ID number type (``which_idnum``), or None if
    :meth:`get_patient_idnum_object` finds no such object.
    """
    idnum_obj = self.get_patient_idnum_object(which_idnum=which_idnum)
    if not idnum_obj:
        return None
    return idnum_obj.idnum_value
def get_patient_hl7_pid_segment(
    self, req: "CamcopsRequest", recipient_def: "ExportRecipient"
) -> Union[hl7.Segment, str]:
    """
    Returns the HL7 PID segment for the patient, from
    ``Patient.get_hl7_pid_segment()``; or "" if the task has no patient.
    """
    patient = self.patient
    if not patient:
        return ""
    return patient.get_hl7_pid_segment(req, recipient_def)
1412 # -------------------------------------------------------------------------
1413 # HL7 v2
1414 # -------------------------------------------------------------------------
def get_hl7_data_segments(
    self, req: "CamcopsRequest", recipient_def: "ExportRecipient"
) -> List[hl7.Segment]:
    """
    Returns the HL7 data segments for this task, in this order:

    - observation request (OBR) segment
    - observation result (OBX) segment
    - any extra ones offered by :meth:`get_hl7_extra_data_segments`

    The OBX observation identifier is ``<tablename>_<server PK>``.
    """
    obr = make_obr_segment(self)
    options = recipient_def.get_task_export_options()
    obx = make_obx_segment(
        req,
        self,
        task_format=recipient_def.task_format,
        observation_identifier="_".join((self.tablename, str(self._pk))),
        observation_datetime=self.get_creation_datetime(),
        responsible_observer=self.get_clinician_name(),
        export_options=options,
    )
    segments = [obr, obx]
    return segments + self.get_hl7_extra_data_segments(recipient_def)
1443 # noinspection PyMethodMayBeStatic,PyUnusedLocal
def get_hl7_extra_data_segments(
    self, recipient_def: "ExportRecipient"
) -> List[hl7.Segment]:
    """
    Returns any extra HL7 data segments, to be added by
    :func:`get_hl7_data_segments` (which calls this function) after the
    OBR and OBX segments.

    This base implementation ignores ``recipient_def`` and returns an
    empty list. May be overridden.
    """
    return []
1455 # -------------------------------------------------------------------------
1456 # FHIR: framework
1457 # -------------------------------------------------------------------------
def get_fhir_bundle(
    self,
    req: "CamcopsRequest",
    recipient: "ExportRecipient",
    skip_docs_if_other_content: bool = DEBUG_SKIP_FHIR_DOCS,
) -> Bundle:
    """
    Returns a single FHIR transaction Bundle containing every entry from
    :meth:`get_fhir_bundle_entries` (which may raise
    :exc:`FhirExportException`).

    Before bundling, the entries are sanity-checked by assertion: each
    must have a resource, each resource must have an identifier, and no
    system/value identifier may occur more than once across the bundle.
    """
    entries = self.get_fhir_bundle_entries(
        req,
        recipient,
        skip_docs_if_other_content=skip_docs_if_other_content,
    )
    # ... may raise FhirExportException

    # Sanity checks, counting how often each identifier is used:
    id_usage = Counter()  # type: ignore[var-annotated]
    for entry in entries:
        assert (
            Fc.RESOURCE in entry
        ), f"Bundle entry has no resource: {entry}"  # just wrong
        resource = entry[Fc.RESOURCE]
        assert Fc.IDENTIFIER in resource, (
            f"Bundle entry has no identifier for its resource: "
            f"{resource}"
        )  # might succeed, but would insert an unidentified resource
        raw_identifier = resource[Fc.IDENTIFIER]
        # A resource may carry one identifier or a list of them.
        if isinstance(raw_identifier, list):
            identifiers = raw_identifier
        else:
            identifiers = [raw_identifier]
        id_usage.update(
            fhir_system_value(ident[Fc.SYSTEM], ident[Fc.VALUE])
            for ident in identifiers
        )
    # If even the most frequently used identifier occurs only once, then
    # all identifiers are unique.
    commonest_id, n_uses = id_usage.most_common(1)[0]
    assert n_uses == 1, f"Resources have duplicate IDs: {commonest_id}"

    # Bundle up the content into a transaction bundle.
    # This is one of the few FHIR objects that we don't return with
    # ".as_json()", because Bundle objects have useful methods for talking
    # to the FHIR server.
    bundle_json = {Fc.TYPE: Fc.TRANSACTION, Fc.ENTRY: entries}
    return Bundle(jsondict=bundle_json)
def get_fhir_bundle_entries(
    self,
    req: "CamcopsRequest",
    recipient: "ExportRecipient",
    skip_docs_if_other_content: bool = DEBUG_SKIP_FHIR_DOCS,
) -> List[Dict]:
    """
    Returns all FHIR bundle entries for this task. This is the
    "top-level" function providing all FHIR information for the task,
    assembled in this order:

    - the Patient, if the task has one;
    - the Practitioner (clinician), if the task has one;
    - the Questionnaire (task) itself, plus the QuestionnaireResponse
      holding the specific answers from this task instance (only if both
      are available);
    - detail entries (Observations, including SNOMED codes, and extras);
    - the DocumentReference, unless skipped.

    If the task refuses to support FHIR, raises
    :exc:`FhirExportException`.

    Args:
        req:
            a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient:
            an
            :class:`camcops_server.cc_modules.cc_exportrecipient.ExportRecipient`
        skip_docs_if_other_content:
            A debugging option: skip the document (e.g. PDF, HTML, XML),
            making the FHIR output smaller and more legible for debugging.
            However, if the task offers no other content, this will raise
            :exc:`FhirExportException`.
    """  # noqa
    entries = []  # type: List[Dict]

    # Patient (0 or 1)
    if self.has_patient:
        entries.append(self.patient.get_fhir_bundle_entry(req, recipient))

    # Clinician (0 or 1)
    if self.has_clinician:
        entries.append(self._get_fhir_clinician_bundle_entry(req))

    # Questionnaire and its collection of QuestionnaireResponse entries;
    # added as a pair or not at all.
    questionnaire, response = self._get_fhir_q_qr_bundle_entries(
        req, recipient
    )
    if questionnaire and response:
        entries.extend([questionnaire, response])

    # Observation (0 or more) -- includes Coding
    entries.extend(self._get_fhir_detail_bundle_entries(req, recipient))

    # DocumentReference (0-1; always 1 in normal use)
    if not skip_docs_if_other_content:
        entries.append(self._get_fhir_docref_bundle_entry(req, recipient))
    elif not entries:
        # We can't have nothing!
        raise FhirExportException(
            "Skipping task because DEBUG_SKIP_FHIR_DOCS set and no "
            "other content"
        )

    return entries
1579 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1580 # Generic
1581 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@property
def fhir_when_task_created(self) -> str:
    """
    The task's creation time (``when_created``) in ISO-8601 format, as
    used for FHIR date/time fields.
    """
    created = self.when_created
    return created.isoformat()
def _get_fhir_detail_bundle_entries(
    self, req: "CamcopsRequest", recipient: "ExportRecipient"
) -> List[Dict]:
    """
    Returns bundle entries for Observation objects: at most one SNOMED
    Observation (carrying one ObservationComponent per SNOMED code),
    followed by whatever :meth:`get_fhir_extra_bundle_entries` offers.

    See:

    - https://www.hl7.org/fhir/terminologies-systems.html
    - https://www.hl7.org/fhir/observation.html#code-interop
    - https://www.hl7.org/fhir/observation.html#gr-comp

    In particular, whether information should be grouped into one
    Observation (via ObservationComponent objects) or as separate
    observations depends on whether it is conceptually independent. For
    example, for BMI, height and weight should be separate.
    """
    entries = []  # type: List[Dict]

    # SNOMED, as one observation with several components:
    if req.snomed_supported:
        components = [
            fhir_observation_component_from_snomed(req, expr)
            for expr in self.get_snomed_codes(req)
        ]  # type: List[Dict]
        if components:
            entity = req.snomed(SnomedLookup.OBSERVABLE_ENTITY)
            entity_coding = Coding(
                jsondict={
                    Fc.SYSTEM: Fc.CODE_SYSTEM_SNOMED_CT,
                    Fc.CODE: str(entity.identifier),
                    Fc.DISPLAY: entity.as_string(longform=True),
                    Fc.USER_SELECTED: False,
                }
            ).as_json()
            # "code" is mandatory even if there are components.
            observation_code = CodeableConcept(
                jsondict={
                    Fc.CODING: [entity_coding],
                    Fc.TEXT: entity.term,
                }
            ).as_json()
            observation = self._get_fhir_observation(
                req,
                recipient,
                obs_dict={
                    Fc.CODE: observation_code,
                    Fc.COMPONENT: components,
                },
            )
            entries.append(
                make_fhir_bundle_entry(
                    resource_type_url=Fc.RESOURCE_TYPE_OBSERVATION,
                    identifier=self._get_fhir_observation_id(
                        req, name="snomed"
                    ),
                    resource=observation,
                )
            )

    # Extra -- these can be very varied:
    entries.extend(self.get_fhir_extra_bundle_entries(req, recipient))

    return entries
1663 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1664 # Identifiers
1665 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1667 # Generic:
def _get_fhir_id_this_task_class(
    self,
    req: "CamcopsRequest",
    route_name: str,
    value_within_task_class: Union[int, str],
) -> Identifier:
    """
    Returns a FHIR Identifier for something within this task class, in
    the abstract.

    The "system" is the URL of ``route_name`` for this task's table name,
    so it refers to the task class, not the task instance. The "value" is
    ``value_within_task_class`` as a string.
    """
    system_url = req.route_url(
        route_name,
        table_name=self.tablename,  # to match ViewParam.TABLE_NAME
    )
    identifier_json = {
        Fc.SYSTEM: system_url,
        Fc.VALUE: str(value_within_task_class),
    }
    return Identifier(jsondict=identifier_json)
def _get_fhir_id_this_task_instance(
    self,
    req: "CamcopsRequest",
    route_name: str,
    value_within_task_instance: Union[int, str],
) -> Identifier:
    """
    Returns a FHIR Identifier tied to this specific task instance.

    A number of FHIR identifiers refer to "this task" and nothing very
    much more specific (because they represent a type of thing of which
    there can only be one per task), but do so through a range of
    different route names that make the FHIR URLs look sensible. This is
    a convenience function for them.

    The "system" is the URL of ``route_name`` for this task's table name
    and server PK; the "value" is ``value_within_task_instance`` as a
    string.
    """
    system_url = req.route_url(
        route_name,
        table_name=self.tablename,  # to match ViewParam.TABLE_NAME
        server_pk=str(self._pk),  # to match ViewParam.SERVER_PK
    )
    identifier_json = {
        Fc.SYSTEM: system_url,
        Fc.VALUE: str(value_within_task_instance),
    }
    return Identifier(jsondict=identifier_json)
1714 # Specific:
def _get_fhir_condition_id(
    self, req: "CamcopsRequest", name: Union[int, str]
) -> Identifier:
    """
    Returns a FHIR Identifier for a Condition, representing this task
    instance and a named condition within it (built with the
    ``Routes.FHIR_CONDITION`` route).
    """
    route = Routes.FHIR_CONDITION
    return self._get_fhir_id_this_task_instance(req, route, name)
def _get_fhir_docref_id(
    self, req: "CamcopsRequest", task_format: str
) -> Identifier:
    """
    Returns a FHIR Identifier (e.g. for a DocumentReference collection)
    representing the view of this task instance in the given
    ``task_format`` (built with the ``Routes.FHIR_DOCUMENT_REFERENCE``
    route).
    """
    route = Routes.FHIR_DOCUMENT_REFERENCE
    return self._get_fhir_id_this_task_instance(req, route, task_format)
def _get_fhir_observation_id(
    self, req: "CamcopsRequest", name: str
) -> Identifier:
    """
    Returns a FHIR Identifier for an Observation, representing this task
    instance and a named observation within it (built with the
    ``Routes.FHIR_OBSERVATION`` route).
    """
    route = Routes.FHIR_OBSERVATION
    return self._get_fhir_id_this_task_instance(req, route, name)
def _get_fhir_practitioner_id(self, req: "CamcopsRequest") -> Identifier:
    """
    Returns a FHIR Identifier for the clinician, tied to this task
    instance. (Clinicians are not sensibly made unique across tasks, but
    are task-specific.)
    """
    route = Routes.FHIR_PRACTITIONER
    value = Fc.CAMCOPS_VALUE_CLINICIAN_WITHIN_TASK
    return self._get_fhir_id_this_task_instance(req, route, value)
def _get_fhir_questionnaire_id(self, req: "CamcopsRequest") -> Identifier:
    """
    Returns a FHIR Identifier (e.g. for a Questionnaire) representing this
    task, in the abstract.

    The value is ``<tablename>/<CamCOPS server version>``. The version is
    incorporated so that if aspects (even the formatting of question
    text) change, a new version will be stored despite the "ifNoneExist"
    clause.
    """
    system_url = req.route_url(Routes.FHIR_QUESTIONNAIRE_SYSTEM)
    identifier_json = {
        Fc.SYSTEM: system_url,
        Fc.VALUE: f"{self.tablename}/{CAMCOPS_SERVER_VERSION_STRING}",
    }
    return Identifier(jsondict=identifier_json)
def _get_fhir_questionnaire_response_id(
    self, req: "CamcopsRequest"
) -> Identifier:
    """
    Returns a FHIR Identifier (e.g. for a QuestionnaireResponse
    collection) representing this task instance. QuestionnaireResponse
    items are specific answers, not abstract descriptions.
    """
    route = Routes.FHIR_QUESTIONNAIRE_RESPONSE
    value = Fc.CAMCOPS_VALUE_QUESTIONNAIRE_RESPONSE_WITHIN_TASK
    return self._get_fhir_id_this_task_instance(req, route, value)
1790 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1791 # References to identifiers
1792 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1794 def _get_fhir_subject_ref(
1795 self, req: "CamcopsRequest", recipient: "ExportRecipient"
1796 ) -> Dict:
1797 """
1798 Returns a reference to the patient, for "subject" fields.
1799 """
1800 assert (
1801 self.has_patient
1802 ), "Don't call Task._get_fhir_subject_ref() for anonymous tasks"
1803 return self.patient.get_fhir_subject_ref(req, recipient)
1805 def _get_fhir_practitioner_ref(self, req: "CamcopsRequest") -> Dict:
1806 """
1807 Returns a reference to the clinician, for "practitioner" fields.
1808 """
1809 assert self.has_clinician, (
1810 "Don't call Task._get_fhir_clinician_ref() "
1811 "for tasks without a clinician"
1812 )
1813 return FHIRReference(
1814 jsondict={
1815 Fc.TYPE: Fc.RESOURCE_TYPE_PRACTITIONER,
1816 Fc.IDENTIFIER: self._get_fhir_practitioner_id(req).as_json(),
1817 }
1818 ).as_json()
1820 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1821 # DocumentReference
1822 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _get_fhir_docref_bundle_entry(
    self,
    req: "CamcopsRequest",
    recipient: "ExportRecipient",
    text_encoding: str = UTF8,
) -> Dict:
    """
    Returns a bundle entry for an attached document, which is a full
    representation of the task according to the recipient's task format
    (PDF, XML, or HTML).

    This requires a DocumentReference, which can (in theory) either embed
    the data, or refer via a URL to an associated Binary object. We do it
    directly: the document is embedded as base-64 data.

    See:

    - https://fhirblog.com/2013/11/06/fhir-and-xds-submitting-a-document-from-a-document-source/
    - https://fhirblog.com/2013/11/12/the-fhir-documentreference-resource/
    - https://build.fhir.org/ig/HL7/US-Core/StructureDefinition-us-core-documentreference.html
    - https://build.fhir.org/ig/HL7/US-Core/clinical-notes-guidance.html

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        recipient: the export recipient, whose ``task_format`` is used
        text_encoding: encoding applied to text formats (XML, HTML)
            before base-64 encoding

    Raises:
        ValueError: if the task format is not PDF, XML, or HTML
    """  # noqa

    # Establish the MIME type and the raw bytes of the document
    task_format = recipient.task_format
    if task_format == FileType.PDF:
        document_bytes = self.get_pdf(req)
        mime_type = MimeType.PDF
    elif task_format == FileType.XML:
        xml_options = TaskExportOptions(
            include_blobs=False,
            xml_include_ancillary=True,
            xml_include_calculated=True,
            xml_include_comments=True,
            xml_include_patient=True,
            xml_include_plain_columns=True,
            xml_include_snomed=True,
            xml_with_header_comments=True,
        )
        document_bytes = self.get_xml(req, options=xml_options).encode(
            text_encoding
        )
        mime_type = MimeType.XML
    elif task_format == FileType.HTML:
        document_bytes = self.get_html(req).encode(text_encoding)
        mime_type = MimeType.HTML
    else:
        raise ValueError(f"Unknown task format: {task_format!r}")
    # Base-64 output is bytes; FHIR JSON wants it as text.
    document_b64 = b64encode(document_bytes).decode(ASCII)

    # Build the DocumentReference
    docref_id = self._get_fhir_docref_id(req, task_format)
    dr_dict = {
        # Metadata:
        Fc.DATE: self.fhir_when_task_created,
        Fc.DESCRIPTION: self.longname(req),
        Fc.DOCSTATUS: (
            Fc.DOCSTATUS_FINAL
            if self.is_finalized()
            else Fc.DOCSTATUS_PRELIMINARY
        ),
        Fc.MASTER_IDENTIFIER: docref_id.as_json(),
        Fc.STATUS: Fc.DOCSTATUS_CURRENT,
        # And the content:
        Fc.CONTENT: [
            DocumentReferenceContent(
                jsondict={
                    Fc.ATTACHMENT: Attachment(
                        jsondict={
                            Fc.CONTENT_TYPE: mime_type,
                            Fc.DATA: document_b64,
                        }
                    ).as_json()
                }
            ).as_json()
        ],
    }
    # Optional metadata:
    if self.has_clinician:
        dr_dict[Fc.AUTHOR] = [self._get_fhir_practitioner_ref(req)]
    if self.has_patient:
        dr_dict[Fc.SUBJECT] = self._get_fhir_subject_ref(req, recipient)

    docref_json = DocumentReference(jsondict=dr_dict).as_json()
    return make_fhir_bundle_entry(
        resource_type_url=Fc.RESOURCE_TYPE_DOCUMENT_REFERENCE,
        identifier=docref_id,
        resource=docref_json,
    )
1917 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1918 # Observation
1919 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _get_fhir_observation(
    self,
    req: "CamcopsRequest",
    recipient: "ExportRecipient",
    obs_dict: Dict,
) -> Dict:
    """
    Given a starting dictionary for an Observation, completes it for this
    task and returns the Observation (as a dict in JSON format).

    Added are the effective date/time (task creation), the status (final
    or preliminary, depending on :meth:`is_finalized`), and, if the task
    has a patient, the subject.

    Note that ``obs_dict`` is modified in place.
    """
    # Work out both values before touching the caller's dictionary.
    effective = self.fhir_when_task_created
    if self.is_finalized():
        status = Fc.OBSSTATUS_FINAL
    else:
        status = Fc.OBSSTATUS_PRELIMINARY
    obs_dict[Fc.EFFECTIVE_DATE_TIME] = effective
    obs_dict[Fc.STATUS] = status
    if self.has_patient:
        obs_dict[Fc.SUBJECT] = self._get_fhir_subject_ref(req, recipient)
    return Observation(jsondict=obs_dict).as_json()
1946 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1947 # Practitioner (clinician)
1948 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1950 def _get_fhir_clinician_bundle_entry(self, req: "CamcopsRequest") -> Dict:
1951 """
1952 Supplies information on the clinician associated with this task, as a
1953 FHIR Practitioner object (within a bundle).
1954 """
1955 assert self.has_clinician, (
1956 "Don't call Task._get_fhir_practitioner_bundle_entry() "
1957 "for tasks without a clinician"
1958 )
1959 practitioner = Practitioner(
1960 jsondict={
1961 Fc.NAME: [
1962 HumanName(
1963 jsondict={Fc.TEXT: self.get_clinician_name()}
1964 ).as_json()
1965 ],
1966 # "qualification" is too structured.
1967 # There isn't anywhere to represent our other information, so
1968 # we jam it in to "telecom"/"other".
1969 Fc.TELECOM: [
1970 ContactPoint(
1971 jsondict={
1972 Fc.SYSTEM: Fc.TELECOM_SYSTEM_OTHER,
1973 Fc.VALUE: self.get_clinician_fhir_telecom_other(
1974 req
1975 ),
1976 }
1977 ).as_json()
1978 ],
1979 }
1980 ).as_json()
1981 return make_fhir_bundle_entry(
1982 resource_type_url=Fc.RESOURCE_TYPE_PRACTITIONER,
1983 identifier=self._get_fhir_practitioner_id(req),
1984 resource=practitioner,
1985 )
1987 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
1988 # Questionnaire, QuestionnaireResponse
1989 # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
def _get_fhir_q_qr_bundle_entries(
    self, req: "CamcopsRequest", recipient: "ExportRecipient"
) -> Tuple[Optional[Dict], Optional[Dict]]:
    """
    Returns a tuple of FHIR bundle entries: ``questionnaire_bundle_entry,
    questionnaire_response_bundle_entry``; or ``None, None`` if the task
    provides no questionnaire data.

    A Questionnaire object represents the task in the abstract;
    QuestionnaireResponse items represent each answered question for a
    specific task instance.
    """
    # Ask the task for its details (which it may provide directly, by
    # overriding, or rely on autodiscovery for the default).
    answered_questions = self.get_fhir_questionnaire(req)
    if DEBUG_SHOW_FHIR_QUESTIONNAIRE:
        if not answered_questions:
            log.debug("No FHIR questionnaire data")
        else:
            qa_str = "\n".join(f"- {str(x)}" for x in answered_questions)
            log.debug(f"FHIR questions/answers:\n{qa_str}")

    # Nothing to export?
    if not answered_questions:
        return None, None

    # Each item yields both its abstract question and its actual answer.
    question_items = [
        aq.questionnaire_item() for aq in answered_questions
    ]
    response_items = [
        aq.questionnaire_response_item() for aq in answered_questions
    ]
    questionnaire_entry = self._make_fhir_questionnaire_bundle_entry(
        req, question_items
    )
    response_entry = self._make_fhir_questionnaire_response_bundle_entry(
        req, recipient, response_items
    )
    return questionnaire_entry, response_entry
def _make_fhir_questionnaire_bundle_entry(
    self, req: "CamcopsRequest", q_items: List[Dict]
) -> Optional[Dict]:
    """
    Wrap the supplied Questionnaire items in a FHIR Questionnaire and
    return it as a bundle entry. The Questionnaire describes the "abstract
    task", not a "task instance".

    Args:
        req:
            A :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
        q_items:
            Questionnaire items (one per question), already in JSON
            (dictionary) form.
    """
    # FHIR supports versioning of questionnaires. Might be useful if the
    # wording of questions change. Could either use FHIR's version
    # field or include the version in the identifier below. Either way
    # we'd need the version in the 'ifNoneExist' part of the request.
    identifier = self._get_fhir_questionnaire_id(req)

    # Other things we could add:
    # https://www.hl7.org/fhir/questionnaire.html
    #
    # date: Date last changed
    # useContext: https://www.hl7.org/fhir/metadatatypes.html#UsageContext
    help_url = self.help_url()
    jsondict = {
        # Computer-friendly name:
        Fc.NAME: self.shortname,
        # Human name:
        Fc.TITLE: self.longname(req),
        # Natural language description of the questionnaire:
        Fc.DESCRIPTION: help_url,
        # Use and/or publishing restrictions:
        Fc.COPYRIGHT: help_url,
        Fc.VERSION: CAMCOPS_SERVER_VERSION_STRING,
        # Could also be: draft, retired, unknown
        Fc.STATUS: Fc.QSTATUS_ACTIVE,
        Fc.ITEM: q_items,
    }
    questionnaire = Questionnaire(jsondict=jsondict)
    return make_fhir_bundle_entry(
        resource_type_url=Fc.RESOURCE_TYPE_QUESTIONNAIRE,
        identifier=identifier,
        resource=questionnaire.as_json(),
    )
def _make_fhir_questionnaire_response_bundle_entry(
    self,
    req: "CamcopsRequest",
    recipient: "ExportRecipient",
    qr_items: List[Dict],
) -> Dict:
    """
    Make a bundle entry from FHIR QuestionnaireResponse items (e.g. one for
    the response to each question in a questionnaire-style task).

    Args:
        req:
            A :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
        recipient:
            The export recipient; used when building the subject
            reference for tasks that have a patient.
        qr_items:
            QuestionnaireResponse items, already in JSON (dictionary)
            form.
    """
    questionnaire_id = self._get_fhir_questionnaire_id(req)
    response_id = self._get_fhir_questionnaire_response_id(req)

    # Status:
    # https://www.hl7.org/fhir/valueset-questionnaire-answers-status.html
    # It is probably undesirable to export tasks that are incomplete in the
    # sense of "not finalized". The user can control this (via the
    # FINALIZED_ONLY config option for exports). However, we also need to
    # handle finalized but incomplete data.
    if not self.is_complete():
        if self.is_live_on_tablet():
            status = Fc.QSTATUS_IN_PROGRESS
        else:
            # Incomplete, but finalized.
            status = Fc.QSTATUS_STOPPED
    else:
        status = Fc.QSTATUS_COMPLETED

    response_json = {
        # https://r4.smarthealthit.org does not like "questionnaire" in
        # this form:
        # FHIR Server; FHIR 4.0.0/R4; HAPI FHIR 4.0.0-SNAPSHOT)
        # error is:
        # Invalid resource reference found at
        # path[QuestionnaireResponse.questionnaire]- Resource type is
        # unknown or not supported on this server
        # - http://127.0.0.1:8000/fhir_questionnaire|phq9
        # http://hapi.fhir.org/baseR4/ (4.0.1 (R4)) is OK
        Fc.QUESTIONNAIRE: fhir_sysval_from_id(questionnaire_id),
        Fc.AUTHORED: self.fhir_when_task_created,
        Fc.STATUS: status,
        # TODO: Could also add:
        # https://www.hl7.org/fhir/questionnaireresponse.html
        # author: Person who received and recorded the answers
        # source: The person who answered the questions
        Fc.ITEM: qr_items,
    }

    # Anonymous tasks have no subject to refer to.
    if self.has_patient:
        response_json[Fc.SUBJECT] = self._get_fhir_subject_ref(  # type: ignore[assignment] # noqa: E501
            req, recipient
        )

    response = QuestionnaireResponse(response_json)
    return make_fhir_bundle_entry(
        resource_type_url=Fc.RESOURCE_TYPE_QUESTIONNAIRE_RESPONSE,
        identifier=response_id,
        resource=response.as_json(),
        identifier_is_list=False,
    )
2123 # -------------------------------------------------------------------------
2124 # FHIR: functions to override if desired
2125 # -------------------------------------------------------------------------
def get_fhir_questionnaire(
    self, req: "CamcopsRequest"
) -> List[FHIRAnsweredQuestion]:
    """
    Describe this task for FHIR purposes: both the task in the abstract
    (its questions) and the answers held by this specific instance.

    The default implementation delegates to automatic discovery; tasks may
    override this method to supply their own description.
    """
    answered_questions = self._fhir_autodiscover(req)
    return answered_questions
def get_fhir_extra_bundle_entries(
    self, req: "CamcopsRequest", recipient: "ExportRecipient"
) -> List[Dict]:
    """
    Hook for tasks that want to add further FHIR bundle entries. (SNOMED-CT
    codes are done automatically; don't repeat those.)

    The default implementation supplies none: it returns a new empty list.
    """
    no_extra_entries = []  # type: List[Dict]
    return no_extra_entries
def get_qtext(self, req: "CamcopsRequest", attrname: str) -> Optional[str]:
    """
    Look up the text of a question.

    The default implementation is a guess: it asks ``xstring()`` for a
    string named after the field, without requesting a default if that
    string is missing.

    Args:
        req:
            A :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
        attrname:
            Name of the attribute (field) on this task that represents the
            question.
    """
    question_text = self.xstring(
        req, attrname, provide_default_if_none=False
    )
    return question_text
def get_atext(
    self, req: "CamcopsRequest", attrname: str, answer_value: int
) -> Optional[str]:
    """
    Look up the text of a particular answer to a question.

    The default implementation is a guess: it asks ``xstring()`` for a
    string named ``<attrname>_a<answer_value>``, without requesting a
    default if that string is missing.

    Args:
        req:
            A :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
        attrname:
            Name of the attribute (field) on this task that represents the
            question.
        answer_value:
            Answer value.
    """
    return self.xstring(
        req,
        f"{attrname}_a{answer_value}",
        provide_default_if_none=False,
    )
2181 # -------------------------------------------------------------------------
2182 # FHIR automatic interrogation
2183 # -------------------------------------------------------------------------
def _fhir_autodiscover(
    self, req: "CamcopsRequest"
) -> List[FHIRAnsweredQuestion]:
    """
    Inspect this task instance and create information about both the task
    in the abstract and the answers for this specific instance.

    Every column yielded by ``gen_columns(self)`` is used, except those
    named in ``TASK_FREQUENT_FIELDS``. For each, the question text, the
    FHIR question/answer types and (where the column has a permitted-value
    checker) the answer options are worked out.

    Args:
        req:
            A :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.

    Returns:
        one :class:`FHIRAnsweredQuestion` per column examined.

    Raises:
        NotImplementedError:
            if a column's type matches none of the types handled below.
    """
    qa_items = []  # type: List[FHIRAnsweredQuestion]

    skip_fields = TASK_FREQUENT_FIELDS
    for attrname, column in gen_columns(self):
        if attrname in skip_fields:
            continue
        comment = column.comment
        coltype = column.type

        # Question text:
        retrieved_qtext = self.get_qtext(req, attrname)
        qtext_components = []  # type: List[str]
        if retrieved_qtext:
            qtext_components.append(retrieved_qtext)
        if comment:
            qtext_components.append(f"[{comment}]")
        if not qtext_components:
            # Nothing better available: use the attribute name, or the
            # "unknown" placeholder if even that is empty. (Previously the
            # placeholder branch could never be reached.)
            qtext_components.append(attrname or FHIR_UNKNOWN_TEXT)
        qtext = " ".join(qtext_components)
        # Note that it's good to get the column comment in somewhere; these
        # often explain the meaning of the field quite well. It may or may
        # not be possible to get it into the option values -- many answer
        # types don't permit those. QuestionnaireItem records don't have a
        # comment field (see
        # https://www.hl7.org/fhir/questionnaire-definitions.html#Questionnaire.item),  # noqa
        # so the best we can do is probably to stuff it into the question
        # text, even if that causes some visual duplication.

        # Thinking about types:
        int_type = isinstance(coltype, Integer)
        bool_type = (
            is_sqlatype_binary(coltype)
            or isinstance(coltype, Boolean)
            # For booleans represented as integers: it is better to be as
            # constraining as possible and say that only 0/1 options are
            # present by marking these as Boolean, which is less
            # complicated for the recipient than "integer but with possible
            # options 0 or 1". We will *also* show the possible options,
            # just to be clear.
        )
        # The order of these tests is significant: the first match wins.
        if int_type:
            qtype = FHIRQuestionType.INTEGER
            atype = FHIRAnswerType.INTEGER
        elif isinstance(coltype, String):  # includes its subclass, Text
            qtype = FHIRQuestionType.STRING
            atype = FHIRAnswerType.STRING
        elif isinstance(coltype, Numeric):  # includes Float, Decimal
            qtype = FHIRQuestionType.QUANTITY
            atype = FHIRAnswerType.QUANTITY
        elif isinstance(
            coltype, (DateTime, PendulumDateTimeAsIsoTextColType)
        ):
            qtype = FHIRQuestionType.DATETIME
            atype = FHIRAnswerType.DATETIME
        elif isinstance(coltype, DateColType):
            qtype = FHIRQuestionType.DATE
            atype = FHIRAnswerType.DATE
        elif isinstance(coltype, Time):
            qtype = FHIRQuestionType.TIME
            atype = FHIRAnswerType.TIME
        elif bool_type:
            qtype = FHIRQuestionType.BOOLEAN
            atype = FHIRAnswerType.BOOLEAN
        else:
            raise NotImplementedError(f"Unknown column type: {coltype!r}")

        # Thinking about MCQ options:
        answer_options = None  # type: Optional[Dict[Any, str]]
        pvc = None  # type: Optional[PermittedValueChecker]
        if int_type or bool_type:
            # A missing attribute and an attribute set to None are treated
            # alike: no permitted-value information.
            pvc = getattr(column, COLATTR_PERMITTED_VALUE_CHECKER, None)
        if pvc is not None:
            pv = pvc.permitted_values_inc_minmax()
            if pv:
                qtype = FHIRQuestionType.CHOICE
                # ... has to be of type "choice" to transmit the
                # possible values.
                answer_options = {
                    v: (
                        self.get_atext(req, attrname, v)
                        or comment
                        or FHIR_UNKNOWN_TEXT
                    )
                    for v in pv
                }

        # Assemble:
        qa_items.append(
            FHIRAnsweredQuestion(
                qname=attrname,
                qtext=qtext,
                qtype=qtype,
                answer_type=atype,
                answer=getattr(self, attrname),
                answer_options=answer_options,
            )
        )

    # We don't currently put any summary information into FHIR exports. I
    # think that isn't within the spirit of the system, but am not sure.
    # todo: Check if summary information should go into FHIR exports.

    return qa_items
2300 # -------------------------------------------------------------------------
2301 # Export (generically)
2302 # -------------------------------------------------------------------------
def cancel_from_export_log(
    self, req: "CamcopsRequest", from_console: bool = False
) -> None:
    """
    Flag every not-yet-cancelled export log entry for this task as
    "cancelled", so the task will be resent, and audit that fact.

    Does nothing if this task has no server PK.

    Args:
        req:
            A :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
        from_console:
            Passed on to the audit call.
    """
    task_pk = self._pk
    if task_pk is None:
        return
    from camcops_server.cc_modules.cc_exportmodels import (
        ExportedTask,
    )  # delayed import

    # ... AND (NOT cancelled OR cancelled IS NULL) ...:
    # https://stackoverflow.com/questions/37445041/sqlalchemy-how-to-filter-column-which-contains-both-null-and-integer-values  # noqa
    not_yet_cancelled = (
        not_(ExportedTask.cancelled) | ExportedTask.cancelled.is_(None)
    )
    # noinspection PyUnresolvedReferences
    statement = (
        update(ExportedTask.__table__)  # type: ignore[arg-type]
        .where(ExportedTask.basetable == self.tablename)
        .where(ExportedTask.task_server_pk == task_pk)
        .where(not_yet_cancelled)
        .values(cancelled=1, cancelled_at_utc=req.now_utc)
    )
    req.dbsession.execute(statement)
    self.audit(
        req,
        "Task cancelled in export log (may trigger resending)",
        from_console,
    )
2336 # -------------------------------------------------------------------------
2337 # Audit
2338 # -------------------------------------------------------------------------
def audit(
    self, req: "CamcopsRequest", details: str, from_console: bool = False
) -> None:
    """
    Record an audit entry about an action on this task, tagged with the
    task's table, its server PK, and its patient's server PK.

    Args:
        req:
            A :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`.
        details:
            Text describing what happened.
        from_console:
            Passed through to the module-level ``audit()`` function.
    """
    patient_pk = self.get_patient_server_pk()
    table = self.tablename
    task_pk = self._pk
    # This is the module-level audit() function, not this method.
    audit(
        req,
        details,
        patient_server_pk=patient_pk,
        table=table,
        server_pk=task_pk,
        from_console=from_console,
    )
2355 # -------------------------------------------------------------------------
2356 # Erasure (wiping, leaving record as placeholder)
2357 # -------------------------------------------------------------------------
def manually_erase(self, req: "CamcopsRequest") -> None:
    """
    Manually erases a task (including sub-tables), together with the
    linked non-current records in its lineage.
    This WIPES THE CONTENTS but LEAVES THE RECORD AS A PLACEHOLDER.

    The erasure is audited and then propagated to the export log, so
    those records will be re-sent. WRITES TO DATABASE.
    """
    # Wipe every member of our "family" (which includes ourself).
    lineage = self.get_lineage()
    for member in lineage:
        member.manually_erase_with_dependants(req)

    # Audit first, then clear the export log.
    self.audit(req, "Task details erased manually")
    self.cancel_from_export_log(req)
def is_erased(self) -> bool:
    """
    Reports the task's "manually erased" flag. See :func:`manually_erase`.
    """
    erased = self._manually_erased
    return erased
2381 # -------------------------------------------------------------------------
2382 # Complete deletion
2383 # -------------------------------------------------------------------------
def delete_entirely(self, req: "CamcopsRequest") -> None:
    """
    Deletes every task in this task's lineage, each with its dependants,
    and then audits the deletion.
    """
    lineage = self.get_lineage()
    for member in lineage:
        member.delete_with_dependants(req)
    self.audit(req, "Task deleted")
2393 # -------------------------------------------------------------------------
2394 # Filtering tasks for the task list
2395 # -------------------------------------------------------------------------
@classmethod
def gen_text_filter_columns(
    cls,
) -> Generator[Tuple[str, Column], None, None]:
    """
    Generates ``attrname, column`` tuples for the columns that text
    filtering may search: string-typed columns whose attribute name does
    not start with an underscore.
    """
    for attrname, column in gen_columns(cls):
        is_system_field = attrname.startswith("_")
        if not is_system_field and is_sqlatype_string(column.type):
            yield attrname, column
@classmethod
@cache_region_static.cache_on_arguments(function_key_generator=fkg)
def get_text_filter_columns(cls) -> List[Column]:
    """
    Returns the SQLAlchemy Column objects yielded by
    :meth:`gen_text_filter_columns`, as a list. The result is cached.
    """
    columns = []  # type: List[Column]
    for _attrname, column in cls.gen_text_filter_columns():
        columns.append(column)
    return columns
def contains_text(self, text: str) -> bool:
    """
    Does this task contain the specified text? The comparison is
    case-insensitive.

    Args:
        text:
            string that must be present in at least one of our text
            columns

    Returns:
        is the string present?
    """
    needle = text.lower()
    for attrname, _ in self.gen_text_filter_columns():
        haystack = getattr(self, attrname)
        if haystack is not None:
            assert isinstance(haystack, str), "Internal bug in contains_text"
            if needle in haystack.lower():
                return True
    return False
def contains_all_strings(self, strings: List[str]) -> bool:
    """
    Does this task contain every one of the specified strings?

    Args:
        strings:
            list of strings; each string must be present in at least
            one of our text columns

    Returns:
        are all strings present? (Trivially true for an empty list.)
    """
    for text in strings:
        if not self.contains_text(text):
            return False
    return True
2457 # -------------------------------------------------------------------------
2458 # Spreadsheet export for basic research dump
2459 # -------------------------------------------------------------------------
def get_spreadsheet_pages(
    self, req: "CamcopsRequest"
) -> List["SpreadsheetPage"]:
    """
    Assembles the spreadsheet pages used for the basic research dump in
    (e.g.) TSV format: the task's own page first (with patient columns
    merged in, if there is a patient), then one page per ancillary object,
    then one per summary table.
    """
    # 1. Our core fields, plus summary information
    core_page = self._get_core_spreadsheet_page(req)

    # 2. Patient details, merged into the core page.
    patient = self.patient
    if patient:
        core_page.add_or_set_columns_from_page(
            patient.get_spreadsheet_page(req)
        )
    pages = [core_page]

    # 3. +/- Ancillary objects
    pages.extend(
        ancillary._get_core_spreadsheet_page(req)
        for ancillary in self.gen_ancillary_instances()
    )

    # 4. +/- Extra summary tables (inc. SNOMED)
    pages.extend(
        est.get_spreadsheet_page()
        for est in self.get_all_summary_tables(req)
    )

    return pages
def get_spreadsheet_schema_elements(
    self, req: "CamcopsRequest"
) -> Set[SummarySchemaInfo]:
    """
    Collects the schema information used for spreadsheets -- more than
    just the database columns, and in the same format as the spreadsheets.
    """
    table_name = self.__tablename__

    # 1(a). Database columns: main table
    schema = self._get_core_spreadsheet_schema()
    # 1(b). Summary information.
    schema.update(
        SummarySchemaInfo.from_summary_element(table_name, summary)
        for summary in self.get_summaries(req)
    )

    # 2. Patient details
    patient = self.patient
    if patient:
        schema.update(
            patient.get_spreadsheet_schema_elements(req, table_name)
        )

    # 3. Ancillary objects
    for ancillary in self.gen_ancillary_instances():
        schema.update(ancillary._get_core_spreadsheet_schema())

    # 4. Extra summary tables
    for est in self.get_all_summary_tables(req):
        schema.update(est.get_spreadsheet_schema_elements())

    return schema
2527 # -------------------------------------------------------------------------
2528 # XML view
2529 # -------------------------------------------------------------------------
def get_xml(
    self,
    req: "CamcopsRequest",
    options: TaskExportOptions = None,
    indent_spaces: int = 4,
    eol: str = "\n",
) -> str:
    """
    Renders the task as an XML document.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        options: a :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions`;
            default options are used if none are supplied
        indent_spaces: number of spaces to indent formatted XML
        eol: end-of-line string

    Returns:
        an XML UTF-8 document representing the task.

    """  # noqa
    if not options:
        options = TaskExportOptions()
    root = self.get_xml_root(req=req, options=options)
    include_comments = options.xml_include_comments
    return get_xml_document(
        root,
        indent_spaces=indent_spaces,
        eol=eol,
        include_comments=include_comments,
    )
def get_xml_root(
    self, req: "CamcopsRequest", options: TaskExportOptions
) -> XmlElement:
    """
    Builds the XML tree for this task and returns its root
    :class:`camcops_server.cc_modules.cc_xml.XmlElement`, which is named
    after the task's table.

    Override to include other tables, or to deal with BLOBs, if the default
    methods are insufficient.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        options: a :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions`
    """  # noqa
    # Core (inc. core BLOBs)
    core_branches = self._get_xml_core_branches(req=req, options=options)
    return XmlElement(name=self.tablename, value=core_branches)
def _get_xml_core_branches(
    self, req: "CamcopsRequest", options: TaskExportOptions = None
) -> List[XmlElement]:
    """
    Returns a list of :class:`camcops_server.cc_modules.cc_xml.XmlElement`
    elements representing stored, calculated, patient, and/or BLOB fields,
    depending on the options.

    The branches are assembled in a fixed order: core fields, SNOMED-CT
    codes, special notes, patient details, BLOBs, ancillary objects, and
    extra summary tables. Header comments are interleaved if
    ``options.xml_with_header_comments`` is set.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        options: a :class:`camcops_server.cc_modules.cc_simpleobjects.TaskExportOptions`;
            if not supplied, a default set is used (everything except
            BLOBs)
    """  # noqa
    options = options or TaskExportOptions(
        xml_include_plain_columns=True,
        xml_include_ancillary=True,
        include_blobs=False,
        xml_include_calculated=True,
        xml_include_patient=True,
        xml_include_snomed=True,
    )

    def add_comment(comment: XmlLiteral) -> None:
        # Closure over "branches", which is assigned further down; this
        # helper must therefore only be called after that assignment.
        if options.xml_with_header_comments:
            branches.append(comment)

    # Stored values +/- calculated values
    # (BLOBs are excluded from this pass, via a cloned options object;
    # they are handled separately below.)
    core_options = options.clone()
    core_options.include_blobs = False
    branches = self._get_xml_branches(req=req, options=core_options)

    # SNOMED-CT codes
    if options.xml_include_snomed and req.snomed_supported:
        add_comment(XML_COMMENT_SNOMED_CT)
        snomed_codes = self.get_snomed_codes(req)
        snomed_branches = []  # type: List[XmlElement]
        for code in snomed_codes:
            snomed_branches.append(code.xml_element())
        branches.append(
            XmlElement(name=XML_NAME_SNOMED_CODES, value=snomed_branches)
        )

    # Special notes
    # (added unconditionally; only the header comment is optional)
    add_comment(XML_COMMENT_SPECIAL_NOTES)
    for sn in self.special_notes:
        branches.append(sn.get_xml_root())

    # Patient details
    if self.is_anonymous:
        add_comment(XML_COMMENT_ANONYMOUS)
    elif options.xml_include_patient:
        add_comment(XML_COMMENT_PATIENT)
        patient_options = TaskExportOptions(
            xml_include_plain_columns=True,
            xml_with_header_comments=options.xml_with_header_comments,
        )
        if self.patient:
            branches.append(
                self.patient.get_xml_root(req, patient_options)
            )

    # BLOBs
    # (a second pass over our own fields, this time with BLOBs included)
    if options.include_blobs:
        add_comment(XML_COMMENT_BLOBS)
        blob_options = TaskExportOptions(
            include_blobs=True,
            xml_skip_fields=options.xml_skip_fields,
            xml_sort_by_name=True,
            xml_with_header_comments=False,
        )
        branches += self._get_xml_branches(req=req, options=blob_options)

    # Ancillary objects
    if options.xml_include_ancillary:
        ancillary_options = TaskExportOptions(
            xml_include_plain_columns=True,
            xml_include_ancillary=True,
            include_blobs=options.include_blobs,
            xml_include_calculated=options.xml_include_calculated,
            xml_sort_by_name=True,
            xml_with_header_comments=options.xml_with_header_comments,
        )
        item_collections = []  # type: List[XmlElement]
        found_ancillary = False
        # We use a slightly more manual iteration process here so that
        # we iterate through individual ancillaries but clustered by their
        # name (e.g. if we have 50 trials and 5 groups, we do them in
        # collections).
        for attrname, rel_prop, rel_cls in gen_ancillary_relationships(
            self
        ):
            # The header comment is added once, on the first relationship.
            if not found_ancillary:
                add_comment(XML_COMMENT_ANCILLARY)
                found_ancillary = True
            itembranches = []  # type: List[XmlElement]
            if rel_prop.uselist:
                ancillaries = getattr(
                    self, attrname
                )  # type: List[GenericTabletRecordMixin]
            else:
                # Single-object relationship: wrap it so that both cases
                # can be iterated in the same way.
                ancillaries = [  # type: ignore[no-redef]
                    getattr(self, attrname)
                ]  # type: List[GenericTabletRecordMixin]
            for ancillary in ancillaries:
                itembranches.append(
                    ancillary._get_xml_root(
                        req=req, options=ancillary_options
                    )
                )
            # One collection element per relationship, named after it.
            itemcollection = XmlElement(name=attrname, value=itembranches)
            item_collections.append(itemcollection)
        item_collections.sort(key=lambda el: el.name)
        branches += item_collections

    # Completely separate additional summary tables
    if options.xml_include_calculated:
        item_collections = []  # type: ignore[no-redef]
        found_est = False
        for est in self.get_extra_summary_tables(req):
            # ... not get_all_summary_tables(); we handled SNOMED
            # differently, above
            # The header comment is added once, on the first table that
            # has rows; every table's element is added regardless.
            if not found_est and est.rows:
                add_comment(XML_COMMENT_CALCULATED)
                found_est = True
            item_collections.append(est.get_xml_element())
        item_collections.sort(key=lambda el: el.name)
        branches += item_collections

    return branches
2709 # -------------------------------------------------------------------------
2710 # HTML view
2711 # -------------------------------------------------------------------------
def get_html(self, req: "CamcopsRequest", anonymise: bool = False) -> str:
    """
    Renders the task via ``task.mako`` for our HTML view (no signature
    block).

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        anonymise: hide patient identifying details?
    """
    req.prepare_for_html_figures()
    template_args = {
        "task": self,
        "anonymise": anonymise,
        "signature": False,
        "viewtype": ViewArg.HTML,
    }
    return render("task.mako", template_args, request=req)
def title_for_html(
    self, req: "CamcopsRequest", anonymise: bool = False
) -> str:
    """
    Builds the plain text used for the HTML ``<title>`` block (by
    ``task.mako``), and also for the PDF title for PDF exports.

    Should be plain text only.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        anonymise: hide patient identifying details?

    Returns:
        a string of the form ``CamCOPS: <patient>; <table name>; <when>``.
    """
    if anonymise:
        who = "?"
    else:
        patient = self.patient
        if patient:
            who = patient.prettystr(req)
        else:
            # Keep the conventional "_" alias for the translation function.
            _ = req.gettext
            who = _("Anonymous")
    created = self.get_creation_datetime()
    when = format_datetime(
        created, DateFormat.ISO8601_HUMANIZED_TO_MINUTES, ""
    )
    return f"CamCOPS: {who}; {self.tablename}; {when}"
2761 # -------------------------------------------------------------------------
2762 # PDF view
2763 # -------------------------------------------------------------------------
def get_pdf(self, req: "CamcopsRequest", anonymise: bool = False) -> bytes:
    """
    Renders the task as a PDF.

    If ``CSS_PAGED_MEDIA`` is set, the HTML alone is converted. Otherwise,
    separately rendered page header and footer HTML and a page orientation
    option are passed to the converter as well.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        anonymise: hide patient identifying details?
    """
    html = self.get_pdf_html(req, anonymise=anonymise)  # main content
    if CSS_PAGED_MEDIA:
        return pdf_from_html(req, html=html)

    # Header: task-specific text wrapped in the generic header template.
    header_text = render(
        "task_page_header.mako",
        dict(task=self, anonymise=anonymise),
        request=req,
    )
    header_html = render(
        "wkhtmltopdf_header.mako",
        dict(inner_text=header_text),
        request=req,
    )

    # Footer: likewise.
    footer_text = render(
        "task_page_footer.mako",
        dict(task=self),
        request=req,
    )
    footer_html = render(
        "wkhtmltopdf_footer.mako",
        dict(inner_text=footer_text),
        request=req,
    )

    if self.use_landscape_for_pdf:
        orientation = "Landscape"
    else:
        orientation = "Portrait"

    return pdf_from_html(
        req,
        html=html,
        header_html=header_html,
        footer_html=footer_html,
        extra_wkhtmltopdf_options={"orientation": orientation},
    )
def get_pdf_html(
    self, req: "CamcopsRequest", anonymise: bool = False
) -> str:
    """
    Renders ``task.mako`` in its PDF flavour (slightly different from the
    HTML used for the HTML view): the landscape setting is passed to the
    template, and a signature block is requested for tasks with a
    clinician.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        anonymise: hide patient identifying details?
    """
    req.prepare_for_pdf_figures()
    template_args = {
        "task": self,
        "anonymise": anonymise,
        "pdf_landscape": self.use_landscape_for_pdf,
        "signature": self.has_clinician,
        "viewtype": ViewArg.PDF,
    }
    return render("task.mako", template_args, request=req)
def suggested_pdf_filename(
    self, req: "CamcopsRequest", anonymise: bool = False
) -> str:
    """
    Proposes a filename for the PDF copy (for downloads), built from the
    filename specifications in the request's config.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        anonymise: hide patient identifying details? If so, the task is
            treated as anonymous when the filename is built.
    """
    cfg = req.config
    is_anonymous = True if anonymise else self.is_anonymous
    patient = self.patient

    # Patient-derived fields; blank/None when there is no patient.
    if patient:
        surname = patient.get_surname()
        forename = patient.get_forename()
        dob = patient.get_dob()
        sex = patient.get_sex()
        idnum_objects = patient.get_idnum_objects()
    else:
        surname = ""
        forename = ""
        dob = None
        sex = None
        idnum_objects = None

    return get_export_filename(
        req=req,
        patient_spec_if_anonymous=cfg.patient_spec_if_anonymous,
        patient_spec=cfg.patient_spec,
        filename_spec=cfg.task_filename_spec,
        filetype=ViewArg.PDF,
        is_anonymous=is_anonymous,
        surname=surname,
        forename=forename,
        dob=dob,
        sex=sex,
        idnum_objects=idnum_objects,
        creation_datetime=self.get_creation_datetime(),
        basetable=self.tablename,
        serverpk=self._pk,
    )
def write_pdf_to_disk(self, req: "CamcopsRequest", filename: str) -> None:
    """
    Writes the PDF to disk, using ``filename``.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        filename: path of the file to write (overwritten if it exists)
    """
    # Generate the PDF first, so that a failure during generation does not
    # leave an empty file behind.
    pdf = self.get_pdf(req)
    # The context manager guarantees that the file is flushed and closed.
    with open(filename, "wb") as pdffile:
        pdffile.write(pdf)
2871 # -------------------------------------------------------------------------
2872 # Metadata for e.g. RiO
2873 # -------------------------------------------------------------------------
def get_rio_metadata(
    self,
    req: "CamcopsRequest",
    which_idnum: int,
    uploading_user_id: str,
    document_type: str,
) -> str:
    """
    Returns metadata for the task that Servelec's RiO electronic patient
    record may want.

    Args:
        req: a :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        which_idnum: which CamCOPS ID number type corresponds to the RiO
            client ID?
        uploading_user_id: RiO user ID (string) of the user who will
            be recorded as uploading this information; see below
        document_type: a string indicating the RiO-defined document type
            (this is system-specific); see below

    Returns:
        a newline-terminated single line of CSV values; see below. The
        client ID field is blank if the task has no patient, or the
        patient has no ID number of the requested type.

    Called by
    :meth:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup.export_task`.

    From Servelec (Lee Meredith) to Rudolf Cardinal, 2014-12-04:

    .. code-block:: none

        Batch Document Upload

        The RiO batch document upload function can be used to upload
        documents in bulk automatically. RiO includes a Batch Upload
        windows service which monitors a designated folder for new files.
        Each file which is scanned must be placed in the designated folder
        along with a meta-data file which describes the document. So
        essentially if a document had been scanned in and was called
        ‘ThisIsANewReferralLetterForAPatient.pdf’ then there would also
        need to be a meta file in the same folder called
        ‘ThisIsANewReferralLetterForAPatient.metadata’. The contents of
        the meta file would need to include the following:

        Field Order; Field Name; Description; Data Mandatory (Y/N);
        Format

        1; ClientID; RiO Client ID which identifies the patient in RiO
        against which the document will be uploaded.; Y; 15
        Alphanumeric Characters

        2; UserID; User ID of the uploaded document, this is any user
        defined within the RiO system and can be a single system user
        called ‘AutomaticDocumentUploadUser’ for example.; Y; 10
        Alphanumeric Characters

            [NB example longer than that!]

        3; DocumentType; The RiO defined document type eg: APT; Y; 80
        Alphanumeric Characters

        4; Title; The title of the document; N; 40 Alphanumeric
        Characters

        5; Description; The document description.; N; 500 Alphanumeric
        Characters

        6; Author; The author of the document; N; 80 Alphanumeric
        Characters

        7; DocumentDate; The date of the document; N; dd/MM/yyyy HH:mm

        8; FinalRevision; The revision values are 0 Draft or 1 Final,
        this is defaulted to 1 which is Final revision.; N; 0 or 1

        As an example, this is what would be needed in a meta file:

        “1000001”,”TRUST1”,”APT”,”A title”, “A description of the
            document”, “An author”,”01/12/2012 09:45”,”1”

        (on one line)

    Clarification, from Lee Meredith to Rudolf Cardinal, 2015-02-18:

    - metadata files must be plain ASCII, not UTF-8

      - ... here and
        :meth:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup.export_task`

    - line terminator is <CR>

      - BUT see
        :meth:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup.export_task`

    - user name limit is 10 characters, despite incorrect example

      - search for ``RIO_MAX_USER_LEN``

    - DocumentType is a code that maps to a human-readable document
      type; for example, "APT" might map to "Appointment Letter". These
      mappings are specific to the local system. (We will probably want
      one that maps to "Clinical Correspondence" in the absence of
      anything more specific.)

    - RiO will delete the files after it's processed them.

    - Filenames should avoid spaces, but otherwise any other standard
      ASCII code is fine within filenames.

      - see
        :meth:`camcops_server.cc_modules.cc_exportmodels.ExportedTaskFileGroup.export_task`

    """  # noqa

    # Client ID: blank if there is no patient or no such ID number. (A
    # missing ID number must not be written as the literal text "None".)
    patient = self.patient
    idnum_value = (
        patient.get_idnum_value(which_idnum)
        if patient is not None
        else None
    )
    client_id = "" if idnum_value is None else str(idnum_value)

    title = "CamCOPS_" + self.shortname
    description = self.longname(req)
    author = self.get_clinician_name()  # may be blank
    document_date = format_datetime(
        self.when_created, DateFormat.RIO_EXPORT_UK
    )
    # This STRIPS the timezone information; i.e. it is in the local
    # timezone but doesn't tell you which timezone that is. (That's fine;
    # it should be local or users would be confused.)
    final_revision = 0 if self.is_live_on_tablet() else 1

    item_list = [
        client_id,
        uploading_user_id,
        document_type,
        title,
        description,
        author,
        document_date,
        final_revision,
    ]
    # UTF-8 is NOT supported by RiO for metadata. So:
    # NOTE(review): double quotes embedded in a value are not escaped
    # here; the expected escaping convention is not stated in the
    # specification quoted above -- confirm before changing.
    csv_line = ",".join(
        f'"{mangle_unicode_to_ascii(x)}"' for x in item_list
    )
    return csv_line + "\n"
3019 # -------------------------------------------------------------------------
3020 # HTML elements used by tasks
3021 # -------------------------------------------------------------------------
3023 # noinspection PyMethodMayBeStatic
def get_standard_clinician_comments_block(
    self, req: "CamcopsRequest", comments: str
) -> str:
    """
    HTML block for the clinician's comments, produced by rendering the
    ``clinician_comments.mako`` template.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        comments: comment text; handed to the template under the key
            ``comment``

    Returns:
        the rendered template output
    """
    template_values = {"comment": comments}
    return render("clinician_comments.mako", template_values, request=req)
def get_is_complete_td_pair(self, req: "CamcopsRequest") -> str:
    """
    Two HTML table cells ("Completed?" plus a bold yes/no answer) showing
    whether the task is complete.

    When the task is incomplete, the answer cell is given the
    ``CssClass.INCOMPLETE`` CSS class so that it stands out visually.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

    Returns:
        HTML for the two ``<td>`` elements (no enclosing row)
    """
    complete = self.is_complete()
    if complete:
        cell_attr = ""
    else:
        cell_attr = f' class="{CssClass.INCOMPLETE}"'
    answer_text = get_yes_no(req, complete)
    return f"<td>Completed?</td><td{cell_attr}><b>{answer_text}</b></td>"
def get_is_complete_tr(self, req: "CamcopsRequest") -> str:
    """
    HTML table row showing whether the task is complete: the cell pair
    from :meth:`get_is_complete_td_pair`, wrapped in ``<tr>...</tr>``.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`

    Returns:
        HTML for the table row
    """
    cells = self.get_is_complete_td_pair(req)
    return f"<tr>{cells}</tr>"
def get_twocol_val_row(
    self, fieldname: str, default: str = None, label: str = None
) -> str:
    """
    HTML table row, two columns (label, value), built by ``tr_qa``.

    The previous documentation described this variant as *not* web-safing
    the value; that depends on ``tr_qa``, which is defined elsewhere.

    Args:
        fieldname: field (attribute) name; the value is read from this
            attribute of the task
        default: what to show instead if the value is ``None``
        label: descriptive label; if ``None``, the field name is used

    Returns:
        two-column HTML table row (label, value)
    """
    value = getattr(self, fieldname)
    shown = default if value is None else value
    caption = fieldname if label is None else label
    return tr_qa(caption, shown)
def get_twocol_string_row(self, fieldname: str, label: str = None) -> str:
    """
    HTML table row, two columns (label, value), built by ``tr_qa``.

    The previous documentation described this variant as web-safing the
    value; that depends on ``tr_qa``, which is defined elsewhere.
    NOTE(review): the value is passed to ``tr_qa`` exactly as in
    :meth:`get_twocol_val_row` -- confirm which description is accurate.

    Args:
        fieldname: field (attribute) name; the value is read from this
            attribute of the task
        label: descriptive label; if ``None``, the field name is used

    Returns:
        two-column HTML table row (label, value)
    """
    caption = fieldname if label is None else label
    value = getattr(self, fieldname)
    return tr_qa(caption, value)
def get_twocol_bool_row(
    self, req: "CamcopsRequest", fieldname: str, label: str = None
) -> str:
    """
    HTML table row, two columns, with the value formatted by
    ``get_yes_no_none`` (the Boolean yes/no formatter).

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        fieldname: field (attribute) name; the value is read from this
            attribute of the task
        label: descriptive label; if ``None``, the field name is used

    Returns:
        two-column HTML table row (label, value)
    """
    caption = fieldname if label is None else label
    formatted = get_yes_no_none(req, getattr(self, fieldname))
    return tr_qa(caption, formatted)
def get_twocol_bool_row_true_false(
    self, req: "CamcopsRequest", fieldname: str, label: str = None
) -> str:
    """
    HTML table row, two columns, with the value formatted by
    ``get_true_false_none`` (the Boolean true/false formatter).

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        fieldname: field (attribute) name; the value is read from this
            attribute of the task
        label: descriptive label; if ``None``, the field name is used

    Returns:
        two-column HTML table row (label, value)
    """
    caption = fieldname if label is None else label
    formatted = get_true_false_none(req, getattr(self, fieldname))
    return tr_qa(caption, formatted)
def get_twocol_bool_row_present_absent(
    self, req: "CamcopsRequest", fieldname: str, label: str = None
) -> str:
    """
    HTML table row, two columns, with the value formatted by
    ``get_present_absent_none`` (the Boolean present/absent formatter).

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        fieldname: field (attribute) name; the value is read from this
            attribute of the task
        label: descriptive label; if ``None``, the field name is used

    Returns:
        two-column HTML table row (label, value)
    """
    caption = fieldname if label is None else label
    formatted = get_present_absent_none(req, getattr(self, fieldname))
    return tr_qa(caption, formatted)
3153 @staticmethod
def get_twocol_picture_row(blob: Optional[Blob], label: str) -> str:
    """
    HTML table row, two columns: the label, then the image HTML that
    ``get_blob_img_html`` produces for the BLOB.

    Args:
        blob: the :class:`camcops_server.cc_modules.cc_blob.Blob` object
            (may be ``None``)
        label: descriptive label

    Returns:
        two-column HTML table row (label, picture)
    """
    picture_html = get_blob_img_html(blob)
    return tr(label, picture_html)
3167 # -------------------------------------------------------------------------
3168 # Field helper functions for subclasses
3169 # -------------------------------------------------------------------------
def get_values(self, fields: List[str]) -> List:
    """
    Look up each named field (attribute) on this object.

    Args:
        fields: field (attribute) names

    Returns:
        the corresponding values, in the same order as ``fields``
    """
    return [getattr(self, name) for name in fields]
def is_field_not_none(self, field: str) -> bool:
    """
    Does the named field (attribute) hold something other than ``None``?

    Args:
        field: field (attribute) name
    """
    value = getattr(self, field)
    return value is not None
def any_fields_none(self, fields: List[str]) -> bool:
    """
    Is at least one of the named fields ``None``?

    Fields are checked in order, stopping at the first ``None``. An empty
    list gives ``False``.

    Args:
        fields: field (attribute) names
    """
    return any(getattr(self, name) is None for name in fields)
def all_fields_not_none(self, fields: List[str]) -> bool:
    """
    Do all the named fields hold something other than ``None``?

    This is the negation of :meth:`any_fields_none`.

    Args:
        fields: field (attribute) names
    """
    something_missing = self.any_fields_none(fields)
    return not something_missing
def any_fields_null_or_empty_str(self, fields: List[str]) -> bool:
    """
    Is at least one of the named fields either ``None`` or equal to the
    empty string?

    Fields are checked in order, stopping at the first match. An empty
    list gives ``False``.

    Args:
        fields: field (attribute) names
    """
    values = (getattr(self, name) for name in fields)
    return any(value is None or value == "" for value in values)
def are_all_fields_not_null_or_empty_str(self, fields: List[str]) -> bool:
    """
    Are all the named fields neither ``None`` nor the empty string?

    This is the negation of :meth:`any_fields_null_or_empty_str`.

    Args:
        fields: field (attribute) names
    """
    something_blank = self.any_fields_null_or_empty_str(fields)
    return not something_blank
def n_fields_not_none(self, fields: List[str]) -> int:
    """
    Count the named fields whose value is not ``None``.

    Args:
        fields: field (attribute) names

    Returns:
        the number of fields that are not ``None`` (0 for an empty list)
    """
    return sum(1 for name in fields if getattr(self, name) is not None)
def n_fields_none(self, fields: List[str]) -> int:
    """
    Count the named fields whose value is ``None``.

    Args:
        fields: field (attribute) names

    Returns:
        the number of fields that are ``None`` (0 for an empty list)
    """
    return sum(1 for name in fields if getattr(self, name) is None)
def count_booleans(self, fields: List[str]) -> int:
    """
    Count the named fields whose value is truthy.

    Args:
        fields: field (attribute) names

    Returns:
        the number of truthy fields (0 for an empty list)
    """
    return sum(1 for name in fields if getattr(self, name))
def all_truthy(self, fields: List[str]) -> bool:
    """
    Are all the named fields truthy?

    Fields are checked in order, stopping at the first falsy value. An
    empty list gives ``True``.

    Args:
        fields: field (attribute) names
    """
    return all(getattr(self, name) for name in fields)
def count_where(self, fields: List[str], wherevalues: List[Any]) -> int:
    """
    Count the named fields whose value is found in ``wherevalues``.

    Args:
        fields: field (attribute) names
        wherevalues: values to count

    Returns:
        the number of fields whose value is in ``wherevalues``
    """
    matching = [v for v in self.get_values(fields) if v in wherevalues]
    return len(matching)
def count_wherenot(self, fields: List[str], notvalues: List[Any]) -> int:
    """
    Count the named fields whose value is NOT found in ``notvalues``.

    Args:
        fields: field (attribute) names
        notvalues: values to exclude from the count

    Returns:
        the number of fields whose value is not in ``notvalues``
    """
    remaining = [v for v in self.get_values(fields) if v not in notvalues]
    return len(remaining)
3268 @staticmethod
def sum_values(
    values: List[Union[int, float, None]],
    ignorevalues: List[Union[int, float, None]] = None,
) -> Union[int, float, None]:
    """
    Add up the values provided, leaving out any that appear in
    ``ignorevalues``.

    Args:
        values: the values to sum
        ignorevalues: values to skip; if not specified, ``[None]`` is
            used. An explicit empty list is respected (nothing skipped).

    Returns:
        the total, or ``None`` if summing raises ``TypeError``
    """
    # Compare against None explicitly: [] is a valid "skip nothing".
    skip = [None] if ignorevalues is None else ignorevalues
    kept = [v for v in values if v not in skip]
    try:
        total = sum(kept)
    except TypeError:
        return None
    return total
def sum_fields(
    self,
    fields: List[str],
    ignorevalues: List[Union[int, float, None]] = None,
) -> Union[int, float, None]:
    """
    Add up the values held in the named fields, via :meth:`sum_values`.

    Args:
        fields: field (attribute) names
        ignorevalues: values to skip; passed straight through to
            :meth:`sum_values` (which treats ``None`` as ``[None]``)

    Returns:
        whatever :meth:`sum_values` returns for those values: the total,
        or ``None`` on error
    """
    collected = [getattr(self, name) for name in fields]
    return self.sum_values(collected, ignorevalues=ignorevalues)
3297 @staticmethod
def mean_values(
    values: List[Any], ignorevalues: List[Union[int, float, None]] = None
) -> Union[int, float, None]:
    """
    Take the mean of the values provided, leaving out any that appear in
    ``ignorevalues``.

    Args:
        values: the values to average
        ignorevalues: values to skip; if not specified, ``[None]`` is
            used. An explicit empty list is respected (nothing skipped).

    Returns:
        the mean, or ``None`` if :func:`statistics.mean` raises
        ``TypeError`` or ``StatisticsError`` (e.g. nothing left to
        average)
    """
    # Compare against None explicitly: [] is a valid "skip nothing".
    skip = [None] if ignorevalues is None else ignorevalues
    kept = [v for v in values if v not in skip]
    try:
        average = statistics.mean(kept)
    except (TypeError, statistics.StatisticsError):
        return None
    return average
def mean_fields(
    self,
    fields: List[str],
    ignorevalues: List[Union[int, float, None]] = None,
) -> Union[int, float, None]:
    """
    Take the mean of the values held in the named fields, via
    :meth:`mean_values`.

    Args:
        fields: field (attribute) names
        ignorevalues: values to skip; passed straight through to
            :meth:`mean_values` (which treats ``None`` as ``[None]``)

    Returns:
        whatever :meth:`mean_values` returns for those values: the mean,
        or ``None`` on error
    """
    collected = [getattr(self, name) for name in fields]
    return self.mean_values(collected, ignorevalues=ignorevalues)
3326 @staticmethod
def fieldnames_from_prefix(prefix: str, start: int, end: int) -> List[str]:
    """
    Build field (column, attribute) names by appending consecutive
    integers to a prefix. For example,
    ``fieldnames_from_prefix("q", 1, 5)`` produces
    ``["q1", "q2", "q3", "q4", "q5"]``.

    Args:
        prefix: string prefix
        start: first number (inclusive)
        end: last number (inclusive)

    Returns:
        list of fieldnames, as above (empty if ``end < start``)
    """
    numbers = range(start, end + 1)
    return [prefix + str(number) for number in numbers]
3344 @staticmethod
def fieldnames_from_list(
    prefix: str, suffixes: Iterable[Any]
) -> List[str]:
    """
    Build fieldnames by appending each suffix in turn to the prefix.

    Args:
        prefix: string prefix
        suffixes: the suffixes; each is converted with ``str()`` before
            being appended

    Returns:
        list of fieldnames, one per suffix, in the order given
    """
    return [prefix + str(suffix) for suffix in suffixes]
3362 # -------------------------------------------------------------------------
3363 # Extra strings
3364 # -------------------------------------------------------------------------
3366 @classmethod
def get_extrastring_taskname(cls) -> str:
    """
    Get the taskname used as the top-level key for this task's extra
    strings (loaded by the server from XML files).

    Returns:
        ``cls.extrastring_taskname`` if a task has set it to something
        truthy; otherwise the task's primary tablename
        (``cls.tablename``)
    """
    override = cls.extrastring_taskname
    return override if override else cls.tablename
3376 @classmethod
def extrastrings_exist(cls, req: "CamcopsRequest") -> bool:
    """
    Does the server have any extra strings for this task?

    Asks the request object, using the taskname from
    :meth:`get_extrastring_taskname`.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
    """
    taskname = cls.get_extrastring_taskname()
    return req.task_extrastrings_exist(taskname)
3383 @classmethod
def wxstring(
    cls,
    req: "CamcopsRequest",
    name: str,
    defaultvalue: str = None,
    provide_default_if_none: bool = True,
) -> str:
    """
    Return a web-safe version of an extra string for this task, obtained
    from ``req.wxstring()``.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        name: name (second-level key) of the string, within the set of
            this task's extra strings
        defaultvalue: default to return if the string is not found
        provide_default_if_none: if ``True`` and ``defaultvalue is None``,
            use a placeholder of the form ``[taskname: name]`` as the
            default, so that a missing string is easy to identify
    """
    if provide_default_if_none and defaultvalue is None:
        defaultvalue = f"[{cls.get_extrastring_taskname()}: {name}]"
    taskname = cls.get_extrastring_taskname()
    return req.wxstring(
        taskname,
        name,
        defaultvalue,
        provide_default_if_none=provide_default_if_none,
    )
3412 @classmethod
def xstring(
    cls,
    req: "CamcopsRequest",
    name: str,
    defaultvalue: str = None,
    provide_default_if_none: bool = True,
) -> str:
    """
    Return a raw (not necessarily web-safe) version of an extra string
    for this task, obtained from ``req.xstring()``.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        name: name (second-level key) of the string, within the set of
            this task's extra strings
        defaultvalue: default to return if the string is not found
        provide_default_if_none: if ``True`` and ``defaultvalue is None``,
            use a placeholder of the form ``[taskname: name]`` as the
            default, so that a missing string is easy to identify
    """
    if provide_default_if_none and defaultvalue is None:
        defaultvalue = f"[{cls.get_extrastring_taskname()}: {name}]"
    taskname = cls.get_extrastring_taskname()
    return req.xstring(
        taskname,
        name,
        defaultvalue,
        provide_default_if_none=provide_default_if_none,
    )
3442 @classmethod
def make_options_from_xstrings(
    cls,
    req: "CamcopsRequest",
    prefix: str,
    first: int,
    last: int,
    suffix: str = "",
) -> Dict[int, str]:
    """
    Creates a lookup dictionary from xstrings.

    Args:
        req: :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        prefix: prefix for xstring
        first: first value (inclusive)
        last: last value (inclusive); may be below ``first``, in which
            case the entries are created in descending order
        suffix: optional suffix

    Returns:
        dict: Each entry maps ``value`` to an xstring named
        ``<PREFIX><VALUE><SUFFIX>``.
    """
    # Count down when first > last, otherwise count up; "last" is always
    # included.
    step = -1 if first > last else 1
    options = {}  # type: Dict[int, str]
    for value in range(first, last + step, step):
        options[value] = cls.xstring(req, f"{prefix}{value}{suffix}")
    return options
3475 @staticmethod
def make_options_from_numbers(first: int, last: int) -> Dict[int, str]:
    """
    Creates a simple dictionary mapping numbers to string versions of
    those numbers. Usually for subsequent (more interesting) processing!

    Args:
        first: first value (inclusive)
        last: last value (inclusive); may be below ``first``, in which
            case the entries are created in descending order

    Returns:
        dict mapping each integer to its ``str()`` form
    """
    # Count down when first > last, otherwise count up; "last" is always
    # included.
    step = -1 if first > last else 1
    return {value: str(value) for value in range(first, last + step, step)}
3499# =============================================================================
3500# Collating all task tables for specific purposes
3501# =============================================================================
3502# Function, staticmethod, classmethod?
3503# https://stackoverflow.com/questions/8108688/in-python-when-should-i-use-a-function-instead-of-a-method # noqa
3504# https://stackoverflow.com/questions/11788195/module-function-vs-staticmethod-vs-classmethod-vs-no-decorators-which-idiom-is # noqa
3505# https://stackoverflow.com/questions/15017734/using-static-methods-in-python-best-practice # noqa
def all_task_tables_with_min_client_version() -> Dict[str, Version]:
    """
    Across all tasks, return a mapping from each of their tables to the
    minimum client version.

    The result merges each task class's own
    ``all_tables_with_min_client_version()`` mapping.

    Used by
    :func:`camcops_server.cc_modules.client_api.all_tables_with_min_client_version`.
    """
    merged = {}  # type: Dict[str, Version]
    # Collect the classes before merging anything, as the original did.
    task_classes = list(Task.gen_all_subclasses())
    for task_class in task_classes:
        per_task = task_class.all_tables_with_min_client_version()  # type: ignore[attr-defined] # noqa: E501
        merged.update(per_task)
    return merged
3524@cache_region_static.cache_on_arguments(function_key_generator=fkg)
def tablename_to_task_class_dict() -> Dict[str, Type[Task]]:
    """
    Returns a mapping from task base tablenames to task classes, covering
    every class yielded by ``Task.gen_all_subclasses()``.
    """
    return {
        task_class.tablename: task_class  # type: ignore[attr-defined]
        for task_class in Task.gen_all_subclasses()
    }
3535@cache_region_static.cache_on_arguments(function_key_generator=fkg)
def all_task_tablenames() -> List[str]:
    """
    Returns all task base table names: the keys of
    :func:`tablename_to_task_class_dict`, as a list.
    """
    mapping = tablename_to_task_class_dict()
    return list(mapping)
3544@cache_region_static.cache_on_arguments(function_key_generator=fkg)
def all_task_classes() -> List[Type[Task]]:
    """
    Returns all task classes: the values of
    :func:`tablename_to_task_class_dict`, as a list.

    (The previous docstring said this returned table names; that text was
    copied from :func:`all_task_tablenames` and did not match the code.)
    """
    return list(tablename_to_task_class_dict().values())
3553# =============================================================================
3554# Support functions
3555# =============================================================================
def get_from_dict(d: Dict, key: Any, default: Any = INVALID_VALUE) -> Any:
    """
    Look up a key in a dictionary via ``d.get()``. The only thing this
    adds over calling ``d.get()`` directly is that ``default`` itself
    defaults to ``INVALID_VALUE``.

    Args:
        d: the dictionary
        key: the key
        default: value to return if ``key`` is not in ``d``

    Returns:
        the value stored under ``key``, or ``default`` if absent
    """
    value = d.get(key, default)
    return value