Coverage for tasks/diagnosis.py : 46%

Hot-keys on this page
r m x p toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python
3"""
4camcops_server/tasks/diagnosis.py
6===============================================================================
8 Copyright (C) 2012-2020 Rudolf Cardinal (rudolf@pobox.com).
10 This file is part of CamCOPS.
12 CamCOPS is free software: you can redistribute it and/or modify
13 it under the terms of the GNU General Public License as published by
14 the Free Software Foundation, either version 3 of the License, or
15 (at your option) any later version.
17 CamCOPS is distributed in the hope that it will be useful,
18 but WITHOUT ANY WARRANTY; without even the implied warranty of
19 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
20 GNU General Public License for more details.
22 You should have received a copy of the GNU General Public License
23 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>.
25===============================================================================
27"""
29from abc import ABC
30import logging
31from typing import Any, Dict, List, Optional, Type, TYPE_CHECKING
33from cardinal_pythonlib.classes import classproperty
34from cardinal_pythonlib.colander_utils import (
35 get_child_node,
36 OptionalIntNode,
37)
38from cardinal_pythonlib.datetimefunc import pendulum_date_to_datetime_date
39from cardinal_pythonlib.logs import BraceStyleAdapter
40import cardinal_pythonlib.rnc_web as ws
41from cardinal_pythonlib.sqlalchemy.dump import get_literal_query
42from colander import (
43 Invalid,
44 SchemaNode,
45 SequenceSchema,
46 String,
47)
48import hl7
49from pyramid.renderers import render_to_response
50from pyramid.response import Response
51from sqlalchemy.ext.declarative import declared_attr
52from sqlalchemy.sql.expression import (
53 and_, exists, literal, not_, or_, select, union,
54)
55from sqlalchemy.sql.selectable import SelectBase
56from sqlalchemy.sql.schema import Column
57from sqlalchemy.sql.sqltypes import Date, Integer, UnicodeText
59from camcops_server.cc_modules.cc_constants import CssClass
60from camcops_server.cc_modules.cc_ctvinfo import CtvInfo
61from camcops_server.cc_modules.cc_db import (
62 ancillary_relationship,
63 GenericTabletRecordMixin,
64 TaskDescendant,
65)
66from camcops_server.cc_modules.cc_forms import (
67 LinkingIdNumSelector,
68 or_join_description,
69 ReportParamSchema,
70 RequestAwareMixin,
71)
72from camcops_server.cc_modules.cc_hl7 import make_dg1_segment
73from camcops_server.cc_modules.cc_html import answer, tr
74from camcops_server.cc_modules.cc_nlp import guess_name_components
75from camcops_server.cc_modules.cc_patient import Patient
76from camcops_server.cc_modules.cc_patientidnum import PatientIdNum
77from camcops_server.cc_modules.cc_pyramid import CamcopsPage, ViewParam
78from camcops_server.cc_modules.cc_task import (
79 Task,
80 TaskHasClinicianMixin,
81 TaskHasPatientMixin,
82)
83from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient
84from camcops_server.cc_modules.cc_request import CamcopsRequest
85from camcops_server.cc_modules.cc_report import Report
86from camcops_server.cc_modules.cc_snomed import (
87 SnomedConcept,
88 SnomedExpression,
89 SnomedFocusConcept,
90)
91from camcops_server.cc_modules.cc_sqlalchemy import Base, DeclarativeAndABCMeta
92from camcops_server.cc_modules.cc_sqla_coltypes import (
93 CamcopsColumn,
94 DiagnosticCodeColType,
95)
96from camcops_server.cc_modules.cc_validators import (
97 validate_restricted_sql_search_literal,
98)
100if TYPE_CHECKING:
101 from sqlalchemy.sql.elements import ColumnElement
# Module-level logger, wrapped so that log calls can use "{}"-style
# placeholders.
log = BraceStyleAdapter(
    logging.getLogger(__name__)
)
105# =============================================================================
106# Helpers
107# =============================================================================
# Shared column comment for the foreign-key columns of the ancillary
# (item) tables defined below.
FK_COMMENT = (
    "FK to parent table"
)
112# =============================================================================
113# DiagnosisBase
114# =============================================================================
class DiagnosisItemBase(GenericTabletRecordMixin, Base):
    """
    Abstract base class for one diagnostic-code entry belonging to a
    diagnosis task: a sequence number, a code, the code's description, and a
    clinician's comment.
    """
    __abstract__ = True

    # noinspection PyMethodParameters
    @declared_attr
    def seqnum(cls) -> Column:
        """Column holding the item's sequence number."""
        return Column(
            "seqnum",
            Integer,
            nullable=False,
            comment="Sequence number (consistently 1-based as of 2018-12-01)",
        )

    # noinspection PyMethodParameters
    @declared_attr
    def code(cls) -> Column:
        """Column holding the diagnostic code."""
        return Column(
            "code",
            DiagnosticCodeColType,
            comment="Diagnostic code",
        )

    # noinspection PyMethodParameters
    @declared_attr
    def description(cls) -> Column:
        """Column holding the textual description of the code."""
        return CamcopsColumn(
            "description",
            UnicodeText,
            exempt_from_anonymisation=True,
            comment="Description of the diagnostic code",
        )

    # noinspection PyMethodParameters
    @declared_attr
    def comment(cls) -> Column:
        """Column holding the clinician's comment (new in v2.0.0)."""
        return Column(
            "comment",
            UnicodeText,
            comment="Clinician's comment",
        )

    def get_html_table_row(self) -> str:
        """
        Returns an HTML table row: the sequence number, followed by the
        code, description and comment (each webified and formatted as an
        answer).
        """
        answer_cells = [
            answer(ws.webify(value))
            for value in (self.code, self.description, self.comment)
        ]
        return tr(self.seqnum, *answer_cells)

    def get_code_for_hl7(self) -> str:
        """
        Returns the code in HL7 form: periods removed and upper-cased, e.g.
        "F20.0" becomes "F200". A missing/blank code gives "".
        """
        return self.code.replace(".", "").upper() if self.code else ""

    def get_text_for_hl7(self) -> str:
        """Returns the description for HL7 use ("" if there is none)."""
        return self.description or ""

    def is_empty(self) -> bool:
        """Is this item lacking a code?"""
        return not self.code
class DiagnosisBase(TaskHasClinicianMixin, TaskHasPatientMixin, Task, ABC,
                    metaclass=DeclarativeAndABCMeta):
    """
    Abstract base class for diagnosis tasks: a task with a clinician and a
    patient, a "relates to" date, and a list of diagnostic-code items.

    Subclasses must override ``items`` with a relationship and should set
    ``hl7_coding_system``.
    """
    __abstract__ = True

    # noinspection PyMethodParameters
    @declared_attr
    def relates_to_date(cls) -> Column:
        """Column holding the date the diagnoses relate to."""
        return Column(  # new in v2.0.0
            "relates_to_date", Date,
            comment="Date that diagnoses relate to"
        )

    items = None  # type: List[DiagnosisItemBase]  # must be overridden by a relationship  # noqa: E501

    # Coding-system identifier passed to the HL7 DG1 segments; overridden by
    # concrete subclasses.
    hl7_coding_system = "?"

    def get_num_items(self) -> int:
        """Returns the number of diagnostic items attached to this task."""
        return len(self.items)

    def is_complete(self) -> bool:
        """
        A diagnosis task is complete if it has a "relates to" date, at least
        one item, and no item without a code.
        """
        if self.relates_to_date is None:
            return False
        if self.get_num_items() == 0:
            return False
        return not any(item.is_empty() for item in self.items)

    def get_task_html(self, req: CamcopsRequest) -> str:
        """
        Returns the HTML for the task: a summary table (completeness row)
        followed by a detail table with one row per diagnostic item.

        Args:
            req: the
                :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        """
        header = f"""
            <div class="{CssClass.SUMMARY}">
                <table class="{CssClass.SUMMARY}">
                    {self.get_is_complete_tr(req)}
                </table>
            </div>
            <table class="{CssClass.TASKDETAIL}">
                <tr>
                    <th width="10%">Diagnosis #</th>
                    <th width="10%">Code</th>
                    <th width="40%">Description</th>
                    <th width="40%">Comment</th>
                </tr>
        """
        # Build all rows in one join rather than repeated string "+=".
        rows = "".join(item.get_html_table_row() for item in self.items)
        footer = """
            </table>
        """
        return header + rows + footer

    def get_clinical_text(self, req: CamcopsRequest) -> List[CtvInfo]:
        """
        Returns one :class:`CtvInfo` per item, containing the (webified)
        code in bold followed by the (webified) description.
        """
        return [
            CtvInfo(content=(
                f"<b>{ws.webify(item.code)}</b>: {ws.webify(item.description)}"
            ))
            for item in self.items
        ]

    # noinspection PyUnusedLocal
    def get_hl7_extra_data_segments(self, recipient_def: ExportRecipient) \
            -> List[hl7.Segment]:
        """
        Returns one segment (built by ``make_dg1_segment``) per diagnostic
        item, numbered from 1 in item order.

        Args:
            recipient_def: the export recipient (unused here)
        """
        segments = []
        clinician = guess_name_components(self.clinician_name)
        # HL7 set IDs are 1-based, not 0-based.
        for set_id, item in enumerate(self.items, start=1):
            # The same timestamp serves as both the diagnosis and the
            # attestation date/time; fetch it once per item.
            created = self.get_creation_datetime()
            segments.append(make_dg1_segment(
                set_id=set_id,
                diagnosis_datetime=created,
                coding_system=self.hl7_coding_system,
                diagnosis_identifier=item.get_code_for_hl7(),
                diagnosis_text=item.get_text_for_hl7(),
                clinician_surname=clinician.get("surname") or "",
                clinician_forename=clinician.get("forename") or "",
                clinician_prefix=clinician.get("prefix") or "",
                attestation_datetime=created,
            ))
        return segments
255# =============================================================================
256# DiagnosisIcd10
257# =============================================================================
class DiagnosisIcd10Item(DiagnosisItemBase, TaskDescendant):
    """
    One diagnostic-code row belonging to a :class:`DiagnosisIcd10` task.
    """
    __tablename__ = "diagnosis_icd10_item"

    diagnosis_icd10_id = Column(
        "diagnosis_icd10_id",
        Integer,
        nullable=False,
        comment=FK_COMMENT,
    )

    # -------------------------------------------------------------------------
    # TaskDescendant overrides
    # -------------------------------------------------------------------------

    @classmethod
    def task_ancestor_class(cls) -> Optional[Type["Task"]]:
        """Returns the task class that owns items of this type."""
        return DiagnosisIcd10

    def task_ancestor(self) -> Optional["DiagnosisIcd10"]:
        """Returns the parent task, looked up via this item's FK field."""
        parent_id = self.diagnosis_icd10_id
        return DiagnosisIcd10.get_linked(parent_id, self)
class DiagnosisIcd10(DiagnosisBase):
    """
    Server implementation of the Diagnosis/ICD-10 task.
    """
    __tablename__ = "diagnosis_icd10"

    items = ancillary_relationship(
        parent_class_name="DiagnosisIcd10",
        ancillary_class_name="DiagnosisIcd10Item",
        ancillary_fk_to_parent_attr_name="diagnosis_icd10_id",
        ancillary_order_by_attr_name="seqnum",
    )

    shortname = "Diagnosis_ICD10"
    dependent_classes = [DiagnosisIcd10Item]
    hl7_coding_system = "I10"
    # Page A-129 of https://www.hl7.org/special/committees/vocab/V26_Appendix_A.pdf  # noqa: E501

    @staticmethod
    def longname(req: "CamcopsRequest") -> str:
        """Returns the (translated) long name of the task."""
        _ = req.gettext
        return _("Diagnostic codes, ICD-10")

    def get_snomed_codes(self, req: CamcopsRequest,
                         fallback: bool = True) -> List[SnomedExpression]:
        """
        Returns all SNOMED-CT codes for this task.

        Args:
            req: the
                :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            fallback: for example, if F32.10 is unknown, should we fall back to
                F32.1?

        Returns:
            a list of
            :class:`camcops_server.cc_modules.cc_snomed.SnomedExpression`
            objects (empty if ICD-10/SNOMED lookup is not supported)
        """
        if not req.icd10_snomed_supported:
            return []
        expressions = []  # type: List[SnomedExpression]
        # noinspection PyTypeChecker
        for item in self.items:
            concepts = self._get_snomed_concepts(item.code, req, fallback)
            if concepts:
                # Items with no matching concepts contribute nothing.
                expressions.append(
                    SnomedExpression(SnomedFocusConcept(concepts))
                )
        return expressions

    @staticmethod
    def _get_snomed_concepts(icd10_code: str,
                             req: CamcopsRequest,
                             fallback: bool = True) -> List[SnomedConcept]:
        """
        Internal function to return :class:`SnomedConcept` objects for an
        ICD-10 code.

        Args:
            icd10_code: the ICD-10 code
            req: the
                :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            fallback: for example, if F32.10 is unknown, should we fall back to
                F32.1?

        Returns:
            list: of :class:`SnomedConcept` objects

        """
        matches = []  # type: List[SnomedConcept]
        candidate = icd10_code
        while candidate:
            try:
                matches = req.icd10_snomed(candidate)
            except KeyError:
                # No known code; keep what we had (nothing, so far).
                pass
            if matches or not fallback:
                break
            # Fall back: drop the final character and try again.
            candidate = candidate[:-1]
        # Either we found something, we aren't falling back, or we ran out
        # of code.
        return matches
364# =============================================================================
365# DiagnosisIcd9CM
366# =============================================================================
class DiagnosisIcd9CMItem(DiagnosisItemBase, TaskDescendant):
    """
    One diagnostic-code row belonging to a :class:`DiagnosisIcd9CM` task.
    """
    __tablename__ = "diagnosis_icd9cm_item"

    diagnosis_icd9cm_id = Column(
        "diagnosis_icd9cm_id",
        Integer,
        nullable=False,
        comment=FK_COMMENT,
    )

    # -------------------------------------------------------------------------
    # TaskDescendant overrides
    # -------------------------------------------------------------------------

    @classmethod
    def task_ancestor_class(cls) -> Optional[Type["Task"]]:
        """Returns the task class that owns items of this type."""
        return DiagnosisIcd9CM

    def task_ancestor(self) -> Optional["DiagnosisIcd9CM"]:
        """Returns the parent task, looked up via this item's FK field."""
        parent_id = self.diagnosis_icd9cm_id
        return DiagnosisIcd9CM.get_linked(parent_id, self)
class DiagnosisIcd9CM(DiagnosisBase):
    """
    Server implementation of the Diagnosis/ICD-9-CM task.
    """
    __tablename__ = "diagnosis_icd9cm"

    items = ancillary_relationship(
        parent_class_name="DiagnosisIcd9CM",
        ancillary_class_name="DiagnosisIcd9CMItem",
        ancillary_fk_to_parent_attr_name="diagnosis_icd9cm_id",
        ancillary_order_by_attr_name="seqnum",
    )

    shortname = "Diagnosis_ICD9CM"
    dependent_classes = [DiagnosisIcd9CMItem]
    hl7_coding_system = "I9CM"
    # Page A-129 of https://www.hl7.org/special/committees/vocab/V26_Appendix_A.pdf  # noqa: E501

    @staticmethod
    def longname(req: "CamcopsRequest") -> str:
        """Returns the (translated) long name of the task."""
        _ = req.gettext
        return _("Diagnostic codes, ICD-9-CM (DSM-IV-TR)")

    def get_snomed_codes(self, req: CamcopsRequest) -> List[SnomedExpression]:
        """
        Returns all SNOMED-CT codes for this task: one expression per item
        whose code has known SNOMED concepts. Returns an empty list if
        ICD-9-CM/SNOMED lookup is not supported.
        """
        if not req.icd9cm_snomed_supported:
            return []
        expressions = []  # type: List[SnomedExpression]
        # noinspection PyTypeChecker
        for item in self.items:
            try:
                concepts = req.icd9cm_snomed(item.code)
            except KeyError:
                # No known code: skip this item.
                pass
            else:
                if concepts:
                    expressions.append(
                        SnomedExpression(SnomedFocusConcept(concepts))
                    )
        return expressions
429# =============================================================================
430# Reports
431# =============================================================================
433# -----------------------------------------------------------------------------
434# Helpers
435# -----------------------------------------------------------------------------
# Column labels (as assigned in the report queries below) by which all
# diagnosis reports are sorted, in order of precedence.
ORDER_BY = [
    "surname",
    "forename",
    "dob",
    "sex",
    "when_created",
    "system",
    "code",
]
# noinspection PyProtectedMember,PyUnresolvedReferences
def get_diagnosis_report_query(req: CamcopsRequest,
                               diagnosis_class: Type[DiagnosisBase],
                               item_class: Type[DiagnosisItemBase],
                               item_fk_fieldname: str,
                               system: str) -> SelectBase:
    """
    Builds the (unordered) query underlying the "all diagnoses" reports for
    one diagnostic system.

    Each result row is one diagnostic item, with patient details, one column
    per ID number definition known to the request, the task's creation time,
    the system name, and the item's code and description. Only current
    records are included; non-superusers are restricted to the groups they
    may report on.

    Args:
        req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        diagnosis_class: the diagnosis task class
        item_class: the corresponding item class
        item_fk_fieldname: name of the item attribute that is the FK to the
            task
        system: name of the diagnostic system, returned as a literal column

    Returns:
        the SQLAlchemy SELECT query
    """
    # SELECT surname, forename, dob, sex, ...
    columns = [
        Patient.surname.label("surname"),
        Patient.forename.label("forename"),
        Patient.dob.label("dob"),
        Patient.sex.label("sex"),
    ]

    # INNER JOIN dxset ON (dxtable.patient_id == patient.id AND ...)
    patient_to_dxset = and_(
        diagnosis_class.patient_id == Patient.id,
        diagnosis_class._device_id == Patient._device_id,
        diagnosis_class._era == Patient._era
    )
    # INNER JOIN dxrow ON (dxrow.fk_dxset = dxset.pk AND ...)
    dxset_to_dxrow = and_(
        getattr(item_class, item_fk_fieldname) == diagnosis_class.id,
        item_class._device_id == diagnosis_class._device_id,
        item_class._era == diagnosis_class._era
    )
    # FROM patient [joins as above]
    joined = (
        Patient.__table__
        .join(diagnosis_class.__table__, patient_to_dxset)
        .join(item_class.__table__, dxset_to_dxrow)
    )

    for iddef in req.idnum_definitions:
        which = iddef.which_idnum
        idnum_alias = PatientIdNum.__table__.alias(f"i{which}")
        # ... [also] SELECT i1.idnum_value AS 'NHS' (etc.)
        columns.append(
            idnum_alias.c.idnum_value.label(iddef.short_description)
        )
        # ... [from] OUTER JOIN patientidnum AS i1 ON (...)
        # Note: the "_current" and "which_idnum" conditions are part of the
        # JOIN, not the WHERE (or failure to match a row will wipe out the
        # Patient from the OUTER JOIN).
        idnum_join_condition = and_(
            idnum_alias.c.patient_id == Patient.id,
            idnum_alias.c._device_id == Patient._device_id,
            idnum_alias.c._era == Patient._era,
            idnum_alias.c._current == True,  # noqa: E712
            idnum_alias.c.which_idnum == which,
        )
        joined = joined.outerjoin(idnum_alias, idnum_join_condition)

    columns.extend([
        diagnosis_class.when_created.label("when_created"),
        literal(system).label("system"),
        item_class.code.label("code"),
        item_class.description.label("description"),
    ])

    # WHERE...
    conditions = [
        Patient._current == True,  # noqa: E712
        diagnosis_class._current == True,  # noqa: E712
        item_class._current == True,  # noqa: E712
    ]
    if not req.user.superuser:
        # Restrict to accessible groups.
        # Helpfully, SQLAlchemy will render this as "... AND 1 != 1" if we
        # pass an empty list to in_().
        permitted_group_ids = req.user.ids_of_groups_user_may_report_on
        conditions.append(diagnosis_class._group_id.in_(permitted_group_ids))

    return select(columns).select_from(joined).where(and_(*conditions))
def get_diagnosis_report(req: CamcopsRequest,
                         diagnosis_class: Type[DiagnosisBase],
                         item_class: Type[DiagnosisItemBase],
                         item_fk_fieldname: str,
                         system: str) -> SelectBase:
    """
    Returns the query from :func:`get_diagnosis_report_query` (same
    arguments), sorted by the standard ``ORDER_BY`` columns.
    """
    unordered = get_diagnosis_report_query(
        req, diagnosis_class, item_class, item_fk_fieldname, system
    )
    return unordered.order_by(*ORDER_BY)
520# -----------------------------------------------------------------------------
521# Plain "all diagnoses" reports
522# -----------------------------------------------------------------------------
class DiagnosisICD9CMReport(Report):
    """
    Report listing ICD-9-CM (DSM-IV-TR) diagnoses for all patients.
    """

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """Identifier string for this report."""
        return "diagnoses_icd9cm"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """Returns the (translated) report title."""
        _ = req.gettext
        return _("Diagnosis – ICD-9-CM (DSM-IV-TR) diagnoses for all patients")

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """This report is not restricted to superusers."""
        return False

    def get_query(self, req: CamcopsRequest) -> SelectBase:
        """Returns the sorted ICD-9-CM diagnosis query."""
        return get_diagnosis_report(
            req,
            diagnosis_class=DiagnosisIcd9CM,
            item_class=DiagnosisIcd9CMItem,
            item_fk_fieldname="diagnosis_icd9cm_id",
            system="ICD-9-CM",
        )
class DiagnosisICD10Report(Report):
    """
    Report listing ICD-10 diagnoses for all patients.
    """

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """Identifier string for this report."""
        return "diagnoses_icd10"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """Returns the (translated) report title."""
        _ = req.gettext
        return _("Diagnosis – ICD-10 diagnoses for all patients")

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """This report is not restricted to superusers."""
        return False

    def get_query(self, req: CamcopsRequest) -> SelectBase:
        """Returns the sorted ICD-10 diagnosis query."""
        return get_diagnosis_report(
            req,
            diagnosis_class=DiagnosisIcd10,
            item_class=DiagnosisIcd10Item,
            item_fk_fieldname="diagnosis_icd10_id",
            system="ICD-10",
        )
class DiagnosisAllReport(Report):
    """
    Report listing all diagnoses (ICD-9-CM and ICD-10) for all patients.
    """

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """Identifier string for this report."""
        return "diagnoses_all"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """Returns the (translated) report title."""
        _ = req.gettext
        return _("Diagnosis – All diagnoses for all patients")

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """This report is not restricted to superusers."""
        return False

    def get_query(self, req: CamcopsRequest) -> SelectBase:
        """
        Returns the UNION of the ICD-9-CM and ICD-10 diagnosis queries,
        sorted by the standard ``ORDER_BY`` columns.
        """
        # (task class, item class, item FK field name, system name);
        # ICD-9-CM first, then ICD-10.
        systems = (
            (DiagnosisIcd9CM, DiagnosisIcd9CMItem,
             'diagnosis_icd9cm_id', 'ICD-9-CM'),
            (DiagnosisIcd10, DiagnosisIcd10Item,
             'diagnosis_icd10_id', 'ICD-10'),
        )
        per_system_queries = [
            get_diagnosis_report_query(
                req,
                diagnosis_class=dx_class,
                item_class=dx_item_class,
                item_fk_fieldname=fk_fieldname,
                system=system_name,
            )
            for dx_class, dx_item_class, fk_fieldname, system_name in systems
        ]
        return union(*per_system_queries).order_by(*ORDER_BY)
619# -----------------------------------------------------------------------------
620# "Find me patients matching certain diagnostic criteria"
621# -----------------------------------------------------------------------------
class DiagnosisNode(SchemaNode, RequestAwareMixin):
    """
    Colander string node for a single diagnostic code search term (which may
    contain SQL "LIKE" wildcards).
    """
    schema_type = String

    def __init__(self, *args, **kwargs) -> None:
        # Declared here for the type checker; the real (translated) text is
        # set in after_bind().
        self.title = ""
        self.description = ""
        super().__init__(*args, **kwargs)

    # noinspection PyUnusedLocal
    def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
        """Sets the translated title and description."""
        _ = self.gettext
        self.title = _("Diagnostic code")
        self.description = _(
            "Type in a diagnostic code; you may use SQL 'LIKE' syntax for "
            "wildcards, i.e. _ for one character and % for zero/one/lots"
        )

    def validator(self, node: SchemaNode, value: str) -> None:
        """
        Checks the value with ``validate_restricted_sql_search_literal``.

        Raises:
            :exc:`colander.Invalid` if that check raises :exc:`ValueError`
        """
        try:
            validate_restricted_sql_search_literal(value, self.request)
        except ValueError as problem:
            raise Invalid(node, str(problem))
class DiagnosesSequence(SequenceSchema, RequestAwareMixin):
    """
    Colander sequence of :class:`DiagnosisNode` values, with an optional
    minimum length and no duplicates permitted.
    """
    diagnoses = DiagnosisNode()

    def __init__(self, *args, minimum_number: int = 0, **kwargs) -> None:
        """
        Args:
            minimum_number: minimum number of diagnoses the user must supply
        """
        self.minimum_number = minimum_number
        # Declared here for the type checker; the real (translated) text is
        # set in after_bind().
        self.title = ""
        self.description = ""
        super().__init__(*args, **kwargs)

    # noinspection PyUnusedLocal
    def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
        """Sets the translated title and description."""
        req = self.request
        _ = req.gettext
        self.title = _("Diagnostic codes")
        wildcard_help = _(
            "Use % as a wildcard (e.g. F32 matches only F32, but F32% "
            "matches F32, F32.1, F32.2...)."
        )
        self.description = wildcard_help + " " + or_join_description(req)

    def validator(self, node: SchemaNode, value: List[str]) -> None:
        """
        Checks the list of diagnoses.

        Raises:
            :exc:`colander.Invalid` if there are fewer than
            ``minimum_number`` entries, or any duplicates
        """
        assert isinstance(value, list)
        _ = self.gettext
        n_supplied = len(value)
        if n_supplied < self.minimum_number:
            message = _("You must specify at least") + f" {self.minimum_number}"
            raise Invalid(node, message)
        n_distinct = len(set(value))
        if n_supplied != n_distinct:
            raise Invalid(node, _("You have specified duplicate diagnoses"))
class DiagnosisFinderReportSchema(ReportParamSchema):
    """
    Parameter schema for the "diagnosis finder" reports: a linking ID number
    type, inclusion and exclusion diagnoses, and optional age limits.
    """
    which_idnum = LinkingIdNumSelector()  # must match ViewParam.WHICH_IDNUM
    diagnoses_inclusion = DiagnosesSequence(minimum_number=1)  # must match ViewParam.DIAGNOSES_INCLUSION  # noqa: E501
    diagnoses_exclusion = DiagnosesSequence()  # must match ViewParam.DIAGNOSES_EXCLUSION  # noqa: E501
    age_minimum = OptionalIntNode()  # must match ViewParam.AGE_MINIMUM
    age_maximum = OptionalIntNode()  # must match ViewParam.AGE_MAXIMUM

    # noinspection PyUnusedLocal
    def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
        """Sets the translated titles of the child nodes."""
        _ = self.gettext
        child_titles = (
            ("diagnoses_inclusion", _("Inclusion diagnoses (lifetime)")),
            ("diagnoses_exclusion", _("Exclusion diagnoses (lifetime)")),
            ("age_minimum", _("Minimum age (years) (optional)")),
            ("age_maximum", _("Maximum age (years) (optional)")),
        )
        for child_name, child_title in child_titles:
            get_child_node(self, child_name).title = child_title
# noinspection PyProtectedMember
def get_diagnosis_inc_exc_report_query(req: CamcopsRequest,
                                       diagnosis_class: Type[DiagnosisBase],
                                       item_class: Type[DiagnosisItemBase],
                                       item_fk_fieldname: str,
                                       system: str,
                                       which_idnum: int,
                                       inclusion_dx: List[str],
                                       exclusion_dx: List[str],
                                       age_minimum_y: Optional[int],
                                       age_maximum_y: Optional[int]) \
        -> SelectBase:
    """
    As for get_diagnosis_report_query, but this makes some modifications to
    do inclusion and exclusion criteria.

    - We need a linking number to perform exclusion criteria.
    - Therefore, we use a single ID number, which must not be NULL.

    Args:
        req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
        diagnosis_class: the diagnosis task class
        item_class: the corresponding item class
        item_fk_fieldname: name of the item attribute that is the FK to the
            task
        system: name of the diagnostic system, returned as a literal column
        which_idnum: which ID number type to use for linking patients
        inclusion_dx: SQL "LIKE" patterns; a diagnosis row is shown if its
            code matches any of them
        exclusion_dx: SQL "LIKE" patterns; a patient (as identified by the
            linking ID number) is excluded if any of their current diagnoses
            in this system matches any of them
        age_minimum_y: minimum age in years, or ``None`` for no lower limit
        age_maximum_y: maximum age in years, or ``None`` for no upper limit

    Returns:
        the (unordered) SQLAlchemy SELECT query
    """
    # The basics:
    desc = req.get_id_desc(which_idnum) or "BAD_IDNUM"
    select_fields = [
        Patient.surname.label("surname"),
        Patient.forename.label("forename"),
        Patient.dob.label("dob"),
        Patient.sex.label("sex"),
        PatientIdNum.idnum_value.label(desc),
        diagnosis_class.when_created.label("when_created"),
        literal(system).label("system"),
        item_class.code.label("code"),
        item_class.description.label("description"),
    ]
    # noinspection PyUnresolvedReferences
    select_from = (
        Patient.__table__
        .join(diagnosis_class.__table__, and_(
            diagnosis_class.patient_id == Patient.id,
            diagnosis_class._device_id == Patient._device_id,
            diagnosis_class._era == Patient._era,
            diagnosis_class._current == True,  # noqa: E712
        ))
        .join(item_class.__table__, and_(
            getattr(item_class, item_fk_fieldname) == diagnosis_class.id,
            item_class._device_id == diagnosis_class._device_id,
            item_class._era == diagnosis_class._era,
            item_class._current == True,  # noqa: E712
        ))
        .join(PatientIdNum.__table__, and_(
            PatientIdNum.patient_id == Patient.id,
            PatientIdNum._device_id == Patient._device_id,
            PatientIdNum._era == Patient._era,
            PatientIdNum._current == True,  # noqa: E712
            PatientIdNum.which_idnum == which_idnum,
            PatientIdNum.idnum_value.isnot(None),  # NOT NULL
        ))
    )
    wheres = [
        Patient._current == True,  # noqa: E712
    ]
    if not req.user.superuser:
        # Restrict to accessible groups
        group_ids = req.user.ids_of_groups_user_may_report_on
        wheres.append(diagnosis_class._group_id.in_(group_ids))
    else:
        group_ids = []  # type: List[int]  # to stop type-checker moaning below

    # Age limits are simple, as the same patient has the same age for
    # all diagnosis rows.
    today = req.today
    if age_maximum_y is not None:
        # Example: max age is 40; earliest (oldest) DOB is therefore 41
        # years ago plus one day (e.g. if it's 15 June 2010, then earliest
        # DOB is 16 June 1969; a person born then will be 41 tomorrow).
        earliest_dob = pendulum_date_to_datetime_date(
            today.subtract(years=age_maximum_y + 1).add(days=1)
        )
        wheres.append(Patient.dob >= earliest_dob)
    if age_minimum_y is not None:
        # Example: min age is 20; latest (youngest) DOB is therefore 20
        # years ago (e.g. if it's 15 June 2010, latest DOB is 15 June 1990;
        # if you're born after that, you're not 20 yet).
        latest_dob = pendulum_date_to_datetime_date(
            today.subtract(years=age_minimum_y)
        )
        wheres.append(Patient.dob <= latest_dob)

    # Diagnosis criteria are a little bit more complex.
    #
    # We can reasonably do inclusion criteria as "show the diagnoses
    # matching the inclusion criteria" (not the more complex "show all
    # diagnoses for patients having at least one inclusion diagnosis",
    # which is likely to be too verbose for patient finding).
    inclusion_criteria = [
        item_class.code.like(idx) for idx in inclusion_dx
    ]  # type: List[ColumnElement]
    if inclusion_criteria:
        # An empty or_() contributes nothing to the WHERE clause, and
        # calling it without arguments is deprecated in newer SQLAlchemy;
        # so only add the clause when there is something to add.
        wheres.append(or_(*inclusion_criteria))

    # Exclusion criteria are the trickier: we need to be able to link
    # multiple diagnoses for the same patient, so we need to use a linking
    # ID number.
    if exclusion_dx:
        # noinspection PyUnresolvedReferences
        edx_items = item_class.__table__.alias("edx_items")
        # noinspection PyUnresolvedReferences
        edx_sets = diagnosis_class.__table__.alias("edx_sets")
        # noinspection PyUnresolvedReferences
        edx_patient = Patient.__table__.alias("edx_patient")
        # noinspection PyUnresolvedReferences
        edx_idnum = PatientIdNum.__table__.alias("edx_idnum")
        edx_joined = (
            edx_items
            .join(edx_sets, and_(
                getattr(edx_items.c, item_fk_fieldname) == edx_sets.c.id,
                edx_items.c._device_id == edx_sets.c._device_id,
                edx_items.c._era == edx_sets.c._era,
                edx_items.c._current == True,  # noqa: E712
            ))
            .join(edx_patient, and_(
                edx_sets.c.patient_id == edx_patient.c.id,
                edx_sets.c._device_id == edx_patient.c._device_id,
                edx_sets.c._era == edx_patient.c._era,
                edx_sets.c._current == True,  # noqa: E712
            ))
            .join(edx_idnum, and_(
                edx_idnum.c.patient_id == edx_patient.c.id,
                edx_idnum.c._device_id == edx_patient.c._device_id,
                edx_idnum.c._era == edx_patient.c._era,
                edx_idnum.c._current == True,  # noqa: E712
                edx_idnum.c.which_idnum == which_idnum,
            ))
        )
        exclusion_criteria = [
            edx_items.c.code.like(edx) for edx in exclusion_dx
        ]  # type: List[ColumnElement]
        edx_wheres = [
            edx_items.c._current == True,  # noqa: E712
            # This links the EXISTS subquery to the main query: same ID
            # number value means same patient.
            edx_idnum.c.idnum_value == PatientIdNum.idnum_value,
            or_(*exclusion_criteria)
        ]
        # Note the join above between the main and the EXISTS clauses.
        # We don't use an alias for the main copy of the PatientIdNum table,
        # and we do for the EXISTS version. This is fine; e.g.
        # https://msdn.microsoft.com/en-us/library/ethytz2x.aspx example:
        #   SELECT boss.name, employee.name
        #   FROM employee
        #   INNER JOIN employee boss ON employee.manager_id = boss.emp_id;
        if not req.user.superuser:
            # Restrict to accessible groups
            # group_ids already defined from above
            edx_wheres.append(edx_sets.c._group_id.in_(group_ids))
            # ... bugfix 2018-06-19: "wheres" -> "edx_wheres"
        exclusion_select = (
            select(["*"])
            .select_from(edx_joined)
            .where(and_(*edx_wheres))
        )
        wheres.append(not_(exists(exclusion_select)))

    query = select(select_fields).select_from(select_from).where(and_(*wheres))
    return query
# noinspection PyAbstractClass
class DiagnosisFinderReportBase(Report):
    """
    Base class for the "find patients by diagnosis (and optionally age)"
    reports. Subclasses supply the report ID, title, and query.
    """

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """These reports are not restricted to superusers."""
        return False

    @staticmethod
    def get_paramform_schema_class() -> Type["ReportParamSchema"]:
        """Returns the schema class for the report's parameter form."""
        return DiagnosisFinderReportSchema

    @classmethod
    def get_specific_http_query_keys(cls) -> List[str]:
        """Returns the names of the report-specific HTTP query parameters."""
        return [
            ViewParam.WHICH_IDNUM,
            ViewParam.DIAGNOSES_INCLUSION,
            ViewParam.DIAGNOSES_EXCLUSION,
            ViewParam.AGE_MINIMUM,
            ViewParam.AGE_MAXIMUM,
        ]

    def render_single_page_html(self,
                                req: "CamcopsRequest",
                                column_names: List[str],
                                page: CamcopsPage) -> Response:
        """
        Renders one page of results via the ``diagnosis_finder_report.mako``
        template, passing it the search parameters read from the request and
        the literal SQL of the query, alongside the results page.

        Args:
            req: the :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            column_names: names of the result columns
            page: the page of results to show
        """
        # Re-read the search parameters from the request, for display.
        which_idnum = req.get_int_param(ViewParam.WHICH_IDNUM)
        inclusion_dx = req.get_str_list_param(
            ViewParam.DIAGNOSES_INCLUSION,
            validator=validate_restricted_sql_search_literal,
        )
        exclusion_dx = req.get_str_list_param(
            ViewParam.DIAGNOSES_EXCLUSION,
            validator=validate_restricted_sql_search_literal,
        )
        age_minimum = req.get_int_param(ViewParam.AGE_MINIMUM)
        age_maximum = req.get_int_param(ViewParam.AGE_MAXIMUM)
        idnum_desc = req.get_id_desc(which_idnum) or "BAD_IDNUM"
        sql = get_literal_query(self.get_query(req), bind=req.engine)

        template_values = {
            "title": self.title(req),
            "page": page,
            "column_names": column_names,
            "report_id": self.report_id,
            "idnum_desc": idnum_desc,
            "inclusion_dx": inclusion_dx,
            "exclusion_dx": exclusion_dx,
            "age_minimum": age_minimum,
            "age_maximum": age_maximum,
            "sql": sql,
        }
        return render_to_response(
            "diagnosis_finder_report.mako",
            template_values,
            request=req
        )
class DiagnosisICD10FinderReport(DiagnosisFinderReportBase):
    """
    Report to find patients by ICD-10 diagnosis, optionally restricted by
    age.
    """

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """Identifier string for this report."""
        return "diagnoses_finder_icd10"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """Returns the (translated) report title."""
        _ = req.gettext
        return _("Diagnosis – Find patients by ICD-10 diagnosis ± age")

    def get_query(self, req: CamcopsRequest) -> SelectBase:
        """
        Reads the search parameters from the request and returns the
        inclusion/exclusion query for ICD-10, sorted by the standard
        ``ORDER_BY`` columns.
        """
        # Keyword arguments are evaluated in the order written, so the
        # request parameters are read in the same order as the keywords.
        unordered = get_diagnosis_inc_exc_report_query(
            req,
            diagnosis_class=DiagnosisIcd10,
            item_class=DiagnosisIcd10Item,
            item_fk_fieldname='diagnosis_icd10_id',
            system='ICD-10',
            which_idnum=req.get_int_param(ViewParam.WHICH_IDNUM),
            inclusion_dx=req.get_str_list_param(
                ViewParam.DIAGNOSES_INCLUSION,
                validator=validate_restricted_sql_search_literal),
            exclusion_dx=req.get_str_list_param(
                ViewParam.DIAGNOSES_EXCLUSION,
                validator=validate_restricted_sql_search_literal),
            age_minimum_y=req.get_int_param(ViewParam.AGE_MINIMUM),
            age_maximum_y=req.get_int_param(ViewParam.AGE_MAXIMUM),
        )
        ordered = unordered.order_by(*ORDER_BY)
        # log.debug("Final query:\n{}", get_literal_query(ordered, bind=req.engine))  # noqa: E501
        return ordered

    @staticmethod
    def get_test_querydict() -> Dict[str, Any]:
        """Returns example query parameters for testing this report."""
        return {
            ViewParam.WHICH_IDNUM: 1,
            ViewParam.DIAGNOSES_INCLUSION: ['F32%'],
            ViewParam.DIAGNOSES_EXCLUSION: [],
            ViewParam.AGE_MINIMUM: None,
            ViewParam.AGE_MAXIMUM: None,
        }
class DiagnosisICD9CMFinderReport(DiagnosisFinderReportBase):
    """
    Report to find patients by ICD-9-CM (DSM-IV-TR) diagnosis, optionally
    restricted by age.
    """

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """Identifier string for this report."""
        return "diagnoses_finder_icd9cm"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """Returns the (translated) report title."""
        _ = req.gettext
        return _(
            "Diagnosis – Find patients by ICD-9-CM (DSM-IV-TR) diagnosis ± age"
        )

    def get_query(self, req: CamcopsRequest) -> SelectBase:
        """
        Reads the search parameters from the request and returns the
        inclusion/exclusion query for ICD-9-CM, sorted by the standard
        ``ORDER_BY`` columns.
        """
        # Keyword arguments are evaluated in the order written, so the
        # request parameters are read in the same order as the keywords.
        unordered = get_diagnosis_inc_exc_report_query(
            req,
            diagnosis_class=DiagnosisIcd9CM,
            item_class=DiagnosisIcd9CMItem,
            item_fk_fieldname='diagnosis_icd9cm_id',
            system='ICD-9-CM',
            which_idnum=req.get_int_param(ViewParam.WHICH_IDNUM),
            inclusion_dx=req.get_str_list_param(
                ViewParam.DIAGNOSES_INCLUSION,
                validator=validate_restricted_sql_search_literal),
            exclusion_dx=req.get_str_list_param(
                ViewParam.DIAGNOSES_EXCLUSION,
                validator=validate_restricted_sql_search_literal),
            age_minimum_y=req.get_int_param(ViewParam.AGE_MINIMUM),
            age_maximum_y=req.get_int_param(ViewParam.AGE_MAXIMUM),
        )
        ordered = unordered.order_by(*ORDER_BY)
        # log.debug("Final query:\n{}", get_literal_query(ordered, bind=req.engine))  # noqa: E501
        return ordered

    @staticmethod
    def get_test_querydict() -> Dict[str, Any]:
        """Returns example query parameters for testing this report."""
        return {
            ViewParam.WHICH_IDNUM: 1,
            ViewParam.DIAGNOSES_INCLUSION: ['296%'],
            ViewParam.DIAGNOSES_EXCLUSION: [],
            ViewParam.AGE_MINIMUM: None,
            ViewParam.AGE_MAXIMUM: None,
        }