Coverage for tasks/diagnosis.py: 45%

398 statements  

« prev     ^ index     » next       coverage.py v7.9.2, created at 2025-07-15 14:23 +0100

1""" 

2camcops_server/tasks/diagnosis.py 

3 

4=============================================================================== 

5 

6 Copyright (C) 2012, University of Cambridge, Department of Psychiatry. 

7 Created by Rudolf Cardinal (rnc1001@cam.ac.uk). 

8 

9 This file is part of CamCOPS. 

10 

11 CamCOPS is free software: you can redistribute it and/or modify 

12 it under the terms of the GNU General Public License as published by 

13 the Free Software Foundation, either version 3 of the License, or 

14 (at your option) any later version. 

15 

16 CamCOPS is distributed in the hope that it will be useful, 

17 but WITHOUT ANY WARRANTY; without even the implied warranty of 

18 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 

19 GNU General Public License for more details. 

20 

21 You should have received a copy of the GNU General Public License 

22 along with CamCOPS. If not, see <https://www.gnu.org/licenses/>. 

23 

24=============================================================================== 

25 

26""" 

27 

28from abc import ABC, ABCMeta 

29import datetime 

30import logging 

31from typing import Any, Dict, List, Optional, Sequence, Type, TYPE_CHECKING 

32 

33from cardinal_pythonlib.classes import classproperty 

34from cardinal_pythonlib.colander_utils import get_child_node, OptionalIntNode 

35from cardinal_pythonlib.datetimefunc import pendulum_date_to_datetime_date 

36from cardinal_pythonlib.logs import BraceStyleAdapter 

37import cardinal_pythonlib.rnc_web as ws 

38from cardinal_pythonlib.sqlalchemy.dump import get_literal_query 

39from colander import Invalid, SchemaNode, SequenceSchema, String 

40from fhirclient.models.annotation import Annotation 

41from fhirclient.models.codeableconcept import CodeableConcept 

42from fhirclient.models.coding import Coding 

43from fhirclient.models.condition import Condition 

44import hl7 

45from pyramid.renderers import render_to_response 

46from pyramid.response import Response 

47from sqlalchemy import CompoundSelect, Select 

48from sqlalchemy.orm import Mapped, mapped_column 

49from sqlalchemy.sql.expression import ( 

50 and_, 

51 exists, 

52 literal, 

53 not_, 

54 or_, 

55 select, 

56 union, 

57) 

58from sqlalchemy.sql.sqltypes import Date, UnicodeText 

59 

60from camcops_server.cc_modules.cc_constants import CssClass, FHIRConst as Fc 

61from camcops_server.cc_modules.cc_ctvinfo import CtvInfo 

62from camcops_server.cc_modules.cc_db import ( 

63 ancillary_relationship, 

64 GenericTabletRecordMixin, 

65 TaskDescendant, 

66) 

67from camcops_server.cc_modules.cc_fhir import make_fhir_bundle_entry 

68from camcops_server.cc_modules.cc_forms import ( 

69 LinkingIdNumSelector, 

70 or_join_description, 

71 ReportParamSchema, 

72 RequestAwareMixin, 

73) 

74from camcops_server.cc_modules.cc_hl7 import make_dg1_segment 

75from camcops_server.cc_modules.cc_html import answer, tr 

76from camcops_server.cc_modules.cc_nlp import guess_name_components 

77from camcops_server.cc_modules.cc_patient import Patient 

78from camcops_server.cc_modules.cc_patientidnum import PatientIdNum 

79from camcops_server.cc_modules.cc_pyramid import CamcopsPage, ViewParam 

80from camcops_server.cc_modules.cc_task import ( 

81 Task, 

82 TaskHasClinicianMixin, 

83 TaskHasPatientMixin, 

84) 

85from camcops_server.cc_modules.cc_exportrecipient import ExportRecipient 

86from camcops_server.cc_modules.cc_request import CamcopsRequest 

87from camcops_server.cc_modules.cc_report import Report 

88from camcops_server.cc_modules.cc_snomed import ( 

89 SnomedConcept, 

90 SnomedExpression, 

91 SnomedFocusConcept, 

92) 

93from camcops_server.cc_modules.cc_sqlalchemy import Base 

94from camcops_server.cc_modules.cc_sqla_coltypes import ( 

95 DiagnosticCodeColType, 

96 mapped_camcops_column, 

97) 

98from camcops_server.cc_modules.cc_validators import ( 

99 validate_restricted_sql_search_literal, 

100) 

101 

102if TYPE_CHECKING: 

103 from sqlalchemy.sql.elements import ColumnElement 

104 

105log = BraceStyleAdapter(logging.getLogger(__name__)) 

106 

107# ============================================================================= 

108# Helpers 

109# ============================================================================= 

110 

# Column comment shared by the item tables' foreign-key-to-parent columns.
FK_COMMENT = "FK to parent table"

112 

113 

114# ============================================================================= 

115# DiagnosisBase 

116# ============================================================================= 

117 

118 

class DiagnosisItemBase(GenericTabletRecordMixin, Base):
    """
    Abstract base for a single diagnosis row (one diagnostic code).

    Stores a sequence number, the code, its description and an optional
    clinician's comment, and offers helpers to render the row as HTML, as
    HL7 fields, or as a short human-readable string.
    """

    __abstract__ = True

    # noinspection PyMethodParameters
    seqnum: Mapped[int] = mapped_column(
        "seqnum",
        comment="Sequence number (consistently 1-based as of 2018-12-01)",
    )

    # noinspection PyMethodParameters
    code: Mapped[Optional[str]] = mapped_column(
        "code",
        DiagnosticCodeColType,
        comment="Diagnostic code",
    )

    # noinspection PyMethodParameters
    description: Mapped[Optional[str]] = mapped_camcops_column(
        "description",
        UnicodeText,
        exempt_from_anonymisation=True,
        comment="Description of the diagnostic code",
    )

    # noinspection PyMethodParameters
    comment: Mapped[Optional[str]] = mapped_column(
        "comment",
        UnicodeText,
        comment="Clinician's comment",
    )

    def get_html_table_row(self) -> str:
        """
        Returns an HTML table row: the sequence number followed by the
        web-escaped code, description and comment.
        """
        text_cells = [
            answer(ws.webify(value))
            for value in (self.code, self.description, self.comment)
        ]
        return tr(self.seqnum, *text_cells)

    def get_code_for_hl7(self) -> str:
        """
        Returns the code in HL7 form: periods removed and upper-cased, e.g.
        "F20.0" becomes "F200". A missing code gives an empty string.
        """
        return self.code.replace(".", "").upper() if self.code else ""

    def get_text_for_hl7(self) -> str:
        """
        Returns the description, or an empty string if there isn't one.
        """
        if self.description:
            return self.description
        return ""

    def is_empty(self) -> bool:
        """
        Is this row lacking a diagnostic code?
        """
        return not self.code

    def human(self) -> str:
        """
        Returns "code: description", with the comment appended in square
        brackets when a comment is present.
        """
        text = f"{self.code}: {self.description}"
        if self.comment:
            text += f" [{self.comment}]"
        return text

173 

174 

class DiagnosisBase(  # type: ignore[misc]
    TaskHasClinicianMixin,
    TaskHasPatientMixin,
    Task,
    ABC,
    metaclass=ABCMeta,
):
    """
    Abstract base for diagnosis tasks: a dated set of diagnostic-code items,
    recorded by a clinician for a patient.

    Subclasses must override ``items`` with a relationship to their item
    class and set ``hl7_coding_system``.
    """

    __abstract__ = True

    # noinspection PyMethodParameters
    relates_to_date: Mapped[Optional[datetime.date]] = mapped_column(
        "relates_to_date",
        Date,
        comment="Date that diagnoses relate to",
    )

    items = None  # type: List[DiagnosisItemBase]
    # ... must be overridden by a relationship

    hl7_coding_system = "?"

    def get_num_items(self) -> int:
        """
        Returns the number of diagnosis items attached to this task.
        """
        return len(self.items)

    def is_complete(self) -> bool:
        """
        The task is complete when it has a "relates to" date, at least one
        item, and no item that is empty.
        """
        if self.relates_to_date is None:
            return False
        if self.get_num_items() == 0:
            return False
        return not any(item.is_empty() for item in self.items)

    def get_task_html(self, req: CamcopsRequest) -> str:
        """
        Returns the task's HTML: a summary table (completeness row) followed
        by a detail table with one row per diagnosis item.
        """
        header = f"""
            <div class="{CssClass.SUMMARY}">
                <table class="{CssClass.SUMMARY}">
                    {self.get_is_complete_tr(req)}
                </table>
            </div>
            <table class="{CssClass.TASKDETAIL}">
                <tr>
                    <th width="10%">Diagnosis #</th>
                    <th width="10%">Code</th>
                    <th width="40%">Description</th>
                    <th width="40%">Comment</th>
                </tr>
        """
        # Join once rather than growing the string item by item.
        rows = "".join(item.get_html_table_row() for item in self.items)
        footer = """
            </table>
        """
        return header + rows + footer

    def get_clinical_text(self, req: CamcopsRequest) -> List[CtvInfo]:
        """
        Returns one :class:`CtvInfo` per item, showing the web-escaped code
        (in bold) and description.
        """
        return [
            CtvInfo(
                content=(
                    f"<b>{ws.webify(item.code)}</b>: "
                    f"{ws.webify(item.description)}"
                )
            )
            for item in self.items
        ]

    # noinspection PyUnusedLocal
    def get_hl7_extra_data_segments(
        self, recipient_def: ExportRecipient
    ) -> List[hl7.Segment]:
        """
        Returns one HL7 DG1 segment per diagnosis item.

        The set ID is 1-based. The task's creation date/time is used for
        both the diagnosis and the attestation timestamps, and the clinician
        name is split into components via ``guess_name_components``.
        """
        clinician = guess_name_components(self.clinician_name)
        return [
            make_dg1_segment(
                set_id=set_id,
                diagnosis_datetime=self.get_creation_datetime(),
                coding_system=self.hl7_coding_system,
                diagnosis_identifier=item.get_code_for_hl7(),
                diagnosis_text=item.get_text_for_hl7(),
                clinician_surname=clinician.get("surname") or "",
                clinician_forename=clinician.get("forename") or "",
                clinician_prefix=clinician.get("prefix") or "",
                attestation_datetime=self.get_creation_datetime(),
            )
            # make it 1-based, not 0-based
            for set_id, item in enumerate(self.items, start=1)
        ]

    def _get_fhir_extra_bundle_entries_for_system(
        self, req: CamcopsRequest, recipient: ExportRecipient, system: str
    ) -> List[Dict]:
        """
        Returns one FHIR Condition bundle entry per diagnosis item.

        Args:
            req: the current request
            recipient: the export recipient
            system: the FHIR code system to attach to each item's code

        Each Condition carries the item's code (with the human-readable text
        from ``item.human()``), the subject and the recorder. If the item has
        a comment, it is added as an annotation attributed to the clinician.
        """
        bundle_entries = []  # type: List[Dict]
        for item in self.items:
            display = item.human()
            coding_json = Coding(
                jsondict={
                    Fc.SYSTEM: system,
                    Fc.CODE: item.code,
                    Fc.DISPLAY: display,
                    Fc.USER_SELECTED: True,
                }
            ).as_json()
            code_json = CodeableConcept(
                jsondict={
                    Fc.CODING: [coding_json],
                    Fc.TEXT: display,
                }
            ).as_json()
            condition_dict = {
                Fc.CODE: code_json,
                Fc.SUBJECT: self._get_fhir_subject_ref(req, recipient),
                Fc.RECORDER: self._get_fhir_practitioner_ref(req),
            }
            if item.comment:
                note_json = Annotation(
                    jsondict={
                        Fc.AUTHOR_REFERENCE: self._get_fhir_practitioner_ref(
                            req
                        ),
                        Fc.AUTHOR_STRING: self.get_clinician_name(),
                        Fc.TEXT: item.comment,
                        Fc.TIME: self.fhir_when_task_created,
                    }
                ).as_json()
                condition_dict[Fc.NOTE] = [note_json]
            bundle_entries.append(
                make_fhir_bundle_entry(
                    resource_type_url=Fc.RESOURCE_TYPE_CONDITION,
                    identifier=self._get_fhir_condition_id(req, item.seqnum),
                    resource=Condition(jsondict=condition_dict).as_json(),
                )
            )
        return bundle_entries

313 

314 

315# ============================================================================= 

316# DiagnosisIcd10 

317# ============================================================================= 

318 

319 

class DiagnosisIcd10Item(DiagnosisItemBase, TaskDescendant):
    """
    One diagnostic-code row belonging to a :class:`DiagnosisIcd10` task.
    """

    __tablename__ = "diagnosis_icd10_item"

    diagnosis_icd10_id: Mapped[int] = mapped_column(comment=FK_COMMENT)

    # -------------------------------------------------------------------------
    # TaskDescendant overrides
    # -------------------------------------------------------------------------

    @classmethod
    def task_ancestor_class(cls) -> Optional[Type["Task"]]:
        """
        Returns the task class that owns rows of this kind.
        """
        return DiagnosisIcd10

    def task_ancestor(self) -> Optional["DiagnosisIcd10"]:
        """
        Returns the owning task, looked up via ``DiagnosisIcd10.get_linked``
        from this row's foreign key.
        """
        parent_id = self.diagnosis_icd10_id
        parent = DiagnosisIcd10.get_linked(parent_id, self)
        return parent  # type: ignore[return-value]

335 

336 

class DiagnosisIcd10(DiagnosisBase):
    """
    Server implementation of the Diagnosis/ICD-10 task.
    """

    __tablename__ = "diagnosis_icd10"
    info_filename_stem = "icd"

    items = ancillary_relationship(  # type: ignore[assignment]
        parent_class_name="DiagnosisIcd10",
        ancillary_class_name="DiagnosisIcd10Item",
        ancillary_fk_to_parent_attr_name="diagnosis_icd10_id",
        ancillary_order_by_attr_name="seqnum",
    )  # type: List[DiagnosisIcd10Item]

    shortname = "Diagnosis_ICD10"
    dependent_classes = [DiagnosisIcd10Item]
    hl7_coding_system = "I10"
    # Page A-129 of
    # https://www.hl7.org/special/committees/vocab/V26_Appendix_A.pdf

    @staticmethod
    def longname(req: "CamcopsRequest") -> str:
        """
        Returns the translated long name of the task.
        """
        _ = req.gettext
        return _("Diagnostic codes, ICD-10")

    def get_snomed_codes(
        self, req: CamcopsRequest, fallback: bool = True
    ) -> List[SnomedExpression]:
        """
        Returns all SNOMED-CT codes for this task.

        Args:
            req: the
                :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            fallback: for example, if F32.10 is unknown, should we fall back to
                F32.1?

        Returns:
            a list of
            :class:`camcops_server.cc_modules.cc_snomed.SnomedExpression`
            objects; empty if ICD-10/SNOMED lookup is not supported
        """
        expressions = []  # type: List[SnomedExpression]
        if req.icd10_snomed_supported:
            for item in self.items:
                concepts = self._get_snomed_concepts(item.code, req, fallback)
                if concepts:
                    # Items with no known concepts contribute nothing.
                    expressions.append(
                        SnomedExpression(SnomedFocusConcept(concepts))
                    )
        return expressions

    @staticmethod
    def _get_snomed_concepts(
        icd10_code: str, req: CamcopsRequest, fallback: bool = True
    ) -> List[SnomedConcept]:
        """
        Internal function to return :class:`SnomedConcept` objects for an
        ICD-10 code.

        Args:
            icd10_code: the ICD-10 code
            req: the
                :class:`camcops_server.cc_modules.cc_request.CamcopsRequest`
            fallback: for example, if F32.10 is unknown, should we fall back to
                F32.1?

        Returns:
            list: of :class:`SnomedConcept` objects

        """
        matches = []  # type: List[SnomedConcept]
        candidate = icd10_code
        while candidate:
            try:
                matches = req.icd10_snomed(candidate)
            except KeyError:  # no known code
                pass
            if matches or not fallback:
                break
            # Fall back: drop the last character and try again.
            candidate = candidate[:-1]
        # Either we found something, gave up, or ran out of code.
        return matches

    def get_fhir_extra_bundle_entries(
        self, req: CamcopsRequest, recipient: ExportRecipient
    ) -> List[Dict]:
        """
        Returns the per-item FHIR bundle entries, coded with the ICD-10
        code system.
        """
        code_system = Fc.CODE_SYSTEM_ICD10
        return self._get_fhir_extra_bundle_entries_for_system(
            req, recipient, code_system
        )

429 

430 

431# ============================================================================= 

432# DiagnosisIcd9CM 

433# ============================================================================= 

434 

435 

class DiagnosisIcd9CMItem(DiagnosisItemBase, TaskDescendant):
    """
    One diagnostic-code row belonging to a :class:`DiagnosisIcd9CM` task.
    """

    __tablename__ = "diagnosis_icd9cm_item"

    diagnosis_icd9cm_id: Mapped[int] = mapped_column(comment=FK_COMMENT)

    # -------------------------------------------------------------------------
    # TaskDescendant overrides
    # -------------------------------------------------------------------------

    @classmethod
    def task_ancestor_class(cls) -> Optional[Type["Task"]]:
        """
        Returns the task class that owns rows of this kind.
        """
        return DiagnosisIcd9CM

    def task_ancestor(self) -> Optional["DiagnosisIcd9CM"]:
        """
        Returns the owning task, looked up via ``DiagnosisIcd9CM.get_linked``
        from this row's foreign key.
        """
        parent_id = self.diagnosis_icd9cm_id
        parent = DiagnosisIcd9CM.get_linked(parent_id, self)
        return parent  # type: ignore[return-value]

451 

452 

class DiagnosisIcd9CM(DiagnosisBase):
    """
    Server implementation of the Diagnosis/ICD-9-CM task.
    """

    __tablename__ = "diagnosis_icd9cm"
    info_filename_stem = "icd"

    items = ancillary_relationship(  # type: ignore[assignment]
        parent_class_name="DiagnosisIcd9CM",
        ancillary_class_name="DiagnosisIcd9CMItem",
        ancillary_fk_to_parent_attr_name="diagnosis_icd9cm_id",
        ancillary_order_by_attr_name="seqnum",
    )  # type: List[DiagnosisIcd9CMItem]

    shortname = "Diagnosis_ICD9CM"
    dependent_classes = [DiagnosisIcd9CMItem]
    hl7_coding_system = "I9CM"
    # Page A-129 of
    # https://www.hl7.org/special/committees/vocab/V26_Appendix_A.pdf

    @staticmethod
    def longname(req: "CamcopsRequest") -> str:
        """
        Returns the translated long name of the task.
        """
        _ = req.gettext
        return _("Diagnostic codes, ICD-9-CM (DSM-IV-TR)")

    def get_snomed_codes(self, req: CamcopsRequest) -> List[SnomedExpression]:
        """
        Returns SNOMED-CT expressions for the items whose ICD-9-CM code has
        known SNOMED concepts; empty if ICD-9-CM/SNOMED lookup is not
        supported.
        """
        expressions = []  # type: List[SnomedExpression]
        if not req.icd9cm_snomed_supported:
            return expressions
        # noinspection PyTypeChecker
        for item in self.items:
            try:
                concepts = req.icd9cm_snomed(item.code)
            except KeyError:  # no known code
                continue
            if concepts:
                expressions.append(
                    SnomedExpression(SnomedFocusConcept(concepts))
                )
        return expressions

    def get_fhir_extra_bundle_entries(
        self, req: CamcopsRequest, recipient: ExportRecipient
    ) -> List[Dict]:
        """
        Returns the per-item FHIR bundle entries, coded with the ICD-9-CM
        code system.
        """
        code_system = Fc.CODE_SYSTEM_ICD9_CM
        return self._get_fhir_extra_bundle_entries_for_system(
            req, recipient, code_system
        )

501 

502 

503# ============================================================================= 

504# Reports 

505# ============================================================================= 

506 

507# ----------------------------------------------------------------------------- 

508# Helpers 

509# ----------------------------------------------------------------------------- 

510 

# Result-column labels used to sort the diagnosis reports: patient details
# first, then task creation time, coding system and code.
ORDER_BY = [
    "surname", "forename", "dob", "sex",
    "when_created", "system", "code",
]  # fmt: skip

520 

521 

522# noinspection PyProtectedMember,PyUnresolvedReferences 

def get_diagnosis_report_query(
    req: CamcopsRequest,
    diagnosis_class: Type[DiagnosisBase],
    item_class: Type[DiagnosisItemBase],
    item_fk_fieldname: str,
    system: str,
) -> Select[Any]:
    """
    Builds the (unordered) SELECT listing every current diagnosis item for
    one diagnosis task type.

    Args:
        req: the current request; supplies the ID number definitions and
            the user (for group restrictions)
        diagnosis_class: the diagnosis task class
        item_class: the matching diagnosis item class
        item_fk_fieldname: name of the item attribute holding the FK to the
            task
        system: label emitted in the "system" column (e.g. "ICD-10")

    Returns:
        a query yielding patient details, one column per ID number
        definition, then task creation time, system, code and description.
        Non-superusers see only groups they may report on.
    """
    # FROM patient
    #   INNER JOIN dxset ON (dxtable.patient_id == patient.id AND ...)
    #   INNER JOIN dxrow ON (dxrow.fk_dxset = dxset.pk AND ...)
    task_matches_patient = and_(
        diagnosis_class.patient_id == Patient.id,
        diagnosis_class._device_id == Patient._device_id,
        diagnosis_class._era == Patient._era,
    )
    item_matches_task = and_(
        getattr(item_class, item_fk_fieldname) == diagnosis_class.id,
        item_class._device_id == diagnosis_class._device_id,
        item_class._era == diagnosis_class._era,
    )
    joined = Patient.__table__.join(
        diagnosis_class.__table__, task_matches_patient
    ).join(item_class.__table__, item_matches_task)

    # SELECT surname, forename, dob, sex, ...
    columns: list[ColumnElement[Any]] = [
        Patient.surname.label("surname"),
        Patient.forename.label("forename"),
        Patient.dob.label("dob"),
        Patient.sex.label("sex"),
    ]

    for iddef in req.idnum_definitions:
        which = iddef.which_idnum
        label = iddef.short_description
        idnum_table = PatientIdNum.__table__.alias(f"i{which}")
        # ... [also] SELECT i1.idnum_value AS 'NHS' (etc.)
        columns.append(idnum_table.c.idnum_value.label(label))
        # ... [from] OUTER JOIN patientidnum AS i1 ON (...)
        # The "_current" and "which_idnum" tests belong in the JOIN, not
        # the WHERE; otherwise a patient lacking this ID number would be
        # wiped out of the OUTER JOIN.
        idnum_matches_patient = and_(
            idnum_table.c.patient_id == Patient.id,
            idnum_table.c._device_id == Patient._device_id,
            idnum_table.c._era == Patient._era,
            idnum_table.c._current == True,  # noqa: E712
            idnum_table.c.which_idnum == which,
        )
        joined = joined.outerjoin(idnum_table, idnum_matches_patient)

    columns.extend(
        [
            diagnosis_class.when_created.label("when_created"),
            literal(system).label("system"),
            item_class.code.label("code"),
            item_class.description.label("description"),
        ]
    )

    # WHERE...
    conditions = [
        Patient._current == True,  # noqa: E712
        diagnosis_class._current == True,  # noqa: E712
        item_class._current == True,  # noqa: E712
    ]
    if not req.user.superuser:
        # Restrict to accessible groups.
        # Helpfully, SQLAlchemy will render this as "... AND 1 != 1" if we
        # pass an empty list to in_().
        permitted_group_ids = req.user.ids_of_groups_user_may_report_on
        conditions.append(diagnosis_class._group_id.in_(permitted_group_ids))

    return select(*columns).select_from(joined).where(and_(*conditions))

601 

602 

def get_diagnosis_report(
    req: CamcopsRequest,
    diagnosis_class: Type[DiagnosisBase],
    item_class: Type[DiagnosisItemBase],
    item_fk_fieldname: str,
    system: str,
) -> Select[Any]:
    """
    Returns the query from :func:`get_diagnosis_report_query` for the given
    task/item classes, sorted by the ``ORDER_BY`` columns.
    """
    unordered = get_diagnosis_report_query(
        req, diagnosis_class, item_class, item_fk_fieldname, system
    )
    return unordered.order_by(*ORDER_BY)

615 

616 

617# ----------------------------------------------------------------------------- 

618# Plain "all diagnoses" reports 

619# ----------------------------------------------------------------------------- 

620 

621 

class DiagnosisICD9CMReport(Report):
    """Report to show ICD-9-CM (DSM-IV-TR) diagnoses."""

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """Identifier string for this report."""
        return "diagnoses_icd9cm"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """Returns the translated report title."""
        _ = req.gettext
        return _("Diagnosis – ICD-9-CM (DSM-IV-TR) diagnoses for all patients")

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """This report is not restricted to superusers."""
        return False

    def get_query(self, req: CamcopsRequest) -> Select[Any]:
        """Returns the sorted query over all ICD-9-CM diagnosis items."""
        params = dict(
            diagnosis_class=DiagnosisIcd9CM,
            item_class=DiagnosisIcd9CMItem,
            item_fk_fieldname="diagnosis_icd9cm_id",
            system="ICD-9-CM",
        )
        return get_diagnosis_report(req, **params)

650 

651 

class DiagnosisICD10Report(Report):
    """Report to show ICD-10 diagnoses."""

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """Identifier string for this report."""
        return "diagnoses_icd10"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """Returns the translated report title."""
        _ = req.gettext
        return _("Diagnosis – ICD-10 diagnoses for all patients")

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """This report is not restricted to superusers."""
        return False

    def get_query(self, req: CamcopsRequest) -> Select[Any]:
        """Returns the sorted query over all ICD-10 diagnosis items."""
        params = dict(
            diagnosis_class=DiagnosisIcd10,
            item_class=DiagnosisIcd10Item,
            item_fk_fieldname="diagnosis_icd10_id",
            system="ICD-10",
        )
        return get_diagnosis_report(req, **params)

678 

679 

class DiagnosisAllReport(Report):
    """Report to show all diagnoses."""

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        """Identifier string for this report."""
        return "diagnoses_all"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """Returns the translated report title."""
        _ = req.gettext
        return _("Diagnosis – All diagnoses for all patients")

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        """This report is not restricted to superusers."""
        return False

    def get_query(self, req: CamcopsRequest) -> CompoundSelect[Any]:
        """
        Returns the UNION of the ICD-9-CM and ICD-10 diagnosis queries,
        sorted by the ``ORDER_BY`` columns.
        """
        # (task class, item class, item FK attribute, system label);
        # ICD-9-CM first, then ICD-10.
        sources = (
            (
                DiagnosisIcd9CM,
                DiagnosisIcd9CMItem,
                "diagnosis_icd9cm_id",
                "ICD-9-CM",
            ),
            (
                DiagnosisIcd10,
                DiagnosisIcd10Item,
                "diagnosis_icd10_id",
                "ICD-10",
            ),
        )
        selects = [
            get_diagnosis_report_query(
                req,
                diagnosis_class=task_cls,
                item_class=item_cls,
                item_fk_fieldname=fk_name,
                system=system_label,
            )
            for task_cls, item_cls, fk_name, system_label in sources
        ]
        return union(*selects).order_by(*ORDER_BY)

716 

717 

718# ----------------------------------------------------------------------------- 

719# "Find me patients matching certain diagnostic criteria" 

720# ----------------------------------------------------------------------------- 

721 

722 

class DiagnosisNode(SchemaNode, RequestAwareMixin):
    """
    Schema node for a single diagnostic code search term (a string that may
    contain SQL LIKE wildcards).
    """

    schema_type = String

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Placeholders for the type checker; real text is set in after_bind.
        self.title = ""
        self.description = ""
        super().__init__(*args, **kwargs)

    # noinspection PyUnusedLocal
    def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
        """Sets the translated title and description once bound."""
        _ = self.gettext
        self.title = _("Diagnostic code")
        self.description = _(
            "Type in a diagnostic code; you may use SQL 'LIKE' syntax for "
            "wildcards, i.e. _ for one character and % for zero/one/lots"
        )

    def validator(self, node: SchemaNode, value: str) -> None:
        """
        Checks the value with ``validate_restricted_sql_search_literal``.

        Raises:
            Invalid: carrying the validator's message, if it raises
                ``ValueError``
        """
        try:
            validate_restricted_sql_search_literal(value, self.request)
        except ValueError as exc:
            message = str(exc)
            raise Invalid(node, message)

745 

746 

class DiagnosesSequence(SequenceSchema, RequestAwareMixin):
    """
    Schema for a sequence of diagnostic code search terms, with an optional
    minimum length and no duplicates allowed.
    """

    diagnoses = DiagnosisNode()

    def __init__(
        self, *args: Any, minimum_number: int = 0, **kwargs: Any
    ) -> None:
        """
        Args:
            minimum_number: fewest codes that the validator will accept
        """
        self.minimum_number = minimum_number
        # Placeholders for the type checker; real text is set in after_bind.
        self.title = ""
        self.description = ""
        super().__init__(*args, **kwargs)

    # noinspection PyUnusedLocal
    def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
        """Sets the translated title and description once bound."""
        request = self.request
        _ = request.gettext
        self.title = _("Diagnostic codes")
        wildcard_help = _(
            "Use % as a wildcard (e.g. F32 matches only F32, but F32% "
            "matches F32, F32.1, F32.2...)."
        )
        self.description = wildcard_help + " " + or_join_description(request)

    def validator(self, node: SchemaNode, value: List[str]) -> None:
        """
        Rejects lists shorter than ``minimum_number`` and lists containing
        duplicates.

        Raises:
            Invalid: if either check fails
        """
        assert isinstance(value, list)
        _ = self.gettext
        n_given = len(value)
        if n_given < self.minimum_number:
            too_few = _("You must specify at least") + f" {self.minimum_number}"
            raise Invalid(node, too_few)
        n_distinct = len(set(value))
        if n_given != n_distinct:
            raise Invalid(node, _("You have specified duplicate diagnoses"))

782 

783 

class DiagnosisFinderReportSchema(ReportParamSchema):
    """
    Parameter schema for the diagnosis finder reports: linking ID number,
    inclusion/exclusion diagnoses and optional age limits.
    """

    # must match ViewParam.WHICH_IDNUM
    which_idnum = LinkingIdNumSelector()
    # must match ViewParam.DIAGNOSES_INCLUSION
    diagnoses_inclusion = DiagnosesSequence(minimum_number=1)
    # must match ViewParam.DIAGNOSES_EXCLUSION
    diagnoses_exclusion = DiagnosesSequence()
    # must match ViewParam.AGE_MINIMUM
    age_minimum = OptionalIntNode()
    # must match ViewParam.AGE_MAXIMUM
    age_maximum = OptionalIntNode()

    # noinspection PyUnusedLocal
    def after_bind(self, node: SchemaNode, kw: Dict[str, Any]) -> None:
        """Sets translated titles on the child nodes once bound."""
        _ = self.gettext
        titles = {
            "diagnoses_inclusion": _("Inclusion diagnoses (lifetime)"),
            "diagnoses_exclusion": _("Exclusion diagnoses (lifetime)"),
            "age_minimum": _("Minimum age (years) (optional)"),
            "age_maximum": _("Maximum age (years) (optional)"),
        }
        for child_name, child_title in titles.items():
            get_child_node(self, child_name).title = child_title

806 

807 

808# noinspection PyProtectedMember 

def get_diagnosis_inc_exc_report_query(
    req: CamcopsRequest,
    diagnosis_class: Type[DiagnosisBase],
    item_class: Type[DiagnosisItemBase],
    item_fk_fieldname: str,
    system: str,
    which_idnum: int,
    inclusion_dx: List[str],
    exclusion_dx: List[str],
    age_minimum_y: Optional[int],
    age_maximum_y: Optional[int],
) -> Select[Any]:
    """
    As for get_diagnosis_report_query, but this makes some modifications to
    do inclusion and exclusion criteria.

    - We need a linking number to perform exclusion criteria.
    - Therefore, we use a single ID number, which must not be NULL.

    Args:
        req: the current request (supplies user, today's date, ID
            descriptions)
        diagnosis_class: the diagnosis task class
        item_class: the matching diagnosis item class
        item_fk_fieldname: name of the item attribute holding the FK to the
            task
        system: label emitted in the "system" column
        which_idnum: the ID number type used to link patient records
        inclusion_dx: SQL LIKE patterns; a row is shown if its code matches
            any of them (an empty list matches nothing)
        exclusion_dx: SQL LIKE patterns; a patient (as identified by the
            linking ID number) with any current matching diagnosis is
            excluded entirely
        age_minimum_y: minimum age in years, or ``None`` for no limit
        age_maximum_y: maximum age in years, or ``None`` for no limit

    Returns:
        the (unordered) SELECT query
    """
    # The basics:
    desc = req.get_id_desc(which_idnum) or "BAD_IDNUM"
    # noinspection PyUnresolvedReferences
    select_fields: list[ColumnElement[Any]] = [
        Patient.surname.label("surname"),
        Patient.forename.label("forename"),
        Patient.dob.label("dob"),
        Patient.sex.label("sex"),
        PatientIdNum.idnum_value.label(desc),
        diagnosis_class.when_created.label("when_created"),
        literal(system).label("system"),
        item_class.code.label("code"),
        item_class.description.label("description"),
    ]
    # noinspection PyUnresolvedReferences
    select_from = (
        Patient.__table__.join(
            diagnosis_class.__table__,
            and_(
                diagnosis_class.patient_id == Patient.id,
                diagnosis_class._device_id == Patient._device_id,
                diagnosis_class._era == Patient._era,
                diagnosis_class._current == True,  # noqa: E712
            ),
        )
        .join(
            item_class.__table__,
            and_(
                getattr(item_class, item_fk_fieldname) == diagnosis_class.id,
                item_class._device_id == diagnosis_class._device_id,
                item_class._era == diagnosis_class._era,
                item_class._current == True,  # noqa: E712
            ),
        )
        .join(
            PatientIdNum.__table__,
            and_(
                PatientIdNum.patient_id == Patient.id,
                PatientIdNum._device_id == Patient._device_id,
                PatientIdNum._era == Patient._era,
                PatientIdNum._current == True,  # noqa: E712
                PatientIdNum.which_idnum == which_idnum,
                PatientIdNum.idnum_value.isnot(None),  # NOT NULL
            ),
        )
    )
    wheres = [Patient._current == True]  # noqa: E712

    group_ids: list[int] = []

    if not req.user.superuser:
        # Restrict to accessible groups
        group_ids = req.user.ids_of_groups_user_may_report_on
        wheres.append(diagnosis_class._group_id.in_(group_ids))

    # Age limits are simple, as the same patient has the same age for
    # all diagnosis rows.
    today = req.today
    if age_maximum_y is not None:
        # Example: max age is 40; earliest (oldest) DOB is therefore 41
        # years ago plus one day (e.g. if it's 15 June 2010, then earliest
        # DOB is 16 June 1969; a person born then will be 41 tomorrow).
        earliest_dob = pendulum_date_to_datetime_date(
            today.subtract(years=age_maximum_y + 1).add(days=1)
        )
        wheres.append(Patient.dob >= earliest_dob)
    if age_minimum_y is not None:
        # Example: min age is 20; latest (youngest) DOB is therefore 20
        # years ago (e.g. if it's 15 June 2010, latest DOB is 15 June 1990;
        # if you're born after that, you're not 20 yet).
        latest_dob = pendulum_date_to_datetime_date(
            today.subtract(years=age_minimum_y)
        )
        wheres.append(Patient.dob <= latest_dob)

    # Diagnosis criteria are a little bit more complex.
    #
    # We can reasonably do inclusion criteria as "show the diagnoses
    # matching the inclusion criteria" (not the more complex "show all
    # diagnoses for patients having at least one inclusion diagnosis",
    # which is likely to be too verbose for patient finding).
    inclusion_criteria = [
        item_class.code.like(idx) for idx in inclusion_dx
    ]  # type: List[ColumnElement]
    # Seed the OR with False, not True: SQLAlchemy coerces True to true(),
    # which short-circuits an or_() to plain "true" and would silently
    # discard every inclusion criterion. False is the neutral element of
    # OR and is dropped when any real criterion is present.
    wheres.append(or_(False, *inclusion_criteria))  # type: ignore[arg-type]

    # Exclusion criteria are the trickier: we need to be able to link
    # multiple diagnoses for the same patient, so we need to use a linking
    # ID number.
    if exclusion_dx:
        # noinspection PyUnresolvedReferences
        edx_items = item_class.__table__.alias("edx_items")
        # noinspection PyUnresolvedReferences
        edx_sets = diagnosis_class.__table__.alias("edx_sets")
        # noinspection PyUnresolvedReferences
        edx_patient = Patient.__table__.alias("edx_patient")
        # noinspection PyUnresolvedReferences
        edx_idnum = PatientIdNum.__table__.alias("edx_idnum")
        edx_joined = (
            edx_items.join(
                edx_sets,
                and_(
                    getattr(edx_items.c, item_fk_fieldname) == edx_sets.c.id,
                    edx_items.c._device_id == edx_sets.c._device_id,
                    edx_items.c._era == edx_sets.c._era,
                    edx_items.c._current == True,  # noqa: E712
                ),
            )
            .join(
                edx_patient,
                and_(
                    edx_sets.c.patient_id == edx_patient.c.id,
                    edx_sets.c._device_id == edx_patient.c._device_id,
                    edx_sets.c._era == edx_patient.c._era,
                    edx_sets.c._current == True,  # noqa: E712
                ),
            )
            .join(
                edx_idnum,
                and_(
                    edx_idnum.c.patient_id == edx_patient.c.id,
                    edx_idnum.c._device_id == edx_patient.c._device_id,
                    edx_idnum.c._era == edx_patient.c._era,
                    edx_idnum.c._current == True,  # noqa: E712
                    edx_idnum.c.which_idnum == which_idnum,
                ),
            )
        )
        exclusion_criteria = [
            edx_items.c.code.like(edx) for edx in exclusion_dx
        ]  # type: List[ColumnElement]
        edx_wheres = [
            edx_items.c._current == True,  # noqa: E712
            # Correlate the subquery with the outer query via the linking
            # ID number's value:
            edx_idnum.c.idnum_value == PatientIdNum.idnum_value,
            or_(*exclusion_criteria),
        ]
        # Note the join above between the main and the EXISTS clauses.
        # We don't use an alias for the main copy of the PatientIdNum table,
        # and we do for the EXISTS version. This is fine; e.g.
        # https://msdn.microsoft.com/en-us/library/ethytz2x.aspx example:
        #   SELECT boss.name, employee.name
        #   FROM employee
        #   INNER JOIN employee boss ON employee.manager_id = boss.emp_id;
        if not req.user.superuser:
            # Restrict to accessible groups
            # group_ids already defined from above
            edx_wheres.append(edx_sets.c._group_id.in_(group_ids))
            # ... bugfix 2018-06-19: "wheres" -> "edx_wheres"
        exclusion_select = (
            select("*").select_from(edx_joined).where(and_(*edx_wheres))
        )
        wheres.append(not_(exists(exclusion_select)))

    query = (
        select(*select_fields).select_from(select_from).where(and_(*wheres))
    )
    return query

985 

986 

987# noinspection PyAbstractClass 

class DiagnosisFinderReportBase(Report):
    """
    Common base for the "find patients by diagnosis" reports.

    Provides the parameter-form schema class, the list of report-specific
    HTTP query keys, and the single-page HTML rendering.

    :meth:`render_single_page_html` relies on ``self.get_query()``,
    ``self.title()`` and ``self.report_id``, none of which is defined in
    this class itself.
    """

    # noinspection PyMethodParameters
    @classproperty
    def superuser_only(cls) -> bool:
        # Not restricted to superusers.
        return False

    @staticmethod
    def get_paramform_schema_class() -> Type["ReportParamSchema"]:
        """
        Return the schema class used for this report's parameter form.
        """
        return DiagnosisFinderReportSchema

    @classmethod
    def get_specific_http_query_keys(cls) -> List[str]:
        """
        Return the names of the HTTP query parameters specific to these
        reports.
        """
        return [
            ViewParam.WHICH_IDNUM,
            ViewParam.DIAGNOSES_INCLUSION,
            ViewParam.DIAGNOSES_EXCLUSION,
            ViewParam.AGE_MINIMUM,
            ViewParam.AGE_MAXIMUM,
        ]

    def render_single_page_html(
        self,
        req: "CamcopsRequest",
        column_names: Sequence[str],
        page: CamcopsPage,
    ) -> Response:
        """
        Render one page of results with ``diagnosis_finder_report.mako``.

        The search parameters are read back from the request and passed to
        the template together with the page of results and the literal SQL
        of the report's query.

        Args:
            req: the request, from which the search parameters are read
            column_names: column headings, passed through to the template
            page: the page of results, passed through to the template

        Returns:
            the rendered response
        """
        # Read the search parameters in the same way as get_query() does.
        id_type = req.get_int_param(ViewParam.WHICH_IDNUM)
        included_codes = req.get_str_list_param(
            ViewParam.DIAGNOSES_INCLUSION,
            validator=validate_restricted_sql_search_literal,
        )
        excluded_codes = req.get_str_list_param(
            ViewParam.DIAGNOSES_EXCLUSION,
            validator=validate_restricted_sql_search_literal,
        )
        youngest = req.get_int_param(ViewParam.AGE_MINIMUM)
        oldest = req.get_int_param(ViewParam.AGE_MAXIMUM)

        # Fall back to a visible marker if there is no description for this
        # ID number type.
        id_type_description = req.get_id_desc(id_type) or "BAD_IDNUM"

        literal_sql = get_literal_query(
            self.get_query(req), bind=req.engine  # type: ignore[arg-type]
        )

        template_values = {
            "title": self.title(req),
            "page": page,
            "column_names": column_names,
            "report_id": self.report_id,
            "idnum_desc": id_type_description,
            "inclusion_dx": included_codes,
            "exclusion_dx": excluded_codes,
            "age_minimum": youngest,
            "age_maximum": oldest,
            "sql": literal_sql,
        }
        return render_to_response(
            "diagnosis_finder_report.mako", template_values, request=req
        )

1047 

1048 

class DiagnosisICD10FinderReport(DiagnosisFinderReportBase):
    """
    Finder report for ICD-10 diagnoses, with optional age limits.
    """

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        return "diagnoses_finder_icd10"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """
        Return the report title, passed through the request's ``gettext``.
        """
        _ = req.gettext
        return _("Diagnosis – Find patients by ICD-10 diagnosis ± age")

    def get_query(self, req: CamcopsRequest) -> Select[Any]:
        """
        Build the report's SELECT from the parameters in the request.

        The inclusion/exclusion code lists are validated with
        ``validate_restricted_sql_search_literal`` as they are read. The
        result is ordered by ``ORDER_BY``.
        """
        id_type = req.get_int_param(ViewParam.WHICH_IDNUM)
        included_codes = req.get_str_list_param(
            ViewParam.DIAGNOSES_INCLUSION,
            validator=validate_restricted_sql_search_literal,
        )
        excluded_codes = req.get_str_list_param(
            ViewParam.DIAGNOSES_EXCLUSION,
            validator=validate_restricted_sql_search_literal,
        )
        youngest = req.get_int_param(ViewParam.AGE_MINIMUM)
        oldest = req.get_int_param(ViewParam.AGE_MAXIMUM)

        # For debugging, the final query can be inspected with:
        # log.debug("Final query:\n{}", get_literal_query(q, bind=req.engine))
        return get_diagnosis_inc_exc_report_query(
            req,
            diagnosis_class=DiagnosisIcd10,
            item_class=DiagnosisIcd10Item,
            item_fk_fieldname="diagnosis_icd10_id",
            system="ICD-10",
            which_idnum=id_type,
            inclusion_dx=included_codes,
            exclusion_dx=excluded_codes,
            age_minimum_y=youngest,
            age_maximum_y=oldest,
        ).order_by(*ORDER_BY)

    @staticmethod
    def get_test_querydict() -> Dict[str, Any]:
        """
        Return a sample set of report parameters: ID number type 1, one
        inclusion pattern (``F32%``), no exclusions and no age limits.
        """
        sample_params = {
            ViewParam.WHICH_IDNUM: 1,
            ViewParam.DIAGNOSES_INCLUSION: ["F32%"],
            ViewParam.DIAGNOSES_EXCLUSION: [],
            ViewParam.AGE_MINIMUM: None,
            ViewParam.AGE_MAXIMUM: None,
        }  # type: Dict[str, Any]
        return sample_params

1098 

1099 

class DiagnosisICD9CMFinderReport(DiagnosisFinderReportBase):
    """
    Finder report for ICD-9-CM (DSM-IV-TR) diagnoses, with optional age
    limits.
    """

    # noinspection PyMethodParameters
    @classproperty
    def report_id(cls) -> str:
        return "diagnoses_finder_icd9cm"

    @classmethod
    def title(cls, req: "CamcopsRequest") -> str:
        """
        Return the report title, passed through the request's ``gettext``.
        """
        _ = req.gettext
        return _(
            "Diagnosis – Find patients by ICD-9-CM (DSM-IV-TR) diagnosis ± age"
        )

    def get_query(self, req: CamcopsRequest) -> Select[Any]:
        """
        Build the report's SELECT from the parameters in the request.

        The inclusion/exclusion code lists are validated with
        ``validate_restricted_sql_search_literal`` as they are read. The
        result is ordered by ``ORDER_BY``.
        """
        id_type = req.get_int_param(ViewParam.WHICH_IDNUM)
        included_codes = req.get_str_list_param(
            ViewParam.DIAGNOSES_INCLUSION,
            validator=validate_restricted_sql_search_literal,
        )
        excluded_codes = req.get_str_list_param(
            ViewParam.DIAGNOSES_EXCLUSION,
            validator=validate_restricted_sql_search_literal,
        )
        youngest = req.get_int_param(ViewParam.AGE_MINIMUM)
        oldest = req.get_int_param(ViewParam.AGE_MAXIMUM)

        # For debugging, the final query can be inspected with:
        # log.debug("Final query:\n{}", get_literal_query(q, bind=req.engine))
        return get_diagnosis_inc_exc_report_query(
            req,
            diagnosis_class=DiagnosisIcd9CM,
            item_class=DiagnosisIcd9CMItem,
            item_fk_fieldname="diagnosis_icd9cm_id",
            system="ICD-9-CM",
            which_idnum=id_type,
            inclusion_dx=included_codes,
            exclusion_dx=excluded_codes,
            age_minimum_y=youngest,
            age_maximum_y=oldest,
        ).order_by(*ORDER_BY)

    @staticmethod
    def get_test_querydict() -> Dict[str, Any]:
        """
        Return a sample set of report parameters: ID number type 1, one
        inclusion pattern (``296%``), no exclusions and no age limits.
        """
        sample_params = {
            ViewParam.WHICH_IDNUM: 1,
            ViewParam.DIAGNOSES_INCLUSION: ["296%"],
            ViewParam.DIAGNOSES_EXCLUSION: [],
            ViewParam.AGE_MINIMUM: None,
            ViewParam.AGE_MAXIMUM: None,
        }  # type: Dict[str, Any]
        return sample_params