Coverage for tasks/diagnosis.py: 45%
398 statements
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
« prev ^ index » next coverage.py v7.9.2, created at 2025-07-15 14:23 +0100
1"""
2camcops_server/tasks/diagnosis.py
4===============================================================================
6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry.
7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk).
9 This file is part of CamCOPS.
11 CamCOPS is free software: you can redistribute it and/or modify
12 it under the terms of the GNU General Public License as published by
13 the Free Software Foundation, either version 3 of the License, or
14 (at your option) any later version.
16 CamCOPS is distributed in the hope that it will be useful,
17 but WITHOUT ANY WARRANTY; without even the implied warranty of
18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
19 GNU General Public License for more details.
21 You should have received a copy of the GNU General Public License
22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
24===============================================================================
26"""
28from abc import ABC, ABCMeta
29import datetime
30import logging
31from typing import Any, Dict, List, Optional, Sequence, Type, TYPE_CHECKING
33from cardinal_pythonlib.classes import classproperty
34from cardinal_pythonlib.colander_utils import get_child_node, OptionalIntNode
35from cardinal_pythonlib.datetimefunc import pendulum_date_to_datetime_date
36from cardinal_pythonlib.logs import BraceStyleAdapter
37import cardinal_pythonlib.rnc_web as ws
38from cardinal_pythonlib.sqlalchemy.dump import get_literal_query
39from colander import Invalid, SchemaNode, SequenceSchema, String
40from fhirclient.models.annotation import Annotation
41from fhirclient.models.codeableconcept import CodeableConcept
42from fhirclient.models.coding import Coding
43from fhirclient.models.condition import Condition
44import hl7
45from pyramid.renderers import render_to_response
46from pyramid.response import Response
47from sqlalchemy import CompoundSelect, Select
48from sqlalchemy.orm import Mapped, mapped_column
49from sqlalchemy.sql.expression import (
50 and_,
51 exists,
52 literal,
53 not_,
54 or_,
55 select,
56 union,
57)
58from sqlalchemy.sql.sqltypes import Date, UnicodeText
60from camcops_server.cc_modules.cc_constants import CssClass, FHIRConst as Fc
61from camcops_server.cc_modules.cc_ctvinfo import CtvInfo
62from camcops_server.cc_modules.cc_db import (
63 ancillary_relationship,
64 GenericTabletRecordMixin,
65 TaskDescendant,
66)
67from camcops_server.cc_modules.cc_fhir import make_fhir_bundle_entry
68from camcops_server.cc_modules.cc_forms import (
69 LinkingIdNumSelector,
70 or_join_description,
71 ReportParamSchema,
72 RequestAwareMixin,
73)
74from camcops_server.cc_modules.cc_hl7 import make_dg1_segment
75from camcops_server.cc_modules.cc_html import answer, tr
76from camcops_server.cc_modules.cc_nlp import guess_name_components
77from camcops_server.cc_modules.cc_patient import Patient
78from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
79from camcops_server.cc_modules.cc_pyramid import CamcopsPage, ViewParam
80from camcops_server.cc_modules.cc_task import (
81 Task,
82 TaskHasClinicianMixin,
83 TaskHasPatientMixin,
84)
85from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
86from camcops_server.cc_modules.cc_request import CamcopsRequest
87from camcops_server.cc_modules.cc_report import Report
88from camcops_server.cc_modules.cc_snomed import (
89 SnomedConcept,
90 SnomedExpression,
91 SnomedFocusConcept,
92)
93from camcops_server.cc_modules.cc_sqlalchemy import Base
94from camcops_server.cc_modules.cc_sqla_coltypes import (
95 DiagnosticCodeColType,
96 mapped_camcops_column,
97)
98from camcops_server.cc_modules.cc_validators import (
99 validate_restricted_sql_search_literal,
100)
102if TYPE_CHECKING:
103 from sqlalchemy.sql.elements import ColumnElement
105log = BraceStyleAdapter(logging.getLogger(__name__))
107# =============================================================================
108# Helpers
109# =============================================================================
111FK_COMMENT = "FK to parent table"
114# =============================================================================
115# DiagnosisBase
116# =============================================================================
119class DiagnosisItemBase(GenericTabletRecordMixin, Base):
120 __abstract__ = True
122 # noinspection PyMethodParameters
123 seqnum: Mapped[int] = mapped_column(
124 "seqnum",
125 comment="Sequence number (consistently 1-based as of 2018-12-01)",
126 )
128 # noinspection PyMethodParameters
129 code: Mapped[Optional[str]] = mapped_column(
130 "code",
131 DiagnosticCodeColType,
132 comment="Diagnostic code",
133 )
135 # noinspection PyMethodParameters
136 description: Mapped[Optional[str]] = mapped_camcops_column(
137 "description",
138 UnicodeText,
139 exempt_from_anonymisation=True,
140 comment="Description of the diagnostic code",
141 )
143 # noinspection PyMethodParameters
144 comment: Mapped[Optional[str]] = mapped_column(
145 "comment",
146 UnicodeText,
147 comment="Clinician's comment",
148 )
150 def get_html_table_row(self) -> str:
151 return tr(
152 self.seqnum,
153 answer(ws.webify(self.code)),
154 answer(ws.webify(self.description)),
155 answer(ws.webify(self.comment)),
156 )
158 def get_code_for_hl7(self) -> str:
159 # Normal format is to strip out periods, e.g. "F20.0" becomes "F200"
160 if not self.code:
161 return ""
162 return self.code.replace(".", "").upper()
164 def get_text_for_hl7(self) -> str:
165 return self.description or ""
167 def is_empty(self) -> bool:
168 return not bool(self.code)
170 def human(self) -> str:
171 suffix = f" [{self.comment}]" if self.comment else ""
172 return f"{self.code}: {self.description}{suffix}"
175class DiagnosisBase( # type: ignore[misc]
176 TaskHasClinicianMixin,
177 TaskHasPatientMixin,
178 Task,
179 ABC,
180 metaclass=ABCMeta,
181):
182 __abstract__ = True
184 # noinspection PyMethodParameters
185 relates_to_date: Mapped[Optional[datetime.date]] = mapped_column(
186 "relates_to_date",
187 Date,
188 comment="Date that diagnoses relate to",
189 )
191 items = None # type: List[DiagnosisItemBase]
192 # ... must be overridden by a relationship
194 hl7_coding_system = "?"
196 def get_num_items(self) -> int:
197 return len(self.items)
199 def is_complete(self) -> bool:
200 if self.relates_to_date is None:
201 return False
202 if self.get_num_items() == 0:
203 return False
204 for item in self.items: # type: DiagnosisItemBase
205 if item.is_empty():
206 return False
207 return True
209 def get_task_html(self, req: CamcopsRequest) -> str:
210 html = f"""
211 <div class="{CssClass.SUMMARY}">
212 <table class="{CssClass.SUMMARY}">
213 {self.get_is_complete_tr(req)}
214 </table>
215 </div>
216 <table class="{CssClass.TASKDETAIL}">
217 <tr>
218 <th width="10%">Diagnosis #</th>
219 <th width="10%">Code</th>
220 <th width="40%">Description</th>
221 <th width="40%">Comment</th>
222 </tr>
223 """
224 for item in self.items:
225 html += item.get_html_table_row()
226 html += """
227 </table>
228 """
229 return html
231 def get_clinical_text(self, req: CamcopsRequest) -> List[CtvInfo]:
232 infolist = []
233 for item in self.items:
234 infolist.append(
235 CtvInfo(
236 content=(
237 f"<b>{ws.webify(item.code)}</b>: "
238 f"{ws.webify(item.description)}"
239 )
240 )
241 )
242 return infolist
244 # noinspection PyUnusedLocal
245 def get_hl7_extra_data_segments(
246 self, recipient_def: ExportRecipient
247 ) -> List[hl7.Segment]:
248 segments = []
249 clinician = guess_name_components(self.clinician_name)
250 for i in range(len(self.items)):
251 set_id = i + 1 # make it 1-based, not 0-based
252 item = self.items[i]
253 segments.append(
254 make_dg1_segment(
255 set_id=set_id,
256 diagnosis_datetime=self.get_creation_datetime(),
257 coding_system=self.hl7_coding_system,
258 diagnosis_identifier=item.get_code_for_hl7(),
259 diagnosis_text=item.get_text_for_hl7(),
260 clinician_surname=clinician.get("surname") or "",
261 clinician_forename=clinician.get("forename") or "",
262 clinician_prefix=clinician.get("prefix") or "",
263 attestation_datetime=self.get_creation_datetime(),
264 )
265 )
266 return segments
268 def _get_fhir_extra_bundle_entries_for_system(
269 self, req: CamcopsRequest, recipient: ExportRecipient, system: str
270 ) -> List[Dict]:
271 bundle_entries = [] # type: List[Dict]
272 for item in self.items:
273 display = item.human()
274 condition_dict = {
275 Fc.CODE: CodeableConcept(
276 jsondict={
277 Fc.CODING: [
278 Coding(
279 jsondict={
280 Fc.SYSTEM: system,
281 Fc.CODE: item.code,
282 Fc.DISPLAY: display,
283 Fc.USER_SELECTED: True,
284 }
285 ).as_json()
286 ],
287 Fc.TEXT: display,
288 }
289 ).as_json(),
290 Fc.SUBJECT: self._get_fhir_subject_ref(req, recipient),
291 Fc.RECORDER: self._get_fhir_practitioner_ref(req),
292 }
293 if item.comment:
294 condition_dict[Fc.NOTE] = [
295 Annotation(
296 jsondict={
297 Fc.AUTHOR_REFERENCE: self._get_fhir_practitioner_ref( # noqa
298 req
299 ),
300 Fc.AUTHOR_STRING: self.get_clinician_name(),
301 Fc.TEXT: item.comment,
302 Fc.TIME: self.fhir_when_task_created,
303 }
304 ).as_json()
305 ]
306 bundle_entry = make_fhir_bundle_entry(
307 resource_type_url=Fc.RESOURCE_TYPE_CONDITION,
308 identifier=self._get_fhir_condition_id(req, item.seqnum),
309 resource=Condition(jsondict=condition_dict).as_json(),
310 )
311 bundle_entries.append(bundle_entry)
312 return bundle_entries
315# =============================================================================
316# DiagnosisIcd10
317# =============================================================================
320class DiagnosisIcd10Item(DiagnosisItemBase, TaskDescendant):
321 __tablename__ = "diagnosis_icd10_item"
323 diagnosis_icd10_id: Mapped[int] = mapped_column(comment=FK_COMMENT)
325 # -------------------------------------------------------------------------
326 # TaskDescendant overrides
327 # -------------------------------------------------------------------------
329 @classmethod
330 def task_ancestor_class(cls) -> Optional[Type["Task"]]:
331 return DiagnosisIcd10
333 def task_ancestor(self) -> Optional["DiagnosisIcd10"]:
334 return DiagnosisIcd10.get_linked(self.diagnosis_icd10_id, self) # type: ignore[return-value] # noqa: E501
337class DiagnosisIcd10(DiagnosisBase):
338 """
339 Server implementation of the Diagnosis/ICD-10 task.
340 """
342 __tablename__ = "diagnosis_icd10"
343 info_filename_stem = "icd"
345 items = ancillary_relationship( # type: ignore[assignment]
346 parent_class_name="DiagnosisIcd10",
347 ancillary_class_name="DiagnosisIcd10Item",
348 ancillary_fk_to_parent_attr_name="diagnosis_icd10_id",
349 ancillary_order_by_attr_name="seqnum",
350 ) # type: List[DiagnosisIcd10Item]
352 shortname = "Diagnosis_ICD10"
353 dependent_classes = [DiagnosisIcd10Item]
354 hl7_coding_system = "I10"
355 # Page A-129 of
356 # https://www.hl7.org/special/committees/vocab/V26_Appendix_A.pdf
358 @staticmethod
359 def longname(req: "CamcopsRequest") -> str:
360 _ = req.gettext
361 return _("Diagnostic codes, ICD-10")
363 def get_snomed_codes(
364 self, req: CamcopsRequest, fallback: bool = True
365 ) -> List[SnomedExpression]:
366 """
367 Returns all SNOMED-CT codes for this task.
369 Args:
370 req: the
371 :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
372 fallback: for example, if F32.10 is unknown, should we fall back to
373 F32.1?
375 Returns:
376 a list of
377 :class:`camcops_server.cc_modules.cc_snomed.SnomedExpression`
378 objects
379 """
380 if not req.icd10_snomed_supported:
381 return []
382 snomed_codes = [] # type: List[SnomedExpression]
383 for item in self.items:
384 concepts = self._get_snomed_concepts(item.code, req, fallback)
385 if not concepts:
386 continue
387 focusconcept = SnomedFocusConcept(concepts)
388 snomed_codes.append(SnomedExpression(focusconcept))
389 return snomed_codes
391 @staticmethod
392 def _get_snomed_concepts(
393 icd10_code: str, req: CamcopsRequest, fallback: bool = True
394 ) -> List[SnomedConcept]:
395 """
396 Internal function to return :class:`SnomedConcept` objects for an
397 ICD-10 code.
399 Args:
400 icd10_code: the ICD-10 code
401 req: the
402 :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
403 fallback: for example, if F32.10 is unknown, should we fall back to
404 F32.1?
406 Returns:
407 list: of :class:`SnomedConcept` objects
409 """
410 concepts = [] # type: List[SnomedConcept]
411 while icd10_code:
412 try:
413 concepts = req.icd10_snomed(icd10_code)
414 except KeyError: # no known code
415 pass
416 if concepts or not fallback:
417 return concepts
418 # Now fall back
419 icd10_code = icd10_code[:-1]
420 # Run out of code
421 return concepts
423 def get_fhir_extra_bundle_entries(
424 self, req: CamcopsRequest, recipient: ExportRecipient
425 ) -> List[Dict]:
426 return self._get_fhir_extra_bundle_entries_for_system(
427 req, recipient, Fc.CODE_SYSTEM_ICD10
428 )
431# =============================================================================
432# DiagnosisIcd9CM
433# =============================================================================
436class DiagnosisIcd9CMItem(DiagnosisItemBase, TaskDescendant):
437 __tablename__ = "diagnosis_icd9cm_item"
439 diagnosis_icd9cm_id: Mapped[int] = mapped_column(comment=FK_COMMENT)
441 # -------------------------------------------------------------------------
442 # TaskDescendant overrides
443 # -------------------------------------------------------------------------
445 @classmethod
446 def task_ancestor_class(cls) -> Optional[Type["Task"]]:
447 return DiagnosisIcd9CM
449 def task_ancestor(self) -> Optional["DiagnosisIcd9CM"]:
450 return DiagnosisIcd9CM.get_linked(self.diagnosis_icd9cm_id, self) # type: ignore[return-value] # noqa: E501
453class DiagnosisIcd9CM(DiagnosisBase):
454 """
455 Server implementation of the Diagnosis/ICD-9-CM task.
456 """
458 __tablename__ = "diagnosis_icd9cm"
459 info_filename_stem = "icd"
461 items = ancillary_relationship( # type: ignore[assignment]
462 parent_class_name="DiagnosisIcd9CM",
463 ancillary_class_name="DiagnosisIcd9CMItem",
464 ancillary_fk_to_parent_attr_name="diagnosis_icd9cm_id",
465 ancillary_order_by_attr_name="seqnum",
466 ) # type: List[DiagnosisIcd9CMItem]
468 shortname = "Diagnosis_ICD9CM"
469 dependent_classes = [DiagnosisIcd9CMItem]
470 hl7_coding_system = "I9CM"
471 # Page A-129 of
472 # https://www.hl7.org/special/committees/vocab/V26_Appendix_A.pdf
474 @staticmethod
475 def longname(req: "CamcopsRequest") -> str:
476 _ = req.gettext
477 return _("Diagnostic codes, ICD-9-CM (DSM-IV-TR)")
479 def get_snomed_codes(self, req: CamcopsRequest) -> List[SnomedExpression]:
480 if not req.icd9cm_snomed_supported:
481 return []
482 snomed_codes = [] # type: List[SnomedExpression]
483 # noinspection PyTypeChecker
484 for item in self.items:
485 try:
486 concepts = req.icd9cm_snomed(item.code)
487 except KeyError: # no known code
488 continue
489 if not concepts:
490 continue
491 focusconcept = SnomedFocusConcept(concepts)
492 snomed_codes.append(SnomedExpression(focusconcept))
493 return snomed_codes
495 def get_fhir_extra_bundle_entries(
496 self, req: CamcopsRequest, recipient: ExportRecipient
497 ) -> List[Dict]:
498 return self._get_fhir_extra_bundle_entries_for_system(
499 req, recipient, Fc.CODE_SYSTEM_ICD9_CM
500 )
503# =============================================================================
504# Reports
505# =============================================================================
507# -----------------------------------------------------------------------------
508# Helpers
509# -----------------------------------------------------------------------------
511ORDER_BY = [
512 "surname",
513 "forename",
514 "dob",
515 "sex",
516 "when_created",
517 "system",
518 "code",
519]
522# noinspection PyProtectedMember,PyUnresolvedReferences
523def get_diagnosis_report_query(
524 req: CamcopsRequest,
525 diagnosis_class: Type[DiagnosisBase],
526 item_class: Type[DiagnosisItemBase],
527 item_fk_fieldname: str,
528 system: str,
529) -> Select[Any]:
530 # SELECT surname, forename, dob, sex, ...
531 select_fields: list[ColumnElement[Any]] = [
532 Patient.surname.label("surname"),
533 Patient.forename.label("forename"),
534 Patient.dob.label("dob"),
535 Patient.sex.label("sex"),
536 ]
537 from_clause = (
538 # FROM patient
539 Patient.__table__
540 # INNER JOIN dxset ON (dxtable.patient_id == patient.id AND ...)
541 .join(
542 diagnosis_class.__table__,
543 and_(
544 diagnosis_class.patient_id == Patient.id,
545 diagnosis_class._device_id == Patient._device_id,
546 diagnosis_class._era == Patient._era,
547 ),
548 )
549 # INNER JOIN dxrow ON (dxrow.fk_dxset = dxset.pk AND ...)
550 .join(
551 item_class.__table__,
552 and_(
553 getattr(item_class, item_fk_fieldname) == diagnosis_class.id,
554 item_class._device_id == diagnosis_class._device_id,
555 item_class._era == diagnosis_class._era,
556 ),
557 )
558 )
559 for iddef in req.idnum_definitions:
560 n = iddef.which_idnum
561 desc = iddef.short_description
562 aliased_table = PatientIdNum.__table__.alias(f"i{n}")
563 # ... [also] SELECT i1.idnum_value AS 'NHS' (etc.)
564 select_fields.append(aliased_table.c.idnum_value.label(desc))
565 # ... [from] OUTER JOIN patientidnum AS i1 ON (...)
566 from_clause = from_clause.outerjoin(
567 aliased_table,
568 and_(
569 aliased_table.c.patient_id == Patient.id,
570 aliased_table.c._device_id == Patient._device_id,
571 aliased_table.c._era == Patient._era,
572 # Note: the following are part of the JOIN, not the WHERE:
573 # (or failure to match a row will wipe out the Patient from the
574 # OUTER JOIN):
575 aliased_table.c._current == True, # noqa: E712
576 aliased_table.c.which_idnum == n,
577 ),
578 )
579 select_fields += [
580 diagnosis_class.when_created.label("when_created"),
581 literal(system).label("system"),
582 item_class.code.label("code"),
583 item_class.description.label("description"),
584 ]
585 # WHERE...
586 wheres = [
587 Patient._current == True, # noqa: E712
588 diagnosis_class._current == True, # noqa: E712
589 item_class._current == True, # noqa: E712
590 ]
591 if not req.user.superuser:
592 # Restrict to accessible groups
593 group_ids = req.user.ids_of_groups_user_may_report_on
594 wheres.append(diagnosis_class._group_id.in_(group_ids))
595 # Helpfully, SQLAlchemy will render this as "... AND 1 != 1" if we
596 # pass an empty list to in_().
597 query = (
598 select(*select_fields).select_from(from_clause).where(and_(*wheres))
599 )
600 return query
603def get_diagnosis_report(
604 req: CamcopsRequest,
605 diagnosis_class: Type[DiagnosisBase],
606 item_class: Type[DiagnosisItemBase],
607 item_fk_fieldname: str,
608 system: str,
609) -> Select[Any]:
610 query = get_diagnosis_report_query(
611 req, diagnosis_class, item_class, item_fk_fieldname, system
612 )
613 query = query.order_by(*ORDER_BY)
614 return query
617# -----------------------------------------------------------------------------
618# Plain "all diagnoses" reports
619# -----------------------------------------------------------------------------
622class DiagnosisICD9CMReport(Report):
623 """Report to show ICD-9-CM (DSM-IV-TR) diagnoses."""
625 # noinspection PyMethodParameters
626 @classproperty
627 def report_id(cls) -> str:
628 return "diagnoses_icd9cm"
630 @classmethod
631 def title(cls, req: "CamcopsRequest") -> str:
632 _ = req.gettext
633 return _(
634 "Diagnosis – ICD-9-CM (DSM-IV-TR) diagnoses for all " "patients"
635 )
637 # noinspection PyMethodParameters
638 @classproperty
639 def superuser_only(cls) -> bool:
640 return False
642 def get_query(self, req: CamcopsRequest) -> Select[Any]:
643 return get_diagnosis_report(
644 req,
645 diagnosis_class=DiagnosisIcd9CM,
646 item_class=DiagnosisIcd9CMItem,
647 item_fk_fieldname="diagnosis_icd9cm_id",
648 system="ICD-9-CM",
649 )
652class DiagnosisICD10Report(Report):
653 """Report to show ICD-10 diagnoses."""
655 # noinspection PyMethodParameters
656 @classproperty
657 def report_id(cls) -> str:
658 return "diagnoses_icd10"
660 @classmethod
661 def title(cls, req: "CamcopsRequest") -> str:
662 _ = req.gettext
663 return _("Diagnosis – ICD-10 diagnoses for all patients")
665 # noinspection PyMethodParameters
666 @classproperty
667 def superuser_only(cls) -> bool:
668 return False
670 def get_query(self, req: CamcopsRequest) -> Select[Any]:
671 return get_diagnosis_report(
672 req,
673 diagnosis_class=DiagnosisIcd10,
674 item_class=DiagnosisIcd10Item,
675 item_fk_fieldname="diagnosis_icd10_id",
676 system="ICD-10",
677 )
680class DiagnosisAllReport(Report):
681 """Report to show all diagnoses."""
683 # noinspection PyMethodParameters
684 @classproperty
685 def report_id(cls) -> str:
686 return "diagnoses_all"
688 @classmethod
689 def title(cls, req: "CamcopsRequest") -> str:
690 _ = req.gettext
691 return _("Diagnosis – All diagnoses for all patients")
693 # noinspection PyMethodParameters
694 @classproperty
695 def superuser_only(cls) -> bool:
696 return False
698 def get_query(self, req: CamcopsRequest) -> CompoundSelect[Any]:
699 sql_icd9cm = get_diagnosis_report_query(
700 req,
701 diagnosis_class=DiagnosisIcd9CM,
702 item_class=DiagnosisIcd9CMItem,
703 item_fk_fieldname="diagnosis_icd9cm_id",
704 system="ICD-9-CM",
705 )
706 sql_icd10 = get_diagnosis_report_query(
707 req,
708 diagnosis_class=DiagnosisIcd10,
709 item_class=DiagnosisIcd10Item,
710 item_fk_fieldname="diagnosis_icd10_id",
711 system="ICD-10",
712 )
713 query = union(sql_icd9cm, sql_icd10)
714 query = query.order_by(*ORDER_BY)
715 return query
718# -----------------------------------------------------------------------------
719# "Find me patients matching certain diagnostic criteria"
720# -----------------------------------------------------------------------------
723class DiagnosisNode(SchemaNode, RequestAwareMixin):
724 schema_type = String
726 def __init__(self, *args: Any, **kwargs: Any) -> None:
727 self.title = "" # for type checker
728 self.description = "" # for type checker
729 super().__init__(*args, **kwargs)
731 # noinspection PyUnusedLocal
732 def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
733 _ = self.gettext
734 self.title = _("Diagnostic code")
735 self.description = _(
736 "Type in a diagnostic code; you may use SQL 'LIKE' syntax for "
737 "wildcards, i.e. _ for one character and % for zero/one/lots"
738 )
740 def validator(self, node: SchemaNode, value: str) -> None:
741 try:
742 validate_restricted_sql_search_literal(value, self.request)
743 except ValueError as e:
744 raise Invalid(node, str(e))
747class DiagnosesSequence(SequenceSchema, RequestAwareMixin):
748 diagnoses = DiagnosisNode()
750 def __init__(
751 self, *args: Any, minimum_number: int = 0, **kwargs: Any
752 ) -> None:
753 self.minimum_number = minimum_number
754 self.title = "" # for type checker
755 self.description = "" # for type checker
756 super().__init__(*args, **kwargs)
758 # noinspection PyUnusedLocal
759 def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
760 request = self.request
761 _ = request.gettext
762 self.title = _("Diagnostic codes")
763 self.description = (
764 _(
765 "Use % as a wildcard (e.g. F32 matches only F32, but F32% "
766 "matches F32, F32.1, F32.2...)."
767 )
768 + " "
769 + or_join_description(request)
770 )
772 def validator(self, node: SchemaNode, value: List[str]) -> None:
773 assert isinstance(value, list)
774 _ = self.gettext
775 if len(value) < self.minimum_number:
776 raise Invalid(
777 node,
778 _("You must specify at least") + f" {self.minimum_number}",
779 )
780 if len(value) != len(set(value)):
781 raise Invalid(node, _("You have specified duplicate diagnoses"))
784class DiagnosisFinderReportSchema(ReportParamSchema):
785 which_idnum = LinkingIdNumSelector() # must match ViewParam.WHICH_IDNUM
786 diagnoses_inclusion = DiagnosesSequence(
787 minimum_number=1
788 ) # must match ViewParam.DIAGNOSES_INCLUSION
789 diagnoses_exclusion = (
790 DiagnosesSequence()
791 ) # must match ViewParam.DIAGNOSES_EXCLUSION
792 age_minimum = OptionalIntNode() # must match ViewParam.AGE_MINIMUM
793 age_maximum = OptionalIntNode() # must match ViewParam.AGE_MAXIMUM
795 # noinspection PyUnusedLocal
796 def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
797 _ = self.gettext
798 diagnoses_inclusion = get_child_node(self, "diagnoses_inclusion")
799 diagnoses_inclusion.title = _("Inclusion diagnoses (lifetime)")
800 diagnoses_exclusion = get_child_node(self, "diagnoses_exclusion")
801 diagnoses_exclusion.title = _("Exclusion diagnoses (lifetime)")
802 age_minimum = get_child_node(self, "age_minimum")
803 age_minimum.title = _("Minimum age (years) (optional)")
804 age_maximum = get_child_node(self, "age_maximum")
805 age_maximum.title = _("Maximum age (years) (optional)")
808# noinspection PyProtectedMember
809def get_diagnosis_inc_exc_report_query(
810 req: CamcopsRequest,
811 diagnosis_class: Type[DiagnosisBase],
812 item_class: Type[DiagnosisItemBase],
813 item_fk_fieldname: str,
814 system: str,
815 which_idnum: int,
816 inclusion_dx: List[str],
817 exclusion_dx: List[str],
818 age_minimum_y: int,
819 age_maximum_y: int,
820) -> Select[Any]:
821 """
822 As for get_diagnosis_report_query, but this makes some modifications to
823 do inclusion and exclusion criteria.
825 - We need a linking number to perform exclusion criteria.
826 - Therefore, we use a single ID number, which must not be NULL.
827 """
828 # The basics:
829 desc = req.get_id_desc(which_idnum) or "BAD_IDNUM"
830 # noinspection PyUnresolvedReferences
831 select_fields: list[ColumnElement[Any]] = [
832 Patient.surname.label("surname"),
833 Patient.forename.label("forename"),
834 Patient.dob.label("dob"),
835 Patient.sex.label("sex"),
836 PatientIdNum.idnum_value.label(desc),
837 diagnosis_class.when_created.label("when_created"),
838 literal(system).label("system"),
839 item_class.code.label("code"),
840 item_class.description.label("description"),
841 ]
842 # noinspection PyUnresolvedReferences
843 select_from = (
844 Patient.__table__.join(
845 diagnosis_class.__table__,
846 and_(
847 diagnosis_class.patient_id == Patient.id,
848 diagnosis_class._device_id == Patient._device_id,
849 diagnosis_class._era == Patient._era,
850 diagnosis_class._current == True, # noqa: E712
851 ),
852 )
853 .join(
854 item_class.__table__,
855 and_(
856 getattr(item_class, item_fk_fieldname) == diagnosis_class.id,
857 item_class._device_id == diagnosis_class._device_id,
858 item_class._era == diagnosis_class._era,
859 item_class._current == True, # noqa: E712
860 ),
861 )
862 .join(
863 PatientIdNum.__table__,
864 and_(
865 PatientIdNum.patient_id == Patient.id,
866 PatientIdNum._device_id == Patient._device_id,
867 PatientIdNum._era == Patient._era,
868 PatientIdNum._current == True, # noqa: E712
869 PatientIdNum.which_idnum == which_idnum,
870 PatientIdNum.idnum_value.isnot(None), # NOT NULL
871 ),
872 )
873 )
874 wheres = [Patient._current == True] # noqa: E712
876 group_ids: list[int] = []
878 if not req.user.superuser:
879 # Restrict to accessible groups
880 group_ids = req.user.ids_of_groups_user_may_report_on
881 wheres.append(diagnosis_class._group_id.in_(group_ids))
883 # Age limits are simple, as the same patient has the same age for
884 # all diagnosis rows.
885 today = req.today
886 if age_maximum_y is not None:
887 # Example: max age is 40; earliest (oldest) DOB is therefore 41
888 # years ago plus one day (e.g. if it's 15 June 2010, then earliest
889 # DOB is 16 June 1969; a person born then will be 41 tomorrow).
890 earliest_dob = pendulum_date_to_datetime_date(
891 today.subtract(years=age_maximum_y + 1).add(days=1)
892 )
893 wheres.append(Patient.dob >= earliest_dob)
894 if age_minimum_y is not None:
895 # Example: min age is 20; latest (youngest) DOB is therefore 20
896 # years ago (e.g. if it's 15 June 2010, latest DOB is 15 June 1990;
897 # if you're born after that, you're not 20 yet).
898 latest_dob = pendulum_date_to_datetime_date(
899 today.subtract(years=age_minimum_y)
900 )
901 wheres.append(Patient.dob <= latest_dob)
903 # Diagnosis criteria are a little bit more complex.
904 #
905 # We can reasonably do inclusion criteria as "show the diagnoses
906 # matching the inclusion criteria" (not the more complex "show all
907 # diagnoses for patients having at least one inclusion diagnosis",
908 # which is likely to be too verbose for patient finding).
909 inclusion_criteria = [] # type: List[ColumnElement]
910 for idx in inclusion_dx:
911 inclusion_criteria.append(item_class.code.like(idx))
912 wheres.append(or_(True, *inclusion_criteria)) # type: ignore[arg-type]
914 # Exclusion criteria are the trickier: we need to be able to link
915 # multiple diagnoses for the same patient, so we need to use a linking
916 # ID number.
917 if exclusion_dx:
918 # noinspection PyUnresolvedReferences
919 edx_items = item_class.__table__.alias("edx_items")
920 # noinspection PyUnresolvedReferences
921 edx_sets = diagnosis_class.__table__.alias("edx_sets")
922 # noinspection PyUnresolvedReferences
923 edx_patient = Patient.__table__.alias("edx_patient")
924 # noinspection PyUnresolvedReferences
925 edx_idnum = PatientIdNum.__table__.alias("edx_idnum")
926 edx_joined = (
927 edx_items.join(
928 edx_sets,
929 and_(
930 getattr(edx_items.c, item_fk_fieldname) == edx_sets.c.id,
931 edx_items.c._device_id == edx_sets.c._device_id,
932 edx_items.c._era == edx_sets.c._era,
933 edx_items.c._current == True, # noqa: E712
934 ),
935 )
936 .join(
937 edx_patient,
938 and_(
939 edx_sets.c.patient_id == edx_patient.c.id,
940 edx_sets.c._device_id == edx_patient.c._device_id,
941 edx_sets.c._era == edx_patient.c._era,
942 edx_sets.c._current == True, # noqa: E712
943 ),
944 )
945 .join(
946 edx_idnum,
947 and_(
948 edx_idnum.c.patient_id == edx_patient.c.id,
949 edx_idnum.c._device_id == edx_patient.c._device_id,
950 edx_idnum.c._era == edx_patient.c._era,
951 edx_idnum.c._current == True, # noqa: E712
952 edx_idnum.c.which_idnum == which_idnum,
953 ),
954 )
955 )
956 exclusion_criteria = [] # type: List[ColumnElement]
957 for edx in exclusion_dx:
958 exclusion_criteria.append(edx_items.c.code.like(edx))
959 edx_wheres = [
960 edx_items.c._current == True, # noqa: E712
961 edx_idnum.c.idnum_value == PatientIdNum.idnum_value,
962 or_(*exclusion_criteria),
963 ]
964 # Note the join above between the main and the EXISTS clauses.
965 # We don't use an alias for the main copy of the PatientIdNum table,
966 # and we do for the EXISTS version. This is fine; e.g.
967 # https://msdn.microsoft.com/en-us/library/ethytz2x.aspx example:
968 # SELECT boss.name, employee.name
969 # FROM employee
970 # INNER JOIN employee boss ON employee.manager_id = boss.emp_id;
971 if not req.user.superuser:
972 # Restrict to accessible groups
973 # group_ids already defined from above
974 edx_wheres.append(edx_sets.c._group_id.in_(group_ids))
975 # ... bugfix 2018-06-19: "wheres" -> "edx_wheres"
976 exclusion_select = (
977 select("*").select_from(edx_joined).where(and_(*edx_wheres))
978 )
979 wheres.append(not_(exists(exclusion_select)))
981 query = (
982 select(*select_fields).select_from(select_from).where(and_(*wheres))
983 )
984 return query
987# noinspection PyAbstractClass
988class DiagnosisFinderReportBase(Report):
989 """Report to show all diagnoses."""
991 # noinspection PyMethodParameters
992 @classproperty
993 def superuser_only(cls) -> bool:
994 return False
996 @staticmethod
997 def get_paramform_schema_class() -> Type["ReportParamSchema"]:
998 return DiagnosisFinderReportSchema
1000 @classmethod
1001 def get_specific_http_query_keys(cls) -> List[str]:
1002 return [
1003 ViewParam.WHICH_IDNUM,
1004 ViewParam.DIAGNOSES_INCLUSION,
1005 ViewParam.DIAGNOSES_EXCLUSION,
1006 ViewParam.AGE_MINIMUM,
1007 ViewParam.AGE_MAXIMUM,
1008 ]
1010 def render_single_page_html(
1011 self,
1012 req: "CamcopsRequest",
1013 column_names: Sequence[str],
1014 page: CamcopsPage,
1015 ) -> Response:
1016 which_idnum = req.get_int_param(ViewParam.WHICH_IDNUM)
1017 inclusion_dx = req.get_str_list_param(
1018 ViewParam.DIAGNOSES_INCLUSION,
1019 validator=validate_restricted_sql_search_literal,
1020 )
1021 exclusion_dx = req.get_str_list_param(
1022 ViewParam.DIAGNOSES_EXCLUSION,
1023 validator=validate_restricted_sql_search_literal,
1024 )
1025 age_minimum = req.get_int_param(ViewParam.AGE_MINIMUM)
1026 age_maximum = req.get_int_param(ViewParam.AGE_MAXIMUM)
1027 idnum_desc = req.get_id_desc(which_idnum) or "BAD_IDNUM"
1028 query = self.get_query(req)
1029 sql = get_literal_query(query, bind=req.engine) # type: ignore[arg-type] # noqa: E501
1031 return render_to_response(
1032 "diagnosis_finder_report.mako",
1033 dict(
1034 title=self.title(req),
1035 page=page,
1036 column_names=column_names,
1037 report_id=self.report_id,
1038 idnum_desc=idnum_desc,
1039 inclusion_dx=inclusion_dx,
1040 exclusion_dx=exclusion_dx,
1041 age_minimum=age_minimum,
1042 age_maximum=age_maximum,
1043 sql=sql,
1044 ),
1045 request=req,
1046 )
1049class DiagnosisICD10FinderReport(DiagnosisFinderReportBase):
1050 # noinspection PyMethodParameters
1051 @classproperty
1052 def report_id(cls) -> str:
1053 return "diagnoses_finder_icd10"
1055 @classmethod
1056 def title(cls, req: "CamcopsRequest") -> str:
1057 _ = req.gettext
1058 return _("Diagnosis – Find patients by ICD-10 diagnosis ± age")
1060 def get_query(self, req: CamcopsRequest) -> Select[Any]:
1061 which_idnum = req.get_int_param(ViewParam.WHICH_IDNUM)
1062 inclusion_dx = req.get_str_list_param(
1063 ViewParam.DIAGNOSES_INCLUSION,
1064 validator=validate_restricted_sql_search_literal,
1065 )
1066 exclusion_dx = req.get_str_list_param(
1067 ViewParam.DIAGNOSES_EXCLUSION,
1068 validator=validate_restricted_sql_search_literal,
1069 )
1070 age_minimum = req.get_int_param(ViewParam.AGE_MINIMUM)
1071 age_maximum = req.get_int_param(ViewParam.AGE_MAXIMUM)
1073 q = get_diagnosis_inc_exc_report_query(
1074 req,
1075 diagnosis_class=DiagnosisIcd10,
1076 item_class=DiagnosisIcd10Item,
1077 item_fk_fieldname="diagnosis_icd10_id",
1078 system="ICD-10",
1079 which_idnum=which_idnum,
1080 inclusion_dx=inclusion_dx,
1081 exclusion_dx=exclusion_dx,
1082 age_minimum_y=age_minimum,
1083 age_maximum_y=age_maximum,
1084 )
1085 q = q.order_by(*ORDER_BY)
1086 # log.debug("Final query:\n{}", get_literal_query(q, bind=req.engine))
1087 return q
1089 @staticmethod
1090 def get_test_querydict() -> Dict[str, Any]:
1091 return {
1092 ViewParam.WHICH_IDNUM: 1,
1093 ViewParam.DIAGNOSES_INCLUSION: ["F32%"],
1094 ViewParam.DIAGNOSES_EXCLUSION: [],
1095 ViewParam.AGE_MINIMUM: None,
1096 ViewParam.AGE_MAXIMUM: None,
1097 }
1100class DiagnosisICD9CMFinderReport(DiagnosisFinderReportBase):
1101 # noinspection PyMethodParameters
1102 @classproperty
1103 def report_id(cls) -> str:
1104 return "diagnoses_finder_icd9cm"
1106 @classmethod
1107 def title(cls, req: "CamcopsRequest") -> str:
1108 _ = req.gettext
1109 return _(
1110 "Diagnosis – Find patients by ICD-9-CM (DSM-IV-TR) diagnosis ± age"
1111 )
1113 def get_query(self, req: CamcopsRequest) -> Select[Any]:
1114 which_idnum = req.get_int_param(ViewParam.WHICH_IDNUM)
1115 inclusion_dx = req.get_str_list_param(
1116 ViewParam.DIAGNOSES_INCLUSION,
1117 validator=validate_restricted_sql_search_literal,
1118 )
1119 exclusion_dx = req.get_str_list_param(
1120 ViewParam.DIAGNOSES_EXCLUSION,
1121 validator=validate_restricted_sql_search_literal,
1122 )
1123 age_minimum = req.get_int_param(ViewParam.AGE_MINIMUM)
1124 age_maximum = req.get_int_param(ViewParam.AGE_MAXIMUM)
1126 q = get_diagnosis_inc_exc_report_query(
1127 req,
1128 diagnosis_class=DiagnosisIcd9CM,
1129 item_class=DiagnosisIcd9CMItem,
1130 item_fk_fieldname="diagnosis_icd9cm_id",
1131 system="ICD-9-CM",
1132 which_idnum=which_idnum,
1133 inclusion_dx=inclusion_dx,
1134 exclusion_dx=exclusion_dx,
1135 age_minimum_y=age_minimum,
1136 age_maximum_y=age_maximum,
1137 )
1138 q = q.order_by(*ORDER_BY)
1139 # log.debug("Final query:\n{}", get_literal_query(q, bind=req.engine))
1140 return q
1142 @staticmethod
1143 def get_test_querydict() -> Dict[str, Any]:
1144 return {
1145 ViewParam.WHICH_IDNUM: 1,
1146 ViewParam.DIAGNOSES_INCLUSION: ["296%"],
1147 ViewParam.DIAGNOSES_EXCLUSION: [],
1148 ViewParam.AGE_MINIMUM: None,
1149 ViewParam.AGE_MAXIMUM: None,
1150 }